Optimize simulated event loop log reading.
Currenly, we make 2 copies of a message before it is sent in
simulation: First, from the source (file or lzma decoder) to a
flatbuffer vector, then to the simulated message. By aligning the
data part of a MessageHeader in the first copy from the source,
we don't need to copy it again. This is all tracked with
shared_ptr's so the lifetime management is easy.
Change-Id: I82c86ef3f9662d4c615dc57862fa89b1b9981ed4
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 8728111..68cc923 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -31,6 +31,15 @@
RawSender::~RawSender() { event_loop_->DeleteSender(this); }
+bool RawSender::DoSend(const SharedSpan data,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &source_boot_uuid) {
+ return DoSend(data->data(), data->size(), monotonic_remote_time,
+ realtime_remote_time, remote_queue_index, source_boot_uuid);
+}
+
RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
: event_loop_(event_loop),
channel_(channel),
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 1000703..0e7b66a 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -136,6 +136,8 @@
// and as a building block to implement typed senders.
class RawSender {
public:
+ using SharedSpan = std::shared_ptr<const absl::Span<const uint8_t>>;
+
RawSender(EventLoop *event_loop, const Channel *channel);
RawSender(const RawSender &) = delete;
RawSender &operator=(const RawSender &) = delete;
@@ -164,6 +166,15 @@
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index, const UUID &source_boot_uuid);
+ // Sends a single block of data by refcounting it to avoid copies. The data
+ // must not change after being passed into Send. The remote arguments have the
+ // same meaning as in Send above.
+ bool Send(const SharedSpan data);
+ bool Send(const SharedSpan data,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index, const UUID &remote_boot_uuid);
+
const Channel *channel() const { return channel_; }
// Returns the time_points that the last message was sent at.
@@ -209,6 +220,11 @@
realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
const UUID &source_boot_uuid) = 0;
+ virtual bool DoSend(const SharedSpan data,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &source_boot_uuid);
EventLoop *const event_loop_;
const Channel *const channel_;
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index 698191e..7e560fe 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -36,8 +36,9 @@
template <typename Watch>
void EventLoop::MakeWatcher(const std::string_view channel_name, Watch &&w) {
- using MessageType = typename event_loop_internal::watch_message_type_trait<
- decltype(&Watch::operator())>::message_type;
+ using MessageType =
+ typename event_loop_internal::watch_message_type_trait<decltype(
+ &Watch::operator())>::message_type;
const Channel *channel = configuration::GetChannel(
configuration_, channel_name, MessageType::GetFullyQualifiedName(),
name(), node());
@@ -174,6 +175,31 @@
return false;
}
+inline bool RawSender::Send(const SharedSpan data) {
+ return Send(std::move(data), monotonic_clock::min_time,
+ realtime_clock::min_time, 0xffffffffu, event_loop_->boot_uuid());
+}
+
+inline bool RawSender::Send(
+ const SharedSpan data,
+ aos::monotonic_clock::time_point monotonic_remote_time,
+ aos::realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index, const UUID &uuid) {
+ const size_t size = data->size();
+ if (DoSend(std::move(data), monotonic_remote_time, realtime_remote_time,
+ remote_queue_index, uuid)) {
+ timing_.size.Add(size);
+ timing_.sender->mutate_count(timing_.sender->count() + 1);
+ ftrace_.FormatMessage(
+ "%.*s: sent shared: event=%" PRId64 " queue=%" PRIu32,
+ static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(),
+ static_cast<int64_t>(monotonic_sent_time().time_since_epoch().count()),
+ sent_queue_index());
+ return true;
+ }
+ return false;
+}
+
inline monotonic_clock::time_point TimerHandler::Call(
std::function<monotonic_clock::time_point()> get_time,
monotonic_clock::time_point event_time) {
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d6213ae..a80b0e5 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -706,7 +706,7 @@
state->monotonic_start_time(
timestamped_message.monotonic_event_time.boot) ||
event_loop_factory_ != nullptr) {
- if (timestamped_message.data.span().size() != 0u) {
+ if (timestamped_message.data != nullptr) {
if (timestamped_message.monotonic_remote_time !=
BootTimestamp::min_time()) {
// Confirm that the message was sent on the sending node before the
@@ -851,7 +851,7 @@
// through events like this, they can set
// --skip_missing_forwarding_entries or ignore_missing_data_.
CHECK_LT(next.channel_index, last_message.size());
- if (next.data.span().size() == 0u) {
+ if (next.data == nullptr) {
last_message[next.channel_index] = true;
} else {
if (last_message[next.channel_index]) {
@@ -874,9 +874,7 @@
.count()
<< " start "
<< monotonic_start_time().time_since_epoch().count() << " "
- << FlatbufferToJson(
- timestamped_message.data,
- {.multi_line = false, .max_vector_size = 100});
+ << *timestamped_message.data;
}
const BootTimestamp next_time = state->OldestMessageTime();
@@ -1421,8 +1419,8 @@
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
const bool sent = sender->Send(
- timestamped_message.data.message().data()->Data(),
- timestamped_message.data.message().data()->size(),
+ RawSender::SharedSpan(timestamped_message.data,
+ ×tamped_message.data->span),
timestamped_message.monotonic_remote_time.time,
timestamped_message.realtime_remote_time, remote_queue_index,
(channel_source_state_[timestamped_message.channel_index] != nullptr
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 73578e6..dd82418 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -43,9 +43,20 @@
"last header as the actual header.");
namespace aos::logger {
+namespace {
namespace chrono = std::chrono;
+template <typename T>
+void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
+ if (t.has_value()) {
+ *os << *t;
+ } else {
+ *os << "null";
+ }
+}
+} // namespace
+
DetachedBufferWriter::DetachedBufferWriter(
std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
: filename_(filename), encoder_(std::move(encoder)) {
@@ -499,23 +510,81 @@
<< FlatbufferToJson(log_file_header()->node());
}
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-MessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
if (msg_data == absl::Span<const uint8_t>()) {
- return std::nullopt;
+ return nullptr;
}
- SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);
+ SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
+ CHECK(msg.Verify()) << ": Corrupted message from " << filename();
- CHECK(result.Verify()) << ": Corrupted message from " << filename();
+ auto result = UnpackedMessageHeader::MakeMessage(msg.message());
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(result.message().monotonic_sent_time()));
+ const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
- return std::move(result);
+ VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
+ return result;
+}
+
+std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
+ const MessageHeader &message) {
+ const size_t data_size = message.has_data() ? message.data()->size() : 0;
+
+ UnpackedMessageHeader *const unpacked_message =
+ reinterpret_cast<UnpackedMessageHeader *>(
+ malloc(sizeof(UnpackedMessageHeader) + data_size +
+ kChannelDataAlignment - 1));
+
+ CHECK(message.has_channel_index());
+ CHECK(message.has_monotonic_sent_time());
+
+ absl::Span<uint8_t> span;
+ if (data_size > 0) {
+ span =
+ absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
+ &unpacked_message->actual_data[0], data_size)),
+ data_size);
+ }
+
+ std::optional<std::chrono::nanoseconds> monotonic_remote_time;
+ if (message.has_monotonic_remote_time()) {
+ monotonic_remote_time =
+ std::chrono::nanoseconds(message.monotonic_remote_time());
+ }
+ std::optional<realtime_clock::time_point> realtime_remote_time;
+ if (message.has_realtime_remote_time()) {
+ realtime_remote_time = realtime_clock::time_point(
+ chrono::nanoseconds(message.realtime_remote_time()));
+ }
+
+ std::optional<uint32_t> remote_queue_index;
+ if (message.has_remote_queue_index()) {
+ remote_queue_index = message.remote_queue_index();
+ }
+
+ new (unpacked_message) UnpackedMessageHeader{
+ .channel_index = message.channel_index(),
+ .monotonic_sent_time = monotonic_clock::time_point(
+ chrono::nanoseconds(message.monotonic_sent_time())),
+ .realtime_sent_time = realtime_clock::time_point(
+ chrono::nanoseconds(message.realtime_sent_time())),
+ .queue_index = message.queue_index(),
+ .monotonic_remote_time = monotonic_remote_time,
+ .realtime_remote_time = realtime_remote_time,
+ .remote_queue_index = remote_queue_index,
+ .monotonic_timestamp_time = monotonic_clock::time_point(
+ std::chrono::nanoseconds(message.monotonic_timestamp_time())),
+ .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
+ .span = span};
+
+ if (data_size > 0) {
+ memcpy(span.data(), message.data()->data(), data_size);
+ }
+
+ return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
+ &DestroyAndFree);
}
PartsMessageReader::PartsMessageReader(LogParts log_parts)
@@ -569,19 +638,19 @@
}
}
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-PartsMessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
while (!done_) {
- std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
+ std::shared_ptr<UnpackedMessageHeader> message =
message_reader_.ReadMessage();
if (message) {
newest_timestamp_ = message_reader_.newest_timestamp();
- const monotonic_clock::time_point monotonic_sent_time(
- chrono::nanoseconds(message->message().monotonic_sent_time()));
- // TODO(austin): Does this work with startup? Might need to use the start
- // time.
- // TODO(austin): Does this work with startup when we don't know the remote
- // start time too? Look at one of those logs to compare.
+ const monotonic_clock::time_point monotonic_sent_time =
+ message->monotonic_sent_time;
+
+ // TODO(austin): Does this work with startup? Might need to use the
+ // start time.
+ // TODO(austin): Does this work with startup when we don't know the
+ // remote start time too? Look at one of those logs to compare.
if (monotonic_sent_time >
parts_.monotonic_start_time + max_out_of_order_duration()) {
after_start_ = true;
@@ -599,7 +668,7 @@
NextLog();
}
newest_timestamp_ = monotonic_clock::max_time;
- return std::nullopt;
+ return nullptr;
}
void PartsMessageReader::NextLog() {
@@ -645,13 +714,29 @@
channel_index == m2.channel_index && queue_index == m2.queue_index;
}
+std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
+ os << "{.channel_index=" << m.channel_index
+ << ", .monotonic_sent_time=" << m.monotonic_sent_time
+ << ", .realtime_sent_time=" << m.realtime_sent_time
+ << ", .queue_index=" << m.queue_index;
+ if (m.monotonic_remote_time) {
+ os << ", .monotonic_remote_time=" << m.monotonic_remote_time->count();
+ }
+ os << ", .realtime_remote_time=";
+ PrintOptionalOrNull(&os, m.realtime_remote_time);
+ os << ", .remote_queue_index=";
+ PrintOptionalOrNull(&os, m.remote_queue_index);
+ if (m.has_monotonic_timestamp_time) {
+ os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+ }
+ return os;
+}
+
std::ostream &operator<<(std::ostream &os, const Message &m) {
os << "{.channel_index=" << m.channel_index
<< ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
- if (m.data.Verify()) {
- os << ", .data="
- << aos::FlatbufferToJson(m.data,
- {.multi_line = false, .max_vector_size = 1});
+ if (m.data != nullptr) {
+ os << ", .data=" << m;
}
os << "}";
return os;
@@ -674,10 +759,8 @@
if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
}
- if (m.data.Verify()) {
- os << ", .data="
- << aos::FlatbufferToJson(m.data,
- {.multi_line = false, .max_vector_size = 1});
+ if (m.data != nullptr) {
+ os << ", .data=" << *m.data;
}
os << "}";
return os;
@@ -700,7 +783,7 @@
break;
}
- std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+ std::shared_ptr<UnpackedMessageHeader> m =
parts_message_reader_.ReadMessage();
// No data left, sorted forever, work through what is left.
if (!m) {
@@ -709,36 +792,32 @@
}
size_t monotonic_timestamp_boot = 0;
- if (m.value().message().has_monotonic_timestamp_time()) {
+ if (m->has_monotonic_timestamp_time) {
monotonic_timestamp_boot = parts().logger_boot_count;
}
size_t monotonic_remote_boot = 0xffffff;
- if (m.value().message().has_monotonic_remote_time()) {
+ if (m->monotonic_remote_time.has_value()) {
const Node *node = parts().config->nodes()->Get(
- source_node_index_[m->message().channel_index()]);
+ source_node_index_[m->channel_index]);
std::optional<size_t> boot = parts_message_reader_.boot_count(
- source_node_index_[m->message().channel_index()]);
+ source_node_index_[m->channel_index]);
CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
<< ", with index "
- << source_node_index_[m->message().channel_index()];
+ << source_node_index_[m->channel_index];
monotonic_remote_boot = *boot;
}
- messages_.insert(Message{
- .channel_index = m.value().message().channel_index(),
- .queue_index =
- BootQueueIndex{.boot = parts().boot_count,
- .index = m.value().message().queue_index()},
- .timestamp =
- BootTimestamp{
- .boot = parts().boot_count,
- .time = monotonic_clock::time_point(std::chrono::nanoseconds(
- m.value().message().monotonic_sent_time()))},
- .monotonic_remote_boot = monotonic_remote_boot,
- .monotonic_timestamp_boot = monotonic_timestamp_boot,
- .data = std::move(m.value())});
+ messages_.insert(
+ Message{.channel_index = m->channel_index,
+ .queue_index = BootQueueIndex{.boot = parts().boot_count,
+ .index = m->queue_index},
+ .timestamp = BootTimestamp{.boot = parts().boot_count,
+ .time = m->monotonic_sent_time},
+ .monotonic_remote_boot = monotonic_remote_boot,
+ .monotonic_timestamp_boot = monotonic_timestamp_boot,
+ .data = std::move(m)});
// Now, update sorted_until_ to match the new message.
if (parts_message_reader_.newest_timestamp() >
@@ -878,17 +957,17 @@
oldest = m;
current_ = &parts_sorter;
} else if (*m == *oldest) {
- // Found a duplicate. If there is a choice, we want the one which has the
- // timestamp time.
- if (!m->data.message().has_monotonic_timestamp_time()) {
+ // Found a duplicate. If there is a choice, we want the one which has
+ // the timestamp time.
+ if (!m->data->has_monotonic_timestamp_time) {
parts_sorter.PopFront();
- } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
+ } else if (!oldest->data->has_monotonic_timestamp_time) {
current_->PopFront();
current_ = &parts_sorter;
oldest = m;
} else {
- CHECK_EQ(m->data.message().monotonic_timestamp_time(),
- oldest->data.message().monotonic_timestamp_time());
+ CHECK_EQ(m->data->monotonic_timestamp_time,
+ oldest->data->monotonic_timestamp_time);
parts_sorter.PopFront();
}
}
@@ -1017,7 +1096,7 @@
CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
- // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
+ // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
// we could needlessly save data.
if (node_data->any_delivered) {
VLOG(1) << "Registering on node " << node() << " for peer node "
@@ -1035,8 +1114,7 @@
.channel_index = m->channel_index,
.queue_index = m->queue_index,
.monotonic_event_time = m->timestamp,
- .realtime_event_time = aos::realtime_clock::time_point(
- std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .realtime_event_time = m->data->realtime_sent_time,
.remote_queue_index = BootQueueIndex::Invalid(),
.monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
@@ -1086,16 +1164,17 @@
return true;
}
- // We need to only add messages to the list so they get processed for messages
- // which are delivered. Reuse the flow below which uses messages_ by just
- // adding the new message to messages_ and continuing.
+ // We need to only add messages to the list so they get processed for
+ // messages which are delivered. Reuse the flow below which uses messages_
+ // by just adding the new message to messages_ and continuing.
if (messages_.empty()) {
if (!Queue()) {
// Found nothing to add, we are out of data!
return false;
}
- // Now that it has been added (and cannibalized), forget about it upstream.
+ // Now that it has been added (and cannibalized), forget about it
+ // upstream.
boot_merger_.PopFront();
}
@@ -1110,7 +1189,8 @@
timestamp_callback_(&matched_messages_.back());
return true;
} else {
- // Got a timestamp, find the matching remote data, match it, and return it.
+ // Got a timestamp, find the matching remote data, match it, and return
+ // it.
Message data = MatchingMessageFor(*m);
// Return the data from the remote. The local message only has timestamp
@@ -1119,21 +1199,16 @@
.channel_index = m->channel_index,
.queue_index = m->queue_index,
.monotonic_event_time = m->timestamp,
- .realtime_event_time = aos::realtime_clock::time_point(
- std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .realtime_event_time = m->data->realtime_sent_time,
.remote_queue_index =
BootQueueIndex{.boot = m->monotonic_remote_boot,
- .index = m->data.message().remote_queue_index()},
- .monotonic_remote_time =
- {m->monotonic_remote_boot,
- monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_remote_time()))},
- .realtime_remote_time = realtime_clock::time_point(
- std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
- .monotonic_timestamp_time =
- {m->monotonic_timestamp_boot,
- monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_timestamp_time()))},
+ .index = m->data->remote_queue_index.value()},
+ .monotonic_remote_time = {m->monotonic_remote_boot,
+ monotonic_clock::time_point(
+ m->data->monotonic_remote_time.value())},
+ .realtime_remote_time = m->data->realtime_remote_time.value(),
+ .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
+ m->data->monotonic_timestamp_time},
.data = std::move(data.data)});
CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -1153,8 +1228,9 @@
}
void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
- // Note: queueing for time doesn't really work well across boots. So we just
- // assume that if you are using this, you only care about the current boot.
+ // Note: queueing for time doesn't really work well across boots. So we
+ // just assume that if you are using this, you only care about the current
+ // boot.
//
// TODO(austin): Is that the right concept?
//
@@ -1191,36 +1267,37 @@
Message TimestampMapper::MatchingMessageFor(const Message &message) {
// Figure out what queue index we are looking for.
- CHECK(message.data.message().has_remote_queue_index());
+ CHECK_NOTNULL(message.data);
+ CHECK(message.data->remote_queue_index.has_value());
const BootQueueIndex remote_queue_index =
BootQueueIndex{.boot = message.monotonic_remote_boot,
- .index = message.data.message().remote_queue_index()};
+ .index = *message.data->remote_queue_index};
- CHECK(message.data.message().has_monotonic_remote_time());
- CHECK(message.data.message().has_realtime_remote_time());
+ CHECK(message.data->monotonic_remote_time.has_value());
+ CHECK(message.data->realtime_remote_time.has_value());
const BootTimestamp monotonic_remote_time{
.boot = message.monotonic_remote_boot,
- .time = monotonic_clock::time_point(std::chrono::nanoseconds(
- message.data.message().monotonic_remote_time()))};
- const realtime_clock::time_point realtime_remote_time(
- std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+ .time = monotonic_clock::time_point(
+ message.data->monotonic_remote_time.value())};
+ const realtime_clock::time_point realtime_remote_time =
+ *message.data->realtime_remote_time;
- TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;
+ TimestampMapper *peer =
+ nodes_data_[source_node_[message.data->channel_index]].peer;
// We only register the peers which we have data for. So, if we are being
- // asked to pull a timestamp from a peer which doesn't exist, return an empty
- // message.
+ // asked to pull a timestamp from a peer which doesn't exist, return an
+ // empty message.
if (peer == nullptr) {
// TODO(austin): Make sure the tests hit all these paths with a boot count
// of 1...
- return Message{
- .channel_index = message.channel_index,
- .queue_index = remote_queue_index,
- .timestamp = monotonic_remote_time,
- .monotonic_remote_boot = 0xffffff,
- .monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ return Message{.channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
+ .data = nullptr};
}
// The queue which will have the matching data, if available.
@@ -1230,13 +1307,12 @@
peer->QueueUnmatchedUntil(monotonic_remote_time);
if (data_queue->empty()) {
- return Message{
- .channel_index = message.channel_index,
- .queue_index = remote_queue_index,
- .timestamp = monotonic_remote_time,
- .monotonic_remote_boot = 0xffffff,
- .monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ return Message{.channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
+ .data = nullptr};
}
if (remote_queue_index < data_queue->front().queue_index ||
@@ -1247,7 +1323,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ .data = nullptr};
}
// The algorithm below is constant time with some assumptions. We need there
@@ -1267,8 +1343,7 @@
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
- result.data.message().realtime_sent_time())),
+ CHECK_EQ(result.data->realtime_sent_time,
realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
// Now drop the data off the front. We have deduplicated timestamps, so we
@@ -1288,23 +1363,22 @@
m.timestamp.boot == remote_boot;
});
if (it == data_queue->end()) {
- return Message{
- .channel_index = message.channel_index,
- .queue_index = remote_queue_index,
- .timestamp = monotonic_remote_time,
- .monotonic_remote_boot = 0xffffff,
- .monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ return Message{.channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
+ .data = nullptr};
}
Message result = std::move(*it);
CHECK_EQ(result.timestamp, monotonic_remote_time)
- << ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
- result.data.message().realtime_sent_time())),
- realtime_remote_time)
- << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ << ": Queue index matches, but timestamp doesn't. Please "
+ "investigate!";
+ CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please "
+ "investigate!";
// TODO(austin): We still go in order, so we can erase from the beginning to
// our iterator minus 1. That'll keep 1 in the queue.
@@ -1330,7 +1404,8 @@
return;
}
- // Now that it has been added (and cannibalized), forget about it upstream.
+ // Now that it has been added (and cannibalized), forget about it
+ // upstream.
boot_merger_.PopFront();
}
}
@@ -1346,8 +1421,8 @@
if (node_data.channels[m->channel_index].delivered) {
// TODO(austin): This copies the data... Probably not worth stressing
// about yet.
- // TODO(austin): Bound how big this can get. We tend not to send massive
- // data, so we can probably ignore this for a bit.
+ // TODO(austin): Bound how big this can get. We tend not to send
+ // massive data, so we can probably ignore this for a bit.
node_data.channels[m->channel_index].messages.emplace_back(*m);
}
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 5c5f709..4c82e4d 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -253,6 +253,8 @@
std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
std::string_view filename, size_t n);
+class UnpackedMessageHeader;
+
// Class which handles reading the header and messages from the log file. This
// handles any per-file state left before merging below.
class MessageReader {
@@ -284,7 +286,7 @@
}
// Returns the next message if there is one.
- std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
+ std::shared_ptr<UnpackedMessageHeader> ReadMessage();
// The time at which we need to read another chunk from the logfile.
monotonic_clock::time_point queue_data_time() const {
@@ -334,7 +336,7 @@
// Returns the next message if there is one, or nullopt if we have reached the
// end of all the files.
// Note: reading the next message may change the max_out_of_order_duration().
- std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
+ std::shared_ptr<UnpackedMessageHeader> ReadMessage();
// Returns the boot count for the requested node, or std::nullopt if we don't
// know.
@@ -377,6 +379,54 @@
std::vector<std::optional<size_t>> boot_counts_;
};
+// Stores MessageHeader as a flat header and inline, aligned block of data.
+class UnpackedMessageHeader {
+ public:
+ UnpackedMessageHeader(const UnpackedMessageHeader &) = delete;
+ UnpackedMessageHeader &operator=(const UnpackedMessageHeader &) = delete;
+
+ // The channel.
+ uint32_t channel_index = 0xffffffff;
+
+ monotonic_clock::time_point monotonic_sent_time;
+ realtime_clock::time_point realtime_sent_time;
+
+ // The local queue index.
+ uint32_t queue_index = 0xffffffff;
+
+ std::optional<std::chrono::nanoseconds> monotonic_remote_time;
+
+ std::optional<realtime_clock::time_point> realtime_remote_time;
+ std::optional<uint32_t> remote_queue_index;
+
+ // This field is defaulted in the flatbuffer, so we need to store both the
+ // possibly defaulted value and whether it is defaulted.
+ monotonic_clock::time_point monotonic_timestamp_time;
+ bool has_monotonic_timestamp_time;
+
+ static std::shared_ptr<UnpackedMessageHeader> MakeMessage(
+ const MessageHeader &message);
+
+ // Note: we are storing a span here because we need something to put in the
+ // SharedSpan pointer that RawSender takes. We are using the aliasing
+ // constructor of shared_ptr to avoid the allocation, and it needs a nice
+ // pointer to track.
+ absl::Span<const uint8_t> span;
+
+ char actual_data[];
+
+ private:
+ ~UnpackedMessageHeader() {}
+
+ static void DestroyAndFree(UnpackedMessageHeader *p) {
+ p->~UnpackedMessageHeader();
+ free(p);
+ }
+};
+
+std::ostream &operator<<(std::ostream &os,
+ const UnpackedMessageHeader &message);
+
// Struct to hold a message as it gets sorted on a single node.
struct Message {
// The channel.
@@ -394,8 +444,7 @@
size_t monotonic_timestamp_boot = 0xffffff;
- // The data (either a timestamp header, or a data header).
- SizePrefixedFlatbufferVector<MessageHeader> data;
+ std::shared_ptr<UnpackedMessageHeader> data;
bool operator<(const Message &m2) const;
bool operator>=(const Message &m2) const;
@@ -420,7 +469,7 @@
BootTimestamp monotonic_timestamp_time;
- SizePrefixedFlatbufferVector<MessageHeader> data;
+ std::shared_ptr<UnpackedMessageHeader> data;
};
std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 1c67312..e1e2ebd 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -230,14 +230,14 @@
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ .data = nullptr};
Message m2{.channel_index = 0,
.queue_index = BootQueueIndex{.boot = 0, .index = 0u},
.timestamp =
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ .data = nullptr};
EXPECT_LT(m1, m2);
EXPECT_GE(m2, m1);
@@ -847,22 +847,25 @@
EXPECT_EQ(output[0].timestamp.boot, 0u);
EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(101000));
- EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
+ EXPECT_FALSE(output[0].data->has_monotonic_timestamp_time);
EXPECT_EQ(output[1].timestamp.boot, 0u);
EXPECT_EQ(output[1].timestamp.time, e + chrono::milliseconds(101001));
- EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
- EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
+ EXPECT_TRUE(output[1].data->has_monotonic_timestamp_time);
+ EXPECT_EQ(output[1].data->monotonic_timestamp_time,
+ monotonic_clock::time_point(std::chrono::nanoseconds(971)));
EXPECT_EQ(output[2].timestamp.boot, 0u);
EXPECT_EQ(output[2].timestamp.time, e + chrono::milliseconds(101002));
- EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
- EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
+ EXPECT_TRUE(output[2].data->has_monotonic_timestamp_time);
+ EXPECT_EQ(output[2].data->monotonic_timestamp_time,
+ monotonic_clock::time_point(std::chrono::nanoseconds(972)));
EXPECT_EQ(output[3].timestamp.boot, 0u);
EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(101003));
- EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
- EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
+ EXPECT_TRUE(output[3].data->has_monotonic_timestamp_time);
+ EXPECT_EQ(output[3].data->monotonic_timestamp_time,
+ monotonic_clock::time_point(std::chrono::nanoseconds(973)));
}
// Tests that we can match timestamps on delivered messages.
@@ -941,17 +944,17 @@
EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[0].monotonic_event_time.time,
e + chrono::milliseconds(1000));
- EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_TRUE(output0[0].data != nullptr);
EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_event_time.time,
e + chrono::milliseconds(2000));
- EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_TRUE(output0[1].data != nullptr);
EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_event_time.time,
e + chrono::milliseconds(3000));
- EXPECT_TRUE(output0[2].data.Verify());
+ EXPECT_TRUE(output0[2].data != nullptr);
}
{
@@ -993,17 +996,17 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
}
}
@@ -1072,7 +1075,7 @@
EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
monotonic_clock::min_time);
- EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_TRUE(output0[0].data != nullptr);
EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_event_time.time,
@@ -1080,7 +1083,7 @@
EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
monotonic_clock::min_time);
- EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_TRUE(output0[1].data != nullptr);
EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_event_time.time,
@@ -1088,7 +1091,7 @@
EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
monotonic_clock::min_time);
- EXPECT_TRUE(output0[2].data.Verify());
+ EXPECT_TRUE(output0[2].data != nullptr);
}
{
@@ -1109,7 +1112,7 @@
EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
e + chrono::nanoseconds(971));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
@@ -1117,7 +1120,7 @@
EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
e + chrono::nanoseconds(5458));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
@@ -1125,7 +1128,7 @@
EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
monotonic_clock::min_time);
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
}
EXPECT_EQ(mapper0_count, 3u);
@@ -1200,17 +1203,17 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
}
{
@@ -1236,17 +1239,17 @@
EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[0].monotonic_event_time.time,
e + chrono::milliseconds(1000));
- EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_TRUE(output0[0].data != nullptr);
EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_event_time.time,
e + chrono::milliseconds(2000));
- EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_TRUE(output0[1].data != nullptr);
EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_event_time.time,
e + chrono::milliseconds(3000));
- EXPECT_TRUE(output0[2].data.Verify());
+ EXPECT_TRUE(output0[2].data != nullptr);
}
EXPECT_EQ(mapper0_count, 3u);
@@ -1319,17 +1322,17 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_FALSE(output1[0].data.Verify());
+ EXPECT_FALSE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
}
EXPECT_EQ(mapper0_count, 0u);
@@ -1402,17 +1405,17 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_FALSE(output1[2].data.Verify());
+ EXPECT_FALSE(output1[2].data != nullptr);
}
EXPECT_EQ(mapper0_count, 0u);
@@ -1477,12 +1480,12 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
}
EXPECT_EQ(mapper0_count, 0u);
@@ -1550,22 +1553,22 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[3].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_TRUE(output1[3].data.Verify());
+ EXPECT_TRUE(output1[3].data != nullptr);
}
EXPECT_EQ(mapper0_count, 0u);
@@ -1650,17 +1653,17 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_FALSE(output1[0].data.Verify());
+ EXPECT_FALSE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_FALSE(output1[1].data.Verify());
+ EXPECT_FALSE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_FALSE(output1[2].data.Verify());
+ EXPECT_FALSE(output1[2].data != nullptr);
}
EXPECT_EQ(mapper1_count, 3u);
}
@@ -1754,22 +1757,22 @@
EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[0].monotonic_event_time.time,
e + chrono::milliseconds(1000));
- EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_TRUE(output0[0].data != nullptr);
EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_event_time.time,
e + chrono::milliseconds(1000));
- EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_TRUE(output0[1].data != nullptr);
EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_event_time.time,
e + chrono::milliseconds(2000));
- EXPECT_TRUE(output0[2].data.Verify());
+ EXPECT_TRUE(output0[2].data != nullptr);
EXPECT_EQ(output0[3].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[3].monotonic_event_time.time,
e + chrono::milliseconds(3000));
- EXPECT_TRUE(output0[3].data.Verify());
+ EXPECT_TRUE(output0[3].data != nullptr);
}
{
@@ -1827,22 +1830,22 @@
EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1000));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(2000));
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
EXPECT_EQ(output1[3].monotonic_event_time.boot, 0u);
EXPECT_EQ(output1[3].monotonic_event_time.time,
e + chrono::seconds(100) + chrono::milliseconds(3000));
- EXPECT_TRUE(output1[3].data.Verify());
+ EXPECT_TRUE(output1[3].data != nullptr);
}
}
@@ -2194,7 +2197,7 @@
(BootQueueIndex{.boot = 0u, .index = 0u}));
EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
- EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_TRUE(output0[0].data != nullptr);
EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_event_time.time,
@@ -2203,7 +2206,7 @@
(BootQueueIndex{.boot = 0u, .index = 1u}));
EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
- EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_TRUE(output0[1].data != nullptr);
EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_event_time.time,
@@ -2212,7 +2215,7 @@
(BootQueueIndex{.boot = 0u, .index = 2u}));
EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
- EXPECT_TRUE(output0[2].data.Verify());
+ EXPECT_TRUE(output0[2].data != nullptr);
}
{
@@ -2267,7 +2270,7 @@
EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
e + chrono::milliseconds(1001));
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
@@ -2280,7 +2283,7 @@
EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
e + chrono::milliseconds(2001));
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
@@ -2293,7 +2296,7 @@
EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
e + chrono::milliseconds(2001));
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
EXPECT_EQ(output1[3].monotonic_event_time.boot, 1u);
EXPECT_EQ(output1[3].monotonic_event_time.time,
@@ -2306,7 +2309,7 @@
EXPECT_EQ(output1[3].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output1[3].monotonic_timestamp_time.time,
e + chrono::milliseconds(3001));
- EXPECT_TRUE(output1[3].data.Verify());
+ EXPECT_TRUE(output1[3].data != nullptr);
LOG(INFO) << output1[0];
LOG(INFO) << output1[1];
@@ -2414,7 +2417,7 @@
EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
e + chrono::seconds(100) + chrono::milliseconds(1001));
- EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_TRUE(output0[0].data != nullptr);
EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_event_time.time,
@@ -2425,7 +2428,7 @@
EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
e + chrono::seconds(20) + chrono::milliseconds(2001));
- EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_TRUE(output0[1].data != nullptr);
EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_event_time.time,
@@ -2436,7 +2439,7 @@
EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
e + chrono::seconds(20) + chrono::milliseconds(3001));
- EXPECT_TRUE(output0[2].data.Verify());
+ EXPECT_TRUE(output0[2].data != nullptr);
}
{
@@ -2480,21 +2483,21 @@
e + chrono::seconds(100) + chrono::milliseconds(1000));
EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
- EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_TRUE(output1[0].data != nullptr);
EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
EXPECT_EQ(output1[1].monotonic_event_time.time,
e + chrono::seconds(20) + chrono::milliseconds(2000));
EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
- EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_TRUE(output1[1].data != nullptr);
EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
EXPECT_EQ(output1[2].monotonic_event_time.time,
e + chrono::seconds(20) + chrono::milliseconds(3000));
EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
- EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_TRUE(output1[2].data != nullptr);
LOG(INFO) << output1[0];
LOG(INFO) << output1[1];
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 0be64bb..a923cf6 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,9 +1,8 @@
-#include "aos/events/logging/log_reader.h"
-
#include <sys/stat.h>
#include "absl/strings/str_format.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/log_reader.h"
#include "aos/events/logging/log_writer.h"
#include "aos/events/message_counter.h"
#include "aos/events/ping_lib.h"
@@ -849,19 +848,18 @@
// type, count) for every message matching matcher()
std::vector<std::tuple<std::string, std::string, int>> CountChannelsMatching(
std::shared_ptr<const aos::Configuration> config, std::string_view filename,
- std::function<bool(const MessageHeader *)> matcher) {
+ std::function<bool(const UnpackedMessageHeader *)> matcher) {
MessageReader message_reader(filename);
std::vector<int> counts(config->channels()->size(), 0);
while (true) {
- std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
- message_reader.ReadMessage();
+ std::shared_ptr<UnpackedMessageHeader> msg = message_reader.ReadMessage();
if (!msg) {
break;
}
- if (matcher(&msg.value().message())) {
- counts[msg.value().message().channel_index()]++;
+ if (matcher(msg.get())) {
+ counts[msg->channel_index]++;
}
}
@@ -883,30 +881,32 @@
std::vector<std::tuple<std::string, std::string, int>> CountChannelsData(
std::shared_ptr<const aos::Configuration> config,
std::string_view filename) {
- return CountChannelsMatching(config, filename, [](const MessageHeader *msg) {
- if (msg->has_data()) {
- CHECK(!msg->has_monotonic_remote_time());
- CHECK(!msg->has_realtime_remote_time());
- CHECK(!msg->has_remote_queue_index());
- return true;
- }
- return false;
- });
+ return CountChannelsMatching(
+ config, filename, [](const UnpackedMessageHeader *msg) {
+ if (msg->span.data() != nullptr) {
+ CHECK(!msg->monotonic_remote_time.has_value());
+ CHECK(!msg->realtime_remote_time.has_value());
+ CHECK(!msg->remote_queue_index.has_value());
+ return true;
+ }
+ return false;
+ });
}
// Counts the number of messages (channel, count) for all timestamp messages.
std::vector<std::tuple<std::string, std::string, int>> CountChannelsTimestamp(
std::shared_ptr<const aos::Configuration> config,
std::string_view filename) {
- return CountChannelsMatching(config, filename, [](const MessageHeader *msg) {
- if (!msg->has_data()) {
- CHECK(msg->has_monotonic_remote_time());
- CHECK(msg->has_realtime_remote_time());
- CHECK(msg->has_remote_queue_index());
- return true;
- }
- return false;
- });
+ return CountChannelsMatching(
+ config, filename, [](const UnpackedMessageHeader *msg) {
+ if (msg->span.data() == nullptr) {
+ CHECK(msg->monotonic_remote_time.has_value());
+ CHECK(msg->realtime_remote_time.has_value());
+ CHECK(msg->remote_queue_index.has_value());
+ return true;
+ }
+ return false;
+ });
}
// Tests that we can write and read simple multi-node log files.
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 3cd5c5b..9121a8f 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -39,38 +39,71 @@
const bool prior_;
};
+// Holds storage for a span object and the data referenced by that span for
+// compatibility with RawSender::SharedSpan users. If constructed with
+// MakeSharedSpan, span points to only the aligned segment of the entire data.
+struct AlignedOwningSpan {
+ AlignedOwningSpan(const AlignedOwningSpan &) = delete;
+ AlignedOwningSpan &operator=(const AlignedOwningSpan &) = delete;
+ absl::Span<const uint8_t> span;
+ char data[];
+};
+
+// Constructs a span which owns its data through a shared_ptr. The owning span
+// points to a const view of the data; also returns a temporary mutable span
+// which is only valid while the const shared span is kept alive.
+std::pair<RawSender::SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(
+ size_t size) {
+ AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
+ malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
+
+ absl::Span mutable_span(
+ reinterpret_cast<uint8_t *>(RoundChannelData(&span->data[0], size)),
+ size);
+ new (span) AlignedOwningSpan{.span = mutable_span};
+
+ return std::make_pair(
+ RawSender::SharedSpan(
+ std::shared_ptr<AlignedOwningSpan>(span,
+ [](AlignedOwningSpan *s) {
+ s->~AlignedOwningSpan();
+ free(s);
+ }),
+ &span->span),
+ mutable_span);
+}
+
// Container for both a message, and the context for it for simulation. This
// makes tracking the timestamps associated with the data easy.
struct SimulatedMessage final {
SimulatedMessage(const SimulatedMessage &) = delete;
SimulatedMessage &operator=(const SimulatedMessage &) = delete;
+ ~SimulatedMessage();
// Creates a SimulatedMessage with size bytes of storage.
// This is a shared_ptr so we don't have to implement refcounting or copying.
- static std::shared_ptr<SimulatedMessage> Make(SimulatedChannel *channel);
+ static std::shared_ptr<SimulatedMessage> Make(
+ SimulatedChannel *channel, const RawSender::SharedSpan data);
// Context for the data.
Context context;
SimulatedChannel *const channel = nullptr;
- // The data.
- char *data(size_t buffer_size) {
- return RoundChannelData(&actual_data[0], buffer_size);
- }
+ // Owning span to this message's data. Depending on the sender may either
+ // represent the data of just the flatbuffer, or max channel size.
+ RawSender::SharedSpan data;
- // Then the data, including padding on the end so we can align the buffer we
- // actually return from data().
- char actual_data[];
+ // Mutable view of above data. If empty, this message is not mutable.
+ absl::Span<uint8_t> mutable_data;
- private:
+ // Determines whether this message is mutable. Used for Send where the user
+ // fills out a message stored internally then gives us the size of data used.
+ bool is_mutable() const { return data->size() == mutable_data.size(); }
+
+ // Note: this should be private but make_shared requires it to be public. Use
+ // Make() above to construct.
SimulatedMessage(SimulatedChannel *channel_in);
- ~SimulatedMessage();
-
- static void DestroyAndFree(SimulatedMessage *p) {
- p->~SimulatedMessage();
- free(p);
- }
};
} // namespace
@@ -260,19 +293,17 @@
namespace {
std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
- SimulatedChannel *channel) {
+ SimulatedChannel *channel, RawSender::SharedSpan data) {
// The allocations in here are due to infrastructure and don't count in the no
// mallocs in RT code.
ScopedNotRealtime nrt;
- const size_t size = channel->max_size();
- SimulatedMessage *const message = reinterpret_cast<SimulatedMessage *>(
- malloc(sizeof(SimulatedMessage) + size + kChannelDataAlignment - 1));
- new (message) SimulatedMessage(channel);
- message->context.size = size;
- message->context.data = message->data(size);
- return std::shared_ptr<SimulatedMessage>(message,
- &SimulatedMessage::DestroyAndFree);
+ auto message = std::make_shared<SimulatedMessage>(channel);
+ message->context.size = data->size();
+ message->context.data = data->data();
+ message->data = std::move(data);
+
+ return message;
}
SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
@@ -292,9 +323,13 @@
void *data() override {
if (!message_) {
- message_ = SimulatedMessage::Make(simulated_channel_);
+ auto [span, mutable_span] =
+ MakeSharedSpan(simulated_channel_->max_size());
+ message_ = SimulatedMessage::Make(simulated_channel_, span);
+ message_->mutable_data = mutable_span;
}
- return message_->data(simulated_channel_->max_size());
+ CHECK(message_->is_mutable());
+ return message_->mutable_data.data();
}
size_t size() override { return simulated_channel_->max_size(); }
@@ -310,6 +345,12 @@
uint32_t remote_queue_index,
const UUID &source_boot_uuid) override;
+ bool DoSend(const SharedSpan data,
+ aos::monotonic_clock::time_point monotonic_remote_time,
+ aos::realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &source_boot_uuid) override;
+
int buffer_index() override {
// First, ensure message_ is allocated.
data();
@@ -869,8 +910,12 @@
uint32_t SimulatedChannel::Send(std::shared_ptr<SimulatedMessage> message) {
const uint32_t queue_index = next_queue_index_.index();
message->context.queue_index = queue_index;
- message->context.data = message->data(channel()->max_size()) +
- channel()->max_size() - message->context.size;
+
+ // Points to the actual data depending on the size set in context. Data may
+ // allocate more than the actual size of the message, so offset from the back
+ // of that to get the actual start of the data.
+ message->context.data =
+ message->data->data() + message->data->size() - message->context.size;
DCHECK(channel()->has_schema())
<< ": Missing schema for channel "
@@ -959,21 +1004,35 @@
<< ": Attempting to send too big a message on "
<< configuration::CleanedChannelToString(simulated_channel_->channel());
- // This is wasteful, but since flatbuffers fill from the back end of the
- // queue, we need it to be full sized.
- message_ = SimulatedMessage::Make(simulated_channel_);
+ // Allocates an aligned buffer in which to copy unaligned msg.
+ auto [span, mutable_span] = MakeSharedSpan(size);
+ message_ = SimulatedMessage::Make(simulated_channel_, span);
// Now fill in the message. size is already populated above, and
- // queue_index will be populated in simulated_channel_. Put this at the
- // back of the data segment.
- memcpy(message_->data(simulated_channel_->max_size()) +
- simulated_channel_->max_size() - size,
- msg, size);
+ // queue_index will be populated in simulated_channel_.
+ memcpy(mutable_span.data(), msg, size);
return DoSend(size, monotonic_remote_time, realtime_remote_time,
remote_queue_index, source_boot_uuid);
}
+bool SimulatedSender::DoSend(const RawSender::SharedSpan data,
+ monotonic_clock::time_point monotonic_remote_time,
+ realtime_clock::time_point realtime_remote_time,
+ uint32_t remote_queue_index,
+ const UUID &source_boot_uuid) {
+ CHECK_LE(data->size(), this->size())
+ << ": Attempting to send too big a message on "
+ << configuration::CleanedChannelToString(simulated_channel_->channel());
+
+ // Constructs a message sharing the already allocated and aligned message
+ // data.
+ message_ = SimulatedMessage::Make(simulated_channel_, data);
+
+ return DoSend(data->size(), monotonic_remote_time, realtime_remote_time,
+ remote_queue_index, source_boot_uuid);
+}
+
SimulatedTimerHandler::SimulatedTimerHandler(
EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn)