Austin Schuh | 8611071 | 2022-09-16 15:40:54 -0700 | [diff] [blame] | 1 | #include "aos/events/logging/s3_fetcher.h" |
| 2 | |
| 3 | #include <aws/core/Aws.h> |
| 4 | #include <aws/s3/model/ListObjectsV2Request.h> |
| 5 | |
| 6 | #include "absl/strings/str_cat.h" |
| 7 | #include "glog/logging.h" |
| 8 | |
| 9 | // When we first start reading a log folder, we end up reading the first part of |
| 10 | // each file twice. We could speed this up by restructuring the API so all the |
| 11 | // downloads could be started in parallel, and the initial chunk of data cached. |
| 12 | // However, even though this initial part is slower than necessary, |
| 13 | // decompressing and sorting the main part of the log file is still the slowest |
| 14 | // part, and this implementation does parallelize downloads with that, so it's |
| 15 | // good enough for now. |
| 16 | |
| 17 | namespace aos::logger { |
| 18 | namespace { |
| 19 | |
| 20 | struct AwsAPIOwner { |
| 21 | Aws::SDKOptions options; |
| 22 | |
| 23 | AwsAPIOwner() { |
| 24 | options.httpOptions.installSigPipeHandler = true; |
| 25 | Aws::InitAPI(options); |
| 26 | } |
| 27 | ~AwsAPIOwner() { Aws::ShutdownAPI(options); } |
| 28 | }; |
| 29 | |
// If this doesn't fit the largest message, the per-request overhead very
// quickly dominates log reading time, because there's no processing in between
// reading all the pieces of a single message. Bigger takes more memory, but our
// logs aren't split across all that many files usually, so this can be fairly
// large without increasing memory requirements for log reading too much.
constexpr int kChunkSize = 10 * 1024 * 1024;

// Initializes the AWS SDK exactly once for the lifetime of the process. The
// function-local static makes the first call do the init (thread-safely), and
// its destructor shuts the SDK down at program exit.
void InitAwsAPI() { static AwsAPIOwner api_owner; }
| 38 | |
| 39 | Aws::Client::ClientConfiguration MakeClientConfiguration() { |
| 40 | InitAwsAPI(); |
| 41 | Aws::Client::ClientConfiguration config; |
| 42 | config.region = Aws::Region::AWS_GLOBAL; |
| 43 | return config; |
| 44 | } |
| 45 | |
| 46 | struct ParsedRange { |
| 47 | uint64_t start, end, total_size; |
| 48 | }; |
| 49 | |
| 50 | ParsedRange ParseRange(std::string_view string) { |
| 51 | static constexpr std::string_view kBytes = "bytes "; |
| 52 | CHECK(string.substr(0, kBytes.size()) == kBytes) |
| 53 | << ": Invalid range: " << string; |
| 54 | string = string.substr(kBytes.size()); |
| 55 | |
| 56 | const size_t dash = string.find('-'); |
| 57 | CHECK(dash != string.npos) << ": Invalid range: " << string; |
| 58 | const size_t slash = string.find('/'); |
| 59 | CHECK(slash != string.npos) << ": Invalid range: " << string; |
| 60 | |
| 61 | ParsedRange result; |
| 62 | const std::string_view start_string = string.substr(0, dash); |
| 63 | CHECK(absl::SimpleAtoi(start_string, &result.start)) |
| 64 | << ": failed to parse " << start_string << " from " << string; |
| 65 | const std::string_view end_string = string.substr(dash + 1, slash - dash - 1); |
| 66 | CHECK(absl::SimpleAtoi(end_string, &result.end)) |
| 67 | << ": failed to parse " << end_string << " from " << string; |
| 68 | const std::string_view total_string = string.substr(slash + 1); |
| 69 | CHECK(absl::SimpleAtoi(total_string, &result.total_size)) |
| 70 | << ": failed to parse " << total_string << " from " << string; |
| 71 | return result; |
| 72 | } |
| 73 | |
| 74 | } // namespace |
| 75 | |
| 76 | ObjectName ParseUrl(std::string_view url) { |
| 77 | static constexpr std::string_view kS3 = "s3://"; |
| 78 | if (url.substr(0, kS3.size()) != kS3) { |
| 79 | LOG(FATAL) << "Not an S3 URL: " << url; |
| 80 | } |
| 81 | url = url.substr(kS3.size()); |
| 82 | const size_t slash = url.find('/'); |
| 83 | CHECK(slash != url.npos) << ": Invalid S3 URL: " << url; |
| 84 | ObjectName result; |
| 85 | result.bucket = url.substr(0, slash); |
| 86 | result.key = url.substr(slash + 1); |
| 87 | return result; |
| 88 | } |
| 89 | |
// This client is thread-safe, so it should be used globally. Destroying it can
// take a while to shut down all the threads.
Aws::S3::S3Client &GetS3Client() {
  // Meyers singleton: constructed thread-safely on first use, which also
  // initializes the AWS SDK via MakeClientConfiguration().
  static Aws::S3::S3Client result(MakeClientConfiguration());
  return result;
}
| 96 | |
// Opens the given s3:// URL for sequential reading.
S3Fetcher::S3Fetcher(std::string_view url) : url_(url) {
  VLOG(1) << "opening " << url;
  // Start the initial request now, so the first chunk downloads in the
  // background before the caller's first Read().
  StartRequest();
}
| 102 | |
// Copies up to (end - begin) bytes of the object into the caller's buffer and
// returns how many bytes were copied. Returns fewer than requested only at the
// end of the object. Blocks on the outstanding ranged GET when the buffered
// chunk runs out, and starts the next request before returning so downloads
// overlap with the caller's processing.
size_t S3Fetcher::Read(uint8_t *begin, uint8_t *end) {
  VLOG(1) << "looking to read " << (end - begin);
  size_t total_read = 0;

  while (true) {
    // First copy any data we already have.
    const size_t current_size =
        std::min<size_t>(current_chunk_.size(), end - begin - total_read);
    memcpy(begin + total_read, current_chunk_.data(), current_size);
    total_read += current_size;
    current_chunk_.erase_front(current_size);
    if (static_cast<ssize_t>(total_read) == end - begin) {
      VLOG(1) << "Got all " << total_read;
      // Got all of what the caller wants, done now.
      return total_read;
    }
    // Reaching here means the buffered chunk ran out before the caller was
    // satisfied, so it must be fully drained.
    CHECK_EQ(current_chunk_.size(), 0u)
        << ": Should have already copied this data out";
    if (end_of_object_) {
      VLOG(1) << "At end after " << total_read;
      // Nothing more to read.
      return total_read;
    }

    // Read data from the last request.
    CHECK(get_next_chunk_.valid()) << ": Should have a request started already";
    Aws::S3::Model::GetObjectOutcome get_outcome = get_next_chunk_.get();
    if (!get_outcome.IsSuccess()) {
      if (next_byte_to_request_ == 0 &&
          get_outcome.GetError().GetResponseCode() ==
              Aws::Http::HttpResponseCode::REQUESTED_RANGE_NOT_SATISFIABLE) {
        VLOG(1) << "At beginning of empty file";
        // This is what happens with an empty file.
        // TODO(Brian): Do a List operation to verify it's actually empty?
        CHECK_EQ(0u, total_read);
        end_of_object_ = true;
        return 0;
      }
      // Any other failure (including range errors mid-object) is fatal.
      LOG(FATAL) << ": GET for " << url_
                 << " failed: " << get_outcome.GetError();
    }
    // Verify the server returned exactly the range we asked for next, and
    // advance the bookkeeping for the following request.
    const ParsedRange content_range =
        ParseRange(get_outcome.GetResult().GetContentRange());
    const uint64_t content_bytes = content_range.end - content_range.start + 1;
    CHECK_EQ(content_range.start, next_byte_to_request_);
    next_byte_to_request_ += kChunkSize;

    // Pull the whole response body into current_chunk_; the next loop
    // iteration copies it out to the caller.
    auto &stream = get_outcome.GetResult().GetBody();
    current_chunk_.resize(content_bytes);
    stream.read(reinterpret_cast<char *>(current_chunk_.data()), content_bytes);
    const size_t stream_read = stream.gcount();
    VLOG(1) << "got " << stream_read << " from "
            << get_outcome.GetResult().GetContentRange();
    CHECK_EQ(stream_read, content_bytes);
    // A range ending at the object's total size is the final chunk; don't
    // start another request in that case.
    if (content_range.end + 1 == content_range.total_size) {
      end_of_object_ = true;
      continue;
    }

    // Kick off the next request.
    StartRequest();
  }

  // Unreachable: the loop above only exits via return.
  return total_read;
}
| 168 | |
| 169 | void S3Fetcher::StartRequest() { |
| 170 | Aws::S3::Model::GetObjectRequest get_request; |
| 171 | const ObjectName object_name = ParseUrl(url_); |
| 172 | get_request.SetBucket(object_name.bucket); |
| 173 | get_request.SetKey(object_name.key); |
| 174 | const uint64_t last_byte_to_request = next_byte_to_request_ + kChunkSize; |
| 175 | get_request.SetRange(absl::StrCat("bytes=", next_byte_to_request_, "-", |
| 176 | last_byte_to_request - 1)); |
| 177 | VLOG(1) << "request for " << next_byte_to_request_ << "-" |
| 178 | << last_byte_to_request << ": " << get_request.GetRange(); |
| 179 | get_next_chunk_ = GetS3Client().GetObjectCallable(get_request); |
| 180 | } |
| 181 | |
| 182 | std::vector<std::string> ListS3Objects(std::string_view url) { |
| 183 | Aws::S3::Model::ListObjectsV2Request list_request; |
| 184 | const ObjectName object_name = ParseUrl(url); |
| 185 | list_request.SetBucket(object_name.bucket); |
| 186 | list_request.SetPrefix(object_name.key); |
| 187 | Aws::S3::Model::ListObjectsV2Outcome list_outcome = |
| 188 | GetS3Client().ListObjectsV2(list_request); |
| 189 | std::vector<std::string> result; |
| 190 | while (true) { |
| 191 | CHECK(list_outcome.IsSuccess()) << ": Listing objects for " << url |
| 192 | << " failed: " << list_outcome.GetError(); |
| 193 | auto &list_result = list_outcome.GetResult(); |
| 194 | for (const Aws::S3::Model::Object &object : list_result.GetContents()) { |
| 195 | result.push_back(absl::StrCat("s3://", list_outcome.GetResult().GetName(), |
| 196 | "/", object.GetKey())); |
| 197 | VLOG(2) << "got " << result.back(); |
| 198 | } |
| 199 | if (!list_result.GetIsTruncated()) { |
| 200 | break; |
| 201 | } |
| 202 | list_request.SetContinuationToken(list_result.GetNextContinuationToken()); |
| 203 | list_outcome = GetS3Client().ListObjectsV2(list_request); |
| 204 | } |
| 205 | return result; |
| 206 | } |
| 207 | |
| 208 | } // namespace aos::logger |