Publish recreated RemoteMessages delayed from LogReader
This makes the new log -> replay -> logger loop fully correct, and
preserves the timestamp. Note: replaying log files from before this
time will cause RemoteMessage to come out at the wrong time and may
cause problems when replaying. There should be a small number of logs
that will ever be replayed which matter, and those can be recollected.
Change-Id: Ia7350ef8648276623833cc666e955e335b6bde1f
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 91cf7dc..9da790a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -874,6 +874,11 @@
message_header_builder.add_remote_queue_index(
msg->remote_queue_index());
+ message_header_builder.add_monotonic_timestamp_time(
+ f.fetcher->context()
+ .monotonic_event_time.time_since_epoch()
+ .count());
+
fbb.FinishSizePrefixed(message_header_builder.Finish());
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, &f);
@@ -1278,7 +1283,7 @@
logged_configuration()->channels()->Get(logged_channel_index));
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
+ RemoteMessageSender *remote_timestamp_sender = nullptr;
State *source_state = nullptr;
@@ -1861,7 +1866,7 @@
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
+ RemoteMessageSender *remote_timestamp_sender, State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1949,15 +1954,12 @@
timestamp);
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
- aos::Sender<RemoteMessage>::Builder builder =
- remote_timestamp_senders_[timestamped_message.channel_index]
- ->MakeBuilder();
-
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
- builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
- RemoteMessage::Builder message_header_builder =
- builder.MakeBuilder<RemoteMessage>();
+ RemoteMessage::Builder message_header_builder(fbb);
message_header_builder.add_channel_index(
factory_channel_index_[timestamped_message.channel_index]);
@@ -1978,27 +1980,90 @@
message_header_builder.add_remote_queue_index(remote_queue_index);
message_header_builder.add_boot_uuid(boot_uuid_offset);
- builder.Send(message_header_builder.Finish());
+ fbb.Finish(message_header_builder.Finish());
+
+ remote_timestamp_senders_[timestamped_message.channel_index]->Send(
+ FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
+ timestamped_message.monotonic_timestamp_time);
}
return true;
}
-aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
+LogReader::RemoteMessageSender::RemoteMessageSender(
+ aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(std::move(sender)),
+ timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
+
+void LogReader::RemoteMessageSender::ScheduleTimestamp() {
+ if (remote_timestamps_.empty()) {
+ CHECK_NOTNULL(timer_);
+ timer_->Disable();
+ scheduled_time_ = monotonic_clock::min_time;
+ return;
+ }
+
+ if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
+ CHECK_NOTNULL(timer_);
+ timer_->Setup(
+ remote_timestamps_.front().monotonic_timestamp_time);
+ scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+ }
+}
+
+void LogReader::RemoteMessageSender::Send(
+ FlatbufferDetachedBuffer<RemoteMessage> remote_message,
+ monotonic_clock::time_point monotonic_timestamp_time) {
+ // There are 2 cases. Either we have a monotonic_timestamp_time and need to
+ // resend the timestamp at the correct time, or we don't and can send it
+ // immediately.
+ if (monotonic_timestamp_time == monotonic_clock::min_time) {
+ CHECK(remote_timestamps_.empty())
+ << ": Unsupported mix of timestamps and no timestamps.";
+ sender_.Send(std::move(remote_message));
+ } else {
+ remote_timestamps_.emplace_back(std::move(remote_message),
+ monotonic_timestamp_time);
+ ScheduleTimestamp();
+ }
+}
+
+void LogReader::RemoteMessageSender::SendTimestamp() {
+ CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_);
+ CHECK(!remote_timestamps_.empty());
+
+ // Send out all timestamps at the currently scheduled time.
+ while (remote_timestamps_.front().monotonic_timestamp_time ==
+ scheduled_time_) {
+ sender_.Send(std::move(remote_timestamps_.front().remote_message));
+ remote_timestamps_.pop_front();
+ if (remote_timestamps_.empty()) {
+ break;
+ }
+ }
+ scheduled_time_ = monotonic_clock::min_time;
+
+ ScheduleTimestamp();
+}
+
+LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
const Node *delivered_node) {
auto sender = remote_timestamp_senders_map_.find(delivered_node);
if (sender == remote_timestamp_senders_map_.end()) {
- sender = remote_timestamp_senders_map_
- .emplace(std::make_pair(
- delivered_node,
- event_loop()->MakeSender<RemoteMessage>(
- absl::StrCat("/aos/remote_timestamps/",
- delivered_node->name()->string_view()))))
- .first;
+ sender =
+ remote_timestamp_senders_map_
+ .emplace(delivered_node,
+ std::make_unique<RemoteMessageSender>(
+ event_loop()->MakeSender<RemoteMessage>(absl::StrCat(
+ "/aos/remote_timestamps/",
+ delivered_node->name()->string_view())),
+ event_loop()))
+ .first;
}
- return &(sender->second);
+ return sender->second.get();
}
const TimestampedMessage &LogReader::State::PeekOldest() {