Send *Statistics and Timestamp messages from SimulatedMessageBridge

This makes the simulation environment much closer to reality.

Change-Id: Ie0f44c17d9c9a750363335def1b008cef8c809b0
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 01e256c..5346647 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -307,6 +307,8 @@
         ":event_loop",
         ":simple_channel",
         "//aos/ipc_lib:index",
+        "//aos/network:message_bridge_client_status",
+        "//aos/network:message_bridge_server_status",
         "//aos/util:phased_loop",
         "@com_google_absl//absl/container:btree",
     ],
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index faf6361..57d0525 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -103,16 +103,60 @@
     static const std::string kJson = R"config({
   "channels": [
     {
-      "name": "/aos/me",
+      "name": "/me/aos",
       "type": "aos.logging.LogMessageFbs",
       "source_node": "me"
     },
     {
-      "name": "/aos/them",
+      "name": "/them/aos",
       "type": "aos.logging.LogMessageFbs",
       "source_node": "them"
     },
     {
+      "name": "/me/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "me",
+      "destination_nodes": [
+        {
+          "name": "them"
+        }
+      ]
+    },
+    {
+      "name": "/them/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "source_node": "them",
+      "destination_nodes": [
+        {
+          "name": "me"
+        }
+      ]
+    },
+    {
+      "name": "/me/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "me",
+      "frequency": 2
+    },
+    {
+      "name": "/them/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "them",
+      "frequency": 2
+    },
+    {
+      "name": "/me/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "me",
+      "frequency": 2
+    },
+    {
+      "name": "/them/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "them",
+      "frequency": 2
+    },
+    {
       "name": "/aos",
       "type": "aos.timing.Report",
       "source_node": "me"
@@ -146,29 +190,28 @@
   "maps": [
     {
       "match": {
-        "name": "/aos",
-        "type": "aos.logging.LogMessageFbs",
+        "name": "/aos*",
         "source_node": "me"
       },
       "rename": {
-        "name": "/aos/me"
+        "name": "/me/aos"
       }
     },
     {
       "match": {
-        "name": "/aos",
-        "type": "aos.logging.LogMessageFbs",
+        "name": "/aos*",
         "source_node": "them"
       },
       "rename": {
-        "name": "/aos/them"
+        "name": "/them/aos"
       }
     }
   ]
 })config";
 
-    flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
-        JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable()));
+    flatbuffer_ = configuration::MergeConfiguration(
+        FlatbufferDetachedBuffer<Configuration>(
+            JsonToFlatbuffer(kJson, Configuration::MiniReflectTypeTable())));
 
     my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
   }
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index b5a4377..c3af9e2 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -131,6 +131,9 @@
     flatbuffers = [
         "//aos/events:ping_fbs",
         "//aos/events:pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
+        "//aos/network:timestamp_fbs",
     ],
     deps = ["//aos/events:config"],
 )
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index eff977b..abab5f6 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -593,6 +593,10 @@
 
       event_loop_factory_->DisableForwarding(remapped_channel);
     }
+
+    // If we are replaying a log, we don't want a bunch of redundant messages
+    // from both the real message bridge and simulated message bridge.
+    event_loop_factory_->DisableStatistics();
   }
 
   // While we are starting the system up, we might be relying on matching data
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index b6a30cb..7dbbb8e 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -171,6 +171,7 @@
   unlink(logfile.c_str());
 
   LOG(INFO) << "Logging data to " << logfile;
