blob: 6b39c212cad535a37c1ab9335ad88f34b7e3c635 [file] [log] [blame]
James Kuszmaulef0e0cc2021-10-28 23:00:04 -07001#ifndef AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
2#define AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_
3
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string_view>
#include <vector>

#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
#include "flatbuffers/flatbuffers.h"
#include "snappy-sinksource.h"
#include "snappy.h"
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070013
14namespace aos::logger {
15
16// Encodes buffers using snappy.
17class SnappyEncoder final : public DetachedBufferEncoder {
18 public:
19 explicit SnappyEncoder(size_t chunk_size = 32768);
20
21 void Encode(flatbuffers::DetachedBuffer &&in) final;
22 void Finish() final;
23 void Clear(int n) final;
24 std::vector<absl::Span<const uint8_t>> queue() const final;
25 size_t queued_bytes() const final;
26 size_t total_bytes() const final { return total_bytes_; }
27 size_t queue_size() const final { return queue_.size(); }
28
29 private:
30 class DetachedBufferSource : public snappy::Source {
31 public:
32 size_t Available() const final;
33 const char *Peek(size_t *length) final;
34 void Skip(size_t n) final;
35 void AppendBuffer(flatbuffers::DetachedBuffer &&buffer);
36
37 private:
38 std::vector<flatbuffers::DetachedBuffer> buffers_;
39 size_t index_into_first_buffer_ = 0;
40 };
41
42 // Flushes buffer_source_ and stores the compressed buffer in queue_.
43 void EncodeCurrentBuffer();
44
45 // To queue up data:
46 // 1) When Encode is called, we use AppendBuffer to store the DetachedBuffer
47 // in buffer_source_.
48 // 2) Once we've queued up at least chunk_size_ data in buffer_source_, we
49 // use snappy to compress all the data. This flushes everything out of
50 // buffer_source_ and adds a single buffer to queue_. Note that we do
51 // not split up flatbuffer buffers to ensure that we produce chunks of
52 // exactly chunk_size_ uncompressed data--if we get a 1MB DetachedBuffer
53 // we will compress it all at once.
54 // 3) queue_ is the data that is actually read by queue() and cleared by
55 // Clear() to be written to disk.
56 const size_t chunk_size_;
57 DetachedBufferSource buffer_source_;
58 std::optional<uint32_t> accumulated_checksum_;
59 std::vector<ResizeableBuffer> queue_;
60 size_t total_bytes_ = 0;
61};
62
63// Decompresses data with snappy.
64class SnappyDecoder final : public DataDecoder {
65 public:
James Kuszmauldd0a5042021-10-28 23:38:04 -070066 static constexpr std::string_view kExtension = ".sz";
67
James Kuszmaulef0e0cc2021-10-28 23:00:04 -070068 explicit SnappyDecoder(std::unique_ptr<DataDecoder> underlying_decoder)
69 : underlying_decoder_(std::move(underlying_decoder)) {}
70 explicit SnappyDecoder(std::string_view filename)
71 : SnappyDecoder(std::make_unique<DummyDecoder>(filename)) {}
72
73 size_t Read(uint8_t *begin, uint8_t *end) final;
74 std::string_view filename() const final {
75 return underlying_decoder_->filename();
76 }
77
78 private:
79 // decoder to use for reading data out of the file itself.
80 std::unique_ptr<DataDecoder> underlying_decoder_;
81 // Buffer to use for reading data from the file. This being a member variable
82 // is purely an optimization to avoid constant reallocations on every call to
83 // Read().
84 ResizeableBuffer compressed_buffer_;
85 // Buffer of any uncompressed data that we've read but which hasn't yet been
86 // consumed by a call to Read().
87 ResizeableBuffer uncompressed_buffer_;
88
89 size_t total_output_ = 0;
90};
91
92} // namespace aos::logger
93
94#endif // AOS_EVENTS_LOGGING_SNAPPY_ENCODER_H_