Stop stripping the size prefix off
This turns out to be quite dangerous. A flatbuffer is aligned assuming
the size prefix is either consistently present or consistently absent;
copying the data without the prefix shifts everything by 4 bytes
relative to the new allocation and breaks that alignment.
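
A minimal sketch of the failure mode (not code from this patch; the
function and variable names are illustrative only):

    #include <cstdint>
    #include <cstring>
    #include <vector>

    #include "flatbuffers/flatbuffers.h"

    void AlignmentSketch(flatbuffers::FlatBufferBuilder &fbb) {
      // FinishSizePrefixed() lays the table out so that 8-byte scalars
      // are 8-byte aligned measured from the first byte of the buffer,
      // i.e. including the 4-byte flatbuffers::uoffset_t size prefix.
      const uint8_t *prefixed = fbb.GetBufferPointer();
      const size_t size = fbb.GetSize();

      // Old behavior: strip the prefix while copying.  Fields that were
      // 8-byte aligned now sit at (8-byte aligned) + 4 in `stripped`.
      std::vector<uint8_t> stripped(size - sizeof(flatbuffers::uoffset_t));
      memcpy(stripped.data(), prefixed + sizeof(flatbuffers::uoffset_t),
             stripped.size());

      // New behavior: copy the whole buffer, prefix included, so the
      // alignment the builder assumed still holds.
      std::vector<uint8_t> kept(size);
      memcpy(kept.data(), prefixed, kept.size());
    }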
This necessitates having two subclasses of Flatbuffer: a SizePrefixed
version and a non-size-prefixed version. That lets methods which care
about the prefix tell the two apart.
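
A rough sketch of the shape of that split, using hypothetical
simplified classes (the real aos types, like
SizePrefixedFlatbufferVector below, carry more machinery):

    #include <cstdint>
    #include <utility>
    #include <vector>

    #include "absl/types/span.h"
    #include "flatbuffers/flatbuffers.h"

    // Hypothetical stand-ins: both flavors own a buffer; they differ
    // only in whether the first 4 bytes are the size prefix.
    template <typename T>
    class NonPrefixedFlatbuffer {
     public:
      explicit NonPrefixedFlatbuffer(std::vector<uint8_t> data)
          : data_(std::move(data)) {}
      // The buffer starts directly at the flatbuffer root.
      absl::Span<const uint8_t> span() const {
        return absl::Span<const uint8_t>(data_.data(), data_.size());
      }
      const T &message() const {
        return *flatbuffers::GetRoot<T>(data_.data());
      }

     private:
      std::vector<uint8_t> data_;
    };

    template <typename T>
    class SizePrefixedFlatbuffer {
     public:
      explicit SizePrefixedFlatbuffer(std::vector<uint8_t> data)
          : data_(std::move(data)) {}
      // The buffer starts at the 4-byte size prefix, and span() covers
      // the prefix plus the flatbuffer, which is what gets queued to
      // the log writer.
      absl::Span<const uint8_t> span() const {
        return absl::Span<const uint8_t>(data_.data(), data_.size());
      }
      const T &message() const {
        return *flatbuffers::GetSizePrefixedRoot<T>(data_.data());
      }

     private:
      std::vector<uint8_t> data_;
    };

With the size-prefixed flavor, the header->span() the writer queues and
the SizePrefixedFlatbufferVector the readers return carry the prefix
through unchanged, so nothing has to strip it or re-add it.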
Once all that's done, deal with the fallout through the code base,
including logfile_utils and everything that change ripples out to.
Change-Id: I91b7be355279a1c19e5c956c33359df01a17eacf
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index e0da59e..caa2b90 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -111,8 +111,9 @@
<< std::endl;
while (true) {
- std::optional<aos::FlatbufferVector<aos::logger::MessageHeader>> message =
- reader.ReadMessage();
+ std::optional<
+ aos::SizePrefixedFlatbufferVector<aos::logger::MessageHeader>>
+ message = reader.ReadMessage();
if (!message) {
break;
}
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 411e666..ec60143 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -32,7 +32,7 @@
const Node *node) {
CHECK_EQ(node, this->node());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->QueueSpan(header->full_span());
+ data_writer_->QueueSpan(header->span());
}
DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
@@ -48,7 +48,7 @@
++part_number_;
*data_writer_ = std::move(*OpenDataWriter());
UpdateHeader(header, uuid_, part_number_);
- data_writer_->QueueSpan(header->full_span());
+ data_writer_->QueueSpan(header->span());
}
DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
@@ -88,14 +88,14 @@
OpenDataWriter();
}
UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
- data_writer_.writer->QueueSpan(header->full_span());
+ data_writer_.writer->QueueSpan(header->span());
} else {
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
if (node == data_writer.second.node) {
UpdateHeader(header, data_writer.second.uuid,
data_writer.second.part_number);
- data_writer.second.writer->QueueSpan(header->full_span());
+ data_writer.second.writer->QueueSpan(header->span());
}
}
}
@@ -110,7 +110,7 @@
}
OpenDataWriter();
UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
- data_writer_.writer->QueueSpan(header->full_span());
+ data_writer_.writer->QueueSpan(header->span());
} else {
for (std::pair<const Channel *const, DataWriter> &data_writer :
data_writers_) {
@@ -119,7 +119,7 @@
data_writer.second.rotate(data_writer.first, &data_writer.second);
UpdateHeader(header, data_writer.second.uuid,
data_writer.second.part_number);
- data_writer.second.writer->QueueSpan(header->full_span());
+ data_writer.second.writer->QueueSpan(header->span());
}
}
}
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 43e6aa2..08a230c 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -65,7 +65,7 @@
// Now extract everything into our datastructures above for sorting.
for (const std::string &part : parts) {
- std::optional<FlatbufferVector<LogFileHeader>> log_header =
+ std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> log_header =
ReadHeader(part);
if (!log_header) {
LOG(WARNING) << "Skipping " << part << " without a header";
@@ -94,7 +94,7 @@
if (!log_header->message().has_parts_uuid() &&
!log_header->message().has_parts_index() &&
!log_header->message().has_node()) {
- std::optional<FlatbufferVector<MessageHeader>> first_message =
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> first_message =
ReadNthMessage(part, 0);
if (!first_message) {
LOG(WARNING) << "Skipping " << part << " without any messages";
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index d906997..330c78e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -348,7 +348,7 @@
return true;
}
-std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
std::string_view filename) {
SpanReader span_reader(filename);
absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
@@ -360,13 +360,12 @@
// And copy the config so we have it forever, removing the size prefix.
ResizeableBuffer data;
- data.resize(config_data.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(data.data(), config_data.begin() + sizeof(flatbuffers::uoffset_t),
- data.size());
- return FlatbufferVector<LogFileHeader>(std::move(data));
+ data.resize(config_data.size());
+ memcpy(data.data(), config_data.begin(), data.size());
+ return SizePrefixedFlatbufferVector<LogFileHeader>(std::move(data));
}
-std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
std::string_view filename, size_t n) {
SpanReader span_reader(filename);
absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
@@ -381,15 +380,15 @@
// And copy the config so we have it forever, removing the size prefix.
ResizeableBuffer data;
- data.resize(data_span.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(data.data(), data_span.begin() + sizeof(flatbuffers::uoffset_t),
- data.size());
- return FlatbufferVector<MessageHeader>(std::move(data));
+ data.resize(data_span.size());
+ memcpy(data.data(), data_span.begin(), data.size());
+ return SizePrefixedFlatbufferVector<MessageHeader>(std::move(data));
}
MessageReader::MessageReader(std::string_view filename)
: span_reader_(filename),
- raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+ raw_log_file_header_(
+ SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
// Make sure we have enough to read the size.
absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();
@@ -399,12 +398,10 @@
// And copy the header data so we have it forever.
ResizeableBuffer header_data_copy;
- header_data_copy.resize(header_data.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(header_data_copy.data(),
- header_data.begin() + sizeof(flatbuffers::uoffset_t),
- header_data_copy.size());
+ header_data_copy.resize(header_data.size());
+ memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
raw_log_file_header_ =
- FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
+ SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));
max_out_of_order_duration_ =
chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
@@ -413,18 +410,17 @@
<< FlatbufferToJson(log_file_header()->node());
}
-std::optional<FlatbufferVector<MessageHeader>> MessageReader::ReadMessage() {
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
+MessageReader::ReadMessage() {
absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
if (msg_data == absl::Span<const uint8_t>()) {
return std::nullopt;
}
ResizeableBuffer result_buffer;
- result_buffer.resize(msg_data.size() - sizeof(flatbuffers::uoffset_t));
- memcpy(result_buffer.data(),
- msg_data.begin() + sizeof(flatbuffers::uoffset_t),
- result_buffer.size());
- FlatbufferVector<MessageHeader> result(std::move(result_buffer));
+ result_buffer.resize(msg_data.size());
+ memcpy(result_buffer.data(), msg_data.begin(), result_buffer.size());
+ SizePrefixedFlatbufferVector<MessageHeader> result(std::move(result_buffer));
const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
chrono::nanoseconds(result.message().monotonic_sent_time()));
@@ -437,10 +433,10 @@
PartsMessageReader::PartsMessageReader(LogParts log_parts)
: parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
-std::optional<FlatbufferVector<MessageHeader>>
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
PartsMessageReader::ReadMessage() {
while (!done_) {
- std::optional<FlatbufferVector<MessageHeader>> message =
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
message_reader_.ReadMessage();
if (message) {
newest_timestamp_ = message_reader_.newest_timestamp();
@@ -468,7 +464,7 @@
SplitMessageReader::SplitMessageReader(
const std::vector<std::string> &filenames)
: filenames_(filenames),
- log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
+ log_file_header_(SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";
// Grab any log file header. They should all match (and we will check as we
@@ -674,7 +670,7 @@
return true;
}
- if (std::optional<FlatbufferVector<MessageHeader>> msg =
+ if (std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
message_reader_->ReadMessage()) {
const MessageHeader &header = msg.value().message();
@@ -687,7 +683,7 @@
<< newest_timestamp() << " start time "
<< monotonic_start_time() << " " << FlatbufferToJson(&header);
} else if (VLOG_IS_ON(1)) {
- FlatbufferVector<MessageHeader> copy = msg.value();
+ SizePrefixedFlatbufferVector<MessageHeader> copy = msg.value();
copy.mutable_message()->clear_data();
LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
<< filename() << " ttq: " << time_to_queue_ << " now "
@@ -777,12 +773,12 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldest(int channel_index) {
CHECK_GT(channels_[channel_index].data.size(), 0u);
const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp = channels_[channel_index].data.front_timestamp();
- FlatbufferVector<MessageHeader> front =
+ SizePrefixedFlatbufferVector<MessageHeader> front =
std::move(channels_[channel_index].data.front());
channels_[channel_index].data.PopFront();
@@ -799,12 +795,12 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
timestamp = channels_[channel].timestamps[node_index].front_timestamp();
- FlatbufferVector<MessageHeader> front =
+ SizePrefixedFlatbufferVector<MessageHeader> front =
std::move(channels_[channel].timestamps[node_index].front());
channels_[channel].timestamps[node_index].PopFront();
@@ -823,7 +819,7 @@
}
bool SplitMessageReader::MessageHeaderQueue::emplace_back(
- FlatbufferVector<MessageHeader> &&msg) {
+ SizePrefixedFlatbufferVector<MessageHeader> &&msg) {
CHECK(split_reader != nullptr);
// If there is no timestamp merger for this queue, nobody is listening. Drop
@@ -1021,7 +1017,7 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
TimestampMerger::PopMessageHeap() {
// Pop the oldest message reader pointer off the heap.
CHECK_GT(message_heap_.size(), 0u);
@@ -1035,7 +1031,7 @@
// Pop the oldest message. This re-pushes any messages from the reader to the
// message heap.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_message =
std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
@@ -1063,7 +1059,7 @@
// Pop the next oldest message. This re-pushes any messages from the
// reader.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
next_oldest_message = std::get<2>(next_oldest_message_reader)
->PopOldest(channel_index_);
@@ -1080,7 +1076,7 @@
}
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
TimestampMerger::PopTimestampHeap() {
// Pop the oldest message reader pointer off the heap.
CHECK_GT(timestamp_heap_.size(), 0u);
@@ -1097,7 +1093,7 @@
// Pop the oldest message. This re-pushes any timestamps from the reader to
// the timestamp heap.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_timestamp = std::get<2>(oldest_timestamp_reader)
->PopOldestTimestamp(channel_index_, node_index_);
@@ -1128,7 +1124,7 @@
// Pop the next oldest timestamp. This re-pushes any messages from the
// reader.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
next_oldest_timestamp =
std::get<2>(next_oldest_timestamp_reader)
->PopOldestTimestamp(channel_index_, node_index_);
@@ -1157,7 +1153,8 @@
return oldest_timestamp;
}
-std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
+std::tuple<TimestampMerger::DeliveryTimestamp,
+ SizePrefixedFlatbufferVector<MessageHeader>>
TimestampMerger::PopOldest() {
if (has_timestamps_) {
VLOG(1) << "Looking for matching timestamp for "
@@ -1168,7 +1165,7 @@
// Read the timestamps.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_timestamp = PopTimestampHeap();
TimestampMerger::DeliveryTimestamp timestamp;
@@ -1250,7 +1247,7 @@
configuration_->channels()->Get(channel_index_))
<< " (" << channel_index_ << ")";
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
timestamp.realtime_remote_time =
@@ -1272,7 +1269,7 @@
}
} else {
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
oldest_message = PopMessageHeap();
TimestampMerger::DeliveryTimestamp timestamp;
@@ -1354,7 +1351,7 @@
if (both_null || node_names_identical) {
if (!found_node) {
found_node = true;
- log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+ log_file_header_ = reader->raw_log_file_header();
VLOG(1) << "Found log file " << reader->filename() << " with node "
<< FlatbufferToJson(reader->node()) << " start_time "
<< monotonic_start_time();
@@ -1467,7 +1464,7 @@
}
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
ChannelMerger::PopOldest() {
CHECK_GT(channel_heap_.size(), 0u);
std::pair<monotonic_clock::time_point, int> oldest_channel_data =
@@ -1483,7 +1480,7 @@
// Merger handles any queueing needed from here.
std::tuple<TimestampMerger::DeliveryTimestamp,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
message = merger->PopOldest();
DCHECK_EQ(std::get<0>(message).monotonic_event_time,
oldest_channel_data.first)
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 8381a9a..985a6bc 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -181,9 +181,9 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
-std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
std::string_view filename);
-std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
std::string_view filename, size_t n);
// Class to read chunks out of a log file.
@@ -234,7 +234,8 @@
}
// Returns the raw data of the header from the log file.
- const FlatbufferVector<LogFileHeader> &raw_log_file_header() const {
+ const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
+ const {
return raw_log_file_header_;
}
@@ -250,7 +251,7 @@
}
// Returns the next message if there is one.
- std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
// The time at which we need to read another chunk from the logfile.
monotonic_clock::time_point queue_data_time() const {
@@ -262,7 +263,7 @@
SpanReader span_reader_;
// Vector holding the raw data for the log file header.
- FlatbufferVector<LogFileHeader> raw_log_file_header_;
+ SizePrefixedFlatbufferVector<LogFileHeader> raw_log_file_header_;
// Minimum amount of data to queue up for sorting before we are guarenteed
// to not see data out of order.
@@ -293,7 +294,7 @@
// Returns the next message if there is one, or nullopt if we have reached the
// end of all the files.
// Note: reading the next message may change the max_out_of_order_duration().
- std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
private:
// Opens the next log and updates message_reader_. Sets done_ if there is
@@ -351,13 +352,13 @@
// Returns the timestamp, queue_index, and message for the oldest data on a
// channel. Requeues data as needed.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
PopOldest(int channel_index);
// Returns the timestamp, queue_index, and message for the oldest timestamp on
// a channel delivered to a node. Requeues data as needed.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
PopOldestTimestamp(int channel, int node_index);
// Returns the header for the log files.
@@ -365,7 +366,8 @@
return &log_file_header_.message();
}
- const FlatbufferVector<LogFileHeader> &raw_log_file_header() const {
+ const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
+ const {
return log_file_header_;
}
@@ -444,7 +446,7 @@
const Node *target_node_ = nullptr;
// Log file header to report. This is a copy.
- FlatbufferVector<LogFileHeader> log_file_header_;
+ SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
// Current log file being read.
std::unique_ptr<MessageReader> message_reader_;
@@ -455,14 +457,14 @@
bool timestamps = false;
// Returns a reference to the the oldest message.
- FlatbufferVector<MessageHeader> &front() {
+ SizePrefixedFlatbufferVector<MessageHeader> &front() {
CHECK_GT(data_.size(), 0u);
return data_.front();
}
// Adds a message to the back of the queue. Returns true if it was actually
// emplaced.
- bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
+ bool emplace_back(SizePrefixedFlatbufferVector<MessageHeader> &&msg);
// Drops the front message. Invalidates the front() reference.
void PopFront();
@@ -492,7 +494,7 @@
private:
// The data.
- std::deque<FlatbufferVector<MessageHeader>> data_;
+ std::deque<SizePrefixedFlatbufferVector<MessageHeader>> data_;
};
// All the queues needed for a channel. There isn't going to be data in all
@@ -569,7 +571,8 @@
// Returns the oldest combined timestamp and data for this channel. If there
// isn't a matching piece of data, returns only the timestamp with no data.
// The caller can determine what the appropriate action is to recover.
- std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
+ std::tuple<DeliveryTimestamp, SizePrefixedFlatbufferVector<MessageHeader>>
+ PopOldest();
// Tracks if the channel merger has pushed this onto it's heap or not.
bool pushed() { return pushed_; }
@@ -609,7 +612,7 @@
// Pops a message from the message heap. This automatically triggers the
// split message reader to re-fetch any new data.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
PopMessageHeap();
std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
@@ -619,7 +622,7 @@
// Pops a message from the timestamp heap. This automatically triggers the
// split message reader to re-fetch any new data.
std::tuple<monotonic_clock::time_point, uint32_t,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
PopTimestampHeap();
const Configuration *configuration_;
@@ -678,7 +681,7 @@
monotonic_clock::time_point OldestMessageTime() const;
// Pops the oldest message.
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
PopOldest();
// Returns the config for this set of log files.
@@ -734,7 +737,7 @@
std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
// The log header we are claiming to be.
- FlatbufferVector<LogFileHeader> log_file_header_;
+ SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
// The timestamp mergers which combine data from the split message readers.
std::vector<TimestampMerger> timestamp_mergers_;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 3644419..14d1de7 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -39,15 +39,15 @@
{
DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
- writer.QueueSpan(m1.full_span());
- writer.QueueSpan(m2.full_span());
+ writer.QueueSpan(m1.span());
+ writer.QueueSpan(m2.span());
}
SpanReader reader(logfile);
EXPECT_EQ(reader.filename(), logfile);
- EXPECT_EQ(reader.ReadMessage(), m1.full_span());
- EXPECT_EQ(reader.ReadMessage(), m2.full_span());
+ EXPECT_EQ(reader.ReadMessage(), m1.span());
+ EXPECT_EQ(reader.ReadMessage(), m2.span());
EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
}
@@ -69,9 +69,9 @@
{
DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
- writer.QueueSpan(config.full_span());
- writer.QueueSpan(m1.full_span());
- writer.QueueSpan(m2.full_span());
+ writer.QueueSpan(config.span());
+ writer.QueueSpan(m1.span());
+ writer.QueueSpan(m2.span());
}
MessageReader reader(logfile);
@@ -117,10 +117,10 @@
{
DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
- writer.QueueSpan(config0.full_span());
- writer.QueueSpan(m1.full_span());
- writer.QueueSpan(m2.full_span());
- writer.QueueSpan(m3.full_span());
+ writer.QueueSpan(config0.span());
+ writer.QueueSpan(m1.span());
+ writer.QueueSpan(m2.span());
+ writer.QueueSpan(m3.span());
}
const std::vector<LogFile> parts = SortParts({logfile0});
@@ -168,13 +168,13 @@
{
DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
- writer.QueueSpan(config0.full_span());
- writer.QueueSpan(m1.full_span());
+ writer.QueueSpan(config0.span());
+ writer.QueueSpan(m1.span());
}
{
DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
- writer.QueueSpan(config1.full_span());
- writer.QueueSpan(m2.full_span());
+ writer.QueueSpan(config1.span());
+ writer.QueueSpan(m2.span());
}
const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 977a82f..b32c748 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -37,11 +37,11 @@
namespace logger {
namespace {
// Helper to safely read a header, or CHECK.
-FlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
+SizePrefixedFlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
const std::vector<std::vector<std::string>> &filenames) {
CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
- std::optional<FlatbufferVector<LogFileHeader>> result =
+ std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> result =
ReadHeader(filenames[0][0]);
CHECK(result);
return result.value();
@@ -1285,8 +1285,8 @@
}
TimestampMerger::DeliveryTimestamp channel_timestamp;
int channel_index;
- FlatbufferVector<MessageHeader> channel_data =
- FlatbufferVector<MessageHeader>::Empty();
+ SizePrefixedFlatbufferVector<MessageHeader> channel_data =
+ SizePrefixedFlatbufferVector<MessageHeader>::Empty();
if (VLOG_IS_ON(1)) {
LogFit("Offset was");
@@ -1900,12 +1900,12 @@
}
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
LogReader::State::PopOldest(bool *update_time) {
CHECK_GT(sorted_messages_.size(), 0u);
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>,
+ SizePrefixedFlatbufferVector<MessageHeader>,
message_bridge::NoncausalOffsetEstimator *>
result = std::move(sorted_messages_.front());
VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
@@ -1955,8 +1955,8 @@
TimestampMerger::DeliveryTimestamp channel_timestamp;
int channel_index;
- FlatbufferVector<MessageHeader> channel_data =
- FlatbufferVector<MessageHeader>::Empty();
+ SizePrefixedFlatbufferVector<MessageHeader> channel_data =
+ SizePrefixedFlatbufferVector<MessageHeader>::Empty();
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index b37fea2..f6a037b 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -454,7 +454,7 @@
// This is *a* log file header used to provide the logged config. The rest of
// the header is likely distracting.
- FlatbufferVector<LogFileHeader> log_file_header_;
+ SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
// Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
@@ -472,7 +472,7 @@
// update_time (will be) set to true when popping this message causes the
// filter to change the time offset estimation function.
std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>>
+ SizePrefixedFlatbufferVector<MessageHeader>>
PopOldest(bool *update_time);
// Returns the monotonic time of the oldest message.
@@ -614,7 +614,7 @@
std::unique_ptr<ChannelMerger> channel_merger_;
std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
- FlatbufferVector<MessageHeader>,
+ SizePrefixedFlatbufferVector<MessageHeader>,
message_bridge::NoncausalOffsetEstimator *>>
sorted_messages_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index c55b18b..dbc8a78 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -263,7 +263,7 @@
{
// Confirm that the UUIDs match for both the parts and the logger, and the
// parts_index increments.
- std::vector<FlatbufferVector<LogFileHeader>> log_header;
+ std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
for (std::string_view f : {logfile0, logfile1}) {
log_header.emplace_back(ReadHeader(f).value());
}
@@ -563,7 +563,7 @@
message_reader.log_file_header()->configuration()->channels()->size(), 0);
while (true) {
- std::optional<FlatbufferVector<MessageHeader>> msg =
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
message_reader.ReadMessage();
if (!msg) {
break;
@@ -636,7 +636,7 @@
std::set<std::string> parts_uuids;
// Confirm that we have the expected number of UUIDs for both the logfile
// UUIDs and parts UUIDs.
- std::vector<FlatbufferVector<LogFileHeader>> log_header;
+ std::vector<SizePrefixedFlatbufferVector<LogFileHeader>> log_header;
for (std::string_view f : logfiles_) {
log_header.emplace_back(ReadHeader(f).value());
logfile_uuids.insert(log_header.back().message().log_event_uuid()->str());