Encode flatbuffers directly into the encoder when logging
We were running out of memory when running for many hours. Initial
debugging looked like it was a heap fragmentation issue. Tracking the
allocated memory using the malloc hooks wasn't showing any growth of
memory. The heap was growing though.
Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can instead have the BufferEncoder provide
memory to write to, and have it only alloate that buffer space once, and
allocate it to the maximum size that a writer might see.
Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/snappy_encoder.cc b/aos/events/logging/snappy_encoder.cc
index 19011da..2ef8363 100644
--- a/aos/events/logging/snappy_encoder.cc
+++ b/aos/events/logging/snappy_encoder.cc
@@ -29,7 +29,11 @@
}
} // namespace
-SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) {
+SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size)
+ : chunk_size_(chunk_size),
+ // Make sure we have enough space to always add our largest message to the
+ // chunks we will flush.
+ buffer_source_(max_message_size + chunk_size - 1) {
queue_.emplace_back();
queue_.back().resize(kSnappyIdentifierChunk.size());
std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
@@ -39,10 +43,8 @@
void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
-void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- accumulated_checksum_ =
- AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_);
- buffer_source_.AppendBuffer(std::move(in));
+void SnappyEncoder::Encode(Copier *copy) {
+ buffer_source_.Append(copy);
if (buffer_source_.Available() >= chunk_size_) {
EncodeCurrentBuffer();
@@ -54,7 +56,7 @@
return;
}
const uint32_t uncompressed_checksum =
- MaskChecksum(accumulated_checksum_.value());
+ MaskChecksum(buffer_source_.accumulated_checksum());
queue_.emplace_back();
constexpr size_t kPrefixSize = 8;
queue_.back().resize(kPrefixSize +
@@ -79,7 +81,7 @@
total_bytes_ += queue_.back().size();
- accumulated_checksum_.reset();
+ buffer_source_.ResetAccumulatedChecksum();
CHECK_EQ(0u, buffer_source_.Available());
}
@@ -97,27 +99,26 @@
return bytes;
}
-std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
- queue.reserve(queue_.size());
+absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() {
+ return_queue_.clear();
+ return_queue_.reserve(queue_.size());
for (const auto &buffer : queue_) {
- queue.emplace_back(buffer.data(), buffer.size());
+ return_queue_.emplace_back(buffer.data(), buffer.size());
}
- return queue;
+ return return_queue_;
+}
+
+SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) {
+ data_.reserve(buffer_size);
}
size_t SnappyEncoder::DetachedBufferSource::Available() const {
- size_t total_available = 0;
- for (const flatbuffers::DetachedBuffer &buffer : buffers_) {
- total_available += buffer.size();
- }
- return total_available;
+ return data_.size() - index_into_first_buffer_;
}
const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
- *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_;
- return reinterpret_cast<char *>(buffers_[0].data()) +
- index_into_first_buffer_;
+ *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_;
+ return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_;
}
void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
@@ -125,20 +126,25 @@
return;
}
- CHECK(!buffers_.empty());
+ CHECK_NE(data_.size(), 0u);
index_into_first_buffer_ += n;
- CHECK_LE(index_into_first_buffer_, buffers_[0].size())
+ CHECK_LE(index_into_first_buffer_, data_.size())
<< ": " << n << " is too large a skip.";
- if (index_into_first_buffer_ == buffers_[0].size()) {
- buffers_.erase(buffers_.begin());
+ if (index_into_first_buffer_ == data_.size()) {
+ data_.resize(0u);
index_into_first_buffer_ = 0;
}
}
-void SnappyEncoder::DetachedBufferSource::AppendBuffer(
- flatbuffers::DetachedBuffer &&buffer) {
- buffers_.emplace_back(std::move(buffer));
+void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) {
+ const size_t copy_size = copy->size();
+ CHECK_LE(copy_size + data_.size(), data_.capacity());
+ size_t starting_size = data_.size();
+ data_.resize(starting_size + copy_size);
+ CHECK_EQ(copy->Copy(data_.data() + starting_size), copy_size);
+ accumulated_checksum_ = AccumulateCrc32(
+ {data_.data() + starting_size, copy_size}, accumulated_checksum_);
}
size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {