Add factory for SpanReader
Change-Id: Id3f382ed7c1ad4ca42ed674d1e8e98a2045ac8fc
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 81305d7..5568770 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -319,12 +319,11 @@
// There are enough of these in the wild that this is worth supporting.
std::vector<UnsortedOldParts> old_parts;
- // Some implementations have slow destructors, so save those for later to let
- // the slow parts can mostly run in parallel.
- std::vector<SpanReader> part_readers;
-
// Populates the class's datastructures from the input file list.
- void PopulateFromFiles(const std::vector<std::string> &parts);
+ // Log file headers are read through SpanReaders borrowed from `readers`,
+ // which must not be null.
+ void PopulateFromFiles(ReadersPool *readers,
+ const std::vector<std::string> &parts);
// Wrangles everything into a map of boot uuids -> boot counts.
MapBoots ComputeBootCounts();
@@ -340,19 +337,22 @@
// Reformats parts_list into a list of logfiles and returns it. This destroys
// state in PartsSorter.
std::vector<LogFile> FormatNewParts();
+
+ // Returns a list of all the parts that we have sorted. If nothing could be
+ // sorted, returns only the corrupted parts (if any). Like FormatNewParts(),
+ // this destroys state in PartsSorter.
+ std::vector<LogFile> SortParts();
};
-void PartsSorter::PopulateFromFiles(const std::vector<std::string> &parts) {
+void PartsSorter::PopulateFromFiles(ReadersPool *readers,
+ const std::vector<std::string> &parts) {
// Now extract everything into our datastructures above for sorting.
for (const std::string &part : parts) {
- if (part_readers.size() > 50) {
- // Don't leave arbitrary numbers of readers open, because they each take
- // resources, so close a big batch at once periodically.
- part_readers.clear();
- }
- part_readers.emplace_back(part, FLAGS_quiet_sorting);
+ // The pool may close or invalidate this reader on the next BorrowReader()
+ // call, so it must not be kept beyond this iteration.
+ SpanReader *reader = readers->BorrowReader(part);
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
- ReadHeader(&part_readers.back());
+ ReadHeader(reader);
if (!log_header) {
if (!FLAGS_quiet_sorting) {
LOG(WARNING) << "Skipping " << part << " without a header";
@@ -1970,27 +1966,31 @@
return result;
}
-std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
- PartsSorter sorter;
- sorter.PopulateFromFiles(parts);
-
- if (sorter.old_parts.empty() && sorter.parts_list.empty()) {
- if (parts.empty()) {
- return std::vector<LogFile>{};
- } else {
+std::vector<LogFile> PartsSorter::SortParts() {
+ if (old_parts.empty() && parts_list.empty()) {
+ // Nothing could be sorted; still report any corrupted parts.
+ if (!corrupted.empty()) {
LogFile log_file;
- log_file.corrupted = std::move(sorter.corrupted);
+ log_file.corrupted = std::move(corrupted);
return std::vector<LogFile>{log_file};
}
+ return std::vector<LogFile>{};
}
- CHECK_NE(sorter.old_parts.empty(), sorter.parts_list.empty())
+ CHECK_NE(old_parts.empty(), parts_list.empty())
<< ": Can't have a mix of old and new parts.";
-
- if (!sorter.old_parts.empty()) {
- return sorter.FormatOldParts();
+ if (!old_parts.empty()) {
+ return FormatOldParts();
}
-
- return sorter.FormatNewParts();
+ return FormatNewParts();
}
+std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
+ // Any readers still held by the pool are closed when `readers` goes out of
+ // scope, after sorting has finished.
+ LogReadersPool readers;
+ PartsSorter sorter;
+ sorter.PopulateFromFiles(&readers, parts);
+ return sorter.SortParts();
+}
+
std::vector<std::string> FindNodes(const std::vector<LogFile> &parts) {
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3c8c50d..fbec1b5 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -66,6 +66,9 @@
"corrupt message found by MessageReader be silently ignored, "
"providing access to all uncorrupted messages in a logfile.");
+// Passed to the SpanReaders that LogReadersPool creates without a LogSource.
+DECLARE_bool(quiet_sorting);
+
namespace aos::logger {
namespace {
namespace chrono = std::chrono;
@@ -1281,6 +1284,28 @@
return true;
}
+LogReadersPool::LogReadersPool(const LogSource *log_source, size_t pool_size)
+ : log_source_(log_source), pool_size_(pool_size) {}
+
+// Creates a new reader for `id` and returns a pointer to it. The pool owns the
+// reader; the pointer is invalidated by the next BorrowReader() call, which
+// may reallocate or clear part_readers_.
+SpanReader *LogReadersPool::BorrowReader(std::string_view id) {
+ if (part_readers_.size() >= pool_size_) {
+ // Don't leave arbitrary numbers of readers open, because they each take
+ // resources, so close a big batch at once periodically. Clearing at >=
+ // keeps at most max(pool_size_, 1) readers open.
+ part_readers_.clear();
+ }
+ if (log_source_ == nullptr) {
+ // No LogSource: construct the reader directly from `id`.
+ part_readers_.emplace_back(id, FLAGS_quiet_sorting);
+ } else {
+ part_readers_.emplace_back(id, log_source_->GetDecoder(id));
+ }
+ return &part_readers_.back();
+}
+
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
SpanReader *span_reader) {
absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f8d6b8e..b04be9f 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -228,6 +228,42 @@
bool is_finished_ = false;
};
+// Class to borrow log readers from a pool based on their ids. This is used as
+// a factory and helps with performance when construction or destruction of
+// decoders is not free. For instance, S3 fetchers are slow to destroy.
+class ReadersPool {
+ public:
+ virtual ~ReadersPool() = default;
+
+ // Borrows a reader for `id` from the pool. The pool keeps ownership of the
+ // returned reader, so callers must not delete it. Implementations may close
+ // or invalidate previously borrowed readers on any later call, so do not
+ // hold on to the pointer across calls.
+ virtual SpanReader *BorrowReader(std::string_view id) = 0;
+};
+
+// ReadersPool that creates a new SpanReader for every borrow: from
+// `log_source` when it is non-null, and directly from the id otherwise.
+// Borrowed readers are kept alive so that their (possibly slow) destruction
+// can be batched; once about `pool_size` readers are held, they are all
+// closed at once.
+class LogReadersPool : public ReadersPool {
+ public:
+ // `log_source` is not owned and, if non-null, must outlive this pool.
+ explicit LogReadersPool(const LogSource *log_source = nullptr,
+ size_t pool_size = 50);
+
+ SpanReader *BorrowReader(std::string_view id) override;
+
+ private:
+ // Not owned; may be null.
+ const LogSource *log_source_;
+ // Readers handed out by BorrowReader(); cleared in batches.
+ std::vector<SpanReader> part_readers_;
+ // Number of readers retained before part_readers_ is cleared.
+ const size_t pool_size_;
+};
+
// Reads the last header from a log file. This handles any duplicate headers
// that were written.
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(