Stop stripping the size prefix off

This turns out to be super dangerous to do.  A flatbuffer is aligned
assuming that the size is either there or not there.  By removing it,
you break alignment.

This necesitates having 2 subclasses of Flatbuffer.  A SizePrefixed
version and a non size prefixed version.  That lets us distinguish for
methods which care.

Once all that's done, deal with the fallout through the code base,
including logfile_utils and the chaos that causes rippling out.

Change-Id: I91b7be355279a1c19e5c956c33359df01a17eacf
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index d906997..330c78e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -348,7 +348,7 @@
   return true;
 }
 
-std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
     std::string_view filename) {
   SpanReader span_reader(filename);
   absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
@@ -360,13 +360,12 @@
 
   // And copy the config so we have it forever, removing the size prefix.
   ResizeableBuffer data;
-  data.resize(config_data.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(data.data(), config_data.begin() + sizeof(flatbuffers::uoffset_t),
-         data.size());
-  return FlatbufferVector<LogFileHeader>(std::move(data));
+  data.resize(config_data.size());
+  memcpy(data.data(), config_data.begin(), data.size());
+  return SizePrefixedFlatbufferVector<LogFileHeader>(std::move(data));
 }
 
-std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
     std::string_view filename, size_t n) {
   SpanReader span_reader(filename);
   absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
@@ -381,15 +380,15 @@
 
   // And copy the config so we have it forever, removing the size prefix.
   ResizeableBuffer data;
-  data.resize(data_span.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(data.data(), data_span.begin() + sizeof(flatbuffers::uoffset_t),
-         data.size());
-  return FlatbufferVector<MessageHeader>(std::move(data));
+  data.resize(data_span.size());
+  memcpy(data.data(), data_span.begin(), data.size());
+  return SizePrefixedFlatbufferVector<MessageHeader>(std::move(data));
 }
 
 MessageReader::MessageReader(std::string_view filename)
     : span_reader_(filename),
-      raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+      raw_log_file_header_(
+          SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
   // Make sure we have enough to read the size.
   absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();
 
@@ -399,12 +398,10 @@
 
   // And copy the header data so we have it forever.
   ResizeableBuffer header_data_copy;
-  header_data_copy.resize(header_data.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(header_data_copy.data(),
-         header_data.begin() + sizeof(flatbuffers::uoffset_t),
-         header_data_copy.size());
+  header_data_copy.resize(header_data.size());
+  memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
   raw_log_file_header_ =
-      FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
+      SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));
 
   max_out_of_order_duration_ =
       chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
@@ -413,18 +410,17 @@
           << FlatbufferToJson(log_file_header()->node());
 }
 
-std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
+MessageReader::ReadMessage() {
   absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
   if (msg_data == absl::Span<const uint8_t>()) {
     return std::nullopt;
   }
 
   ResizeableBuffer result_buffer;
-  result_buffer.resize(msg_data.size() - sizeof(flatbuffers::uoffset_t));
-  memcpy(result_buffer.data(),
-         msg_data.begin() + sizeof(flatbuffers::uoffset_t),
-         result_buffer.size());
-  FlatbufferVector<MessageHeader> result(std::move(result_buffer));
+  result_buffer.resize(msg_data.size());
+  memcpy(result_buffer.data(), msg_data.begin(), result_buffer.size());
+  SizePrefixedFlatbufferVector<MessageHeader> result(std::move(result_buffer));
 
   const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
       chrono::nanoseconds(result.message().monotonic_sent_time()));
@@ -437,10 +433,10 @@
 PartsMessageReader::PartsMessageReader(LogParts log_parts)
     : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
 
