Recreate remote timestamp logging in LogReader

It is useful to be able to log data, replay it into a simulation, and
then recreate a log again.  To do this, we need remote timestamps to
work correctly.

When LogReader replays a forwarded message, it now creates the
corresponding MessageHeader and publishes it.  It also tracks the queue
indices so that the message is valid and can be logged.

Logger also translates channel indices when the logging config
is not the event loop config.

Change-Id: Iff6175a204b191c6f43a1d73ffce5b542925860c
diff --git a/aos/configuration.cc b/aos/configuration.cc
index c74234e..f466ee0 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -975,5 +975,40 @@
   return result;
 }
 
+std::vector<const Node *> TimestampNodes(const Configuration *config,
+                                         const Node *my_node) {
+  if (!configuration::MultiNode(config)) {
+    CHECK(my_node == nullptr);
+    return std::vector<const Node *>{};
+  }
+
+  std::set<const Node *> timestamp_logger_nodes;
+  for (const Channel *channel : *config->channels()) {
+    if (!configuration::ChannelIsSendableOnNode(channel, my_node)) {
+      continue;
+    }
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+    for (const Connection *connection : *channel->destination_nodes()) {
+      const Node *other_node =
+          configuration::GetNode(config, connection->name()->string_view());
+
+      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+                                                              my_node)) {
+        VLOG(1) << "Timestamps are logged from "
+                << FlatbufferToJson(other_node);
+        timestamp_logger_nodes.insert(other_node);
+      }
+    }
+  }
+
+  std::vector<const Node *> result;
+  for (const Node *node : timestamp_logger_nodes) {
+    result.emplace_back(node);
+  }
+  return result;
+}
+
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index fbd9cff..f1bcece 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -135,6 +135,10 @@
 std::vector<std::string_view> SourceNodeNames(const Configuration *config,
                                               const Node *my_node);
 
+// Returns all the nodes that are logging timestamps on our node.
+std::vector<const Node *> TimestampNodes(const Configuration *config,
+                                         const Node *my_node);
+
 // TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
 
 }  // namespace configuration
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 15b4fd9..70dae61 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1153,6 +1153,8 @@
     timestamp.realtime_event_time =
         realtime_clock::time_point(chrono::nanoseconds(
             std::get<2>(oldest_timestamp).message().realtime_sent_time()));
+    timestamp.queue_index =
+        std::get<2>(oldest_timestamp).message().queue_index();
 
     // Consistency check.
     CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
@@ -1255,6 +1257,7 @@
     timestamp.realtime_event_time =
         realtime_clock::time_point(chrono::nanoseconds(
             std::get<2>(oldest_message).message().realtime_sent_time()));
