Optimize simulated event loop log reading.
Currently, we make two copies of a message before it is sent in
simulation: first from the source (file or lzma decoder) into a
flatbuffer vector, then into the simulated message. By aligning the
data portion of the MessageHeader during the first copy from the
source, we avoid copying it again. The lifetime is tracked with
shared_ptrs, which keeps memory management simple.
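
The shape of the trick, as a minimal standalone sketch (the names
AlignedMessage, kDataAlignment, and Make are placeholders for this
note, not the AOS API):

  #include <cstddef>
  #include <cstdint>
  #include <cstdlib>
  #include <cstring>
  #include <memory>
  #include <new>

  constexpr std::size_t kDataAlignment = 64;  // assumed alignment requirement

  struct AlignedMessage {
    std::size_t size;
    std::uint8_t *data;  // points into the same allocation, after the header

    static std::shared_ptr<AlignedMessage> Make(const std::uint8_t *src,
                                                std::size_t size) {
      // One allocation holds the header, worst-case padding, and payload.
      std::uint8_t *const block = static_cast<std::uint8_t *>(
          std::malloc(sizeof(AlignedMessage) + kDataAlignment - 1 + size));
      AlignedMessage *const msg = new (block) AlignedMessage;
      // Round the payload start up to the alignment boundary so it can be
      // handed to the simulated sender in place.
      std::uintptr_t raw =
          reinterpret_cast<std::uintptr_t>(block + sizeof(AlignedMessage));
      raw = (raw + kDataAlignment - 1) &
            ~static_cast<std::uintptr_t>(kDataAlignment - 1);
      msg->data = reinterpret_cast<std::uint8_t *>(raw);
      msg->size = size;
      std::memcpy(msg->data, src, size);  // the single remaining copy
      // The deleter runs the destructor and frees the whole block.
      return std::shared_ptr<AlignedMessage>(msg, [](AlignedMessage *m) {
        m->~AlignedMessage();
        std::free(m);
      });
    }
  };

In the change below, UnpackedMessageHeader::MakeMessage fills the role
of Make() and DestroyAndFree is the shared_ptr deleter.
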
Change-Id: I82c86ef3f9662d4c615dc57862fa89b1b9981ed4
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 73578e6..dd82418 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -43,9 +43,20 @@
"last header as the actual header.");
namespace aos::logger {
+namespace {
namespace chrono = std::chrono;
+template <typename T>
+void PrintOptionalOrNull(std::ostream *os, const std::optional<T> &t) {
+ if (t.has_value()) {
+ *os << *t;
+ } else {
+ *os << "null";
+ }
+}
+} // namespace
+
DetachedBufferWriter::DetachedBufferWriter(
std::string_view filename, std::unique_ptr<DetachedBufferEncoder> encoder)
: filename_(filename), encoder_(std::move(encoder)) {
@@ -499,23 +510,81 @@
<< FlatbufferToJson(log_file_header()->node());
}
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-MessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
if (msg_data == absl::Span<const uint8_t>()) {
- return std::nullopt;
+ return nullptr;
}
- SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);
+ SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
+ CHECK(msg.Verify()) << ": Corrupted message from " << filename();
- CHECK(result.Verify()) << ": Corrupted message from " << filename();
+ auto result = UnpackedMessageHeader::MakeMessage(msg.message());
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(result.message().monotonic_sent_time()));
+ const monotonic_clock::time_point timestamp = result->monotonic_sent_time;
newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
- return std::move(result);
+ VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(msg);
+ return result;
+}
+
+std::shared_ptr<UnpackedMessageHeader> UnpackedMessageHeader::MakeMessage(
+ const MessageHeader &message) {
+ const size_t data_size = message.has_data() ? message.data()->size() : 0;
+
+ UnpackedMessageHeader *const unpacked_message =
+ reinterpret_cast<UnpackedMessageHeader *>(
+ malloc(sizeof(UnpackedMessageHeader) + data_size +
+ kChannelDataAlignment - 1));
+
+ CHECK(message.has_channel_index());
+ CHECK(message.has_monotonic_sent_time());
+
+ absl::Span<uint8_t> span;
+ if (data_size > 0) {
+ span =
+ absl::Span<uint8_t>(reinterpret_cast<uint8_t *>(RoundChannelData(
+ &unpacked_message->actual_data[0], data_size)),
+ data_size);
+ }
+
+ std::optional<std::chrono::nanoseconds> monotonic_remote_time;
+ if (message.has_monotonic_remote_time()) {
+ monotonic_remote_time =
+ std::chrono::nanoseconds(message.monotonic_remote_time());
+ }
+ std::optional<realtime_clock::time_point> realtime_remote_time;
+ if (message.has_realtime_remote_time()) {
+ realtime_remote_time = realtime_clock::time_point(
+ chrono::nanoseconds(message.realtime_remote_time()));
+ }
+
+ std::optional<uint32_t> remote_queue_index;
+ if (message.has_remote_queue_index()) {
+ remote_queue_index = message.remote_queue_index();
+ }
+
+ new (unpacked_message) UnpackedMessageHeader{
+ .channel_index = message.channel_index(),
+ .monotonic_sent_time = monotonic_clock::time_point(
+ chrono::nanoseconds(message.monotonic_sent_time())),
+ .realtime_sent_time = realtime_clock::time_point(
+ chrono::nanoseconds(message.realtime_sent_time())),
+ .queue_index = message.queue_index(),
+ .monotonic_remote_time = monotonic_remote_time,
+ .realtime_remote_time = realtime_remote_time,
+ .remote_queue_index = remote_queue_index,
+ .monotonic_timestamp_time = monotonic_clock::time_point(
+ std::chrono::nanoseconds(message.monotonic_timestamp_time())),
+ .has_monotonic_timestamp_time = message.has_monotonic_timestamp_time(),
+ .span = span};
+
+ if (data_size > 0) {
+ memcpy(span.data(), message.data()->data(), data_size);
+ }
+
+ return std::shared_ptr<UnpackedMessageHeader>(unpacked_message,
+ &DestroyAndFree);
}
PartsMessageReader::PartsMessageReader(LogParts log_parts)
@@ -569,19 +638,19 @@
}
}
-std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
-PartsMessageReader::ReadMessage() {
+std::shared_ptr<UnpackedMessageHeader> PartsMessageReader::ReadMessage() {
while (!done_) {
- std::optional<SizePrefixedFlatbufferVector<MessageHeader>> message =
+ std::shared_ptr<UnpackedMessageHeader> message =
message_reader_.ReadMessage();
if (message) {
newest_timestamp_ = message_reader_.newest_timestamp();
- const monotonic_clock::time_point monotonic_sent_time(
- chrono::nanoseconds(message->message().monotonic_sent_time()));
- // TODO(austin): Does this work with startup? Might need to use the start
- // time.
- // TODO(austin): Does this work with startup when we don't know the remote
- // start time too? Look at one of those logs to compare.
+ const monotonic_clock::time_point monotonic_sent_time =
+ message->monotonic_sent_time;
+
+ // TODO(austin): Does this work with startup? Might need to use the
+ // start time.
+ // TODO(austin): Does this work with startup when we don't know the
+ // remote start time too? Look at one of those logs to compare.
if (monotonic_sent_time >
parts_.monotonic_start_time + max_out_of_order_duration()) {
after_start_ = true;
@@ -599,7 +668,7 @@
NextLog();
}
newest_timestamp_ = monotonic_clock::max_time;
- return std::nullopt;
+ return nullptr;
}
void PartsMessageReader::NextLog() {
@@ -645,13 +714,29 @@
channel_index == m2.channel_index && queue_index == m2.queue_index;
}
+std::ostream &operator<<(std::ostream &os, const UnpackedMessageHeader &m) {
+ os << "{.channel_index=" << m.channel_index
+ << ", .monotonic_sent_time=" << m.monotonic_sent_time
+ << ", .realtime_sent_time=" << m.realtime_sent_time
+ << ", .queue_index=" << m.queue_index;
+ if (m.monotonic_remote_time) {
+ os << ", .monotonic_remote_time=" << m.monotonic_remote_time->count();
+ }
+ os << ", .realtime_remote_time=";
+ PrintOptionalOrNull(&os, m.realtime_remote_time);
+ os << ", .remote_queue_index=";
+ PrintOptionalOrNull(&os, m.remote_queue_index);
+ if (m.has_monotonic_timestamp_time) {
+ os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+ }
+  os << "}";
+  return os;
+}
+
std::ostream &operator<<(std::ostream &os, const Message &m) {
os << "{.channel_index=" << m.channel_index
<< ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
- if (m.data.Verify()) {
- os << ", .data="
- << aos::FlatbufferToJson(m.data,
- {.multi_line = false, .max_vector_size = 1});
+ if (m.data != nullptr) {
+ os << ", .data=" << m;
}
os << "}";
return os;
@@ -674,10 +759,8 @@
if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
}
- if (m.data.Verify()) {
- os << ", .data="
- << aos::FlatbufferToJson(m.data,
- {.multi_line = false, .max_vector_size = 1});
+ if (m.data != nullptr) {
+ os << ", .data=" << *m.data;
}
os << "}";
return os;
@@ -700,7 +783,7 @@
break;
}
- std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+ std::shared_ptr<UnpackedMessageHeader> m =
parts_message_reader_.ReadMessage();
// No data left, sorted forever, work through what is left.
if (!m) {
@@ -709,36 +792,32 @@
}
size_t monotonic_timestamp_boot = 0;
- if (m.value().message().has_monotonic_timestamp_time()) {
+ if (m->has_monotonic_timestamp_time) {
monotonic_timestamp_boot = parts().logger_boot_count;
}
size_t monotonic_remote_boot = 0xffffff;
- if (m.value().message().has_monotonic_remote_time()) {
+ if (m->monotonic_remote_time.has_value()) {
const Node *node = parts().config->nodes()->Get(
- source_node_index_[m->message().channel_index()]);
+ source_node_index_[m->channel_index]);
std::optional<size_t> boot = parts_message_reader_.boot_count(
- source_node_index_[m->message().channel_index()]);
+ source_node_index_[m->channel_index]);
CHECK(boot) << ": Failed to find boot for node " << MaybeNodeName(node)
<< ", with index "
- << source_node_index_[m->message().channel_index()];
+ << source_node_index_[m->channel_index];
monotonic_remote_boot = *boot;
}
- messages_.insert(Message{
- .channel_index = m.value().message().channel_index(),
- .queue_index =
- BootQueueIndex{.boot = parts().boot_count,
- .index = m.value().message().queue_index()},
- .timestamp =
- BootTimestamp{
- .boot = parts().boot_count,
- .time = monotonic_clock::time_point(std::chrono::nanoseconds(
- m.value().message().monotonic_sent_time()))},
- .monotonic_remote_boot = monotonic_remote_boot,
- .monotonic_timestamp_boot = monotonic_timestamp_boot,
- .data = std::move(m.value())});
+ messages_.insert(
+ Message{.channel_index = m->channel_index,
+ .queue_index = BootQueueIndex{.boot = parts().boot_count,
+ .index = m->queue_index},
+ .timestamp = BootTimestamp{.boot = parts().boot_count,
+ .time = m->monotonic_sent_time},
+ .monotonic_remote_boot = monotonic_remote_boot,
+ .monotonic_timestamp_boot = monotonic_timestamp_boot,
+ .data = std::move(m)});
// Now, update sorted_until_ to match the new message.
if (parts_message_reader_.newest_timestamp() >
@@ -878,17 +957,17 @@
oldest = m;
current_ = &parts_sorter;
} else if (*m == *oldest) {
- // Found a duplicate. If there is a choice, we want the one which has the
- // timestamp time.
- if (!m->data.message().has_monotonic_timestamp_time()) {
+ // Found a duplicate. If there is a choice, we want the one which has
+ // the timestamp time.
+ if (!m->data->has_monotonic_timestamp_time) {
parts_sorter.PopFront();
- } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
+ } else if (!oldest->data->has_monotonic_timestamp_time) {
current_->PopFront();
current_ = &parts_sorter;
oldest = m;
} else {
- CHECK_EQ(m->data.message().monotonic_timestamp_time(),
- oldest->data.message().monotonic_timestamp_time());
+ CHECK_EQ(m->data->monotonic_timestamp_time,
+ oldest->data->monotonic_timestamp_time);
parts_sorter.PopFront();
}
}
@@ -1017,7 +1096,7 @@
CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
- // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
+ // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
// we could needlessly save data.
if (node_data->any_delivered) {
VLOG(1) << "Registering on node " << node() << " for peer node "
@@ -1035,8 +1114,7 @@
.channel_index = m->channel_index,
.queue_index = m->queue_index,
.monotonic_event_time = m->timestamp,
- .realtime_event_time = aos::realtime_clock::time_point(
- std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .realtime_event_time = m->data->realtime_sent_time,
.remote_queue_index = BootQueueIndex::Invalid(),
.monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
@@ -1086,16 +1164,17 @@
return true;
}
- // We need to only add messages to the list so they get processed for messages
- // which are delivered. Reuse the flow below which uses messages_ by just
- // adding the new message to messages_ and continuing.
+ // We need to only add messages to the list so they get processed for
+ // messages which are delivered. Reuse the flow below which uses messages_
+ // by just adding the new message to messages_ and continuing.
if (messages_.empty()) {
if (!Queue()) {
// Found nothing to add, we are out of data!
return false;
}
- // Now that it has been added (and cannibalized), forget about it upstream.
+ // Now that it has been added (and cannibalized), forget about it
+ // upstream.
boot_merger_.PopFront();
}
@@ -1110,7 +1189,8 @@
timestamp_callback_(&matched_messages_.back());
return true;
} else {
- // Got a timestamp, find the matching remote data, match it, and return it.
+ // Got a timestamp, find the matching remote data, match it, and return
+ // it.
Message data = MatchingMessageFor(*m);
// Return the data from the remote. The local message only has timestamp
@@ -1119,21 +1199,16 @@
.channel_index = m->channel_index,
.queue_index = m->queue_index,
.monotonic_event_time = m->timestamp,
- .realtime_event_time = aos::realtime_clock::time_point(
- std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .realtime_event_time = m->data->realtime_sent_time,
.remote_queue_index =
BootQueueIndex{.boot = m->monotonic_remote_boot,
- .index = m->data.message().remote_queue_index()},
- .monotonic_remote_time =
- {m->monotonic_remote_boot,
- monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_remote_time()))},
- .realtime_remote_time = realtime_clock::time_point(
- std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
- .monotonic_timestamp_time =
- {m->monotonic_timestamp_boot,
- monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_timestamp_time()))},
+ .index = m->data->remote_queue_index.value()},
+ .monotonic_remote_time = {m->monotonic_remote_boot,
+ monotonic_clock::time_point(
+ m->data->monotonic_remote_time.value())},
+ .realtime_remote_time = m->data->realtime_remote_time.value(),
+ .monotonic_timestamp_time = {m->monotonic_timestamp_boot,
+ m->data->monotonic_timestamp_time},
.data = std::move(data.data)});
CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -1153,8 +1228,9 @@
}
void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
- // Note: queueing for time doesn't really work well across boots. So we just
- // assume that if you are using this, you only care about the current boot.
+ // Note: queueing for time doesn't really work well across boots. So we
+ // just assume that if you are using this, you only care about the current
+ // boot.
//
// TODO(austin): Is that the right concept?
//
@@ -1191,36 +1267,37 @@
Message TimestampMapper::MatchingMessageFor(const Message &message) {
// Figure out what queue index we are looking for.
- CHECK(message.data.message().has_remote_queue_index());
+ CHECK_NOTNULL(message.data);
+ CHECK(message.data->remote_queue_index.has_value());
const BootQueueIndex remote_queue_index =
BootQueueIndex{.boot = message.monotonic_remote_boot,
- .index = message.data.message().remote_queue_index()};
+ .index = *message.data->remote_queue_index};
- CHECK(message.data.message().has_monotonic_remote_time());
- CHECK(message.data.message().has_realtime_remote_time());
+ CHECK(message.data->monotonic_remote_time.has_value());
+ CHECK(message.data->realtime_remote_time.has_value());
const BootTimestamp monotonic_remote_time{
.boot = message.monotonic_remote_boot,
- .time = monotonic_clock::time_point(std::chrono::nanoseconds(
- message.data.message().monotonic_remote_time()))};
- const realtime_clock::time_point realtime_remote_time(
- std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+ .time = monotonic_clock::time_point(
+ message.data->monotonic_remote_time.value())};
+ const realtime_clock::time_point realtime_remote_time =
+ *message.data->realtime_remote_time;
- TimestampMapper *peer = nodes_data_[source_node_[message.channel_index]].peer;
+ TimestampMapper *peer =
+ nodes_data_[source_node_[message.data->channel_index]].peer;
// We only register the peers which we have data for. So, if we are being
- // asked to pull a timestamp from a peer which doesn't exist, return an empty
- // message.
+ // asked to pull a timestamp from a peer which doesn't exist, return an
+ // empty message.
if (peer == nullptr) {
// TODO(austin): Make sure the tests hit all these paths with a boot count
// of 1...
- return Message{
- .channel_index = message.channel_index,
- .queue_index = remote_queue_index,
- .timestamp = monotonic_remote_time,
- .monotonic_remote_boot = 0xffffff,
- .monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ return Message{.channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
+ .data = nullptr};
}
// The queue which will have the matching data, if available.
@@ -1230,13 +1307,12 @@
peer->QueueUnmatchedUntil(monotonic_remote_time);
if (data_queue->empty()) {
- return Message{
- .channel_index = message.channel_index,
- .queue_index = remote_queue_index,
- .timestamp = monotonic_remote_time,
- .monotonic_remote_boot = 0xffffff,
- .monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ return Message{.channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
+ .data = nullptr};
}
if (remote_queue_index < data_queue->front().queue_index ||
@@ -1247,7 +1323,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ .data = nullptr};
}
// The algorithm below is constant time with some assumptions. We need there
@@ -1267,8 +1343,7 @@
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
- result.data.message().realtime_sent_time())),
+ CHECK_EQ(result.data->realtime_sent_time,
realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
// Now drop the data off the front. We have deduplicated timestamps, so we
@@ -1288,23 +1363,22 @@
m.timestamp.boot == remote_boot;
});
if (it == data_queue->end()) {
- return Message{
- .channel_index = message.channel_index,
- .queue_index = remote_queue_index,
- .timestamp = monotonic_remote_time,
- .monotonic_remote_boot = 0xffffff,
- .monotonic_timestamp_boot = 0xffffff,
- .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ return Message{.channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
+ .data = nullptr};
}
Message result = std::move(*it);
CHECK_EQ(result.timestamp, monotonic_remote_time)
- << ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
- result.data.message().realtime_sent_time())),
- realtime_remote_time)
- << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ << ": Queue index matches, but timestamp doesn't. Please "
+ "investigate!";
+ CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please "
+ "investigate!";
// TODO(austin): We still go in order, so we can erase from the beginning to
// our iterator minus 1. That'll keep 1 in the queue.
@@ -1330,7 +1404,8 @@
return;
}
- // Now that it has been added (and cannibalized), forget about it upstream.
+ // Now that it has been added (and cannibalized), forget about it
+ // upstream.
boot_merger_.PopFront();
}
}
@@ -1346,8 +1421,8 @@
if (node_data.channels[m->channel_index].delivered) {
// TODO(austin): This copies the data... Probably not worth stressing
// about yet.
- // TODO(austin): Bound how big this can get. We tend not to send massive
- // data, so we can probably ignore this for a bit.
+ // TODO(austin): Bound how big this can get. We tend not to send
+ // massive data, so we can probably ignore this for a bit.
node_data.channels[m->channel_index].messages.emplace_back(*m);
}
}