Add multi-node log file reading

This handles timestamps, sorting, and merging timestamps with their data.

For simplicity, we read the log files once per node.  If benchmarks
show that this is a bad idea, we can fix it.

Change-Id: I445ac5bfc7186bda25cc899602ac8d95a4cb946d
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 5b2bfa6..9a849b2 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -26,10 +26,11 @@
   // The message originated on another node. Log it and the delivery times
   // together.  The message_gateway is responsible for logging any messages
   // which didn't get delivered.
-  kLogMessageAndDeliveryTime
+  kLogMessageAndDeliveryTime,
+  // The message originated on the other node and should be logged on this node.
+  kLogRemoteMessage
 };
 
-
 // This class manages efficiently writing a sequence of detached buffers to a
 // file.  It queues them up and batches the write operation.
 class DetachedBufferWriter {
@@ -37,6 +38,8 @@
   DetachedBufferWriter(std::string_view filename);
   ~DetachedBufferWriter();
 
+  std::string_view filename() const { return filename_; }
+
   // TODO(austin): Snappy compress the log file if it ends with .snappy!
 
   // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
@@ -51,6 +54,8 @@
   void Flush();
 
  private:
+  const std::string filename_;
+
   int fd_ = -1;
 
   // Size of all the data in the queue.
@@ -68,6 +73,8 @@
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
     int channel_index, LogType log_type);
 
+FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+
 // Class to read chunks out of a log file.
 class SpanReader {
  public:
@@ -75,6 +82,8 @@
 
   ~SpanReader() { close(fd_); }
 
+  std::string_view filename() const { return filename_; }
+
   // Returns a span with the data for a message from the log file, excluding
   // the size.
   absl::Span<const uint8_t> ReadMessage();
@@ -92,6 +101,8 @@
   // Reads a chunk of data into data_.  Returns false if no data was read.
   bool ReadBlock();
 
+  const std::string filename_;
+
   // File descriptor for the log file.
   int fd_ = -1;
 
@@ -138,6 +149,8 @@
  public:
   MessageReader(std::string_view filename);
 
+  std::string_view filename() const { return span_reader_.filename(); }
+
   // Returns the header from the log file.
   const LogFileHeader *log_file_header() const {
     return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
@@ -177,52 +190,64 @@
   monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
 };
 
-// We need to read a large chunk at a time, then kit it up into parts and
-// sort.
-//
-// We want to read 256 KB chunks at a time.  This is the fastest read size.
-// This leaves us with a fragmentation problem though.
-//
-// The easy answer is to read 256 KB chunks.  Then, malloc and memcpy those
-// chunks into single flatbuffer messages and manage them in a sorted queue.
-// Everything is copied three times (into 256 kb buffer, then into separate
-// buffer, then into sender), but none of it is all that expensive.  We can
-// optimize if it is slow later.
-//
-// As we place the elements in the sorted list of times, keep doing this
-// until we read a message that is newer than the threshold.
-//
-// Then repeat.  Keep filling up the sorted list with 256 KB chunks (need a
-// small state machine so we can resume), and keep pulling messages back out
-// and sending.
-//
-// For sorting, we want to use the fact that each channel is sorted, and
-// then merge sort the channels.  Have a vector of deques, and then hold a
-// sorted list of pointers to those.
-class SortedMessageReader {
- public:
-  SortedMessageReader(const std::vector<std::string> &filenames);
+class TimestampMerger;
 
-  // Returns the header from the log file.
+// A design requirement is that the relevant data for a channel is not more than
+// max_out_of_order_duration out of order. We approach sorting in layers.
+//
+// 1) Split each (maybe chunked) log file into one queue per channel.  Read this
+//    log file looking for data pertaining to a specific node.
+//    (SplitMessageReader)
+// 2) Merge all the data per channel from the different log files into a sorted
+//    list of timestamps and messages. (TimestampMerger)
+// 3) Combine the timestamps and messages. (TimestampMerger)
+// 4) Merge all the channels to produce the next message on a node.
+//    (ChannelMerger)
+// 5) Duplicate this entire stack per node.
+
+// This class splits messages and timestamps up into a queue per channel, and
+// handles reading data from multiple chunks.
+class SplitMessageReader {
+ public:
+  SplitMessageReader(const std::vector<std::string> &filenames);
+
+  // Sets the TimestampMerger that gets notified for each channel.  The node
+  // that the TimestampMerger is merging as needs to be passed in.
+  void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
+                          const Node *target_node);
+
+  // Returns the (timestamp, queue_index) for the oldest message in a channel.
+  // NOTE(review): CHECK-fails on an empty channel; does not return max_time.
+  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+      int channel) {
+    return channels_[channel].data.front_timestamp();
+  }
+
+  // Returns the (timestamp, queue_index) for the oldest delivery time in a
+  // channel.  NOTE(review): CHECK-fails when empty; does not return max_time.
+  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+      int channel, int destination_node) {
+    return channels_[channel].timestamps[destination_node].front_timestamp();
+  }
+
+  // Returns the timestamp, queue_index, and message for the oldest data on a
+  // channel.  Requeues data as needed.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopOldest(int channel_index);
+
+  // Returns the timestamp, queue_index, and message for the oldest timestamp on
+  // a channel delivered to a node.  Requeues data as needed.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopOldest(int channel, int node_index);
+
+  // Returns the header for the log files.
   const LogFileHeader *log_file_header() const {
     return &log_file_header_.message();
   }
 