+    timestamp.queue_index = std::get<2>(oldest_message).message().queue_index();
     timestamp.remote_queue_index = 0xffffffff;
 
     CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index a3d16f7..0b0c8f5 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -492,6 +492,7 @@
     monotonic_clock::time_point monotonic_event_time =
         monotonic_clock::min_time;
     realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+    uint32_t queue_index = 0xffffffff;
 
     monotonic_clock::time_point monotonic_remote_time =
         monotonic_clock::min_time;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 2780bd5..19a0229 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -52,38 +52,22 @@
               : aos::Fetcher<message_bridge::ServerStatistics>()) {
   VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
 
-  // Find all the nodes which are logging timestamps on our node.
-  std::set<const Node *> timestamp_logger_nodes;
-  for (const Channel *channel : *configuration_->channels()) {
-    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
-      continue;
-    }
-    if (!channel->has_destination_nodes()) {
-      continue;
-    }
-    if (!should_log(channel)) {
-      continue;
-    }
-    for (const Connection *connection : *channel->destination_nodes()) {
-      const Node *other_node = configuration::GetNode(
-          configuration_, connection->name()->string_view());
-
-      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-              connection, event_loop_->node())) {
-        VLOG(1) << "Timestamps are logged from "
-                << FlatbufferToJson(other_node);
-        timestamp_logger_nodes.insert(other_node);
-      }
-    }
-  }
+  // Find all the nodes which are logging timestamps on our node.  This may
+  // over-estimate if should_log is specified.
+  std::vector<const Node *> timestamp_logger_nodes =
+      configuration::TimestampNodes(configuration_, event_loop_->node());
 
   std::map<const Channel *, const Node *> timestamp_logger_channels;
 
   // Now that we have all the nodes accumulated, make remote timestamp loggers
   // for them.
   for (const Node *node : timestamp_logger_nodes) {
+    // Note: since we are doing a find using the event loop channel, we need to
+    // make sure this channel pointer is part of the event loop configuration,
+    // not configuration_.  This only matters when configuration_ !=
+    // event_loop->configuration().
     const Channel *channel = configuration::GetChannel(
-        configuration_,
+        event_loop->configuration(),
         absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
         logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
         event_loop_->node());
@@ -112,6 +96,9 @@
     const Channel *channel = aos::configuration::GetChannel(
         event_loop_->configuration(), config_channel->name()->string_view(),
         config_channel->type()->string_view(), "", event_loop_->node());
+    CHECK(channel != nullptr)
+        << ": Failed to look up channel "
+        << aos::configuration::CleanedChannelToString(config_channel);
     if (!should_log(channel)) {
       continue;
     }
@@ -168,6 +155,29 @@
       fetchers_.emplace_back(std::move(fs));
     }
   }
