Stop stripping the size prefix off

This turns out to be super dangerous to do.  A flatbuffer is aligned
assuming that the size is either there or not there.  By removing it,
you break alignment.

This necesitates having 2 subclasses of Flatbuffer.  A SizePrefixed
version and a non size prefixed version.  That lets us distinguish for
methods which care.

Once all that's done, deal with the fallout through the code base,
including logfile_utils and the chaos that causes rippling out.

Change-Id: I91b7be355279a1c19e5c956c33359df01a17eacf
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 5c0b628..08fa7fd 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -193,12 +193,12 @@
   // Also, flatbuffers build from the back end.  So place this at the back end
   // of the buffer.  We only have to care because we are using this in a very
   // raw fashion.
-  CHECK_LE(timing_report_.size(), timing_report_sender_->size())
+  CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
       << ": Timing report bigger than the sender size.";
-  std::copy(timing_report_.data(),
-            timing_report_.data() + timing_report_.size(),
+  std::copy(timing_report_.span().data(),
+            timing_report_.span().data() + timing_report_.span().size(),
             reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
-                timing_report_sender_->size() - timing_report_.size());
+                timing_report_sender_->size() - timing_report_.span().size());
 
   for (const std::unique_ptr<TimerHandler> &timer : timers_) {
     timer->timing_.ResetTimingReport();
@@ -215,7 +215,7 @@
   for (RawFetcher *fetcher : fetchers_) {
     fetcher->timing_.ResetTimingReport();
   }
-  timing_report_sender_->Send(timing_report_.size());
+  timing_report_sender_->Send(timing_report_.span().size());
 }
 
 void EventLoop::UpdateTimingReport() {
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 9dac88b..5b699aa 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -350,7 +350,7 @@
   Builder MakeBuilder();
 
   // Sends a prebuilt flatbuffer.
-  bool Send(const Flatbuffer<T> &flatbuffer);
+  bool Send(const NonSizePrefixedFlatbuffer<T> &flatbuffer);
 
   // Sends a prebuilt flatbuffer which was detached from a Builder created via
   // MakeBuilder() on this object.
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 9e16d20..055e82c 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -337,17 +337,17 @@
 };
 
 template <typename T>
-bool Sender<T>::Send(const Flatbuffer<T> &flatbuffer) {
-  return sender_->Send(flatbuffer.data(), flatbuffer.size());
+bool Sender<T>::Send(const NonSizePrefixedFlatbuffer<T> &flatbuffer) {
+  return sender_->Send(flatbuffer.span().data(), flatbuffer.span().size());
 }
 
 template <typename T>
 bool Sender<T>::SendDetached(FlatbufferDetachedBuffer<T> detached) {
-  CHECK_EQ(
-      static_cast<void *>(detached.data() + detached.size() - sender_->size()),
-      sender_->data())
+  CHECK_EQ(static_cast<void *>(detached.span().data() + detached.span().size() -
+                               sender_->size()),
+           sender_->data())
       << ": May only send the buffer detached from this Sender";
-  return sender_->Send(detached.size());
+  return sender_->Send(detached.span().size());
 }
 
 }  // namespace aos
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index e0da59e..caa2b90 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -111,8 +111,9 @@
               << std::endl;
 
     while (true) {
-      std::optional<aos::FlatbufferVector<aos::logger::MessageHeader>> message =
-          reader.ReadMessage();
+      std::optional<
+          aos::SizePrefixedFlatbufferVector<aos::logger::MessageHeader>>
+          message = reader.ReadMessage();
       if (!message) {
         break;
       }
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 411e666..ec60143 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -32,7 +32,7 @@
     const Node *node) {
   CHECK_EQ(node, this->node());
   UpdateHeader(header, uuid_, part_number_);
-  data_writer_->QueueSpan(header->full_span());
+  data_writer_->QueueSpan(header->span());
 }
 
 DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
@@ -48,7 +48,7 @@
   ++part_number_;
   *data_writer_ = std::move(*OpenDataWriter());
   UpdateHeader(header, uuid_, part_number_);
-  data_writer_->QueueSpan(header->full_span());
+  data_writer_->QueueSpan(header->span());
 }
 
 DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
@@ -88,14 +88,14 @@
       OpenDataWriter();
     }
     UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
-    data_writer_.writer->QueueSpan(header->full_span());
+    data_writer_.writer->QueueSpan(header->span());
   } else {
     for (std::pair<const Channel *const, DataWriter> &data_writer :
          data_writers_) {
       if (node == data_writer.second.node) {
         UpdateHeader(header, data_writer.second.uuid,
                      data_writer.second.part_number);
-        data_writer.second.writer->QueueSpan(header->full_span());
+        data_writer.second.writer->QueueSpan(header->span());
       }
     }
   }
@@ -110,7 +110,7 @@
     }
     OpenDataWriter();
     UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
