Add helper classes for log reading

The goal is to inject span reader factory deep into the log reading
and sorting. LogFilesContainer encapsulates log files and provides
a couple of useful validations and abstractions to simplify log sorting.

It is the first part of the change. A follow-up change will push
SelectedLogParts even deeper into the log reader.

Change-Id: Ic5253bd2b7c87fbbf55ad8d39a480af2871ddb71
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d0957bf..af8a4e8 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -238,33 +238,28 @@
 LogReader::LogReader(std::string_view filename,
                      const Configuration *replay_configuration,
                      const ReplayChannels *replay_channels)
-    : LogReader(SortParts({std::string(filename)}), replay_configuration,
-                replay_channels) {}
+    : LogReader(LogFilesContainer(SortParts({std::string(filename)})),
+                replay_configuration, replay_channels) {}
 
 LogReader::LogReader(std::vector<LogFile> log_files,
                      const Configuration *replay_configuration,
                      const ReplayChannels *replay_channels)
+    : LogReader(LogFilesContainer(std::move(log_files)), replay_configuration,
+                replay_channels) {}
+
+LogReader::LogReader(LogFilesContainer log_files,
+                     const Configuration *replay_configuration,
+                     const ReplayChannels *replay_channels)
     : log_files_(std::move(log_files)),
       replay_configuration_(replay_configuration),
       replay_channels_(replay_channels) {
   SetStartTime(FLAGS_start_time);
   SetEndTime(FLAGS_end_time);
 
-  CHECK_GT(log_files_.size(), 0u);
   {
-    // Validate that we have the same config everwhere.  This will be true if
-    // all the parts were sorted together and the configs match.
-    const Configuration *config = nullptr;
-    for (const LogFile &log_file : log_files_) {
-      if (log_file.config.get() == nullptr) {
-        LOG(FATAL) << "Couldn't find a config in " << log_file;
-      }
-      if (config == nullptr) {
-        config = log_file.config.get();
-      } else {
-        CHECK_EQ(config, log_file.config.get());
-      }
-    }
+    // The log files container validates that all log files share the same config.
+    const Configuration *config = log_files_.config();
+    CHECK_NOTNULL(config);
   }
 
   if (replay_channels_ != nullptr) {
@@ -398,7 +393,7 @@
 }
 
 const Configuration *LogReader::logged_configuration() const {
-  return log_files_[0].config.get();
+  return log_files_.config();
 }
 
 const Configuration *LogReader::configuration() const {
@@ -603,25 +598,23 @@
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
           event_loop_factory_->configuration(), logged_configuration(),
-          log_files_[0].boots, FLAGS_skip_order_validation,
+          log_files_.front_boots(), FLAGS_skip_order_validation,
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
   std::vector<TimestampMapper *> timestamp_mappers;
   for (const Node *node : configuration::GetNodes(configuration())) {
-    const size_t node_index =
-        configuration::GetNodeIndex(configuration(), node);
-    std::vector<LogParts> filtered_parts = FilterPartsForNode(
-        log_files_, node != nullptr ? node->name()->string_view() : "");
+    size_t node_index = configuration::GetNodeIndex(configuration(), node);
+    std::string_view node_name = MaybeNodeName(node);
 
     // We don't run with threading on the buffering for simulated event loops
     // because we haven't attempted to validate how the interactions beteen the
     // buffering and the timestamp mapper works when running multiple nodes
     // concurrently.
     states_[node_index] = std::make_unique<State>(
-        filtered_parts.size() == 0u
+        !log_files_.ContainsPartsForNode(node_name)
             ? nullptr
-            : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+            : std::make_unique<TimestampMapper>(node_name, log_files_),
         filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
         State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node),
         before_send_callbacks_);
@@ -792,21 +785,20 @@
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
           event_loop->configuration(), logged_configuration(),
-          log_files_[0].boots, FLAGS_skip_order_validation,
+          log_files_.front_boots(), FLAGS_skip_order_validation,
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
   std::vector<TimestampMapper *> timestamp_mappers;
   for (const Node *node : configuration::GetNodes(configuration())) {
+    auto node_name = MaybeNodeName(node);
     const size_t node_index =
         configuration::GetNodeIndex(configuration(), node);
-    std::vector<LogParts> filtered_parts = FilterPartsForNode(
-        log_files_, node != nullptr ? node->name()->string_view() : "");
 
     states_[node_index] = std::make_unique<State>(
-        filtered_parts.size() == 0u
+        !log_files_.ContainsPartsForNode(node_name)
             ? nullptr
-            : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+            : std::make_unique<TimestampMapper>(node_name, log_files_),
         filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
         State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node),
         before_send_callbacks_);
@@ -1115,8 +1107,8 @@
           // case and tell the user what is happening so they can either update
           // their config to log the channel or can find a log with the data.
           const std::vector<std::string> logger_nodes =
-              FindLoggerNodes(log_files_);
-          if (logger_nodes.size()) {
+              log_files_.logger_nodes();
+          if (!logger_nodes.empty()) {
             // We have old logs which don't have the logger nodes logged.  In
             // that case, we can't be helpful :(
             bool data_logged = false;
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index b691b30..0486665 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -9,6 +9,8 @@
 #include <vector>
 
 #include "flatbuffers/flatbuffers.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
 
 #include "aos/condition.h"
 #include "aos/events/event_loop.h"
@@ -102,6 +104,9 @@
   LogReader(std::vector<LogFile> log_files,
             const Configuration *replay_configuration = nullptr,
             const ReplayChannels *replay_channels = nullptr);
+  LogReader(LogFilesContainer log_files,
+            const Configuration *replay_configuration = nullptr,
+            const ReplayChannels *replay_channels = nullptr);
   ~LogReader();
 
   // Registers all the callbacks to send the log file data out on an event loop
@@ -334,10 +339,7 @@
     return event_loop_factory_;
   }
 
-  std::string_view name() const { return log_files_[0].name; }
-  std::string_view log_event_uuid() const {
-    return log_files_[0].log_event_uuid;
-  }
+  std::string_view name() const { return log_files_.name(); }
 
   // Set whether to exit the SimulatedEventLoopFactory when we finish reading
   // the logfile.
@@ -420,7 +422,7 @@
   // entire event loop once all nodes are stopped.
   void NoticeRealtimeEnd();
 
-  const std::vector<LogFile> log_files_;
+  const LogFilesContainer log_files_;
 
   // Class to manage sending RemoteMessages on the provided node after the
   // correct delay.
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 582c3f5..0566120 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -1998,45 +1998,6 @@
   return sorter.SortParts();
 }
 
-std::vector<std::string> FindNodes(const std::vector<LogFile> &parts) {
-  std::set<std::string> nodes;
-  for (const LogFile &log_file : parts) {
-    for (const LogParts &part : log_file.parts) {
-      nodes.insert(part.node);
-    }
-  }
-  std::vector<std::string> node_list;
-  while (!nodes.empty()) {
-    node_list.emplace_back(std::move(nodes.extract(nodes.begin()).value()));
-  }
-  return node_list;
-}
-
-std::vector<std::string> FindLoggerNodes(const std::vector<LogFile> &parts) {
-  std::set<std::string> nodes;
-  for (const LogFile &log_file : parts) {
-    nodes.insert(log_file.logger_node);
-  }
-  std::vector<std::string> node_list;
-  while (!nodes.empty()) {
-    node_list.emplace_back(nodes.extract(nodes.begin()).value());
-  }
-  return node_list;
-}
-
-std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
-                                         std::string_view node) {
-  std::vector<LogParts> result;
-  for (const LogFile &log_file : parts) {
-    for (const LogParts &part : log_file.parts) {
-      if (part.node == node) {
-        result.emplace_back(part);
-      }
-    }
-  }
-  return result;
-}
-
 std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
   stream << "{\n";
   if (!file.log_event_uuid.empty()) {
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 266619b..41a3498 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -4,6 +4,9 @@
 #include <iostream>
 #include <map>
 #include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include "aos/configuration.h"
@@ -137,15 +140,6 @@
 // Sort parts of a single log.
 std::vector<LogFile> SortParts(const LogSource &log_source);
 
-// Finds all the nodes which have parts logged from their point of view.
-std::vector<std::string> FindNodes(const std::vector<LogFile> &parts);
-// Finds all the parts which are from the point of view of a single node.
-std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
-                                         std::string_view node);
-
-// Finds all the nodes on which the loggers which generated these log files ran.
-std::vector<std::string> FindLoggerNodes(const std::vector<LogFile> &parts);
-
 // Recursively searches the file/folder for .bfbs and .bfbs.xz files and adds
 // them to the vector.
 void FindLogs(std::vector<std::string> *files, std::string filename);
@@ -157,6 +151,186 @@
 // Recursively searches for logfiles in argv[1] and onward.
 std::vector<std::string> FindLogs(int argc, char **argv);
 
+// Validates that a collection of log files or log parts shares the same config.
+template <typename TCollection>
+bool CheckMatchingConfigs(const TCollection &items) {
+  const Configuration *config = nullptr;
+  for (const auto &item : items) {
+    VLOG(1) << item;
+    if (config == nullptr) {
+      config = item.config.get();
+    } else {
+      if (config != item.config.get()) {
+        LOG(ERROR) << ": Config mismatched: " << config << " vs. "
+                   << item.config.get();
+        return false;
+      }
+    }
+  }
+  if (config == nullptr) {
+    LOG(ERROR) << ": No configs are found";
+    return false;
+  }
+  return true;
+}
+
+// Collection of log parts associated with a particular (node, boot) pair.
+class SelectedLogParts {
+ public:
+  SelectedLogParts(std::optional<const LogSource *> log_source,
+                   std::string_view node_name, size_t boot_count,
+                   std::vector<LogParts> log_parts)
+      : log_source_(log_source),
+        node_name_(node_name),
+        boot_count_(boot_count),
+        log_parts_(std::move(log_parts)) {
+    CHECK_GT(log_parts_.size(), 0u) << ": Nothing was selected for node "
+                                    << node_name << " boot " << boot_count;
+    configs_matched_ = CheckMatchingConfigs(log_parts_);
+
+    // Enforce that we are sorting things only from a single node from a single
+    // boot.
+    const std::string_view part0_source_boot_uuid =
+        log_parts_.front().source_boot_uuid;
+    for (const auto &part : log_parts_) {
+      CHECK_EQ(node_name_, part.node) << ": Can't merge different nodes.";
+      CHECK_EQ(part0_source_boot_uuid, part.source_boot_uuid)
+          << ": Can't merge different boots.";
+      CHECK_EQ(boot_count_, part.boot_count);
+    }
+  }
+
+  // Iterator accessors so the parts can be used in range-based for loops.
+  auto begin() { return log_parts_.begin(); }
+  auto end() { return log_parts_.end(); }
+  auto cbegin() const { return log_parts_.cbegin(); }
+  auto cend() const { return log_parts_.cend(); }
+  auto begin() const { return log_parts_.begin(); }
+  auto end() const { return log_parts_.end(); }
+
+  const Configuration *config() const {
+    // TODO (Alexei): this is the first use of the assumption that all parts
+    // share a matching config. Should that be a strong requirement, validated
+    // in the constructor?
+    CHECK(configs_matched_);
+    return log_parts_.front().config.get();
+  }
+
+  const std::string &node_name() const { return node_name_; }
+
+  // Number of boots found in the log parts.
+  size_t boot_count() const { return boot_count_; }
+
+ private:
+  std::optional<const LogSource *> log_source_;
+  std::string node_name_;
+  size_t boot_count_;
+  std::vector<LogParts> log_parts_;
+
+  // Indicates that all parts share the same config.
+  bool configs_matched_;
+};
+
+// Container that keeps a sorted list of log files and provides functions
+// commonly used during log reading.
+class LogFilesContainer {
+ public:
+  // Initializes log file container with the list of sorted files (results of
+  // SortParts).
+  explicit LogFilesContainer(std::vector<LogFile> log_files)
+      : LogFilesContainer(std::nullopt, std::move(log_files)) {}
+
+  // Sorts and initializes log container with files from an abstract log source.
+  explicit LogFilesContainer(const LogSource *log_source)
+      : LogFilesContainer(log_source, SortParts(*log_source)) {}
+
+  // Returns true when at least one of the log files is associated with the node.
+  bool ContainsPartsForNode(std::string_view node_name) const {
+    // TODO (Alexei): Implement
+    // https://en.cppreference.com/w/cpp/container/unordered_map/find with C++20
+    return nodes_boots_.count(std::string(node_name)) > 0;
+  }
+
+  // Returns the number of boots found in the log files associated with the node.
+  size_t BootsForNode(std::string_view node_name) const {
+    const auto &node_item = nodes_boots_.find(std::string(node_name));
+    CHECK(node_item != nodes_boots_.end())
+        << ": Missing parts associated with node " << node_name;
+    CHECK_GT(node_item->second, 0u) << ": No boots for node " << node_name;
+    return node_item->second;
+  }
+
+  // Gets only the log parts associated with the given node and boot number.
+  SelectedLogParts SelectParts(std::string_view node_name,
+                               size_t boot_count) const {
+    std::vector<LogParts> result;
+    for (const LogFile &log_file : log_files_) {
+      for (const LogParts &part : log_file.parts) {
+        if (part.node == node_name && part.boot_count == boot_count) {
+          result.emplace_back(part);
+        }
+      }
+    }
+    return SelectedLogParts(log_source_, node_name, boot_count, result);
+  }
+
+  // Provides access to the boots of the first log file in the container.
+  const auto &front_boots() const { return log_files_.front().boots; }
+
+  // Accesses the configuration shared by all log files in the container.
+  const Configuration *config() const {
+    // TODO (Alexei): this is the first use of the assumption that all parts
+    // share a matching config. Should that be a strong requirement, validated
+    // in the constructor?
+    CHECK(configs_matched_);
+    return log_files_.front().config.get();
+  }
+
+  // List of logger nodes for given set of log files.
+  const auto &logger_nodes() const { return logger_nodes_; }
+
+  // TODO (Alexei): it is not clear what it represents for multiple log events.
+  // Review its usage.
+  std::string_view name() const { return log_files_[0].name; }
+
+ private:
+  LogFilesContainer(std::optional<const LogSource *> log_source,
+                    std::vector<LogFile> log_files)
+      : log_source_(log_source), log_files_(std::move(log_files)) {
+    CHECK_GT(log_files_.size(), 0u);
+
+    std::unordered_set<std::string> logger_nodes;
+
+    // Scan and collect all related nodes and number of reboots per node.
+    for (const LogFile &log_file : log_files_) {
+      for (const LogParts &part : log_file.parts) {
+        auto node_item = nodes_boots_.find(part.node);
+        if (node_item != nodes_boots_.end()) {
+          node_item->second = std::max(node_item->second, part.boot_count + 1);
+        } else {
+          nodes_boots_[part.node] = part.boot_count + 1;
+        }
+      }
+      logger_nodes.insert(log_file.logger_node);
+    }
+    while (!logger_nodes.empty()) {
+      logger_nodes_.emplace_back(
+          logger_nodes.extract(logger_nodes.begin()).value());
+    }
+    configs_matched_ = CheckMatchingConfigs(log_files_);
+  }
+
+  std::optional<const LogSource *> log_source_;
+  std::vector<LogFile> log_files_;
+
+  // Keeps information about nodes and number of reboots per node.
+  std::unordered_map<std::string, size_t> nodes_boots_;
+  std::vector<std::string> logger_nodes_;
+
+  // Indicates that all parts share the same config.
+  bool configs_matched_;
+};
+
 }  // namespace logger
 }  // namespace aos
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index b76bcad..62c19ae 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1845,21 +1845,12 @@
   return ss.str();
 }
 
