Merge "Encode flatbuffers directly into the encoder when logging"
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 3b82e98..82e7061 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -117,6 +117,8 @@
     deps = [
         ":logger_fbs",
         "//aos:configuration_fbs",
+        "//aos:flatbuffers",
+        "//aos/containers:resizeable_buffer",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/types:span",
@@ -397,6 +399,11 @@
     srcs = [
         "logger_main.cc",
     ],
+    copts = select({
+        "//tools:cpu_k8": ["-DLZMA=1"],
+        "//tools:cpu_arm64": ["-DLZMA=1"],
+        "//conditions:default": [],
+    }),
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
index 6ef61d4..ae20440 100644
--- a/aos/events/logging/buffer_encoder.cc
+++ b/aos/events/logging/buffer_encoder.cc
@@ -4,39 +4,64 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 
+#include "aos/flatbuffers.h"
 #include "glog/logging.h"
 
 namespace aos::logger {
 
-void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
-  CHECK(in.data()) << ": Encode called with nullptr.";
+DummyEncoder::DummyEncoder(size_t max_buffer_size) {
+  // TODO(austin): This is going to end up writing > 128k chunks, not 128k
+  // chunks exactly.  If we really want to, we could make it always write 128k
+  // chunks by only exposing n * 128k chunks as we go.  This might improve write
+  // performance, then again, it might have no effect if the kernel is combining
+  // writes...
+  constexpr size_t kWritePageSize = 128 * 1024;
+  // Round up to the nearest page size.
+  input_buffer_.reserve(
+      ((max_buffer_size + kWritePageSize - 1) / kWritePageSize) *
+      kWritePageSize);
+  return_queue_.resize(1);
+}
 
-  total_bytes_ += in.size();
-  queue_.emplace_back(std::move(in));
+bool DummyEncoder::HasSpace(size_t request) const {
+  // <= rather than <: a request which exactly fills the remaining capacity
+  // still fits.  A strict comparison would reject a message of exactly the
+  // (page rounded) max size even when the buffer is empty.
+  return request + input_buffer_.size() <= input_buffer_.capacity();
+}
+
+void DummyEncoder::Encode(Copier *copy) {
+  DCHECK(HasSpace(copy->size()));
+  const size_t input_buffer_initial_size = input_buffer_.size();
+
+  input_buffer_.resize(input_buffer_initial_size + copy->size());
+  const size_t written_size =
+      copy->Copy(input_buffer_.data() + input_buffer_initial_size);
+  DCHECK_EQ(written_size, copy->size());
+
+  total_bytes_ += written_size;
 }
 
 void DummyEncoder::Clear(const int n) {
   CHECK_GE(n, 0);
   CHECK_LE(static_cast<size_t>(n), queue_size());
-  queue_.erase(queue_.begin(), queue_.begin() + n);
+  if (n != 0) {
+    // All queued data lives in one buffer, so any non-zero n clears it all.
+    input_buffer_.resize(0u);
+  }
 }
 
-std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
-  std::vector<absl::Span<const uint8_t>> queue;
-  queue.reserve(queue_.size());
-  for (const auto &buffer : queue_) {
-    queue.emplace_back(buffer.data(), buffer.size());
+absl::Span<const absl::Span<const uint8_t>> DummyEncoder::queue() {
+  if (input_buffer_.size() != 0) {
+    return_queue_[0] =
+        absl::Span<const uint8_t>(input_buffer_.data(), input_buffer_.size());
+    return return_queue_;
+  } else {
+    return absl::Span<const absl::Span<const uint8_t>>();
   }
-  return queue;
 }
 
-size_t DummyEncoder::queued_bytes() const {
-  size_t bytes = 0;
-  for (const auto &buffer : queue_) {
-    bytes += buffer.size();
-  }
-  return bytes;
-}
+size_t DummyEncoder::queued_bytes() const { return input_buffer_.size(); }
 
 DummyDecoder::DummyDecoder(std::string_view filename)
     : filename_(filename), fd_(open(filename_.c_str(), O_RDONLY | O_CLOEXEC)) {
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
index 235a49c..4cd661d 100644
--- a/aos/events/logging/buffer_encoder.h
+++ b/aos/events/logging/buffer_encoder.h
@@ -2,27 +2,61 @@
 #define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
 
+#include <cstring>
+
 #include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
 #include "aos/events/logging/logger_generated.h"
 #include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
 
 namespace aos::logger {
 
-// Interface to encode DetachedBuffers as they are written to a file.
-//
-// The interface to a file is represented as a series of buffers, appropriate to
-// pass to writev(2). The backing storage for these is managed internally so
-// subclasses can handle resizing/moving as desired. Implementations must
-// enqueue as many of these backing buffers as demanded, leaving it up to the
-// caller to write them to a file when desired.
-class DetachedBufferEncoder {
+// Interface to encode data as it is written to a file.
+class DataEncoder {
  public:
-  virtual ~DetachedBufferEncoder() = default;
+  virtual ~DataEncoder() = default;
 
-  // Encodes and enqueues the given DetachedBuffer.
-  //
-  // Note that in may be moved-from, or it may be left unchanged, depending on
-  // what makes the most sense for a given implementation.
-  virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+  // Interface to copy data into a buffer.  The size is known up front so the
+  // encoder can make room for the data before calling Copy().
+  class Copier {
+   public:
+    explicit Copier(size_t size) : size_(size) {}
+    virtual ~Copier() = default;
+
+    // Returns the number of bytes this will write.
+    size_t size() const { return size_; }
+
+    // Writes size() bytes to data, and returns the number of bytes written.
+    [[nodiscard]] virtual size_t Copy(uint8_t *data) = 0;
+
+   private:
+    size_t size_;
+  };
+
+  // Copies a span.  The span must outlive every use of this copier.
+  class SpanCopier : public Copier {
+   public:
+    SpanCopier(absl::Span<const uint8_t> data)
+        : Copier(data.size()), data_(data) {
+      CHECK(data_.data());
+    }
+
+    size_t Copy(uint8_t *data) final {
+      std::memcpy(data, data_.data(), data_.size());
+      return data_.size();
+    }
+
+   private:
+    const absl::Span<const uint8_t> data_;
+  };
+
+  // Returns true if there is space in the buffer for a request of the given
+  // size.  Returns false if the queued output needs to be flushed first.
+  virtual bool HasSpace(size_t request) const = 0;
+
+  // Encodes and enqueues the data produced by copy.  Callers are expected to
+  // check HasSpace(copy->size()) first.
+  virtual void Encode(Copier *copy) = 0;
 
   // If this returns true, the encoder may be bypassed by writing directly to
   // the file.
@@ -37,8 +67,9 @@
   // Clears the first n encoded buffers from the queue.
   virtual void Clear(int n) = 0;
 
-  // Returns a view of the queue of encoded buffers.
-  virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+  // Returns a view of the queue of encoded buffers.  Valid until any other
+  // method on this class is called.
+  virtual absl::Span<const absl::Span<const uint8_t>> queue() = 0;
 
   // Returns the total number of of bytes currently queued up.
   virtual size_t queued_bytes() const = 0;
@@ -53,28 +84,36 @@
 
-// This class does not encode the data. It just claims ownership of the raw data
-// and queues it up as is.
-class DummyEncoder final : public DetachedBufferEncoder {
+// This class does not encode the data. It copies the raw data as is into a
+// single buffer, which queue() exposes as at most one span.
+class DummyEncoder final : public DataEncoder {
  public:
-  DummyEncoder() {}
+  // max_buffer_size is the largest message which must fit in the buffer.  The
+  // buffer capacity is rounded up from it to a multiple of 128k.
+  explicit DummyEncoder(size_t max_buffer_size);
   DummyEncoder(const DummyEncoder &) = delete;
   DummyEncoder(DummyEncoder &&other) = delete;
   DummyEncoder &operator=(const DummyEncoder &) = delete;
   DummyEncoder &operator=(DummyEncoder &&other) = delete;
   ~DummyEncoder() override = default;
 
-  // No encoding happens, the raw data is queued up as is.
-  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  bool HasSpace(size_t request) const final;
+  void Encode(Copier *copy) final;
   bool may_bypass() const final { return true; }
   void Finish() final {}
   void Clear(int n) final;
-  std::vector<absl::Span<const uint8_t>> queue() const final;
+  absl::Span<const absl::Span<const uint8_t>> queue() final;
   size_t queued_bytes() const final;
   size_t total_bytes() const final { return total_bytes_; }
-  size_t queue_size() const final { return queue_.size(); }
+  size_t queue_size() const final {
+    return input_buffer_.size() != 0 ? 1u : 0u;
+  }
 
  private:
-  std::vector<flatbuffers::DetachedBuffer> queue_;
   size_t total_bytes_ = 0;
+
+  // Data encoded since the buffer was last cleared, stored contiguously.
+  ResizeableBuffer input_buffer_;
+  // Storage for the single span returned by queue().
+  std::vector<absl::Span<const uint8_t>> return_queue_;
 };
 
 // Interface to decode chunks of data. Implementations of this interface will
diff --git a/aos/events/logging/buffer_encoder_param_test.h b/aos/events/logging/buffer_encoder_param_test.h
index 871e25e..e6c4867 100644
--- a/aos/events/logging/buffer_encoder_param_test.h
+++ b/aos/events/logging/buffer_encoder_param_test.h
@@ -14,24 +14,57 @@
 
 namespace aos::logger::testing {
 
-// Contains some helpful infrastructure for testing DetachedBufferEncoder
+// Contains some helpful infrastructure for testing DataEncoder
 // implementations.
 class BufferEncoderBaseTest : public ::testing::Test {
  public:
+  static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024;
+
+  // Copier which owns the DetachedBuffer it copies from.
+  class DetachedBufferCopier : public DataEncoder::Copier {
+   public:
+    // The base is built first, so data.size() is read before the move.
+    explicit DetachedBufferCopier(flatbuffers::DetachedBuffer &&data)
+        : DataEncoder::Copier(data.size()), data_(std::move(data)) {}
+
+    size_t Copy(uint8_t *data) final {
+      std::memcpy(data, data_.data(), data_.size());
+      return data_.size();
+    }
+
+   private:
+    const flatbuffers::DetachedBuffer data_;
+  };
+
   // Creates and encodes n random buffers, returning a copy of the encoded data.
-  std::vector<std::vector<uint8_t>> CreateAndEncode(
-      int n, DetachedBufferEncoder *encoder) {
+  std::vector<std::vector<uint8_t>> CreateAndEncode(int n,
+                                                    DataEncoder *encoder) {
     std::vector<std::vector<uint8_t>> result;
     for (int i = 0; i < n; i++) {
       flatbuffers::DetachedBuffer buffer = CreateRandomBuffer();
       result.emplace_back(buffer.data(), buffer.data() + buffer.size());
-      encoder->Encode(std::move(buffer));
+      VLOG(1) << "Encoding " << buffer.size();
+      CHECK(encoder->HasSpace(buffer.size()))
+          << ": The test isn't smart enough to flush, figure out what to do. "
+             "Has "
+          << encoder->queued_bytes() << ", encoding " << buffer.size();
+
+      DetachedBufferCopier copier(std::move(buffer));
+      encoder->Encode(&copier);
     }
     return result;
   }
 
   // Returns the total size of a vector full of objects with a size() method.
   template <typename T>
+  static size_t TotalSize(const absl::Span<T> buffers) {
+    size_t result = 0;
+    for (const auto &v : buffers) {
+      result += v.size();
+    }
+    return result;
+  }
+  template <typename T>
   static size_t TotalSize(const std::vector<T> &buffers) {
     size_t result = 0;
     for (const auto &v : buffers) {
@@ -86,7 +117,7 @@
       std::mt19937(::aos::testing::RandomSeed())};
 };
 
-// Tests some generic properties for any DetachedBufferEncoder+DataDecoder
+// Tests some generic properties for any DataEncoder+DataDecoder
 // implementation pair.
 //
 // First and second test parameters are methods to create instances of the
@@ -95,13 +126,13 @@
 class BufferEncoderTest
     : public BufferEncoderBaseTest,
       public ::testing::WithParamInterface<std::tuple<
-          std::function<std::unique_ptr<DetachedBufferEncoder>()>,
+          std::function<std::unique_ptr<DataEncoder>(size_t)>,
           std::function<std::unique_ptr<DataDecoder>(std::string_view)>, int>> {
  public:
   BufferEncoderTest() { Reseed(std::get<2>(GetParam())); }
 
-  std::unique_ptr<DetachedBufferEncoder> MakeEncoder() const {
-    return std::get<0>(GetParam())();
+  std::unique_ptr<DataEncoder> MakeEncoder() const {
+    return std::get<0>(GetParam())(BufferEncoderBaseTest::kMaxMessageSize);
   }
   std::unique_ptr<DataDecoder> MakeDecoder(std::string_view filename) const {
     return std::get<1>(GetParam())(filename);
diff --git a/aos/events/logging/buffer_encoder_test.cc b/aos/events/logging/buffer_encoder_test.cc
index 5a9b85f..8e12e37 100644
--- a/aos/events/logging/buffer_encoder_test.cc
+++ b/aos/events/logging/buffer_encoder_test.cc
@@ -13,13 +13,32 @@
 
 class DummyEncoderTest : public BufferEncoderBaseTest {};
 
-// Tests that buffers are enqueued without any changes.
+// Tests that buffers are concatenated without being modified.
 TEST_F(DummyEncoderTest, QueuesBuffersAsIs) {
-  DummyEncoder encoder;
+  DummyEncoder encoder(BufferEncoderBaseTest::kMaxMessageSize);
   const auto expected = CreateAndEncode(100, &encoder);
+  std::vector<uint8_t> data = Flatten(expected);
 
   auto queue = encoder.queue();
-  EXPECT_THAT(queue, ::testing::ElementsAreArray(expected));
+  ASSERT_EQ(queue.size(), 1u);
+  EXPECT_EQ(queue[0], absl::Span<const uint8_t>(data));
+}
+
+// Tests that Clear() drops the queued data but keeps the running byte total.
+TEST_F(DummyEncoderTest, ClearsQueuedBuffers) {
+  DummyEncoder encoder(BufferEncoderBaseTest::kMaxMessageSize);
+  const auto expected = CreateAndEncode(100, &encoder);
+  const size_t total_size = TotalSize(expected);
+
+  EXPECT_EQ(encoder.queued_bytes(), total_size);
+  EXPECT_EQ(encoder.total_bytes(), total_size);
+  ASSERT_EQ(encoder.queue_size(), 1u);
+
+  encoder.Clear(1);
+  EXPECT_EQ(encoder.queue_size(), 0u);
+  EXPECT_EQ(encoder.queued_bytes(), 0u);
+  EXPECT_TRUE(encoder.queue().empty());
+  EXPECT_EQ(encoder.total_bytes(), total_size);
 }
 
 // Checks that DummyDecoder can read into a buffer.
@@ -94,8 +107,8 @@
 
 INSTANTIATE_TEST_SUITE_P(
     Dummy, BufferEncoderTest,
-    ::testing::Combine(::testing::Values([]() {
-                         return std::make_unique<DummyEncoder>();
+    ::testing::Combine(::testing::Values([](size_t max_buffer_size) {
+                         return std::make_unique<DummyEncoder>(max_buffer_size);
                        }),
                        ::testing::Values([](std::string_view filename) {
                          return std::make_unique<DummyDecoder>(filename);
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index b0f8ba5..969d59c 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -18,6 +18,11 @@
     "If provided, this is the path to the JSON with the log file header.  If "
     "not provided, _header.json will be appended to --logfile.");
 
+DEFINE_int32(
+    max_message_size, 128 * 1024 * 1024,
+    "Max size of a message to be written.  This sets the buffers inside "
+    "the encoders.");
+
 int main(int argc, char **argv) {
   gflags::SetUsageMessage(R"(This tool lets us manipulate log files.)");
   aos::InitGoogle(&argc, &argv);
@@ -44,7 +49,8 @@
     CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
 
     aos::logger::DetachedBufferWriter buffer_writer(
-        FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
+        FLAGS_logfile,
+        std::make_unique<aos::logger::DummyEncoder>(FLAGS_max_message_size));
     buffer_writer.QueueSpan(header.span());
 
     while (true) {
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a04d0ea..120bd79 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -20,14 +20,16 @@
 NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
                              const Node *logger_node,
                              std::function<void(NewDataWriter *)> reopen,
-                             std::function<void(NewDataWriter *)> close)
+                             std::function<void(NewDataWriter *)> close,
+                             size_t max_message_size)
     : node_(node),
       node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
       logger_node_index_(
           configuration::GetNodeIndex(log_namer->configuration_, logger_node)),
       log_namer_(log_namer),
       reopen_(std::move(reopen)),
-      close_(std::move(close)) {
+      close_(std::move(close)),
+      max_message_size_(max_message_size) {
   state_.resize(configuration::NodesCount(log_namer->configuration_));
   CHECK_LT(node_index_, state_.size());
 }
@@ -179,9 +181,9 @@
   }
 }
 
-void NewDataWriter::QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
-                                 const UUID &source_node_boot_uuid,
-                                 aos::monotonic_clock::time_point now) {
+void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
+                                const UUID &source_node_boot_uuid,
+                                aos::monotonic_clock::time_point now) {
   // Trigger a reboot if we detect the boot UUID change.
   UpdateBoot(source_node_boot_uuid);
 
@@ -202,7 +204,7 @@
   CHECK(writer);
   CHECK(header_written_) << ": Attempting to write message before header to "
                          << writer->filename();
-  writer->QueueSizedFlatbuffer(fbb, now);
+  writer->CopyMessage(coppier, now);
 }
 
 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
@@ -228,6 +230,16 @@
   CHECK_EQ(state_[node_index_].boot_uuid,
            UUID::FromString(header.message().source_node_boot_uuid()));
   if (!writer) {
+    // Since we haven't opened the first time, it's still not too late to update
+    // the max message size.  Make sure the header fits.
+    //
+    // This won't work well on reboots, but the structure of the header is fixed
+    // by that point in time, so its size is fixed too.
+    //
+    // Most of the time, the minimum buffer size inside the encoder of around
+    // 128k will make this a non-issue.
+    UpdateMaxMessageSize(header.span().size());
+
     reopen_(this);
   }
 
@@ -235,10 +247,8 @@
           << aos::FlatbufferToJson(
                  header, {.multi_line = false, .max_vector_size = 100});
 
-  // TODO(austin): This triggers a dummy allocation that we don't need as part
-  // of releasing.  Can we skip it?
   CHECK(writer);
-  writer->QueueSizedFlatbuffer(header.Release());
+  writer->QueueSpan(header.span());
   header_written_ = true;
   monotonic_start_time_ = log_namer_->monotonic_start_time(
       node_index_, state_[node_index_].boot_uuid);
@@ -593,9 +603,10 @@
       base_name_, separator, config_sha256, ".bfbs", extension_, temp_suffix_);
 
   std::unique_ptr<DetachedBufferWriter> writer =
-      std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+      std::make_unique<DetachedBufferWriter>(
+          filename, encoder_factory_(header->span().size()));
 
-  writer->QueueSizedFlatbuffer(header->Release());
+  writer->QueueSpan(header->span());
 
   if (!writer->ran_out_of_space()) {
     all_filenames_.emplace_back(
@@ -624,8 +635,10 @@
   // generated if it is sendable on this node.
   if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
     if (!data_writer_) {
-      OpenDataWriter();
+      MakeDataWriter();
     }
+    data_writer_->UpdateMaxMessageSize(PackMessageSize(
+        LogType::kLogRemoteMessage, channel->max_size()));
     return data_writer_.get();
   }
 
@@ -647,9 +660,8 @@
       [this, channel](NewDataWriter *data_writer) {
         OpenWriter(channel, data_writer);
       },
-      [this](NewDataWriter *data_writer) {
-        CloseWriter(&data_writer->writer);
-      });
+      [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+      PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
   return &(
       data_writers_.emplace(channel, std::move(data_writer)).first->second);
 }
@@ -672,9 +684,8 @@
       [this, channel](NewDataWriter *data_writer) {
         OpenForwardedTimestampWriter(channel, data_writer);
       },
-      [this](NewDataWriter *data_writer) {
-        CloseWriter(&data_writer->writer);
-      });
+      [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+      PackRemoteMessageSize());
   return &(
       data_writers_.emplace(channel, std::move(data_writer)).first->second);
 }
@@ -690,8 +701,10 @@
   }
 
   if (!data_writer_) {
-    OpenDataWriter();
+    MakeDataWriter();
   }
+  data_writer_->UpdateMaxMessageSize(
+      PackMessageSize(LogType::kLogDeliveryTimeOnly, 0));
   return data_writer_.get();
 }
 
@@ -724,7 +737,8 @@
       absl::StrCat("timestamps", channel->name()->string_view(), "/",
                    channel->type()->string_view(), ".part",
                    data_writer->parts_index(), ".bfbs", extension_);
-  CreateBufferWriter(filename, &data_writer->writer);
+  CreateBufferWriter(filename, data_writer->max_message_size(),
+                     &data_writer->writer);
 }
 
 void MultiNodeLogNamer::OpenWriter(const Channel *channel,
@@ -733,10 +747,11 @@
       CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
       channel->name()->string_view(), "/", channel->type()->string_view(),
       ".part", data_writer->parts_index(), ".bfbs", extension_);
-  CreateBufferWriter(filename, &data_writer->writer);
+  CreateBufferWriter(filename, data_writer->max_message_size(),
+                     &data_writer->writer);
 }
 
-void MultiNodeLogNamer::OpenDataWriter() {
+void MultiNodeLogNamer::MakeDataWriter() {
   if (!data_writer_) {
     data_writer_ = std::make_unique<NewDataWriter>(
         this, node_, node_,
@@ -747,16 +762,20 @@
           }
           absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
                           extension_);
-          CreateBufferWriter(name, &writer->writer);
+          CreateBufferWriter(name, writer->max_message_size(), &writer->writer);
         },
         [this](NewDataWriter *data_writer) {
           CloseWriter(&data_writer->writer);
-        });
+        },
+        // Default size is 0 so it will be obvious if something doesn't fix it
+        // afterwards.
+        0);
   }
 }
 
 void MultiNodeLogNamer::CreateBufferWriter(
-    std::string_view path, std::unique_ptr<DetachedBufferWriter> *destination) {
+    std::string_view path, size_t max_message_size,
+    std::unique_ptr<DetachedBufferWriter> *destination) {
   if (ran_out_of_space_) {
     // Refuse to open any new files, which might skip data. Any existing files
     // are in the same folder, which means they're on the same filesystem, which
@@ -778,8 +797,8 @@
           DetachedBufferWriter::already_out_of_space_t());
       return;
     }
-    *destination =
-        std::make_unique<DetachedBufferWriter>(filename, encoder_factory_());
+    *destination = std::make_unique<DetachedBufferWriter>(
+        filename, encoder_factory_(max_message_size));
     if (!destination->get()->ran_out_of_space()) {
       all_filenames_.emplace_back(path);
     }
@@ -793,7 +812,8 @@
     return;
   }
 
-  *destination->get() = DetachedBufferWriter(filename, encoder_factory_());
+  *destination->get() =
+      DetachedBufferWriter(filename, encoder_factory_(max_message_size));
   if (!destination->get()->ran_out_of_space()) {
     all_filenames_.emplace_back(path);
   }
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 00ed63f..4e8c990 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -38,7 +38,21 @@
   // close is called to close that file and extract any statistics.
   NewDataWriter(LogNamer *log_namer, const Node *node, const Node *logger_node,
                 std::function<void(NewDataWriter *)> reopen,
-                std::function<void(NewDataWriter *)> close);
+                std::function<void(NewDataWriter *)> close,
+                size_t max_message_size);
+
+  // Raises the max message size (including headers) used to size the encoder
+  // of the next file this writer opens.  Smaller sizes are ignored.  Growing
+  // it after the header has been written is a fatal error.
+  void UpdateMaxMessageSize(size_t new_size) {
+    if (new_size > max_message_size_) {
+      CHECK(!header_written_)
+          << ": Can't grow the max message size to " << new_size
+          << " after the header has been written.";
+      max_message_size_ = new_size;
+    }
+  }
+  // Returns the current max message size.
+  size_t max_message_size() const { return max_message_size_; }
 
   NewDataWriter(NewDataWriter &&other) = default;
   aos::logger::NewDataWriter &operator=(NewDataWriter &&other) = default;
@@ -58,10 +67,10 @@
                     bool reliable,
                     monotonic_clock::time_point monotonic_timestamp_time =
                         monotonic_clock::min_time);
-  // Queues up a message with the provided boot UUID.
-  void QueueMessage(flatbuffers::FlatBufferBuilder *fbb,
-                    const UUID &node_boot_uuid,
-                    aos::monotonic_clock::time_point now);
+  // Copies a message with the provided boot UUID.
+  void CopyMessage(DataEncoder::Copier *coppier,
+                   const UUID &source_node_boot_uuid,
+                   aos::monotonic_clock::time_point now);
 
   // Updates the current boot for the source node.  This is useful when you want
   // to queue a message that may trigger a reboot rotation, but then need to
@@ -148,6 +157,8 @@
   bool header_written_ = false;
 
   std::vector<State> state_;
+
+  size_t max_message_size_;
 };
 
 // Interface describing how to name, track, and add headers to log file parts.
@@ -313,11 +324,12 @@
     temp_suffix_ = temp_suffix;
   }
 
-  // Sets the function for creating encoders.
+  // Sets the function for creating encoders.  The argument is the max message
+  // size (including headers) that will be written into this encoder.
   //
   // Defaults to just creating DummyEncoders.
   void set_encoder_factory(
-      std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory) {
+      std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory) {
     encoder_factory_ = std::move(encoder_factory);
   }
 
@@ -449,9 +461,9 @@
   void OpenWriter(const Channel *channel, NewDataWriter *data_writer);
 
   // Opens the main data writer file for this node responsible for data_writer_.
-  void OpenDataWriter();
+  void MakeDataWriter();
 
-  void CreateBufferWriter(std::string_view path,
+  void CreateBufferWriter(std::string_view path, size_t max_message_size,
                           std::unique_ptr<DetachedBufferWriter> *destination);
 
   void RenameTempFile(DetachedBufferWriter *destination);
@@ -479,8 +491,10 @@
   std::vector<std::string> all_filenames_;
 
   std::string temp_suffix_;
-  std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory_ =
-      []() { return std::make_unique<DummyEncoder>(); };
+  std::function<std::unique_ptr<DataEncoder>(size_t)> encoder_factory_ =
+      [](size_t max_message_size) {
+        return std::make_unique<DummyEncoder>(max_message_size);
+      };
   std::string extension_;
 
   // Storage for statistics from previously-rotated DetachedBufferWriters.
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 36116a3..ab7d3c6 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -759,6 +759,71 @@
   }
 }
 
+// Copies a message described by a Context into the provided buffer (via
+// PackMessageInline), recording when the copy finished.
+class ContextDataCopier final : public DataEncoder::Copier {
+ public:
+  ContextDataCopier(const Context &context, int channel_index, LogType log_type,
+                    EventLoop *event_loop)
+      : DataEncoder::Copier(PackMessageSize(log_type, context.size)),
+        context_(context),
+        channel_index_(channel_index),
+        log_type_(log_type),
+        event_loop_(event_loop) {}
+
+  // Returns the time at which the last Copy() finished.  This is the clock's
+  // epoch if Copy() has not been called.
+  monotonic_clock::time_point end_time() const { return end_time_; }
+
+  size_t Copy(uint8_t *data) final {
+    const size_t result =
+        PackMessageInline(data, context_, channel_index_, log_type_);
+    end_time_ = event_loop_->monotonic_now();
+    return result;
+  }
+
+ private:
+  // Only referenced, so the Context must outlive this copier.
+  const Context &context_;
+  const int channel_index_;
+  const LogType log_type_;
+  EventLoop *const event_loop_;
+  monotonic_clock::time_point end_time_;
+};
+
+// Copies a RemoteMessage into the provided buffer (via
+// PackRemoteMessageInline), recording when the copy finished.
+class RemoteMessageCopier final : public DataEncoder::Copier {
+ public:
+  RemoteMessageCopier(const message_bridge::RemoteMessage *message,
+                      int channel_index,
+                      aos::monotonic_clock::time_point monotonic_timestamp_time,
+                      EventLoop *event_loop)
+      : DataEncoder::Copier(PackRemoteMessageSize()),
+        message_(message),
+        channel_index_(channel_index),
+        monotonic_timestamp_time_(monotonic_timestamp_time),
+        event_loop_(event_loop) {}
+
+  // Returns the time at which the last Copy() finished.  This is the clock's
+  // epoch if Copy() has not been called.
+  monotonic_clock::time_point end_time() const { return end_time_; }
+
+  size_t Copy(uint8_t *data) final {
+    const size_t result = PackRemoteMessageInline(
+        data, message_, channel_index_, monotonic_timestamp_time_);
+    end_time_ = event_loop_->monotonic_now();
+    return result;
+  }
+
+ private:
+  const message_bridge::RemoteMessage *const message_;
+  const int channel_index_;
+  const aos::monotonic_clock::time_point monotonic_timestamp_time_;
+  EventLoop *const event_loop_;
+  monotonic_clock::time_point end_time_;
+};
+
 void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
   if (writer != nullptr) {
     const UUID source_node_boot_uuid =
@@ -767,25 +825,17 @@
             : event_loop_->boot_uuid();
     // Write!
     const auto start = event_loop_->monotonic_now();
-    flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
-                                       max_header_size_);
-    fbb.ForceDefaults(true);
 
-    fbb.FinishSizePrefixed(
-        PackMessage(&fbb, f.fetcher->context(), f.channel_index, f.log_type));
-    const auto end = event_loop_->monotonic_now();
-    RecordCreateMessageTime(start, end, f);
+    ContextDataCopier copier(f.fetcher->context(), f.channel_index, f.log_type,
+                             event_loop_);
 
-    max_header_size_ =
-        std::max(max_header_size_, fbb.GetSize() - f.fetcher->context().size);
-    writer->QueueMessage(&fbb, source_node_boot_uuid, end);
+    writer->CopyMessage(&copier, source_node_boot_uuid, start);
+    RecordCreateMessageTime(start, copier.end_time(), f);
 
     VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
             << " for channel "
             << configuration::CleanedChannelToString(f.fetcher->channel())
-            << " to " << writer->filename() << " data "
-            << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                   fbb.GetBufferPointer()));
+            << " to " << writer->filename();
   }
 }
 
