Merge "Add drivetrain position plot to drivetrain plotter"
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 59f478c..00322e1 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -530,6 +530,9 @@
   if (m.realtime_remote_time != realtime_clock::min_time) {
     os << ", .realtime_remote_time=" << m.realtime_remote_time;
   }
+  if (m.monotonic_timestamp_time != monotonic_clock::min_time) {
+    os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+  }
   if (m.data.Verify()) {
     os << ", .data="
        << aos::FlatbufferToJson(m.data,
@@ -666,9 +669,19 @@
       oldest = m;
       current_ = &parts_sorter;
     } else if (*m == *oldest) {
-      // Found a duplicate.  It doesn't matter which one we return.  It is
-      // easiest to just drop the new one.
-      parts_sorter.PopFront();
+      // Found a duplicate.  If there is a choice, we want the one which has the
+      // timestamp time.
+      if (!m->data.message().has_monotonic_timestamp_time()) {
+        parts_sorter.PopFront();
+      } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
+        current_->PopFront();
+        current_ = &parts_sorter;
+        oldest = m;
+      } else {
+        CHECK_EQ(m->data.message().monotonic_timestamp_time(),
+                 oldest->data.message().monotonic_timestamp_time());
+        parts_sorter.PopFront();
+      }
     }
 
     // PopFront may change this, so compute it down here.
@@ -702,6 +715,7 @@
                .remote_queue_index = 0xffffffff,
                .monotonic_remote_time = monotonic_clock::min_time,
                .realtime_remote_time = realtime_clock::min_time,
+               .monotonic_timestamp_time = monotonic_clock::min_time,
                .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()} {
   for (const LogParts *part : node_merger_.Parts()) {
     if (!configuration_) {
@@ -770,6 +784,7 @@
       .remote_queue_index = 0xffffffff,
       .monotonic_remote_time = monotonic_clock::min_time,
       .realtime_remote_time = realtime_clock::min_time,
+      .monotonic_timestamp_time = monotonic_clock::min_time,
       .data = std::move(m->data)};
 }
 
@@ -843,6 +858,9 @@
                 m->data.message().monotonic_remote_time())),
         .realtime_remote_time = realtime_clock::time_point(
             std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
+        .monotonic_timestamp_time =
+            monotonic_clock::time_point(std::chrono::nanoseconds(
+                m->data.message().monotonic_timestamp_time())),
         .data = std::move(data.data)};
     CHECK_GE(message_.monotonic_event_time, last_message_time_);
     last_message_time_ = message_.monotonic_event_time;
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 088ce73..94525cf 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -358,6 +358,9 @@
   monotonic_clock::time_point monotonic_remote_time = monotonic_clock::min_time;
   realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
 
+  monotonic_clock::time_point monotonic_timestamp_time =
+      monotonic_clock::min_time;
+
   SizePrefixedFlatbufferVector<MessageHeader> data;
 };
 
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 7795e66..979821e 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -376,6 +376,22 @@
   "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
   "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
   "parts_index": 0
+})")),
+        config4_(MakeHeader(config_,
+                            R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 2000000,
+  "realtime_start_time": 1000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+  "parts_index": 0
 })")) {
     unlink(logfile0_.c_str());
     unlink(logfile1_.c_str());
