Change LogReader API to be able to replace messages

The mutation API in LogReader was not able to express dropping messages
or growing messages.  Callbacks now return the SharedSpan to send, or
nullptr to drop the message, which enables more aggressive mutation.

Change-Id: I477482da4262483a780d15ebf8c98a51e37099f6
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/BUILD b/aos/BUILD
index de4ef4f..b6d357b 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -509,6 +509,7 @@
     deps = [
         "//aos:macros",
         "//aos/containers:resizeable_buffer",
+        "//aos/ipc_lib:data_alignment",
         "//aos/util:file",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_github_google_glog//:glog",
diff --git a/aos/containers/resizeable_buffer.h b/aos/containers/resizeable_buffer.h
index 484d93e..664fa2f 100644
--- a/aos/containers/resizeable_buffer.h
+++ b/aos/containers/resizeable_buffer.h
@@ -55,6 +55,8 @@
   size_t size() const { return size_; }
   size_t capacity() const { return capacity_; }
 
+  bool empty() const { return size_ == 0; }
+
   void reserve(size_t new_size) {
     if (new_size > capacity_) {
       Allocate(new_size);
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 83393c2..ff629f9 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -166,7 +166,8 @@
 
   // Sends a single block of data by refcounting it to avoid copies.  The data
   // must not change after being passed into Send. The remote arguments have the
-  // same meaning as in Send above.
+  // same meaning as in Send above.  Note: some implementations will have to
+  // copy anyway, but other implementations can skip the copy.
   Error Send(const SharedSpan data);
   Error Send(const SharedSpan data,
              monotonic_clock::time_point monotonic_remote_time,
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index f056bca..ec3d51e 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -929,6 +929,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
         ":multinode_logger_test_lib",
+        "//aos/flatbuffers:aligned_allocator",
     ],
 )
 
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index df1ed8c..192dade 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -1301,7 +1301,7 @@
     std::function<void()> notice_realtime_end, const Node *node,
     LogReader::State::ThreadedBuffering threading,
     std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
-    const std::vector<std::function<void(void *message)>>
+    const std::vector<std::function<SharedSpan(TimestampedMessage &)>>
         &before_send_callbacks)
     : timestamp_mapper_(std::move(timestamp_mapper)),
       timestamp_queue_strategy_(timestamp_queue_strategy),
@@ -1416,7 +1416,7 @@
   timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
 }
 
-bool LogReader::State::Send(const TimestampedMessage &&timestamped_message) {
+bool LogReader::State::Send(TimestampedMessage &&timestamped_message) {
   aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
   CHECK(sender);
   uint32_t remote_queue_index = 0xffffffff;
@@ -1506,21 +1506,31 @@
                  ->boot_uuid());
   }
 
+  SharedSpan to_send;
   // Right before sending allow the user to process the message.
   if (before_send_callbacks_[timestamped_message.channel_index]) {
-    // Only channels that are forwarded and sent from this State's node will be
-    // in the queue_index_map_
-    if (queue_index_map_[timestamped_message.channel_index]) {
-      before_send_callbacks_[timestamped_message.channel_index](
-          timestamped_message.data->mutable_data());
+    // Only channels which are forwarded and on the destination node have
+    // channel_source_state_ set to non-null.  See RegisterDuringStartup.
+    if (channel_source_state_[timestamped_message.channel_index] == nullptr) {
+      // Mutating the message is safe here since there is only one caller to
+      // Send, and the data is not mutated after Send is called.
+      to_send = before_send_callbacks_[timestamped_message.channel_index](
+          timestamped_message);
+      *timestamped_message.data.get() = to_send;
+    } else {
+      to_send = *timestamped_message.data;
     }
+    if (!to_send) {
+      return false;
+    }
+  } else {
+    to_send = *timestamped_message.data;
   }
 
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
   const RawSender::Error err = sender->Send(
-      SharedSpan(timestamped_message.data, &timestamped_message.data->span),
-      timestamped_message.monotonic_remote_time.time,
+      std::move(to_send), timestamped_message.monotonic_remote_time.time,
       timestamped_message.realtime_remote_time,
       timestamped_message.monotonic_remote_transmit_time.time,
       remote_queue_index,
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index fb19e78..6d70476 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -346,17 +346,25 @@
   // implementation. And, the callback is called only once one the Sender's Node
   // if the channel is forwarded.
   //
+  // The callback should have a signature like:
+  //   [](aos::examples::Ping *ping,
+  //      const TimestampedMessage &timestamped_message) -> SharedSpan {
+  //          if (drop) {
+  //            return nullptr;
+  //          } else {
+  //            return *timestamped_message.data;
+  //          }
+  //      }
+  //
+  // If nullptr is returned, the message will not be sent.
+  //
   // See multinode_logger_test for examples of usage.
-  template <typename Callback>
+  template <typename MessageType, typename Callback>
   void AddBeforeSendCallback(std::string_view channel_name,
                              Callback &&callback) {
     CHECK(!AreStatesInitialized())
         << ": Cannot add callbacks after calling Register";
 
-    using MessageType = typename std::remove_pointer<
-        typename event_loop_internal::watch_message_type_trait<
-            decltype(&Callback::operator())>::message_type>::type;
-
     const Channel *channel = configuration::GetChannel(
         logged_configuration(), channel_name,
         MessageType::GetFullyQualifiedName(), "", nullptr);
@@ -373,9 +381,16 @@
         << ":{ \"name\": \"" << channel_name << "\", \"type\": \""
         << MessageType::GetFullyQualifiedName() << "\" }";
 
-    before_send_callbacks_[channel_index] = [callback](void *message) {
-      callback(flatbuffers::GetMutableRoot<MessageType>(
-          reinterpret_cast<char *>(message)));
+    before_send_callbacks_[channel_index] =
+        [callback](TimestampedMessage &timestamped_message) -> SharedSpan {
+      // Note: the const_cast is because SharedSpan is defined to be a pointer
+      // to const data, even though it wraps mutable data.
+      // TODO(austin): Refactor to make it non-const properly to drop the const
+      // cast.
+      return callback(flatbuffers::GetMutableRoot<MessageType>(
+                          reinterpret_cast<char *>(const_cast<uint8_t *>(
+                              timestamped_message.data.get()->get()->data()))),
+                      timestamped_message);
     };
   }
 
@@ -460,7 +475,7 @@
           std::function<void()> notice_realtime_end, const Node *node,
           ThreadedBuffering threading,
           std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
-          const std::vector<std::function<void(void *message)>>
+          const std::vector<std::function<SharedSpan(TimestampedMessage &)>>
               &before_send_callbacks);
 
     // Connects up the timestamp mappers.
@@ -705,8 +720,9 @@
           std::max(monotonic_now(), next_time + clock_offset()));
     }
 
