Change LogReader API to be able to replace messages
The mutation API in LogReader was not able to express dropping messages,
or growing messages. This enables more aggressive mutation.
Change-Id: I477482da4262483a780d15ebf8c98a51e37099f6
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/BUILD b/aos/BUILD
index de4ef4f..b6d357b 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -509,6 +509,7 @@
deps = [
"//aos:macros",
"//aos/containers:resizeable_buffer",
+ "//aos/ipc_lib:data_alignment",
"//aos/util:file",
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
diff --git a/aos/containers/resizeable_buffer.h b/aos/containers/resizeable_buffer.h
index 484d93e..664fa2f 100644
--- a/aos/containers/resizeable_buffer.h
+++ b/aos/containers/resizeable_buffer.h
@@ -55,6 +55,8 @@
size_t size() const { return size_; }
size_t capacity() const { return capacity_; }
+ bool empty() const { return size_ == 0; }
+
void reserve(size_t new_size) {
if (new_size > capacity_) {
Allocate(new_size);
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 83393c2..ff629f9 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -166,7 +166,8 @@
// Sends a single block of data by refcounting it to avoid copies. The data
// must not change after being passed into Send. The remote arguments have the
- // same meaning as in Send above.
+ // same meaning as in Send above. Note: some implmementations will have to
+ // copy anyways, but other implementations can skip the copy.
Error Send(const SharedSpan data);
Error Send(const SharedSpan data,
monotonic_clock::time_point monotonic_remote_time,
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index f056bca..ec3d51e 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -929,6 +929,7 @@
target_compatible_with = ["@platforms//os:linux"],
deps = [
":multinode_logger_test_lib",
+ "//aos/flatbuffers:aligned_allocator",
],
)
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index df1ed8c..192dade 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -1301,7 +1301,7 @@
std::function<void()> notice_realtime_end, const Node *node,
LogReader::State::ThreadedBuffering threading,
std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
- const std::vector<std::function<void(void *message)>>
+ const std::vector<std::function<SharedSpan(TimestampedMessage &)>>
&before_send_callbacks)
: timestamp_mapper_(std::move(timestamp_mapper)),
timestamp_queue_strategy_(timestamp_queue_strategy),
@@ -1416,7 +1416,7 @@
timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
}
-bool LogReader::State::Send(const TimestampedMessage &×tamped_message) {
+bool LogReader::State::Send(TimestampedMessage &×tamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
CHECK(sender);
uint32_t remote_queue_index = 0xffffffff;
@@ -1506,21 +1506,31 @@
->boot_uuid());
}
+ SharedSpan to_send;
// Right before sending allow the user to process the message.
if (before_send_callbacks_[timestamped_message.channel_index]) {
- // Only channels that are forwarded and sent from this State's node will be
- // in the queue_index_map_
- if (queue_index_map_[timestamped_message.channel_index]) {
- before_send_callbacks_[timestamped_message.channel_index](
- timestamped_message.data->mutable_data());
+ // Only channels which are forwarded and on the destination node have
+ // channel_source_state_ set to non-null. See RegisterDuringStartup.
+ if (channel_source_state_[timestamped_message.channel_index] == nullptr) {
+ // It is safe in this case since there is only one caller to Send, and the
+ // data is not mutated after Send is called.
+ to_send = before_send_callbacks_[timestamped_message.channel_index](
+ timestamped_message);
+ *timestamped_message.data.get() = to_send;
+ } else {
+ to_send = *timestamped_message.data;
}
+ if (!to_send) {
+ return false;
+ }
+ } else {
+ to_send = *timestamped_message.data;
}
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
const RawSender::Error err = sender->Send(
- SharedSpan(timestamped_message.data, ×tamped_message.data->span),
- timestamped_message.monotonic_remote_time.time,
+ std::move(to_send), timestamped_message.monotonic_remote_time.time,
timestamped_message.realtime_remote_time,
timestamped_message.monotonic_remote_transmit_time.time,
remote_queue_index,
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index fb19e78..6d70476 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -346,17 +346,25 @@
// implementation. And, the callback is called only once one the Sender's Node
// if the channel is forwarded.
//
+ // The callback should have a signature like:
+ // [](aos::examples::Ping *ping,
+ // const TimestampedMessage ×tamped_message) -> SharedSpan {
+ // if (drop) {
+ // return nullptr;
+ // } else {
+ // return *timestamped_message.data;
+ // }
+ // }
+ //
+ // If nullptr is returned, the message will not be sent.
+ //
// See multinode_logger_test for examples of usage.
- template <typename Callback>
+ template <typename MessageType, typename Callback>
void AddBeforeSendCallback(std::string_view channel_name,
Callback &&callback) {
CHECK(!AreStatesInitialized())
<< ": Cannot add callbacks after calling Register";
- using MessageType = typename std::remove_pointer<
- typename event_loop_internal::watch_message_type_trait<
- decltype(&Callback::operator())>::message_type>::type;
-
const Channel *channel = configuration::GetChannel(
logged_configuration(), channel_name,
MessageType::GetFullyQualifiedName(), "", nullptr);
@@ -373,9 +381,16 @@
<< ":{ \"name\": \"" << channel_name << "\", \"type\": \""
<< MessageType::GetFullyQualifiedName() << "\" }";
- before_send_callbacks_[channel_index] = [callback](void *message) {
- callback(flatbuffers::GetMutableRoot<MessageType>(
- reinterpret_cast<char *>(message)));
+ before_send_callbacks_[channel_index] =
+ [callback](TimestampedMessage ×tamped_message) -> SharedSpan {
+ // Note: the const_cast is because SharedSpan is defined to be a pointer
+ // to const data, even though it wraps mutable data.
+ // TODO(austin): Refactor to make it non-const properly to drop the const
+ // cast.
+ return callback(flatbuffers::GetMutableRoot<MessageType>(
+ reinterpret_cast<char *>(const_cast<uint8_t *>(
+ timestamped_message.data.get()->get()->data()))),
+ timestamped_message);
};
}
@@ -460,7 +475,7 @@
std::function<void()> notice_realtime_end, const Node *node,
ThreadedBuffering threading,
std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
- const std::vector<std::function<void(void *message)>>
+ const std::vector<std::function<SharedSpan(TimestampedMessage &)>>
&before_send_callbacks);
// Connects up the timestamp mappers.
@@ -705,8 +720,9 @@
std::max(monotonic_now(), next_time + clock_offset()));
}
- // Sends a buffer on the provided channel index.
- bool Send(const TimestampedMessage &×tamped_message);
+ // Sends a buffer on the provided channel index. Returns true if the
+ // message was actually sent, and false otherwise.
+ bool Send(TimestampedMessage &×tamped_message);
void MaybeSetClockOffset();
std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
@@ -886,7 +902,7 @@
// indices of the channels to replay for the Node represented by
// the instance of LogReader::State.
std::unique_ptr<const ReplayChannelIndices> replay_channel_indices_;
- const std::vector<std::function<void(void *message)>>
+ const std::vector<std::function<SharedSpan(TimestampedMessage &)>>
before_send_callbacks_;
};
@@ -934,7 +950,8 @@
// The callbacks that will be called before sending a message indexed by the
// channel index from the logged_configuration
- std::vector<std::function<void(void *message)>> before_send_callbacks_;
+ std::vector<std::function<SharedSpan(TimestampedMessage &)>>
+ before_send_callbacks_;
// If true, the replay timer will ignore any missing data. This is used
// during startup when we are bootstrapping everything and trying to get to
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index c01026f..30ed3d7 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1579,14 +1579,14 @@
os << "{.channel_index=" << msg.channel_index
<< ", .queue_index=" << msg.queue_index
<< ", .timestamp=" << msg.timestamp;
- if (msg.data != nullptr) {
- if (msg.data->remote_queue_index.has_value()) {
- os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
+ if (msg.header != nullptr) {
+ if (msg.header->remote_queue_index.has_value()) {
+ os << ", .remote_queue_index=" << *msg.header->remote_queue_index;
}
- if (msg.data->monotonic_remote_time.has_value()) {
- os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
+ if (msg.header->monotonic_remote_time.has_value()) {
+ os << ", .monotonic_remote_time=" << *msg.header->monotonic_remote_time;
}
- os << ", .data=" << msg.data;
+ os << ", .header=" << msg.header;
}
os << "}";
return os;
@@ -1614,7 +1614,7 @@
os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
}
if (msg.data != nullptr) {
- os << ", .data=" << *msg.data;
+ os << ", .data=" << msg.data.get();
} else {
os << ", .data=nullptr";
}
@@ -1666,15 +1666,20 @@
monotonic_remote_boot = *boot;
}
- messages_.insert(
- Message{.channel_index = msg->channel_index,
- .queue_index = BootQueueIndex{.boot = parts().boot_count,
- .index = msg->queue_index},
- .timestamp = BootTimestamp{.boot = parts().boot_count,
- .time = msg->monotonic_sent_time},
- .monotonic_remote_boot = monotonic_remote_boot,
- .monotonic_timestamp_boot = monotonic_timestamp_boot,
- .data = std::move(msg)});
+ std::shared_ptr<SharedSpan> data =
+ std::make_shared<SharedSpan>(msg, &msg->span);
+
+ messages_.insert(Message{
+ .channel_index = msg->channel_index,
+ .queue_index = BootQueueIndex{.boot = parts().boot_count,
+ .index = msg->queue_index},
+ .timestamp = BootTimestamp{.boot = parts().boot_count,
+ .time = msg->monotonic_sent_time},
+ .monotonic_remote_boot = monotonic_remote_boot,
+ .monotonic_timestamp_boot = monotonic_timestamp_boot,
+ .header = std::move(msg),
+ .data = std::move(data),
+ });
// Now, update sorted_until_ to match the new message.
if (parts_message_reader_.newest_timestamp() >
@@ -1827,15 +1832,15 @@
} else if (*msg == *oldest) {
// Found a duplicate. If there is a choice, we want the one which has
// the timestamp time.
- if (!msg->data->has_monotonic_timestamp_time) {
+ if (!msg->header->has_monotonic_timestamp_time) {
message_sorter.PopFront();
- } else if (!oldest->data->has_monotonic_timestamp_time) {
+ } else if (!oldest->header->has_monotonic_timestamp_time) {
current_->PopFront();
current_ = &message_sorter;
oldest = msg;
} else {
- CHECK_EQ(msg->data->monotonic_timestamp_time,
- oldest->data->monotonic_timestamp_time);
+ CHECK_EQ(msg->header->monotonic_timestamp_time,
+ oldest->header->monotonic_timestamp_time);
message_sorter.PopFront();
}
}
@@ -2037,26 +2042,30 @@
}
CHECK_LT(msg->channel_index, source_node.size());
if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
- timestamp_messages_.emplace_back(TimestampedMessage{
+ TimestampedMessage timestamped_message{
.channel_index = msg->channel_index,
.queue_index = msg->queue_index,
.monotonic_event_time = msg->timestamp,
- .realtime_event_time = msg->data->realtime_sent_time,
+ .realtime_event_time = msg->header->realtime_sent_time,
.remote_queue_index =
BootQueueIndex{.boot = msg->monotonic_remote_boot,
- .index = msg->data->remote_queue_index.value()},
+ .index = msg->header->remote_queue_index.value()},
.monotonic_remote_time = {msg->monotonic_remote_boot,
- msg->data->monotonic_remote_time.value()},
- .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ msg->header->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->header->realtime_remote_time.value(),
.monotonic_remote_transmit_time =
{msg->monotonic_remote_boot,
- msg->data->monotonic_remote_transmit_time},
+ msg->header->monotonic_remote_transmit_time},
.monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
- msg->data->monotonic_timestamp_time},
- .data = std::move(msg->data)});
+ msg->header->monotonic_timestamp_time},
+ .data = msg->data,
+ };
- VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
- fn(×tamp_messages_.back());
+ fn(×tamped_message);
+
+ VLOG(2) << this << " Queued timestamp of " << timestamped_message;
+
+ timestamp_messages_.emplace_back(std::move(*msg));
} else {
VLOG(2) << this << " Dropped data";
}
@@ -2100,25 +2109,12 @@
CHECK(queue_timestamps_ran_);
}
- // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
- // to return a Message. We need to convert the first message in the list
- // before returning it (and comparing, honestly). Fill next_timestamp_ in if
- // it is empty so the rest of the logic here can just look at next_timestamp_
- // and use that instead.
- if (!next_timestamp_ && !timestamp_messages_.empty()) {
- auto &front = timestamp_messages_.front();
- next_timestamp_ = Message{
- .channel_index = front.channel_index,
- .queue_index = front.queue_index,
- .timestamp = front.monotonic_event_time,
- .monotonic_remote_boot = front.remote_queue_index.boot,
- .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
- .data = std::move(front.data),
- };
- timestamp_messages_.pop_front();
+ const Message *timestamp_messages_front = nullptr;
+ if (!timestamp_messages_.empty()) {
+ timestamp_messages_front = ×tamp_messages_.front();
}
- if (!next_timestamp_) {
+ if (!timestamp_messages_front) {
message_source_ = MessageSource::kBootMerger;
if (boot_merger_front != nullptr) {
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
@@ -2134,15 +2130,15 @@
message_source_ = MessageSource::kTimestampMessage;
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
- << next_timestamp_.value();
- return &next_timestamp_.value();
+ << *timestamp_messages_front;
+ return timestamp_messages_front;
}
- if (*boot_merger_front <= next_timestamp_.value()) {
- if (*boot_merger_front == next_timestamp_.value()) {
+ if (*boot_merger_front <= *timestamp_messages_front) {
+ if (*boot_merger_front == *timestamp_messages_front) {
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
<< " Dropping duplicate timestamp.";
- next_timestamp_.reset();
+ timestamp_messages_.pop_front();
}
message_source_ = MessageSource::kBootMerger;
if (boot_merger_front != nullptr) {
@@ -2156,16 +2152,16 @@
} else {
message_source_ = MessageSource::kTimestampMessage;
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
- << next_timestamp_.value();
- return &next_timestamp_.value();
+ << *timestamp_messages_front;
+ return timestamp_messages_front;
}
}
void SplitTimestampBootMerger::PopFront() {
switch (message_source_) {
case MessageSource::kTimestampMessage:
- CHECK(next_timestamp_.has_value());
- next_timestamp_.reset();
+ CHECK(!timestamp_messages_.empty());
+ timestamp_messages_.pop_front();
break;
case MessageSource::kBootMerger:
boot_merger_.PopFront();
@@ -2246,7 +2242,7 @@
.channel_index = msg->channel_index,
.queue_index = msg->queue_index,
.monotonic_event_time = msg->timestamp,
- .realtime_event_time = msg->data->realtime_sent_time,
+ .realtime_event_time = msg->header->realtime_sent_time,
.remote_queue_index = BootQueueIndex::Invalid(),
.monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
@@ -2368,18 +2364,18 @@
.channel_index = msg->channel_index,
.queue_index = msg->queue_index,
.monotonic_event_time = msg->timestamp,
- .realtime_event_time = msg->data->realtime_sent_time,
+ .realtime_event_time = msg->header->realtime_sent_time,
.remote_queue_index =
BootQueueIndex{.boot = msg->monotonic_remote_boot,
- .index = msg->data->remote_queue_index.value()},
+ .index = msg->header->remote_queue_index.value()},
.monotonic_remote_time = {msg->monotonic_remote_boot,
- msg->data->monotonic_remote_time.value()},
- .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ msg->header->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->header->realtime_remote_time.value(),
.monotonic_remote_transmit_time =
{msg->monotonic_remote_boot,
- msg->data->monotonic_remote_transmit_time},
+ msg->header->monotonic_remote_transmit_time},
.monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
- msg->data->monotonic_timestamp_time},
+ msg->header->monotonic_timestamp_time},
.data = std::move(data.data)});
VLOG(1) << node_name() << " Inserted timestamp "
<< matched_messages_.back();
@@ -2446,23 +2442,23 @@
Message TimestampMapper::MatchingMessageFor(const Message &message) {
// Figure out what queue index we are looking for.
- CHECK_NOTNULL(message.data);
- CHECK(message.data->remote_queue_index.has_value());
+ CHECK_NOTNULL(message.header);
+ CHECK(message.header->remote_queue_index.has_value());
const BootQueueIndex remote_queue_index =
BootQueueIndex{.boot = message.monotonic_remote_boot,
- .index = *message.data->remote_queue_index};
+ .index = *message.header->remote_queue_index};
- CHECK(message.data->monotonic_remote_time.has_value());
- CHECK(message.data->realtime_remote_time.has_value());
+ CHECK(message.header->monotonic_remote_time.has_value());
+ CHECK(message.header->realtime_remote_time.has_value());
const BootTimestamp monotonic_remote_time{
.boot = message.monotonic_remote_boot,
- .time = message.data->monotonic_remote_time.value()};
+ .time = message.header->monotonic_remote_time.value()};
const realtime_clock::time_point realtime_remote_time =
- *message.data->realtime_remote_time;
+ *message.header->realtime_remote_time;
TimestampMapper *peer =
- nodes_data_[source_node_[message.data->channel_index]].peer;
+ nodes_data_[source_node_[message.header->channel_index]].peer;
// We only register the peers which we have data for. So, if we are being
// asked to pull a timestamp from a peer which doesn't exist, return an
@@ -2475,6 +2471,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2490,6 +2487,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2500,6 +2498,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2520,7 +2519,7 @@
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+ CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
// Now drop the data off the front. We have deduplicated timestamps, so we
// are done. And all the data is in order.
@@ -2544,6 +2543,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2552,7 +2552,7 @@
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please "
"investigate!";
- CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+ CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please "
"investigate!";
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f9a62b1..bb312eb 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -489,12 +489,6 @@
// pointer to track.
absl::Span<const uint8_t> span;
- // Used to be able to mutate the data in the span. This is only used for
- // mutating the message inside of LogReader for the Before Send Callback. It
- // is safe in this case since there is only one caller to Send, and the data
- // is not mutated after Send is called.
- uint8_t *mutable_data() { return const_cast<uint8_t *>(span.data()); }
-
char actual_data[];
private:
@@ -526,7 +520,13 @@
size_t monotonic_timestamp_boot = 0xffffff;
- std::shared_ptr<UnpackedMessageHeader> data;
+ // Pointer to the unpacked header.
+ std::shared_ptr<UnpackedMessageHeader> header;
+
+ // Pointer to a pointer to the span with the flatbuffer to publish in it. The
+ // second layer of indirection lets us modify all copies of a message when
+ // sending inside the log reader.
+ std::shared_ptr<SharedSpan> data;
bool operator<(const Message &m2) const;
bool operator<=(const Message &m2) const;
@@ -554,7 +554,10 @@
BootTimestamp monotonic_timestamp_time;
- std::shared_ptr<UnpackedMessageHeader> data;
+ // Pointer to a pointer to the data. If the outer pointer isn't populated, no
+ // data exists to send, we only have the timestamps. If the inner pointer is
+ // nullptr, the user has marked the message as something to not send.
+ std::shared_ptr<SharedSpan> data;
};
std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
@@ -807,14 +810,8 @@
// Boot merger for just timestamps. Any data read from here is to be ignored.
std::unique_ptr<BootMerger> timestamp_boot_merger_;
- // The callback requires us to convert each message to a TimestampedMessage.
- std::deque<TimestampedMessage> timestamp_messages_;
-
- // Storage for the next timestamp message to return. This is separate so we
- // can convert them back to a Message.
- //
- // TODO(austin): It would be nice to not have to convert...
- std::optional<Message> next_timestamp_;
+ // Deque of all the timestamp messages.
+ std::deque<Message> timestamp_messages_;
// Start times for each boot.
std::vector<monotonic_clock::time_point> monotonic_start_time_;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 4ceca20..87c6f5d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -274,6 +274,7 @@
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
Message m2{.channel_index = 0,
.queue_index = BootQueueIndex{.boot = 0, .index = 0u},
@@ -281,6 +282,7 @@
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
EXPECT_LT(m1, m2);
@@ -960,24 +962,24 @@
EXPECT_EQ(output[0].timestamp.boot, 0u);
EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
- EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
+ EXPECT_FALSE(output[0].header->has_monotonic_timestamp_time);
EXPECT_EQ(output[1].timestamp.boot, 0u);
EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
- EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
- EXPECT_EQ(output[1].data->monotonic_timestamp_time,
+ EXPECT_TRUE(output[1].header->has_monotonic_timestamp_time);
+ EXPECT_EQ(output[1].header->monotonic_timestamp_time,
monotonic_clock::time_point(std::chrono::nanoseconds(971)));
EXPECT_EQ(output[2].timestamp.boot, 0u);
EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
- EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
- EXPECT_EQ(output[2].data->monotonic_timestamp_time,
+ EXPECT_TRUE(output[2].header->has_monotonic_timestamp_time);
+ EXPECT_EQ(output[2].header->monotonic_timestamp_time,
monotonic_clock::time_point(std::chrono::nanoseconds(972)));
EXPECT_EQ(output[3].timestamp.boot, 0u);
EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
- EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
- EXPECT_EQ(output[3].data->monotonic_timestamp_time,
+ EXPECT_TRUE(output[3].header->has_monotonic_timestamp_time);
+ EXPECT_EQ(output[3].header->monotonic_timestamp_time,
monotonic_clock::time_point(std::chrono::nanoseconds(973)));
}
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 99b2359..c810203 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -155,9 +155,15 @@
// passing in a separate config.
LogReader reader(logfile, &config_.message());
- reader.AddBeforeSendCallback("/test", [](aos::examples::Ping *ping) {
- ping->mutate_value(ping->value() + 1);
- });
+ const uint8_t *data_ptr = nullptr;
+ reader.AddBeforeSendCallback<aos::examples::Ping>(
+ "/test",
+ [&data_ptr](aos::examples::Ping *ping,
+ const TimestampedMessage ×tamped_message) -> SharedSpan {
+ ping->mutate_value(ping->value() + 10000);
+ data_ptr = timestamped_message.data.get()->get()->data();
+ return *timestamped_message.data;
+ });
// This sends out the fetched messages and advances time to the start of the
// log file.
@@ -170,15 +176,21 @@
// Confirm that the ping and pong counts both match, and the value also
// matches.
- int ping_count = 10;
- test_event_loop->MakeWatcher("/test",
- [&ping_count](const examples::Ping &ping) {
- ++ping_count;
- EXPECT_EQ(ping.value(), ping_count);
- });
+ int ping_count = 10010;
+ test_event_loop->MakeWatcher(
+ "/test",
+ [&test_event_loop, &data_ptr, &ping_count](const examples::Ping &ping) {
+ ++ping_count;
+ EXPECT_EQ(ping.value(), ping_count);
+ // Since simulated event loops (especially log replay) refcount the
+ // shared data, we can verify if the right data got published by
+ // verifying that the actual pointer to the flatbuffer matches. This
+ // only is guarenteed to hold during this callback.
+ EXPECT_EQ(test_event_loop->context().data, data_ptr);
+ });
reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
- EXPECT_EQ(ping_count, 2010);
+ EXPECT_EQ(ping_count, 12010);
}
// Tests calling StartLogging twice.
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 2cf8ffe..c252616 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -8,6 +8,7 @@
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
+#include "aos/flatbuffers/aligned_allocator.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/timestamp_generated.h"
#include "aos/testing/tmpdir.h"
@@ -600,7 +601,7 @@
}
// MultinodeLoggerTest that tests the mutate callback works across multiple
-// nodes with remapping
+// nodes with remapping.
TEST_P(MultinodeLoggerTest, MultiNodeRemapMutateCallback) {
time_converter_.StartEqual();
std::vector<std::string> actual_filenames;
@@ -629,14 +630,18 @@
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
- int pong_count = 0;
+ int pong_count = 10;
// Adds a callback which mutates the value of the pong message before the
// message is sent which is the feature we are testing here
- reader.AddBeforeSendCallback("/test",
- [&pong_count](aos::examples::Pong *pong) {
- pong->mutate_value(pong->value() + 1);
- pong_count = pong->value();
- });
+ reader.AddBeforeSendCallback<aos::examples::Pong>(
+ "/test",
+ [&pong_count](
+ aos::examples::Pong *pong,
+ const TimestampedMessage ×tamped_message) -> SharedSpan {
+ pong->mutate_value(pong_count + 1);
+ ++pong_count;
+ return *timestamped_message.data;
+ });
// This sends out the fetched messages and advances time to the start of the
// log file.
@@ -698,14 +703,18 @@
LogReader reader(sorted_parts, &config_.message());
- int pong_count = 0;
+ int pong_count = 10;
// Adds a callback which mutates the value of the pong message before the
// message is sent which is the feature we are testing here
- reader.AddBeforeSendCallback("/test",
- [&pong_count](aos::examples::Pong *pong) {
- pong->mutate_value(pong->value() + 1);
- pong_count = pong->value();
- });
+ reader.AddBeforeSendCallback<aos::examples::Pong>(
+ "/test",
+ [&pong_count](
+ aos::examples::Pong *pong,
+ const TimestampedMessage ×tamped_message) -> SharedSpan {
+ pong->mutate_value(pong_count + 1);
+ ++pong_count;
+ return *timestamped_message.data;
+ });
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
@@ -772,11 +781,15 @@
int ping_count = 0;
// Adds a callback which mutates the value of the pong message before the
// message is sent which is the feature we are testing here
- reader.AddBeforeSendCallback("/test",
- [&ping_count](aos::examples::Ping *ping) {
- ++ping_count;
- ping->mutate_value(ping_count);
- });
+ reader.AddBeforeSendCallback<aos::examples::Ping>(
+ "/test",
+ [&ping_count](
+ aos::examples::Ping *ping,
+ const TimestampedMessage ×tamped_message) -> SharedSpan {
+ ++ping_count;
+ ping->mutate_value(ping_count);
+ return *timestamped_message.data;
+ });
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -821,6 +834,291 @@
reader.Deregister();
}
+// MultinodeLoggerTest that tests the mutate callback can fully replace the
+// message.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackReplacement) {
+ time_converter_.StartEqual();
+ std::vector<std::string> actual_filenames;
+
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ pi1_logger.AppendAllFilenames(&actual_filenames);
+ pi2_logger.AppendAllFilenames(&actual_filenames);
+ }
+
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+ LogReader reader(sorted_parts, &config_.message());
+
+ int pong_count = 10;
+ const uint8_t *data_ptr = nullptr;
+ // Adds a callback which replaces the pong message before the message is sent.
+ reader.AddBeforeSendCallback<aos::examples::Pong>(
+ "/test",
+ [&pong_count, &data_ptr](aos::examples::Pong *pong,
+ const TimestampedMessage &) -> SharedSpan {
+ fbs::AlignedVectorAllocator allocator;
+ aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
+ CHECK(pong_static->FromFlatbuffer(*pong));
+
+ pong_static->set_value(pong_count + 101);
+ ++pong_count;
+
+ SharedSpan result = allocator.Release();
+
+ data_ptr = result->data();
+
+ return result;
+ });
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ EXPECT_THAT(reader.LoggedNodes(),
+ ::testing::ElementsAre(
+ configuration::GetNode(reader.logged_configuration(), pi1),
+ configuration::GetNode(reader.logged_configuration(), pi2)));
+
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+
+ int pi1_pong_count = 10;
+ pi1_event_loop->MakeWatcher(
+ "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
+ &data_ptr](const examples::Pong &pong) {
+ ++pi1_pong_count;
+ // Since simulated event loops (especially log replay) refcount the
+ // shared data, we can verify if the right data got published by
+ // verifying that the actual pointer to the flatbuffer matches. This
+ // only is guarenteed to hold during this callback.
+ EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+ EXPECT_EQ(pong_count + 100, pong.value());
+ EXPECT_EQ(pi1_pong_count + 101, pong.value());
+ });
+
+ pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
+ &data_ptr](const examples::Pong &pong) {
+ // Same goes for the forwarded side, that should be the same contents too.
+ EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
+ EXPECT_EQ(pong_count + 100, pong.value());
+ });
+
+ reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+ reader.Deregister();
+
+ EXPECT_EQ(pong_count, 2011);
+}
+
+// MultinodeLoggerTest that tests the mutate callback can delete messages by
+// returning nullptr.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackDelete) {
+ time_converter_.StartEqual();
+ std::vector<std::string> actual_filenames;
+
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ pi1_logger.AppendAllFilenames(&actual_filenames);
+ pi2_logger.AppendAllFilenames(&actual_filenames);
+ }
+
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+ LogReader reader(sorted_parts, &config_.message());
+
+ int pong_count = 10;
+ const uint8_t *data_ptr = nullptr;
+ // Adds a callback which mutates the value of the pong message before the
+ // message is sent which is the feature we are testing here
+ reader.AddBeforeSendCallback<aos::examples::Pong>(
+ "/test",
+ [&pong_count, &data_ptr](aos::examples::Pong *pong,
+ const TimestampedMessage &) -> SharedSpan {
+ fbs::AlignedVectorAllocator allocator;
+ aos::fbs::Builder<aos::examples::PongStatic> pong_static(&allocator);
+ CHECK(pong_static->FromFlatbuffer(*pong));
+
+ pong_static->set_value(pong_count + 101);
+ ++pong_count;
+
+ if ((pong_count % 2) == 0) {
+ data_ptr = nullptr;
+ return nullptr;
+ }
+
+ SharedSpan result = allocator.Release();
+
+ data_ptr = result->data();
+
+ return result;
+ });
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ EXPECT_THAT(reader.LoggedNodes(),
+ ::testing::ElementsAre(
+ configuration::GetNode(reader.logged_configuration(), pi1),
+ configuration::GetNode(reader.logged_configuration(), pi2)));
+
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+
+ int pi1_pong_count = 10;
+ pi1_event_loop->MakeWatcher(
+ "/test", [&pi1_event_loop, &pong_count, &pi1_pong_count,
+ &data_ptr](const examples::Pong &pong) {
+ pi1_pong_count += 2;
+ // Since simulated event loops (especially log replay) refcount the
+ // shared data, we can verify if the right data got published by
+ // verifying that the actual pointer to the flatbuffer matches. This
+ // only is guarenteed to hold during this callback.
+ EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+ EXPECT_EQ(pong_count + 100, pong.value());
+ EXPECT_EQ(pi1_pong_count + 101, pong.value());
+ });
+
+ pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pong_count,
+ &data_ptr](const examples::Pong &pong) {
+ // Same goes for the forwarded side, that should be the same contents too.
+ EXPECT_EQ(pi2_event_loop->context().data, data_ptr);
+ EXPECT_EQ(pong_count + 100, pong.value());
+ });
+
+ reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+ reader.Deregister();
+
+ EXPECT_EQ(pong_count, 2011);
+ // Since we count up by 2 each time we get a message, and the last pong gets
+ // dropped since it is an odd number we expect the number on pi1 to be 1 less.
+ EXPECT_EQ(pi1_pong_count, 2010);
+}
+
+// MultinodeLoggerTest that tests that non-forwarded channels are able to be
+// mutated.
+TEST_P(MultinodeLoggerTest, MultiNodeMutateCallbackNotForwarded) {
+ time_converter_.StartEqual();
+ std::vector<std::string> actual_filenames;
+
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ pi1_logger.AppendAllFilenames(&actual_filenames);
+ pi2_logger.AppendAllFilenames(&actual_filenames);
+ }
+
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+
+ LogReader reader(sorted_parts, &config_.message());
+
+ int ping_count = 10;
+ const uint8_t *data_ptr = nullptr;
+ // Adds a callback which mutates the value of the pong message before the
+ // message is sent which is the feature we are testing here
+ reader.AddBeforeSendCallback<aos::examples::Ping>(
+ "/pi1/aos",
+ [&ping_count, &data_ptr](aos::examples::Ping *ping,
+ const TimestampedMessage &) -> SharedSpan {
+ fbs::AlignedVectorAllocator allocator;
+ aos::fbs::Builder<aos::examples::PingStatic> ping_static(&allocator);
+ CHECK(ping_static->FromFlatbuffer(*ping));
+
+ ping_static->set_value(ping_count + 101);
+ ++ping_count;
+
+ SharedSpan result = allocator.Release();
+
+ data_ptr = result->data();
+
+ return result;
+ });
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ EXPECT_THAT(reader.LoggedNodes(),
+ ::testing::ElementsAre(
+ configuration::GetNode(reader.logged_configuration(), pi1),
+ configuration::GetNode(reader.logged_configuration(), pi2)));
+
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+
+ int pi1_ping_count = 10;
+ pi1_event_loop->MakeWatcher(
+ "/aos", [&pi1_event_loop, &ping_count, &pi1_ping_count,
+ &data_ptr](const examples::Ping &ping) {
+ ++pi1_ping_count;
+ // Since simulated event loops (especially log replay) refcount the
+ // shared data, we can verify if the right data got published by
+ // verifying that the actual pointer to the flatbuffer matches. This
+ // only is guarenteed to hold during this callback.
+ EXPECT_EQ(pi1_event_loop->context().data, data_ptr);
+ EXPECT_EQ(ping_count + 100, ping.value());
+ EXPECT_EQ(pi1_ping_count + 101, ping.value());
+ });
+
+ reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+ reader.Deregister();
+
+ EXPECT_EQ(ping_count, 2011);
+}
+
// Tests that we do not allow adding callbacks after Register is called
TEST_P(MultinodeLoggerDeathTest, AddCallbackAfterRegister) {
time_converter_.StartEqual();
@@ -848,9 +1146,13 @@
reader.Register(&log_reader_factory);
EXPECT_DEATH(
{
- reader.AddBeforeSendCallback("/test", [](aos::examples::Pong *) {
- LOG(FATAL) << "This should not be called";
- });
+ reader.AddBeforeSendCallback<aos::examples::Pong>(
+ "/test",
+ [](aos::examples::Pong *,
+ const TimestampedMessage ×tamped_message) -> SharedSpan {
+ LOG(FATAL) << "This should not be called";
+ return *timestamped_message.data;
+ });
},
"Cannot add callbacks after calling Register");
reader.Deregister();
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 607303c..e5de120 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -9,6 +9,7 @@
#include "glog/logging.h"
#include "aos/containers/resizeable_buffer.h"
+#include "aos/ipc_lib/data_alignment.h"
#include "aos/macros.h"
#include "aos/util/file.h"
diff --git a/aos/flatbuffers/BUILD b/aos/flatbuffers/BUILD
index 32f1d39..1d6c602 100644
--- a/aos/flatbuffers/BUILD
+++ b/aos/flatbuffers/BUILD
@@ -33,6 +33,7 @@
name = "base_test",
srcs = ["base_test.cc"],
deps = [
+ ":aligned_allocator",
":base",
"//aos/testing:googletest",
],
@@ -165,3 +166,17 @@
srcs = ["test_static.h"],
visibility = [":__subpackages__"],
)
+
+cc_library(
+ name = "aligned_allocator",
+ srcs = ["aligned_allocator.cc"],
+ hdrs = ["aligned_allocator.h"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":base",
+ "//aos/containers:resizeable_buffer",
+ "//aos/events:event_loop",
+ "//aos/ipc_lib:data_alignment",
+ "@com_github_google_glog//:glog",
+ ],
+)
diff --git a/aos/flatbuffers/aligned_allocator.cc b/aos/flatbuffers/aligned_allocator.cc
new file mode 100644
index 0000000..d2c47e5
--- /dev/null
+++ b/aos/flatbuffers/aligned_allocator.cc
@@ -0,0 +1,96 @@
+#include "aos/flatbuffers/aligned_allocator.h"
+
+namespace aos::fbs {
+
+AlignedVectorAllocator::~AlignedVectorAllocator() {
+ CHECK(buffer_.empty())
+ << ": Must deallocate before destroying the AlignedVectorAllocator.";
+}
+
+std::optional<std::span<uint8_t>> AlignedVectorAllocator::Allocate(
+ size_t size, size_t /*alignment*/, fbs::SetZero set_zero) {
+ CHECK(buffer_.empty()) << ": Must deallocate before calling Allocate().";
+ buffer_.resize(((size + kAlignment - 1) / kAlignment) * kAlignment);
+ allocated_size_ = size;
+ if (set_zero == fbs::SetZero::kYes) {
+ memset(buffer_.data(), 0, buffer_.size());
+ }
+
+ return std::span<uint8_t>{data(), allocated_size_};
+}
+
+std::optional<std::span<uint8_t>> AlignedVectorAllocator::InsertBytes(
+ void *insertion_point, size_t bytes, size_t /*alignment*/,
+ fbs::SetZero set_zero) {
+ DCHECK_GE(reinterpret_cast<const uint8_t *>(insertion_point), data());
+ DCHECK_LE(reinterpret_cast<const uint8_t *>(insertion_point),
+ data() + allocated_size_);
+ const size_t buffer_offset =
+ reinterpret_cast<const uint8_t *>(insertion_point) - data();
+ // TODO(austin): This has an extra memcpy in it that isn't strictly needed
+ // when we resize. Remove it if performance is a concern.
+ const size_t absolute_buffer_offset =
+ reinterpret_cast<const uint8_t *>(insertion_point) - buffer_.data();
+ const size_t previous_size = buffer_.size();
+
+ buffer_.resize(((allocated_size_ + bytes + kAlignment - 1) / kAlignment) *
+ kAlignment);
+
+ // Now, we've got space both before and after the block of data. Move the
+ // data after to the end, and the data before to the start.
+
+ const size_t new_space_after = buffer_.size() - previous_size;
+
+ // Move the rest of the data to be end aligned. If the buffer wasn't resized,
+ // this will be a nop.
+ memmove(buffer_.data() + absolute_buffer_offset + new_space_after,
+ buffer_.data() + absolute_buffer_offset,
+ previous_size - absolute_buffer_offset);
+
+ // Now, move the data at the front to be aligned too.
+ memmove(buffer_.data() + buffer_.size() - (allocated_size_ + bytes),
+ buffer_.data() + previous_size - allocated_size_,
+ allocated_size_ - (previous_size - absolute_buffer_offset));
+
+ if (set_zero == fbs::SetZero::kYes) {
+ memset(data() - bytes + buffer_offset, 0, bytes);
+ }
+ allocated_size_ += bytes;
+
+ return std::span<uint8_t>{data(), allocated_size_};
+}
+
+std::span<uint8_t> AlignedVectorAllocator::RemoveBytes(
+ std::span<uint8_t> remove_bytes) {
+ const ssize_t removal_index = remove_bytes.data() - buffer_.data();
+ const size_t old_start_index = buffer_.size() - allocated_size_;
+ CHECK_LE(static_cast<ssize_t>(old_start_index), removal_index);
+ CHECK_LE(removal_index, static_cast<ssize_t>(buffer_.size()));
+ CHECK_LE(removal_index + remove_bytes.size(), buffer_.size());
+ uint8_t *old_buffer_start = buffer_.data() + old_start_index;
+ memmove(old_buffer_start + remove_bytes.size(), old_buffer_start,
+ removal_index - old_start_index);
+ allocated_size_ -= remove_bytes.size();
+
+ return std::span<uint8_t>{data(), allocated_size_};
+}
+
+void AlignedVectorAllocator::Deallocate(std::span<uint8_t>) {
+ if (!released_) {
+ CHECK(!buffer_.empty())
+ << ": Called Deallocate() without a prior allocation.";
+ }
+ released_ = false;
+ buffer_.resize(0);
+}
+
+aos::SharedSpan AlignedVectorAllocator::Release() {
+ absl::Span<uint8_t> span{data(), allocated_size_};
+ std::shared_ptr<SharedSpanHolder> result = std::make_shared<SharedSpanHolder>(
+ std::move(buffer_), absl::Span<const uint8_t>());
+ result->span = span;
+ released_ = true;
+ return aos::SharedSpan(result, &(result->span));
+}
+
+} // namespace aos::fbs
diff --git a/aos/flatbuffers/aligned_allocator.h b/aos/flatbuffers/aligned_allocator.h
new file mode 100644
index 0000000..a974818
--- /dev/null
+++ b/aos/flatbuffers/aligned_allocator.h
@@ -0,0 +1,57 @@
+#ifndef AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
+#define AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
+
+#include <memory>
+#include <optional>
+#include <span>
+
+#include "glog/logging.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/event_loop.h"
+#include "aos/flatbuffers/base.h"
+#include "aos/ipc_lib/data_alignment.h"
+
+namespace aos::fbs {
+
+// Allocator that uses an AllocatorResizeableBuffer to allow arbitrary-sized
+// allocations. Aligns the end of the buffer to an alignment of
+// kChannelDataAlignment.
+class AlignedVectorAllocator : public fbs::Allocator {
+ public:
+ static constexpr size_t kAlignment = aos::kChannelDataAlignment;
+ AlignedVectorAllocator() {}
+ ~AlignedVectorAllocator();
+
+ std::optional<std::span<uint8_t>> Allocate(size_t size, size_t alignment,
+ fbs::SetZero set_zero) override;
+
+ std::optional<std::span<uint8_t>> InsertBytes(void *insertion_point,
+ size_t bytes, size_t alignment,
+ fbs::SetZero set_zero) override;
+
+ std::span<uint8_t> RemoveBytes(std::span<uint8_t> remove_bytes) override;
+
+ void Deallocate(std::span<uint8_t>) override;
+
+ aos::SharedSpan Release();
+
+ private:
+ struct SharedSpanHolder {
+ aos::AllocatorResizeableBuffer<
+ aos::AlignedReallocator<kChannelDataAlignment>>
+ buffer;
+ absl::Span<const uint8_t> span;
+ };
+ uint8_t *data() { return buffer_.data() + buffer_.size() - allocated_size_; }
+
+ aos::AllocatorResizeableBuffer<aos::AlignedReallocator<kChannelDataAlignment>>
+ buffer_;
+
+ size_t allocated_size_ = 0u;
+ bool released_ = false;
+};
+
+} // namespace aos::fbs
+
+#endif // AOS_FLATBUFFERS_ALIGNED_ALLOCATOR_H_
diff --git a/aos/flatbuffers/base.h b/aos/flatbuffers/base.h
index ff81c9a..a92cc91 100644
--- a/aos/flatbuffers/base.h
+++ b/aos/flatbuffers/base.h
@@ -1,5 +1,6 @@
#ifndef AOS_FLATBUFFERS_BASE_H_
#define AOS_FLATBUFFERS_BASE_H_
+
#include <stdint.h>
#include <sys/types.h>
@@ -15,6 +16,7 @@
#include "glog/logging.h"
namespace aos::fbs {
+
using ::flatbuffers::soffset_t;
using ::flatbuffers::uoffset_t;
using ::flatbuffers::voffset_t;
diff --git a/aos/flatbuffers/base_test.cc b/aos/flatbuffers/base_test.cc
index f0eaf04..87d89fa 100644
--- a/aos/flatbuffers/base_test.cc
+++ b/aos/flatbuffers/base_test.cc
@@ -6,6 +6,8 @@
#include "gtest/gtest.h"
+#include "aos/flatbuffers/aligned_allocator.h"
+
namespace aos::fbs::testing {
// Tests that PaddedSize() behaves as expected.
TEST(BaseTest, PaddedSize) {
@@ -16,7 +18,7 @@
EXPECT_EQ(8, PaddedSize(7, 4));
}
-inline constexpr size_t kDefaultSize = 16;
+inline constexpr size_t kDefaultSize = AlignedVectorAllocator::kAlignment * 2;
template <typename T>
class AllocatorTest : public ::testing::Test {
protected:
@@ -32,7 +34,8 @@
allocator_(std::make_unique<SpanAllocator>(
std::span<uint8_t>{buffer_.data(), buffer_.size()})) {}
-using AllocatorTypes = ::testing::Types<SpanAllocator, VectorAllocator>;
+using AllocatorTypes =
+ ::testing::Types<SpanAllocator, VectorAllocator, AlignedVectorAllocator>;
TYPED_TEST_SUITE(AllocatorTest, AllocatorTypes);
// Tests that we can create and not use a VectorAllocator.
@@ -79,6 +82,11 @@
// Tests that we can remove bytes from an arbitrary spot in the buffer.
TYPED_TEST(AllocatorTest, RemoveBytes) {
+ // Deletion doesn't require resizing, so we don't need to worry about it being
+ // larger than the alignment to test everything. The test requires the size
+ // to be < 255 to store the sentinal values.
+ const size_t kDefaultSize = 128;
+
const size_t half_size = kDefaultSize / 2;
std::span<uint8_t> span =
this->allocator_->Allocate(kDefaultSize, 4, SetZero::kYes).value();