Publish recreated RemoteMessages delayed from LogReader

This makes the new log -> replay -> logger loop fully correct, and
preserves the timestamp.  Note: replaying log files from before this
time will cause RemoteMessage to come out at the wrong time and may
cause problems when replaying.  There should be a small number of logs
that will ever be replayed which matter, and those can be recollected.

Change-Id: Ia7350ef8648276623833cc666e955e335b6bde1f
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e71f31d..e3f5380 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -491,6 +491,49 @@
 
   const std::vector<LogFile> log_files_;
 
+  // Class to manage sending RemoteMessages on the provided node after the
+  // correct delay.
+  class RemoteMessageSender{
+   public:
+    RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
+                        EventLoop *event_loop);
+    RemoteMessageSender(RemoteMessageSender const &) = delete;
+    RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
+
+    // Sends the provided message.  If monotonic_timestamp_time is min_time,
+    // send it immediately.
+    void Send(
+        FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
+        monotonic_clock::time_point monotonic_timestamp_time);
+
+   private:
+    // Handles actually sending the timestamp if we were delayed.
+    void SendTimestamp();
+    // Handles scheduling the timer to send at the correct time.
+    void ScheduleTimestamp();
+
+    EventLoop *event_loop_;
+    aos::Sender<message_bridge::RemoteMessage> sender_;
+    aos::TimerHandler *timer_;
+
+    // Time we are scheduled for, or min_time if we aren't scheduled.
+    monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
+    struct Timestamp {
+      Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
+                    new_remote_message,
+                monotonic_clock::time_point new_monotonic_timestamp_time)
+          : remote_message(std::move(new_remote_message)),
+            monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+      FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
+      monotonic_clock::time_point monotonic_timestamp_time;
+    };
+
+    // List of messages to send. The timer works through them and then disables
+    // itself automatically.
+    std::deque<Timestamp> remote_timestamps_;
+  };
+
   // State per node.
   class State {
    public:
@@ -545,8 +588,7 @@
 
     // Returns the MessageHeader sender to log delivery timestamps to for the
     // provided remote node.
-    aos::Sender<message_bridge::RemoteMessage> *RemoteTimestampSender(
-        const Node *delivered_node);
+    RemoteMessageSender *RemoteTimestampSender(const Node *delivered_node);
 
     // Converts a timestamp from the monotonic clock on this node to the
     // distributed clock.
@@ -581,12 +623,11 @@
     void SetChannelCount(size_t count);
 
     // Sets the sender, filter, and target factory for a channel.
-    void SetChannel(
-        size_t logged_channel_index, size_t factory_channel_index,
-        std::unique_ptr<RawSender> sender,
-        message_bridge::NoncausalOffsetEstimator *filter,
-        aos::Sender<message_bridge::RemoteMessage> *remote_timestamp_sender,
-        State *source_state);
+    void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+                    std::unique_ptr<RawSender> sender,
+                    message_bridge::NoncausalOffsetEstimator *filter,
+                    RemoteMessageSender *remote_timestamp_sender,
+                    State *source_state);
 
     // Returns if we have read all the messages from all the logs.
     bool at_end() const {
@@ -642,8 +683,7 @@
 
     // Senders.
     std::vector<std::unique_ptr<RawSender>> channels_;
-    std::vector<aos::Sender<message_bridge::RemoteMessage> *>
-        remote_timestamp_senders_;
+    std::vector<RemoteMessageSender *> remote_timestamp_senders_;
     // The mapping from logged channel index to sent channel index.  Needed for
     // sending out MessageHeaders.
     std::vector<int> factory_channel_index_;
@@ -684,7 +724,7 @@
     // channel) which correspond to the originating node.
     std::vector<State *> channel_source_state_;
 
-    std::map<const Node *, aos::Sender<message_bridge::RemoteMessage>>
+    std::map<const Node *, std::unique_ptr<RemoteMessageSender>>
         remote_timestamp_senders_map_;
   };