Add per-channel statistics to message bridge ServerStatistics

When debugging issues with the message bridge, we want to be able to tell
which channels are having problems. Per-channel sent/dropped packet counts
help with that (although, as with the timing reports, it would help to have
a utility to translate channel indices into channel names).

Change-Id: Ief2a11c7b45e32cc8bc4ab6861e05b7f57aaeb9c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index f95c6cc..a2aa4af 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -60,13 +60,13 @@
 };
 
 constexpr std::string_view kCombinedConfigSha1() {
-  return "e630fdd5533159ddad89075f93d9df90ae93a5a5841d6af7e1ec86875792bf27";
+  return "a72e2a1e21ac07b27648825151ff9b436fd80b62254839d4ac47ee3400fa9dc1";
 }
 constexpr std::string_view kSplitConfigSha1() {
-  return "7ed547b800f84e5b56825d11d39d3686fb770c2021658c3a9031f2cbf94e82a4";
+  return "6e585268f58791591f48b1e6d00564f49e6dcec46d18c4809ec49d94afbb3b1c";
 }
 constexpr std::string_view kReloggedSplitConfigSha1() {
-  return "7b17a3349852133aa56790fce93650b82455bad36ac669a4adebf33419c8ece9";
+  return "6aa4cbc21e2382ea8b9ef0145e9031bf542827e29b93995dd5e203ed0c198ef7";
 }
 
 LoggerState MakeLoggerState(NodeEventLoopFactory *node,
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 14e0a08..0afa22e 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -619,6 +619,17 @@
 
           EXPECT_TRUE(connection->has_monotonic_offset());
           EXPECT_EQ(connection->monotonic_offset(), 0);
+
+          EXPECT_TRUE(connection->has_channels());
+          int accumulated_sent_count = 0;
+          int accumulated_dropped_count = 0;
+          for (const message_bridge::ServerChannelStatistics *channel :
+               *connection->channels()) {
+            accumulated_sent_count += channel->sent_packets();
+            accumulated_dropped_count += channel->dropped_packets();
+          }
+          EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
+          EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
         }
         ++pi1_server_statistics_count;
       });
@@ -639,6 +650,18 @@
         EXPECT_EQ(connection->monotonic_offset(), 0);
         EXPECT_EQ(connection->connection_count(), 1u);
         EXPECT_EQ(connection->connected_since_time(), 0);
+
+        EXPECT_TRUE(connection->has_channels());
+        int accumulated_sent_count = 0;
+        int accumulated_dropped_count = 0;
+        for (const message_bridge::ServerChannelStatistics *channel :
+             *connection->channels()) {
+          accumulated_sent_count += channel->sent_packets();
+          accumulated_dropped_count += channel->dropped_packets();
+        }
+        EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
+        EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
+
         ++pi2_server_statistics_count;
       });
 
@@ -658,6 +681,18 @@
         EXPECT_EQ(connection->monotonic_offset(), 0);
         EXPECT_EQ(connection->connection_count(), 1u);
         EXPECT_EQ(connection->connected_since_time(), 0);
+
+        EXPECT_TRUE(connection->has_channels());
+        int accumulated_sent_count = 0;
+        int accumulated_dropped_count = 0;
+        for (const message_bridge::ServerChannelStatistics *channel :
+             *connection->channels()) {
+          accumulated_sent_count += channel->sent_packets();
+          accumulated_dropped_count += channel->dropped_packets();
+        }
+        EXPECT_EQ(connection->sent_packets(), accumulated_sent_count);
+        EXPECT_EQ(connection->dropped_packets(), accumulated_dropped_count);
+
         ++pi3_server_statistics_count;
       });
 
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index b363142..35cc3a6 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -54,6 +54,8 @@
     if (server_status) {
       server_connection_ =
           server_status_->FindServerConnection(send_node_factory_->node());
+      server_index_ = configuration::GetNodeIndex(
+          send_node_factory_->configuration(), send_node_factory_->node());
     }
     if (delivery_time_is_logged_ && timestamp_loggers != nullptr &&
         !forwarding_disabled_) {
@@ -153,13 +155,11 @@
         << " at " << fetch_node_factory_->monotonic_now();
 
     if (timer_) {
-      server_connection_->mutate_sent_packets(
-          server_connection_->sent_packets() + 1);
+      server_status_->AddSentPacket(server_index_, channel_);
       timer_->Schedule(monotonic_delivered_time);
       timer_scheduled_ = true;
     } else {
-      server_connection_->mutate_dropped_packets(
-          server_connection_->dropped_packets() + 1);
+      server_status_->AddDroppedPacket(server_index_, channel_);
       sent_ = true;
     }
   }
