Encode flatbuffers directly into the encoder when logging
We were running out of memory when running for many hours. Initial
debugging suggested a heap fragmentation issue: tracking the allocated
memory using the malloc hooks showed no growth in allocated memory, yet
the heap itself kept growing.
Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can have the BufferEncoder provide the memory
to write to, and have it allocate that buffer space only once, sized to
the maximum message size that a writer might see.
Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/snappy_encoder.h b/aos/events/logging/snappy_encoder.h
index 6b39c21..d3edbe5 100644
--- a/aos/events/logging/snappy_encoder.h
+++ b/aos/events/logging/snappy_encoder.h
@@ -14,29 +14,42 @@
namespace aos::logger {
// Encodes buffers using snappy.
-class SnappyEncoder final : public DetachedBufferEncoder {
+class SnappyEncoder final : public DataEncoder {
public:
- explicit SnappyEncoder(size_t chunk_size = 32768);
+ explicit SnappyEncoder(size_t max_message_size, size_t chunk_size = 32768);
- void Encode(flatbuffers::DetachedBuffer &&in) final;
+ void Encode(Copier *copy) final;
+
void Finish() final;
void Clear(int n) final;
- std::vector<absl::Span<const uint8_t>> queue() const final;
+ absl::Span<const absl::Span<const uint8_t>> queue() final;
size_t queued_bytes() const final;
+ bool HasSpace(size_t /*request*/) const final {
+ // Since the output always mallocs space, we have infinite output space.
+ return true;
+ }
size_t total_bytes() const final { return total_bytes_; }
size_t queue_size() const final { return queue_.size(); }
private:
class DetachedBufferSource : public snappy::Source {
public:
+ DetachedBufferSource(size_t buffer_size);
size_t Available() const final;
const char *Peek(size_t *length) final;
void Skip(size_t n) final;
- void AppendBuffer(flatbuffers::DetachedBuffer &&buffer);
+ void Append(Copier *copy);
+
+ uint32_t accumulated_checksum() const {
+ return accumulated_checksum_.value();
+ }
+
+ void ResetAccumulatedChecksum() { accumulated_checksum_.reset(); }
private:
- std::vector<flatbuffers::DetachedBuffer> buffers_;
+ ResizeableBuffer data_;
size_t index_into_first_buffer_ = 0;
+ std::optional<uint32_t> accumulated_checksum_;
};
// Flushes buffer_source_ and stores the compressed buffer in queue_.
@@ -55,8 +68,9 @@
// Clear() to be written to disk.
const size_t chunk_size_;
DetachedBufferSource buffer_source_;
- std::optional<uint32_t> accumulated_checksum_;
std::vector<ResizeableBuffer> queue_;
+
+ std::vector<absl::Span<const uint8_t>> return_queue_;
size_t total_bytes_ = 0;
};