Merge "Print out which queue was corrupted"
diff --git a/aos/configuration.cc b/aos/configuration.cc
index c74234e..f466ee0 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -975,5 +975,40 @@
   return result;
 }
 
+// Returns the destination nodes of channels sendable on my_node whose delivery
+// times are logged on my_node.  Single node configs need my_node == nullptr.
+std::vector<const Node *> TimestampNodes(const Configuration *config,
+                                         const Node *my_node) {
+  if (!configuration::MultiNode(config)) {
+    CHECK(my_node == nullptr);
+    return std::vector<const Node *>{};
+  }
+
+  std::set<const Node *> timestamp_logger_nodes;
+  for (const Channel *channel : *config->channels()) {
+    if (!configuration::ChannelIsSendableOnNode(channel, my_node)) {
+      continue;
+    }
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+    for (const Connection *connection : *channel->destination_nodes()) {
+      const Node *other_node =
+          configuration::GetNode(config, connection->name()->string_view());
+
+      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+                                                              my_node)) {
+        VLOG(1) << "Timestamps are logged from "
+                << FlatbufferToJson(other_node);
+        timestamp_logger_nodes.insert(other_node);
+      }
+    }
+  }
+
+  // The set already de-duplicated the nodes; copy it out in iteration order.
+  return std::vector<const Node *>(timestamp_logger_nodes.begin(),
+                                   timestamp_logger_nodes.end());
+}
+
 }  // namespace configuration
 }  // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index fbd9cff..f1bcece 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -135,6 +135,10 @@
 std::vector<std::string_view> SourceNodeNames(const Configuration *config,
                                               const Node *my_node);
 
+// Returns all the nodes that are logging timestamps on our node.
+std::vector<const Node *> TimestampNodes(const Configuration *config,
+                                         const Node *my_node);
+
 // TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
 
 }  // namespace configuration
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 15b4fd9..70dae61 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1153,6 +1153,8 @@
     timestamp.realtime_event_time =
         realtime_clock::time_point(chrono::nanoseconds(
             std::get<2>(oldest_timestamp).message().realtime_sent_time()));
+    timestamp.queue_index =
+        std::get<2>(oldest_timestamp).message().queue_index();
 
     // Consistency check.
     CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
@@ -1255,6 +1257,7 @@
     timestamp.realtime_event_time =
         realtime_clock::time_point(chrono::nanoseconds(
             std::get<2>(oldest_message).message().realtime_sent_time()));
+    timestamp.queue_index = std::get<2>(oldest_message).message().queue_index();
     timestamp.remote_queue_index = 0xffffffff;
 
     CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index a3d16f7..0b0c8f5 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -492,6 +492,7 @@
     monotonic_clock::time_point monotonic_event_time =
         monotonic_clock::min_time;
     realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+    uint32_t queue_index = 0xffffffff;
 
     monotonic_clock::time_point monotonic_remote_time =
         monotonic_clock::min_time;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 2780bd5..19a0229 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -52,38 +52,22 @@
               : aos::Fetcher<message_bridge::ServerStatistics>()) {
   VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
 
-  // Find all the nodes which are logging timestamps on our node.
-  std::set<const Node *> timestamp_logger_nodes;
-  for (const Channel *channel : *configuration_->channels()) {
-    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
-      continue;
-    }
-    if (!channel->has_destination_nodes()) {
-      continue;
-    }
-    if (!should_log(channel)) {
-      continue;
-    }
-    for (const Connection *connection : *channel->destination_nodes()) {
-      const Node *other_node = configuration::GetNode(
-          configuration_, connection->name()->string_view());
-
-      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-              connection, event_loop_->node())) {
-        VLOG(1) << "Timestamps are logged from "
-                << FlatbufferToJson(other_node);
-        timestamp_logger_nodes.insert(other_node);
-      }
-    }
-  }
+  // Find all the nodes which are logging timestamps on our node.  This may
+  // over-estimate if should_log is specified.
+  std::vector<const Node *> timestamp_logger_nodes =
+      configuration::TimestampNodes(configuration_, event_loop_->node());
 
   std::map<const Channel *, const Node *> timestamp_logger_channels;
 
   // Now that we have all the nodes accumulated, make remote timestamp loggers
   // for them.
   for (const Node *node : timestamp_logger_nodes) {
+    // Note: since we are doing a find using the event loop channel, we need to
+    // make sure this channel pointer is part of the event loop configuration,
+    // not configuration_.  This only matters when configuration_ !=
+    // event_loop->configuration();
     const Channel *channel = configuration::GetChannel(
-        configuration_,
+        event_loop->configuration(),
         absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
         logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
         event_loop_->node());
@@ -112,6 +96,9 @@
     const Channel *channel = aos::configuration::GetChannel(
         event_loop_->configuration(), config_channel->name()->string_view(),
         config_channel->type()->string_view(), "", event_loop_->node());
+    CHECK(channel != nullptr)
+        << ": Failed to look up channel "
+        << aos::configuration::CleanedChannelToString(config_channel);
     if (!should_log(channel)) {
       continue;
     }
@@ -168,6 +155,29 @@
       fetchers_.emplace_back(std::move(fs));
     }
   }
+
+  // When we are logging remote timestamps, we need to be able to translate from
+  // the channel index that the event loop uses to the channel index in the
+  // config in the log file.
+  event_loop_to_logged_channel_index_.resize(
+      event_loop->configuration()->channels()->size(), -1);
+  for (size_t event_loop_channel_index = 0;
+       event_loop_channel_index <
+       event_loop->configuration()->channels()->size();
+       ++event_loop_channel_index) {
+    const Channel *event_loop_channel =
+        event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+    const Channel *logged_channel = aos::configuration::GetChannel(
+        configuration_, event_loop_channel->name()->string_view(),
+        event_loop_channel->type()->string_view(), "",
+        configuration::GetNode(configuration_, event_loop_->node()));
+
+    if (logged_channel != nullptr) {
+      event_loop_to_logged_channel_index_[event_loop_channel_index] =
+          configuration::ChannelIndex(configuration_, logged_channel);
+    }
+  }
 }
 
 Logger::~Logger() {
@@ -549,10 +559,18 @@
 
           logger::MessageHeader::Builder message_header_builder(fbb);
 
+          // TODO(austin): This needs to check the channel_index and confirm
+          // that it should be logged before squirreling away the timestamp to
+          // disk.  We don't want to log irrelevant timestamps.
+
           // Note: this must match the same order as MessageBridgeServer and
           // PackMessage.  We want identical headers to have identical
           // on-the-wire formats to make comparing them easier.
-          message_header_builder.add_channel_index(msg->channel_index());
+
+          // Translate from the channel index that the event loop uses to the
+          // channel index in the log file.
+          message_header_builder.add_channel_index(
+              event_loop_to_logged_channel_index_[msg->channel_index()]);
 
           message_header_builder.add_queue_index(msg->queue_index());
           message_header_builder.add_monotonic_sent_time(
@@ -856,6 +874,22 @@
       replay_configuration_(replay_configuration) {
   MakeRemappedConfig();
 
+  // Remap all existing remote timestamp channels.  They will be recreated, and
+  // the data logged isn't relevant anymore.
+  for (const Node *node : Nodes()) {
+    std::vector<const Node *> timestamp_logger_nodes =
+        configuration::TimestampNodes(logged_configuration(), node);
+    for (const Node *remote_node : timestamp_logger_nodes) {
+      const std::string channel = absl::StrCat(
+          "/aos/remote_timestamps/", remote_node->name()->string_view());
+      CHECK(HasChannel<logger::MessageHeader>(channel, node))
+          << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
+          << logger::MessageHeader::GetFullyQualifiedName() << "\"} for node "
+          << node->name()->string_view();
+      RemapLoggedChannel<logger::MessageHeader>(channel, node);
+    }
+  }
+
   if (replay_configuration) {
     CHECK_EQ(configuration::MultiNode(configuration()),
              configuration::MultiNode(replay_configuration))
@@ -956,10 +990,22 @@
     states_[node_index] =
         std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
     State *state = states_[node_index].get();
-
-    Register(state->SetNodeEventLoopFactory(
+    state->set_event_loop(state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node)));
+
+    state->SetChannelCount(logged_configuration()->channels()->size());
   }
+
+  // Register after making all the State objects so we can build references
+  // between them.
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    State *state = states_[node_index].get();
+
+    Register(state->event_loop());
+  }
+
   if (live_nodes_ == 0) {
     LOG(FATAL)
         << "Don't have logs from any of the nodes in the replay config--are "
@@ -1313,29 +1359,46 @@
 
   const bool has_data = state->SetNode();
 
-  state->SetChannelCount(logged_configuration()->channels()->size());
+  for (size_t logged_channel_index = 0;
+       logged_channel_index < logged_configuration()->channels()->size();
+       ++logged_channel_index) {
+    const Channel *channel = RemapChannel(
+        event_loop,
+        logged_configuration()->channels()->Get(logged_channel_index));
 
-  for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
-    const Channel *channel =
-        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
-
-    NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
+    aos::Sender<MessageHeader> *remote_timestamp_sender = nullptr;
+
+    State *source_state = nullptr;
 
     if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
         configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
-      const Node *target_node = configuration::GetNode(
+      // We've got a message which is being forwarded to this node.
+      const Node *source_node = configuration::GetNode(
           event_loop->configuration(), channel->source_node()->string_view());
-      filter = GetFilter(event_loop->node(), target_node);
+      filter = GetFilter(event_loop->node(), source_node);
 
-      if (event_loop_factory_ != nullptr) {
-        channel_target_event_loop_factory =
-            event_loop_factory_->GetNodeEventLoopFactory(target_node);
+      // Delivery timestamps are supposed to be logged back on the source node.
+      // Configure remote timestamps to be sent.
+      const bool delivery_time_is_logged =
+          configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+              channel, event_loop->node(), source_node);
+
+      source_state =
+          states_[configuration::GetNodeIndex(configuration(), source_node)]
+              .get();
+
+      if (delivery_time_is_logged) {
+        remote_timestamp_sender =
+            source_state->RemoteTimestampSender(event_loop->node());
       }
     }
 
-    state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
-                      channel_target_event_loop_factory);
+    state->SetChannel(
+        logged_channel_index,
+        configuration::ChannelIndex(event_loop->configuration(), channel),
+        event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
+        source_state);
   }
 
   // If we didn't find any log files with data in them, we won't ever get a
@@ -1464,10 +1527,7 @@
         // TODO(austin): std::move channel_data in and make that efficient in
         // simulation.
         state->Send(channel_index, channel_data.message().data()->Data(),
-                    channel_data.message().data()->size(),
-                    channel_timestamp.monotonic_remote_time,
-                    channel_timestamp.realtime_remote_time,
-                    channel_timestamp.remote_queue_index);
+                    channel_data.message().data()->size(), channel_timestamp);
       } else if (state->at_end() && !ignore_missing_data_) {
         // We are at the end of the log file and found missing data.  Finish
         // reading the rest of the log file and call it quits.  We don't want
@@ -1829,18 +1889,148 @@
 
 void LogReader::State::SetChannelCount(size_t count) {
   channels_.resize(count);
+  remote_timestamp_senders_.resize(count);
   filters_.resize(count);
-  channel_target_event_loop_factory_.resize(count);
+  channel_source_state_.resize(count);
+  factory_channel_index_.resize(count);
+  queue_index_map_.resize(count);
 }
 
 void LogReader::State::SetChannel(
-    size_t channel, std::unique_ptr<RawSender> sender,
+    size_t logged_channel_index, size_t factory_channel_index,
+    std::unique_ptr<RawSender> sender,
     message_bridge::NoncausalOffsetEstimator *filter,
-    NodeEventLoopFactory *channel_target_event_loop_factory) {
-  channels_[channel] = std::move(sender);
-  filters_[channel] = filter;
-  channel_target_event_loop_factory_[channel] =
-      channel_target_event_loop_factory;
+    aos::Sender<MessageHeader> *remote_timestamp_sender, State *source_state) {
+  channels_[logged_channel_index] = std::move(sender);
+  filters_[logged_channel_index] = filter;
+  remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+
+  if (source_state) {
+    channel_source_state_[logged_channel_index] = source_state;
+
+    if (remote_timestamp_sender != nullptr) {
+      source_state->queue_index_map_[logged_channel_index] =
+          std::make_unique<std::vector<State::SentTimestamp>>();
+    }
+  }
+
+  factory_channel_index_[logged_channel_index] = factory_channel_index;
+}
+
+bool LogReader::State::Send(
+    size_t channel_index, const void *data, size_t size,
+    const TimestampMerger::DeliveryTimestamp &delivery_timestamp) {
+  aos::RawSender *sender = channels_[channel_index].get();
+  uint32_t remote_queue_index = 0xffffffff;
+  // Delivery times are logged: look up the replayed remote queue index.
+  if (remote_timestamp_senders_[channel_index] != nullptr) {
+    std::vector<SentTimestamp> *queue_index_map =
+        CHECK_NOTNULL(CHECK_NOTNULL(channel_source_state_[channel_index])
+                          ->queue_index_map_[channel_index]
+                          .get());
+
+    SentTimestamp search;
+    search.monotonic_event_time = delivery_timestamp.monotonic_remote_time;
+    search.realtime_event_time = delivery_timestamp.realtime_remote_time;
+    search.queue_index = delivery_timestamp.remote_queue_index;
+
+    // Find the record of the source node sending this message, if available.
+    auto element = std::lower_bound(
+        queue_index_map->begin(), queue_index_map->end(), search,
+        [](SentTimestamp a, SentTimestamp b) {
+          if (b.monotonic_event_time < a.monotonic_event_time) {
+            return false;
+          }
+          if (b.monotonic_event_time > a.monotonic_event_time) {
+            return true;
+          }
+          // Equal monotonic times; fall back to the logged queue index.
+          if (b.queue_index < a.queue_index) {
+            return false;
+          }
+          if (b.queue_index > a.queue_index) {
+            return true;
+          }
+          // Equal keys; the realtime timestamps are required to agree too.
+          CHECK_EQ(a.realtime_event_time, b.realtime_event_time);
+          return false;
+        });
+
+    // TODO(austin): Be a bit more principled here, but we will want to do that
+    // after the logger rewrite.  We hit this when one node finishes, but the
+    // other node isn't done yet.  So there is no send time, but there is a
+    // receive time.
+    if (element != queue_index_map->end()) {
+      CHECK_EQ(element->monotonic_event_time,
+               delivery_timestamp.monotonic_remote_time);
+      CHECK_EQ(element->realtime_event_time,
+               delivery_timestamp.realtime_remote_time);
+      CHECK_EQ(element->queue_index, delivery_timestamp.remote_queue_index);
+
+      remote_queue_index = element->actual_queue_index;
+    }
+  }
+
+  // Send!  Use the replayed queue index here instead of the logged queue index
+  // for the remote queue index.  This makes re-logging work.
+  const bool sent =
+      sender->Send(data, size, delivery_timestamp.monotonic_remote_time,
+                   delivery_timestamp.realtime_remote_time, remote_queue_index);
+  if (!sent) return false;
+  // Sending side records its queue index; receiving side emits a timestamp.
+  if (queue_index_map_[channel_index]) {
+    SentTimestamp timestamp;
+    timestamp.monotonic_event_time = delivery_timestamp.monotonic_event_time;
+    timestamp.realtime_event_time = delivery_timestamp.realtime_event_time;
+    timestamp.queue_index = delivery_timestamp.queue_index;
+    timestamp.actual_queue_index = sender->sent_queue_index();
+    queue_index_map_[channel_index]->emplace_back(timestamp);
+  } else if (remote_timestamp_senders_[channel_index] != nullptr) {
+    aos::Sender<MessageHeader>::Builder builder =
+        remote_timestamp_senders_[channel_index]->MakeBuilder();
+
+    logger::MessageHeader::Builder message_header_builder =
+        builder.MakeBuilder<logger::MessageHeader>();
+
+    message_header_builder.add_channel_index(
+        factory_channel_index_[channel_index]);
+
+    // Swap the remote and sent metrics.  They are from the sender's
+    // perspective, not the receiver's perspective.
+    message_header_builder.add_monotonic_sent_time(
+        sender->monotonic_sent_time().time_since_epoch().count());
+    message_header_builder.add_realtime_sent_time(
+        sender->realtime_sent_time().time_since_epoch().count());
+    message_header_builder.add_queue_index(sender->sent_queue_index());
+
+    message_header_builder.add_monotonic_remote_time(
+        delivery_timestamp.monotonic_remote_time.time_since_epoch().count());
+    message_header_builder.add_realtime_remote_time(
+        delivery_timestamp.realtime_remote_time.time_since_epoch().count());
+
+    message_header_builder.add_remote_queue_index(remote_queue_index);
+
+    builder.Send(message_header_builder.Finish());
+  }
+
+  return true;
+}
+
+aos::Sender<MessageHeader> *LogReader::State::RemoteTimestampSender(
+    const Node *delivered_node) {
+  // Senders are made on first use, one per delivered node, and then reused.
+  auto it = remote_timestamp_senders_map_.find(delivered_node);
+  if (it != remote_timestamp_senders_map_.end()) {
+    return &(it->second);
+  }
+
+  const std::string channel_name = absl::StrCat(
+      "/aos/remote_timestamps/", delivered_node->name()->string_view());
+  it = remote_timestamp_senders_map_
+           .emplace(delivered_node,
+                    event_loop()->MakeSender<MessageHeader>(channel_name))
+           .first;
+  return &(it->second);
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, int,
@@ -1929,6 +2119,7 @@
   for (size_t i = 0; i < channels_.size(); ++i) {
     channels_[i].reset();
   }
+  remote_timestamp_senders_map_.clear();
   event_loop_unique_ptr_.reset();
   event_loop_ = nullptr;
   timer_handler_ = nullptr;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 161300c..dd10f38 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -110,6 +110,7 @@
     std::unique_ptr<RawFetcher> fetcher;
     bool written = false;
 
+    // Channel index to log to.
     int channel_index = -1;
     const Channel *channel = nullptr;
     const Node *timestamp_node = nullptr;
@@ -130,6 +131,10 @@
     int node_index = 0;
   };
 
+  // Vector mapping from the channel index from the event loop to the logged
+  // channel index.
+  std::vector<int> event_loop_to_logged_channel_index_;
+
   struct NodeState {
     aos::monotonic_clock::time_point monotonic_start_time =
         aos::monotonic_clock::min_time;
@@ -460,6 +465,11 @@
       }
     }
 