+  ping_.set_quiet(true);
 
   {
     DetachedBufferWriter writer(logfile);
@@ -352,25 +353,25 @@
 
     // Timing reports, pings
     EXPECT_THAT(CountChannelsData(logfiles_[0]),
-                ::testing::ElementsAre(::testing::Pair(1, 40),
-                                       ::testing::Pair(4, 2001)));
+                ::testing::ElementsAre(::testing::Pair(4, 40),
+                                       ::testing::Pair(10, 2001)));
     // Timestamps for pong
     EXPECT_THAT(CountChannelsTimestamp(logfiles_[0]),
-                ::testing::ElementsAre(::testing::Pair(5, 2001)));
+                ::testing::ElementsAre(::testing::Pair(11, 2001)));
 
     // Pong data.
     EXPECT_THAT(CountChannelsData(logfiles_[1]),
-                ::testing::ElementsAre(::testing::Pair(5, 2001)));
+                ::testing::ElementsAre(::testing::Pair(11, 2001)));
     // No timestamps
     EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), ::testing::ElementsAre());
 
     // Timing reports and pongs.
     EXPECT_THAT(CountChannelsData(logfiles_[2]),
-                ::testing::ElementsAre(::testing::Pair(3, 40),
-                                       ::testing::Pair(5, 2001)));
+                ::testing::ElementsAre(::testing::Pair(9, 40),
+                                       ::testing::Pair(11, 2001)));
     // And ping timestamps.
     EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]),
-                ::testing::ElementsAre(::testing::Pair(4, 2001)));
+                ::testing::ElementsAre(::testing::Pair(10, 2001)));
   }
 
   LogReader reader({std::vector<std::string>{logfiles_[0]},
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index 1b1424e..a78b42c 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -33,6 +33,54 @@
       "num_senders": 20,
       "max_size": 2048
     },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.ServerStatistics",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.ClientStatistics",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi2"
+    },
+    {
+      "name": "/pi1/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "timestamp_logger": "NOT_LOGGED"
+        }
+      ]
+    },
+    {
+      "name": "/pi2/aos",
+      "type": "aos.message_bridge.Timestamp",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi2",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "timestamp_logger": "NOT_LOGGED"
+        }
+      ]
+    },
     /* Forwarded to pi2 */
     {
       "name": "/test",
diff --git a/aos/events/ping_lib.cc b/aos/events/ping_lib.cc
index 9bc2446..7a0bfec 100644
--- a/aos/events/ping_lib.cc
+++ b/aos/events/ping_lib.cc
@@ -49,14 +49,14 @@
   const chrono::nanoseconds round_trip_time =
       monotonic_now - monotonic_send_time;
 
-  if (last_pong_value_ + 1 != pong.value()) {
+  if (last_pong_value_ + 1 != pong.value() && (!quiet_ || VLOG_IS_ON(1))) {
     LOG(WARNING) << "Pong message lost";
   }
 
   if (pong.value() == count_) {
     VLOG(1) << "Elapsed time " << round_trip_time.count() << " ns "
             << FlatbufferToJson(&pong);
-  } else {
+  } else if (!quiet_ || VLOG_IS_ON(1)) {
     LOG(WARNING) << "Missmatched pong message, got " << FlatbufferToJson(&pong)
                  << " expected " << count_;
   }
diff --git a/aos/events/ping_lib.h b/aos/events/ping_lib.h
index e14c3f2..6107494 100644
--- a/aos/events/ping_lib.h
+++ b/aos/events/ping_lib.h
@@ -14,6 +14,8 @@
  public:
   Ping(EventLoop *event_loop);
 
+  void set_quiet(bool quiet) { quiet_ = quiet; }
+
  private:
   // Sends out the ping message with an incrementing count.
   void SendPing();
@@ -29,6 +31,8 @@
   int count_ = 0;
   // Last pong value received so we can detect missed pongs.
   int last_pong_value_ = 0;
+
+  bool quiet_ = false;
 };
 
 }  // namespace aos
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index c339ce0..b2e54bb 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -986,7 +986,13 @@
 }
 
 void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
+  CHECK(bridge_) << ": Can't disable forwarding without a message bridge.";
   bridge_->DisableForwarding(channel);
 }
 
+void SimulatedEventLoopFactory::DisableStatistics() {
+  CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
+  bridge_->DisableStatistics();
+}
+
 }  // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de19b11..bc2721e 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -105,6 +105,9 @@
   // for things like the logger.
   void DisableForwarding(const Channel *channel);
 