-    data_writer_.writer->QueueSpan(header->full_span());
+    data_writer_.writer->QueueSpan(header->span());
   } else {
     for (std::pair<const Channel *const, DataWriter> &data_writer :
          data_writers_) {
@@ -119,7 +119,7 @@
         data_writer.second.rotate(data_writer.first, &data_writer.second);
         UpdateHeader(header, data_writer.second.uuid,
                      data_writer.second.part_number);
-        data_writer.second.writer->QueueSpan(header->full_span());
+        data_writer.second.writer->QueueSpan(header->span());
       }
     }
   }
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 43e6aa2..08a230c 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -65,7 +65,7 @@
 
   // Now extract everything into our datastructures above for sorting.
   for (const std::string &part : parts) {
-    std::optional<FlatbufferVector<LogFileHeader>> log_header =
+    std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
         ReadHeader(part);
     if (!log_header) {
       LOG(WARNING) << "Skipping " << part << " without a header";
@@ -94,7 +94,7 @@
     if (!log_header->message().has_parts_uuid() &&
         !log_header->message().has_parts_index() &&
         !log_header->message().has_node()) {
-      std::optional<FlatbufferVector<MessageHeader>> first_message =
+      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> first_message =
           ReadNthMessage(part, 0);
       if (!first_message) {
         LOG(WARNING) << "Skipping " << part << " without any messages";
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index d906997..330c78e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -348,7 +348,7 @@
   return true;
 }
 
-std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
     std::string_view filename) {
   SpanReader span_reader(filename);
   absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
@@ -360,13 +360,12 @@
 
   // And copy the config so we have it forever, removing the size prefix.
   ResizeableBuffer data;
-  data.resize(config_data.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(data.data(), config_data.begin() + sizeof(flatbuffers::uoffset_t),
-         data.size());
-  return FlatbufferVector<LogFileHeader>(std::move(data));
+  data.resize(config_data.size());
+  memcpy(data.data(), config_data.begin(), data.size());
+  return SizePrefixedFlatbufferVector<LogFileHeader>(std::move(data));
 }
 
-std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
     std::string_view filename, size_t n) {
   SpanReader span_reader(filename);
   absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
@@ -381,15 +380,15 @@
 
   // And copy the config so we have it forever, removing the size prefix.
   ResizeableBuffer data;
-  data.resize(data_span.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(data.data(), data_span.begin() + sizeof(flatbuffers::uoffset_t),
-         data.size());
-  return FlatbufferVector<MessageHeader>(std::move(data));
+  data.resize(data_span.size());
+  memcpy(data.data(), data_span.begin(), data.size());
+  return SizePrefixedFlatbufferVector<MessageHeader>(std::move(data));
 }
 
 MessageReader::MessageReader(std::string_view filename)
     : span_reader_(filename),
-      raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+      raw_log_file_header_(
+          SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
   // Make sure we have enough to read the size.
   absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();
 
@@ -399,12 +398,10 @@
 
   // And copy the header data so we have it forever.
   ResizeableBuffer header_data_copy;
-  header_data_copy.resize(header_data.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(header_data_copy.data(),
-         header_data.begin() + sizeof(flatbuffers::uoffset_t),
-         header_data_copy.size());
+  header_data_copy.resize(header_data.size());
+  memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
   raw_log_file_header_ =
-      FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
+      SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));
 
   max_out_of_order_duration_ =
       chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
@@ -413,18 +410,17 @@
           << FlatbufferToJson(log_file_header()->node());
 }
 
-std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
+MessageReader::ReadMessage() {
   absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
   if (msg_data == absl::Span<const uint8_t>()) {
     return std::nullopt;
   }
 
   ResizeableBuffer result_buffer;
-  result_buffer.resize(msg_data.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(result_buffer.data(),
-         msg_data.begin() + sizeof(flatbuffers::uoffset_t),
-         result_buffer.size());
-  FlatbufferVector<MessageHeader> result(std::move(result_buffer));
+  result_buffer.resize(msg_data.size());
+  memcpy(result_buffer.data(), msg_data.begin(), result_buffer.size());
+  SizePrefixedFlatbufferVector<MessageHeader> result(std::move(result_buffer));
 
   const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
       chrono::nanoseconds(result.message().monotonic_sent_time()));
@@ -437,10 +433,10 @@
 PartsMessageReader::PartsMessageReader(LogParts log_parts)
     : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
 
-std::optional<FlatbufferVector<MessageHeader>>
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
 PartsMessageReader::ReadMessage() {
   while (!done_) {
-    std::optional<FlatbufferVector<MessageHeader>> message =
+    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
         message_reader_.ReadMessage();
     if (message) {
       newest_timestamp_ = message_reader_.newest_timestamp();
@@ -468,7 +464,7 @@
 SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),
-      log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+      log_file_header_(SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
   CHECK(NextLogFile()) << ": filenames is empty.  Need files to read.";
 
   // Grab any log file header.  They should all match (and we will check as we
@@ -674,7 +670,7 @@
       return true;
     }
 
-    if (std::optional<FlatbufferVector<MessageHeader>> msg =
+    if (std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
             message_reader_->ReadMessage()) {
       const MessageHeader &header = msg.value().message();
 
@@ -687,7 +683,7 @@
                   << newest_timestamp() << " start time "
                   << monotonic_start_time() << " " << FlatbufferToJson(&header);
       } else if (VLOG_IS_ON(1)) {
-        FlatbufferVector<MessageHeader> copy = msg.value();
+        SizePrefixedFlatbufferVector<MessageHeader> copy = msg.value();
         copy.mutable_message()->clear_data();
         LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
                   << filename() << " ttq: " << time_to_queue_ << " now "
@@ -777,12 +773,12 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 SplitMessageReader::PopOldest(int channel_index) {
   CHECK_GT(channels_[channel_index].data.size(), 0u);
   const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
       timestamp = channels_[channel_index].data.front_timestamp();
-  FlatbufferVector<MessageHeader> front =
+  SizePrefixedFlatbufferVector<MessageHeader> front =
       std::move(channels_[channel_index].data.front());
   channels_[channel_index].data.PopFront();
 
@@ -799,12 +795,12 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
   CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
   const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
       timestamp = channels_[channel].timestamps[node_index].front_timestamp();
-  FlatbufferVector<MessageHeader> front =
+  SizePrefixedFlatbufferVector<MessageHeader> front =
       std::move(channels_[channel].timestamps[node_index].front());
   channels_[channel].timestamps[node_index].PopFront();
 
@@ -823,7 +819,7 @@
 }
 
 bool SplitMessageReader::MessageHeaderQueue::emplace_back(
-    FlatbufferVector<MessageHeader> &&msg) {
+    SizePrefixedFlatbufferVector<MessageHeader> &&msg) {
   CHECK(split_reader != nullptr);
 
   // If there is no timestamp merger for this queue, nobody is listening.  Drop
@@ -1021,7 +1017,7 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 TimestampMerger::PopMessageHeap() {
   // Pop the oldest message reader pointer off the heap.
   CHECK_GT(message_heap_.size(), 0u);
@@ -1035,7 +1031,7 @@
   // Pop the oldest message.  This re-pushes any messages from the reader to the
   // message heap.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
       oldest_message =
           std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
 
@@ -1063,7 +1059,7 @@
       // Pop the next oldest message.  This re-pushes any messages from the
       // reader.
       std::tuple<monotonic_clock::time_point, uint32_t,
-                 FlatbufferVector<MessageHeader>>
+                 SizePrefixedFlatbufferVector<MessageHeader>>
           next_oldest_message = std::get<2>(next_oldest_message_reader)
                                     ->PopOldest(channel_index_);
 
@@ -1080,7 +1076,7 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 TimestampMerger::PopTimestampHeap() {
   // Pop the oldest message reader pointer off the heap.
   CHECK_GT(timestamp_heap_.size(), 0u);
@@ -1097,7 +1093,7 @@
   // Pop the oldest message.  This re-pushes any timestamps from the reader to
   // the timestamp heap.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
       oldest_timestamp = std::get<2>(oldest_timestamp_reader)
                              ->PopOldestTimestamp(channel_index_, node_index_);
 
@@ -1128,7 +1124,7 @@
       // Pop the next oldest timestamp.  This re-pushes any messages from the
       // reader.
       std::tuple<monotonic_clock::time_point, uint32_t,
-                 FlatbufferVector<MessageHeader>>
+                 SizePrefixedFlatbufferVector<MessageHeader>>
           next_oldest_timestamp =
               std::get<2>(next_oldest_timestamp_reader)
                   ->PopOldestTimestamp(channel_index_, node_index_);
@@ -1157,7 +1153,8 @@
   return oldest_timestamp;
 }
 
-std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
+std::tuple<TimestampMerger::DeliveryTimestamp,
+           SizePrefixedFlatbufferVector<MessageHeader>>
 TimestampMerger::PopOldest() {
   if (has_timestamps_) {
     VLOG(1) << "Looking for matching timestamp for "
@@ -1168,7 +1165,7 @@
 
     // Read the timestamps.
     std::tuple<monotonic_clock::time_point, uint32_t,
-               FlatbufferVector<MessageHeader>>
+               SizePrefixedFlatbufferVector<MessageHeader>>
         oldest_timestamp = PopTimestampHeap();
 
     TimestampMerger::DeliveryTimestamp timestamp;
@@ -1250,7 +1247,7 @@
                      configuration_->channels()->Get(channel_index_))
               << " (" << channel_index_ << ")";
       std::tuple<monotonic_clock::time_point, uint32_t,
-                 FlatbufferVector<MessageHeader>>
+                 SizePrefixedFlatbufferVector<MessageHeader>>
           oldest_message = PopMessageHeap();
 
       timestamp.realtime_remote_time =
@@ -1272,7 +1269,7 @@
     }
   } else {
     std::tuple<monotonic_clock::time_point, uint32_t,
-               FlatbufferVector<MessageHeader>>
+               SizePrefixedFlatbufferVector<MessageHeader>>
         oldest_message = PopMessageHeap();
 
     TimestampMerger::DeliveryTimestamp timestamp;
@@ -1354,7 +1351,7 @@
       if (both_null || node_names_identical) {
         if (!found_node) {
           found_node = true;
-          log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+          log_file_header_ = reader->raw_log_file_header();
           VLOG(1) << "Found log file " << reader->filename() << " with node "
                   << FlatbufferToJson(reader->node()) << " start_time "
                   << monotonic_start_time();
@@ -1467,7 +1464,7 @@
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, int,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 ChannelMerger::PopOldest() {
   CHECK_GT(channel_heap_.size(), 0u);
   std::pair<monotonic_clock::time_point, int> oldest_channel_data =
@@ -1483,7 +1480,7 @@
 
   // Merger handles any queueing needed from here.
   std::tuple<TimestampMerger::DeliveryTimestamp,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
       message = merger->PopOldest();
   DCHECK_EQ(std::get<0>(message).monotonic_event_time,
             oldest_channel_data.first)
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 8381a9a..985a6bc 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -181,9 +181,9 @@
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
     int channel_index, LogType log_type);
 
-std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
     std::string_view filename);
-std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
     std::string_view filename, size_t n);
 
 // Class to read chunks out of a log file.
@@ -234,7 +234,8 @@
   }
 
   // Returns the raw data of the header from the log file.
-  const FlatbufferVector<LogFileHeader> &raw_log_file_header() const {
+  const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
+      const {
     return raw_log_file_header_;
   }
 
@@ -250,7 +251,7 @@
   }
 
   // Returns the next message if there is one.
-  std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
+  std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
 
   // The time at which we need to read another chunk from the logfile.
   monotonic_clock::time_point queue_data_time() const {
@@ -262,7 +263,7 @@
   SpanReader span_reader_;
 
   // Vector holding the raw data for the log file header.
-  FlatbufferVector<LogFileHeader> raw_log_file_header_;
+  SizePrefixedFlatbufferVector<LogFileHeader> raw_log_file_header_;
 
   // Minimum amount of data to queue up for sorting before we are guarenteed
   // to not see data out of order.
@@ -293,7 +294,7 @@
   // Returns the next message if there is one, or nullopt if we have reached the
   // end of all the files.
   // Note: reading the next message may change the max_out_of_order_duration().
-  std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
+  std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
 
  private:
   // Opens the next log and updates message_reader_.  Sets done_ if there is
@@ -351,13 +352,13 @@
   // Returns the timestamp, queue_index, and message for the oldest data on a
   // channel.  Requeues data as needed.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
   PopOldest(int channel_index);
 
   // Returns the timestamp, queue_index, and message for the oldest timestamp on
   // a channel delivered to a node.  Requeues data as needed.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
   PopOldestTimestamp(int channel, int node_index);
 
   // Returns the header for the log files.
@@ -365,7 +366,8 @@
     return &log_file_header_.message();
   }
 
-  const FlatbufferVector<LogFileHeader> &raw_log_file_header() const {
+  const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
+      const {
     return log_file_header_;
   }
 
@@ -444,7 +446,7 @@
   const Node *target_node_ = nullptr;
 
   // Log file header to report.  This is a copy.
-  FlatbufferVector<LogFileHeader> log_file_header_;
+  SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
   // Current log file being read.
   std::unique_ptr<MessageReader> message_reader_;
 
@@ -455,14 +457,14 @@
     bool timestamps = false;
 
     // Returns a reference to the the oldest message.
-    FlatbufferVector<MessageHeader> &front() {
+    SizePrefixedFlatbufferVector<MessageHeader> &front() {
       CHECK_GT(data_.size(), 0u);
       return data_.front();
     }
 
     // Adds a message to the back of the queue. Returns true if it was actually
     // emplaced.
-    bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
+    bool emplace_back(SizePrefixedFlatbufferVector<MessageHeader> &&msg);
 
     // Drops the front message.  Invalidates the front() reference.
     void PopFront();
@@ -492,7 +494,7 @@
 
    private:
     // The data.
-    std::deque<FlatbufferVector<MessageHeader>> data_;
+    std::deque<SizePrefixedFlatbufferVector<MessageHeader>> data_;
   };
 
   // All the queues needed for a channel.  There isn't going to be data in all
@@ -569,7 +571,8 @@
   // Returns the oldest combined timestamp and data for this channel.  If there
   // isn't a matching piece of data, returns only the timestamp with no data.
   // The caller can determine what the appropriate action is to recover.
-  std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
+  std::tuple<DeliveryTimestamp, SizePrefixedFlatbufferVector<MessageHeader>>
+  PopOldest();
 
   // Tracks if the channel merger has pushed this onto it's heap or not.
   bool pushed() { return pushed_; }
@@ -609,7 +612,7 @@
   // Pops a message from the message heap.  This automatically triggers the
   // split message reader to re-fetch any new data.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
   PopMessageHeap();
 
   std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
@@ -619,7 +622,7 @@
   // Pops a message from the timestamp heap.  This automatically triggers the
   // split message reader to re-fetch any new data.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
   PopTimestampHeap();
 
   const Configuration *configuration_;
@@ -678,7 +681,7 @@
   monotonic_clock::time_point OldestMessageTime() const;
   // Pops the oldest message.
   std::tuple<TimestampMerger::DeliveryTimestamp, int,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
   PopOldest();
 
   // Returns the config for this set of log files.
@@ -734,7 +737,7 @@
   std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
 
   // The log header we are claiming to be.
-  FlatbufferVector<LogFileHeader> log_file_header_;
+  SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
 
   // The timestamp mergers which combine data from the split message readers.
   std::vector<TimestampMerger> timestamp_mergers_;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 3644419..14d1de7 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -39,15 +39,15 @@
 
   {
     DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
-    writer.QueueSpan(m1.full_span());
-    writer.QueueSpan(m2.full_span());
+    writer.QueueSpan(m1.span());
+    writer.QueueSpan(m2.span());
   }
 
   SpanReader reader(logfile);
 
   EXPECT_EQ(reader.filename(), logfile);
-  EXPECT_EQ(reader.ReadMessage(), m1.full_span());
-  EXPECT_EQ(reader.ReadMessage(), m2.full_span());
+  EXPECT_EQ(reader.ReadMessage(), m1.span());
+  EXPECT_EQ(reader.ReadMessage(), m2.span());
   EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
 }
 
@@ -69,9 +69,9 @@
 
   {
     DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
-    writer.QueueSpan(config.full_span());
-    writer.QueueSpan(m1.full_span());
-    writer.QueueSpan(m2.full_span());
+    writer.QueueSpan(config.span());
+    writer.QueueSpan(m1.span());
+    writer.QueueSpan(m2.span());
   }
 
   MessageReader reader(logfile);
@@ -117,10 +117,10 @@
 
   {
     DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
-    writer.QueueSpan(config0.full_span());
-    writer.QueueSpan(m1.full_span());
-    writer.QueueSpan(m2.full_span());
-    writer.QueueSpan(m3.full_span());
+    writer.QueueSpan(config0.span());
+    writer.QueueSpan(m1.span());
+    writer.QueueSpan(m2.span());
+    writer.QueueSpan(m3.span());
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0});
@@ -168,13 +168,13 @@
 
   {
     DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
-    writer.QueueSpan(config0.full_span());
-    writer.QueueSpan(m1.full_span());
+    writer.QueueSpan(config0.span());
+    writer.QueueSpan(m1.span());
   }
   {
     DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
-    writer.QueueSpan(config1.full_span());
-    writer.QueueSpan(m2.full_span());
+    writer.QueueSpan(config1.span());
+    writer.QueueSpan(m2.span());
   }
 
   const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 977a82f..b32c748 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -37,11 +37,11 @@
 namespace logger {
 namespace {
 // Helper to safely read a header, or CHECK.
-FlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
+SizePrefixedFlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
     const std::vector<std::vector<std::string>> &filenames) {
   CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
   CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
-  std::optional<FlatbufferVector<LogFileHeader>> result =
+  std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> result =
       ReadHeader(filenames[0][0]);
   CHECK(result);
   return result.value();
@@ -1285,8 +1285,8 @@
     }
     TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
-    FlatbufferVector<MessageHeader> channel_data =
-        FlatbufferVector<MessageHeader>::Empty();
+    SizePrefixedFlatbufferVector<MessageHeader> channel_data =
+        SizePrefixedFlatbufferVector<MessageHeader>::Empty();
 
     if (VLOG_IS_ON(1)) {
       LogFit("Offset was");
@@ -1900,12 +1900,12 @@
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, int,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 LogReader::State::PopOldest(bool *update_time) {
   CHECK_GT(sorted_messages_.size(), 0u);
 
   std::tuple<TimestampMerger::DeliveryTimestamp, int,
-             FlatbufferVector<MessageHeader>,
+             SizePrefixedFlatbufferVector<MessageHeader>,
              message_bridge::NoncausalOffsetEstimator *>
       result = std::move(sorted_messages_.front());
   VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
@@ -1955,8 +1955,8 @@
 
     TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
-    FlatbufferVector<MessageHeader> channel_data =
-        FlatbufferVector<MessageHeader>::Empty();
+    SizePrefixedFlatbufferVector<MessageHeader> channel_data =
+        SizePrefixedFlatbufferVector<MessageHeader>::Empty();
 
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
 
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index b37fea2..f6a037b 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -454,7 +454,7 @@
 
   // This is *a* log file header used to provide the logged config.  The rest of
   // the header is likely distracting.
-  FlatbufferVector<LogFileHeader> log_file_header_;
+  SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
 
   // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
   std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
@@ -472,7 +472,7 @@
     // update_time (will be) set to true when popping this message causes the
     // filter to change the time offset estimation function.
     std::tuple<TimestampMerger::DeliveryTimestamp, int,
-               FlatbufferVector<MessageHeader>>
+               SizePrefixedFlatbufferVector<MessageHeader>>
     PopOldest(bool *update_time);
 
     // Returns the monotonic time of the oldest message.
@@ -614,7 +614,7 @@
     std::unique_ptr<ChannelMerger> channel_merger_;
 
     std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
-                          FlatbufferVector<MessageHeader>,
+                          SizePrefixedFlatbufferVector<MessageHeader>,
                           message_bridge::NoncausalOffsetEstimator *>>
         sorted_messages_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index c55b18b..dbc8a78 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -263,7 +263,7 @@
   {
     // Confirm that the UUIDs match for both the parts and the logger, and the
     // parts_index increments.
-    std::vector<FlatbufferVector<LogFileHeader>> log_header;
+    std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
     for (std::string_view f : {logfile0, logfile1}) {
       log_header.emplace_back(ReadHeader(f).value());
     }
@@ -563,7 +563,7 @@
       message_reader.log_file_header()->configuration()->channels()->size(), 0);
 
   while (true) {
-    std::optional<FlatbufferVector<MessageHeader>> msg =
+    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
         message_reader.ReadMessage();
     if (!msg) {
       break;
@@ -636,7 +636,7 @@
     std::set<std::string> parts_uuids;
     // Confirm that we have the expected number of UUIDs for both the logfile
     // UUIDs and parts UUIDs.
-    std::vector<FlatbufferVector<LogFileHeader>> log_header;
+    std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
     for (std::string_view f : logfiles_) {
       log_header.emplace_back(ReadHeader(f).value());
       logfile_uuids.insert(log_header.back().message().log_event_uuid()->str());
diff --git a/aos/flatbuffer_introspection_test.cc b/aos/flatbuffer_introspection_test.cc
index d771b55..d830b34 100644
--- a/aos/flatbuffer_introspection_test.cc
+++ b/aos/flatbuffer_introspection_test.cc
@@ -12,7 +12,7 @@
   FlatbufferIntrospectionTest()
       : schema_data_(
             util::ReadFileToStringOrDie("aos/json_to_flatbuffer.bfbs")) {
-    schema_ = reflection::GetSchema(schema_data_.data());
+    schema_ = reflection::GetSchema(schema_data_.span().data());
   }
 
  protected:
diff --git a/aos/flatbuffer_merge.cc b/aos/flatbuffer_merge.cc
index dd4b4fa..8bd020a 100644
--- a/aos/flatbuffer_merge.cc
+++ b/aos/flatbuffer_merge.cc
@@ -496,36 +496,6 @@
   return fbb->EndTable(start);
 }
 
-flatbuffers::Offset<flatbuffers::Table> MergeFlatBuffers(
-    const flatbuffers::TypeTable *typetable, const uint8_t *data1,
-    const uint8_t *data2, flatbuffers::FlatBufferBuilder *fbb) {
-  // Grab the 2 tables.
-  const flatbuffers::Table *t1 =
-      data1 != nullptr ? flatbuffers::GetRoot<flatbuffers::Table>(data1)
-                       : nullptr;
-  const flatbuffers::Table *t2 =
-      data2 != nullptr ? flatbuffers::GetRoot<flatbuffers::Table>(data2)
-                       : nullptr;
-
-  // And then do the actual build.  This doesn't contain finish so we can nest
-  // them nicely.
-  return flatbuffers::Offset<flatbuffers::Table>(
-      MergeFlatBuffers(typetable, t1, t2, fbb));
-}
-
-flatbuffers::DetachedBuffer MergeFlatBuffers(
-    const flatbuffers::TypeTable *typetable, const uint8_t *data1,
-    const uint8_t *data2) {
-  // Build up a builder.
-  flatbuffers::FlatBufferBuilder fbb;
-  fbb.ForceDefaults(true);
-
-  // Finish up the buffer and return it.
-  fbb.Finish(MergeFlatBuffers(typetable, data1, data2, &fbb));
-
-  return fbb.Release();
-}
-
 bool CompareFlatBuffer(const flatbuffers::TypeTable *typetable,
                        const flatbuffers::Table *t1,
                        const flatbuffers::Table *t2) {
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index 537d121..d98a9eb 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -9,10 +9,6 @@
 
 namespace aos {
 
-flatbuffers::DetachedBuffer MergeFlatBuffers(
-    const flatbuffers::TypeTable *typetable, const uint8_t *data1,
-    const uint8_t *data2);
-
 // Merges 2 flat buffers with the provided type table into the builder.  Returns
 // the offset to the flatbuffers.
 // One or both of t1 and t2 must be non-null.  If one is null, this method
@@ -23,35 +19,12 @@
 
 template <class T>
 inline flatbuffers::Offset<T> MergeFlatBuffers(
-    const flatbuffers::Table *t1,
-    const flatbuffers::Table *t2, flatbuffers::FlatBufferBuilder *fbb) {
+    const flatbuffers::Table *t1, const flatbuffers::Table *t2,
+    flatbuffers::FlatBufferBuilder *fbb) {
   return MergeFlatBuffers(T::MiniReflectTypeTable(), t1, t2, fbb).o;
 }
 
 template <class T>
-inline flatbuffers::DetachedBuffer MergeFlatBuffers(const uint8_t *data1,
-                                                    const uint8_t *data2) {
-  return MergeFlatBuffers(T::MiniReflectTypeTable(), data1, data2);
-}
-
-template <class T>
-inline flatbuffers::DetachedBuffer MergeFlatBuffers(
-    const flatbuffers::DetachedBuffer &data1,
-    const flatbuffers::DetachedBuffer &data2) {
-  return MergeFlatBuffers(T::MiniReflectTypeTable(), data1.data(),
-                          data2.data());
-}
-
-template <class T>
-inline aos::FlatbufferDetachedBuffer<T> MergeFlatBuffers(
-    const aos::Flatbuffer<T> &fb1, const aos::Flatbuffer<T> &fb2) {
-const uint8_t *data1 = fb1.data();
-const uint8_t *data2 = fb2.data();
-  return aos::FlatbufferDetachedBuffer<T>(
-      MergeFlatBuffers(T::MiniReflectTypeTable(), data1, data2));
-}
-
-template <class T>
 inline aos::FlatbufferDetachedBuffer<T> MergeFlatBuffers(const T *fb1,
                                                          const T *fb2) {
   flatbuffers::FlatBufferBuilder fbb;
@@ -70,6 +43,13 @@
                              fbb);
 }
 
+template <class T>
+inline aos::FlatbufferDetachedBuffer<T> MergeFlatBuffers(
+    const aos::Flatbuffer<T> &fb1, const aos::Flatbuffer<T> &fb2) {
+  return aos::FlatbufferDetachedBuffer<T>(
+      MergeFlatBuffers<T>(&fb1.message(), &fb2.message()));
+}
+
 // Copies a flatbuffer by walking the tree and copying all the pieces.  This
 // converts DAGs to trees.
 template <class T>
@@ -105,20 +85,44 @@
           .o);
 }
 
-// Copies a flatbuffer by copying all the data without looking inside and
-// pointing inside it.
-template <class T>
-inline flatbuffers::Offset<T> BlindCopyFlatBuffer(
-    const Flatbuffer<T> &t, flatbuffers::FlatBufferBuilder *fbb) {
+namespace flatbuffer_merge_internal {
+
+inline flatbuffers::uoffset_t DoBlindCopyFlatBuffer(
+    const void *message, absl::Span<const uint8_t> span,
+    flatbuffers::FlatBufferBuilder *fbb) {
   // Enforce 8 byte alignment so anything inside the flatbuffer can be read.
   fbb->Align(sizeof(flatbuffers::largest_scalar_t));
 
   // We don't know how much of the start of the flatbuffer is padding.  The
   // safest thing to do from an alignment point of view (without looking inside)
   // is to copy the initial offset and leave it as dead space.
-  fbb->PushBytes(t.data(), t.size());
+  fbb->PushBytes(span.data(), span.size());
+  // Then, compute the offset from the back by computing the distance from the
+  // front to the start of the message.
   return fbb->GetSize() -
-         flatbuffers::ReadScalar<flatbuffers::uoffset_t>(t.data());
+         static_cast<flatbuffers::uoffset_t>(
+             reinterpret_cast<const uint8_t *>(message) - span.data());
+}
+
+}  // namespace flatbuffer_merge_internal
+
+// Copies a flatbuffer by copying all the data without looking inside and
+// pointing inside it.
+template <class T>
+inline flatbuffers::Offset<T> BlindCopyFlatBuffer(
+    const NonSizePrefixedFlatbuffer<T> &t,
+    flatbuffers::FlatBufferBuilder *fbb) {
+  return flatbuffer_merge_internal::DoBlindCopyFlatBuffer(&t.message(),
+                                                          t.span(), fbb);
+}
+
+// Copies a flatbuffer by copying all the data without looking inside and
+// pointing inside it.
+template <class T>
+inline flatbuffers::Offset<T> BlindCopyFlatBuffer(
+    const SizePrefixedFlatbuffer<T> &t, flatbuffers::FlatBufferBuilder *fbb) {
+  return flatbuffer_merge_internal::DoBlindCopyFlatBuffer(&t.message(),
+                                                          t.span(), fbb);
 }
 
 template <class T>
@@ -179,8 +183,14 @@
 }
 
 template <class T>
-inline bool CompareFlatBuffer(const aos::Flatbuffer<T> &t1,
-                              const aos::Flatbuffer<T> &t2) {
+inline bool CompareFlatBuffer(const aos::NonSizePrefixedFlatbuffer<T> &t1,
+                              const aos::NonSizePrefixedFlatbuffer<T> &t2) {
+  return t1.span() == t2.span();
+}
+
+template <class T>
+inline bool CompareFlatBuffer(const aos::SizePrefixedFlatbuffer<T> &t1,
+                              const aos::SizePrefixedFlatbuffer<T> &t2) {
   return t1.span() == t2.span();
 }
 
diff --git a/aos/flatbuffer_merge_test.cc b/aos/flatbuffer_merge_test.cc
index 85773a3..4162edc 100644
--- a/aos/flatbuffer_merge_test.cc
+++ b/aos/flatbuffer_merge_test.cc
@@ -18,63 +18,59 @@
       fbb.GetSize());
 }
 
-std::string_view FromFbb(const flatbuffers::DetachedBuffer &b) {
-  return std::string_view(reinterpret_cast<const char *>(b.data()), b.size());
-}
-
-absl::Span<const uint8_t> ToSpan(const flatbuffers::DetachedBuffer &b) {
-  return absl::Span<const uint8_t>(b.data(), b.size());
+std::string_view FromFbb(const FlatbufferDetachedBuffer<Configuration> &b) {
+  return std::string_view(reinterpret_cast<const char *>(b.span().data()),
+                          b.span().size());
 }
 
 class FlatbufferMerge : public ::testing::Test {
  public:
   FlatbufferMerge() {}
 
-  void ExpectMergedOutput(const flatbuffers::DetachedBuffer &fb_merged,
-                          std::string_view expected_output) {
-    ASSERT_NE(fb_merged.size(), 0u);
+  void ExpectMergedOutput(
+      const NonSizePrefixedFlatbuffer<Configuration> &fb_merged,
+      std::string_view expected_output) {
+    ASSERT_NE(fb_merged.span().size(), 0u);
 
-    const ::std::string merged_output =
-        FlatbufferToJson(fb_merged, ConfigurationTypeTable());
+    const ::std::string merged_output = FlatbufferToJson(fb_merged);
     EXPECT_EQ(expected_output, merged_output);
 
     aos::FlatbufferDetachedBuffer<Configuration> expected_message(
         JsonToFlatbuffer(std::string(expected_output).c_str(),
                          ConfigurationTypeTable()));
     EXPECT_TRUE(
-        CompareFlatBuffer(flatbuffers::GetRoot<Configuration>(fb_merged.data()),
-                          &expected_message.message()));
+        CompareFlatBuffer(&fb_merged.message(), &expected_message.message()));
   }
 
   void JsonMerge(const ::std::string in1, const ::std::string in2,
                  const ::std::string out) {
-    printf("Merging: %s\n", in1.c_str());
-    printf("Merging: %s\n", in2.c_str());
-    const flatbuffers::DetachedBuffer fb1 = JsonToFlatbuffer(
+    LOG(INFO) << "Merging: " << in1.c_str();
+    LOG(INFO) << "Merging: " << in2.c_str();
+    const FlatbufferDetachedBuffer<Configuration> fb1 = JsonToFlatbuffer(
         static_cast<const char *>(in1.c_str()), ConfigurationTypeTable());
 
     const ::std::string in1_nested = "{ \"nested_config\": " + in1 + " }";
-    const flatbuffers::DetachedBuffer fb1_nested =
+    const FlatbufferDetachedBuffer<Configuration> fb1_nested =
         JsonToFlatbuffer(static_cast<const char *>(in1_nested.c_str()),
                          ConfigurationTypeTable());
 
-    const flatbuffers::DetachedBuffer fb2 = JsonToFlatbuffer(
+    const FlatbufferDetachedBuffer<Configuration> fb2 = JsonToFlatbuffer(
         static_cast<const char *>(in2.c_str()), ConfigurationTypeTable());
 
     const ::std::string in2_nested = "{ \"nested_config\": " + in2 + " }";
-    const flatbuffers::DetachedBuffer fb2_nested =
+    const FlatbufferDetachedBuffer<Configuration> fb2_nested =
         JsonToFlatbuffer(static_cast<const char *>(in2_nested.c_str()),
                          ConfigurationTypeTable());
 
     const ::std::string out_nested = "{ \"nested_config\": " + out + " }";
 
-    const flatbuffers::DetachedBuffer empty =
+    const FlatbufferDetachedBuffer<Configuration> empty =
         JsonToFlatbuffer("{ }", ConfigurationTypeTable());
 
-    ASSERT_NE(fb1.size(), 0u);
-    ASSERT_NE(fb2.size(), 0u);
-    ASSERT_NE(fb1_nested.size(), 0u);
-    ASSERT_NE(fb2_nested.size(), 0u);
+    ASSERT_NE(fb1.span().size(), 0u);
+    ASSERT_NE(fb2.span().size(), 0u);
+    ASSERT_NE(fb1_nested.span().size(), 0u);
+    ASSERT_NE(fb2_nested.span().size(), 0u);
 
     // We now want to run 7 tests.
     //  in1 merged "" -> in1.
@@ -87,167 +83,154 @@
 
     {
       // in1 merged with "" => in1.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb1.data(), empty.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(fb1, empty);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1, FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1, FlatbufferToJson(fb_merged));
     }
 
     {
       // in2 merged with "" => in2.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb2.data(), empty.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(fb2, empty);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in2, FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in2, FlatbufferToJson(fb_merged));
     }
 
     {
       // "" merged with in1 => in1.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(empty.data(), fb1.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(empty, fb1);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1, FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1, FlatbufferToJson(fb_merged));
     }
 
     {
       // "" merged with in2 => in2.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(empty.data(), fb2.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(empty, fb2);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in2, FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in2, FlatbufferToJson(fb_merged));
     }
 
     {
       // nullptr merged with in1 => in1.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(nullptr, fb1.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(nullptr, &fb1.message());
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1, FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1, FlatbufferToJson(fb_merged));
     }
 
     {
       // in1 merged with nullptr => in1.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb1.data(), nullptr);
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(&fb1.message(), nullptr);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1, FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1, FlatbufferToJson(fb_merged));
     }
 
     {
       // in1 merged with in2 => out.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb1.data(), fb2.data());
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(fb1, fb2);
 
       ExpectMergedOutput(fb_merged, out);
     }
 
     // Test all the different merge methods:
-    ExpectMergedOutput(
-        MergeFlatBuffers(ConfigurationTypeTable(), fb1.data(), fb2.data()),
-        out);
+    ExpectMergedOutput(MergeFlatBuffers<Configuration>(fb1, fb2), out);
     {
       flatbuffers::FlatBufferBuilder fbb;
       fbb.ForceDefaults(true);
       fbb.Finish(MergeFlatBuffers(
           ConfigurationTypeTable(),
-          flatbuffers::GetRoot<flatbuffers::Table>(fb1.data()),
-          flatbuffers::GetRoot<flatbuffers::Table>(fb2.data()), &fbb));
-      ExpectMergedOutput(fbb.Release(), out);
+          reinterpret_cast<const flatbuffers::Table *>(&fb1.message()),
+          reinterpret_cast<const flatbuffers::Table *>(&fb2.message()), &fbb));
+      FlatbufferDetachedBuffer<Configuration> fb(fbb.Release());
+      ExpectMergedOutput(fb, out);
     }
     {
       flatbuffers::FlatBufferBuilder fbb;
       fbb.ForceDefaults(true);
-      fbb.Finish(MergeFlatBuffers<Configuration>(
-          flatbuffers::GetRoot<flatbuffers::Table>(fb1.data()),
-          flatbuffers::GetRoot<flatbuffers::Table>(fb2.data()), &fbb));
-      ExpectMergedOutput(fbb.Release(), out);
+      fbb.Finish(MergeFlatBuffers<Configuration>(&fb1.message(), &fb2.message(),
+                                                 &fbb));
+      FlatbufferDetachedBuffer<Configuration> fb(fbb.Release());
+      ExpectMergedOutput(fb, out);
     }
-    ExpectMergedOutput(MergeFlatBuffers<Configuration>(fb1.data(), fb2.data()),
-                       out);
+    ExpectMergedOutput(
+        MergeFlatBuffers<Configuration>(&fb1.message(), &fb2.message()), out);
     ExpectMergedOutput(MergeFlatBuffers<Configuration>(fb1, fb2), out);
-    ExpectMergedOutput(MergeFlatBuffers<Configuration>(
-                           flatbuffers::GetRoot<Configuration>(fb1.data()),
-                           flatbuffers::GetRoot<Configuration>(fb2.data()))
-                           .buffer(),
-                       out);
 
     // Now, to make things extra exciting, nest a config inside a config.  And
     // run all the tests.  This will exercise some fun nested merges and copies.
 
     {
       // in1_nested merged with "" => in1.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb1_nested.data(), empty.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(fb1_nested, empty);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1_nested,
-                FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1_nested, FlatbufferToJson(fb_merged));
     }
 
     {
       // in2_nested merged with "" => in2_nested.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb2_nested.data(), empty.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(fb2_nested, empty);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in2_nested,
-                FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in2_nested, FlatbufferToJson(fb_merged));
     }
 
     {
       // "" merged with in1_nested => in1_nested.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(empty.data(), fb1_nested.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(empty, fb1_nested);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1_nested,
-                FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1_nested, FlatbufferToJson(fb_merged));
     }
 
     {
       // "" merged with in2_nested => in2_nested.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(empty.data(), fb2_nested.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(empty, fb2_nested);
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in2_nested,
-                FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in2_nested, FlatbufferToJson(fb_merged));
     }
 
     {
       // nullptr merged with in1_nested => in1_nested.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(nullptr, fb1_nested.data());
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged =
+          MergeFlatBuffers<Configuration>(nullptr, &fb1_nested.message());
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1_nested,
-                FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1_nested, FlatbufferToJson(fb_merged));
     }
 
     {
       // nullptr merged with in1_nested => in1_nested.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb1_nested.data(), nullptr);
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged(
+          MergeFlatBuffers<Configuration>(&fb1_nested.message(), nullptr));
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(in1_nested,
-                FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(in1_nested, FlatbufferToJson(fb_merged));
     }
 
     {
       // in1_nested merged with in2_nested => out_nested.
-      flatbuffers::DetachedBuffer fb_merged =
-          MergeFlatBuffers<Configuration>(fb1_nested, fb2_nested);
-      ASSERT_NE(fb_merged.size(), 0u);
+      FlatbufferDetachedBuffer<Configuration> fb_merged(
+          MergeFlatBuffers<Configuration>(fb1_nested, fb2_nested));
+      ASSERT_NE(fb_merged.span().size(), 0u);
 
-      EXPECT_EQ(out_nested,
-                FlatbufferToJson(fb_merged, ConfigurationTypeTable()));
+      EXPECT_EQ(out_nested, FlatbufferToJson(fb_merged));
     }
 
     // TODO(austin): Try more flatbuffers...
@@ -258,19 +241,16 @@
 
       LOG(INFO) << "Copying " << in1 << " "
                 << absl::BytesToHexString(FromFbb(fb1)) << " at "
-                << reinterpret_cast<const void *>(fb1.data()) << " size "
-                << fb1.size();
+                << reinterpret_cast<const void *>(fb1.span().data()) << " size "
+                << fb1.span().size();
       aos_flatbuffer_copy_fbb.Finish(CopyFlatBuffer<Configuration>(
-          flatbuffers::GetRoot<Configuration>(fb1.data()),
-          &aos_flatbuffer_copy_fbb));
+          &fb1.message(), &aos_flatbuffer_copy_fbb));
 
-      aos::FlatbufferDetachedBuffer<Configuration> fb_copy(
+      const aos::FlatbufferDetachedBuffer<Configuration> fb_copy(
           aos_flatbuffer_copy_fbb.Release());
-      ASSERT_NE(fb_copy.size(), 0u);
+      ASSERT_NE(fb_copy.span().size(), 0u);
 
-      flatbuffers::Verifier v(fb1.data(), fb1.size());
-      EXPECT_TRUE(
-          v.VerifyTable(flatbuffers::GetRoot<Configuration>(fb1.data())));
+      EXPECT_TRUE(fb1.Verify());
 
       ASSERT_TRUE(fb_copy.Verify()) << in1;
 
@@ -282,27 +262,24 @@
       aos_flatbuffer_copy_message_ptr_fbb.ForceDefaults(true);
 
       aos_flatbuffer_copy_message_ptr_fbb.Finish(CopyFlatBuffer<Configuration>(
-          flatbuffers::GetRoot<Configuration>(fb2.data()),
-          &aos_flatbuffer_copy_message_ptr_fbb));
+          &fb2.message(), &aos_flatbuffer_copy_message_ptr_fbb));
 
       aos::FlatbufferDetachedBuffer<Configuration> fb_copy_message_ptr(
           aos_flatbuffer_copy_message_ptr_fbb.Release());
-      ASSERT_NE(fb_copy_message_ptr.size(), 0u);
+      ASSERT_NE(fb_copy_message_ptr.span().size(), 0u);
 
       flatbuffers::FlatBufferBuilder aos_flatbuffer_copy_full_fbb;
       aos_flatbuffer_copy_full_fbb.ForceDefaults(true);
 
       aos_flatbuffer_copy_full_fbb.Finish(BlindCopyFlatBuffer<Configuration>(
-          aos::FlatbufferSpan<Configuration>(ToSpan(fb2)),
+          aos::FlatbufferSpan<Configuration>(fb2.span()),
           &aos_flatbuffer_copy_full_fbb));
 
       aos::FlatbufferDetachedBuffer<Configuration> fb_copy_full(
           aos_flatbuffer_copy_full_fbb.Release());
-      ASSERT_NE(fb_copy_full.size(), 0u);
+      ASSERT_NE(fb_copy_full.span().size(), 0u);
 
-      flatbuffers::Verifier v(fb2.data(), fb2.size());
-      EXPECT_TRUE(
-          v.VerifyTable(flatbuffers::GetRoot<Configuration>(fb2.data())));
+      EXPECT_TRUE(fb2.Verify());
 
       LOG(INFO) << "Verifying copy of " << in2;
       ASSERT_TRUE(fb_copy_message_ptr.Verify()) << in2;
@@ -318,12 +295,11 @@
       aos_flatbuffer_copy_fbb.ForceDefaults(true);
 
       aos_flatbuffer_copy_fbb.Finish(CopyFlatBuffer<Configuration>(
-          flatbuffers::GetRoot<Configuration>(fb1_nested.data()),
-          &aos_flatbuffer_copy_fbb));
+          &fb1_nested.message(), &aos_flatbuffer_copy_fbb));
 
       aos::FlatbufferDetachedBuffer<Configuration> fb_copy(
           aos_flatbuffer_copy_fbb.Release());
-      ASSERT_NE(fb_copy.size(), 0u);
+      ASSERT_NE(fb_copy.span().size(), 0u);
 
       ASSERT_TRUE(fb_copy.Verify());
 
@@ -335,12 +311,11 @@
       aos_flatbuffer_copy_fbb.ForceDefaults(true);
 
       aos_flatbuffer_copy_fbb.Finish(CopyFlatBuffer<Configuration>(
-          flatbuffers::GetRoot<Configuration>(fb2_nested.data()),
-          &aos_flatbuffer_copy_fbb));
+          &fb2_nested.message(), &aos_flatbuffer_copy_fbb));
 
       aos::FlatbufferDetachedBuffer<Configuration> fb_copy(
           aos_flatbuffer_copy_fbb.Release());
