Add SnappyEncoder/Decoder

The encoder/decoder should work. Haven't actually hooked it into the log
namer yet, though.

Change-Id: Id20be8161d46fe080f89eee8032103977574e7ad
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/events/logging/snappy_encoder.cc b/aos/events/logging/snappy_encoder.cc
new file mode 100644
index 0000000..d57b3dd
--- /dev/null
+++ b/aos/events/logging/snappy_encoder.cc
@@ -0,0 +1,249 @@
+#include "aos/events/logging/snappy_encoder.h"
+
+#include "aos/events/logging/crc32.h"
+#include "external/snappy/snappy.h"
+
+namespace aos::logger {
+// Snappy file format is a series of chunks. Each chunk consists of:
+// 1-byte: Format identifier.
+// 3-bytes: Little-endian chunk length, not counting the four bytes of format
+//          + length.
+//
+// The relevant chunk identifiers are 0xff (header) and 0x00 (compressed data).
+// Compressed data's first four bytes are a CRC-32C that is masked by some
+// magic function. Technically the size of the uncompressed data per chunk
+// is supposed to be no more than 65536 bytes, but I am too lazy to enforce
+// that at the moment.
+// 0x01 is the type for uncompressed data, which has not been implemented here.
+namespace {
+const std::vector<uint8_t> kSnappyIdentifierChunk = {
+    0xFF, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'};
+
+// Magic mask that snappy's framing format requires for some reason.
+uint32_t MaskChecksum(uint32_t x) {
+  return ((x >> 15) | (x << 17)) + 0xa282ead8;
+}
+
+uint32_t SnappyChecksum(uint8_t *data, size_t size) {
+  return MaskChecksum(ComputeCrc32({data, size}));
+}
+}  // namespace
+
+SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) {
+  queue_.emplace_back();
+  queue_.back().resize(kSnappyIdentifierChunk.size());
+  std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
+            queue_.back().begin());
+  total_bytes_ += queue_.back().size();
+}
+
+void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
+
+void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+  accumulated_checksum_ =
+      AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_);
+  buffer_source_.AppendBuffer(std::move(in));
+
+  if (buffer_source_.Available() >= chunk_size_) {
+    EncodeCurrentBuffer();
+  }
+}
+
+void SnappyEncoder::EncodeCurrentBuffer() {
+  if (buffer_source_.Available() == 0) {
+    return;
+  }
+  const uint32_t uncompressed_checksum =
+      MaskChecksum(accumulated_checksum_.value());
+  queue_.emplace_back();
+  constexpr size_t kPrefixSize = 8;
+  queue_.back().resize(kPrefixSize +
+                       snappy::MaxCompressedLength(buffer_source_.Available()));
+  queue_.back().data()[0] = '\x00';
+  char *const compressed_start =
+      reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize;
+  snappy::UncheckedByteArraySink snappy_sink(compressed_start);
+  const size_t compressed_size =
+      snappy::Compress(&buffer_source_, &snappy_sink);
+  CHECK_LT(compressed_size + kPrefixSize, queue_.back().size());
+  queue_.back().resize(compressed_size + kPrefixSize);
+  const size_t chunk_size = compressed_size + 4;
+  CHECK_LT(chunk_size, 1U << 24);
+  queue_.back().data()[1] = chunk_size & 0xFF;
+  queue_.back().data()[2] = (chunk_size >> 8) & 0xFF;
+  queue_.back().data()[3] = (chunk_size >> 16) & 0xFF;
+  queue_.back().data()[4] = uncompressed_checksum & 0xFF;
+  queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF;
+  queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF;
+  queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF;
+
+  total_bytes_ += queue_.back().size();
+
+  accumulated_checksum_.reset();
+  CHECK_EQ(0u, buffer_source_.Available());
+}
+
+void SnappyEncoder::Clear(int n) {
+  CHECK_GE(n, 0);
+  CHECK_LE(static_cast<size_t>(n), queue_size());
+  queue_.erase(queue_.begin(), queue_.begin() + n);
+}
+
+size_t SnappyEncoder::queued_bytes() const {
+  size_t bytes = 0;
+  for (const auto &buffer : queue_) {
+    bytes += buffer.size();
+  }
+  return bytes;
+}
+
+std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const {
+  std::vector<absl::Span<const uint8_t>> queue;
+  queue.reserve(queue_.size());
+  for (const auto &buffer : queue_) {
+    queue.emplace_back(buffer.data(), buffer.size());
+  }
+  return queue;
+}
+
+size_t SnappyEncoder::DetachedBufferSource::Available() const {
+  size_t total_available = 0;
+  for (const flatbuffers::DetachedBuffer &buffer : buffers_) {
+    total_available += buffer.size();
+  }
+  return total_available;
+}
+
+const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
+  *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_;
+  return reinterpret_cast<char *>(buffers_[0].data()) +
+         index_into_first_buffer_;
+}
+
+void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
+  if (n == 0) {
+    return;
+  }
+
+  CHECK(!buffers_.empty());
+
+  index_into_first_buffer_ += n;
+  CHECK_LE(index_into_first_buffer_, buffers_[0].size())
+      << ": " << n << " is too large a skip.";
+  if (index_into_first_buffer_ == buffers_[0].size()) {
+    buffers_.erase(buffers_.begin());
+    index_into_first_buffer_ = 0;
+  }
+}
+
+void SnappyEncoder::DetachedBufferSource::AppendBuffer(
+    flatbuffers::DetachedBuffer &&buffer) {
+  buffers_.emplace_back(std::move(buffer));
+}
+
+size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
+  // Decoding process:
+  // We maintain an uncompressed_buffer_ of data that may have been uncompressed
+  // in the past but which hasn't been read yet.
+  //
+  // When we run out of that buffer and need to actually read new data, we
+  // need to read things from the file in one-chunk units. Snappy expects to
+  // uncompress an entire chunk at once, so there isn't any value in attempting
+  // to be more incremental. Each chunk will be of some variable size (specified
+  // at the start of the chunk). Once we read and uncompress the chunk, we can
+  // then copy it into the user's buffer until either we run out of data and
+  // need to uncompress a new chunk or user's buffer runs out in which case
+  // we fill up their buffer and store the extra data in uncompressed_buffer_
+  // to be populated later.
+
+  uint8_t *current_output = begin;
+  if (uncompressed_buffer_.size() > 0) {
+    const size_t amount_to_copy =
+        std::min<size_t>(uncompressed_buffer_.size(), end - current_output);
+    std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy);
+    current_output += amount_to_copy;
+    uncompressed_buffer_.erase_front(amount_to_copy);
+  }
+
+  while (current_output != end) {
+    uint8_t header[4];
+    const size_t read_length = underlying_decoder_->Read(header, header + 4);
+    if (read_length == 0) {
+      break;
+    }
+    if (read_length != 4u) {
+      LOG(WARNING) << "Logfile data is truncated.";
+      break;
+    }
+    const uint8_t chunk_type = header[0];
+    const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16);
+    compressed_buffer_.resize(chunk_size);
+    if (chunk_size !=
+        underlying_decoder_->Read(compressed_buffer_.data(),
+                                  compressed_buffer_.data() + chunk_size)) {
+      LOG(WARNING) << "Logfile data is truncated.";
+      break;
+    }
+    if (chunk_type == 0xFF) {
+      // Start of file, don't bother
+      // checking the contents.
+      continue;
+    } else if (chunk_type == 0x00) {
+      // Note: This section uses CHECKs rather than terminating more silently
+      // because any of these CHECK's failing implies outright data corruption
+      // rather than mere truncation.
+      CHECK_LT(4u, compressed_buffer_.size()) << ": Missing checksum.";
+      const uint32_t checksum = compressed_buffer_.data()[0] +
+                                (compressed_buffer_.data()[1] << 8) +
+                                (compressed_buffer_.data()[2] << 16) +
+                                (compressed_buffer_.data()[3] << 24);
+
+      const char *input_data =
+          reinterpret_cast<char *>(compressed_buffer_.data() + 4);
+      const size_t input_size = compressed_buffer_.size() - 4;
+
+      size_t uncompressed_length;
+      CHECK(snappy::GetUncompressedLength(input_data, input_size,
+                                          &uncompressed_length));
+
+      // If the user's buffer can fit the entire uncompressed data, we
+      // will uncompress directly into their buffer. Otherwise, we uncompress
+      // into a separate buffer and then copy the correct number of bytes out
+      // to fill up the user's buffer.
+      // TODO(james): In the future, overload a snappy::Sink to avoid
+      // unnecessary copies by extracting the initial N bytes directly into
+      // the user's buffer.
+      if (end > (uncompressed_length + current_output)) {
+        CHECK(snappy::RawUncompress(input_data, input_size,
+                                    reinterpret_cast<char *>(current_output)))
+            << ": Corrupted snappy chunk.";
+        CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length))
+            << ": Checksum mismatch.";
+
+        current_output += uncompressed_length;
+      } else {
+        uncompressed_buffer_.resize(uncompressed_length);
+        CHECK(snappy::RawUncompress(
+            input_data, input_size,
+            reinterpret_cast<char *>(uncompressed_buffer_.data())))
+            << ": Corrupted snappy chunk.";
+        CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(),
+                                          uncompressed_buffer_.size()))
+            << ": Checksum mismatch.";
+        std::memcpy(current_output, uncompressed_buffer_.data(),
+                    end - current_output);
+        uncompressed_buffer_.erase_front(end - current_output);
+        current_output = end;
+      }
+    } else {
+      // Unimplemented.
+      LOG(FATAL) << "Unsupported snappy chunk type "
+                 << static_cast<int>(chunk_type);
+    }
+  }
+  total_output_ += current_output - begin;
+
+  return current_output - begin;
+}
+
+}  // namespace aos::logger