Publish recreated RemoteMessages delayed from LogReader
This makes the new log -> replay -> logger loop fully correct, and
preserves the timestamp. Note: replaying log files from before this
time will cause RemoteMessage to come out at the wrong time and may
cause problems when replaying. There should be a small number of logs
that will ever be replayed which matter, and those can be recollected.
Change-Id: Ia7350ef8648276623833cc666e955e335b6bde1f
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 91cf7dc..9da790a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -874,6 +874,11 @@
message_header_builder.add_remote_queue_index(
msg->remote_queue_index());
+ message_header_builder.add_monotonic_timestamp_time(
+ f.fetcher->context()
+ .monotonic_event_time.time_since_epoch()
+ .count());
+
fbb.FinishSizePrefixed(message_header_builder.Finish());
const auto end = event_loop_->monotonic_now();
RecordCreateMessageTime(start, end, &f);
@@ -1278,7 +1283,7 @@
logged_configuration()->channels()->Get(logged_channel_index));
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- aos::Sender<RemoteMessage> *remote_timestamp_sender = nullptr;
+ RemoteMessageSender *remote_timestamp_sender = nullptr;
State *source_state = nullptr;
@@ -1861,7 +1866,7 @@
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<RemoteMessage> *remote_timestamp_sender, State *source_state) {
+ RemoteMessageSender *remote_timestamp_sender, State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
@@ -1949,15 +1954,12 @@
timestamp);
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
- aos::Sender<RemoteMessage>::Builder builder =
- remote_timestamp_senders_[timestamped_message.channel_index]
- ->MakeBuilder();
-
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(true);
flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
- builder.fbb()->CreateString(event_loop_->boot_uuid().string_view());
+ fbb.CreateString(event_loop_->boot_uuid().string_view());
- RemoteMessage::Builder message_header_builder =
- builder.MakeBuilder<RemoteMessage>();
+ RemoteMessage::Builder message_header_builder(fbb);
message_header_builder.add_channel_index(
factory_channel_index_[timestamped_message.channel_index]);
@@ -1978,27 +1980,90 @@
message_header_builder.add_remote_queue_index(remote_queue_index);
message_header_builder.add_boot_uuid(boot_uuid_offset);
- builder.Send(message_header_builder.Finish());
+ fbb.Finish(message_header_builder.Finish());
+
+ remote_timestamp_senders_[timestamped_message.channel_index]->Send(
+ FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
+ timestamped_message.monotonic_timestamp_time);
}
return true;
}
-aos::Sender<RemoteMessage> *LogReader::State::RemoteTimestampSender(
+LogReader::RemoteMessageSender::RemoteMessageSender(
+ aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(std::move(sender)),
+ timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
+
+void LogReader::RemoteMessageSender::ScheduleTimestamp() {
+ if (remote_timestamps_.empty()) {
+ CHECK_NOTNULL(timer_);
+ timer_->Disable();
+ scheduled_time_ = monotonic_clock::min_time;
+ return;
+ }
+
+ if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
+ CHECK_NOTNULL(timer_);
+ timer_->Setup(
+ remote_timestamps_.front().monotonic_timestamp_time);
+ scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
+ }
+}
+
+void LogReader::RemoteMessageSender::Send(
+ FlatbufferDetachedBuffer<RemoteMessage> remote_message,
+ monotonic_clock::time_point monotonic_timestamp_time) {
+ // There are 2 cases. Either we have a monotonic_timestamp_time and need to
+ // resend the timestamp at the correct time, or we don't and can send it
+ // immediately.
+ if (monotonic_timestamp_time == monotonic_clock::min_time) {
+ CHECK(remote_timestamps_.empty())
+ << ": Unsupported mix of timestamps and no timestamps.";
+ sender_.Send(std::move(remote_message));
+ } else {
+ remote_timestamps_.emplace_back(std::move(remote_message),
+ monotonic_timestamp_time);
+ ScheduleTimestamp();
+ }
+}
+
+void LogReader::RemoteMessageSender::SendTimestamp() {
+ CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_);
+ CHECK(!remote_timestamps_.empty());
+
+ // Send out all timestamps at the currently scheduled time.
+ while (remote_timestamps_.front().monotonic_timestamp_time ==
+ scheduled_time_) {
+ sender_.Send(std::move(remote_timestamps_.front().remote_message));
+ remote_timestamps_.pop_front();
+ if (remote_timestamps_.empty()) {
+ break;
+ }
+ }
+ scheduled_time_ = monotonic_clock::min_time;
+
+ ScheduleTimestamp();
+}
+
+LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
const Node *delivered_node) {
auto sender = remote_timestamp_senders_map_.find(delivered_node);
if (sender == remote_timestamp_senders_map_.end()) {
- sender = remote_timestamp_senders_map_
- .emplace(std::make_pair(
- delivered_node,
- event_loop()->MakeSender<RemoteMessage>(
- absl::StrCat("/aos/remote_timestamps/",
- delivered_node->name()->string_view()))))
- .first;
+ sender =
+ remote_timestamp_senders_map_
+ .emplace(delivered_node,
+ std::make_unique<RemoteMessageSender>(
+ event_loop()->MakeSender<RemoteMessage>(absl::StrCat(
+ "/aos/remote_timestamps/",
+ delivered_node->name()->string_view())),
+ event_loop()))
+ .first;
}
- return &(sender->second);
+ return sender->second.get();
}
const TimestampedMessage &LogReader::State::PeekOldest() {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e71f31d..e3f5380 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -491,6 +491,49 @@
const std::vector<LogFile> log_files_;
+ // Class to manage sending RemoteMessages on the provided node after the
+ // correct delay.
+ class RemoteMessageSender{
+ public:
+ RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
+ EventLoop *event_loop);
+ RemoteMessageSender(RemoteMessageSender const &) = delete;
+ RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
+
+ // Sends the provided message. If monotonic_timestamp_time is min_time,
+ // send it immediately.
+ void Send(
+ FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
+ monotonic_clock::time_point monotonic_timestamp_time);
+
+ private:
+ // Handles actually sending the timestamp if we were delayed.
+ void SendTimestamp();
+ // Handles scheduling the timer to send at the correct time.
+ void ScheduleTimestamp();
+
+ EventLoop *event_loop_;
+ aos::Sender<message_bridge::RemoteMessage> sender_;
+ aos::TimerHandler *timer_;
+
+ // Time we are scheduled for, or min_time if we aren't scheduled.
+ monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
+ struct Timestamp {
+ Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
+ new_remote_message,
+ monotonic_clock::time_point new_monotonic_timestamp_time)
+ : remote_message(std::move(new_remote_message)),
+ monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+ FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
+ monotonic_clock::time_point monotonic_timestamp_time;
+ };
+
+ // List of messages to send. The timer works through them and then disables
+ // itself automatically.
+ std::deque<Timestamp> remote_timestamps_;
+ };
+
// State per node.
class State {
public:
@@ -545,8 +588,7 @@
// Returns the MessageHeader sender to log delivery timestamps to for the
// provided remote node.
- aos::Sender<message_bridge::RemoteMessage> *RemoteTimestampSender(
- const Node *delivered_node);
+ RemoteMessageSender *RemoteTimestampSender(const Node *delivered_node);
// Converts a timestamp from the monotonic clock on this node to the
// distributed clock.
@@ -581,12 +623,11 @@
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
- void SetChannel(
- size_t logged_channel_index, size_t factory_channel_index,
- std::unique_ptr<RawSender> sender,
- message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<message_bridge::RemoteMessage> *remote_timestamp_sender,
- State *source_state);
+ void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+ std::unique_ptr<RawSender> sender,
+ message_bridge::NoncausalOffsetEstimator *filter,
+ RemoteMessageSender *remote_timestamp_sender,
+ State *source_state);
// Returns if we have read all the messages from all the logs.
bool at_end() const {
@@ -642,8 +683,7 @@
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
- std::vector<aos::Sender<message_bridge::RemoteMessage> *>
- remote_timestamp_senders_;
+ std::vector<RemoteMessageSender *> remote_timestamp_senders_;
// The mapping from logged channel index to sent channel index. Needed for
// sending out MessageHeaders.
std::vector<int> factory_channel_index_;
@@ -684,7 +724,7 @@
// channel) which correspond to the originating node.
std::vector<State *> channel_source_state_;
- std::map<const Node *, aos::Sender<message_bridge::RemoteMessage>>
+ std::map<const Node *, std::unique_ptr<RemoteMessageSender>>
remote_timestamp_senders_map_;
};
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 4a311ca..05bafe4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1590,12 +1590,14 @@
const size_t pong_timestamp_channel = configuration::ChannelIndex(
pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
+ const chrono::nanoseconds network_delay = event_loop_factory_.network_delay();
+
pi1_event_loop->MakeWatcher(
"/aos/remote_timestamps/pi2",
[&pi1_event_loop, &pi2_event_loop, pi1_timestamp_channel,
ping_timestamp_channel, &pi1_timestamp_on_pi1_fetcher,
&pi1_timestamp_on_pi2_fetcher, &ping_on_pi1_fetcher,
- &ping_on_pi2_fetcher](const RemoteMessage &header) {
+ &ping_on_pi2_fetcher, network_delay](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1645,13 +1647,19 @@
header_realtime_remote_time);
EXPECT_EQ(pi1_context->monotonic_event_time,
header_monotonic_remote_time);
+
+ EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
+ pi2_context->monotonic_event_time +
+ (pi1_event_loop->monotonic_now() -
+ pi2_event_loop->monotonic_now()) +
+ network_delay);
});
pi2_event_loop->MakeWatcher(
"/aos/remote_timestamps/pi1",
[&pi2_event_loop, &pi1_event_loop, pi2_timestamp_channel,
pong_timestamp_channel, &pi2_timestamp_on_pi2_fetcher,
&pi2_timestamp_on_pi1_fetcher, &pong_on_pi2_fetcher,
- &pong_on_pi1_fetcher](const RemoteMessage &header) {
+ &pong_on_pi1_fetcher, network_delay](const RemoteMessage &header) {
const aos::monotonic_clock::time_point header_monotonic_sent_time(
chrono::nanoseconds(header.monotonic_sent_time()));
const aos::realtime_clock::time_point header_realtime_sent_time(
@@ -1701,6 +1709,12 @@
header_realtime_remote_time);
EXPECT_EQ(pi2_context->monotonic_event_time,
header_monotonic_remote_time);
+
+ EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
+ pi1_context->monotonic_event_time +
+ (pi2_event_loop->monotonic_now() -
+ pi1_event_loop->monotonic_now()) +
+ network_delay);
});
// And confirm we can re-create a log again, while checking the contents.