-      ASSERT_NE(fb_copy.size(), 0u);
+      ASSERT_NE(fb_copy.span().size(), 0u);
 
       ASSERT_TRUE(fb_copy.Verify());
 
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 21dd30c..efb4f0e 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -108,55 +108,66 @@
   }
 
   // Returns a message from the buffer.
-  const T &message() const {
-    return *flatbuffers::GetRoot<T>(reinterpret_cast<const void *>(data()));
-  }
+  virtual const T &message() const = 0;
   // Returns a mutable message.  It can be mutated via the flatbuffer rules.
-  T *mutable_message() {
-    return flatbuffers::GetMutableRoot<T>(reinterpret_cast<void *>(data()));
-  }
-
-  virtual const uint8_t *data() const = 0;
-  virtual uint8_t *data() = 0;
-  virtual size_t size() const = 0;
-
-  absl::Span<uint8_t> span() { return absl::Span<uint8_t>(data(), size()); }
-  absl::Span<const uint8_t> span() const {
-    return absl::Span<const uint8_t>(data(), size());
-  }
+  virtual T *mutable_message() = 0;
 
   // Wipes out the data buffer. This is handy to mark an instance as freed, and
   // make attempts to use it fail more obviously.
-  void Wipe() { memset(data(), 0, size()); }
+  void Wipe() { memset(span().data(), 0, span().size()); }
 
