Encode flatbuffers directly into the encoder when logging
We were running out of memory when running for many hours. Initial
debugging pointed to a heap fragmentation issue: tracking the allocated
memory using the malloc hooks showed no growth in allocated memory, even
though the heap itself kept growing.
Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can instead have the BufferEncoder provide
memory to write to, and have it only allocate that buffer space once, and
allocate it to the maximum size that a writer might see.
Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7f32f44..4c06f0a 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -29,7 +29,7 @@
#include "aos/events/logging/lzma_encoder.h"
#endif
-DEFINE_int32(flush_size, 128000,
+DEFINE_int32(flush_size, 128 * 1024,
"Number of outstanding bytes to allow before flushing to disk.");
DEFINE_double(
flush_period, 5.0,
@@ -76,13 +76,13 @@
}
} // namespace
-DetachedBufferWriter::DetachedBufferWriter(
- std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
+ std::unique_ptr<DataEncoder> encoder)
: filename_(filename), encoder_(std::move(encoder)) {
if (!util::MkdirPIfSpace(filename, 0777)) {
ran_out_of_space_ = true;
} else {
- fd_ = open(std::string(filename).c_str(),
+ fd_ = open(filename_.c_str(),
O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
if (fd_ == -1 && errno == ENOSPC) {
ran_out_of_space_ = true;
@@ -136,29 +136,27 @@
return;
}
- aos::monotonic_clock::time_point now;
- if (encoder_->may_bypass() && span.size() > 4096u) {
- // Over this threshold, we'll assume it's cheaper to add an extra
- // syscall to write the data immediately instead of copying it to
- // enqueue.
+ if (!encoder_->HasSpace(span.size())) {
+ Flush();
+ CHECK(encoder_->HasSpace(span.size()));
+ }
+ DataEncoder::SpanCopier coppier(span);
+ encoder_->Encode(&coppier);
+ FlushAtThreshold(aos::monotonic_clock::now());
+}
- // First, flush everything.
- while (encoder_->queue_size() > 0u) {
- Flush();
- }
-
- // Then, write it directly.
- const auto start = aos::monotonic_clock::now();
- const ssize_t written = write(fd_, span.data(), span.size());
- const auto end = aos::monotonic_clock::now();
- HandleWriteReturn(written, span.size());
- UpdateStatsForWrite(end - start, written, 1);
- now = end;
- } else {
- encoder_->Encode(CopySpanAsDetachedBuffer(span));
- now = aos::monotonic_clock::now();
+void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *coppier,
+ aos::monotonic_clock::time_point now) {
+ if (ran_out_of_space_) {
+ return;
}
+ if (!encoder_->HasSpace(coppier->size())) {
+ Flush();
+ CHECK(encoder_->HasSpace(coppier->size()));
+ }
+
+ encoder_->Encode(coppier);
FlushAtThreshold(now);
}
@@ -575,11 +573,11 @@
}
flatbuffers::uoffset_t PackMessageSize(LogType log_type,
- const Context &context) {
+ size_t data_size) {
static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
"Update size logic please.");
const flatbuffers::uoffset_t aligned_data_length =
- ((context.size + 7) & 0xfffffff8u);
+ ((data_size + 7) & 0xfffffff8u);
switch (log_type) {
case LogType::kLogDeliveryTimeOnly:
return PackMessageHeaderSize(log_type);
@@ -596,8 +594,11 @@
size_t PackMessageInline(uint8_t *buffer, const Context &context,
int channel_index, LogType log_type) {
+ // TODO(austin): Figure out how to copy directly from shared memory instead of
+ // first into the fetcher's memory and then into here. That would save a lot
+ // of memory.
const flatbuffers::uoffset_t message_size =
- PackMessageSize(log_type, context);
+ PackMessageSize(log_type, context.size);
buffer = Push<flatbuffers::uoffset_t>(
buffer, message_size - sizeof(flatbuffers::uoffset_t));