Introduce interfaces for compressing and decompressing log files

Change-Id: Ia7da3f840a1780a04203f1c312447b50b142a5a3
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 26c3825..c7f013d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1,16 +1,15 @@
 #include "aos/events/logging/logfile_utils.h"
 
 #include <fcntl.h>
-#include <limits.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <sys/uio.h>
 
-#include <vector>
+#include <algorithm>
+#include <climits>
 
 #include "absl/strings/escaping.h"
 #include "aos/configuration.h"
-#include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/util/file.h"
 #include "flatbuffers/flatbuffers.h"
@@ -20,13 +19,13 @@
 DEFINE_int32(flush_size, 128000,
              "Number of outstanding bytes to allow before flushing to disk.");
 
-namespace aos {
-namespace logger {
+namespace aos::logger {
 
 namespace chrono = std::chrono;
 
-DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
-    : filename_(filename) {
+DetachedBufferWriter::DetachedBufferWriter(
+    std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+    : filename_(filename), encoder_(std::move(encoder)) {
   util::MkdirP(filename, 0777);
   fd_ = open(std::string(filename).c_str(),
              O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
@@ -35,7 +34,17 @@
 }
 
 DetachedBufferWriter::~DetachedBufferWriter() {
-  Flush();
+  // operator= swaps encoder_ with the other writer, so a moved-from writer
+  // may be left holding a null encoder (NOTE(review): confirm against the
+  // move constructor). There is nothing to drain in that case.
+  if (encoder_ != nullptr) {
+    // Finish() presumably lets the encoder emit any trailing data. Flush()
+    // writes at most IOV_MAX buffers per call, so loop until it is empty.
+    encoder_->Finish();
+    while (encoder_->queue_size() > 0) {
+      Flush();
+    }
+  }
   PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
   VLOG(1) << "Closed " << filename_;
 }
@@ -50,77 +52,95 @@
 DetachedBufferWriter &DetachedBufferWriter::operator=(
     DetachedBufferWriter &&other) {
   std::swap(filename_, other.filename_);
+  std::swap(encoder_, other.encoder_);  // Carries any queued buffers.
   std::swap(fd_, other.fd_);
-  std::swap(queued_size_, other.queued_size_);
-  std::swap(written_size_, other.written_size_);
-  std::swap(queue_, other.queue_);
   std::swap(iovec_, other.iovec_);
+  std::swap(max_write_time_, other.max_write_time_);  // Write statistics.
+  std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
+  std::swap(max_write_time_messages_, other.max_write_time_messages_);
+  std::swap(total_write_time_, other.total_write_time_);
+  std::swap(total_write_count_, other.total_write_count_);
+  std::swap(total_write_messages_, other.total_write_messages_);
+  std::swap(total_write_bytes_, other.total_write_bytes_);
   return *this;
 }
 
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::FlatBufferBuilder *fbb) {
-  QueueSizedFlatbuffer(fbb->Release());
-}
+void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
+  if (encoder_->may_bypass() && span.size() > 4096u) {
+    // Spans over 4 KiB skip the encoder when it allows that: one extra
+    // syscall to write the data directly is assumed cheaper than copying it
+    // into the queue.
 
-void DetachedBufferWriter::WriteSizedFlatbuffer(
-    absl::Span<const uint8_t> span) {
-  // Cheat aggressively...  Write out the queued up data, and then write this
-  // data once without buffering.  It is hard to make a DetachedBuffer out of
-  // this data, and we don't want to worry about lifetimes.
-  Flush();
-  iovec_.clear();
-  iovec_.reserve(1);
+    // First, drain everything already queued so bytes reach the file in order.
+    while (encoder_->queue_size() > 0u) {
+      Flush();
+    }
 
-  struct iovec n;
-  n.iov_base = const_cast<uint8_t *>(span.data());
-  n.iov_len = span.size();
-  iovec_.emplace_back(n);
-
-  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
-  PCHECK(written == static_cast<ssize_t>(n.iov_len))
-      << ": Wrote " << written << " expected " << n.iov_len;
-  written_size_ += written;
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::DetachedBuffer &&buffer) {
-  queued_size_ += buffer.size();
-  queue_.emplace_back(std::move(buffer));
-
-  // Flush if we are at the max number of iovs per writev, or have written
-  // enough data.  Otherwise writev will fail with an invalid argument.
-  if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
-      queue_.size() == IOV_MAX) {
-    Flush();
+    // Then write the span directly; an error or short write is fatal.
+    const auto start = aos::monotonic_clock::now();
+    const ssize_t written = write(fd_, span.data(), span.size());
+    const auto end = aos::monotonic_clock::now();
+    PCHECK(written >= 0) << ": write failed";
+    CHECK_EQ(written, static_cast<ssize_t>(span.size()))
+        << ": Wrote " << written << " expected " << span.size();
+    UpdateStatsForWrite(end - start, written, 1);
+  } else {  // Otherwise, hand the encoder a copy of the span.
+    encoder_->Encode(CopySpanAsDetachedBuffer(span));
   }
+
+  FlushAtThreshold();  // Write out the queue once it crosses a threshold.
 }
 
 void DetachedBufferWriter::Flush() {
-  if (queue_.size() == 0u) {
+  const auto queue = encoder_->queue();  // Buffers waiting to be written.
+  if (queue.empty()) {
     return;
   }
+  // writev() accepts at most IOV_MAX buffers; the rest stay queued.
   iovec_.clear();
-  iovec_.reserve(queue_.size());
+  const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+  iovec_.resize(iovec_size);
   size_t counted_size = 0;
-  for (size_t i = 0; i < queue_.size(); ++i) {
-    struct iovec n;
-    n.iov_base = queue_[i].data();
-    n.iov_len = queue_[i].size();
-    counted_size += n.iov_len;
-    iovec_.emplace_back(std::move(n));
+  for (size_t i = 0; i < iovec_size; ++i) {
+    iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
+    iovec_[i].iov_len = queue[i].size();
+    counted_size += iovec_[i].iov_len;
   }
-  CHECK_EQ(counted_size, queued_size_);
+  // Time the syscall for the write statistics.
+  const auto start = aos::monotonic_clock::now();
   const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
-  PCHECK(written == static_cast<ssize_t>(queued_size_))
-      << ": Wrote " << written << " expected " << queued_size_;
-  written_size_ += written;
-
-  queued_size_ = 0;
-  queue_.clear();
+  const auto end = aos::monotonic_clock::now();
+  PCHECK(written >= 0) << ": writev failed";
   // TODO(austin): Handle partial writes in some way other than crashing...
+  CHECK_EQ(written, static_cast<ssize_t>(counted_size))
+      << ": Wrote " << written << " expected " << counted_size;
+
+  encoder_->Clear(iovec_size);  // Presumably pops the buffers just written.
+
+  UpdateStatsForWrite(end - start, written, iovec_size);
+}
+
+void DetachedBufferWriter::UpdateStatsForWrite(
+    aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
+  if (duration > max_write_time_) {  // New slowest write: remember its size.
+    max_write_time_ = duration;
+    max_write_time_bytes_ = written;
+    max_write_time_messages_ = iovec_size;  // Buffers in that write.
+  }
+  total_write_time_ += duration;  // Running totals across all writes.
+  ++total_write_count_;
+  total_write_messages_ += iovec_size;
+  total_write_bytes_ += written;
+}
+
+void DetachedBufferWriter::FlushAtThreshold() {
+  // Keep flushing while more than --flush_size bytes are queued, or while a
+  // full writev()'s worth (IOV_MAX) of buffers is waiting: queueing more in
+  // memory gains nothing once a single writev() cannot take it all.
+  while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
+         encoder_->queue_size() >= IOV_MAX) {
+    Flush();
+  }
 }
 
 flatbuffers::Offset<MessageHeader> PackMessage(
@@ -185,10 +205,10 @@
   return message_header_builder.Finish();
 }
 
-SpanReader::SpanReader(std::string_view filename)
-    : filename_(filename),
-      fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
-  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
+  // Always uses DummyDecoder for now. Decoders for other formats (e.g. chosen
+  // by the filename's extension) should be selected here.
+  decoder_ = std::make_unique<DummyDecoder>(filename);
 }
 
 absl::Span<const uint8_t> SpanReader::ReadMessage() {
@@ -244,18 +264,13 @@
 }
 
 bool SpanReader::ReadBlock() {
-  if (end_of_file_) {
-    return false;
-  }
-
-  // Appends 256k.  This is enough that the read call is efficient.  We don't
-  // want to spend too much time reading small chunks because the syscalls for
-  // that will be expensive.
+  // Decode up to this many bytes per call. Larger chunks mean fewer syscalls
+  // and let decompressors batch their work more efficiently.
   constexpr size_t kReadSize = 256 * 1024;
 
   // Strip off any unused data at the front.
   if (consumed_data_ != 0) {
-    data_.erase(data_.begin(), data_.begin() + consumed_data_);
+    data_.erase_front(consumed_data_);
     consumed_data_ = 0;
   }
 
@@ -264,15 +279,14 @@
   // This should automatically grow the backing store.  It won't shrink if we
   // get a small chunk later.  This reduces allocations when we want to append
   // more data.
-  data_.resize(data_.size() + kReadSize);
+  data_.resize(starting_size + kReadSize);
 
-  ssize_t count = read(fd_, &data_[starting_size], kReadSize);
-  data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+  const size_t count =
+      decoder_->Read(data_.begin() + starting_size, data_.end());
+  data_.resize(starting_size + count);  // Trim to the bytes actually decoded.
   if (count == 0) {
     return false;
   }
-  PCHECK(count > 0);
 
   return true;
 }
@@ -1520,5 +1534,4 @@
   return "";
 }
 
-}  // namespace logger
-}  // namespace aos
+}  // namespace aos::logger