LogReader Before Send Callback
Adds the ability to add a callback to LogReader to mutate or act on a
message right before it is sent.
Change-Id: I94a6b9fa2074c0a9aa8ea23cbc979e6cbce4bd05
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 75a53e1..d0957bf 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -7,6 +7,7 @@
#include <sys/uio.h>
#include <climits>
+#include <utility>
#include <vector>
#include "absl/strings/escaping.h"
@@ -378,6 +379,8 @@
}
states_.resize(configuration()->nodes()->size());
}
+
+ before_send_callbacks_.resize(configuration()->channels()->size());
}
LogReader::~LogReader() {
@@ -620,7 +623,8 @@
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
- State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node));
+ State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node),
+ before_send_callbacks_);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -804,7 +808,8 @@
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
- State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node));
+ State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node),
+ before_send_callbacks_);
State *state = states_[node_index].get();
state->SetChannelCount(logged_configuration()->channels()->size());
@@ -1771,13 +1776,16 @@
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
std::function<void()> notice_realtime_end, const Node *node,
LogReader::State::ThreadedBuffering threading,
- std::unique_ptr<const ReplayChannelIndices> replay_channel_indices)
+ std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
+ const std::vector<std::function<void(void *message)>>
+ &before_send_callbacks)
: timestamp_mapper_(std::move(timestamp_mapper)),
notice_realtime_end_(notice_realtime_end),
node_(node),
multinode_filters_(multinode_filters),
threading_(threading),
- replay_channel_indices_(std::move(replay_channel_indices)) {
+ replay_channel_indices_(std::move(replay_channel_indices)),
+ before_send_callbacks_(before_send_callbacks) {
// If timestamp_mapper_ is nullptr, then there are no log parts associated
// with this node. If there are no log parts for the node, there will be no
// log data, and so we do not need to worry about the replay channel filters.
@@ -1883,7 +1891,7 @@
timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
}
-bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
+bool LogReader::State::Send(const TimestampedMessage &&timestamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
CHECK(sender);
uint32_t remote_queue_index = 0xffffffff;
@@ -1973,6 +1981,16 @@
->boot_uuid());
}
+ // Right before sending allow the user to process the message.
+ if (before_send_callbacks_[timestamped_message.channel_index]) {
+ // Only channels that are forwarded and sent from this State's node will be
+ // in the queue_index_map_
+ if (queue_index_map_[timestamped_message.channel_index]) {
+ before_send_callbacks_[timestamped_message.channel_index](
+ timestamped_message.data->mutable_data());
+ }
+ }
+
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
const RawSender::Error err = sender->Send(
@@ -2426,5 +2444,14 @@
}
}
+// Reports whether any entry of states_ is non-null. Returns false when
+// states_ is empty or every entry is null.
+bool LogReader::AreStatesInitialized() const {
+  // Advance past null entries; stopping before the end means a non-null
+  // state was found.
+  auto it = states_.cbegin();
+  while (it != states_.cend() && !*it) {
+    ++it;
+  }
+  return it != states_.cend();
+}
+
} // namespace logger
} // namespace aos