Optimize simulated event loop log reading.

Currenly, we make 2 copies of a message before it is sent in
simulation: First, from the source (file or lzma decoder) to a
flatbuffer vector, then to the simulated message. By aligning the
data part of a MessageHeader in the first copy from the source,
we don't need to copy it again. This is all tracked with
shared_ptr's so the lifetime management is easy.

Change-Id: I82c86ef3f9662d4c615dc57862fa89b1b9981ed4
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 8728111..68cc923 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -31,6 +31,15 @@
 
 RawSender::~RawSender() { event_loop_->DeleteSender(this); }
 
+bool RawSender::DoSend(const SharedSpan data,
+                       monotonic_clock::time_point monotonic_remote_time,
+                       realtime_clock::time_point realtime_remote_time,
+                       uint32_t remote_queue_index,
+                       const UUID &source_boot_uuid) {
+  return DoSend(data->data(), data->size(), monotonic_remote_time,
+                realtime_remote_time, remote_queue_index, source_boot_uuid);
+}
+
 RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
     : event_loop_(event_loop),
       channel_(channel),
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 1000703..0e7b66a 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -136,6 +136,8 @@
 // and as a building block to implement typed senders.
 class RawSender {
  public:
+  using SharedSpan = std::shared_ptr<const absl::Span<const uint8_t>>;
+
   RawSender(EventLoop *event_loop, const Channel *channel);
   RawSender(const RawSender &) = delete;
   RawSender &operator=(const RawSender &) = delete;
@@ -164,6 +166,15 @@
             realtime_clock::time_point realtime_remote_time,
             uint32_t remote_queue_index, const UUID &source_boot_uuid);
 
+  // Sends a single block of data by refcounting it to avoid copies.  The data
+  // must not change after being passed into Send. The remote arguments have the
+  // same meaning as in Send above.
+  bool Send(const SharedSpan data);
+  bool Send(const SharedSpan data,
+            monotonic_clock::time_point monotonic_remote_time,
+            realtime_clock::time_point realtime_remote_time,
+            uint32_t remote_queue_index, const UUID &remote_boot_uuid);
+
   const Channel *channel() const { return channel_; }
 
   // Returns the time_points that the last message was sent at.
@@ -209,6 +220,11 @@
                       realtime_clock::time_point realtime_remote_time,
                       uint32_t remote_queue_index,
                       const UUID &source_boot_uuid) = 0;
+  virtual bool DoSend(const SharedSpan data,
+                      monotonic_clock::time_point monotonic_remote_time,
+                      realtime_clock::time_point realtime_remote_time,
+                      uint32_t remote_queue_index,
+                      const UUID &source_boot_uuid);
 
   EventLoop *const event_loop_;
   const Channel *const channel_;
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 698191e..7e560fe 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -36,8 +36,9 @@
 
 template <typename Watch>
 void EventLoop::MakeWatcher(const std::string_view channel_name, Watch &&w) {
-  using MessageType = typename event_loop_internal::watch_message_type_trait<
-      decltype(&Watch::operator())>::message_type;
+  using MessageType =
+      typename event_loop_internal::watch_message_type_trait<decltype(
+          &Watch::operator())>::message_type;
   const Channel *channel = configuration::GetChannel(
       configuration_, channel_name, MessageType::GetFullyQualifiedName(),
       name(), node());
@@ -174,6 +175,31 @@
   return false;
 }
 
