Split out an interface for finding log files
This allows new implementations which list log files from cloud storage,
etc.
Change-Id: I962946962e92eedf582e6b0b3c65e9ba606835cb
Signed-off-by: Brian Silverman <bsilver16384@gmail.com>
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index c2134ad..ce9b3ae 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -27,10 +27,65 @@
str.substr(str.size() - ending.size()) == ending;
}
-bool FileExists(std::string filename) {
- struct stat stat_results;
- int error = stat(filename.c_str(), &stat_results);
- return error == 0;
+class FileOperations {
+ public:
+ virtual ~FileOperations() = default;
+
+ virtual bool Exists() = 0;
+ virtual void FindLogs(std::vector<std::string> *files) = 0;
+};
+
+// Implements FileOperations with standard POSIX filesystem APIs. These work on
+// files local to the machine they're running on.
+class LocalFileOperations final : public FileOperations {
+ public:
+ LocalFileOperations(std::string_view filename) : filename_(filename) {}
+
+ bool Exists() override {
+ struct stat stat_results;
+ int error = stat(filename_.c_str(), &stat_results);
+ return error == 0;
+ }
+
+ void FindLogs(std::vector<std::string> *files) override;
+
+ private:
+ std::string filename_;
+};
+
+bool IsValidFilename(std::string_view filename) {
+ return EndsWith(filename, ".bfbs") || EndsWith(filename, ".bfbs.xz");
+}
+
+std::unique_ptr<FileOperations> MakeFileOperations(std::string_view filename) {
+ if (filename.find("://") != filename.npos) {
+ LOG(FATAL) << "This looks like a URL of an unknown type: " << filename;
+ }
+ return std::make_unique<LocalFileOperations>(filename);
+}
+
+void LocalFileOperations::FindLogs(std::vector<std::string> *files) {
+ DIR *directory = opendir(filename_.c_str());
+
+ if (directory == nullptr) {
+ if (IsValidFilename(filename_)) {
+ files->emplace_back(filename_);
+ }
+ return;
+ }
+
+ struct dirent *directory_entry;
+ while ((directory_entry = readdir(directory)) != nullptr) {
+ std::string next_filename = directory_entry->d_name;
+ if (next_filename == "." || next_filename == "..") {
+ continue;
+ }
+
+ std::string path = filename_ + "/" + next_filename;
+ std::make_unique<LocalFileOperations>(path)->FindLogs(files);
+ }
+
+ closedir(directory);
}
bool ConfigOnly(const LogFileHeader *header) {
@@ -79,27 +134,7 @@
} // namespace
void FindLogs(std::vector<std::string> *files, std::string filename) {
- DIR *directory = opendir(filename.c_str());
-
- if (directory == nullptr) {
- if (EndsWith(filename, ".bfbs") || EndsWith(filename, ".bfbs.xz")) {
- files->emplace_back(filename);
- }
- return;
- }
-
- struct dirent *directory_entry;
- while ((directory_entry = readdir(directory)) != nullptr) {
- std::string next_filename = directory_entry->d_name;
- if (next_filename == "." || next_filename == "..") {
- continue;
- }
-
- std::string path = filename + "/" + next_filename;
- FindLogs(files, path);
- }
-
- closedir(directory);
+ MakeFileOperations(filename)->FindLogs(files);
}
std::vector<std::string> FindLogs(std::string filename) {
@@ -113,8 +148,9 @@
for (int i = 1; i < argc; i++) {
std::string filename = argv[i];
- if (FileExists(filename)) {
- aos::logger::FindLogs(&found_logfiles, filename);
+ const auto file_operations = MakeFileOperations(filename);
+ if (file_operations->Exists()) {
+ file_operations->FindLogs(&found_logfiles);
} else {
LOG(FATAL) << "File " << filename << " does not exist";
}
@@ -268,6 +304,10 @@
// There are enough of these in the wild that this is worth supporting.
std::vector<UnsortedOldParts> old_parts;
+  // Some implementations have slow destructors, so save those for later so
+  // that the slow parts can mostly run in parallel.
+ std::vector<SpanReader> part_readers;
+
// Populates the class's datastructures from the input file list.
void PopulateFromFiles(const std::vector<std::string> &parts);
@@ -290,8 +330,14 @@
void PartsSorter::PopulateFromFiles(const std::vector<std::string> &parts) {
// Now extract everything into our datastructures above for sorting.
for (const std::string &part : parts) {
+ if (part_readers.size() > 200) {
+ // Don't leave arbitrary numbers of readers open, because they each take
+ // resources, so close a big batch at once periodically.
+ part_readers.clear();
+ }
+ part_readers.emplace_back(part);
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
- ReadHeader(part);
+ ReadHeader(&part_readers.back());
if (!log_header) {
LOG(WARNING) << "Skipping " << part << " without a header";
corrupted.emplace_back(part);
@@ -997,7 +1043,7 @@
for (const std::string &boot : node_state.second.boots) {
LOG(INFO) << " boot " << boot;
}
- for (const std::pair<std::string,
+ for (const std::pair<const std::string,
std::vector<std::pair<std::string, bool>>>
&constraints : node_state.second.constraints) {
for (const std::pair<std::string, bool> &constraint :
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 8a10876..31fdc83 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -520,6 +520,9 @@
PartsMessageReader::PartsMessageReader(LogParts log_parts)
: parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
+ if (parts_.parts.size() >= 2) {
+ next_message_reader_.emplace(parts_.parts[1]);
+ }
ComputeBootCounts();
}
@@ -601,11 +604,18 @@
void PartsMessageReader::NextLog() {
if (next_part_index_ == parts_.parts.size()) {
+ CHECK(!next_message_reader_);
done_ = true;
return;
}
- message_reader_ = MessageReader(parts_.parts[next_part_index_]);
+ CHECK(next_message_reader_);
+ message_reader_ = std::move(*next_message_reader_);
ComputeBootCounts();
+ if (next_part_index_ + 1 < parts_.parts.size()) {
+ next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
+ } else {
+ next_message_reader_.reset();
+ }
++next_part_index_;
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 26fc74d..5c5f709 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -199,15 +199,6 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
-// Reads the last header from a log file. This handles any duplicate headers
-// that were written.
-std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
- std::string_view filename);
-// Reads the Nth message from a log file, excluding the header. Note: this
-// doesn't handle duplicate headers.
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
- std::string_view filename, size_t n);
-
// Class to read chunks out of a log file.
class SpanReader {
public:
@@ -251,6 +242,17 @@
size_t consumed_data_ = 0;
};
+// Reads the last header from a log file. This handles any duplicate headers
+// that were written.
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
+ SpanReader *span_reader);
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
+ std::string_view filename);
+// Reads the Nth message from a log file, excluding the header. Note: this
+// doesn't handle duplicate headers.
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
+ std::string_view filename, size_t n);
+
// Class which handles reading the header and messages from the log file. This
// handles any per-file state left before merging below.
class MessageReader {
@@ -352,6 +354,13 @@
size_t next_part_index_ = 1u;
bool done_ = false;
MessageReader message_reader_;
+ // We instantiate the next one early, to allow implementations to prefetch.
+ // TODO(Brian): To get optimal performance when downloading, this needs more
+ // communication with the implementation to prioritize the next part and add
+ // more parallelism when it helps. Maybe some kind of a queue of parts in
+ // order, and the implementation gets to pull however many make sense off the
+ // front?
+ std::optional<MessageReader> next_message_reader_;
// True after we have seen a message after the start of the log. The
// guarentees on logging essentially are that all data from before the