Extract client side timestamp filtering logic
This sets us up to reuse it as part of the simulated event loop message
gateway. It is surprisingly isolated.
Change-Id: Ia72de5ab8fbb9b3495fec32cd2762e9fdfc7c1e7
diff --git a/aos/network/BUILD b/aos/network/BUILD
index c7e5a28..f9fe7ea 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -207,6 +207,24 @@
)
cc_library(
+ name = "message_bridge_client_status",
+ srcs = [
+ "message_bridge_client_status.cc",
+ ],
+ hdrs = [
+ "message_bridge_client_status.h",
+ ],
+ deps = [
+ ":message_bridge_client_fbs",
+ ":message_bridge_server_fbs",
+ ":timestamp_filter",
+ "//aos:flatbuffers",
+ "//aos/events:event_loop",
+ "//aos/time",
+ ],
+)
+
+cc_library(
name = "message_bridge_client_lib",
srcs = [
"message_bridge_client_lib.cc",
@@ -220,6 +238,7 @@
deps = [
":connect_fbs",
":message_bridge_client_fbs",
+ ":message_bridge_client_status",
":message_bridge_protocol",
":message_bridge_server_fbs",
":sctp_client",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 3963485..b371181 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -86,46 +86,14 @@
return fbb.Release();
}
-FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
- const std::vector<std::string_view> &source_node_names) {
- flatbuffers::FlatBufferBuilder fbb;
- fbb.ForceDefaults(true);
-
- std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
- for (const std::string_view node_name : source_node_names) {
- flatbuffers::Offset<flatbuffers::String> node_name_offset =
- fbb.CreateString(node_name);
-
- Node::Builder node_builder(fbb);
- node_builder.add_name(node_name_offset);
- flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
- ClientConnection::Builder connection_builder(fbb);
- connection_builder.add_node(node_offset);
- connection_builder.add_state(State::DISCONNECTED);
- // TODO(austin): Track dropped packets.
- connection_builder.add_received_packets(0);
- connection_builder.add_monotonic_offset(0);
- connection_offsets.emplace_back(connection_builder.Finish());
- }
- flatbuffers::Offset<
- flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
- connections_offset = fbb.CreateVector(connection_offsets);
-
- ClientStatistics::Builder client_statistics_builder(fbb);
- client_statistics_builder.add_connections(connections_offset);
- fbb.Finish(client_statistics_builder.Finish());
-
- return fbb.Release();
-}
} // namespace
SctpClientConnection::SctpClientConnection(
aos::ShmEventLoop *const event_loop, std::string_view remote_name,
const Node *my_node, std::string_view local_host,
- std::vector<std::unique_ptr<aos::RawSender>> *channels,
- ClientConnection *connection)
+ std::vector<std::unique_ptr<aos::RawSender>> *channels, int client_index,
+ MessageBridgeClientStatus *client_status)
: event_loop_(event_loop),
connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
remote_name)),
@@ -141,7 +109,9 @@
StreamToChannel(event_loop->configuration(), my_node, remote_node_)),
stream_reply_with_timestamp_(StreamReplyWithTimestamp(
event_loop->configuration(), my_node, remote_node_)),
- connection_(connection) {
+ client_status_(client_status),
+ client_index_(client_index),
+ connection_(client_status_->GetClientConnection(client_index_)) {
VLOG(1) << "Connect request for " << remote_node_->name()->string_view()
<< ": " << FlatbufferToJson(connect_message_);
@@ -230,7 +200,7 @@
remote_assoc_id_ = assoc_id;
connection_->mutate_state(State::CONNECTED);
- filter_.Reset();
+ client_status_->SampleReset(client_index_);
}
void SctpClientConnection::NodeDisconnected() {
@@ -240,7 +210,7 @@
remote_assoc_id_ = 0;
connection_->mutate_state(State::DISCONNECTED);
connection_->mutate_monotonic_offset(0);
- filter_.Reset();
+ client_status_->SampleReset(client_index_);
}
void SctpClientConnection::HandleData(const Message *message) {
@@ -264,25 +234,11 @@
chrono::nanoseconds(message_header->realtime_sent_time())),
message_header->queue_index());
- const std::chrono::nanoseconds offset =
- sender->monotonic_sent_time() -
+ client_status_->SampleFilter(
+ client_index_,
aos::monotonic_clock::time_point(
- chrono::nanoseconds(message_header->monotonic_sent_time()));
-
- // If this is our first observation, use that to seed the base offset. That
- // gets us in the ballpark.
- if (!filter_.has_sample()) {
- filter_.set_base_offset(offset);
- }
-
- // We can now measure the latency!
- filter_.Sample(sender->monotonic_sent_time(), offset);
-
- connection_->mutate_monotonic_offset(
- (chrono::duration_cast<chrono::nanoseconds>(
- chrono::duration<double>(filter_.offset())) +
- filter_.base_offset())
- .count());
+ chrono::nanoseconds(message_header->monotonic_sent_time())),
+ sender->monotonic_sent_time());
if (stream_reply_with_timestamp_[stream]) {
// TODO(austin): Send back less if we are only acking. Maybe only a
@@ -330,13 +286,7 @@
}
MessageBridgeClient::MessageBridgeClient(aos::ShmEventLoop *event_loop)
- : event_loop_(event_loop),
- sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
- source_node_names_(configuration::SourceNodeNames(
- event_loop->configuration(), event_loop->node())),
- statistics_(MakeClientStatistics(source_node_names_)) {
- client_connection_offsets_.reserve(
- statistics_.message().connections()->size());
+ : event_loop_(event_loop), client_status_(event_loop_) {
std::string_view node_name = event_loop->node()->name()->string_view();
// Find all the channels which are supposed to be delivered to us.
@@ -361,67 +311,13 @@
}
// Now, for each source node, build a connection.
- int node_index = 0;
- for (const std::string_view source_node : source_node_names_) {
+ for (const std::string_view source_node : configuration::SourceNodeNames(
+ event_loop->configuration(), event_loop->node())) {
// Open an unspecified connection (:: in ipv6 terminology)
connections_.emplace_back(new SctpClientConnection(
event_loop, source_node, event_loop->node(), "::", &channels_,
- statistics_.mutable_message()->mutable_connections()->GetMutableObject(
- node_index)));
- ++node_index;
+ client_status_.FindClientIndex(source_node), &client_status_));
}
-
- // And kick it all off.
- statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
- statistics_timer_->set_name("statistics");
- event_loop_->OnRun([this]() {
- statistics_timer_->Setup(
- event_loop_->monotonic_now() + chrono::milliseconds(100),
- chrono::milliseconds(100));
- });
-}
-
-void MessageBridgeClient::SendStatistics() {
- // Copy from statistics_ and drop monotonic_offset if it isn't populated yet.
- // There doesn't exist a good way to drop fields otherwise.
- aos::Sender<ClientStatistics>::Builder builder = sender_.MakeBuilder();
- client_connection_offsets_.clear();
-
- for (const ClientConnection *connection :
- *statistics_.message().connections()) {
- flatbuffers::Offset<flatbuffers::String> node_name_offset =
- builder.fbb()->CreateString(connection->node()->name()->string_view());
- Node::Builder node_builder = builder.MakeBuilder<Node>();
- node_builder.add_name(node_name_offset);
- flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
- ClientConnection::Builder client_connection_builder =
- builder.MakeBuilder<ClientConnection>();
-
- client_connection_builder.add_node(node_offset);
- client_connection_builder.add_state(connection->state());
- client_connection_builder.add_received_packets(
- connection->received_packets());
-
- // Strip out the monotonic offset if it isn't populated.
- if (connection->monotonic_offset() != 0) {
- client_connection_builder.add_monotonic_offset(
- connection->monotonic_offset());
- }
-
- client_connection_offsets_.emplace_back(client_connection_builder.Finish());
- }
-
- flatbuffers::Offset<
- flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
- client_connections_offset =
- builder.fbb()->CreateVector(client_connection_offsets_);
-
- ClientStatistics::Builder client_statistics_builder =
- builder.MakeBuilder<ClientStatistics>();
- client_statistics_builder.add_connections(client_connections_offset);
-
- builder.Send(client_statistics_builder.Finish());
}
} // namespace message_bridge
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 2811553..8134cbd 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -7,8 +7,8 @@
#include "aos/events/logging/logger_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/network/connect_generated.h"
-#include "aos/network/timestamp_filter.h"
#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_client_status.h"
#include "aos/network/sctp_client.h"
#include "aos/network/sctp_lib.h"
@@ -25,7 +25,8 @@
std::string_view remote_name, const Node *my_node,
std::string_view local_host,
std::vector<std::unique_ptr<aos::RawSender>> *channels,
- ClientConnection *connection);
+ int client_index,
+ MessageBridgeClientStatus *client_status);
~SctpClientConnection() { event_loop_->epoll()->DeleteFd(client_.fd()); }
@@ -70,16 +71,15 @@
// Timer which fires to handle reconnections.
aos::TimerHandler *connect_timer_;
- // ClientConnection statistics message to modify. This will be published
- // periodicially.
- ClientConnection *connection_;
-
// id of the server once known. This is only valid if connection_ says
// connected.
sctp_assoc_t remote_assoc_id_ = 0;
- // Filter for the timestamp offset for this connection.
- TimestampFilter filter_;
+ // ClientConnection statistics message to modify. This will be published
+ // periodicially.
+ MessageBridgeClientStatus *client_status_;
+ int client_index_;
+ ClientConnection *connection_;
};
// This encapsulates the state required to talk to *all* the servers from this
@@ -91,24 +91,10 @@
~MessageBridgeClient() {}
private:
- // Sends out the statistics that are continually updated by the
- // SctpClientConnections.
- void SendStatistics();
-
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
- // Sender to publish statistics on.
- aos::Sender<ClientStatistics> sender_;
- aos::TimerHandler *statistics_timer_;
- // Nodes to receive data from.
- const std::vector<std::string_view> source_node_names_;
-
- // Data to publish.
- FlatbufferDetachedBuffer<ClientStatistics> statistics_;
- // Reserved memory for the client connection offsets to reduce heap
- // allocations.
- std::vector<flatbuffers::Offset<ClientConnection>> client_connection_offsets_;
+ MessageBridgeClientStatus client_status_;
// Channels to send data over.
std::vector<std::unique_ptr<aos::RawSender>> channels_;
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
new file mode 100644
index 0000000..72b8ee1
--- /dev/null
+++ b/aos/network/message_bridge_client_status.cc
@@ -0,0 +1,154 @@
+#include "aos/network/message_bridge_client_status.h"
+
+#include <chrono>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_client_generated.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+namespace chrono = std::chrono;
+
+FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
+ const std::vector<std::string_view> &source_node_names) {
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
+
+ std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
+ for (const std::string_view node_name : source_node_names) {
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ fbb.CreateString(node_name);
+
+ Node::Builder node_builder(fbb);
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+ ClientConnection::Builder connection_builder(fbb);
+ connection_builder.add_node(node_offset);
+ connection_builder.add_state(State::DISCONNECTED);
+ // TODO(austin): Track dropped packets.
+ connection_builder.add_received_packets(0);
+ connection_builder.add_monotonic_offset(0);
+ connection_offsets.emplace_back(connection_builder.Finish());
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+ connections_offset = fbb.CreateVector(connection_offsets);
+
+ ClientStatistics::Builder client_statistics_builder(fbb);
+ client_statistics_builder.add_connections(connections_offset);
+ fbb.Finish(client_statistics_builder.Finish());
+
+ return fbb.Release();
+}
+
+} // namespace
+
+MessageBridgeClientStatus::MessageBridgeClientStatus(aos::EventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
+ source_node_names_(configuration::SourceNodeNames(
+ event_loop->configuration(), event_loop->node())),
+ statistics_(MakeClientStatistics(source_node_names_)) {
+ client_connection_offsets_.reserve(
+ statistics_.message().connections()->size());
+ filters_.resize(statistics_.message().connections()->size());
+
+ statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+ statistics_timer_->set_name("statistics");
+ event_loop_->OnRun([this]() {
+ statistics_timer_->Setup(
+ event_loop_->monotonic_now() + chrono::milliseconds(100),
+ chrono::milliseconds(100));
+ });
+}
+
+void MessageBridgeClientStatus::SendStatistics() {
+ // Copy from statistics_ and drop monotonic_offset if it isn't populated yet.
+ // There doesn't exist a good way to drop fields otherwise.
+ aos::Sender<ClientStatistics>::Builder builder = sender_.MakeBuilder();
+ client_connection_offsets_.clear();
+
+ for (const ClientConnection *connection :
+ *statistics_.message().connections()) {
+ flatbuffers::Offset<flatbuffers::String> node_name_offset =
+ builder.fbb()->CreateString(connection->node()->name()->string_view());
+ Node::Builder node_builder = builder.MakeBuilder<Node>();
+ node_builder.add_name(node_name_offset);
+ flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+ ClientConnection::Builder client_connection_builder =
+ builder.MakeBuilder<ClientConnection>();
+
+ client_connection_builder.add_node(node_offset);
+ client_connection_builder.add_state(connection->state());
+ client_connection_builder.add_received_packets(
+ connection->received_packets());
+
+ // Strip out the monotonic offset if it isn't populated.
+ TimestampFilter *filter = &filters_[client_connection_offsets_.size()];
+ if (filter->has_sample()) {
+ client_connection_builder.add_monotonic_offset(
+ (chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(filter->offset())) +
+ filter->base_offset())
+ .count());
+ }
+
+ client_connection_offsets_.emplace_back(client_connection_builder.Finish());
+ }
+
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+ client_connections_offset =
+ builder.fbb()->CreateVector(client_connection_offsets_);
+
+ ClientStatistics::Builder client_statistics_builder =
+ builder.MakeBuilder<ClientStatistics>();
+ client_statistics_builder.add_connections(client_connections_offset);
+
+ builder.Send(client_statistics_builder.Finish());
+}
+
+int MessageBridgeClientStatus::FindClientIndex(std::string_view node_name) {
+ for (size_t i = 0; i < statistics_.message().connections()->size(); ++i) {
+ const ClientConnection *client_connection =
+ statistics_.message().connections()->Get(i);
+ if (client_connection->node()->name()->string_view() == node_name) {
+ return i;
+ }
+ }
+
+ LOG(FATAL) << "Unknown client " << node_name;
+}
+
+ClientConnection *MessageBridgeClientStatus::GetClientConnection(
+ int client_index) {
+ return statistics_.mutable_message()->mutable_connections()->GetMutableObject(
+ client_index);
+}
+
+void MessageBridgeClientStatus::SampleFilter(
+ int client_index,
+ const aos::monotonic_clock::time_point monotonic_sent_time,
+ const aos::monotonic_clock::time_point monotonic_delivered_time) {
+ TimestampFilter *filter = &filters_[client_index];
+
+ const std::chrono::nanoseconds offset =
+ monotonic_delivered_time - monotonic_sent_time;
+
+ // If this is our first observation, use that to seed the base offset. That
+ // gets us in the ballpark.
+ if (!filter->has_sample()) {
+ filter->set_base_offset(offset);
+ }
+
+ // We can now measure the latency!
+ filter->Sample(monotonic_delivered_time, offset);
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
new file mode 100644
index 0000000..0653c9d
--- /dev/null
+++ b/aos/network/message_bridge_client_status.h
@@ -0,0 +1,70 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_STATUS_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_STATUS_H_
+
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/timestamp_filter.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class is responsible for publishing the (filtered) client side
+// statistics periodically.
+class MessageBridgeClientStatus {
+ public:
+ MessageBridgeClientStatus(aos::EventLoop *event_loop);
+
+ MessageBridgeClientStatus(const MessageBridgeClientStatus &) = delete;
+ MessageBridgeClientStatus(MessageBridgeClientStatus &&) = delete;
+ MessageBridgeClientStatus &operator=(const MessageBridgeClientStatus &) =
+ delete;
+ MessageBridgeClientStatus &operator=(MessageBridgeClientStatus &&) = delete;
+
+ // Returns the connection datastructure for the provided node.
+ int FindClientIndex(std::string_view node_name);
+ ClientConnection *GetClientConnection(int client_index);
+
+ // Returns the ClientStatistics message this holds.
+ ClientStatistics *mutable_client_statistics() {
+ return statistics_.mutable_message();
+ }
+
+ // Adds a sample for the provided client index given the sent time (on the
+ // remote node) and the delivered time (on this node).
+ void SampleFilter(
+ int client_index,
+ const aos::monotonic_clock::time_point monotonic_sent_time,
+ const aos::monotonic_clock::time_point monotonic_delivered_time);
+
+ // Clears out the filter state.
+ void SampleReset(int client_index) { filters_[client_index].Reset(); }
+
+ private:
+ // Sends out the statistics that are continually updated by the
+ // SctpClientConnections.
+ void SendStatistics();
+
+ aos::EventLoop *event_loop_;
+ aos::TimerHandler *statistics_timer_;
+
+ // Sender to publish statistics on.
+ aos::Sender<ClientStatistics> sender_;
+
+ // Nodes to receive data from.
+ const std::vector<std::string_view> source_node_names_;
+ // Data to publish.
+ FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+ // Reserved memory for the client connection offsets to reduce heap
+ // allocations.
+ std::vector<flatbuffers::Offset<ClientConnection>> client_connection_offsets_;
+
+ std::vector<TimestampFilter> filters_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_STATUS_H_