+inline bool RawSender::Send(const SharedSpan data) {
+  return Send(std::move(data), monotonic_clock::min_time,
+              realtime_clock::min_time, 0xffffffffu, event_loop_->boot_uuid());
+}
+
+inline bool RawSender::Send(
+    const SharedSpan data,
+    aos::monotonic_clock::time_point monotonic_remote_time,
+    aos::realtime_clock::time_point realtime_remote_time,
+    uint32_t remote_queue_index, const UUID &uuid) {
+  const size_t size = data->size();
+  if (DoSend(std::move(data), monotonic_remote_time, realtime_remote_time,
+             remote_queue_index, uuid)) {
+    timing_.size.Add(size);
+    timing_.sender->mutate_count(timing_.sender->count() + 1);
+    ftrace_.FormatMessage(
+        "%.*s: sent shared: event=%" PRId64 " queue=%" PRIu32,
+        static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+        static_cast<int64_t>(monotonic_sent_time().time_since_epoch().count()),
+        sent_queue_index());
+    return true;
+  }
+  return false;
+}
+
 inline monotonic_clock::time_point TimerHandler::Call(
     std::function<monotonic_clock::time_point()> get_time,
     monotonic_clock::time_point event_time) {
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d6213ae..a80b0e5 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -706,7 +706,7 @@
             state->monotonic_start_time(
                 timestamped_message.monotonic_event_time.boot) ||
         event_loop_factory_ != nullptr) {
-      if (timestamped_message.data.span().size() != 0u) {
+      if (timestamped_message.data != nullptr) {
         if (timestamped_message.monotonic_remote_time !=
             BootTimestamp::min_time()) {
           // Confirm that the message was sent on the sending node before the
@@ -851,7 +851,7 @@
           // through events like this, they can set
           // --skip_missing_forwarding_entries or ignore_missing_data_.
           CHECK_LT(next.channel_index, last_message.size());
-          if (next.data.span().size() == 0u) {
+          if (next.data == nullptr) {
             last_message[next.channel_index] = true;
           } else {
             if (last_message[next.channel_index]) {
@@ -874,9 +874,7 @@
                           .count()
                    << " start "
                    << monotonic_start_time().time_since_epoch().count() << " "
-                   << FlatbufferToJson(
-                          timestamped_message.data,
-                          {.multi_line = false, .max_vector_size = 100});
+                   << *timestamped_message.data;
     }
 
     const BootTimestamp next_time = state->OldestMessageTime();
@@ -1421,8 +1419,8 @@
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
   const bool sent = sender->Send(
-      timestamped_message.data.message().data()->Data(),
-      timestamped_message.data.message().data()->size(),
+      RawSender::SharedSpan(timestamped_message.data,
+                            &timestamped_message.data->span),
       timestamped_message.monotonic_remote_time.time,
       timestamped_message.realtime_remote_time, remote_queue_index,
       (channel_source_state_[timestamped_message.channel_index] != nullptr
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 73578e6..dd82418 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -43,9 +43,20 @@
             "last header as the actual header.");
 
 namespace aos::logger {
+namespace {
 
 namespace chrono = std::chrono;
 
+template <typename T>
+void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
+  if (t.has_value()) {
+    *os << *t;
+  } else {
+    *os << "null";
+  }
+}
+}  // namespace
+
 DetachedBufferWriter::DetachedBufferWriter(
     std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
     : filename_(filename), encoder_(std::move(encoder)) {
@@ -499,23 +510,81 @@
           << FlatbufferToJson(log_file_header()->node());
 }
 
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-MessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
   absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
   if (msg_data == absl::Span<const uint8_t>()) {
-    return std::nullopt;
+    return nullptr;
   }
 
-  SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);
+  SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
+  CHECK(msg.Verify()) << ": Corrupted message from " << filename();
 
-  CHECK(result.Verify()) << ": Corrupted message from " << filename();
+  auto result = UnpackedMessageHeader::MakeMessage(msg.message());
 
-  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
-      chrono::nanoseconds(result.message().monotonic_sent_time()));
+  const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
 
   newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
-  return std::move(result);
+  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
+  return result;
+}
+
+std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
+    const MessageHeader &message) {
+  const size_t data_size = message.has_data() ? message.data()->size() : 0;
+
+  UnpackedMessageHeader *const unpacked_message =
+      reinterpret_cast<UnpackedMessageHeader *>(
+          malloc(sizeof(UnpackedMessageHeader) + data_size +
+                 kChannelDataAlignment - 1));
+
+  CHECK(message.has_channel_index());
+  CHECK(message.has_monotonic_sent_time());
+
+  absl::Span<uint8_t> span;
+  if (data_size > 0) {
+    span =
+        absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
+                                &unpacked_message->actual_data[0], data_size)),
+                            data_size);
+  }
+
+  std::optional<std::chrono::nanoseconds> monotonic_remote_time;
+  if (message.has_monotonic_remote_time()) {
+    monotonic_remote_time =
+        std::chrono::nanoseconds(message.monotonic_remote_time());
+  }
+  std::optional<realtime_clock::time_point> realtime_remote_time;
+  if (message.has_realtime_remote_time()) {
+    realtime_remote_time = realtime_clock::time_point(
+        chrono::nanoseconds(message.realtime_remote_time()));
+  }
+
+  std::optional<uint32_t> remote_queue_index;
+  if (message.has_remote_queue_index()) {
+    remote_queue_index = message.remote_queue_index();
+  }
+
+  new (unpacked_message) UnpackedMessageHeader{
+      .channel_index = message.channel_index(),
+      .monotonic_sent_time = monotonic_clock::time_point(
+          chrono::nanoseconds(message.monotonic_sent_time())),
+      .realtime_sent_time = realtime_clock::time_point(
+          chrono::nanoseconds(message.realtime_sent_time())),
+      .queue_index = message.queue_index(),
+      .monotonic_remote_time = monotonic_remote_time,
+      .realtime_remote_time = realtime_remote_time,
+      .remote_queue_index = remote_queue_index,
+      .monotonic_timestamp_time = monotonic_clock::time_point(
+          std::chrono::nanoseconds(message.monotonic_timestamp_time())),
+      .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
+      .span = span};
+
+  if (data_size > 0) {
+    memcpy(span.data(), message.data()->data(), data_size);
+  }
+
+  return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
+                                                &DestroyAndFree);
 }
 
 PartsMessageReader::PartsMessageReader(LogParts log_parts)