@@ -415,25 +431,45 @@
 
   flatbuffers::DetachedBuffer MakeTimestampMessage(
       const aos::monotonic_clock::time_point sender_monotonic_now,
-      int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
-    aos::Context context;
-    context.monotonic_remote_time = sender_monotonic_now;
-    context.realtime_remote_time = aos::realtime_clock::epoch() +
-                                   chrono::seconds(1000) +
-                                   sender_monotonic_now.time_since_epoch();
-    context.remote_queue_index = queue_index_[channel_index] - 1;
-    context.monotonic_event_time =
+      int channel_index, chrono::nanoseconds receiver_monotonic_offset,
+      monotonic_clock::time_point monotonic_timestamp_time =
+          monotonic_clock::min_time) {
+    const monotonic_clock::time_point monotonic_sent_time =
         sender_monotonic_now + receiver_monotonic_offset;
-    context.realtime_event_time =
-        aos::realtime_clock::epoch() + chrono::seconds(1000) +
-        context.monotonic_event_time.time_since_epoch();
-    context.queue_index = queue_index_[channel_index] - 1 + 100;
-    context.size = 0;
-    context.data = nullptr;
 
     flatbuffers::FlatBufferBuilder fbb;
-    fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
-                                       LogType::kLogDeliveryTimeOnly));
+    fbb.ForceDefaults(true);
+
+    logger::MessageHeader::Builder message_header_builder(fbb);
+
+    message_header_builder.add_channel_index(channel_index);
+
+    message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
+                                           100);
+    message_header_builder.add_monotonic_sent_time(
+        monotonic_sent_time.time_since_epoch().count());
+    message_header_builder.add_realtime_sent_time(
+        (aos::realtime_clock::epoch() + chrono::seconds(1000) +
+         monotonic_sent_time.time_since_epoch())
+            .time_since_epoch()
+            .count());
+
+    message_header_builder.add_monotonic_remote_time(
+        sender_monotonic_now.time_since_epoch().count());
+    message_header_builder.add_realtime_remote_time(
+        (aos::realtime_clock::epoch() + chrono::seconds(1000) +
+         sender_monotonic_now.time_since_epoch())
+            .time_since_epoch()
+            .count());
+    message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
+                                                  1);
+
+    if (monotonic_timestamp_time != monotonic_clock::min_time) {
+      message_header_builder.add_monotonic_timestamp_time(
+          monotonic_timestamp_time.time_since_epoch().count());
+    }
+
+    fbb.FinishSizePrefixed(message_header_builder.Finish());
     LOG(INFO) << aos::FlatbufferToJson(
         aos::SizePrefixedFlatbufferSpan<MessageHeader>(
             absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
@@ -450,6 +486,7 @@
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
 
   std::vector<uint32_t> queue_index_;
 };
@@ -676,6 +713,78 @@
   EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
 }
 
+// Tests that we can merge timestamps with various combinations of
+// monotonic_timestamp_time.
+TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config1_.span());
+
+    // Neither has it.
+    MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    // First only has it.
+    MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1001), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(971)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
+
+    // Second only has it.
+    MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1002), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(972)));
+
+    // Both have it.
+    MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1003), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(973)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1003), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(973)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  ASSERT_EQ(parts.size(), 1u);
+
+  NodeMerger merger(FilterPartsForNode(parts, "pi1"));
+
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_TRUE(merger.Front() != nullptr);
+    output.emplace_back(std::move(*merger.Front()));
+    merger.PopFront();
+  }
+  ASSERT_TRUE(merger.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(101000));
+  EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(101001));
+  EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(101002));
+  EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(101003));
+  EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
+}
+
 // Tests that we can match timestamps on delivered messages.
 TEST_F(TimestampMapperTest, ReadNode0First) {
   const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
@@ -775,6 +884,99 @@
   }
 }
 
+// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
+// returned.
+TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config4_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(971)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(5458)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  for (const auto &p : parts) {
+    LOG(INFO) << p;
+  }
+
+  ASSERT_EQ(parts.size(), 1u);
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    for (int i = 0; i < 3; ++i) {
+      ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
+      output0.emplace_back(std::move(*mapper0.Front()));
+      mapper0.PopFront();
+    }
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    for (int i = 0; i < 3; ++i) {
+      ASSERT_TRUE(mapper1.Front() != nullptr);
+      output1.emplace_back(std::move(*mapper1.Front()));
+      mapper1.PopFront();
+    }
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_timestamp_time,
+              e + chrono::nanoseconds(971));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_timestamp_time,
+              e + chrono::nanoseconds(5458));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output1[2].data.Verify());
+  }
+}
+
 // Tests that we can match timestamps on delivered messages.  By doing this in
 // the reverse order, the second node needs to queue data up from the first node
 // to find the matching timestamp.
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 91cf7dc..9da790a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -874,6 +874,11 @@
         message_header_builder.add_remote_queue_index(
             msg->remote_queue_index());
 
+        message_header_builder.add_monotonic_timestamp_time(
+            f.fetcher->context()
+                .monotonic_event_time.time_since_epoch()
+                .count());
+
         fbb.FinishSizePrefixed(message_header_builder.Finish());
         const auto end = event_loop_->monotonic_now();
         RecordCreateMessageTime(start, end, &f);
