#include "aos/events/logging/snappy_encoder.h"

#include "aos/util/crc32.h"
#include "snappy.h"

namespace aos::logger {
// Snappy file format is a series of chunks. Each chunk consists of:
// 1-byte: Format identifier.
// 3-bytes: Little-endian chunk length, not counting the four bytes of format
//          + length.
//
// The relevant chunk identifiers are 0xff (header) and 0x00 (compressed data).
// Compressed data's first four bytes are a CRC-32C that is masked by some
// magic function. Technically the size of the uncompressed data per chunk
// is supposed to be no more than 65536 bytes, but I am too lazy to enforce
// that at the moment.
// 0x01 is the type for uncompressed data, which has not been implemented here.
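//
// For reference, a compressed-data chunk as written by EncodeCurrentBuffer()
// below is laid out as:
//   byte  0      0x00 (compressed-data chunk type)
//   bytes 1-3    little-endian chunk length = 4 + compressed payload size
//   bytes 4-7    masked CRC-32C of the uncompressed data (little-endian)
//   bytes 8-...  snappy-compressed payload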
namespace {
const std::vector<uint8_t> kSnappyIdentifierChunk = {
    0xFF, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'};

// Magic mask that snappy's framing format requires for some reason.
uint32_t MaskChecksum(uint32_t x) {
  return ((x >> 15) | (x << 17)) + 0xa282ead8;
}

uint32_t SnappyChecksum(uint8_t *data, size_t size) {
  return MaskChecksum(ComputeCrc32({data, size}));
}
}  // namespace

SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size)
    : chunk_size_(chunk_size),
      // Make sure we have enough space to always add our largest message to
      // the chunks we will flush.
      buffer_source_(max_message_size + chunk_size - 1) {
  queue_.emplace_back();
  queue_.back().resize(kSnappyIdentifierChunk.size());
  std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
            queue_.back().begin());
  total_bytes_ += queue_.back().size();
}

void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }

size_t SnappyEncoder::Encode(Copier *copy, size_t start_byte) {
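  // This encoder buffers up whole messages and compresses them in chunk_size_
  // batches; resuming part-way through a message is not supported, so callers
  // must always pass start_byte == 0.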
  CHECK_EQ(start_byte, 0u);
  buffer_source_.Append(copy);

  if (buffer_source_.Available() >= chunk_size_) {
    EncodeCurrentBuffer();
  }

  return copy->size();
}

void SnappyEncoder::EncodeCurrentBuffer() {
  if (buffer_source_.Available() == 0) {
    return;
  }
  const uint32_t uncompressed_checksum =
      MaskChecksum(buffer_source_.accumulated_checksum());
  queue_.emplace_back();
  constexpr size_t kPrefixSize = 8;
  queue_.back().resize(kPrefixSize +
                       snappy::MaxCompressedLength(buffer_source_.Available()));
  queue_.back().data()[0] = '\x00';
  char *const compressed_start =
      reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize;
  snappy::UncheckedByteArraySink snappy_sink(compressed_start);
  const size_t compressed_size =
      snappy::Compress(&buffer_source_, &snappy_sink);
  CHECK_LT(compressed_size + kPrefixSize, queue_.back().size());
  queue_.back().resize(compressed_size + kPrefixSize);
  const size_t chunk_size = compressed_size + 4;
  CHECK_LT(chunk_size, 1U << 24);
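  // Fill in the rest of the 8-byte prefix: bytes [1-3] are the little-endian
  // chunk length (masked checksum + compressed payload), and bytes [4-7] are
  // the masked CRC-32C of the uncompressed data.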
  queue_.back().data()[1] = chunk_size & 0xFF;
  queue_.back().data()[2] = (chunk_size >> 8) & 0xFF;
  queue_.back().data()[3] = (chunk_size >> 16) & 0xFF;
  queue_.back().data()[4] = uncompressed_checksum & 0xFF;
  queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF;
  queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF;
  queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF;

  total_bytes_ += queue_.back().size();

  buffer_source_.ResetAccumulatedChecksum();
  CHECK_EQ(0u, buffer_source_.Available());
}

void SnappyEncoder::Clear(int n) {
  CHECK_GE(n, 0);
  CHECK_LE(static_cast<size_t>(n), queue_size());
  queue_.erase(queue_.begin(), queue_.begin() + n);
}

size_t SnappyEncoder::queued_bytes() const {
  size_t bytes = 0;
  for (const auto &buffer : queue_) {
    bytes += buffer.size();
  }
  return bytes;
}

absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() {
  return_queue_.clear();
  return_queue_.reserve(queue_.size());
  for (const auto &buffer : queue_) {
    return_queue_.emplace_back(buffer.data(), buffer.size());
  }
  return return_queue_;
}

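// DetachedBufferSource implements the snappy::Source interface (Available(),
// Peek(), and Skip()) so that snappy::Compress() can consume the buffered
// message bytes directly out of data_.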
SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) {
  data_.reserve(buffer_size);
}

size_t SnappyEncoder::DetachedBufferSource::Available() const {
  return data_.size() - index_into_first_buffer_;
}

const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
  *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_;
  return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_;
}

void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
  if (n == 0) {
    return;
  }

  CHECK_NE(data_.size(), 0u);

  index_into_first_buffer_ += n;
  CHECK_LE(index_into_first_buffer_, data_.size())
      << ": " << n << " is too large a skip.";
  if (index_into_first_buffer_ == data_.size()) {
    data_.resize(0u);
    index_into_first_buffer_ = 0;
  }
}

void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) {
  const size_t copy_size = copy->size();
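  // The constructor reserved enough capacity for a full chunk plus our largest
  // message, so the resize below should never need to reallocate; the CHECK
  // enforces that.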
  CHECK_LE(copy_size + data_.size(), data_.capacity());
  size_t starting_size = data_.size();
  data_.resize(starting_size + copy_size);
  CHECK_EQ(copy->Copy(data_.data() + starting_size, 0, copy_size), copy_size);
  accumulated_checksum_ = AccumulateCrc32(
      {data_.data() + starting_size, copy_size}, accumulated_checksum_);
}

size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
  // Decoding process:
  // We maintain an uncompressed_buffer_ of data that may have been
  // uncompressed in the past but which hasn't been read yet.
  //
  // When we run out of that buffer and need to actually read new data, we
  // need to read things from the file in one-chunk units. Snappy expects to
  // uncompress an entire chunk at once, so there isn't any value in attempting
  // to be more incremental. Each chunk will be of some variable size
  // (specified at the start of the chunk). Once we read and uncompress the
  // chunk, we can then copy it into the user's buffer until either we run out
  // of data and need to uncompress a new chunk, or the user's buffer runs out,
  // in which case we fill up their buffer and store the extra data in
  // uncompressed_buffer_ to be populated later.
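  //
  // Returns the number of bytes actually copied into [begin, end); a return
  // value shorter than end - begin means the underlying decoder ran out of
  // data (or the logfile was truncated).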

  uint8_t *current_output = begin;
  if (uncompressed_buffer_.size() > 0) {
    const size_t amount_to_copy =
        std::min<size_t>(uncompressed_buffer_.size(), end - current_output);
    std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy);
    current_output += amount_to_copy;
    uncompressed_buffer_.erase_front(amount_to_copy);
  }

  while (current_output != end) {
    uint8_t header[4];
    const size_t read_length = underlying_decoder_->Read(header, header + 4);
    if (read_length == 0) {
      break;
    }
    if (read_length != 4u) {
      LOG(WARNING) << "Logfile data is truncated.";
      break;
    }
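    // header[0] is the chunk type; header[1..3] hold the little-endian length
    // of the rest of the chunk (for compressed chunks, the 4-byte masked
    // checksum plus the compressed payload).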
    const uint8_t chunk_type = header[0];
    const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16);
    compressed_buffer_.resize(chunk_size);
    if (chunk_size !=
        underlying_decoder_->Read(compressed_buffer_.data(),
                                  compressed_buffer_.data() + chunk_size)) {
      LOG(WARNING) << "Logfile data is truncated.";
      break;
    }
    if (chunk_type == 0xFF) {
      // Start of file, don't bother checking the contents.
      continue;
    } else if (chunk_type == 0x00) {
      if (compressed_buffer_.size() < 4u) {
        LOG(WARNING) << "Logfile data is truncated.";
        break;
      }
      const uint32_t checksum = compressed_buffer_.data()[0] +
                                (compressed_buffer_.data()[1] << 8) +
                                (compressed_buffer_.data()[2] << 16) +
                                (compressed_buffer_.data()[3] << 24);

      const char *input_data =
          reinterpret_cast<char *>(compressed_buffer_.data() + 4);
      const size_t input_size = compressed_buffer_.size() - 4;

      size_t uncompressed_length;
      CHECK(snappy::GetUncompressedLength(input_data, input_size,
                                          &uncompressed_length));

      // If the user's buffer can fit the entire uncompressed data, we
      // will uncompress directly into their buffer. Otherwise, we uncompress
      // into a separate buffer and then copy the correct number of bytes out
      // to fill up the user's buffer.
      // TODO(james): In the future, overload a snappy::Sink to avoid
      // unnecessary copies by extracting the initial N bytes directly into
      // the user's buffer.
      if (end > (uncompressed_length + current_output)) {
        CHECK(snappy::RawUncompress(input_data, input_size,
                                    reinterpret_cast<char *>(current_output)))
            << ": Corrupted snappy chunk.";
        CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length))
            << ": Checksum mismatch.";

        current_output += uncompressed_length;
      } else {
        uncompressed_buffer_.resize(uncompressed_length);
        CHECK(snappy::RawUncompress(
            input_data, input_size,
            reinterpret_cast<char *>(uncompressed_buffer_.data())))
            << ": Corrupted snappy chunk.";
        CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(),
                                          uncompressed_buffer_.size()))
            << ": Checksum mismatch.";
        std::memcpy(current_output, uncompressed_buffer_.data(),
                    end - current_output);
        uncompressed_buffer_.erase_front(end - current_output);
        current_output = end;
      }
    } else {
      // Unimplemented.
      LOG(FATAL) << "Unsupported snappy chunk type "
                 << static_cast<int>(chunk_type);
    }
  }
  total_output_ += current_output - begin;

  return current_output - begin;
}

}  // namespace aos::logger