Encode flatbuffers directly into the encoder when logging

We were running out of memory when running for many hours.  Initial
debugging looked like it was a heap fragmentation issue.  Tracking the
allocated memory using the malloc hooks wasn't showing any growth of
memory.  The heap was growing though.

Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can instead have the BufferEncoder provide
memory to write to, and have it only alloate that buffer space once, and
allocate it to the maximum size that a writer might see.

Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 36116a3..ab7d3c6 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -759,6 +759,64 @@
   }
 }
 
+// Class to copy a context into the provided buffer.
+class ContextDataCopier : public DataEncoder::Copier {
+ public:
+  ContextDataCopier(const Context &context, int channel_index,
+                     LogType log_type, EventLoop *event_loop)
+      : DataEncoder::Copier(PackMessageSize(log_type, context.size)),
+        context_(context),
+        channel_index_(channel_index),
+        log_type_(log_type),
+        event_loop_(event_loop) {}
+
+  monotonic_clock::time_point end_time() const { return end_time_; }
+
+  size_t Copy(uint8_t *data) final {
+    size_t result =
+        PackMessageInline(data, context_, channel_index_, log_type_);
+    end_time_ = event_loop_->monotonic_now();
+    return result;
+  }
+
+ private:
+  const Context &context_;
+  const int channel_index_;
+  const LogType log_type_;
+  EventLoop *event_loop_;
+  monotonic_clock::time_point end_time_;
+};
+
+// Class to copy a RemoteMessage into the provided buffer.
+class RemoteMessageCopier : public DataEncoder::Copier {
+ public:
+  RemoteMessageCopier(
+      const message_bridge::RemoteMessage *message, int channel_index,
+      aos::monotonic_clock::time_point monotonic_timestamp_time,
+      EventLoop *event_loop)
+      : DataEncoder::Copier(PackRemoteMessageSize()),
+        message_(message),
+        channel_index_(channel_index),
+        monotonic_timestamp_time_(monotonic_timestamp_time),
+        event_loop_(event_loop) {}
+
+  monotonic_clock::time_point end_time() const { return end_time_; }
+
+  size_t Copy(uint8_t *data) final {
+    size_t result = PackRemoteMessageInline(data, message_, channel_index_,
+                                            monotonic_timestamp_time_);
+    end_time_ = event_loop_->monotonic_now();
+    return result;
+  }
+
+ private:
+  const message_bridge::RemoteMessage *message_;
+  int channel_index_;
+  aos::monotonic_clock::time_point monotonic_timestamp_time_;
+  EventLoop *event_loop_;
+  monotonic_clock::time_point end_time_;
+};
+
 void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
   if (writer != nullptr) {
     const UUID source_node_boot_uuid =
@@ -767,25 +825,17 @@
             : event_loop_->boot_uuid();
     // Write!
     const auto start = event_loop_->monotonic_now();
-    flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
-                                       max_header_size_);
-    fbb.ForceDefaults(true);
 
-    fbb.FinishSizePrefixed(
-        PackMessage(&fbb, f.fetcher->context(), f.channel_index, f.log_type));
-    const auto end = event_loop_->monotonic_now();
-    RecordCreateMessageTime(start, end, f);
+    ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
+                               f.log_type, event_loop_);
 
-    max_header_size_ =
-        std::max(max_header_size_, fbb.GetSize() - f.fetcher->context().size);
-    writer->QueueMessage(&fbb, source_node_boot_uuid, end);
+    writer->CopyMessage(&coppier, source_node_boot_uuid, start);
+    RecordCreateMessageTime(start, coppier.end_time(), f);
 
     VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
             << " for channel "
             << configuration::CleanedChannelToString(f.fetcher->channel())
-            << " to " << writer->filename() << " data "
-            << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                   fbb.GetBufferPointer()));
+            << " to " << writer->filename();
   }
 }
 
@@ -793,29 +843,24 @@
                              const FetcherStruct &f) {
   if (timestamp_writer != nullptr) {
     // And now handle timestamps.
-    const auto start = event_loop_->monotonic_now();
-    flatbuffers::FlatBufferBuilder fbb;
-    fbb.ForceDefaults(true);
-
-    fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                       f.channel_index,
-                                       LogType::kLogDeliveryTimeOnly));
-    const auto end = event_loop_->monotonic_now();
-    RecordCreateMessageTime(start, end, f);
 
     // Tell our writer that we know something about the remote boot.
     timestamp_writer->UpdateRemote(
         f.data_node_index, f.fetcher->context().source_boot_uuid,
         f.fetcher->context().monotonic_remote_time,
         f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
-    timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
+
+    const auto start = event_loop_->monotonic_now();
+    ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
+                               LogType::kLogDeliveryTimeOnly, event_loop_);
+
+    timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start);
+    RecordCreateMessageTime(start, coppier.end_time(), f);
 
     VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
             << " for channel "
             << configuration::CleanedChannelToString(f.fetcher->channel())
-            << " to " << timestamp_writer->filename() << " timestamp "
-            << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                   fbb.GetBufferPointer()));
+            << " to " << timestamp_writer->filename() << " timestamp";
   }
 }
 
@@ -825,20 +870,10 @@
     const auto start = event_loop_->monotonic_now();
     // And now handle the special message contents channel.  Copy the
     // message into a FlatBufferBuilder and save it to disk.
-    //
-    // TODO(austin): We can be more efficient here when we start to
-    // care...
-    flatbuffers::FlatBufferBuilder fbb;
-    fbb.ForceDefaults(true);
-
     const RemoteMessage *msg =
         flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
 
     CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
-    // TODO(austin): This needs to check the channel_index and confirm
-    // that it should be logged before squirreling away the timestamp to
-    // disk.  We don't want to log irrelevant timestamps.
-
     // Translate from the channel index that the event loop uses to the
     // channel index in the log file.
     const int channel_index =
@@ -847,11 +882,6 @@
     const aos::monotonic_clock::time_point monotonic_timestamp_time =
         f.fetcher->context().monotonic_event_time;
 
-    fbb.FinishSizePrefixed(
-        PackRemoteMessage(&fbb, msg, channel_index, monotonic_timestamp_time));
-    const auto end = event_loop_->monotonic_now();
-    RecordCreateMessageTime(start, end, f);
-
     // Timestamps tell us information about what happened too!
     // Capture any reboots so UpdateRemote is properly recorded.
     contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
@@ -871,8 +901,13 @@
             chrono::nanoseconds(msg->monotonic_sent_time())),
         reliable, monotonic_timestamp_time);
 
-    contents_writer->QueueMessage(&fbb, UUID::FromVector(msg->boot_uuid()),
-                                  end);
+    RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
+                                 event_loop_);
+
+    contents_writer->CopyMessage(&coppier, UUID::FromVector(msg->boot_uuid()),
+                                 start);
+
+    RecordCreateMessageTime(start, coppier.end_time(), f);
   }
 }