Handle log files not starting at the same time.

The monotonic clocks were assumed to be in sync.  That isn't realistic.
This assumption leaked into how we kept the queues primed, and how the
event loop was initialized.

This isn't enough to actually replay in sync.  We are assuming that the
realtime clocks are in sync and the monotonic clocks don't drift from
each other.  That'll be good enough to get started, but not for long.

Change-Id: Ic18e31598f1a76edee0b0d5a2d7936deee1fbfec
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9a849b2..e5e0175 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -163,6 +163,7 @@
     return max_out_of_order_duration_;
   }
 
+  // Returns the newest timestamp read out of the log file.
   monotonic_clock::time_point newest_timestamp() const {
     return newest_timestamp_;
   }
@@ -218,15 +219,15 @@
 
   // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
   // max_time if there is nothing in the channel.
-  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
-      int channel) {
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_message(int channel) {
     return channels_[channel].data.front_timestamp();
   }
 
   // Returns the (timestamp, queue_index) for the oldest delivery time in a
   // channel, or max_time if there is nothing in the channel.
-  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
-      int channel, int destination_node) {
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_message(int channel, int destination_node) {
     return channels_[channel].timestamps[destination_node].front_timestamp();
   }
 
@@ -278,18 +279,29 @@
   // Returns the timestamp of the newest message read from the log file, and the
   // timestamp that we need to re-queue data.
   monotonic_clock::time_point newest_timestamp() const {
-    return message_reader_->newest_timestamp();
-  }
-  monotonic_clock::time_point queue_data_time() const {
-    return message_reader_->queue_data_time();
+    return newest_timestamp_;
   }
 
+  // Returns the next time to trigger a requeue.
+  monotonic_clock::time_point time_to_queue() const { return time_to_queue_; }
+
+  // Returns the minimum amount of data needed to queue up for sorting before
+  // ware guarenteed to not see data out of order.
+  std::chrono::nanoseconds max_out_of_order_duration() const {
+    return message_reader_->max_out_of_order_duration();
+  }
+
+  std::string_view filename() const { return message_reader_->filename(); }
 
   // Adds more messages to the sorted list.  This reads enough data such that
   // oldest_message_time can be replayed safely.  Returns false if the log file
   // has all been read.
   bool QueueMessages(monotonic_clock::time_point oldest_message_time);
 
+  // Returns debug strings for a channel, and timestamps for a node.
+  std::string DebugString(int channel) const;
+  std::string DebugString(int channel, int node_index) const;
+
  private:
   // TODO(austin): Need to copy or refcount the message instead of running
   // multiple copies of the reader.  Or maybe have a "as_node" index and hide it
@@ -320,8 +332,9 @@
       return data_.front();
     }
 
-    // Adds a message to the back of the queue.
-    void emplace_back(FlatbufferVector<MessageHeader> &&msg);
+    // Adds a message to the back of the queue. Returns true if it was actually
+    // emplaced.
+    bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
 
     // Drops the front message.  Invalidates the front() reference.
     void pop_front();
@@ -329,13 +342,18 @@
     // The size of the queue.
     size_t size() { return data_.size(); }
 
+    // Returns a debug string with info about each message in the queue.
+    std::string DebugString() const;
+
     // Returns the (timestamp, queue_index) for the oldest message.
-    const std::tuple<monotonic_clock::time_point, uint32_t> front_timestamp() {
+    const std::tuple<monotonic_clock::time_point, uint32_t,
+                     const MessageHeader *>
+    front_timestamp() {
       CHECK_GT(data_.size(), 0u);
       return std::make_tuple(
           monotonic_clock::time_point(std::chrono::nanoseconds(
               front().message().monotonic_sent_time())),
-          front().message().queue_index());
+          front().message().queue_index(), &front().message());
     }
 
     // Pointer to the timestamp merger for this queue if available.
@@ -365,8 +383,16 @@
   // Precompute this here for efficiency.
   std::vector<MessageHeaderQueue *> channels_to_write_;
 
-  // Number of messages queued.
-  size_t queued_messages_ = 0;
+  monotonic_clock::time_point time_to_queue_ = monotonic_clock::min_time;
+
+  // Latches true when we hit the end of the last log file and there is no sense
+  // poking it further.
+  bool at_end_ = false;
+
+  // Timestamp of the newest message that was read and actually queued.  We want
+  // to track this independently from the log file because we need the
+  // timestamps here to be timestamps of messages that are queued.
+  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
 };
 
 class ChannelMerger;
@@ -396,7 +422,8 @@
   // the reader.
   void UpdateTimestamp(
       SplitMessageReader *split_message_reader,
-      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          oldest_message_time) {
     PushTimestampHeap(oldest_message_time, split_message_reader);
   }
   // Pushes SplitMessageReader onto the message heap.  This should only be
@@ -404,11 +431,14 @@
   // reader.
   void Update(
       SplitMessageReader *split_message_reader,
-      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          oldest_message_time) {
     PushMessageHeap(oldest_message_time, split_message_reader);
   }
 
-  // Returns the oldest combined timestamp and data for this channel.
+  // Returns the oldest combined timestamp and data for this channel.  If there
+  // isn't a matching piece of data, returns only the timestamp with no data.
+  // The caller can determine what the appropriate action is to recover.
   std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
 
   // Tracks if the channel merger has pushed this onto it's heap or not.
@@ -417,13 +447,18 @@
   // called by the channel merger.
   void set_pushed(bool pushed) { pushed_ = pushed; }
 
+  // Returns a debug string with the heaps printed out.
+  std::string DebugString() const;
+
  private:
   // Pushes messages and timestamps to the corresponding heaps.
   void PushMessageHeap(
-      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          timestamp,
       SplitMessageReader *split_message_reader);
   void PushTimestampHeap(
-      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+          timestamp,
       SplitMessageReader *split_message_reader);
 
   // Pops a message from the message heap.  This automatically triggers the
@@ -431,6 +466,11 @@
   std::tuple<monotonic_clock::time_point, uint32_t,
              FlatbufferVector<MessageHeader>>
   PopMessageHeap();
+
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_message() const;
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+  oldest_timestamp() const;
   // Pops a message from the timestamp heap.  This automatically triggers the
   // split message reader to re-fetch any new data.
   std::tuple<monotonic_clock::time_point, uint32_t,
@@ -506,11 +546,11 @@
   }
 
   // Returns the start times for the configured node's log files.
-  monotonic_clock::time_point monotonic_start_time() {
+  monotonic_clock::time_point monotonic_start_time() const {
     return monotonic_clock::time_point(
         std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
   }
-  realtime_clock::time_point realtime_start_time() {
+  realtime_clock::time_point realtime_start_time() const {
     return realtime_clock::time_point(
         std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
   }
@@ -524,11 +564,11 @@
     PushChannelHeap(timestamp, channel_index);
   }
 
- private:
-  // Queues messages from each SplitMessageReader until enough data is queued
-  // such that we can guarentee all sorting has happened.
-  void QueueMessages(monotonic_clock::time_point oldest_message_time);
+  // Returns a debug string with all the heaps in it.  Generally only useful for
+  // debugging what went wrong.
+  std::string DebugString() const;
 
+ private:
   // Pushes the timestamp for new data on the provided channel.
   void PushChannelHeap(monotonic_clock::time_point timestamp,
                        int channel_index);
@@ -545,11 +585,6 @@
   // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
 
-  // This holds a heap of split_message_readers sorted by the time at which they
-  // need to have QueueMessages called on them.
-  std::vector<std::pair<monotonic_clock::time_point, int>>
-      split_message_reader_heap_;
-
   // Configured node.
   const Node *node_;