@@ -192,13 +192,11 @@
         << " at " << fetch_node_factory_->monotonic_now();
 
     if (timer_) {
-      server_connection_->mutate_sent_packets(
-          server_connection_->sent_packets() + 1);
+      server_status_->AddSentPacket(server_index_, channel_);
       timer_->Schedule(monotonic_delivered_time);
       timer_scheduled_ = true;
     } else {
-      server_connection_->mutate_dropped_packets(
-          server_connection_->dropped_packets() + 1);
+      server_status_->AddDroppedPacket(server_index_, channel_);
       sent_ = true;
       Schedule();
     }
@@ -218,8 +216,7 @@
 
       if (server_connection_->state() != State::CONNECTED) {
         sent_ = true;
-        server_connection_->mutate_dropped_packets(
-            server_connection_->dropped_packets() + 1);
+        server_status_->AddDroppedPacket(server_index_, channel_);
         continue;
       }
 
@@ -241,8 +238,7 @@
                    << fetcher_->context().monotonic_event_time << " now is "
                    << fetch_node_factory_->monotonic_now();
       sent_ = true;
-      server_connection_->mutate_dropped_packets(
-          server_connection_->dropped_packets() + 1);
+      server_status_->AddDroppedPacket(server_index_, channel_);
     }
   }
 
@@ -400,6 +396,7 @@
   bool sent_ = false;
 
   ServerConnection *server_connection_ = nullptr;
+  int server_index_ = -1;
   MessageBridgeClientStatus *client_status_ = nullptr;
   int client_index_ = -1;
   ClientConnection *client_connection_ = nullptr;
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
index 30017c4..e936828 100644
--- a/aos/network/message_bridge_server.fbs
+++ b/aos/network/message_bridge_server.fbs
@@ -2,6 +2,19 @@
 
 namespace aos.message_bridge;
 
+// Per-channel statistics for any channels being forwarded to other nodes by the
+// message bridge server.
+table ServerChannelStatistics {
+  // Index of this channel in the configuration's channels list.
+  channel_index:uint64 (id: 0);
+  // Total number of messages that were sent on this channel (this does not
+  // necessarily mean that the message made it to the client).
+  sent_packets:uint (id: 1);
+  // Total number of messages on this channel that were dropped while sending
+  // (e.g., those dropped by the kernel).
+  dropped_packets:uint (id: 2);
+}
+
 // State of the connection.
 enum State: ubyte {
   CONNECTED,
@@ -45,6 +58,10 @@
   // Number of times we've had an invalid connection with something wrong in
   // the connection message, but we were able to match which node it was.
   invalid_connection_count:uint (id: 9);
+
+  // Statistics for every channel being forwarded to this node. Ordering is
+  // arbitrary; each entry is identified by its channel_index field.
+  channels:[ServerChannelStatistics] (id: 10);
 }
 
 // Statistics for all connections to all the clients.
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index e9f6e0d..22b2f3f 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -79,14 +79,12 @@
                          fbb.GetSize()),
                      peer.sac_assoc_id, peer.stream,
                      peer.connection->time_to_live() / 1000000)) {
-      peer.server_connection_statistics->mutate_sent_packets(
-          peer.server_connection_statistics->sent_packets() + 1);
+      peer.server_status->AddSentPacket(peer.node_index, channel_);
       if (peer.logged_remotely) {
         ++sent_count;
       }
     } else {
-      peer.server_connection_statistics->mutate_dropped_packets(
-          peer.server_connection_statistics->dropped_packets() + 1);
+      peer.server_status->AddDroppedPacket(peer.node_index, channel_);
     }
   }
 
