Add multi-node log file reading

This handles reading timestamps, sorting messages across log files, and
merging the timestamps back in with the data.

For simplicity, we re-read the set of log files once per node.  If
benchmarks show that this is too expensive, we can fix it.
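
A sketch of the intended multi-node replay usage (hypothetical file
names; assumes the replay_configuration constructor argument is
defaulted in the header, which is not shown in this diff):

  // Outer vector: one log per node; inner vector: that log's files
  // in order.
  aos::logger::LogReader reader(
      std::vector<std::vector<std::string>>{{"/tmp/pi1.bfbs"},
                                            {"/tmp/pi2.bfbs"}});
  aos::SimulatedEventLoopFactory factory(reader.configuration());
  reader.Register(&factory);
  factory.Run();
  reader.Deregister();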

Change-Id: I445ac5bfc7186bda25cc899602ac8d95a4cb946d
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c501d7b..84e1f43 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -26,12 +26,22 @@
 
 Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
                std::chrono::milliseconds polling_period)
+    : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
+             event_loop, polling_period) {}
+
+Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+               std::chrono::milliseconds polling_period)
     : event_loop_(event_loop),
-      writer_(writer),
+      log_namer_(std::move(log_namer)),
       timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
       polling_period_(polling_period) {
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+  int channel_index = 0;
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     FetcherStruct fs;
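+    // Whether this channel is sent from this node.  Messages that aren't
+    // local arrived over the network and get logged as remote below.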
+    const bool is_local =
+        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
     const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
@@ -50,28 +60,21 @@
               << configuration::CleanedChannelToString(channel);
 
       if (log_delivery_times) {
-        if (log_message) {
-          VLOG(1) << "  Logging message and delivery times";
-          fs.log_type = LogType::kLogMessageAndDeliveryTime;
-        } else {
-          VLOG(1) << "  Logging delivery times only";
-          fs.log_type = LogType::kLogDeliveryTimeOnly;
-        }
-      } else {
-        // We don't have a particularly great use case right now for logging a
-        // forwarded message, but either not logging the delivery times, or
-        // logging them on another node.  Fail rather than produce bad results.
-        CHECK(configuration::ChannelIsSendableOnNode(channel,
-                                                     event_loop_->node()))
-            << ": Logger only knows how to log remote messages with "
-               "forwarding timestamps.";
-        VLOG(1) << "  Logging message only";
-        fs.log_type = LogType::kLogMessage;
+        VLOG(1) << "  Delivery times";
+        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
       }
+      if (log_message) {
+        VLOG(1) << "  Data";
+        fs.writer = log_namer_->MakeWriter(channel);
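+        // Data on non-local channels was forwarded from another node, so
+        // pack it as a remote message.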
+        if (!is_local) {
+          fs.log_type = LogType::kLogRemoteMessage;
+        }
+      }
+      fs.channel_index = channel_index;
+      fs.written = false;
+      fetchers_.emplace_back(std::move(fs));
     }
-
-    fs.written = false;
-    fetchers_.emplace_back(std::move(fs));
+    ++channel_index;
   }
 
   // When things start, we want to log the header, then the most recent messages
@@ -82,9 +85,7 @@
     // so we can capture the latest message on each channel.  This lets us log
     // non-periodic messages, like configuration, which otherwise get missed.
     for (FetcherStruct &f : fetchers_) {
-      if (f.fetcher.get() != nullptr) {
-        f.written = !f.fetcher->Fetch();
-      }
+      f.written = !f.fetcher->Fetch();
     }
 
     // We need to pick a point in time to declare the log file "started".  This
@@ -105,6 +106,11 @@
 }
 
 void Logger::WriteHeader() {
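+  // Write a separate header for each node which this log namer is producing
+  // files for.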
+  for (const Node *node : log_namer_->nodes()) {
+    WriteHeader(node);
+  }
+}
+
+void Logger::WriteHeader(const Node *node) {
   // Now write the header with this timestamp in it.
   flatbuffers::FlatBufferBuilder fbb;
   fbb.ForceDefaults(1);
@@ -117,7 +123,7 @@
 
   flatbuffers::Offset<Node> node_offset;
-  if (event_loop_->node() != nullptr) {
-    node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
+  if (node != nullptr) {
+    node_offset = CopyFlatBuffer(node, &fbb);
   }
 
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
@@ -125,7 +131,7 @@
   log_file_header_builder.add_name(string_offset);
 
   // Only add the node if we are running in a multinode configuration.
-  if (event_loop_->node() != nullptr) {
+  if (node != nullptr) {
     log_file_header_builder.add_node(node_offset);
   }
 
@@ -149,15 +155,32 @@
           .count());
 
   fbb.FinishSizePrefixed(log_file_header_builder.Finish());
-  writer_->QueueSizedFlatbuffer(&fbb);
+  log_namer_->WriteHeader(&fbb, node);
 }
 
 void Logger::Rotate(DetachedBufferWriter *writer) {
+  Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
+}
+
+void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
   // Force data up until now to be written.
   DoLogData();
 
   // Swap the writer out, and re-write the header.
-  writer_ = writer;
+  log_namer_ = std::move(log_namer);
+
+  // And then update the writers.
+  for (FetcherStruct &f : fetchers_) {
+    const Channel *channel =
+        event_loop_->configuration()->channels()->Get(f.channel_index);
+    if (f.timestamp_writer != nullptr) {
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+    }
+    if (f.writer != nullptr) {
+      f.writer = log_namer_->MakeWriter(channel);
+    }
+  }
+
   WriteHeader();
 }
 
@@ -173,61 +196,81 @@
     // per iteration, even if it is small.
     last_synchronized_time_ =
         std::min(last_synchronized_time_ + polling_period_, monotonic_now);
-    size_t channel_index = 0;
     // Write each channel to disk, one at a time.
     for (FetcherStruct &f : fetchers_) {
-      // Skip any channels which we aren't supposed to log.
-      if (f.fetcher.get() != nullptr) {
-        while (true) {
-          if (f.written) {
-            if (!f.fetcher->FetchNext()) {
-              VLOG(2) << "No new data on "
-                      << configuration::CleanedChannelToString(
-                             f.fetcher->channel());
-              break;
-            } else {
-              f.written = false;
-            }
+      while (true) {
+        if (f.written) {
+          if (!f.fetcher->FetchNext()) {
+            VLOG(2) << "No new data on "
+                    << configuration::CleanedChannelToString(
+                           f.fetcher->channel());
+            break;
+          } else {
+            f.written = false;
           }
+        }
 
-          CHECK(!f.written);
+        CHECK(!f.written);
 
-          // TODO(james): Write tests to exercise this logic.
-          if (f.fetcher->context().monotonic_event_time <
-              last_synchronized_time_) {
+        // TODO(james): Write tests to exercise this logic.
+        if (f.fetcher->context().monotonic_event_time <
+            last_synchronized_time_) {
+          if (f.writer != nullptr) {
             // Write!
             flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                                max_header_size_);
             fbb.ForceDefaults(1);
 
             fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                               channel_index, f.log_type));
+                                               f.channel_index, f.log_type));
 
-            VLOG(2) << "Writing data for channel "
+            VLOG(1) << "Writing data as node "
+                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                     << configuration::CleanedChannelToString(
-                           f.fetcher->channel());
+                           f.fetcher->channel())
+                    << " to " << f.writer->filename()
+                    << " data "
+                    << FlatbufferToJson(
+                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                               fbb.GetBufferPointer()));
 
             max_header_size_ = std::max(
                 max_header_size_, fbb.GetSize() - f.fetcher->context().size);
-            writer_->QueueSizedFlatbuffer(&fbb);
-
-            f.written = true;
-          } else {
-            break;
+            f.writer->QueueSizedFlatbuffer(&fbb);
           }
+
+          if (f.timestamp_writer != nullptr) {
+            // And now handle timestamps.
+            flatbuffers::FlatBufferBuilder fbb;
+            fbb.ForceDefaults(1);
+
+            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                               f.channel_index,
+                                               LogType::kLogDeliveryTimeOnly));
+
+            VLOG(1) << "Writing timestamps as node "
+                    << FlatbufferToJson(event_loop_->node()) << " for channel "
+                    << configuration::CleanedChannelToString(
+                           f.fetcher->channel())
+                    << " to " << f.timestamp_writer->filename()
+                    << " timestamp "
+                    << FlatbufferToJson(
+                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                               fbb.GetBufferPointer()));
+
+            f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+          }
+
+          f.written = true;
+        } else {
+          break;
         }
       }
-
-      ++channel_index;
     }
 
-    CHECK_EQ(channel_index, fetchers_.size());
-
     // If we missed cycles, we could be pretty far behind.  Spin until we are
     // caught up.
   } while (last_synchronized_time_ + polling_period_ < monotonic_now);
-
-  writer_->Flush();
 }
 
 LogReader::LogReader(std::string_view filename,
@@ -237,41 +280,58 @@
 
 LogReader::LogReader(const std::vector<std::string> &filenames,
                      const Configuration *replay_configuration)
-    : sorted_message_reader_(filenames),
+    : LogReader(std::vector<std::vector<std::string>>{filenames},
+                replay_configuration) {}
+
+LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
+                     const Configuration *replay_configuration)
+    : filenames_(filenames),
+      log_file_header_(ReadHeader(filenames[0][0])),
       replay_configuration_(replay_configuration) {
-  channels_.resize(logged_configuration()->channels()->size());
   MakeRemappedConfig();
+
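+  // Single-node configs don't have Node pointers, so key the one merger off
+  // of nullptr and build it now; Register(EventLoop *) will look it up.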
+  if (!configuration::MultiNode(configuration())) {
+    auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
+    State *state = &(it.first->second);
+
+    state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+  }
 }
 
 LogReader::~LogReader() { Deregister(); }
 
 const Configuration *LogReader::logged_configuration() const {
-  return sorted_message_reader_.configuration();
+  return log_file_header_.message().configuration();
 }
 
 const Configuration *LogReader::configuration() const {
   return remapped_configuration_;
 }
 
-const Node *LogReader::node() const {
+std::vector<const Node *> LogReader::Nodes() const {
   // Because the Node pointer will only be valid if it actually points to memory
   // owned by remapped_configuration_, we need to wait for the
   // remapped_configuration_ to be populated before accessing it.
+  //
+  // Also note that whenever a map is changed, the Node pointers in here are
+  // invalidated.
   CHECK(remapped_configuration_ != nullptr)
       << ": Need to call Register before the node() pointer will be valid.";
-  if (sorted_message_reader_.node() == nullptr) {
-    return nullptr;
-  }
-  return configuration::GetNode(
-      configuration(), sorted_message_reader_.node()->name()->string_view());
+  return configuration::GetNodes(remapped_configuration_);
 }
 
-monotonic_clock::time_point LogReader::monotonic_start_time() {
-  return sorted_message_reader_.monotonic_start_time();
+monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
+  auto it = channel_mergers_.find(node);
+  CHECK(it != channel_mergers_.end())
+      << ": Unknown node " << FlatbufferToJson(node);
+  return it->second.channel_merger->monotonic_start_time();
 }
 
-realtime_clock::time_point LogReader::realtime_start_time() {
-  return sorted_message_reader_.realtime_start_time();
+realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
+  auto it = channel_mergers_.find(node);
+  CHECK(it != channel_mergers_.end())
+      << ": Unknown node " << FlatbufferToJson(node);
+  return it->second.channel_merger->realtime_start_time();
 }
 
 void LogReader::Register() {
@@ -282,126 +342,156 @@
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
-  node_event_loop_factory_ =
-      event_loop_factory_->GetNodeEventLoopFactory(node());
-  event_loop_unique_ptr_ =
-      event_loop_factory->MakeEventLoop("log_reader", node());
-  // We don't run timing reports when trying to print out logged data, because
-  // otherwise we would end up printing out the timing reports themselves...
-  // This is only really relevant when we are replaying into a simulation.
-  event_loop_unique_ptr_->SkipTimingReport();
+  // We want to start replaying at the latest start time across all the
+  // nodes' log files.  Compute how long each node's simulation needs to run
+  // to move time to this point.
+  monotonic_clock::duration run_time = monotonic_clock::duration(0);
 
-  Register(event_loop_unique_ptr_.get());
-  event_loop_factory_->RunFor(monotonic_start_time() -
-                              event_loop_->monotonic_now());
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    auto it = channel_mergers_.insert(std::make_pair(node, State{}));
+
+    State *state = &(it.first->second);
+
+    state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
+
+    state->node_event_loop_factory =
+        event_loop_factory_->GetNodeEventLoopFactory(node);
+    state->event_loop_unique_ptr =
+        event_loop_factory->MakeEventLoop("log_reader", node);
+
+    Register(state->event_loop_unique_ptr.get());
+
+    const monotonic_clock::duration startup_time =
+        state->channel_merger->monotonic_start_time() -
+        state->event_loop->monotonic_now();
+    if (startup_time > run_time) {
+      run_time = startup_time;
+    }
+  }
+
+  // Forwarding is tracked per channel.  If it is enabled, we want to turn it
+  // off.  Otherwise replayed messages would get forwarded to the other nodes
+  // on top of being replayed there directly, duplicating them.  This may not
+  // satisfy all our users, but it'll start the discussion.
+  if (configuration::MultiNode(event_loop_factory_->configuration())) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      const Channel *channel = logged_configuration()->channels()->Get(i);
+      const Node *node = configuration::GetNode(
+          configuration(), channel->source_node()->string_view());
+
+      auto state_pair = channel_mergers_.find(node);
+      CHECK(state_pair != channel_mergers_.end());
+      State *state = &(state_pair->second);
+
+      const Channel *remapped_channel =
+          RemapChannel(state->event_loop, channel);
+
+      event_loop_factory_->DisableForwarding(remapped_channel);
+    }
+  }
+
+  event_loop_factory_->RunFor(run_time);
 }
 
 void LogReader::Register(EventLoop *event_loop) {
-  event_loop_ = event_loop;
+  auto state_pair = channel_mergers_.find(event_loop->node());
+  CHECK(state_pair != channel_mergers_.end());
+  State *state = &(state_pair->second);
+
+  state->event_loop = event_loop;
 
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
-  // Otherwise we replay the timing report and try to resend it...
-  event_loop_->SkipTimingReport();
-  event_loop_->SkipAosLog();
+  event_loop->SkipTimingReport();
+  event_loop->SkipAosLog();
 
-  for (size_t i = 0; i < channels_.size(); ++i) {
-    const Channel *const original_channel =
-        logged_configuration()->channels()->Get(i);
+  state->channel_merger->SetNode(event_loop->node());
 
-    std::string_view channel_name = original_channel->name()->string_view();
-    std::string_view channel_type = original_channel->type()->string_view();
-    // If the channel is remapped, find the correct channel name to use.
-    if (remapped_channels_.count(i) > 0) {
-      VLOG(2) << "Got remapped channel on "
-              << configuration::CleanedChannelToString(original_channel);
-      channel_name = remapped_channels_[i];
-    }
+  state->channels.resize(logged_configuration()->channels()->size());
 
-    VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
-    const Channel *channel = configuration::GetChannel(
-        event_loop_->configuration(), channel_name, channel_type,
-        event_loop_->name(), event_loop_->node());
+  for (size_t i = 0; i < state->channels.size(); ++i) {
+    const Channel *channel =
+        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
 
-    CHECK(channel != nullptr)
-        << ": Unable to send {\"name\": \"" << channel_name
-        << "\", \"type\": \"" << channel_type
-        << "\"} because it is not in the provided configuration.";
-
-    channels_[i] = event_loop_->MakeRawSender(channel);
+    state->channels[i] = event_loop->MakeRawSender(channel);
   }
 
-  timer_handler_ = event_loop_->AddTimer([this]() {
-    if (sorted_message_reader_.active_channel_count() == 0u) {
-      event_loop_factory_->Exit();
+  state->timer_handler = event_loop->AddTimer([this, state]() {
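+    // If this node has run out of messages, it is done.  Once the last live
+    // node finishes, stop the simulation.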
+    if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
+      --live_nodes_;
+      if (live_nodes_ == 0) {
+        event_loop_factory_->Exit();
+      }
       return;
     }
-    monotonic_clock::time_point channel_timestamp;
+    TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
     FlatbufferVector<MessageHeader> channel_data =
         FlatbufferVector<MessageHeader>::Empty();
 
     std::tie(channel_timestamp, channel_index, channel_data) =
-        sorted_message_reader_.PopOldestChannel();
+        state->channel_merger->PopOldest();
 
     const monotonic_clock::time_point monotonic_now =
-        event_loop_->context().monotonic_event_time;
-    CHECK(monotonic_now == channel_timestamp)
+        state->event_loop->context().monotonic_event_time;
+    CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
         << ": Now " << monotonic_now.time_since_epoch().count()
-        << " trying to send " << channel_timestamp.time_since_epoch().count();
+        << " trying to send "
+        << channel_timestamp.monotonic_event_time.time_since_epoch().count();
 
-    if (channel_timestamp > monotonic_start_time() ||
+    if (channel_timestamp.monotonic_event_time >
+            state->channel_merger->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
       if (!FLAGS_skip_missing_forwarding_entries ||
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
-               "not "
-               "matched?  Use --skip_missing_forwarding_entries to ignore "
+               "not matched?  Use --skip_missing_forwarding_entries to ignore "
                "this.";
 
         // If we have access to the factory, use it to fix the realtime time.
-        if (node_event_loop_factory_ != nullptr) {
-          node_event_loop_factory_->SetRealtimeOffset(
-              monotonic_clock::time_point(chrono::nanoseconds(
-                  channel_data.message().monotonic_sent_time())),
-              realtime_clock::time_point(chrono::nanoseconds(
-                  channel_data.message().realtime_sent_time())));
+        if (state->node_event_loop_factory != nullptr) {
+          state->node_event_loop_factory->SetRealtimeOffset(
+              channel_timestamp.monotonic_event_time,
+              channel_timestamp.realtime_event_time);
         }
 
-        channels_[channel_index]->Send(
+        state->channels[channel_index]->Send(
             channel_data.message().data()->Data(),
             channel_data.message().data()->size(),
-            monotonic_clock::time_point(chrono::nanoseconds(
-                channel_data.message().monotonic_remote_time())),
-            realtime_clock::time_point(chrono::nanoseconds(
-                channel_data.message().realtime_remote_time())),
-            channel_data.message().remote_queue_index());
+            channel_timestamp.monotonic_remote_time,
+            channel_timestamp.realtime_remote_time,
+            channel_timestamp.remote_queue_index);
       }
     } else {
-      LOG(WARNING) << "Not sending data from before the start of the log file. "
-                   << channel_timestamp.time_since_epoch().count() << " start "
-                   << monotonic_start_time().time_since_epoch().count() << " "
-                   << FlatbufferToJson(channel_data);
+      LOG(WARNING)
+          << "Not sending data from before the start of the log file. "
+          << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+          << " start " << monotonic_start_time().time_since_epoch().count()
+          << " " << FlatbufferToJson(channel_data);
     }
 
-    if (sorted_message_reader_.active_channel_count() > 0u) {
-      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+    const monotonic_clock::time_point next_time =
+        state->channel_merger->OldestMessage();
+    if (next_time != monotonic_clock::max_time) {
+      state->timer_handler->Setup(next_time);
     } else {
       // Set a timer up immediately after now to die. If we don't do this, then
       // the senders waiting on the message we just read will never get called.
       if (event_loop_factory_ != nullptr) {
-        timer_handler_->Setup(monotonic_now +
-                              event_loop_factory_->send_delay() +
-                              std::chrono::nanoseconds(1));
+        state->timer_handler->Setup(monotonic_now +
+                                    event_loop_factory_->send_delay() +
+                                    std::chrono::nanoseconds(1));
       }
     }
   });
 
-  if (sorted_message_reader_.active_channel_count() > 0u) {
-    event_loop_->OnRun([this]() {
-      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
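+  // This node is now live.  The timer handler above counts live nodes back
+  // down as they run out of messages.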
+  ++live_nodes_;
+
+  if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
+    event_loop->OnRun([state]() {
+      state->timer_handler->Setup(state->channel_merger->OldestMessage());
     });
   }
 }
@@ -409,15 +499,20 @@
 void LogReader::Deregister() {
   // Make sure that things get destroyed in the correct order, rather than
   // relying on getting the order correct in the class definition.
-  for (size_t i = 0; i < channels_.size(); ++i) {
-    channels_[i].reset();
+  for (const Node *node : Nodes()) {
+    auto state_pair = channel_mergers_.find(node);
+    CHECK(state_pair != channel_mergers_.end());
+    State *state = &(state_pair->second);
+    for (size_t i = 0; i < state->channels.size(); ++i) {
+      state->channels[i].reset();
+    }
+    state->event_loop_unique_ptr.reset();
+    state->event_loop = nullptr;
+    state->node_event_loop_factory = nullptr;
   }
 
-  event_loop_unique_ptr_.reset();
-  event_loop_ = nullptr;
   event_loop_factory_unique_ptr_.reset();
   event_loop_factory_ = nullptr;
-  node_event_loop_factory_ = nullptr;
 }
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -442,13 +537,15 @@
 }
 
 void LogReader::MakeRemappedConfig() {
-  CHECK(!event_loop_)
-      << ": Can't change the mapping after the events are scheduled.";
+  for (std::pair<const Node *const, State> &state : channel_mergers_) {
+    CHECK(!state.second.event_loop)
+        << ": Can't change the mapping after the events are scheduled.";
+  }
 
   // If no remapping occurred and we are using the original config, then there
   // is nothing interesting to do here.
   if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
-    remapped_configuration_ = sorted_message_reader_.configuration();
+    remapped_configuration_ = logged_configuration();
     return;
   }
   // Config to copy Channel definitions from. Use the specified
@@ -526,5 +623,30 @@
   remapped_configuration_ = &remapped_configuration_buffer_->message();
 }
 
+const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
+                                       const Channel *channel) {
+  std::string_view channel_name = channel->name()->string_view();
+  std::string_view channel_type = channel->type()->string_view();
+  const int channel_index =
+      configuration::ChannelIndex(logged_configuration(), channel);
+  // If the channel is remapped, find the correct channel name to use.
+  if (remapped_channels_.count(channel_index) > 0) {
+    VLOG(2) << "Got remapped channel on "
+            << configuration::CleanedChannelToString(channel);
+    channel_name = remapped_channels_[channel_index];
+  }
+
+  VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
+  const Channel *remapped_channel = configuration::GetChannel(
+      event_loop->configuration(), channel_name, channel_type,
+      event_loop->name(), event_loop->node());
+
+  CHECK(remapped_channel != nullptr)
+      << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
+      << channel_type << "\"} because it is not in the provided configuration.";
+
+  return remapped_channel;
+}
+
 }  // namespace logger
 }  // namespace aos