Encode flatbuffers directly into the encoder when logging
We were running out of memory when running for many hours. Initial
debugging pointed to a heap fragmentation issue: tracking the allocated
memory using the malloc hooks showed no growth in allocated memory, yet
the heap itself kept growing.
Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can have the BufferEncoder provide the memory
to write to, and have it allocate that buffer space only once, sized to
the maximum message size that a writer might see.
Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 5cb9897..f583818 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -50,7 +50,8 @@
} // namespace
-LzmaEncoder::LzmaEncoder(const uint32_t compression_preset, size_t block_size)
+LzmaEncoder::LzmaEncoder(size_t max_message_size,
+ const uint32_t compression_preset, size_t block_size)
: stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
CHECK_GE(compression_preset_, 0u)
<< ": Compression preset must be in the range [0, 9].";
@@ -78,15 +79,26 @@
stream_.avail_out = 0;
VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+
+ // TODO(austin): We don't write the biggest messages very often. Is it more
+ // efficient to allocate if we go over a threshold to keep the static memory
+ // in use smaller, or just allocate the worst case like we are doing here?
+ input_buffer_.resize(max_message_size);
}
LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
-void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
- CHECK(in.data()) << ": Encode called with nullptr.";
+void LzmaEncoder::Encode(Copier *copy) {
+ const size_t copy_size = copy->size();
+ // LZMA compresses the data as it goes along, copying the compressed results
+ // into another buffer. So, there's no need to store more than one message
+ // since lzma is going to take it from here.
+ CHECK_LE(copy_size, input_buffer_.size());
- stream_.next_in = in.data();
- stream_.avail_in = in.size();
+ CHECK_EQ(copy->Copy(input_buffer_.data()), copy_size);
+
+ stream_.next_in = input_buffer_.data();
+ stream_.avail_in = copy_size;
RunLzmaCode(LZMA_RUN);
}
@@ -103,20 +115,21 @@
}
}
-std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
- std::vector<absl::Span<const uint8_t>> queue;
+absl::Span<const absl::Span<const uint8_t>> LzmaEncoder::queue() {
+ return_queue_.clear();
if (queue_.empty()) {
- return queue;
+ return return_queue_;
}
+ return_queue_.reserve(queue_.size());
for (size_t i = 0; i < queue_.size() - 1; ++i) {
- queue.emplace_back(
+ return_queue_.emplace_back(
absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
}
// For the last buffer in the queue, we must account for the possibility that
// the buffer isn't full yet.
- queue.emplace_back(absl::MakeConstSpan(
+ return_queue_.emplace_back(absl::MakeConstSpan(
queue_.back().data(), queue_.back().size() - stream_.avail_out));
- return queue;
+ return return_queue_;
}
size_t LzmaEncoder::queued_bytes() const {