Add LogReader ReplayChannels filtering
LogReader's constructor takes a new replay_channels argument containing the
name & type pairs of the channels to replay. During construction,
LogReader resolves replay_channels to the indices of the channels being
replayed, and uses those indices when sending a message to check whether
its channel is included in replay_channels. This functionality is
contained within TimestampMapper, which takes a lambda that does the
actual filtering when Front() is called.
Change-Id: I614bc70f89afab2e7f6d00a36dc569518d1edc5a
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 94337d3..ab56356 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1804,12 +1804,30 @@
}
bool TimestampMapper::QueueMatched() {  // Returns true iff a message was queued.
+ MatchResult result = MatchResult::kEndOfFile;
+ do {  // Retry for as long as MaybeQueueMatched() reports kSkipped.
+ result = MaybeQueueMatched();
+ } while (result == MatchResult::kSkipped);
+ return result == MatchResult::kQueued;  // Any other result yields false.
+}
+
+bool TimestampMapper::CheckReplayChannelsAndMaybePop(
+ const TimestampedMessage & /*message*/) {  // The parameter is unused.
+ if (replay_channels_callback_ &&  // With no callback set, nothing is popped.
+ !replay_channels_callback_(matched_messages_.back())) {
+ matched_messages_.pop_back();  // Callback returned false: drop the back entry.
+ return true;  // Popped; visible callers then return MatchResult::kSkipped.
+ }
+ return false;  // Back entry of matched_messages_ is left in place.
+}
+
+TimestampMapper::MatchResult TimestampMapper::MaybeQueueMatched() {
if (nodes_data_.empty()) {
// Simple path. We are single node, so there are no timestamps to match!
CHECK_EQ(messages_.size(), 0u);
Message *m = boot_merger_.Front();
if (!m) {
- return false;
+ return MatchResult::kEndOfFile;
}
// Enqueue this message into matched_messages_ so we have a place to
// associate remote timestamps, and return it.
@@ -1821,7 +1839,10 @@
// We are thin wrapper around node_merger. Call it directly.
boot_merger_.PopFront();
timestamp_callback_(&matched_messages_.back());
- return true;
+ if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
+ return MatchResult::kSkipped;
+ }
+ return MatchResult::kQueued;
}
// We need to only add messages to the list so they get processed for
@@ -1830,7 +1851,7 @@
if (messages_.empty()) {
if (!Queue()) {
// Found nothing to add, we are out of data!
- return false;
+ return MatchResult::kEndOfFile;
}
// Now that it has been added (and cannibalized), forget about it
@@ -1847,7 +1868,10 @@
last_message_time_ = matched_messages_.back().monotonic_event_time;
messages_.pop_front();
timestamp_callback_(&matched_messages_.back());
- return true;
+ if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
+ return MatchResult::kSkipped;
+ }
+ return MatchResult::kQueued;
} else {
// Got a timestamp, find the matching remote data, match it, and return
// it.
@@ -1874,7 +1898,10 @@
// Since messages_ holds the data, drop it.
messages_.pop_front();
timestamp_callback_(&matched_messages_.back());
- return true;
+ if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
+ return MatchResult::kSkipped;
+ }
+ return MatchResult::kQueued;
}
}