Introduce interfaces for compressing and decompressing log files

Change-Id: Ia7da3f840a1780a04203f1c312447b50b142a5a3
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9534860..f64881c 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -15,12 +15,14 @@
 #include <vector>
 
 #include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/buffer_encoder.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffers.h"
 #include "flatbuffers/flatbuffers.h"
 
-namespace aos {
-namespace logger {
+namespace aos::logger {
 
 enum class LogType : uint8_t {
   // The message originated on this node and should be logged here.
@@ -37,10 +39,11 @@
 };
 
 // This class manages efficiently writing a sequence of detached buffers to a
-// file.  It queues them up and batches the write operation.
+// file.  It encodes them, queues them up, and batches the write operation.
 class DetachedBufferWriter {
  public:
-  DetachedBufferWriter(std::string_view filename);
+  DetachedBufferWriter(std::string_view filename,
+                       std::unique_ptr<DetachedBufferEncoder> encoder);
   DetachedBufferWriter(DetachedBufferWriter &&other);
   DetachedBufferWriter(const DetachedBufferWriter &) = delete;
 
@@ -55,39 +58,86 @@
   // it.  The main use case is updating start times after a log file starts.
   void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
 
-  // TODO(austin): Snappy compress the log file if it ends with .snappy!
+  // Queues up a finished FlatBufferBuilder to be encoded and written.
+  //
+  // Triggers a flush if there's enough data queued up.
+  //
+  // Steals the detached buffer from it.
+  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
+    QueueSizedFlatbuffer(fbb->Release());
+  }
+  // May steal the backing storage of buffer, or may leave it alone.
+  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
+    encoder_->Encode(std::move(buffer));
+    FlushAtThreshold();
+  }
 
-  // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
-  // buffer from it.
-  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
-  // Queues up a detached buffer directly.
-  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
-  // Writes a Span.  This is not terribly optimized right now.
-  void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
+  // Queues up data in span. May copy or may write it to disk immediately.
+  void QueueSpan(absl::Span<const uint8_t> span);
 
-  // Triggers data to be provided to the kernel and written.
-  void Flush();
+  // Returns the total number of bytes written plus those currently queued.
+  size_t total_bytes() const { return encoder_->total_bytes(); }
 
-  // Returns the number of bytes written.
-  size_t written_size() const { return written_size_; }
-
-  // Returns the number of bytes written or currently queued.
-  size_t total_size() const { return written_size_ + queued_size_; }
+  // The maximum time for a single write call, or 0 if none have been performed.
+  std::chrono::nanoseconds max_write_time() const { return max_write_time_; }
+  // The number of bytes in the longest write call, or -1 if none have been
+  // performed.
+  int max_write_time_bytes() const { return max_write_time_bytes_; }
+  // The number of buffers in the longest write call, or -1 if none have been
+  // performed.
+  int max_write_time_messages() const { return max_write_time_messages_; }
+  // The total time spent in write calls.
+  std::chrono::nanoseconds total_write_time() const {
+    return total_write_time_;
+  }
+  // The total number of writes which have been performed.
+  int total_write_count() const { return total_write_count_; }
+  // The total number of messages which have been written.
+  int total_write_messages() const { return total_write_messages_; }
+  // The total number of bytes which have been written.
+  int total_write_bytes() const { return total_write_bytes_; }
+  void ResetStatistics() {
+    max_write_time_ = std::chrono::nanoseconds::zero();
+    max_write_time_bytes_ = -1;
+    max_write_time_messages_ = -1;
+    total_write_time_ = std::chrono::nanoseconds::zero();
+    total_write_count_ = 0;
+    total_write_messages_ = 0;
+    total_write_bytes_ = 0;
+  }
 
  private:
+  // Performs a single writev call with as much of the data we have queued up as
+  // possible.
+  //
+  // This will normally take all of the data we have queued up, unless an
+  // encoder has spit out a big enough chunk all at once that we can't manage
+  // all of it.
+  void Flush();
+
+  void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
+                           ssize_t written, int iovec_size);
+
+  // Flushes data if we've reached the threshold to do that as part of normal
+  // operation.
+  void FlushAtThreshold();
+
   std::string filename_;
+  std::unique_ptr<DetachedBufferEncoder> encoder_;
 
   int fd_ = -1;
 
-  // Size of all the data in the queue.
-  size_t queued_size_ = 0;
-  size_t written_size_ = 0;
-
-  // List of buffers to flush.
-  std::vector<flatbuffers::DetachedBuffer> queue_;
   // List of iovecs to use with writev.  This is a member variable to avoid
   // churn.
   std::vector<struct iovec> iovec_;
+
+  std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
+  int max_write_time_bytes_ = -1;
+  int max_write_time_messages_ = -1;
+  std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
+  int total_write_count_ = 0;
+  int total_write_messages_ = 0;
+  int total_write_bytes_ = 0;
 };
 
 // Packes a message pointed to by the context into a MessageHeader.
@@ -104,8 +154,6 @@
  public:
   SpanReader(std::string_view filename);
 
-  ~SpanReader() { close(fd_); }
-
   std::string_view filename() const { return filename_; }
 
   // Returns a span with the data for a message from the log file, excluding
@@ -121,50 +169,22 @@
   //   Allocate the 256k blocks like we do today.  But, refcount them with
   //   shared_ptr pointed to by the messageheader that is returned.  This avoids
   //   the copy.  Need to do more benchmarking.
+  //   And (Brian): Consider just mmapping the file and handing out refcounted
+  //   pointers into the mapped region too.
 
   // Reads a chunk of data into data_.  Returns false if no data was read.
   bool ReadBlock();
 
   const std::string filename_;
 
-  // File descriptor for the log file.
-  int fd_ = -1;
+  // File reader and data decoder.
+  std::unique_ptr<DataDecoder> decoder_;
 
-  // Allocator which doesn't zero initialize memory.
-  template <typename T>
-  struct DefaultInitAllocator {
-    typedef T value_type;
-
-    template <typename U>
-    void construct(U *p) {
-      ::new (static_cast<void *>(p)) U;
-    }
-
-    template <typename U, typename... Args>
-    void construct(U *p, Args &&... args) {
-      ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
-    }
-
-    T *allocate(std::size_t n) {
-      return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
-    }
-
-    template <typename U>
-    void deallocate(U *p, std::size_t /*n*/) {
-      ::operator delete(static_cast<void *>(p));
-    }
-  };
-
-  // Vector to read into.  This uses an allocator which doesn't zero
-  // initialize the memory.
-  std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+  // Vector to read into.
+  ResizeableBuffer data_;
 
   // Amount of data consumed already in data_.
   size_t consumed_data_ = 0;
-
-  // Cached bit for if we have reached the end of the file.  Otherwise we will
-  // hammer on the kernel asking for more data each time we send.
-  bool end_of_file_ = false;
 };
 
 // Class which handles reading the header and messages from the log file.  This
@@ -668,7 +688,6 @@
 // a single node.
 std::string MaybeNodeName(const Node *);
 
-}  // namespace logger
-}  // namespace aos
+}  // namespace aos::logger
 
 #endif  // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_