Introduce interfaces for compressing and decompressing log files

Change-Id: Ia7da3f840a1780a04203f1c312447b50b142a5a3
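
For context, a compressing implementation would subclass the new
DetachedBufferEncoder interface introduced below. The sketch that follows is
illustrative only and not part of this change; ExampleCompressionEncoder and
CompressChunk are made-up names standing in for a real compressor (e.g. snappy
or zlib):

    #include <vector>

    #include "aos/events/logging/buffer_encoder.h"

    namespace aos::logger {

    // Illustrative only: shows the shape of a compressing encoder.
    // CompressChunk is a placeholder for a real compression routine and simply
    // copies its input here.
    class ExampleCompressionEncoder final : public DetachedBufferEncoder {
     public:
      void Encode(flatbuffers::DetachedBuffer &&in) final {
        // A real implementation would feed `in` through its compressor and
        // queue whichever complete output blocks fall out.
        queue_.emplace_back(
            CompressChunk(absl::Span<const uint8_t>(in.data(), in.size())));
        total_bytes_ += queue_.back().size();
      }
      void Finish() final { /* Flush any buffered compressor state here. */ }
      void Clear(const int n) final {
        queue_.erase(queue_.begin(), queue_.begin() + n);
      }
      std::vector<absl::Span<const uint8_t>> queue() const final {
        std::vector<absl::Span<const uint8_t>> result;
        for (const auto &chunk : queue_) {
          result.emplace_back(chunk.data(), chunk.size());
        }
        return result;
      }
      size_t queued_bytes() const final {
        size_t bytes = 0;
        for (const auto &chunk : queue_) {
          bytes += chunk.size();
        }
        return bytes;
      }
      size_t total_bytes() const final { return total_bytes_; }
      size_t queue_size() const final { return queue_.size(); }

     private:
      // Placeholder standing in for a real compression call.
      static std::vector<uint8_t> CompressChunk(absl::Span<const uint8_t> in) {
        return std::vector<uint8_t>(in.begin(), in.end());
      }

      std::vector<std::vector<uint8_t>> queue_;
      size_t total_bytes_ = 0;
    };

    }  // namespace aos::logger

The matching decompression side would implement DataDecoder::Read and be
selected in SpanReader based on the filename, as the TODO comment in
logfile_utils.cc notes.
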
diff --git a/aos/containers/BUILD b/aos/containers/BUILD
index 3e24fab..8277212 100644
--- a/aos/containers/BUILD
+++ b/aos/containers/BUILD
@@ -56,3 +56,13 @@
         "//aos/testing:googletest",
     ],
 )
+
+cc_library(
+    name = "resizeable_buffer",
+    hdrs = [
+        "resizeable_buffer.h",
+    ],
+    deps = [
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/containers/resizeable_buffer.h b/aos/containers/resizeable_buffer.h
new file mode 100644
index 0000000..b94f8cc
--- /dev/null
+++ b/aos/containers/resizeable_buffer.h
@@ -0,0 +1,101 @@
+#ifndef AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+#define AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <algorithm>
+#include <memory>
+
+#include "glog/logging.h"
+
+namespace aos {
+
+// Kind of like a subset of vector<uint8_t>, but with fewer destructor calls.
+// When building unoptimized, especially with sanitizers, the vector<uint8_t>
+// version ends up being really slow in tests.
+class ResizeableBuffer {
+ public:
+  ResizeableBuffer() = default;
+
+  ResizeableBuffer(const ResizeableBuffer &other) { *this = other; }
+  ResizeableBuffer(ResizeableBuffer &&other) { *this = std::move(other); }
+  ResizeableBuffer &operator=(const ResizeableBuffer &other) {
+    resize(other.size());
+    memcpy(storage_.get(), other.storage_.get(), size());
+    return *this;
+  }
+  ResizeableBuffer &operator=(ResizeableBuffer &&other) {
+    std::swap(storage_, other.storage_);
+    std::swap(size_, other.size_);
+    std::swap(capacity_, other.capacity_);
+    return *this;
+  }
+
+  uint8_t *data() { return static_cast<uint8_t *>(storage_.get()); }
+  const uint8_t *data() const {
+    return static_cast<const uint8_t *>(storage_.get());
+  }
+
+  uint8_t *begin() { return data(); }
+  const uint8_t *begin() const { return data(); }
+  uint8_t *end() { return data() + size(); }
+  const uint8_t *end() const { return data() + size(); }
+
+  size_t size() const { return size_; }
+  size_t capacity() const { return capacity_; }
+
+  void resize(size_t new_size) {
+    if (new_size > capacity_) {
+      Allocate(new_size);
+    }
+    size_ = new_size;
+  }
+
+  void erase_front(size_t count) {
+    if (count == 0) {
+      return;
+    }
+    CHECK_LE(count, size_);
+    memmove(static_cast<void *>(data()), static_cast<void *>(data() + count),
+            size_ - count);
+    size_ -= count;
+  }
+
+  void push_back(uint8_t x) {
+    if (size_ == capacity_) {
+      Allocate(std::max<size_t>(16, size_ * 2));
+    }
+    *end() = x;
+    ++size_;
+    CHECK_LE(size_, capacity_);
+  }
+
+ private:
+  // We need this silly function because C++ is bad, and extensions to it which
+  // we work with make it a true nightmare.
+  //
+  // (a) You can't easily write out the signature of free because it depends on
+  // whether exceptions are enabled or not. You could use decltype, but see (b).
+  //
+  // (b) You can't easily write &free because CUDA overloads it with a
+  // __device__ version. You could cast to the appropriate version, but see (a).
+  //
+  // There's probably some kind of SFINAE thing which could find the matching
+  // signature from a set of choices, and then we could just
+  // static_cast<TheSignature>(&free). However, that sounds like a nightmare,
+  // especially because it has to conditionally enable the part mentioning CUDA
+  // identifiers in the preprocessor. This little function is way simpler.
+  static void DoFree(void *p) { free(p); }
+
+  void Allocate(size_t new_capacity) {
+    void *const old = storage_.release();
+    storage_.reset(CHECK_NOTNULL(realloc(old, new_capacity)));
+    capacity_ = new_capacity;
+  }
+
+  std::unique_ptr<void, decltype(&DoFree)> storage_{nullptr, &DoFree};
+  size_t size_ = 0, capacity_ = 0;
+};
+
+}  // namespace aos
+
+#endif  // AOS_CONTAINERS_RESIZEABLE_BUFFER_H_
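
For reference, the SpanReader changes further down use ResizeableBuffer roughly
as in this minimal sketch (ReadSomeBytes is a made-up stand-in for the decoder
read call, not part of this change):

    #include "aos/containers/resizeable_buffer.h"

    // Hypothetical read call standing in for DataDecoder::Read.
    size_t ReadSomeBytes(uint8_t *begin, uint8_t *end);

    // Drops already-consumed bytes, grows the backing store (plain realloc, no
    // zero-initialization), reads into the newly exposed region, and shrinks
    // back down to the bytes actually read.
    void AppendAndTrim(aos::ResizeableBuffer *data, size_t consumed) {
      data->erase_front(consumed);

      const size_t starting_size = data->size();
      data->resize(starting_size + 256 * 1024);
      const size_t count =
          ReadSomeBytes(data->begin() + starting_size, data->end());
      data->resize(starting_size + count);
    }
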
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 48409bb..3f43e07 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -21,9 +21,12 @@
     ],
     visibility = ["//visibility:public"],
     deps = [
+        ":buffer_encoder",
         ":logger_fbs",
         "//aos:configuration",
         "//aos:flatbuffer_merge",
+        "//aos:flatbuffers",
+        "//aos/containers:resizeable_buffer",
         "//aos/events:event_loop",
         "//aos/util:file",
         "@com_github_gflags_gflags//:gflags",
@@ -34,6 +37,56 @@
 )
 
 cc_library(
+    name = "buffer_encoder",
+    srcs = [
+        "buffer_encoder.cc",
+    ],
+    hdrs = [
+        "buffer_encoder.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":logger_fbs",
+        "//aos:configuration_fbs",
+        "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_github_google_glog//:glog",
+        "@com_google_absl//absl/types:span",
+    ],
+)
+
+cc_test(
+    name = "buffer_encoder_test",
+    srcs = [
+        "buffer_encoder_test.cc",
+    ],
+    shard_count = 4,
+    deps = [
+        ":buffer_encoder",
+        ":buffer_encoder_param_test",
+        "//aos/testing:googletest",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_library(
+    name = "buffer_encoder_param_test",
+    testonly = True,
+    srcs = [
+        "buffer_encoder_param_test.cc",
+    ],
+    hdrs = [
+        "buffer_encoder_param_test.h",
+    ],
+    deps = [
+        ":logfile_utils",
+        ":logger_fbs",
+        "//aos/testing:googletest",
+        "//aos/testing:random_seed",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_library(
     name = "logger",
     srcs = [
         "log_namer.cc",
diff --git a/aos/events/logging/buffer_encoder.cc b/aos/events/logging/buffer_encoder.cc
new file mode 100644
index 0000000..10a7ed1
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.cc
@@ -0,0 +1,65 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+
+void DummyEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+  CHECK(in.data()) << ": Encode called with nullptr.";
+
+  total_bytes_ += in.size();
+  queue_.emplace_back(std::move(in));
+}
+
+void DummyEncoder::Clear(const int n) {
+  CHECK_GE(n, 0);
+  CHECK_LE(static_cast<size_t>(n), queue_size());
+  queue_.erase(queue_.begin(), queue_.begin() + n);
+}
+
+std::vector<absl::Span<const uint8_t>> DummyEncoder::queue() const {
+  std::vector<absl::Span<const uint8_t>> queue;
+  queue.reserve(queue_.size());
+  for (const auto &buffer : queue_) {
+    queue.emplace_back(buffer.data(), buffer.size());
+  }
+  return queue;
+}
+
+size_t DummyEncoder::queued_bytes() const {
+  size_t bytes = 0;
+  for (const auto &buffer : queue_) {
+    bytes += buffer.size();
+  }
+  return bytes;
+}
+
+DummyDecoder::DummyDecoder(std::string_view filename)
+    : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DummyDecoder::~DummyDecoder() {
+  int status = close(fd_);
+  if (status != 0) {
+    PLOG(ERROR) << "DummyDecoder: Failed to close file";
+  }
+}
+
+size_t DummyDecoder::Read(uint8_t *begin, uint8_t *end) {
+  if (end_of_file_) {
+    return 0;
+  }
+  const ssize_t count = read(fd_, begin, end - begin);
+  PCHECK(count >= 0) << ": Failed to read from file";
+  if (count == 0) {
+    end_of_file_ = true;
+  }
+  return static_cast<size_t>(count);
+}
+
+}  // namespace aos::logger
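
Any DataDecoder can be drained with a simple loop like the following sketch
(chunk size chosen arbitrarily); SpanReader::ReadBlock further down does
essentially this one block at a time:

    #include <vector>

    #include "aos/events/logging/buffer_encoder.h"

    // Reads everything the decoder will produce into memory.  A read of 0
    // bytes means the decoder is done.
    std::vector<uint8_t> ReadAll(aos::logger::DataDecoder *decoder) {
      std::vector<uint8_t> result;
      while (true) {
        constexpr size_t kChunk = 256 * 1024;
        const size_t old_size = result.size();
        result.resize(old_size + kChunk);
        const size_t count = decoder->Read(result.data() + old_size,
                                           result.data() + result.size());
        result.resize(old_size + count);
        if (count == 0) {
          return result;
        }
      }
    }
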
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
new file mode 100644
index 0000000..cc16fea
--- /dev/null
+++ b/aos/events/logging/buffer_encoder.h
@@ -0,0 +1,108 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Interface to encode DetachedBuffers as they are written to a file.
+//
+// The data to be written is presented as a series of buffers suitable for
+// passing directly to writev(2). The backing storage for these buffers is
+// managed internally, so subclasses may resize or move it as they see fit.
+// Implementations must queue up as many of these buffers as the caller
+// demands, leaving it to the caller to decide when to write them to a file.
+class DetachedBufferEncoder {
+ public:
+  virtual ~DetachedBufferEncoder() = default;
+
+  // Encodes and enqueues the given DetachedBuffer.
+  //
+  // Note that in may be moved-from, or it may be left unchanged, depending on
+  // what makes the most sense for a given implementation.
+  virtual void Encode(flatbuffers::DetachedBuffer &&in) = 0;
+
+  // If this returns true, the encoder may be bypassed by writing directly to
+  // the file.
+  virtual bool may_bypass() const { return false; }
+
+  // Finalizes the encoding process. After this, queue_size() represents the
+  // full extent of data which will be written to this file.
+  //
+  // Encode may not be called after this method.
+  virtual void Finish() = 0;
+
+  // Clears the first n encoded buffers from the queue.
+  virtual void Clear(int n) = 0;
+
+  // Returns a view of the queue of encoded buffers.
+  virtual std::vector<absl::Span<const uint8_t>> queue() const = 0;
+
+  // Returns the total number of bytes currently queued up.
+  virtual size_t queued_bytes() const = 0;
+
+  // Returns the cumulative number of bytes which have been queued. This
+  // includes data which has been removed via Clear.
+  virtual size_t total_bytes() const = 0;
+
+  // Returns the number of elements in the queue.
+  virtual size_t queue_size() const = 0;
+};
+
+// This class does not encode the data. It just claims ownership of the raw data
+// and queues it up as is.
+class DummyEncoder final : public DetachedBufferEncoder {
+ public:
+  ~DummyEncoder() override = default;
+
+  // No encoding happens, the raw data is queued up as is.
+  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  bool may_bypass() const final { return true; }
+  void Finish() final {}
+  void Clear(int n) final;
+  std::vector<absl::Span<const uint8_t>> queue() const final;
+  size_t queued_bytes() const final;
+  size_t total_bytes() const final { return total_bytes_; }
+  size_t queue_size() const final { return queue_.size(); }
+
+ private:
+  std::vector<flatbuffers::DetachedBuffer> queue_;
+  size_t total_bytes_ = 0;
+};
+
+// Interface to decode chunks of data. Implementations of this interface will
+// manage opening, reading, and closing the file stream.
+class DataDecoder {
+ public:
+  virtual ~DataDecoder() = default;
+
+  // Reads data into the given range. Returns the number of bytes read.
+  //
+  // Returns less than end - begin only once all of the data has been read.
+  // Otherwise, it always fills the whole range.
+  virtual size_t Read(uint8_t *begin, uint8_t *end) = 0;
+};
+
+// Simply reads the contents of the file into the target buffer.
+class DummyDecoder final : public DataDecoder {
+ public:
+  explicit DummyDecoder(std::string_view filename);
+  ~DummyDecoder() override;
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  // File descriptor for the log file.
+  int fd_;
+
+  // Cached flag for whether we have reached the end of the file.  Without it
+  // we would hammer the kernel asking for more data on every subsequent read.
+  bool end_of_file_ = false;
+};
+
+}  // namespace aos::logger
+
+#endif  // AOS_EVENTS_LOGGING_BUFFER_ENCODER_H_
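
The intended write-side flow for any DetachedBufferEncoder is encode, snapshot
the queue, write, then Clear what was written. This sketch uses a plain
write() per buffer instead of the batched writev() that
DetachedBufferWriter::Flush performs below, and is not part of this change:

    #include <unistd.h>

    #include "glog/logging.h"

    #include "aos/events/logging/buffer_encoder.h"

    // Encodes one buffer and writes out everything currently queued.  The
    // spans returned by queue() stay valid until the matching Clear().
    void EncodeAndWrite(aos::logger::DetachedBufferEncoder *encoder, int fd,
                        flatbuffers::DetachedBuffer &&buffer) {
      encoder->Encode(std::move(buffer));
      const auto queue = encoder->queue();
      for (const absl::Span<const uint8_t> chunk : queue) {
        const ssize_t written = write(fd, chunk.data(), chunk.size());
        PCHECK(written == static_cast<ssize_t>(chunk.size()));
      }
      // Tell the encoder those buffers are no longer needed.
      encoder->Clear(static_cast<int>(queue.size()));
    }
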
diff --git a/aos/events/logging/buffer_encoder_param_test.cc b/aos/events/logging/buffer_encoder_param_test.cc
new file mode 100644
index 0000000..825696e
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.cc
@@ -0,0 +1,92 @@
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+#include "gmock/gmock.h"
+
+namespace aos::logger::testing {
+
+// Verifies that Clear affects the sizes as expected.
+TEST_P(BufferEncoderTest, ClearAndSizes) {
+  const auto encoder = MakeEncoder();
+
+  std::uniform_int_distribution<int> quantity_distribution(500, 600);
+  const int first_quantity = quantity_distribution(*random_number_generator());
+  const int second_quantity = quantity_distribution(*random_number_generator());
+
+  const auto first_buffers = CreateAndEncode(first_quantity, encoder.get());
+  const auto first_size = encoder->queued_bytes();
+  // We want a non-trivial amount of data queued up to ensure this test is
+  // actually validating the behavior.
+  ASSERT_GT(first_size, 2u) << ": Encoder chunk size is too big for this test";
+  ASSERT_EQ(encoder->total_bytes(), first_size);
+  ASSERT_EQ(TotalSize(encoder->queue()), first_size);
+  ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+
+  encoder->Clear(encoder->queue_size());
+  ASSERT_EQ(encoder->queued_bytes(), 0);
+  ASSERT_EQ(encoder->total_bytes(), first_size);
+  ASSERT_EQ(encoder->queue().size(), 0);
+  ASSERT_EQ(encoder->queue_size(), 0);
+
+  const auto second_buffers = CreateAndEncode(second_quantity, encoder.get());
+  const auto second_size = encoder->queued_bytes();
+  ASSERT_GT(second_size, 2u) << ": Encoder chunk size is too big for this test";
+  ASSERT_EQ(encoder->total_bytes(), first_size + second_size);
+  ASSERT_EQ(TotalSize(encoder->queue()), second_size);
+  ASSERT_EQ(encoder->queue_size(), encoder->queue().size());
+}
+
+// Runs data in randomly-chunked sizes through the encoder and decoder to verify
+// it comes back out the same.
+TEST_P(BufferEncoderTest, RoundTrip) {
+  std::uniform_int_distribution<int> quantity_distribution(20, 60);
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+
+  std::vector<std::vector<uint8_t>> encoded_buffers;
+  {
+    const int encode_chunks = quantity_distribution(*random_number_generator());
+    const auto encoder = MakeEncoder();
+    encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
+    encoder->Finish();
+
+    std::ofstream output_file(file_path, std::ios::binary);
+    for (auto span : encoder->queue()) {
+      output_file.write(reinterpret_cast<const char *>(span.data()),
+                        span.size());
+    }
+  }
+
+  const size_t total_encoded_size = TotalSize(encoded_buffers);
+
+  // Try decoding in multiple random chunkings.
+  for (int i = 0; i < 20; ++i) {
+    const auto decoder = MakeDecoder(file_path);
+    std::vector<std::vector<uint8_t>> decoded_buffers;
+    size_t total_decoded_size = 0;
+    while (true) {
+      const int chunk_size = quantity_distribution(*random_number_generator());
+      std::vector<uint8_t> chunk(chunk_size);
+      const size_t read_result =
+          decoder->Read(chunk.data(), chunk.data() + chunk_size);
+      if (read_result + total_decoded_size != total_encoded_size) {
+        // We're not at the end of the data yet, so the decoder should have
+        // filled the complete chunk.
+        ASSERT_EQ(read_result, chunk_size);
+      }
+      // Eventually we'll get here, once the decoder is really sure it's done.
+      if (read_result == 0) {
+        // Sanity check the math in the test code.
+        CHECK_EQ(total_decoded_size, TotalSize(decoded_buffers));
+        // Bail out because we're done.
+        break;
+      }
+      // If we're at the end, trim off the 0s so our comparison later works out.
+      chunk.resize(read_result);
+      total_decoded_size += read_result;
+      decoded_buffers.emplace_back(std::move(chunk));
+    }
+    ASSERT_THAT(Flatten(decoded_buffers),
+                ::testing::Eq(Flatten(encoded_buffers)));
+  }
+}
+
+}  // namespace aos::logger::testing
diff --git a/aos/events/logging/buffer_encoder_param_test.h b/aos/events/logging/buffer_encoder_param_test.h
new file mode 100644
index 0000000..fd1a00c
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_param_test.h
@@ -0,0 +1,114 @@
+#ifndef AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+#define AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
+
+#include <functional>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/testing/random_seed.h"
+
+namespace aos::logger::testing {
+
+// Contains some helpful infrastructure for testing DetachedBufferEncoder
+// implementations.
+class BufferEncoderBaseTest : public ::testing::Test {
+ public:
+  // Creates and encodes n random buffers, returning a copy of the raw data
+  // that was handed to the encoder.
+  std::vector<std::vector<uint8_t>> CreateAndEncode(
+      int n, DetachedBufferEncoder *encoder) {
+    std::vector<std::vector<uint8_t>> result;
+    for (int i = 0; i < n; i++) {
+      flatbuffers::DetachedBuffer buffer = CreateRandomBuffer();
+      result.emplace_back(buffer.data(), buffer.data() + buffer.size());
+      encoder->Encode(std::move(buffer));
+    }
+    return result;
+  }
+
+  // Returns the total size of a vector full of objects with a size() method.
+  template <typename T>
+  static size_t TotalSize(const std::vector<T> &buffers) {
+    size_t result = 0;
+    for (const auto &v : buffers) {
+      result += v.size();
+    }
+    return result;
+  }
+
+  // Returns a flattened copy of a vector of sequences.
+  template <typename T>
+  static std::vector<typename T::value_type> Flatten(
+      const std::vector<T> &buffers) {
+    std::vector<typename T::value_type> result;
+    for (const auto &buffer : buffers) {
+      result.insert(result.end(), buffer.begin(), buffer.end());
+    }
+    return result;
+  }
+
+  void Reseed(int seed) {
+    // This algorithm allows a reasonable --runs_per_test to cover new seeds.
+    random_number_generator_ =
+        std::mt19937(::aos::testing::RandomSeed() + 1000 * seed);
+  }
+
+  std::mt19937 *random_number_generator() { return &random_number_generator_; }
+
+ private:
+  flatbuffers::DetachedBuffer CreateRandomBuffer() {
+    std::uniform_int_distribution<int> length_distribution(2800, 3000);
+    const size_t length = length_distribution(random_number_generator_);
+
+    flatbuffers::FlatBufferBuilder fbb;
+    uint8_t *data;
+    const flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+        fbb.CreateUninitializedVector(length, &data);
+    {
+      std::independent_bits_engine<std::mt19937, CHAR_BIT, uint8_t> engine(
+          std::ref(random_number_generator_));
+      const uint8_t byte = engine();
+      std::fill(data, data + length, byte);
+    }
+
+    MessageHeader::Builder builder(fbb);
+    builder.add_data(data_offset);
+    fbb.FinishSizePrefixed(builder.Finish());
+
+    return fbb.Release();
+  }
+
+  std::mt19937 random_number_generator_{
+      std::mt19937(::aos::testing::RandomSeed())};
+};
+
+// Tests some generic properties for any DetachedBufferEncoder+DataDecoder
+// implementation pair.
+//
+// First and second test parameters are methods to create instances of the
+// encoder and decoder. Third test parameter is a random seed, to allow testing
+// many different data patterns.
+class BufferEncoderTest
+    : public BufferEncoderBaseTest,
+      public ::testing::WithParamInterface<std::tuple<
+          std::function<std::unique_ptr<DetachedBufferEncoder>()>,
+          std::function<std::unique_ptr<DataDecoder>(std::string_view)>, int>> {
+ public:
+  BufferEncoderTest() { Reseed(std::get<2>(GetParam())); }
+
+  std::unique_ptr<DetachedBufferEncoder> MakeEncoder() const {
+    return std::get<0>(GetParam())();
+  }
+  std::unique_ptr<DataDecoder> MakeDecoder(std::string_view filename) const {
+    return std::get<1>(GetParam())(filename);
+  }
+};
+
+}  // namespace aos::logger::testing
+
+#endif  // AOS_EVENTS_LOGGING_BUFFER_ENCODER_PARAM_TEST_H_
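
A new encoder/decoder pair picks up all of these tests by instantiating the
parameterized suite with its factories, mirroring the DummyEncoder
instantiation in buffer_encoder_test.cc. The class names below are
hypothetical placeholders, not part of this change:

    // Hypothetical registration of a compression-based pair against the
    // shared parameterized tests.
    INSTANTIATE_TEST_CASE_P(
        ExampleCompression, BufferEncoderTest,
        ::testing::Combine(
            ::testing::Values([]() {
              return std::make_unique<ExampleCompressionEncoder>();
            }),
            ::testing::Values([](std::string_view filename) {
              return std::make_unique<ExampleDecoder>(filename);
            }),
            ::testing::Range(0, 100)));
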
diff --git a/aos/events/logging/buffer_encoder_test.cc b/aos/events/logging/buffer_encoder_test.cc
new file mode 100644
index 0000000..a9c2bbb
--- /dev/null
+++ b/aos/events/logging/buffer_encoder_test.cc
@@ -0,0 +1,106 @@
+#include "aos/events/logging/buffer_encoder.h"
+
+#include <algorithm>
+#include <fstream>
+#include <string>
+
+#include "glog/logging.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+
+namespace aos::logger::testing {
+
+class DummyEncoderTest : public BufferEncoderBaseTest {};
+
+// Tests that buffers are enqueued without any changes.
+TEST_F(DummyEncoderTest, QueuesBuffersAsIs) {
+  DummyEncoder encoder;
+  const auto expected = CreateAndEncode(100, &encoder);
+
+  auto queue = encoder.queue();
+  EXPECT_THAT(queue, ::testing::ElementsAreArray(expected));
+}
+
+// Checks that DummyDecoder can read into a buffer.
+TEST(DummyDecoderTest, ReadsIntoExactBuffer) {
+  static const std::string kTestString{"Just some random words."};
+
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+  std::ofstream(file_path, std::ios::binary) << kTestString;
+
+  // Read the contents of the file into the buffer.
+  DummyDecoder dummy_decoder(file_path.c_str());
+  std::vector<uint8_t> buffer(kTestString.size());
+  const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+  ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+  for (int i = 0; i < 10; ++i) {
+    // Verify that there is no more data to read from the file.
+    ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+  }
+}
+
+// Checks that DummyDecoder can read into a buffer that can accommodate all the
+// data in the file.
+TEST(DummyDecoderTest, ReadsIntoLargerBuffer) {
+  static const std::string kTestString{"Just some random words."};
+
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+  std::ofstream(file_path, std::ios::binary) << kTestString;
+
+  DummyDecoder dummy_decoder(file_path.c_str());
+  std::vector<uint8_t> buffer(100);
+  const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+  buffer.resize(count);
+  ASSERT_EQ(std::string(buffer.data(), buffer.data() + count), kTestString);
+
+  // Verify that there is no more data to read from the file.
+  ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
+// Checks that DummyDecoder can repeatedly read the contents of the file into a
+// smaller buffer until there is no more to read.
+TEST(DummyDecoderTest, ReadsRepeatedlyIntoSmallerBuffer) {
+  static const std::string kTestString{"Just some random words."};
+
+  const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+  const std::string file_path = std::string(test_dir) + "/foo";
+  std::ofstream(file_path, std::ios::binary) << kTestString;
+
+  DummyDecoder dummy_decoder(file_path.c_str());
+  std::vector<uint8_t> buffer((kTestString.size() + 1) / 2);
+
+  {
+    // Read into our buffer once, and verify the contents.
+    const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+    ASSERT_EQ(std::string(buffer.data(), buffer.data() + count),
+              kTestString.substr(0, buffer.size()));
+  }
+
+  {
+    // Read into the same buffer again, and verify the contents.
+    const size_t count = dummy_decoder.Read(&*buffer.begin(), &*buffer.end());
+    ASSERT_EQ(
+        std::string(buffer.data(), buffer.data() + count),
+        kTestString.substr(buffer.size(), kTestString.size() - buffer.size()));
+  }
+
+  // Verify that there is no more data to read from the file.
+  ASSERT_EQ(dummy_decoder.Read(&*buffer.begin(), &*buffer.end()), 0);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    Dummy, BufferEncoderTest,
+    ::testing::Combine(::testing::Values([]() {
+                         return std::make_unique<DummyEncoder>();
+                       }),
+                       ::testing::Values([](std::string_view filename) {
+                         return std::make_unique<DummyDecoder>(filename);
+                       }),
+                       ::testing::Range(0, 100)));
+
+}  // namespace aos::logger::testing
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index 4078f9b..9961bd8 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -38,7 +38,8 @@
       aos::logger::SpanReader span_reader(orig_path);
       CHECK(!span_reader.ReadMessage().empty()) << ": Empty header, aborting";
 
-      aos::logger::DetachedBufferWriter buffer_writer(FLAGS_logfile);
+      aos::logger::DetachedBufferWriter buffer_writer(
+          FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
       buffer_writer.QueueSizedFlatbuffer(&fbb);
 
       while (true) {
@@ -47,7 +48,7 @@
           break;
         }
 
-        buffer_writer.WriteSizedFlatbuffer(msg_data);
+        buffer_writer.QueueSpan(msg_data);
       }
     } else {
       aos::logger::MessageReader reader(FLAGS_logfile);
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index def0842..a635682 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -32,7 +32,7 @@
     const Node *node) {
   CHECK_EQ(node, this->node());
   UpdateHeader(header, uuid_, part_number_);
-  data_writer_->WriteSizedFlatbuffer(header->full_span());
+  data_writer_->QueueSpan(header->full_span());
 }
 
 DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
@@ -47,7 +47,7 @@
   ++part_number_;
   *data_writer_ = std::move(*OpenDataWriter());
   UpdateHeader(header, uuid_, part_number_);
-  data_writer_->WriteSizedFlatbuffer(header->full_span());
+  data_writer_->QueueSpan(header->full_span());
 }
 
 DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
@@ -81,14 +81,14 @@
     const Node *node) {
   if (node == this->node()) {
     UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
+    data_writer_->QueueSpan(header->full_span());
   } else {
     for (std::pair<const Channel *const, DataWriter> &data_writer :
          data_writers_) {
       if (node == data_writer.second.node) {
         UpdateHeader(header, data_writer.second.uuid,
                      data_writer.second.part_number);
-        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+        data_writer.second.writer->QueueSpan(header->full_span());
       }
     }
   }
@@ -101,7 +101,7 @@
     ++part_number_;
     *data_writer_ = std::move(*OpenDataWriter());
     UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
+    data_writer_->QueueSpan(header->full_span());
   } else {
     for (std::pair<const Channel *const, DataWriter> &data_writer :
          data_writers_) {
@@ -110,7 +110,7 @@
         data_writer.second.rotate(data_writer.first, &data_writer.second);
         UpdateHeader(header, data_writer.second.uuid,
                      data_writer.second.part_number);
-        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+        data_writer.second.writer->QueueSpan(header->full_span());
       }
     }
   }
@@ -208,9 +208,11 @@
                    data_writer->part_number, ".bfbs");
 
   if (!data_writer->writer) {
-    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+    data_writer->writer = std::make_unique<DetachedBufferWriter>(
+        filename, std::make_unique<DummyEncoder>());
   } else {
-    *data_writer->writer = DetachedBufferWriter(filename);
+    *data_writer->writer =
+        DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
   }
 }
 
@@ -221,16 +223,19 @@
       channel->name()->string_view(), "/", channel->type()->string_view(),
       ".part", data_writer->part_number, ".bfbs");
   if (!data_writer->writer) {
-    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+    data_writer->writer = std::make_unique<DetachedBufferWriter>(
+        filename, std::make_unique<DummyEncoder>());
   } else {
-    *data_writer->writer = DetachedBufferWriter(filename);
+    *data_writer->writer =
+        DetachedBufferWriter(filename, std::make_unique<DummyEncoder>());
   }
 }
 
 std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
   return std::make_unique<DetachedBufferWriter>(
       absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
-                   part_number_, ".bfbs"));
+                   part_number_, ".bfbs"),
+      std::make_unique<DummyEncoder>());
 }
 
 }  // namespace logger
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 7611cb8..1bef629 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -108,7 +108,8 @@
   // Creates a new data writer with the new part number.
   std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
     return std::make_unique<DetachedBufferWriter>(
-        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"),
+        std::make_unique<aos::logger::DummyEncoder>());
   }
 
   const std::string base_name_;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 26c3825..c7f013d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1,16 +1,15 @@
 #include "aos/events/logging/logfile_utils.h"
 
 #include <fcntl.h>
-#include <limits.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <sys/uio.h>
 
-#include <vector>
+#include <algorithm>
+#include <climits>
 
 #include "absl/strings/escaping.h"
 #include "aos/configuration.h"
-#include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/util/file.h"
 #include "flatbuffers/flatbuffers.h"
@@ -20,13 +19,13 @@
 DEFINE_int32(flush_size, 128000,
              "Number of outstanding bytes to allow before flushing to disk.");
 
-namespace aos {
-namespace logger {
+namespace aos::logger {
 
 namespace chrono = std::chrono;
 
-DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
-    : filename_(filename) {
+DetachedBufferWriter::DetachedBufferWriter(
+    std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
+    : filename_(filename), encoder_(std::move(encoder)) {
   util::MkdirP(filename, 0777);
   fd_ = open(std::string(filename).c_str(),
              O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
@@ -35,7 +34,10 @@
 }
 
 DetachedBufferWriter::~DetachedBufferWriter() {
-  Flush();
+  encoder_->Finish();
+  while (encoder_->queue_size() > 0) {
+    Flush();
+  }
   PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
   VLOG(1) << "Closed " << filename_;
 }
@@ -50,77 +52,95 @@
 DetachedBufferWriter &DetachedBufferWriter::operator=(
     DetachedBufferWriter &&other) {
   std::swap(filename_, other.filename_);
+  std::swap(encoder_, other.encoder_);
   std::swap(fd_, other.fd_);
-  std::swap(queued_size_, other.queued_size_);
-  std::swap(written_size_, other.written_size_);
-  std::swap(queue_, other.queue_);
   std::swap(iovec_, other.iovec_);
+  std::swap(max_write_time_, other.max_write_time_);
+  std::swap(max_write_time_bytes_, other.max_write_time_bytes_);
+  std::swap(max_write_time_messages_, other.max_write_time_messages_);
+  std::swap(total_write_time_, other.total_write_time_);
+  std::swap(total_write_count_, other.total_write_count_);
+  std::swap(total_write_messages_, other.total_write_messages_);
+  std::swap(total_write_bytes_, other.total_write_bytes_);
   return *this;
 }
 
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::FlatBufferBuilder *fbb) {
-  QueueSizedFlatbuffer(fbb->Release());
-}
+void DetachedBufferWriter::QueueSpan(absl::Span<const uint8_t> span) {
+  if (encoder_->may_bypass() && span.size() > 4096u) {
+    // Over this threshold, we assume it's cheaper to pay for an extra
+    // syscall and write the data immediately than to copy it into the
+    // encoder's queue.
 
-void DetachedBufferWriter::WriteSizedFlatbuffer(
-    absl::Span<const uint8_t> span) {
-  // Cheat aggressively...  Write out the queued up data, and then write this
-  // data once without buffering.  It is hard to make a DetachedBuffer out of
-  // this data, and we don't want to worry about lifetimes.
-  Flush();
-  iovec_.clear();
-  iovec_.reserve(1);
+    // First, flush everything.
+    while (encoder_->queue_size() > 0u) {
+      Flush();
+    }
 
-  struct iovec n;
-  n.iov_base = const_cast<uint8_t *>(span.data());
-  n.iov_len = span.size();
-  iovec_.emplace_back(n);
-
-  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
-  PCHECK(written == static_cast<ssize_t>(n.iov_len))
-      << ": Wrote " << written << " expected " << n.iov_len;
-  written_size_ += written;
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::DetachedBuffer &&buffer) {
-  queued_size_ += buffer.size();
-  queue_.emplace_back(std::move(buffer));
-
-  // Flush if we are at the max number of iovs per writev, or have written
-  // enough data.  Otherwise writev will fail with an invalid argument.
-  if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
-      queue_.size() == IOV_MAX) {
-    Flush();
+    // Then, write it directly.
+    const auto start = aos::monotonic_clock::now();
+    const ssize_t written = write(fd_, span.data(), span.size());
+    const auto end = aos::monotonic_clock::now();
+    PCHECK(written >= 0) << ": write failed";
+    CHECK_EQ(written, static_cast<ssize_t>(span.size()))
+        << ": Wrote " << written << " expected " << span.size();
+    UpdateStatsForWrite(end - start, written, 1);
+  } else {
+    encoder_->Encode(CopySpanAsDetachedBuffer(span));
   }
+
+  FlushAtThreshold();
 }
 
 void DetachedBufferWriter::Flush() {
-  if (queue_.size() == 0u) {
+  const auto queue = encoder_->queue();
+  if (queue.empty()) {
     return;
   }
+
   iovec_.clear();
-  iovec_.reserve(queue_.size());
+  const size_t iovec_size = std::min<size_t>(queue.size(), IOV_MAX);
+  iovec_.resize(iovec_size);
   size_t counted_size = 0;
-  for (size_t i = 0; i < queue_.size(); ++i) {
-    struct iovec n;
-    n.iov_base = queue_[i].data();
-    n.iov_len = queue_[i].size();
-    counted_size += n.iov_len;
-    iovec_.emplace_back(std::move(n));
+  for (size_t i = 0; i < iovec_size; ++i) {
+    iovec_[i].iov_base = const_cast<uint8_t *>(queue[i].data());
+    iovec_[i].iov_len = queue[i].size();
+    counted_size += iovec_[i].iov_len;
   }
-  CHECK_EQ(counted_size, queued_size_);
+
+  const auto start = aos::monotonic_clock::now();
   const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
-  PCHECK(written == static_cast<ssize_t>(queued_size_))
-      << ": Wrote " << written << " expected " << queued_size_;
-  written_size_ += written;
-
-  queued_size_ = 0;
-  queue_.clear();
+  const auto end = aos::monotonic_clock::now();
+  PCHECK(written >= 0) << ": writev failed";
   // TODO(austin): Handle partial writes in some way other than crashing...
+  CHECK_EQ(written, static_cast<ssize_t>(counted_size))
+      << ": Wrote " << written << " expected " << counted_size;
+
+  encoder_->Clear(iovec_size);
+
+  UpdateStatsForWrite(end - start, written, iovec_size);
+}
+
+void DetachedBufferWriter::UpdateStatsForWrite(
+    aos::monotonic_clock::duration duration, ssize_t written, int iovec_size) {
+  if (duration > max_write_time_) {
+    max_write_time_ = duration;
+    max_write_time_bytes_ = written;
+    max_write_time_messages_ = iovec_size;
+  }
+  total_write_time_ += duration;
+  ++total_write_count_;
+  total_write_messages_ += iovec_size;
+  total_write_bytes_ += written;
+}
+
+void DetachedBufferWriter::FlushAtThreshold() {
+  // Flush if we are at the max number of iovs per writev, because there's no
+  // point queueing up any more data in memory. Also flush once we have enough
+  // data queued up.
+  while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
+         encoder_->queue_size() >= IOV_MAX) {
+    Flush();
+  }
 }
 
 flatbuffers::Offset<MessageHeader> PackMessage(
@@ -185,10 +205,10 @@
   return message_header_builder.Finish();
 }
 
-SpanReader::SpanReader(std::string_view filename)
-    : filename_(filename),
-      fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
-  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
+  // Support for other kinds of decoders based on the filename should be added
+  // here.
+  decoder_ = std::make_unique<DummyDecoder>(filename);
 }
 
 absl::Span<const uint8_t> SpanReader::ReadMessage() {
@@ -244,18 +264,13 @@
 }
 
 bool SpanReader::ReadBlock() {
-  if (end_of_file_) {
-    return false;
-  }
-
-  // Appends 256k.  This is enough that the read call is efficient.  We don't
-  // want to spend too much time reading small chunks because the syscalls for
-  // that will be expensive.
+  // This is the amount of data we grab at a time. Doing larger chunks minimizes
+  // syscalls and helps decompressors batch things more efficiently.
   constexpr size_t kReadSize = 256 * 1024;
 
   // Strip off any unused data at the front.
   if (consumed_data_ != 0) {
-    data_.erase(data_.begin(), data_.begin() + consumed_data_);
+    data_.erase_front(consumed_data_);
     consumed_data_ = 0;
   }
 
@@ -264,15 +279,14 @@
   // This should automatically grow the backing store.  It won't shrink if we
   // get a small chunk later.  This reduces allocations when we want to append
   // more data.
-  data_.resize(data_.size() + kReadSize);
+  data_.resize(starting_size + kReadSize);
 
-  ssize_t count = read(fd_, &data_[starting_size], kReadSize);
-  data_.resize(starting_size + std::max(count, static_cast<ssize_t>(0)));
+  const size_t count =
+      decoder_->Read(data_.begin() + starting_size, data_.end());
+  data_.resize(starting_size + count);
   if (count == 0) {
-    end_of_file_ = true;
     return false;
   }
-  PCHECK(count > 0);
 
   return true;
 }
@@ -1520,5 +1534,4 @@
   return "";
 }
 
-}  // namespace logger
-}  // namespace aos
+}  // namespace aos::logger
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 9534860..f64881c 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -15,12 +15,14 @@
 #include <vector>
 
 #include "absl/types/span.h"
+#include "aos/containers/resizeable_buffer.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/buffer_encoder.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/flatbuffers.h"
 #include "flatbuffers/flatbuffers.h"
 
-namespace aos {
-namespace logger {
+namespace aos::logger {
 
 enum class LogType : uint8_t {
   // The message originated on this node and should be logged here.
@@ -37,10 +39,11 @@
 };
 
 // This class manages efficiently writing a sequence of detached buffers to a
-// file.  It queues them up and batches the write operation.
+// file.  It encodes them, queues them up, and batches the write operation.
 class DetachedBufferWriter {
  public:
-  DetachedBufferWriter(std::string_view filename);
+  DetachedBufferWriter(std::string_view filename,
+                       std::unique_ptr<DetachedBufferEncoder> encoder);
   DetachedBufferWriter(DetachedBufferWriter &&other);
   DetachedBufferWriter(const DetachedBufferWriter &) = delete;
 
@@ -55,39 +58,86 @@
   // it.  The main use case is updating start times after a log file starts.
   void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
 
-  // TODO(austin): Snappy compress the log file if it ends with .snappy!
+  // Queues up a finished FlatBufferBuilder to be encoded and written.
+  //
+  // Triggers a flush if there's enough data queued up.
+  //
+  // Steals the detached buffer from it.
+  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
+    QueueSizedFlatbuffer(fbb->Release());
+  }
+  // May steal the backing storage of buffer, or may leave it alone.
+  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
+    encoder_->Encode(std::move(buffer));
+    FlushAtThreshold();
+  }
 
-  // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
-  // buffer from it.
-  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
-  // Queues up a detached buffer directly.
-  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
-  // Writes a Span.  This is not terribly optimized right now.
-  void WriteSizedFlatbuffer(absl::Span<const uint8_t> span);
+  // Queues up data in span. May copy or may write it to disk immediately.
+  void QueueSpan(absl::Span<const uint8_t> span);
 
-  // Triggers data to be provided to the kernel and written.
-  void Flush();
+  // Returns the total number of bytes written and currently queued.
+  size_t total_bytes() const { return encoder_->total_bytes(); }
 
-  // Returns the number of bytes written.
-  size_t written_size() const { return written_size_; }
-
-  // Returns the number of bytes written or currently queued.
-  size_t total_size() const { return written_size_ + queued_size_; }
+  // The maximum time for a single write call, or 0 if none have been performed.
+  std::chrono::nanoseconds max_write_time() const { return max_write_time_; }
+  // The number of bytes in the longest write call, or -1 if none have been
+  // performed.
+  int max_write_time_bytes() const { return max_write_time_bytes_; }
+  // The number of buffers in the longest write call, or -1 if none have been
+  // performed.
+  int max_write_time_messages() const { return max_write_time_messages_; }
+  // The total time spent in write calls.
+  std::chrono::nanoseconds total_write_time() const {
+    return total_write_time_;
+  }
+  // The total number of writes which have been performed.
+  int total_write_count() const { return total_write_count_; }
+  // The total number of messages which have been written.
+  int total_write_messages() const { return total_write_messages_; }
+  // The total number of bytes which have been written.
+  int total_write_bytes() const { return total_write_bytes_; }
+  void ResetStatistics() {
+    max_write_time_ = std::chrono::nanoseconds::zero();
+    max_write_time_bytes_ = -1;
+    max_write_time_messages_ = -1;
+    total_write_time_ = std::chrono::nanoseconds::zero();
+    total_write_count_ = 0;
+    total_write_messages_ = 0;
+    total_write_bytes_ = 0;
+  }
 
  private:
+  // Performs a single writev call with as much of the data we have queued up as
+  // possible.
+  //
+  // This will normally take all of the data we have queued up, unless an
+  // encoder has spit out a big enough chunk all at once that we can't manage
+  // all of it.
+  void Flush();
+
+  void UpdateStatsForWrite(aos::monotonic_clock::duration duration,
+                           ssize_t written, int iovec_size);
+
+  // Flushes data if we've reached the threshold to do that as part of normal
+  // operation.
+  void FlushAtThreshold();
+
   std::string filename_;
+  std::unique_ptr<DetachedBufferEncoder> encoder_;
 
   int fd_ = -1;
 
-  // Size of all the data in the queue.
-  size_t queued_size_ = 0;
-  size_t written_size_ = 0;
-
-  // List of buffers to flush.
-  std::vector<flatbuffers::DetachedBuffer> queue_;
   // List of iovecs to use with writev.  This is a member variable to avoid
   // churn.
   std::vector<struct iovec> iovec_;
+
+  std::chrono::nanoseconds max_write_time_ = std::chrono::nanoseconds::zero();
+  int max_write_time_bytes_ = -1;
+  int max_write_time_messages_ = -1;
+  std::chrono::nanoseconds total_write_time_ = std::chrono::nanoseconds::zero();
+  int total_write_count_ = 0;
+  int total_write_messages_ = 0;
+  int total_write_bytes_ = 0;
 };
 
 // Packes a message pointed to by the context into a MessageHeader.
@@ -104,8 +154,6 @@
  public:
   SpanReader(std::string_view filename);
 
-  ~SpanReader() { close(fd_); }
-
   std::string_view filename() const { return filename_; }
 
   // Returns a span with the data for a message from the log file, excluding
@@ -121,50 +169,22 @@
   //   Allocate the 256k blocks like we do today.  But, refcount them with
   //   shared_ptr pointed to by the messageheader that is returned.  This avoids
   //   the copy.  Need to do more benchmarking.
+  //   And (Brian): Consider just mmapping the file and handing out refcounted
+  //   pointers into that too.
 
   // Reads a chunk of data into data_.  Returns false if no data was read.
   bool ReadBlock();
 
   const std::string filename_;
 
-  // File descriptor for the log file.
-  int fd_ = -1;
+  // File reader and data decoder.
+  std::unique_ptr<DataDecoder> decoder_;
 
-  // Allocator which doesn't zero initialize memory.
-  template <typename T>
-  struct DefaultInitAllocator {
-    typedef T value_type;
-
-    template <typename U>
-    void construct(U *p) {
-      ::new (static_cast<void *>(p)) U;
-    }
-
-    template <typename U, typename... Args>
-    void construct(U *p, Args &&... args) {
-      ::new (static_cast<void *>(p)) U(std::forward<Args>(args)...);
-    }
-
-    T *allocate(std::size_t n) {
-      return reinterpret_cast<T *>(::operator new(sizeof(T) * n));
-    }
-
-    template <typename U>
-    void deallocate(U *p, std::size_t /*n*/) {
-      ::operator delete(static_cast<void *>(p));
-    }
-  };
-
-  // Vector to read into.  This uses an allocator which doesn't zero
-  // initialize the memory.
-  std::vector<uint8_t, DefaultInitAllocator<uint8_t>> data_;
+  // Vector to read into.
+  ResizeableBuffer data_;
 
   // Amount of data consumed already in data_.
   size_t consumed_data_ = 0;
-
-  // Cached bit for if we have reached the end of the file.  Otherwise we will
-  // hammer on the kernel asking for more data each time we send.
-  bool end_of_file_ = false;
 };
 
 // Class which handles reading the header and messages from the log file.  This
@@ -668,7 +688,6 @@
 // a single node.
 std::string MaybeNodeName(const Node *);
 
-}  // namespace logger
-}  // namespace aos
+}  // namespace aos::logger
 
 #endif  // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
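
Putting the pieces together, callers now hand the writer an encoder explicitly
and can inspect the new write statistics afterwards. A minimal usage sketch
(the file name is made up, and DummyEncoder keeps the old pass-through
behavior):

    #include <memory>

    #include "glog/logging.h"

    #include "aos/events/logging/buffer_encoder.h"
    #include "aos/events/logging/logfile_utils.h"

    void WriteOneSpan(absl::Span<const uint8_t> data) {
      aos::logger::DetachedBufferWriter writer(
          "/tmp/example.bfbs", std::make_unique<aos::logger::DummyEncoder>());

      // Small spans are copied into the encoder's queue; large ones may bypass
      // the encoder and go straight to disk.
      writer.QueueSpan(data);

      LOG(INFO) << "Longest write so far took "
                << writer.max_write_time().count() << "ns for "
                << writer.max_write_time_bytes() << " bytes";
    }  // The destructor finishes the encoder and flushes everything left.
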
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index bab006a..33d50d3 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -359,6 +359,16 @@
   flatbuffers::DetachedBuffer buffer_;
 };
 
+inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
+    absl::Span<const uint8_t> span) {
+  // Copy the data from the span.
+  uint8_t *buf = flatbuffers::DefaultAllocator().allocate(span.size());
+  memcpy(buf, span.data(), span.size());
+  // Then give it to a DetachedBuffer to manage.
+  return flatbuffers::DetachedBuffer(nullptr, false, buf, span.size(), buf,
+                                     span.size());
+}
+
 }  // namespace aos
 
 #endif  // AOS_FLATBUFFERS_H_
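
CopySpanAsDetachedBuffer exists so that borrowed bytes can be handed to
DetachedBufferEncoder::Encode, which takes ownership; this is what
DetachedBufferWriter::QueueSpan does for small spans. A minimal usage sketch
(OwnCopyOf is a made-up helper name):

    #include <vector>

    #include "absl/types/span.h"

    #include "aos/flatbuffers.h"

    // Copies the vector's contents into a DetachedBuffer that owns its
    // storage.
    flatbuffers::DetachedBuffer OwnCopyOf(const std::vector<uint8_t> &bytes) {
      return aos::CopySpanAsDetachedBuffer(
          absl::Span<const uint8_t>(bytes.data(), bytes.size()));
    }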