@@ -254,11 +252,9 @@
                                             fbb.GetSize()),
                            peer.sac_assoc_id, peer.stream,
                            peer.connection->time_to_live() / 1000000)) {
-            peer.server_connection_statistics->mutate_sent_packets(
-                peer.server_connection_statistics->sent_packets() + 1);
+            peer.server_status->AddSentPacket(peer.node_index, channel_);
           } else {
-            peer.server_connection_statistics->mutate_dropped_packets(
-                peer.server_connection_statistics->dropped_packets() + 1);
+            peer.server_status->AddDroppedPacket(peer.node_index, channel_);
           }
         }
       }
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index d823f30..5607d4a 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -18,6 +18,21 @@
 
 namespace chrono = std::chrono;
 
+// Returns the channels forwarded from source_node and readable on
+// destination_node, in configuration order.
+std::vector<const Channel *> ChannelsForNodePair(
+    const Configuration *configuration, const aos::Node *source_node,
+    const aos::Node *destination_node) {
+  std::vector<const Channel *> forwarded;
+  for (const aos::Channel *candidate : *configuration->channels()) {
+    if (configuration::ChannelIsForwardedFromNode(candidate, source_node) &&
+        configuration::ChannelIsReadableOnNode(candidate, destination_node)) {
+      forwarded.push_back(candidate);
+    }
+  }
+  return forwarded;
+}
+
 // Builds up the "empty" server statistics message to be pointed to by all the
 // connections, updated at runtime, and periodically sent.
 FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
@@ -111,13 +126,30 @@
                                   Timestamp::GetFullyQualifiedName(),
                                   event_loop_->name(), destination_node);
 
-    nodes_[node_index] = NodeState{
-        .server_connection =
-            FindServerConnection(destination_node->name()->string_view()),
-        .timestamp_fetcher = event_loop_->MakeFetcher<Timestamp>(
-            other_timestamp_channel->name()->string_view()),
-        .filter = {},
-        .boot_uuid = std::nullopt};
+    ServerConnection *const server_connection =
+        FindServerConnection(destination_node->name()->string_view());
+    std::map<const Channel *, ServerChannelStatisticsT> channel_statistics;
+    for (const Channel *channel :
+         ChannelsForNodePair(event_loop_->configuration(), event_loop_->node(),
+                             destination_node)) {
+      ServerChannelStatisticsT initial_statistics;
+      initial_statistics.channel_index =
+          configuration::ChannelIndex(event_loop_->configuration(), channel);
+      initial_statistics.sent_packets = 0;
+      initial_statistics.dropped_packets = 0;
+      channel_statistics[channel] = initial_statistics;
+    }
+
+    nodes_[node_index] =
+        NodeState{.server_connection = server_connection,
+                  .channel_statistics = channel_statistics,
+                  .channel_offsets_buffer =
+                      std::vector<flatbuffers::Offset<ServerChannelStatistics>>(
+                          channel_statistics.size()),
+                  .timestamp_fetcher = event_loop_->MakeFetcher<Timestamp>(
+                      other_timestamp_channel->name()->string_view()),
+                  .filter = {},
+                  .boot_uuid = std::nullopt};
   }
 
   statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
@@ -142,6 +174,23 @@
   return FindServerConnection(node->name()->string_view());
 }
 
