#include "aos/events/logging/lzma_encoder.h"

#include "glog/logging.h"

DEFINE_int32(lzma_threads, 1, "Number of threads to use for encoding");

namespace aos::logger {
namespace {

// Returns true if `status` is not an error code, returns false if the error is
// recoverable, and otherwise logs an appropriate error message and crashes.
bool LzmaCodeIsOk(lzma_ret status, std::string_view filename = "") {
  switch (status) {
    case LZMA_OK:
    case LZMA_STREAM_END:
      return true;
    case LZMA_MEM_ERROR:
      LOG(FATAL) << "Memory allocation failed: " << status;
    case LZMA_OPTIONS_ERROR:
      LOG(FATAL) << "The given compression preset or decompression options are "
                    "not supported: "
                 << status;
    case LZMA_UNSUPPORTED_CHECK:
      LOG(FATAL) << "The given check type is not supported: " << status;
    case LZMA_PROG_ERROR:
      LOG(FATAL) << "One or more of the parameters have values that will never "
                    "be valid: "
                 << status;
    case LZMA_MEMLIMIT_ERROR:
      LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
                    "memory usage limit: "
                 << status;
    case LZMA_FORMAT_ERROR:
      if (filename.empty()) {
        LOG(FATAL) << "File format not recognized: " << status;
      } else {
        LOG(FATAL) << "File format of " << filename
                   << " not recognized: " << status;
      }
    case LZMA_DATA_ERROR:
      VLOG(1) << "Compressed file is corrupt: " << status;
      return false;
    case LZMA_BUF_ERROR:
      VLOG(1) << "Compressed file is truncated or corrupt: " << status;
      return false;
    default:
      LOG(FATAL) << "Unexpected return value: " << status;
  }
}

}  // namespace

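// The encoder stages each message into a preallocated input buffer and
// configures either the single-threaded or multi-threaded liblzma encoder
// based on --lzma_threads.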
LzmaEncoder::LzmaEncoder(size_t max_message_size,
                         const uint32_t compression_preset, size_t block_size)
    : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
  CHECK_GE(compression_preset_, 0u)
      << ": Compression preset must be in the range [0, 9].";
  CHECK_LE(compression_preset_, 9u)
      << ": Compression preset must be in the range [0, 9].";

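  // With one thread, use the simple preset-based encoder.  Otherwise set up
  // the multi-threaded encoder, which splits the stream into independent
  // blocks of block_size bytes and compresses them in parallel.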
  if (FLAGS_lzma_threads <= 1) {
    lzma_ret status =
        lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
    CHECK(LzmaCodeIsOk(status));
  } else {
    lzma_mt mt_options;
    memset(&mt_options, 0, sizeof(mt_options));
    mt_options.threads = FLAGS_lzma_threads;
    mt_options.block_size = block_size;
    // Compress for at most 100 ms before relinquishing control back to the
    // main thread.
    mt_options.timeout = 100;
    mt_options.preset = compression_preset_;
    mt_options.filters = nullptr;
    mt_options.check = LZMA_CHECK_CRC64;
    lzma_ret status = lzma_stream_encoder_mt(&stream_, &mt_options);
    CHECK(LzmaCodeIsOk(status));
  }

  stream_.avail_out = 0;
  VLOG(2) << "LzmaEncoder: Initialization succeeded.";

  // TODO(austin): We don't write the biggest messages very often. Is it more
  // efficient to allocate if we go over a threshold to keep the static memory
  // in use smaller, or just allocate the worst case like we are doing here?
  input_buffer_.resize(max_message_size);

  // Start our queues out with a reasonable space allocation. The cost of this
  // is negligible compared to the buffer cost, and doing it up front removes a
  // bunch of allocations at runtime.
  queue_.reserve(4);
  free_queue_.reserve(4);
  return_queue_.reserve(4);
}

LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }

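// Stages the message described by `copy` (starting at `start_byte`) into the
// input buffer and feeds it to the encoder.  Returns the number of bytes
// consumed from the copier.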
size_t LzmaEncoder::Encode(Copier *copy, size_t start_byte) {
  const size_t copy_size = copy->size();
  // LZMA compresses the data as it goes along, copying the compressed results
  // into another buffer. So, there's no need to store more than one message
  // since lzma is going to take it from here.
  CHECK_LE(copy_size, input_buffer_.size());

  CHECK_EQ(copy->Copy(input_buffer_.data(), start_byte, copy_size - start_byte),
           copy_size - start_byte);

  stream_.next_in = input_buffer_.data();
  stream_.avail_in = copy_size;

  RunLzmaCode(LZMA_RUN);

  return copy_size - start_byte;
}

void LzmaEncoder::Finish() { RunLzmaCode(LZMA_FINISH); }

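// Releases the first n encoded buffers back to the free list so they can be
// reused by the encoder instead of being reallocated.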
void LzmaEncoder::Clear(const int n) {
  CHECK_GE(n, 0);
  CHECK_LE(static_cast<size_t>(n), queue_size());
  for (int i = 0; i < n; ++i) {
    free_queue_.emplace_back(std::move(queue_[i]));
  }
  queue_.erase(queue_.begin(), queue_.begin() + n);
  if (queue_.empty()) {
    stream_.next_out = nullptr;
    stream_.avail_out = 0;
  }
}

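// Builds the list of spans covering the encoded data.  The last buffer may
// only be partially filled, so it is trimmed to the bytes actually written.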
absl::Span<const absl::Span<const uint8_t>> LzmaEncoder::queue() {
  return_queue_.clear();
  if (queue_.empty()) {
    return return_queue_;
  }
  return_queue_.reserve(queue_.size());
  for (size_t i = 0; i < queue_.size() - 1; ++i) {
    return_queue_.emplace_back(
        absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
  }
  // For the last buffer in the queue, we must account for the possibility that
  // the buffer isn't full yet.
  if (queue_.back().size() != stream_.avail_out) {
    return_queue_.emplace_back(absl::MakeConstSpan(
        queue_.back().data(), queue_.back().size() - stream_.avail_out));
  }
  return return_queue_;
}

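// Reports how many encoded bytes are sitting in the queue, excluding the
// unused space at the end of the last buffer.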
size_t LzmaEncoder::queued_bytes() const {
  size_t bytes = queue_size() * kEncodedBufferSizeBytes;
  // Subtract the bytes that the encoder hasn't filled yet.
  bytes -= stream_.avail_out;
  return bytes;
}

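// Drives lzma_code() with the given action, grabbing output buffers from the
// free list (or allocating new ones) whenever the current one fills up, and
// accumulating the total number of encoded bytes produced.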
void LzmaEncoder::RunLzmaCode(lzma_action action) {
  CHECK(!finished_);

  // This is to keep track of how many bytes resulted from encoding this input
  // buffer.
  size_t last_avail_out = stream_.avail_out;

  while (stream_.avail_in > 0 || action == LZMA_FINISH) {
    // If output buffer is full, create a new one, queue it up, and resume
    // encoding. This could happen in the first call to Encode after
    // construction or a Reset, or when an input buffer is large enough to fill
    // more than one output buffer.
    if (stream_.avail_out == 0) {
      if (!free_queue_.empty()) {
        queue_.emplace_back(std::move(free_queue_.back()));
        free_queue_.pop_back();
      } else {
        queue_.emplace_back();
        queue_.back().resize(kEncodedBufferSizeBytes);
      }
      stream_.next_out = queue_.back().data();
      stream_.avail_out = kEncodedBufferSizeBytes;
      // Update the byte count.
      total_bytes_ += last_avail_out;
      last_avail_out = stream_.avail_out;
    }

    // Encode the data.
    lzma_ret status = lzma_code(&stream_, action);
    CHECK(LzmaCodeIsOk(status));
    if (action == LZMA_FINISH) {
      if (status == LZMA_STREAM_END) {
        // This is returned when lzma_code is all done.
        finished_ = true;
        break;
      }
    } else {
      CHECK(status != LZMA_STREAM_END);
    }
    VLOG(2) << "LzmaEncoder: Encoded chunk.";
  }

  // Update the number of resulting encoded bytes.
  total_bytes_ += last_avail_out - stream_.avail_out;
}

LzmaDecoder::LzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
                         bool quiet)
    : underlying_decoder_(std::move(underlying_decoder)),
      stream_(LZMA_STREAM_INIT),
      quiet_(quiet) {
  compressed_data_.resize(kBufSize);

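  // Place no limit on decoder memory usage and accept concatenated .xz
  // streams.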
  lzma_ret status =
      lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
  CHECK(LzmaCodeIsOk(status)) << "Failed initializing LZMA stream decoder.";
  stream_.avail_out = 0;
  VLOG(2) << "LzmaDecoder: Initialization succeeded.";
}

LzmaDecoder::~LzmaDecoder() { lzma_end(&stream_); }

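// Decompresses into [begin, end).  Returns the number of bytes written, which
// is only less than the requested amount at the end of the stream or when the
// compressed data turns out to be corrupt or truncated.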
size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
  if (finished_) {
    return 0;
  }

  // Write into the given range.
  stream_.next_out = begin;
  stream_.avail_out = end - begin;
  // Keep decompressing until we run out of buffer space.
  while (stream_.avail_out > 0) {
    if (action_ == LZMA_RUN && stream_.avail_in == 0) {
      // Read more bytes from the file if we're all out.
      const size_t count = underlying_decoder_->Read(compressed_data_.begin(),
                                                     compressed_data_.end());
      if (count == 0) {
        // No more data to read in the file, begin the finishing operation.
        action_ = LZMA_FINISH;
      } else {
        stream_.next_in = compressed_data_.data();
        stream_.avail_in = count;
      }
    }
    // Decompress the data.
    const lzma_ret status = lzma_code(&stream_, action_);
    // Return if we're done.
    if (status == LZMA_STREAM_END) {
      CHECK_EQ(action_, LZMA_FINISH)
          << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
      finished_ = true;
      return (end - begin) - stream_.avail_out;
    }

    // If we fail to decompress, give up. Return everything that has been
    // produced so far.
    if (!LzmaCodeIsOk(status, filename())) {
      finished_ = true;
      if (status == LZMA_DATA_ERROR) {
        if (!quiet_ || VLOG_IS_ON(1)) {
          LOG(WARNING) << filename() << " is corrupted.";
        }
      } else if (status == LZMA_BUF_ERROR) {
        if (!quiet_ || VLOG_IS_ON(1)) {
          LOG(WARNING) << filename() << " is truncated or corrupted.";
        }
      } else {
        LOG(FATAL) << "Unknown error " << status << " when reading "
                   << filename();
      }
      return (end - begin) - stream_.avail_out;
    }
  }
  return end - begin;
}

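// The decode thread stays ahead of the reader by keeping up to kQueueSize
// decompressed buffers queued, sleeping when the queue is full and waking
// again when Read() drains it.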
ThreadedLzmaDecoder::ThreadedLzmaDecoder(
    std::unique_ptr<DataDecoder> underlying_decoder, bool quiet)
    : decoder_(std::move(underlying_decoder), quiet), decode_thread_([this] {
        std::unique_lock lock(decode_mutex_);
        while (true) {
          // Wake if the queue is too small or we are finished.
          continue_decoding_.wait(lock, [this] {
            return decoded_queue_.size() < kQueueSize || finished_;
          });

          if (finished_) {
            return;
          }

          while (true) {
            CHECK(!finished_);
            // Release our lock on the queue before doing decompression work.
            lock.unlock();

            ResizeableBuffer buffer;
            buffer.resize(kBufSize);

            const size_t bytes_read =
                decoder_.Read(buffer.begin(), buffer.end());
            buffer.resize(bytes_read);

            // Relock the queue and move the new buffer to the end. This should
            // be fast. We also need to stay locked when we wait().
            lock.lock();
            if (bytes_read > 0) {
              decoded_queue_.emplace_back(std::move(buffer));
            } else {
              finished_ = true;
            }

            // If we've filled the queue or are out of data, go back to sleep.
            if (decoded_queue_.size() >= kQueueSize || finished_) {
              break;
            }
          }

          // Notify main thread in case it was waiting for us to queue more
          // data.
          queue_filled_.notify_one();
        }
      }) {}

ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
  // Wake up decode thread so it can return.
  {
    std::scoped_lock lock(decode_mutex_);
    finished_ = true;
  }
  continue_decoding_.notify_one();
  decode_thread_.join();
}

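// Copies as much of the front decoded buffer as fits into [begin, end),
// blocking until the decode thread has produced data or finished.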
size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
  std::unique_lock lock(decode_mutex_);

  // Strip any empty buffers.
  for (auto iter = decoded_queue_.begin(); iter != decoded_queue_.end();) {
    if (iter->size() == 0) {
      iter = decoded_queue_.erase(iter);
    } else {
      ++iter;
    }
  }

  // If the queue is empty, sleep until the decoder thread has produced another
  // buffer.
  if (decoded_queue_.empty()) {
    continue_decoding_.notify_one();
    queue_filled_.wait(lock,
                       [this] { return finished_ || !decoded_queue_.empty(); });
    if (finished_ && decoded_queue_.empty()) {
      return 0;
    }
  }
  // Sanity check if the queue is empty and we're not finished.
  CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";

  ResizeableBuffer &front_buffer = decoded_queue_.front();

  // Copy some data from our working buffer to the requested destination.
  const std::size_t bytes_requested = end - begin;
  const std::size_t bytes_to_copy =
      std::min(bytes_requested, front_buffer.size());
  memcpy(begin, front_buffer.data(), bytes_to_copy);
  front_buffer.erase_front(bytes_to_copy);

  // Ensure the decoding thread wakes up if the queue isn't full.
  if (!finished_ && decoded_queue_.size() < kQueueSize) {
    continue_decoding_.notify_one();
  }

  return bytes_to_copy;
}

}  // namespace aos::logger