Queue messages in LogReader::State

This gives us an interface to run the noncausal time offset filter.

Change-Id: I251714f3a6bcd78dd5cae8fd7089ca078e1663c4
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 6dbc36f..ade82f9 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1098,7 +1098,7 @@
   return true;
 }
 
-monotonic_clock::time_point ChannelMerger::OldestMessage() const {
+monotonic_clock::time_point ChannelMerger::OldestMessageTime() const {
   if (channel_heap_.empty()) {
     return monotonic_clock::max_time;
   }
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 23dadc8..2b08b59 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -570,7 +570,7 @@
   // Everything else needs the node set before it works.
 
   // Returns a timestamp for the oldest message in this group of logfiles.
-  monotonic_clock::time_point OldestMessage() const;
+  monotonic_clock::time_point OldestMessageTime() const;
   // Pops the oldest message.
   std::tuple<TimestampMerger::DeliveryTimestamp, int,
              FlatbufferVector<MessageHeader>>
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index abab5f6..242638c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -306,10 +306,8 @@
   }
 
   if (!configuration::MultiNode(configuration())) {
-    states_.emplace_back(std::make_unique<State>());
-    State *state = states_[0].get();
-
-    state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+    states_.emplace_back(
+        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
   } else {
     if (replay_configuration) {
       CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -370,7 +368,7 @@
       states_[configuration::GetNodeIndex(configuration(), node)].get();
   CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
 
-  return state->channel_merger->monotonic_start_time();
+  return state->monotonic_start_time();
 }
 
 realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
@@ -378,7 +376,7 @@
       states_[configuration::GetNodeIndex(configuration(), node)].get();
   CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
 
-  return state->channel_merger->realtime_start_time();
+  return state->realtime_start_time();
 }
 
 void LogReader::Register() {
@@ -393,17 +391,12 @@
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
         configuration::GetNodeIndex(configuration(), node);
-    states_[node_index] = std::make_unique<State>();
+    states_[node_index] =
+        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
     State *state = states_[node_index].get();
 
-    state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
-
-    state->node_event_loop_factory =
-        event_loop_factory_->GetNodeEventLoopFactory(node);
-    state->event_loop_unique_ptr =
-        event_loop_factory->MakeEventLoop("log_reader", node);
-
-    Register(state->event_loop_unique_ptr.get());
+    Register(state->SetNodeEventLoopFactory(
+        event_loop_factory_->GetNodeEventLoopFactory(node)));
   }
   if (live_nodes_ == 0) {
     LOG(FATAL)
@@ -479,7 +472,7 @@
   for (std::unique_ptr<State> &state : states_) {
     for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
       TimestampMerger::DeliveryTimestamp timestamp =
-          state->channel_merger->OldestTimestampForChannel(i);
+          state->OldestTimestampForChannel(i);
       if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
         CHECK(state->MaybeUpdateTimestamp(timestamp, i));
       }
@@ -548,13 +541,17 @@
   for (std::unique_ptr<State> &state : states_) {
     for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
       TimestampMerger::DeliveryTimestamp timestamp =
-          state->channel_merger->OldestTimestampForChannel(i);
+          state->OldestTimestampForChannel(i);
       if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
         CHECK(state->MaybeUpdateTimestamp(timestamp, i));
       }
     }
   }
 
+  for (std::unique_ptr<State> &state : states_) {
+    state->SeedSortedMessages();
+  }
+
   UpdateOffsets();
 
   // We want to start the log file at the last start time of the log files from
@@ -564,14 +561,12 @@
 
   for (std::unique_ptr<State> &state : states_) {
     // Setup the realtime clock to have something sane in it now.
-    state->node_event_loop_factory->SetRealtimeOffset(
-        state->channel_merger->monotonic_start_time(),
-        state->channel_merger->realtime_start_time());
+    state->SetRealtimeOffset(state->monotonic_start_time(),
+                             state->realtime_start_time());
     // And start computing the start time on the distributed clock now that that
     // works.
-    start_time = std::max(start_time,
-                          state->node_event_loop_factory->ToDistributedClock(
-                              state->channel_merger->monotonic_start_time()));
+    start_time = std::max(
+        start_time, state->ToDistributedClock(state->monotonic_start_time()));
   }
   CHECK_GE(start_time, distributed_clock::epoch());
 
@@ -589,7 +584,7 @@
           states_[configuration::GetNodeIndex(configuration(), node)].get();
 
       const Channel *remapped_channel =
-          RemapChannel(state->event_loop, channel);
+          RemapChannel(state->event_loop(), channel);
 
       event_loop_factory_->DisableForwarding(remapped_channel);
     }