+
+  // When we are logging remote timestamps, we need to be able to translate from
+  // the channel index that the event loop uses to the channel index in the
+  // config in the log file.
+  event_loop_to_logged_channel_index_.resize(
+      event_loop->configuration()->channels()->size(), -1);
+  for (size_t event_loop_channel_index = 0;
+       event_loop_channel_index <
+       event_loop->configuration()->channels()->size();
+       ++event_loop_channel_index) {
+    const Channel *event_loop_channel =
+        event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+    const Channel *logged_channel = aos::configuration::GetChannel(
+        configuration_, event_loop_channel->name()->string_view(),
+        event_loop_channel->type()->string_view(), "",
+        configuration::GetNode(configuration_, event_loop_->node()));
+
+    if (logged_channel != nullptr) {
+      event_loop_to_logged_channel_index_[event_loop_channel_index] =
+          configuration::ChannelIndex(configuration_, logged_channel);
+    }
+  }
 }
 
 Logger::~Logger() {
@@ -549,10 +559,18 @@
 
           logger::MessageHeader::Builder message_header_builder(fbb);
 
+          // TODO(austin): This needs to check the channel_index and confirm
+          // that it should be logged before squirreling away the timestamp to
+          // disk.  We don't want to log irrelevant timestamps.
+
           // Note: this must match the same order as MessageBridgeServer and
           // PackMessage.  We want identical headers to have identical
           // on-the-wire formats to make comparing them easier.
-          message_header_builder.add_channel_index(msg->channel_index());
+
+          // Translate from the channel index that the event loop uses to the
+          // channel index in the log file.
+          message_header_builder.add_channel_index(
+              event_loop_to_logged_channel_index_[msg->channel_index()]);
 
           message_header_builder.add_queue_index(msg->queue_index());
           message_header_builder.add_monotonic_sent_time(
@@ -856,6 +874,22 @@
       replay_configuration_(replay_configuration) {
   MakeRemappedConfig();
 
+  // Remap all existing remote timestamp channels.  They will be recreated, and
+  // the data logged isn't relevant anymore.
+  for (const Node *node : Nodes()) {
+    std::vector<const Node *> timestamp_logger_nodes =
+        configuration::TimestampNodes(logged_configuration(), node);
+    for (const Node *remote_node : timestamp_logger_nodes) {
+      const std::string channel = absl::StrCat(
+          "/aos/remote_timestamps/", remote_node->name()->string_view());
+      CHECK(HasChannel<logger::MessageHeader>(channel, node))
+          << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+          << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
+          << node->name()->string_view();
+      RemapLoggedChannel<logger::MessageHeader>(channel, node);
+    }
+  }
+
   if (replay_configuration) {
     CHECK_EQ(configuration::MultiNode(configuration()),
              configuration::MultiNode(replay_configuration))
@@ -956,10 +990,22 @@
     states_[node_index] =
         std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
     State *state = states_[node_index].get();
-
-    Register(state->SetNodeEventLoopFactory(
+    state->set_event_loop(state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node)));
+
+    state->SetChannelCount(logged_configuration()->channels()->size());
   }
+
+  // Register after making all the State objects so we can build references
+  // between them.
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    State *state = states_[node_index].get();
+
+    Register(state->event_loop());
+  }
+
   if (live_nodes_ == 0) {
     LOG(FATAL)
         << "Don't have logs from any of the nodes in the replay config--are "
@@ -1313,29 +1359,46 @@
 
   const bool has_data = state->SetNode();
 
-  state->SetChannelCount(logged_configuration()->channels()->size());
+  for (size_t logged_channel_index = 0;
+       logged_channel_index < logged_configuration()->channels()->size();
+       ++logged_channel_index) {
+    const Channel *channel = RemapChannel(
+        event_loop,
+        logged_configuration()->channels()->Get(logged_channel_index));
 
-  for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
-    const Channel *channel =
-        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
-
-    NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
+    aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
+
+    State *source_state = nullptr;
 
     if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
         configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
-      const Node *target_node = configuration::GetNode(
+      // We've got a message which is being forwarded to this node.
+      const Node *source_node = configuration::GetNode(
           event_loop->configuration(), channel->source_node()->string_view());
-      filter = GetFilter(event_loop->node(), target_node);
+      filter = GetFilter(event_loop->node(), source_node);
 
-      if (event_loop_factory_ != nullptr) {
-        channel_target_event_loop_factory =
-            event_loop_factory_->GetNodeEventLoopFactory(target_node);
+      // Delivery timestamps are supposed to be logged back on the source node.
+      // Configure remote timestamps to be sent.
+      const bool delivery_time_is_logged =
+          configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+              channel, event_loop->node(), source_node);
+
+      source_state =
+          states_[configuration::GetNodeIndex(configuration(), source_node)]
+              .get();
+
+      if (delivery_time_is_logged) {
+        remote_timestamp_sender =
+            source_state->RemoteTimestampSender(event_loop->node());
       }
     }
 
-    state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
-                      channel_target_event_loop_factory);
+    state->SetChannel(
+        logged_channel_index,
+        configuration::ChannelIndex(event_loop->configuration(), channel),
+        event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
+        source_state);
   }
 
   // If we didn't find any log files with data in them, we won't ever get a
@@ -1464,10 +1527,7 @@
         // TODO(austin): std::move channel_data in and make that efficient in
         // simulation.
         state->Send(channel_index, channel_data.message().data()->Data(),
-                    channel_data.message().data()->size(),
-                    channel_timestamp.monotonic_remote_time,
-                    channel_timestamp.realtime_remote_time,
-                    channel_timestamp.remote_queue_index);
+                    channel_data.message().data()->size(), channel_timestamp);
       } else if (state->at_end() && !ignore_missing_data_) {
         // We are at the end of the log file and found missing data.  Finish
         // reading the rest of the log file and call it quits.  We don't want
@@ -1829,18 +1889,148 @@
 
 void LogReader::State::SetChannelCount(size_t count) {
   channels_.resize(count);
+  remote_timestamp_senders_.resize(count);
   filters_.resize(count);
-  channel_target_event_loop_factory_.resize(count);
+  channel_source_state_.resize(count);
+  factory_channel_index_.resize(count);
+  queue_index_map_.resize(count);
 }
 
 void LogReader::State::SetChannel(
-    size_t channel, std::unique_ptr<RawSender> sender,
+    size_t logged_channel_index, size_t factory_channel_index,
+    std::unique_ptr<RawSender> sender,
     message_bridge::NoncausalOffsetEstimator *filter,
-    NodeEventLoopFactory *channel_target_event_loop_factory) {
-  channels_[channel] = std::move(sender);
-  filters_[channel] = filter;
-  channel_target_event_loop_factory_[channel] =
-      channel_target_event_loop_factory;
+    aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
+  channels_[logged_channel_index] = std::move(sender);
+  filters_[logged_channel_index] = filter;
+  remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+
+  if (source_state) {
+    channel_source_state_[logged_channel_index] = source_state;
+
+    if (remote_timestamp_sender != nullptr) {
+      source_state->queue_index_map_[logged_channel_index] =
+          std::make_unique<std::vector<State::SentTimestamp>>();
+    }
+  }
+
+  factory_channel_index_[logged_channel_index] = factory_channel_index;
+}
+
+bool LogReader::State::Send(
+    size_t channel_index, const void *data, size_t size,
+    const TimestampMerger::DeliveryTimestamp &delivery_timestamp) {
+  aos::RawSender *sender = channels_[channel_index].get();
+  uint32_t remote_queue_index = 0xffffffff;
+
+  if (remote_timestamp_senders_[channel_index] != nullptr) {
+    std::vector<SentTimestamp> *queue_index_map =
+        CHECK_NOTNULL(CHECK_NOTNULL(channel_source_state_[channel_index])
+                          ->queue_index_map_[channel_index]
+                          .get());
+
+    SentTimestamp search;
+    search.monotonic_event_time = delivery_timestamp.monotonic_remote_time;
+    search.realtime_event_time = delivery_timestamp.realtime_remote_time;
+    search.queue_index = delivery_timestamp.remote_queue_index;
+
+    // Find the sent time if available.
+    auto element = std::lower_bound(
+        queue_index_map->begin(), queue_index_map->end(), search,
+        [](SentTimestamp a, SentTimestamp b) {
+          if (b.monotonic_event_time < a.monotonic_event_time) {
+            return false;
+          }
+          if (b.monotonic_event_time > a.monotonic_event_time) {
+            return true;
+          }
+
+          if (b.queue_index < a.queue_index) {
+            return false;
+          }
+          if (b.queue_index > a.queue_index) {
+            return true;
+          }
+
+          CHECK_EQ(a.realtime_event_time, b.realtime_event_time);
+          return false;
+        });
+
+    // TODO(austin): Be a bit more principled here, but we will want to do that
+    // after the logger rewrite.  We hit this when one node finishes, but the
+    // other node isn't done yet.  So there is no send time, but there is a
+    // receive time.
+    if (element != queue_index_map->end()) {
+      CHECK_EQ(element->monotonic_event_time,
+               delivery_timestamp.monotonic_remote_time);
+      CHECK_EQ(element->realtime_event_time,
+               delivery_timestamp.realtime_remote_time);
+      CHECK_EQ(element->queue_index, delivery_timestamp.remote_queue_index);
+
+      remote_queue_index = element->actual_queue_index;
+    }
+  }
+
+  // Send!  Use the replayed queue index here instead of the logged queue index
+  // for the remote queue index.  This makes re-logging work.
+  const bool sent =
+      sender->Send(data, size, delivery_timestamp.monotonic_remote_time,
+                   delivery_timestamp.realtime_remote_time, remote_queue_index);
+  if (!sent) return false;
+
+  if (queue_index_map_[channel_index]) {
+    SentTimestamp timestamp;
+    timestamp.monotonic_event_time = delivery_timestamp.monotonic_event_time;
+    timestamp.realtime_event_time = delivery_timestamp.realtime_event_time;
+    timestamp.queue_index = delivery_timestamp.queue_index;
+    timestamp.actual_queue_index = sender->sent_queue_index();
+    queue_index_map_[channel_index]->emplace_back(timestamp);
+  } else if (remote_timestamp_senders_[channel_index] != nullptr) {
+    aos::Sender<MessageHeader>::Builder builder =
+        remote_timestamp_senders_[channel_index]->MakeBuilder();
+
+    logger::MessageHeader::Builder message_header_builder =
+        builder.MakeBuilder<logger::MessageHeader>();
+
+    message_header_builder.add_channel_index(
+        factory_channel_index_[channel_index]);
+
+    // Swap the remote and sent metrics.  They are from the sender's
+    // perspective, not the receiver's perspective.
+    message_header_builder.add_monotonic_sent_time(
+        sender->monotonic_sent_time().time_since_epoch().count());
+    message_header_builder.add_realtime_sent_time(
+        sender->realtime_sent_time().time_since_epoch().count());
+    message_header_builder.add_queue_index(sender->sent_queue_index());
+
+    message_header_builder.add_monotonic_remote_time(
+        delivery_timestamp.monotonic_remote_time.time_since_epoch().count());
+    message_header_builder.add_realtime_remote_time(
+        delivery_timestamp.realtime_remote_time.time_since_epoch().count());
+
+    message_header_builder.add_remote_queue_index(remote_queue_index);
+
+    builder.Send(message_header_builder.Finish());
+  }
+
+  return true;
+}
+
+aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
+    const Node *delivered_node) {
+  auto sender = remote_timestamp_senders_map_.find(delivered_node);
+
+  if (sender == remote_timestamp_senders_map_.end()) {
+    sender = remote_timestamp_senders_map_
+                 .emplace(std::make_pair(
+                     delivered_node,
+                     event_loop()->MakeSender<MessageHeader>(
+                         absl::StrCat("/aos/remote_timestamps/",
+                                      delivered_node->name()->string_view()))))
+                 .first;
+  }
+
+  return &(sender->second);
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, int,
@@ -1929,6 +2119,7 @@
   for (size_t i = 0; i < channels_.size(); ++i) {
     channels_[i].reset();
   }
+  remote_timestamp_senders_map_.clear();
   event_loop_unique_ptr_.reset();
   event_loop_ = nullptr;
   timer_handler_ = nullptr;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 161300c..dd10f38 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -110,6 +110,7 @@
     std::unique_ptr<RawFetcher> fetcher;
     bool written = false;
 
+    // Channel index to log to.
     int channel_index = -1;
     const Channel *channel = nullptr;
     const Node *timestamp_node = nullptr;
@@ -130,6 +131,10 @@
     int node_index = 0;
   };
 
