Switch LogReader to the new API, and remove the old one

Now that we have a fancy new log sorter, let's flip over to using it.
This doesn't yet simplify anything on the log reader side to take
advantage of the simpler code.

While we are here, the new API really wants a LogFiles object instead of
vectors of strings.  Convert any calls over to the new LogFiles API.

The log file was updated to add UUIDs.  They are required with
multi-node log files, and it doesn't seem worth changing that
requirement for this old log.

Change-Id: I84bd63c7339ec43ed01c106131153e1cb6d213bb
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index f6a037b..f0f0a69 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -319,24 +319,10 @@
   // pass it in here. It must provide all the channels that the original logged
   // config did.
   //
-  // Log filenames are in the following format:
-  //
-  //   {
-  //     {log1_part0, log1_part1, ...},
-  //     {log2}
-  //   }
-  // The inner vector is a list of log file chunks which form up a log file.
-  // The outer vector is a list of log files with subsets of the messages, or
-  // messages from different nodes.
-  //
-  // If the outer vector isn't provided, it is assumed to be of size 1.
+  // The single file constructor calls SortParts internally.
   LogReader(std::string_view filename,
             const Configuration *replay_configuration = nullptr);
-  LogReader(const std::vector<std::string> &filenames,
-            const Configuration *replay_configuration = nullptr);
-  LogReader(const std::vector<std::vector<std::string>> &filenames,
-            const Configuration *replay_configuration = nullptr);
-  LogReader(const std::vector<LogFile> &log_files,
+  LogReader(std::vector<LogFile> log_files,
             const Configuration *replay_configuration = nullptr);
   ~LogReader();
 
@@ -450,7 +436,7 @@
                : logged_configuration()->nodes()->size();
   }
 
-  const std::vector<std::vector<std::string>> filenames_;
+  const std::vector<LogFile> log_files_;
 
   // This is *a* log file header used to provide the logged config.  The rest of
   // the header is likely distracting.
@@ -466,14 +452,15 @@
   // State per node.
   class State {
    public:
-    State(std::unique_ptr<ChannelMerger> channel_merger);
+    State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+
+    // Connects up the timestamp mappers.
+    void AddPeer(State *peer);
 
     // Returns the timestamps, channel_index, and message from a channel.
     // update_time (will be) set to true when popping this message causes the
     // filter to change the time offset estimation function.
-    std::tuple<TimestampMerger::DeliveryTimestamp, int,
-               SizePrefixedFlatbufferVector<MessageHeader>>
-    PopOldest(bool *update_time);
+    TimestampedMessage PopOldest(bool *update_time);
 
     // Returns the monotonic time of the oldest message.
     monotonic_clock::time_point OldestMessageTime() const;
@@ -484,10 +471,12 @@
 
     // Returns the starting time for this node.
     monotonic_clock::time_point monotonic_start_time() const {
-      return channel_merger_->monotonic_start_time();
+      return timestamp_mapper_ ? timestamp_mapper_->monotonic_start_time()
+                               : monotonic_clock::min_time;
     }
     realtime_clock::time_point realtime_start_time() const {
-      return channel_merger_->realtime_start_time();
+      return timestamp_mapper_ ? timestamp_mapper_->realtime_start_time()
+                               : realtime_clock::min_time;
     }
 
     // Sets the node event loop factory for replaying into a
@@ -555,10 +544,6 @@
       return node_event_loop_factory_->monotonic_now();
     }
 
-    // Sets the node we will be merging as, and returns true if there is any
-    // data on it.
-    bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
-
     // Sets the number of channels.
     void SetChannelCount(size_t count);
 
@@ -570,7 +555,9 @@
                     State *source_state);
 
     // Returns if we have read all the messages from all the logs.
-    bool at_end() const { return channel_merger_->at_end(); }
+    bool at_end() const {
+      return timestamp_mapper_ ? timestamp_mapper_->Front() == nullptr : true;
+    }
 
     // Unregisters everything so we can destory the event loop.
     void Deregister();
@@ -586,8 +573,7 @@
     }
 
     // Sends a buffer on the provided channel index.
-    bool Send(size_t channel_index, const void *data, size_t size,
-              const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
+    bool Send(const TimestampedMessage &timestamped_message);
 
     // Returns a debug string for the channel merger.
     std::string DebugString() const {
@@ -599,22 +585,24 @@
                    << "]: " << std::get<0>(message).monotonic_event_time << " "
                    << configuration::StrippedChannelToString(
                           event_loop_->configuration()->channels()->Get(
-                              std::get<2>(message).message().channel_index()))
+                              std::get<0>(message).channel_index))
                    << "\n";
         } else if (i == 7) {
           messages << "...\n";
         }
         ++i;
       }
-      return messages.str() + channel_merger_->DebugString();
+      if (!timestamp_mapper_) {
+        return messages.str();
+      }
+      return messages.str() + timestamp_mapper_->DebugString();
     }
 
    private:
     // Log file.
-    std::unique_ptr<ChannelMerger> channel_merger_;
+    std::unique_ptr<TimestampMapper> timestamp_mapper_;
 
-    std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
-                          SizePrefixedFlatbufferVector<MessageHeader>,
+    std::deque<std::tuple<TimestampedMessage,
                           message_bridge::NoncausalOffsetEstimator *>>
         sorted_messages_;