@@ -793,29 +843,24 @@
                              const FetcherStruct &f) {
   if (timestamp_writer != nullptr) {
     // And now handle timestamps.
-    const auto start = event_loop_->monotonic_now();
-    flatbuffers::FlatBufferBuilder fbb;
-    fbb.ForceDefaults(true);
-
-    fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                       f.channel_index,
-                                       LogType::kLogDeliveryTimeOnly));
-    const auto end = event_loop_->monotonic_now();
-    RecordCreateMessageTime(start, end, f);
 
     // Tell our writer that we know something about the remote boot.
     timestamp_writer->UpdateRemote(
         f.data_node_index, f.fetcher->context().source_boot_uuid,
         f.fetcher->context().monotonic_remote_time,
         f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
-    timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
+
+    const auto start = event_loop_->monotonic_now();
+    ContextDataCopier copier(f.fetcher->context(), f.channel_index,
+                             LogType::kLogDeliveryTimeOnly, event_loop_);
+
+    timestamp_writer->CopyMessage(&copier, event_loop_->boot_uuid(), start);
+    RecordCreateMessageTime(start, copier.end_time(), f);
 
     VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
             << " for channel "
             << configuration::CleanedChannelToString(f.fetcher->channel())
-            << " to " << timestamp_writer->filename() << " timestamp "
-            << FlatbufferToJson(flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                   fbb.GetBufferPointer()));
+            << " to " << timestamp_writer->filename();
   }
 }
 