@@ -569,19 +638,19 @@
   }
 }
 
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-PartsMessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
   while (!done_) {
-    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
+    std::shared_ptr<UnpackedMessageHeader> message =
         message_reader_.ReadMessage();
     if (message) {
       newest_timestamp_ = message_reader_.newest_timestamp();
-      const monotonic_clock::time_point monotonic_sent_time(
-          chrono::nanoseconds(message->message().monotonic_sent_time()));
-      // TODO(austin): Does this work with startup?  Might need to use the start
-      // time.
-      // TODO(austin): Does this work with startup when we don't know the remote
-      // start time too?  Look at one of those logs to compare.
+      const monotonic_clock::time_point monotonic_sent_time =
+          message->monotonic_sent_time;
+
+      // TODO(austin): Does this work with startup?  Might need to use the
+      // start time.
+      // TODO(austin): Does this work with startup when we don't know the
+      // remote start time too?  Look at one of those logs to compare.
       if (monotonic_sent_time >
           parts_.monotonic_start_time + max_out_of_order_duration()) {
         after_start_ = true;
@@ -599,7 +668,7 @@
     NextLog();
   }
   newest_timestamp_ = monotonic_clock::max_time;
-  return std::nullopt;
+  return nullptr;
 }
 
 void PartsMessageReader::NextLog() {
@@ -645,13 +714,29 @@
          channel_index == m2.channel_index && queue_index == m2.queue_index;
 }
 
+std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
+  os << "{.channel_index=" << m.channel_index
+     << ", .monotonic_sent_time=" << m.monotonic_sent_time
+     << ", .realtime_sent_time=" << m.realtime_sent_time
+     << ", .queue_index=" << m.queue_index;
+  if (m.monotonic_remote_time) {
+    os << ", .monotonic_remote_time=" << m.monotonic_remote_time->count();
+  }
+  os << ", .realtime_remote_time=";
+  PrintOptionalOrNull(&os, m.realtime_remote_time);
+  os << ", .remote_queue_index=";
+  PrintOptionalOrNull(&os, m.remote_queue_index);
+  if (m.has_monotonic_timestamp_time) {
+    os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+  }
+  return os;
+}
+
 std::ostream &operator<<(std::ostream &os, const Message &m) {
   os << "{.channel_index=" << m.channel_index
      << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
-  if (m.data.Verify()) {
-    os << ", .data="
-       << aos::FlatbufferToJson(m.data,
-                                {.multi_line = false, .max_vector_size = 1});
+  if (m.data != nullptr) {
+    os << ", .data=" << m;
   }
   os << "}";
   return os;
@@ -674,10 +759,8 @@
   if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
     os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
   }
-  if (m.data.Verify()) {
-    os << ", .data="
-       << aos::FlatbufferToJson(m.data,
-                                {.multi_line = false, .max_vector_size = 1});
+  if (m.data != nullptr) {
+    os << ", .data=" << *m.data;
   }
   os << "}";
   return os;
@@ -700,7 +783,7 @@
         break;
       }
 
-      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+      std::shared_ptr<UnpackedMessageHeader> m =
           parts_message_reader_.ReadMessage();
       // No data left, sorted forever, work through what is left.
       if (!m) {
@@ -709,36 +792,32 @@
       }
 
       size_t monotonic_timestamp_boot = 0;
-      if (m.value().message().has_monotonic_timestamp_time()) {
+      if (m->has_monotonic_timestamp_time) {
         monotonic_timestamp_boot = parts().logger_boot_count;
       }
       size_t monotonic_remote_boot = 0xffffff;
 
-      if (m.value().message().has_monotonic_remote_time()) {
+      if (m->monotonic_remote_time.has_value()) {
         const Node *node = parts().config->nodes()->Get(
-            source_node_index_[m->message().channel_index()]);
+            source_node_index_[m->channel_index]);
 
         std::optional<size_t> boot = parts_message_reader_.boot_count(
-            source_node_index_[m->message().channel_index()]);
+            source_node_index_[m->channel_index]);
         CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
                     << ", with index "
-                    << source_node_index_[m->message().channel_index()];
+                    << source_node_index_[m->channel_index];
         monotonic_remote_boot = *boot;
       }
 
-      messages_.insert(Message{
-          .channel_index = m.value().message().channel_index(),
-          .queue_index =
-              BootQueueIndex{.boot = parts().boot_count,
-                             .index = m.value().message().queue_index()},
-          .timestamp =
-              BootTimestamp{
-                  .boot = parts().boot_count,
-                  .time = monotonic_clock::time_point(std::chrono::nanoseconds(
-                      m.value().message().monotonic_sent_time()))},
-          .monotonic_remote_boot = monotonic_remote_boot,
-          .monotonic_timestamp_boot = monotonic_timestamp_boot,
-          .data = std::move(m.value())});
+      messages_.insert(
+          Message{.channel_index = m->channel_index,
+                  .queue_index = BootQueueIndex{.boot = parts().boot_count,
+                                                .index = m->queue_index},
+                  .timestamp = BootTimestamp{.boot = parts().boot_count,
+                                             .time = m->monotonic_sent_time},
+                  .monotonic_remote_boot = monotonic_remote_boot,
+                  .monotonic_timestamp_boot = monotonic_timestamp_boot,
+                  .data = std::move(m)});
 
       // Now, update sorted_until_ to match the new message.
       if (parts_message_reader_.newest_timestamp() >
@@ -878,17 +957,17 @@
       oldest = m;
       current_ = &parts_sorter;
     } else if (*m == *oldest) {
-      // Found a duplicate.  If there is a choice, we want the one which has the
-      // timestamp time.
-      if (!m->data.message().has_monotonic_timestamp_time()) {
+      // Found a duplicate.  If there is a choice, we want the one which has
+      // the timestamp time.
+      if (!m->data->has_monotonic_timestamp_time) {
         parts_sorter.PopFront();
-      } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
+      } else if (!oldest->data->has_monotonic_timestamp_time) {
         current_->PopFront();
         current_ = &parts_sorter;
         oldest = m;
       } else {
-        CHECK_EQ(m->data.message().monotonic_timestamp_time(),
-                 oldest->data.message().monotonic_timestamp_time());
+        CHECK_EQ(m->data->monotonic_timestamp_time,
+                 oldest->data->monotonic_timestamp_time);
         parts_sorter.PopFront();
       }
     }
@@ -1017,7 +1096,7 @@
   CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
 
   NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
-  // Only set it if this node delivers to the peer timestamp_mapper.  Otherwise
+  // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
   // we could needlessly save data.
   if (node_data->any_delivered) {
     VLOG(1) << "Registering on node " << node() << " for peer node "
@@ -1035,8 +1114,7 @@
       .channel_index = m->channel_index,
       .queue_index = m->queue_index,
       .monotonic_event_time = m->timestamp,
-      .realtime_event_time = aos::realtime_clock::time_point(
-          std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+      .realtime_event_time = m->data->realtime_sent_time,
       .remote_queue_index = BootQueueIndex::Invalid(),
       .monotonic_remote_time = BootTimestamp::min_time(),
       .realtime_remote_time = realtime_clock::min_time,
@@ -1086,16 +1164,17 @@
     return true;
   }
 
-  // We need to only add messages to the list so they get processed for messages
-  // which are delivered.  Reuse the flow below which uses messages_ by just
-  // adding the new message to messages_ and continuing.
+  // We need to only add messages to the list so they get processed for
+  // messages which are delivered.  Reuse the flow below which uses messages_
+  // by just adding the new message to messages_ and continuing.
   if (messages_.empty()) {
     if (!Queue()) {
       // Found nothing to add, we are out of data!
       return false;
     }
 
-    // Now that it has been added (and cannibalized), forget about it upstream.
+    // Now that it has been added (and cannibalized), forget about it
+    // upstream.
     boot_merger_.PopFront();
   }
 
@@ -1110,7 +1189,8 @@
     timestamp_callback_(&matched_messages_.back());
     return true;
   } else {
-    // Got a timestamp, find the matching remote data, match it, and return it.
+    // Got a timestamp, find the matching remote data, match it, and return
+    // it.
     Message data = MatchingMessageFor(*m);
 
     // Return the data from the remote.  The local message only has timestamp
@@ -1119,21 +1199,16 @@
         .channel_index = m->channel_index,
         .queue_index = m->queue_index,
         .monotonic_event_time = m->timestamp,
-        .realtime_event_time = aos::realtime_clock::time_point(
-            std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+        .realtime_event_time = m->data->realtime_sent_time,
         .remote_queue_index =
             BootQueueIndex{.boot = m->monotonic_remote_boot,
-                           .index = m->data.message().remote_queue_index()},
-        .monotonic_remote_time =
-            {m->monotonic_remote_boot,
-             monotonic_clock::time_point(std::chrono::nanoseconds(
-                 m->data.message().monotonic_remote_time()))},
-        .realtime_remote_time = realtime_clock::time_point(
-            std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
-        .monotonic_timestamp_time =
-            {m->monotonic_timestamp_boot,
-             monotonic_clock::time_point(std::chrono::nanoseconds(
-                 m->data.message().monotonic_timestamp_time()))},
+                           .index = m->data->remote_queue_index.value()},
+        .monotonic_remote_time = {m->monotonic_remote_boot,
+                                  monotonic_clock::time_point(
+                                      m->data->monotonic_remote_time.value())},
+        .realtime_remote_time = m->data->realtime_remote_time.value(),
+        .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
+                                     m->data->monotonic_timestamp_time},
         .data = std::move(data.data)});
     CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
     last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -1153,8 +1228,9 @@
 }
 
 void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
