Recreate remote timestamp logging in LogReader
It is useful to be able to log data, replay it into a simulation, and
then recreate a log again. To do this, we need remote timestamps to
work correctly.
When LogReader replays a forwarded message, it now creates the
corresponding MessageHeader and publishes it. It also tracks the queue
indices so that the message is valid and can be logged.
Logger also translates channel indices when the logging config is not
the event loop config.
Change-Id: Iff6175a204b191c6f43a1d73ffce5b542925860c
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 161300c..dd10f38 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -110,6 +110,7 @@
std::unique_ptr<RawFetcher> fetcher;
bool written = false;
+ // Channel index to log to.
int channel_index = -1;
const Channel *channel = nullptr;
const Node *timestamp_node = nullptr;
@@ -130,6 +131,10 @@
int node_index = 0;
};
+ // Vector mapping from the channel index from the event loop to the logged
+ // channel index.
+ std::vector<int> event_loop_to_logged_channel_index_;
+
struct NodeState {
aos::monotonic_clock::time_point monotonic_start_time =
aos::monotonic_clock::min_time;
@@ -460,6 +465,11 @@
}
}
+ // Returns the MessageHeader sender to log delivery timestamps to for the
+ // provided remote node.
+ aos::Sender<MessageHeader> *RemoteTimestampSender(
+ const Node *delivered_node);
+
// Converts a timestamp from the monotonic clock on this node to the
// distributed clock.
distributed_clock::time_point ToDistributedClock(
@@ -482,17 +492,19 @@
// Returns the current time on the remote node which sends messages on
// channel_index.
monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
- return channel_target_event_loop_factory_[channel_index]->monotonic_now();
+ return channel_source_state_[channel_index]
+ ->node_event_loop_factory_->monotonic_now();
}
distributed_clock::time_point RemoteToDistributedClock(
size_t channel_index, monotonic_clock::time_point time) {
- return channel_target_event_loop_factory_[channel_index]
- ->ToDistributedClock(time);
+ return channel_source_state_[channel_index]
+ ->node_event_loop_factory_->ToDistributedClock(time);
}
const Node *remote_node(size_t channel_index) {
- return channel_target_event_loop_factory_[channel_index]->node();
+ return channel_source_state_[channel_index]
+ ->node_event_loop_factory_->node();
}
monotonic_clock::time_point monotonic_now() {
@@ -507,9 +519,11 @@
void SetChannelCount(size_t count);
// Sets the sender, filter, and target factory for a channel.
- void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+ void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+ std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter,
- NodeEventLoopFactory *channel_target_event_loop_factory);
+ aos::Sender<MessageHeader> *remote_timestamp_sender,
+ State *source_state);
// Returns if we have read all the messages from all the logs.
bool at_end() const { return channel_merger_->at_end(); }
@@ -529,13 +543,7 @@
// Sends a buffer on the provided channel index.
bool Send(size_t channel_index, const void *data, size_t size,
- aos::monotonic_clock::time_point monotonic_remote_time,
- aos::realtime_clock::time_point realtime_remote_time,
- uint32_t remote_queue_index) {
- return channels_[channel_index]->Send(data, size, monotonic_remote_time,
- realtime_remote_time,
- remote_queue_index);
- }
+ const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
// Returns a debug string for the channel merger.
std::string DebugString() const {
@@ -568,6 +576,28 @@
// Senders.
std::vector<std::unique_ptr<RawSender>> channels_;
+ std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
+ // The mapping from logged channel index to sent channel index. Needed for
+ // sending out MessageHeaders.
+ std::vector<int> factory_channel_index_;
+
+ struct SentTimestamp {
+ monotonic_clock::time_point monotonic_event_time =
+ monotonic_clock::min_time;
+ realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+ uint32_t queue_index = 0xffffffff;
+
+ // The queue index that this message *actually* was sent with.
+ uint32_t actual_queue_index = 0xffffffff;
+ };
+
+ // Stores all the timestamps that have been sent on this channel. This is
+ // only done for channels which are forwarded and on the node which
+ // initially sends the message.
+ //
+ // TODO(austin): This whole concept is a hack. We should be able to
+ // associate state with the message as it gets sorted and recover it.
+ std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
// Factory (if we are in sim) that this loop was created on.
NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
@@ -585,7 +615,10 @@
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
- std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
+ std::vector<State *> channel_source_state_;
+
+ std::map<const Node *, aos::Sender<MessageHeader>>
+ remote_timestamp_senders_map_;
};
// Node index -> State.