blob: d3edbe5b607ff4876c1d495b80a5b5c5964bad58 [file] [log] [blame]
#ifndef AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
#define AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_

#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string_view>
#include <utility>
#include <vector>

#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
#include "snappy-sinksource.h"
#include "snappy.h"

14namespace aos::logger {
15
16// Encodes buffers using snappy.
Austin Schuh48d10d62022-10-16 22:19:23 -070017class SnappyEncoder final : public DataEncoder {
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070018 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070019 explicit SnappyEncoder(size_t max_message_size, size_t chunk_size = 32768);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070020
Austin Schuh48d10d62022-10-16 22:19:23 -070021 void Encode(Copier *copy) final;
22
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070023 void Finish() final;
24 void Clear(int n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070025 absl::Span<const absl::Span<const uint8_t>> queue() final;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070026 size_t queued_bytes() const final;
Austin Schuh48d10d62022-10-16 22:19:23 -070027 bool HasSpace(size_t /*request*/) const final {
28 // Since the output always mallocs space, we have infinite output space.
29 return true;
30 }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070031 size_t total_bytes() const final { return total_bytes_; }
32 size_t queue_size() const final { return queue_.size(); }
33
34 private:
35 class DetachedBufferSource : public snappy::Source {
36 public:
Austin Schuh48d10d62022-10-16 22:19:23 -070037 DetachedBufferSource(size_t buffer_size);
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070038 size_t Available() const final;
39 const char *Peek(size_t *length) final;
40 void Skip(size_t n) final;
Austin Schuh48d10d62022-10-16 22:19:23 -070041 void Append(Copier *copy);
42
43 uint32_t accumulated_checksum() const {
44 return accumulated_checksum_.value();
45 }
46
47 void ResetAccumulatedChecksum() { accumulated_checksum_.reset(); }
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070048
49 private:
Austin Schuh48d10d62022-10-16 22:19:23 -070050 ResizeableBuffer data_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070051 size_t index_into_first_buffer_ = 0;
Austin Schuh48d10d62022-10-16 22:19:23 -070052 std::optional<uint32_t> accumulated_checksum_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070053 };
54
55 // Flushes buffer_source_ and stores the compressed buffer in queue_.
56 void EncodeCurrentBuffer();
57
58 // To queue up data:
59 // 1) When Encode is called, we use AppendBuffer to store the DetachedBuffer
60 // in buffer_source_.
61 // 2) Once we've queued up at least chunk_size_ data in buffer_source_, we
62 // use snappy to compress all the data. This flushes everything out of
63 // buffer_source_ and adds a single buffer to queue_. Note that we do
64 // not split up flatbuffer buffers to ensure that we produce chunks of
65 // exactly chunk_size_ uncompressed data--if we get a 1MB DetachedBuffer
66 // we will compress it all at once.
67 // 3) queue_ is the data that is actually read by queue() and cleared by
68 // Clear() to be written to disk.
69 const size_t chunk_size_;
70 DetachedBufferSource buffer_source_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070071 std::vector<ResizeableBuffer> queue_;
Austin Schuh48d10d62022-10-16 22:19:23 -070072
73 std::vector<absl::Span<const uint8_t>> return_queue_;
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070074 size_t total_bytes_ = 0;
75};
76
77// Decompresses data with snappy.
78class SnappyDecoder final : public DataDecoder {
79 public:
James Kuszmauldd0a5042021-10-28 23:38:04 -070080 static constexpr std::string_view kExtension = ".sz";
81
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070082 explicit SnappyDecoder(std::unique_ptr<DataDecoder> underlying_decoder)
83 : underlying_decoder_(std::move(underlying_decoder)) {}
84 explicit SnappyDecoder(std::string_view filename)
85 : SnappyDecoder(std::make_unique<DummyDecoder>(filename)) {}
86
87 size_t Read(uint8_t *begin, uint8_t *end) final;
88 std::string_view filename() const final {
89 return underlying_decoder_->filename();
90 }
91
92 private:
93 // decoder to use for reading data out of the file itself.
94 std::unique_ptr<DataDecoder> underlying_decoder_;
95 // Buffer to use for reading data from the file. This being a member variable
96 // is purely an optimization to avoid constant reallocations on every call to
97 // Read().
98 ResizeableBuffer compressed_buffer_;
99 // Buffer of any uncompressed data that we've read but which hasn't yet been
100 // consumed by a call to Read().
101 ResizeableBuffer uncompressed_buffer_;
102
103 size_t total_output_ = 0;
104};
105
106} // namespace aos::logger
107
108#endif // AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_