-    // Sends a buffer on the provided channel index.
-    bool Send(const TimestampedMessage &&timestamped_message);
+    // Sends a buffer on the provided channel index.  Returns true if the
+    // message was actually sent, and false otherwise.
+    bool Send(TimestampedMessage &&timestamped_message);
 
     void MaybeSetClockOffset();
     std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
@@ -886,7 +902,7 @@
     // indices of the channels to replay for the Node represented by
     // the instance of LogReader::State.
     std::unique_ptr<const ReplayChannelIndices> replay_channel_indices_;
-    const std::vector<std::function<void(void *message)>>
+    const std::vector<std::function<SharedSpan(TimestampedMessage &)>>
         before_send_callbacks_;
   };
 
@@ -934,7 +950,8 @@
 
   // The callbacks that will be called before sending a message indexed by the
   // channel index from the logged_configuration
-  std::vector<std::function<void(void *message)>> before_send_callbacks_;
+  std::vector<std::function<SharedSpan(TimestampedMessage &)>>
+      before_send_callbacks_;
 
   // If true, the replay timer will ignore any missing data.  This is used
   // during startup when we are bootstrapping everything and trying to get to
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index c01026f..30ed3d7 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1579,14 +1579,14 @@
   os << "{.channel_index=" << msg.channel_index
      << ", .queue_index=" << msg.queue_index
      << ", .timestamp=" << msg.timestamp;
-  if (msg.data != nullptr) {
-    if (msg.data->remote_queue_index.has_value()) {
-      os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
+  if (msg.header != nullptr) {
+    if (msg.header->remote_queue_index.has_value()) {
+      os << ", .remote_queue_index=" << *msg.header->remote_queue_index;
     }
-    if (msg.data->monotonic_remote_time.has_value()) {
-      os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
+    if (msg.header->monotonic_remote_time.has_value()) {
+      os << ", .monotonic_remote_time=" << *msg.header->monotonic_remote_time;
     }
-    os << ", .data=" << msg.data;
+    os << ", .header=" << msg.header;
   }
   os << "}";
   return os;
@@ -1614,7 +1614,7 @@
     os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
   }
   if (msg.data != nullptr) {
-    os << ", .data=" << *msg.data;
+    os << ", .data=" << msg.data.get();
   } else {
     os << ", .data=nullptr";
   }
@@ -1666,15 +1666,20 @@
         monotonic_remote_boot = *boot;
       }
 
-      messages_.insert(
-          Message{.channel_index = msg->channel_index,
-                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
-                                                .index = msg->queue_index},
-                  .timestamp = BootTimestamp{.boot = parts().boot_count,
-                                             .time = msg->monotonic_sent_time},
-                  .monotonic_remote_boot = monotonic_remote_boot,
-                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
-                  .data = std::move(msg)});
+      std::shared_ptr<SharedSpan> data =
+          std::make_shared<SharedSpan>(msg, &msg->span);
+
+      messages_.insert(Message{
+          .channel_index = msg->channel_index,
+          .queue_index = BootQueueIndex{.boot = parts().boot_count,
+                                        .index = msg->queue_index},
+          .timestamp = BootTimestamp{.boot = parts().boot_count,
+                                     .time = msg->monotonic_sent_time},
+          .monotonic_remote_boot = monotonic_remote_boot,
+          .monotonic_timestamp_boot = monotonic_timestamp_boot,
+          .header = std::move(msg),
+          .data = std::move(data),
+      });
 
       // Now, update sorted_until_ to match the new message.
       if (parts_message_reader_.newest_timestamp() >
@@ -1827,15 +1832,15 @@
     } else if (*msg == *oldest) {
       // Found a duplicate.  If there is a choice, we want the one which has
       // the timestamp time.
-      if (!msg->data->has_monotonic_timestamp_time) {
+      if (!msg->header->has_monotonic_timestamp_time) {
         message_sorter.PopFront();
-      } else if (!oldest->data->has_monotonic_timestamp_time) {
+      } else if (!oldest->header->has_monotonic_timestamp_time) {
         current_->PopFront();
         current_ = &message_sorter;
         oldest = msg;
       } else {
-        CHECK_EQ(msg->data->monotonic_timestamp_time,
-                 oldest->data->monotonic_timestamp_time);
+        CHECK_EQ(msg->header->monotonic_timestamp_time,
+                 oldest->header->monotonic_timestamp_time);
         message_sorter.PopFront();
       }
     }
@@ -2037,26 +2042,30 @@
     }
     CHECK_LT(msg->channel_index, source_node.size());
     if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
