Add SnappyEncoder/Decoder
The encoder/decoder should work. Haven't actually hooked it into the log
namer yet, though.
Change-Id: Id20be8161d46fe080f89eee8032103977574e7ad
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/events/logging/snappy_encoder.cc b/aos/events/logging/snappy_encoder.cc
new file mode 100644
index 0000000..d57b3dd
--- /dev/null
+++ b/aos/events/logging/snappy_encoder.cc
@@ -0,0 +1,249 @@
+#include "aos/events/logging/snappy_encoder.h"
+
+#include "aos/events/logging/crc32.h"
+#include "external/snappy/snappy.h"
+
+namespace aos::logger {
+// Snappy file format is a series of chunks. Each chunk consists of:
+// 1-byte: Format identifier.
+// 3-bytes: Little-endian chunk length, not counting the four bytes of format
+// + length.
+//
+// The relevant chunk identifiers are 0xff (header) and 0x00 (compressed data).
+// Compressed data's first four bytes are a CRC-32C that is masked by some
+// magic function. Technically the size of the uncompressed data per chunk
+// is supposed to be no more than 65536 bytes, but I am too lazy to enforce
+// that at the moment.
+// 0x01 is the type for uncompressed data, which has not been implemented here.
+namespace {
+const std::vector<uint8_t> kSnappyIdentifierChunk = {
+ 0xFF, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'};
+
+// Magic mask that snappy's framing format requires for some reason.
+uint32_t MaskChecksum(uint32_t x) {
+ return ((x >> 15) | (x << 17)) + 0xa282ead8;
+}
+
+uint32_t SnappyChecksum(uint8_t *data, size_t size) {
+ return MaskChecksum(ComputeCrc32({data, size}));
+}
+} // namespace
+
+SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) {
+ queue_.emplace_back();
+ queue_.back().resize(kSnappyIdentifierChunk.size());
+ std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
+ queue_.back().begin());
+ total_bytes_ += queue_.back().size();
+}
+
+void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
+
+void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+ accumulated_checksum_ =
+ AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_);
+ buffer_source_.AppendBuffer(std::move(in));
+
+ if (buffer_source_.Available() >= chunk_size_) {
+ EncodeCurrentBuffer();
+ }
+}
+
+void SnappyEncoder::EncodeCurrentBuffer() {
+ if (buffer_source_.Available() == 0) {
+ return;
+ }
+ const uint32_t uncompressed_checksum =
+ MaskChecksum(accumulated_checksum_.value());
+ queue_.emplace_back();
+ constexpr size_t kPrefixSize = 8;
+ queue_.back().resize(kPrefixSize +
+ snappy::MaxCompressedLength(buffer_source_.Available()));
+ queue_.back().data()[0] = '\x00';
+ char *const compressed_start =
+ reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize;
+ snappy::UncheckedByteArraySink snappy_sink(compressed_start);
+ const size_t compressed_size =
+ snappy::Compress(&buffer_source_, &snappy_sink);
+ CHECK_LT(compressed_size + kPrefixSize, queue_.back().size());
+ queue_.back().resize(compressed_size + kPrefixSize);
+ const size_t chunk_size = compressed_size + 4;
+ CHECK_LT(chunk_size, 1U << 24);
+ queue_.back().data()[1] = chunk_size & 0xFF;
+ queue_.back().data()[2] = (chunk_size >> 8) & 0xFF;
+ queue_.back().data()[3] = (chunk_size >> 16) & 0xFF;
+ queue_.back().data()[4] = uncompressed_checksum & 0xFF;
+ queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF;
+ queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF;
+ queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF;
+
+ total_bytes_ += queue_.back().size();
+
+ accumulated_checksum_.reset();
+ CHECK_EQ(0u, buffer_source_.Available());
+}
+
+void SnappyEncoder::Clear(int n) {
+ CHECK_GE(n, 0);
+ CHECK_LE(static_cast<size_t>(n), queue_size());
+ queue_.erase(queue_.begin(), queue_.begin() + n);
+}
+
+size_t SnappyEncoder::queued_bytes() const {
+ size_t bytes = 0;
+ for (const auto &buffer : queue_) {
+ bytes += buffer.size();
+ }
+ return bytes;
+}
+
+std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const {
+ std::vector<absl::Span<const uint8_t>> queue;
+ queue.reserve(queue_.size());
+ for (const auto &buffer : queue_) {
+ queue.emplace_back(buffer.data(), buffer.size());
+ }
+ return queue;
+}
+
+size_t SnappyEncoder::DetachedBufferSource::Available() const {
+ size_t total_available = 0;
+ for (const flatbuffers::DetachedBuffer &buffer : buffers_) {
+ total_available += buffer.size();
+ }
+ return total_available;
+}
+
+const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
+ *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_;
+ return reinterpret_cast<char *>(buffers_[0].data()) +
+ index_into_first_buffer_;
+}
+
+void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
+ if (n == 0) {
+ return;
+ }
+
+ CHECK(!buffers_.empty());
+
+ index_into_first_buffer_ += n;
+ CHECK_LE(index_into_first_buffer_, buffers_[0].size())
+ << ": " << n << " is too large a skip.";
+ if (index_into_first_buffer_ == buffers_[0].size()) {
+ buffers_.erase(buffers_.begin());
+ index_into_first_buffer_ = 0;
+ }
+}
+
+void SnappyEncoder::DetachedBufferSource::AppendBuffer(
+ flatbuffers::DetachedBuffer &&buffer) {
+ buffers_.emplace_back(std::move(buffer));
+}
+
+size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
+ // Decoding process:
+ // We maintain an uncompressed_buffer_ of data that may have been uncompressed
+ // in the past but which hasn't been read yet.
+ //
+ // When we run out of that buffer and need to actually read new data, we
+ // need to read things from the file in one-chunk units. Snappy expects to
+ // uncompress an entire chunk at once, so there isn't any value in attempting
+ // to be more incremental. Each chunk will be of some variable size (specified
+ // at the start of the chunk). Once we read and uncompress the chunk, we can
+ // then copy it into the user's buffer until either we run out of data and
+ // need to uncompress a new chunk or user's buffer runs out in which case
+ // we fill up their buffer and store the extra data in uncompressed_buffer_
+ // to be populated later.
+
+ uint8_t *current_output = begin;
+ if (uncompressed_buffer_.size() > 0) {
+ const size_t amount_to_copy =
+ std::min<size_t>(uncompressed_buffer_.size(), end - current_output);
+ std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy);
+ current_output += amount_to_copy;
+ uncompressed_buffer_.erase_front(amount_to_copy);
+ }
+
+ while (current_output != end) {
+ uint8_t header[4];
+ const size_t read_length = underlying_decoder_->Read(header, header + 4);
+ if (read_length == 0) {
+ break;
+ }
+ if (read_length != 4u) {
+ LOG(WARNING) << "Logfile data is truncated.";
+ break;
+ }
+ const uint8_t chunk_type = header[0];
+ const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16);
+ compressed_buffer_.resize(chunk_size);
+ if (chunk_size !=
+ underlying_decoder_->Read(compressed_buffer_.data(),
+ compressed_buffer_.data() + chunk_size)) {
+ LOG(WARNING) << "Logfile data is truncated.";
+ break;
+ }
+ if (chunk_type == 0xFF) {
+ // Start of file, don't bother
+ // checking the contents.
+ continue;
+ } else if (chunk_type == 0x00) {
+ // Note: This section uses CHECKs rather than terminating more silently
+ // because any of these CHECK's failing implies outright data corruption
+ // rather than mere truncation.
+ CHECK_LT(4u, compressed_buffer_.size()) << ": Missing checksum.";
+ const uint32_t checksum = compressed_buffer_.data()[0] +
+ (compressed_buffer_.data()[1] << 8) +
+ (compressed_buffer_.data()[2] << 16) +
+ (compressed_buffer_.data()[3] << 24);
+
+ const char *input_data =
+ reinterpret_cast<char *>(compressed_buffer_.data() + 4);
+ const size_t input_size = compressed_buffer_.size() - 4;
+
+ size_t uncompressed_length;
+ CHECK(snappy::GetUncompressedLength(input_data, input_size,
+ &uncompressed_length));
+
+ // If the user's buffer can fit the entire uncompressed data, we
+ // will uncompress directly into their buffer. Otherwise, we uncompress
+ // into a separate buffer and then copy the correct number of bytes out
+ // to fill up the user's buffer.
+ // TODO(james): In the future, overload a snappy::Sink to avoid
+ // unnecessary copies by extracting the initial N bytes directly into
+ // the user's buffer.
+ if (end > (uncompressed_length + current_output)) {
+ CHECK(snappy::RawUncompress(input_data, input_size,
+ reinterpret_cast<char *>(current_output)))
+ << ": Corrupted snappy chunk.";
+ CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length))
+ << ": Checksum mismatch.";
+
+ current_output += uncompressed_length;
+ } else {
+ uncompressed_buffer_.resize(uncompressed_length);
+ CHECK(snappy::RawUncompress(
+ input_data, input_size,
+ reinterpret_cast<char *>(uncompressed_buffer_.data())))
+ << ": Corrupted snappy chunk.";
+ CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(),
+ uncompressed_buffer_.size()))
+ << ": Checksum mismatch.";
+ std::memcpy(current_output, uncompressed_buffer_.data(),
+ end - current_output);
+ uncompressed_buffer_.erase_front(end - current_output);
+ current_output = end;
+ }
+ } else {
+ // Unimplemented.
+ LOG(FATAL) << "Unsupported snappy chunk type "
+ << static_cast<int>(chunk_type);
+ }
+ }
+ total_output_ += current_output - begin;
+
+ return current_output - begin;
+}
+
+} // namespace aos::logger