blob: 93508ca0c9d1f58a7c83707c9d3a491b61d6f7df [file] [log] [blame]
#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
#include <condition_variable>
#include <mutex>
#include <string_view>
#include <thread>
#include "absl/types/span.h"
#include "flatbuffers/flatbuffers.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
#include "lzma.h"
namespace aos::logger {
// Encodes buffers using liblzma.
class LzmaEncoder final : public DataEncoder {
public:
static constexpr std::string_view kExtension = ".xz";
// Initializes the LZMA stream and encoder. The block size is the block size
// used by the multithreaded encoder for batching. A block size of 0 tells
// lzma to pick it's favorite block size.
explicit LzmaEncoder(size_t max_message_size, uint32_t compression_preset,
size_t block_size = 0);
LzmaEncoder(const LzmaEncoder &) = delete;
LzmaEncoder(LzmaEncoder &&other) = delete;
LzmaEncoder &operator=(const LzmaEncoder &) = delete;
LzmaEncoder &operator=(LzmaEncoder &&other) = delete;
// Gracefully shuts down the encoder.
~LzmaEncoder() final;
bool HasSpace(size_t /*request*/) const override {
// Since the underlying lzma encoder handles buffering, we always have
// space.
return true;
}
size_t space() const final { return input_buffer_.capacity(); }
size_t Encode(Copier *copy, size_t start_byte) final;
void Finish() final;
void Clear(int n) final;
absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
size_t total_bytes() const final { return total_bytes_; }
size_t queue_size() const final { return queue_.size(); }
private:
static constexpr size_t kEncodedBufferSizeBytes{1024 * 128};
void RunLzmaCode(lzma_action action);
lzma_stream stream_;
uint32_t compression_preset_;
std::vector<ResizeableBuffer> queue_;
// Since we pretty much just allocate a couple of buffers, then allocate and
// release them over and over with very similar memory usage and without much
// variation in the peak usage, put the allocate chunks in a free queue to
// reduce fragmentation.
std::vector<ResizeableBuffer> free_queue_;
bool finished_ = false;
// Total bytes that resulted from encoding raw data since the last call to
// Reset.
size_t total_bytes_ = 0;
// Buffer that messages get coppied into for encoding.
ResizeableBuffer input_buffer_;
std::vector<absl::Span<const uint8_t>> return_queue_;
};
// Decompresses data with liblzma.
class LzmaDecoder final : public DataDecoder {
public:
static constexpr std::string_view kExtension = ".xz";
explicit LzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
bool quiet = false);
explicit LzmaDecoder(std::string_view filename, bool quiet = false)
: LzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
LzmaDecoder(const LzmaDecoder &) = delete;
LzmaDecoder(LzmaDecoder &&other) = delete;
LzmaDecoder &operator=(const LzmaDecoder &) = delete;
LzmaDecoder &operator=(LzmaDecoder &&other) = delete;
~LzmaDecoder();
size_t Read(uint8_t *begin, uint8_t *end) final;
std::string_view filename() const final {
return underlying_decoder_->filename();
}
private:
// Size of temporary buffer to use.
static constexpr size_t kBufSize{256 * 1024};
// Temporary buffer for storing compressed data.
ResizeableBuffer compressed_data_;
// Used for reading data from the file.
std::unique_ptr<DataDecoder> underlying_decoder_;
// Stream for decompression.
lzma_stream stream_;
// The current action. This is LZMA_RUN until we've run out of data to read
// from the file.
lzma_action action_ = LZMA_RUN;
// Flag that represents whether or not all the data from the file has been
// successfully decoded.
bool finished_ = false;
// Flag to signal how quiet to be when logging potential issues around
// truncation.
const bool quiet_ = false;
};
// Decompresses data with liblzma in a new thread, up to a maximum queue
// size. Calls to Read() will return data from the queue if available,
// or block until more data is queued or the stream finishes.
class ThreadedLzmaDecoder : public DataDecoder {
public:
explicit ThreadedLzmaDecoder(std::string_view filename, bool quiet = false)
: ThreadedLzmaDecoder(std::make_unique<DummyDecoder>(filename), quiet) {}
explicit ThreadedLzmaDecoder(std::unique_ptr<DataDecoder> underlying_decoder,
bool quiet = false);
ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
~ThreadedLzmaDecoder();
size_t Read(uint8_t *begin, uint8_t *end) final;
std::string_view filename() const final { return decoder_.filename(); }
private:
static constexpr size_t kBufSize{256 * 1024};
static constexpr size_t kQueueSize{8};
LzmaDecoder decoder_;
// Queue of decompressed data to return on calls to Read
std::vector<ResizeableBuffer> decoded_queue_;
// Mutex to control access to decoded_queue_.
std::mutex decode_mutex_;
std::condition_variable continue_decoding_;
std::condition_variable queue_filled_;
bool finished_ = false;
std::thread decode_thread_;
};
} // namespace aos::logger
#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_