Add LogReader ReplayChannels filtering
LogReader has a new constructor argument, replay_channels, containing the
name & type pairs of the channels to replay. During construction,
LogReader uses replay_channels to look up the channel indices of
the channels being replayed, and uses them when sending a message to check
whether the channel is included in replay_channels. This functionality is
contained within TimestampMapper, which takes a lambda that does the actual
filtering when calling Front().
Change-Id: I614bc70f89afab2e7f6d00a36dc569518d1edc5a
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 7105b0c..346a36e 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -787,12 +787,25 @@
}
}
+ // Sets the filter callback; a message is skipped when fn returns false.
+ void set_replay_channels_callback(
+ std::function<bool(const TimestampedMessage &)> fn) {
+ replay_channels_callback_ = fn;
+ }
+
// Sets a callback to be called whenever a full message is queued.
void set_timestamp_callback(std::function<void(TimestampedMessage *)> fn) {
timestamp_callback_ = fn;
}
private:
+ // Outcome of a single call to MaybeQueueMatched().
+ enum class MatchResult : uint8_t {
+ kEndOfFile, // Reached the end of the log file being read.
+ kQueued, // A message was queued.
+ kSkipped // A message was skipped over instead of being queued.
+ };
+
// The state for a remote node. This holds the data that needs to be matched
// with the remote node's timestamps.
struct NodeData {
@@ -835,6 +848,10 @@
// true if one was queued, and false otherwise.
bool QueueMatched();
+ // Attempts to queue one matched message. Reports whether it was queued, was
+ // skipped (see replay_channels_callback), or the log file has ended.
+ MatchResult MaybeQueueMatched();
+
// Queues up data until we have at least one message >= to time t.
// Useful for triggering a remote node to read enough data to have the
// timestamp you care about available.
@@ -843,6 +860,11 @@
// Queues m into matched_messages_.
void QueueMessage(Message *m);
+ // If a replay_channels_callback was set and the callback returns false, a
+ // matched message is popped and true is returned. Otherwise false is
+ // returned.
+ bool CheckReplayChannelsAndMaybePop(const TimestampedMessage &message);
+
// Returns the name of the node this class is sorting for.
std::string_view node_name() const {
return configuration_->has_nodes() ? configuration_->nodes()
@@ -886,6 +908,7 @@
BootTimestamp queued_until_ = BootTimestamp::min_time();
std::function<void(TimestampedMessage *)> timestamp_callback_;
+ std::function<bool(const TimestampedMessage &)> replay_channels_callback_;
};
// Returns the node name with a trailing space, or an empty string if we are on