+    // Returns the MessageHeader sender to log delivery timestamps to for the
+    // provided remote node.
+    aos::Sender<MessageHeader> *RemoteTimestampSender(
+        const Node *delivered_node);
+
     // Converts a timestamp from the monotonic clock on this node to the
     // distributed clock.
     distributed_clock::time_point ToDistributedClock(
@@ -482,17 +492,19 @@
     // Returns the current time on the remote node which sends messages on
     // channel_index.
     monotonic_clock::time_point monotonic_remote_now(size_t channel_index) {
-      return channel_target_event_loop_factory_[channel_index]->monotonic_now();
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->monotonic_now();
     }
 
     distributed_clock::time_point RemoteToDistributedClock(
         size_t channel_index, monotonic_clock::time_point time) {
-      return channel_target_event_loop_factory_[channel_index]
-          ->ToDistributedClock(time);
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->ToDistributedClock(time);
     }
 
     const Node *remote_node(size_t channel_index) {
-      return channel_target_event_loop_factory_[channel_index]->node();
+      return channel_source_state_[channel_index]
+          ->node_event_loop_factory_->node();
     }
 
     monotonic_clock::time_point monotonic_now() {
@@ -507,9 +519,11 @@
     void SetChannelCount(size_t count);
 
     // Sets the sender, filter, and target factory for a channel.
-    void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+    void SetChannel(size_t logged_channel_index, size_t factory_channel_index,
+                    std::unique_ptr<RawSender> sender,
                     message_bridge::NoncausalOffsetEstimator *filter,
-                    NodeEventLoopFactory *channel_target_event_loop_factory);
+                    aos::Sender<MessageHeader> *remote_timestamp_sender,
+                    State *source_state);
 
     // Returns if we have read all the messages from all the logs.
     bool at_end() const { return channel_merger_->at_end(); }
@@ -529,13 +543,7 @@
 
     // Sends a buffer on the provided channel index.
     bool Send(size_t channel_index, const void *data, size_t size,
-              aos::monotonic_clock::time_point monotonic_remote_time,
-              aos::realtime_clock::time_point realtime_remote_time,
-              uint32_t remote_queue_index) {
-      return channels_[channel_index]->Send(data, size, monotonic_remote_time,
-                                            realtime_remote_time,
-                                            remote_queue_index);
-    }
+              const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
 
     // Returns a debug string for the channel merger.
     std::string DebugString() const {
@@ -568,6 +576,28 @@
 
     // Senders.
     std::vector<std::unique_ptr<RawSender>> channels_;
+    std::vector<aos::Sender<MessageHeader> *> remote_timestamp_senders_;
+    // The mapping from logged channel index to sent channel index.  Needed for
+    // sending out MessageHeaders.
+    std::vector<int> factory_channel_index_;
+
+    struct SentTimestamp {
+      monotonic_clock::time_point monotonic_event_time =
+          monotonic_clock::min_time;
+      realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+      uint32_t queue_index = 0xffffffff;
+
+      // The queue index that this message *actually* was sent with.
+      uint32_t actual_queue_index = 0xffffffff;
+    };
+
+    // Stores all the timestamps that have been sent on this channel.  This is
+    // only done for channels which are forwarded and on the node which
+    // initially sends the message.
+    //
+    // TODO(austin): This whole concept is a hack.  We should be able to
+    // associate state with the message as it gets sorted and recover it.
+    std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
 
     // Factory (if we are in sim) that this loop was created on.
     NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
@@ -585,7 +615,10 @@
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
-    std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory_;
+    std::vector<State *> channel_source_state_;
+
+    std::map<const Node *, aos::Sender<MessageHeader>>
+        remote_timestamp_senders_map_;
   };
 
   // Node index -> State.
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
index c333f55..ea360cc 100644
--- a/aos/events/logging/logger_math.cc
+++ b/aos/events/logging/logger_math.cc
@@ -167,13 +167,64 @@
       return std::make_tuple(
           Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
           Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
-    } else {
-      // TODO(austin): Solve just the nodes we know about.  This is harder and
-      // there are no logs which require this yet to test on.
-      CHECK_EQ(cached_valid_node_count_, nodes_count())
-          << ": TODO(austin): Handle partial valid nodes";
-
+    } else if (cached_valid_node_count_ == nodes_count()) {
       return Solve(mpq_map, mpq_offsets);
+    } else {
+      // Strip out any columns (nodes) which aren't relevant.  Solve the
+      // simplified problem, then set any nodes which were missing back to slope
+      // 1, offset 0 (ie the distributed clock).
+      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>
+          valid_node_mpq_map =
+              Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+                  nonzero_offset_count, cached_valid_node_count_);
+
+      {
+        // Only copy over the columns with valid nodes in them.
+        size_t column = 0;
+        for (size_t i = 0; i < valid_nodes.size(); ++i) {
+          if (valid_nodes[i]) {
+            valid_node_mpq_map.col(column) = mpq_map.col(i);
+
+            ++column;
+          }
+        }
+        // The 1/n needs to be based on the number of nodes being solved.
+        // Recreate it here.
+        for (int j = 0; j < valid_node_mpq_map.cols(); ++j) {
+          valid_node_mpq_map(0, j) = mpq_class(1, cached_valid_node_count_);
+        }
+      }
+
+      VLOG(1) << "Reduced node filtered map "
+              << ToDouble(valid_node_mpq_map).format(HeavyFmt);
+      VLOG(1) << "Reduced node filtered offsets "
+              << ToDouble(mpq_offsets).format(HeavyFmt);
+
+      // Solve the simplified problem now.
+      std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+                 Eigen::Matrix<double, Eigen::Dynamic, 1>>
+          valid_result = Solve(valid_node_mpq_map, mpq_offsets);
+
+      // And expand the results back into a solution matrix.
+      std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+                 Eigen::Matrix<double, Eigen::Dynamic, 1>>
+          result = std::make_tuple(
+              Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+              Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+
+      {
+        size_t column = 0;
+        for (size_t i = 0; i < valid_nodes.size(); ++i) {
+          if (valid_nodes[i]) {
+            std::get<0>(result)(i) = std::get<0>(valid_result)(column);
+            std::get<1>(result)(i) = std::get<1>(valid_result)(column);
+
+            ++column;
+          }
+        }
+      }
+
+      return result;
     }
   } else {
     const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 108d4de..51994e3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_generated.h"
 #include "aos/util/file.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
@@ -412,16 +413,24 @@
     std::unique_ptr<Logger> logger;
   };
 
