Change LogReader API to be able to replace messages
The mutation API in LogReader could not express dropping or growing
messages. This change enables more aggressive mutation.
Change-Id: I477482da4262483a780d15ebf8c98a51e37099f6
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index c01026f..30ed3d7 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1579,14 +1579,14 @@
os << "{.channel_index=" << msg.channel_index
<< ", .queue_index=" << msg.queue_index
<< ", .timestamp=" << msg.timestamp;
- if (msg.data != nullptr) {
- if (msg.data->remote_queue_index.has_value()) {
- os << ", .remote_queue_index=" << *msg.data->remote_queue_index;
+ if (msg.header != nullptr) {
+ if (msg.header->remote_queue_index.has_value()) {
+ os << ", .remote_queue_index=" << *msg.header->remote_queue_index;
}
- if (msg.data->monotonic_remote_time.has_value()) {
- os << ", .monotonic_remote_time=" << *msg.data->monotonic_remote_time;
+ if (msg.header->monotonic_remote_time.has_value()) {
+ os << ", .monotonic_remote_time=" << *msg.header->monotonic_remote_time;
}
- os << ", .data=" << msg.data;
+ os << ", .header=" << msg.header;
}
os << "}";
return os;
@@ -1614,7 +1614,7 @@
os << ", .monotonic_timestamp_time=" << msg.monotonic_timestamp_time;
}
if (msg.data != nullptr) {
- os << ", .data=" << *msg.data;
+ os << ", .data=" << msg.data.get();
} else {
os << ", .data=nullptr";
}
@@ -1666,15 +1666,20 @@
monotonic_remote_boot = *boot;
}
- messages_.insert(
- Message{.channel_index = msg->channel_index,
- .queue_index = BootQueueIndex{.boot = parts().boot_count,
- .index = msg->queue_index},
- .timestamp = BootTimestamp{.boot = parts().boot_count,
- .time = msg->monotonic_sent_time},
- .monotonic_remote_boot = monotonic_remote_boot,
- .monotonic_timestamp_boot = monotonic_timestamp_boot,
- .data = std::move(msg)});
+ std::shared_ptr<SharedSpan> data =
+ std::make_shared<SharedSpan>(msg, &msg->span);
+
+ messages_.insert(Message{
+ .channel_index = msg->channel_index,
+ .queue_index = BootQueueIndex{.boot = parts().boot_count,
+ .index = msg->queue_index},
+ .timestamp = BootTimestamp{.boot = parts().boot_count,
+ .time = msg->monotonic_sent_time},
+ .monotonic_remote_boot = monotonic_remote_boot,
+ .monotonic_timestamp_boot = monotonic_timestamp_boot,
+ .header = std::move(msg),
+ .data = std::move(data),
+ });
// Now, update sorted_until_ to match the new message.
if (parts_message_reader_.newest_timestamp() >
@@ -1827,15 +1832,15 @@
} else if (*msg == *oldest) {
// Found a duplicate. If there is a choice, we want the one which has
// the timestamp time.
- if (!msg->data->has_monotonic_timestamp_time) {
+ if (!msg->header->has_monotonic_timestamp_time) {
message_sorter.PopFront();
- } else if (!oldest->data->has_monotonic_timestamp_time) {
+ } else if (!oldest->header->has_monotonic_timestamp_time) {
current_->PopFront();
current_ = &message_sorter;
oldest = msg;
} else {
- CHECK_EQ(msg->data->monotonic_timestamp_time,
- oldest->data->monotonic_timestamp_time);
+ CHECK_EQ(msg->header->monotonic_timestamp_time,
+ oldest->header->monotonic_timestamp_time);
message_sorter.PopFront();
}
}
@@ -2037,26 +2042,30 @@
}
CHECK_LT(msg->channel_index, source_node.size());
if (source_node[msg->channel_index] != static_cast<size_t>(node())) {
- timestamp_messages_.emplace_back(TimestampedMessage{
+ TimestampedMessage timestamped_message{
.channel_index = msg->channel_index,
.queue_index = msg->queue_index,
.monotonic_event_time = msg->timestamp,
- .realtime_event_time = msg->data->realtime_sent_time,
+ .realtime_event_time = msg->header->realtime_sent_time,
.remote_queue_index =
BootQueueIndex{.boot = msg->monotonic_remote_boot,
- .index = msg->data->remote_queue_index.value()},
+ .index = msg->header->remote_queue_index.value()},
.monotonic_remote_time = {msg->monotonic_remote_boot,
- msg->data->monotonic_remote_time.value()},
- .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ msg->header->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->header->realtime_remote_time.value(),
.monotonic_remote_transmit_time =
{msg->monotonic_remote_boot,
- msg->data->monotonic_remote_transmit_time},
+ msg->header->monotonic_remote_transmit_time},
.monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
- msg->data->monotonic_timestamp_time},
- .data = std::move(msg->data)});
+ msg->header->monotonic_timestamp_time},
+ .data = msg->data,
+ };
- VLOG(2) << this << " Queued timestamp of " << timestamp_messages_.back();
- fn(&timestamp_messages_.back());
+ fn(&timestamped_message);
+
+ VLOG(2) << this << " Queued timestamp of " << timestamped_message;
+
+ timestamp_messages_.emplace_back(std::move(*msg));
} else {
VLOG(2) << this << " Dropped data";
}
@@ -2100,25 +2109,12 @@
CHECK(queue_timestamps_ran_);
}
- // timestamp_messages_ is a queue of TimestampedMessage, but we are supposed
- // to return a Message. We need to convert the first message in the list
- // before returning it (and comparing, honestly). Fill next_timestamp_ in if
- // it is empty so the rest of the logic here can just look at next_timestamp_
- // and use that instead.
- if (!next_timestamp_ && !timestamp_messages_.empty()) {
- auto &front = timestamp_messages_.front();
- next_timestamp_ = Message{
- .channel_index = front.channel_index,
- .queue_index = front.queue_index,
- .timestamp = front.monotonic_event_time,
- .monotonic_remote_boot = front.remote_queue_index.boot,
- .monotonic_timestamp_boot = front.monotonic_timestamp_time.boot,
- .data = std::move(front.data),
- };
- timestamp_messages_.pop_front();
+ const Message *timestamp_messages_front = nullptr;
+ if (!timestamp_messages_.empty()) {
+ timestamp_messages_front = &timestamp_messages_.front();
}
- if (!next_timestamp_) {
+ if (!timestamp_messages_front) {
message_source_ = MessageSource::kBootMerger;
if (boot_merger_front != nullptr) {
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
@@ -2134,15 +2130,15 @@
message_source_ = MessageSource::kTimestampMessage;
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
- << next_timestamp_.value();
- return &next_timestamp_.value();
+ << *timestamp_messages_front;
+ return timestamp_messages_front;
}
- if (*boot_merger_front <= next_timestamp_.value()) {
- if (*boot_merger_front == next_timestamp_.value()) {
+ if (*boot_merger_front <= *timestamp_messages_front) {
+ if (*boot_merger_front == *timestamp_messages_front) {
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name()
<< " Dropping duplicate timestamp.";
- next_timestamp_.reset();
+ timestamp_messages_.pop_front();
}
message_source_ = MessageSource::kBootMerger;
if (boot_merger_front != nullptr) {
@@ -2156,16 +2152,16 @@
} else {
message_source_ = MessageSource::kTimestampMessage;
VLOG(1) << this << " SplitTimestampBootMerger::Front " << node_name() << " "
- << next_timestamp_.value();
- return &next_timestamp_.value();
+ << *timestamp_messages_front;
+ return timestamp_messages_front;
}
}
void SplitTimestampBootMerger::PopFront() {
switch (message_source_) {
case MessageSource::kTimestampMessage:
- CHECK(next_timestamp_.has_value());
- next_timestamp_.reset();
+ CHECK(!timestamp_messages_.empty());
+ timestamp_messages_.pop_front();
break;
case MessageSource::kBootMerger:
boot_merger_.PopFront();
@@ -2246,7 +2242,7 @@
.channel_index = msg->channel_index,
.queue_index = msg->queue_index,
.monotonic_event_time = msg->timestamp,
- .realtime_event_time = msg->data->realtime_sent_time,
+ .realtime_event_time = msg->header->realtime_sent_time,
.remote_queue_index = BootQueueIndex::Invalid(),
.monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
@@ -2368,18 +2364,18 @@
.channel_index = msg->channel_index,
.queue_index = msg->queue_index,
.monotonic_event_time = msg->timestamp,
- .realtime_event_time = msg->data->realtime_sent_time,
+ .realtime_event_time = msg->header->realtime_sent_time,
.remote_queue_index =
BootQueueIndex{.boot = msg->monotonic_remote_boot,
- .index = msg->data->remote_queue_index.value()},
+ .index = msg->header->remote_queue_index.value()},
.monotonic_remote_time = {msg->monotonic_remote_boot,
- msg->data->monotonic_remote_time.value()},
- .realtime_remote_time = msg->data->realtime_remote_time.value(),
+ msg->header->monotonic_remote_time.value()},
+ .realtime_remote_time = msg->header->realtime_remote_time.value(),
.monotonic_remote_transmit_time =
{msg->monotonic_remote_boot,
- msg->data->monotonic_remote_transmit_time},
+ msg->header->monotonic_remote_transmit_time},
.monotonic_timestamp_time = {msg->monotonic_timestamp_boot,
- msg->data->monotonic_timestamp_time},
+ msg->header->monotonic_timestamp_time},
.data = std::move(data.data)});
VLOG(1) << node_name() << " Inserted timestamp "
<< matched_messages_.back();
@@ -2446,23 +2442,23 @@
Message TimestampMapper::MatchingMessageFor(const Message &message) {
// Figure out what queue index we are looking for.
- CHECK_NOTNULL(message.data);
- CHECK(message.data->remote_queue_index.has_value());
+ CHECK_NOTNULL(message.header);
+ CHECK(message.header->remote_queue_index.has_value());
const BootQueueIndex remote_queue_index =
BootQueueIndex{.boot = message.monotonic_remote_boot,
- .index = *message.data->remote_queue_index};
+ .index = *message.header->remote_queue_index};
- CHECK(message.data->monotonic_remote_time.has_value());
- CHECK(message.data->realtime_remote_time.has_value());
+ CHECK(message.header->monotonic_remote_time.has_value());
+ CHECK(message.header->realtime_remote_time.has_value());
const BootTimestamp monotonic_remote_time{
.boot = message.monotonic_remote_boot,
- .time = message.data->monotonic_remote_time.value()};
+ .time = message.header->monotonic_remote_time.value()};
const realtime_clock::time_point realtime_remote_time =
- *message.data->realtime_remote_time;
+ *message.header->realtime_remote_time;
TimestampMapper *peer =
- nodes_data_[source_node_[message.data->channel_index]].peer;
+ nodes_data_[source_node_[message.header->channel_index]].peer;
// We only register the peers which we have data for. So, if we are being
// asked to pull a timestamp from a peer which doesn't exist, return an
@@ -2475,6 +2471,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2490,6 +2487,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2500,6 +2498,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2520,7 +2519,7 @@
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+ CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please investigate!";
// Now drop the data off the front. We have deduplicated timestamps, so we
// are done. And all the data is in order.
@@ -2544,6 +2543,7 @@
.timestamp = monotonic_remote_time,
.monotonic_remote_boot = 0xffffff,
.monotonic_timestamp_boot = 0xffffff,
+ .header = nullptr,
.data = nullptr};
}
@@ -2552,7 +2552,7 @@
CHECK_EQ(result.timestamp, monotonic_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please "
"investigate!";
- CHECK_EQ(result.data->realtime_sent_time, realtime_remote_time)
+ CHECK_EQ(result.header->realtime_sent_time, realtime_remote_time)
<< ": Queue index matches, but timestamp doesn't. Please "
"investigate!";