@@ -1278,7 +1283,7 @@
         logged_configuration()->channels()->Get(logged_channel_index));
 
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
-    aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
+    RemoteMessageSender *remote_timestamp_sender = nullptr;
 
     State *source_state = nullptr;
 
@@ -1861,7 +1866,7 @@
     size_t logged_channel_index, size_t factory_channel_index,
     std::unique_ptr<RawSender> sender,
     message_bridge::NoncausalOffsetEstimator *filter,
-    aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
+    RemoteMessageSender *remote_timestamp_sender, State *source_state) {
   channels_[logged_channel_index] = std::move(sender);
   filters_[logged_channel_index] = filter;
   remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1949,15 +1954,12 @@
         timestamp);
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
-    aos::Sender<RemoteMessage>::Builder builder =
-        remote_timestamp_senders_[timestamped_message.channel_index]
-            ->MakeBuilder();
-
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.ForceDefaults(true);
     flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
-        builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+        fbb.CreateString(event_loop_->boot_uuid().string_view());
 
-    RemoteMessage::Builder message_header_builder =
-        builder.MakeBuilder<RemoteMessage>();
+    RemoteMessage::Builder message_header_builder(fbb);
 
     message_header_builder.add_channel_index(
         factory_channel_index_[timestamped_message.channel_index]);
@@ -1978,27 +1980,90 @@
     message_header_builder.add_remote_queue_index(remote_queue_index);
     message_header_builder.add_boot_uuid(boot_uuid_offset);
 
-    builder.Send(message_header_builder.Finish());
+    fbb.Finish(message_header_builder.Finish());
+
+    remote_timestamp_senders_[timestamped_message.channel_index]->Send(
+        FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
+        timestamped_message.monotonic_timestamp_time);
   }
 
   return true;
 }
 
-aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
+LogReader::RemoteMessageSender::RemoteMessageSender(
+    aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(std::move(sender)),
+      timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
+
+void LogReader::RemoteMessageSender::ScheduleTimestamp() {
+  if (remote_timestamps_.empty()) {
+    CHECK_NOTNULL(timer_);
+    timer_->Disable();
+    scheduled_time_ = monotonic_clock::min_time;
+    return;
+  }
+
+  if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
+    CHECK_NOTNULL(timer_);
+    timer_->Setup(
+        remote_timestamps_.front().monotonic_timestamp_time);
+    scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+  }
+}
+
+void LogReader::RemoteMessageSender::Send(
+    FlatbufferDetachedBuffer<RemoteMessage> remote_message,
+    monotonic_clock::time_point monotonic_timestamp_time) {
+  // There are 2 cases.  Either we have a monotonic_timestamp_time and need to
+  // resend the timestamp at the correct time, or we don't and can send it
+  // immediately.
+  if (monotonic_timestamp_time == monotonic_clock::min_time) {
+    CHECK(remote_timestamps_.empty())
+        << ": Unsupported mix of timestamps and no timestamps.";
+    sender_.Send(std::move(remote_message));
+  } else {
+    remote_timestamps_.emplace_back(std::move(remote_message),
+                                    monotonic_timestamp_time);
+    ScheduleTimestamp();
+  }
+}
+
+void LogReader::RemoteMessageSender::SendTimestamp() {
+  CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_);
+  CHECK(!remote_timestamps_.empty());
+
+  // Send out all timestamps at the currently scheduled time.
+  while (remote_timestamps_.front().monotonic_timestamp_time ==
+         scheduled_time_) {
+    sender_.Send(std::move(remote_timestamps_.front().remote_message));
+    remote_timestamps_.pop_front();
+    if (remote_timestamps_.empty()) {
+      break;
+    }
+  }
+  scheduled_time_ = monotonic_clock::min_time;
+
+  ScheduleTimestamp();
+}
+
+LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
     const Node *delivered_node) {
   auto sender = remote_timestamp_senders_map_.find(delivered_node);
 
   if (sender == remote_timestamp_senders_map_.end()) {
-    sender = remote_timestamp_senders_map_
-                 .emplace(std::make_pair(
-                     delivered_node,
-                     event_loop()->MakeSender<RemoteMessage>(
-                         absl::StrCat("/aos/remote_timestamps/",
-                                      delivered_node->name()->string_view()))))
-                 .first;
+    sender =
+        remote_timestamp_senders_map_
+            .emplace(delivered_node,
+                     std::make_unique<RemoteMessageSender>(
+                         event_loop()->MakeSender<RemoteMessage>(absl::StrCat(
+                             "/aos/remote_timestamps/",
+                             delivered_node->name()->string_view())),
+                         event_loop()))
+            .first;
   }
 
-  return &(sender->second);
+  return sender->second.get();
 }
 
 const TimestampedMessage &LogReader::State::PeekOldest() {
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 33702e0..2896963 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -124,6 +124,10 @@
   realtime_remote_time:int64 = -9223372036854775808 (id: 6);
   // Queue index of this message on the remote node.
   remote_queue_index:uint32 = 4294967295 (id: 7);
+
+  // Time this timestamp was received on the monotonic clock of the logger node
+  // in nanoseconds.
+  monotonic_timestamp_time:int64 = -9223372036854775808 (id: 8);
 }
 
 root_type MessageHeader;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e71f31d..e3f5380 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -491,6 +491,49 @@
 
   const std::vector<LogFile> log_files_;
 
