#include "aos/events/logging/snappy_encoder.h"

#include <algorithm>
#include <array>
#include <cstring>

#include "aos/util/crc32.h"
#include "external/snappy/snappy.h"
| 5 | |
| 6 | namespace aos::logger { |
// The snappy framing format is a series of chunks. Each chunk consists of:
// 1 byte: chunk type identifier.
// 3 bytes: little-endian chunk length, not counting the four bytes of type
//   + length.
//
// The relevant chunk types are 0xff (stream identifier header) and 0x00
// (compressed data). A compressed data chunk begins with a four-byte CRC-32C
// of the uncompressed data, stored in the masked form the framing format
// requires (rotate right by 15 bits, then add 0xa282ead8). The framing format
// limits the uncompressed data in each chunk to at most 65536 bytes; the
// encoder does not currently enforce that limit.
// 0x01 is the type for uncompressed data; the encoder never emits it.
| 18 | namespace { |
| 19 | const std::vector<uint8_t> kSnappyIdentifierChunk = { |
| 20 | 0xFF, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'}; |
| 21 | |
| 22 | // Magic mask that snappy's framing format requires for some reason. |
| 23 | uint32_t MaskChecksum(uint32_t x) { |
| 24 | return ((x >> 15) | (x << 17)) + 0xa282ead8; |
| 25 | } |
| 26 | |
| 27 | uint32_t SnappyChecksum(uint8_t *data, size_t size) { |
| 28 | return MaskChecksum(ComputeCrc32({data, size})); |
| 29 | } |
| 30 | } // namespace |
| 31 | |
| 32 | SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) { |
| 33 | queue_.emplace_back(); |
| 34 | queue_.back().resize(kSnappyIdentifierChunk.size()); |
| 35 | std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(), |
| 36 | queue_.back().begin()); |
| 37 | total_bytes_ += queue_.back().size(); |
| 38 | } |
| 39 | |
| 40 | void SnappyEncoder::Finish() { EncodeCurrentBuffer(); } |
| 41 | |
| 42 | void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) { |
| 43 | accumulated_checksum_ = |
| 44 | AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_); |
| 45 | buffer_source_.AppendBuffer(std::move(in)); |
| 46 | |
| 47 | if (buffer_source_.Available() >= chunk_size_) { |
| 48 | EncodeCurrentBuffer(); |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | void SnappyEncoder::EncodeCurrentBuffer() { |
| 53 | if (buffer_source_.Available() == 0) { |
| 54 | return; |
| 55 | } |
| 56 | const uint32_t uncompressed_checksum = |
| 57 | MaskChecksum(accumulated_checksum_.value()); |
| 58 | queue_.emplace_back(); |
| 59 | constexpr size_t kPrefixSize = 8; |
| 60 | queue_.back().resize(kPrefixSize + |
| 61 | snappy::MaxCompressedLength(buffer_source_.Available())); |
| 62 | queue_.back().data()[0] = '\x00'; |
| 63 | char *const compressed_start = |
| 64 | reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize; |
| 65 | snappy::UncheckedByteArraySink snappy_sink(compressed_start); |
| 66 | const size_t compressed_size = |
| 67 | snappy::Compress(&buffer_source_, &snappy_sink); |
| 68 | CHECK_LT(compressed_size + kPrefixSize, queue_.back().size()); |
| 69 | queue_.back().resize(compressed_size + kPrefixSize); |
| 70 | const size_t chunk_size = compressed_size + 4; |
| 71 | CHECK_LT(chunk_size, 1U << 24); |
| 72 | queue_.back().data()[1] = chunk_size & 0xFF; |
| 73 | queue_.back().data()[2] = (chunk_size >> 8) & 0xFF; |
| 74 | queue_.back().data()[3] = (chunk_size >> 16) & 0xFF; |
| 75 | queue_.back().data()[4] = uncompressed_checksum & 0xFF; |
| 76 | queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF; |
| 77 | queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF; |
| 78 | queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF; |
| 79 | |
| 80 | total_bytes_ += queue_.back().size(); |
| 81 | |
| 82 | accumulated_checksum_.reset(); |
| 83 | CHECK_EQ(0u, buffer_source_.Available()); |
| 84 | } |
| 85 | |
| 86 | void SnappyEncoder::Clear(int n) { |
| 87 | CHECK_GE(n, 0); |
| 88 | CHECK_LE(static_cast<size_t>(n), queue_size()); |
| 89 | queue_.erase(queue_.begin(), queue_.begin() + n); |
| 90 | } |
| 91 | |
| 92 | size_t SnappyEncoder::queued_bytes() const { |
| 93 | size_t bytes = 0; |
| 94 | for (const auto &buffer : queue_) { |
| 95 | bytes += buffer.size(); |
| 96 | } |
| 97 | return bytes; |
| 98 | } |
| 99 | |
| 100 | std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const { |
| 101 | std::vector<absl::Span<const uint8_t>> queue; |
| 102 | queue.reserve(queue_.size()); |
| 103 | for (const auto &buffer : queue_) { |
| 104 | queue.emplace_back(buffer.data(), buffer.size()); |
| 105 | } |
| 106 | return queue; |
| 107 | } |
| 108 | |
| 109 | size_t SnappyEncoder::DetachedBufferSource::Available() const { |
| 110 | size_t total_available = 0; |
| 111 | for (const flatbuffers::DetachedBuffer &buffer : buffers_) { |
| 112 | total_available += buffer.size(); |
| 113 | } |
| 114 | return total_available; |
| 115 | } |
| 116 | |
| 117 | const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) { |
| 118 | *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_; |
| 119 | return reinterpret_cast<char *>(buffers_[0].data()) + |
| 120 | index_into_first_buffer_; |
| 121 | } |
| 122 | |
| 123 | void SnappyEncoder::DetachedBufferSource::Skip(size_t n) { |
| 124 | if (n == 0) { |
| 125 | return; |
| 126 | } |
| 127 | |
| 128 | CHECK(!buffers_.empty()); |
| 129 | |
| 130 | index_into_first_buffer_ += n; |
| 131 | CHECK_LE(index_into_first_buffer_, buffers_[0].size()) |
| 132 | << ": " << n << " is too large a skip."; |
| 133 | if (index_into_first_buffer_ == buffers_[0].size()) { |
| 134 | buffers_.erase(buffers_.begin()); |
| 135 | index_into_first_buffer_ = 0; |
| 136 | } |
| 137 | } |
| 138 | |
| 139 | void SnappyEncoder::DetachedBufferSource::AppendBuffer( |
| 140 | flatbuffers::DetachedBuffer &&buffer) { |
| 141 | buffers_.emplace_back(std::move(buffer)); |
| 142 | } |
| 143 | |
| 144 | size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) { |
| 145 | // Decoding process: |
| 146 | // We maintain an uncompressed_buffer_ of data that may have been uncompressed |
| 147 | // in the past but which hasn't been read yet. |
| 148 | // |
| 149 | // When we run out of that buffer and need to actually read new data, we |
| 150 | // need to read things from the file in one-chunk units. Snappy expects to |
| 151 | // uncompress an entire chunk at once, so there isn't any value in attempting |
| 152 | // to be more incremental. Each chunk will be of some variable size (specified |
| 153 | // at the start of the chunk). Once we read and uncompress the chunk, we can |
| 154 | // then copy it into the user's buffer until either we run out of data and |
| 155 | // need to uncompress a new chunk or user's buffer runs out in which case |
| 156 | // we fill up their buffer and store the extra data in uncompressed_buffer_ |
| 157 | // to be populated later. |
| 158 | |
| 159 | uint8_t *current_output = begin; |
| 160 | if (uncompressed_buffer_.size() > 0) { |
| 161 | const size_t amount_to_copy = |
| 162 | std::min<size_t>(uncompressed_buffer_.size(), end - current_output); |
| 163 | std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy); |
| 164 | current_output += amount_to_copy; |
| 165 | uncompressed_buffer_.erase_front(amount_to_copy); |
| 166 | } |
| 167 | |
| 168 | while (current_output != end) { |
| 169 | uint8_t header[4]; |
| 170 | const size_t read_length = underlying_decoder_->Read(header, header + 4); |
| 171 | if (read_length == 0) { |
| 172 | break; |
| 173 | } |
| 174 | if (read_length != 4u) { |
| 175 | LOG(WARNING) << "Logfile data is truncated."; |
| 176 | break; |
| 177 | } |
| 178 | const uint8_t chunk_type = header[0]; |
| 179 | const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16); |
| 180 | compressed_buffer_.resize(chunk_size); |
| 181 | if (chunk_size != |
| 182 | underlying_decoder_->Read(compressed_buffer_.data(), |
| 183 | compressed_buffer_.data() + chunk_size)) { |
| 184 | LOG(WARNING) << "Logfile data is truncated."; |
| 185 | break; |
| 186 | } |
| 187 | if (chunk_type == 0xFF) { |
| 188 | // Start of file, don't bother |
| 189 | // checking the contents. |
| 190 | continue; |
| 191 | } else if (chunk_type == 0x00) { |
James Kuszmaul | d54c77c | 2021-11-11 08:11:52 -0800 | [diff] [blame] | 192 | if (compressed_buffer_.size() < 4u) { |
| 193 | LOG(WARNING) << "Logfile data is truncated."; |
| 194 | break; |
| 195 | } |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 196 | const uint32_t checksum = compressed_buffer_.data()[0] + |
| 197 | (compressed_buffer_.data()[1] << 8) + |
| 198 | (compressed_buffer_.data()[2] << 16) + |
| 199 | (compressed_buffer_.data()[3] << 24); |
| 200 | |
| 201 | const char *input_data = |
| 202 | reinterpret_cast<char *>(compressed_buffer_.data() + 4); |
| 203 | const size_t input_size = compressed_buffer_.size() - 4; |
| 204 | |
| 205 | size_t uncompressed_length; |
| 206 | CHECK(snappy::GetUncompressedLength(input_data, input_size, |
| 207 | &uncompressed_length)); |
| 208 | |
| 209 | // If the user's buffer can fit the entire uncompressed data, we |
| 210 | // will uncompress directly into their buffer. Otherwise, we uncompress |
| 211 | // into a separate buffer and then copy the correct number of bytes out |
| 212 | // to fill up the user's buffer. |
| 213 | // TODO(james): In the future, overload a snappy::Sink to avoid |
| 214 | // unnecessary copies by extracting the initial N bytes directly into |
| 215 | // the user's buffer. |
| 216 | if (end > (uncompressed_length + current_output)) { |
| 217 | CHECK(snappy::RawUncompress(input_data, input_size, |
| 218 | reinterpret_cast<char *>(current_output))) |
| 219 | << ": Corrupted snappy chunk."; |
| 220 | CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length)) |
| 221 | << ": Checksum mismatch."; |
| 222 | |
| 223 | current_output += uncompressed_length; |
| 224 | } else { |
| 225 | uncompressed_buffer_.resize(uncompressed_length); |
| 226 | CHECK(snappy::RawUncompress( |
| 227 | input_data, input_size, |
| 228 | reinterpret_cast<char *>(uncompressed_buffer_.data()))) |
| 229 | << ": Corrupted snappy chunk."; |
| 230 | CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(), |
| 231 | uncompressed_buffer_.size())) |
| 232 | << ": Checksum mismatch."; |
| 233 | std::memcpy(current_output, uncompressed_buffer_.data(), |
| 234 | end - current_output); |
| 235 | uncompressed_buffer_.erase_front(end - current_output); |
| 236 | current_output = end; |
| 237 | } |
| 238 | } else { |
| 239 | // Unimplemented. |
| 240 | LOG(FATAL) << "Unsupported snappy chunk type " |
| 241 | << static_cast<int>(chunk_type); |
| 242 | } |
| 243 | } |
| 244 | total_output_ += current_output - begin; |
| 245 | |
| 246 | return current_output - begin; |
| 247 | } |
| 248 | |
| 249 | } // namespace aos::logger |