+  // Stops the simulated message bridge from sending statistics/timestamps.
+  void DisableStatistics();
+
  private:
   const Configuration *const configuration_;
   EventSchedulerScheduler scheduler_scheduler_;
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 78c0d44..44ba3e5 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -6,6 +6,9 @@
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/test_message_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/timestamp_generated.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -250,13 +253,28 @@
             0.0);
 }
 
-// Tests that ping and pong work when on 2 different nodes.
+template <typename T>
+class MessageCounter {
+ public:
+  MessageCounter(aos::EventLoop *event_loop, std::string_view name) {
+    event_loop->MakeNoArgWatcher<T>(name, [this]() { ++count_; });
+  }
+
+  size_t count() const { return count_; }
+
+ private:
+  size_t count_ = 0;
+};
+
+// Tests that ping and pong work when on 2 different nodes, and the message
+// bridge messages are sent out as expected.
 TEST(SimulatedEventLoopTest, MultinodePingPong) {
   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
       aos::configuration::ReadConfig(
           "aos/events/multinode_pingpong_config.json");
   const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
   const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
 
   SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
 
@@ -270,24 +288,364 @@
 
   std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+  MessageCounter<examples::Pong> pi2_pong_counter(
+      pi2_pong_counter_event_loop.get(), "/test");
 
-  int pi2_pong_count = 0;
-  pi2_pong_counter_event_loop->MakeWatcher(
-      "/test",
-      [&pi2_pong_count](const examples::Pong & /*pong*/) { ++pi2_pong_count; });
+  std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
 
   std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
-  int pi1_pong_count = 0;
+  MessageCounter<examples::Pong> pi1_pong_counter(
+      pi1_pong_counter_event_loop.get(), "/test");
+
+  // Count timestamps.
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi3/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi3/aos");
+
+  // Wait to let timestamp estimation start up before looking for the results.
+  simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
+
+  int pi1_server_statistics_count = 0;
   pi1_pong_counter_event_loop->MakeWatcher(
-      "/test",
-      [&pi1_pong_count](const examples::Pong & /*pong*/) { ++pi1_pong_count; });
+      "/pi1/aos", [&pi1_server_statistics_count](
+                      const message_bridge::ServerStatistics &stats) {
+        VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+        EXPECT_EQ(stats.connections()->size(), 2u);
+        for (const message_bridge::ServerConnection *connection :
+             *stats.connections()) {
+          EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+          if (connection->node()->name()->string_view() == "pi2") {
+            EXPECT_GT(connection->sent_packets(), 50);
+          } else if (connection->node()->name()->string_view() == "pi3") {
+            EXPECT_GE(connection->sent_packets(), 5);
+          } else {
+            LOG(FATAL) << "Unknown connection";
+          }
+
+          EXPECT_TRUE(connection->has_monotonic_offset());
+          EXPECT_EQ(connection->monotonic_offset(), 0);
+        }
+        ++pi1_server_statistics_count;
+      });
+
+  int pi2_server_statistics_count = 0;
+  pi2_pong_counter_event_loop->MakeWatcher(
+      "/pi2/aos", [&pi2_server_statistics_count](
+                      const message_bridge::ServerStatistics &stats) {
+        VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+        // Bail out on a size mismatch so Get(0) below stays in range.
+        ASSERT_EQ(stats.connections()->size(), 1u);
+        const message_bridge::ServerConnection *connection =
+            stats.connections()->Get(0);
+        EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+        EXPECT_GT(connection->sent_packets(), 50);
+        EXPECT_TRUE(connection->has_monotonic_offset());
+        EXPECT_EQ(connection->monotonic_offset(), 0);
+        ++pi2_server_statistics_count;
+      });
+
+  int pi3_server_statistics_count = 0;
+  pi3_pong_counter_event_loop->MakeWatcher(
+      "/pi3/aos", [&pi3_server_statistics_count](
+                      const message_bridge::ServerStatistics &stats) {
+        VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
+        // Bail out on a size mismatch so Get(0) below stays in range.
+        ASSERT_EQ(stats.connections()->size(), 1u);
+        const message_bridge::ServerConnection *connection =
+            stats.connections()->Get(0);
+        EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+        EXPECT_GE(connection->sent_packets(), 5);
+        EXPECT_TRUE(connection->has_monotonic_offset());
+        EXPECT_EQ(connection->monotonic_offset(), 0);
+        ++pi3_server_statistics_count;
+      });
+
+  int pi1_client_statistics_count = 0;
+  pi1_pong_counter_event_loop->MakeWatcher(
+      "/pi1/aos", [&pi1_client_statistics_count](
+                      const message_bridge::ClientStatistics &stats) {
+        VLOG(1) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+        EXPECT_EQ(stats.connections()->size(), 2u);
+
+        for (const message_bridge::ClientConnection *connection :
+             *stats.connections()) {
+          EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+          if (connection->node()->name()->string_view() == "pi2") {
+            EXPECT_GT(connection->received_packets(), 50);
+          } else if (connection->node()->name()->string_view() == "pi3") {
+            EXPECT_GE(connection->received_packets(), 5);
+          } else {
+            LOG(FATAL) << "Unknown connection";
+          }
+
+          EXPECT_TRUE(connection->has_monotonic_offset());
+          EXPECT_EQ(connection->monotonic_offset(), 150000);
+        }
+        ++pi1_client_statistics_count;
+      });
+
+  int pi2_client_statistics_count = 0;
+  pi2_pong_counter_event_loop->MakeWatcher(
+      "/pi2/aos", [&pi2_client_statistics_count](
+                      const message_bridge::ClientStatistics &stats) {
+        VLOG(1) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+        // Bail out on a size mismatch so Get(0) below stays in range.
+        ASSERT_EQ(stats.connections()->size(), 1u);
+        const message_bridge::ClientConnection *connection =
+            stats.connections()->Get(0);
+        EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+        EXPECT_GT(connection->received_packets(), 50);
+        EXPECT_TRUE(connection->has_monotonic_offset());
+        EXPECT_EQ(connection->monotonic_offset(), 150000);
+        ++pi2_client_statistics_count;
+      });
+
+  int pi3_client_statistics_count = 0;
+  pi3_pong_counter_event_loop->MakeWatcher(
+      "/pi3/aos", [&pi3_client_statistics_count](
+                      const message_bridge::ClientStatistics &stats) {
+        VLOG(1) << "pi3 ClientStatistics " << FlatbufferToJson(&stats);
+        // Bail out on a size mismatch so Get(0) below stays in range.
+        ASSERT_EQ(stats.connections()->size(), 1u);
+        const message_bridge::ClientConnection *connection =
+            stats.connections()->Get(0);
+        EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+        EXPECT_GE(connection->received_packets(), 5);
+        EXPECT_TRUE(connection->has_monotonic_offset());
+        EXPECT_EQ(connection->monotonic_offset(), 150000);
+        ++pi3_client_statistics_count;
+      });
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(10) -
+                                      chrono::milliseconds(500) +
+                                      chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_pong_counter.count(), 1001u);  // count() returns size_t.
+  EXPECT_EQ(pi2_pong_counter.count(), 1001u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 100u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 100u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 100u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 100u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 100u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 100u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 100u);
+
+  EXPECT_EQ(pi1_server_statistics_count, 9);
+  EXPECT_EQ(pi2_server_statistics_count, 9);
+  EXPECT_EQ(pi3_server_statistics_count, 9);
+
+  EXPECT_EQ(pi1_client_statistics_count, 95);
+  EXPECT_EQ(pi2_client_statistics_count, 95);
+  EXPECT_EQ(pi3_client_statistics_count, 95);
+}
+
+// Tests that an offset between nodes can be recovered and shows up in
+// ServerStatistics correctly.
+TEST(SimulatedEventLoopTest, MultinodePingPongWithOffset) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          "aos/events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+  NodeEventLoopFactory *pi2_factory =
+      simulated_event_loop_factory.GetNodeEventLoopFactory(pi2);
+
+  constexpr chrono::milliseconds kOffset{1501};
+  pi2_factory->SetDistributedOffset(kOffset, 1.0);
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+  std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
+  // Wait to let timestamp estimation start up before looking for the results.
+  simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
+
+  // Confirm the offsets are being recovered correctly.
+  int pi1_server_statistics_count = 0;
+  pi1_pong_counter_event_loop->MakeWatcher(
+      "/pi1/aos", [&pi1_server_statistics_count,
+                   kOffset](const message_bridge::ServerStatistics &stats) {
+        VLOG(1) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+        EXPECT_EQ(stats.connections()->size(), 2u);
+        for (const message_bridge::ServerConnection *connection :
+             *stats.connections()) {
+          EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+          if (connection->node()->name()->string_view() == "pi2") {
+            EXPECT_EQ(connection->monotonic_offset(),
+                      chrono::nanoseconds(kOffset).count());
+          } else if (connection->node()->name()->string_view() == "pi3") {
+            EXPECT_EQ(connection->monotonic_offset(), 0);
+          } else {
+            LOG(FATAL) << "Unknown connection";
+          }
+
+          EXPECT_TRUE(connection->has_monotonic_offset());
+        }
+        ++pi1_server_statistics_count;
+      });
+
+  int pi2_server_statistics_count = 0;
+  pi2_pong_counter_event_loop->MakeWatcher(
+      "/pi2/aos", [&pi2_server_statistics_count,
+                   kOffset](const message_bridge::ServerStatistics &stats) {
+        VLOG(1) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+        // Bail out on a size mismatch so Get(0) below stays in range.
+        ASSERT_EQ(stats.connections()->size(), 1u);
+        const message_bridge::ServerConnection *connection =
+            stats.connections()->Get(0);
+        EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+        EXPECT_TRUE(connection->has_monotonic_offset());
+        EXPECT_EQ(connection->monotonic_offset(),
+                  -chrono::nanoseconds(kOffset).count());
+        ++pi2_server_statistics_count;
+      });
+
+  int pi3_server_statistics_count = 0;
+  pi3_pong_counter_event_loop->MakeWatcher(
+      "/pi3/aos", [&pi3_server_statistics_count](
+                      const message_bridge::ServerStatistics &stats) {
+        VLOG(1) << "pi3 ServerStatistics " << FlatbufferToJson(&stats);
+        // Bail out on a size mismatch so Get(0) below stays in range.
+        ASSERT_EQ(stats.connections()->size(), 1u);
+        const message_bridge::ServerConnection *connection =
+            stats.connections()->Get(0);
+        EXPECT_EQ(connection->state(), message_bridge::State::CONNECTED);
+        EXPECT_TRUE(connection->has_monotonic_offset());
+        EXPECT_EQ(connection->monotonic_offset(), 0);
+        ++pi3_server_statistics_count;
+      });
+
+  simulated_event_loop_factory.RunFor(chrono::seconds(10) -
+                                      chrono::milliseconds(500) +
+                                      chrono::milliseconds(5));
+
+  EXPECT_EQ(pi1_server_statistics_count, 9);
+  EXPECT_EQ(pi2_server_statistics_count, 9);
+  EXPECT_EQ(pi3_server_statistics_count, 9);
+}
+
+// Test that disabling statistics actually disables them.
+TEST(SimulatedEventLoopTest, MultinodeWithoutStatistics) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          "aos/events/multinode_pingpong_config.json");
+  const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+  const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+  const Node *pi3 = configuration::GetNode(&config.message(), "pi3");
+
+  SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+  simulated_event_loop_factory.DisableStatistics();
+
+  std::unique_ptr<EventLoop> ping_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+  Ping ping(ping_event_loop.get());
+
+  std::unique_ptr<EventLoop> pong_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+  Pong pong(pong_event_loop.get());
+
+  std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+  MessageCounter<examples::Pong> pi2_pong_counter(
+      pi2_pong_counter_event_loop.get(), "/test");
+
+  std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
+
+  std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+      simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+
+  MessageCounter<examples::Pong> pi1_pong_counter(
+      pi1_pong_counter_event_loop.get(), "/test");
+
+  // Count timestamps.
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi1_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi1/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi2_on_pi2_timestamp_counter(
+      pi2_pong_counter_event_loop.get(), "/pi2/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi1_timestamp_counter(
+      pi1_pong_counter_event_loop.get(), "/pi3/aos");
+  MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
+      pi3_pong_counter_event_loop.get(), "/pi3/aos");
+
+  MessageCounter<message_bridge::ServerStatistics>
+      pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
+                                    "/pi1/aos");
+  MessageCounter<message_bridge::ServerStatistics>
+      pi2_server_statistics_counter(pi2_pong_counter_event_loop.get(),
+                                    "/pi2/aos");
+  MessageCounter<message_bridge::ServerStatistics>
+      pi3_server_statistics_counter(pi3_pong_counter_event_loop.get(),
+                                    "/pi3/aos");
+
+  MessageCounter<message_bridge::ClientStatistics>
+      pi1_client_statistics_counter(pi1_pong_counter_event_loop.get(),
+                                    "/pi1/aos");
+  MessageCounter<message_bridge::ClientStatistics>
+      pi2_client_statistics_counter(pi2_pong_counter_event_loop.get(),
+                                    "/pi2/aos");
+  MessageCounter<message_bridge::ClientStatistics>
+      pi3_client_statistics_counter(pi3_pong_counter_event_loop.get(),
+                                    "/pi3/aos");
 
   simulated_event_loop_factory.RunFor(chrono::seconds(10) +
                                       chrono::milliseconds(5));
 
