Add SnappyEncoder/Decoder

The encoder/decoder should work. Haven't actually hooked it into the log
namer yet, though.

Change-Id: Id20be8161d46fe080f89eee8032103977574e7ad
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/events/logging/snappy_encoder.h b/aos/events/logging/snappy_encoder.h
new file mode 100644
index 0000000..57a9a21
--- /dev/null
+++ b/aos/events/logging/snappy_encoder.h
@@ -0,0 +1,92 @@
+#ifndef AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
+#define AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
+
+#include <string_view>
+
+#include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/buffer_encoder.h"
+#include "aos/events/logging/logger_generated.h"
+#include "external/snappy/snappy-sinksource.h"
+#include "external/snappy/snappy.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos::logger {
+
+// Encodes buffers using snappy.
+class SnappyEncoder final : public DetachedBufferEncoder {
+ public:
+  explicit SnappyEncoder(size_t chunk_size = 32768);
+
+  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  void Finish() final;
+  void Clear(int n) final;
+  std::vector<absl::Span<const uint8_t>> queue() const final;
+  size_t queued_bytes() const final;
+  size_t total_bytes() const final { return total_bytes_; }
+  size_t queue_size() const final { return queue_.size(); }
+
+ private:
+  class DetachedBufferSource : public snappy::Source {
+   public:
+    size_t Available() const final;
+    const char *Peek(size_t *length) final;
+    void Skip(size_t n) final;
+    void AppendBuffer(flatbuffers::DetachedBuffer &&buffer);
+
+   private:
+    std::vector<flatbuffers::DetachedBuffer> buffers_;
+    size_t index_into_first_buffer_ = 0;
+  };
+
+  // Flushes buffer_source_ and stores the compressed buffer in queue_.
+  void EncodeCurrentBuffer();
+
+  // To queue up data:
+  // 1) When Encode is called, we use AppendBuffer to store the DetachedBuffer
+  //    in buffer_source_.
+  // 2) Once we've queued up at least chunk_size_ data in buffer_source_, we
+  //    use snappy to compress all the data. This flushes everything out of
+  //    buffer_source_ and adds a single buffer to queue_. Note that we do
+  //    not split up flatbuffer buffers to ensure that we produce chunks of
+  //    exactly chunk_size_ uncompressed data--if we get a 1MB DetachedBuffer
+  //    we will compress it all at once.
+  // 3) queue_ is the data that is actually read by queue() and cleared by
+  //    Clear() to be written to disk.
+  const size_t chunk_size_;
+  DetachedBufferSource buffer_source_;
+  std::optional<uint32_t> accumulated_checksum_;
+  std::vector<ResizeableBuffer> queue_;
+  size_t total_bytes_ = 0;
+};
+
+// Decompresses data with snappy.
+class SnappyDecoder final : public DataDecoder {
+ public:
+  explicit SnappyDecoder(std::unique_ptr<DataDecoder> underlying_decoder)
+      : underlying_decoder_(std::move(underlying_decoder)) {}
+  explicit SnappyDecoder(std::string_view filename)
+      : SnappyDecoder(std::make_unique<DummyDecoder>(filename)) {}
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+  std::string_view filename() const final {
+    return underlying_decoder_->filename();
+  }
+
+ private:
+  // decoder to use for reading data out of the file itself.
+  std::unique_ptr<DataDecoder> underlying_decoder_;
+  // Buffer to use for reading data from the file. This being a member variable
+  // is purely an optimization to avoid constant reallocations on every call to
+  // Read().
+  ResizeableBuffer compressed_buffer_;
+  // Buffer of any uncompressed data that we've read but which hasn't yet been
+  // consumed by a call to Read().
+  ResizeableBuffer uncompressed_buffer_;
+
+  size_t total_output_ = 0;
+};
+
+}  // namespace aos::logger
+
+#endif  // AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_