@@ -825,20 +870,10 @@
     const auto start = event_loop_->monotonic_now();
     // And now handle the special message contents channel.  Copy the
     // message into a FlatBufferBuilder and save it to disk.
-    //
-    // TODO(austin): We can be more efficient here when we start to
-    // care...
-    flatbuffers::FlatBufferBuilder fbb;
-    fbb.ForceDefaults(true);
-
     const RemoteMessage *msg =
         flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
 
     CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
-    // TODO(austin): This needs to check the channel_index and confirm
-    // that it should be logged before squirreling away the timestamp to
-    // disk.  We don't want to log irrelevant timestamps.
-
     // Translate from the channel index that the event loop uses to the
     // channel index in the log file.
     const int channel_index =
@@ -847,11 +882,6 @@
     const aos::monotonic_clock::time_point monotonic_timestamp_time =
         f.fetcher->context().monotonic_event_time;
 
-    fbb.FinishSizePrefixed(
-        PackRemoteMessage(&fbb, msg, channel_index, monotonic_timestamp_time));
-    const auto end = event_loop_->monotonic_now();
-    RecordCreateMessageTime(start, end, f);
-
     // Timestamps tell us information about what happened too!
     // Capture any reboots so UpdateRemote is properly recorded.
     contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
@@ -871,8 +901,13 @@
             chrono::nanoseconds(msg->monotonic_sent_time())),
         reliable, monotonic_timestamp_time);
 
