Delay sending RemoteMessage in SimulatedNetworkBridge by network delay

In preparation for using the return timestamp on RemoteMessage for round
trip time calculation, we need to have it be accurate in simulation.
Fix part of that by fixing SimulatedNetworkBridge.

Change-Id: I3d2a24b89594bac7f320a8bf8749a9ef942dc73b
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 0b79504..17ebcbc 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -18,6 +18,7 @@
  public:
   RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
                     aos::NodeEventLoopFactory *send_node_factory,
+                    aos::EventLoop *fetch_event_loop,
                     aos::EventLoop *send_event_loop,
                     std::unique_ptr<aos::RawFetcher> fetcher,
                     std::unique_ptr<aos::RawSender> sender,
@@ -29,6 +30,7 @@
                     aos::Sender<RemoteMessage> *timestamp_logger)
       : fetch_node_factory_(fetch_node_factory),
         send_node_factory_(send_node_factory),
+        fetch_event_loop_(fetch_event_loop),
         send_event_loop_(send_event_loop),
         fetcher_(std::move(fetcher)),
         sender_(std::move(sender)),
@@ -41,6 +43,8 @@
         channel_index_(channel_index),
         timestamp_logger_(timestamp_logger) {
     timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+    timestamp_timer_ =
+        fetch_event_loop_->AddTimer([this]() { SendTimestamp(); });
 
     Schedule();
   }
@@ -137,6 +141,8 @@
         client_connection_->received_packets() + 1);
 
     if (timestamp_logger_) {
+      flatbuffers::FlatBufferBuilder fbb;
+        fbb.ForceDefaults(true);
       aos::Sender<RemoteMessage>::Builder builder =
           timestamp_logger_->MakeBuilder();
 
@@ -151,11 +157,10 @@
       }
 
       flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
-          builder.fbb()->CreateString(
+          fbb.CreateString(
               send_node_factory_->boot_uuid().string_view());
 
-      RemoteMessage::Builder message_header_builder =
-          builder.MakeBuilder<RemoteMessage>();
+      RemoteMessage::Builder message_header_builder(fbb);
 
       message_header_builder.add_channel_index(channel_index_);
 
@@ -175,13 +180,57 @@
       message_header_builder.add_queue_index(sender_->sent_queue_index());
       message_header_builder.add_boot_uuid(boot_uuid_offset);
 
-      builder.Send(message_header_builder.Finish());
+      fbb.Finish(message_header_builder.Finish());
+
+      remote_timestamps_.emplace_back(
+          FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
+          fetch_node_factory_->monotonic_now() +
+              send_node_factory_->network_delay());
+      ScheduleTimestamp();
     }
 
     sent_ = true;
     Schedule();
   }
 
+  // Schedules sending the next timestamp in remote_timestamps_ if there is one.
+  void ScheduleTimestamp() {
+    if (remote_timestamps_.empty()) {
+      timestamp_timer_->Disable();
+      return;
+    }
+
+    if (scheduled_time_ !=
+        remote_timestamps_.front().monotonic_timestamp_time) {
+      timestamp_timer_->Setup(
+          remote_timestamps_.front().monotonic_timestamp_time);
+      scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+      return;
+    } else {
+      scheduled_time_ = monotonic_clock::min_time;
+    }
+  }
+
+  // Sends the next timestamp in remote_timestamps_.
+  void SendTimestamp() {
+    CHECK(!remote_timestamps_.empty());
+
+    // Send out all timestamps at the currently scheduled time.
+    while (remote_timestamps_.front().monotonic_timestamp_time ==
+           scheduled_time_) {
+      if (server_connection_->state() == State::CONNECTED) {
+        timestamp_logger_->Send(
+            std::move(remote_timestamps_.front().remote_message));
+      }
+      remote_timestamps_.pop_front();
+      if (remote_timestamps_.empty()) {
+        break;
+      }
+    }
+
+    ScheduleTimestamp();
+  }
+
   // Converts from time on the sending node to time on the receiving node.
   monotonic_clock::time_point DeliveredTime(const Context &context) const {
     const distributed_clock::time_point distributed_sent_time =
@@ -196,10 +245,18 @@
   aos::NodeEventLoopFactory *fetch_node_factory_;
   aos::NodeEventLoopFactory *send_node_factory_;
 
+  // Event loop which fetching and sending timestamps are scheduled on.
+  aos::EventLoop *fetch_event_loop_;
   // Event loop which sending is scheduled on.
   aos::EventLoop *send_event_loop_;
   // Timer used to send.
   aos::TimerHandler *timer_;
+  // Timer used to send timestamps out.
+  aos::TimerHandler *timestamp_timer_;
+  // Time that the timer is scheduled for.  Used to track if it needs to be
+  // rescheduled.
+  monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
   // Fetcher used to receive messages.
   std::unique_ptr<aos::RawFetcher> fetcher_;
   // Sender to send them back out.
@@ -217,6 +274,17 @@
 
   size_t channel_index_;
   aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
+
+  struct Timestamp {
+    Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
+              monotonic_clock::time_point new_monotonic_timestamp_time)
+        : remote_message(std::move(new_remote_message)),
+          monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+    FlatbufferDetachedBuffer<RemoteMessage> remote_message;
+    monotonic_clock::time_point monotonic_timestamp_time;
+  };
+
+  std::deque<Timestamp> remote_timestamps_;
 };
 
 SimulatedMessageBridge::SimulatedMessageBridge(
@@ -322,6 +390,7 @@
           simulated_event_loop_factory->GetNodeEventLoopFactory(node),
           simulated_event_loop_factory->GetNodeEventLoopFactory(
               destination_node),
+          source_event_loop->second.event_loop.get(),
           destination_event_loop->second.event_loop.get(),
           source_event_loop->second.event_loop->MakeRawFetcher(channel),
           destination_event_loop->second.event_loop->MakeRawSender(channel),