Encode flatbuffers directly into the encoder when logging

We were running out of memory when running for many hours.  Initial
debugging looked like it was a heap fragmentation issue.  Tracking the
allocated memory using the malloc hooks wasn't showing any growth of
memory.  The heap was growing though.

Instead of allocating a FlatBufferBuilder/DetachedBuffer for each
message to be logged, we can instead have the BufferEncoder provide
memory to write to, and have it only allocate that buffer space once, and
allocate it to the maximum size that a writer might see.

Change-Id: I046bd2422aea368867b0c63cee7d04c6033fe724
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
index 6ef61d4..ae20440 100644
--- a/aos/events/logging/buffer_encoder.cc
+++ b/aos/events/logging/buffer_encoder.cc
@@ -4,39 +4,60 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 
+#include "aos/flatbuffers.h"
 #include "glog/logging.h"
 
 namespace aos::logger {
 
-void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
-  CHECK(in.data()) << ": Encode called with nullptr.";
+DummyEncoder::DummyEncoder(size_t max_buffer_size) {
+  // TODO(austin): This is going to end up writing > 128k chunks, not 128k
+  // chunks exactly.  If we really want to, we could make it always write 128k
+  // chunks by only exposing n * 128k chunks as we go.  This might improve write
+  // performance, then again, it might have no effect if the kernel is combining
+  // writes...
+  constexpr size_t kWritePageSize = 128 * 1024;
+  // Round up to the nearest page size.
+  input_buffer_.reserve(
+      ((max_buffer_size + kWritePageSize - 1) / kWritePageSize) *
+      kWritePageSize);
+  return_queue_.resize(1);
+}
 
-  total_bytes_ += in.size();
-  queue_.emplace_back(std::move(in));
+bool DummyEncoder::HasSpace(size_t request) const {
+  return request + input_buffer_.size() < input_buffer_.capacity();
+}
+
+void DummyEncoder::Encode(Copier *copy) {
+  DCHECK(HasSpace(copy->size()));
+  const size_t input_buffer_initial_size = input_buffer_.size();
+
+  input_buffer_.resize(input_buffer_initial_size + copy->size());
+  const size_t written_size =
+      copy->Copy(input_buffer_.data() + input_buffer_initial_size);
+  DCHECK_EQ(written_size, copy->size());
+
+  total_bytes_ += written_size;
 }
 
 void DummyEncoder::Clear(const int n) {
   CHECK_GE(n, 0);
   CHECK_LE(static_cast<size_t>(n), queue_size());
-  queue_.erase(queue_.begin(), queue_.begin() + n);
+  if (n != 0) {
+    input_buffer_.resize(0u);
+  }
 }
 
-std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
-  std::vector<absl::Span<const uint8_t>> queue;
-  queue.reserve(queue_.size());
-  for (const auto &buffer : queue_) {
-    queue.emplace_back(buffer.data(), buffer.size());
+absl::Span<const absl::Span<const uint8_t>> DummyEncoder::queue() {
+  if (input_buffer_.size() != 0) {
+    return_queue_[0] =
+        absl::Span<const uint8_t>(input_buffer_.data(), input_buffer_.size());
+    return return_queue_;
+  } else {
+    return absl::Span<const absl::Span<const uint8_t>>();
   }
-  return queue;
 }
 
-size_t DummyEncoder::queued_bytes() const {
-  size_t bytes = 0;
-  for (const auto &buffer : queue_) {
-    bytes += buffer.size();
-  }
-  return bytes;
-}
+size_t DummyEncoder::queued_bytes() const { return input_buffer_.size(); }
 
 DummyDecoder::DummyDecoder(std::string_view filename)
     : filename_(filename), fd_(open(filename_.c_str(), O_RDONLY | O_CLOEXEC)) {