-  // Note: queueing for time doesn't really work well across boots.  So we just
-  // assume that if you are using this, you only care about the current boot.
+  // Note: queueing for time doesn't really work well across boots.  So we
+  // just assume that if you are using this, you only care about the current
+  // boot.
   //
   // TODO(austin): Is that the right concept?
   //
@@ -1191,36 +1267,37 @@
 
 Message TimestampMapper::MatchingMessageFor(const Message &message) {
   // Figure out what queue index we are looking for.
-  CHECK(message.data.message().has_remote_queue_index());
+  CHECK_NOTNULL(message.data);
+  CHECK(message.data->remote_queue_index.has_value());
   const BootQueueIndex remote_queue_index =
       BootQueueIndex{.boot = message.monotonic_remote_boot,
-                     .index = message.data.message().remote_queue_index()};
+                     .index = *message.data->remote_queue_index};
 
-  CHECK(message.data.message().has_monotonic_remote_time());
-  CHECK(message.data.message().has_realtime_remote_time());
+  CHECK(message.data->monotonic_remote_time.has_value());
+  CHECK(message.data->realtime_remote_time.has_value());
 
   const BootTimestamp monotonic_remote_time{
       .boot = message.monotonic_remote_boot,
-      .time = monotonic_clock::time_point(std::chrono::nanoseconds(
-          message.data.message().monotonic_remote_time()))};
-  const realtime_clock::time_point realtime_remote_time(
-      std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+      .time = monotonic_clock::time_point(
+          message.data->monotonic_remote_time.value())};
+  const realtime_clock::time_point realtime_remote_time =
+      *message.data->realtime_remote_time;
 
-  TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;
+  TimestampMapper *peer =
+      nodes_data_[source_node_[message.data->channel_index]].peer;
 
   // We only register the peers which we have data for.  So, if we are being
-  // asked to pull a timestamp from a peer which doesn't exist, return an empty
-  // message.
+  // asked to pull a timestamp from a peer which doesn't exist, return an
+  // empty message.
   if (peer == nullptr) {
     // TODO(austin): Make sure the tests hit all these paths with a boot count
     // of 1...
-    return Message{
-        .channel_index = message.channel_index,
-        .queue_index = remote_queue_index,
-        .timestamp = monotonic_remote_time,
-        .monotonic_remote_boot = 0xffffff,
-        .monotonic_timestamp_boot = 0xffffff,
-        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+    return Message{.channel_index = message.channel_index,
+                   .queue_index = remote_queue_index,
+                   .timestamp = monotonic_remote_time,
+                   .monotonic_remote_boot = 0xffffff,
+                   .monotonic_timestamp_boot = 0xffffff,
+                   .data = nullptr};
   }
 
   // The queue which will have the matching data, if available.
@@ -1230,13 +1307,12 @@
   peer->QueueUnmatchedUntil(monotonic_remote_time);
 
   if (data_queue->empty()) {
-    return Message{
-        .channel_index = message.channel_index,
-        .queue_index = remote_queue_index,
-        .timestamp = monotonic_remote_time,
-        .monotonic_remote_boot = 0xffffff,
-        .monotonic_timestamp_boot = 0xffffff,
-        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+    return Message{.channel_index = message.channel_index,
+                   .queue_index = remote_queue_index,
+                   .timestamp = monotonic_remote_time,
+                   .monotonic_remote_boot = 0xffffff,
+                   .monotonic_timestamp_boot = 0xffffff,
+                   .data = nullptr};
   }
 
   if (remote_queue_index < data_queue->front().queue_index ||
@@ -1247,7 +1323,7 @@
         .timestamp = monotonic_remote_time,
         .monotonic_remote_boot = 0xffffff,
         .monotonic_timestamp_boot = 0xffffff,
-        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+        .data = nullptr};
   }
 
   // The algorithm below is constant time with some assumptions.  We need there
@@ -1267,8 +1343,7 @@
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
-    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
-                 result.data.message().realtime_sent_time())),
+    CHECK_EQ(result.data->realtime_sent_time,
              realtime_remote_time)
         << ": Queue index matches, but timestamp doesn't.  Please investigate!";
     // Now drop the data off the front.  We have deduplicated timestamps, so we
@@ -1288,23 +1363,22 @@
                  m.timestamp.boot == remote_boot;
         });
     if (it == data_queue->end()) {
-      return Message{
-          .channel_index = message.channel_index,
-          .queue_index = remote_queue_index,
-          .timestamp = monotonic_remote_time,
-          .monotonic_remote_boot = 0xffffff,
-          .monotonic_timestamp_boot = 0xffffff,
-          .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+      return Message{.channel_index = message.channel_index,
+                     .queue_index = remote_queue_index,
+                     .timestamp = monotonic_remote_time,
+                     .monotonic_remote_boot = 0xffffff,
+                     .monotonic_timestamp_boot = 0xffffff,
+                     .data = nullptr};
     }
 
     Message result = std::move(*it);
 
     CHECK_EQ(result.timestamp, monotonic_remote_time)
-        << ": Queue index matches, but timestamp doesn't.  Please investigate!";
-    CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
-                 result.data.message().realtime_sent_time())),
-             realtime_remote_time)
-        << ": Queue index matches, but timestamp doesn't.  Please investigate!";
+        << ": Queue index matches, but timestamp doesn't.  Please "
+           "investigate!";
+    CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+        << ": Queue index matches, but timestamp doesn't.  Please "
+           "investigate!";
 
     // TODO(austin): We still go in order, so we can erase from the beginning to
     // our iterator minus 1.  That'll keep 1 in the queue.
