Introduce interfaces for compressing and decompressing log files
Change-Id: Ia7da3f840a1780a04203f1c312447b50b142a5a3
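Note: DetachedBufferWriter now hands its data to a DetachedBufferEncoder, and
SpanReader pulls bytes back out through a decoder (DummyDecoder for plain
files), so compression formats can be plugged in later. The interface
declarations live in a header that is not part of this diff; the sketch below
is only an approximation reconstructed from the call sites in
logfile_utils.cc (Encode, Finish, Clear, queue, queue_size, queued_bytes,
may_bypass, Read). The exact signatures, the DataDecoder base-class name, and
the default may_bypass() implementation are assumptions.

#include <cstddef>
#include <cstdint>
#include <vector>

#include "absl/types/span.h"
#include "flatbuffers/flatbuffers.h"

namespace aos::logger {

// Queues up and (optionally) compresses data before it is written to disk.
class DetachedBufferEncoder {
 public:
  virtual ~DetachedBufferEncoder() = default;

  // If this returns true, Encode is effectively a no-op copy, so
  // DetachedBufferWriter may skip it and write large spans directly.
  virtual bool may_bypass() const { return false; }

  // Encodes and queues up the given buffer.
  virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;

  // Finalizes the encoded stream; no more data may be encoded afterwards.
  virtual void Finish() = 0;

  // Removes the first n queued buffers, which have now been written out.
  virtual void Clear(int n) = 0;

  // Returns spans pointing at all the currently queued, encoded data.
  virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;

  // Returns the number of bytes and buffers currently queued.
  virtual size_t queued_bytes() const = 0;
  virtual size_t queue_size() const = 0;
};

// Reads (and decompresses) data back out of a log file. DummyDecoder is the
// pass-through implementation used for uncompressed logs.
class DataDecoder {
 public:
  virtual ~DataDecoder() = default;

  // Fills [begin, end) with decompressed data and returns how many bytes
  // were actually produced. Returning 0 signals end of file.
  virtual size_t Read(uint8_t *begin, uint8_t *end) = 0;
};

}  // namespace aos::logger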
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 26c3825..c7f013d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1,16 +1,15 @@
#include "aos/events/logging/logfile_utils.h"
#include <fcntl.h>
-#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
-#include <vector>
+#include <algorithm>
+#include <climits>
#include "absl/strings/escaping.h"
#include "aos/configuration.h"
-#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/util/file.h"
#include "flatbuffers/flatbuffers.h"
@@ -20,13 +19,13 @@
DEFINE_int32(flush_size, 128000,
"Number of outstanding bytes to allow before flushing to disk.");
-namespace aos {
-namespace logger {
+namespace aos::logger {
namespace chrono = std::chrono;
-DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
- : filename_(filename) {
+DetachedBufferWriter::DetachedBufferWriter(
+ std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+ : filename_(filename), encoder_(std::move(encoder)) {
util::MkdirP(filename, 0777);
fd_ = open(std::string(filename).c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
@@ -35,7 +34,10 @@
}
DetachedBufferWriter::~DetachedBufferWriter() {
- Flush();
+ encoder_->Finish();
+ while (encoder_->queue_size() > 0) {
+ Flush();
+ }
PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
VLOG(1) << "Closed " << filename_;
}
@@ -50,77 +52,95 @@
DetachedBufferWriter &DetachedBufferWriter::operator=(
DetachedBufferWriter &&other) {
std::swap(filename_, other.filename_);
+ std::swap(encoder_, other.encoder_);
std::swap(fd_, other.fd_);
- std::swap(queued_size_, other.queued_size_);
- std::swap(written_size_, other.written_size_);
- std::swap(queue_, other.queue_);
std::swap(iovec_, other.iovec_);
+ std::swap(max_write_time_, other.max_write_time_);
+ std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
+ std::swap(max_write_time_messages_, other.max_write_time_messages_);
+ std::swap(total_write_time_, other.total_write_time_);
+ std::swap(total_write_count_, other.total_write_count_);
+ std::swap(total_write_messages_, other.total_write_messages_);
+ std::swap(total_write_bytes_, other.total_write_bytes_);
return *this;
}
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::FlatBufferBuilder *fbb) {
- QueueSizedFlatbuffer(fbb->Release());
-}
+void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
+ if (encoder_->may_bypass() && span.size() > 4096u) {
+    // Over this threshold, we'll assume it's cheaper to add an extra
+    // syscall and write the data immediately instead of copying it into
+    // the queue.
-void DetachedBufferWriter::WriteSizedFlatbuffer(
- absl::Span<const uint8_t> span) {
- // Cheat aggressively... Write out the queued up data, and then write this
- // data once without buffering. It is hard to make a DetachedBuffer out of
- // this data, and we don't want to worry about lifetimes.
- Flush();
- iovec_.clear();
- iovec_.reserve(1);
+ // First, flush everything.
+ while (encoder_->queue_size() > 0u) {
+ Flush();
+ }
- struct iovec n;
- n.iov_base = const_cast<uint8_t *>(span.data());
- n.iov_len = span.size();
- iovec_.emplace_back(n);
-
- const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
- PCHECK(written == static_cast<ssize_t>(n.iov_len))
- << ": Wrote " << written << " expected " << n.iov_len;
- written_size_ += written;
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
- flatbuffers::DetachedBuffer &&buffer) {
- queued_size_ += buffer.size();
- queue_.emplace_back(std::move(buffer));
-
- // Flush if we are at the max number of iovs per writev, or have written
- // enough data. Otherwise writev will fail with an invalid argument.
- if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
- queue_.size() == IOV_MAX) {
- Flush();
+ // Then, write it directly.
+ const auto start = aos::monotonic_clock::now();
+ const ssize_t written = write(fd_, span.data(), span.size());
+ const auto end = aos::monotonic_clock::now();
+ PCHECK(written >= 0) << ": write failed";
+ CHECK_EQ(written, static_cast<ssize_t>(span.size()))
+ << ": Wrote " << written << " expected " << span.size();
+ UpdateStatsForWrite(end - start, written, 1);
+ } else {
+ encoder_->Encode(CopySpanAsDetachedBuffer(span));
}
+
+ FlushAtThreshold();
}
void DetachedBufferWriter::Flush() {
- if (queue_.size() == 0u) {
+ const auto queue = encoder_->queue();
+ if (queue.empty()) {
return;
}
+
iovec_.clear();
- iovec_.reserve(queue_.size());
+ const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+ iovec_.resize(iovec_size);
size_t counted_size = 0;
- for (size_t i = 0; i < queue_.size(); ++i) {
- struct iovec n;
- n.iov_base = queue_[i].data();
- n.iov_len = queue_[i].size();
- counted_size += n.iov_len;
- iovec_.emplace_back(std::move(n));
+ for (size_t i = 0; i < iovec_size; ++i) {
+ iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
+ iovec_[i].iov_len = queue[i].size();
+ counted_size += iovec_[i].iov_len;
}
- CHECK_EQ(counted_size, queued_size_);
+
+ const auto start = aos::monotonic_clock::now();
const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
- PCHECK(written == static_cast<ssize_t>(queued_size_))
- << ": Wrote " << written << " expected " << queued_size_;
- written_size_ += written;
-
- queued_size_ = 0;
- queue_.clear();
+ const auto end = aos::monotonic_clock::now();
+ PCHECK(written >= 0) << ": writev failed";
// TODO(austin): Handle partial writes in some way other than crashing...
+ CHECK_EQ(written, static_cast<ssize_t>(counted_size))
+ << ": Wrote " << written << " expected " << counted_size;
+
+ encoder_->Clear(iovec_size);
+
+ UpdateStatsForWrite(end - start, written, iovec_size);
+}
+
+void DetachedBufferWriter::UpdateStatsForWrite(
+ aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
+ if (duration > max_write_time_) {
+ max_write_time_ = duration;
+ max_write_time_bytes_ = written;
+ max_write_time_messages_ = iovec_size;
+ }
+ total_write_time_ += duration;
+ ++total_write_count_;
+ total_write_messages_ += iovec_size;
+ total_write_bytes_ += written;
+}
+
+void DetachedBufferWriter::FlushAtThreshold() {
+ // Flush if we are at the max number of iovs per writev, because there's no
+ // point queueing up any more data in memory. Also flush once we have enough
+ // data queued up.
+ while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
+ encoder_->queue_size() >= IOV_MAX) {
+ Flush();
+ }
}
flatbuffers::Offset<MessageHeader> PackMessage(
@@ -185,10 +205,10 @@
return message_header_builder.Finish();
}
-SpanReader::SpanReader(std::string_view filename)
- : filename_(filename),
- fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
- PCHECK(fd_ != -1) << ": Failed to open " << filename;
+SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
+ // Support for other kinds of decoders based on the filename should be added
+ // here.
+ decoder_ = std::make_unique<DummyDecoder>(filename);
}
absl::Span<const uint8_t> SpanReader::ReadMessage() {
@@ -244,18 +264,13 @@
}
bool SpanReader::ReadBlock() {
- if (end_of_file_) {
- return false;
- }
-
- // Appends 256k. This is enough that the read call is efficient. We don't
- // want to spend too much time reading small chunks because the syscalls for
- // that will be expensive.
+  // This is the amount of data we grab at a time. Reading in larger chunks
+  // minimizes syscalls and helps decompressors batch things more efficiently.
constexpr size_t kReadSize = 256 * 1024;
// Strip off any unused data at the front.
if (consumed_data_ != 0) {
- data_.erase(data_.begin(), data_.begin() + consumed_data_);
+ data_.erase_front(consumed_data_);
consumed_data_ = 0;
}
@@ -264,15 +279,14 @@
// This should automatically grow the backing store. It won't shrink if we
// get a small chunk later. This reduces allocations when we want to append
// more data.
- data_.resize(data_.size() + kReadSize);
+ data_.resize(starting_size + kReadSize);
- ssize_t count = read(fd_, &data_[starting_size], kReadSize);
- data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+ const size_t count =
+ decoder_->Read(data_.begin() + starting_size, data_.end());
+ data_.resize(starting_size + count);
if (count == 0) {
- end_of_file_ = true;
return false;
}
- PCHECK(count > 0);
return true;
}
@@ -1520,5 +1534,4 @@
return "";
}
-} // namespace logger
-} // namespace aos
+} // namespace aos::logger
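For context, a minimal usage sketch of the new writer constructor, assuming
QueueSpan is part of the public interface and that a pass-through DummyEncoder
counterpart to DummyDecoder exists (neither the name nor its
default-constructibility is shown in this file):

#include <memory>

#include "absl/types/span.h"
#include "aos/events/logging/logfile_utils.h"
#include "flatbuffers/flatbuffers.h"

// Writes one serialized flatbuffer to an uncompressed log file.
void WriteExample(flatbuffers::FlatBufferBuilder *fbb) {
  // DummyEncoder is assumed to be the no-op counterpart of DummyDecoder.
  aos::logger::DetachedBufferWriter writer(
      "/tmp/example.bfbs", std::make_unique<aos::logger::DummyEncoder>());

  // QueueSpan copies the data into the encoder (or writes large spans
  // directly when the encoder may be bypassed) and flushes once
  // FLAGS_flush_size bytes or IOV_MAX buffers are queued.
  writer.QueueSpan(
      absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));

  // The destructor calls Finish() on the encoder and flushes whatever is
  // still queued before closing the file descriptor.
}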