LogReader Before Send Callback

Adds the ability to add a callback to LogReader to mutate or act on a
message right before it is sent.

Change-Id: I94a6b9fa2074c0a9aa8ea23cbc979e6cbce4bd05
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index fb6d6f9..b691b30 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -12,6 +12,7 @@
 
 #include "aos/condition.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/event_loop_tmpl.h"
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
@@ -352,6 +353,48 @@
   // Only applies when running against a SimulatedEventLoopFactory.
   void SetRealtimeReplayRate(double replay_rate);
 
+  // Adds a callback for a channel to be called right before sending a message.
+  // This allows a user to mutate a message or do any processing when a specific
+  // type of message is sent on a channel. The name and type of the channel
+  // corresponds to the logged_configuration's name and type.
+  //
+  // Note, only one callback can be registered per channel in the current
+  // implementation. And, the callback is called only once one the Sender's Node
+  // if the channel is forwarded.
+  //
+  // See multinode_logger_test for examples of usage.
+  template <typename Callback>
+  void AddBeforeSendCallback(std::string_view channel_name,
+                             Callback &&callback) {
+    CHECK(!AreStatesInitialized())
+        << ": Cannot add callbacks after calling Register";
+
+    using MessageType = typename std::remove_pointer<
+        typename event_loop_internal::watch_message_type_trait<
+            decltype(&Callback::operator())>::message_type>::type;
+
+    const Channel *channel = configuration::GetChannel(
+        logged_configuration(), channel_name,
+        MessageType::GetFullyQualifiedName(), "", nullptr);
+
+    CHECK(channel != nullptr)
+        << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
+        << MessageType::GetFullyQualifiedName()
+        << "\" } not found in config for application.";
+    auto channel_index =
+        configuration::ChannelIndex(logged_configuration(), channel);
+
+    CHECK(!before_send_callbacks_[channel_index])
+        << ": Before Send Callback already registered for channel "
+        << ":{ \"name\": \"" << channel_name << "\", \"type\": \""
+        << MessageType::GetFullyQualifiedName() << "\" }";
+
+    before_send_callbacks_[channel_index] = [callback](void *message) {
+      callback(flatbuffers::GetMutableRoot<MessageType>(
+          reinterpret_cast<char *>(message)));
+    };
+  }
+
  private:
   void Register(EventLoop *event_loop, const Node *node);
 
@@ -433,7 +476,9 @@
           message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
           std::function<void()> notice_realtime_end, const Node *node,
           ThreadedBuffering threading,
-          std::unique_ptr<const ReplayChannelIndices> replay_channel_indices);
+          std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
+          const std::vector<std::function<void(void *message)>>
+              &before_send_callbacks);
 
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);
@@ -659,7 +704,7 @@
     }
 
     // Sends a buffer on the provided channel index.
-    bool Send(const TimestampedMessage &timestamped_message);
+    bool Send(const TimestampedMessage &&timestamped_message);
 
     void MaybeSetClockOffset();
     std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
@@ -838,8 +883,14 @@
     // indices of the channels to replay for the Node represented by
     // the instance of LogReader::State.
     std::unique_ptr<const ReplayChannelIndices> replay_channel_indices_;
+    const std::vector<std::function<void(void *message)>>
+        before_send_callbacks_;
   };
 
+  // Checks if any of the States have been constructed yet.
+  // This happens during Register
+  bool AreStatesInitialized() const;
+
   // If a ReplayChannels was passed to LogReader then creates a
   // ReplayChannelIndices for the given node. Otherwise, returns a nullptr.
   std::unique_ptr<const ReplayChannelIndices> MaybeMakeReplayChannelIndices(
@@ -889,6 +940,10 @@
   // name and type of channels to replay which is used when creating States.
   const ReplayChannels *replay_channels_ = nullptr;
 
+  // The callbacks that will be called before sending a message indexed by the
+  // channel index from the logged_configuration
+  std::vector<std::function<void(void *message)>> before_send_callbacks_;
+
   // If true, the replay timer will ignore any missing data.  This is used
   // during startup when we are bootstrapping everything and trying to get to
   // the start of all the log files.