+  // Class to manage sending RemoteMessages on the provided node after the
+  // correct delay.
+  class RemoteMessageSender{
+   public:
+    RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
+                        EventLoop *event_loop);
+    RemoteMessageSender(RemoteMessageSender const &) = delete;
+    RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
+
+    // Sends the provided message.  If monotonic_timestamp_time is min_time,
+    // send it immediately.
+    void Send(
+        FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
+        monotonic_clock::time_point monotonic_timestamp_time);
+
+   private:
+    // Handles actually sending the timestamp if we were delayed.
+    void SendTimestamp();
+    // Handles scheduling the timer to send at the correct time.
+    void ScheduleTimestamp();
+
+    EventLoop *event_loop_;
+    aos::Sender<message_bridge::RemoteMessage> sender_;
+    aos::TimerHandler *timer_;
+
+    // Time we are scheduled for, or min_time if we aren't scheduled.
+    monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
+    struct Timestamp {
+      Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
+                    new_remote_message,
+                monotonic_clock::time_point new_monotonic_timestamp_time)
+          : remote_message(std::move(new_remote_message)),
+            monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+      FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
+      monotonic_clock::time_point monotonic_timestamp_time;
+    };
+
+    // List of messages to send. The timer works through them and then disables
+    // itself automatically.
+    std::deque<Timestamp> remote_timestamps_;
+  };
+
   // State per node.
   class State {
    public:
@@ -545,8 +588,7 @@
 
     // Returns the MessageHeader sender to log delivery timestamps to for the
     // provided remote node.
-    aos::Sender<message_bridge::RemoteMessage> *RemoteTimestampSender(
-        const Node *delivered_node);
+    RemoteMessageSender *RemoteTimestampSender(const Node *delivered_node);
 
     // Converts a timestamp from the monotonic clock on this node to the
     // distributed clock.
@@ -581,12 +623,11 @@
     void SetChannelCount(size_t count);
 
     // Sets the sender, filter, and target factory for a channel.
-    void SetChannel(
-        size_t logged_channel_index, size_t factory_channel_index,
-        std::unique_ptr<RawSender> sender,
-        message_bridge::NoncausalOffsetEstimator *filter,
-        aos::Sender<message_bridge::RemoteMessage> *remote_timestamp_sender,
-        State *source_state);
+    void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+                    std::unique_ptr<RawSender> sender,
+                    message_bridge::NoncausalOffsetEstimator *filter,
+                    RemoteMessageSender *remote_timestamp_sender,
+                    State *source_state);
 
     // Returns if we have read all the messages from all the logs.
     bool at_end() const {
@@ -642,8 +683,7 @@
 
     // Senders.
     std::vector<std::unique_ptr<RawSender>> channels_;
-    std::vector<aos::Sender<message_bridge::RemoteMessage> *>
-        remote_timestamp_senders_;
+    std::vector<RemoteMessageSender *> remote_timestamp_senders_;
     // The mapping from logged channel index to sent channel index.  Needed for
     // sending out MessageHeaders.
     std::vector<int> factory_channel_index_;
@@ -684,7 +724,7 @@
     // channel) which correspond to the originating node.
     std::vector<State *> channel_source_state_;
 
-    std::map<const Node *, aos::Sender<message_bridge::RemoteMessage>>
+    std::map<const Node *, std::unique_ptr<RemoteMessageSender>>
         remote_timestamp_senders_map_;
   };
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 2b5d6ca..05bafe4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -26,6 +26,8 @@
 using aos::message_bridge::RemoteMessage;
 using aos::testing::MessageCounter;
 
