Encode flatbuffers directly into the encoder when logging
We were running out of memory when running for many hours. Initial
debugging pointed to a heap fragmentation issue: tracking the
allocated memory using the malloc hooks wasn't showing any growth of
memory, yet the heap was still growing.
Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can have the BufferEncoder provide memory
to write to, and have it allocate that buffer space only once, sized
to the maximum message size that a writer might see.
Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 00ed63f..4e8c990 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -38,7 +38,16 @@
// close is called to close that file and extract any statistics.
NewDataWriter(LogNamer *log_namer, const Node *node, const Node *logger_node,
std::function<void(NewDataWriter *)> reopen,
- std::function<void(NewDataWriter *)> close);
+ std::function<void(NewDataWriter *)> close,
+ size_t max_message_size);
+
+ void UpdateMaxMessageSize(size_t new_size) {
+ if (new_size > max_message_size_) {
+ CHECK(!header_written_);
+ max_message_size_ = new_size;
+ }
+ }
+ size_t max_message_size() const { return max_message_size_; }
NewDataWriter(NewDataWriter &&other) = default;
aos::logger::NewDataWriter &operator=(NewDataWriter &&other) = default;
@@ -58,10 +67,10 @@
bool reliable,
monotonic_clock::time_point monotonic_timestamp_time =
monotonic_clock::min_time);
- // Queues up a message with the provided boot UUID.
- void QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
- const UUID &node_boot_uuid,
- aos::monotonic_clock::time_point now);
+ // Copies a message with the provided boot UUID.
+ void CopyMessage(DataEncoder::Copier *coppier,
+ const UUID &source_node_boot_uuid,
+ aos::monotonic_clock::time_point now);
// Updates the current boot for the source node. This is useful when you want
// to queue a message that may trigger a reboot rotation, but then need to
@@ -148,6 +157,8 @@
bool header_written_ = false;
std::vector<State> state_;
+
+ size_t max_message_size_;
};
// Interface describing how to name, track, and add headers to log file parts.
@@ -313,11 +324,12 @@
temp_suffix_ = temp_suffix;
}
- // Sets the function for creating encoders.
+ // Sets the function for creating encoders. The argument is the max message
+ // size (including headers) that will be written into this encoder.
//
// Defaults to just creating DummyEncoders.
void set_encoder_factory(
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory) {
+ std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory) {
encoder_factory_ = std::move(encoder_factory);
}
@@ -449,9 +461,9 @@
void OpenWriter(const Channel *channel, NewDataWriter *data_writer);
// Opens the main data writer file for this node responsible for data_writer_.
- void OpenDataWriter();
+ void MakeDataWriter();
- void CreateBufferWriter(std::string_view path,
+ void CreateBufferWriter(std::string_view path, size_t max_message_size,
std::unique_ptr<DetachedBufferWriter> *destination);
void RenameTempFile(DetachedBufferWriter *destination);
@@ -479,8 +491,10 @@
std::vector<std::string> all_filenames_;
std::string temp_suffix_;
- std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory_ =
- []() { return std::make_unique<DummyEncoder>(); };
+ std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory_ =
+ [](size_t max_message_size) {
+ return std::make_unique<DummyEncoder>(max_message_size);
+ };
std::string extension_;
// Storage for statistics from previously-rotated DetachedBufferWriters.