blob: 5cb98974afcf64ea439fc720575ea85e820d44e2 [file] [log] [blame]
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001#include "aos/events/logging/lzma_encoder.h"
2
3#include "glog/logging.h"
4
Austin Schuhbe91b342022-06-27 00:53:45 -07005DEFINE_int32(lzma_threads, 1, "Number of threads to use for encoding");
6
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07007namespace aos::logger {
8namespace {
9
Austin Schuh3bd4c402020-11-06 18:19:06 -080010// Returns true if `status` is not an error code, false if it is recoverable, or
11// otherwise logs the appropriate error message and crashes.
Austin Schuhed292dc2020-12-22 22:32:59 -080012bool LzmaCodeIsOk(lzma_ret status, std::string_view filename = "") {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070013 switch (status) {
14 case LZMA_OK:
15 case LZMA_STREAM_END:
Austin Schuh3bd4c402020-11-06 18:19:06 -080016 return true;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070017 case LZMA_MEM_ERROR:
18 LOG(FATAL) << "Memory allocation failed:" << status;
19 case LZMA_OPTIONS_ERROR:
20 LOG(FATAL) << "The given compression preset or decompression options are "
21 "not supported: "
22 << status;
23 case LZMA_UNSUPPORTED_CHECK:
24 LOG(FATAL) << "The given check type is not supported: " << status;
25 case LZMA_PROG_ERROR:
26 LOG(FATAL) << "One or more of the parameters have values that will never "
27 "be valid: "
28 << status;
29 case LZMA_MEMLIMIT_ERROR:
30 LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
31 "memory usage limit: "
32 << status;
33 case LZMA_FORMAT_ERROR:
Austin Schuhed292dc2020-12-22 22:32:59 -080034 if (filename.empty()) {
35 LOG(FATAL) << "File format not recognized: " << status;
36 } else {
37 LOG(FATAL) << "File format of " << filename
38 << " not recognized: " << status;
39 }
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070040 case LZMA_DATA_ERROR:
Brian Silverman517431e2021-11-10 12:48:58 -080041 VLOG(1) << "Compressed file is corrupt: " << status;
Austin Schuh3bd4c402020-11-06 18:19:06 -080042 return false;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070043 case LZMA_BUF_ERROR:
Brian Silverman517431e2021-11-10 12:48:58 -080044 VLOG(1) << "Compressed file is truncated or corrupt: " << status;
Austin Schuh3bd4c402020-11-06 18:19:06 -080045 return false;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070046 default:
47 LOG(FATAL) << "Unexpected return value: " << status;
48 }
49}
50
51} // namespace
52
Austin Schuhbe91b342022-06-27 00:53:45 -070053LzmaEncoder::LzmaEncoder(const uint32_t compression_preset, size_t block_size)
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070054 : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
55 CHECK_GE(compression_preset_, 0u)
56 << ": Compression preset must be in the range [0, 9].";
57 CHECK_LE(compression_preset_, 9u)
58 << ": Compression preset must be in the range [0, 9].";
59
Austin Schuhbe91b342022-06-27 00:53:45 -070060 if (FLAGS_lzma_threads <= 1) {
61 lzma_ret status =
62 lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
63 CHECK(LzmaCodeIsOk(status));
64 } else {
65 lzma_mt mt_options;
66 memset(&mt_options, 0, sizeof(mt_options));
67 mt_options.threads = FLAGS_lzma_threads;
68 mt_options.block_size = block_size;
69 // Compress for at most 100 ms before relinquishing control back to the main
70 // thread.
71 mt_options.timeout = 100;
72 mt_options.preset = compression_preset_;
73 mt_options.filters = nullptr;
74 mt_options.check = LZMA_CHECK_CRC64;
75 lzma_ret status = lzma_stream_encoder_mt(&stream_, &mt_options);
76 CHECK(LzmaCodeIsOk(status));
77 }
78
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070079 stream_.avail_out = 0;
80 VLOG(2) << "LzmaEncoder: Initialization succeeded.";
81}
82
83LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
84
85void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
86 CHECK(in.data()) << ": Encode called with nullptr.";
87
88 stream_.next_in = in.data();
89 stream_.avail_in = in.size();
90
91 RunLzmaCode(LZMA_RUN);
92}
93
94void LzmaEncoder::Finish() { RunLzmaCode(LZMA_FINISH); }
95
96void LzmaEncoder::Clear(const int n) {
97 CHECK_GE(n, 0);
98 CHECK_LE(static_cast<size_t>(n), queue_size());
99 queue_.erase(queue_.begin(), queue_.begin() + n);
100 if (queue_.empty()) {
101 stream_.next_out = nullptr;
102 stream_.avail_out = 0;
103 }
104}
105
106std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
107 std::vector<absl::Span<const uint8_t>> queue;
108 if (queue_.empty()) {
109 return queue;
110 }
111 for (size_t i = 0; i < queue_.size() - 1; ++i) {
112 queue.emplace_back(
113 absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
114 }
115 // For the last buffer in the queue, we must account for the possibility that
116 // the buffer isn't full yet.
117 queue.emplace_back(absl::MakeConstSpan(
118 queue_.back().data(), queue_.back().size() - stream_.avail_out));
119 return queue;
120}
121
122size_t LzmaEncoder::queued_bytes() const {
123 size_t bytes = queue_size() * kEncodedBufferSizeBytes;
124 // Subtract the bytes that the encoder hasn't filled yet.
125 bytes -= stream_.avail_out;
126 return bytes;
127}
128
129void LzmaEncoder::RunLzmaCode(lzma_action action) {
130 CHECK(!finished_);
131
132 // This is to keep track of how many bytes resulted from encoding this input
133 // buffer.
134 size_t last_avail_out = stream_.avail_out;
135
136 while (stream_.avail_in > 0 || action == LZMA_FINISH) {
137 // If output buffer is full, create a new one, queue it up, and resume
138 // encoding. This could happen in the first call to Encode after
139 // construction or a Reset, or when an input buffer is large enough to fill
140 // more than one output buffer.
141 if (stream_.avail_out == 0) {
142 queue_.emplace_back();
143 queue_.back().resize(kEncodedBufferSizeBytes);
144 stream_.next_out = queue_.back().data();
145 stream_.avail_out = kEncodedBufferSizeBytes;
146 // Update the byte count.
147 total_bytes_ += last_avail_out;
148 last_avail_out = stream_.avail_out;
149 }
150
151 // Encode the data.
152 lzma_ret status = lzma_code(&stream_, action);
Austin Schuh3bd4c402020-11-06 18:19:06 -0800153 CHECK(LzmaCodeIsOk(status));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700154 if (action == LZMA_FINISH) {
155 if (status == LZMA_STREAM_END) {
156 // This is returned when lzma_code is all done.
157 finished_ = true;
158 break;
159 }
160 } else {
161 CHECK(status != LZMA_STREAM_END);
162 }
163 VLOG(2) << "LzmaEncoder: Encoded chunk.";
164 }
165
166 // Update the number of resulting encoded bytes.
167 total_bytes_ += last_avail_out - stream_.avail_out;
168}
169
Austin Schuhcd368422021-11-22 21:23:29 -0800170LzmaDecoder::LzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
171 bool quiet)
Tyler Chatow2015bc62021-08-04 21:15:09 -0700172 : underlying_decoder_(std::move(underlying_decoder)),
Austin Schuhcd368422021-11-22 21:23:29 -0800173 stream_(LZMA_STREAM_INIT),
174 quiet_(quiet) {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700175 compressed_data_.resize(kBufSize);
176
177 lzma_ret status =
178 lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
Austin Schuh3bd4c402020-11-06 18:19:06 -0800179 CHECK(LzmaCodeIsOk(status)) << "Failed initializing LZMA stream decoder.";
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700180 stream_.avail_out = 0;
181 VLOG(2) << "LzmaDecoder: Initialization succeeded.";
182}
183
184LzmaDecoder::~LzmaDecoder() { lzma_end(&stream_); }
185
186size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
187 if (finished_) {
188 return 0;
189 }
190
191 // Write into the given range.
192 stream_.next_out = begin;
193 stream_.avail_out = end - begin;
194 // Keep decompressing until we run out of buffer space.
195 while (stream_.avail_out > 0) {
196 if (action_ == LZMA_RUN && stream_.avail_in == 0) {
197 // Read more bytes from the file if we're all out.
Tyler Chatow2015bc62021-08-04 21:15:09 -0700198 const size_t count = underlying_decoder_->Read(compressed_data_.begin(),
199 compressed_data_.end());
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700200 if (count == 0) {
201 // No more data to read in the file, begin the finishing operation.
202 action_ = LZMA_FINISH;
203 } else {
204 stream_.next_in = compressed_data_.data();
205 stream_.avail_in = count;
206 }
207 }
208 // Decompress the data.
209 const lzma_ret status = lzma_code(&stream_, action_);
210 // Return if we're done.
211 if (status == LZMA_STREAM_END) {
212 CHECK_EQ(action_, LZMA_FINISH)
213 << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
214 finished_ = true;
215 return (end - begin) - stream_.avail_out;
216 }
Austin Schuh3bd4c402020-11-06 18:19:06 -0800217
218 // If we fail to decompress, give up. Return everything that has been
219 // produced so far.
Tyler Chatow2015bc62021-08-04 21:15:09 -0700220 if (!LzmaCodeIsOk(status, filename())) {
Austin Schuh3bd4c402020-11-06 18:19:06 -0800221 finished_ = true;
Brian Silverman517431e2021-11-10 12:48:58 -0800222 if (status == LZMA_DATA_ERROR) {
Austin Schuhcd368422021-11-22 21:23:29 -0800223 if (!quiet_ || VLOG_IS_ON(1)) {
224 LOG(WARNING) << filename() << " is corrupted.";
225 }
Brian Silverman517431e2021-11-10 12:48:58 -0800226 } else if (status == LZMA_BUF_ERROR) {
Austin Schuhcd368422021-11-22 21:23:29 -0800227 if (!quiet_ || VLOG_IS_ON(1)) {
228 LOG(WARNING) << filename() << " is truncated or corrupted.";
229 }
Brian Silverman517431e2021-11-10 12:48:58 -0800230 } else {
231 LOG(FATAL) << "Unknown error " << status << " when reading "
232 << filename();
233 }
Austin Schuh3bd4c402020-11-06 18:19:06 -0800234 return (end - begin) - stream_.avail_out;
235 }
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700236 }
237 return end - begin;
238}
239
Tyler Chatow2015bc62021-08-04 21:15:09 -0700240ThreadedLzmaDecoder::ThreadedLzmaDecoder(
Austin Schuhcd368422021-11-22 21:23:29 -0800241 std::unique_ptr<DataDecoder> underlying_decoder, bool quiet)
242 : decoder_(std::move(underlying_decoder), quiet), decode_thread_([this] {
Tyler Chatow7df60832021-07-15 21:18:36 -0700243 std::unique_lock lock(decode_mutex_);
244 while (true) {
245 // Wake if the queue is too small or we are finished.
246 continue_decoding_.wait(lock, [this] {
247 return decoded_queue_.size() < kQueueSize || finished_;
248 });
249
250 if (finished_) {
251 return;
252 }
253
254 while (true) {
255 CHECK(!finished_);
256 // Release our lock on the queue before doing decompression work.
257 lock.unlock();
258
259 ResizeableBuffer buffer;
260 buffer.resize(kBufSize);
261
262 const size_t bytes_read =
263 decoder_.Read(buffer.begin(), buffer.end());
264 buffer.resize(bytes_read);
265
266 // Relock the queue and move the new buffer to the end. This should
267 // be fast. We also need to stay locked when we wait().
268 lock.lock();
269 if (bytes_read > 0) {
270 decoded_queue_.emplace_back(std::move(buffer));
271 } else {
272 finished_ = true;
273 }
274
275 // If we've filled the queue or are out of data, go back to sleep.
276 if (decoded_queue_.size() >= kQueueSize || finished_) {
277 break;
278 }
279 }
280
281 // Notify main thread in case it was waiting for us to queue more
282 // data.
283 queue_filled_.notify_one();
284 }
285 }) {}
286
287ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
288 // Wake up decode thread so it can return.
289 {
290 std::scoped_lock lock(decode_mutex_);
291 finished_ = true;
292 }
293 continue_decoding_.notify_one();
294 decode_thread_.join();
295}
296
297size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
298 std::unique_lock lock(decode_mutex_);
299
300 // Strip any empty buffers
301 for (auto iter = decoded_queue_.begin(); iter != decoded_queue_.end();) {
302 if (iter->size() == 0) {
303 iter = decoded_queue_.erase(iter);
304 } else {
305 ++iter;
306 }
307 }
308
309 // If the queue is empty, sleep until the decoder thread has produced another
310 // buffer.
311 if (decoded_queue_.empty()) {
312 continue_decoding_.notify_one();
313 queue_filled_.wait(lock,
314 [this] { return finished_ || !decoded_queue_.empty(); });
315 if (finished_ && decoded_queue_.empty()) {
316 return 0;
317 }
318 }
319 // Sanity check if the queue is empty and we're not finished.
320 CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";
321
322 ResizeableBuffer &front_buffer = decoded_queue_.front();
323
324 // Copy some data from our working buffer to the requested destination.
325 const std::size_t bytes_requested = end - begin;
326 const std::size_t bytes_to_copy =
327 std::min(bytes_requested, front_buffer.size());
328 memcpy(begin, front_buffer.data(), bytes_to_copy);
329 front_buffer.erase_front(bytes_to_copy);
330
331 // Ensure the decoding thread wakes up if the queue isn't full.
332 if (!finished_ && decoded_queue_.size() < kQueueSize) {
333 continue_decoding_.notify_one();
334 }
335
336 return bytes_to_copy;
337}
338
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700339} // namespace aos::logger