+constexpr std::string_view kSingleConfigSha1(
+    "bc8c9c2e31589eae6f0e36d766f6a437643e861d9568b7483106841cf7504dea");
 constexpr std::string_view kConfigSha1(
     "0000c81e444ac470b8d29fb864621ae93a0e294a7e90c0dc4840d0f0d40fd72e");
 
@@ -60,8 +62,11 @@
 TEST_F(LoggerTest, Starts) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
   const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config =
+      absl::StrCat(base_name, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile = base_name + ".part0.bfbs";
   // Remove it.
+  unlink(config.c_str());
   unlink(logfile.c_str());
 
   LOG(INFO) << "Logging data to " << logfile;
@@ -121,11 +126,17 @@
 TEST_F(LoggerDeathTest, ExtraStart) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
   const ::std::string base_name1 = tmpdir + "/logfile1";
+  const ::std::string config1 =
+      absl::StrCat(base_name1, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile1 = base_name1 + ".part0.bfbs";
   const ::std::string base_name2 = tmpdir + "/logfile2";
+  const ::std::string config2 =
+      absl::StrCat(base_name2, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile2 = base_name2 + ".part0.bfbs";
   unlink(logfile1.c_str());
+  unlink(config1.c_str());
   unlink(logfile2.c_str());
+  unlink(config2.c_str());
 
   LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
 
@@ -153,8 +164,11 @@
 TEST_F(LoggerDeathTest, ExtraStop) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
   const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config =
+      absl::StrCat(base_name, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile = base_name + ".part0.bfbs";
   // Remove it.
+  unlink(config.c_str());
   unlink(logfile.c_str());
 
   LOG(INFO) << "Logging data to " << logfile;
@@ -183,11 +197,17 @@
 TEST_F(LoggerTest, StartsTwice) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
   const ::std::string base_name1 = tmpdir + "/logfile1";
+  const ::std::string config1 =
+      absl::StrCat(base_name1, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile1 = base_name1 + ".part0.bfbs";
   const ::std::string base_name2 = tmpdir + "/logfile2";
+  const ::std::string config2 =
+      absl::StrCat(base_name2, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile2 = base_name2 + ".part0.bfbs";
   unlink(logfile1.c_str());
+  unlink(config1.c_str());
   unlink(logfile2.c_str());
+  unlink(config2.c_str());
 
   LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
 
@@ -247,9 +267,12 @@
 TEST_F(LoggerTest, RotatedLogFile) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
   const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config =
+      absl::StrCat(base_name, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile0 = base_name + ".part0.bfbs";
   const ::std::string logfile1 = base_name + ".part1.bfbs";
   // Remove it.
+  unlink(config.c_str());
   unlink(logfile0.c_str());
   unlink(logfile1.c_str());
 
@@ -329,8 +352,11 @@
 TEST_F(LoggerTest, ManyMessages) {
   const ::std::string tmpdir = aos::testing::TestTmpDir();
   const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string config =
+      absl::StrCat(base_name, kSingleConfigSha1, ".bfbs");
   const ::std::string logfile = base_name + ".part0.bfbs";
   // Remove the log file.
+  unlink(config.c_str());
   unlink(logfile.c_str());
 
   LOG(INFO) << "Logging data to " << logfile;
@@ -369,7 +395,8 @@
   }
 }
 
-std::vector<std::string> MakeLogFiles(std::string logfile_base1, std::string logfile_base2) {
+std::vector<std::string> MakeLogFiles(std::string logfile_base1,
+                                      std::string logfile_base2) {
   return std::vector<std::string>(
       {logfile_base1 + "_pi1_data.part0.bfbs",
        logfile_base1 + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
@@ -453,7 +480,8 @@
       unlink((file + ".xz").c_str());
     }
 
-    for (const auto file : MakeLogFiles("relogged1", "relogged2")) {
+    for (const auto file :
+         MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2")) {
       unlink(file.c_str());
     }
 
@@ -632,8 +660,7 @@
 // Counts the number of messages on a channel.  Returns (channel name, channel
 // type, count) for every message matching matcher()
 std::vector<std::tuple<std::string, std::string, int>> CountChannelsMatching(
-    std::shared_ptr<const aos::Configuration> config,
-    std::string_view filename,
+    std::shared_ptr<const aos::Configuration> config, std::string_view filename,
     std::function<bool(const MessageHeader *)> matcher) {
   MessageReader message_reader(filename);
   std::vector<int> counts(config->channels()->size(), 0);
@@ -788,9 +815,11 @@
         << " : " << logfiles_[1];
 
     // No timestamps
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[1]), UnorderedElementsAre())
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[1]),
+                UnorderedElementsAre())
         << " : " << logfiles_[1];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]), UnorderedElementsAre())
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
+                UnorderedElementsAre())
         << " : " << logfiles_[2];
 
     // Timing reports and pongs.
@@ -841,13 +870,17 @@
 
     // And then test that the remotely logged timestamp data files only have
     // timestamps in them.
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]), UnorderedElementsAre())
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
+                UnorderedElementsAre())
         << " : " << logfiles_[8];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]), UnorderedElementsAre())
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
+                UnorderedElementsAre())
         << " : " << logfiles_[9];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]), UnorderedElementsAre())
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
+                UnorderedElementsAre())
         << " : " << logfiles_[10];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]), UnorderedElementsAre())
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
+                UnorderedElementsAre())
         << " : " << logfiles_[11];
 
     EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
