LogReader Before Send Callback
Adds the ability to add a callback to LogReader to mutate or act on a
message right before it is sent.
Change-Id: I94a6b9fa2074c0a9aa8ea23cbc979e6cbce4bd05
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index fb6d6f9..b691b30 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -12,6 +12,7 @@
#include "aos/condition.h"
#include "aos/events/event_loop.h"
+#include "aos/events/event_loop_tmpl.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
@@ -352,6 +353,48 @@
// Only applies when running against a SimulatedEventLoopFactory.
void SetRealtimeReplayRate(double replay_rate);
+ // Adds a callback for a channel to be called right before sending a message.
+ // This allows a user to mutate a message or do any processing when a specific
+ // type of message is sent on a channel. The name and type of the channel
+ // corresponds to the logged_configuration's name and type.
+ //
+ // Note, only one callback can be registered per channel in the current
+ // implementation. And, the callback is called only once one the Sender's Node
+ // if the channel is forwarded.
+ //
+ // See multinode_logger_test for examples of usage.
+ template <typename Callback>
+ void AddBeforeSendCallback(std::string_view channel_name,
+ Callback &&callback) {
+ CHECK(!AreStatesInitialized())
+ << ": Cannot add callbacks after calling Register";
+
+ using MessageType = typename std::remove_pointer<
+ typename event_loop_internal::watch_message_type_trait<
+ decltype(&Callback::operator())>::message_type>::type;
+
+ const Channel *channel = configuration::GetChannel(
+ logged_configuration(), channel_name,
+ MessageType::GetFullyQualifiedName(), "", nullptr);
+
+ CHECK(channel != nullptr)
+ << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+ << MessageType::GetFullyQualifiedName()
+ << "\" } not found in config for application.";
+ auto channel_index =
+ configuration::ChannelIndex(logged_configuration(), channel);
+
+ CHECK(!before_send_callbacks_[channel_index])
+ << ": Before Send Callback already registered for channel "
+ << ":{ \"name\": \"" << channel_name << "\", \"type\": \""
+ << MessageType::GetFullyQualifiedName() << "\" }";
+
+ before_send_callbacks_[channel_index] = [callback](void *message) {
+ callback(flatbuffers::GetMutableRoot<MessageType>(
+ reinterpret_cast<char *>(message)));
+ };
+ }
+
private:
void Register(EventLoop *event_loop, const Node *node);
@@ -433,7 +476,9 @@
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
std::function<void()> notice_realtime_end, const Node *node,
ThreadedBuffering threading,
- std::unique_ptr<const ReplayChannelIndices> replay_channel_indices);
+ std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
+ const std::vector<std::function<void(void *message)>>
+ &before_send_callbacks);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -659,7 +704,7 @@
}
// Sends a buffer on the provided channel index.
- bool Send(const TimestampedMessage ×tamped_message);
+ bool Send(const TimestampedMessage &×tamped_message);
void MaybeSetClockOffset();
std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
@@ -838,8 +883,14 @@
// indices of the channels to replay for the Node represented by
// the instance of LogReader::State.
std::unique_ptr<const ReplayChannelIndices> replay_channel_indices_;
+ const std::vector<std::function<void(void *message)>>
+ before_send_callbacks_;
};
+ // Checks if any of the States have been constructed yet.
+ // This happens during Register
+ bool AreStatesInitialized() const;
+
// If a ReplayChannels was passed to LogReader then creates a
// ReplayChannelIndices for the given node. Otherwise, returns a nullptr.
std::unique_ptr<const ReplayChannelIndices> MaybeMakeReplayChannelIndices(
@@ -889,6 +940,10 @@
// name and type of channels to replay which is used when creating States.
const ReplayChannels *replay_channels_ = nullptr;
+ // The callbacks that will be called before sending a message indexed by the
+ // channel index from the logged_configuration
+ std::vector<std::function<void(void *message)>> before_send_callbacks_;
+
// If true, the replay timer will ignore any missing data. This is used
// during startup when we are bootstrapping everything and trying to get to
// the start of all the log files.