@@ -618,8 +613,7 @@
 
   size_t node_index = 0;
   for (std::unique_ptr<State> &state : states_) {
-    state->node_event_loop_factory->SetDistributedOffset(-offset(node_index),
-                                                         1.0);
+    state->SetDistributedOffset(-offset(node_index), 1.0);
     ++node_index;
   }
 }
@@ -664,21 +658,22 @@
     const TimestampMerger::DeliveryTimestamp &channel_timestamp,
     int channel_index) {
   if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
+    CHECK(std::get<0>(filters_[channel_index]) == nullptr);
     return false;
   }
 
   // Got a forwarding timestamp!
-  CHECK(std::get<0>(filters[channel_index]) != nullptr);
+  CHECK(std::get<0>(filters_[channel_index]) != nullptr);
 
   // Call the correct method depending on if we are the forward or reverse
   // direction here.
-  if (std::get<1>(filters[channel_index])) {
-    std::get<0>(filters[channel_index])
+  if (std::get<1>(filters_[channel_index])) {
+    std::get<0>(filters_[channel_index])
         ->FwdSample(channel_timestamp.monotonic_event_time,
                     channel_timestamp.monotonic_event_time -
                         channel_timestamp.monotonic_remote_time);
   } else {
-    std::get<0>(filters[channel_index])
+    std::get<0>(filters_[channel_index])
         ->RevSample(channel_timestamp.monotonic_event_time,
                     channel_timestamp.monotonic_event_time -
                         channel_timestamp.monotonic_remote_time);
@@ -691,7 +686,7 @@
       states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
           .get();
 
-  state->event_loop = event_loop;
+  state->set_event_loop(event_loop);
 
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
@@ -699,32 +694,33 @@
   event_loop->SkipTimingReport();
   event_loop->SkipAosLog();
 
-  const bool has_data = state->channel_merger->SetNode(event_loop->node());
+  const bool has_data = state->SetNode();
 
-  state->channels.resize(logged_configuration()->channels()->size());
-  state->filters.resize(state->channels.size());
+  state->SetChannelCount(logged_configuration()->channels()->size());
 
-  state->channel_target_event_loop_factory.resize(state->channels.size());
-
-  for (size_t i = 0; i < state->channels.size(); ++i) {
+  for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
     const Channel *channel =
         RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
 
-    state->channels[i] = event_loop->MakeRawSender(channel);
+    std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
+        std::make_tuple(nullptr, false);
 
-    state->filters[i] = std::make_tuple(nullptr, false);
+    NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
 
     if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
         configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
       const Node *target_node = configuration::GetNode(
           event_loop->configuration(), channel->source_node()->string_view());
-      state->filters[i] = GetFilter(event_loop->node(), target_node);
+      filter = GetFilter(event_loop->node(), target_node);
 
       if (event_loop_factory_ != nullptr) {
-        state->channel_target_event_loop_factory[i] =
+        channel_target_event_loop_factory =
             event_loop_factory_->GetNodeEventLoopFactory(target_node);
       }
     }
+
+    state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
+                      channel_target_event_loop_factory);
   }
 
   // If we didn't find any log files with data in them, we won't ever get a
@@ -733,8 +729,8 @@
     return;
   }
 