@@ -1557,12 +1590,14 @@
   const size_t pong_timestamp_channel = configuration::ChannelIndex(
       pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
 
+  const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
+
   pi1_event_loop->MakeWatcher(
       "/aos/remote_timestamps/pi2",
       [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
        ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
        &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
-       &ping_on_pi2_fetcher](const RemoteMessage &header) {
+       &ping_on_pi2_fetcher, network_delay](const RemoteMessage &header) {
         const aos::monotonic_clock::time_point header_monotonic_sent_time(
             chrono::nanoseconds(header.monotonic_sent_time()));
         const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1612,13 +1647,19 @@
                   header_realtime_remote_time);
         EXPECT_EQ(pi1_context->monotonic_event_time,
                   header_monotonic_remote_time);
+
+        EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
+                  pi2_context->monotonic_event_time +
+                      (pi1_event_loop->monotonic_now() -
+                       pi2_event_loop->monotonic_now()) +
+                      network_delay);
       });
   pi2_event_loop->MakeWatcher(
       "/aos/remote_timestamps/pi1",
       [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
        pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
        &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
-       &pong_on_pi1_fetcher](const RemoteMessage &header) {
+       &pong_on_pi1_fetcher, network_delay](const RemoteMessage &header) {
         const aos::monotonic_clock::time_point header_monotonic_sent_time(
             chrono::nanoseconds(header.monotonic_sent_time()));
         const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1668,6 +1709,12 @@
                   header_realtime_remote_time);
         EXPECT_EQ(pi2_context->monotonic_event_time,
                   header_monotonic_remote_time);
+
+        EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
+                  pi1_context->monotonic_event_time +
+                      (pi2_event_loop->monotonic_now() -
+                       pi1_event_loop->monotonic_now()) +
+                      network_delay);
       });
 
   // And confirm we can re-create a log again, while checking the contents.
@@ -1679,8 +1726,8 @@
         configuration::GetNode(log_reader_factory.configuration(), pi2_),
         &log_reader_factory);
 
-    StartLogger(&pi1_logger, "relogged1");
-    StartLogger(&pi2_logger, "relogged2");
+    StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
+    StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
 
     log_reader_factory.Run();
   }
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 6aedae7..18e7cc9 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -1321,11 +1321,15 @@
   std::unique_ptr<EventLoop> pi1_remote_timestamp =
       simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
 