-  LoggerState MakeLogger(const Node *node) {
-    return {event_loop_factory_.MakeEventLoop("logger", node), {}};
+  LoggerState MakeLogger(const Node *node,
+                         SimulatedEventLoopFactory *factory = nullptr) {
+    if (factory == nullptr) {
+      factory = &event_loop_factory_;
+    }
+    return {factory->MakeEventLoop("logger", node), {}};
   }
 
-  void StartLogger(LoggerState *logger) {
+  void StartLogger(LoggerState *logger, std::string logfile_base = "") {
+    if (logfile_base.empty()) {
+      logfile_base = logfile_base_;
+    }
+
     logger->logger = std::make_unique<Logger>(logger->event_loop.get());
     logger->logger->set_polling_period(std::chrono::milliseconds(100));
-    logger->event_loop->OnRun([this, logger]() {
+    logger->event_loop->OnRun([logger, logfile_base]() {
       logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
-          logfile_base_, logger->event_loop->configuration(),
+          logfile_base, logger->event_loop->configuration(),
           logger->event_loop->node()));
     });
   }
@@ -656,7 +665,7 @@
 
   LogReader reader(structured_logfiles_);
 
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
@@ -731,7 +740,7 @@
         ++pi2_ping_count;
       });
 
-  constexpr ssize_t kQueueIndexOffset = 0;
+  constexpr ssize_t kQueueIndexOffset = -9;
   // Confirm that the ping and pong counts both match, and the value also
   // matches.
   pi1_event_loop->MakeWatcher(
@@ -771,7 +780,7 @@
                 << pi2_event_loop->context().monotonic_event_time;
 
         EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
-                  pi2_pong_count + kQueueIndexOffset - 9);
+                  pi2_pong_count + kQueueIndexOffset);
 
         EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
                   chrono::microseconds(200) +