-    contents_writer->QueueMessage(&fbb, UUID::FromVector(msg->boot_uuid()),
-                                  end);
+    RemoteMessageCopier copier(msg, channel_index, monotonic_timestamp_time,
+                               event_loop_);
+
+    contents_writer->CopyMessage(&copier, UUID::FromVector(msg->boot_uuid()),
+                                 start);
+
+    RecordCreateMessageTime(start, copier.end_time(), f);
   }
 }
 
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 51b46ae..3e1832e 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -321,10 +321,6 @@
   // Last time that data was written for all channels to disk.
   monotonic_clock::time_point last_synchronized_time_;
 
-  // Max size that the header has consumed.  This much extra data will be
-  // reserved in the builder to avoid reallocating.
-  size_t max_header_size_ = 0;
-
   // If true, write the message header into a separate file.
   bool separate_config_ = true;
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7f32f44..4c06f0a 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -29,7 +29,7 @@
 #include "aos/events/logging/lzma_encoder.h"
 #endif
 
-DEFINE_int32(flush_size, 128000,
+DEFINE_int32(flush_size, 128 * 1024,
              "Number of outstanding bytes to allow before flushing to disk.");
 DEFINE_double(
     flush_period, 5.0,
@@ -76,13 +76,13 @@
 }
 }  // namespace
 