-  EXPECT_EQ(pi1_pong_count, 1001);
-  EXPECT_EQ(pi2_pong_count, 1001);
+  EXPECT_EQ(pi1_pong_counter.count(), 1001u);
+  EXPECT_EQ(pi2_pong_counter.count(), 1001u);
+
+  EXPECT_EQ(pi1_on_pi1_timestamp_counter.count(), 0u);
+  EXPECT_EQ(pi1_on_pi2_timestamp_counter.count(), 0u);
+  EXPECT_EQ(pi1_on_pi3_timestamp_counter.count(), 0u);
+  EXPECT_EQ(pi2_on_pi1_timestamp_counter.count(), 0u);
+  EXPECT_EQ(pi2_on_pi2_timestamp_counter.count(), 0u);
+  EXPECT_EQ(pi3_on_pi1_timestamp_counter.count(), 0u);
+  EXPECT_EQ(pi3_on_pi3_timestamp_counter.count(), 0u);
+
+  EXPECT_EQ(pi1_server_statistics_counter.count(), 0u);
+  EXPECT_EQ(pi2_server_statistics_counter.count(), 0u);
+  EXPECT_EQ(pi3_server_statistics_counter.count(), 0u);
+
+  EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
+  EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
+  EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
 }
 
 }  // namespace testing
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 0ab366a..0f6e8bc 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -17,12 +17,18 @@
                     aos::NodeEventLoopFactory *send_node_factory,
                     aos::EventLoop *send_event_loop,
                     std::unique_ptr<aos::RawFetcher> fetcher,