-  // Returns a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
-    return channel_heap_.front();
-  }
-
-  // Returns the number of channels with data still in them.
-  size_t active_channel_count() const { return channel_heap_.size(); }
-
-  // Returns the configuration from the log file header.
-  const Configuration *configuration() const {
-    return log_file_header()->configuration();
-  }
-
-  // Returns the start time on both the monotonic and realtime clocks.
+  // Returns the starting time for this set of log files.
   monotonic_clock::time_point monotonic_start_time() {
     return monotonic_clock::time_point(
         std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
@@ -232,74 +257,304 @@
         std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
   }
 
+  // Returns the configuration from the log file header.
+  const Configuration *configuration() const {
+    return log_file_header()->configuration();
+  }
+
   // Returns the node who's point of view this log file is from.  Make sure this
   // is a pointer in the configuration() nodes list so it can be consumed
   // elsewhere.
   const Node *node() const {
     if (configuration()->has_nodes()) {
-      CHECK(log_file_header()->has_node());
-      CHECK(log_file_header()->node()->has_name());
-      return configuration::GetNode(
-          configuration(), log_file_header()->node()->name()->string_view());
+      return configuration::GetNodeOrDie(configuration(),
+                                         log_file_header()->node());
     } else {
       CHECK(!log_file_header()->has_node());
       return nullptr;
     }
   }
 
-  // Pops a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
-  PopOldestChannel();
+  // Returns the timestamp of the newest message read from the log file, and the
+  // timestamp that we need to re-queue data.
+  monotonic_clock::time_point newest_timestamp() const {
+    return message_reader_->newest_timestamp();
+  }
+  monotonic_clock::time_point queue_data_time() const {
+    return message_reader_->queue_data_time();
+  }
+
+
+  // Adds more messages to the sorted list.  This reads enough data such that
+  // oldest_message_time can be replayed safely.  Returns false if the log file
+  // has all been read.
+  bool QueueMessages(monotonic_clock::time_point oldest_message_time);
 
  private:
+  // TODO(austin): Need to copy or refcount the message instead of running
+  // multiple copies of the reader.  Or maybe have a "as_node" index and hide it
+  // inside.
+
   // Moves to the next log file in the list.
   bool NextLogFile();
 
-  // Adds more messages to the sorted list.
-  void QueueMessages();
+  // Filenames of the log files.
+  std::vector<std::string> filenames_;
+  // And the index of the next file to open.
+  size_t next_filename_index_ = 0;
 
-  // Moves the message to the correct channel queue.
-  void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
-  // Pushes a pointer to the channel for the given timestamp to the sorted
-  // channel list.
-  void PushChannelHeap(monotonic_clock::time_point timestamp,
-                       int channel_index);
-
+  // Log file header to report.  This is a copy.
+  FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+  // Current log file being read.
+  std::unique_ptr<MessageReader> message_reader_;
 
   // Datastructure to hold the list of messages, cached timestamp for the
   // oldest message, and sender to send with.
-  struct ChannelData {
-    monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
-    std::deque<FlatbufferVector<MessageHeader>> data;
-    std::unique_ptr<RawSender> raw_sender;
+  struct MessageHeaderQueue {
+    // If true, this is a timestamp queue.
+    bool timestamps = false;
 
-    // Returns the oldest message.
-    const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
-    // Returns the timestamp for the oldest message.
-    const monotonic_clock::time_point front_timestamp() {
-      return monotonic_clock::time_point(
-          std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+    // Returns a reference to the oldest message.
+    FlatbufferVector<MessageHeader> &front() {
+      CHECK_GT(data_.size(), 0u);
+      return data_.front();
     }
+
+    // Adds a message to the back of the queue.
+    void emplace_back(FlatbufferVector<MessageHeader> &&msg);
+
+    // Drops the front message.  Invalidates the front() reference.
+    void pop_front();
+
+    // The size of the queue.
+    size_t size() { return data_.size(); }
+
+    // Returns the (timestamp, queue_index) for the oldest message.
+    const std::tuple<monotonic_clock::time_point, uint32_t> front_timestamp() {
+      CHECK_GT(data_.size(), 0u);
+      return std::make_tuple(
+          monotonic_clock::time_point(std::chrono::nanoseconds(
+              front().message().monotonic_sent_time())),
+          front().message().queue_index());
+    }
+
+    // Pointer to the timestamp merger for this queue if available.
+    TimestampMerger *timestamp_merger = nullptr;
+    // Pointer to the reader which feeds this queue.
+    SplitMessageReader *split_reader = nullptr;
+
+   private:
+    // The data.
+    std::deque<FlatbufferVector<MessageHeader>> data_;
   };
 
-  std::vector<std::string> filenames_;
-  size_t next_filename_index_ = 0;
+  // All the queues needed for a channel.  There isn't going to be data in all
+  // of these.
+  struct ChannelData {
+    // The data queue for the channel.
+    MessageHeaderQueue data;
+    // Queues for timestamps for each node.
+    std::vector<MessageHeaderQueue> timestamps;
+  };
 
-  FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
-  std::unique_ptr<MessageReader> message_reader_;
-
-  // TODO(austin): Multithreaded read at some point.  Gotta go faster!
-  // Especially if we start compressing.
-
-  // List of channels and messages for them.
+  // Data for all the channels.
   std::vector<ChannelData> channels_;
 
-  // Heap of channels so we can track which channel to send next.
+  // Once we know the node that this SplitMessageReader will be writing as,
+  // there will be only one MessageHeaderQueue that a specific channel matches.
+  // Precompute this here for efficiency.
+  std::vector<MessageHeaderQueue *> channels_to_write_;
+
+  // Number of messages queued.
+  size_t queued_messages_ = 0;
+};
+
+class ChannelMerger;
+
+// Merges messages and timestamps for a single channel across log files.
+class TimestampMerger {
+ public:
+  TimestampMerger(const Configuration *configuration,
+                  std::vector<SplitMessageReader *> split_message_readers,
+                  int channel_index, const Node *target_node,
+                  ChannelMerger *channel_merger);
+
+  // Metadata used to schedule the message.
+  struct DeliveryTimestamp {
+    monotonic_clock::time_point monotonic_event_time =
+        monotonic_clock::min_time;
+    realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+
+    monotonic_clock::time_point monotonic_remote_time =
+        monotonic_clock::min_time;
+    realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
+    uint32_t remote_queue_index = 0xffffffff;
+  };
+
+  // Pushes SplitMessageReader onto the timestamp heap.  This should only be
+  // called when timestamps are placed in the channel this class is merging for
+  // the reader.
+  void UpdateTimestamp(
+      SplitMessageReader *split_message_reader,
+      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+    PushTimestampHeap(oldest_message_time, split_message_reader);
+  }
+  // Pushes SplitMessageReader onto the message heap.  This should only be
+  // called when data is placed in the channel this class is merging for the
+  // reader.
+  void Update(
+      SplitMessageReader *split_message_reader,
+      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+    PushMessageHeap(oldest_message_time, split_message_reader);
+  }
+
+  // Returns the oldest combined timestamp and data for this channel.
+  std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
+
+  // Tracks if the channel merger has pushed this onto its heap or not.
+  bool pushed() const { return pushed_; }
+  // Sets if this has been pushed to the channel merger heap.  Should only be
+  // called by the channel merger.
+  void set_pushed(bool pushed) { pushed_ = pushed; }
+
+ private:
+  // Pushes messages and timestamps to the corresponding heaps.
+  void PushMessageHeap(
+      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      SplitMessageReader *split_message_reader);
+  void PushTimestampHeap(
+      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      SplitMessageReader *split_message_reader);
+
+  // Pops a message from the message heap.  This automatically triggers the
+  // split message reader to re-fetch any new data.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopMessageHeap();
+  // Pops a message from the timestamp heap.  This automatically triggers the
+  // split message reader to re-fetch any new data.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopTimestampHeap();
+
+  const Configuration *configuration_;
+
+  // If true, this is a forwarded channel and timestamps should be matched.
+  bool has_timestamps_ = false;
+
+  // Tracks if the ChannelMerger has pushed this onto its queue.
+  bool pushed_ = false;
+
+  // The split message readers used for source data.
+  std::vector<SplitMessageReader *> split_message_readers_;
+
+  // The channel to merge.
+  int channel_index_;
+
+  // Our node.
+  int node_index_;
+
+  // Heaps for messages and timestamps.
+  std::vector<
+      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+      message_heap_;
+  std::vector<
+      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+      timestamp_heap_;
+
+  // Parent channel merger.
+  ChannelMerger *channel_merger_;
+};
+
+// This class handles constructing all the split message readers, channel
+// mergers, and combining the results.
+class ChannelMerger {
+ public:
+  // Builds a ChannelMerger around a set of log files.  These are of the format:
+  //   {
+  //     {log1_part0, log1_part1, ...},
+  //     {log2}
+  //   }
+  // The inner vector is a list of log file chunks which form up a log file.
+  // The outer vector is a list of log files with subsets of the messages, or
+  // messages from different nodes.
+  ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
+
+  // Returns the nodes that we know how to merge.
+  const std::vector<const Node *> nodes() const;
+  // Sets the node that we will return messages as.  Returns true if the node
+  // has log files and will produce data.  This can only be called once, and
+  // will likely corrupt state if called a second time.
+  bool SetNode(const Node *target_node);
+
+  // Everything else needs the node set before it works.
+
+  // Returns a timestamp for the oldest message in this group of log files.
+  monotonic_clock::time_point OldestMessage() const;
+  // Pops the oldest message.
+  std::tuple<TimestampMerger::DeliveryTimestamp, int,
+             FlatbufferVector<MessageHeader>>
+  PopOldest();
+
+  // Returns the config for this set of log files.
+  const Configuration *configuration() const {
+    return log_file_header()->configuration();
+  }
+
+  const LogFileHeader *log_file_header() const {
+    return &log_file_header_.message();
+  }
+
+  // Returns the start times for the configured node's log files.
+  monotonic_clock::time_point monotonic_start_time() {
+    return monotonic_clock::time_point(
+        std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+  }
+  realtime_clock::time_point realtime_start_time() {
+    return realtime_clock::time_point(
+        std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+  }
+
+  // Returns the node set by SetNode above.
+  const Node *node() const { return node_; }
+
+  // Called by the TimestampMerger when new data is available with the provided
+  // timestamp and channel_index.
+  void Update(monotonic_clock::time_point timestamp, int channel_index) {
+    PushChannelHeap(timestamp, channel_index);
+  }
+
+ private:
+  // Queues messages from each SplitMessageReader until enough data is queued
+  // such that we can guarantee all sorting has happened.
+  void QueueMessages(monotonic_clock::time_point oldest_message_time);
+
+  // Pushes the timestamp for new data on the provided channel.
+  void PushChannelHeap(monotonic_clock::time_point timestamp,
+                       int channel_index);
+
+  // All the message readers.
+  std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
+
+  // The log header we are claiming to be.
+  FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+
+  // The timestamp mergers which combine data from the split message readers.
+  std::vector<TimestampMerger> timestamp_mergers_;
+
+  // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
 
+  // This holds a heap of split_message_readers sorted by the time at which they
+  // need to have QueueMessages called on them.
+  std::vector<std::pair<monotonic_clock::time_point, int>>
+      split_message_reader_heap_;
+
+  // Configured node, set by SetNode().  Defaults to nullptr.
+  const Node *node_ = nullptr;
+
+  // Cached copy of the list of nodes.
+  std::vector<const Node *> nodes_;
 };
 
 }  // namespace logger