Publish recreated RemoteMessages delayed from LogReader
This makes the new log -> replay -> logger loop fully correct, and
preserves the timestamp. Note: replaying log files from before this
time will cause RemoteMessage to come out at the wrong time and may
cause problems when replaying. There should be a small number of logs
that will ever be replayed which matter, and those can be recollected.
Change-Id: Ia7350ef8648276623833cc666e955e335b6bde1f
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e71f31d..e3f5380 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -491,6 +491,49 @@
const std::vector<LogFile> log_files_;
+ // Class to manage sending RemoteMessages on the provided node after the
+ // correct delay.
+ class RemoteMessageSender{
+ public:
+ RemoteMessageSender(aos::Sender<message_bridge::RemoteMessage> sender,
+ EventLoop *event_loop);
+ RemoteMessageSender(RemoteMessageSender const &) = delete;
+ RemoteMessageSender &operator=(RemoteMessageSender const &) = delete;
+
+ // Sends the provided message. If monotonic_timestamp_time is min_time,
+ // send it immediately.
+ void Send(
+ FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message,
+ monotonic_clock::time_point monotonic_timestamp_time);
+
+ private:
+ // Handles actually sending the timestamp if we were delayed.
+ void SendTimestamp();
+ // Handles scheduling the timer to send at the correct time.
+ void ScheduleTimestamp();
+
+ EventLoop *event_loop_;
+ aos::Sender<message_bridge::RemoteMessage> sender_;
+ aos::TimerHandler *timer_;
+
+ // Time we are scheduled for, or min_time if we aren't scheduled.
+ monotonic_clock::time_point scheduled_time_ = monotonic_clock::min_time;
+
+ struct Timestamp {
+ Timestamp(FlatbufferDetachedBuffer<message_bridge::RemoteMessage>
+ new_remote_message,
+ monotonic_clock::time_point new_monotonic_timestamp_time)
+ : remote_message(std::move(new_remote_message)),
+ monotonic_timestamp_time(new_monotonic_timestamp_time) {}
+ FlatbufferDetachedBuffer<message_bridge::RemoteMessage> remote_message;
+ monotonic_clock::time_point monotonic_timestamp_time;
+ };
+
+ // List of messages to send. The timer works through them and then disables
+ // itself automatically.
+ std::deque<Timestamp> remote_timestamps_;
+ };
+
// State per node.
class State {
public:
@@ -545,8 +588,7 @@
// Returns the MessageHeader sender to log delivery timestamps to for the
// provided remote node.
- aos::Sender<message_bridge::RemoteMessage> *RemoteTimestampSender(
- const Node *delivered_node);
+ RemoteMessageSender *RemoteTimestampSender(const Node *delivered_node);
// Converts a timestamp from the monotonic clock on this node to the
// distributed clock.
@@ -581,12 +623,11 @@
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
- void SetChannel(
- size_t logged_channel_index, size_t factory_channel_index,
- std::unique_ptr<RawSender> sender,
- message_bridge::NoncausalOffsetEstimator *filter,
- aos::Sender<message_bridge::RemoteMessage> *remote_timestamp_sender,
- State *source_state);
+ void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+ std::unique_ptr<RawSender> sender,
+ message_bridge::NoncausalOffsetEstimator *filter,
+ RemoteMessageSender *remote_timestamp_sender,
+ State *source_state);
// Returns if we have read all the messages from all the logs.
bool at_end() const {
@@ -642,8 +683,7 @@
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
- std::vector<aos::Sender<message_bridge::RemoteMessage> *>
- remote_timestamp_senders_;
+ std::vector<RemoteMessageSender *> remote_timestamp_senders_;
// The mapping from logged channel index to sent channel index. Needed for
// sending out MessageHeaders.
std::vector<int> factory_channel_index_;
@@ -684,7 +724,7 @@
// channel) which correspond to the originating node.
std::vector<State *> channel_source_state_;
- std::map<const Node *, aos::Sender<message_bridge::RemoteMessage>>
+ std::map<const Node *, std::unique_ptr<RemoteMessageSender>>
remote_timestamp_senders_map_;
};