-                    std::unique_ptr<aos::RawSender> sender)
+                    std::unique_ptr<aos::RawSender> sender,
+                    ServerConnection *server_connection, int client_index,
+                    MessageBridgeClientStatus *client_status)
       : fetch_node_factory_(fetch_node_factory),
         send_node_factory_(send_node_factory),
         send_event_loop_(send_event_loop),
         fetcher_(std::move(fetcher)),
-        sender_(std::move(sender)) {
+        sender_(std::move(sender)),
+        server_connection_(server_connection),
+        client_status_(client_status),
+        client_index_(client_index),
+        client_connection_(client_status_->GetClientConnection(client_index)) {
     timer_ = send_event_loop_->AddTimer([this]() { Send(); });
 
     Schedule();
@@ -55,6 +61,8 @@
                    << fetcher_->context().monotonic_event_time << " now is "
                    << fetch_node_factory_->monotonic_now();
       sent_ = true;
+      server_connection_->mutate_dropped_packets(
+          server_connection_->dropped_packets() + 1);
     }
 
     if (fetcher_->context().data == nullptr) {
@@ -72,25 +80,27 @@
     CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
         << ": Trying to deliver message in the past...";
 
+    server_connection_->mutate_sent_packets(server_connection_->sent_packets() +
+                                            1);
     timer_->Setup(monotonic_delivered_time);
   }
 
  private:
   // Acutally sends the message, and reschedules.
   void Send() {
-    // Compute the time to publish this message.
-    const monotonic_clock::time_point monotonic_delivered_time =
-        DeliveredTime(fetcher_->context());
-
-    CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
-        << ": Message to be sent at the wrong time.";
-
-    // And also fill out the send times as well.
+    // Fill out the send times.
     sender_->Send(fetcher_->context().data, fetcher_->context().size,
                   fetcher_->context().monotonic_event_time,
                   fetcher_->context().realtime_event_time,
                   fetcher_->context().queue_index);
 
+    // And simulate message_bridge's offset recovery.
+    client_status_->SampleFilter(client_index_,
+                                 fetcher_->context().monotonic_event_time,
+                                 sender_->monotonic_sent_time());
+
+    client_connection_->mutate_received_packets(
+        client_connection_->received_packets() + 1);
     sent_ = true;
     Schedule();
   }
@@ -121,6 +131,11 @@
   std::unique_ptr<aos::RawSender> sender_;
   // True if we have sent the message in the fetcher.
   bool sent_ = false;
+
+  ServerConnection *server_connection_ = nullptr;
+  MessageBridgeClientStatus *client_status_ = nullptr;
+  int client_index_;
+  ClientConnection *client_connection_ = nullptr;
 };
 
 SimulatedMessageBridge::SimulatedMessageBridge(
@@ -130,14 +145,35 @@
 
   // Pre-build up event loops for every node.  They are pretty cheap anyways.
   for (const Node *node : simulated_event_loop_factory->nodes()) {
-    auto it = event_loop_map_.insert(
-        {node,
-         simulated_event_loop_factory->MakeEventLoop("message_bridge", node)});
+    auto it = event_loop_map_.emplace(std::make_pair(
+        node,
+        simulated_event_loop_factory->MakeEventLoop("message_bridge", node)));
 
     CHECK(it.second);
 
-    it.first->second->SkipTimingReport();
-    it.first->second->SkipAosLog();
+    it.first->second.event_loop->SkipTimingReport();
+    it.first->second.event_loop->SkipAosLog();
+
+    for (ServerConnection *connection :
+         it.first->second.server_status.server_connection()) {
+      if (connection == nullptr) continue;
+
+      connection->mutate_state(message_bridge::State::CONNECTED);
+    }
+
+    for (size_t i = 0;
+         i < it.first->second.client_status.mutable_client_statistics()
+                 ->mutable_connections()
+                 ->size();
+         ++i) {
+      ClientConnection *connection =
+          it.first->second.client_status.mutable_client_statistics()
+              ->mutable_connections()
+              ->GetMutableObject(i);
+      if (connection == nullptr) continue;
+
+      connection->mutate_state(message_bridge::State::CONNECTED);
+    }
   }
 
   for (const Channel *channel :
@@ -164,23 +200,48 @@
       auto destination_event_loop = event_loop_map_.find(destination_node);
       CHECK(destination_event_loop != event_loop_map_.end());
 
+      ServerConnection *server_connection =
+          source_event_loop->second.server_status.FindServerConnection(
+              connection->name()->string_view());
+
+      int client_index =
+          destination_event_loop->second.client_status.FindClientIndex(
+              channel->source_node()->string_view());
+
       delayers->emplace_back(std::make_unique<RawMessageDelayer>(
           simulated_event_loop_factory->GetNodeEventLoopFactory(node),
           simulated_event_loop_factory->GetNodeEventLoopFactory(
               destination_node),
-          destination_event_loop->second.get(),
-          source_event_loop->second->MakeRawFetcher(channel),
-          destination_event_loop->second->MakeRawSender(channel)));
+          destination_event_loop->second.event_loop.get(),
+          source_event_loop->second.event_loop->MakeRawFetcher(channel),
+          destination_event_loop->second.event_loop->MakeRawSender(channel),
+          server_connection, client_index,
+          &destination_event_loop->second.client_status));
     }
 