-DetachedBufferWriter::DetachedBufferWriter(
-    std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+DetachedBufferWriter::DetachedBufferWriter(std::string_view filename,
+                                           std::unique_ptr<DataEncoder> encoder)
     : filename_(filename), encoder_(std::move(encoder)) {
   if (!util::MkdirPIfSpace(filename, 0777)) {
     ran_out_of_space_ = true;
   } else {
-    fd_ = open(std::string(filename).c_str(),
+    fd_ = open(filename_.c_str(),
                O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
     if (fd_ == -1 && errno == ENOSPC) {
       ran_out_of_space_ = true;
@@ -136,29 +136,27 @@
     return;
   }
 
-  aos::monotonic_clock::time_point now;
-  if (encoder_->may_bypass() && span.size() > 4096u) {
-    // Over this threshold, we'll assume it's cheaper to add an extra
-    // syscall to write the data immediately instead of copying it to
-    // enqueue.
+  if (!encoder_->HasSpace(span.size())) {
+    Flush();
+    CHECK(encoder_->HasSpace(span.size()));
+  }
+  DataEncoder::SpanCopier copier(span);
+  encoder_->Encode(&copier);
+  FlushAtThreshold(aos::monotonic_clock::now());
+}
 
-    // First, flush everything.
-    while (encoder_->queue_size() > 0u) {
-      Flush();
-    }
-
-    // Then, write it directly.
-    const auto start = aos::monotonic_clock::now();
-    const ssize_t written = write(fd_, span.data(), span.size());
-    const auto end = aos::monotonic_clock::now();
-    HandleWriteReturn(written, span.size());
-    UpdateStatsForWrite(end - start, written, 1);
-    now = end;
-  } else {
-    encoder_->Encode(CopySpanAsDetachedBuffer(span));
-    now = aos::monotonic_clock::now();
+void DetachedBufferWriter::CopyMessage(DataEncoder::Copier *copier,
+                                       aos::monotonic_clock::time_point now) {
+  if (ran_out_of_space_) {
+    return;
   }
 
+  if (!encoder_->HasSpace(copier->size())) {
+    Flush();
+    CHECK(encoder_->HasSpace(copier->size()));
+  }
+
+  encoder_->Encode(copier);
   FlushAtThreshold(now);
 }
 
@@ -575,11 +573,11 @@
 }
 
 flatbuffers::uoffset_t PackMessageSize(LogType log_type,
-                                       const Context &context) {
+                                       size_t data_size) {
   static_assert(sizeof(flatbuffers::uoffset_t) == 4u,
                 "Update size logic please.");
   const flatbuffers::uoffset_t aligned_data_length =
-      ((context.size + 7) & 0xfffffff8u);
+      ((data_size + 7) & 0xfffffff8u);
   switch (log_type) {
     case LogType::kLogDeliveryTimeOnly:
       return PackMessageHeaderSize(log_type);
@@ -596,8 +594,11 @@
 
 size_t PackMessageInline(uint8_t *buffer, const Context &context,
                          int channel_index, LogType log_type) {
+  // TODO(austin): Figure out how to copy directly from shared memory instead of
+  // first into the fetcher's memory and then into here.  That would save a lot
+  // of memory.
   const flatbuffers::uoffset_t message_size =
-      PackMessageSize(log_type, context);
+      PackMessageSize(log_type, context.size);
 
   buffer = Push<flatbuffers::uoffset_t>(
       buffer, message_size - sizeof(flatbuffers::uoffset_t));
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 16f3675..e034bfe 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -50,7 +50,7 @@
   struct already_out_of_space_t {};
 
   DetachedBufferWriter(std::string_view filename,
-                       std::unique_ptr<DetachedBufferEncoder> encoder);
+                       std::unique_ptr<DataEncoder> encoder);
   // Creates a dummy instance which won't even open a file. It will act as if
   // opening the file ran out of space immediately.
   DetachedBufferWriter(already_out_of_space_t) : ran_out_of_space_(true) {}
@@ -73,23 +73,8 @@
   // Triggers a flush if there's enough data queued up.
   //
   // Steals the detached buffer from it.
-  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
-                            aos::monotonic_clock::time_point now) {
-    QueueSizedFlatbuffer(fbb->Release(), now);
-  }
-  // May steal the backing storage of buffer, or may leave it alone.
-  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer,
-                            aos::monotonic_clock::time_point now) {
-    QueueSizedFlatbuffer(std::move(buffer));
-    FlushAtThreshold(now);
-  }
-  // Unconditionally queues the buffer.
-  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
-    if (ran_out_of_space_) {
-      return;
-    }
-    encoder_->Encode(std::move(buffer));
-  }
+  void CopyMessage(DataEncoder::Copier *copier,
+                   aos::monotonic_clock::time_point now);
 
   // Queues up data in span. May copy or may write it to disk immediately.
   void QueueSpan(absl::Span<const uint8_t> span);
@@ -173,7 +158,7 @@
   void FlushAtThreshold(aos::monotonic_clock::time_point now);
 
   std::string filename_;
-  std::unique_ptr<DetachedBufferEncoder> encoder_;
+  std::unique_ptr<DataEncoder> encoder_;
 
   int fd_ = -1;
   bool ran_out_of_space_ = false;
@@ -213,8 +198,7 @@
 
 // Returns the size that the packed message from PackMessage or
 // PackMessageInline will be.
-flatbuffers::uoffset_t PackMessageSize(LogType log_type,
-                                       const Context &context);
+flatbuffers::uoffset_t PackMessageSize(LogType log_type, size_t data_size);
 
 // Packs the provided message pointed to by context into the provided buffer.
 // This is equivalent to PackMessage, but doesn't require allocating a
diff --git a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
index b08bebb..afc4d77 100644
--- a/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
+++ b/aos/events/logging/logfile_utils_out_of_space_test_runner.cc
@@ -15,10 +15,12 @@
   FLAGS_flush_size = 1;
   CHECK(!FLAGS_tmpfs.empty()) << ": Must specify a tmpfs location";
 
-  aos::logger::DetachedBufferWriter writer(
-      FLAGS_tmpfs + "/file", std::make_unique<aos::logger::DummyEncoder>());
   std::array<uint8_t, 10240> data;
   data.fill(0);
+
+  aos::logger::DetachedBufferWriter writer(
+      FLAGS_tmpfs + "/file",
+      std::make_unique<aos::logger::DummyEncoder>(data.size()));
   for (int i = 0; i < 8; ++i) {
     writer.QueueSpan(data);
     CHECK(!writer.ran_out_of_space()) << ": " << i;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index c8d16e0..3d99757 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -31,8 +31,11 @@
 // test only boilerplate to DetachedBufferWriter.
 class TestDetachedBufferWriter : public DetachedBufferWriter {
  public:
+  // Pick a max size that is rather conservative.
+  static constexpr size_t kMaxMessageSize = 128 * 1024;
   TestDetachedBufferWriter(std::string_view filename)
-      : DetachedBufferWriter(filename, std::make_unique<DummyEncoder>()) {}
+      : DetachedBufferWriter(filename,
+                             std::make_unique<DummyEncoder>(kMaxMessageSize)) {}
   void WriteSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
     QueueSpan(absl::Span<const uint8_t>(buffer.data(), buffer.size()));
   }
@@ -2983,12 +2986,13 @@
           fbb.GetBufferSpan().size()));
 
       // Make sure that both the builder and inline method agree on sizes.
-      ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context))
+      ASSERT_EQ(fbb.GetSize(), PackMessageSize(type, context.size))
           << "log type " << static_cast<int>(type);
 
       // Initialize the buffer to something nonzero to make sure all the padding
       // bytes are set to 0.