-      timestamp_messages_.emplace_back(TimestampedMessage{
+      TimestampedMessage timestamped_message{
           .channel_index = msg->channel_index,
           .queue_index = msg->queue_index,
           .monotonic_event_time = msg->timestamp,
-          .realtime_event_time = msg->data->realtime_sent_time,
+          .realtime_event_time = msg->header->realtime_sent_time,
           .remote_queue_index =
               BootQueueIndex{.boot = msg->monotonic_remote_boot,
-                             .index = msg->data->remote_queue_index.value()},
+                             .index = msg->header->remote_queue_index.value()},
           .monotonic_remote_time = {msg->monotonic_remote_boot,
-                                    msg->data->monotonic_remote_time.value()},
-          .realtime_remote_time = msg->data->realtime_remote_time.value(),
+                                    msg->header->monotonic_remote_time.value()},
+          .realtime_remote_time = msg->header->realtime_remote_time.value(),
           .monotonic_remote_transmit_time =
               {msg->monotonic_remote_boot,
-               msg->data->monotonic_remote_transmit_time},
+               msg->header->monotonic_remote_transmit_time},
           .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
-                                       msg->data->monotonic_timestamp_time},
-          .data = std::move(msg->data)});
+                                       msg->header->monotonic_timestamp_time},
+          .data = msg->data,
+      };
 
-      VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
-      fn(&timestamp_messages_.back());
+      fn(&timestamped_message);
+
+      VLOG(2) << this << " Queued timestamp of " << timestamped_message;
+
+      timestamp_messages_.emplace_back(std::move(*msg));
     } else {
       VLOG(2) << this << " Dropped data";
     }
@@ -2100,25 +2109,12 @@
     CHECK(queue_timestamps_ran_);
   }
 
-  // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
-  // to return a Message.  We need to convert the first message in the list
-  // before returning it (and comparing, honestly).  Fill next_timestamp_ in if
-  // it is empty so the rest of the logic here can just look at next_timestamp_
-  // and use that instead.
-  if (!next_timestamp_ && !timestamp_messages_.empty()) {
-    auto &front = timestamp_messages_.front();
-    next_timestamp_ = Message{
-        .channel_index = front.channel_index,
-        .queue_index = front.queue_index,
-        .timestamp = front.monotonic_event_time,
-        .monotonic_remote_boot = front.remote_queue_index.boot,
-        .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
-        .data = std::move(front.data),
-    };
-    timestamp_messages_.pop_front();
+  const Message *timestamp_messages_front = nullptr;
+  if (!timestamp_messages_.empty()) {
+    timestamp_messages_front = &timestamp_messages_.front();
   }
 
-  if (!next_timestamp_) {
+  if (!timestamp_messages_front) {
     message_source_ = MessageSource::kBootMerger;
     if (boot_merger_front != nullptr) {
       VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
@@ -2134,15 +2130,15 @@
     message_source_ = MessageSource::kTimestampMessage;
 
     VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
-            << next_timestamp_.value();
-    return &next_timestamp_.value();
+            << *timestamp_messages_front;
+    return timestamp_messages_front;
   }
 
-  if (*boot_merger_front <= next_timestamp_.value()) {
-    if (*boot_merger_front == next_timestamp_.value()) {
+  if (*boot_merger_front <= *timestamp_messages_front) {
+    if (*boot_merger_front == *timestamp_messages_front) {
       VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
               << " Dropping duplicate timestamp.";
-      next_timestamp_.reset();
+      timestamp_messages_.pop_front();
     }
     message_source_ = MessageSource::kBootMerger;
     if (boot_merger_front != nullptr) {
@@ -2156,16 +2152,16 @@
   } else {
     message_source_ = MessageSource::kTimestampMessage;
     VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
-            << next_timestamp_.value();
-    return &next_timestamp_.value();
+            << *timestamp_messages_front;
+    return timestamp_messages_front;
   }
 }
 
 void SplitTimestampBootMerger::PopFront() {
   switch (message_source_) {
     case MessageSource::kTimestampMessage:
-      CHECK(next_timestamp_.has_value());
-      next_timestamp_.reset();
+      CHECK(!timestamp_messages_.empty());
+      timestamp_messages_.pop_front();
       break;
     case MessageSource::kBootMerger:
       boot_merger_.PopFront();
@@ -2246,7 +2242,7 @@
       .channel_index = msg->channel_index,
       .queue_index = msg->queue_index,
       .monotonic_event_time = msg->timestamp,
-      .realtime_event_time = msg->data->realtime_sent_time,
+      .realtime_event_time = msg->header->realtime_sent_time,
       .remote_queue_index = BootQueueIndex::Invalid(),
       .monotonic_remote_time = BootTimestamp::min_time(),
       .realtime_remote_time = realtime_clock::min_time,
@@ -2368,18 +2364,18 @@
         .channel_index = msg->channel_index,
         .queue_index = msg->queue_index,
         .monotonic_event_time = msg->timestamp,
-        .realtime_event_time = msg->data->realtime_sent_time,
+        .realtime_event_time = msg->header->realtime_sent_time,
         .remote_queue_index =
             BootQueueIndex{.boot = msg->monotonic_remote_boot,
-                           .index = msg->data->remote_queue_index.value()},
+                           .index = msg->header->remote_queue_index.value()},
         .monotonic_remote_time = {msg->monotonic_remote_boot,
-                                  msg->data->monotonic_remote_time.value()},
-        .realtime_remote_time = msg->data->realtime_remote_time.value(),
+                                  msg->header->monotonic_remote_time.value()},
+        .realtime_remote_time = msg->header->realtime_remote_time.value(),
         .monotonic_remote_transmit_time =
             {msg->monotonic_remote_boot,
-             msg->data->monotonic_remote_transmit_time},
+             msg->header->monotonic_remote_transmit_time},
         .monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
-                                     msg->data->monotonic_timestamp_time},
+                                     msg->header->monotonic_timestamp_time},
         .data = std::move(data.data)});
     VLOG(1) << node_name() << " Inserted timestamp "
             << matched_messages_.back();