@@ -1330,7 +1404,8 @@
       return;
     }
 
-    // Now that it has been added (and cannibalized), forget about it upstream.
+    // Now that it has been added (and cannibalized), forget about it
+    // upstream.
     boot_merger_.PopFront();
   }
 }
@@ -1346,8 +1421,8 @@
     if (node_data.channels[m->channel_index].delivered) {
       // TODO(austin): This copies the data...  Probably not worth stressing
       // about yet.
-      // TODO(austin): Bound how big this can get.  We tend not to send massive
-      // data, so we can probably ignore this for a bit.
+      // TODO(austin): Bound how big this can get.  We tend not to send
+      // massive data, so we can probably ignore this for a bit.
       node_data.channels[m->channel_index].messages.emplace_back(*m);
     }
   }
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 5c5f709..4c82e4d 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -253,6 +253,8 @@
 std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
     std::string_view filename, size_t n);
 
+class UnpackedMessageHeader;
+
 // Class which handles reading the header and messages from the log file.  This
 // handles any per-file state left before merging below.
 class MessageReader {
@@ -284,7 +286,7 @@
   }
 
   // Returns the next message if there is one.
-  std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
+  std::shared_ptr<UnpackedMessageHeader> ReadMessage();
 
   // The time at which we need to read another chunk from the logfile.
   monotonic_clock::time_point queue_data_time() const {
@@ -334,7 +336,7 @@
   // Returns the next message if there is one, or nullopt if we have reached the
   // end of all the files.
   // Note: reading the next message may change the max_out_of_order_duration().
-  std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
+  std::shared_ptr<UnpackedMessageHeader> ReadMessage();
 
   // Returns the boot count for the requested node, or std::nullopt if we don't
   // know.
@@ -377,6 +379,54 @@
   std::vector<std::optional<size_t>> boot_counts_;
 };
 
+// Stores MessageHeader as a flat header and inline, aligned block of data.
+class UnpackedMessageHeader {
+ public:
+  UnpackedMessageHeader(const UnpackedMessageHeader &) = delete;
+  UnpackedMessageHeader &operator=(const UnpackedMessageHeader &) = delete;
+
+  // The channel.
+  uint32_t channel_index = 0xffffffff;
+
+  monotonic_clock::time_point monotonic_sent_time;
+  realtime_clock::time_point realtime_sent_time;
+
+  // The local queue index.
+  uint32_t queue_index = 0xffffffff;
+
+  std::optional<std::chrono::nanoseconds> monotonic_remote_time;
+
+  std::optional<realtime_clock::time_point> realtime_remote_time;
+  std::optional<uint32_t> remote_queue_index;
+
+  // This field is defaulted in the flatbuffer, so we need to store both the
+  // possibly defaulted value and whether it is defaulted.
+  monotonic_clock::time_point monotonic_timestamp_time;
+  bool has_monotonic_timestamp_time;
+
+  static std::shared_ptr<UnpackedMessageHeader> MakeMessage(
+      const MessageHeader &message);
+
+  // Note: we are storing a span here because we need something to put in the
+  // SharedSpan pointer that RawSender takes.  We are using the aliasing
+  // constructor of shared_ptr to avoid the allocation, and it needs a nice
+  // pointer to track.
+  absl::Span<const uint8_t> span;
+
+  char actual_data[];
+
+ private:
+  ~UnpackedMessageHeader() {}
+
+  static void DestroyAndFree(UnpackedMessageHeader *p) {
+    p->~UnpackedMessageHeader();
+    free(p);
+  }
+};
+
+std::ostream &operator<<(std::ostream &os,
+                         const UnpackedMessageHeader &message);
+
 // Struct to hold a message as it gets sorted on a single node.
 struct Message {
   // The channel.
@@ -394,8 +444,7 @@
 
   size_t monotonic_timestamp_boot = 0xffffff;
 
-  // The data (either a timestamp header, or a data header).
-  SizePrefixedFlatbufferVector<MessageHeader> data;
+  std::shared_ptr<UnpackedMessageHeader> data;
 
   bool operator<(const Message &m2) const;
   bool operator>=(const Message &m2) const;
@@ -420,7 +469,7 @@
 
   BootTimestamp monotonic_timestamp_time;
 
-  SizePrefixedFlatbufferVector<MessageHeader> data;
+  std::shared_ptr<UnpackedMessageHeader> data;
 };
 
 std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 1c67312..e1e2ebd 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -230,14 +230,14 @@
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
              .monotonic_remote_boot = 0xffffff,
              .monotonic_timestamp_boot = 0xffffff,
-             .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+             .data = nullptr};
   Message m2{.channel_index = 0,
              .queue_index = BootQueueIndex{.boot = 0, .index = 0u},
              .timestamp =
                  BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
              .monotonic_remote_boot = 0xffffff,
              .monotonic_timestamp_boot = 0xffffff,
-             .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+             .data = nullptr};
 
   EXPECT_LT(m1, m2);
   EXPECT_GE(m2, m1);
@@ -847,22 +847,25 @@
 
   EXPECT_EQ(output[0].timestamp.boot, 0u);
   EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
-  EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
+  EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
 
   EXPECT_EQ(output[1].timestamp.boot, 0u);
   EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
-  EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
-  EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
+  EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
+  EXPECT_EQ(output[1].data->monotonic_timestamp_time,
+            monotonic_clock::time_point(std::chrono::nanoseconds(971)));
 
   EXPECT_EQ(output[2].timestamp.boot, 0u);
   EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
-  EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
-  EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
+  EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
+  EXPECT_EQ(output[2].data->monotonic_timestamp_time,
+            monotonic_clock::time_point(std::chrono::nanoseconds(972)));
 
   EXPECT_EQ(output[3].timestamp.boot, 0u);
   EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