+void MessageBridgeServerStatus::AddSentPacket(int node_index,
+                                              const aos::Channel *channel) {
+  CHECK(nodes_[node_index].has_value());  // Must be a client node.
+  NodeState &node = nodes_[node_index].value();
+  ServerConnection *connection = node.server_connection;
+  connection->mutate_sent_packets(connection->sent_packets() + 1);
+  ++node.channel_statistics.at(channel).sent_packets;  // Never auto-insert.
+}
+void MessageBridgeServerStatus::AddDroppedPacket(int node_index,
+                                                 const aos::Channel *channel) {
+  CHECK(nodes_[node_index].has_value());  // Must be a client node.
+  NodeState &node = nodes_[node_index].value();
+  ServerConnection *connection = node.server_connection;
+  connection->mutate_dropped_packets(connection->dropped_packets() + 1);
+  ++node.channel_statistics.at(channel).dropped_packets;  // Never auto-insert.
+}
+
 void MessageBridgeServerStatus::SetBootUUID(int node_index,
                                             const UUID &boot_uuid) {
   nodes_[node_index].value().boot_uuid = boot_uuid;
@@ -193,6 +242,7 @@
     const int node_index =
         configuration::GetNodeIndex(event_loop_->configuration(),
                                     connection->node()->name()->string_view());
+    CHECK(nodes_[node_index].has_value());
 
     flatbuffers::Offset<flatbuffers::String> node_name_offset =
         builder.fbb()->CreateString(connection->node()->name()->string_view());
@@ -208,6 +258,22 @@
               builder.fbb());
     }
 
+    {
+      size_t index = 0;
+      CHECK_EQ(nodes_[node_index].value().channel_offsets_buffer.size(),
+               nodes_[node_index].value().channel_statistics.size());
+      for (const auto &channel :
+           nodes_[node_index].value().channel_statistics) {
+        nodes_[node_index].value().channel_offsets_buffer.at(index) =
+            ServerChannelStatistics::Pack(*builder.fbb(), &channel.second);
+        index++;
+      }
+    }
+    const flatbuffers::Offset<
+        flatbuffers::Vector<flatbuffers::Offset<ServerChannelStatistics>>>
+        channels_offset = builder.fbb()->CreateVector(
+            nodes_[node_index].value().channel_offsets_buffer);
+
     ServerConnection::Builder server_connection_builder =
         builder.MakeBuilder<ServerConnection>();
     server_connection_builder.add_node(node_offset);
@@ -217,6 +283,7 @@
     server_connection_builder.add_sent_packets(connection->sent_packets());
     server_connection_builder.add_partial_deliveries(
         PartialDeliveries(node_index));
+    server_connection_builder.add_channels(channels_offset);
 
     if (connection->connected_since_time() !=
         monotonic_clock::min_time.time_since_epoch().count()) {
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index c71bd84..d6edb89 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -3,6 +3,8 @@
 
 #include <chrono>
 #include <functional>
+#include <map>
+#include <vector>
 
 #include "aos/events/event_loop.h"
 #include "aos/network/message_bridge_client_generated.h"
@@ -29,8 +31,18 @@
     // Mutable status for this node, to be sent out in the ServerStatistics
     // message.
     ServerConnection *server_connection;
-    // Fetcher to retrieve timestamps for the connection to the other node, for
-    // feeding the timestamp filter.
+    // Mapping from each forwarded channel (as a pointer into the
+    // Configuration) to the statistics for that channel.
+    std::map<const aos::Channel *, ServerChannelStatisticsT> channel_statistics;
+    // Scratch buffer for the offsets of the packed channel_statistics, because
+    // flatbuffers doesn't provide a great API for creating vectors of tables
+    // (namely, CreateUninitializedVector doesn't work for tables because it
+    // can't handle offsets and CreateVector(len, generator_function) creates
+    // an intermediate std::vector).
+    std::vector<flatbuffers::Offset<ServerChannelStatistics>>
+        channel_offsets_buffer;
+    // Fetcher to retrieve timestamps for the connection to the other node,
+    // for feeding the timestamp filter.
     aos::Fetcher<Timestamp> timestamp_fetcher;
     // Filter for calculating current time offsets to the other node.
     ClippedAverageFilter filter;
@@ -80,6 +92,12 @@
     return nodes_[node_index].value().partial_deliveries;
   }
 
