Merge "Support running logger_test under gdb"
diff --git a/WORKSPACE b/WORKSPACE
index a8c8a19..464d2e2 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -652,8 +652,8 @@
 http_file(
     name = "drivetrain_replay",
     downloaded_file_path = "spinning_wheels_while_still.bfbs",
-    sha256 = "d724dbf0acae894b30c9bb62006a2633a7f4c478db48548e76cbec03cbb07f46",
-    urls = ["https://www.frc971.org/Build-Dependencies/spinning_wheels_while_still3.bfbs"],
+    sha256 = "8abe3bbf7ac7a3ab37ad8a313ec22fc244899d916f5e9037100b02e242f5fb45",
+    urls = ["https://www.frc971.org/Build-Dependencies/spinning_wheels_while_still4.bfbs"],
 )
 
 # OpenCV armhf (for raspberry pi)
diff --git a/aos/BUILD b/aos/BUILD
index 2761cbf..9a62e66 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -547,3 +547,16 @@
         "//aos/testing:googletest",
     ],
 )
+
+cc_test(
+    name = "flatbuffers_test",
+    srcs = [
+        "flatbuffers_test.cc",
+    ],
+    deps = [
+        ":flatbuffers",
+        ":json_to_flatbuffer",
+        ":json_to_flatbuffer_flatbuffer",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index caa2b90..7dabc18 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -33,6 +33,11 @@
 DEFINE_bool(pretty, false,
             "If true, pretty print the messages on multiple lines");
 
+bool EndsWith(std::string_view str, std::string_view ending) {
+  return str.size() >= ending.size() &&
+         str.substr(str.size() - ending.size()) == ending;
+}
+
 // Print the flatbuffer out to stdout, both to remove the unnecessary cruft from
 // glog and to allow the user to readily redirect just the logged output
 // independent of any debugging information on stderr.
@@ -66,7 +71,9 @@
     // it's not a directory;
     // it could be a file,
     // or it could not exist
-    files->emplace_back(filename);
+    if (EndsWith(filename, ".bfbs") || EndsWith(filename, ".bfbs.xz")) {
+      files->emplace_back(filename);
+    }
     return;
   }
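
EndsWith here is a stand-in for C++20's std::string_view::ends_with. A minimal sketch of the resulting filter behavior, with hypothetical filenames:

    std::vector<std::string> files;
    for (const char *filename : {"a.bfbs", "b.bfbs.xz", "notes.txt"}) {
      // Only log files make it through; notes.txt is silently skipped.
      if (EndsWith(filename, ".bfbs") || EndsWith(filename, ".bfbs.xz")) {
        files.emplace_back(filename);
      }
    }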
 
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index eac8d08..f901056 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -61,7 +61,7 @@
   std::vector<ChannelStats> channel_stats;
 
   // Open LogFile
-  aos::logger::LogReader reader(FLAGS_logfile);
+  aos::logger::LogReader reader(aos::logger::SortParts({FLAGS_logfile}));
   aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   reader.Register(&log_reader_factory);
 
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 08a230c..168230e 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -257,6 +257,33 @@
   return result;
 }
 
+std::vector<std::string> FindNodes(const std::vector<LogFile> &parts) {
+  std::set<std::string> nodes;
+  for (const LogFile &log_file : parts) {
+    for (const LogParts &part : log_file.parts) {
+      nodes.insert(part.node);
+    }
+  }
+  std::vector<std::string> node_list;
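+  // Pull the names out of the set in sorted order without copying them.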
+  while (!nodes.empty()) {
+    node_list.emplace_back(std::move(nodes.extract(nodes.begin()).value()));
+  }
+  return node_list;
+}
+
+std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
+                                         std::string_view node) {
+  std::vector<LogParts> result;
+  for (const LogFile &log_file : parts) {
+    for (const LogParts &part : log_file.parts) {
+      if (part.node == node) {
+        result.emplace_back(part);
+      }
+    }
+  }
+  return result;
+}
+
 std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
   stream << "{";
   if (!file.log_event_uuid.empty()) {
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 0d2a0fb..94cb771 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -56,6 +56,12 @@
 // Takes a bunch of parts and sorts them based on part_uuid and part_index.
 std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
 
+// Finds all the nodes which have parts logged from their point of view.
+std::vector<std::string> FindNodes(const std::vector<LogFile> &parts);
+// Finds all the parts which are from the point of view of a single node.
+std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
+                                         std::string_view node);
+
 }  // namespace logger
 }  // namespace aos
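
A minimal sketch of how these two helpers compose with SortParts, assuming a list of part filenames (the paths are placeholders):

    std::vector<aos::logger::LogFile> log_files =
        aos::logger::SortParts({"/tmp/log_part0.bfbs", "/tmp/log_part1.bfbs"});
    for (const std::string &node : aos::logger::FindNodes(log_files)) {
      // Every part recorded from this node's point of view, ready to merge.
      std::vector<aos::logger::LogParts> node_parts =
          aos::logger::FilterPartsForNode(log_files, node);
    }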
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 330c78e..9316795 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -442,8 +442,15 @@
       newest_timestamp_ = message_reader_.newest_timestamp();
       const monotonic_clock::time_point monotonic_sent_time(
           chrono::nanoseconds(message->message().monotonic_sent_time()));
-      CHECK_GE(monotonic_sent_time,
-               newest_timestamp_ - max_out_of_order_duration());
+      // TODO(austin): Does this work with startup?  Might need to use the start
+      // time.
+      // TODO(austin): Does this work with startup when we don't know the remote
+      // start time too?  Look at one of those logs to compare.
+      if (monotonic_sent_time > parts_.monotonic_start_time) {
+        CHECK_GE(monotonic_sent_time,
+                 newest_timestamp_ - max_out_of_order_duration())
+            << ": Max out of order exceeded. " << parts_;
+      }
       return message;
     }
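
The relaxed CHECK above encodes the sorting contract: once the reader has seen a message at some time, anything later must be at most max_out_of_order_duration() older, with messages from before the log start time exempted. A minimal numeric sketch of the accepted window (values are made up):

    using std::chrono::milliseconds;
    const monotonic_clock::time_point newest_timestamp(milliseconds(10000));
    const monotonic_clock::time_point sent_time(milliseconds(9800));
    // 9.8s >= 10.0s - 0.3s, so this message is within the allowed window; a
    // message at 9.5s would fail the CHECK unless it predates the log start.
    CHECK_GE(sent_time, newest_timestamp - milliseconds(300));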
     NextLog();
@@ -461,1172 +468,496 @@
   ++next_part_index_;
 }
 
-SplitMessageReader::SplitMessageReader(
-    const std::vector<std::string> &filenames)
-    : filenames_(filenames),
-      log_file_header_(SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
-  CHECK(NextLogFile()) << ": filenames is empty.  Need files to read.";
-
-  // Grab any log file header.  They should all match (and we will check as we
-  // open more of them).
-  log_file_header_ = message_reader_->raw_log_file_header();
-
-  for (size_t i = 1; i < filenames_.size(); ++i) {
-    MessageReader message_reader(filenames_[i]);
-
-    const monotonic_clock::time_point new_monotonic_start_time(
-        chrono::nanoseconds(
-            message_reader.log_file_header()->monotonic_start_time()));
-    const realtime_clock::time_point new_realtime_start_time(
-        chrono::nanoseconds(
-            message_reader.log_file_header()->realtime_start_time()));
-
-    // There are 2 types of part files.  Part files from before time estimation
-    // has started, and part files after.  We don't declare a log file "started"
-    // until time estimation is up.  And once a log file starts, it should never
-    // stop again, and should remain constant.
-    // To compare both types of headers, we mutate our saved copy of the header
-    // to match the next chunk by updating time if we detect a stopped ->
-    // started transition.
-    if (monotonic_start_time() == monotonic_clock::min_time) {
-      CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
-      // We should only be missing the monotonic start time when logging data
-      // for remote nodes.  We don't have a good way to determine the remote
-      // realtime offset, so it shouldn't be filled out.
-      // TODO(austin): If we have a good way, feel free to fill it out.  It
-      // probably won't be better than we could do in post though with the same
-      // data.
-      CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
-      if (new_monotonic_start_time != monotonic_clock::min_time) {
-        // If we finally found our start time, update the header.  Do this once
-        // because it should never change again.
-        log_file_header_.mutable_message()->mutate_monotonic_start_time(
-            new_monotonic_start_time.time_since_epoch().count());
-        log_file_header_.mutable_message()->mutate_realtime_start_time(
-            new_realtime_start_time.time_since_epoch().count());
-      }
-    }
-
-    // We don't have a good way to set the realtime start time on remote nodes.
-    // Confirm it remains consistent.
-    CHECK_EQ(log_file_header_.mutable_message()->has_realtime_start_time(),
-             message_reader.log_file_header()->has_realtime_start_time());
-
-    // Parts index will *not* match unless we set them to match.  We only want
-    // to accept the start time and parts mismatching, so set them.
-    log_file_header_.mutable_message()->mutate_parts_index(
-        message_reader.log_file_header()->parts_index());
-
-    // Now compare that the headers match.
-    if (!CompareFlatBuffer(message_reader.raw_log_file_header(),
-                           log_file_header_)) {
-      if (message_reader.log_file_header()->has_log_event_uuid() &&
-          log_file_header_.message().has_log_event_uuid() &&
-          message_reader.log_file_header()->log_event_uuid()->string_view() !=
-              log_file_header_.message().log_event_uuid()->string_view()) {
-        LOG(FATAL) << "Logger UUIDs don't match between log file chunks "
-                   << filenames_[0] << " and " << filenames_[i]
-                   << ", this is not supported.";
-      }
-      if (message_reader.log_file_header()->has_parts_uuid() &&
-          log_file_header_.message().has_parts_uuid() &&
-          message_reader.log_file_header()->parts_uuid()->string_view() !=
-              log_file_header_.message().parts_uuid()->string_view()) {
-        LOG(FATAL) << "Parts UUIDs don't match between log file chunks "
-                   << filenames_[0] << " and " << filenames_[i]
-                   << ", this is not supported.";
-      }
-
-      LOG(FATAL) << "Header is different between log file chunks "
-                 << filenames_[0] << " and " << filenames_[i]
-                 << ", this is not supported.";
-    }
-  }
-  // Put the parts index back to the first log file chunk.
-  log_file_header_.mutable_message()->mutate_parts_index(
-      message_reader_->log_file_header()->parts_index());
-
-  // Setup per channel state.
-  channels_.resize(configuration()->channels()->size());
-  for (ChannelData &channel_data : channels_) {
-    channel_data.data.split_reader = this;
-    // Build up the timestamp list.
-    if (configuration::MultiNode(configuration())) {
-      channel_data.timestamps.resize(configuration()->nodes()->size());
-      for (MessageHeaderQueue &queue : channel_data.timestamps) {
-        queue.timestamps = true;
-        queue.split_reader = this;
-      }
-    }
-  }
-
-  // Build up channels_to_write_ as an optimization to make it fast to figure
-  // out which datastructure to place any new data from a channel on.
-  for (const Channel *channel : *configuration()->channels()) {
-    // This is the main case.  We will only see data on this node.
-    if (configuration::ChannelIsSendableOnNode(channel, node())) {
-      channels_to_write_.emplace_back(
-          &channels_[channels_to_write_.size()].data);
-    } else
-        // If we can't send, but can receive, we should be able to see
-        // timestamps here.
-        if (configuration::ChannelIsReadableOnNode(channel, node())) {
-      channels_to_write_.emplace_back(
-          &(channels_[channels_to_write_.size()]
-                .timestamps[configuration::GetNodeIndex(configuration(),
-                                                        node())]));
-    } else {
-      channels_to_write_.emplace_back(nullptr);
-    }
-  }
-}
-
-bool SplitMessageReader::NextLogFile() {
-  if (next_filename_index_ == filenames_.size()) {
-    return false;
-  }
-  message_reader_ =
-      std::make_unique<MessageReader>(filenames_[next_filename_index_]);
-
-  // We can't support the config diverging between two log file headers.  See if
-  // they are the same.
-  if (next_filename_index_ != 0) {
-    // In order for the headers to identically compare, they need to have the
-    // same parts_index.  Rewrite the saved header with the new parts_index,
-    // compare, and then restore.
-    const int32_t original_parts_index =
-        log_file_header_.message().parts_index();
-    log_file_header_.mutable_message()->mutate_parts_index(
-        message_reader_->log_file_header()->parts_index());
-
-    CHECK(CompareFlatBuffer(message_reader_->raw_log_file_header(),
-                            log_file_header_))
-        << ": Header is different between log file chunks "
-        << filenames_[next_filename_index_] << " and "
-        << filenames_[next_filename_index_ - 1] << ", this is not supported.";
-
-    log_file_header_.mutable_message()->mutate_parts_index(
-        original_parts_index);
-  }
-
-  ++next_filename_index_;
-  return true;
-}
-
-bool SplitMessageReader::QueueMessages(
-    monotonic_clock::time_point last_dequeued_time) {
-  // TODO(austin): Once we are happy that everything works, read a 256kb chunk
-  // to reduce the need to re-heap down below.
-
-  // Special case no more data.  Otherwise we blow up on the CHECK statement
-  // confirming that we have enough data queued.
-  if (at_end_) {
-    return false;
-  }
-
-  // If this isn't the first time around, confirm that we had enough data queued
-  // to follow the contract.
-  if (time_to_queue_ != monotonic_clock::min_time) {
-    CHECK_LE(last_dequeued_time,
-             newest_timestamp() - max_out_of_order_duration())
-        << " node " << FlatbufferToJson(node()) << " on " << this;
-
-    // Bail if there is enough data already queued.
-    if (last_dequeued_time < time_to_queue_) {
-      VLOG(1) << MaybeNodeName(target_node_) << "All up to date on " << this
-              << ", dequeued " << last_dequeued_time << " queue time "
-              << time_to_queue_;
-      return true;
-    }
-  } else {
-    // Startup takes a special dance.  We want to queue up until the start time,
-    // but we then want to find the next message to read.  The conservative
-    // answer is to immediately trigger a second requeue to get things moving.
-    time_to_queue_ = monotonic_start_time();
-    CHECK_NE(time_to_queue_, monotonic_clock::min_time);
-    QueueMessages(time_to_queue_);
-  }
-
-  // If we are asked to queue, queue for at least max_out_of_order_duration past
-  // the last known time in the log file (ie the newest timestep read).  As long
-  // as we requeue exactly when time_to_queue_ is dequeued and go no further, we
-  // are safe.  And since we pop in order, that works.
-  //
-  // Special case the start of the log file.  There should be at most 1 message
-  // from each channel at the start of the log file.  So always force the start
-  // of the log file to just be read.
-  time_to_queue_ = std::max(time_to_queue_, newest_timestamp());
-  VLOG(1) << MaybeNodeName(target_node_) << "Queueing, going until "
-          << time_to_queue_ << " " << filename();
-
-  bool was_emplaced = false;
-  while (true) {
-    // Stop if we have enough.
-    if (newest_timestamp() > time_to_queue_ + max_out_of_order_duration() &&
-        was_emplaced) {
-      VLOG(1) << MaybeNodeName(target_node_) << "Done queueing on " << this
-              << ", queued to " << newest_timestamp() << " with requeue time "
-              << time_to_queue_;
-      return true;
-    }
-
-    if (std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
-            message_reader_->ReadMessage()) {
-      const MessageHeader &header = msg.value().message();
-
-      const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
-          chrono::nanoseconds(header.monotonic_sent_time()));
-
-      if (VLOG_IS_ON(2)) {
-        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
-                  << filename() << " ttq: " << time_to_queue_ << " now "
-                  << newest_timestamp() << " start time "
-                  << monotonic_start_time() << " " << FlatbufferToJson(&header);
-      } else if (VLOG_IS_ON(1)) {
-        SizePrefixedFlatbufferVector<MessageHeader> copy = msg.value();
-        copy.mutable_message()->clear_data();
-        LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
-                  << filename() << " ttq: " << time_to_queue_ << " now "
-                  << newest_timestamp() << " start time "
-                  << monotonic_start_time() << " " << FlatbufferToJson(copy);
-      }
-
-      const int channel_index = header.channel_index();
-      was_emplaced = channels_to_write_[channel_index]->emplace_back(
-          std::move(msg.value()));
-      if (was_emplaced) {
-        newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-      }
-    } else {
-      if (!NextLogFile()) {
-        VLOG(1) << MaybeNodeName(target_node_) << "No more files, last was "
-                << filenames_.back();
-        at_end_ = true;
-        for (MessageHeaderQueue *queue : channels_to_write_) {
-          if (queue == nullptr || queue->timestamp_merger == nullptr) {
-            continue;
-          }
-          queue->timestamp_merger->NoticeAtEnd();
-        }
-        return false;
-      }
-    }
-  }
-}
-
-void SplitMessageReader::SetTimestampMerger(TimestampMerger *timestamp_merger,
-                                            int channel_index,
-                                            const Node *target_node) {
-  const Node *reinterpreted_target_node =
-      configuration::GetNodeOrDie(configuration(), target_node);
-  target_node_ = reinterpreted_target_node;
-
-  const Channel *const channel =
-      configuration()->channels()->Get(channel_index);
-
-  VLOG(1) << "  Configuring merger " << this << " for channel " << channel_index
-          << " "
-          << configuration::CleanedChannelToString(
-                 configuration()->channels()->Get(channel_index));
-
-  MessageHeaderQueue *message_header_queue = nullptr;
-
-  // Figure out if this log file is from our point of view, or the other node's
-  // point of view.
-  if (node() == reinterpreted_target_node) {
-    VLOG(1) << "    Replaying as logged node " << filename();
-
-    if (configuration::ChannelIsSendableOnNode(channel, node())) {
-      VLOG(1) << "      Data on node";
-      message_header_queue = &(channels_[channel_index].data);
-    } else if (configuration::ChannelIsReadableOnNode(channel, node())) {
-      VLOG(1) << "      Timestamps on node";
-      message_header_queue =
-          &(channels_[channel_index].timestamps[configuration::GetNodeIndex(
-              configuration(), node())]);
-    } else {
-      VLOG(1) << "     Dropping";
-    }
-  } else {
-    VLOG(1) << "    Replaying as other node " << filename();
-    // We are replaying from another node's point of view.  The only interesting
-    // data is data that is sent from our node and received on theirs.
-    if (configuration::ChannelIsReadableOnNode(channel,
-                                               reinterpreted_target_node) &&
-        configuration::ChannelIsSendableOnNode(channel, node())) {
-      VLOG(1) << "      Readable on target node";
-      // Data from another node.
-      message_header_queue = &(channels_[channel_index].data);
-    } else {
-      VLOG(1) << "      Dropping";
-      // This is either not sendable on the other node, or is a timestamp and
-      // therefore not interesting.
-    }
-  }
-
-  // If we found one, write it down.  This will be nullptr when there is nothing
-  // relevant on this channel on this node for the target node.  In that case,
-  // we want to drop the message instead of queueing it.
-  if (message_header_queue != nullptr) {
-    message_header_queue->timestamp_merger = timestamp_merger;
-  }
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
-           SizePrefixedFlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldest(int channel_index) {
-  CHECK_GT(channels_[channel_index].data.size(), 0u);
-  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-      timestamp = channels_[channel_index].data.front_timestamp();
-  SizePrefixedFlatbufferVector<MessageHeader> front =
-      std::move(channels_[channel_index].data.front());
-  channels_[channel_index].data.PopFront();
-
-  VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
-          << std::get<0>(timestamp) << " for "
-          << configuration::StrippedChannelToString(
-                 configuration()->channels()->Get(channel_index))
-          << " (" << channel_index << ")";
-
-  QueueMessages(std::get<0>(timestamp));
-
-  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
-                         std::move(front));
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
-           SizePrefixedFlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
-  CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
-  const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-      timestamp = channels_[channel].timestamps[node_index].front_timestamp();
-  SizePrefixedFlatbufferVector<MessageHeader> front =
-      std::move(channels_[channel].timestamps[node_index].front());
-  channels_[channel].timestamps[node_index].PopFront();
-
-  VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
-          << std::get<0>(timestamp) << " for "
-          << configuration::StrippedChannelToString(
-                 configuration()->channels()->Get(channel))
-          << " on "
-          << configuration()->nodes()->Get(node_index)->name()->string_view()
-          << " (" << node_index << ")";
-
-  QueueMessages(std::get<0>(timestamp));
-
-  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
-                         std::move(front));
-}
-
-bool SplitMessageReader::MessageHeaderQueue::emplace_back(
-    SizePrefixedFlatbufferVector<MessageHeader> &&msg) {
-  CHECK(split_reader != nullptr);
-
-  // If there is no timestamp merger for this queue, nobody is listening.  Drop
-  // the message.  This happens when a log file from another node is replayed,
-  // and the timestamp mergers down stream just don't care.
-  if (timestamp_merger == nullptr) {
-    return false;
-  }
-
-  CHECK(timestamps != msg.message().has_data())
-      << ": Got timestamps and data mixed up on a node. "
-      << FlatbufferToJson(msg);
-
-  data_.emplace_back(std::move(msg));
-
-  if (data_.size() == 1u) {
-    // Yup, new data.  Notify.
-    if (timestamps) {
-      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
-    } else {
-      timestamp_merger->Update(split_reader, front_timestamp());
-    }
-  }
-
-  return true;
-}
-
-void SplitMessageReader::MessageHeaderQueue::PopFront() {
-  data_.pop_front();
-  if (data_.size() != 0u) {
-    // Yup, new data.
-    if (timestamps) {
-      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
-    } else {
-      timestamp_merger->Update(split_reader, front_timestamp());
-    }
-  } else {
-    // Poke anyways to update the heap.
-    if (timestamps) {
-      timestamp_merger->UpdateTimestamp(
-          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
-    } else {
-      timestamp_merger->Update(
-          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
-    }
-  }
-}
-
-namespace {
-
-bool SplitMessageReaderHeapCompare(
-    const std::tuple<monotonic_clock::time_point, uint32_t,
-                     SplitMessageReader *>
-        first,
-    const std::tuple<monotonic_clock::time_point, uint32_t,
-                     SplitMessageReader *>
-        second) {
-  if (std::get<0>(first) > std::get<0>(second)) {
+bool Message::operator<(const Message &m2) const {
+  if (this->timestamp < m2.timestamp) {
     return true;
-  } else if (std::get<0>(first) == std::get<0>(second)) {
-    if (std::get<1>(first) > std::get<1>(second)) {
-      return true;
-    } else if (std::get<1>(first) == std::get<1>(second)) {
-      return std::get<2>(first) > std::get<2>(second);
-    } else {
-      return false;
-    }
-  } else {
+  } else if (this->timestamp > m2.timestamp) {
     return false;
   }
-}
 
-bool ChannelHeapCompare(
-    const std::pair<monotonic_clock::time_point, int> first,
-    const std::pair<monotonic_clock::time_point, int> second) {
-  if (first.first > second.first) {
+  if (this->channel_index < m2.channel_index) {
     return true;
-  } else if (first.first == second.first) {
-    return first.second > second.second;
-  } else {
+  } else if (this->channel_index > m2.channel_index) {
     return false;
   }
+
+  return this->queue_index < m2.queue_index;
 }
 
-}  // namespace
-
-TimestampMerger::TimestampMerger(
-    const Configuration *configuration,
-    std::vector<SplitMessageReader *> split_message_readers, int channel_index,
-    const Node *target_node, ChannelMerger *channel_merger)
-    : configuration_(configuration),
-      split_message_readers_(std::move(split_message_readers)),
-      channel_index_(channel_index),
-      node_index_(configuration::MultiNode(configuration)
-                      ? configuration::GetNodeIndex(configuration, target_node)
-                      : -1),
-      channel_merger_(channel_merger) {
-  // Tell the readers we care so they know who to notify.
-  VLOG(1) << "Configuring channel " << channel_index << " target node "
-          << FlatbufferToJson(target_node);
-  for (SplitMessageReader *reader : split_message_readers_) {
-    reader->SetTimestampMerger(this, channel_index, target_node);
-  }
-
-  // And then determine if we need to track timestamps.
-  const Channel *channel = configuration->channels()->Get(channel_index);
-  if (!configuration::ChannelIsSendableOnNode(channel, target_node) &&
-      configuration::ChannelIsReadableOnNode(channel, target_node)) {
-    has_timestamps_ = true;
-  }
+bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
+bool Message::operator==(const Message &m2) const {
+  return timestamp == m2.timestamp && channel_index == m2.channel_index &&
+         queue_index == m2.queue_index;
 }
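
operator< above defines a lexicographic order on (timestamp, channel_index, queue_index); an equivalent formulation with std::tie, shown only as a sketch to document the ordering (LessThan is a hypothetical name):

    #include <tuple>

    bool LessThan(const Message &a, const Message &b) {
      return std::tie(a.timestamp, a.channel_index, a.queue_index) <
             std::tie(b.timestamp, b.channel_index, b.queue_index);
    }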
 
-void TimestampMerger::PushMessageHeap(
-    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-        timestamp,
-    SplitMessageReader *split_message_reader) {
-  if (split_message_reader != nullptr) {
-    DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
-                        [split_message_reader](
-                            const std::tuple<monotonic_clock::time_point,
-                                             uint32_t, SplitMessageReader *>
-                                x) {
-                          return std::get<2>(x) == split_message_reader;
-                        }) == message_heap_.end())
-        << ": Pushing message when it is already in the heap.";
-
-    message_heap_.push_back(std::make_tuple(
-        std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
-
-    std::push_heap(message_heap_.begin(), message_heap_.end(),
-                   &SplitMessageReaderHeapCompare);
+std::ostream &operator<<(std::ostream &os, const Message &m) {
+  os << "{.channel_index=" << m.channel_index
+     << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
+  if (m.data.Verify()) {
+    os << ", .data="
+       << aos::FlatbufferToJson(m.data,
+                                {.multi_line = false, .max_vector_size = 1});
   }
-
-  // If we are just a data merger, don't wait for timestamps.
-  if (!has_timestamps_) {
-    if (!message_heap_.empty()) {
-      channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
-      pushed_ = true;
-    } else {
-      // Remove ourselves if we are empty.
-      channel_merger_->Update(monotonic_clock::min_time, channel_index_);
-    }
-  }
+  os << "}";
+  return os;
 }
 
-std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-TimestampMerger::oldest_message() const {
-  CHECK_GT(message_heap_.size(), 0u);
-  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-      oldest_message_reader = message_heap_.front();
-  return std::get<2>(oldest_message_reader)->oldest_message(channel_index_);
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-TimestampMerger::oldest_timestamp() const {
-  CHECK_GT(timestamp_heap_.size(), 0u);
-  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-      oldest_message_reader = timestamp_heap_.front();
-  return std::get<2>(oldest_message_reader)
-      ->oldest_message(channel_index_, node_index_);
-}
-
-void TimestampMerger::PushTimestampHeap(
-    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-        timestamp,
-    SplitMessageReader *split_message_reader) {
-  if (split_message_reader != nullptr) {
-    DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
-                        [split_message_reader](
-                            const std::tuple<monotonic_clock::time_point,
-                                             uint32_t, SplitMessageReader *>
-                                x) {
-                          return std::get<2>(x) == split_message_reader;
-                        }) == timestamp_heap_.end())
-        << ": Pushing timestamp when it is already in the heap.";
-
-    timestamp_heap_.push_back(std::make_tuple(
-        std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
-
-    std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
-                   SplitMessageReaderHeapCompare);
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
+  os << "{.channel_index=" << m.channel_index
+     << ", .queue_index=" << m.queue_index
+     << ", .monotonic_event_time=" << m.monotonic_event_time
+     << ", .realtime_event_time=" << m.realtime_event_time;
+  if (m.remote_queue_index != 0xffffffff) {
+    os << ", .remote_queue_index=" << m.remote_queue_index;
   }
-
-  // If we are a timestamp merger, don't wait for data.  Missing data will be
-  // caught at read time.
-  if (has_timestamps_) {
-    if (!timestamp_heap_.empty()) {
-      channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
-      pushed_ = true;
-    } else {
-      // Remove ourselves if we are empty.
-      channel_merger_->Update(monotonic_clock::min_time, channel_index_);
-    }
+  if (m.monotonic_remote_time != monotonic_clock::min_time) {
+    os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
   }
+  if (m.realtime_remote_time != realtime_clock::min_time) {
+    os << ", .realtime_remote_time=" << m.realtime_remote_time;
+  }
+  if (m.data.Verify()) {
+    os << ", .data="
+       << aos::FlatbufferToJson(m.data,
+                                {.multi_line = false, .max_vector_size = 1});
+  }
+  os << "}";
+  return os;
 }
 
-std::tuple<monotonic_clock::time_point, uint32_t,
-           SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopMessageHeap() {
-  // Pop the oldest message reader pointer off the heap.
-  CHECK_GT(message_heap_.size(), 0u);
-  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-      oldest_message_reader = message_heap_.front();
+LogPartsSorter::LogPartsSorter(LogParts log_parts)
+    : parts_message_reader_(log_parts) {}
 
-  std::pop_heap(message_heap_.begin(), message_heap_.end(),
-                &SplitMessageReaderHeapCompare);
-  message_heap_.pop_back();
-
-  // Pop the oldest message.  This re-pushes any messages from the reader to the
-  // message heap.
-  std::tuple<monotonic_clock::time_point, uint32_t,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-      oldest_message =
-          std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
-
-  // Confirm that the time and queue_index we have recorded matches.
-  CHECK_EQ(std::get<0>(oldest_message), std::get<0>(oldest_message_reader));
-  CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
-
-  // Now, keep reading until we have found all duplicates.
-  while (!message_heap_.empty()) {
-    // See if it is a duplicate.
-    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-        next_oldest_message_reader = message_heap_.front();
-
-    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-        next_oldest_message_time = std::get<2>(next_oldest_message_reader)
-                                       ->oldest_message(channel_index_);
-
-    if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
-        std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
-      // Pop the message reader pointer.
-      std::pop_heap(message_heap_.begin(), message_heap_.end(),
-                    &SplitMessageReaderHeapCompare);
-      message_heap_.pop_back();
-
-      // Pop the next oldest message.  This re-pushes any messages from the
-      // reader.
-      std::tuple<monotonic_clock::time_point, uint32_t,
-                 SizePrefixedFlatbufferVector<MessageHeader>>
-          next_oldest_message = std::get<2>(next_oldest_message_reader)
-                                    ->PopOldest(channel_index_);
-
-      // And make sure the message matches in it's entirety.
-      CHECK(std::get<2>(oldest_message).span() ==
-            std::get<2>(next_oldest_message).span())
-          << ": Data at the same timestamp doesn't match.";
-    } else {
-      break;
-    }
-  }
-
-  return oldest_message;
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
-           SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopTimestampHeap() {
-  // Pop the oldest message reader pointer off the heap.
-  CHECK_GT(timestamp_heap_.size(), 0u);
-
-  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-      oldest_timestamp_reader = timestamp_heap_.front();
-
-  std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
-                &SplitMessageReaderHeapCompare);
-  timestamp_heap_.pop_back();
-
-  CHECK(node_index_ != -1) << ": Timestamps in a single node environment";
-
-  // Pop the oldest message.  This re-pushes any timestamps from the reader to
-  // the timestamp heap.
-  std::tuple<monotonic_clock::time_point, uint32_t,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-      oldest_timestamp = std::get<2>(oldest_timestamp_reader)
-                             ->PopOldestTimestamp(channel_index_, node_index_);
-
-  // Confirm that the time we have recorded matches.
-  CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
-  CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
-
-  // Now, keep reading until we have found all duplicates.
-  while (!timestamp_heap_.empty()) {
-    // See if it is a duplicate.
-    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-        next_oldest_timestamp_reader = timestamp_heap_.front();
-
-    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-        next_oldest_timestamp_time =
-            std::get<2>(next_oldest_timestamp_reader)
-                ->oldest_message(channel_index_, node_index_);
-
-    if (std::get<0>(next_oldest_timestamp_time) ==
-            std::get<0>(oldest_timestamp) &&
-        std::get<1>(next_oldest_timestamp_time) ==
-            std::get<1>(oldest_timestamp)) {
-      // Pop the timestamp reader pointer.
-      std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
-                    &SplitMessageReaderHeapCompare);
-      timestamp_heap_.pop_back();
-
-      // Pop the next oldest timestamp.  This re-pushes any messages from the
-      // reader.
-      std::tuple<monotonic_clock::time_point, uint32_t,
-                 SizePrefixedFlatbufferVector<MessageHeader>>
-          next_oldest_timestamp =
-              std::get<2>(next_oldest_timestamp_reader)
-                  ->PopOldestTimestamp(channel_index_, node_index_);
-
-      // And make sure the contents matches in it's entirety.
-      CHECK(std::get<2>(oldest_timestamp).span() ==
-            std::get<2>(next_oldest_timestamp).span())
-          << ": Data at the same timestamp doesn't match, "
-          << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
-          << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
-          << absl::BytesToHexString(std::string_view(
-                 reinterpret_cast<const char *>(
-                     std::get<2>(oldest_timestamp).span().data()),
-                 std::get<2>(oldest_timestamp).span().size()))
-          << " vs "
-          << absl::BytesToHexString(std::string_view(
-                 reinterpret_cast<const char *>(
-                     std::get<2>(next_oldest_timestamp).span().data()),
-                 std::get<2>(next_oldest_timestamp).span().size()));
-
-    } else {
-      break;
-    }
-  }
-
-  return oldest_timestamp;
-}
-
-std::tuple<TimestampMerger::DeliveryTimestamp,
-           SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopOldest() {
-  if (has_timestamps_) {
-    VLOG(1) << "Looking for matching timestamp for "
-            << configuration::StrippedChannelToString(
-                   configuration_->channels()->Get(channel_index_))
-            << " (" << channel_index_ << ") "
-            << " at " << std::get<0>(oldest_timestamp());
-
-    // Read the timestamps.
-    std::tuple<monotonic_clock::time_point, uint32_t,
-               SizePrefixedFlatbufferVector<MessageHeader>>
-        oldest_timestamp = PopTimestampHeap();
-
-    TimestampMerger::DeliveryTimestamp timestamp;
-    timestamp.monotonic_event_time =
-        monotonic_clock::time_point(chrono::nanoseconds(
-            std::get<2>(oldest_timestamp).message().monotonic_sent_time()));
-    timestamp.realtime_event_time =
-        realtime_clock::time_point(chrono::nanoseconds(
-            std::get<2>(oldest_timestamp).message().realtime_sent_time()));
-    timestamp.queue_index =
-        std::get<2>(oldest_timestamp).message().queue_index();
-
-    // Consistency check.
-    CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
-    CHECK_EQ(std::get<2>(oldest_timestamp).message().queue_index(),
-             std::get<1>(oldest_timestamp));
-
-    monotonic_clock::time_point remote_timestamp_monotonic_time(
-        chrono::nanoseconds(
-            std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
-
-    // See if we have any data.  If not, pass the problem up the chain.
-    if (message_heap_.empty()) {
-      LOG(WARNING) << MaybeNodeName(configuration_->nodes()->Get(node_index_))
-                   << "No data to match timestamp on "
-                   << configuration::CleanedChannelToString(
-                          configuration_->channels()->Get(channel_index_))
-                   << " (" << channel_index_ << ")";
-      return std::make_tuple(timestamp,
-                             std::move(std::get<2>(oldest_timestamp)));
-    }
-
+Message *LogPartsSorter::Front() {
+  // Queue up data until enough has been queued that the front message is
+  // sorted enough to be safe to pop.  This may do nothing, so the do-nothing
+  // path needs to be cheap to check.
+  if (sorted_until() != monotonic_clock::max_time) {
     while (true) {
-      {
-        // Ok, now try grabbing data until we find one which matches.
-        std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-            oldest_message_ref = oldest_message();
-
-        // Time at which the message was sent (this message is written from the
-        // sending node's perspective.
-        monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
-            std::get<2>(oldest_message_ref)->monotonic_sent_time()));
-
-        if (remote_monotonic_time < remote_timestamp_monotonic_time) {
-          LOG(WARNING) << configuration_->nodes()
-                              ->Get(node_index_)
-                              ->name()
-                              ->string_view()
-                       << " Undelivered message, skipping.  Remote time is "
-                       << remote_monotonic_time << " timestamp is "
-                       << remote_timestamp_monotonic_time << " on channel "
-                       << configuration::StrippedChannelToString(
-                              configuration_->channels()->Get(channel_index_))
-                       << " (" << channel_index_ << ")";
-          PopMessageHeap();
-          continue;
-        } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
-          LOG(WARNING) << configuration_->nodes()
-                              ->Get(node_index_)
-                              ->name()
-                              ->string_view()
-                       << " Data not found.  Remote time should be "
-                       << remote_timestamp_monotonic_time
-                       << ", message time is " << remote_monotonic_time
-                       << " on channel "
-                       << configuration::StrippedChannelToString(
-                              configuration_->channels()->Get(channel_index_))
-                       << " (" << channel_index_ << ")"
-                       << (VLOG_IS_ON(1) ? DebugString() : "");
-          return std::make_tuple(timestamp,
-                                 std::move(std::get<2>(oldest_timestamp)));
-        }
-
-        timestamp.monotonic_remote_time = remote_monotonic_time;
+      if (!messages_.empty() && messages_.begin()->timestamp < sorted_until() &&
+          sorted_until() >= monotonic_start_time()) {
+        break;
       }
 
-      VLOG(1) << "Found matching data "
-              << configuration::StrippedChannelToString(
-                     configuration_->channels()->Get(channel_index_))
-              << " (" << channel_index_ << ")";
-      std::tuple<monotonic_clock::time_point, uint32_t,
-                 SizePrefixedFlatbufferVector<MessageHeader>>
-          oldest_message = PopMessageHeap();
+      std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+          parts_message_reader_.ReadMessage();
+      // No data left, sorted forever, work through what is left.
+      if (!m) {
+        sorted_until_ = monotonic_clock::max_time;
+        break;
+      }
 
-      timestamp.realtime_remote_time =
-          realtime_clock::time_point(chrono::nanoseconds(
-              std::get<2>(oldest_message).message().realtime_sent_time()));
-      timestamp.remote_queue_index =
-          std::get<2>(oldest_message).message().queue_index();
+      messages_.insert(
+          {.channel_index = m.value().message().channel_index(),
+           .queue_index = m.value().message().queue_index(),
+           .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+               m.value().message().monotonic_sent_time())),
+           .data = std::move(m.value())});
 
-      CHECK_EQ(timestamp.monotonic_remote_time,
-               remote_timestamp_monotonic_time);
-
-      CHECK_EQ(timestamp.remote_queue_index,
-               std::get<2>(oldest_timestamp).message().remote_queue_index())
-          << ": " << FlatbufferToJson(&std::get<2>(oldest_timestamp).message())
-          << " data "
-          << FlatbufferToJson(&std::get<2>(oldest_message).message());
-
-      return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
+      // Now, update sorted_until_ to match the new message.
+      if (parts_message_reader_.newest_timestamp() >
+          monotonic_clock::min_time +
+              parts_message_reader_.max_out_of_order_duration()) {
+        sorted_until_ = parts_message_reader_.newest_timestamp() -
+                        parts_message_reader_.max_out_of_order_duration();
+      } else {
+        sorted_until_ = monotonic_clock::min_time;
+      }
     }
-  } else {
-    std::tuple<monotonic_clock::time_point, uint32_t,
-               SizePrefixedFlatbufferVector<MessageHeader>>
-        oldest_message = PopMessageHeap();
+  }
 
-    TimestampMerger::DeliveryTimestamp timestamp;
-    timestamp.monotonic_event_time =
-        monotonic_clock::time_point(chrono::nanoseconds(
-            std::get<2>(oldest_message).message().monotonic_sent_time()));
-    timestamp.realtime_event_time =
-        realtime_clock::time_point(chrono::nanoseconds(
-            std::get<2>(oldest_message).message().realtime_sent_time()));
-    timestamp.queue_index = std::get<2>(oldest_message).message().queue_index();
-    timestamp.remote_queue_index = 0xffffffff;
+  // Now that we have enough data queued, return a pointer to the oldest piece
+  // of data if it exists.
+  if (messages_.empty()) {
+    last_message_time_ = monotonic_clock::max_time;
+    return nullptr;
+  }
 
-    CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
-    CHECK_EQ(std::get<1>(oldest_message),
-             std::get<2>(oldest_message).message().queue_index());
+  CHECK_GE(messages_.begin()->timestamp, last_message_time_);
+  last_message_time_ = messages_.begin()->timestamp;
+  return &(*messages_.begin());
+}
 
-    return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
+void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
+
+std::string LogPartsSorter::DebugString() const {
+  std::stringstream ss;
+  ss << "messages: [\n";
+  for (const Message &m : messages_) {
+    ss << m << "\n";
+  }
+  ss << "] <- " << parts_message_reader_.filename();
+  return ss.str();
+}
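
LogPartsSorter is pull-based: Front() queues just enough data to guarantee the head is sorted, and PopFront() consumes it. A minimal drain loop, assuming a LogParts named parts:

    aos::logger::LogPartsSorter sorter(std::move(parts));
    while (Message *m = sorter.Front()) {
      // Messages arrive ordered by (timestamp, channel_index, queue_index).
      LOG(INFO) << *m;
      sorter.PopFront();
    }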
+
+NodeMerger::NodeMerger(std::vector<LogParts> parts) {
+  CHECK_GE(parts.size(), 1u);
+  const std::string part0_node = parts[0].node;
+  for (size_t i = 1; i < parts.size(); ++i) {
+    CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
+  }
+  for (LogParts &part : parts) {
+    parts_sorters_.emplace_back(std::move(part));
+  }
+
+  node_ = configuration::GetNodeIndex(log_file_header()->configuration(),
+                                      part0_node);
+
+  monotonic_start_time_ = monotonic_clock::max_time;
+  realtime_start_time_ = realtime_clock::max_time;
+  for (const LogPartsSorter &parts_sorter : parts_sorters_) {
+    if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
+      monotonic_start_time_ = parts_sorter.monotonic_start_time();
+      realtime_start_time_ = parts_sorter.realtime_start_time();
+    }
   }
 }
 
-void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
-
-namespace {
-std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
-    const std::vector<std::vector<std::string>> &filenames) {
-  CHECK_GT(filenames.size(), 0u);
-  // Build up all the SplitMessageReaders.
-  std::vector<std::unique_ptr<SplitMessageReader>> result;
-  for (const std::vector<std::string> &filenames : filenames) {
-    result.emplace_back(std::make_unique<SplitMessageReader>(filenames));
+Message *NodeMerger::Front() {
+  // Return the current Front if we have one, otherwise go compute one.
+  if (current_ != nullptr) {
+    Message *result = current_->Front();
+    CHECK_GE(result->timestamp, last_message_time_);
+    return result;
   }
+
+  // Otherwise, do a simple search for the oldest message, dropping any
+  // duplicates along the way.
+  Message *oldest = nullptr;
+  sorted_until_ = monotonic_clock::max_time;
+  for (LogPartsSorter &parts_sorter : parts_sorters_) {
+    Message *m = parts_sorter.Front();
+    if (!m) {
+      sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
+      continue;
+    }
+    if (oldest == nullptr || *m < *oldest) {
+      oldest = m;
+      current_ = &parts_sorter;
+    } else if (*m == *oldest) {
+      // Found a duplicate.  It doesn't matter which one we return.  It is
+      // easiest to just drop the new one.
+      parts_sorter.PopFront();
+    }
+
+    // PopFront may change this, so compute it down here.
+    sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
+  }
+
+  if (oldest) {
+    CHECK_GE(oldest->timestamp, last_message_time_);
+    last_message_time_ = oldest->timestamp;
+  } else {
+    last_message_time_ = monotonic_clock::max_time;
+  }
+
+  // Return the oldest message found.  This will be nullptr if nothing was
+  // found, indicating there is nothing left.
+  return oldest;
+}
+
+void NodeMerger::PopFront() {
+  CHECK(current_ != nullptr) << "Popping before calling Front()";
+  current_->PopFront();
+  current_ = nullptr;
+}
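
NodeMerger is a k-way merge across one LogPartsSorter per part file, dropping duplicates at Front() time. A minimal sketch of wiring it to the new FilterPartsForNode helper (log_files and the node name are placeholders):

    aos::logger::NodeMerger merger(
        aos::logger::FilterPartsForNode(log_files, "pi1"));
    while (Message *m = merger.Front()) {
      // Front() may be called repeatedly; the result is stable until PopFront().
      merger.PopFront();
    }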
+
+TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
+    : node_merger_(std::move(parts)),
+      message_{.channel_index = 0xffffffff,
+               .queue_index = 0xffffffff,
+               .monotonic_event_time = monotonic_clock::min_time,
+               .realtime_event_time = realtime_clock::min_time,
+               .remote_queue_index = 0xffffffff,
+               .monotonic_remote_time = monotonic_clock::min_time,
+               .realtime_remote_time = realtime_clock::min_time,
+               .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()} {
+  const Configuration *config = log_file_header()->configuration();
+  // Only fill out nodes_data_ if there are nodes.  Otherwise everything gets
+  // pretty simple.
+  if (configuration::MultiNode(config)) {
+    nodes_data_.resize(config->nodes()->size());
+    const Node *my_node = config->nodes()->Get(node());
+    for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
+      const Node *node = config->nodes()->Get(node_index);
+      NodeData *node_data = &nodes_data_[node_index];
+      node_data->channels.resize(config->channels()->size());
+      // We should save the channel if it is delivered to the node represented
+      // by the NodeData, but not sent by that node.  That combo means it is
+      // forwarded.
+      size_t channel_index = 0;
+      node_data->any_delivered = false;
+      for (const Channel *channel : *config->channels()) {
+        node_data->channels[channel_index].delivered =
+            configuration::ChannelIsReadableOnNode(channel, node) &&
+            configuration::ChannelIsSendableOnNode(channel, my_node);
+        node_data->any_delivered = node_data->any_delivered ||
+                                   node_data->channels[channel_index].delivered;
+        ++channel_index;
+      }
+    }
+
+    for (const Channel *channel : *config->channels()) {
+      source_node_.emplace_back(configuration::GetNodeIndex(
+          config, channel->source_node()->string_view()));
+    }
+  }
+}
+
+void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
+  CHECK(configuration::MultiNode(log_file_header()->configuration()));
+  CHECK_NE(timestamp_mapper->node(), node());
+  CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
+
+  NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
+  // Only set it if this node delivers to the peer timestamp_mapper.  Otherwise
+  // we could needlessly save data.
+  if (node_data->any_delivered) {
+    LOG(INFO) << "Registering on node " << node() << " for peer node "
+              << timestamp_mapper->node();
+    CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
+
+    timestamp_mapper->nodes_data_[node()].peer = this;
+  }
+}
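
A minimal sketch of setting up mappers for a two-node log; both directions have to be registered so each mapper can locate the other's data (node names are placeholders):

    aos::logger::TimestampMapper pi1_mapper(
        aos::logger::FilterPartsForNode(log_files, "pi1"));
    aos::logger::TimestampMapper pi2_mapper(
        aos::logger::FilterPartsForNode(log_files, "pi2"));
    pi1_mapper.AddPeer(&pi2_mapper);
    pi2_mapper.AddPeer(&pi1_mapper);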
+
+void TimestampMapper::FillMessage(Message *m) {
+  message_ = {
+      .channel_index = m->channel_index,
+      .queue_index = m->queue_index,
+      .monotonic_event_time = m->timestamp,
+      .realtime_event_time = aos::realtime_clock::time_point(
+          std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+      .remote_queue_index = 0xffffffff,
+      .monotonic_remote_time = monotonic_clock::min_time,
+      .realtime_remote_time = realtime_clock::min_time,
+      .data = std::move(m->data)};
+}
+
+TimestampedMessage *TimestampMapper::Front() {
+  // No need to fetch anything new.  A previous message still exists.
+  switch (first_message_) {
+    case FirstMessage::kNeedsUpdate:
+      break;
+    case FirstMessage::kInMessage:
+      return &message_;
+    case FirstMessage::kNullptr:
+      return nullptr;
+  }
+
+  if (nodes_data_.empty()) {
+    // Simple path.  We are single node, so there are no timestamps to match!
+    CHECK_EQ(messages_.size(), 0u);
+    Message *m = node_merger_.Front();
+    if (!m) {
+      first_message_ = FirstMessage::kNullptr;
+      return nullptr;
+    }
+    // Fill in message_ so we have a place to associate remote timestamps, and
+    // return it.
+    FillMessage(m);
+
+    CHECK_GE(message_.monotonic_event_time, last_message_time_);
+    last_message_time_ = message_.monotonic_event_time;
+    first_message_ = FirstMessage::kInMessage;
+    return &message_;
+  }
+
+  // Messages have to flow through messages_ so that delivered messages get
+  // processed.  Reuse the flow below, which works off messages_, by adding the
+  // new message to messages_ and continuing.
+  if (messages_.empty()) {
+    if (!Queue()) {
+      // Found nothing to add, we are out of data!
+      first_message_ = FirstMessage::kNullptr;
+      return nullptr;
+    }
+
+    // Now that it has been added (and cannibalized), forget about it upstream.
+    node_merger_.PopFront();
+  }
+
+  Message *m = &(messages_.front());
+
+  if (source_node_[m->channel_index] == node()) {
+    // From us, just forward it on, filling the remote data in as invalid.
+    FillMessage(m);
+    CHECK_GE(message_.monotonic_event_time, last_message_time_);
+    last_message_time_ = message_.monotonic_event_time;
+    first_message_ = FirstMessage::kInMessage;
+    return &message_;
+  } else {
+    // Got a timestamp, find the matching remote data, match it, and return it.
+    Message data = MatchingMessageFor(*m);
+
+    // Return the data from the remote.  The local message only has timestamp
+    // info which isn't relevant anymore once extracted.
+    message_ = {
+        .channel_index = m->channel_index,
+        .queue_index = m->queue_index,
+        .monotonic_event_time = m->timestamp,
+        .realtime_event_time = aos::realtime_clock::time_point(
+            std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+        .remote_queue_index = m->data.message().remote_queue_index(),
+        .monotonic_remote_time =
+            monotonic_clock::time_point(std::chrono::nanoseconds(
+                m->data.message().monotonic_remote_time())),
+        .realtime_remote_time = realtime_clock::time_point(
+            std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
+        .data = std::move(data.data)};
+    CHECK_GE(message_.monotonic_event_time, last_message_time_);
+    last_message_time_ = message_.monotonic_event_time;
+    first_message_ = FirstMessage::kInMessage;
+    return &message_;
+  }
+}
+
+void TimestampMapper::PopFront() {
+  CHECK(first_message_ != FirstMessage::kNeedsUpdate);
+  first_message_ = FirstMessage::kNeedsUpdate;
+
+  if (nodes_data_.empty()) {
+    // We are a thin wrapper around node_merger_.  Call it directly.
+    node_merger_.PopFront();
+  } else {
+    // Since messages_ holds the data, drop it.
+    messages_.pop_front();
+  }
+}
+
+Message TimestampMapper::MatchingMessageFor(const Message &message) {
+  TimestampMapper *peer =
+      CHECK_NOTNULL(nodes_data_[source_node_[message.channel_index]].peer);
+  // The queue which will have the matching data, if available.
+  std::deque<Message> *data_queue =
+      &peer->nodes_data_[node()].channels[message.channel_index].messages;
+
+  // Figure out what queue index we are looking for.
+  CHECK(message.data.message().has_remote_queue_index());
+  const uint32_t remote_queue_index =
+      message.data.message().remote_queue_index();
+
+  CHECK(message.data.message().has_monotonic_remote_time());
+  CHECK(message.data.message().has_realtime_remote_time());
+
+  const monotonic_clock::time_point monotonic_remote_time(
+      std::chrono::nanoseconds(message.data.message().monotonic_remote_time()));
+  const realtime_clock::time_point realtime_remote_time(
+      std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+
+  peer->QueueUntil(monotonic_remote_time);
+
+  if (data_queue->empty()) {
+    return Message{
+        .channel_index = message.channel_index,
+        .queue_index = remote_queue_index,
+        .timestamp = monotonic_remote_time,
+        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+  }
+
+  // The algorithm below is constant time with some assumptions.  We need there
+  // to be no missing messages in the data stream.  This also assumes a queue
+  // hasn't wrapped.  That is conservative, but should let us get started.
+  //
+  // TODO(austin): We can break these assumptions pretty easily once we have a
+  // need.
+  CHECK_EQ(
+      data_queue->back().queue_index - data_queue->front().queue_index + 1u,
+      data_queue->size());
+
+  if (remote_queue_index < data_queue->front().queue_index ||
+      remote_queue_index > data_queue->back().queue_index) {
+    return Message{
+        .channel_index = message.channel_index,
+        .queue_index = remote_queue_index,
+        .timestamp = monotonic_remote_time,
+        .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+  }
+
+  // Pull the data out and confirm that the timestamps match as expected.
+  Message result = std::move(
+      (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+  CHECK_EQ(result.timestamp, monotonic_remote_time)
+      << ": Queue index matches, but timestamp doesn't.  Please investigate!";
+  CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
+               result.data.message().realtime_sent_time())),
+           realtime_remote_time)
+      << ": Queue index matches, but timestamp doesn't.  Please investigate!";
+  // Now drop the data off the front.  We have deduplicated timestamps, so we
+  // are done.  And all the data is in order.
+  data_queue->erase(data_queue->begin(),
+                    data_queue->begin() + (1 + remote_queue_index -
+                                           data_queue->front().queue_index));
   return result;
 }
-}  // namespace
 
-ChannelMerger::ChannelMerger(
-    const std::vector<std::vector<std::string>> &filenames)
-    : split_message_readers_(MakeSplitMessageReaders(filenames)),
-      log_file_header_(split_message_readers_[0]->raw_log_file_header()) {
-  // Now, confirm that the configuration matches for each and pick a start time.
-  // Also return the list of possible nodes.
-  for (const std::unique_ptr<SplitMessageReader> &reader :
-       split_message_readers_) {
-    CHECK(CompareFlatBuffer(log_file_header_.message().configuration(),
-                            reader->log_file_header()->configuration()))
-        << ": Replaying log files with different configurations isn't "
-           "supported";
+void TimestampMapper::QueueUntil(monotonic_clock::time_point t) {
+  if (queued_until_ > t) {
+    return;
   }
+  while (true) {
+    if (!messages_.empty() && messages_.back().timestamp > t) {
+      queued_until_ = std::max(queued_until_, messages_.back().timestamp);
+      return;
+    }
 
-  nodes_ = configuration::GetNodes(configuration());
+    if (!Queue()) {
+      // Found nothing to add; we are out of data!
+      queued_until_ = monotonic_clock::max_time;
+      return;
+    }
+
+    // Now that it has been added (and cannibalized), forget about it upstream.
+    node_merger_.PopFront();
+  }
 }
 
-bool ChannelMerger::SetNode(const Node *target_node) {
-  std::vector<SplitMessageReader *> split_message_readers;
-  for (const std::unique_ptr<SplitMessageReader> &reader :
-       split_message_readers_) {
-    split_message_readers.emplace_back(reader.get());
+bool TimestampMapper::Queue() {
+  Message *m = node_merger_.Front();
+  if (m == nullptr) {
+    return false;
   }
-
-  // Go find a log_file_header for this node.
-  {
-    bool found_node = false;
-
-    for (const std::unique_ptr<SplitMessageReader> &reader :
-         split_message_readers_) {
-      // In order to identify which logfile(s) map to the target node, do a
-      // logical comparison of the nodes, by confirming that we are either in a
-      // single-node setup (where the nodes will both be nullptr) or that the
-      // node names match (but the other node fields--e.g., hostname lists--may
-      // not).
-      const bool both_null =
-          reader->node() == nullptr && target_node == nullptr;
-      const bool both_have_name =
-          (reader->node() != nullptr) && (target_node != nullptr) &&
-          (reader->node()->has_name() && target_node->has_name());
-      const bool node_names_identical =
-          both_have_name && (reader->node()->name()->string_view() ==
-                             target_node->name()->string_view());
-      if (both_null || node_names_identical) {
-        if (!found_node) {
-          found_node = true;
-          log_file_header_ = reader->raw_log_file_header();
-          VLOG(1) << "Found log file " << reader->filename() << " with node "
-                  << FlatbufferToJson(reader->node()) << " start_time "
-                  << monotonic_start_time();
-        } else {
-          // Find the earliest start time.  That way, if we get a full log file
-          // directly from the node, and a partial later, we start with the
-          // full.  Update our header to match that.
-          const monotonic_clock::time_point new_monotonic_start_time(
-              chrono::nanoseconds(
-                  reader->log_file_header()->monotonic_start_time()));
-          const realtime_clock::time_point new_realtime_start_time(
-              chrono::nanoseconds(
-                  reader->log_file_header()->realtime_start_time()));
-
-          if (monotonic_start_time() == monotonic_clock::min_time ||
-              (new_monotonic_start_time != monotonic_clock::min_time &&
-               new_monotonic_start_time < monotonic_start_time())) {
-            log_file_header_.mutable_message()->mutate_monotonic_start_time(
-                new_monotonic_start_time.time_since_epoch().count());
-            log_file_header_.mutable_message()->mutate_realtime_start_time(
-                new_realtime_start_time.time_since_epoch().count());
-            VLOG(1) << "Updated log file " << reader->filename()
-                    << " with node " << FlatbufferToJson(reader->node())
-                    << " start_time " << new_monotonic_start_time;
-          }
-        }
-      }
-    }
-
-    if (!found_node) {
-      LOG(WARNING) << "Failed to find log file for node "
-                   << FlatbufferToJson(target_node);
-      return false;
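+  // Copy the message into the queue for every node it has ever been delivered
+  // to, so the matching timestamps can find the data later.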
+  for (NodeData &node_data : nodes_data_) {
+    if (!node_data.any_delivered) continue;
+    if (node_data.channels[m->channel_index].delivered) {
+      // TODO(austin): This copies the data...  Probably not worth stressing
+      // about yet.
+      // TODO(austin): Bound how big this can get.  We tend not to send massive
+      // data, so we can probably ignore this for a bit.
+      node_data.channels[m->channel_index].messages.emplace_back(*m);
     }
   }
 
-  // Build up all the timestamp mergers.  This connects up all the
-  // SplitMessageReaders.
-  timestamp_mergers_.reserve(configuration()->channels()->size());
-  for (size_t channel_index = 0;
-       channel_index < configuration()->channels()->size(); ++channel_index) {
-    timestamp_mergers_.emplace_back(
-        configuration(), split_message_readers, channel_index,
-        configuration::GetNode(configuration(), target_node), this);
-  }
-
-  // And prime everything.
-  for (std::unique_ptr<SplitMessageReader> &split_message_reader :
-       split_message_readers_) {
-    split_message_reader->QueueMessages(
-        split_message_reader->monotonic_start_time());
-  }
-
-  node_ = configuration::GetNodeOrDie(configuration(), target_node);
+  messages_.emplace_back(std::move(*m));
   return true;
 }
 
-monotonic_clock::time_point ChannelMerger::OldestMessageTime() const {
-  if (channel_heap_.empty()) {
-    return monotonic_clock::max_time;
-  }
-  return channel_heap_.front().first;
-}
-
-void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
-                                    int channel_index) {
-  // Pop and recreate the heap if it has already been pushed.  And since we are
-  // pushing again, we don't need to clear pushed.
-  if (timestamp_mergers_[channel_index].pushed()) {
-    const auto channel_iterator = std::find_if(
-        channel_heap_.begin(), channel_heap_.end(),
-        [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
-          return x.second == channel_index;
-        });
-    DCHECK(channel_iterator != channel_heap_.end());
-    if (std::get<0>(*channel_iterator) == timestamp) {
-      // It's already in the heap, in the correct spot, so nothing
-      // more for us to do here.
-      return;
-    }
-    channel_heap_.erase(channel_iterator);
-    std::make_heap(channel_heap_.begin(), channel_heap_.end(),
-                   ChannelHeapCompare);
-  }
-
-  if (timestamp == monotonic_clock::min_time) {
-    timestamp_mergers_[channel_index].set_pushed(false);
-    return;
-  }
-
-  channel_heap_.push_back(std::make_pair(timestamp, channel_index));
-
-  // The default sort puts the newest message first.  Use a custom comparator to
-  // put the oldest message first.
-  std::push_heap(channel_heap_.begin(), channel_heap_.end(),
-                 ChannelHeapCompare);
-}
-
-void ChannelMerger::VerifyHeaps() {
-  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
-      channel_heap_;
-  std::make_heap(channel_heap.begin(), channel_heap.end(), &ChannelHeapCompare);
-
-  for (size_t i = 0; i < channel_heap_.size(); ++i) {
-    CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
-    CHECK_EQ(
-        std::get<0>(channel_heap[i]),
-        timestamp_mergers_[std::get<1>(channel_heap[i])].channel_merger_time());
-  }
-}
-
-std::tuple<TimestampMerger::DeliveryTimestamp, int,
-           SizePrefixedFlatbufferVector<MessageHeader>>
-ChannelMerger::PopOldest() {
-  CHECK_GT(channel_heap_.size(), 0u);
-  std::pair<monotonic_clock::time_point, int> oldest_channel_data =
-      channel_heap_.front();
-  int channel_index = oldest_channel_data.second;
-  std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
-                &ChannelHeapCompare);
-  channel_heap_.pop_back();
-
-  timestamp_mergers_[channel_index].set_pushed(false);
-
-  TimestampMerger *merger = &timestamp_mergers_[channel_index];
-
-  // Merger handles any queueing needed from here.
-  std::tuple<TimestampMerger::DeliveryTimestamp,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-      message = merger->PopOldest();
-  DCHECK_EQ(std::get<0>(message).monotonic_event_time,
-            oldest_channel_data.first)
-      << ": channel_heap_ was corrupted for " << channel_index << ": "
-      << DebugString();
-
-  CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
-      << ": " << MaybeNodeName(log_file_header()->node())
-      << "Messages came off the queue out of order. " << DebugString();
-  last_popped_time_ = std::get<0>(message).monotonic_event_time;
-
-  VLOG(1) << "Popped " << last_popped_time_ << " "
-          << configuration::StrippedChannelToString(
-                 configuration()->channels()->Get(channel_index))
-          << " (" << channel_index << ")";
-
-  return std::make_tuple(std::get<0>(message), channel_index,
-                         std::move(std::get<1>(message)));
-}
-
-std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
+std::string TimestampMapper::DebugString() const {
   std::stringstream ss;
-  for (size_t i = 0; i < data_.size(); ++i) {
-    if (i < 5 || i + 5 > data_.size()) {
-      if (timestamps) {
-        ss << "        msg: ";
-      } else {
-        ss << "        timestamp: ";
+  ss << "node " << node() << " [\n";
+  for (const Message &message : messages_) {
+    ss << "  " << message << "\n";
+  }
+  ss << "] queued_until " << queued_until_;
+  for (const NodeData &ns : nodes_data_) {
+    if (ns.peer == nullptr) continue;
+    ss << "\nnode " << ns.peer->node() << " remote_data [\n";
+    size_t channel_index = 0;
+    for (const NodeData::ChannelData &channel_data :
+         ns.peer->nodes_data_[node()].channels) {
+      if (channel_data.messages.empty()) {
+        continue;
       }
-      ss << monotonic_clock::time_point(
-                chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
-         << " ("
-         << realtime_clock::time_point(
-                chrono::nanoseconds(data_[i].message().realtime_sent_time()))
-         << ") " << data_[i].message().queue_index();
-      if (timestamps) {
-        ss << "  <- remote "
-           << monotonic_clock::time_point(chrono::nanoseconds(
-                  data_[i].message().monotonic_remote_time()))
-           << " ("
-           << realtime_clock::time_point(chrono::nanoseconds(
-                  data_[i].message().realtime_remote_time()))
-           << ")";
+
+      ss << "  channel " << channel_index << " [\n";
+      for (const Message &m : channel_data.messages) {
+        ss << "    " << m << "\n";
       }
-      ss << "\n";
-    } else if (i == 5) {
-      ss << "        ...\n";
+      ss << "  ]\n";
+      ++channel_index;
     }
+    ss << "] queued_until " << ns.peer->queued_until_;
   }
-
-  return ss.str();
-}
-
-std::string SplitMessageReader::DebugString(int channel) const {
-  std::stringstream ss;
-  ss << "[\n";
-  ss << channels_[channel].data.DebugString();
-  ss << "      ]";
-  return ss.str();
-}
-
-std::string SplitMessageReader::DebugString(int channel, int node_index) const {
-  std::stringstream ss;
-  ss << "[\n";
-  ss << channels_[channel].timestamps[node_index].DebugString();
-  ss << "      ]";
-  return ss.str();
-}
-
-std::string TimestampMerger::DebugString() const {
-  std::stringstream ss;
-
-  if (timestamp_heap_.size() > 0) {
-    ss << "    timestamp_heap {\n";
-    std::vector<
-        std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
-        timestamp_heap = timestamp_heap_;
-    while (timestamp_heap.size() > 0u) {
-      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-          oldest_timestamp_reader = timestamp_heap.front();
-
-      ss << "      " << std::get<2>(oldest_timestamp_reader) << " "
-         << std::get<0>(oldest_timestamp_reader) << " queue_index ("
-         << std::get<1>(oldest_timestamp_reader) << ") ttq "
-         << std::get<2>(oldest_timestamp_reader)->time_to_queue() << " "
-         << std::get<2>(oldest_timestamp_reader)->filename() << " -> "
-         << std::get<2>(oldest_timestamp_reader)
-                ->DebugString(channel_index_, node_index_)
-         << "\n";
-
-      std::pop_heap(timestamp_heap.begin(), timestamp_heap.end(),
-                    &SplitMessageReaderHeapCompare);
-      timestamp_heap.pop_back();
-    }
-    ss << "    }\n";
-  }
-
-  ss << "    message_heap {\n";
-  {
-    std::vector<
-        std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
-        message_heap = message_heap_;
-    while (!message_heap.empty()) {
-      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-          oldest_message_reader = message_heap.front();
-
-      ss << "      " << std::get<2>(oldest_message_reader) << " "
-         << std::get<0>(oldest_message_reader) << " queue_index ("
-         << std::get<1>(oldest_message_reader) << ") ttq "
-         << std::get<2>(oldest_message_reader)->time_to_queue() << " "
-         << std::get<2>(oldest_message_reader)->filename() << " -> "
-         << std::get<2>(oldest_message_reader)->DebugString(channel_index_)
-         << "\n";
-
-      std::pop_heap(message_heap.begin(), message_heap.end(),
-                    &SplitMessageReaderHeapCompare);
-      message_heap.pop_back();
-    }
-  }
-  ss << "    }";
-
-  return ss.str();
-}
-
-std::string ChannelMerger::DebugString() const {
-  std::stringstream ss;
-  ss << "start_time " << realtime_start_time() << " " << monotonic_start_time()
-     << "\n";
-  ss << "channel_heap {\n";
-  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
-      channel_heap_;
-  while (!channel_heap.empty()) {
-    std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
-    ss << "  " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
-       << configuration::CleanedChannelToString(
-              configuration()->channels()->Get(std::get<1>(channel)))
-       << "\n";
-
-    ss << timestamp_mergers_[std::get<1>(channel)].DebugString() << "\n";
-
-    std::pop_heap(channel_heap.begin(), channel_heap.end(),
-                  &ChannelHeapCompare);
-    channel_heap.pop_back();
-  }
-  ss << "}";
-
   return ss.str();
 }
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 985a6bc..fd600d3 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -14,6 +14,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/container/btree_set.h"
 #include "absl/types/span.h"
 #include "aos/containers/resizeable_buffer.h"
 #include "aos/events/event_loop.h"
@@ -280,6 +281,13 @@
 
   std::string_view filename() const { return message_reader_.filename(); }
 
+  // Returns the LogParts that holds the filenames we are reading.
+  const LogParts &parts() const { return parts_; }
+
+  const LogFileHeader *log_file_header() const {
+    return message_reader_.log_file_header();
+  }
+
   // Returns the minimum amount of data needed to queue up for sorting before
   // we are guaranteed to not see data out of order.
   std::chrono::nanoseconds max_out_of_order_duration() const {
@@ -309,452 +317,265 @@
   monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
 };
 
-class TimestampMerger;
+// Struct to hold a message as it gets sorted on a single node.
+struct Message {
+  // The channel.
+  uint32_t channel_index = 0xffffffff;
+  // The local queue index.
+  uint32_t queue_index = 0xffffffff;
+  // The local timestamp on the monotonic clock.
+  monotonic_clock::time_point timestamp = monotonic_clock::min_time;
+  // The data (either a timestamp header, or a data header).
+  SizePrefixedFlatbufferVector<MessageHeader> data;
 
-// A design requirement is that the relevant data for a channel is not more than
-// max_out_of_order_duration out of order. We approach sorting in layers.
-//
-// 1) Split each (maybe chunked) log file into one queue per channel.  Read this
-//    log file looking for data pertaining to a specific node.
-//    (SplitMessageReader)
-// 2) Merge all the data per channel from the different log files into a sorted
-//    list of timestamps and messages. (TimestampMerger)
-// 3) Combine the timestamps and messages. (TimestampMerger)
-// 4) Merge all the channels to produce the next message on a node.
-//    (ChannelMerger)
-// 5) Duplicate this entire stack per node.
-
-// This class splits messages and timestamps up into a queue per channel, and
-// handles reading data from multiple chunks.
-class SplitMessageReader {
- public:
-  SplitMessageReader(const std::vector<std::string> &filenames);
-
-  // Sets the TimestampMerger that gets notified for each channel.  The node
-  // that the TimestampMerger is merging as needs to be passed in.
-  void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
-                          const Node *target_node);
-
-  // Returns the (timestamp, queue_index, message_header) for the oldest message
-  // in a channel, or max_time if there is nothing in the channel.
-  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-  oldest_message(int channel) {
-    return channels_[channel].data.front_timestamp();
-  }
-
-  // Returns the (timestamp, queue_index, message_header) for the oldest
-  // delivery time in a channel, or max_time if there is nothing in the channel.
-  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-  oldest_message(int channel, int destination_node) {
-    return channels_[channel].timestamps[destination_node].front_timestamp();
-  }
-
-  // Returns the timestamp, queue_index, and message for the oldest data on a
-  // channel.  Requeues data as needed.
-  std::tuple<monotonic_clock::time_point, uint32_t,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-  PopOldest(int channel_index);
-
-  // Returns the timestamp, queue_index, and message for the oldest timestamp on
-  // a channel delivered to a node.  Requeues data as needed.
-  std::tuple<monotonic_clock::time_point, uint32_t,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-  PopOldestTimestamp(int channel, int node_index);
-
-  // Returns the header for the log files.
-  const LogFileHeader *log_file_header() const {
-    return &log_file_header_.message();
-  }
-
-  const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
-      const {
-    return log_file_header_;
-  }
-
-  // Returns the starting time for this set of log files.
-  monotonic_clock::time_point monotonic_start_time() {
-    return monotonic_clock::time_point(
-        std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
-  }
-  realtime_clock::time_point realtime_start_time() {
-    return realtime_clock::time_point(
-        std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
-  }
-
-  // Returns the configuration from the log file header.
-  const Configuration *configuration() const {
-    return log_file_header()->configuration();
-  }
-
-  // Returns the node who's point of view this log file is from.  Make sure this
-  // is a pointer in the configuration() nodes list so it can be consumed
-  // elsewhere.
-  const Node *node() const {
-    if (configuration()->has_nodes()) {
-      return configuration::GetNodeOrDie(configuration(),
-                                         log_file_header()->node());
-    } else {
-      CHECK(!log_file_header()->has_node());
-      return nullptr;
-    }
-  }
-
-  // Returns the timestamp of the newest message read from the log file, and the
-  // timestamp that we need to re-queue data.
-  monotonic_clock::time_point newest_timestamp() const {
-    return newest_timestamp_;
-  }
-
-  // Returns the next time to trigger a requeue.
-  monotonic_clock::time_point time_to_queue() const { return time_to_queue_; }
-
-  // Returns the minimum amount of data needed to queue up for sorting before
-  // we are guarenteed to not see data out of order.
-  std::chrono::nanoseconds max_out_of_order_duration() const {
-    return message_reader_->max_out_of_order_duration();
-  }
-
-  std::string_view filename() const { return message_reader_->filename(); }
-
-  // Adds more messages to the sorted list.  This reads enough data such that
-  // oldest_message_time can be replayed safely.  Returns false if the log file
-  // has all been read.
-  bool QueueMessages(monotonic_clock::time_point oldest_message_time);
-
-  // Returns debug strings for a channel, and timestamps for a node.
-  std::string DebugString(int channel) const;
-  std::string DebugString(int channel, int node_index) const;
-
-  // Returns true if all the messages have been queued from the last log file in
-  // the list of log files chunks.
-  bool at_end() const { return at_end_; }
-
- private:
-  // TODO(austin): Need to copy or refcount the message instead of running
-  // multiple copies of the reader.  Or maybe have a "as_node" index and hide it
-  // inside.
-
-  // Moves to the next log file in the list.
-  bool NextLogFile();
-
-  // Filenames of the log files.
-  std::vector<std::string> filenames_;
-  // And the index of the next file to open.
-  size_t next_filename_index_ = 0;
-
-  // Node we are reading as.
-  const Node *target_node_ = nullptr;
-
-  // Log file header to report.  This is a copy.
-  SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
-  // Current log file being read.
-  std::unique_ptr<MessageReader> message_reader_;
-
-  // Datastructure to hold the list of messages, cached timestamp for the
-  // oldest message, and sender to send with.
-  struct MessageHeaderQueue {
-    // If true, this is a timestamp queue.
-    bool timestamps = false;
-
-    // Returns a reference to the the oldest message.
-    SizePrefixedFlatbufferVector<MessageHeader> &front() {
-      CHECK_GT(data_.size(), 0u);
-      return data_.front();
-    }
-
-    // Adds a message to the back of the queue. Returns true if it was actually
-    // emplaced.
-    bool emplace_back(SizePrefixedFlatbufferVector<MessageHeader> &&msg);
-
-    // Drops the front message.  Invalidates the front() reference.
-    void PopFront();
-
-    // The size of the queue.
-    size_t size() { return data_.size(); }
-
-    // Returns a debug string with info about each message in the queue.
-    std::string DebugString() const;
-
-    // Returns the (timestamp, queue_index, message_header) for the oldest
-    // message.
-    const std::tuple<monotonic_clock::time_point, uint32_t,
-                     const MessageHeader *>
-    front_timestamp() {
-      const MessageHeader &message = front().message();
-      return std::make_tuple(
-          monotonic_clock::time_point(
-              std::chrono::nanoseconds(message.monotonic_sent_time())),
-          message.queue_index(), &message);
-    }
-
-    // Pointer to the timestamp merger for this queue if available.
-    TimestampMerger *timestamp_merger = nullptr;
-    // Pointer to the reader which feeds this queue.
-    SplitMessageReader *split_reader = nullptr;
-
-   private:
-    // The data.
-    std::deque<SizePrefixedFlatbufferVector<MessageHeader>> data_;
-  };
-
-  // All the queues needed for a channel.  There isn't going to be data in all
-  // of these.
-  struct ChannelData {
-    // The data queue for the channel.
-    MessageHeaderQueue data;
-    // Queues for timestamps for each node.
-    std::vector<MessageHeaderQueue> timestamps;
-  };
-
-  // Data for all the channels.
-  std::vector<ChannelData> channels_;
-
-  // Once we know the node that this SplitMessageReader will be writing as,
-  // there will be only one MessageHeaderQueue that a specific channel matches.
-  // Precompute this here for efficiency.
-  std::vector<MessageHeaderQueue *> channels_to_write_;
-
-  monotonic_clock::time_point time_to_queue_ = monotonic_clock::min_time;
-
-  // Latches true when we hit the end of the last log file and there is no sense
-  // poking it further.
-  bool at_end_ = false;
-
-  // Timestamp of the newest message that was read and actually queued.  We want
-  // to track this independently from the log file because we need the
-  // timestamps here to be timestamps of messages that are queued.
-  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
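+  // Messages sort by timestamp first, then channel index, then queue index
+  // (exercised by MessageTest.Sorting in logfile_utils_test.cc).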
+  bool operator<(const Message &m2) const;
+  bool operator>=(const Message &m2) const;
+  bool operator==(const Message &m2) const;
 };
 
-class ChannelMerger;
+std::ostream &operator<<(std::ostream &os, const Message &m);
 
-// Sorts channels (and timestamps) from multiple log files for a single channel.
-class TimestampMerger {
- public:
-  TimestampMerger(const Configuration *configuration,
-                  std::vector<SplitMessageReader *> split_message_readers,
-                  int channel_index, const Node *target_node,
-                  ChannelMerger *channel_merger);
+// Structure to hold a full message and all the timestamps, which may or may not
+// have been sent from a remote node.  The remote_queue_index will be invalid if
+// this message is from the point of view of the node which sent it.
+struct TimestampedMessage {
+  uint32_t channel_index = 0xffffffff;
 
-  // Metadata used to schedule the message.
-  struct DeliveryTimestamp {
-    monotonic_clock::time_point monotonic_event_time =
-        monotonic_clock::min_time;
-    realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
-    uint32_t queue_index = 0xffffffff;
+  uint32_t queue_index = 0xffffffff;
+  monotonic_clock::time_point monotonic_event_time = monotonic_clock::min_time;
+  realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
 
-    monotonic_clock::time_point monotonic_remote_time =
-        monotonic_clock::min_time;
-    realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
-    uint32_t remote_queue_index = 0xffffffff;
-  };
+  uint32_t remote_queue_index = 0xffffffff;
+  monotonic_clock::time_point monotonic_remote_time = monotonic_clock::min_time;
+  realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
 
-  // Pushes SplitMessageReader onto the timestamp heap.  This should only be
-  // called when timestamps are placed in the channel this class is merging for
-  // the reader.
-  void UpdateTimestamp(
-      SplitMessageReader *split_message_reader,
-      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-          oldest_message_time) {
-    PushTimestampHeap(oldest_message_time, split_message_reader);
-  }
-  // Pushes SplitMessageReader onto the message heap.  This should only be
-  // called when data is placed in the channel this class is merging for the
-  // reader.
-  void Update(
-      SplitMessageReader *split_message_reader,
-      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-          oldest_message_time) {
-    PushMessageHeap(oldest_message_time, split_message_reader);
-  }
-
-  // Returns the oldest combined timestamp and data for this channel.  If there
-  // isn't a matching piece of data, returns only the timestamp with no data.
-  // The caller can determine what the appropriate action is to recover.
-  std::tuple<DeliveryTimestamp, SizePrefixedFlatbufferVector<MessageHeader>>
-  PopOldest();
-
-  // Tracks if the channel merger has pushed this onto it's heap or not.
-  bool pushed() { return pushed_; }
-  // Sets if this has been pushed to the channel merger heap.  Should only be
-  // called by the channel merger.
-  void set_pushed(bool pushed) { pushed_ = pushed; }
-
-  // Returns a debug string with the heaps printed out.
-  std::string DebugString() const;
-
-  // Returns true if we have timestamps.
-  bool has_timestamps() const { return has_timestamps_; }
-
-  // Records that one of the log files ran out of data.  This should only be
-  // called by a SplitMessageReader.
-  void NoticeAtEnd();
-
-  aos::monotonic_clock::time_point channel_merger_time() {
-    if (has_timestamps_) {
-      return std::get<0>(timestamp_heap_[0]);
-    } else {
-      return std::get<0>(message_heap_[0]);
-    }
-  }
-
- private:
-  // Pushes messages and timestamps to the corresponding heaps.
-  void PushMessageHeap(
-      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-          timestamp,
-      SplitMessageReader *split_message_reader);
-  void PushTimestampHeap(
-      std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-          timestamp,
-      SplitMessageReader *split_message_reader);
-
-  // Pops a message from the message heap.  This automatically triggers the
-  // split message reader to re-fetch any new data.
-  std::tuple<monotonic_clock::time_point, uint32_t,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-  PopMessageHeap();
-
-  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-  oldest_message() const;
-  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-  oldest_timestamp() const;
-  // Pops a message from the timestamp heap.  This automatically triggers the
-  // split message reader to re-fetch any new data.
-  std::tuple<monotonic_clock::time_point, uint32_t,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-  PopTimestampHeap();
-
-  const Configuration *configuration_;
-
-  // If true, this is a forwarded channel and timestamps should be matched.
-  bool has_timestamps_ = false;
-
-  // Tracks if the ChannelMerger has pushed this onto it's queue.
-  bool pushed_ = false;
-
-  // The split message readers used for source data.
-  std::vector<SplitMessageReader *> split_message_readers_;
-
-  // The channel to merge.
-  int channel_index_;
-
-  // Our node.
-  int node_index_;
-
-  // Heaps for messages and timestamps.
-  std::vector<
-      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
-      message_heap_;
-  std::vector<
-      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
-      timestamp_heap_;
-
-  // Parent channel merger.
-  ChannelMerger *channel_merger_;
+  SizePrefixedFlatbufferVector<MessageHeader> data;
 };
 
-// This class handles constructing all the split message readers, channel
-// mergers, and combining the results.
-class ChannelMerger {
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
+
+// Class to sort the resulting messages from a PartsMessageReader.
+class LogPartsSorter {
  public:
-  // Builds a ChannelMerger around a set of log files.  These are of the format:
-  //   {
-  //     {log1_part0, log1_part1, ...},
-  //     {log2}
-  //   }
-  // The inner vector is a list of log file chunks which form up a log file.
-  // The outer vector is a list of log files with subsets of the messages, or
-  // messages from different nodes.
-  ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
+  LogPartsSorter(LogParts log_parts);
 
-  // Returns the nodes that we know how to merge.
-  const std::vector<const Node *> nodes() const;
-  // Sets the node that we will return messages as.  Returns true if the node
-  // has log files and will produce data.  This can only be called once, and
-  // will likely corrupt state if called a second time.
-  bool SetNode(const Node *target_node);
-
-  // Everything else needs the node set before it works.
-
-  // Returns a timestamp for the oldest message in this group of logfiles.
-  monotonic_clock::time_point OldestMessageTime() const;
-  // Pops the oldest message.
-  std::tuple<TimestampMerger::DeliveryTimestamp, int,
-             SizePrefixedFlatbufferVector<MessageHeader>>
-  PopOldest();
-
-  // Returns the config for this set of log files.
-  const Configuration *configuration() const {
-    return log_file_header()->configuration();
-  }
-
+  // Returns the current log file header.
+  // TODO(austin): Is this the header we want to report?  Do we want a better
+  // start time?
+  // TODO(austin): Report a start time from the LogParts.  Figure out how that
+  // all works.
   const LogFileHeader *log_file_header() const {
-    return &log_file_header_.message();
+    return parts_message_reader_.log_file_header();
   }
 
-  // Returns the start times for the configured node's log files.
   monotonic_clock::time_point monotonic_start_time() const {
-    return monotonic_clock::time_point(
-        std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+    return parts_message_reader_.parts().monotonic_start_time;
   }
   realtime_clock::time_point realtime_start_time() const {
-    return realtime_clock::time_point(
-        std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+    return parts_message_reader_.parts().realtime_start_time;
   }
 
-  // Returns the node set by SetNode above.
-  const Node *node() const { return node_; }
+  // The time this data is sorted until.
+  monotonic_clock::time_point sorted_until() const { return sorted_until_; }
 
-  // Called by the TimestampMerger when new data is available with the provided
-  // timestamp and channel_index.
-  void Update(monotonic_clock::time_point timestamp, int channel_index) {
-    PushChannelHeap(timestamp, channel_index);
-  }
+  // Returns the next sorted message from the log file.  It is safe to call
+  // std::move() on the result to move the data flatbuffer from it.
+  Message *Front();
+  // Pops the front message.  This should only be called after a call to
+  // Front().
+  void PopFront();
 
-  // Returns a debug string with all the heaps in it.  Generally only useful for
-  // debugging what went wrong.
+  // Returns a debug string representing the contents of this sorter.
   std::string DebugString() const;
 
-  // Returns true if one of the log files has finished reading everything.  When
-  // log file chunks are involved, this means that the last chunk in a log file
-  // has been read.  It is acceptable to be missing data at this point in time.
-  bool at_end() const { return at_end_; }
+ private:
+  // Log parts reader we are wrapping.
+  PartsMessageReader parts_message_reader_;
+  // Cache of the time we are sorted until.
+  aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
 
-  // Marks that one of the log files is at the end.  This should only be called
-  // by timestamp mergers.
-  void NoticeAtEnd() { at_end_ = true; }
+  // Timestamp of the last message returned.  Used to make sure nothing goes
+  // backwards.
+  monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+
+  // Set used for efficient sorting of messages.  We can benchmark and evaluate
+  // other data structures if this proves to be the bottleneck.
+  absl::btree_set<Message> messages_;
+};
+
+// Class to run merge sort on the messages from multiple LogPartsSorter
+// instances.
+class NodeMerger {
+ public:
+  NodeMerger(std::vector<LogParts> parts);
+
+  // Node index in the configuration of this node.
+  int node() const { return node_; }
+
+  // The log file header for one of the log files.
+  const LogFileHeader *log_file_header() const {
+    CHECK(!parts_sorters_.empty());
+    return parts_sorters_[0].log_file_header();
+  }
+
+  monotonic_clock::time_point monotonic_start_time() const {
+    return monotonic_start_time_;
+  }
+  realtime_clock::time_point realtime_start_time() const {
+    return realtime_start_time_;
+  }
+
+  // The time this data is sorted until.
+  monotonic_clock::time_point sorted_until() const { return sorted_until_; }
+
+  // Returns the next sorted message from the set of log files.  It is safe to
+  // call std::move() on the result to move the data flatbuffer from it.
+  Message *Front();
+  // Pops the front message.  This should only be called after a call to
+  // Front().
+  void PopFront();
 
  private:
-  // Pushes the timestamp for new data on the provided channel.
-  void PushChannelHeap(monotonic_clock::time_point timestamp,
-                       int channel_index);
+  // Unsorted list of all parts sorters.
+  std::vector<LogPartsSorter> parts_sorters_;
+  // Pointer to the parts sorter holding the current Front message if one
+  // exists, or nullptr if a new one needs to be found.
+  LogPartsSorter *current_ = nullptr;
+  // Cached sorted_until value.
+  aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
 
-  // CHECKs that channel_heap_ and timestamp_heap_ are valid heaps.
-  void VerifyHeaps();
+  // Cached node.
+  int node_;
 
-  // All the message readers.
-  std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
+  // Timestamp of the last message returned.  Used to make sure nothing goes
+  // backwards.
+  monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
 
-  // The log header we are claiming to be.
-  SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
+  realtime_clock::time_point realtime_start_time_ = realtime_clock::max_time;
+  monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
+};
 
-  // The timestamp mergers which combine data from the split message readers.
-  std::vector<TimestampMerger> timestamp_mergers_;
+// Class to match timestamps with the corresponding data from other nodes.
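+//
+// A typical replay setup looks something like this (a sketch; the variable
+// names are made up):
+//   TimestampMapper pi1_mapper(FilterPartsForNode(parts, "pi1"));
+//   TimestampMapper pi2_mapper(FilterPartsForNode(parts, "pi2"));
+//   pi1_mapper.AddPeer(&pi2_mapper);
+//   pi2_mapper.AddPeer(&pi1_mapper);
+//   while (TimestampedMessage *m = pi1_mapper.Front()) {
+//     // ... replay *m ...
+//     pi1_mapper.PopFront();
+//   }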
+class TimestampMapper {
+ public:
+  TimestampMapper(std::vector<LogParts> file);
 
-  // A heap of the channel readers and timestamps for the oldest data in each.
-  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+  // Copying and moving will mess up the internal raw pointers.  Just don't do
+  // it.
+  TimestampMapper(TimestampMapper const &) = delete;
+  TimestampMapper(TimestampMapper &&) = delete;
+  void operator=(TimestampMapper const &) = delete;
+  void operator=(TimestampMapper &&) = delete;
 
-  // Configured node.
-  const Node *node_;
+  // TODO(austin): It would be super helpful to provide a way to queue up to
+  // time X without matching timestamps, and to then be able to pull the
+  // timestamps out of this queue.  This lets us bootstrap time estimation
+  // without exploding memory usage worst case.
 
-  bool at_end_ = false;
+  // Returns a log file header for this node.
+  const LogFileHeader *log_file_header() const {
+    return node_merger_.log_file_header();
+  }
 
-  // Cached copy of the list of nodes.
-  std::vector<const Node *> nodes_;
+  // Returns which node this is sorting for.
+  size_t node() const { return node_merger_.node(); }
 
-  // Last time popped.  Used to detect events being returned out of order.
-  monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
+  // The start time of this log.
+  monotonic_clock::time_point monotonic_start_time() const {
+    return node_merger_.monotonic_start_time();
+  }
+  realtime_clock::time_point realtime_start_time() const {
+    return node_merger_.realtime_start_time();
+  }
+
+  // Uses timestamp_mapper as the peer for its node. Only one mapper may be set
+  // for each node.  Peers are used to look up the data for timestamps on this
+  // node.
+  void AddPeer(TimestampMapper *timestamp_mapper);
+
+  // Time that we are sorted until internally.
+  monotonic_clock::time_point sorted_until() const {
+    return node_merger_.sorted_until();
+  }
+
+  // Returns the next message for this node.
+  TimestampedMessage *Front();
+  // Pops the next message.  Front must be called first.
+  void PopFront();
+
+  // Returns debug information about this node.
+  std::string DebugString() const;
+
+ private:
+  // The state for a remote node.  This holds the data that needs to be matched
+  // with the remote node's timestamps.
+  struct NodeData {
+    // True if we should save data here.  This should be true if any of the
+    // bools in delivered below are true.
+    bool any_delivered = false;
+
+    // Peer pointer.  This node is only to be considered if a peer is set.
+    TimestampMapper *peer = nullptr;
+
+    struct ChannelData {
+      // Deque per channel.  This contains the data from the enclosing
+      // TimestampMapper's node which is relevant to the node this NodeData
+      // represents.
+      std::deque<Message> messages;
+      // Bool tracking per channel if a message is delivered to the node this
+      // NodeData represents.
+      bool delivered = false;
+    };
+
+    // Vector with per channel data.
+    std::vector<ChannelData> channels;
+  };
+
+  // Returns (and forgets about) the remote data matching the provided
+  // timestamp message, which records when that data arrived at this node.
+  Message MatchingMessageFor(const Message &message);
+
+  // Queues up a single message into our message queue, and into the queues of
+  // any nodes that this message is delivered to.  Returns true if a message
+  // was available, false otherwise.
+  bool Queue();
+
+  // Queues up data until we have at least one message with a timestamp >= t.
+  // Useful for triggering a remote node to read enough data to have the
+  // timestamp you care about available.
+  void QueueUntil(monotonic_clock::time_point t);
+
+  // Fills message_ with the contents of m.
+  void FillMessage(Message *m);
+
+  // The node merger to source messages from.
+  NodeMerger node_merger_;
+  // The buffer of messages for this node.  These are not matched with any
+  // remote data.
+  std::deque<Message> messages_;
+  // The node index for the source node for each channel.
+  std::vector<size_t> source_node_;
+
+  // Vector per node.  Not all nodes will have anything.
+  std::vector<NodeData> nodes_data_;
+
+  // Latest message to return.
+  TimestampedMessage message_;
+
+  // Tracks whether the front message points to message_, is nullptr (all
+  // done), or needs to be recomputed.
+  enum class FirstMessage {
+    kNeedsUpdate,
+    kInMessage,
+    kNullptr,
+  };
+  FirstMessage first_message_ = FirstMessage::kNeedsUpdate;
+
+  // Timestamp of the last message returned.  Used to make sure nothing goes
+  // backwards.
+  monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+  // Time this node is queued up until.  Used for caching.
+  monotonic_clock::time_point queued_until_ = monotonic_clock::min_time;
 };
 
 // Returns the node name with a trailing space, or an empty string if we are on
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 14d1de7..1d83466 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -5,6 +5,7 @@
 
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/test_message_generated.h"
+#include "aos/flatbuffer_merge.h"
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/testing/tmpdir.h"
@@ -31,11 +32,9 @@
   unlink(logfile.c_str());
 
   const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
-      JsonToSizedFlatbuffer<TestMessage>(
-          R"({ "value": 1 })");
+      JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
   const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
-      JsonToSizedFlatbuffer<TestMessage>(
-          R"({ "value": 2 })");
+      JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
 
   {
     DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
@@ -215,6 +214,896 @@
   EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
 }
 
+// Tests that Message's operator < works as expected.
+TEST(MessageTest, Sorting) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+
+  Message m1{.channel_index = 0,
+             .queue_index = 0,
+             .timestamp = e + chrono::milliseconds(1),
+             .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+  Message m2{.channel_index = 0,
+             .queue_index = 0,
+             .timestamp = e + chrono::milliseconds(2),
+             .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+
+  EXPECT_LT(m1, m2);
+  EXPECT_GE(m2, m1);
+
+  m1.timestamp = e;
+  m2.timestamp = e;
+
+  m1.channel_index = 1;
+  m2.channel_index = 2;
+
+  EXPECT_LT(m1, m2);
+  EXPECT_GE(m2, m1);
+
+  m1.channel_index = 0;
+  m2.channel_index = 0;
+  m1.queue_index = 0;
+  m2.queue_index = 1;
+
+  EXPECT_LT(m1, m2);
+  EXPECT_GE(m2, m1);
+}
+
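+// Builds a size-prefixed LogFileHeader by merging the JSON overrides on top
+// of a header containing the provided configuration.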
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+    const aos::FlatbufferDetachedBuffer<Configuration> &config,
+    const std::string_view json) {
+  flatbuffers::FlatBufferBuilder fbb;
+  flatbuffers::Offset<Configuration> config_offset =
+      aos::CopyFlatBuffer(config, &fbb);
+  LogFileHeader::Builder header_builder(fbb);
+  header_builder.add_configuration(config_offset);
+  fbb.Finish(header_builder.Finish());
+  aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
+
+  aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
+      JsonToFlatbuffer<LogFileHeader>(json));
+  CHECK(header_updates.Verify());
+  flatbuffers::FlatBufferBuilder fbb2;
+  fbb2.FinishSizePrefixed(
+      aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
+  return fbb2.Release();
+}
+
+class SortingElementTest : public ::testing::Test {
+ public:
+  SortingElementTest()
+      : config_(JsonToFlatbuffer<Configuration>(
+            R"({
+  "channels": [
+    {
+      "name": "/a",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2"
+        },
+        {
+          "name": "pi3"
+        }
+      ]
+    },
+    {
+      "name": "/b",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/c",
+      "type": "aos.logger.testing.TestMessage",
+      "source_node": "pi1"
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1"
+    },
+    {
+      "name": "pi2"
+    },
+    {
+      "name": "pi3"
+    }
+  ]
+}
+)")),
+        config0_(MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 0
+})")),
+        config1_(MakeHeader(config_,
+                            R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+  "parts_index": 0
+})")),
+        config2_(MakeHeader(config_,
+                            R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi2"
+  },
+  "monotonic_start_time": 0,
+  "realtime_start_time": 1000000000000,
+  "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
+  "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
+  "parts_index": 0
+})")),
+        config3_(MakeHeader(config_,
+                            R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi1"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 2000000,
+  "realtime_start_time": 1000000000,
+  "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+  "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
+  "parts_index": 0
+})")) {
+    unlink(logfile0_.c_str());
+    unlink(logfile1_.c_str());
+    unlink(logfile2_.c_str());
+    queue_index_.resize(kChannels);
+  }
+
+ protected:
+  static constexpr size_t kChannels = 3u;
+
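+  // Packs a TestMessage with the given value into a size-prefixed
+  // MessageHeader, stamped at monotonic_now on channel_index.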
+  flatbuffers::DetachedBuffer MakeLogMessage(
+      const aos::monotonic_clock::time_point monotonic_now, int channel_index,
+      int value) {
+    flatbuffers::FlatBufferBuilder message_fbb;
+    message_fbb.ForceDefaults(true);
+    TestMessage::Builder test_message_builder(message_fbb);
+    test_message_builder.add_value(value);
+    message_fbb.Finish(test_message_builder.Finish());
+
+    aos::Context context;
+    context.monotonic_event_time = monotonic_now;
+    context.realtime_event_time = aos::realtime_clock::epoch() +
+                                  chrono::seconds(1000) +
+                                  monotonic_now.time_since_epoch();
+    context.queue_index = queue_index_[channel_index];
+    context.size = message_fbb.GetSize();
+    context.data = message_fbb.GetBufferPointer();
+
+    ++queue_index_[channel_index];
+
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.FinishSizePrefixed(
+        PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
+
+    return fbb.Release();
+  }
+
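+  // Packs a delivery-timestamp-only MessageHeader for the most recently
+  // queued message on channel_index, shifted onto the receiver's clock by
+  // receiver_monotonic_offset.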
+  flatbuffers::DetachedBuffer MakeTimestampMessage(
+      const aos::monotonic_clock::time_point sender_monotonic_now,
+      int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
+    aos::Context context;
+    context.monotonic_remote_time = sender_monotonic_now;
+    context.realtime_remote_time = aos::realtime_clock::epoch() +
+                                   chrono::seconds(1000) +
+                                   sender_monotonic_now.time_since_epoch();
+    context.remote_queue_index = queue_index_[channel_index] - 1;
+    context.monotonic_event_time =
+        sender_monotonic_now + receiver_monotonic_offset;
+    context.realtime_event_time =
+        aos::realtime_clock::epoch() + chrono::seconds(1000) +
+        context.monotonic_event_time.time_since_epoch();
+    context.queue_index = queue_index_[channel_index] - 1 + 100;
+    context.size = 0;
+    context.data = nullptr;
+
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
+                                       LogType::kLogDeliveryTimeOnly));
+    LOG(INFO) << aos::FlatbufferToJson(
+        aos::SizePrefixedFlatbufferSpan<MessageHeader>(
+            absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
+
+    return fbb.Release();
+  }
+
+  const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
+  const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+  const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
+
+  const aos::FlatbufferDetachedBuffer<Configuration> config_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
+
+  std::vector<uint32_t> queue_index_;
+};
+
+using LogPartsSorterTest = SortingElementTest;
+using LogPartsSorterDeathTest = LogPartsSorterTest;
+using NodeMergerTest = SortingElementTest;
+using TimestampMapperTest = SortingElementTest;
+
+// Tests that we can pull messages out of a log sorted in order.
+TEST_F(LogPartsSorterTest, Pull) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm sorted_until() is still min_time before any message is popped.
+  // Peeking with Front() shouldn't change the sorted-until time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  output.emplace_back(std::move(*parts_sorter.Front()));
+  parts_sorter.PopFront();
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages logged well before the start time still come out sorted
+// in order.
+TEST_F(LogPartsSorterTest, WayBeforeStart) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm sorted_until() is still min_time before any message is popped.
+  // Peeking with Front() shouldn't change the sorted-until time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+
+  for (monotonic_clock::time_point t :
+       {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
+        e + chrono::milliseconds(1900), monotonic_clock::max_time,
+        monotonic_clock::max_time}) {
+    ASSERT_TRUE(parts_sorter.Front() != nullptr);
+    output.emplace_back(std::move(*parts_sorter.Front()));
+    parts_sorter.PopFront();
+    EXPECT_EQ(parts_sorter.sorted_until(), t);
+  }
+
+  ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
+  EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
+  EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
+  EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages too far out of order trigger death.
+TEST_F(LogPartsSorterDeathTest, Pull) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0_.span());
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
+    // This message is too far out of order and will trigger the CHECK.
+    writer.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_});
+
+  LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm that nothing is sorted until the first message is popped.
+  // Peeking shouldn't change the sorted_until() time.
+  EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+  std::deque<Message> output;
+
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  parts_sorter.PopFront();
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
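+  // Peeking a second time should return the same message without popping.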
+  ASSERT_TRUE(parts_sorter.Front() != nullptr);
+  parts_sorter.PopFront();
+
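+  // Reading ahead hits the 1900ms message after the 2001ms message has
+  // already been seen, which exceeds the max out of order duration.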
+  EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
+}
+
+// Tests that we can merge data from two files, including duplicate data.
+TEST_F(NodeMergerTest, TwoFileMerger) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config1_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
+
+    // Make a duplicate!
+    SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer0.QueueSpan(msg.span());
+    writer1.QueueSpan(msg.span());
+
+    writer1.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  ASSERT_EQ(parts.size(), 1u);
+
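+  // Merge pi1's parts from both log files into a single sorted stream.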
+  NodeMerger merger(FilterPartsForNode(parts, "pi1"));
+
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+  ASSERT_TRUE(merger.Front() != nullptr);
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(merger.Front() != nullptr);
+  output.emplace_back(std::move(*merger.Front()));
+  merger.PopFront();
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+  ASSERT_TRUE(merger.Front() == nullptr);
+
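+  // Seven messages were written, but the byte-for-byte duplicate at 3000ms
+  // is merged away, leaving six unique messages in timestamp order.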
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+  EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
+  EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
+}
+
+// Tests that we can match timestamps on delivered messages.
+TEST_F(TimestampMapperTest, ReadNode0First) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config2_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
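+  // Timestamps logged on one node reference data logged on the other, so
+  // each mapper needs its peer to find the matching messages.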
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
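+    // pi2's event times include the 100s offset between the nodes'
+    // monotonic clocks baked into the timestamp messages.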
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_TRUE(output1[2].data.Verify());
+  }
+}
+
+// Tests that we can match timestamps on delivered messages.  By doing this in
+// the reverse order, the second node needs to queue data up from the first node
+// to find the matching timestamp.
+TEST_F(TimestampMapperTest, ReadNode1First) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config2_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_TRUE(output1[2].data.Verify());
+  }
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper0.Front() != nullptr);
+    output0.emplace_back(std::move(*mapper0.Front()));
+    mapper0.PopFront();
+    EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+}
+
+// Tests that we return just the timestamps if we couldn't find the data and the
+// missing data was at the beginning of the file.
+TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config2_.span());
+
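+    // Build the first message but never write it, so the data is missing
+    // from the log and only its timestamp below gets logged.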
+    MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
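+    // The first entry has a timestamp but no data, so its buffer is empty
+    // and fails Verify().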
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_FALSE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_TRUE(output1[2].data.Verify());
+  }
+}
+
+// Tests that we return just the timestamps if we couldn't find the data and the
+// missing data was at the end of the file.
+TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config2_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
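+    // This time the last message is built but never written.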
+    MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(),
+              e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+    ASSERT_TRUE(mapper1.Front() != nullptr);
+    output1.emplace_back(std::move(*mapper1.Front()));
+    mapper1.PopFront();
+    EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
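+    // Only the last entry is missing its data this time.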
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_FALSE(output1[2].data.Verify());
+  }
+}
+
+// Tests that we handle log files with duplicate message timestamps.
+TEST_F(TimestampMapperTest, ReadSameTimestamp) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config2_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts[0].logger_node, "pi1");
+  ASSERT_EQ(parts[1].logger_node, "pi2");
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
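+    // Both 2000ms messages come through: messages which merely share a
+    // timestamp are kept, unlike true duplicates.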
+    for (int i = 0; i < 4; ++i) {
+      ASSERT_TRUE(mapper1.Front() != nullptr);
+      output1.emplace_back(std::move(*mapper1.Front()));
+      mapper1.PopFront();
+    }
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_TRUE(output1[2].data.Verify());
+    EXPECT_EQ(output1[3].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_TRUE(output1[3].data.Verify());
+  }
+}
+
+// Tests that we pull the start times out of the log file headers.
+TEST_F(TimestampMapperTest, StartTime) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config1_.span());
+    DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
+    writer2.QueueSpan(config3_.span());
+  }
+
+  const std::vector<LogFile> parts =
+      SortParts({logfile0_, logfile1_, logfile2_});
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
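+  // The start times come straight from the log file headers.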
+  EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
+  EXPECT_EQ(mapper0.realtime_start_time(),
+            realtime_clock::time_point(chrono::seconds(1000)));
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index b32c748..5a6bb8f 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -38,11 +38,12 @@
 namespace {
 // Helper to safely read a header, or CHECK.
 SizePrefixedFlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
-    const std::vector<std::vector<std::string>> &filenames) {
-  CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
-  CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
+    const std::vector<LogFile> &log_files) {
+  CHECK_GE(log_files.size(), 1u) << ": Empty filenames list";
+  CHECK_GE(log_files[0].parts.size(), 1u) << ": No parts in the first log";
+  CHECK_GE(log_files[0].parts[0].parts.size(), 1u) << ": No files in the part";
   std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> result =
-      ReadHeader(filenames[0][0]);
+      ReadHeader(log_files[0].parts[0].parts[0]);
   CHECK(result);
   return result.value();
 }
@@ -717,24 +718,12 @@
 
 LogReader::LogReader(std::string_view filename,
                      const Configuration *replay_configuration)
-    : LogReader(std::vector<std::string>{std::string(filename)},
-                replay_configuration) {}
+    : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
 
-LogReader::LogReader(const std::vector<std::string> &filenames,
+LogReader::LogReader(std::vector<LogFile> log_files,
                      const Configuration *replay_configuration)
-    : LogReader(std::vector<std::vector<std::string>>{filenames},
-                replay_configuration) {}
-
-// TODO(austin): Make this the base and kill the others.  This has much better
-// context for sorting.
-LogReader::LogReader(const std::vector<LogFile> &log_files,
-                     const Configuration *replay_configuration)
-    : LogReader(ToLogReaderVector(log_files), replay_configuration) {}
-
-LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
-                     const Configuration *replay_configuration)
-    : filenames_(filenames),
-      log_file_header_(MaybeReadHeaderOrDie(filenames)),
+    : log_files_(std::move(log_files)),
+      log_file_header_(MaybeReadHeaderOrDie(log_files_)),
       replay_configuration_(replay_configuration) {
   MakeRemappedConfig();
 
@@ -762,8 +751,8 @@
   }
 
   if (!configuration::MultiNode(configuration())) {
-    states_.emplace_back(
-        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
+    states_.emplace_back(std::make_unique<State>(
+        std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
   } else {
     if (replay_configuration) {
       CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -851,8 +840,12 @@
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
         configuration::GetNodeIndex(configuration(), node);
-    states_[node_index] =
-        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
+    std::vector<LogParts> filtered_parts = FilterPartsForNode(
+        log_files_, node != nullptr ? node->name()->string_view() : "");
+    states_[node_index] = std::make_unique<State>(
+        filtered_parts.size() == 0u
+            ? nullptr
+            : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
     State *state = states_[node_index].get();
     state->set_event_loop(state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node)));
@@ -860,6 +853,20 @@
     state->SetChannelCount(logged_configuration()->channels()->size());
   }
 
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    State *state = states_[node_index].get();
+    for (const Node *other_node : configuration::GetNodes(configuration())) {
+      const size_t other_node_index =
+          configuration::GetNodeIndex(configuration(), other_node);
+      State *other_state = states_[other_node_index].get();
+      if (other_state != state) {
+        state->AddPeer(other_state);
+      }
+    }
+  }
+
   // Register after making all the State objects so we can build references
   // between them.
   for (const Node *node : configuration::GetNodes(configuration())) {
@@ -971,6 +978,9 @@
     VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
             << MaybeNodeName(state->event_loop()->node()) << "now "
             << state->monotonic_now();
+    if (state->monotonic_start_time() == monotonic_clock::min_time) {
+      continue;
+    }
     // And start computing the start time on the distributed clock now that
     // that works.
     start_time = std::max(
@@ -1221,8 +1231,6 @@
   event_loop->SkipTimingReport();
   event_loop->SkipAosLog();
 
-  const bool has_data = state->SetNode();
-
   for (size_t logged_channel_index = 0;
        logged_channel_index < logged_configuration()->channels()->size();
        ++logged_channel_index) {
@@ -1267,7 +1275,7 @@
 
   // If we didn't find any log files with data in them, we won't ever get a
   // callback or be live.  So skip the rest of the setup.
-  if (!has_data) {
+  if (state->OldestMessageTime() == monotonic_clock::max_time) {
     return;
   }
 
@@ -1283,45 +1291,39 @@
       }
       return;
     }
-    TimestampMerger::DeliveryTimestamp channel_timestamp;
-    int channel_index;
-    SizePrefixedFlatbufferVector<MessageHeader> channel_data =
-        SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
     if (VLOG_IS_ON(1)) {
       LogFit("Offset was");
     }
 
     bool update_time;
-    std::tie(channel_timestamp, channel_index, channel_data) =
-        state->PopOldest(&update_time);
+    TimestampedMessage timestamped_message = state->PopOldest(&update_time);
 
     const monotonic_clock::time_point monotonic_now =
         state->event_loop()->context().monotonic_event_time;
     if (!FLAGS_skip_order_validation) {
-      CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+      CHECK(monotonic_now == timestamped_message.monotonic_event_time)
           << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
           << monotonic_now << " trying to send "
-          << channel_timestamp.monotonic_event_time << " failure "
+          << timestamped_message.monotonic_event_time << " failure "
           << state->DebugString();
-    } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+    } else if (monotonic_now != timestamped_message.monotonic_event_time) {
       LOG(WARNING) << "Check failed: monotonic_now == "
-                      "channel_timestamp.monotonic_event_time) ("
+                      "timestamped_message.monotonic_event_time) ("
                    << monotonic_now << " vs. "
-                   << channel_timestamp.monotonic_event_time
+                   << timestamped_message.monotonic_event_time
                    << "): " << FlatbufferToJson(state->event_loop()->node())
                    << " Now " << monotonic_now << " trying to send "
-                   << channel_timestamp.monotonic_event_time << " failure "
+                   << timestamped_message.monotonic_event_time << " failure "
                    << state->DebugString();
     }
 
-    if (channel_timestamp.monotonic_event_time >
+    if (timestamped_message.monotonic_event_time >
             state->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
       if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
            !state->at_end()) ||
-          channel_data.message().data() != nullptr) {
-        CHECK(channel_data.message().data() != nullptr)
+          timestamped_message.data.span().size() != 0u) {
+        CHECK_NE(timestamped_message.data.span().size(), 0u)
             << ": Got a message without data.  Forwarding entry which was "
                "not matched?  Use --skip_missing_forwarding_entries to "
                "ignore this.";
@@ -1331,28 +1333,38 @@
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
           if (!FLAGS_skip_order_validation) {
-            CHECK_LT(channel_timestamp.monotonic_remote_time,
-                     state->monotonic_remote_now(channel_index))
+            CHECK_LT(
+                timestamped_message.monotonic_remote_time,
+                state->monotonic_remote_now(timestamped_message.channel_index))
                 << state->event_loop()->node()->name()->string_view() << " to "
-                << state->remote_node(channel_index)->name()->string_view()
+                << state->remote_node(timestamped_message.channel_index)
+                       ->name()
+                       ->string_view()
                 << " " << state->DebugString();
-          } else if (channel_timestamp.monotonic_remote_time >=
-                     state->monotonic_remote_now(channel_index)) {
+          } else if (timestamped_message.monotonic_remote_time >=
+                     state->monotonic_remote_now(
+                         timestamped_message.channel_index)) {
             LOG(WARNING)
-                << "Check failed: channel_timestamp.monotonic_remote_time < "
-                   "state->monotonic_remote_now(channel_index) ("
-                << channel_timestamp.monotonic_remote_time << " vs. "
-                << state->monotonic_remote_now(channel_index) << ") "
-                << state->event_loop()->node()->name()->string_view() << " to "
-                << state->remote_node(channel_index)->name()->string_view()
-                << " currently " << channel_timestamp.monotonic_event_time
+                << "Check failed: timestamped_message.monotonic_remote_time < "
+                   "state->monotonic_remote_now(timestamped_message.channel_"
+                   "index) ("
+                << timestamped_message.monotonic_remote_time << " vs. "
+                << state->monotonic_remote_now(
+                       timestamped_message.channel_index)
+                << ") " << state->event_loop()->node()->name()->string_view()
+                << " to "
+                << state->remote_node(timestamped_message.channel_index)
+                       ->name()
+                       ->string_view()
+                << " currently " << timestamped_message.monotonic_event_time
                 << " ("
                 << state->ToDistributedClock(
-                       channel_timestamp.monotonic_event_time)
+                       timestamped_message.monotonic_event_time)
                 << ") remote event time "
-                << channel_timestamp.monotonic_remote_time << " ("
+                << timestamped_message.monotonic_remote_time << " ("
                 << state->RemoteToDistributedClock(
-                       channel_index, channel_timestamp.monotonic_remote_time)
+                       timestamped_message.channel_index,
+                       timestamped_message.monotonic_remote_time)
                 << ") " << state->DebugString();
           }
 
@@ -1362,12 +1374,12 @@
               fprintf(
                   offset_fp_,
                   "# time_since_start, offset node 0, offset node 1, ...\n");
-              first_time_ = channel_timestamp.realtime_event_time;
+              first_time_ = timestamped_message.realtime_event_time;
             }
 
             fprintf(offset_fp_, "%.9f",
                     std::chrono::duration_cast<std::chrono::duration<double>>(
-                        channel_timestamp.realtime_event_time - first_time_)
+                        timestamped_message.realtime_event_time - first_time_)
                         .count());
             for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
               fprintf(offset_fp_, ", %.9f",
@@ -1383,15 +1395,14 @@
         }
 
         // If we have access to the factory, use it to fix the realtime time.
-        state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
-                                 channel_timestamp.realtime_event_time);
+        state->SetRealtimeOffset(timestamped_message.monotonic_event_time,
+                                 timestamped_message.realtime_event_time);
 
         VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
-                << channel_timestamp.monotonic_event_time;
+                << timestamped_message.monotonic_event_time;
         // TODO(austin): std::move channel_data in and make that efficient in
         // simulation.
-        state->Send(channel_index, channel_data.message().data()->Data(),
-                    channel_data.message().data()->size(), channel_timestamp);
+        state->Send(std::move(timestamped_message));
       } else if (state->at_end() && !ignore_missing_data_) {
         // We are at the end of the log file and found missing data.  Finish
         // reading the rest of the log file and call it quits.  We don't want
@@ -1401,15 +1412,15 @@
           state->PopOldest(&update_time_dummy);
         }
       } else {
-        CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
+        CHECK(timestamped_message.data.span().data() == nullptr) << ": Nullptr";
       }
     } else {
       LOG(WARNING)
           << "Not sending data from before the start of the log file. "
-          << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+          << timestamped_message.monotonic_event_time.time_since_epoch().count()
           << " start " << monotonic_start_time().time_since_epoch().count()
           << " "
-          << FlatbufferToJson(channel_data,
+          << FlatbufferToJson(timestamped_message.data,
                               {.multi_line = false, .max_vector_size = 100});
     }
 
@@ -1742,8 +1753,14 @@
   return remapped_channel;
 }
 
-LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
-    : channel_merger_(std::move(channel_merger)) {}
+LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
+    : timestamp_mapper_(std::move(timestamp_mapper)) {}
+
+void LogReader::State::AddPeer(State *peer) {
+  if (timestamp_mapper_ && peer->timestamp_mapper_) {
+    timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
+  }
+}
 
 EventLoop *LogReader::State::SetNodeEventLoopFactory(
     NodeEventLoopFactory *node_event_loop_factory) {
@@ -1783,22 +1800,20 @@
   factory_channel_index_[logged_channel_index] = factory_channel_index;
 }
 
-bool LogReader::State::Send(
-    size_t channel_index, const void *data, size_t size,
-    const TimestampMerger::DeliveryTimestamp &delivery_timestamp) {
-  aos::RawSender *sender = channels_[channel_index].get();
+bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
+  aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
   uint32_t remote_queue_index = 0xffffffff;
 
-  if (remote_timestamp_senders_[channel_index] != nullptr) {
-    std::vector<SentTimestamp> *queue_index_map =
-        CHECK_NOTNULL(CHECK_NOTNULL(channel_source_state_[channel_index])
-                          ->queue_index_map_[channel_index]
-                          .get());
+  if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
+    std::vector<SentTimestamp> *queue_index_map = CHECK_NOTNULL(
+        CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
+            ->queue_index_map_[timestamped_message.channel_index]
+            .get());
 
     SentTimestamp search;
-    search.monotonic_event_time = delivery_timestamp.monotonic_remote_time;
-    search.realtime_event_time = delivery_timestamp.realtime_remote_time;
-    search.queue_index = delivery_timestamp.remote_queue_index;
+    search.monotonic_event_time = timestamped_message.monotonic_remote_time;
+    search.realtime_event_time = timestamped_message.realtime_remote_time;
+    search.queue_index = timestamped_message.remote_queue_index;
 
     // Find the sent time if available.
     auto element = std::lower_bound(
@@ -1828,10 +1843,10 @@
     // receive time.
     if (element != queue_index_map->end()) {
       CHECK_EQ(element->monotonic_event_time,
-               delivery_timestamp.monotonic_remote_time);
+               timestamped_message.monotonic_remote_time);
       CHECK_EQ(element->realtime_event_time,
-               delivery_timestamp.realtime_remote_time);
-      CHECK_EQ(element->queue_index, delivery_timestamp.remote_queue_index);
+               timestamped_message.realtime_remote_time);
+      CHECK_EQ(element->queue_index, timestamped_message.remote_queue_index);
 
       remote_queue_index = element->actual_queue_index;
     }
@@ -1839,27 +1854,32 @@
 
   // Send!  Use the replayed queue index here instead of the logged queue index
   // for the remote queue index.  This makes re-logging work.
-  const bool sent =
-      sender->Send(data, size, delivery_timestamp.monotonic_remote_time,
-                   delivery_timestamp.realtime_remote_time, remote_queue_index);
+  const bool sent = sender->Send(
+      timestamped_message.data.message().data()->Data(),
+      timestamped_message.data.message().data()->size(),
+      timestamped_message.monotonic_remote_time,
+      timestamped_message.realtime_remote_time, remote_queue_index);
   if (!sent) return false;
 
-  if (queue_index_map_[channel_index]) {
+  if (queue_index_map_[timestamped_message.channel_index]) {
     SentTimestamp timestamp;
-    timestamp.monotonic_event_time = delivery_timestamp.monotonic_event_time;
-    timestamp.realtime_event_time = delivery_timestamp.realtime_event_time;
-    timestamp.queue_index = delivery_timestamp.queue_index;
+    timestamp.monotonic_event_time = timestamped_message.monotonic_event_time;
+    timestamp.realtime_event_time = timestamped_message.realtime_event_time;
+    timestamp.queue_index = timestamped_message.queue_index;
     timestamp.actual_queue_index = sender->sent_queue_index();
-    queue_index_map_[channel_index]->emplace_back(timestamp);
-  } else if (remote_timestamp_senders_[channel_index] != nullptr) {
+    queue_index_map_[timestamped_message.channel_index]->emplace_back(
+        timestamp);
+  } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
+             nullptr) {
     aos::Sender<MessageHeader>::Builder builder =
-        remote_timestamp_senders_[channel_index]->MakeBuilder();
+        remote_timestamp_senders_[timestamped_message.channel_index]
+            ->MakeBuilder();
 
     logger::MessageHeader::Builder message_header_builder =
         builder.MakeBuilder<logger::MessageHeader>();
 
     message_header_builder.add_channel_index(
-        factory_channel_index_[channel_index]);
+        factory_channel_index_[timestamped_message.channel_index]);
 
     // Swap the remote and sent metrics.  They are from the sender's
     // perspective, not the receiver's perspective.
@@ -1870,9 +1890,9 @@
     message_header_builder.add_queue_index(sender->sent_queue_index());
 
     message_header_builder.add_monotonic_remote_time(
-        delivery_timestamp.monotonic_remote_time.time_since_epoch().count());
+        timestamped_message.monotonic_remote_time.time_since_epoch().count());
     message_header_builder.add_realtime_remote_time(
-        delivery_timestamp.realtime_remote_time.time_since_epoch().count());
+        timestamped_message.realtime_remote_time.time_since_epoch().count());
 
     message_header_builder.add_remote_queue_index(remote_queue_index);
 
@@ -1899,28 +1919,23 @@
   return &(sender->second);
 }
 
-std::tuple<TimestampMerger::DeliveryTimestamp, int,
-           SizePrefixedFlatbufferVector<MessageHeader>>
-LogReader::State::PopOldest(bool *update_time) {
+TimestampedMessage LogReader::State::PopOldest(bool *update_time) {
   CHECK_GT(sorted_messages_.size(), 0u);
 
-  std::tuple<TimestampMerger::DeliveryTimestamp, int,
-             SizePrefixedFlatbufferVector<MessageHeader>,
-             message_bridge::NoncausalOffsetEstimator *>
+  std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
       result = std::move(sorted_messages_.front());
   VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
           << std::get<0>(result).monotonic_event_time;
   sorted_messages_.pop_front();
   SeedSortedMessages();
 
-  if (std::get<3>(result) != nullptr) {
-    *update_time = std::get<3>(result)->Pop(
+  if (std::get<1>(result) != nullptr) {
+    *update_time = std::get<1>(result)->Pop(
         event_loop_->node(), std::get<0>(result).monotonic_event_time);
   } else {
     *update_time = false;
   }
-  return std::make_tuple(std::get<0>(result), std::get<1>(result),
-                         std::move(std::get<2>(result)));
+  return std::move(std::get<0>(result));
 }
 
 monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
@@ -1930,18 +1945,25 @@
     return std::get<0>(sorted_messages_.front()).monotonic_event_time;
   }
 
-  return channel_merger_->OldestMessageTime();
+  TimestampedMessage *m =
+      timestamp_mapper_ ? timestamp_mapper_->Front() : nullptr;
+  if (m == nullptr) {
+    return monotonic_clock::max_time;
+  }
+  return m->monotonic_event_time;
 }
 
 void LogReader::State::SeedSortedMessages() {
+  if (!timestamp_mapper_) return;
   const aos::monotonic_clock::time_point end_queue_time =
       (sorted_messages_.size() > 0
            ? std::get<0>(sorted_messages_.front()).monotonic_event_time
-           : channel_merger_->monotonic_start_time()) +
+           : timestamp_mapper_->monotonic_start_time()) +
       std::chrono::seconds(2);
 
   while (true) {
-    if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
+    TimestampedMessage *m = timestamp_mapper_->Front();
+    if (m == nullptr) {
       return;
     }
     if (sorted_messages_.size() > 0) {
@@ -1953,31 +1975,25 @@
       }
     }
 
-    TimestampMerger::DeliveryTimestamp channel_timestamp;
-    int channel_index;
-    SizePrefixedFlatbufferVector<MessageHeader> channel_data =
-        SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
     message_bridge::NoncausalOffsetEstimator *filter = nullptr;
 
-    std::tie(channel_timestamp, channel_index, channel_data) =
-        channel_merger_->PopOldest();
+    TimestampedMessage timestamped_message = std::move(*m);
+    timestamp_mapper_->PopFront();
 
     // Skip any messages without forwarding information.
-    if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+    if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
       // Got a forwarding timestamp!
-      filter = filters_[channel_index];
+      filter = filters_[timestamped_message.channel_index];
 
       CHECK(filter != nullptr);
 
       // Call the correct method depending on if we are the forward or
       // reverse direction here.
       filter->Sample(event_loop_->node(),
-                     channel_timestamp.monotonic_event_time,
-                     channel_timestamp.monotonic_remote_time);
+                     timestamped_message.monotonic_event_time,
+                     timestamped_message.monotonic_remote_time);
     }
-    sorted_messages_.emplace_back(channel_timestamp, channel_index,
-                                  std::move(channel_data), filter);
+    sorted_messages_.emplace_back(std::move(timestamped_message), filter);
   }
 }
 
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index f6a037b..f0f0a69 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -319,24 +319,10 @@
   // pass it in here. It must provide all the channels that the original logged
   // config did.
   //
-  // Log filenames are in the following format:
-  //
-  //   {
-  //     {log1_part0, log1_part1, ...},
-  //     {log2}
-  //   }
-  // The inner vector is a list of log file chunks which form up a log file.
-  // The outer vector is a list of log files with subsets of the messages, or
-  // messages from different nodes.
-  //
-  // If the outer vector isn't provided, it is assumed to be of size 1.
+  // The single-file constructor calls SortParts internally.
   LogReader(std::string_view filename,
             const Configuration *replay_configuration = nullptr);
-  LogReader(const std::vector<std::string> &filenames,
-            const Configuration *replay_configuration = nullptr);
-  LogReader(const std::vector<std::vector<std::string>> &filenames,
-            const Configuration *replay_configuration = nullptr);
-  LogReader(const std::vector<LogFile> &log_files,
+  LogReader(std::vector<LogFile> log_files,
             const Configuration *replay_configuration = nullptr);
   ~LogReader();
 
@@ -450,7 +436,7 @@
                : logged_configuration()->nodes()->size();
   }
 
-  const std::vector<std::vector<std::string>> filenames_;
+  const std::vector<LogFile> log_files_;
 
   // This is *a* log file header used to provide the logged config.  The rest of
   // the header is likely distracting.
@@ -466,14 +452,15 @@
   // State per node.
   class State {
    public:
-    State(std::unique_ptr<ChannelMerger> channel_merger);
+    State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+
+    // Connects up the timestamp mappers.
+    void AddPeer(State *peer);
 
     // Returns the timestamps, channel_index, and message from a channel.
     // update_time (will be) set to true when popping this message causes the
     // filter to change the time offset estimation function.
-    std::tuple<TimestampMerger::DeliveryTimestamp, int,
-               SizePrefixedFlatbufferVector<MessageHeader>>
-    PopOldest(bool *update_time);
+    TimestampedMessage PopOldest(bool *update_time);
 
     // Returns the monotonic time of the oldest message.
     monotonic_clock::time_point OldestMessageTime() const;
@@ -484,10 +471,12 @@
 
     // Returns the starting time for this node.
     monotonic_clock::time_point monotonic_start_time() const {
-      return channel_merger_->monotonic_start_time();
+      return timestamp_mapper_ ? timestamp_mapper_->monotonic_start_time()
+                               : monotonic_clock::min_time;
     }
     realtime_clock::time_point realtime_start_time() const {
-      return channel_merger_->realtime_start_time();
+      return timestamp_mapper_ ? timestamp_mapper_->realtime_start_time()
+                               : realtime_clock::min_time;
     }
 
     // Sets the node event loop factory for replaying into a
@@ -555,10 +544,6 @@
       return node_event_loop_factory_->monotonic_now();
     }
 
-    // Sets the node we will be merging as, and returns true if there is any
-    // data on it.
-    bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
-
     // Sets the number of channels.
     void SetChannelCount(size_t count);
 
@@ -570,7 +555,9 @@
                     State *source_state);
 
     // Returns if we have read all the messages from all the logs.
-    bool at_end() const { return channel_merger_->at_end(); }
+    bool at_end() const {
+      return timestamp_mapper_ ? timestamp_mapper_->Front() == nullptr : true;
+    }
 
     // Unregisters everything so we can destory the event loop.
     void Deregister();
@@ -586,8 +573,7 @@
     }
 
     // Sends a buffer on the provided channel index.
-    bool Send(size_t channel_index, const void *data, size_t size,
-              const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
+    bool Send(const TimestampedMessage &timestamped_message);
 
     // Returns a debug string for the channel merger.
     std::string DebugString() const {
@@ -599,22 +585,24 @@
                    << "]: " << std::get<0>(message).monotonic_event_time << " "
                    << configuration::StrippedChannelToString(
                           event_loop_->configuration()->channels()->Get(
-                              std::get<2>(message).message().channel_index()))
+                              std::get<0>(message).channel_index))
                    << "\n";
         } else if (i == 7) {
           messages << "...\n";
         }
         ++i;
       }
-      return messages.str() + channel_merger_->DebugString();
+      if (!timestamp_mapper_) {
+        return messages.str();
+      }
+      return messages.str() + timestamp_mapper_->DebugString();
     }
 
    private:
     // Log file.
-    std::unique_ptr<ChannelMerger> channel_merger_;
+    std::unique_ptr<TimestampMapper> timestamp_mapper_;
 
-    std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
-                          SizePrefixedFlatbufferVector<MessageHeader>,
+    std::deque<std::tuple<TimestampedMessage,
                           message_bridge::NoncausalOffsetEstimator *>>
         sorted_messages_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 2dc02ef..a6385d3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -279,8 +279,7 @@
 
   // Even though it doesn't make any difference here, exercise the logic for
   // passing in a separate config.
-  LogReader reader(std::vector<std::string>{logfile0, logfile1},
-                   &config_.message());
+  LogReader reader(SortParts({logfile0, logfile1}), &config_.message());
 
   // Confirm that we can remap logged channels to point to new buses.
   reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
@@ -763,7 +762,7 @@
                     "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
   }
 
-  LogReader reader(structured_logfiles_);
+  LogReader reader(SortParts(logfiles_));
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -939,7 +938,8 @@
         }
       )");
 
-  EXPECT_DEATH(LogReader(structured_logfiles_, &extra_nodes_config.message()),
+  const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+  EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
                "Log file and replay config need to have matching nodes lists.");
 }
 
@@ -961,7 +961,7 @@
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
-  LogReader reader(structured_logfiles_);
+  LogReader reader(SortParts(logfiles_));
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1101,7 +1101,7 @@
     event_loop_factory_.RunFor(chrono::milliseconds(400));
   }
 
-  LogReader reader(structured_logfiles_);
+  LogReader reader(SortParts(logfiles_));
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1319,7 +1319,7 @@
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
-  LogReader reader(structured_logfiles_);
+  LogReader reader(SortParts(logfiles_));
 
   // Remap just on pi1.
   reader.RemapLoggedChannel<aos::timing::Report>(
@@ -1385,7 +1385,7 @@
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
-  LogReader reader(structured_logfiles_);
+  LogReader reader(SortParts(logfiles_));
 
   SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index d98a9eb..0c09b22 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -50,6 +50,15 @@
       MergeFlatBuffers<T>(&fb1.message(), &fb2.message()));
 }
 
+template <class T>
+inline flatbuffers::Offset<T> MergeFlatBuffers(
+    const aos::Flatbuffer<T> &fb1, const aos::Flatbuffer<T> &fb2,
+    flatbuffers::FlatBufferBuilder *fbb) {
+  return MergeFlatBuffers<T>(
+      reinterpret_cast<const flatbuffers::Table *>(&fb1.message()),
+      reinterpret_cast<const flatbuffers::Table *>(&fb2.message()), fbb);
+}
+
 // Copies a flatbuffer by walking the tree and copying all the pieces.  This
 // converts DAGs to trees.
 template <class T>
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 81a8cee..b751a4e 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -117,6 +117,9 @@
   void Wipe() { memset(span().data(), 0, span().size()); }
 
   bool Verify() const {
+    if (span().size() < 4u) {
+      return false;
+    }
     flatbuffers::Verifier v(span().data(), span().size());
     return v.VerifyTable(&message());
   }
@@ -477,6 +480,37 @@
   ResizeableBuffer data_;
 };
 
+// Non-owning, span-backed flatbuffer.
+template <typename T>
+class SizePrefixedFlatbufferSpan : public SizePrefixedFlatbuffer<T> {
+ public:
+  // Builds a flatbuffer pointing to the contents of a span.
+  SizePrefixedFlatbufferSpan(const absl::Span<const uint8_t> data)
+      : data_(data) {}
+  // Builds a flatbuffer pointing to the contents of another flatbuffer.
+  SizePrefixedFlatbufferSpan(const SizePrefixedFlatbuffer<T> &other) {
+    data_ = other.span();
+  }
+
+  // Points to the data in the other flatbuffer.
+  SizePrefixedFlatbufferSpan &operator=(
+      const SizePrefixedFlatbuffer<T> &other) {
+    data_ = other.span();
+    return *this;
+  }
+
+  ~SizePrefixedFlatbufferSpan() override {}
+
+  absl::Span<uint8_t> span() override {
+    LOG(FATAL) << "Unimplemented";
+    return absl::Span<uint8_t>(nullptr, 0);
+  }
+  absl::Span<const uint8_t> span() const override { return data_; }
+
+ private:
+  absl::Span<const uint8_t> data_;
+};
+
 inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
     absl::Span<const uint8_t> span) {
   // Copy the data from the span.
diff --git a/aos/flatbuffers_test.cc b/aos/flatbuffers_test.cc
new file mode 100644
index 0000000..e3030f1
--- /dev/null
+++ b/aos/flatbuffers_test.cc
@@ -0,0 +1,25 @@
+#include "aos/flatbuffers.h"
+
+#include "gtest/gtest.h"
+
+#include "aos/json_to_flatbuffer.h"
+#include "aos/json_to_flatbuffer_generated.h"
+
+namespace aos {
+namespace testing {
+
+// Tests that Verify works.
+TEST(FlatbufferTest, Verify) {
+  FlatbufferDetachedBuffer<Configuration> fb =
+      JsonToFlatbuffer<Configuration>("{}");
+  FlatbufferSpan<Configuration> fb_span(fb);
+  EXPECT_TRUE(fb.Verify());
+  EXPECT_TRUE(fb_span.Verify());
+
+  // Now confirm Verify() correctly rejects an empty flatbuffer.
+  FlatbufferSpan<Configuration> empty(absl::Span<const uint8_t>(nullptr, 0));
+  EXPECT_FALSE(empty.Verify());
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/json_tokenizer.cc b/aos/json_tokenizer.cc
index a3d804e..9403daa 100644
--- a/aos/json_tokenizer.cc
+++ b/aos/json_tokenizer.cc
@@ -270,7 +270,8 @@
       ConsumeWhitespace();
 
       if (!Consume(":")) {
-        fprintf(stderr, "Error on line %d\n", linenumber_);
+        fprintf(stderr, "Error on line %d, expected ':', got '%c'\n",
+                linenumber_, Char());
         return TokenType::kError;
       }
 
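
With the extra detail above, the offending character shows up directly in the diagnostic; for example, a stray '}' where the key/value separator was expected would print `Error on line 3, expected ':', got '}'` (line number illustrative).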
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
index ea7fdee..9d74089 100644
--- a/aos/starter/starterd_lib.cc
+++ b/aos/starter/starterd_lib.cc
@@ -406,8 +406,22 @@
   if (config_msg_->has_applications()) {
     const flatbuffers::Vector<flatbuffers::Offset<aos::Application>>
         *applications = config_msg_->applications();
-    for (const aos::Application *application : *applications) {
-      AddApplication(application);
+
+    if (aos::configuration::MultiNode(config_msg_)) {
+      std::string_view current_node = event_loop_.node()->name()->string_view();
+      for (const aos::Application *application : *applications) {
+        CHECK(application->has_nodes());
+        for (const flatbuffers::String *node : *application->nodes()) {
+          if (node->string_view() == current_node) {
+            AddApplication(application);
+            break;
+          }
+        }
+      }
+    } else {
+      for (const aos::Application *application : *applications) {
+        AddApplication(application);
+      }
     }
   }
 }
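
In multi-node configurations, starterd now launches only the applications whose "nodes" list names the node it is running on, and CHECK-fails on any application missing that list. A hypothetical standalone helper (not in the patch) expressing the same predicate:

    // Returns true if `application` should run on `current_node`.
    bool ShouldStart(const aos::Application *application,
                     std::string_view current_node) {
      if (!application->has_nodes()) {
        return false;  // The patch CHECK-fails here instead.
      }
      for (const flatbuffers::String *node : *application->nodes()) {
        if (node->string_view() == current_node) {
          return true;
        }
      }
      return false;
    }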
diff --git a/tools/bazel b/tools/bazel
index 36e88ae..21bb619 100755
--- a/tools/bazel
+++ b/tools/bazel
@@ -24,7 +24,7 @@
   exec "${BAZEL_OVERRIDE}" "$@"
 fi
 
-readonly VERSION="4.0.0rc2-202011211956+37a429ad12"
+readonly VERSION="4.0.0rc2-202012022031+a3c94ec2ed"
 
 readonly DOWNLOAD_DIR="${HOME}/.cache/bazel"
 # Directory to unpack bazel into.  This must change whenever bazel changes.
diff --git a/y2020/BUILD b/y2020/BUILD
index 4643ae8..59642fe 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -210,6 +210,7 @@
     src = "y2020_roborio.json",
     flatbuffers = [
         ":setpoint_fbs",
+        "//aos/events/logging:logger_fbs",
         "//aos/network:message_bridge_client_fbs",
         "//aos/network:message_bridge_server_fbs",
         "//aos/network:timestamp_fbs",
diff --git a/y2020/y2020_pi_template.json b/y2020/y2020_pi_template.json
index 4b04c6c..b504599 100644
--- a/y2020/y2020_pi_template.json
+++ b/y2020/y2020_pi_template.json
@@ -36,12 +36,16 @@
       "source_node": "pi{{ NUM }}",
       "frequency": 10,
       "num_senders": 2,
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["roborio"],
       "max_size": 200,
       "destination_nodes": [
         {
           "name": "roborio",
           "priority": 1,
-          "time_to_live": 5000000
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"]
         }
       ]
     },
@@ -84,6 +88,28 @@
       "max_size": 2000000
     }
   ],
+  "applications": [
+    {
+      "name": "message_bridge_client",
+      "executable_name": "message_bridge_client.stripped",
+      "nodes": ["pi{{ NUM }}"]
+    },
+    {
+      "name": "message_bridge_server",
+      "executable_name": "message_bridge_server.stripped",
+      "nodes": ["pi{{ NUM }}"]
+    },
+    {
+      "name": "web_proxy",
+      "executable_name": "web_proxy_main.stripped",
+      "nodes": ["pi{{ NUM }}"]
+    },
+    {
+      "name": "camera_reader",
+      "executable_name": "camera_reader.stripped",
+      "nodes": ["pi{{ NUM }}"]
+    }
+  ],
   "maps": [
     {
       "match": {
diff --git a/y2020/y2020_roborio.json b/y2020/y2020_roborio.json
index 0ab9664..703d560 100644
--- a/y2020/y2020_roborio.json
+++ b/y2020/y2020_roborio.json
@@ -11,33 +11,7 @@
       "name": "/roborio/aos",
       "type": "aos.RobotState",
       "source_node": "roborio",
-      "frequency": 200,
-      "destination_nodes": [
-        {
-          "name": "pi1",
-          "priority": 2,
-          "timestamp_logger": "LOCAL_LOGGER",
-          "time_to_live": 10000000
-        },
-        {
-          "name": "pi2",
-          "priority": 2,
-          "timestamp_logger": "LOCAL_LOGGER",
-          "time_to_live": 10000000
-        },
-        {
-          "name": "pi3",
-          "priority": 2,
-          "timestamp_logger": "LOCAL_LOGGER",
-          "time_to_live": 10000000
-        },
-        {
-          "name": "pi4",
-          "priority": 2,
-          "timestamp_logger": "LOCAL_LOGGER",
-          "time_to_live": 10000000
-        }
-      ]
+      "frequency": 200
     },
     {
       "name": "/roborio/aos",
@@ -69,6 +43,30 @@
       "num_senders": 2
     },
     {
+      "name": "/roborio/aos/remote_timestamps/pi1",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "roborio"
+    },
+    {
+      "name": "/roborio/aos/remote_timestamps/pi2",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "roborio"
+    },
+    {
+      "name": "/roborio/aos/remote_timestamps/pi3",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "roborio"
+    },
+    {
+      "name": "/roborio/aos/remote_timestamps/pi4",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "roborio"
+    },
+    {
       "name": "/roborio/aos",
       "type": "aos.message_bridge.Timestamp",
       "source_node": "roborio",
@@ -79,21 +77,29 @@
         {
           "name": "pi1",
           "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
           "time_to_live": 5000000
         },
         {
           "name": "pi2",
           "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
           "time_to_live": 5000000
         },
         {
           "name": "pi3",
           "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
           "time_to_live": 5000000
         },
         {
           "name": "pi4",
           "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
           "time_to_live": 5000000
         }
       ]
@@ -177,16 +183,29 @@
         {
           "name": "pi1",
           "priority": 5,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
           "time_to_live": 5000000
         },
         {
           "name": "pi2",
           "priority": 5,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
           "time_to_live": 5000000
         },
         {
           "name": "pi3",
           "priority": 5,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
+          "time_to_live": 5000000
+        },
+        {
+          "name": "pi4",
+          "priority": 5,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["roborio"],
           "time_to_live": 5000000
         }
       ]
@@ -240,10 +259,44 @@
   ],
   "applications": [
     {
-      "name": "drivetrain"
+      "name": "drivetrain",
+      "executable_name": "drivetrain.stripped",
+      "nodes": ["roborio"]
     },
     {
-      "name": "camera_reader"
+      "name": "superstructure",
+      "executable_name": "superstructure.stripped",
+      "nodes": ["roborio"]
+    },
+    {
+      "name": "joystick_reader",
+      "executable_name": "joystick_reader.stripped",
+      "nodes": ["roborio"]
+    },
+    {
+      "name": "wpilib_interface",
+      "executable_name": "wpilib_interface.stripped",
+      "nodes": ["roborio"]
+    },
+    {
+      "name": "autonomous_action",
+      "executable_name": "autonomous_action.stripped",
+      "nodes": ["roborio"]
+    },
+    {
+      "name": "message_bridge_client",
+      "executable_name": "message_bridge_client.stripped",
+      "nodes": ["roborio"]
+    },
+    {
+      "name": "message_bridge_server",
+      "executable_name": "message_bridge_server.stripped",
+      "nodes": ["roborio"]
+    },
+    {
+      "name": "logger",
+      "executable_name": "logger_main.stripped",
+      "nodes": ["roborio"]
     }
   ],
   "maps": [