-      std::vector<uint8_t> repacked_message(PackMessageSize(type, context), 67);
+      std::vector<uint8_t> repacked_message(PackMessageSize(type, context.size),
+                                            67);
 
       // And verify packing inline works as expected.
       EXPECT_EQ(repacked_message.size(),
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 526eb31..81b4e3f 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -3,6 +3,9 @@
 
 #include "aos/configuration.h"
 #include "aos/events/logging/log_writer.h"
+#ifdef LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
 #include "aos/events/logging/snappy_encoder.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/init.h"
@@ -18,9 +21,17 @@
 
 DEFINE_bool(snappy_compress, false, "If true, compress log data using snappy.");
 
+#ifdef LZMA
+DEFINE_bool(xz_compress, false, "If true, compress log data using xz.");
+#endif
+
 DEFINE_double(rotate_every, 0.0,
               "If set, rotate the logger after this many seconds");
 
+#ifdef LZMA
+DEFINE_int32(xz_compression_level, 9, "Compression level for the LZMA Encoder");
+#endif
+
 int main(int argc, char *argv[]) {
   gflags::SetUsageMessage(
       "This program provides a simple logger binary that logs all SHMEM data "
@@ -41,8 +52,17 @@
 
   if (FLAGS_snappy_compress) {
     log_namer->set_extension(aos::logger::SnappyDecoder::kExtension);
-    log_namer->set_encoder_factory(
-        []() { return std::make_unique<aos::logger::SnappyEncoder>(); });
+    log_namer->set_encoder_factory([](size_t max_message_size) {
+      return std::make_unique<aos::logger::SnappyEncoder>(max_message_size);
+    });
+#ifdef LZMA
+  } else if (FLAGS_xz_compress) {
+    log_namer->set_extension(aos::logger::LzmaEncoder::kExtension);
+    log_namer->set_encoder_factory([](size_t max_message_size) {
+      return std::make_unique<aos::logger::LzmaEncoder>(
+          max_message_size, FLAGS_xz_compression_level);
+    });
+#endif
   }
 
   aos::monotonic_clock::time_point last_rotation_time =
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index c5b1768..cf5ec32 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -540,7 +540,8 @@
 
 struct CompressionParams {
   std::string_view extension;
-  std::function<std::unique_ptr<DetachedBufferEncoder>()> encoder_factory;
+  std::function<std::unique_ptr<DataEncoder>(size_t max_message_size)>
+      encoder_factory;
 };
 
 std::ostream &operator<<(std::ostream &ostream,
@@ -550,12 +551,19 @@
 }
 
 std::vector<CompressionParams> SupportedCompressionAlgorithms() {
-  return {{"", []() { return std::make_unique<DummyEncoder>(); }},
+  return {{"",
+           [](size_t max_message_size) {
+             return std::make_unique<DummyEncoder>(max_message_size);
+           }},
           {SnappyDecoder::kExtension,
-           []() { return std::make_unique<SnappyEncoder>(); }},
+           [](size_t max_message_size) {
+             return std::make_unique<SnappyEncoder>(max_message_size);
+           }},
 #ifdef LZMA
           {LzmaDecoder::kExtension,
-           []() { return std::make_unique<LzmaEncoder>(3); }}
+           [](size_t max_message_size) {
+             return std::make_unique<LzmaEncoder>(max_message_size, 3);
+           }}
 #endif  // LZMA
   };
 }
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 5cb9897..f583818 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -50,7 +50,8 @@
 
 }  // namespace
 
-LzmaEncoder::LzmaEncoder(const uint32_t compression_preset, size_t block_size)
+LzmaEncoder::LzmaEncoder(size_t max_message_size,
+                         const uint32_t compression_preset, size_t block_size)
     : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
   CHECK_GE(compression_preset_, 0u)
       << ": Compression preset must be in the range [0, 9].";
@@ -78,15 +79,26 @@
 
   stream_.avail_out = 0;
   VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+
+  // TODO(austin): We don't write the biggest messages very often.  Is it more
+  // efficient to allocate if we go over a threshold to keep the static memory
+  // in use smaller, or just allocate the worst case like we are doing here?
+  input_buffer_.resize(max_message_size);
 }
 
 LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
 
-void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
-  CHECK(in.data()) << ": Encode called with nullptr.";
+void LzmaEncoder::Encode(Copier *copy) {
+  const size_t copy_size = copy->size();
+  // LZMA compresses the data as it goes along, copying the compressed results
+  // into another buffer.  So, there's no need to store more than one message
+  // since lzma is going to take it from here.
+  CHECK_LE(copy_size, input_buffer_.size());
 
-  stream_.next_in = in.data();
-  stream_.avail_in = in.size();
+  CHECK_EQ(copy->Copy(input_buffer_.data()), copy_size);
+
+  stream_.next_in = input_buffer_.data();
+  stream_.avail_in = copy_size;
 
   RunLzmaCode(LZMA_RUN);
 }
@@ -103,20 +115,21 @@
   }
 }
 