-std::optional<FlatbufferVector<MessageHeader>>
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
 PartsMessageReader::ReadMessage() {
   while (!done_) {
-    std::optional<FlatbufferVector<MessageHeader>> message =
+    std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
         message_reader_.ReadMessage();
     if (message) {
       newest_timestamp_ = message_reader_.newest_timestamp();
@@ -468,7 +464,7 @@
 SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),
-      log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+      log_file_header_(SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
   CHECK(NextLogFile()) << ": filenames is empty.  Need files to read.";
 
   // Grab any log file header.  They should all match (and we will check as we
@@ -674,7 +670,7 @@
       return true;
     }
 
-    if (std::optional<FlatbufferVector<MessageHeader>> msg =
+    if (std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
             message_reader_->ReadMessage()) {
       const MessageHeader &header = msg.value().message();
 
@@ -687,7 +683,7 @@
                   << newest_timestamp() << " start time "
                   << monotonic_start_time() << " " << FlatbufferToJson(&header);
       } else if (VLOG_IS_ON(1)) {
-        FlatbufferVector<MessageHeader> copy = msg.value();
+        SizePrefixedFlatbufferVector<MessageHeader> copy = msg.value();
         copy.mutable_message()->clear_data();
         LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
                   << filename() << " ttq: " << time_to_queue_ << " now "
@@ -777,12 +773,12 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 SplitMessageReader::PopOldest(int channel_index) {
   CHECK_GT(channels_[channel_index].data.size(), 0u);
   const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
       timestamp = channels_[channel_index].data.front_timestamp();
-  FlatbufferVector<MessageHeader> front =
+  SizePrefixedFlatbufferVector<MessageHeader> front =
       std::move(channels_[channel_index].data.front());
   channels_[channel_index].data.PopFront();
 
@@ -799,12 +795,12 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
   CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
   const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
       timestamp = channels_[channel].timestamps[node_index].front_timestamp();
-  FlatbufferVector<MessageHeader> front =
+  SizePrefixedFlatbufferVector<MessageHeader> front =
       std::move(channels_[channel].timestamps[node_index].front());
   channels_[channel].timestamps[node_index].PopFront();
 
@@ -823,7 +819,7 @@
 }
 
 bool SplitMessageReader::MessageHeaderQueue::emplace_back(
-    FlatbufferVector<MessageHeader> &&msg) {
+    SizePrefixedFlatbufferVector<MessageHeader> &&msg) {
   CHECK(split_reader != nullptr);
 
   // If there is no timestamp merger for this queue, nobody is listening.  Drop
@@ -1021,7 +1017,7 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 TimestampMerger::PopMessageHeap() {
   // Pop the oldest message reader pointer off the heap.
   CHECK_GT(message_heap_.size(), 0u);
@@ -1035,7 +1031,7 @@
   // Pop the oldest message.  This re-pushes any messages from the reader to the
   // message heap.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
       oldest_message =
           std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
 
@@ -1063,7 +1059,7 @@
       // Pop the next oldest message.  This re-pushes any messages from the
       // reader.
       std::tuple<monotonic_clock::time_point, uint32_t,
-                 FlatbufferVector<MessageHeader>>
+                 SizePrefixedFlatbufferVector<MessageHeader>>
           next_oldest_message = std::get<2>(next_oldest_message_reader)
                                     ->PopOldest(channel_index_);
 
@@ -1080,7 +1076,7 @@
 }
 
 std::tuple<monotonic_clock::time_point, uint32_t,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 TimestampMerger::PopTimestampHeap() {
   // Pop the oldest message reader pointer off the heap.
   CHECK_GT(timestamp_heap_.size(), 0u);
@@ -1097,7 +1093,7 @@
   // Pop the oldest message.  This re-pushes any timestamps from the reader to
   // the timestamp heap.
   std::tuple<monotonic_clock::time_point, uint32_t,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
       oldest_timestamp = std::get<2>(oldest_timestamp_reader)
                              ->PopOldestTimestamp(channel_index_, node_index_);
 
@@ -1128,7 +1124,7 @@
       // Pop the next oldest timestamp.  This re-pushes any messages from the
       // reader.
       std::tuple<monotonic_clock::time_point, uint32_t,
-                 FlatbufferVector<MessageHeader>>
+                 SizePrefixedFlatbufferVector<MessageHeader>>
           next_oldest_timestamp =
               std::get<2>(next_oldest_timestamp_reader)
                   ->PopOldestTimestamp(channel_index_, node_index_);
@@ -1157,7 +1153,8 @@
   return oldest_timestamp;
 }
 
-std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
+std::tuple<TimestampMerger::DeliveryTimestamp,
+           SizePrefixedFlatbufferVector<MessageHeader>>
 TimestampMerger::PopOldest() {
   if (has_timestamps_) {
     VLOG(1) << "Looking for matching timestamp for "
@@ -1168,7 +1165,7 @@
 
     // Read the timestamps.
     std::tuple<monotonic_clock::time_point, uint32_t,
-               FlatbufferVector<MessageHeader>>
+               SizePrefixedFlatbufferVector<MessageHeader>>
         oldest_timestamp = PopTimestampHeap();
 
     TimestampMerger::DeliveryTimestamp timestamp;
@@ -1250,7 +1247,7 @@
                      configuration_->channels()->Get(channel_index_))
               << " (" << channel_index_ << ")";
       std::tuple<monotonic_clock::time_point, uint32_t,
-                 FlatbufferVector<MessageHeader>>
+                 SizePrefixedFlatbufferVector<MessageHeader>>
           oldest_message = PopMessageHeap();
 
       timestamp.realtime_remote_time =
@@ -1272,7 +1269,7 @@
     }
   } else {
     std::tuple<monotonic_clock::time_point, uint32_t,
-               FlatbufferVector<MessageHeader>>
+               SizePrefixedFlatbufferVector<MessageHeader>>
         oldest_message = PopMessageHeap();
 
     TimestampMerger::DeliveryTimestamp timestamp;
@@ -1354,7 +1351,7 @@
       if (both_null || node_names_identical) {
         if (!found_node) {
           found_node = true;
-          log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+          log_file_header_ = reader->raw_log_file_header();
           VLOG(1) << "Found log file " << reader->filename() << " with node "
                   << FlatbufferToJson(reader->node()) << " start_time "
                   << monotonic_start_time();
@@ -1467,7 +1464,7 @@
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, int,
-           FlatbufferVector<MessageHeader>>
+           SizePrefixedFlatbufferVector<MessageHeader>>
 ChannelMerger::PopOldest() {
   CHECK_GT(channel_heap_.size(), 0u);
   std::pair<monotonic_clock::time_point, int> oldest_channel_data =
@@ -1483,7 +1480,7 @@
 
   // Merger handles any queueing needed from here.
   std::tuple<TimestampMerger::DeliveryTimestamp,
-             FlatbufferVector<MessageHeader>>
+             SizePrefixedFlatbufferVector<MessageHeader>>
       message = merger->PopOldest();
   DCHECK_EQ(std::get<0>(message).monotonic_event_time,
             oldest_channel_data.first)