-  virtual bool Verify() const {
-    flatbuffers::Verifier v(data(), size());
+  bool Verify() const {
+    flatbuffers::Verifier v(span().data(), span().size());
     return v.VerifyTable(&message());
   }
+
+ protected:
+  virtual absl::Span<uint8_t> span() = 0;
+  virtual absl::Span<const uint8_t> span() const = 0;
+};
+
+// Base class for non-size prefixed flatbuffers.  span() means different things
+// across the 2 types, so you end up with a different GetRoot.
+template <typename T>
+class NonSizePrefixedFlatbuffer : public Flatbuffer<T> {
+ public:
+  const T &message() const override {
+    return *flatbuffers::GetRoot<T>(
+        reinterpret_cast<const void *>(this->span().data()));
+  }
+  T *mutable_message() override {
+    return flatbuffers::GetMutableRoot<T>(
+        reinterpret_cast<void *>(this->span().data()));
+  }
+
+  absl::Span<uint8_t> span() override = 0;
+  absl::Span<const uint8_t> span() const override = 0;
 };
 
 // Non-owning Span backed flatbuffer.
 template <typename T>
-class FlatbufferSpan : public Flatbuffer<T> {
+class FlatbufferSpan : public NonSizePrefixedFlatbuffer<T> {
  public:
   // Builds a flatbuffer pointing to the contents of a span.
   FlatbufferSpan(const absl::Span<const uint8_t> data) : data_(data) {}
   // Builds a Flatbuffer pointing to the contents of another flatbuffer.
-  FlatbufferSpan(const Flatbuffer<T> &other) { data_ = other.span(); }
+  FlatbufferSpan(const NonSizePrefixedFlatbuffer<T> &other) {
+    data_ = other.span();
+  }
 
   // Copies the data from the other flatbuffer.
-  FlatbufferSpan &operator=(const Flatbuffer<T> &other) {
+  FlatbufferSpan &operator=(const NonSizePrefixedFlatbuffer<T> &other) {
     data_ = other.span();
     return *this;
   }
 
   virtual ~FlatbufferSpan() override {}
 
-  const uint8_t *data() const override {
-    return data_.data();
+  absl::Span<uint8_t> span() override {
+    LOG(FATAL) << "Unimplemented";
+    return absl::Span<uint8_t>(nullptr, 0);
   }
-  uint8_t *data() override { return CHECK_NOTNULL(nullptr); }
-  size_t size() const override { return data_.size(); }
+  absl::Span<const uint8_t> span() const override { return data_; }
 
  private:
   absl::Span<const uint8_t> data_;
@@ -164,45 +175,49 @@
 
 // String backed flatbuffer.
 template <typename T>
-class FlatbufferString : public Flatbuffer<T> {
+class FlatbufferString : public NonSizePrefixedFlatbuffer<T> {
  public:
   // Builds a flatbuffer using the contents of the string.
   FlatbufferString(const std::string_view data) : data_(data) {}
   // Builds a Flatbuffer by copying the data from the other flatbuffer.
-  FlatbufferString(const Flatbuffer<T> &other) {
-    data_ =
-        std::string(reinterpret_cast<const char *>(other.data()), other.size());
+  FlatbufferString(const NonSizePrefixedFlatbuffer<T> &other) {
+    absl::Span<const uint8_t> d = other.span();
+    data_ = std::string(reinterpret_cast<const char *>(d.data()), d.size());
   }
 
   // Copies the data from the other flatbuffer.
-  FlatbufferString &operator=(const Flatbuffer<T> &other) {
-    data_ = std::string(other.data(), other.size());
+  FlatbufferString &operator=(const NonSizePrefixedFlatbuffer<T> &other) {
+    absl::Span<const uint8_t> d = other.span();
+    data_ = std::string(reinterpret_cast<const char *>(d.data()), d.size());
     return *this;
   }
 
   virtual ~FlatbufferString() override {}
 
-  const uint8_t *data() const override {
-    return reinterpret_cast<const uint8_t *>(data_.data());
+  absl::Span<uint8_t> span() override {
+    return absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(data_.data()),
+                               data_.size());
   }
-  uint8_t *data() override { return reinterpret_cast<uint8_t *>(data_.data()); }
-  size_t size() const override { return data_.size(); }
+  absl::Span<const uint8_t> span() const override {
+    return absl::Span<const uint8_t>(
+        reinterpret_cast<const uint8_t *>(data_.data()), data_.size());
+  }
 
  private:
   std::string data_;
 };
 
-// Vector backed flatbuffer.
+// ResizeableBuffer backed flatbuffer.
 template <typename T>
-class FlatbufferVector : public Flatbuffer<T> {
+class FlatbufferVector : public NonSizePrefixedFlatbuffer<T> {
  public:
-  // Builds a Flatbuffer around a vector.
+  // Builds a Flatbuffer around a ResizeableBuffer.
   FlatbufferVector(ResizeableBuffer &&data) : data_(std::move(data)) {}
 
   // Builds a Flatbuffer by copying the data from the other flatbuffer.
-  FlatbufferVector(const Flatbuffer<T> &other) {
-    data_.resize(other.size());
-    memcpy(data_.data(), other.data(), data_.size());
+  FlatbufferVector(const NonSizePrefixedFlatbuffer<T> &other) {
+    data_.resize(other.span().size());
+    memcpy(data_.data(), other.span().data(), data_.size());
   }
 
   // Copy constructor.
@@ -229,9 +244,12 @@
 
   virtual ~FlatbufferVector() override {}
 
-  const uint8_t *data() const override { return data_.data(); }
-  uint8_t *data() override { return data_.data(); }
-  size_t size() const override { return data_.size(); }
+  absl::Span<uint8_t> span() override {
+    return absl::Span<uint8_t>(data_.data(), data_.size());
+  }
+  absl::Span<const uint8_t> span() const override {
+    return absl::Span<const uint8_t>(data_.data(), data_.size());
+  }
 
  private:
   ResizeableBuffer data_;
@@ -243,7 +261,7 @@
 // From a usage point of view, pointers to the data are very different than
 // pointers to the tables.
 template <typename T>
-class FlatbufferDetachedBuffer final : public Flatbuffer<T> {
+class FlatbufferDetachedBuffer final : public NonSizePrefixedFlatbuffer<T> {
  public:
   // Builds a Flatbuffer by taking ownership of the buffer.
   FlatbufferDetachedBuffer(flatbuffers::DetachedBuffer &&buffer)
@@ -271,9 +289,13 @@
 
   // Returns references to the buffer, and the data.
   const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
-  const uint8_t *data() const override { return buffer_.data(); }
-  uint8_t *data() override { return buffer_.data(); }
-  size_t size() const override { return buffer_.size(); }
+
+  absl::Span<uint8_t> span() override {
+    return absl::Span<uint8_t>(buffer_.data(), buffer_.size());
+  }
+  absl::Span<const uint8_t> span() const override {
+    return absl::Span<const uint8_t>(buffer_.data(), buffer_.size());
+  }
 
  private:
   flatbuffers::DetachedBuffer buffer_;
@@ -281,14 +303,15 @@
 
 // Array backed flatbuffer which manages building of the flatbuffer.
 template <typename T, size_t Size>
-class FlatbufferFixedAllocatorArray final : public Flatbuffer<T> {
+class FlatbufferFixedAllocatorArray final
+    : public NonSizePrefixedFlatbuffer<T> {
  public:
   FlatbufferFixedAllocatorArray() : buffer_(), allocator_(&buffer_[0], Size) {}
 
   FlatbufferFixedAllocatorArray(const FlatbufferFixedAllocatorArray &) = delete;
-  void operator=(const Flatbuffer<T> &) = delete;
+  void operator=(const NonSizePrefixedFlatbuffer<T> &) = delete;
 
-  void CopyFrom(const Flatbuffer<T> &other) {
+  void CopyFrom(const NonSizePrefixedFlatbuffer<T> &other) {
     CHECK(!allocator_.is_allocated()) << ": May not overwrite while building";
     memcpy(buffer_.begin(), other.data(), other.size());
     data_ = buffer_.begin();
@@ -320,15 +343,12 @@
     DCHECK_LE(size_, Size);
   }
 
-  const uint8_t *data() const override {
-    CHECK_NOTNULL(data_);
-    return data_;
+  absl::Span<uint8_t> span() override {
+    return absl::Span<uint8_t>(data_, size_);
   }
-  uint8_t *data() override {
-    CHECK_NOTNULL(data_);
-    return data_;
+  absl::Span<const uint8_t> span() const override {
+    return absl::Span<const uint8_t>(data_, size_);
   }
-  size_t size() const override { return size_; }
 
  private:
   std::array<uint8_t, Size> buffer_;
@@ -338,13 +358,31 @@
   size_t size_ = 0;
 };
 
+template <typename T>
+class SizePrefixedFlatbuffer : public Flatbuffer<T> {
+ public:
+  const T &message() const override {
+    return *flatbuffers::GetSizePrefixedRoot<T>(
+        reinterpret_cast<const void *>(this->span().data()));
+  }
+
+  T *mutable_message() override {
+    return flatbuffers::GetMutableSizePrefixedRoot<T>(
+        reinterpret_cast<void *>(this->span().data()));
+  }
+
+  absl::Span<uint8_t> span() override = 0;
+  absl::Span<const uint8_t> span() const override = 0;
+};
+
 // This object associates the message type with the memory storing the
 // flatbuffer.  This only stores root tables.
 //
 // From a usage point of view, pointers to the data are very different than
 // pointers to the tables.
 template <typename T>
-class SizePrefixedFlatbufferDetachedBuffer final : public Flatbuffer<T> {
+class SizePrefixedFlatbufferDetachedBuffer final
+    : public SizePrefixedFlatbuffer<T> {
  public:
   // Builds a Flatbuffer by taking ownership of the buffer.
   SizePrefixedFlatbufferDetachedBuffer(flatbuffers::DetachedBuffer &&buffer)
@@ -374,35 +412,69 @@
   }
 
   // Returns references to the buffer, and the data.
-  const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
-  const uint8_t *data() const override {
-    return buffer_.data() + sizeof(flatbuffers::uoffset_t);
-  }
-  uint8_t *data() override {
-    return buffer_.data() + sizeof(flatbuffers::uoffset_t);
-  }
-  size_t size() const override {
-    return buffer_.size() - sizeof(flatbuffers::uoffset_t);
-  }
-
-  absl::Span<uint8_t> full_span() {
+  absl::Span<uint8_t> span() override {
     return absl::Span<uint8_t>(buffer_.data(), buffer_.size());
   }
-  absl::Span<const uint8_t> full_span() const {
+  absl::Span<const uint8_t> span() const override {
     return absl::Span<const uint8_t>(buffer_.data(), buffer_.size());
   }
 
-  bool Verify() const override {
-    // TODO(austin): Should we push full_span up to Flatbuffer<> class?
-    // The base pointer has the wrong alignment if we strip off the size.
-    flatbuffers::Verifier v(full_span().data(), full_span().size());
-    return v.VerifyTable(&this->message());
-  }
-
  private:
   flatbuffers::DetachedBuffer buffer_;
 };
 
+// ResizeableBuffer backed flatbuffer.
+template <typename T>
+class SizePrefixedFlatbufferVector : public SizePrefixedFlatbuffer<T> {
+ public:
+  // Builds a Flatbuffer around a ResizeableBuffer.
+  SizePrefixedFlatbufferVector(ResizeableBuffer &&data)
+      : data_(std::move(data)) {}
+
+  // Builds a Flatbuffer by copying the data from the other flatbuffer.
+  SizePrefixedFlatbufferVector(const SizePrefixedFlatbuffer<T> &other) {
+    data_.resize(other.span().size());
+    memcpy(data_.data(), other.span().data(), data_.size());
+  }
+
+  // Copy constructor.
+  SizePrefixedFlatbufferVector(const SizePrefixedFlatbufferVector<T> &other)
+      : data_(other.data_) {}
+
+  // Move constructor.
+  SizePrefixedFlatbufferVector(SizePrefixedFlatbufferVector<T> &&other)
+      : data_(std::move(other.data_)) {}
+
+  // Copies the data from the other flatbuffer.
+  SizePrefixedFlatbufferVector &operator=(
+      const SizePrefixedFlatbufferVector<T> &other) {
+    data_ = other.data_;
+    return *this;
+  }
+  SizePrefixedFlatbufferVector &operator=(
+      SizePrefixedFlatbufferVector<T> &&other) {
+    data_ = std::move(other.data_);
+    return *this;
+  }
+
+  // Constructs an empty flatbuffer of type T.
+  static SizePrefixedFlatbufferVector<T> Empty() {
+    return SizePrefixedFlatbufferVector<T>(ResizeableBuffer());
+  }
+
+  virtual ~SizePrefixedFlatbufferVector() override {}
+
+  absl::Span<uint8_t> span() override {
+    return absl::Span<uint8_t>(data_.data(), data_.size());
+  }
+  absl::Span<const uint8_t> span() const override {
+    return absl::Span<const uint8_t>(data_.data(), data_.size());
+  }
+
+ private:
+  ResizeableBuffer data_;
+};
+
 inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
     absl::Span<const uint8_t> span) {
   // Copy the data from the span.
diff --git a/aos/json_to_flatbuffer.cc b/aos/json_to_flatbuffer.cc
index 9980328..d78fe99 100644
--- a/aos/json_to_flatbuffer.cc
+++ b/aos/json_to_flatbuffer.cc
@@ -788,19 +788,6 @@
   }
 }
 
-::std::string BufferFlatbufferToJson(const uint8_t *buffer,
-                                     const ::flatbuffers::TypeTable *typetable,
-                                     JsonOptions json_options) {
-  // It is pretty common to get passed in a nullptr when a test fails.  Rather
-  // than CHECK, return a more user friendly result.
-  if (buffer == nullptr) {
-    return "null";
-  }
-  return TableFlatbufferToJson(reinterpret_cast<const flatbuffers::Table *>(
-                                   flatbuffers::GetRoot<uint8_t>(buffer)),
-                               typetable, json_options);
-}
-
 namespace {
 
 // A visitor which manages skipping the contents of vectors that are longer than
diff --git a/aos/json_to_flatbuffer.h b/aos/json_to_flatbuffer.h
index c997f03..c1aa736 100644
--- a/aos/json_to_flatbuffer.h
+++ b/aos/json_to_flatbuffer.h
@@ -43,29 +43,18 @@
 };
 
 // Converts a flatbuffer into a Json string.
-// The methods below are generally more useful than BufferFlatbufferToJson and
-// TableFlatbufferToJson.
-::std::string BufferFlatbufferToJson(const uint8_t *buffer,
-                                     const flatbuffers::TypeTable *typetable,
-                                     JsonOptions json_options = {});
-
+// The methods below are generally more useful than TableFlatbufferToJson.
 ::std::string TableFlatbufferToJson(const flatbuffers::Table *t,
                                     const ::flatbuffers::TypeTable *typetable,
                                     JsonOptions json_options = {});
 
-// Converts a DetachedBuffer holding a flatbuffer to JSON.
-inline ::std::string FlatbufferToJson(const flatbuffers::DetachedBuffer &buffer,
-                                      const flatbuffers::TypeTable *typetable,
-                                      JsonOptions json_options = {}) {
-  return BufferFlatbufferToJson(buffer.data(), typetable, json_options);
-}
-
 // Converts a Flatbuffer<T> holding a flatbuffer to JSON.
 template <typename T>
 inline ::std::string FlatbufferToJson(const Flatbuffer<T> &flatbuffer,
                                       JsonOptions json_options = {}) {
-  return BufferFlatbufferToJson(
-      flatbuffer.data(), Flatbuffer<T>::MiniReflectTypeTable(), json_options);
+  return TableFlatbufferToJson(
+      reinterpret_cast<const flatbuffers::Table *>(&flatbuffer.message()),
+      Flatbuffer<T>::MiniReflectTypeTable(), json_options);
 }
 
 // Converts a flatbuffer::Table to JSON.
@@ -97,10 +86,10 @@
   json_file.close();
 }
 
-// Writes a Flatbuffer to a binary file, or dies.
+// Writes a NonSizePrefixedFlatbuffer to a binary file, or dies.
 template <typename T>
 inline void WriteFlatbufferToFile(const std::string_view filename,
-                                  const Flatbuffer<T> &msg) {
+                                  const NonSizePrefixedFlatbuffer<T> &msg) {
   std::ofstream file(std::string(filename),
                      std::ios::out | std::ofstream::binary);
   CHECK(file) << ": Couldn't open " << filename;
diff --git a/aos/json_to_flatbuffer_test.cc b/aos/json_to_flatbuffer_test.cc
index 80dec7c..4379ae9 100644
--- a/aos/json_to_flatbuffer_test.cc
+++ b/aos/json_to_flatbuffer_test.cc
@@ -16,14 +16,14 @@
 
   bool JsonAndBack(const ::std::string in, const ::std::string out) {
     printf("Testing: %s\n", in.c_str());
-    const flatbuffers::DetachedBuffer fb =
-        JsonToFlatbuffer(in.data(), ConfigurationTypeTable());
+    FlatbufferDetachedBuffer<Configuration> fb =
+        JsonToFlatbuffer<Configuration>(in.data());
 
-    if (fb.size() == 0) {
+    if (fb.span().size() == 0) {
       return false;
     }
 
-    const ::std::string back = FlatbufferToJson(fb, ConfigurationTypeTable());
+    const ::std::string back = FlatbufferToJson(fb);
 
     printf("Back to string: %s\n", back.c_str());
 
@@ -215,17 +215,17 @@
   json_short += " ] }";
   json_long += ", 101 ] }";
 
-  const flatbuffers::DetachedBuffer fb_short =
-      JsonToFlatbuffer(json_short.data(), ConfigurationTypeTable());
-  ASSERT_GT(fb_short.size(), 0);
-  const flatbuffers::DetachedBuffer fb_long =
-      JsonToFlatbuffer(json_long.data(), ConfigurationTypeTable());
-  ASSERT_GT(fb_long.size(), 0);
+  const FlatbufferDetachedBuffer<Configuration> fb_short(
+      JsonToFlatbuffer<Configuration>(json_short));
+  ASSERT_GT(fb_short.span().size(), 0);
+  const FlatbufferDetachedBuffer<Configuration> fb_long(
+      JsonToFlatbuffer<Configuration>(json_long));
+  ASSERT_GT(fb_long.span().size(), 0);
 
-  const std::string back_json_short = FlatbufferToJson(
-      fb_short, ConfigurationTypeTable(), {.multi_line = false, .max_vector_size = 100});
-  const std::string back_json_long = FlatbufferToJson(
-      fb_long, ConfigurationTypeTable(), {.multi_line = false, .max_vector_size = 100});
+  const std::string back_json_short = FlatbufferToJson<Configuration>(
+      fb_short, {.multi_line = false, .max_vector_size = 100});
+  const std::string back_json_long = FlatbufferToJson<Configuration>(
+      fb_long, {.multi_line = false, .max_vector_size = 100});
 
   EXPECT_EQ(json_short, back_json_short);
   EXPECT_EQ("{ \"vector_foo_int\": [ ... 101 elements ... ] }", back_json_long);
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index bc05835..e5c18b6 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -123,7 +123,7 @@
   event_loop_->OnRun(
       [this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
 
-  int max_size = connect_message_.size();
+  int max_size = connect_message_.span().size();
 
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     CHECK(channel->has_source_node());
@@ -185,9 +185,9 @@
 void SctpClientConnection::SendConnect() {
   // Try to send the connect message.  If that fails, retry.
   if (!client_.Send(kConnectStream(),
-                    std::string_view(
-                        reinterpret_cast<const char *>(connect_message_.data()),
-                        connect_message_.size()),
+                    std::string_view(reinterpret_cast<const char *>(
+                                         connect_message_.span().data()),
+                                     connect_message_.span().size()),
                     0)) {
     NodeDisconnected();
   }
@@ -271,8 +271,8 @@
     // guarentee that this ack gets received too...  Same path as the logger.
     client_.Send(kTimestampStream(),
                  std::string_view(reinterpret_cast<const char *>(
-                                      message_reception_reply_.data()),
-                                  message_reception_reply_.size()),
+                                      message_reception_reply_.span().data()),
+                                  message_reception_reply_.span().size()),
                  0);
   }
 
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index b80547d..50ac97e 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -228,6 +228,7 @@
                            configuration::GetNode(event_loop->configuration(),
                                                   destination_node_name),
                            event_loop->node()->name()->string_view())
+            .span()
             .size());
     VLOG(1) << "Connection to " << destination_node_name << " has size "
             << connect_size;
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 0f5546c..1f2de0b 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -319,8 +319,8 @@
   context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
   context.realtime_event_time = timestamp_sender_.realtime_sent_time();
   context.queue_index = timestamp_sender_.sent_queue_index();
-  context.size = timestamp_copy.size();
-  context.data = timestamp_copy.data();
+  context.size = timestamp_copy.span().size();
+  context.data = timestamp_copy.span().data();
 
   // Since we are building up the timestamp to send here, we need to trigger the
   // SendData call ourselves.
diff --git a/frc971/analysis/py_log_reader.cc b/frc971/analysis/py_log_reader.cc
index 08f3707..e8843a9 100644
--- a/frc971/analysis/py_log_reader.cc
+++ b/frc971/analysis/py_log_reader.cc
@@ -218,7 +218,8 @@
       aos::CopyFlatBuffer(tools->reader->configuration());
 
   return PyBytes_FromStringAndSize(
-      reinterpret_cast<const char *>(buffer.data()), buffer.size());
+      reinterpret_cast<const char *>(buffer.span().data()),
+      buffer.span().size());
 }
 
 static PyMethodDef LogReader_methods[] = {
diff --git a/third_party/flatbuffers/include/flatbuffers/flatbuffers.h b/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
index 5fd610b..e903e43 100644
--- a/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
+++ b/third_party/flatbuffers/include/flatbuffers/flatbuffers.h
@@ -2048,8 +2048,13 @@
   return GetMutableRoot<T>(const_cast<void *>(buf));
 }
 
+template<typename T> T *GetMutableSizePrefixedRoot(void *buf) {
+  return GetMutableRoot<T>(reinterpret_cast<uint8_t *>(buf) +
+                           sizeof(uoffset_t));
+}
+
 template<typename T> const T *GetSizePrefixedRoot(const void *buf) {
-  return GetRoot<T>(reinterpret_cast<const uint8_t *>(buf) + sizeof(uoffset_t));
+  return GetMutableSizePrefixedRoot<T>(const_cast<void *>(buf));
 }
 
 /// Helpers to get a typed pointer to objects that are currently being built.