+  // Records one additional sent/dropped packet on the given channel.
+  // node_index is the index of the node being sent to; it must be a valid
+  // client node.
+  void AddSentPacket(int node_index, const aos::Channel *channel);
+  void AddDroppedPacket(int node_index, const aos::Channel *channel);
+
   // Returns the ServerConnection message which is updated by the server.
   ServerConnection *FindServerConnection(std::string_view node_name);
   ServerConnection *FindServerConnection(const Node *node);
diff --git a/y2020/y2020_logger.json b/y2020/y2020_logger.json
index 911be81..d538228 100644
--- a/y2020/y2020_logger.json
+++ b/y2020/y2020_logger.json
@@ -149,6 +149,7 @@
       "name": "/logger/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "logger",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2020/y2020_pi_template.json b/y2020/y2020_pi_template.json
index c609632..61f1f78 100644
--- a/y2020/y2020_pi_template.json
+++ b/y2020/y2020_pi_template.json
@@ -57,6 +57,7 @@
       "name": "/pi{{ NUM }}/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "pi{{ NUM }}",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2020/y2020_roborio.json b/y2020/y2020_roborio.json
index eb97b1f..86e4f63 100644
--- a/y2020/y2020_roborio.json
+++ b/y2020/y2020_roborio.json
@@ -87,6 +87,7 @@
       "name": "/roborio/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "roborio",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2022/y2022_imu.json b/y2022/y2022_imu.json
index bbe3a11..843811f 100644
--- a/y2022/y2022_imu.json
+++ b/y2022/y2022_imu.json
@@ -111,6 +111,7 @@
       "name": "/imu/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "imu",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2022/y2022_logger.json b/y2022/y2022_logger.json
index 65a6e98..85d522d 100644
--- a/y2022/y2022_logger.json
+++ b/y2022/y2022_logger.json
@@ -191,6 +191,7 @@
       "name": "/logger/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "logger",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2022/y2022_pi_template.json b/y2022/y2022_pi_template.json
index cb90fee..907cac1 100644
--- a/y2022/y2022_pi_template.json
+++ b/y2022/y2022_pi_template.json
@@ -68,6 +68,7 @@
       "name": "/pi{{ NUM }}/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "pi{{ NUM }}",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2022/y2022_roborio.json b/y2022/y2022_roborio.json
index d046568..dd42a4b 100644
--- a/y2022/y2022_roborio.json
+++ b/y2022/y2022_roborio.json
@@ -139,6 +139,7 @@
       "name": "/roborio/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "roborio",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2023/y2023_imu.json b/y2023/y2023_imu.json
index 1982c6e..667e569 100644
--- a/y2023/y2023_imu.json
+++ b/y2023/y2023_imu.json
@@ -140,6 +140,7 @@
       "name": "/imu/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "imu",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2023/y2023_logger.json b/y2023/y2023_logger.json
index 47977ad..e526c50 100644
--- a/y2023/y2023_logger.json
+++ b/y2023/y2023_logger.json
@@ -71,6 +71,7 @@
       "name": "/logger/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "logger",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2023/y2023_pi_template.json b/y2023/y2023_pi_template.json
index eee4ebb..6e4fe5f 100644
--- a/y2023/y2023_pi_template.json
+++ b/y2023/y2023_pi_template.json
@@ -34,6 +34,7 @@
       "name": "/pi{{ NUM }}/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "pi{{ NUM }}",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },
diff --git a/y2023/y2023_roborio.json b/y2023/y2023_roborio.json
index 172d11c..4af3d34 100644
--- a/y2023/y2023_roborio.json
+++ b/y2023/y2023_roborio.json
@@ -59,6 +59,7 @@
       "name": "/roborio/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "roborio",
+      "max_size": 2048,
       "frequency": 10,
       "num_senders": 2
     },