LogReader Before Send Callback

Adds the ability to register a per-channel callback on LogReader that can
mutate or act on a message right before it is sent.

Change-Id: I94a6b9fa2074c0a9aa8ea23cbc979e6cbce4bd05
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 75a53e1..d0957bf 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -7,6 +7,7 @@
 #include <sys/uio.h>
 
 #include <climits>
+#include <utility>
 #include <vector>
 
 #include "absl/strings/escaping.h"
@@ -378,6 +379,8 @@
     }
     states_.resize(configuration()->nodes()->size());
   }
+
+  before_send_callbacks_.resize(configuration()->channels()->size());
 }
 
 LogReader::~LogReader() {
@@ -620,7 +623,8 @@
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
         filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
-        State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node));
+        State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node),
+        before_send_callbacks_);
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -804,7 +808,8 @@
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
         filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
-        State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node));
+        State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node),
+        before_send_callbacks_);
     State *state = states_[node_index].get();
 
     state->SetChannelCount(logged_configuration()->channels()->size());
@@ -1771,13 +1776,16 @@
     message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
     std::function<void()> notice_realtime_end, const Node *node,
     LogReader::State::ThreadedBuffering threading,
-    std::unique_ptr<const ReplayChannelIndices> replay_channel_indices)
+    std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
+    const std::vector<std::function<void(void *message)>>
+        &before_send_callbacks)
     : timestamp_mapper_(std::move(timestamp_mapper)),
       notice_realtime_end_(notice_realtime_end),
       node_(node),
       multinode_filters_(multinode_filters),
       threading_(threading),
-      replay_channel_indices_(std::move(replay_channel_indices)) {
+      replay_channel_indices_(std::move(replay_channel_indices)),
+      before_send_callbacks_(before_send_callbacks) {
   // If timestamp_mapper_ is nullptr, then there are no log parts associated
   // with this node. If there are no log parts for the node, there will be no
   // log data, and so we do not need to worry about the replay channel filters.
@@ -1883,7 +1891,7 @@
   timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
 }
 
-bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
+bool LogReader::State::Send(const TimestampedMessage &&timestamped_message) {
   aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
   CHECK(sender);
   uint32_t remote_queue_index = 0xffffffff;
@@ -1973,6 +1981,16 @@
                  ->boot_uuid());
   }
 
+  // Right before sending allow the user to process the message.
+  if (before_send_callbacks_[timestamped_message.channel_index]) {
+    // Only channels that are forwarded and sent from this State's node will be
+    // in the queue_index_map_
+    if (queue_index_map_[timestamped_message.channel_index]) {
+      before_send_callbacks_[timestamped_message.channel_index](
+          timestamped_message.data->mutable_data());
+    }
+  }
+
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
   const RawSender::Error err = sender->Send(
@@ -2426,5 +2444,14 @@
   }
 }
 
+bool LogReader::AreStatesInitialized() const {  // True if any states_ entry is set.
+  for (const auto &state : states_) {
+    if (state) {  // One non-null State is enough; later entries are not checked.
+      return true;
+    }
+  }
+  return false;  // states_ is empty or every entry is null.
+}
+
 }  // namespace logger
 }  // namespace aos