+  const chrono::nanoseconds network_delay =
+      simulated_event_loop_factory.network_delay();
+
   int reliable_timestamp_count = 0;
   pi1_remote_timestamp->MakeWatcher(
       "/pi1/aos/remote_timestamps/pi2",
       [reliable_channel_index, &reliable_timestamp_count,
-       &simulated_event_loop_factory, pi2](const RemoteMessage &header) {
+       &simulated_event_loop_factory, pi2, network_delay, &pi2_pong_event_loop,
+       &pi1_remote_timestamp](const RemoteMessage &header) {
         EXPECT_TRUE(header.has_boot_uuid());
         EXPECT_EQ(header.boot_uuid()->string_view(),
                   simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
@@ -1335,6 +1339,14 @@
         if (header.channel_index() == reliable_channel_index) {
           ++reliable_timestamp_count;
         }
+
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+
+        EXPECT_EQ(pi1_remote_timestamp->context().monotonic_event_time,
+                  header_monotonic_sent_time + network_delay +
+                      (pi1_remote_timestamp->monotonic_now() -
+                       pi2_pong_event_loop->monotonic_now()));
       });
 
   // Wait to let timestamp estimation start up before looking for the results.
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 0b79504..17ebcbc 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -18,6 +18,7 @@
  public:
   RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
                     aos::NodeEventLoopFactory *send_node_factory,
+                    aos::EventLoop *fetch_event_loop,
                     aos::EventLoop *send_event_loop,
                     std::unique_ptr<aos::RawFetcher> fetcher,
                     std::unique_ptr<aos::RawSender> sender,
@@ -29,6 +30,7 @@
                     aos::Sender<RemoteMessage> *timestamp_logger)
       : fetch_node_factory_(fetch_node_factory),
         send_node_factory_(send_node_factory),
+        fetch_event_loop_(fetch_event_loop),
         send_event_loop_(send_event_loop),
         fetcher_(std::move(fetcher)),
         sender_(std::move(sender)),
@@ -41,6 +43,8 @@
         channel_index_(channel_index),
         timestamp_logger_(timestamp_logger) {
     timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+    timestamp_timer_ =
+        fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
 
     Schedule();
   }
@@ -137,6 +141,8 @@
         client_connection_->received_packets() + 1);
 
     if (timestamp_logger_) {
+      flatbuffers::FlatBufferBuilder fbb;
+        fbb.ForceDefaults(true);
       aos::Sender<RemoteMessage>::Builder builder =
           timestamp_logger_->MakeBuilder();
 
@@ -151,11 +157,10 @@
       }
 
       flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
-          builder.fbb()->CreateString(
+          fbb.CreateString(
               send_node_factory_->boot_uuid().string_view());
 
-      RemoteMessage::Builder message_header_builder =
-          builder.MakeBuilder<RemoteMessage>();
+      RemoteMessage::Builder message_header_builder(fbb);
 
       message_header_builder.add_channel_index(channel_index_);
 
@@ -175,13 +180,57 @@
       message_header_builder.add_queue_index(sender_->sent_queue_index());
       message_header_builder.add_boot_uuid(boot_uuid_offset);
 
-      builder.Send(message_header_builder.Finish());
+      fbb.Finish(message_header_builder.Finish());
+
+      remote_timestamps_.emplace_back(
+          FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
+          fetch_node_factory_->monotonic_now() +
+              send_node_factory_->network_delay());
+      ScheduleTimestamp();
     }
 
     sent_ = true;
     Schedule();
   }
 