+  // Vector mapping from the channel index from the event loop to the logged
+  // channel index.
+  std::vector<int> event_loop_to_logged_channel_index_;
+
   struct NodeState {
     aos::monotonic_clock::time_point monotonic_start_time =
         aos::monotonic_clock::min_time;
@@ -460,6 +465,11 @@
       }
     }
 
+    // Returns the MessageHeader sender to log delivery timestamps to for the
+    // provided remote node.
+    aos::Sender<MessageHeader> *RemoteTimestampSender(
+        const Node *delivered_node);
+
     // Converts a timestamp from the monotonic clock on this node to the
     // distributed clock.
     distributed_clock::time_point ToDistributedClock(
@@ -482,17 +492,19 @@
     // Returns the current time on the remote node which sends messages on
     // channel_index.
     monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
-      return channel_target_event_loop_factory_[channel_index]->monotonic_now();
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->monotonic_now();
     }
 
     distributed_clock::time_point RemoteToDistributedClock(
         size_t channel_index, monotonic_clock::time_point time) {
-      return channel_target_event_loop_factory_[channel_index]
-          ->ToDistributedClock(time);
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->ToDistributedClock(time);
     }
 
     const Node *remote_node(size_t channel_index) {
-      return channel_target_event_loop_factory_[channel_index]->node();
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->node();
     }
 
     monotonic_clock::time_point monotonic_now() {
@@ -507,9 +519,11 @@
     void SetChannelCount(size_t count);
 
     // Sets the sender, filter, and target factory for a channel.
-    void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+    void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+                    std::unique_ptr<RawSender> sender,
                     message_bridge::NoncausalOffsetEstimator *filter,
-                    NodeEventLoopFactory *channel_target_event_loop_factory);
+                    aos::Sender<MessageHeader> *remote_timestamp_sender,
+                    State *source_state);
 
     // Returns if we have read all the messages from all the logs.
     bool at_end() const { return channel_merger_->at_end(); }
