Encode flatbuffers directly into the encoder when logging

We were running out of memory when running for many hours.  Initial
debugging looked like it was a heap fragmentation issue.  Tracking the
allocated memory using the malloc hooks wasn't showing any growth of
memory.  The heap was growing though.

Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can instead have the BufferEncoder provide
memory to write to, and have it only allocate that buffer space once, and
allocate it to the maximum size that a writer might see.

Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
index 235a49c..4cd661d 100644
--- a/aos/events/logging/buffer_encoder.h
+++ b/aos/events/logging/buffer_encoder.h
@@ -2,27 +2,57 @@
 #define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
 
 #include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
 #include "aos/events/logging/logger_generated.h"
 #include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
 
 namespace aos::logger {
 
-// Interface to encode DetachedBuffers as they are written to a file.
-//
-// The interface to a file is represented as a series of buffers, appropriate to
-// pass to writev(2). The backing storage for these is managed internally so
-// subclasses can handle resizing/moving as desired. Implementations must
-// enqueue as many of these backing buffers as demanded, leaving it up to the
-// caller to write them to a file when desired.
-class DetachedBufferEncoder {
+// Interface to encode data as it is written to a file.
+class DataEncoder {
  public:
-  virtual ~DetachedBufferEncoder() = default;
+  virtual ~DataEncoder() = default;
 
-  // Encodes and enqueues the given DetachedBuffer.
-  //
-  // Note that in may be moved-from, or it may be left unchanged, depending on
-  // what makes the most sense for a given implementation.
-  virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+  // Interface to copy data into a buffer.
+  class Copier {
+   public:
+    Copier(size_t size) : size_(size) {}
+
+    // Returns the number of bytes this will write.
+    size_t size() const { return size_; }
+
+    // Writes size() bytes to data, and returns the number of bytes written.
+    [[nodiscard]] virtual size_t Copy(uint8_t *data) = 0;
+
+   private:
+    size_t size_;
+  };
+
+  // Copies a span.  The span must remain valid for as long as the copier is
+  // being used.
+  class SpanCopier : public Copier {
+   public:
+    SpanCopier(absl::Span<const uint8_t> data)
+        : Copier(data.size()), data_(data) {
+      CHECK(data_.data());
+    }
+
+    size_t Copy(uint8_t *data) final {
+      std::memcpy(data, data_.data(), data_.size());
+      return data_.size();
+    }
+
+   private:
+    const absl::Span<const uint8_t> data_;
+  };
+
+  // Returns true if there is space in the buffer for the next request, or
+  // false if the output needs to be flushed first.
+  virtual bool HasSpace(size_t request) const = 0;
+
+  // Encodes and enqueues the data supplied by the given copier.
+  virtual void Encode(Copier *copy) = 0;
 
   // If this returns true, the encoder may be bypassed by writing directly to
   // the file.
@@ -37,8 +67,9 @@
   // Clears the first n encoded buffers from the queue.
   virtual void Clear(int n) = 0;
 
-  // Returns a view of the queue of encoded buffers.
-  virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+  // Returns a view of the queue of encoded buffers.  Valid until any other
+  // method on this class is called.
+  virtual absl::Span<const absl::Span<const uint8_t>> queue() = 0;
 
   // Returns the total number of of bytes currently queued up.
   virtual size_t queued_bytes() const = 0;
@@ -53,28 +84,32 @@
 
 // This class does not encode the data. It just claims ownership of the raw data
 // and queues it up as is.
-class DummyEncoder final : public DetachedBufferEncoder {
+class DummyEncoder final : public DataEncoder {
  public:
-  DummyEncoder() {}
+  DummyEncoder(size_t max_buffer_size);
   DummyEncoder(const DummyEncoder &) = delete;
   DummyEncoder(DummyEncoder &&other) = delete;
   DummyEncoder &operator=(const DummyEncoder &) = delete;
   DummyEncoder &operator=(DummyEncoder &&other) = delete;
   ~DummyEncoder() override = default;
 
-  // No encoding happens, the raw data is queued up as is.
-  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  bool HasSpace(size_t request) const final;
+  void Encode(Copier *copy) final;
   bool may_bypass() const final { return true; }
   void Finish() final {}
   void Clear(int n) final;
-  std::vector<absl::Span<const uint8_t>> queue() const final;
+  absl::Span<const absl::Span<const uint8_t>> queue() final;
   size_t queued_bytes() const final;
   size_t total_bytes() const final { return total_bytes_; }
-  size_t queue_size() const final { return queue_.size(); }
+  size_t queue_size() const final {
+    return input_buffer_.size() != 0 ? 1u : 0u;
+  }
 
  private:
-  std::vector<flatbuffers::DetachedBuffer> queue_;
   size_t total_bytes_ = 0;
+
+  ResizeableBuffer input_buffer_;
+  std::vector<absl::Span<const uint8_t>> return_queue_;
 };
 
 // Interface to decode chunks of data. Implementations of this interface will