Add TimestampMapper to match timestamps with data

This finishes the main log sorting code described in
https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#

This adds an object which buffers the sorted data for a node, along with
the data from other nodes that still needs to be matched whenever a
timestamp is found.  Buffered data is not timed out yet, but only data
that *could* be forwarded is buffered.

We also impose a number of restrictions here to simplify the logic.  The
plan is to relax them as we run into them rather than solve everything
up front, which also lets us write much better targeted tests as each
restriction is lifted.

Change-Id: Idbc515e5594bc031139c7b994aaa71826ff68c0a
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index d0afdc9..4a19209 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -281,6 +281,9 @@
 
   std::string_view filename() const { return message_reader_.filename(); }
 
+  // Returns the LogParts holding the filenames this reader is reading.
+  const LogParts &parts() const { return parts_; }
+
   const LogFileHeader *log_file_header() const {
     return message_reader_.log_file_header();
   }
@@ -332,6 +335,25 @@
 
 std::ostream &operator<<(std::ostream &os, const Message &m);
 
+// A full message plus all of its timestamps.  The message may or may not have
+// been sent from a remote node; when it is viewed from the node which sent it,
+// remote_queue_index is invalid (left at its 0xffffffff default).
+struct TimestampedMessage {
+  uint32_t channel_index = 0xffffffff;
+
+  uint32_t queue_index = 0xffffffff;
+  monotonic_clock::time_point monotonic_event_time = monotonic_clock::min_time;
+  realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+
+  uint32_t remote_queue_index = 0xffffffff;
+  monotonic_clock::time_point monotonic_remote_time = monotonic_clock::min_time;
+  realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
+
+  SizePrefixedFlatbufferVector<MessageHeader> data;
+};
+
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
+
 // Class to sort the resulting messages from a PartsMessageReader.
 class LogPartsSorter {
  public:
@@ -346,6 +368,13 @@
     return parts_message_reader_.log_file_header();
   }
 
+  // Start times recorded in the LogParts this sorter is reading from.
+  monotonic_clock::time_point monotonic_start_time() const {
+    return parts_message_reader_.parts().monotonic_start_time;
+  }
+  realtime_clock::time_point realtime_start_time() const {
+    return parts_message_reader_.parts().realtime_start_time;
+  }
   // The time this data is sorted until.
   monotonic_clock::time_point sorted_until() const { return sorted_until_; }
 
