Extract client side timestamp filtering logic

This sets us up to reuse it as part of the simulated event loop message
gateway.  It is surprisingly isolated.

Change-Id: Ia72de5ab8fbb9b3495fec32cd2762e9fdfc7c1e7
diff --git a/aos/network/BUILD b/aos/network/BUILD
index c7e5a28..f9fe7ea 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -207,6 +207,24 @@
 )
 
 cc_library(
+    name = "message_bridge_client_status",
+    srcs = [
+        "message_bridge_client_status.cc",
+    ],
+    hdrs = [
+        "message_bridge_client_status.h",
+    ],
+    deps = [
+        ":message_bridge_client_fbs",
+        ":message_bridge_server_fbs",
+        ":timestamp_filter",
+        "//aos:flatbuffers",
+        "//aos/events:event_loop",
+        "//aos/time",
+    ],
+)
+
+cc_library(
     name = "message_bridge_client_lib",
     srcs = [
         "message_bridge_client_lib.cc",
@@ -220,6 +238,7 @@
     deps = [
         ":connect_fbs",
         ":message_bridge_client_fbs",
+        ":message_bridge_client_status",
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
         ":sctp_client",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 3963485..b371181 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -86,46 +86,14 @@
   return fbb.Release();
 }
 
-FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
-    const std::vector<std::string_view> &source_node_names) {
-  flatbuffers::FlatBufferBuilder fbb;
-  fbb.ForceDefaults(true);
-
-  std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
-  for (const std::string_view node_name : source_node_names) {
-    flatbuffers::Offset<flatbuffers::String> node_name_offset =
-        fbb.CreateString(node_name);
-
-    Node::Builder node_builder(fbb);
-    node_builder.add_name(node_name_offset);
-    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
-    ClientConnection::Builder connection_builder(fbb);
-    connection_builder.add_node(node_offset);
-    connection_builder.add_state(State::DISCONNECTED);
-    // TODO(austin): Track dropped packets.
-    connection_builder.add_received_packets(0);
-    connection_builder.add_monotonic_offset(0);
-    connection_offsets.emplace_back(connection_builder.Finish());
-  }
-  flatbuffers::Offset<
-      flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
-      connections_offset = fbb.CreateVector(connection_offsets);
-
-  ClientStatistics::Builder client_statistics_builder(fbb);
-  client_statistics_builder.add_connections(connections_offset);
-  fbb.Finish(client_statistics_builder.Finish());
-
-  return fbb.Release();
-}
 
 }  // namespace
 
 SctpClientConnection::SctpClientConnection(
     aos::ShmEventLoop *const event_loop, std::string_view remote_name,
     const Node *my_node, std::string_view local_host,
-    std::vector<std::unique_ptr<aos::RawSender>> *channels,
-    ClientConnection *connection)
+    std::vector<std::unique_ptr<aos::RawSender>> *channels, int client_index,
+    MessageBridgeClientStatus *client_status)
     : event_loop_(event_loop),
       connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
                                           remote_name)),
@@ -141,7 +109,9 @@
           StreamToChannel(event_loop->configuration(), my_node, remote_node_)),
       stream_reply_with_timestamp_(StreamReplyWithTimestamp(
           event_loop->configuration(), my_node, remote_node_)),
-      connection_(connection) {
+      client_status_(client_status),
+      client_index_(client_index),
+      connection_(client_status_->GetClientConnection(client_index_)) {
   VLOG(1) << "Connect request for " << remote_node_->name()->string_view()
           << ": " << FlatbufferToJson(connect_message_);
 
@@ -230,7 +200,7 @@
 
   remote_assoc_id_ = assoc_id;
   connection_->mutate_state(State::CONNECTED);
-  filter_.Reset();
+  client_status_->SampleReset(client_index_);
 }
 
 void SctpClientConnection::NodeDisconnected() {
@@ -240,7 +210,7 @@
   remote_assoc_id_ = 0;
   connection_->mutate_state(State::DISCONNECTED);
   connection_->mutate_monotonic_offset(0);
-  filter_.Reset();
+  client_status_->SampleReset(client_index_);
 }
 
 void SctpClientConnection::HandleData(const Message *message) {
@@ -264,25 +234,11 @@
                    chrono::nanoseconds(message_header->realtime_sent_time())),
                message_header->queue_index());
 
