blob: 4b6130c885084bd207fdaff4cae95708dd04b0fa [file] [log] [blame]
James Kuszmaulef0e0cc2021-10-28 23:00:04 -07001#ifndef AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
2#define AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
3
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string_view>
#include <utility>
#include <vector>

#include "absl/types/span.h"
#include "flatbuffers/flatbuffers.h"

#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
#include "snappy-sinksource.h"
#include "snappy.h"
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070014
15namespace aos::logger {
16
17// Encodes buffers using snappy.
Austin Schuh48d10d62022-10-16 22:19:23 -070018class SnappyEncoder final : public DataEncoder {
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070019 public:
Philipp Schrader790cb542023-07-05 21:06:52 -070020 explicit SnappyEncoder(size_t max_message_size,
21 size_t chunk_size = 128 * 1024);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070022
Austin Schuh8bdfc492023-02-11 12:53:13 -080023 size_t Encode(Copier *copy, size_t start_byte) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070024
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070025 void Finish() final;
26 void Clear(int n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070027 absl::Span<const absl::Span<const uint8_t>> queue() final;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070028 size_t queued_bytes() const final;
Austin Schuh48d10d62022-10-16 22:19:23 -070029 bool HasSpace(size_t /*request*/) const final {
30 // Since the output always mallocs space, we have infinite output space.
31 return true;
32 }
Austin Schuh8bdfc492023-02-11 12:53:13 -080033 size_t space() const final { return buffer_source_.space(); }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070034 size_t total_bytes() const final { return total_bytes_; }
35 size_t queue_size() const final { return queue_.size(); }
36
37 private:
38 class DetachedBufferSource : public snappy::Source {
39 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070040 DetachedBufferSource(size_t buffer_size);
Austin Schuh8bdfc492023-02-11 12:53:13 -080041 size_t space() const { return data_.capacity() - data_.size(); }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070042 size_t Available() const final;
43 const char *Peek(size_t *length) final;
44 void Skip(size_t n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070045 void Append(Copier *copy);
46
47 uint32_t accumulated_checksum() const {
48 return accumulated_checksum_.value();
49 }
50
51 void ResetAccumulatedChecksum() { accumulated_checksum_.reset(); }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070052
53 private:
Austin Schuh48d10d62022-10-16 22:19:23 -070054 ResizeableBuffer data_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070055 size_t index_into_first_buffer_ = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -070056 std::optional<uint32_t> accumulated_checksum_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070057 };
58
59 // Flushes buffer_source_ and stores the compressed buffer in queue_.
60 void EncodeCurrentBuffer();
61
62 // To queue up data:
63 // 1) When Encode is called, we use AppendBuffer to store the DetachedBuffer
64 // in buffer_source_.
65 // 2) Once we've queued up at least chunk_size_ data in buffer_source_, we
66 // use snappy to compress all the data. This flushes everything out of
67 // buffer_source_ and adds a single buffer to queue_. Note that we do
68 // not split up flatbuffer buffers to ensure that we produce chunks of
69 // exactly chunk_size_ uncompressed data--if we get a 1MB DetachedBuffer
70 // we will compress it all at once.
71 // 3) queue_ is the data that is actually read by queue() and cleared by
72 // Clear() to be written to disk.
73 const size_t chunk_size_;
74 DetachedBufferSource buffer_source_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070075 std::vector<ResizeableBuffer> queue_;
Austin Schuh48d10d62022-10-16 22:19:23 -070076
77 std::vector<absl::Span<const uint8_t>> return_queue_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070078 size_t total_bytes_ = 0;
79};
80
81// Decompresses data with snappy.
82class SnappyDecoder final : public DataDecoder {
83 public:
James Kuszmauldd0a5042021-10-28 23:38:04 -070084 static constexpr std::string_view kExtension = ".sz";
85
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070086 explicit SnappyDecoder(std::unique_ptr<DataDecoder> underlying_decoder)
87 : underlying_decoder_(std::move(underlying_decoder)) {}
88 explicit SnappyDecoder(std::string_view filename)
89 : SnappyDecoder(std::make_unique<DummyDecoder>(filename)) {}
90
91 size_t Read(uint8_t *begin, uint8_t *end) final;
92 std::string_view filename() const final {
93 return underlying_decoder_->filename();
94 }
95
96 private:
97 // decoder to use for reading data out of the file itself.
98 std::unique_ptr<DataDecoder> underlying_decoder_;
99 // Buffer to use for reading data from the file. This being a member variable
100 // is purely an optimization to avoid constant reallocations on every call to
101 // Read().
102 ResizeableBuffer compressed_buffer_;
103 // Buffer of any uncompressed data that we've read but which hasn't yet been
104 // consumed by a call to Read().
105 ResizeableBuffer uncompressed_buffer_;
106
107 size_t total_output_ = 0;
108};
109
110} // namespace aos::logger
111
112#endif // AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_