@@ -529,13 +543,7 @@
 
     // Sends a buffer on the provided channel index.
     bool Send(size_t channel_index, const void *data, size_t size,
-              aos::monotonic_clock::time_point monotonic_remote_time,
-              aos::realtime_clock::time_point realtime_remote_time,
-              uint32_t remote_queue_index) {
-      return channels_[channel_index]->Send(data, size, monotonic_remote_time,
-                                            realtime_remote_time,
-                                            remote_queue_index);
-    }
+              const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
 
     // Returns a debug string for the channel merger.
     std::string DebugString() const {
@@ -568,6 +576,28 @@
 
     // Senders.
     std::vector<std::unique_ptr<RawSender>> channels_;
+    std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
+    // The mapping from logged channel index to sent channel index.  Needed for
+    // sending out MessageHeaders.
+    std::vector<int> factory_channel_index_;
+
+    struct SentTimestamp {
+      monotonic_clock::time_point monotonic_event_time =
+          monotonic_clock::min_time;
+      realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+      uint32_t queue_index = 0xffffffff;
+
+      // The queue index that this message *actually* was sent with.
+      uint32_t actual_queue_index = 0xffffffff;
+    };
+
+    // Stores all the timestamps that have been sent on this channel.  This is
+    // only done for channels which are forwarded and on the node which
+    // initially sends the message.
+    //
+    // TODO(austin): This whole concept is a hack.  We should be able to
+    // associate state with the message as it gets sorted and recover it.
+    std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
 
     // Factory (if we are in sim) that this loop was created on.
     NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
@@ -585,7 +615,10 @@
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
-    std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
+    std::vector<State *> channel_source_state_;
+
+    std::map<const Node *, aos::Sender<MessageHeader>>
+        remote_timestamp_senders_map_;
   };
 
   // Node index -> State.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 108d4de..51994e3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_generated.h"
 #include "aos/util/file.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
