Encode flatbuffers directly into the encoder when logging
We were running out of memory when running for many hours. Initial
debugging suggested a heap fragmentation issue: tracking the allocated
memory using the malloc hooks showed no growth in allocated memory, yet
the heap kept growing.
Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, have the BufferEncoder provide the memory to write
into. The encoder allocates that buffer space only once, sized to the
maximum message size that a writer might see.
Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a04d0ea..120bd79 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -20,14 +20,16 @@
NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
const Node *logger_node,
std::function<void(NewDataWriter *)> reopen,
- std::function<void(NewDataWriter *)> close)
+ std::function<void(NewDataWriter *)> close,
+ size_t max_message_size)
: node_(node),
node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
logger_node_index_(
configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
log_namer_(log_namer),
reopen_(std::move(reopen)),
- close_(std::move(close)) {
+ close_(std::move(close)),
+ max_message_size_(max_message_size) {
state_.resize(configuration::NodesCount(log_namer->configuration_));
CHECK_LT(node_index_, state_.size());
}
@@ -179,9 +181,9 @@
}
}
-void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
- const UUID &source_node_boot_uuid,
- aos::monotonic_clock::time_point now) {
+void NewDataWriter::CopyMessage(DataEncoder::Copier *copier,
+ const UUID &source_node_boot_uuid,
+ aos::monotonic_clock::time_point now) {
// Trigger a reboot if we detect the boot UUID change.
UpdateBoot(source_node_boot_uuid);
@@ -202,7 +204,7 @@
CHECK(writer);
CHECK(header_written_) << ": Attempting to write message before header to "
<< writer->filename();
- writer->QueueSizedFlatbuffer(fbb, now);
+ writer->CopyMessage(copier, now);
}
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
@@ -228,6 +230,16 @@
CHECK_EQ(state_[node_index_].boot_uuid,
UUID::FromString(header.message().source_node_boot_uuid()));
if (!writer) {
+ // Since we haven't opened the first time, it's still not too late to update
+ // the max message size. Make sure the header fits.
+ //
+ // This won't work well on reboots, but the structure of the header is fixed
+ // by that point in time, so its size is fixed too.
+ //
+ // Most of the time, the minimum buffer size inside the encoder of around
+ // 128k will make this a non-issue.
+ UpdateMaxMessageSize(header.span().size());
+
reopen_(this);
}
@@ -235,10 +247,8 @@
<< aos::FlatbufferToJson(
header, {.multi_line = false, .max_vector_size = 100});
- // TODO(austin): This triggers a dummy allocation that we don't need as part
- // of releasing. Can we skip it?
CHECK(writer);
- writer->QueueSizedFlatbuffer(header.Release());
+ writer->QueueSpan(header.span());
header_written_ = true;
monotonic_start_time_ = log_namer_->monotonic_start_time(
node_index_, state_[node_index_].boot_uuid);
@@ -593,9 +603,10 @@
base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
std::unique_ptr<DetachedBufferWriter> writer =
- std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+ std::make_unique<DetachedBufferWriter>(
+ filename, encoder_factory_(header->span().size()));
- writer->QueueSizedFlatbuffer(header->Release());
+ writer->QueueSpan(header->span());
if (!writer->ran_out_of_space()) {
all_filenames_.emplace_back(
@@ -624,8 +635,10 @@
// generated if it is sendable on this node.
if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
if (!data_writer_) {
- OpenDataWriter();
+ MakeDataWriter();
}
+ data_writer_->UpdateMaxMessageSize(PackMessageSize(
+ LogType::kLogRemoteMessage, channel->max_size()));
return data_writer_.get();
}
@@ -647,9 +660,8 @@
[this, channel](NewDataWriter *data_writer) {
OpenWriter(channel, data_writer);
},
- [this](NewDataWriter *data_writer) {
- CloseWriter(&data_writer->writer);
- });
+ [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+ PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
return &(
data_writers_.emplace(channel, std::move(data_writer)).first->second);
}
@@ -672,9 +684,8 @@
[this, channel](NewDataWriter *data_writer) {
OpenForwardedTimestampWriter(channel, data_writer);
},
- [this](NewDataWriter *data_writer) {
- CloseWriter(&data_writer->writer);
- });
+ [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+ PackRemoteMessageSize());
return &(
data_writers_.emplace(channel, std::move(data_writer)).first->second);
}
@@ -690,8 +701,10 @@
}
if (!data_writer_) {
- OpenDataWriter();
+ MakeDataWriter();
}
+ data_writer_->UpdateMaxMessageSize(
+ PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
return data_writer_.get();
}
@@ -724,7 +737,8 @@
absl::StrCat("timestamps", channel->name()->string_view(), "/",
channel->type()->string_view(), ".part",
data_writer->parts_index(), ".bfbs", extension_);
- CreateBufferWriter(filename, &data_writer->writer);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
}
void MultiNodeLogNamer::OpenWriter(const Channel *channel,
@@ -733,10 +747,11 @@
CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
channel->name()->string_view(), "/", channel->type()->string_view(),
".part", data_writer->parts_index(), ".bfbs", extension_);
- CreateBufferWriter(filename, &data_writer->writer);
+ CreateBufferWriter(filename, data_writer->max_message_size(),
+ &data_writer->writer);
}
-void MultiNodeLogNamer::OpenDataWriter() {
+void MultiNodeLogNamer::MakeDataWriter() {
if (!data_writer_) {
data_writer_ = std::make_unique<NewDataWriter>(
this, node_, node_,
@@ -747,16 +762,20 @@
}
absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
extension_);
- CreateBufferWriter(name, &writer->writer);
+ CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
},
[this](NewDataWriter *data_writer) {
CloseWriter(&data_writer->writer);
- });
+ },
+ // Default size is 0 so it will be obvious if something doesn't fix it
+ // afterwards.
+ 0);
}
}
void MultiNodeLogNamer::CreateBufferWriter(
- std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
+ std::string_view path, size_t max_message_size,
+ std::unique_ptr<DetachedBufferWriter> *destination) {
if (ran_out_of_space_) {
// Refuse to open any new files, which might skip data. Any existing files
// are in the same folder, which means they're on the same filesystem, which
@@ -778,8 +797,8 @@
DetachedBufferWriter::already_out_of_space_t());
return;
}
- *destination =
- std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+ *destination = std::make_unique<DetachedBufferWriter>(
+ filename, encoder_factory_(max_message_size));
if (!destination->get()->ran_out_of_space()) {
all_filenames_.emplace_back(path);
}
@@ -793,7 +812,8 @@
return;
}
- *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
+ *destination->get() =
+ DetachedBufferWriter(filename, encoder_factory_(max_message_size));
if (!destination->get()->ran_out_of_space()) {
all_filenames_.emplace_back(path);
}