@@ -2446,23 +2442,23 @@
 
 Message TimestampMapper::MatchingMessageFor(const Message &message) {
   // Figure out what queue index we are looking for.
-  CHECK_NOTNULL(message.data);
-  CHECK(message.data->remote_queue_index.has_value());
+  CHECK_NOTNULL(message.header);
+  CHECK(message.header->remote_queue_index.has_value());
   const BootQueueIndex remote_queue_index =
       BootQueueIndex{.boot = message.monotonic_remote_boot,
-                     .index = *message.data->remote_queue_index};
+                     .index = *message.header->remote_queue_index};
 
-  CHECK(message.data->monotonic_remote_time.has_value());
-  CHECK(message.data->realtime_remote_time.has_value());
+  CHECK(message.header->monotonic_remote_time.has_value());
+  CHECK(message.header->realtime_remote_time.has_value());
 
   const BootTimestamp monotonic_remote_time{
       .boot = message.monotonic_remote_boot,
-      .time = message.data->monotonic_remote_time.value()};
+      .time = message.header->monotonic_remote_time.value()};
   const realtime_clock::time_point realtime_remote_time =
-      *message.data->realtime_remote_time;
+      *message.header->realtime_remote_time;
 
   TimestampMapper *peer =
-      nodes_data_[source_node_[message.data->channel_index]].peer;
+      nodes_data_[source_node_[message.header->channel_index]].peer;
 
   // We only register the peers which we have data for.  So, if we are being
   // asked to pull a timestamp from a peer which doesn't exist, return an
@@ -2475,6 +2471,7 @@
                    .timestamp = monotonic_remote_time,
                    .monotonic_remote_boot = 0xffffff,
                    .monotonic_timestamp_boot = 0xffffff,
+                   .header = nullptr,
                    .data = nullptr};
   }
 
@@ -2490,6 +2487,7 @@
                    .timestamp = monotonic_remote_time,
                    .monotonic_remote_boot = 0xffffff,
                    .monotonic_timestamp_boot = 0xffffff,
+                   .header = nullptr,
                    .data = nullptr};
   }
 
@@ -2500,6 +2498,7 @@
                    .timestamp = monotonic_remote_time,
                    .monotonic_remote_boot = 0xffffff,
                    .monotonic_timestamp_boot = 0xffffff,
+                   .header = nullptr,
                    .data = nullptr};
   }
 
@@ -2520,7 +2519,7 @@
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
-    CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+    CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
     // Now drop the data off the front.  We have deduplicated timestamps, so we
     // are done.  And all the data is in order.
@@ -2544,6 +2543,7 @@
                      .timestamp = monotonic_remote_time,
                      .monotonic_remote_boot = 0xffffff,
                      .monotonic_timestamp_boot = 0xffffff,
+                     .header = nullptr,
                      .data = nullptr};
     }
 
@@ -2552,7 +2552,7 @@
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please "
            "investigate!";
-    CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+    CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please "
            "investigate!";
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f9a62b1..bb312eb 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -489,12 +489,6 @@
   // pointer to track.
   absl::Span<const uint8_t> span;
 
-  // Used to be able to mutate the data in the span. This is only used for
-  // mutating the message inside of LogReader for the Before Send Callback. It
-  // is safe in this case since there is only one caller to Send, and the data
-  // is not mutated after Send is called.
-  uint8_t *mutable_data() { return const_cast<uint8_t *>(span.data()); }
-
   char actual_data[];
 
  private:
@@ -526,7 +520,13 @@
 
   size_t monotonic_timestamp_boot = 0xffffff;
 
-  std::shared_ptr<UnpackedMessageHeader> data;
+  // Pointer to the unpacked header.
+  std::shared_ptr<UnpackedMessageHeader> header;
+
+  // Pointer to a pointer to the span with the flatbuffer to publish in it.  The
+  // second layer of indirection lets us modify all copies of a message when
+  // sending inside the log reader.
+  std::shared_ptr<SharedSpan> data;
 
   bool operator<(const Message &m2) const;
   bool operator<=(const Message &m2) const;
@@ -554,7 +554,10 @@
 
   BootTimestamp monotonic_timestamp_time;
 
-  std::shared_ptr<UnpackedMessageHeader> data;
+  // Pointer to a pointer to the data.  If the outer pointer isn't populated, no
+  // data exists to send, we only have the timestamps. If the inner pointer is
+  // nullptr, the user has marked the message as something to not send.
+  std::shared_ptr<SharedSpan> data;
 };
 
 std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
@@ -807,14 +810,8 @@
   // Boot merger for just timestamps.  Any data read from here is to be ignored.
   std::unique_ptr<BootMerger> timestamp_boot_merger_;
 
-  // The callback requires us to convert each message to a TimestampedMessage.
-  std::deque<TimestampedMessage> timestamp_messages_;
-
-  // Storage for the next timestamp message to return.  This is separate so we
-  // can convert them back to a Message.
-  //
-  // TODO(austin): It would be nice to not have to convert...
-  std::optional<Message> next_timestamp_;
+  // Deque of all the timestamp messages.
+  std::deque<Message> timestamp_messages_;
 
   // Start times for each boot.
   std::vector<monotonic_clock::time_point> monotonic_start_time_;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 4ceca20..87c6f5d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -274,6 +274,7 @@
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
              .monotonic_remote_boot = 0xffffff,
              .monotonic_timestamp_boot = 0xffffff,
+             .header = nullptr,
              .data = nullptr};
   Message m2{.channel_index = 0,
              .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
@@ -281,6 +282,7 @@
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
              .monotonic_remote_boot = 0xffffff,
              .monotonic_timestamp_boot = 0xffffff,
+             .header = nullptr,
              .data = nullptr};
 
   EXPECT_LT(m1, m2);
@@ -960,24 +962,24 @@
 
   EXPECT_EQ(output[0].timestamp.boot, 0u);
   EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
-  EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
+  EXPECT_FALSE(output[0].header->has_monotonic_timestamp_time);
 
   EXPECT_EQ(output[1].timestamp.boot, 0u);
   EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