-  const std::chrono::nanoseconds offset =
-      sender->monotonic_sent_time() -
+  client_status_->SampleFilter(
+      client_index_,
       aos::monotonic_clock::time_point(
-          chrono::nanoseconds(message_header->monotonic_sent_time()));
-
-  // If this is our first observation, use that to seed the base offset.  That
-  // gets us in the ballpark.
-  if (!filter_.has_sample()) {
-    filter_.set_base_offset(offset);
-  }
-
-  // We can now measure the latency!
-  filter_.Sample(sender->monotonic_sent_time(), offset);
-
-  connection_->mutate_monotonic_offset(
-      (chrono::duration_cast<chrono::nanoseconds>(
-           chrono::duration<double>(filter_.offset())) +
-       filter_.base_offset())
-          .count());
+          chrono::nanoseconds(message_header->monotonic_sent_time())),
+      sender->monotonic_sent_time());
 
   if (stream_reply_with_timestamp_[stream]) {
     // TODO(austin): Send back less if we are only acking.  Maybe only a
@@ -330,13 +286,7 @@
 }
 
 MessageBridgeClient::MessageBridgeClient(aos::ShmEventLoop *event_loop)
-    : event_loop_(event_loop),
-      sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
-      source_node_names_(configuration::SourceNodeNames(
-          event_loop->configuration(), event_loop->node())),
-      statistics_(MakeClientStatistics(source_node_names_)) {
-  client_connection_offsets_.reserve(
-      statistics_.message().connections()->size());
+    : event_loop_(event_loop), client_status_(event_loop_) {
   std::string_view node_name = event_loop->node()->name()->string_view();
 
   // Find all the channels which are supposed to be delivered to us.
@@ -361,67 +311,13 @@
   }
 
   // Now, for each source node, build a connection.
-  int node_index = 0;
-  for (const std::string_view source_node : source_node_names_) {
+  for (const std::string_view source_node : configuration::SourceNodeNames(
+           event_loop->configuration(), event_loop->node())) {
     // Open an unspecified connection (:: in ipv6 terminology)
     connections_.emplace_back(new SctpClientConnection(
         event_loop, source_node, event_loop->node(), "::", &channels_,
-        statistics_.mutable_message()->mutable_connections()->GetMutableObject(
-            node_index)));
-    ++node_index;
+        client_status_.FindClientIndex(source_node), &client_status_));
   }
-
-  // And kick it all off.
-  statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
-  statistics_timer_->set_name("statistics");
-  event_loop_->OnRun([this]() {
-    statistics_timer_->Setup(
-        event_loop_->monotonic_now() + chrono::milliseconds(100),
-        chrono::milliseconds(100));
-  });
-}
-
-void MessageBridgeClient::SendStatistics() {
-  // Copy from statistics_ and drop monotonic_offset if it isn't populated yet.
-  // There doesn't exist a good way to drop fields otherwise.
-  aos::Sender<ClientStatistics>::Builder builder = sender_.MakeBuilder();
-  client_connection_offsets_.clear();
-
-  for (const ClientConnection *connection :
-       *statistics_.message().connections()) {
-    flatbuffers::Offset<flatbuffers::String> node_name_offset =
-        builder.fbb()->CreateString(connection->node()->name()->string_view());
-    Node::Builder node_builder = builder.MakeBuilder<Node>();
-    node_builder.add_name(node_name_offset);
-    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
-
-    ClientConnection::Builder client_connection_builder =
-        builder.MakeBuilder<ClientConnection>();
-
-    client_connection_builder.add_node(node_offset);
-    client_connection_builder.add_state(connection->state());
-    client_connection_builder.add_received_packets(
-        connection->received_packets());
-
-    // Strip out the monotonic offset if it isn't populated.
-    if (connection->monotonic_offset() != 0) {
-      client_connection_builder.add_monotonic_offset(
-          connection->monotonic_offset());
-    }
-
-    client_connection_offsets_.emplace_back(client_connection_builder.Finish());
-  }
-
-  flatbuffers::Offset<
-      flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
-      client_connections_offset =
-          builder.fbb()->CreateVector(client_connection_offsets_);
-
-  ClientStatistics::Builder client_statistics_builder =
-      builder.MakeBuilder<ClientStatistics>();
-  client_statistics_builder.add_connections(client_connections_offset);
-
-  builder.Send(client_statistics_builder.Finish());
 }
 
 }  // namespace message_bridge
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 2811553..8134cbd 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -7,8 +7,8 @@
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/network/connect_generated.h"
-#include "aos/network/timestamp_filter.h"
 #include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_client_status.h"
 #include "aos/network/sctp_client.h"
 #include "aos/network/sctp_lib.h"
 
@@ -25,7 +25,8 @@
                        std::string_view remote_name, const Node *my_node,
                        std::string_view local_host,
                        std::vector<std::unique_ptr<aos::RawSender>> *channels,
-                       ClientConnection *connection);
+                       int client_index,
+                       MessageBridgeClientStatus *client_status);
 
   ~SctpClientConnection() { event_loop_->epoll()->DeleteFd(client_.fd()); }
 
