Add LogReader ReplayChannels filtering
LogReader's constructors take a new optional argument, replay_channels, which
lists the name and type pairs of the channels to replay. During construction,
LogReader resolves replay_channels to the indices of the matching channels.
When sending a message, it uses those indices to check whether the message's
channel is included in replay_channels. The filtering itself lives in
TimestampMapper, which takes a lambda that does the actual filtering when
Front() is called.
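
A minimal, standalone sketch of the idea described above, not the actual
implementation in this change. The Channel struct and the functions
MakeReplayChannelIndicies and MakeReplayFilter are hypothetical stand-ins
introduced for illustration; only the two type aliases come from log_reader.h.
It assumes that pairs matching no channel are skipped and that a null index
list means "replay everything".

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Same aliases as the ones added to log_reader.h.
using ReplayChannels =
    std::vector<std::pair<std::string_view, std::string_view>>;
using ReplayChannelIndicies = std::vector<size_t>;

// Hypothetical stand-in for a channel entry in the logged configuration.
struct Channel {
  std::string name;
  std::string type;
};

// Hypothetical helper: resolves each (name, type) pair to the index of the
// first matching channel. Pairs that match no channel are skipped
// (an assumption of this sketch). The result is sorted for binary search.
ReplayChannelIndicies MakeReplayChannelIndicies(
    const std::vector<Channel> &channels,
    const ReplayChannels &replay_channels) {
  ReplayChannelIndicies indicies;
  for (const auto &[name, type] : replay_channels) {
    for (size_t i = 0; i < channels.size(); ++i) {
      if (channels[i].name == name && channels[i].type == type) {
        indicies.push_back(i);
        break;
      }
    }
  }
  std::sort(indicies.begin(), indicies.end());
  return indicies;
}

// Hypothetical predicate factory: returns a lambda that reports whether a
// message on channel_index should be replayed. A null list means no filtering
// was requested, so every channel passes. The caller must keep *indicies
// alive for as long as the returned lambda is used.
std::function<bool(size_t)> MakeReplayFilter(
    const ReplayChannelIndicies *indicies) {
  return [indicies](size_t channel_index) {
    if (indicies == nullptr) return true;
    return std::binary_search(indicies->begin(), indicies->end(),
                              channel_index);
  };
}
```

Resolving the pairs to indices once, up front, keeps the per-message check to
an integer lookup instead of repeated string comparisons.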
Change-Id: I614bc70f89afab2e7f6d00a36dc569518d1edc5a
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 07cca48..0d50fb9 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -31,6 +31,12 @@
class EventNotifier;
+// Vector of pair of name and type of the channel
+using ReplayChannels =
+ std::vector<std::pair<std::string_view, std::string_view>>;
+// Vector of channel indices
+using ReplayChannelIndicies = std::vector<size_t>;
+
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
@@ -67,11 +73,16 @@
// pass it in here. It must provide all the channels that the original logged
// config did.
//
+ // If certain messages should not be replayed, the replay_channels param can
+ // be used as an inclusive list of channels for messages to be replayed.
+ //
// The single file constructor calls SortParts internally.
LogReader(std::string_view filename,
- const Configuration *replay_configuration = nullptr);
+ const Configuration *replay_configuration = nullptr,
+ const ReplayChannels *replay_channels = nullptr);
LogReader(std::vector<LogFile> log_files,
- const Configuration *replay_configuration = nullptr);
+ const Configuration *replay_configuration = nullptr,
+ const ReplayChannels *replay_channels = nullptr);
~LogReader();
// Registers all the callbacks to send the log file data out on an event loop
@@ -332,7 +343,8 @@
enum class ThreadedBuffering { kYes, kNo };
State(std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node, ThreadedBuffering threading);
+ const Node *node, ThreadedBuffering threading,
+ std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -728,8 +740,18 @@
std::optional<BootTimestamp> last_queued_message_;
std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
message_queuer_;
+
+ // If a ReplayChannels was passed to LogReader, this will hold the
+ // indices of the channels to replay for the Node represented by
+ // the instance of LogReader::State.
+ std::unique_ptr<const ReplayChannelIndicies> replay_channel_indicies_;
};
+ // If a ReplayChannels was passed to LogReader then creates a
+ // ReplayChannelIndicies for the given node. Otherwise, returns a nullptr.
+ std::unique_ptr<const ReplayChannelIndicies> MaybeMakeReplayChannelIndicies(
+ const Node *node);
+
// Node index -> State.
std::vector<std::unique_ptr<State>> states_;
@@ -766,6 +788,10 @@
const Configuration *remapped_configuration_ = nullptr;
const Configuration *replay_configuration_ = nullptr;
+ // If a ReplayChannels was passed to LogReader, this will hold the
+ // name and type of channels to replay which is used when creating States.
+ const ReplayChannels *replay_channels_ = nullptr;
+
// If true, the replay timer will ignore any missing data. This is used
// during startup when we are bootstrapping everything and trying to get to
// the start of all the log files.