-  EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
-  EXPECT_EQ(output[1].data->monotonic_timestamp_time,
+  EXPECT_TRUE(output[1].header->has_monotonic_timestamp_time);
+  EXPECT_EQ(output[1].header->monotonic_timestamp_time,
             monotonic_clock::time_point(std::chrono::nanoseconds(971)));
 
   EXPECT_EQ(output[2].timestamp.boot, 0u);
   EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
-  EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
-  EXPECT_EQ(output[2].data->monotonic_timestamp_time,
+  EXPECT_TRUE(output[2].header->has_monotonic_timestamp_time);
+  EXPECT_EQ(output[2].header->monotonic_timestamp_time,
             monotonic_clock::time_point(std::chrono::nanoseconds(972)));
 
   EXPECT_EQ(output[3].timestamp.boot, 0u);
   EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
-  EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
-  EXPECT_EQ(output[3].data->monotonic_timestamp_time,
+  EXPECT_TRUE(output[3].header->has_monotonic_timestamp_time);
+  EXPECT_EQ(output[3].header->monotonic_timestamp_time,
             monotonic_clock::time_point(std::chrono::nanoseconds(973)));
 }
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 99b2359..c810203 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -155,9 +155,15 @@
   // passing in a separate config.
   LogReader reader(logfile, &config_.message());
 
-  reader.AddBeforeSendCallback("/test", [](aos::examples::Ping *ping) {
-    ping->mutate_value(ping->value() + 1);
-  });
+  const uint8_t *data_ptr = nullptr;
+  reader.AddBeforeSendCallback<aos::examples::Ping>(
+      "/test",
+      [&data_ptr](aos::examples::Ping *ping,
+                  const TimestampedMessage &timestamped_message) -> SharedSpan {
+        ping->mutate_value(ping->value() + 10000);
+        data_ptr = timestamped_message.data.get()->get()->data();
+        return *timestamped_message.data;
+      });
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
@@ -170,15 +176,21 @@
 
   // Confirm that the ping and pong counts both match, and the value also
   // matches.
-  int ping_count = 10;
-  test_event_loop->MakeWatcher("/test",
-                               [&ping_count](const examples::Ping &ping) {
-                                 ++ping_count;
-                                 EXPECT_EQ(ping.value(), ping_count);
-                               });
+  int ping_count = 10010;
+  test_event_loop->MakeWatcher(
+      "/test",
+      [&test_event_loop, &data_ptr, &ping_count](const examples::Ping &ping) {
+        ++ping_count;
+        EXPECT_EQ(ping.value(), ping_count);
+        // Since simulated event loops (especially log replay) refcount the
+        // shared data, we can verify if the right data got published by
+        // verifying that the actual pointer to the flatbuffer matches.  This
+        // is only guaranteed to hold during this callback.
+        EXPECT_EQ(test_event_loop->context().data, data_ptr);
+      });
 
   reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
-  EXPECT_EQ(ping_count, 2010);
+  EXPECT_EQ(ping_count, 12010);
 }
 
 // Tests calling StartLogging twice.
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 2cf8ffe..c252616 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -8,6 +8,7 @@
 #include "aos/events/message_counter.h"
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
+#include "aos/flatbuffers/aligned_allocator.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/timestamp_generated.h"
 #include "aos/testing/tmpdir.h"
@@ -600,7 +601,7 @@
 }
 
 // MultinodeLoggerTest that tests the mutate callback works across multiple
-// nodes with remapping
+// nodes with remapping.
 TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
   time_converter_.StartEqual();
   std::vector<std::string> actual_filenames;
@@ -629,14 +630,18 @@
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
 
-  int pong_count = 0;
+  int pong_count = 10;
   // Adds a callback which mutates the value of the pong message before the
   // message is sent which is the feature we are testing here
-  reader.AddBeforeSendCallback("/test",
-                               [&pong_count](aos::examples::Pong *pong) {
-                                 pong->mutate_value(pong->value() + 1);
-                                 pong_count = pong->value();
-                               });
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count](
+          aos::examples::Pong *pong,
+          const TimestampedMessage &timestamped_message) -> SharedSpan {
+        pong->mutate_value(pong_count + 1);
+        ++pong_count;
+        return *timestamped_message.data;
+      });
 
   // This sends out the fetched messages and advances time to the start of the
   // log file.
@@ -698,14 +703,18 @@
 
   LogReader reader(sorted_parts, &config_.message());
 
-  int pong_count = 0;
+  int pong_count = 10;
   // Adds a callback which mutates the value of the pong message before the
   // message is sent which is the feature we are testing here
-  reader.AddBeforeSendCallback("/test",
-                               [&pong_count](aos::examples::Pong *pong) {
-                                 pong->mutate_value(pong->value() + 1);
-                                 pong_count = pong->value();
-                               });
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count](
+          aos::examples::Pong *pong,
+          const TimestampedMessage &timestamped_message) -> SharedSpan {
+        pong->mutate_value(pong_count + 1);
+        ++pong_count;
+        return *timestamped_message.data;
+      });
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
 
@@ -772,11 +781,15 @@
   int ping_count = 0;
   // Adds a callback which mutates the value of the pong message before the
   // message is sent which is the feature we are testing here
-  reader.AddBeforeSendCallback("/test",
-                               [&ping_count](aos::examples::Ping *ping) {
-                                 ++ping_count;
-                                 ping->mutate_value(ping_count);
-                               });
+  reader.AddBeforeSendCallback<aos::examples::Ping>(
+      "/test",
+      [&ping_count](
+          aos::examples::Ping *ping,
+          const TimestampedMessage &timestamped_message) -> SharedSpan {
+        ++ping_count;
+        ping->mutate_value(ping_count);
+        return *timestamped_message.data;
+      });
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -821,6 +834,291 @@
   reader.Deregister();
 }
 
