blob: 3136d9322b845edb4b298d41fb91c010177516f2 [file] [log] [blame]
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
2#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
3
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string_view>
#include <thread>
#include <vector>

#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
#include "lzma.h"
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070015
16namespace aos::logger {
17
18// Encodes buffers using liblzma.
Austin Schuh48d10d62022-10-16 22:19:23 -070019class LzmaEncoder final : public DataEncoder {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070020 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070021 static constexpr std::string_view kExtension = ".xz";
22
Austin Schuhbe91b342022-06-27 00:53:45 -070023 // Initializes the LZMA stream and encoder. The block size is the block size
24 // used by the multithreaded encoder for batching. A block size of 0 tells
25 // lzma to pick it's favorite block size.
Austin Schuh48d10d62022-10-16 22:19:23 -070026 explicit LzmaEncoder(size_t max_message_size, uint32_t compression_preset,
27 size_t block_size = 0);
Austin Schuhc41603c2020-10-11 16:17:37 -070028 LzmaEncoder(const LzmaEncoder &) = delete;
29 LzmaEncoder(LzmaEncoder &&other) = delete;
30 LzmaEncoder &operator=(const LzmaEncoder &) = delete;
31 LzmaEncoder &operator=(LzmaEncoder &&other) = delete;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070032 // Gracefully shuts down the encoder.
33 ~LzmaEncoder() final;
34
Austin Schuh48d10d62022-10-16 22:19:23 -070035 bool HasSpace(size_t /*request*/) const override {
36 // Since the underlying lzma encoder handles buffering, we always have
37 // space.
38 return true;
39 }
Austin Schuh8bdfc492023-02-11 12:53:13 -080040 size_t space() const final { return input_buffer_.capacity(); }
41 size_t Encode(Copier *copy, size_t start_byte) final;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070042 void Finish() final;
43 void Clear(int n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070044 absl::Span<const absl::Span<const uint8_t>> queue() final;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070045 size_t queued_bytes() const final;
46 size_t total_bytes() const final { return total_bytes_; }
47 size_t queue_size() const final { return queue_.size(); }
48
49 private:
Austin Schuh48d10d62022-10-16 22:19:23 -070050 static constexpr size_t kEncodedBufferSizeBytes{1024 * 128};
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070051
52 void RunLzmaCode(lzma_action action);
53
54 lzma_stream stream_;
55 uint32_t compression_preset_;
56 std::vector<ResizeableBuffer> queue_;
Austin Schuh0b0f8bb2023-03-24 15:09:08 -070057 // Since we pretty much just allocate a couple of buffers, then allocate and
58 // release them over and over with very similar memory usage and without much
59 // variation in the peak usage, put the allocate chunks in a free queue to
60 // reduce fragmentation.
61 std::vector<ResizeableBuffer> free_queue_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070062 bool finished_ = false;
63 // Total bytes that resulted from encoding raw data since the last call to
64 // Reset.
65 size_t total_bytes_ = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -070066
67 // Buffer that messages get coppied into for encoding.
68 ResizeableBuffer input_buffer_;
69
70 std::vector<absl::Span<const uint8_t>> return_queue_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070071};
72
73// Decompresses data with liblzma.
74class LzmaDecoder final : public DataDecoder {
75 public:
James Kuszmauldd0a5042021-10-28 23:38:04 -070076 static constexpr std::string_view kExtension = ".xz";
77
Austin Schuhcd368422021-11-22 21:23:29 -080078 explicit LzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
79 bool quiet = false);
80 explicit LzmaDecoder(std::string_view filename, bool quiet = false)
81 : LzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
Austin Schuhc41603c2020-10-11 16:17:37 -070082 LzmaDecoder(const LzmaDecoder &) = delete;
83 LzmaDecoder(LzmaDecoder &&other) = delete;
84 LzmaDecoder &operator=(const LzmaDecoder &) = delete;
85 LzmaDecoder &operator=(LzmaDecoder &&other) = delete;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070086 ~LzmaDecoder();
87
88 size_t Read(uint8_t *begin, uint8_t *end) final;
Tyler Chatow2015bc62021-08-04 21:15:09 -070089 std::string_view filename() const final {
90 return underlying_decoder_->filename();
91 }
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070092
93 private:
94 // Size of temporary buffer to use.
95 static constexpr size_t kBufSize{256 * 1024};
96
97 // Temporary buffer for storing compressed data.
98 ResizeableBuffer compressed_data_;
99 // Used for reading data from the file.
Tyler Chatow2015bc62021-08-04 21:15:09 -0700100 std::unique_ptr<DataDecoder> underlying_decoder_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700101 // Stream for decompression.
102 lzma_stream stream_;
103 // The current action. This is LZMA_RUN until we've run out of data to read
104 // from the file.
105 lzma_action action_ = LZMA_RUN;
106 // Flag that represents whether or not all the data from the file has been
107 // successfully decoded.
108 bool finished_ = false;
Austin Schuhcd368422021-11-22 21:23:29 -0800109 // Flag to signal how quiet to be when logging potential issues around
110 // truncation.
111 const bool quiet_ = false;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700112};
113
Tyler Chatow7df60832021-07-15 21:18:36 -0700114// Decompresses data with liblzma in a new thread, up to a maximum queue
115// size. Calls to Read() will return data from the queue if available,
116// or block until more data is queued or the stream finishes.
117class ThreadedLzmaDecoder : public DataDecoder {
118 public:
Austin Schuhcd368422021-11-22 21:23:29 -0800119 explicit ThreadedLzmaDecoder(std::string_view filename, bool quiet = false)
120 : ThreadedLzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
121 explicit ThreadedLzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
122 bool quiet = false);
Tyler Chatow7df60832021-07-15 21:18:36 -0700123 ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
124 ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
125
126 ~ThreadedLzmaDecoder();
127
128 size_t Read(uint8_t *begin, uint8_t *end) final;
129
Tyler Chatow2015bc62021-08-04 21:15:09 -0700130 std::string_view filename() const final { return decoder_.filename(); }
131
Tyler Chatow7df60832021-07-15 21:18:36 -0700132 private:
133 static constexpr size_t kBufSize{256 * 1024};
134 static constexpr size_t kQueueSize{8};
135
136 LzmaDecoder decoder_;
137
138 // Queue of decompressed data to return on calls to Read
139 std::vector<ResizeableBuffer> decoded_queue_;
140
141 // Mutex to control access to decoded_queue_.
142 std::mutex decode_mutex_;
143 std::condition_variable continue_decoding_;
144 std::condition_variable queue_filled_;
145
146 bool finished_ = false;
147
148 std::thread decode_thread_;
149};
150
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700151} // namespace aos::logger
152
153#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_