-  EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
-  EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
+  EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
+  EXPECT_EQ(output[3].data->monotonic_timestamp_time,
+            monotonic_clock::time_point(std::chrono::nanoseconds(973)));
 }
 
 // Tests that we can match timestamps on delivered messages.
@@ -941,17 +944,17 @@
     EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[0].monotonic_event_time.time,
               e + chrono::milliseconds(1000));
-    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_TRUE(output0[0].data != nullptr);
 
     EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_event_time.time,
               e + chrono::milliseconds(2000));
-    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_TRUE(output0[1].data != nullptr);
 
     EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_event_time.time,
               e + chrono::milliseconds(3000));
-    EXPECT_TRUE(output0[2].data.Verify());
+    EXPECT_TRUE(output0[2].data != nullptr);
   }
 
   {
@@ -993,17 +996,17 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
   }
 }
 
@@ -1072,7 +1075,7 @@
     EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
               monotonic_clock::min_time);
-    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_TRUE(output0[0].data != nullptr);
 
     EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_event_time.time,
@@ -1080,7 +1083,7 @@
     EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
               monotonic_clock::min_time);
-    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_TRUE(output0[1].data != nullptr);
 
     EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_event_time.time,
@@ -1088,7 +1091,7 @@
     EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
               monotonic_clock::min_time);
-    EXPECT_TRUE(output0[2].data.Verify());
+    EXPECT_TRUE(output0[2].data != nullptr);
   }
 
   {
@@ -1109,7 +1112,7 @@
     EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
               e + chrono::nanoseconds(971));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
@@ -1117,7 +1120,7 @@
     EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
               e + chrono::nanoseconds(5458));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
@@ -1125,7 +1128,7 @@
     EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
               monotonic_clock::min_time);
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
   }
 
   EXPECT_EQ(mapper0_count, 3u);
@@ -1200,17 +1203,17 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
   }
 
   {
@@ -1236,17 +1239,17 @@
     EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[0].monotonic_event_time.time,
               e + chrono::milliseconds(1000));
-    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_TRUE(output0[0].data != nullptr);
 
     EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_event_time.time,
               e + chrono::milliseconds(2000));
-    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_TRUE(output0[1].data != nullptr);
 
     EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_event_time.time,
               e + chrono::milliseconds(3000));
-    EXPECT_TRUE(output0[2].data.Verify());
+    EXPECT_TRUE(output0[2].data != nullptr);
   }
 
   EXPECT_EQ(mapper0_count, 3u);
@@ -1319,17 +1322,17 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_FALSE(output1[0].data.Verify());
+    EXPECT_FALSE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
   }
 
   EXPECT_EQ(mapper0_count, 0u);
@@ -1402,17 +1405,17 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_FALSE(output1[2].data.Verify());
+    EXPECT_FALSE(output1[2].data != nullptr);
   }
 
   EXPECT_EQ(mapper0_count, 0u);
@@ -1477,12 +1480,12 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
   }
 
   EXPECT_EQ(mapper0_count, 0u);
@@ -1550,22 +1553,22 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
 
     EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[3].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_TRUE(output1[3].data.Verify());
+    EXPECT_TRUE(output1[3].data != nullptr);
   }
 
   EXPECT_EQ(mapper0_count, 0u);
@@ -1650,17 +1653,17 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_FALSE(output1[0].data.Verify());
+    EXPECT_FALSE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_FALSE(output1[1].data.Verify());
+    EXPECT_FALSE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_FALSE(output1[2].data.Verify());
+    EXPECT_FALSE(output1[2].data != nullptr);
   }
   EXPECT_EQ(mapper1_count, 3u);
 }
@@ -1754,22 +1757,22 @@
     EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[0].monotonic_event_time.time,
               e + chrono::milliseconds(1000));
-    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_TRUE(output0[0].data != nullptr);
 
     EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_event_time.time,
               e + chrono::milliseconds(1000));
-    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_TRUE(output0[1].data != nullptr);
 
     EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_event_time.time,
               e + chrono::milliseconds(2000));
-    EXPECT_TRUE(output0[2].data.Verify());
+    EXPECT_TRUE(output0[2].data != nullptr);
 
     EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[3].monotonic_event_time.time,
               e + chrono::milliseconds(3000));
-    EXPECT_TRUE(output0[3].data.Verify());
+    EXPECT_TRUE(output0[3].data != nullptr);
   }
 
   {
@@ -1827,22 +1830,22 @@
     EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1000));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(2000));
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
 
     EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output1[3].monotonic_event_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(3000));
-    EXPECT_TRUE(output1[3].data.Verify());
+    EXPECT_TRUE(output1[3].data != nullptr);
   }
 }
 
@@ -2194,7 +2197,7 @@
               (BootQueueIndex{.boot = 0u, .index = 0u}));
     EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
-    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_TRUE(output0[0].data != nullptr);
 
     EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_event_time.time,
@@ -2203,7 +2206,7 @@
               (BootQueueIndex{.boot = 0u, .index = 1u}));
     EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
-    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_TRUE(output0[1].data != nullptr);
 
     EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_event_time.time,
@@ -2212,7 +2215,7 @@
               (BootQueueIndex{.boot = 0u, .index = 2u}));
     EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
-    EXPECT_TRUE(output0[2].data.Verify());
+    EXPECT_TRUE(output0[2].data != nullptr);
   }
 
   {
@@ -2267,7 +2270,7 @@
     EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
               e + chrono::milliseconds(1001));
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
@@ -2280,7 +2283,7 @@
     EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
               e + chrono::milliseconds(2001));
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
@@ -2293,7 +2296,7 @@
     EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
               e + chrono::milliseconds(2001));
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
 
     EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
     EXPECT_EQ(output1[3].monotonic_event_time.time,
@@ -2306,7 +2309,7 @@
     EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
               e + chrono::milliseconds(3001));
-    EXPECT_TRUE(output1[3].data.Verify());
+    EXPECT_TRUE(output1[3].data != nullptr);
 
     LOG(INFO) << output1[0];
     LOG(INFO) << output1[1];