@@ -412,16 +413,24 @@
     std::unique_ptr<Logger> logger;
   };
 
-  LoggerState MakeLogger(const Node *node) {
-    return {event_loop_factory_.MakeEventLoop("logger", node), {}};
+  LoggerState MakeLogger(const Node *node,
+                         SimulatedEventLoopFactory *factory = nullptr) {
+    if (factory == nullptr) {
+      factory = &event_loop_factory_;
+    }
+    return {factory->MakeEventLoop("logger", node), {}};
   }
 
-  void StartLogger(LoggerState *logger) {
+  void StartLogger(LoggerState *logger, std::string logfile_base = "") {
+    if (logfile_base.empty()) {
+      logfile_base = logfile_base_;
+    }
+
     logger->logger = std::make_unique<Logger>(logger->event_loop.get());
     logger->logger->set_polling_period(std::chrono::milliseconds(100));
-    logger->event_loop->OnRun([this, logger]() {
+    logger->event_loop->OnRun([logger, logfile_base]() {
       logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
-          logfile_base_, logger->event_loop->configuration(),
+          logfile_base, logger->event_loop->configuration(),
           logger->event_loop->node()));
     });
   }
@@ -656,7 +665,7 @@
 
   LogReader reader(structured_logfiles_);
 
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
@@ -731,7 +740,7 @@
         ++pi2_ping_count;
       });
 
-  constexpr ssize_t kQueueIndexOffset = 0;
+  constexpr ssize_t kQueueIndexOffset = -9;
   // Confirm that the ping and pong counts both match, and the value also
   // matches.
   pi1_event_loop->MakeWatcher(
@@ -771,7 +780,7 @@
                 << pi2_event_loop->context().monotonic_event_time;
 
         EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
-                  pi2_pong_count + kQueueIndexOffset - 9);
+                  pi2_pong_count + kQueueIndexOffset);
 
         EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
                   chrono::microseconds(200) +