-    // And register every delayer to be poked when a new message shows up.
-    source_event_loop->second->MakeRawNoArgWatcher(
-        channel, [captured_delayers = delayers.get()](const Context &) {
-          for (std::unique_ptr<RawMessageDelayer> &delayer :
-               *captured_delayers) {
-            delayer->Schedule();
-          }
-        });
+    const Channel *const timestamp_channel = configuration::GetChannel(
+        simulated_event_loop_factory->configuration(), "/aos",
+        Timestamp::GetFullyQualifiedName(),
+        source_event_loop->second.event_loop->name(), node);
+
+    if (channel == timestamp_channel) {
+      source_event_loop->second.server_status.set_send_data(
+          [captured_delayers = delayers.get()](const Context &) {
+            for (std::unique_ptr<RawMessageDelayer> &delayer :
+                 *captured_delayers) {
+              delayer->Schedule();
+            }
+          });
+    } else {
+      // And register every delayer to be poked when a new message shows up.
+      source_event_loop->second.event_loop->MakeRawNoArgWatcher(
+          channel, [captured_delayers = delayers.get()](const Context &) {
+            for (std::unique_ptr<RawMessageDelayer> &delayer :
+                 *captured_delayers) {
+              delayer->Schedule();
+            }
+          });
+    }
     delayers_list_.emplace_back(std::move(delayers));
   }
 }