@@ -2414,7 +2417,7 @@
     EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
               e + chrono::seconds(100) + chrono::milliseconds(1001));
-    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_TRUE(output0[0].data != nullptr);
 
     EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_event_time.time,
@@ -2425,7 +2428,7 @@
     EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
               e + chrono::seconds(20) + chrono::milliseconds(2001));
-    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_TRUE(output0[1].data != nullptr);
 
     EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_event_time.time,
@@ -2436,7 +2439,7 @@
     EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
     EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
               e + chrono::seconds(20) + chrono::milliseconds(3001));
-    EXPECT_TRUE(output0[2].data.Verify());
+    EXPECT_TRUE(output0[2].data != nullptr);
   }
 
   {
@@ -2480,21 +2483,21 @@
               e + chrono::seconds(100) + chrono::milliseconds(1000));
     EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
-    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_TRUE(output1[0].data != nullptr);
 
     EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
     EXPECT_EQ(output1[1].monotonic_event_time.time,
               e + chrono::seconds(20) + chrono::milliseconds(2000));
     EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
-    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_TRUE(output1[1].data != nullptr);
 
     EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
     EXPECT_EQ(output1[2].monotonic_event_time.time,
               e + chrono::seconds(20) + chrono::milliseconds(3000));
     EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
     EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
-    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_TRUE(output1[2].data != nullptr);
 
     LOG(INFO) << output1[0];
     LOG(INFO) << output1[1];
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 0be64bb..a923cf6 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,9 +1,8 @@
-#include "aos/events/logging/log_reader.h"
-
 #include <sys/stat.h>
 
 #include "absl/strings/str_format.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/logging/log_writer.h"
 #include "aos/events/message_counter.h"
 #include "aos/events/ping_lib.h"
@@ -849,19 +848,18 @@
 // type, count) for every message matching matcher()
 std::vector<std::tuple<std::string, std::string, int>> CountChannelsMatching(
     std::shared_ptr<const aos::Configuration> config, std::string_view filename,
-    std::function<bool(const MessageHeader *)> matcher) {
+    std::function<bool(const UnpackedMessageHeader *)> matcher) {
   MessageReader message_reader(filename);
   std::vector<int> counts(config->channels()->size(), 0);
 
   while (true) {
-    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
-        message_reader.ReadMessage();
+    std::shared_ptr<UnpackedMessageHeader> msg = message_reader.ReadMessage();
     if (!msg) {
       break;
     }
 
-    if (matcher(&msg.value().message())) {
-      counts[msg.value().message().channel_index()]++;
+    if (matcher(msg.get())) {
+      counts[msg->channel_index]++;
     }
   }
 
@@ -883,30 +881,32 @@
 std::vector<std::tuple<std::string, std::string, int>> CountChannelsData(
     std::shared_ptr<const aos::Configuration> config,
     std::string_view filename) {
-  return CountChannelsMatching(config, filename, [](const MessageHeader *msg) {
-    if (msg->has_data()) {
-      CHECK(!msg->has_monotonic_remote_time());
-      CHECK(!msg->has_realtime_remote_time());
-      CHECK(!msg->has_remote_queue_index());
-      return true;
-    }
-    return false;
-  });
+  return CountChannelsMatching(
+      config, filename, [](const UnpackedMessageHeader *msg) {
+        if (msg->span.data() != nullptr) {
+          CHECK(!msg->monotonic_remote_time.has_value());
+          CHECK(!msg->realtime_remote_time.has_value());
+          CHECK(!msg->remote_queue_index.has_value());
+          return true;
+        }
+        return false;
+      });
 }
 
 // Counts the number of messages (channel, count) for all timestamp messages.
 std::vector<std::tuple<std::string, std::string, int>> CountChannelsTimestamp(
     std::shared_ptr<const aos::Configuration> config,
     std::string_view filename) {
-  return CountChannelsMatching(config, filename, [](const MessageHeader *msg) {
-    if (!msg->has_data()) {
-      CHECK(msg->has_monotonic_remote_time());
-      CHECK(msg->has_realtime_remote_time());
-      CHECK(msg->has_remote_queue_index());
-      return true;
-    }
-    return false;
-  });
+  return CountChannelsMatching(
+      config, filename, [](const UnpackedMessageHeader *msg) {
+        if (msg->span.data() == nullptr) {
+          CHECK(msg->monotonic_remote_time.has_value());
+          CHECK(msg->realtime_remote_time.has_value());
+          CHECK(msg->remote_queue_index.has_value());
+          return true;
+        }
+        return false;
+      });
 }
 
 // Tests that we can write and read simple multi-node log files.
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 3cd5c5b..9121a8f 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -39,38 +39,71 @@
   const bool prior_;
 };
 
+// Holds storage for a span object and the data referenced by that span for
+// compatibility with RawSender::SharedSpan users. If constructed with
+// MakeSharedSpan, span points to only the aligned segment of the entire data.
+struct AlignedOwningSpan {
+  AlignedOwningSpan(const AlignedOwningSpan &) = delete;
+  AlignedOwningSpan &operator=(const AlignedOwningSpan &) = delete;
+  absl::Span<const uint8_t> span;
+  char data[];
+};
+
+// Constructs a span which owns its data through a shared_ptr. The owning span
+// points to a const view of the data; also returns a temporary mutable span
+// which is only valid while the const shared span is kept alive.
+std::pair<RawSender::SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(
+    size_t size) {
+  AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
+      malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
+
+  absl::Span mutable_span(
+      reinterpret_cast<uint8_t *>(RoundChannelData(&span->data[0], size)),
+      size);
+  new (span) AlignedOwningSpan{.span = mutable_span};
+
+  return std::make_pair(
+      RawSender::SharedSpan(
+          std::shared_ptr<AlignedOwningSpan>(span,
+                                             [](AlignedOwningSpan *s) {
+                                               s->~AlignedOwningSpan();
+                                               free(s);
+                                             }),
+          &span->span),
+      mutable_span);
+}
+
 // Container for both a message, and the context for it for simulation.  This
 // makes tracking the timestamps associated with the data easy.
 struct SimulatedMessage final {
   SimulatedMessage(const SimulatedMessage &) = delete;
   SimulatedMessage &operator=(const SimulatedMessage &) = delete;
+  ~SimulatedMessage();
 
   // Creates a SimulatedMessage with size bytes of storage.
   // This is a shared_ptr so we don't have to implement refcounting or copying.
-  static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel);
+  static std::shared_ptr<SimulatedMessage> Make(
+      SimulatedChannel *channel, const RawSender::SharedSpan data);
 
   // Context for the data.
   Context context;
 
   SimulatedChannel *const channel = nullptr;
 
-  // The data.
-  char *data(size_t buffer_size) {
-    return RoundChannelData(&actual_data[0], buffer_size);
-  }
+  // Owning span to this message's data. Depending on the sender may either
+  // represent the data of just the flatbuffer, or max channel size.
+  RawSender::SharedSpan data;
 
-  // Then the data, including padding on the end so we can align the buffer we
-  // actually return from data().
-  char actual_data[];
+  // Mutable view of above data. If empty, this message is not mutable.
+  absl::Span<uint8_t> mutable_data;
 
- private:
+  // Determines whether this message is mutable. Used for Send where the user
+  // fills out a message stored internally then gives us the size of data used.
+  bool is_mutable() const { return data->size() == mutable_data.size(); }
+
+  // Note: this should be private but make_shared requires it to be public.  Use
+  // Make() above to construct.
   SimulatedMessage(SimulatedChannel *channel_in);
-  ~SimulatedMessage();
-
-  static void DestroyAndFree(SimulatedMessage *p) {
-    p->~SimulatedMessage();
-    free(p);
-  }
 };
 
 }  // namespace