+// MultinodeLoggerTest that tests the mutate callback can fully replace the
+// message.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackReplacement) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+  LogReader reader(sorted_parts, &config_.message());
+
+  int pong_count = 10;
+  const uint8_t *data_ptr = nullptr;
+  // Adds a callback which replaces the pong message before the message is sent.
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count, &data_ptr](aos::examples::Pong *pong,
+                               const TimestampedMessage &) -> SharedSpan {
+        fbs::AlignedVectorAllocator allocator;
+        aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
+        CHECK(pong_static->FromFlatbuffer(*pong));
+
+        pong_static->set_value(pong_count + 101);
+        ++pong_count;
+
+        SharedSpan result = allocator.Release();
+
+        data_ptr = result->data();
+
+        return result;
+      });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_pong_count = 10;
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
+                &data_ptr](const examples::Pong &pong) {
+        ++pi1_pong_count;
+        // Since simulated event loops (especially log replay) refcount the
+        // shared data, we can verify if the right data got published by
+        // verifying that the actual pointer to the flatbuffer matches.  This
+        // is only guaranteed to hold during this callback.
+        EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+        EXPECT_EQ(pong_count + 100, pong.value());
+        EXPECT_EQ(pi1_pong_count + 101, pong.value());
+      });
+
+  pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
+                                        &data_ptr](const examples::Pong &pong) {
+    // Same goes for the forwarded side, that should be the same contents too.
+    EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
+    EXPECT_EQ(pong_count + 100, pong.value());
+  });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(pong_count, 2011);
+}
+
+// MultinodeLoggerTest that tests the mutate callback can delete messages by
+// returning nullptr.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackDelete) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+  LogReader reader(sorted_parts, &config_.message());
+
+  int pong_count = 10;
+  const uint8_t *data_ptr = nullptr;
+  // Adds a callback which replaces each pong message before it is sent, and
+  // drops the message (by returning nullptr) whenever pong_count is even.
+  reader.AddBeforeSendCallback<aos::examples::Pong>(
+      "/test",
+      [&pong_count, &data_ptr](aos::examples::Pong *pong,
+                               const TimestampedMessage &) -> SharedSpan {
+        fbs::AlignedVectorAllocator allocator;
+        aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
+        CHECK(pong_static->FromFlatbuffer(*pong));
+
+        pong_static->set_value(pong_count + 101);
+        ++pong_count;
+
+        if ((pong_count % 2) == 0) {
+          data_ptr = nullptr;
+          return nullptr;
+        }
+
+        SharedSpan result = allocator.Release();
+
+        data_ptr = result->data();
+
+        return result;
+      });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_pong_count = 10;
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
+                &data_ptr](const examples::Pong &pong) {
+        pi1_pong_count += 2;
+        // Since simulated event loops (especially log replay) refcount the
+        // shared data, we can verify if the right data got published by
+        // verifying that the actual pointer to the flatbuffer matches.  This
+        // is only guaranteed to hold during this callback.
+        EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+        EXPECT_EQ(pong_count + 100, pong.value());
+        EXPECT_EQ(pi1_pong_count + 101, pong.value());
+      });
+
+  pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
+                                        &data_ptr](const examples::Pong &pong) {
+    // Same goes for the forwarded side, that should be the same contents too.
+    EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
+    EXPECT_EQ(pong_count + 100, pong.value());
+  });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(pong_count, 2011);
+  // Since we count up by 2 each time we get a message, and the last pong gets
+  // dropped since it is an odd number we expect the number on pi1 to be 1 less.
+  EXPECT_EQ(pi1_pong_count, 2010);
+}
+
+// MultinodeLoggerTest that tests that non-forwarded channels are able to be
+// mutated.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackNotForwarded) {
+  time_converter_.StartEqual();
+  std::vector<std::string> actual_filenames;
+
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    pi1_logger.AppendAllFilenames(&actual_filenames);
+    pi2_logger.AppendAllFilenames(&actual_filenames);
+  }
+
+  const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+  EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+  LogReader reader(sorted_parts, &config_.message());
+
+  int ping_count = 10;
+  const uint8_t *data_ptr = nullptr;
+  // Adds a callback which replaces the ping message on the /pi1/aos channel
+  // before the message is sent, which is the feature we are testing here.
+  reader.AddBeforeSendCallback<aos::examples::Ping>(
+      "/pi1/aos",
+      [&ping_count, &data_ptr](aos::examples::Ping *ping,
+                               const TimestampedMessage &) -> SharedSpan {
+        fbs::AlignedVectorAllocator allocator;
+        aos::fbs::Builder<aos::examples::PingStatic> ping_static(&allocator);
+        CHECK(ping_static->FromFlatbuffer(*ping));
+
+        ping_static->set_value(ping_count + 101);
+        ++ping_count;
+
+        SharedSpan result = allocator.Release();
+
+        data_ptr = result->data();
+
+        return result;
+      });
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  EXPECT_THAT(reader.LoggedNodes(),
+              ::testing::ElementsAre(
+                  configuration::GetNode(reader.logged_configuration(), pi1),
+                  configuration::GetNode(reader.logged_configuration(), pi2)));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  int pi1_ping_count = 10;
+  pi1_event_loop->MakeWatcher(
+      "/aos", [&pi1_event_loop, &ping_count, &pi1_ping_count,
+               &data_ptr](const examples::Ping &ping) {
+        ++pi1_ping_count;
+        // Since simulated event loops (especially log replay) refcount the
+        // shared data, we can verify if the right data got published by
+        // verifying that the actual pointer to the flatbuffer matches.  This
+        // is only guaranteed to hold during this callback.
+        EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+        EXPECT_EQ(ping_count + 100, ping.value());
+        EXPECT_EQ(pi1_ping_count + 101, ping.value());
+      });
+
+  reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+  reader.Deregister();
+
+  EXPECT_EQ(ping_count, 2011);
+}
+
 // Tests that we do not allow adding callbacks after Register is called
 TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
   time_converter_.StartEqual();
