#include "aos/events/logging/s3_fetcher.h"

#include <algorithm>
#include <cstdint>
#include <cstring>

#include <aws/core/Aws.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>

#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "glog/logging.h"

// When we first start reading a log folder, we end up reading the first part
// of each file twice. We could speed this up by restructuring the API so all
// the downloads could be started in parallel, and the initial chunk of data
// cached. However, even though this initial part is slower than necessary,
// decompressing and sorting the main part of the log file is still the
// slowest part, and this implementation does parallelize downloads with that,
// so it's good enough for now.

namespace aos::logger {
namespace {

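// Owns global AWS SDK state: construction calls Aws::InitAPI and destruction
// calls Aws::ShutdownAPI, so tying an instance to a function-local static
// (see InitAwsAPI below) initializes the SDK exactly once and shuts it down
// at program exit.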
struct AwsAPIOwner {
  Aws::SDKOptions options;

  AwsAPIOwner() {
    options.httpOptions.installSigPipeHandler = true;
    Aws::InitAPI(options);
  }
  ~AwsAPIOwner() { Aws::ShutdownAPI(options); }
};

// If this doesn't fit the largest message, the per-request overhead very
// quickly dominates log reading time, because there's no processing in
// between reading all the pieces of a single message. Bigger chunks take more
// memory, but our logs usually aren't split across that many files, so this
// can be fairly large without increasing memory requirements for log reading
// too much.
constexpr int kChunkSize = 10 * 1024 * 1024;

void InitAwsAPI() { static AwsAPIOwner api_owner; }

Aws::Client::ClientConfiguration MakeClientConfiguration() {
  InitAwsAPI();
  Aws::Client::ClientConfiguration config;
  config.region = Aws::Region::AWS_GLOBAL;
  return config;
}

struct ParsedRange {
  uint64_t start, end, total_size;
};

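// Parses an HTTP Content-Range response header, which has the form
// "bytes <start>-<end>/<total>" with an inclusive byte range. For example,
// "bytes 0-10485759/123456789" parses to
//   {.start = 0, .end = 10485759, .total_size = 123456789}.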
ParsedRange ParseRange(std::string_view string) {
  static constexpr std::string_view kBytes = "bytes ";
  CHECK(string.substr(0, kBytes.size()) == kBytes)
      << ": Invalid range: " << string;
  string = string.substr(kBytes.size());

  const size_t dash = string.find('-');
  CHECK(dash != string.npos) << ": Invalid range: " << string;
  const size_t slash = string.find('/');
  CHECK(slash != string.npos) << ": Invalid range: " << string;

  ParsedRange result;
  const std::string_view start_string = string.substr(0, dash);
  CHECK(absl::SimpleAtoi(start_string, &result.start))
      << ": failed to parse " << start_string << " from " << string;
  const std::string_view end_string = string.substr(dash + 1, slash - dash - 1);
  CHECK(absl::SimpleAtoi(end_string, &result.end))
      << ": failed to parse " << end_string << " from " << string;
  const std::string_view total_string = string.substr(slash + 1);
  CHECK(absl::SimpleAtoi(total_string, &result.total_size))
      << ": failed to parse " << total_string << " from " << string;
  return result;
}

}  // namespace

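// Splits an S3 URL into bucket and key. For example,
// "s3://some-bucket/path/to/file.bfbs" yields
//   {.bucket = "some-bucket", .key = "path/to/file.bfbs"}.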
ObjectName ParseUrl(std::string_view url) {
  static constexpr std::string_view kS3 = "s3://";
  if (url.substr(0, kS3.size()) != kS3) {
    LOG(FATAL) << "Not an S3 URL: " << url;
  }
  url = url.substr(kS3.size());
  const size_t slash = url.find('/');
  CHECK(slash != url.npos) << ": Invalid S3 URL: " << url;
  ObjectName result;
  result.bucket = url.substr(0, slash);
  result.key = url.substr(slash + 1);
  return result;
}

// This client is thread-safe, so a single instance is shared globally.
// Destroying it can take a while, because it has to shut down all of its
// threads.
Aws::S3::S3Client &GetS3Client() {
  static Aws::S3::S3Client result(MakeClientConfiguration());
  return result;
}

S3Fetcher::S3Fetcher(std::string_view url) : url_(url) {
  VLOG(1) << "opening " << url;
  // Start the initial request now.
  StartRequest();
}

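// Copies up to (end - begin) bytes into [begin, end). Blocks on the
// outstanding ranged GET whenever the buffered chunk runs out, and keeps the
// next request in flight so downloading overlaps with the caller's
// processing. Returns the number of bytes copied, which is less than
// requested only at the end of the object.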
size_t S3Fetcher::Read(uint8_t *begin, uint8_t *end) {
  VLOG(1) << "looking to read " << (end - begin);
  size_t total_read = 0;

  while (true) {
    // First copy any data we already have.
    const size_t current_size =
        std::min<size_t>(current_chunk_.size(), end - begin - total_read);
    memcpy(begin + total_read, current_chunk_.data(), current_size);
    total_read += current_size;
    current_chunk_.erase_front(current_size);
    if (static_cast<ssize_t>(total_read) == end - begin) {
      VLOG(1) << "Got all " << total_read;
      // Got all of what the caller wants, done now.
      return total_read;
    }
    CHECK_EQ(current_chunk_.size(), 0u)
        << ": Should have already copied this data out";
    if (end_of_object_) {
      VLOG(1) << "At end after " << total_read;
      // Nothing more to read.
      return total_read;
    }

    // Read data from the last request.
    CHECK(get_next_chunk_.valid()) << ": Should have a request started already";
    Aws::S3::Model::GetObjectOutcome get_outcome = get_next_chunk_.get();
    if (!get_outcome.IsSuccess()) {
      if (next_byte_to_request_ == 0 &&
          get_outcome.GetError().GetResponseCode() ==
              Aws::Http::HttpResponseCode::REQUESTED_RANGE_NOT_SATISFIABLE) {
        VLOG(1) << "At beginning of empty file";
        // This is what happens with an empty file.
        // TODO(Brian): Do a List operation to verify it's actually empty?
        CHECK_EQ(0u, total_read);
        end_of_object_ = true;
        return 0;
      }
      LOG(FATAL) << ": GET for " << url_
                 << " failed: " << get_outcome.GetError();
    }
    const ParsedRange content_range =
        ParseRange(get_outcome.GetResult().GetContentRange());
    const uint64_t content_bytes = content_range.end - content_range.start + 1;
    CHECK_EQ(content_range.start, next_byte_to_request_);
    next_byte_to_request_ += kChunkSize;

    auto &stream = get_outcome.GetResult().GetBody();
    current_chunk_.resize(content_bytes);
    stream.read(reinterpret_cast<char *>(current_chunk_.data()),
                content_bytes);
    const size_t stream_read = stream.gcount();
    VLOG(1) << "got " << stream_read << " from "
            << get_outcome.GetResult().GetContentRange();
    CHECK_EQ(stream_read, content_bytes);
    if (content_range.end + 1 == content_range.total_size) {
      end_of_object_ = true;
      continue;
    }

    // Kick off the next request.
    StartRequest();
  }
}

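// Kicks off an asynchronous ranged GET for the next kChunkSize bytes. Note
// that the HTTP Range request header takes an inclusive byte range, so the
// half-open range [next_byte_to_request_, next_byte_to_request_ + kChunkSize)
// is written as "bytes=<first>-<last>", e.g. "bytes=0-10485759" for the first
// chunk.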
void S3Fetcher::StartRequest() {
  Aws::S3::Model::GetObjectRequest get_request;
  const ObjectName object_name = ParseUrl(url_);
  get_request.SetBucket(object_name.bucket);
  get_request.SetKey(object_name.key);
  const uint64_t last_byte_to_request = next_byte_to_request_ + kChunkSize;
  get_request.SetRange(absl::StrCat("bytes=", next_byte_to_request_, "-",
                                    last_byte_to_request - 1));
  VLOG(1) << "request for " << next_byte_to_request_ << "-"
          << last_byte_to_request << ": " << get_request.GetRange();
  get_next_chunk_ = GetS3Client().GetObjectCallable(get_request);
}

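// Lists every object under the bucket and key prefix named by url, following
// continuation tokens until the listing is no longer truncated (ListObjectsV2
// returns at most 1000 keys per response).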
std::vector<std::string> ListS3Objects(std::string_view url) {
  Aws::S3::Model::ListObjectsV2Request list_request;
  const ObjectName object_name = ParseUrl(url);
  list_request.SetBucket(object_name.bucket);
  list_request.SetPrefix(object_name.key);
  Aws::S3::Model::ListObjectsV2Outcome list_outcome =
      GetS3Client().ListObjectsV2(list_request);
  std::vector<std::string> result;
  while (true) {
    CHECK(list_outcome.IsSuccess()) << ": Listing objects for " << url
                                    << " failed: " << list_outcome.GetError();
    auto &list_result = list_outcome.GetResult();
    for (const Aws::S3::Model::Object &object : list_result.GetContents()) {
      result.push_back(
          absl::StrCat("s3://", list_result.GetName(), "/", object.GetKey()));
      VLOG(2) << "got " << result.back();
    }
    if (!list_result.GetIsTruncated()) {
      break;
    }
    list_request.SetContinuationToken(list_result.GetNextContinuationToken());
    list_outcome = GetS3Client().ListObjectsV2(list_request);
  }
  return result;
}

}  // namespace aos::logger