@@ -854,7 +863,7 @@
 
   LogReader reader(structured_logfiles_);
 
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   // This sends out the fetched messages and advances time to the start of the
@@ -994,7 +1003,7 @@
 
   LogReader reader(structured_logfiles_);
 
-  SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
   const Node *pi1 =
@@ -1216,6 +1225,213 @@
   reader.Deregister();
 }
 
+// Tests that we properly recreate the forwarded timestamps (the MessageHeader
+// messages on the remote_timestamps channels) when replaying a log.  This
+// should be enough that we can then re-run the logger and get a valid log back.
+TEST_F(MultinodeLoggerTest, MessageHeader) {
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+
+  LogReader reader(structured_logfiles_);
+
+  SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+  log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  const Node *pi1 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+  LOG(INFO) << "now pi1 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+  LOG(INFO) << "now pi2 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
+  EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
+
+  reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+  std::unique_ptr<EventLoop> pi1_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
+
+  MessageCounter<MessageHeader> pi1_original_message_header_counter(
+      pi1_event_loop.get(), "/original/aos/remote_timestamps/pi2");
+  MessageCounter<MessageHeader> pi2_original_message_header_counter(
+      pi2_event_loop.get(), "/original/aos/remote_timestamps/pi1");
+
+  aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+  aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+
+  aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<examples::Ping>("/test");
+
+  aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+  aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+
+  aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
+      pi2_event_loop->MakeFetcher<examples::Pong>("/test");
+  aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
+      pi1_event_loop->MakeFetcher<examples::Pong>("/test");
+
+  const size_t pi1_timestamp_channel = configuration::ChannelIndex(
+      pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
+  const size_t ping_timestamp_channel = configuration::ChannelIndex(
+      pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
+
+  const size_t pi2_timestamp_channel = configuration::ChannelIndex(
+      pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
+  const size_t pong_timestamp_channel = configuration::ChannelIndex(
+      pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
+
+  pi1_event_loop->MakeWatcher(
+      "/aos/remote_timestamps/pi2",
+      [&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
+       &pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
+       &ping_on_pi1_fetcher,
+       &ping_on_pi2_fetcher](const MessageHeader &header) {
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+        const aos::realtime_clock::time_point header_realtime_sent_time(
+            chrono::nanoseconds(header.realtime_sent_time()));
+        const aos::monotonic_clock::time_point header_monotonic_remote_time(
+            chrono::nanoseconds(header.monotonic_remote_time()));
+        const aos::realtime_clock::time_point header_realtime_remote_time(
+            chrono::nanoseconds(header.realtime_remote_time()));
+
+        const Context *pi1_context = nullptr;
+        const Context *pi2_context = nullptr;
+
+        if (header.channel_index() == pi1_timestamp_channel) {
+          ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
+          ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
+          pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
+          pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
+        } else if (header.channel_index() == ping_timestamp_channel) {
+          ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+          ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+          pi1_context = &ping_on_pi1_fetcher.context();
+          pi2_context = &ping_on_pi2_fetcher.context();
+        } else {
+          LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+                     << configuration::CleanedChannelToString(
+                            pi1_event_loop->configuration()->channels()->Get(
+                                header.channel_index()));
+        }
+
+        EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+
+        EXPECT_EQ(pi2_context->monotonic_event_time,
+                  header_monotonic_sent_time);
+        EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+        EXPECT_EQ(pi2_context->realtime_remote_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi2_context->monotonic_remote_time,
+                  header_monotonic_remote_time);
+
+        EXPECT_EQ(pi1_context->realtime_event_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi1_context->monotonic_event_time,
+                  header_monotonic_remote_time);
+      });
+  pi2_event_loop->MakeWatcher(
+      "/aos/remote_timestamps/pi1",
+      [&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
+       &pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
+       &pong_on_pi2_fetcher,
+       &pong_on_pi1_fetcher](const MessageHeader &header) {
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+        const aos::realtime_clock::time_point header_realtime_sent_time(
+            chrono::nanoseconds(header.realtime_sent_time()));
+        const aos::monotonic_clock::time_point header_monotonic_remote_time(
+            chrono::nanoseconds(header.monotonic_remote_time()));
+        const aos::realtime_clock::time_point header_realtime_remote_time(
+            chrono::nanoseconds(header.realtime_remote_time()));
+
+        const Context *pi2_context = nullptr;
+        const Context *pi1_context = nullptr;
+
+        if (header.channel_index() == pi2_timestamp_channel) {
+          ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
+          ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
+          pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
+          pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
+        } else if (header.channel_index() == pong_timestamp_channel) {
+          ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
+          ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
+          pi2_context = &pong_on_pi2_fetcher.context();
+          pi1_context = &pong_on_pi1_fetcher.context();
+        } else {
+          LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+                     << configuration::CleanedChannelToString(
+                            pi2_event_loop->configuration()->channels()->Get(
+                                header.channel_index()));
+        }
+
+        EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi1_context->remote_queue_index, header.remote_queue_index());
+        EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+
+        EXPECT_EQ(pi1_context->monotonic_event_time,
+                  header_monotonic_sent_time);
+        EXPECT_EQ(pi1_context->realtime_event_time, header_realtime_sent_time);
+        EXPECT_EQ(pi1_context->realtime_remote_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi1_context->monotonic_remote_time,
+                  header_monotonic_remote_time);
+
+        EXPECT_EQ(pi2_context->realtime_event_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi2_context->monotonic_event_time,
+                  header_monotonic_remote_time);
+      });
+
+  // Re-log the replay.  The watchers above check each header while it runs.
+  {
+    LoggerState pi1_logger = MakeLogger(
+        configuration::GetNode(log_reader_factory.configuration(), pi1_),
+        &log_reader_factory);
+    LoggerState pi2_logger = MakeLogger(
+        configuration::GetNode(log_reader_factory.configuration(), pi2_),
+        &log_reader_factory);
+
+    StartLogger(&pi1_logger, "relogged");
+    StartLogger(&pi2_logger, "relogged");
+
+    log_reader_factory.Run();
+  }
+
+  EXPECT_EQ(pi2_original_message_header_counter.count(), 0u);
+  EXPECT_EQ(pi1_original_message_header_counter.count(), 0u);
+
+  reader.Deregister();
+}
+
 // TODO(austin): We can write a test which recreates a logfile and confirms that
 // we get it back.  That is the ultimate test.
 
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 81937ca..3923297 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -518,6 +518,7 @@
         // Confirm the forwarded message has matching timestamps to the
         // timestamps we got back.
         EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
         EXPECT_EQ(pi2_context->monotonic_event_time,
                   header_monotonic_sent_time);
         EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
@@ -527,7 +528,7 @@
                   header_monotonic_remote_time);
 
         // Confirm the forwarded message also matches the source message.
-        EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
         EXPECT_EQ(pi1_context->monotonic_event_time,
                   header_monotonic_remote_time);
         EXPECT_EQ(pi1_context->realtime_event_time,