@@ -260,19 +293,17 @@
 namespace {
 
 std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
-    SimulatedChannel *channel) {
+    SimulatedChannel *channel, RawSender::SharedSpan data) {
   // The allocations in here are due to infrastructure and don't count in the no
   // mallocs in RT code.
   ScopedNotRealtime nrt;
-  const size_t size = channel->max_size();
-  SimulatedMessage *const message = reinterpret_cast<SimulatedMessage *>(
-      malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
-  new (message) SimulatedMessage(channel);
-  message->context.size = size;
-  message->context.data = message->data(size);
 
-  return std::shared_ptr<SimulatedMessage>(message,
-                                           &SimulatedMessage::DestroyAndFree);
+  auto message = std::make_shared<SimulatedMessage>(channel);
+  message->context.size = data->size();
+  message->context.data = data->data();
+  message->data = std::move(data);
+
+  return message;
 }
 
 SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
@@ -292,9 +323,13 @@
 
   void *data() override {
     if (!message_) {
-      message_ = SimulatedMessage::Make(simulated_channel_);
+      auto [span, mutable_span] =
+          MakeSharedSpan(simulated_channel_->max_size());
+      message_ = SimulatedMessage::Make(simulated_channel_, span);
+      message_->mutable_data = mutable_span;
     }
-    return message_->data(simulated_channel_->max_size());
+    CHECK(message_->is_mutable());
+    return message_->mutable_data.data();
   }
 
   size_t size() override { return simulated_channel_->max_size(); }
@@ -310,6 +345,12 @@
               uint32_t remote_queue_index,
               const UUID &source_boot_uuid) override;
 
+  bool DoSend(const SharedSpan data,
+              aos::monotonic_clock::time_point monotonic_remote_time,
+              aos::realtime_clock::time_point realtime_remote_time,
+              uint32_t remote_queue_index,
+              const UUID &source_boot_uuid) override;
+
   int buffer_index() override {
     // First, ensure message_ is allocated.
     data();
@@ -869,8 +910,12 @@
 uint32_t SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
   const uint32_t queue_index = next_queue_index_.index();
   message->context.queue_index = queue_index;
-  message->context.data = message->data(channel()->max_size()) +
-                          channel()->max_size() - message->context.size;
+
+  // Points to the actual data depending on the size set in context. Data may
+  // allocate more than the actual size of the message, so offset from the back
+  // of that to get the actual start of the data.
+  message->context.data =
+      message->data->data() + message->data->size() - message->context.size;
 
   DCHECK(channel()->has_schema())
       << ": Missing schema for channel "
@@ -959,21 +1004,35 @@
       << ": Attempting to send too big a message on "
       << configuration::CleanedChannelToString(simulated_channel_->channel());
 
-  // This is wasteful, but since flatbuffers fill from the back end of the
-  // queue, we need it to be full sized.
-  message_ = SimulatedMessage::Make(simulated_channel_);
+  // Allocates an aligned buffer in which to copy unaligned msg.
+  auto [span, mutable_span] = MakeSharedSpan(size);
+  message_ = SimulatedMessage::Make(simulated_channel_, span);
 
   // Now fill in the message.  size is already populated above, and
-  // queue_index will be populated in simulated_channel_.  Put this at the
-  // back of the data segment.
-  memcpy(message_->data(simulated_channel_->max_size()) +
-             simulated_channel_->max_size() - size,
-         msg, size);
+  // queue_index will be populated in simulated_channel_.
+  memcpy(mutable_span.data(), msg, size);
 
   return DoSend(size, monotonic_remote_time, realtime_remote_time,
                 remote_queue_index, source_boot_uuid);
 }
 
+bool SimulatedSender::DoSend(const RawSender::SharedSpan data,
+                             monotonic_clock::time_point monotonic_remote_time,
+                             realtime_clock::time_point realtime_remote_time,
+                             uint32_t remote_queue_index,
+                             const UUID &source_boot_uuid) {
+  CHECK_LE(data->size(), this->size())
+      << ": Attempting to send too big a message on "
+      << configuration::CleanedChannelToString(simulated_channel_->channel());
+
+  // Constructs a message sharing the already allocated and aligned message
+  // data.
+  message_ = SimulatedMessage::Make(simulated_channel_, data);
+
+  return DoSend(data->size(), monotonic_remote_time, realtime_remote_time,
+                remote_queue_index, source_boot_uuid);
+}
+
 SimulatedTimerHandler::SimulatedTimerHandler(
     EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
     ::std::function<void()> fn)