Sort messages between nodes properly

We used to assume the realtime clocks were in sync.  This isn't
realistic.  Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.

Since time is no longer exactly linear with all the adjustments, we need
to redo how events are scheduled.  Events can no longer be converted to
the distributed_clock just once; they now need to be converted every
time they are compared between nodes.

Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index e5e0175..4ab4dca 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -302,6 +302,10 @@
   std::string DebugString(int channel) const;
   std::string DebugString(int channel, int node_index) const;
 
+  // Returns true if all the messages have been queued from the last log file in
+  // the list of log file chunks.
+  bool at_end() const { return at_end_; }
+
  private:
   // TODO(austin): Need to copy or refcount the message instead of running
   // multiple copies of the reader.  Or maybe have a "as_node" index and hide it
@@ -441,6 +445,9 @@
   // The caller can determine what the appropriate action is to recover.
   std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
 
+  // Returns the oldest forwarding timestamp.
+  DeliveryTimestamp OldestTimestamp() const;
+
   // Tracks if the channel merger has pushed this onto it's heap or not.
   bool pushed() { return pushed_; }
   // Sets if this has been pushed to the channel merger heap.  Should only be
@@ -450,6 +457,13 @@
   // Returns a debug string with the heaps printed out.
   std::string DebugString() const;
 
+  // Returns true if we have timestamps.
+  bool has_timestamps() const { return has_timestamps_; }
+
+  // Records that one of the log files ran out of data.  This should only be
+  // called by a SplitMessageReader.
+  void NoticeAtEnd();
+
  private:
   // Pushes messages and timestamps to the corresponding heaps.
   void PushMessageHeap(
@@ -536,6 +550,12 @@
              FlatbufferVector<MessageHeader>>
   PopOldest();
 
+  // Returns the oldest timestamp in the timestamp heap.
+  TimestampMerger::DeliveryTimestamp OldestTimestamp() const;
+  // Returns the oldest timestamp in the timestamp heap for a specific channel.
+  TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
+      int channel) const;
+
   // Returns the config for this set of log files.
   const Configuration *configuration() const {
     return log_file_header()->configuration();
@@ -568,6 +588,15 @@
   // debugging what went wrong.
   std::string DebugString() const;
 
+  // Returns true if one of the log files has finished reading everything.  When
+  // log file chunks are involved, this means that the last chunk in a log file
+  // has been read.  It is acceptable to be missing data at this point in time.
+  bool at_end() const { return at_end_; }
+
+  // Marks that one of the log files is at the end.  This should only be called
+  // by timestamp mergers.
+  void NoticeAtEnd() { at_end_ = true; }
+
  private:
   // Pushes the timestamp for new data on the provided channel.
   void PushChannelHeap(monotonic_clock::time_point timestamp,
@@ -584,10 +613,15 @@
 
   // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+  // A heap of just the timestamp channel readers and timestamps for the oldest
+  // data in each.
+  std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap_;
 
   // Configured node.
   const Node *node_;
 
+  bool at_end_ = false;
+
   // Cached copy of the list of nodes.
   std::vector<const Node *> nodes_;
 };