Stop stripping the size prefix off
This turns out to be super dangerous to do. A flatbuffer is aligned
assuming that the size is either there or not there. By removing it,
you break alignment.
This necesitates having 2 subclasses of Flatbuffer. A SizePrefixed
version and a non size prefixed version. That lets us distinguish for
methods which care.
Once all that's done, deal with the fallout through the code base,
including logfile_utils and the chaos that causes rippling out.
Change-Id: I91b7be355279a1c19e5c956c33359df01a17eacf
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index d906997..330c78e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -348,7 +348,7 @@
return true;
}
-std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
std::string_view filename) {
SpanReader span_reader(filename);
absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
@@ -360,13 +360,12 @@
// And copy the config so we have it forever, removing the size prefix.
ResizeableBuffer data;
- data.resize(config_data.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(data.data(), config_data.begin() + sizeof(flatbuffers::uoffset_t),
- data.size());
- return FlatbufferVector<LogFileHeader>(std::move(data));
+ data.resize(config_data.size());
+ memcpy(data.data(), config_data.begin(), data.size());
+ return SizePrefixedFlatbufferVector<LogFileHeader>(std::move(data));
}
-std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
std::string_view filename, size_t n) {
SpanReader span_reader(filename);
absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
@@ -381,15 +380,15 @@
// And copy the config so we have it forever, removing the size prefix.
ResizeableBuffer data;
- data.resize(data_span.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(data.data(), data_span.begin() + sizeof(flatbuffers::uoffset_t),
- data.size());
- return FlatbufferVector<MessageHeader>(std::move(data));
+ data.resize(data_span.size());
+ memcpy(data.data(), data_span.begin(), data.size());
+ return SizePrefixedFlatbufferVector<MessageHeader>(std::move(data));
}
MessageReader::MessageReader(std::string_view filename)
: span_reader_(filename),
- raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+ raw_log_file_header_(
+ SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
// Make sure we have enough to read the size.
absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();
@@ -399,12 +398,10 @@
// And copy the header data so we have it forever.
ResizeableBuffer header_data_copy;
- header_data_copy.resize(header_data.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(header_data_copy.data(),
- header_data.begin() + sizeof(flatbuffers::uoffset_t),
- header_data_copy.size());
+ header_data_copy.resize(header_data.size());
+ memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
raw_log_file_header_ =
- FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
+ SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));
max_out_of_order_duration_ =
chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
@@ -413,18 +410,17 @@
<< FlatbufferToJson(log_file_header()->node());
}
-std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
+MessageReader::ReadMessage() {
absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
if (msg_data == absl::Span<const uint8_t>()) {
return std::nullopt;
}
ResizeableBuffer result_buffer;
- result_buffer.resize(msg_data.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(result_buffer.data(),
- msg_data.begin() + sizeof(flatbuffers::uoffset_t),
- result_buffer.size());
- FlatbufferVector<MessageHeader> result(std::move(result_buffer));
+ result_buffer.resize(msg_data.size());
+ memcpy(result_buffer.data(), msg_data.begin(), result_buffer.size());
+ SizePrefixedFlatbufferVector<MessageHeader> result(std::move(result_buffer));
const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
chrono::nanoseconds(result.message().monotonic_sent_time()));
@@ -437,10 +433,10 @@
PartsMessageReader::PartsMessageReader(LogParts log_parts)
: parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
-std::optional<FlatbufferVector<MessageHeader>>
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
PartsMessageReader::ReadMessage() {
while (!done_) {
- std::optional<FlatbufferVector<MessageHeader>> message =
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
message_reader_.ReadMessage();
if (message) {
newest_timestamp_ = message_reader_.newest_timestamp();
@@ -468,7 +464,7 @@
SplitMessageReader::SplitMessageReader(
const std::vector<std::string> &filenames)
: filenames_(filenames),
- log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+ log_file_header_(SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";
// Grab any log file header. They should all match (and we will check as we
@@ -674,7 +670,7 @@
return true;
}
- if (std::optional<FlatbufferVector<MessageHeader>> msg =
+ if (std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
message_reader_->ReadMessage()) {
const MessageHeader &header = msg.value().message();
@@ -687,7 +683,7 @@
<< newest_timestamp() << " start time "
<< monotonic_start_time() << " " << FlatbufferToJson(&header);
} else if (VLOG_IS_ON(1)) {
- FlatbufferVector<MessageHeader> copy = msg.value();
+ SizePrefixedFlatbufferVector<MessageHeader> copy = msg.value();
copy.mutable_message()->clear_data();
LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
<< filename() << " ttq: " << time_to_queue_ << " now "
@@ -777,12 +773,12 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldest(int channel_index) {
CHECK_GT(channels_[channel_index].data.size(), 0u);
const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp = channels_[channel_index].data.front_timestamp();
- FlatbufferVector<MessageHeader> front =
+ SizePrefixedFlatbufferVector<MessageHeader> front =
std::move(channels_[channel_index].data.front());
channels_[channel_index].data.PopFront();
@@ -799,12 +795,12 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp = channels_[channel].timestamps[node_index].front_timestamp();
- FlatbufferVector<MessageHeader> front =
+ SizePrefixedFlatbufferVector<MessageHeader> front =
std::move(channels_[channel].timestamps[node_index].front());
channels_[channel].timestamps[node_index].PopFront();
@@ -823,7 +819,7 @@
}
bool SplitMessageReader::MessageHeaderQueue::emplace_back(
- FlatbufferVector<MessageHeader> &&msg) {
+ SizePrefixedFlatbufferVector<MessageHeader> &&msg) {
CHECK(split_reader != nullptr);
// If there is no timestamp merger for this queue, nobody is listening. Drop
@@ -1021,7 +1017,7 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
TimestampMerger::PopMessageHeap() {
// Pop the oldest message reader pointer off the heap.
CHECK_GT(message_heap_.size(), 0u);
@@ -1035,7 +1031,7 @@
// Pop the oldest message. This re-pushes any messages from the reader to the
// message heap.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_message =
std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
@@ -1063,7 +1059,7 @@
// Pop the next oldest message. This re-pushes any messages from the
// reader.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
next_oldest_message = std::get<2>(next_oldest_message_reader)
->PopOldest(channel_index_);
@@ -1080,7 +1076,7 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
TimestampMerger::PopTimestampHeap() {
// Pop the oldest message reader pointer off the heap.
CHECK_GT(timestamp_heap_.size(), 0u);
@@ -1097,7 +1093,7 @@
// Pop the oldest message. This re-pushes any timestamps from the reader to
// the timestamp heap.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_timestamp = std::get<2>(oldest_timestamp_reader)
->PopOldestTimestamp(channel_index_, node_index_);
@@ -1128,7 +1124,7 @@
// Pop the next oldest timestamp. This re-pushes any messages from the
// reader.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
next_oldest_timestamp =
std::get<2>(next_oldest_timestamp_reader)
->PopOldestTimestamp(channel_index_, node_index_);
@@ -1157,7 +1153,8 @@
return oldest_timestamp;
}
-std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
+std::tuple<TimestampMerger::DeliveryTimestamp,
+ SizePrefixedFlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
if (has_timestamps_) {
VLOG(1) << "Looking for matching timestamp for "
@@ -1168,7 +1165,7 @@
// Read the timestamps.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_timestamp = PopTimestampHeap();
TimestampMerger::DeliveryTimestamp timestamp;
@@ -1250,7 +1247,7 @@
configuration_->channels()->Get(channel_index_))
<< " (" << channel_index_ << ")";
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
timestamp.realtime_remote_time =
@@ -1272,7 +1269,7 @@
}
} else {
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
TimestampMerger::DeliveryTimestamp timestamp;
@@ -1354,7 +1351,7 @@
if (both_null || node_names_identical) {
if (!found_node) {
found_node = true;
- log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+ log_file_header_ = reader->raw_log_file_header();
VLOG(1) << "Found log file " << reader->filename() << " with node "
<< FlatbufferToJson(reader->node()) << " start_time "
<< monotonic_start_time();
@@ -1467,7 +1464,7 @@
}
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
ChannelMerger::PopOldest() {
CHECK_GT(channel_heap_.size(), 0u);
std::pair<monotonic_clock::time_point, int> oldest_channel_data =
@@ -1483,7 +1480,7 @@
// Merger handles any queueing needed from here.
std::tuple<TimestampMerger::DeliveryTimestamp,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
message = merger->PopOldest();
DCHECK_EQ(std::get<0>(message).monotonic_event_time,
oldest_channel_data.first)