Split out an interface for finding log files

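This allows adding new implementations, such as ones which list log files
from cloud storage.

This also opens the next log part in PartsMessageReader before it is
needed, so implementations can prefetch it, and has PartsSorter keep its
SpanReaders alive and destroy them in batches, because some
implementations have slow destructors.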

Change-Id: I962946962e92eedf582e6b0b3c65e9ba606835cb
Signed-off-by: Brian Silverman <bsilver16384@gmail.com>
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index c2134ad..ce9b3ae 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -27,10 +27,78 @@
          str.substr(str.size() - ending.size()) == ending;
 }
 
-bool FileExists(std::string filename) {
-  struct stat stat_results;
-  int error = stat(filename.c_str(), &stat_results);
-  return error == 0;
+// Abstracts the operations needed to locate log files, so that log files can
+// be found somewhere other than the local filesystem (e.g. cloud storage).
+// Each instance is bound to a single path or URL.
+class FileOperations {
+ public:
+  virtual ~FileOperations() = default;
+
+  // Returns true if the path this object was created for exists.
+  virtual bool Exists() = 0;
+  // Appends the log files found at this object's path to *files. A directory
+  // is searched recursively.
+  virtual void FindLogs(std::vector<std::string> *files) = 0;
+};
+
+// Implements FileOperations with standard POSIX filesystem APIs. These work on
+// files local to the machine they're running on.
+class LocalFileOperations final : public FileOperations {
+ public:
+  explicit LocalFileOperations(std::string_view filename)
+      : filename_(filename) {}
+
+  bool Exists() override {
+    struct stat stat_results;
+    int error = stat(filename_.c_str(), &stat_results);
+    return error == 0;
+  }
+
+  void FindLogs(std::vector<std::string> *files) override;
+
+ private:
+  std::string filename_;
+};
+
+// Returns true if filename has one of the extensions used for log files.
+bool IsValidFilename(std::string_view filename) {
+  return EndsWith(filename, ".bfbs") || EndsWith(filename, ".bfbs.xz");
+}
+
+// Returns the FileOperations implementation that handles filename. Only local
+// paths are supported so far; anything that looks like a URL is fatal.
+std::unique_ptr<FileOperations> MakeFileOperations(std::string_view filename) {
+  if (filename.find("://") != filename.npos) {
+    LOG(FATAL) << "This looks like a URL of an unknown type: " << filename;
+  }
+  return std::make_unique<LocalFileOperations>(filename);
+}
+
+// Recursively walks filename_. Anything that can't be opened as a directory is
+// treated as a file and kept only if IsValidFilename() accepts its name.
+void LocalFileOperations::FindLogs(std::vector<std::string> *files) {
+  DIR *directory = opendir(filename_.c_str());
+
+  if (directory == nullptr) {
+    if (IsValidFilename(filename_)) {
+      files->emplace_back(filename_);
+    }
+    return;
+  }
+
+  struct dirent *directory_entry;
+  while ((directory_entry = readdir(directory)) != nullptr) {
+    std::string next_filename = directory_entry->d_name;
+    if (next_filename == "." || next_filename == "..") {
+      continue;
+    }
+
+    // Recurse with a temporary object; there is no need to heap-allocate one
+    // per directory entry.
+    LocalFileOperations(filename_ + "/" + next_filename).FindLogs(files);
+  }
+
+  closedir(directory);
 }
 
 bool ConfigOnly(const LogFileHeader *header) {
@@ -79,27 +134,10 @@
 }  // namespace
 
 void FindLogs(std::vector<std::string> *files, std::string filename) {
-  DIR *directory = opendir(filename.c_str());
-
-  if (directory == nullptr) {
-    if (EndsWith(filename, ".bfbs") || EndsWith(filename, ".bfbs.xz")) {
-      files->emplace_back(filename);
-    }
-    return;
-  }
-
-  struct dirent *directory_entry;
-  while ((directory_entry = readdir(directory)) != nullptr) {
-    std::string next_filename = directory_entry->d_name;
-    if (next_filename == "." || next_filename == "..") {
-      continue;
-    }
-
-    std::string path = filename + "/" + next_filename;
-    FindLogs(files, path);
-  }
-
-  closedir(directory);
+  // MakeFileOperations picks the implementation from the form of the path.
+  const std::unique_ptr<FileOperations> file_operations =
+      MakeFileOperations(filename);
+  file_operations->FindLogs(files);
 }
 
 std::vector<std::string> FindLogs(std::string filename) {
@@ -113,8 +148,10 @@
 
   for (int i = 1; i < argc; i++) {
     std::string filename = argv[i];
-    if (FileExists(filename)) {
-      aos::logger::FindLogs(&found_logfiles, filename);
+    const std::unique_ptr<FileOperations> operations =
+        MakeFileOperations(filename);
+    if (operations->Exists()) {
+      operations->FindLogs(&found_logfiles);
     } else {
       LOG(FATAL) << "File " << filename << " does not exist";
     }
@@ -268,6 +304,11 @@
   // There are enough of these in the wild that this is worth supporting.
   std::vector<UnsortedOldParts> old_parts;
 
+  // Some reader implementations have slow destructors, so keep the readers
+  // alive here and destroy them in large batches. That lets the slow parts
+  // mostly run in parallel.
+  std::vector<SpanReader> part_readers;
+
   // Populates the class's datastructures from the input file list.
   void PopulateFromFiles(const std::vector<std::string> &parts);
 
@@ -290,8 +330,18 @@
 void PartsSorter::PopulateFromFiles(const std::vector<std::string> &parts) {
+  // Don't leave arbitrary numbers of readers open, because they each take
+  // resources, so close a big batch at once periodically. Reserving the whole
+  // batch up front means the vector never reallocates, so readers never move.
+  constexpr size_t kMaxOpenPartReaders = 200;
+  part_readers.reserve(kMaxOpenPartReaders);
+
   // Now extract everything into our datastructures above for sorting.
   for (const std::string &part : parts) {
+    if (part_readers.size() >= kMaxOpenPartReaders) {
+      part_readers.clear();
+    }
+    part_readers.emplace_back(part);
     std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
-        ReadHeader(part);
+        ReadHeader(&part_readers.back());
     if (!log_header) {
       LOG(WARNING) << "Skipping " << part << " without a header";
       corrupted.emplace_back(part);
@@ -997,7 +1043,6 @@
       for (const std::string &boot : node_state.second.boots) {
         LOG(INFO) << "  boot " << boot;
       }
-      for (const std::pair<std::string,
-                           std::vector<std::pair<std::string, bool>>>
-               &constraints : node_state.second.constraints) {
+      // `const auto &` binds directly to each element, so no pair is copied.
+      for (const auto &constraints : node_state.second.constraints) {
         for (const std::pair<std::string, bool> &constraint :
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 8a10876..31fdc83 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -520,6 +520,11 @@
 
 PartsMessageReader::PartsMessageReader(LogParts log_parts)
     : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
+  // Open the second part right away (if there is one) so that implementations
+  // get a chance to prefetch it.
+  if (parts_.parts.size() > 1u) {
+    next_message_reader_.emplace(parts_.parts[1]);
+  }
   ComputeBootCounts();
 }
 
@@ -601,11 +604,21 @@
 
 void PartsMessageReader::NextLog() {
   if (next_part_index_ == parts_.parts.size()) {
+    // Nothing should have been prefetched past the last part.
+    CHECK(!next_message_reader_);
     done_ = true;
     return;
   }
-  message_reader_ = MessageReader(parts_.parts[next_part_index_]);
+  // Promote the prefetched reader for parts_.parts[next_part_index_].
+  CHECK(next_message_reader_);
+  message_reader_ = std::move(*next_message_reader_);
   ComputeBootCounts();
+  const size_t prefetch_index = next_part_index_ + 1;
+  if (prefetch_index >= parts_.parts.size()) {
+    next_message_reader_.reset();
+  } else {
+    next_message_reader_.emplace(parts_.parts[prefetch_index]);
+  }
   ++next_part_index_;
 }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 26fc74d..5c5f709 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -199,15 +199,6 @@
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
     int channel_index, LogType log_type);
 
-// Reads the last header from a log file.  This handles any duplicate headers
-// that were written.
-std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
-    std::string_view filename);
-// Reads the Nth message from a log file, excluding the header.  Note: this
-// doesn't handle duplicate headers.
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
-    std::string_view filename, size_t n);
-
 // Class to read chunks out of a log file.
 class SpanReader {
  public:
@@ -251,6 +242,21 @@
   size_t consumed_data_ = 0;
 };
 
+// Reads the last header from a log file.  This handles any duplicate headers
+// that were written.
+//
+// This overload reads through a caller-owned SpanReader, so the caller
+// controls when that reader (and whatever resources it holds) is destroyed.
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
+    SpanReader *span_reader);
+// Same as above, but reads from the file named filename.
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
+    std::string_view filename);
+// Reads the Nth message from a log file, excluding the header.  Note: this
+// doesn't handle duplicate headers.
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
+    std::string_view filename, size_t n);
+
 // Class which handles reading the header and messages from the log file.  This
 // handles any per-file state left before merging below.
 class MessageReader {
@@ -352,6 +354,15 @@
   size_t next_part_index_ = 1u;
   bool done_ = false;
   MessageReader message_reader_;
+  // We instantiate the next one early, to allow implementations to prefetch.
+  // Invariant: holds the reader for parts_.parts[next_part_index_] while that
+  // index is in range, and is empty otherwise.
+  // TODO(Brian): To get optimal performance when downloading, this needs more
+  // communication with the implementation to prioritize the next part and add
+  // more parallelism when it helps. Maybe some kind of a queue of parts in
+  // order, and the implementation gets to pull however many make sense off the
+  // front?
+  std::optional<MessageReader> next_message_reader_;
 
   // True after we have seen a message after the start of the log.  The
   // guarentees on logging essentially are that all data from before the