blob: d698ee1d4140f7b7a3bb69afacb9bbbf41b0597a [file] [log] [blame]
James Kuszmaulef0e0cc2021-10-28 23:00:04 -07001#ifndef AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
2#define AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
3
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string_view>
#include <utility>
#include <vector>

#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
#include "snappy-sinksource.h"
#include "snappy.h"
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070013
14namespace aos::logger {
15
16// Encodes buffers using snappy.
Austin Schuh48d10d62022-10-16 22:19:23 -070017class SnappyEncoder final : public DataEncoder {
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070018 public:
Austin Schuh8bdfc492023-02-11 12:53:13 -080019 explicit SnappyEncoder(size_t max_message_size, size_t chunk_size = 128 * 1024);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070020
Austin Schuh8bdfc492023-02-11 12:53:13 -080021 size_t Encode(Copier *copy, size_t start_byte) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070022
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070023 void Finish() final;
24 void Clear(int n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070025 absl::Span<const absl::Span<const uint8_t>> queue() final;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070026 size_t queued_bytes() const final;
Austin Schuh48d10d62022-10-16 22:19:23 -070027 bool HasSpace(size_t /*request*/) const final {
28 // Since the output always mallocs space, we have infinite output space.
29 return true;
30 }
Austin Schuh8bdfc492023-02-11 12:53:13 -080031 size_t space() const final { return buffer_source_.space(); }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070032 size_t total_bytes() const final { return total_bytes_; }
33 size_t queue_size() const final { return queue_.size(); }
34
35 private:
36 class DetachedBufferSource : public snappy::Source {
37 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070038 DetachedBufferSource(size_t buffer_size);
Austin Schuh8bdfc492023-02-11 12:53:13 -080039 size_t space() const { return data_.capacity() - data_.size(); }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070040 size_t Available() const final;
41 const char *Peek(size_t *length) final;
42 void Skip(size_t n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070043 void Append(Copier *copy);
44
45 uint32_t accumulated_checksum() const {
46 return accumulated_checksum_.value();
47 }
48
49 void ResetAccumulatedChecksum() { accumulated_checksum_.reset(); }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070050
51 private:
Austin Schuh48d10d62022-10-16 22:19:23 -070052 ResizeableBuffer data_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070053 size_t index_into_first_buffer_ = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -070054 std::optional<uint32_t> accumulated_checksum_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070055 };
56
57 // Flushes buffer_source_ and stores the compressed buffer in queue_.
58 void EncodeCurrentBuffer();
59
60 // To queue up data:
61 // 1) When Encode is called, we use AppendBuffer to store the DetachedBuffer
62 // in buffer_source_.
63 // 2) Once we've queued up at least chunk_size_ data in buffer_source_, we
64 // use snappy to compress all the data. This flushes everything out of
65 // buffer_source_ and adds a single buffer to queue_. Note that we do
66 // not split up flatbuffer buffers to ensure that we produce chunks of
67 // exactly chunk_size_ uncompressed data--if we get a 1MB DetachedBuffer
68 // we will compress it all at once.
69 // 3) queue_ is the data that is actually read by queue() and cleared by
70 // Clear() to be written to disk.
71 const size_t chunk_size_;
72 DetachedBufferSource buffer_source_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070073 std::vector<ResizeableBuffer> queue_;
Austin Schuh48d10d62022-10-16 22:19:23 -070074
75 std::vector<absl::Span<const uint8_t>> return_queue_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070076 size_t total_bytes_ = 0;
77};
78
79// Decompresses data with snappy.
80class SnappyDecoder final : public DataDecoder {
81 public:
James Kuszmauldd0a5042021-10-28 23:38:04 -070082 static constexpr std::string_view kExtension = ".sz";
83
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070084 explicit SnappyDecoder(std::unique_ptr<DataDecoder> underlying_decoder)
85 : underlying_decoder_(std::move(underlying_decoder)) {}
86 explicit SnappyDecoder(std::string_view filename)
87 : SnappyDecoder(std::make_unique<DummyDecoder>(filename)) {}
88
89 size_t Read(uint8_t *begin, uint8_t *end) final;
90 std::string_view filename() const final {
91 return underlying_decoder_->filename();
92 }
93
94 private:
95 // decoder to use for reading data out of the file itself.
96 std::unique_ptr<DataDecoder> underlying_decoder_;
97 // Buffer to use for reading data from the file. This being a member variable
98 // is purely an optimization to avoid constant reallocations on every call to
99 // Read().
100 ResizeableBuffer compressed_buffer_;
101 // Buffer of any uncompressed data that we've read but which hasn't yet been
102 // consumed by a call to Read().
103 ResizeableBuffer uncompressed_buffer_;
104
105 size_t total_output_ = 0;
106};
107
108} // namespace aos::logger
109
110#endif // AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_