James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 1 | #include "aos/events/logging/snappy_encoder.h" |
| 2 | |
Ravago Jones | e12b790 | 2022-02-04 22:50:44 -0800 | [diff] [blame] | 3 | #include "aos/util/crc32.h" |
Austin Schuh | d5bd91a | 2022-09-16 15:11:54 -0700 | [diff] [blame] | 4 | #include "snappy.h" |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 5 | |
| 6 | namespace aos::logger { |
| 7 | // Snappy file format is a series of chunks. Each chunk consists of: |
| 8 | // 1-byte: Format identifier. |
| 9 | // 3-bytes: Little-endian chunk length, not counting the four bytes of format |
| 10 | // + length. |
| 11 | // |
| 12 | // The relevant chunk identifiers are 0xff (header) and 0x00 (compressed data). |
| 13 | // Compressed data's first four bytes are a CRC-32C that is masked by some |
| 14 | // magic function. Technically the size of the uncompressed data per chunk |
| 15 | // is supposed to be no more than 65536 bytes, but I am too lazy to enforce |
| 16 | // that at the moment. |
| 17 | // 0x01 is the type for uncompressed data, which has not been implemented here. |
| 18 | namespace { |
// Stream identifier chunk that the encoder emits first: chunk type 0xFF, a
// 3-byte little-endian payload length of 6, then the magic string "sNaPpY".
const std::vector<uint8_t> kSnappyIdentifierChunk = {
    // Chunk type, followed by the little-endian payload length.
    0xFF, 0x06, 0x00, 0x00,
    // Payload: the magic string.
    's', 'N', 'a', 'P', 'p', 'Y'};
| 21 | |
// Applies the checksum masking that snappy's framing format requires: the
// value is rotated right by 15 bits and a fixed constant is then added, with
// the addition wrapping modulo 2^32.
uint32_t MaskChecksum(uint32_t x) {
  constexpr uint32_t kMaskDelta = 0xa282ead8;
  const uint32_t rotated = (x << 17) | (x >> 15);
  return rotated + kMaskDelta;
}
| 26 | |
| 27 | uint32_t SnappyChecksum(uint8_t *data, size_t size) { |
| 28 | return MaskChecksum(ComputeCrc32({data, size})); |
| 29 | } |
| 30 | } // namespace |
| 31 | |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 32 | SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size) |
| 33 | : chunk_size_(chunk_size), |
| 34 | // Make sure we have enough space to always add our largest message to the |
| 35 | // chunks we will flush. |
| 36 | buffer_source_(max_message_size + chunk_size - 1) { |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 37 | queue_.emplace_back(); |
| 38 | queue_.back().resize(kSnappyIdentifierChunk.size()); |
| 39 | std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(), |
| 40 | queue_.back().begin()); |
| 41 | total_bytes_ += queue_.back().size(); |
| 42 | } |
| 43 | |
| 44 | void SnappyEncoder::Finish() { EncodeCurrentBuffer(); } |
| 45 | |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 46 | void SnappyEncoder::Encode(Copier *copy) { |
| 47 | buffer_source_.Append(copy); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 48 | |
| 49 | if (buffer_source_.Available() >= chunk_size_) { |
| 50 | EncodeCurrentBuffer(); |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | void SnappyEncoder::EncodeCurrentBuffer() { |
| 55 | if (buffer_source_.Available() == 0) { |
| 56 | return; |
| 57 | } |
| 58 | const uint32_t uncompressed_checksum = |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 59 | MaskChecksum(buffer_source_.accumulated_checksum()); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 60 | queue_.emplace_back(); |
| 61 | constexpr size_t kPrefixSize = 8; |
| 62 | queue_.back().resize(kPrefixSize + |
| 63 | snappy::MaxCompressedLength(buffer_source_.Available())); |
| 64 | queue_.back().data()[0] = '\x00'; |
| 65 | char *const compressed_start = |
| 66 | reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize; |
| 67 | snappy::UncheckedByteArraySink snappy_sink(compressed_start); |
| 68 | const size_t compressed_size = |
| 69 | snappy::Compress(&buffer_source_, &snappy_sink); |
| 70 | CHECK_LT(compressed_size + kPrefixSize, queue_.back().size()); |
| 71 | queue_.back().resize(compressed_size + kPrefixSize); |
| 72 | const size_t chunk_size = compressed_size + 4; |
| 73 | CHECK_LT(chunk_size, 1U << 24); |
| 74 | queue_.back().data()[1] = chunk_size & 0xFF; |
| 75 | queue_.back().data()[2] = (chunk_size >> 8) & 0xFF; |
| 76 | queue_.back().data()[3] = (chunk_size >> 16) & 0xFF; |
| 77 | queue_.back().data()[4] = uncompressed_checksum & 0xFF; |
| 78 | queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF; |
| 79 | queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF; |
| 80 | queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF; |
| 81 | |
| 82 | total_bytes_ += queue_.back().size(); |
| 83 | |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 84 | buffer_source_.ResetAccumulatedChecksum(); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 85 | CHECK_EQ(0u, buffer_source_.Available()); |
| 86 | } |
| 87 | |
| 88 | void SnappyEncoder::Clear(int n) { |
| 89 | CHECK_GE(n, 0); |
| 90 | CHECK_LE(static_cast<size_t>(n), queue_size()); |
| 91 | queue_.erase(queue_.begin(), queue_.begin() + n); |
| 92 | } |
| 93 | |
| 94 | size_t SnappyEncoder::queued_bytes() const { |
| 95 | size_t bytes = 0; |
| 96 | for (const auto &buffer : queue_) { |
| 97 | bytes += buffer.size(); |
| 98 | } |
| 99 | return bytes; |
| 100 | } |
| 101 | |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 102 | absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() { |
| 103 | return_queue_.clear(); |
| 104 | return_queue_.reserve(queue_.size()); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 105 | for (const auto &buffer : queue_) { |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 106 | return_queue_.emplace_back(buffer.data(), buffer.size()); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 107 | } |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 108 | return return_queue_; |
| 109 | } |
| 110 | |
| 111 | SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) { |
| 112 | data_.reserve(buffer_size); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 113 | } |
| 114 | |
| 115 | size_t SnappyEncoder::DetachedBufferSource::Available() const { |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 116 | return data_.size() - index_into_first_buffer_; |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 117 | } |
| 118 | |
| 119 | const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) { |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 120 | *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_; |
| 121 | return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_; |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 122 | } |
| 123 | |
| 124 | void SnappyEncoder::DetachedBufferSource::Skip(size_t n) { |
| 125 | if (n == 0) { |
| 126 | return; |
| 127 | } |
| 128 | |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 129 | CHECK_NE(data_.size(), 0u); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 130 | |
| 131 | index_into_first_buffer_ += n; |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 132 | CHECK_LE(index_into_first_buffer_, data_.size()) |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 133 | << ": " << n << " is too large a skip."; |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 134 | if (index_into_first_buffer_ == data_.size()) { |
| 135 | data_.resize(0u); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 136 | index_into_first_buffer_ = 0; |
| 137 | } |
| 138 | } |
| 139 | |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 140 | void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) { |
| 141 | const size_t copy_size = copy->size(); |
| 142 | CHECK_LE(copy_size + data_.size(), data_.capacity()); |
| 143 | size_t starting_size = data_.size(); |
| 144 | data_.resize(starting_size + copy_size); |
Austin Schuh | 71a40d4 | 2023-02-04 21:22:22 -0800 | [diff] [blame^] | 145 | CHECK_EQ(copy->Copy(data_.data() + starting_size, 0, copy_size), copy_size); |
Austin Schuh | 48d10d6 | 2022-10-16 22:19:23 -0700 | [diff] [blame] | 146 | accumulated_checksum_ = AccumulateCrc32( |
| 147 | {data_.data() + starting_size, copy_size}, accumulated_checksum_); |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 148 | } |
| 149 | |
| 150 | size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) { |
| 151 | // Decoding process: |
| 152 | // We maintain an uncompressed_buffer_ of data that may have been uncompressed |
| 153 | // in the past but which hasn't been read yet. |
| 154 | // |
| 155 | // When we run out of that buffer and need to actually read new data, we |
| 156 | // need to read things from the file in one-chunk units. Snappy expects to |
| 157 | // uncompress an entire chunk at once, so there isn't any value in attempting |
| 158 | // to be more incremental. Each chunk will be of some variable size (specified |
| 159 | // at the start of the chunk). Once we read and uncompress the chunk, we can |
| 160 | // then copy it into the user's buffer until either we run out of data and |
| 161 | // need to uncompress a new chunk or user's buffer runs out in which case |
| 162 | // we fill up their buffer and store the extra data in uncompressed_buffer_ |
| 163 | // to be populated later. |
| 164 | |
| 165 | uint8_t *current_output = begin; |
| 166 | if (uncompressed_buffer_.size() > 0) { |
| 167 | const size_t amount_to_copy = |
| 168 | std::min<size_t>(uncompressed_buffer_.size(), end - current_output); |
| 169 | std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy); |
| 170 | current_output += amount_to_copy; |
| 171 | uncompressed_buffer_.erase_front(amount_to_copy); |
| 172 | } |
| 173 | |
| 174 | while (current_output != end) { |
| 175 | uint8_t header[4]; |
| 176 | const size_t read_length = underlying_decoder_->Read(header, header + 4); |
| 177 | if (read_length == 0) { |
| 178 | break; |
| 179 | } |
| 180 | if (read_length != 4u) { |
| 181 | LOG(WARNING) << "Logfile data is truncated."; |
| 182 | break; |
| 183 | } |
| 184 | const uint8_t chunk_type = header[0]; |
| 185 | const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16); |
| 186 | compressed_buffer_.resize(chunk_size); |
| 187 | if (chunk_size != |
| 188 | underlying_decoder_->Read(compressed_buffer_.data(), |
| 189 | compressed_buffer_.data() + chunk_size)) { |
| 190 | LOG(WARNING) << "Logfile data is truncated."; |
| 191 | break; |
| 192 | } |
| 193 | if (chunk_type == 0xFF) { |
| 194 | // Start of file, don't bother |
| 195 | // checking the contents. |
| 196 | continue; |
| 197 | } else if (chunk_type == 0x00) { |
James Kuszmaul | d54c77c | 2021-11-11 08:11:52 -0800 | [diff] [blame] | 198 | if (compressed_buffer_.size() < 4u) { |
| 199 | LOG(WARNING) << "Logfile data is truncated."; |
| 200 | break; |
| 201 | } |
James Kuszmaul | ef0e0cc | 2021-10-28 23:00:04 -0700 | [diff] [blame] | 202 | const uint32_t checksum = compressed_buffer_.data()[0] + |
| 203 | (compressed_buffer_.data()[1] << 8) + |
| 204 | (compressed_buffer_.data()[2] << 16) + |
| 205 | (compressed_buffer_.data()[3] << 24); |
| 206 | |
| 207 | const char *input_data = |
| 208 | reinterpret_cast<char *>(compressed_buffer_.data() + 4); |
| 209 | const size_t input_size = compressed_buffer_.size() - 4; |
| 210 | |
| 211 | size_t uncompressed_length; |
| 212 | CHECK(snappy::GetUncompressedLength(input_data, input_size, |
| 213 | &uncompressed_length)); |
| 214 | |
| 215 | // If the user's buffer can fit the entire uncompressed data, we |
| 216 | // will uncompress directly into their buffer. Otherwise, we uncompress |
| 217 | // into a separate buffer and then copy the correct number of bytes out |
| 218 | // to fill up the user's buffer. |
| 219 | // TODO(james): In the future, overload a snappy::Sink to avoid |
| 220 | // unnecessary copies by extracting the initial N bytes directly into |
| 221 | // the user's buffer. |
| 222 | if (end > (uncompressed_length + current_output)) { |
| 223 | CHECK(snappy::RawUncompress(input_data, input_size, |
| 224 | reinterpret_cast<char *>(current_output))) |
| 225 | << ": Corrupted snappy chunk."; |
| 226 | CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length)) |
| 227 | << ": Checksum mismatch."; |
| 228 | |
| 229 | current_output += uncompressed_length; |
| 230 | } else { |
| 231 | uncompressed_buffer_.resize(uncompressed_length); |
| 232 | CHECK(snappy::RawUncompress( |
| 233 | input_data, input_size, |
| 234 | reinterpret_cast<char *>(uncompressed_buffer_.data()))) |
| 235 | << ": Corrupted snappy chunk."; |
| 236 | CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(), |
| 237 | uncompressed_buffer_.size())) |
| 238 | << ": Checksum mismatch."; |
| 239 | std::memcpy(current_output, uncompressed_buffer_.data(), |
| 240 | end - current_output); |
| 241 | uncompressed_buffer_.erase_front(end - current_output); |
| 242 | current_output = end; |
| 243 | } |
| 244 | } else { |
| 245 | // Unimplemented. |
| 246 | LOG(FATAL) << "Unsupported snappy chunk type " |
| 247 | << static_cast<int>(chunk_type); |
| 248 | } |
| 249 | } |
| 250 | total_output_ += current_output - begin; |
| 251 | |
| 252 | return current_output - begin; |
| 253 | } |
| 254 | |
| 255 | } // namespace aos::logger |