blob: b4964fbb4dfbc06eee9a0107d5f49f6e51af59db [file] [log] [blame]
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
2#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
3
Tyler Chatow7df60832021-07-15 21:18:36 -07004#include <condition_variable>
5#include <mutex>
Austin Schuh60e77942022-05-16 17:48:24 -07006#include <string_view>
Tyler Chatow7df60832021-07-15 21:18:36 -07007#include <thread>
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07008
Tyler Chatow7df60832021-07-15 21:18:36 -07009#include "absl/types/span.h"
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070010#include "aos/containers/resizeable_buffer.h"
11#include "aos/events/logging/buffer_encoder.h"
12#include "aos/events/logging/logger_generated.h"
Tyler Chatow7df60832021-07-15 21:18:36 -070013#include "flatbuffers/flatbuffers.h"
14#include "lzma.h"
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070015
16namespace aos::logger {
17
18// Encodes buffers using liblzma.
Austin Schuh48d10d62022-10-16 22:19:23 -070019class LzmaEncoder final : public DataEncoder {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070021 static constexpr std::string_view kExtension = ".xz";
22
Austin Schuhbe91b342022-06-27 00:53:45 -070023 // Initializes the LZMA stream and encoder. The block size is the block size
24 // used by the multithreaded encoder for batching. A block size of 0 tells
25 // lzma to pick it's favorite block size.
Austin Schuh48d10d62022-10-16 22:19:23 -070026 explicit LzmaEncoder(size_t max_message_size, uint32_t compression_preset,
27 size_t block_size = 0);
Austin Schuhc41603c2020-10-11 16:17:37 -070028 LzmaEncoder(const LzmaEncoder &) = delete;
29 LzmaEncoder(LzmaEncoder &&other) = delete;
30 LzmaEncoder &operator=(const LzmaEncoder &) = delete;
31 LzmaEncoder &operator=(LzmaEncoder &&other) = delete;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070032 // Gracefully shuts down the encoder.
33 ~LzmaEncoder() final;
34
Austin Schuh48d10d62022-10-16 22:19:23 -070035 bool HasSpace(size_t /*request*/) const override {
36 // Since the underlying lzma encoder handles buffering, we always have
37 // space.
38 return true;
39 }
Austin Schuh8bdfc492023-02-11 12:53:13 -080040 size_t space() const final { return input_buffer_.capacity(); }
41 size_t Encode(Copier *copy, size_t start_byte) final;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070042 void Finish() final;
43 void Clear(int n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070044 absl::Span<const absl::Span<const uint8_t>> queue() final;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070045 size_t queued_bytes() const final;
46 size_t total_bytes() const final { return total_bytes_; }
47 size_t queue_size() const final { return queue_.size(); }
48
49 private:
Austin Schuh48d10d62022-10-16 22:19:23 -070050 static constexpr size_t kEncodedBufferSizeBytes{1024 * 128};
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070051
52 void RunLzmaCode(lzma_action action);
53
54 lzma_stream stream_;
55 uint32_t compression_preset_;
56 std::vector<ResizeableBuffer> queue_;
57 bool finished_ = false;
58 // Total bytes that resulted from encoding raw data since the last call to
59 // Reset.
60 size_t total_bytes_ = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -070061
62 // Buffer that messages get coppied into for encoding.
63 ResizeableBuffer input_buffer_;
64
65 std::vector<absl::Span<const uint8_t>> return_queue_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070066};
67
68// Decompresses data with liblzma.
69class LzmaDecoder final : public DataDecoder {
70 public:
James Kuszmauldd0a5042021-10-28 23:38:04 -070071 static constexpr std::string_view kExtension = ".xz";
72
Austin Schuhcd368422021-11-22 21:23:29 -080073 explicit LzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
74 bool quiet = false);
75 explicit LzmaDecoder(std::string_view filename, bool quiet = false)
76 : LzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
Austin Schuhc41603c2020-10-11 16:17:37 -070077 LzmaDecoder(const LzmaDecoder &) = delete;
78 LzmaDecoder(LzmaDecoder &&other) = delete;
79 LzmaDecoder &operator=(const LzmaDecoder &) = delete;
80 LzmaDecoder &operator=(LzmaDecoder &&other) = delete;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070081 ~LzmaDecoder();
82
83 size_t Read(uint8_t *begin, uint8_t *end) final;
Tyler Chatow2015bc62021-08-04 21:15:09 -070084 std::string_view filename() const final {
85 return underlying_decoder_->filename();
86 }
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070087
88 private:
89 // Size of temporary buffer to use.
90 static constexpr size_t kBufSize{256 * 1024};
91
92 // Temporary buffer for storing compressed data.
93 ResizeableBuffer compressed_data_;
94 // Used for reading data from the file.
Tyler Chatow2015bc62021-08-04 21:15:09 -070095 std::unique_ptr<DataDecoder> underlying_decoder_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070096 // Stream for decompression.
97 lzma_stream stream_;
98 // The current action. This is LZMA_RUN until we've run out of data to read
99 // from the file.
100 lzma_action action_ = LZMA_RUN;
101 // Flag that represents whether or not all the data from the file has been
102 // successfully decoded.
103 bool finished_ = false;
Austin Schuhcd368422021-11-22 21:23:29 -0800104 // Flag to signal how quiet to be when logging potential issues around
105 // truncation.
106 const bool quiet_ = false;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700107};
108
Tyler Chatow7df60832021-07-15 21:18:36 -0700109// Decompresses data with liblzma in a new thread, up to a maximum queue
110// size. Calls to Read() will return data from the queue if available,
111// or block until more data is queued or the stream finishes.
112class ThreadedLzmaDecoder : public DataDecoder {
113 public:
Austin Schuhcd368422021-11-22 21:23:29 -0800114 explicit ThreadedLzmaDecoder(std::string_view filename, bool quiet = false)
115 : ThreadedLzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
116 explicit ThreadedLzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
117 bool quiet = false);
Tyler Chatow7df60832021-07-15 21:18:36 -0700118 ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
119 ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
120
121 ~ThreadedLzmaDecoder();
122
123 size_t Read(uint8_t *begin, uint8_t *end) final;
124
Tyler Chatow2015bc62021-08-04 21:15:09 -0700125 std::string_view filename() const final { return decoder_.filename(); }
126
Tyler Chatow7df60832021-07-15 21:18:36 -0700127 private:
128 static constexpr size_t kBufSize{256 * 1024};
129 static constexpr size_t kQueueSize{8};
130
131 LzmaDecoder decoder_;
132
133 // Queue of decompressed data to return on calls to Read
134 std::vector<ResizeableBuffer> decoded_queue_;
135
136 // Mutex to control access to decoded_queue_.
137 std::mutex decode_mutex_;
138 std::condition_variable continue_decoding_;
139 std::condition_variable queue_filled_;
140
141 bool finished_ = false;
142
143 std::thread decode_thread_;
144};
145
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700146} // namespace aos::logger
147
148#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_