blob: 93508ca0c9d1f58a7c83707c9d3a491b61d6f7df [file] [log] [blame]
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07001#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
2#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
3
Tyler Chatow7df60832021-07-15 21:18:36 -07004#include <condition_variable>
5#include <mutex>
Austin Schuh60e77942022-05-16 17:48:24 -07006#include <string_view>
Tyler Chatow7df60832021-07-15 21:18:36 -07007#include <thread>
Brian Silvermanf59fe3f2020-09-22 21:04:09 -07008
Tyler Chatow7df60832021-07-15 21:18:36 -07009#include "absl/types/span.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070010#include "flatbuffers/flatbuffers.h"
11
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070012#include "aos/containers/resizeable_buffer.h"
13#include "aos/events/logging/buffer_encoder.h"
14#include "aos/events/logging/logger_generated.h"
Tyler Chatow7df60832021-07-15 21:18:36 -070015#include "lzma.h"
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070016
17namespace aos::logger {
18
19// Encodes buffers using liblzma.
Austin Schuh48d10d62022-10-16 22:19:23 -070020class LzmaEncoder final : public DataEncoder {
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070021 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070022 static constexpr std::string_view kExtension = ".xz";
23
Austin Schuhbe91b342022-06-27 00:53:45 -070024 // Initializes the LZMA stream and encoder. The block size is the block size
25 // used by the multithreaded encoder for batching. A block size of 0 tells
26 // lzma to pick it's favorite block size.
Austin Schuh48d10d62022-10-16 22:19:23 -070027 explicit LzmaEncoder(size_t max_message_size, uint32_t compression_preset,
28 size_t block_size = 0);
Austin Schuhc41603c2020-10-11 16:17:37 -070029 LzmaEncoder(const LzmaEncoder &) = delete;
30 LzmaEncoder(LzmaEncoder &&other) = delete;
31 LzmaEncoder &operator=(const LzmaEncoder &) = delete;
32 LzmaEncoder &operator=(LzmaEncoder &&other) = delete;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070033 // Gracefully shuts down the encoder.
34 ~LzmaEncoder() final;
35
Austin Schuh48d10d62022-10-16 22:19:23 -070036 bool HasSpace(size_t /*request*/) const override {
37 // Since the underlying lzma encoder handles buffering, we always have
38 // space.
39 return true;
40 }
Austin Schuh8bdfc492023-02-11 12:53:13 -080041 size_t space() const final { return input_buffer_.capacity(); }
42 size_t Encode(Copier *copy, size_t start_byte) final;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070043 void Finish() final;
44 void Clear(int n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070045 absl::Span<const absl::Span<const uint8_t>> queue() final;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070046 size_t queued_bytes() const final;
47 size_t total_bytes() const final { return total_bytes_; }
48 size_t queue_size() const final { return queue_.size(); }
49
50 private:
Austin Schuh48d10d62022-10-16 22:19:23 -070051 static constexpr size_t kEncodedBufferSizeBytes{1024 * 128};
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070052
53 void RunLzmaCode(lzma_action action);
54
55 lzma_stream stream_;
56 uint32_t compression_preset_;
57 std::vector<ResizeableBuffer> queue_;
Austin Schuh0b0f8bb2023-03-24 15:09:08 -070058 // Since we pretty much just allocate a couple of buffers, then allocate and
59 // release them over and over with very similar memory usage and without much
60 // variation in the peak usage, put the allocate chunks in a free queue to
61 // reduce fragmentation.
62 std::vector<ResizeableBuffer> free_queue_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070063 bool finished_ = false;
64 // Total bytes that resulted from encoding raw data since the last call to
65 // Reset.
66 size_t total_bytes_ = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -070067
68 // Buffer that messages get coppied into for encoding.
69 ResizeableBuffer input_buffer_;
70
71 std::vector<absl::Span<const uint8_t>> return_queue_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070072};
73
74// Decompresses data with liblzma.
75class LzmaDecoder final : public DataDecoder {
76 public:
James Kuszmauldd0a5042021-10-28 23:38:04 -070077 static constexpr std::string_view kExtension = ".xz";
78
Austin Schuhcd368422021-11-22 21:23:29 -080079 explicit LzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
80 bool quiet = false);
81 explicit LzmaDecoder(std::string_view filename, bool quiet = false)
82 : LzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
Austin Schuhc41603c2020-10-11 16:17:37 -070083 LzmaDecoder(const LzmaDecoder &) = delete;
84 LzmaDecoder(LzmaDecoder &&other) = delete;
85 LzmaDecoder &operator=(const LzmaDecoder &) = delete;
86 LzmaDecoder &operator=(LzmaDecoder &&other) = delete;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070087 ~LzmaDecoder();
88
89 size_t Read(uint8_t *begin, uint8_t *end) final;
Tyler Chatow2015bc62021-08-04 21:15:09 -070090 std::string_view filename() const final {
91 return underlying_decoder_->filename();
92 }
Brian Silvermanf59fe3f2020-09-22 21:04:09 -070093
94 private:
95 // Size of temporary buffer to use.
96 static constexpr size_t kBufSize{256 * 1024};
97
98 // Temporary buffer for storing compressed data.
99 ResizeableBuffer compressed_data_;
100 // Used for reading data from the file.
Tyler Chatow2015bc62021-08-04 21:15:09 -0700101 std::unique_ptr<DataDecoder> underlying_decoder_;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700102 // Stream for decompression.
103 lzma_stream stream_;
104 // The current action. This is LZMA_RUN until we've run out of data to read
105 // from the file.
106 lzma_action action_ = LZMA_RUN;
107 // Flag that represents whether or not all the data from the file has been
108 // successfully decoded.
109 bool finished_ = false;
Austin Schuhcd368422021-11-22 21:23:29 -0800110 // Flag to signal how quiet to be when logging potential issues around
111 // truncation.
112 const bool quiet_ = false;
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700113};
114
Tyler Chatow7df60832021-07-15 21:18:36 -0700115// Decompresses data with liblzma in a new thread, up to a maximum queue
116// size. Calls to Read() will return data from the queue if available,
117// or block until more data is queued or the stream finishes.
118class ThreadedLzmaDecoder : public DataDecoder {
119 public:
Austin Schuhcd368422021-11-22 21:23:29 -0800120 explicit ThreadedLzmaDecoder(std::string_view filename, bool quiet = false)
121 : ThreadedLzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
122 explicit ThreadedLzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
123 bool quiet = false);
Tyler Chatow7df60832021-07-15 21:18:36 -0700124 ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
125 ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
126
127 ~ThreadedLzmaDecoder();
128
129 size_t Read(uint8_t *begin, uint8_t *end) final;
130
Tyler Chatow2015bc62021-08-04 21:15:09 -0700131 std::string_view filename() const final { return decoder_.filename(); }
132
Tyler Chatow7df60832021-07-15 21:18:36 -0700133 private:
134 static constexpr size_t kBufSize{256 * 1024};
135 static constexpr size_t kQueueSize{8};
136
137 LzmaDecoder decoder_;
138
139 // Queue of decompressed data to return on calls to Read
140 std::vector<ResizeableBuffer> decoded_queue_;
141
142 // Mutex to control access to decoded_queue_.
143 std::mutex decode_mutex_;
144 std::condition_variable continue_decoding_;
145 std::condition_variable queue_filled_;
146
147 bool finished_ = false;
148
149 std::thread decode_thread_;
150};
151
Brian Silvermanf59fe3f2020-09-22 21:04:09 -0700152} // namespace aos::logger
153
154#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_