| #include "aos/events/logging/snappy_encoder.h" |
| |
| #include "aos/util/crc32.h" |
| #include "snappy.h" |
| |
| namespace aos::logger { |
| // Snappy file format is a series of chunks. Each chunk consists of: |
| // 1-byte: Format identifier. |
| // 3-bytes: Little-endian chunk length, not counting the four bytes of format |
| // + length. |
| // |
| // The relevant chunk identifiers are 0xff (header) and 0x00 (compressed data). |
| // Compressed data's first four bytes are a CRC-32C that is masked by some |
| // magic function. Technically the size of the uncompressed data per chunk |
| // is supposed to be no more than 65536 bytes, but I am too lazy to enforce |
| // that at the moment. |
| // 0x01 is the type for uncompressed data, which has not been implemented here. |
| namespace { |
| const std::vector<uint8_t> kSnappyIdentifierChunk = { |
| 0xFF, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'}; |
| |
| // Magic mask that snappy's framing format requires for some reason. |
| uint32_t MaskChecksum(uint32_t x) { |
| return ((x >> 15) | (x << 17)) + 0xa282ead8; |
| } |
| |
| uint32_t SnappyChecksum(uint8_t *data, size_t size) { |
| return MaskChecksum(ComputeCrc32({data, size})); |
| } |
| } // namespace |
| |
| SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size) |
| : chunk_size_(chunk_size), |
| // Make sure we have enough space to always add our largest message to the |
| // chunks we will flush. |
| buffer_source_(max_message_size + chunk_size - 1) { |
| queue_.emplace_back(); |
| queue_.back().resize(kSnappyIdentifierChunk.size()); |
| std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(), |
| queue_.back().begin()); |
| total_bytes_ += queue_.back().size(); |
| } |
| |
| void SnappyEncoder::Finish() { EncodeCurrentBuffer(); } |
| |
| size_t SnappyEncoder::Encode(Copier *copy, size_t start_byte) { |
| CHECK_EQ(start_byte, 0u); |
| buffer_source_.Append(copy); |
| |
| if (buffer_source_.Available() >= chunk_size_) { |
| EncodeCurrentBuffer(); |
| } |
| |
| return copy->size(); |
| } |
| |
| void SnappyEncoder::EncodeCurrentBuffer() { |
| if (buffer_source_.Available() == 0) { |
| return; |
| } |
| const uint32_t uncompressed_checksum = |
| MaskChecksum(buffer_source_.accumulated_checksum()); |
| queue_.emplace_back(); |
| constexpr size_t kPrefixSize = 8; |
| queue_.back().resize(kPrefixSize + |
| snappy::MaxCompressedLength(buffer_source_.Available())); |
| queue_.back().data()[0] = '\x00'; |
| char *const compressed_start = |
| reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize; |
| snappy::UncheckedByteArraySink snappy_sink(compressed_start); |
| const size_t compressed_size = |
| snappy::Compress(&buffer_source_, &snappy_sink); |
| CHECK_LT(compressed_size + kPrefixSize, queue_.back().size()); |
| queue_.back().resize(compressed_size + kPrefixSize); |
| const size_t chunk_size = compressed_size + 4; |
| CHECK_LT(chunk_size, 1U << 24); |
| queue_.back().data()[1] = chunk_size & 0xFF; |
| queue_.back().data()[2] = (chunk_size >> 8) & 0xFF; |
| queue_.back().data()[3] = (chunk_size >> 16) & 0xFF; |
| queue_.back().data()[4] = uncompressed_checksum & 0xFF; |
| queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF; |
| queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF; |
| queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF; |
| |
| total_bytes_ += queue_.back().size(); |
| |
| buffer_source_.ResetAccumulatedChecksum(); |
| CHECK_EQ(0u, buffer_source_.Available()); |
| } |
| |
| void SnappyEncoder::Clear(int n) { |
| CHECK_GE(n, 0); |
| CHECK_LE(static_cast<size_t>(n), queue_size()); |
| queue_.erase(queue_.begin(), queue_.begin() + n); |
| } |
| |
| size_t SnappyEncoder::queued_bytes() const { |
| size_t bytes = 0; |
| for (const auto &buffer : queue_) { |
| bytes += buffer.size(); |
| } |
| return bytes; |
| } |
| |
| absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() { |
| return_queue_.clear(); |
| return_queue_.reserve(queue_.size()); |
| for (const auto &buffer : queue_) { |
| return_queue_.emplace_back(buffer.data(), buffer.size()); |
| } |
| return return_queue_; |
| } |
| |
| SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) { |
| data_.reserve(buffer_size); |
| } |
| |
| size_t SnappyEncoder::DetachedBufferSource::Available() const { |
| return data_.size() - index_into_first_buffer_; |
| } |
| |
| const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) { |
| *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_; |
| return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_; |
| } |
| |
| void SnappyEncoder::DetachedBufferSource::Skip(size_t n) { |
| if (n == 0) { |
| return; |
| } |
| |
| CHECK_NE(data_.size(), 0u); |
| |
| index_into_first_buffer_ += n; |
| CHECK_LE(index_into_first_buffer_, data_.size()) |
| << ": " << n << " is too large a skip."; |
| if (index_into_first_buffer_ == data_.size()) { |
| data_.resize(0u); |
| index_into_first_buffer_ = 0; |
| } |
| } |
| |
| void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) { |
| const size_t copy_size = copy->size(); |
| CHECK_LE(copy_size + data_.size(), data_.capacity()); |
| size_t starting_size = data_.size(); |
| data_.resize(starting_size + copy_size); |
| CHECK_EQ(copy->Copy(data_.data() + starting_size, 0, copy_size), copy_size); |
| accumulated_checksum_ = AccumulateCrc32( |
| {data_.data() + starting_size, copy_size}, accumulated_checksum_); |
| } |
| |
| size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) { |
| // Decoding process: |
| // We maintain an uncompressed_buffer_ of data that may have been uncompressed |
| // in the past but which hasn't been read yet. |
| // |
| // When we run out of that buffer and need to actually read new data, we |
| // need to read things from the file in one-chunk units. Snappy expects to |
| // uncompress an entire chunk at once, so there isn't any value in attempting |
| // to be more incremental. Each chunk will be of some variable size (specified |
| // at the start of the chunk). Once we read and uncompress the chunk, we can |
| // then copy it into the user's buffer until either we run out of data and |
| // need to uncompress a new chunk or user's buffer runs out in which case |
| // we fill up their buffer and store the extra data in uncompressed_buffer_ |
| // to be populated later. |
| |
| uint8_t *current_output = begin; |
| if (uncompressed_buffer_.size() > 0) { |
| const size_t amount_to_copy = |
| std::min<size_t>(uncompressed_buffer_.size(), end - current_output); |
| std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy); |
| current_output += amount_to_copy; |
| uncompressed_buffer_.erase_front(amount_to_copy); |
| } |
| |
| while (current_output != end) { |
| uint8_t header[4]; |
| const size_t read_length = underlying_decoder_->Read(header, header + 4); |
| if (read_length == 0) { |
| break; |
| } |
| if (read_length != 4u) { |
| LOG(WARNING) << "Logfile data is truncated."; |
| break; |
| } |
| const uint8_t chunk_type = header[0]; |
| const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16); |
| compressed_buffer_.resize(chunk_size); |
| if (chunk_size != |
| underlying_decoder_->Read(compressed_buffer_.data(), |
| compressed_buffer_.data() + chunk_size)) { |
| LOG(WARNING) << "Logfile data is truncated."; |
| break; |
| } |
| if (chunk_type == 0xFF) { |
| // Start of file, don't bother |
| // checking the contents. |
| continue; |
| } else if (chunk_type == 0x00) { |
| if (compressed_buffer_.size() < 4u) { |
| LOG(WARNING) << "Logfile data is truncated."; |
| break; |
| } |
| const uint32_t checksum = compressed_buffer_.data()[0] + |
| (compressed_buffer_.data()[1] << 8) + |
| (compressed_buffer_.data()[2] << 16) + |
| (compressed_buffer_.data()[3] << 24); |
| |
| const char *input_data = |
| reinterpret_cast<char *>(compressed_buffer_.data() + 4); |
| const size_t input_size = compressed_buffer_.size() - 4; |
| |
| size_t uncompressed_length; |
| CHECK(snappy::GetUncompressedLength(input_data, input_size, |
| &uncompressed_length)); |
| |
| // If the user's buffer can fit the entire uncompressed data, we |
| // will uncompress directly into their buffer. Otherwise, we uncompress |
| // into a separate buffer and then copy the correct number of bytes out |
| // to fill up the user's buffer. |
| // TODO(james): In the future, overload a snappy::Sink to avoid |
| // unnecessary copies by extracting the initial N bytes directly into |
| // the user's buffer. |
| if (end > (uncompressed_length + current_output)) { |
| CHECK(snappy::RawUncompress(input_data, input_size, |
| reinterpret_cast<char *>(current_output))) |
| << ": Corrupted snappy chunk."; |
| CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length)) |
| << ": Checksum mismatch."; |
| |
| current_output += uncompressed_length; |
| } else { |
| uncompressed_buffer_.resize(uncompressed_length); |
| CHECK(snappy::RawUncompress( |
| input_data, input_size, |
| reinterpret_cast<char *>(uncompressed_buffer_.data()))) |
| << ": Corrupted snappy chunk."; |
| CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(), |
| uncompressed_buffer_.size())) |
| << ": Checksum mismatch."; |
| std::memcpy(current_output, uncompressed_buffer_.data(), |
| end - current_output); |
| uncompressed_buffer_.erase_front(end - current_output); |
| current_output = end; |
| } |
| } else { |
| // Unimplemented. |
| LOG(FATAL) << "Unsupported snappy chunk type " |
| << static_cast<int>(chunk_type); |
| } |
| } |
| total_output_ += current_output - begin; |
| |
| return current_output - begin; |
| } |
| |
| } // namespace aos::logger |