-std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
-  std::vector<absl::Span<const uint8_t>> queue;
+absl::Span<const absl::Span<const uint8_t>> LzmaEncoder::queue() {
+  return_queue_.clear();
   if (queue_.empty()) {
-    return queue;
+    return return_queue_;
   }
+  return_queue_.reserve(queue_.size());
   for (size_t i = 0; i < queue_.size() - 1; ++i) {
-    queue.emplace_back(
+    return_queue_.emplace_back(
         absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
   }
   // For the last buffer in the queue, we must account for the possibility that
   // the buffer isn't full yet.
-  queue.emplace_back(absl::MakeConstSpan(
+  return_queue_.emplace_back(absl::MakeConstSpan(
       queue_.back().data(), queue_.back().size() - stream_.avail_out));
-  return queue;
+  return return_queue_;
 }
 
 size_t LzmaEncoder::queued_bytes() const {
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 14c00eb..bbd739a 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -16,12 +16,15 @@
 namespace aos::logger {
 
 // Encodes buffers using liblzma.
-class LzmaEncoder final : public DetachedBufferEncoder {
+class LzmaEncoder final : public DataEncoder {
  public:
+  static constexpr std::string_view kExtension = ".xz";
+
   // Initializes the LZMA stream and encoder.  The block size is the block size
   // used by the multithreaded encoder for batching.  A block size of 0 tells
   // lzma to pick it's favorite block size.
-  explicit LzmaEncoder(uint32_t compression_preset, size_t block_size = 0);
+  explicit LzmaEncoder(size_t max_message_size, uint32_t compression_preset,
+                       size_t block_size = 0);
   LzmaEncoder(const LzmaEncoder &) = delete;
   LzmaEncoder(LzmaEncoder &&other) = delete;
   LzmaEncoder &operator=(const LzmaEncoder &) = delete;
@@ -29,16 +32,21 @@
   // Gracefully shuts down the encoder.
   ~LzmaEncoder() final;
 
-  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  bool HasSpace(size_t /*request*/) const final {
+    // Since the underlying lzma encoder handles buffering, we always have
+    // space.
+    return true;
+  }
+  void Encode(Copier *copy) final;
   void Finish() final;
   void Clear(int n) final;
-  std::vector<absl::Span<const uint8_t>> queue() const final;
+  absl::Span<const absl::Span<const uint8_t>> queue() final;
   size_t queued_bytes() const final;
   size_t total_bytes() const final { return total_bytes_; }
   size_t queue_size() const final { return queue_.size(); }
 
  private:
-  static constexpr size_t kEncodedBufferSizeBytes{4096 * 10};
+  static constexpr size_t kEncodedBufferSizeBytes{1024 * 128};
 
   void RunLzmaCode(lzma_action action);
 
@@ -49,6 +57,11 @@
   // Total bytes that resulted from encoding raw data since the last call to
   // Reset.
   size_t total_bytes_ = 0;
+
+  // Buffer that messages get copied into for encoding.
+  ResizeableBuffer input_buffer_;
+
+  std::vector<absl::Span<const uint8_t>> return_queue_;
 };
 
 // Decompresses data with liblzma.
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 1b8c895..1e814ff 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -11,9 +11,10 @@
 
 INSTANTIATE_TEST_SUITE_P(
     MtLzma, BufferEncoderTest,
-    ::testing::Combine(::testing::Values([]() {
+    ::testing::Combine(::testing::Values([](size_t max_message_size) {
                          FLAGS_lzma_threads = 3;
-                         return std::make_unique<LzmaEncoder>(2, 4096);
+                         return std::make_unique<LzmaEncoder>(max_message_size,
+                                                              2, 4096);
                        }),
                        ::testing::Values([](std::string_view filename) {
                          return std::make_unique<LzmaDecoder>(filename);
@@ -22,9 +23,10 @@
 
 INSTANTIATE_TEST_SUITE_P(
     MtLzmaThreaded, BufferEncoderTest,
-    ::testing::Combine(::testing::Values([]() {
+    ::testing::Combine(::testing::Values([](size_t max_message_size) {
                          FLAGS_lzma_threads = 3;
-                         return std::make_unique<LzmaEncoder>(5, 4096);
+                         return std::make_unique<LzmaEncoder>(max_message_size,
+                                                              5, 4096);
                        }),
                        ::testing::Values([](std::string_view filename) {
                          return std::make_unique<ThreadedLzmaDecoder>(filename);
@@ -33,9 +35,10 @@
 
 INSTANTIATE_TEST_SUITE_P(
     Lzma, BufferEncoderTest,
-    ::testing::Combine(::testing::Values([]() {
+    ::testing::Combine(::testing::Values([](size_t max_message_size) {
                          FLAGS_lzma_threads = 1;
-                         return std::make_unique<LzmaEncoder>(2, 4096);
+                         return std::make_unique<LzmaEncoder>(max_message_size,
+                                                              2, 4096);
                        }),
                        ::testing::Values([](std::string_view filename) {
                          return std::make_unique<LzmaDecoder>(filename);
@@ -44,9 +47,10 @@
 
 INSTANTIATE_TEST_SUITE_P(
     LzmaThreaded, BufferEncoderTest,
-    ::testing::Combine(::testing::Values([]() {
+    ::testing::Combine(::testing::Values([](size_t max_message_size) {
                          FLAGS_lzma_threads = 1;
-                         return std::make_unique<LzmaEncoder>(5, 4096);
+                         return std::make_unique<LzmaEncoder>(max_message_size,
+                                                              5, 4096);
                        }),
                        ::testing::Values([](std::string_view filename) {
                          return std::make_unique<ThreadedLzmaDecoder>(filename);
@@ -63,7 +67,8 @@
   std::vector<std::vector<uint8_t>> encoded_buffers;
   {
     const int encode_chunks = quantity_distribution(*random_number_generator());
-    const auto encoder = std::make_unique<LzmaEncoder>(2);
+    const auto encoder = std::make_unique<LzmaEncoder>(
+        BufferEncoderBaseTest::kMaxMessageSize, 2);
     encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
     encoder->Finish();
 
diff --git a/aos/events/logging/snappy_encoder.cc b/aos/events/logging/snappy_encoder.cc
index 19011da..2ef8363 100644
--- a/aos/events/logging/snappy_encoder.cc
+++ b/aos/events/logging/snappy_encoder.cc
@@ -29,7 +29,11 @@
 }
 }  // namespace
 
-SnappyEncoder::SnappyEncoder(size_t chunk_size) : chunk_size_(chunk_size) {
+SnappyEncoder::SnappyEncoder(size_t max_message_size, size_t chunk_size)
+    : chunk_size_(chunk_size),
+      // Make sure we have enough space to always add our largest message to the
+      // chunks we will flush.
+      buffer_source_(max_message_size + chunk_size - 1) {
   queue_.emplace_back();
   queue_.back().resize(kSnappyIdentifierChunk.size());
   std::copy(kSnappyIdentifierChunk.begin(), kSnappyIdentifierChunk.end(),
@@ -39,10 +43,8 @@
 
 void SnappyEncoder::Finish() { EncodeCurrentBuffer(); }
 
-void SnappyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
-  accumulated_checksum_ =
-      AccumulateCrc32({in.data(), in.size()}, accumulated_checksum_);
-  buffer_source_.AppendBuffer(std::move(in));
+void SnappyEncoder::Encode(Copier *copy) {
+  buffer_source_.Append(copy);
 
   if (buffer_source_.Available() >= chunk_size_) {
     EncodeCurrentBuffer();
@@ -54,7 +56,7 @@
     return;
   }
   const uint32_t uncompressed_checksum =
-      MaskChecksum(accumulated_checksum_.value());
+      MaskChecksum(buffer_source_.accumulated_checksum());
   queue_.emplace_back();
   constexpr size_t kPrefixSize = 8;
   queue_.back().resize(kPrefixSize +
@@ -79,7 +81,7 @@
 
   total_bytes_ += queue_.back().size();
 
-  accumulated_checksum_.reset();
+  buffer_source_.ResetAccumulatedChecksum();
   CHECK_EQ(0u, buffer_source_.Available());
 }
 
@@ -97,27 +99,26 @@
   return bytes;
 }
 
-std::vector<absl::Span<const uint8_t>> SnappyEncoder::queue() const {
-  std::vector<absl::Span<const uint8_t>> queue;
-  queue.reserve(queue_.size());
+absl::Span<const absl::Span<const uint8_t>> SnappyEncoder::queue() {
+  return_queue_.clear();
+  return_queue_.reserve(queue_.size());
   for (const auto &buffer : queue_) {
-    queue.emplace_back(buffer.data(), buffer.size());
+    return_queue_.emplace_back(buffer.data(), buffer.size());
   }
-  return queue;
+  return return_queue_;
+}
+
+SnappyEncoder::DetachedBufferSource::DetachedBufferSource(size_t buffer_size) {
+  data_.reserve(buffer_size);
 }
 
 size_t SnappyEncoder::DetachedBufferSource::Available() const {
-  size_t total_available = 0;
-  for (const flatbuffers::DetachedBuffer &buffer : buffers_) {
-    total_available += buffer.size();
-  }
-  return total_available;
+  return data_.size() - index_into_first_buffer_;
 }
 
 const char *SnappyEncoder::DetachedBufferSource::Peek(size_t *length) {
-  *CHECK_NOTNULL(length) = buffers_[0].size() - index_into_first_buffer_;
-  return reinterpret_cast<char *>(buffers_[0].data()) +
-         index_into_first_buffer_;
+  *CHECK_NOTNULL(length) = data_.size() - index_into_first_buffer_;
+  return reinterpret_cast<char *>(data_.data()) + index_into_first_buffer_;
 }
 
 void SnappyEncoder::DetachedBufferSource::Skip(size_t n) {
@@ -125,20 +126,25 @@
     return;
   }
 
-  CHECK(!buffers_.empty());
+  CHECK_NE(data_.size(), 0u);
 
   index_into_first_buffer_ += n;
-  CHECK_LE(index_into_first_buffer_, buffers_[0].size())
+  CHECK_LE(index_into_first_buffer_, data_.size())
       << ": " << n << " is too large a skip.";
-  if (index_into_first_buffer_ == buffers_[0].size()) {
-    buffers_.erase(buffers_.begin());
+  if (index_into_first_buffer_ == data_.size()) {
+    data_.resize(0u);
     index_into_first_buffer_ = 0;
   }
 }
 
-void SnappyEncoder::DetachedBufferSource::AppendBuffer(
-    flatbuffers::DetachedBuffer &&buffer) {
-  buffers_.emplace_back(std::move(buffer));
+void SnappyEncoder::DetachedBufferSource::Append(Copier *copy) {
+  const size_t copy_size = copy->size();
+  CHECK_LE(copy_size + data_.size(), data_.capacity());
+  const size_t starting_size = data_.size();
+  data_.resize(starting_size + copy_size);
+  CHECK_EQ(copy->Copy(data_.data() + starting_size), copy_size);
+  accumulated_checksum_ = AccumulateCrc32(
+      {data_.data() + starting_size, copy_size}, accumulated_checksum_);
 }
 
 size_t SnappyDecoder::Read(uint8_t *begin, uint8_t *end) {
diff --git a/aos/events/logging/snappy_encoder.h b/aos/events/logging/snappy_encoder.h
index 6b39c21..d3edbe5 100644
--- a/aos/events/logging/snappy_encoder.h
+++ b/aos/events/logging/snappy_encoder.h
@@ -14,29 +14,42 @@
 namespace aos::logger {
 
 // Encodes buffers using snappy.
-class SnappyEncoder final : public DetachedBufferEncoder {
+class SnappyEncoder final : public DataEncoder {
  public:
-  explicit SnappyEncoder(size_t chunk_size = 32768);
+  explicit SnappyEncoder(size_t max_message_size, size_t chunk_size = 32768);
 
-  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  void Encode(Copier *copy) final;
+
   void Finish() final;
   void Clear(int n) final;
-  std::vector<absl::Span<const uint8_t>> queue() const final;
+  absl::Span<const absl::Span<const uint8_t>> queue() final;
   size_t queued_bytes() const final;
+  bool HasSpace(size_t /*request*/) const final {
+    // Since the output always mallocs space, we have infinite output space.
+    return true;
+  }
   size_t total_bytes() const final { return total_bytes_; }
   size_t queue_size() const final { return queue_.size(); }
 
  private:
   class DetachedBufferSource : public snappy::Source {
    public:
+    explicit DetachedBufferSource(size_t buffer_size);
     size_t Available() const final;
     const char *Peek(size_t *length) final;
     void Skip(size_t n) final;
-    void AppendBuffer(flatbuffers::DetachedBuffer &&buffer);
+    void Append(Copier *copy);
+
+    uint32_t accumulated_checksum() const {
+      return accumulated_checksum_.value();
+    }
+
+    void ResetAccumulatedChecksum() { accumulated_checksum_.reset(); }
 
    private:
-    std::vector<flatbuffers::DetachedBuffer> buffers_;
+    ResizeableBuffer data_;
     size_t index_into_first_buffer_ = 0;
+    std::optional<uint32_t> accumulated_checksum_;
   };
 
   // Flushes buffer_source_ and stores the compressed buffer in queue_.
@@ -55,8 +68,9 @@
   //    Clear() to be written to disk.
   const size_t chunk_size_;
   DetachedBufferSource buffer_source_;
-  std::optional<uint32_t> accumulated_checksum_;
   std::vector<ResizeableBuffer> queue_;
+
+  std::vector<absl::Span<const uint8_t>> return_queue_;
   size_t total_bytes_ = 0;
 };
 
diff --git a/aos/events/logging/snappy_encoder_test.cc b/aos/events/logging/snappy_encoder_test.cc
index 3334602..7e61f3f 100644
--- a/aos/events/logging/snappy_encoder_test.cc
+++ b/aos/events/logging/snappy_encoder_test.cc
@@ -9,8 +9,8 @@
 
 INSTANTIATE_TEST_SUITE_P(
     Snappy, BufferEncoderTest,
-    ::testing::Combine(::testing::Values([]() {
-                         return std::make_unique<SnappyEncoder>();
+    ::testing::Combine(::testing::Values([](size_t max_message_size) {
+                         return std::make_unique<SnappyEncoder>(max_message_size);
                        }),
                        ::testing::Values([](std::string_view filename) {
                          return std::make_unique<SnappyDecoder>(filename);
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index 64fb27f..48bd2ec 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -1,10 +1,14 @@
 {
   "applications": [
     {
-      "name": "ping"
+      "name": "ping",
+      "executable_name": "./ping",
+      "args": ["--config=aos_config.json"]
     },
     {
-      "name": "pong"
+      "name": "pong",
+      "executable_name": "./pong",
+      "args": ["--config=aos_config.json"]
     }
   ],
   "channels": [
@@ -22,4 +26,4 @@
   "imports": [
     "aos.json"
   ]
-}
\ No newline at end of file
+}
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index 61dbf6b..4d2d395 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -12,7 +12,7 @@
 // Merges 2 flat buffers with the provided type table into the builder.  Returns
 // the offset to the flatbuffers.
 // One or both of t1 and t2 must be non-null.  If one is null, this method
-// coppies instead of merging.
+// copies instead of merging.
 flatbuffers::Offset<flatbuffers::Table> MergeFlatBuffers(
     const flatbuffers::TypeTable *typetable, const flatbuffers::Table *t1,
     const flatbuffers::Table *t2, flatbuffers::FlatBufferBuilder *fbb);
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index a88e801..8c48fa1 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -681,7 +681,7 @@
                                 size_t /*iteration*/) {
   // TODO(austin): Can we take a problem directly without
   // AddConstraintSlackVariable being called on it and fill everything in?
-  // That'll avoid a bunch of allocations and coppies of rather sizable
+  // That'll avoid a bunch of allocations and copies of rather sizable
   // matricies.
   //
   // I think I want to do it as another review after everything works with the
diff --git a/aos/starter/BUILD b/aos/starter/BUILD
index 5bf3811..d4566a0 100644
--- a/aos/starter/BUILD
+++ b/aos/starter/BUILD
@@ -188,12 +188,16 @@
         "$(rootpath //aos/events:ping)",
         "$(rootpath //aos/events:pong)",
         "$(rootpath :aos_starter)",
+        "$(rootpath //aos:aos_dump)",
+        "$(rootpath //aos/events/logging:logger_main)",
     ],
     data = [
         ":aos_starter",
         ":starterd",
+        "//aos:aos_dump",
         "//aos/events:ping",
         "//aos/events:pingpong_config",
         "//aos/events:pong",
+        "//aos/events/logging:logger_main",
     ],
 )
diff --git a/aos/starter/starter_demo.py b/aos/starter/starter_demo.py
index f8fa520..5a50890 100755
--- a/aos/starter/starter_demo.py
+++ b/aos/starter/starter_demo.py
@@ -24,6 +24,8 @@
     parser.add_argument("ping", help="Location of ping")
     parser.add_argument("pong", help="Location of pong")
     parser.add_argument("starter_cmd", help="Location of starter_cmd")
+    parser.add_argument("aos_dump", help="Location of aos_dump")
+    parser.add_argument("logger_main", help="Location of logger_main")
     args = parser.parse_args()
 
     # Copy all the interesting files into a temporary directory and run
@@ -38,6 +40,8 @@
         shutil.copy(args.ping, tmpdir + "/ping")
         shutil.copy(args.pong, tmpdir + "/pong")
         shutil.copy(args.starter_cmd, tmpdir + "/starter_cmd")
+        shutil.copy(args.aos_dump, tmpdir + "/aos_dump")
+        shutil.copy(args.logger_main, tmpdir + "/logger_main")
         print(f"Running starter from {tmpdir}")
 
         print(f"\n\nTo run starter_cmd, do:\ncd {tmpdir}\n./starter_cmd\n\n")