+  // Schedules sending the next timestamp in remote_timestamps_ if there is one.
+  void ScheduleTimestamp() {
+    if (remote_timestamps_.empty()) {
+      timestamp_timer_->Disable();
+      return;
+    }
+
+    if (scheduled_time_ !=
+        remote_timestamps_.front().monotonic_timestamp_time) {
+      timestamp_timer_->Setup(
+          remote_timestamps_.front().monotonic_timestamp_time);
+      scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+      return;
+    } else {
+      scheduled_time_ = monotonic_clock::min_time;
+    }
+  }
+
+  // Sends the next timestamp in remote_timestamps_.
+  void SendTimestamp() {
+    CHECK(!remote_timestamps_.empty());
+
+    // Send out all timestamps at the currently scheduled time.
+    while (remote_timestamps_.front().monotonic_timestamp_time ==
+           scheduled_time_) {
+      if (server_connection_->state() == State::CONNECTED) {
+        timestamp_logger_->Send(
+            std::move(remote_timestamps_.front().remote_message));
+      }
+      remote_timestamps_.pop_front();
+      if (remote_timestamps_.empty()) {
+        break;
+      }
+    }
+
+    ScheduleTimestamp();
+  }
+
   // Converts from time on the sending node to time on the receiving node.
   monotonic_clock::time_point DeliveredTime(const Context &context) const {
     const distributed_clock::time_point distributed_sent_time =
@@ -196,10 +245,18 @@
   aos::NodeEventLoopFactory *fetch_node_factory_;
   aos::NodeEventLoopFactory *send_node_factory_;
 
+  // Event loop which fetching and sending timestamps are scheduled on.
+  aos::EventLoop *fetch_event_loop_;
   // Event loop which sending is scheduled on.
   aos::EventLoop *send_event_loop_;
   // Timer used to send.
   aos::TimerHandler *timer_;
+  // Timer used to send timestamps out.
+  aos::TimerHandler *timestamp_timer_;
+  // Time that the timer is scheduled for.  Used to track if it needs to be
+  // rescheduled.
+  monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
   // Fetcher used to receive messages.
   std::unique_ptr<aos::RawFetcher> fetcher_;
   // Sender to send them back out.
@@ -217,6 +274,17 @@
 
   size_t channel_index_;
   aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
+
+  struct Timestamp {
+    Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
+              monotonic_clock::time_point new_monotonic_timestamp_time)
+        : remote_message(std::move(new_remote_message)),
+          monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+    FlatbufferDetachedBuffer<RemoteMessage> remote_message;
+    monotonic_clock::time_point monotonic_timestamp_time;
+  };
+
+  std::deque<Timestamp> remote_timestamps_;
 };
 
 SimulatedMessageBridge::SimulatedMessageBridge(
@@ -322,6 +390,7 @@
           simulated_event_loop_factory->GetNodeEventLoopFactory(node),
           simulated_event_loop_factory->GetNodeEventLoopFactory(
               destination_node),
+          source_event_loop->second.event_loop.get(),
           destination_event_loop->second.event_loop.get(),
           source_event_loop->second.event_loop->MakeRawFetcher(channel),
           destination_event_loop->second.event_loop->MakeRawSender(channel),
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index ba6ae80..e8520cf 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -61,9 +61,11 @@
   statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
   statistics_timer_->set_name("statistics");
   event_loop_->OnRun([this]() {
-    statistics_timer_->Setup(
-        event_loop_->monotonic_now() + chrono::milliseconds(100),
-        chrono::milliseconds(100));
+    if (send_) {
+      statistics_timer_->Setup(
+          event_loop_->monotonic_now() + chrono::milliseconds(100),
+          chrono::milliseconds(100));
+    }
   });
 }
 
@@ -163,7 +165,11 @@
   filter->Sample(monotonic_delivered_time, offset);
 }
 
-void MessageBridgeClientStatus::DisableStatistics() { send_ = false; }
+void MessageBridgeClientStatus::DisableStatistics() {
+  statistics_timer_->Disable();
+  // TODO(austin): Re-arm when re-enabled.
+  send_ = false;
+}
 
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index dd7865b..6e32a95 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -123,8 +123,10 @@
 
   statistics_timer_ = event_loop_->AddTimer([this]() { Tick(); });
   event_loop_->OnRun([this]() {
-    statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
-                             kPingPeriod);
+    if (send_) {
+      statistics_timer_->Setup(event_loop_->monotonic_now() + kPingPeriod,
+                               kPingPeriod);
+    }
   });
 }
 
@@ -358,6 +360,7 @@
 
 void MessageBridgeServerStatus::DisableStatistics() {
   send_ = false;
+  statistics_timer_->Disable();
 }
 
 }  // namespace message_bridge