blob: f32d7e2ab7720fa7aaebea68000319c72d2e6b6f [file] [log] [blame]
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001#include "aos/events/logging/lzma_encoder.h"
2
3#include "glog/logging.h"
4
5namespace aos::logger {
6namespace {
7
Austin Schuh3bd4c402020-11-06 18:19:06 -08008// Returns true if `status` is not an error code, false if it is recoverable, or
9// otherwise logs the appropriate error message and crashes.
Austin Schuhed292dc2020-12-22 22:32:59 -080010bool LzmaCodeIsOk(lzma_ret status, std::string_view filename = "") {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070011 switch (status) {
12 case LZMA_OK:
13 case LZMA_STREAM_END:
Austin Schuh3bd4c402020-11-06 18:19:06 -080014 return true;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070015 case LZMA_MEM_ERROR:
16 LOG(FATAL) << "Memory allocation failed:" << status;
17 case LZMA_OPTIONS_ERROR:
18 LOG(FATAL) << "The given compression preset or decompression options are "
19 "not supported: "
20 << status;
21 case LZMA_UNSUPPORTED_CHECK:
22 LOG(FATAL) << "The given check type is not supported: " << status;
23 case LZMA_PROG_ERROR:
24 LOG(FATAL) << "One or more of the parameters have values that will never "
25 "be valid: "
26 << status;
27 case LZMA_MEMLIMIT_ERROR:
28 LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
29 "memory usage limit: "
30 << status;
31 case LZMA_FORMAT_ERROR:
Austin Schuhed292dc2020-12-22 22:32:59 -080032 if (filename.empty()) {
33 LOG(FATAL) << "File format not recognized: " << status;
34 } else {
35 LOG(FATAL) << "File format of " << filename
36 << " not recognized: " << status;
37 }
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070038 case LZMA_DATA_ERROR:
Brian Silverman517431e2021-11-10 12:48:58 -080039 VLOG(1) << "Compressed file is corrupt: " << status;
Austin Schuh3bd4c402020-11-06 18:19:06 -080040 return false;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070041 case LZMA_BUF_ERROR:
Brian Silverman517431e2021-11-10 12:48:58 -080042 VLOG(1) << "Compressed file is truncated or corrupt: " << status;
Austin Schuh3bd4c402020-11-06 18:19:06 -080043 return false;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070044 default:
45 LOG(FATAL) << "Unexpected return value: " << status;
46 }
47}
48
49} // namespace
50
51LzmaEncoder::LzmaEncoder(const uint32_t compression_preset)
52 : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
53 CHECK_GE(compression_preset_, 0u)
54 << ": Compression preset must be in the range [0, 9].";
55 CHECK_LE(compression_preset_, 9u)
56 << ": Compression preset must be in the range [0, 9].";
57
58 lzma_ret status =
59 lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
Austin Schuh3bd4c402020-11-06 18:19:06 -080060 CHECK(LzmaCodeIsOk(status));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070061 stream_.avail_out = 0;
62 VLOG(2) << "LzmaEncoder: Initialization succeeded.";
63}
64
65LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
66
67void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
68 CHECK(in.data()) << ": Encode called with nullptr.";
69
70 stream_.next_in = in.data();
71 stream_.avail_in = in.size();
72
73 RunLzmaCode(LZMA_RUN);
74}
75
76void LzmaEncoder::Finish() { RunLzmaCode(LZMA_FINISH); }
77
78void LzmaEncoder::Clear(const int n) {
79 CHECK_GE(n, 0);
80 CHECK_LE(static_cast<size_t>(n), queue_size());
81 queue_.erase(queue_.begin(), queue_.begin() + n);
82 if (queue_.empty()) {
83 stream_.next_out = nullptr;
84 stream_.avail_out = 0;
85 }
86}
87
88std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
89 std::vector<absl::Span<const uint8_t>> queue;
90 if (queue_.empty()) {
91 return queue;
92 }
93 for (size_t i = 0; i < queue_.size() - 1; ++i) {
94 queue.emplace_back(
95 absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
96 }
97 // For the last buffer in the queue, we must account for the possibility that
98 // the buffer isn't full yet.
99 queue.emplace_back(absl::MakeConstSpan(
100 queue_.back().data(), queue_.back().size() - stream_.avail_out));
101 return queue;
102}
103
104size_t LzmaEncoder::queued_bytes() const {
105 size_t bytes = queue_size() * kEncodedBufferSizeBytes;
106 // Subtract the bytes that the encoder hasn't filled yet.
107 bytes -= stream_.avail_out;
108 return bytes;
109}
110
111void LzmaEncoder::RunLzmaCode(lzma_action action) {
112 CHECK(!finished_);
113
114 // This is to keep track of how many bytes resulted from encoding this input
115 // buffer.
116 size_t last_avail_out = stream_.avail_out;
117
118 while (stream_.avail_in > 0 || action == LZMA_FINISH) {
119 // If output buffer is full, create a new one, queue it up, and resume
120 // encoding. This could happen in the first call to Encode after
121 // construction or a Reset, or when an input buffer is large enough to fill
122 // more than one output buffer.
123 if (stream_.avail_out == 0) {
124 queue_.emplace_back();
125 queue_.back().resize(kEncodedBufferSizeBytes);
126 stream_.next_out = queue_.back().data();
127 stream_.avail_out = kEncodedBufferSizeBytes;
128 // Update the byte count.
129 total_bytes_ += last_avail_out;
130 last_avail_out = stream_.avail_out;
131 }
132
133 // Encode the data.
134 lzma_ret status = lzma_code(&stream_, action);
Austin Schuh3bd4c402020-11-06 18:19:06 -0800135 CHECK(LzmaCodeIsOk(status));
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700136 if (action == LZMA_FINISH) {
137 if (status == LZMA_STREAM_END) {
138 // This is returned when lzma_code is all done.
139 finished_ = true;
140 break;
141 }
142 } else {
143 CHECK(status != LZMA_STREAM_END);
144 }
145 VLOG(2) << "LzmaEncoder: Encoded chunk.";
146 }
147
148 // Update the number of resulting encoded bytes.
149 total_bytes_ += last_avail_out - stream_.avail_out;
150}
151
Austin Schuhcd368422021-11-22 21:23:29 -0800152LzmaDecoder::LzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
153 bool quiet)
Tyler Chatow2015bc62021-08-04 21:15:09 -0700154 : underlying_decoder_(std::move(underlying_decoder)),
Austin Schuhcd368422021-11-22 21:23:29 -0800155 stream_(LZMA_STREAM_INIT),
156 quiet_(quiet) {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700157 compressed_data_.resize(kBufSize);
158
159 lzma_ret status =
160 lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
Austin Schuh3bd4c402020-11-06 18:19:06 -0800161 CHECK(LzmaCodeIsOk(status)) << "Failed initializing LZMA stream decoder.";
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700162 stream_.avail_out = 0;
163 VLOG(2) << "LzmaDecoder: Initialization succeeded.";
164}
165
166LzmaDecoder::~LzmaDecoder() { lzma_end(&stream_); }
167
168size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
169 if (finished_) {
170 return 0;
171 }
172
173 // Write into the given range.
174 stream_.next_out = begin;
175 stream_.avail_out = end - begin;
176 // Keep decompressing until we run out of buffer space.
177 while (stream_.avail_out > 0) {
178 if (action_ == LZMA_RUN && stream_.avail_in == 0) {
179 // Read more bytes from the file if we're all out.
Tyler Chatow2015bc62021-08-04 21:15:09 -0700180 const size_t count = underlying_decoder_->Read(compressed_data_.begin(),
181 compressed_data_.end());
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700182 if (count == 0) {
183 // No more data to read in the file, begin the finishing operation.
184 action_ = LZMA_FINISH;
185 } else {
186 stream_.next_in = compressed_data_.data();
187 stream_.avail_in = count;
188 }
189 }
190 // Decompress the data.
191 const lzma_ret status = lzma_code(&stream_, action_);
192 // Return if we're done.
193 if (status == LZMA_STREAM_END) {
194 CHECK_EQ(action_, LZMA_FINISH)
195 << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
196 finished_ = true;
197 return (end - begin) - stream_.avail_out;
198 }
Austin Schuh3bd4c402020-11-06 18:19:06 -0800199
200 // If we fail to decompress, give up. Return everything that has been
201 // produced so far.
Tyler Chatow2015bc62021-08-04 21:15:09 -0700202 if (!LzmaCodeIsOk(status, filename())) {
Austin Schuh3bd4c402020-11-06 18:19:06 -0800203 finished_ = true;
Brian Silverman517431e2021-11-10 12:48:58 -0800204 if (status == LZMA_DATA_ERROR) {
Austin Schuhcd368422021-11-22 21:23:29 -0800205 if (!quiet_ || VLOG_IS_ON(1)) {
206 LOG(WARNING) << filename() << " is corrupted.";
207 }
Brian Silverman517431e2021-11-10 12:48:58 -0800208 } else if (status == LZMA_BUF_ERROR) {
Austin Schuhcd368422021-11-22 21:23:29 -0800209 if (!quiet_ || VLOG_IS_ON(1)) {
210 LOG(WARNING) << filename() << " is truncated or corrupted.";
211 }
Brian Silverman517431e2021-11-10 12:48:58 -0800212 } else {
213 LOG(FATAL) << "Unknown error " << status << " when reading "
214 << filename();
215 }
Austin Schuh3bd4c402020-11-06 18:19:06 -0800216 return (end - begin) - stream_.avail_out;
217 }
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700218 }
219 return end - begin;
220}
221
Tyler Chatow2015bc62021-08-04 21:15:09 -0700222ThreadedLzmaDecoder::ThreadedLzmaDecoder(
Austin Schuhcd368422021-11-22 21:23:29 -0800223 std::unique_ptr<DataDecoder> underlying_decoder, bool quiet)
224 : decoder_(std::move(underlying_decoder), quiet), decode_thread_([this] {
Tyler Chatow7df60832021-07-15 21:18:36 -0700225 std::unique_lock lock(decode_mutex_);
226 while (true) {
227 // Wake if the queue is too small or we are finished.
228 continue_decoding_.wait(lock, [this] {
229 return decoded_queue_.size() < kQueueSize || finished_;
230 });
231
232 if (finished_) {
233 return;
234 }
235
236 while (true) {
237 CHECK(!finished_);
238 // Release our lock on the queue before doing decompression work.
239 lock.unlock();
240
241 ResizeableBuffer buffer;
242 buffer.resize(kBufSize);
243
244 const size_t bytes_read =
245 decoder_.Read(buffer.begin(), buffer.end());
246 buffer.resize(bytes_read);
247
248 // Relock the queue and move the new buffer to the end. This should
249 // be fast. We also need to stay locked when we wait().
250 lock.lock();
251 if (bytes_read > 0) {
252 decoded_queue_.emplace_back(std::move(buffer));
253 } else {
254 finished_ = true;
255 }
256
257 // If we've filled the queue or are out of data, go back to sleep.
258 if (decoded_queue_.size() >= kQueueSize || finished_) {
259 break;
260 }
261 }
262
263 // Notify main thread in case it was waiting for us to queue more
264 // data.
265 queue_filled_.notify_one();
266 }
267 }) {}
268
269ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
270 // Wake up decode thread so it can return.
271 {
272 std::scoped_lock lock(decode_mutex_);
273 finished_ = true;
274 }
275 continue_decoding_.notify_one();
276 decode_thread_.join();
277}
278
279size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
280 std::unique_lock lock(decode_mutex_);
281
282 // Strip any empty buffers
283 for (auto iter = decoded_queue_.begin(); iter != decoded_queue_.end();) {
284 if (iter->size() == 0) {
285 iter = decoded_queue_.erase(iter);
286 } else {
287 ++iter;
288 }
289 }
290
291 // If the queue is empty, sleep until the decoder thread has produced another
292 // buffer.
293 if (decoded_queue_.empty()) {
294 continue_decoding_.notify_one();
295 queue_filled_.wait(lock,
296 [this] { return finished_ || !decoded_queue_.empty(); });
297 if (finished_ && decoded_queue_.empty()) {
298 return 0;
299 }
300 }
301 // Sanity check if the queue is empty and we're not finished.
302 CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";
303
304 ResizeableBuffer &front_buffer = decoded_queue_.front();
305
306 // Copy some data from our working buffer to the requested destination.
307 const std::size_t bytes_requested = end - begin;
308 const std::size_t bytes_to_copy =
309 std::min(bytes_requested, front_buffer.size());
310 memcpy(begin, front_buffer.data(), bytes_to_copy);
311 front_buffer.erase_front(bytes_to_copy);
312
313 // Ensure the decoding thread wakes up if the queue isn't full.
314 if (!finished_ && decoded_queue_.size() < kQueueSize) {
315 continue_decoding_.notify_one();
316 }
317
318 return bytes_to_copy;
319}
320
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700321} // namespace aos::logger