@@ -854,7 +863,7 @@
 
   LogReader reader(structured_logfiles_);
 
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
@@ -994,7 +1003,7 @@
 
   LogReader reader(structured_logfiles_);
 
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   const Node *pi1 =
@@ -1216,6 +1225,213 @@
   reader.Deregister();
 }
 
+// Tests that we properly recreate forwarded timestamps when replaying a log.
+// This should be enough that we can then re-run the logger and get a valid log
+// back.  Each replayed MessageHeader is checked against fetched contexts.
+TEST_F(MultinodeLoggerTest, MessageHeader) {
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+    // Let the simulation run for a bit before logging is started.
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+  // Replay structured_logfiles_ in a new factory using the reader's config.
+  LogReader reader(structured_logfiles_);
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+  // Look up both nodes in the replay factory's configuration.
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+  LOG(INFO) << "now pi1 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+  LOG(INFO) << "now pi2 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
+  EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
+  // NOTE(review): likely duplicates the set_send_delay() call above; confirm.
+  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+  // Count MessageHeaders on "/original/..." channels; zero are expected below.
+  MessageCounter<MessageHeader> pi1_original_message_header_counter(
+      pi1_event_loop.get(), "/original/aos/remote_timestamps/pi2");
+  MessageCounter<MessageHeader> pi2_original_message_header_counter(
+      pi2_event_loop.get(), "/original/aos/remote_timestamps/pi1");
+  // Fetch the same channels on both nodes so each header can be cross-checked.
+  aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+  aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+
+  aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<examples::Ping>("/test");
+
+  aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+  aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+
+  aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<examples::Pong>("/test");
+  aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<examples::Pong>("/test");
+  // Channel indices which the MessageHeader watchers below dispatch on.
+  const size_t pi1_timestamp_channel = configuration::ChannelIndex(
+      pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
+  const size_t ping_timestamp_channel = configuration::ChannelIndex(
+      pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
+
+  const size_t pi2_timestamp_channel = configuration::ChannelIndex(
+      pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
+  const size_t pong_timestamp_channel = configuration::ChannelIndex(
+      pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
+  // Check each MessageHeader seen on pi1 against the contexts on both nodes.
+  pi1_event_loop->MakeWatcher(
+      "/aos/remote_timestamps/pi2",
+      [&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
+       &pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
+       &ping_on_pi1_fetcher,
+       &ping_on_pi2_fetcher](const MessageHeader &header) {
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+        const aos::realtime_clock::time_point header_realtime_sent_time(
+            chrono::nanoseconds(header.realtime_sent_time()));
+        const aos::monotonic_clock::time_point header_monotonic_remote_time(
+            chrono::nanoseconds(header.monotonic_remote_time()));
+        const aos::realtime_clock::time_point header_realtime_remote_time(
+            chrono::nanoseconds(header.realtime_remote_time()));
+        // Contexts for the same message as fetched on pi1 and on pi2.
+        const Context *pi1_context = nullptr;
+        const Context *pi2_context = nullptr;
+        // Advance the fetcher pair for the channel this header refers to.
+        if (header.channel_index() == pi1_timestamp_channel) {
+          ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
+          ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
+          pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
+          pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
+        } else if (header.channel_index() == ping_timestamp_channel) {
+          ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+          ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+          pi1_context = &ping_on_pi1_fetcher.context();
+          pi2_context = &ping_on_pi2_fetcher.context();
+        } else {
+          LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+                     << configuration::CleanedChannelToString(
+                            pi1_event_loop->configuration()->channels()->Get(
+                                header.channel_index()));
+        }
+        // header.remote_queue_index is pi1's index; queue_index is pi2's.
+        EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+        // The header's sent/remote times must match pi2's context.
+        EXPECT_EQ(pi2_context->monotonic_event_time,
+                  header_monotonic_sent_time);
+        EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+        EXPECT_EQ(pi2_context->realtime_remote_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi2_context->monotonic_remote_time,
+                  header_monotonic_remote_time);
+        // The header's remote times must match pi1's event times.
+        EXPECT_EQ(pi1_context->realtime_event_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi1_context->monotonic_event_time,
+                  header_monotonic_remote_time);
+      });
+  pi2_event_loop->MakeWatcher(
+      "/aos/remote_timestamps/pi1",
+      [&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
+       &pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
+       &pong_on_pi2_fetcher,
+       &pong_on_pi1_fetcher](const MessageHeader &header) {
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+        const aos::realtime_clock::time_point header_realtime_sent_time(
+            chrono::nanoseconds(header.realtime_sent_time()));
+        const aos::monotonic_clock::time_point header_monotonic_remote_time(
+            chrono::nanoseconds(header.monotonic_remote_time()));
+        const aos::realtime_clock::time_point header_realtime_remote_time(
+            chrono::nanoseconds(header.realtime_remote_time()));
+        // Same checks as the pi1 watcher with pi1 and pi2 swapped.
+        const Context *pi2_context = nullptr;
+        const Context *pi1_context = nullptr;
+        // Advance the fetcher pair for the channel this header refers to.
+        if (header.channel_index() == pi2_timestamp_channel) {
+          ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
+          ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
+          pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
+          pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
+        } else if (header.channel_index() == pong_timestamp_channel) {
+          ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
+          ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
+          pi2_context = &pong_on_pi2_fetcher.context();
+          pi1_context = &pong_on_pi1_fetcher.context();
+        } else {
+          LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+                     << configuration::CleanedChannelToString(
+                            pi2_event_loop->configuration()->channels()->Get(
+                                header.channel_index()));
+        }
+        // header.remote_queue_index is pi2's index; queue_index is pi1's.
+        EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi1_context->remote_queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+        // The header's sent/remote times must match pi1's context.
+        EXPECT_EQ(pi1_context->monotonic_event_time,
+                  header_monotonic_sent_time);
+        EXPECT_EQ(pi1_context->realtime_event_time, header_realtime_sent_time);
+        EXPECT_EQ(pi1_context->realtime_remote_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi1_context->monotonic_remote_time,
+                  header_monotonic_remote_time);
+        // The header's remote times must match pi2's event times.
+        EXPECT_EQ(pi2_context->realtime_event_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi2_context->monotonic_event_time,
+                  header_monotonic_remote_time);
+      });
+
+  // And confirm we can re-create a log again, while checking the contents.
+  {
+    LoggerState pi1_logger = MakeLogger(
+        configuration::GetNode(log_reader_factory.configuration(), pi1_),
+        &log_reader_factory);
+    LoggerState pi2_logger = MakeLogger(
+        configuration::GetNode(log_reader_factory.configuration(), pi2_),
+        &log_reader_factory);
+
+    StartLogger(&pi1_logger, "relogged");
+    StartLogger(&pi2_logger, "relogged");
+
+    log_reader_factory.Run();
+  }
+  // No MessageHeaders should have arrived on the "/original/..." channels.
+  EXPECT_EQ(pi2_original_message_header_counter.count(), 0u);
+  EXPECT_EQ(pi1_original_message_header_counter.count(), 0u);
+
+  reader.Deregister();
+}
+
 // TODO(austin): We can write a test which recreates a logfile and confirms that
 // we get it back.  That is the ultimate test.
 
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 81937ca..3923297 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -518,6 +518,7 @@
         // Confirm the forwarded message has matching timestamps to the
         // timestamps we got back.
         EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
         EXPECT_EQ(pi2_context->monotonic_event_time,
                   header_monotonic_sent_time);
         EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
@@ -527,7 +528,7 @@
                   header_monotonic_remote_time);
 
         // Confirm the forwarded message also matches the source message.
-        EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
         EXPECT_EQ(pi1_context->monotonic_event_time,
                   header_monotonic_remote_time);
         EXPECT_EQ(pi1_context->realtime_event_time,