@@ -374,12 +403,22 @@
 // instances.
 class NodeMerger {
  public:
-  NodeMerger(std::vector<std::unique_ptr<LogPartsSorter>> parts);
+  explicit NodeMerger(std::vector<LogParts> parts);
+
+  // Index of this node in the configuration.
+  int node() const { return node_; }
 
   // The log file header for one of the log files.
   const LogFileHeader *log_file_header() const {
     CHECK(!parts_sorters_.empty());
-    return parts_sorters_[0]->log_file_header();
+    return parts_sorters_[0].log_file_header();
+  }
+  // Start times of the log for this node, as cached in the *_start_time_ members.
+  monotonic_clock::time_point monotonic_start_time() const {
+    return monotonic_start_time_;
+  }
+  realtime_clock::time_point realtime_start_time() const {
+    return realtime_start_time_;
   }
 
   // The time this data is sorted until.
@@ -394,12 +433,143 @@
 
  private:
   // Unsorted list of all parts sorters.
-  std::vector<std::unique_ptr<LogPartsSorter>> parts_sorters_;
+  std::vector<LogPartsSorter> parts_sorters_;
   // Pointer to the parts sorter holding the current Front message if one
   // exists, or nullptr if a new one needs to be found.
   LogPartsSorter *current_ = nullptr;
   // Cached sorted_until value.
   aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
+
+  int node_ = -1;  // Cached node index in the configuration; -1 is invalid.
+
+  // Start times returned by monotonic_start_time() and realtime_start_time().
+  realtime_clock::time_point realtime_start_time_ = realtime_clock::max_time;
+  monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
+};
+
+// Class to match timestamps with the corresponding data from other nodes.
+class TimestampMapper {
+ public:
+  explicit TimestampMapper(std::vector<LogParts> parts);
+
+  // Copying and moving will mess up the internal raw pointers.  Just don't do
+  // it.
+  TimestampMapper(TimestampMapper const &) = delete;
+  TimestampMapper(TimestampMapper &&) = delete;
+  TimestampMapper &operator=(TimestampMapper const &) = delete;
+  TimestampMapper &operator=(TimestampMapper &&) = delete;
+
+  // TODO(austin): It would be super helpful to provide a way to queue up to
+  // time X without matching timestamps, and to then be able to pull the
+  // timestamps out of this queue.  This lets us bootstrap time estimation
+  // without exploding memory usage worst case.
+
+  // Returns a log file header for this node.
+  const LogFileHeader *log_file_header() const {
+    return node_merger_.log_file_header();
+  }
+
+  // Returns which node this is sorting for.
+  size_t node() const { return node_; }
+
+  // The start time of this log.
+  monotonic_clock::time_point monotonic_start_time() const {
+    return node_merger_.monotonic_start_time();
+  }
+  realtime_clock::time_point realtime_start_time() const {
+    return node_merger_.realtime_start_time();
+  }
+
+  // Uses timestamp_mapper as the peer for its node. Only one mapper may be set
+  // for each node.  Peers are used to look up the data for timestamps on this
+  // node.
+  void AddPeer(TimestampMapper *timestamp_mapper);
+
+  // Time that we are sorted until internally.
+  monotonic_clock::time_point sorted_until() const {
+    return node_merger_.sorted_until();
+  }
+
+  // Returns the next message for this node, or nullptr when all done.
+  TimestampedMessage *Front();
+  // Pops the next message.  Front must be called first.
+  void PopFront();
+
+  // Returns debug information about this node.
+  std::string DebugString() const;
+
+ private:
+  // The state for a remote node.  This holds the data that needs to be matched
+  // with the remote node's timestamps.
+  struct NodeData {
+    // True if we should save data here.  This should be true if any of the
+    // delivered bools in channels below are true.
+    bool any_delivered = false;
+
+    // Peer pointer.  This node is only to be considered if a peer is set.
+    TimestampMapper *peer = nullptr;
+
+    struct ChannelData {
+      // Deque per channel.  This contains the data from the outside
+      // TimestampMapper node which is relevant for the node this NodeData
+      // points to.
+      std::deque<Message> messages;
+      // Bool tracking per channel if a message is delivered to the node this
+      // NodeData represents.
+      bool delivered = false;
+    };
+
+    // Vector with per channel data.
+    std::vector<ChannelData> channels;
+  };
+
+  // Returns (and forgets about) the data for the provided timestamp message
+  // showing when it was delivered to this node.
+  Message MatchingMessageFor(const Message &message);
+
+  // Queues up a single message into our message queue, and any nodes that this
+  // message is delivered to.  Returns true if one was available, false
+  // otherwise.
+  bool Queue();
+
+  // Queues up data until we have at least one message >= time t.
+  // Useful for triggering a remote node to read enough data to have the
+  // timestamp you care about available.
+  void QueueUntil(monotonic_clock::time_point t);
+
+  // Fills message_ with the contents of m.
+  void FillMessage(Message *m);
+
+  // The node merger to source messages from.
+  NodeMerger node_merger_;
+  // Our node.
+  const size_t node_;
+  // The buffer of messages for this node.  These are not matched with any
+  // remote data.
+  std::deque<Message> messages_;
+  // The node index for the source node for each channel.
+  std::vector<size_t> source_node_;
+
+  // Vector per node.  Not all nodes will have anything.
+  std::vector<NodeData> nodes_data_;
+
+  // Latest message to return.
+  TimestampedMessage message_;
+
+  // Tracks whether the front message is in message_, is nullptr (all done), or
+  // still needs to be updated.
+  enum class FirstMessage {
+    kNeedsUpdate,
+    kInMessage,
+    kNullptr,
+  };
+  FirstMessage first_message_ = FirstMessage::kNeedsUpdate;
+
+  // Timestamp of the last message returned.  Used to make sure nothing goes
+  // backwards.
+  monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+  // Time this node is queued up until.  Used for caching.
+  monotonic_clock::time_point queued_until_ = monotonic_clock::min_time;
 };
 
 class TimestampMerger;