-PartsMerger::PartsMerger(std::vector<LogParts> parts) {
-  CHECK_GE(parts.size(), 1u);
-  // Enforce that we are sorting things only from a single node from a single
-  // boot.
-  const std::string_view part0_node = parts[0].node;
-  const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
-  for (size_t i = 1; i < parts.size(); ++i) {
-    CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
-    CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
-        << ": Can't merge different boots.";
-  }
+PartsMerger::PartsMerger(std::string_view node_name, size_t boot_count,
+                         const LogFilesContainer &log_files) {
+  const auto parts = log_files.SelectParts(node_name, boot_count);
+  node_ = configuration::GetNodeIndex(parts.config(), node_name);
 
-  node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
-
-  for (LogParts &part : parts) {
+  for (LogParts part : parts) {
     message_sorters_.emplace_back(std::move(part));
   }
 
@@ -1977,26 +1968,14 @@
   current_ = nullptr;
 }
 
-BootMerger::BootMerger(std::vector<LogParts> files) {
-  std::vector<std::vector<LogParts>> boots;
-
-  // Now, we need to split things out by boot.
-  for (size_t i = 0; i < files.size(); ++i) {
-    const size_t boot_count = files[i].boot_count;
-    if (boot_count + 1 > boots.size()) {
-      boots.resize(boot_count + 1);
-    }
-    boots[boot_count].emplace_back(std::move(files[i]));
-  }
-
-  parts_mergers_.reserve(boots.size());
-  for (size_t i = 0; i < boots.size(); ++i) {
+BootMerger::BootMerger(std::string_view node_name,
+                       const LogFilesContainer &log_files) {
+  size_t number_of_boots = log_files.BootsForNode(node_name);
+  parts_mergers_.reserve(number_of_boots);
+  for (size_t i = 0; i < number_of_boots; ++i) {
     VLOG(2) << "Boot " << i;
-    for (auto &p : boots[i]) {
-      VLOG(2) << "Part " << p;
-    }
     parts_mergers_.emplace_back(
-        std::make_unique<PartsMerger>(std::move(boots[i])));
+        std::make_unique<PartsMerger>(node_name, i, log_files));
   }
 }
 
@@ -2030,8 +2009,9 @@
   return results;
 }
 
-TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
-    : boot_merger_(std::move(parts)),
+TimestampMapper::TimestampMapper(std::string_view node_name,
+                                 const LogFilesContainer &log_files)
+    : boot_merger_(node_name, log_files),
       timestamp_callback_([](TimestampedMessage *) {}) {
   for (const LogParts *part : boot_merger_.Parts()) {
     if (!configuration_) {
@@ -2041,7 +2021,7 @@
     }
   }
   const Configuration *config = configuration_.get();
-  // Only fill out nodes_data_ if there are nodes.  Otherwise everything gets
+  // Only fill out nodes_data_ if there are nodes. Otherwise, everything is
   // pretty simple.
   if (configuration::MultiNode(config)) {
     nodes_data_.resize(config->nodes()->size());
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 67b6b84..3281bc8 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -592,7 +592,8 @@
 // boot.
 class PartsMerger {
  public:
-  PartsMerger(std::vector<LogParts> parts);
+  PartsMerger(std::string_view node_name, size_t boot_count,
+              const LogFilesContainer &log_files);
 
   // Copying and moving will mess up the internal raw pointers.  Just don't do
   // it.
@@ -657,7 +658,7 @@
 // stream.
 class BootMerger {
  public:
-  BootMerger(std::vector<LogParts> file);
+  BootMerger(std::string_view node_name, const LogFilesContainer &log_files);
 
   // Copying and moving will mess up the internal raw pointers.  Just don't do
   // it.
@@ -716,7 +717,8 @@
 // notifying when new data is queued as well as queueing until a point in time.
 class TimestampMapper {
  public:
-  TimestampMapper(std::vector<LogParts> file);
+  TimestampMapper(std::string_view node_name,
+                  const LogFilesContainer &log_files);
 
   // Copying and moving will mess up the internal raw pointers.  Just don't do
   // it.
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 8c057e9..cd6da67 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -764,9 +764,10 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
   ASSERT_EQ(parts.size(), 1u);
 
-  PartsMerger merger(FilterPartsForNode(parts, "pi1"));
+  PartsMerger merger("pi1", 0, log_files);
 
   EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
 
@@ -865,9 +866,10 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
   ASSERT_EQ(parts.size(), 1u);
 
-  PartsMerger merger(FilterPartsForNode(parts, "pi1"));
+  PartsMerger merger("pi1", 0, log_files);
 
   EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
 
@@ -929,16 +931,17 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
+  LogFilesContainer log_files(parts);
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1071,20 +1074,21 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
+  LogFilesContainer log_files(parts);
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   // mapper0 will not provide any messages while mapper1 will provide all
   // messages due to the channel filter callbacks used
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   mapper0.set_replay_channels_callback(
       [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
   mapper1.set_replay_channels_callback(
@@ -1211,19 +1215,15 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
-  for (const auto &p : parts) {
-    LOG(INFO) << p;
-  }
-
+  LogFilesContainer log_files(parts);
   ASSERT_EQ(parts.size(), 1u);
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1335,16 +1335,17 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
 
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1454,16 +1455,17 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
 
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1537,16 +1539,17 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
 
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1621,16 +1624,17 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
 
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1695,16 +1699,17 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
 
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1761,9 +1766,10 @@
 
   const std::vector<LogFile> parts =
       SortParts({logfile0_, logfile1_, logfile2_});
+  LogFilesContainer log_files(parts);
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
 
@@ -1799,12 +1805,13 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
 
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1872,16 +1879,17 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
 
   ASSERT_EQ(parts[0].logger_node, "pi1");
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -2130,10 +2138,11 @@
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  LogFilesContainer log_files(parts);
   ASSERT_EQ(parts.size(), 1u);
   ASSERT_EQ(parts[0].parts.size(), 2u);
 
-  BootMerger merger(FilterPartsForNode(parts, "pi2"));
+  BootMerger merger("pi2", log_files);
 
   EXPECT_EQ(merger.node(), 1u);
 
@@ -2308,6 +2317,7 @@
 
   const std::vector<LogFile> parts =
       SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+  LogFilesContainer log_files(parts);
 
   for (const auto &x : parts) {
     LOG(INFO) << x;
@@ -2316,11 +2326,11 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -2526,6 +2536,7 @@
 
   const std::vector<LogFile> parts =
       SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+  LogFilesContainer log_files(parts);
 
   for (const auto &x : parts) {
     LOG(INFO) << x;
@@ -2534,11 +2545,11 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper0("pi1", log_files);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+  TimestampMapper mapper1("pi2", log_files);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
diff --git a/aos/events/logging/single_node_merge.cc b/aos/events/logging/single_node_merge.cc
index cc78c2b..afb88af 100644
--- a/aos/events/logging/single_node_merge.cc
+++ b/aos/events/logging/single_node_merge.cc
@@ -19,30 +19,10 @@
 
 namespace chrono = std::chrono;
 
-std::string LogFileVectorToString(std::vector<logger::LogFile> log_files) {
-  std::stringstream ss;
-  for (const auto &f : log_files) {
-    ss << f << "\n";
-  }
-  return ss.str();
-}
-
 int Main(int argc, char **argv) {
   const std::vector<std::string> unsorted_logfiles = FindLogs(argc, argv);
-  const std::vector<LogFile> log_files = SortParts(unsorted_logfiles);
-
-  CHECK_GT(log_files.size(), 0u);
-  // Validate that we have the same config everwhere.  This will be true if
-  // all the parts were sorted together and the configs match.
-  const Configuration *config = nullptr;
-  for (const LogFile &log_file : log_files) {
-    VLOG(1) << log_file;
-    if (config == nullptr) {
-      config = log_file.config.get();
-    } else {
-      CHECK_EQ(config, log_file.config.get());
-    }
-  }
+  const LogFilesContainer log_files(SortParts(unsorted_logfiles));
+  const Configuration *config = log_files.config();
 
   // Haven't tested this on a single node log, and don't really see a need to
   // right now.  The higher layers just work.
@@ -53,16 +33,14 @@
   TimestampMapper *node_mapper = nullptr;
 
   for (const Node *node : configuration::GetNodes(config)) {
-    std::vector<LogParts> filtered_parts =
-        FilterPartsForNode(log_files, node->name()->string_view());
-
+    const auto node_name = MaybeNodeName(node);
     // Confirm that all the parts are from the same boot if there are enough
     // parts to not be from the same boot.
-    if (!filtered_parts.empty()) {
+    if (log_files.ContainsPartsForNode(node_name)) {
       // Filter the parts relevant to each node when building the mapper.
       mappers.emplace_back(
-          std::make_unique<TimestampMapper>(std::move(filtered_parts)));
-      if (node->name()->string_view() == FLAGS_node) {
+          std::make_unique<TimestampMapper>(node_name, log_files));
+      if (node_name == FLAGS_node) {
         node_mapper = mappers.back().get();
       }
     } else {
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 67cbc44..cf0ea5a 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -17,30 +17,10 @@
 
 namespace chrono = std::chrono;
 
-std::string LogFileVectorToString(std::vector<logger::LogFile> log_files) {
-  std::stringstream ss;
-  for (const auto &f : log_files) {
-    ss << f << "\n";
-  }
-  return ss.str();
-}
-
 int Main(int argc, char **argv) {
   const std::vector<std::string> unsorted_logfiles = FindLogs(argc, argv);
-  const std::vector<LogFile> log_files = SortParts(unsorted_logfiles);
-
-  CHECK_GT(log_files.size(), 0u);
-  // Validate that we have the same config everwhere.  This will be true if
-  // all the parts were sorted together and the configs match.
-  const Configuration *config = nullptr;
-  for (const LogFile &log_file : log_files) {
-    VLOG(1) << log_file;
-    if (config == nullptr) {
-      config = log_file.config.get();
-    } else {
-      CHECK_EQ(config, log_file.config.get());
-    }
-  }
+  const LogFilesContainer log_files(SortParts(unsorted_logfiles));
+  const Configuration *config = log_files.config();
 
   CHECK(configuration::MultiNode(config))
       << ": Timestamps only make sense in a multi-node world.";
@@ -49,15 +29,13 @@
   std::vector<std::unique_ptr<TimestampMapper>> mappers;
 
   for (const Node *node : configuration::GetNodes(config)) {
-    std::vector<LogParts> filtered_parts =
-        FilterPartsForNode(log_files, node->name()->string_view());
-
+    const auto node_name = MaybeNodeName(node);
     // Confirm that all the parts are from the same boot if there are enough
     // parts to not be from the same boot.
-    if (!filtered_parts.empty()) {
+    if (log_files.ContainsPartsForNode(node_name)) {
       // Filter the parts relevant to each node when building the mapper.
       mappers.emplace_back(
-          std::make_unique<TimestampMapper>(std::move(filtered_parts)));
+          std::make_unique<TimestampMapper>(node_name, log_files));
     } else {
       mappers.emplace_back(nullptr);
     }
@@ -65,7 +43,7 @@
 
   // Now, build up the estimator used to solve for time.
   message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
-      config, config, log_files[0].boots, FLAGS_skip_order_validation,
+      config, config, log_files.front_boots(), FLAGS_skip_order_validation,
       chrono::seconds(0));
   multinode_estimator.set_reboot_found(
       [config](distributed_clock::time_point reboot_time,