Support reading from S3 directly

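Teach logfile sorting and SpanReader to recognize "s3:" URLs and read
them through a new S3Fetcher DataDecoder, plus an S3FileOperations
which works from a recursive object listing. The AWS SDK is only
linked in on k8, gated behind ENABLE_S3; other platforms LOG(FATAL) if
handed an S3 URL. Also drop the number of simultaneously open part
readers from 200 to 50 to keep resource usage down while sorting.

Rough sketch of the new API surface (the bucket and object names here
are made up, and this assumes AWS credentials are available through
the SDK's default credential chain):

  #include <array>

  #include "aos/events/logging/s3_fetcher.h"
  #include "glog/logging.h"

  void ReadSomeLog() {
    // Recursive listing of every object under the prefix.
    for (const std::string &url :
         aos::logger::ListS3Objects("s3://some-bucket/some-log/")) {
      LOG(INFO) << "found " << url;
    }

    // Stream a single object. Read() fills [begin, end) and only returns
    // fewer bytes than requested once it reaches the end of the object.
    aos::logger::S3Fetcher fetcher("s3://some-bucket/some-log/part0.bfbs");
    std::array<uint8_t, 4096> buffer;
    const size_t bytes_read =
        fetcher.Read(buffer.data(), buffer.data() + buffer.size());
    LOG(INFO) << "read " << bytes_read << " bytes";
  }
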
Change-Id: Ice18f6739a7e315bea223a2a6f634b6c4c725c11
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 82e7061..7201e70 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -48,6 +48,10 @@
         "logfile_sorting.h",
         "logfile_utils.h",
     ],
+    copts = select({
+        "//tools:cpu_k8": ["-DENABLE_S3=1"],
+        "//conditions:default": ["-DENABLE_S3=0"],
+    }),
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
@@ -69,7 +73,10 @@
         "@com_google_absl//absl/types:span",
         "@boringssl//:crypto",
     ] + select({
-        "//tools:cpu_k8": [":lzma_encoder"],
+        "//tools:cpu_k8": [
+            ":s3_fetcher",
+            ":lzma_encoder",
+        ],
         "//tools:cpu_arm64": [":lzma_encoder"],
         "//conditions:default": [],
     }),
@@ -200,6 +207,27 @@
     ],
 )
 
+cc_library(
+    name = "s3_fetcher",
+    srcs = [
+        "s3_fetcher.cc",
+    ],
+    hdrs = [
+        "s3_fetcher.h",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":buffer_encoder",
+        "//aos/containers:resizeable_buffer",
+        "@aws_sdk//:core",
+        "@aws_sdk//:s3",
+        "@com_github_google_glog//:glog",
+        "@com_google_absl//absl/strings",
+        "@com_google_absl//absl/types:span",
+    ],
+)
+
 cc_test(
     name = "lzma_encoder_test",
     srcs = [
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 1a16456..89801b7 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -16,14 +16,17 @@
 #include "openssl/sha.h"
 #include "sys/stat.h"
 
+#if ENABLE_S3
+#include "aos/events/logging/s3_fetcher.h"
+#endif
+
 DEFINE_bool(quiet_sorting, false,
             "If true, sort with minimal messages about truncated files.");
 
 namespace aos {
 namespace logger {
-namespace chrono = std::chrono;
-
 namespace {
+namespace chrono = std::chrono;
 
 // Check if string ends with ending
 bool EndsWith(std::string_view str, std::string_view ending) {
@@ -62,7 +65,36 @@
          EndsWith(filename, ".bfbs.sz");
 }
 
+#if ENABLE_S3
+class S3FileOperations final : public FileOperations {
+ public:
+  S3FileOperations(std::string_view url) : object_urls_(ListS3Objects(url)) {}
+
+  bool Exists() override { return !object_urls_.empty(); }
+  void FindLogs(std::vector<std::string> *files) override {
+    // We already have a recursive listing, so just grab all the objects from
+    // there.
+    for (const std::string &object_url : object_urls_) {
+      if (IsValidFilename(object_url)) {
+        files->push_back(object_url);
+      }
+    }
+  }
+
+ private:
+  const std::vector<std::string> object_urls_;
+};
+#endif
+
 std::unique_ptr<FileOperations> MakeFileOperations(std::string_view filename) {
+  static constexpr std::string_view kS3 = "s3:";
+  if (filename.substr(0, kS3.size()) == kS3) {
+#if ENABLE_S3
+    return std::make_unique<S3FileOperations>(filename);
+#else
+    LOG(FATAL) << "Reading files from S3 not supported on this platform";
+#endif
+  }
   if (filename.find("://") != filename.npos) {
     LOG(FATAL) << "This looks like a URL of an unknown type: " << filename;
   }
@@ -391,7 +423,7 @@
 void PartsSorter::PopulateFromFiles(const std::vector<std::string> &parts) {
   // Now extract everything into our datastructures above for sorting.
   for (const std::string &part : parts) {
-    if (part_readers.size() > 200) {
+    if (part_readers.size() > 50) {
       // Don't leave arbitrary numbers of readers open, because they each take
       // resources, so close a big batch at once periodically.
       part_readers.clear();
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 4c06f0a..a76c1a8 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -28,6 +28,9 @@
 #if ENABLE_LZMA
 #include "aos/events/logging/lzma_encoder.h"
 #endif
+#if ENABLE_S3
+#include "aos/events/logging/s3_fetcher.h"
+#endif
 
 DEFINE_int32(flush_size, 128 * 1024,
              "Number of outstanding bytes to allow before flushing to disk.");
@@ -862,7 +865,16 @@
 
 SpanReader::SpanReader(std::string_view filename, bool quiet)
     : filename_(filename) {
-  decoder_ = std::make_unique<DummyDecoder>(filename);
+  static constexpr std::string_view kS3 = "s3:";
+  if (filename.substr(0, kS3.size()) == kS3) {
+#if ENABLE_S3
+    decoder_ = std::make_unique<S3Fetcher>(filename);
+#else
+    LOG(FATAL) << "Reading files from S3 not supported on this platform";
+#endif
+  } else {
+    decoder_ = std::make_unique<DummyDecoder>(filename);
+  }
 
   static constexpr std::string_view kXz = ".xz";
   static constexpr std::string_view kSnappy = SnappyDecoder::kExtension;
diff --git a/aos/events/logging/s3_fetcher.cc b/aos/events/logging/s3_fetcher.cc
new file mode 100644
index 0000000..f44a20b
--- /dev/null
+++ b/aos/events/logging/s3_fetcher.cc
@@ -0,0 +1,212 @@
+#include "aos/events/logging/s3_fetcher.h"
+
+#include <aws/core/Aws.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
+
+#include "absl/strings/str_cat.h"
+#include "glog/logging.h"
+
+// When we first start reading a log folder, we end up reading the first part of
+// each file twice. We could speed this up by restructuring the API so all the
+// downloads could be started in parallel, and the initial chunk of data cached.
+// However, even though this initial part is slower than necessary,
+// decompressing and sorting the main part of the log file is still the slowest
+// part, and this implementation does parallelize downloads with that, so it's
+// good enough for now.
+
+namespace aos::logger {
+namespace {
+
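+// Owns the global AWS SDK state. InitAwsAPI() below constructs this lazily the
+// first time anything needs the SDK, and the destructor shuts it down at exit.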
+struct AwsAPIOwner {
+  Aws::SDKOptions options;
+
+  AwsAPIOwner() {
+    options.httpOptions.installSigPipeHandler = true;
+    Aws::InitAPI(options);
+  }
+  ~AwsAPIOwner() { Aws::ShutdownAPI(options); }
+};
+
+// If this doesn't fit the largest message, the per-request overhead very
+// quickly dominates log reading time, because there's no processing in between
+// reading all the pieces of a single message. Bigger takes more memory, but our
+// logs aren't split across all that many files usually, so this can be fairly
+// large without increasing memory requirements for log reading too much.
+constexpr int kChunkSize = 10 * 1024 * 1024;
+
+void InitAwsAPI() { static AwsAPIOwner api_owner; }
+
+Aws::Client::ClientConfiguration MakeClientConfiguration() {
+  InitAwsAPI();
+  Aws::Client::ClientConfiguration config;
+  config.region = Aws::Region::AWS_GLOBAL;
+  return config;
+}
+
+struct ParsedRange {
+  uint64_t start, end, total_size;
+};
+
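+// Parses an HTTP Content-Range header of the form
+// "bytes <start>-<end>/<total_size>", e.g. "bytes 0-1048575/5242880".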
+ParsedRange ParseRange(std::string_view string) {
+  static constexpr std::string_view kBytes = "bytes ";
+  CHECK(string.substr(0, kBytes.size()) == kBytes)
+      << ": Invalid range: " << string;
+  string = string.substr(kBytes.size());
+
+  const size_t dash = string.find('-');
+  CHECK(dash != string.npos) << ": Invalid range: " << string;
+  const size_t slash = string.find('/');
+  CHECK(slash != string.npos) << ": Invalid range: " << string;
+
+  ParsedRange result;
+  const std::string_view start_string = string.substr(0, dash);
+  CHECK(absl::SimpleAtoi(start_string, &result.start))
+      << ": failed to parse " << start_string << " from " << string;
+  const std::string_view end_string = string.substr(dash + 1, slash - dash - 1);
+  CHECK(absl::SimpleAtoi(end_string, &result.end))
+      << ": failed to parse " << end_string << " from " << string;
+  const std::string_view total_string = string.substr(slash + 1);
+  CHECK(absl::SimpleAtoi(total_string, &result.total_size))
+      << ": failed to parse " << total_string << " from " << string;
+  return result;
+}
+
+}  // namespace
+
+ObjectName ParseUrl(std::string_view url) {
+  static constexpr std::string_view kS3 = "s3://";
+  if (url.substr(0, kS3.size()) != kS3) {
+    LOG(FATAL) << "Not an S3 URL: " << url;
+  }
+  url = url.substr(kS3.size());
+  const size_t slash = url.find('/');
+  CHECK(slash != url.npos) << ": Invalid S3 URL: " << url;
+  ObjectName result;
+  result.bucket = url.substr(0, slash);
+  result.key = url.substr(slash + 1);
+  return result;
+}
+
+// This client is thread-safe, so we share a single global instance. Destroying
+// it can take a while to shut down all the threads.
+Aws::S3::S3Client &GetS3Client() {
+  static Aws::S3::S3Client result(MakeClientConfiguration());
+  return result;
+}
+
+S3Fetcher::S3Fetcher(std::string_view url) : url_(url) {
+  VLOG(1) << "opening " << url;
+  // Start the initial request now.
+  StartRequest();
+}
+
+size_t S3Fetcher::Read(uint8_t *begin, uint8_t *end) {
+  VLOG(1) << "looking to read " << (end - begin);
+  size_t total_read = 0;
+
+  while (true) {
+    // First copy any data we already have.
+    const size_t current_size =
+        std::min<size_t>(current_chunk_.size(), end - begin - total_read);
+    memcpy(begin + total_read, current_chunk_.data(), current_size);
+    total_read += current_size;
+    current_chunk_.erase_front(current_size);
+    if (static_cast<ssize_t>(total_read) == end - begin) {
+      VLOG(1) << "Got all " << total_read;
+      // Got all of what the caller wants, done now.
+      return total_read;
+    }
+    CHECK_EQ(current_chunk_.size(), 0u)
+        << ": Should have already copied this data out";
+    if (end_of_object_) {
+      VLOG(1) << "At end after " << total_read;
+      // Nothing more to read.
+      return total_read;
+    }
+
+    // Read data from the last request.
+    CHECK(get_next_chunk_.valid()) << ": Should have a request started already";
+    Aws::S3::Model::GetObjectOutcome get_outcome = get_next_chunk_.get();
+    if (!get_outcome.IsSuccess()) {
+      if (next_byte_to_request_ == 0 &&
+          get_outcome.GetError().GetResponseCode() ==
+              Aws::Http::HttpResponseCode::REQUESTED_RANGE_NOT_SATISFIABLE) {
+        VLOG(1) << "At beginning of empty file";
+        // This is what happens with an empty file.
+        // TODO(Brian): Do a List operation to verify it's actually empty?
+        CHECK_EQ(0u, total_read);
+        end_of_object_ = true;
+        return 0;
+      }
+      LOG(FATAL) << ": GET for " << url_
+                 << " failed: " << get_outcome.GetError();
+    }
+    const ParsedRange content_range =
+        ParseRange(get_outcome.GetResult().GetContentRange());
+    const uint64_t content_bytes = content_range.end - content_range.start + 1;
+    CHECK_EQ(content_range.start, next_byte_to_request_);
+    next_byte_to_request_ += kChunkSize;
+
+    auto &stream = get_outcome.GetResult().GetBody();
+    current_chunk_.resize(content_bytes);
+    stream.read(reinterpret_cast<char *>(current_chunk_.data()), content_bytes);
+    const size_t stream_read = stream.gcount();
+    VLOG(1) << "got " << stream_read << " from "
+            << get_outcome.GetResult().GetContentRange();
+    CHECK_EQ(stream_read, content_bytes);
+    if (content_range.end + 1 == content_range.total_size) {
+      end_of_object_ = true;
+      continue;
+    }
+
+    // Kick off the next request.
+    StartRequest();
+  }
+
+  return total_read;
+}
+
+void S3Fetcher::StartRequest() {
+  Aws::S3::Model::GetObjectRequest get_request;
+  const ObjectName object_name = ParseUrl(url_);
+  get_request.SetBucket(object_name.bucket);
+  get_request.SetKey(object_name.key);
+  const uint64_t last_byte_to_request = next_byte_to_request_ + kChunkSize;
+  get_request.SetRange(absl::StrCat("bytes=", next_byte_to_request_, "-",
+                                    last_byte_to_request - 1));
+  VLOG(1) << "request for " << next_byte_to_request_ << "-"
+          << last_byte_to_request << ": " << get_request.GetRange();
+  get_next_chunk_ = GetS3Client().GetObjectCallable(get_request);
+}
+
+std::vector<std::string> ListS3Objects(std::string_view url) {
+  Aws::S3::Model::ListObjectsV2Request list_request;
+  const ObjectName object_name = ParseUrl(url);
+  list_request.SetBucket(object_name.bucket);
+  list_request.SetPrefix(object_name.key);
+  Aws::S3::Model::ListObjectsV2Outcome list_outcome =
+      GetS3Client().ListObjectsV2(list_request);
+  std::vector<std::string> result;
+  while (true) {
+    CHECK(list_outcome.IsSuccess()) << ": Listing objects for " << url
+                                    << " failed: " << list_outcome.GetError();
+    auto &list_result = list_outcome.GetResult();
+    for (const Aws::S3::Model::Object &object : list_result.GetContents()) {
+      result.push_back(absl::StrCat("s3://", list_result.GetName(), "/",
+                                    object.GetKey()));
+      VLOG(2) << "got " << result.back();
+    }
+    if (!list_result.GetIsTruncated()) {
+      break;
+    }
+    list_request.SetContinuationToken(list_result.GetNextContinuationToken());
+    list_outcome = GetS3Client().ListObjectsV2(list_request);
+  }
+  return result;
+}
+
+}  // namespace aos::logger
diff --git a/aos/events/logging/s3_fetcher.h b/aos/events/logging/s3_fetcher.h
new file mode 100644
index 0000000..cf00226
--- /dev/null
+++ b/aos/events/logging/s3_fetcher.h
@@ -0,0 +1,65 @@
+#ifndef AOS_EVENTS_LOGGING_S3_FETCHER_H_
+#define AOS_EVENTS_LOGGING_S3_FETCHER_H_
+
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/GetObjectRequest.h>
+
+#include <future>
+#include <string_view>
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/buffer_encoder.h"
+
+namespace aos::logger {
+
+// Fetches data from an S3 URL.
+class S3Fetcher final : public DataDecoder {
+ public:
+  explicit S3Fetcher(std::string_view url);
+  S3Fetcher(const S3Fetcher &) = delete;
+  S3Fetcher &operator=(const S3Fetcher &) = delete;
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+  std::string_view filename() const final { return url_; }
+
+ private:
+  const std::string url_;
+
+  // The current chunk we're reading from. Empty if there is no current chunk or
+  // we've read all of it.
+  ResizeableBuffer current_chunk_;
+  // If valid, the next chunk which we've triggered to be retrieved in the
+  // background.
+  std::future<Aws::S3::Model::GetObjectOutcome> get_next_chunk_;
+
+  // The next byte index we're going to request. This means we've already made
+  // requests for all prior bytes, but not necessarily received them.
+  uint64_t next_byte_to_request_ = 0;
+
+  // Set once we've received data for the end of the object. Some of it may
+  // still be in current_chunk_ though.
+  bool end_of_object_ = false;
+
+  // Kicks off a request for the next chunk.
+  void StartRequest();
+};
+
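+// Returns the global S3 client. The client is thread-safe, so a single
+// instance is shared by all fetchers.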
+Aws::S3::S3Client &GetS3Client();
+
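+// The bucket and key identifying a single S3 object.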
+struct ObjectName {
+  std::string bucket, key;
+};
+
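+// Splits an s3://bucket/key URL into its bucket and key.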
+ObjectName ParseUrl(std::string_view url);
+
+// Does an S3 object listing with the given URL prefix. Returns the URLs for all
+// the objects under it.
+std::vector<std::string> ListS3Objects(std::string_view url);
+
+}  // namespace aos::logger
+
+#endif  // AOS_EVENTS_LOGGING_S3_FETCHER_H_