@@ -848,9 +1146,13 @@
   reader.Register(&log_reader_factory);
   EXPECT_DEATH(
       {
-        reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
-          LOG(FATAL) << "This should not be called";
-        });
+        reader.AddBeforeSendCallback<aos::examples::Pong>(
+            "/test",
+            [](aos::examples::Pong *,
+               const TimestampedMessage &timestamped_message) -> SharedSpan {
+              LOG(FATAL) << "This should not be called";
+              return *timestamped_message.data;
+            });
       },
       "Cannot add callbacks after calling Register");
   reader.Deregister();
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 607303c..e5de120 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -9,6 +9,7 @@
 #include "glog/logging.h"
 
 #include "aos/containers/resizeable_buffer.h"
+#include "aos/ipc_lib/data_alignment.h"
 #include "aos/macros.h"
 #include "aos/util/file.h"
 
diff --git a/aos/flatbuffers/BUILD b/aos/flatbuffers/BUILD
index 32f1d39..1d6c602 100644
--- a/aos/flatbuffers/BUILD
+++ b/aos/flatbuffers/BUILD
@@ -33,6 +33,7 @@
     name = "base_test",
     srcs = ["base_test.cc"],
     deps = [
+        ":aligned_allocator",
         ":base",
         "//aos/testing:googletest",
     ],
@@ -165,3 +166,17 @@
     srcs = ["test_static.h"],
     visibility = [":__subpackages__"],
 )
+
+cc_library(
+    name = "aligned_allocator",
+    srcs = ["aligned_allocator.cc"],
+    hdrs = ["aligned_allocator.h"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":base",
+        "//aos/containers:resizeable_buffer",
+        "//aos/events:event_loop",
+        "//aos/ipc_lib:data_alignment",
+        "@com_github_google_glog//:glog",
+    ],
+)
diff --git a/aos/flatbuffers/aligned_allocator.cc b/aos/flatbuffers/aligned_allocator.cc
new file mode 100644
index 0000000..d2c47e5
--- /dev/null
+++ b/aos/flatbuffers/aligned_allocator.cc
@@ -0,0 +1,96 @@
+#include "aos/flatbuffers/aligned_allocator.h"
+
+namespace aos::fbs {
+
+AlignedVectorAllocator::~AlignedVectorAllocator() {
+  CHECK(buffer_.empty())
+      << ": Must deallocate before destroying the AlignedVectorAllocator.";
+}
+
+std::optional<std::span<uint8_t>> AlignedVectorAllocator::Allocate(
+    size_t size, size_t /*alignment*/, fbs::SetZero set_zero) {
+  CHECK(buffer_.empty()) << ": Must deallocate before calling Allocate().";
+  buffer_.resize(((size + kAlignment - 1) / kAlignment) * kAlignment);
+  allocated_size_ = size;
+  if (set_zero == fbs::SetZero::kYes) {
+    memset(buffer_.data(), 0, buffer_.size());
+  }
+
+  return std::span<uint8_t>{data(), allocated_size_};
+}
+
+std::optional<std::span<uint8_t>> AlignedVectorAllocator::InsertBytes(
+    void *insertion_point, size_t bytes, size_t /*alignment*/,
+    fbs::SetZero set_zero) {
+  DCHECK_GE(reinterpret_cast<const uint8_t *>(insertion_point), data());
+  DCHECK_LE(reinterpret_cast<const uint8_t *>(insertion_point),
+            data() + allocated_size_);
+  const size_t buffer_offset =
+      reinterpret_cast<const uint8_t *>(insertion_point) - data();
+  // TODO(austin): This has an extra memcpy in it that isn't strictly needed
+  // when we resize.  Remove it if performance is a concern.
+  const size_t absolute_buffer_offset =
+      reinterpret_cast<const uint8_t *>(insertion_point) - buffer_.data();
+  const size_t previous_size = buffer_.size();
+
+  buffer_.resize(((allocated_size_ + bytes + kAlignment - 1) / kAlignment) *
+                 kAlignment);
+
+  // Now, we've got space both before and after the block of data.  Move the
+  // data after to the end, and the data before to the start.
+
+  const size_t new_space_after = buffer_.size() - previous_size;
+
+  // Move the rest of the data to be end aligned.  If the buffer wasn't resized,
+  // this will be a nop.
+  memmove(buffer_.data() + absolute_buffer_offset + new_space_after,
+          buffer_.data() + absolute_buffer_offset,
+          previous_size - absolute_buffer_offset);
+
+  // Now, move the data at the front to be aligned too.
+  memmove(buffer_.data() + buffer_.size() - (allocated_size_ + bytes),
+          buffer_.data() + previous_size - allocated_size_,
+          allocated_size_ - (previous_size - absolute_buffer_offset));
+
+  if (set_zero == fbs::SetZero::kYes) {
+    memset(data() - bytes + buffer_offset, 0, bytes);
+  }
+  allocated_size_ += bytes;
+
+  return std::span<uint8_t>{data(), allocated_size_};
+}
+
+std::span<uint8_t> AlignedVectorAllocator::RemoveBytes(
+    std::span<uint8_t> remove_bytes) {
+  const ssize_t removal_index = remove_bytes.data() - buffer_.data();
+  const size_t old_start_index = buffer_.size() - allocated_size_;
+  CHECK_LE(static_cast<ssize_t>(old_start_index), removal_index);
+  CHECK_LE(removal_index, static_cast<ssize_t>(buffer_.size()));
+  CHECK_LE(removal_index + remove_bytes.size(), buffer_.size());
+  uint8_t *old_buffer_start = buffer_.data() + old_start_index;
+  memmove(old_buffer_start + remove_bytes.size(), old_buffer_start,
+          removal_index - old_start_index);
+  allocated_size_ -= remove_bytes.size();
+
+  return std::span<uint8_t>{data(), allocated_size_};
+}
+
+void AlignedVectorAllocator::Deallocate(std::span<uint8_t>) {
+  if (!released_) {
+    CHECK(!buffer_.empty())
+        << ": Called Deallocate() without a prior allocation.";
+  }
+  released_ = false;
+  buffer_.resize(0);
+}
+
+aos::SharedSpan AlignedVectorAllocator::Release() {
+  absl::Span<uint8_t> span{data(), allocated_size_};
+  std::shared_ptr<SharedSpanHolder> result = std::make_shared<SharedSpanHolder>(
+      std::move(buffer_), absl::Span<const uint8_t>());
+  result->span = span;
+  released_ = true;
+  return aos::SharedSpan(result, &(result->span));
+}
+
+}  // namespace aos::fbs
diff --git a/aos/flatbuffers/aligned_allocator.h b/aos/flatbuffers/aligned_allocator.h
new file mode 100644
index 0000000..a974818
--- /dev/null
+++ b/aos/flatbuffers/aligned_allocator.h
@@ -0,0 +1,57 @@
+#ifndef AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
+#define AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
+
+#include <memory>
+#include <optional>
+#include <span>
+
+#include "glog/logging.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/event_loop.h"
+#include "aos/flatbuffers/base.h"
+#include "aos/ipc_lib/data_alignment.h"
+
+namespace aos::fbs {
+
+// Allocator that uses an AllocatorResizeableBuffer to allow arbitrary-sized
+// allocations.  Aligns the end of the buffer to an alignment of
+// kChannelDataAlignment.
+class AlignedVectorAllocator : public fbs::Allocator {
+ public:
+  static constexpr size_t kAlignment = aos::kChannelDataAlignment;
+  AlignedVectorAllocator() {}
+  ~AlignedVectorAllocator();
+
+  std::optional<std::span<uint8_t>> Allocate(size_t size, size_t alignment,
+                                             fbs::SetZero set_zero) override;
+
+  std::optional<std::span<uint8_t>> InsertBytes(void *insertion_point,
+                                                size_t bytes, size_t alignment,
+                                                fbs::SetZero set_zero) override;
+
+  std::span<uint8_t> RemoveBytes(std::span<uint8_t> remove_bytes) override;
+
+  void Deallocate(std::span<uint8_t>) override;
+
+  aos::SharedSpan Release();
+
+ private:
+  struct SharedSpanHolder {
+    aos::AllocatorResizeableBuffer<
+        aos::AlignedReallocator<kChannelDataAlignment>>
+        buffer;
+    absl::Span<const uint8_t> span;
+  };
+  uint8_t *data() { return buffer_.data() + buffer_.size() - allocated_size_; }
+
+  aos::AllocatorResizeableBuffer<aos::AlignedReallocator<kChannelDataAlignment>>
+      buffer_;
+
+  size_t allocated_size_ = 0u;
+  bool released_ = false;
+};
+
+}  // namespace aos::fbs
+
+#endif  // AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
diff --git a/aos/flatbuffers/base.h b/aos/flatbuffers/base.h
index ff81c9a..a92cc91 100644
--- a/aos/flatbuffers/base.h
+++ b/aos/flatbuffers/base.h
@@ -1,5 +1,6 @@
 #ifndef AOS_FLATBUFFERS_BASE_H_
 #define AOS_FLATBUFFERS_BASE_H_
