Publish recreated RemoteMessages delayed from LogReader

This makes the new log -> replay -> logger loop fully correct, and
preserves the timestamp.  Note: replaying log files from before this
time will cause RemoteMessage to come out at the wrong time and may
cause problems when replaying.  There should be a small number of logs
that will ever be replayed which matter, and those can be recollected.

Change-Id: Ia7350ef8648276623833cc666e955e335b6bde1f
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 91cf7dc..9da790a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -874,6 +874,11 @@
         message_header_builder.add_remote_queue_index(
             msg->remote_queue_index());
 
+        message_header_builder.add_monotonic_timestamp_time(
+            f.fetcher->context()
+                .monotonic_event_time.time_since_epoch()
+                .count());
+
         fbb.FinishSizePrefixed(message_header_builder.Finish());
         const auto end = event_loop_->monotonic_now();
         RecordCreateMessageTime(start, end, &f);
@@ -1278,7 +1283,7 @@
         logged_configuration()->channels()->Get(logged_channel_index));
 
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
-    aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
+    RemoteMessageSender *remote_timestamp_sender = nullptr;
 
     State *source_state = nullptr;
 
@@ -1861,7 +1866,7 @@
     size_t logged_channel_index, size_t factory_channel_index,
     std::unique_ptr<RawSender> sender,
     message_bridge::NoncausalOffsetEstimator *filter,
-    aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
+    RemoteMessageSender *remote_timestamp_sender, State *source_state) {
   channels_[logged_channel_index] = std::move(sender);
   filters_[logged_channel_index] = filter;
   remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1949,15 +1954,12 @@
         timestamp);
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
-    aos::Sender<RemoteMessage>::Builder builder =
-        remote_timestamp_senders_[timestamped_message.channel_index]
-            ->MakeBuilder();
-
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.ForceDefaults(true);
     flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
-        builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+        fbb.CreateString(event_loop_->boot_uuid().string_view());
 
-    RemoteMessage::Builder message_header_builder =
-        builder.MakeBuilder<RemoteMessage>();
+    RemoteMessage::Builder message_header_builder(fbb);
 
     message_header_builder.add_channel_index(
         factory_channel_index_[timestamped_message.channel_index]);
@@ -1978,27 +1980,90 @@
     message_header_builder.add_remote_queue_index(remote_queue_index);
     message_header_builder.add_boot_uuid(boot_uuid_offset);
 
-    builder.Send(message_header_builder.Finish());
+    fbb.Finish(message_header_builder.Finish());
+
+    remote_timestamp_senders_[timestamped_message.channel_index]->Send(
+        FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
+        timestamped_message.monotonic_timestamp_time);
   }
 
   return true;
 }
 
-aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
+LogReader::RemoteMessageSender::RemoteMessageSender(
+    aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(std::move(sender)),
+      timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
+
+void LogReader::RemoteMessageSender::ScheduleTimestamp() {
+  if (remote_timestamps_.empty()) {
+    CHECK_NOTNULL(timer_);
+    timer_->Disable();
+    scheduled_time_ = monotonic_clock::min_time;
+    return;
+  }
+
+  if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
+    CHECK_NOTNULL(timer_);
+    timer_->Setup(
+        remote_timestamps_.front().monotonic_timestamp_time);
+    scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+  }
+}
+
+void LogReader::RemoteMessageSender::Send(
+    FlatbufferDetachedBuffer<RemoteMessage> remote_message,
+    monotonic_clock::time_point monotonic_timestamp_time) {
+  // There are 2 cases.  Either we have a monotonic_timestamp_time and need to
+  // resend the timestamp at the correct time, or we don't and can send it
+  // immediately.
+  if (monotonic_timestamp_time == monotonic_clock::min_time) {
+    CHECK(remote_timestamps_.empty())
+        << ": Unsupported mix of timestamps and no timestamps.";
+    sender_.Send(std::move(remote_message));
+  } else {
+    remote_timestamps_.emplace_back(std::move(remote_message),
+                                    monotonic_timestamp_time);
+    ScheduleTimestamp();
+  }
+}
+
+void LogReader::RemoteMessageSender::SendTimestamp() {
+  CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_);
+  CHECK(!remote_timestamps_.empty());
+
+  // Send out all timestamps at the currently scheduled time.
+  while (remote_timestamps_.front().monotonic_timestamp_time ==
+         scheduled_time_) {
+    sender_.Send(std::move(remote_timestamps_.front().remote_message));
+    remote_timestamps_.pop_front();
+    if (remote_timestamps_.empty()) {
+      break;
+    }
+  }
+  scheduled_time_ = monotonic_clock::min_time;
+
+  ScheduleTimestamp();
+}
+
+LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
     const Node *delivered_node) {
   auto sender = remote_timestamp_senders_map_.find(delivered_node);
 
   if (sender == remote_timestamp_senders_map_.end()) {
-    sender = remote_timestamp_senders_map_
-                 .emplace(std::make_pair(
-                     delivered_node,
-                     event_loop()->MakeSender<RemoteMessage>(
-                         absl::StrCat("/aos/remote_timestamps/",
-                                      delivered_node->name()->string_view()))))
-                 .first;
+    sender =
+        remote_timestamp_senders_map_
+            .emplace(delivered_node,
+                     std::make_unique<RemoteMessageSender>(
+                         event_loop()->MakeSender<RemoteMessage>(absl::StrCat(
+                             "/aos/remote_timestamps/",
+                             delivered_node->name()->string_view())),
+                         event_loop()))
+            .first;
   }
 
-  return &(sender->second);
+  return sender->second.get();
 }
 
 const TimestampedMessage &LogReader::State::PeekOldest() {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e71f31d..e3f5380 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -491,6 +491,49 @@
 
   const std::vector<LogFile> log_files_;
 
+  // Class to manage sending RemoteMessages on the provided node after the
+  // correct delay.
+  class RemoteMessageSender{
+   public:
+    RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
+                        EventLoop *event_loop);
+    RemoteMessageSender(RemoteMessageSender const &) = delete;
+    RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
+
+    // Sends the provided message.  If monotonic_timestamp_time is min_time,
+    // send it immediately.
+    void Send(
+        FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
+        monotonic_clock::time_point monotonic_timestamp_time);
+
+   private:
+    // Handles actually sending the timestamp if we were delayed.
+    void SendTimestamp();
+    // Handles scheduling the timer to send at the correct time.
+    void ScheduleTimestamp();
+
+    EventLoop *event_loop_;
+    aos::Sender<message_bridge::RemoteMessage> sender_;
+    aos::TimerHandler *timer_;
+
+    // Time we are scheduled for, or min_time if we aren't scheduled.
+    monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
+    struct Timestamp {
+      Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
+                    new_remote_message,
+                monotonic_clock::time_point new_monotonic_timestamp_time)
+          : remote_message(std::move(new_remote_message)),
+            monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+      FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
+      monotonic_clock::time_point monotonic_timestamp_time;
+    };
+
+    // List of messages to send. The timer works through them and then disables
+    // itself automatically.
+    std::deque<Timestamp> remote_timestamps_;
+  };
+
   // State per node.
   class State {
    public:
@@ -545,8 +588,7 @@
 
     // Returns the MessageHeader sender to log delivery timestamps to for the
     // provided remote node.
-    aos::Sender<message_bridge::RemoteMessage> *RemoteTimestampSender(
-        const Node *delivered_node);
+    RemoteMessageSender *RemoteTimestampSender(const Node *delivered_node);
 
     // Converts a timestamp from the monotonic clock on this node to the
     // distributed clock.
@@ -581,12 +623,11 @@
     void SetChannelCount(size_t count);
 
     // Sets the sender, filter, and target factory for a channel.
-    void SetChannel(
-        size_t logged_channel_index, size_t factory_channel_index,
-        std::unique_ptr<RawSender> sender,
-        message_bridge::NoncausalOffsetEstimator *filter,
-        aos::Sender<message_bridge::RemoteMessage> *remote_timestamp_sender,
-        State *source_state);
+    void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+                    std::unique_ptr<RawSender> sender,
+                    message_bridge::NoncausalOffsetEstimator *filter,
+                    RemoteMessageSender *remote_timestamp_sender,
+                    State *source_state);
 
     // Returns if we have read all the messages from all the logs.
     bool at_end() const {
@@ -642,8 +683,7 @@
 
     // Senders.
     std::vector<std::unique_ptr<RawSender>> channels_;
-    std::vector<aos::Sender<message_bridge::RemoteMessage> *>
-        remote_timestamp_senders_;
+    std::vector<RemoteMessageSender *> remote_timestamp_senders_;
     // The mapping from logged channel index to sent channel index.  Needed for
     // sending out MessageHeaders.
     std::vector<int> factory_channel_index_;
@@ -684,7 +724,7 @@
     // channel) which correspond to the originating node.
     std::vector<State *> channel_source_state_;
 
-    std::map<const Node *, aos::Sender<message_bridge::RemoteMessage>>
+    std::map<const Node *, std::unique_ptr<RemoteMessageSender>>
         remote_timestamp_senders_map_;
   };
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 4a311ca..05bafe4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1590,12 +1590,14 @@
   const size_t pong_timestamp_channel = configuration::ChannelIndex(
       pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
 
+  const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
+
   pi1_event_loop->MakeWatcher(
       "/aos/remote_timestamps/pi2",
       [&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
        ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
        &pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
-       &ping_on_pi2_fetcher](const RemoteMessage &header) {
+       &ping_on_pi2_fetcher, network_delay](const RemoteMessage &header) {
         const aos::monotonic_clock::time_point header_monotonic_sent_time(
             chrono::nanoseconds(header.monotonic_sent_time()));
         const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1645,13 +1647,19 @@
                   header_realtime_remote_time);
         EXPECT_EQ(pi1_context->monotonic_event_time,
                   header_monotonic_remote_time);
+
+        EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
+                  pi2_context->monotonic_event_time +
+                      (pi1_event_loop->monotonic_now() -
+                       pi2_event_loop->monotonic_now()) +
+                      network_delay);
       });
   pi2_event_loop->MakeWatcher(
       "/aos/remote_timestamps/pi1",
       [&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
        pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
        &pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
-       &pong_on_pi1_fetcher](const RemoteMessage &header) {
+       &pong_on_pi1_fetcher, network_delay](const RemoteMessage &header) {
         const aos::monotonic_clock::time_point header_monotonic_sent_time(
             chrono::nanoseconds(header.monotonic_sent_time()));
         const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1701,6 +1709,12 @@
                   header_realtime_remote_time);
         EXPECT_EQ(pi2_context->monotonic_event_time,
                   header_monotonic_remote_time);
+
+        EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
+                  pi1_context->monotonic_event_time +
+                      (pi2_event_loop->monotonic_now() -
+                       pi1_event_loop->monotonic_now()) +
+                      network_delay);
       });
 
   // And confirm we can re-create a log again, while checking the contents.