Add factory for SpanReader

Change-Id: Id3f382ed7c1ad4ca42ed674d1e8e98a2045ac8fc
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 81305d7..5568770 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -319,12 +319,9 @@
   // There are enough of these in the wild that this is worth supporting.
   std::vector<UnsortedOldParts> old_parts;
 
-  // Some implementations have slow destructors, so save those for later to let
-  // the slow parts can mostly run in parallel.
-  std::vector<SpanReader> part_readers;
-
   // Populates the class's datastructures from the input file list.
-  void PopulateFromFiles(const std::vector<std::string> &parts);
+  void PopulateFromFiles(ReadersPool *readers,
+                         const std::vector<std::string> &parts);
 
   // Wrangles everything into a map of boot uuids -> boot counts.
   MapBoots ComputeBootCounts();
@@ -340,19 +337,18 @@
   // Reformats parts_list into a list of logfiles and returns it.  This destroys
   // state in PartsSorter.
   std::vector<LogFile> FormatNewParts();
+
+  // Returns a list of all the parts that we have sorted.
+  std::vector<LogFile> SortParts();
 };
 
-void PartsSorter::PopulateFromFiles(const std::vector<std::string> &parts) {
+void PartsSorter::PopulateFromFiles(ReadersPool *readers,
+                                    const std::vector<std::string> &parts) {
   // Now extract everything into our datastructures above for sorting.
   for (const std::string &part : parts) {
-    if (part_readers.size() > 50) {
-      // Don't leave arbitrary numbers of readers open, because they each take
-      // resources, so close a big batch at once periodically.
-      part_readers.clear();
-    }
-    part_readers.emplace_back(part, FLAGS_quiet_sorting);
+    SpanReader *reader = readers->BorrowReader(part);
     std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
-        ReadHeader(&part_readers.back());
+        ReadHeader(reader);
     if (!log_header) {
       if (!FLAGS_quiet_sorting) {
         LOG(WARNING) << "Skipping " << part << " without a header";
@@ -1970,27 +1966,29 @@
   return result;
 }
 
-std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
-  PartsSorter sorter;
-  sorter.PopulateFromFiles(parts);
-
-  if (sorter.old_parts.empty() && sorter.parts_list.empty()) {
-    if (parts.empty()) {
-      return std::vector<LogFile>{};
-    } else {
+std::vector<LogFile> PartsSorter::SortParts() {
+  if (old_parts.empty() && parts_list.empty()) {
+    if (!corrupted.empty()) {
       LogFile log_file;
-      log_file.corrupted = std::move(sorter.corrupted);
+      log_file.corrupted = std::move(corrupted);
       return std::vector<LogFile>{log_file};
     }
+    return std::vector<LogFile>{};
   }
-  CHECK_NE(sorter.old_parts.empty(), sorter.parts_list.empty())
+  CHECK_NE(old_parts.empty(), parts_list.empty())
       << ": Can't have a mix of old and new parts.";
-
-  if (!sorter.old_parts.empty()) {
-    return sorter.FormatOldParts();
+  if (!old_parts.empty()) {
+    return FormatOldParts();
+  } else {
+    return FormatNewParts();
   }
+}
 
-  return sorter.FormatNewParts();
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
+  LogReadersPool readers;
+  PartsSorter sorter;
+  sorter.PopulateFromFiles(&readers, parts);
+  return sorter.SortParts();
 }
 
 std::vector<std::string> FindNodes(const std::vector<LogFile> &parts) {
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3c8c50d..fbec1b5 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -66,6 +66,8 @@
             "corrupt message found by MessageReader be silently ignored, "
             "providing access to all uncorrupted messages in a logfile.");
 
+DECLARE_bool(quiet_sorting);
+
 namespace aos::logger {
 namespace {
 namespace chrono = std::chrono;
@@ -1281,6 +1283,23 @@
   return true;
 }
 
+LogReadersPool::LogReadersPool(const LogSource *log_source, size_t pool_size)
+    : log_source_(log_source), pool_size_(pool_size) {}
+
+SpanReader *LogReadersPool::BorrowReader(std::string_view id) {
+  if (part_readers_.size() > pool_size_) {
+    // Don't leave arbitrary numbers of readers open, because they each take
+    // resources, so close a big batch at once periodically.
+    part_readers_.clear();
+  }
+  if (log_source_ == nullptr) {
+    part_readers_.emplace_back(id, FLAGS_quiet_sorting);
+  } else {
+    part_readers_.emplace_back(id, log_source_->GetDecoder(id));
+  }
+  return &part_readers_.back();
+}
+
 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
     SpanReader *span_reader) {
   absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f8d6b8e..b04be9f 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -228,6 +228,30 @@
   bool is_finished_ = false;
 };
 
+//  Class to borrow log readers from pool based on their ids. This is used as a
+//  factory and helps with performance when construction or descrution of
+//  decoders are not free. For instance,, S3 fetchers are slow to destroy.
+class ReadersPool {
+ public:
+  virtual ~ReadersPool() = default;
+
+  // Borrow reader from pool based on the id.
+  virtual SpanReader *BorrowReader(std::string_view id) = 0;
+};
+
+class LogReadersPool : public ReadersPool {
+ public:
+  explicit LogReadersPool(const LogSource *log_source = nullptr,
+                          size_t pool_size = 50);
+
+  SpanReader *BorrowReader(std::string_view id) override;
+
+ private:
+  const LogSource *log_source_;
+  std::vector<SpanReader> part_readers_;
+  const size_t pool_size_;
+};
+
 // Reads the last header from a log file.  This handles any duplicate headers
 // that were written.
 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(