Change LogReader API to be able to replace messages

The mutation API in LogReader was not able to express dropping messages,
or growing messages.  This enables more aggressive mutation.

Change-Id: I477482da4262483a780d15ebf8c98a51e37099f6
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/flatbuffers/BUILD b/aos/flatbuffers/BUILD
index 32f1d39..1d6c602 100644
--- a/aos/flatbuffers/BUILD
+++ b/aos/flatbuffers/BUILD
@@ -33,6 +33,7 @@
     name = "base_test",
     srcs = ["base_test.cc"],
     deps = [
+        ":aligned_allocator",
         ":base",
         "//aos/testing:googletest",
     ],
@@ -165,3 +166,17 @@
     srcs = ["test_static.h"],
     visibility = [":__subpackages__"],
 )
+
+cc_library(
+    name = "aligned_allocator",
+    srcs = ["aligned_allocator.cc"],
+    hdrs = ["aligned_allocator.h"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":base",
+        "//aos/containers:resizeable_buffer",
+        "//aos/events:event_loop",
+        "//aos/ipc_lib:data_alignment",
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/flatbuffers/aligned_allocator.cc b/aos/flatbuffers/aligned_allocator.cc
new file mode 100644
index 0000000..d2c47e5
--- /dev/null
+++ b/aos/flatbuffers/aligned_allocator.cc
@@ -0,0 +1,96 @@
+#include "aos/flatbuffers/aligned_allocator.h"
+
+namespace aos::fbs {
+
+AlignedVectorAllocator::~AlignedVectorAllocator() {
+  CHECK(buffer_.empty())
+      << ": Must deallocate before destroying the AlignedVectorAllocator.";
+}
+
+std::optional<std::span<uint8_t>> AlignedVectorAllocator::Allocate(
+    size_t size, size_t /*alignment*/, fbs::SetZero set_zero) {
+  CHECK(buffer_.empty()) << ": Must deallocate before calling Allocate().";
+  buffer_.resize(((size + kAlignment - 1) / kAlignment) * kAlignment);
+  allocated_size_ = size;
+  if (set_zero == fbs::SetZero::kYes) {
+    memset(buffer_.data(), 0, buffer_.size());
+  }
+
+  return std::span<uint8_t>{data(), allocated_size_};
+}
+
+std::optional<std::span<uint8_t>> AlignedVectorAllocator::InsertBytes(
+    void *insertion_point, size_t bytes, size_t /*alignment*/,
+    fbs::SetZero set_zero) {
+  DCHECK_GE(reinterpret_cast<const uint8_t *>(insertion_point), data());
+  DCHECK_LE(reinterpret_cast<const uint8_t *>(insertion_point),
+            data() + allocated_size_);
+  const size_t buffer_offset =
+      reinterpret_cast<const uint8_t *>(insertion_point) - data();
+  // TODO(austin): This has an extra memcpy in it that isn't strictly needed
+  // when we resize.  Remove it if performance is a concern.
+  const size_t absolute_buffer_offset =
+      reinterpret_cast<const uint8_t *>(insertion_point) - buffer_.data();
+  const size_t previous_size = buffer_.size();
+
+  buffer_.resize(((allocated_size_ + bytes + kAlignment - 1) / kAlignment) *
+                 kAlignment);
+
+  // Now, we've got space both before and after the block of data.  Move the
+  // data after to the end, and the data before to the start.
+
+  const size_t new_space_after = buffer_.size() - previous_size;
+
+  // Move the rest of the data to be end aligned.  If the buffer wasn't resized,
+  // this will be a nop.
+  memmove(buffer_.data() + absolute_buffer_offset + new_space_after,
+          buffer_.data() + absolute_buffer_offset,
+          previous_size - absolute_buffer_offset);
+
+  // Now, move the data at the front to be aligned too.
+  memmove(buffer_.data() + buffer_.size() - (allocated_size_ + bytes),
+          buffer_.data() + previous_size - allocated_size_,
+          allocated_size_ - (previous_size - absolute_buffer_offset));
+
+  if (set_zero == fbs::SetZero::kYes) {
+    memset(data() - bytes + buffer_offset, 0, bytes);
+  }
+  allocated_size_ += bytes;
+
+  return std::span<uint8_t>{data(), allocated_size_};
+}
+
+std::span<uint8_t> AlignedVectorAllocator::RemoveBytes(
+    std::span<uint8_t> remove_bytes) {
+  const ssize_t removal_index = remove_bytes.data() - buffer_.data();
+  const size_t old_start_index = buffer_.size() - allocated_size_;
+  CHECK_LE(static_cast<ssize_t>(old_start_index), removal_index);
+  CHECK_LE(removal_index, static_cast<ssize_t>(buffer_.size()));
+  CHECK_LE(removal_index + remove_bytes.size(), buffer_.size());
+  uint8_t *old_buffer_start = buffer_.data() + old_start_index;
+  memmove(old_buffer_start + remove_bytes.size(), old_buffer_start,
+          removal_index - old_start_index);
+  allocated_size_ -= remove_bytes.size();
+
+  return std::span<uint8_t>{data(), allocated_size_};
+}
+
+void AlignedVectorAllocator::Deallocate(std::span<uint8_t>) {
+  if (!released_) {
+    CHECK(!buffer_.empty())
+        << ": Called Deallocate() without a prior allocation.";
+  }
+  released_ = false;
+  buffer_.resize(0);
+}
+
+aos::SharedSpan AlignedVectorAllocator::Release() {
+  absl::Span<uint8_t> span{data(), allocated_size_};
+  std::shared_ptr<SharedSpanHolder> result = std::make_shared<SharedSpanHolder>(
+      std::move(buffer_), absl::Span<const uint8_t>());
+  result->span = span;
+  released_ = true;
+  return aos::SharedSpan(result, &(result->span));
+}
+
+}  // namespace aos::fbs
diff --git a/aos/flatbuffers/aligned_allocator.h b/aos/flatbuffers/aligned_allocator.h
new file mode 100644
index 0000000..a974818
--- /dev/null
+++ b/aos/flatbuffers/aligned_allocator.h
@@ -0,0 +1,57 @@
+#ifndef AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
+#define AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
+
+#include <memory>
+#include <optional>
+#include <span>
+
+#include "glog/logging.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/event_loop.h"
+#include "aos/flatbuffers/base.h"
+#include "aos/ipc_lib/data_alignment.h"
+
+namespace aos::fbs {
+
+// Allocator that uses an AllocatorResizeableBuffer to allow arbitrary-sized
+// allocations.  Aligns the end of the buffer to an alignment of
+// kChannelDataAlignment.
+class AlignedVectorAllocator : public fbs::Allocator {
+ public:
+  static constexpr size_t kAlignment = aos::kChannelDataAlignment;
+  AlignedVectorAllocator() {}
+  ~AlignedVectorAllocator();
+
+  std::optional<std::span<uint8_t>> Allocate(size_t size, size_t alignment,
+                                             fbs::SetZero set_zero) override;
+
+  std::optional<std::span<uint8_t>> InsertBytes(void *insertion_point,
+                                                size_t bytes, size_t alignment,
+                                                fbs::SetZero set_zero) override;
+
+  std::span<uint8_t> RemoveBytes(std::span<uint8_t> remove_bytes) override;
+
+  void Deallocate(std::span<uint8_t>) override;
+
+  aos::SharedSpan Release();
+
+ private:
+  struct SharedSpanHolder {
+    aos::AllocatorResizeableBuffer<
+        aos::AlignedReallocator<kChannelDataAlignment>>
+        buffer;
+    absl::Span<const uint8_t> span;
+  };
+  uint8_t *data() { return buffer_.data() + buffer_.size() - allocated_size_; }
+
+  aos::AllocatorResizeableBuffer<aos::AlignedReallocator<kChannelDataAlignment>>
+      buffer_;
+
+  size_t allocated_size_ = 0u;
+  bool released_ = false;
+};
+
+}  // namespace aos::fbs
+
+#endif  // AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
diff --git a/aos/flatbuffers/base.h b/aos/flatbuffers/base.h
index ff81c9a..a92cc91 100644
--- a/aos/flatbuffers/base.h
+++ b/aos/flatbuffers/base.h
@@ -1,5 +1,6 @@
 #ifndef AOS_FLATBUFFERS_BASE_H_
 #define AOS_FLATBUFFERS_BASE_H_
+
 #include <stdint.h>
 #include <sys/types.h>
 
@@ -15,6 +16,7 @@
 #include "glog/logging.h"
 
 namespace aos::fbs {
+
 using ::flatbuffers::soffset_t;
 using ::flatbuffers::uoffset_t;
 using ::flatbuffers::voffset_t;
diff --git a/aos/flatbuffers/base_test.cc b/aos/flatbuffers/base_test.cc
index f0eaf04..87d89fa 100644
--- a/aos/flatbuffers/base_test.cc
+++ b/aos/flatbuffers/base_test.cc
@@ -6,6 +6,8 @@
 
 #include "gtest/gtest.h"
 
+#include "aos/flatbuffers/aligned_allocator.h"
+
 namespace aos::fbs::testing {
 // Tests that PaddedSize() behaves as expected.
 TEST(BaseTest, PaddedSize) {
@@ -16,7 +18,7 @@
   EXPECT_EQ(8, PaddedSize(7, 4));
 }
 
-inline constexpr size_t kDefaultSize = 16;
+inline constexpr size_t kDefaultSize = AlignedVectorAllocator::kAlignment * 2;
 template <typename T>
 class AllocatorTest : public ::testing::Test {
  protected:
@@ -32,7 +34,8 @@
       allocator_(std::make_unique<SpanAllocator>(
           std::span<uint8_t>{buffer_.data(), buffer_.size()})) {}
 
-using AllocatorTypes = ::testing::Types<SpanAllocator, VectorAllocator>;
+using AllocatorTypes =
+    ::testing::Types<SpanAllocator, VectorAllocator, AlignedVectorAllocator>;
 TYPED_TEST_SUITE(AllocatorTest, AllocatorTypes);
 
 // Tests that we can create and not use a VectorAllocator.
@@ -79,6 +82,11 @@
 
 // Tests that we can remove bytes from an arbitrary spot in the buffer.
 TYPED_TEST(AllocatorTest, RemoveBytes) {
+  // Deletion doesn't require resizing, so we don't need to worry about it being
+  // larger than the alignment to test everything.  The test requires the size
+  // to be < 255 to store the sentinal values.
+  const size_t kDefaultSize = 128;
+
   const size_t half_size = kDefaultSize / 2;
   std::span<uint8_t> span =
       this->allocator_->Allocate(kDefaultSize, 4, SetZero::kYes).value();