Publish recreated RemoteMessages delayed from LogReader

This makes the new log -> replay -> logger loop fully correct, and
preserves the timestamp.  Note: replaying log files from before this
time will cause RemoteMessage to come out at the wrong time and may
cause problems when replaying.  There should be a small number of logs
that will ever be replayed which matter, and those can be recollected.

Change-Id: Ia7350ef8648276623833cc666e955e335b6bde1f
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 91cf7dc..9da790a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -874,6 +874,11 @@
         message_header_builder.add_remote_queue_index(
             msg->remote_queue_index());
 
+        message_header_builder.add_monotonic_timestamp_time(
+            f.fetcher->context()
+                .monotonic_event_time.time_since_epoch()
+                .count());
+
         fbb.FinishSizePrefixed(message_header_builder.Finish());
         const auto end = event_loop_->monotonic_now();
         RecordCreateMessageTime(start, end, &f);
@@ -1278,7 +1283,7 @@
         logged_configuration()->channels()->Get(logged_channel_index));
 
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
-    aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
+    RemoteMessageSender *remote_timestamp_sender = nullptr;
 
     State *source_state = nullptr;
 
@@ -1861,7 +1866,7 @@
     size_t logged_channel_index, size_t factory_channel_index,
     std::unique_ptr<RawSender> sender,
     message_bridge::NoncausalOffsetEstimator *filter,
-    aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
+    RemoteMessageSender *remote_timestamp_sender, State *source_state) {
   channels_[logged_channel_index] = std::move(sender);
   filters_[logged_channel_index] = filter;
   remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1949,15 +1954,12 @@
         timestamp);
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
-    aos::Sender<RemoteMessage>::Builder builder =
-        remote_timestamp_senders_[timestamped_message.channel_index]
-            ->MakeBuilder();
-
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.ForceDefaults(true);
     flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
-        builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+        fbb.CreateString(event_loop_->boot_uuid().string_view());
 
-    RemoteMessage::Builder message_header_builder =
-        builder.MakeBuilder<RemoteMessage>();
+    RemoteMessage::Builder message_header_builder(fbb);
 
     message_header_builder.add_channel_index(
         factory_channel_index_[timestamped_message.channel_index]);
@@ -1978,27 +1980,90 @@
     message_header_builder.add_remote_queue_index(remote_queue_index);
     message_header_builder.add_boot_uuid(boot_uuid_offset);
 
-    builder.Send(message_header_builder.Finish());
+    fbb.Finish(message_header_builder.Finish());
+
+    remote_timestamp_senders_[timestamped_message.channel_index]->Send(
+        FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
+        timestamped_message.monotonic_timestamp_time);
   }
 
   return true;
 }
 
-aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
+LogReader::RemoteMessageSender::RemoteMessageSender(
+    aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
+    : event_loop_(event_loop),
+      sender_(std::move(sender)),
+      timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
+
+void LogReader::RemoteMessageSender::ScheduleTimestamp() {
+  if (remote_timestamps_.empty()) {
+    CHECK_NOTNULL(timer_);
+    timer_->Disable();
+    scheduled_time_ = monotonic_clock::min_time;
+    return;
+  }
+
+  if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
+    CHECK_NOTNULL(timer_);
+    timer_->Setup(
+        remote_timestamps_.front().monotonic_timestamp_time);
+    scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+  }
+}
+
+void LogReader::RemoteMessageSender::Send(
+    FlatbufferDetachedBuffer<RemoteMessage> remote_message,
+    monotonic_clock::time_point monotonic_timestamp_time) {
+  // There are 2 cases.  Either we have a monotonic_timestamp_time and need to
+  // resend the timestamp at the correct time, or we don't and can send it
+  // immediately.
+  if (monotonic_timestamp_time == monotonic_clock::min_time) {
+    CHECK(remote_timestamps_.empty())
+        << ": Unsupported mix of timestamps and no timestamps.";
+    sender_.Send(std::move(remote_message));
+  } else {
+    remote_timestamps_.emplace_back(std::move(remote_message),
+                                    monotonic_timestamp_time);
+    ScheduleTimestamp();
+  }
+}
+
+void LogReader::RemoteMessageSender::SendTimestamp() {
+  CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_);
+  CHECK(!remote_timestamps_.empty());
+
+  // Send out all timestamps at the currently scheduled time.
+  while (remote_timestamps_.front().monotonic_timestamp_time ==
+         scheduled_time_) {
+    sender_.Send(std::move(remote_timestamps_.front().remote_message));
+    remote_timestamps_.pop_front();
+    if (remote_timestamps_.empty()) {
+      break;
+    }
+  }
+  scheduled_time_ = monotonic_clock::min_time;
+
+  ScheduleTimestamp();
+}
+
+LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
     const Node *delivered_node) {
   auto sender = remote_timestamp_senders_map_.find(delivered_node);
 
   if (sender == remote_timestamp_senders_map_.end()) {
-    sender = remote_timestamp_senders_map_
-                 .emplace(std::make_pair(
-                     delivered_node,
-                     event_loop()->MakeSender<RemoteMessage>(
-                         absl::StrCat("/aos/remote_timestamps/",
-                                      delivered_node->name()->string_view()))))
-                 .first;
+    sender =
+        remote_timestamp_senders_map_
+            .emplace(delivered_node,
+                     std::make_unique<RemoteMessageSender>(
+                         event_loop()->MakeSender<RemoteMessage>(absl::StrCat(
+                             "/aos/remote_timestamps/",
+                             delivered_node->name()->string_view())),
+                         event_loop()))
+            .first;
   }
 
-  return &(sender->second);
+  return sender->second.get();
 }
 
 const TimestampedMessage &LogReader::State::PeekOldest() {