+
 #include <stdint.h>
 #include <sys/types.h>
 
@@ -15,6 +16,7 @@
 #include "glog/logging.h"
 
 namespace aos::fbs {
+
 using ::flatbuffers::soffset_t;
 using ::flatbuffers::uoffset_t;
 using ::flatbuffers::voffset_t;
diff --git a/aos/flatbuffers/base_test.cc b/aos/flatbuffers/base_test.cc
index f0eaf04..87d89fa 100644
--- a/aos/flatbuffers/base_test.cc
+++ b/aos/flatbuffers/base_test.cc
@@ -6,6 +6,8 @@
 
 #include "gtest/gtest.h"
 
+#include "aos/flatbuffers/aligned_allocator.h"
+
 namespace aos::fbs::testing {
 // Tests that PaddedSize() behaves as expected.
 TEST(BaseTest, PaddedSize) {
@@ -16,7 +18,7 @@
   EXPECT_EQ(8, PaddedSize(7, 4));
 }
 
-inline constexpr size_t kDefaultSize = 16;
+inline constexpr size_t kDefaultSize = AlignedVectorAllocator::kAlignment * 2;
 template <typename T>
 class AllocatorTest : public ::testing::Test {
  protected:
@@ -32,7 +34,8 @@
       allocator_(std::make_unique<SpanAllocator>(
           std::span<uint8_t>{buffer_.data(), buffer_.size()})) {}
 
-using AllocatorTypes = ::testing::Types<SpanAllocator, VectorAllocator>;
+using AllocatorTypes =
+    ::testing::Types<SpanAllocator, VectorAllocator, AlignedVectorAllocator>;
 TYPED_TEST_SUITE(AllocatorTest, AllocatorTypes);
 
 // Tests that we can create and not use a VectorAllocator.
@@ -79,6 +82,11 @@
 
 // Tests that we can remove bytes from an arbitrary spot in the buffer.
 TYPED_TEST(AllocatorTest, RemoveBytes) {
+  // Deletion doesn't require resizing, so we don't need to worry about it being
+  // larger than the alignment to test everything.  The test requires the size
+  // to be < 255 to store the sentinel values.
+  const size_t kDefaultSize = 128;
+
   const size_t half_size = kDefaultSize / 2;
   std::span<uint8_t> span =
       this->allocator_->Allocate(kDefaultSize, 4, SetZero::kYes).value();