-  state->timer_handler = event_loop->AddTimer([this, state]() {
-    if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
+  state->set_timer_handler(event_loop->AddTimer([this, state]() {
+    if (state->OldestMessageTime() == monotonic_clock::max_time) {
       --live_nodes_;
       VLOG(1) << "Node down!";
       if (live_nodes_ == 0) {
@@ -748,22 +744,23 @@
     FlatbufferVector<MessageHeader> channel_data =
         FlatbufferVector<MessageHeader>::Empty();
 
+    bool dummy_update_time = false;
     std::tie(channel_timestamp, channel_index, channel_data) =
-        state->channel_merger->PopOldest();
+        state->PopOldest(&dummy_update_time);
 
     const monotonic_clock::time_point monotonic_now =
-        state->event_loop->context().monotonic_event_time;
+        state->event_loop()->context().monotonic_event_time;
     CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
-        << ": " << FlatbufferToJson(state->event_loop->node()) << " Now "
+        << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
         << monotonic_now << " trying to send "
         << channel_timestamp.monotonic_event_time << " failure "
-        << state->channel_merger->DebugString();
+        << state->DebugString();
 
     if (channel_timestamp.monotonic_event_time >
-            state->channel_merger->monotonic_start_time() ||
+            state->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
       if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
-           !state->channel_merger->at_end()) ||
+           !state->at_end()) ||
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
@@ -775,8 +772,7 @@
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
           CHECK_LT(channel_timestamp.monotonic_remote_time,
-                   state->channel_target_event_loop_factory[channel_index]
-                       ->monotonic_now());
+                   state->monotonic_remote_now(channel_index));
 
           update_offsets = true;
 
@@ -803,31 +799,24 @@
             }
             fprintf(offset_fp_, "\n");
           }
-
-        } else {
-          CHECK(std::get<0>(state->filters[channel_index]) == nullptr);
         }
 
         // If we have access to the factory, use it to fix the realtime time.
-        if (state->node_event_loop_factory != nullptr) {
-          state->node_event_loop_factory->SetRealtimeOffset(
-              channel_timestamp.monotonic_event_time,
-              channel_timestamp.realtime_event_time);
-        }
+        state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
+                                 channel_timestamp.realtime_event_time);
 
-        state->channels[channel_index]->Send(
-            channel_data.message().data()->Data(),
-            channel_data.message().data()->size(),
-            channel_timestamp.monotonic_remote_time,
-            channel_timestamp.realtime_remote_time,
-            channel_timestamp.remote_queue_index);
-      } else if (state->channel_merger->at_end()) {
+        state->Send(channel_index, channel_data.message().data()->Data(),
+                    channel_data.message().data()->size(),
+                    channel_timestamp.monotonic_remote_time,
+                    channel_timestamp.realtime_remote_time,
+                    channel_timestamp.remote_queue_index);
+      } else if (state->at_end()) {
         // We are at the end of the log file and found missing data.  Finish
         // reading the rest of the log file and call it quits.  We don't want to
         // replay partial data.
-        while (state->channel_merger->OldestMessage() !=
-               monotonic_clock::max_time) {
-          state->channel_merger->PopOldest();
+        while (state->OldestMessageTime() != monotonic_clock::max_time) {
+          bool update_time_dummy;
+          state->PopOldest(&update_time_dummy);
         }
       }
 
@@ -839,17 +828,15 @@
           << " " << FlatbufferToJson(channel_data);
     }
 
-    const monotonic_clock::time_point next_time =
-        state->channel_merger->OldestMessage();
+    const monotonic_clock::time_point next_time = state->OldestMessageTime();
     if (next_time != monotonic_clock::max_time) {
-      state->timer_handler->Setup(next_time);
+      state->Setup(next_time);
     } else {
       // Set a timer up immediately after now to die. If we don't do this, then
       // the senders waiting on the message we just read will never get called.
       if (event_loop_factory_ != nullptr) {
-        state->timer_handler->Setup(monotonic_now +
-                                    event_loop_factory_->send_delay() +
-                                    std::chrono::nanoseconds(1));
+        state->Setup(monotonic_now + event_loop_factory_->send_delay() +
+                     std::chrono::nanoseconds(1));
       }
     }
 
@@ -859,14 +846,12 @@
     if (update_offsets) {
       UpdateOffsets();
     }
-  });
+  }));
 
   ++live_nodes_;
 
-  if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
-    event_loop->OnRun([state]() {
-      state->timer_handler->Setup(state->channel_merger->OldestMessage());
-    });
+  if (state->OldestMessageTime() != monotonic_clock::max_time) {
+    event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
   }
 }
 
@@ -874,12 +859,7 @@
   // Make sure that things get destroyed in the correct order, rather than
   // relying on getting the order correct in the class definition.
   for (std::unique_ptr<State> &state : states_) {
-    for (size_t i = 0; i < state->channels.size(); ++i) {
-      state->channels[i].reset();
-    }
-    state->event_loop_unique_ptr.reset();
-    state->event_loop = nullptr;
-    state->node_event_loop_factory = nullptr;
+    state->Deregister();
   }
 
   event_loop_factory_unique_ptr_.reset();
@@ -910,7 +890,7 @@
 void LogReader::MakeRemappedConfig() {
   for (std::unique_ptr<State> &state : states_) {
     if (state) {
-      CHECK(!state->event_loop)
+      CHECK(!state->event_loop())
           << ": Can't change the mapping after the events are scheduled.";
     }
   }
@@ -1021,5 +1001,103 @@
   return remapped_channel;
 }
 
+LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
+    : channel_merger_(std::move(channel_merger)) {}
+
+EventLoop *LogReader::State::SetNodeEventLoopFactory(
+    NodeEventLoopFactory *node_event_loop_factory) {
+  node_event_loop_factory_ = node_event_loop_factory;
+  event_loop_unique_ptr_ =
+      node_event_loop_factory_->MakeEventLoop("log_reader");
+  return event_loop_unique_ptr_.get();
+}
+
+void LogReader::State::SetChannelCount(size_t count) {
+  channels_.resize(count);
+  filters_.resize(count);
+  channel_target_event_loop_factory_.resize(count);
+}
+
+void LogReader::State::SetChannel(
+    size_t channel, std::unique_ptr<RawSender> sender,
+    std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+    NodeEventLoopFactory *channel_target_event_loop_factory) {
+  channels_[channel] = std::move(sender);
+  filters_[channel] = filter;
+  channel_target_event_loop_factory_[channel] =
+      channel_target_event_loop_factory;
+}
+
+std::tuple<TimestampMerger::DeliveryTimestamp, int,
+           FlatbufferVector<MessageHeader>>
+LogReader::State::PopOldest(bool *update_time) {
+  CHECK_GT(sorted_messages_.size(), 0u);
+
+  std::tuple<TimestampMerger::DeliveryTimestamp, int,
+             FlatbufferVector<MessageHeader>>
+      result = std::move(sorted_messages_.front());
+  VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+          << std::get<0>(result).monotonic_event_time;
+  sorted_messages_.pop_front();
+  SeedSortedMessages();
+
+  *update_time = false;
+  return std::make_tuple(std::get<0>(result), std::get<1>(result),
+                         std::move(std::get<2>(result)));
+}
+
+monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
+  if (sorted_messages_.size() > 0) {
+    VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+            << std::get<0>(sorted_messages_.front()).monotonic_event_time;
+    return std::get<0>(sorted_messages_.front()).monotonic_event_time;
+  }
+
+  return channel_merger_->OldestMessageTime();
+}
+
+void LogReader::State::SeedSortedMessages() {
+  const aos::monotonic_clock::time_point end_queue_time =
+      (sorted_messages_.size() > 0
+           ? std::get<0>(sorted_messages_.front()).monotonic_event_time
+           : channel_merger_->monotonic_start_time()) +
+      std::chrono::seconds(2);
+
+  while (true) {
+    if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
+      return;
+    }
+    if (sorted_messages_.size() > 0) {
+      // Stop placing sorted messages on the list once we have 2 seconds
+      // queued up (but queue at least until the log starts.
+      if (end_queue_time <
+          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
+        return;
+      }
+    }
+
+    TimestampMerger::DeliveryTimestamp channel_timestamp;
+    int channel_index;
+    FlatbufferVector<MessageHeader> channel_data =
+        FlatbufferVector<MessageHeader>::Empty();
+
+    std::tie(channel_timestamp, channel_index, channel_data) =
+        channel_merger_->PopOldest();
+
+    sorted_messages_.emplace_back(channel_timestamp, channel_index,
+                                  std::move(channel_data));
+  }
+}
+
+void LogReader::State::Deregister() {
+  for (size_t i = 0; i < channels_.size(); ++i) {
+    channels_[i].reset();
+  }
+  event_loop_unique_ptr_.reset();
+  event_loop_ = nullptr;
+  timer_handler_ = nullptr;
+  node_event_loop_factory_ = nullptr;
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index c08d6d4..a123dcc 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -374,19 +374,23 @@
   Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
 
   // State per node.
-  struct State {
-    // Log file.
-    std::unique_ptr<ChannelMerger> channel_merger;
-    // Senders.
-    std::vector<std::unique_ptr<RawSender>> channels;
+  class State {
+   public:
+    State(std::unique_ptr<ChannelMerger> channel_merger);
 
-    // Factory (if we are in sim) that this loop was created on.
-    NodeEventLoopFactory *node_event_loop_factory = nullptr;
-    std::unique_ptr<EventLoop> event_loop_unique_ptr;
-    // Event loop.
-    EventLoop *event_loop = nullptr;
-    // And timer used to send messages.
-    TimerHandler *timer_handler;
+    // Returns the timestamps, channel_index, and message from a channel.
+    // update_time (will be) set to true when popping this message causes the
+    // filter to change the time offset estimation function.
+    std::tuple<TimestampMerger::DeliveryTimestamp, int,
+               FlatbufferVector<MessageHeader>>
+    PopOldest(bool *update_time);
+
+    // Returns the monotonic time of the oldest message.
+    monotonic_clock::time_point OldestMessageTime() const;
+
+    // Primes the queues inside State.  Should be called before calling
+    // OldestMessageTime.
+    void SeedSortedMessages();
 
     // Updates the timestamp filter with the timestamp.  Returns true if the
     // provided timestamp was actually a forwarding timestamp and used, and
@@ -395,16 +399,131 @@
         const TimestampMerger::DeliveryTimestamp &channel_timestamp,
         int channel_index);
 
+    // Returns the starting time for this node.
+    monotonic_clock::time_point monotonic_start_time() const {
+      return channel_merger_->monotonic_start_time();
+    }
+    realtime_clock::time_point realtime_start_time() const {
+      return channel_merger_->realtime_start_time();
+    }
+
+    // Sets the node event loop factory for replaying into a
+    // SimulatedEventLoopFactory.  Returns the EventLoop to use.
+    EventLoop *SetNodeEventLoopFactory(
+        NodeEventLoopFactory *node_event_loop_factory);
+
+    // Sets and gets the event loop to use.
+    void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
+    EventLoop *event_loop() { return event_loop_; }
+
+    // Returns the oldest timestamp for the provided channel.  This should only
+    // be called before SeedSortedMessages();
+    TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
+        size_t channel) {
+      return channel_merger_->OldestTimestampForChannel(channel);
+    }
+
+    // Sets the current realtime offset from the monotonic clock for this node
+    // (if we are on a simulated event loop).
+    void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
+                           realtime_clock::time_point realtime_time) {
+      if (node_event_loop_factory_ != nullptr) {
+        node_event_loop_factory_->SetRealtimeOffset(monotonic_time,
+                                                    realtime_time);
+      }
+    }
+
+    // Converts a timestamp from the monotonic clock on this node to the
+    // distributed clock.
+    distributed_clock::time_point ToDistributedClock(
+        monotonic_clock::time_point time) {
+      return node_event_loop_factory_->ToDistributedClock(time);
+    }
+
+    // Sets the offset (and slope) from the distributed clock.
+    void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
+                              double distributed_slope) {
+      node_event_loop_factory_->SetDistributedOffset(distributed_offset,
+                                                     distributed_slope);
+    }
+
+    // Returns the current time on the remote node which sends messages on
+    // channel_index.
+    monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
+      return channel_target_event_loop_factory_[channel_index]->monotonic_now();
+    }
+
+    // Sets the node we will be merging as, and returns true if there is any
+    // data on it.
+    bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
+
+    // Sets the number of channels.
+    void SetChannelCount(size_t count);
+
+    // Sets the sender, filter, and target factory for a channel.
+    void SetChannel(
+        size_t channel, std::unique_ptr<RawSender> sender,
+        std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+        NodeEventLoopFactory *channel_target_event_loop_factory);
+
+    // Returns if we have read all the messages from all the logs.
+    bool at_end() const { return channel_merger_->at_end(); }
+
+    // Unregisters everything so we can destory the event loop.
+    void Deregister();
+
+    // Sets the current TimerHandle for the replay callback.
+    void set_timer_handler(TimerHandler *timer_handler) {
+      timer_handler_ = timer_handler;
+    }
+
+    // Sets the next wakeup time on the replay callback.
+    void Setup(monotonic_clock::time_point next_time) {
+      timer_handler_->Setup(next_time);
+    }
+
+    // Sends a buffer on the provided channel index.
+    bool Send(size_t channel_index, const void *data, size_t size,
+              aos::monotonic_clock::time_point monotonic_remote_time,
+              aos::realtime_clock::time_point realtime_remote_time,
+              uint32_t remote_queue_index) {
+      return channels_[channel_index]->Send(data, size, monotonic_remote_time,
+                                            realtime_remote_time,
+                                            remote_queue_index);
+    }
+
+    // Returns a debug string for the channel merger.
+    std::string DebugString() const { return channel_merger_->DebugString(); }
+
+   private:
+    // Log file.
+    std::unique_ptr<ChannelMerger> channel_merger_;
+
+    std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
+                          FlatbufferVector<MessageHeader>>>
+        sorted_messages_;
+
+    // Senders.
+    std::vector<std::unique_ptr<RawSender>> channels_;
+
+    // Factory (if we are in sim) that this loop was created on.
+    NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
+    std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+    // Event loop.
+    EventLoop *event_loop_ = nullptr;
+    // And timer used to send messages.
+    TimerHandler *timer_handler_;
+
     // Filters (or nullptr if it isn't a forwarded channel) for each channel.
     // This corresponds to the object which is shared among all the channels
     // going between 2 nodes.  The second element in the tuple indicates if this
     // is the primary direction or not.
     std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
-        filters;
+        filters_;
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
-    std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory;
+    std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
   };
 
   // Node index -> State.