@@ -204,5 +265,12 @@
   }
 }
 
+void SimulatedMessageBridge::DisableStatistics() {
+  for (auto &[node, state] : event_loop_map_) {
+    state.server_status.DisableStatistics();
+    state.client_status.DisableStatistics();
+  }
+}
+
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 7aeef64..5784bb3 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -3,6 +3,8 @@
 
 #include "aos/events/event_loop.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_client_status.h"
+#include "aos/network/message_bridge_server_status.h"
 
 namespace aos {
 namespace message_bridge {
@@ -23,10 +25,27 @@
   // for things like the logger.
   void DisableForwarding(const Channel *channel);
 
+  // Disables generating and sending the messages which message_bridge sends.
+  // The messages are the ClientStatistics, ServerStatistics and Timestamp
+  // messages.
+  void DisableStatistics();
+
  private:
+  struct State {  // Per-node event loop and its server/client status objects.
+    explicit State(std::unique_ptr<aos::EventLoop> &&new_event_loop)
+        : event_loop(std::move(new_event_loop)),
+          server_status(event_loop.get()),
+          client_status(event_loop.get()) {}
+    // No copies: the status objects are built from this event_loop's pointer.
+    State(const State &state) = delete;
+    // Declared first so it is initialized before the status members use it.
+    std::unique_ptr<aos::EventLoop> event_loop;
+    MessageBridgeServerStatus server_status;
+    MessageBridgeClientStatus client_status;
+  };
   // Map of nodes to event loops.  This is a member variable so that the
   // lifetime of the event loops matches the lifetime of the bridge.
-  std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
+  std::map<const Node *, State> event_loop_map_;
 
   // List of delayers used to resend the messages.
   using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;