@@ -70,16 +71,15 @@
   // Timer which fires to handle reconnections.
   aos::TimerHandler *connect_timer_;
 
-  // ClientConnection statistics message to modify.  This will be published
-  // periodicially.
-  ClientConnection *connection_;
-
   // id of the server once known.  This is only valid if connection_ says
   // connected.
   sctp_assoc_t remote_assoc_id_ = 0;
 
-  // Filter for the timestamp offset for this connection.
-  TimestampFilter filter_;
+  // ClientConnection statistics message to modify.  This will be published
+  // periodicially.
+  MessageBridgeClientStatus *client_status_;
+  int client_index_;
+  ClientConnection *connection_;
 };
 
 // This encapsulates the state required to talk to *all* the servers from this
@@ -91,24 +91,10 @@
   ~MessageBridgeClient() {}
 
  private:
-  // Sends out the statistics that are continually updated by the
-  // SctpClientConnections.
-  void SendStatistics();
-
   // Event loop to schedule everything on.
   aos::ShmEventLoop *event_loop_;
-  // Sender to publish statistics on.
-  aos::Sender<ClientStatistics> sender_;
-  aos::TimerHandler *statistics_timer_;
 
-  // Nodes to receive data from.
-  const std::vector<std::string_view> source_node_names_;
-
-  // Data to publish.
-  FlatbufferDetachedBuffer<ClientStatistics> statistics_;
-  // Reserved memory for the client connection offsets to reduce heap
-  // allocations.
-  std::vector<flatbuffers::Offset<ClientConnection>> client_connection_offsets_;
+  MessageBridgeClientStatus client_status_;
 
   // Channels to send data over.
   std::vector<std::unique_ptr<aos::RawSender>> channels_;
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
new file mode 100644
index 0000000..72b8ee1
--- /dev/null
+++ b/aos/network/message_bridge_client_status.cc
@@ -0,0 +1,154 @@
+#include "aos/network/message_bridge_client_status.h"
+
+#include <chrono>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_client_generated.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+namespace chrono = std::chrono;
+
+FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
+    const std::vector<std::string_view> &source_node_names) {
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+
+  std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
+  for (const std::string_view node_name : source_node_names) {
+    flatbuffers::Offset<flatbuffers::String> node_name_offset =
+        fbb.CreateString(node_name);
+
+    Node::Builder node_builder(fbb);
+    node_builder.add_name(node_name_offset);
+    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+    ClientConnection::Builder connection_builder(fbb);
+    connection_builder.add_node(node_offset);
+    connection_builder.add_state(State::DISCONNECTED);
+    // TODO(austin): Track dropped packets.
+    connection_builder.add_received_packets(0);
+    connection_builder.add_monotonic_offset(0);
+    connection_offsets.emplace_back(connection_builder.Finish());
+  }
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+      connections_offset = fbb.CreateVector(connection_offsets);
+
+  ClientStatistics::Builder client_statistics_builder(fbb);
+  client_statistics_builder.add_connections(connections_offset);
+  fbb.Finish(client_statistics_builder.Finish());
+
+  return fbb.Release();
+}
+
+}  // namespace
+
+MessageBridgeClientStatus::MessageBridgeClientStatus(aos::EventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
+      source_node_names_(configuration::SourceNodeNames(
+          event_loop->configuration(), event_loop->node())),
+      statistics_(MakeClientStatistics(source_node_names_)) {
+  client_connection_offsets_.reserve(
+      statistics_.message().connections()->size());
+  filters_.resize(statistics_.message().connections()->size());
+
+  statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+  statistics_timer_->set_name("statistics");
+  event_loop_->OnRun([this]() {
+    statistics_timer_->Setup(
+        event_loop_->monotonic_now() + chrono::milliseconds(100),
+        chrono::milliseconds(100));
+  });
+}
+
+void MessageBridgeClientStatus::SendStatistics() {
+  // Copy from statistics_ and drop monotonic_offset if it isn't populated yet.
+  // There doesn't exist a good way to drop fields otherwise.
+  aos::Sender<ClientStatistics>::Builder builder = sender_.MakeBuilder();
+  client_connection_offsets_.clear();
+
+  for (const ClientConnection *connection :
+       *statistics_.message().connections()) {
+    flatbuffers::Offset<flatbuffers::String> node_name_offset =
+        builder.fbb()->CreateString(connection->node()->name()->string_view());
+    Node::Builder node_builder = builder.MakeBuilder<Node>();
+    node_builder.add_name(node_name_offset);
+    flatbuffers::Offset<Node> node_offset = node_builder.Finish();
+
+    ClientConnection::Builder client_connection_builder =
+        builder.MakeBuilder<ClientConnection>();
+
+    client_connection_builder.add_node(node_offset);
+    client_connection_builder.add_state(connection->state());
+    client_connection_builder.add_received_packets(
+        connection->received_packets());
+
+    // Strip out the monotonic offset if it isn't populated.
+    TimestampFilter *filter = &filters_[client_connection_offsets_.size()];
+    if (filter->has_sample()) {
+      client_connection_builder.add_monotonic_offset(
+          (chrono::duration_cast<chrono::nanoseconds>(
+               chrono::duration<double>(filter->offset())) +
+           filter->base_offset())
+              .count());
+    }
+
+    client_connection_offsets_.emplace_back(client_connection_builder.Finish());
+  }
+
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+      client_connections_offset =
+          builder.fbb()->CreateVector(client_connection_offsets_);
+
+  ClientStatistics::Builder client_statistics_builder =
+      builder.MakeBuilder<ClientStatistics>();
+  client_statistics_builder.add_connections(client_connections_offset);
+
+  builder.Send(client_statistics_builder.Finish());
+}
+
+int MessageBridgeClientStatus::FindClientIndex(std::string_view node_name) {
+  for (size_t i = 0; i < statistics_.message().connections()->size(); ++i) {
+    const ClientConnection *client_connection =
+        statistics_.message().connections()->Get(i);
+    if (client_connection->node()->name()->string_view() == node_name) {
+      return i;
+    }
+  }
+
+  LOG(FATAL) << "Unknown client " << node_name;
+}
+
+ClientConnection *MessageBridgeClientStatus::GetClientConnection(
+    int client_index) {
+  return statistics_.mutable_message()->mutable_connections()->GetMutableObject(
+      client_index);
+}
+
+void MessageBridgeClientStatus::SampleFilter(
+    int client_index,
+    const aos::monotonic_clock::time_point monotonic_sent_time,
+    const aos::monotonic_clock::time_point monotonic_delivered_time) {
+  TimestampFilter *filter = &filters_[client_index];
+
+  const std::chrono::nanoseconds offset =
+      monotonic_delivered_time - monotonic_sent_time;
+
+  // If this is our first observation, use that to seed the base offset.  That
+  // gets us in the ballpark.
+  if (!filter->has_sample()) {
+    filter->set_base_offset(offset);
+  }
+
+  // We can now measure the latency!
+  filter->Sample(monotonic_delivered_time, offset);
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_client_status.h b/aos/network/message_bridge_client_status.h
new file mode 100644
index 0000000..0653c9d
--- /dev/null
+++ b/aos/network/message_bridge_client_status.h
@@ -0,0 +1,70 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_STATUS_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_STATUS_H_
+
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/timestamp_filter.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class is responsible for publishing the (filtered) client side
+// statistics periodically.
+class MessageBridgeClientStatus {
+ public:
+  MessageBridgeClientStatus(aos::EventLoop *event_loop);
+
+  MessageBridgeClientStatus(const MessageBridgeClientStatus &) = delete;
+  MessageBridgeClientStatus(MessageBridgeClientStatus &&) = delete;
+  MessageBridgeClientStatus &operator=(const MessageBridgeClientStatus &) =
+      delete;
+  MessageBridgeClientStatus &operator=(MessageBridgeClientStatus &&) = delete;
+
+  // Returns the connection datastructure for the provided node.
+  int FindClientIndex(std::string_view node_name);
+  ClientConnection *GetClientConnection(int client_index);
+
+  // Returns the ClientStatistics message this holds.
+  ClientStatistics *mutable_client_statistics() {
+    return statistics_.mutable_message();
+  }
+
+  // Adds a sample for the provided client index given the sent time (on the
+  // remote node) and the delivered time (on this node).
+  void SampleFilter(
+      int client_index,
+      const aos::monotonic_clock::time_point monotonic_sent_time,
+      const aos::monotonic_clock::time_point monotonic_delivered_time);
+
+  // Clears out the filter state.
+  void SampleReset(int client_index) { filters_[client_index].Reset(); }
+
+ private:
+  // Sends out the statistics that are continually updated by the
+  // SctpClientConnections.
+  void SendStatistics();
+
+  aos::EventLoop *event_loop_;
+  aos::TimerHandler *statistics_timer_;
+
+  // Sender to publish statistics on.
+  aos::Sender<ClientStatistics> sender_;
+
+  // Nodes to receive data from.
+  const std::vector<std::string_view> source_node_names_;
+  // Data to publish.
+  FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+  // Reserved memory for the client connection offsets to reduce heap
+  // allocations.
+  std::vector<flatbuffers::Offset<ClientConnection>> client_connection_offsets_;
+
+  std::vector<TimestampFilter> filters_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_STATUS_H_