blob: 2ef8363a10efd2a746fe37b7e3bb668fdcacb4e8 [file] [log] [blame]
James Kuszmaulef0e0cc2021-10-28 23:00:04 -07001#include "aos/events/logging/snappy_encoder.h"
2
Ravago Jonese12b7902022-02-04 22:50:44 -08003#include "aos/util/crc32.h"
Austin Schuhd5bd91a2022-09-16 15:11:54 -07004#include "snappy.h"
James Kuszmaulef0e0cc2021-10-28 23:00:04 -07005
6namespace aos::logger {
// The snappy framed file format is a series of chunks. Each chunk consists of:
//   1 byte:  Format (chunk type) identifier.
//   3 bytes: Little-endian chunk length, not counting the four bytes of
//            format + length.
//
// The relevant chunk identifiers are 0xff (stream header) and 0x00
// (compressed data). The first four bytes of a compressed-data chunk are a
// CRC-32C of the uncompressed data, masked as done by MaskChecksum() below.
// Technically the size of the uncompressed data per chunk is supposed to be
// no more than 65536 bytes, but that limit is not currently enforced here.
// 0x01 is the chunk type for uncompressed data, which has not been
// implemented here.
18namespace {
// Stream identifier chunk written at the start of every encoded stream:
// chunk type 0xFF, a 3-byte little-endian payload length of 6, and then the
// six payload bytes "sNaPpY".
const std::vector<uint8_t> kSnappyIdentifierChunk{
    // Chunk type + 24-bit little-endian length.
    0xFF, 0x06, 0x00, 0x00,
    // Payload.
    's', 'N', 'a', 'P', 'p', 'Y'};
21
// Applies the checksum masking used for chunk checksums in this file: the
// 32-bit value is rotated right by 15 bits and a fixed constant is added
// (wrapping modulo 2^32).
uint32_t MaskChecksum(uint32_t x) {
  constexpr uint32_t kMaskDelta = 0xa282ead8;
  const uint32_t rotated = (x << 17) | (x >> 15);
  return rotated + kMaskDelta;
}
26
27uint32_t SnappyChecksum(uint8_t *data, size_t size) {
28 return MaskChecksum(ComputeCrc32({data, size}));
29}
30} // namespace
31
Austin Schuh48d10d62022-10-16 22:19:23 -070032SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size)
33 : chunk_size_(chunk_size),
34 // Make sure we have enough space to always add our largest message to the
35 // chunks we will flush.
36 buffer_source_(max_message_size + chunk_size - 1) {
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070037 queue_.emplace_back();
38 queue_.back().resize(kSnappyIdentifierChunk.size());
39 std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
40 queue_.back().begin());
41 total_bytes_ += queue_.back().size();
42}
43
44void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
45
Austin Schuh48d10d62022-10-16 22:19:23 -070046void SnappyEncoder::Encode(Copier *copy) {
47 buffer_source_.Append(copy);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070048
49 if (buffer_source_.Available() >= chunk_size_) {
50 EncodeCurrentBuffer();
51 }
52}
53
54void SnappyEncoder::EncodeCurrentBuffer() {
55 if (buffer_source_.Available() == 0) {
56 return;
57 }
58 const uint32_t uncompressed_checksum =
Austin Schuh48d10d62022-10-16 22:19:23 -070059 MaskChecksum(buffer_source_.accumulated_checksum());
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070060 queue_.emplace_back();
61 constexpr size_t kPrefixSize = 8;
62 queue_.back().resize(kPrefixSize +
63 snappy::MaxCompressedLength(buffer_source_.Available()));
64 queue_.back().data()[0] = '\x00';
65 char *const compressed_start =
66 reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize;
67 snappy::UncheckedByteArraySink snappy_sink(compressed_start);
68 const size_t compressed_size =
69 snappy::Compress(&buffer_source_, &snappy_sink);
70 CHECK_LT(compressed_size + kPrefixSize, queue_.back().size());
71 queue_.back().resize(compressed_size + kPrefixSize);
72 const size_t chunk_size = compressed_size + 4;
73 CHECK_LT(chunk_size, 1U << 24);
74 queue_.back().data()[1] = chunk_size & 0xFF;
75 queue_.back().data()[2] = (chunk_size >> 8) & 0xFF;
76 queue_.back().data()[3] = (chunk_size >> 16) & 0xFF;
77 queue_.back().data()[4] = uncompressed_checksum & 0xFF;
78 queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF;
79 queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF;
80 queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF;
81
82 total_bytes_ += queue_.back().size();
83
Austin Schuh48d10d62022-10-16 22:19:23 -070084 buffer_source_.ResetAccumulatedChecksum();
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070085 CHECK_EQ(0u, buffer_source_.Available());
86}
87
88void SnappyEncoder::Clear(int n) {
89 CHECK_GE(n, 0);
90 CHECK_LE(static_cast<size_t>(n), queue_size());
91 queue_.erase(queue_.begin(), queue_.begin() + n);
92}
93
94size_t SnappyEncoder::queued_bytes() const {
95 size_t bytes = 0;
96 for (const auto &buffer : queue_) {
97 bytes += buffer.size();
98 }
99 return bytes;
100}
101
Austin Schuh48d10d62022-10-16 22:19:23 -0700102absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() {
103 return_queue_.clear();
104 return_queue_.reserve(queue_.size());
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700105 for (const auto &buffer : queue_) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700106 return_queue_.emplace_back(buffer.data(), buffer.size());
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700107 }
Austin Schuh48d10d62022-10-16 22:19:23 -0700108 return return_queue_;
109}
110
111SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) {
112 data_.reserve(buffer_size);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700113}
114
115size_t SnappyEncoder::DetachedBufferSource::Available() const {
Austin Schuh48d10d62022-10-16 22:19:23 -0700116 return data_.size() - index_into_first_buffer_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700117}
118
119const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
Austin Schuh48d10d62022-10-16 22:19:23 -0700120 *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_;
121 return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700122}
123
124void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
125 if (n == 0) {
126 return;
127 }
128
Austin Schuh48d10d62022-10-16 22:19:23 -0700129 CHECK_NE(data_.size(), 0u);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700130
131 index_into_first_buffer_ += n;
Austin Schuh48d10d62022-10-16 22:19:23 -0700132 CHECK_LE(index_into_first_buffer_, data_.size())
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700133 << ": " << n << " is too large a skip.";
Austin Schuh48d10d62022-10-16 22:19:23 -0700134 if (index_into_first_buffer_ == data_.size()) {
135 data_.resize(0u);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700136 index_into_first_buffer_ = 0;
137 }
138}
139
Austin Schuh48d10d62022-10-16 22:19:23 -0700140void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) {
141 const size_t copy_size = copy->size();
142 CHECK_LE(copy_size + data_.size(), data_.capacity());
143 size_t starting_size = data_.size();
144 data_.resize(starting_size + copy_size);
145 CHECK_EQ(copy->Copy(data_.data() + starting_size), copy_size);
146 accumulated_checksum_ = AccumulateCrc32(
147 {data_.data() + starting_size, copy_size}, accumulated_checksum_);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700148}
149
150size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
151 // Decoding process:
152 // We maintain an uncompressed_buffer_ of data that may have been uncompressed
153 // in the past but which hasn't been read yet.
154 //
155 // When we run out of that buffer and need to actually read new data, we
156 // need to read things from the file in one-chunk units. Snappy expects to
157 // uncompress an entire chunk at once, so there isn't any value in attempting
158 // to be more incremental. Each chunk will be of some variable size (specified
159 // at the start of the chunk). Once we read and uncompress the chunk, we can
160 // then copy it into the user's buffer until either we run out of data and
161 // need to uncompress a new chunk or user's buffer runs out in which case
162 // we fill up their buffer and store the extra data in uncompressed_buffer_
163 // to be populated later.
164
165 uint8_t *current_output = begin;
166 if (uncompressed_buffer_.size() > 0) {
167 const size_t amount_to_copy =
168 std::min<size_t>(uncompressed_buffer_.size(), end - current_output);
169 std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy);
170 current_output += amount_to_copy;
171 uncompressed_buffer_.erase_front(amount_to_copy);
172 }
173
174 while (current_output != end) {
175 uint8_t header[4];
176 const size_t read_length = underlying_decoder_->Read(header, header + 4);
177 if (read_length == 0) {
178 break;
179 }
180 if (read_length != 4u) {
181 LOG(WARNING) << "Logfile data is truncated.";
182 break;
183 }
184 const uint8_t chunk_type = header[0];
185 const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16);
186 compressed_buffer_.resize(chunk_size);
187 if (chunk_size !=
188 underlying_decoder_->Read(compressed_buffer_.data(),
189 compressed_buffer_.data() + chunk_size)) {
190 LOG(WARNING) << "Logfile data is truncated.";
191 break;
192 }
193 if (chunk_type == 0xFF) {
194 // Start of file, don't bother
195 // checking the contents.
196 continue;
197 } else if (chunk_type == 0x00) {
James Kuszmauld54c77c2021-11-11 08:11:52 -0800198 if (compressed_buffer_.size() < 4u) {
199 LOG(WARNING) << "Logfile data is truncated.";
200 break;
201 }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -0700202 const uint32_t checksum = compressed_buffer_.data()[0] +
203 (compressed_buffer_.data()[1] << 8) +
204 (compressed_buffer_.data()[2] << 16) +
205 (compressed_buffer_.data()[3] << 24);
206
207 const char *input_data =
208 reinterpret_cast<char *>(compressed_buffer_.data() + 4);
209 const size_t input_size = compressed_buffer_.size() - 4;
210
211 size_t uncompressed_length;
212 CHECK(snappy::GetUncompressedLength(input_data, input_size,
213 &uncompressed_length));
214
215 // If the user's buffer can fit the entire uncompressed data, we
216 // will uncompress directly into their buffer. Otherwise, we uncompress
217 // into a separate buffer and then copy the correct number of bytes out
218 // to fill up the user's buffer.
219 // TODO(james): In the future, overload a snappy::Sink to avoid
220 // unnecessary copies by extracting the initial N bytes directly into
221 // the user's buffer.
222 if (end > (uncompressed_length + current_output)) {
223 CHECK(snappy::RawUncompress(input_data, input_size,
224 reinterpret_cast<char *>(current_output)))
225 << ": Corrupted snappy chunk.";
226 CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length))
227 << ": Checksum mismatch.";
228
229 current_output += uncompressed_length;
230 } else {
231 uncompressed_buffer_.resize(uncompressed_length);
232 CHECK(snappy::RawUncompress(
233 input_data, input_size,
234 reinterpret_cast<char *>(uncompressed_buffer_.data())))
235 << ": Corrupted snappy chunk.";
236 CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(),
237 uncompressed_buffer_.size()))
238 << ": Checksum mismatch.";
239 std::memcpy(current_output, uncompressed_buffer_.data(),
240 end - current_output);
241 uncompressed_buffer_.erase_front(end - current_output);
242 current_output = end;
243 }
244 } else {
245 // Unimplemented.
246 LOG(FATAL) << "Unsupported snappy chunk type "
247 << static_cast<int>(chunk_type);
248 }
249 }
250 total_output_ += current_output - begin;
251
252 return current_output - begin;
253}
254
255} // namespace aos::logger