blob: d57b3dd310b183287483281cba3f38ef955d0675 [file] [log] [blame]
James Kuszmaulef0e0cc2021-10-28 23:00:04 -07001#include "aos/events/logging/snappy_encoder.h"
2
3#include "aos/events/logging/crc32.h"
4#include "external/snappy/snappy.h"
5
6namespace aos::logger {
// Snappy file format is a series of chunks. Each chunk consists of:
//   1 byte:  Format identifier.
//   3 bytes: Little-endian chunk length, not counting the four bytes of
//            format identifier + length.
//
// The relevant chunk identifiers are 0xff (header) and 0x00 (compressed data).
// The first four bytes of a compressed data chunk are a CRC-32C that is masked
// by the framing format's masking function (see MaskChecksum()). Technically
// the size of the uncompressed data per chunk is supposed to be no more than
// 65536 bytes, but that limit is not enforced here at the moment.
// 0x01 is the type for uncompressed data, which has not been implemented here.
18namespace {
// Header chunk written at the start of every encoded stream: chunk type 0xFF,
// a 3-byte little-endian payload length of 6, then the ASCII bytes "sNaPpY".
const std::vector<uint8_t> kSnappyIdentifierChunk{
    0xFF,                          // Chunk type: header.
    0x06, 0x00, 0x00,              // Payload length (6), little-endian.
    's',  'N',  'a', 'P', 'p', 'Y'  // Payload.
};
21
// Applies the checksum mask used by snappy's framing format: rotates the CRC
// right by 15 bits and then adds a fixed constant (wrapping modulo 2^32).
uint32_t MaskChecksum(uint32_t x) {
  constexpr uint32_t kMaskDelta = 0xa282ead8;
  const uint32_t rotated = (x << 17) | (x >> 15);
  return rotated + kMaskDelta;
}
26
27uint32_t SnappyChecksum(uint8_t *data, size_t size) {
28 return MaskChecksum(ComputeCrc32({data, size}));
29}
30} // namespace
31
32SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) {
33 queue_.emplace_back();
34 queue_.back().resize(kSnappyIdentifierChunk.size());
35 std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
36 queue_.back().begin());
37 total_bytes_ += queue_.back().size();
38}
39
40void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
41
42void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
43 accumulated_checksum_ =
44 AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_);
45 buffer_source_.AppendBuffer(std::move(in));
46
47 if (buffer_source_.Available() >= chunk_size_) {
48 EncodeCurrentBuffer();
49 }
50}
51
52void SnappyEncoder::EncodeCurrentBuffer() {
53 if (buffer_source_.Available() == 0) {
54 return;
55 }
56 const uint32_t uncompressed_checksum =
57 MaskChecksum(accumulated_checksum_.value());
58 queue_.emplace_back();
59 constexpr size_t kPrefixSize = 8;
60 queue_.back().resize(kPrefixSize +
61 snappy::MaxCompressedLength(buffer_source_.Available()));
62 queue_.back().data()[0] = '\x00';
63 char *const compressed_start =
64 reinterpret_cast<char *>(queue_.back().data()) + kPrefixSize;
65 snappy::UncheckedByteArraySink snappy_sink(compressed_start);
66 const size_t compressed_size =
67 snappy::Compress(&buffer_source_, &snappy_sink);
68 CHECK_LT(compressed_size + kPrefixSize, queue_.back().size());
69 queue_.back().resize(compressed_size + kPrefixSize);
70 const size_t chunk_size = compressed_size + 4;
71 CHECK_LT(chunk_size, 1U << 24);
72 queue_.back().data()[1] = chunk_size & 0xFF;
73 queue_.back().data()[2] = (chunk_size >> 8) & 0xFF;
74 queue_.back().data()[3] = (chunk_size >> 16) & 0xFF;
75 queue_.back().data()[4] = uncompressed_checksum & 0xFF;
76 queue_.back().data()[5] = (uncompressed_checksum >> 8) & 0xFF;
77 queue_.back().data()[6] = (uncompressed_checksum >> 16) & 0xFF;
78 queue_.back().data()[7] = (uncompressed_checksum >> 24) & 0xFF;
79
80 total_bytes_ += queue_.back().size();
81
82 accumulated_checksum_.reset();
83 CHECK_EQ(0u, buffer_source_.Available());
84}
85
86void SnappyEncoder::Clear(int n) {
87 CHECK_GE(n, 0);
88 CHECK_LE(static_cast<size_t>(n), queue_size());
89 queue_.erase(queue_.begin(), queue_.begin() + n);
90}
91
92size_t SnappyEncoder::queued_bytes() const {
93 size_t bytes = 0;
94 for (const auto &buffer : queue_) {
95 bytes += buffer.size();
96 }
97 return bytes;
98}
99
100std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const {
101 std::vector<absl::Span<const uint8_t>> queue;
102 queue.reserve(queue_.size());
103 for (const auto &buffer : queue_) {
104 queue.emplace_back(buffer.data(), buffer.size());
105 }
106 return queue;
107}
108
109size_t SnappyEncoder::DetachedBufferSource::Available() const {
110 size_t total_available = 0;
111 for (const flatbuffers::DetachedBuffer &buffer : buffers_) {
112 total_available += buffer.size();
113 }
114 return total_available;
115}
116
117const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
118 *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_;
119 return reinterpret_cast<char *>(buffers_[0].data()) +
120 index_into_first_buffer_;
121}
122
123void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
124 if (n == 0) {
125 return;
126 }
127
128 CHECK(!buffers_.empty());
129
130 index_into_first_buffer_ += n;
131 CHECK_LE(index_into_first_buffer_, buffers_[0].size())
132 << ": " << n << " is too large a skip.";
133 if (index_into_first_buffer_ == buffers_[0].size()) {
134 buffers_.erase(buffers_.begin());
135 index_into_first_buffer_ = 0;
136 }
137}
138
139void SnappyEncoder::DetachedBufferSource::AppendBuffer(
140 flatbuffers::DetachedBuffer &&buffer) {
141 buffers_.emplace_back(std::move(buffer));
142}
143
144size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
145 // Decoding process:
146 // We maintain an uncompressed_buffer_ of data that may have been uncompressed
147 // in the past but which hasn't been read yet.
148 //
149 // When we run out of that buffer and need to actually read new data, we
150 // need to read things from the file in one-chunk units. Snappy expects to
151 // uncompress an entire chunk at once, so there isn't any value in attempting
152 // to be more incremental. Each chunk will be of some variable size (specified
153 // at the start of the chunk). Once we read and uncompress the chunk, we can
154 // then copy it into the user's buffer until either we run out of data and
155 // need to uncompress a new chunk or user's buffer runs out in which case
156 // we fill up their buffer and store the extra data in uncompressed_buffer_
157 // to be populated later.
158
159 uint8_t *current_output = begin;
160 if (uncompressed_buffer_.size() > 0) {
161 const size_t amount_to_copy =
162 std::min<size_t>(uncompressed_buffer_.size(), end - current_output);
163 std::memcpy(current_output, uncompressed_buffer_.data(), amount_to_copy);
164 current_output += amount_to_copy;
165 uncompressed_buffer_.erase_front(amount_to_copy);
166 }
167
168 while (current_output != end) {
169 uint8_t header[4];
170 const size_t read_length = underlying_decoder_->Read(header, header + 4);
171 if (read_length == 0) {
172 break;
173 }
174 if (read_length != 4u) {
175 LOG(WARNING) << "Logfile data is truncated.";
176 break;
177 }
178 const uint8_t chunk_type = header[0];
179 const size_t chunk_size = header[1] + (header[2] << 8) + (header[3] << 16);
180 compressed_buffer_.resize(chunk_size);
181 if (chunk_size !=
182 underlying_decoder_->Read(compressed_buffer_.data(),
183 compressed_buffer_.data() + chunk_size)) {
184 LOG(WARNING) << "Logfile data is truncated.";
185 break;
186 }
187 if (chunk_type == 0xFF) {
188 // Start of file, don't bother
189 // checking the contents.
190 continue;
191 } else if (chunk_type == 0x00) {
192 // Note: This section uses CHECKs rather than terminating more silently
193 // because any of these CHECK's failing implies outright data corruption
194 // rather than mere truncation.
195 CHECK_LT(4u, compressed_buffer_.size()) << ": Missing checksum.";
196 const uint32_t checksum = compressed_buffer_.data()[0] +
197 (compressed_buffer_.data()[1] << 8) +
198 (compressed_buffer_.data()[2] << 16) +
199 (compressed_buffer_.data()[3] << 24);
200
201 const char *input_data =
202 reinterpret_cast<char *>(compressed_buffer_.data() + 4);
203 const size_t input_size = compressed_buffer_.size() - 4;
204
205 size_t uncompressed_length;
206 CHECK(snappy::GetUncompressedLength(input_data, input_size,
207 &uncompressed_length));
208
209 // If the user's buffer can fit the entire uncompressed data, we
210 // will uncompress directly into their buffer. Otherwise, we uncompress
211 // into a separate buffer and then copy the correct number of bytes out
212 // to fill up the user's buffer.
213 // TODO(james): In the future, overload a snappy::Sink to avoid
214 // unnecessary copies by extracting the initial N bytes directly into
215 // the user's buffer.
216 if (end > (uncompressed_length + current_output)) {
217 CHECK(snappy::RawUncompress(input_data, input_size,
218 reinterpret_cast<char *>(current_output)))
219 << ": Corrupted snappy chunk.";
220 CHECK_EQ(checksum, SnappyChecksum(current_output, uncompressed_length))
221 << ": Checksum mismatch.";
222
223 current_output += uncompressed_length;
224 } else {
225 uncompressed_buffer_.resize(uncompressed_length);
226 CHECK(snappy::RawUncompress(
227 input_data, input_size,
228 reinterpret_cast<char *>(uncompressed_buffer_.data())))
229 << ": Corrupted snappy chunk.";
230 CHECK_EQ(checksum, SnappyChecksum(uncompressed_buffer_.data(),
231 uncompressed_buffer_.size()))
232 << ": Checksum mismatch.";
233 std::memcpy(current_output, uncompressed_buffer_.data(),
234 end - current_output);
235 uncompressed_buffer_.erase_front(end - current_output);
236 current_output = end;
237 }
238 } else {
239 // Unimplemented.
240 LOG(FATAL) << "Unsupported snappy chunk type "
241 << static_cast<int>(chunk_type);
242 }
243 }
244 total_output_ += current_output - begin;
245
246 return current_output - begin;
247}
248
249} // namespace aos::logger