Add multi-node log file reading

This handles timestamps, sorting, and merging with data.

For simplicity, we read the log files once per node.  Once benchmarks
show if this is a bad idea, we can fix it.

Change-Id: I445ac5bfc7186bda25cc899602ac8d95a4cb946d
diff --git a/aos/BUILD b/aos/BUILD
index 80ba32a..2ae0fec 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -462,6 +462,7 @@
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/strings",
+        "@com_google_absl//absl/types:span",
     ],
 )
 
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index f2427b8..7fb2378 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -7,16 +7,20 @@
 #include "aos/json_to_flatbuffer.h"
 #include "gflags/gflags.h"
 
-DEFINE_string(logfile, "/tmp/logfile.bfbs",
-              "Name of the logfile to read from.");
 DEFINE_string(
     name, "",
     "Name to match for printing out channels. Empty means no name filter.");
 DEFINE_string(type, "",
               "Channel type to match for printing out channels. Empty means no "
               "type filter.");
+DEFINE_bool(raw, false,
+            "If true, just print the data out unsorted and unparsed");
+
 int main(int argc, char **argv) {
   gflags::SetUsageMessage(
+      "Usage:\n"
+      "  log_cat [args] logfile1 logfile2 ...\n"
+      "\n"
       "This program provides a basic interface to dump data from a logfile to "
       "stdout. Given a logfile, channel name filter, and type filter, it will "
       "print all the messages in the logfile matching the filters. The message "
@@ -27,64 +31,103 @@
       "the logged data.");
   aos::InitGoogle(&argc, &argv);
 
-  aos::logger::LogReader reader(FLAGS_logfile);
-  reader.Register();
-
-  std::unique_ptr<aos::EventLoop> printer_event_loop =
-      reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
-  printer_event_loop->SkipTimingReport();
-  printer_event_loop->SkipAosLog();
-
-  bool found_channel = false;
-  const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
-      reader.configuration()->channels();
-  for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
-    const aos::Channel *channel = channels->Get(i);
-    const flatbuffers::string_view name = channel->name()->string_view();
-    const flatbuffers::string_view type = channel->type()->string_view();
-    if (name.find(FLAGS_name) != std::string::npos &&
-        type.find(FLAGS_type) != std::string::npos) {
-      if (!aos::configuration::ChannelIsReadableOnNode(
-              channel, printer_event_loop->node())) {
-        continue;
-      }
-      LOG(INFO) << "Listening on " << name << " " << type;
-
-      CHECK_NOTNULL(channel->schema());
-      printer_event_loop->MakeRawWatcher(
-          channel, [channel](const aos::Context &context, const void *message) {
-            // Print the flatbuffer out to stdout, both to remove the
-            // unnecessary cruft from glog and to allow the user to readily
-            // redirect just the logged output independent of any debugging
-            // information on stderr.
-            if (context.monotonic_remote_time != context.monotonic_event_time) {
-              std::cout << context.realtime_remote_time << " ("
-                        << context.monotonic_remote_time << ") delivered "
-                        << context.realtime_event_time << " ("
-                        << context.monotonic_event_time << ") "
-                        << channel->name()->c_str() << ' '
-                        << channel->type()->c_str() << ": "
-                        << aos::FlatbufferToJson(
-                               channel->schema(),
-                               static_cast<const uint8_t *>(message))
-                        << std::endl;
-            } else {
-              std::cout << context.realtime_event_time << " ("
-                        << context.monotonic_event_time << ") "
-                        << channel->name()->c_str() << ' '
-                        << channel->type()->c_str() << ": "
-                        << aos::FlatbufferToJson(
-                               channel->schema(),
-                               static_cast<const uint8_t *>(message))
-                        << std::endl;
-            }
-          });
-      found_channel = true;
+  if (FLAGS_raw) {
+    if (argc != 2) {
+      LOG(FATAL) << "Expected 1 logfile as an argument.";
     }
+    aos::logger::MessageReader reader(argv[1]);
+
+    while (true) {
+      std::optional<aos::FlatbufferVector<aos::logger::MessageHeader>> message =
+          reader.ReadMessage();
+      if (!message) {
+        break;
+      }
+
+      std::cout << aos::FlatbufferToJson(message.value()) << std::endl;
+    }
+    return 0;
   }
 
-  if (!found_channel) {
-    LOG(FATAL) << "Could not find any channels";
+  if (argc < 2) {
+    LOG(FATAL) << "Expected at least 1 logfile as an argument.";
+  }
+
+  std::vector<std::vector<std::string>> logfiles;
+
+  for (int i = 1; i < argc; ++i) {
+    logfiles.emplace_back(std::vector<std::string>{std::string(argv[i])});
+  }
+
+  aos::logger::LogReader reader(logfiles);
+  reader.Register();
+
+  std::vector<std::unique_ptr<aos::EventLoop>> printer_event_loops;
+
+  for (const aos::Node *node : reader.Nodes()) {
+    std::unique_ptr<aos::EventLoop> printer_event_loop =
+        reader.event_loop_factory()->MakeEventLoop("printer", node);
+    printer_event_loop->SkipTimingReport();
+    printer_event_loop->SkipAosLog();
+
+    bool found_channel = false;
+    const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
+        reader.configuration()->channels();
+    for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
+      const aos::Channel *channel = channels->Get(i);
+      const flatbuffers::string_view name = channel->name()->string_view();
+      const flatbuffers::string_view type = channel->type()->string_view();
+      if (name.find(FLAGS_name) != std::string::npos &&
+          type.find(FLAGS_type) != std::string::npos) {
+        if (!aos::configuration::ChannelIsReadableOnNode(
+                channel, printer_event_loop->node())) {
+          continue;
+        }
+        VLOG(1) << "Listening on " << name << " " << type;
+
+        std::string node_name =
+            node == nullptr ? ""
+                            : std::string(node->name()->string_view()) + " ";
+
+        CHECK_NOTNULL(channel->schema());
+        printer_event_loop->MakeRawWatcher(
+            channel, [channel, node_name](const aos::Context &context,
+                                          const void *message) {
+              // Print the flatbuffer out to stdout, both to remove the
+              // unnecessary cruft from glog and to allow the user to readily
+              // redirect just the logged output independent of any debugging
+              // information on stderr.
+              if (context.monotonic_remote_time !=
+                  context.monotonic_event_time) {
+                std::cout << node_name << context.realtime_event_time << " ("
+                          << context.monotonic_event_time << ") sent "
+                          << context.realtime_remote_time << " ("
+                          << context.monotonic_remote_time << ") "
+                          << channel->name()->c_str() << ' '
+                          << channel->type()->c_str() << ": "
+                          << aos::FlatbufferToJson(
+                                 channel->schema(),
+                                 static_cast<const uint8_t *>(message))
+                          << std::endl;
+              } else {
+                std::cout << node_name << context.realtime_event_time << " ("
+                          << context.monotonic_event_time << ") "
+                          << channel->name()->c_str() << ' '
+                          << channel->type()->c_str() << ": "
+                          << aos::FlatbufferToJson(
+                                 channel->schema(),
+                                 static_cast<const uint8_t *>(message))
+                          << std::endl;
+              }
+            });
+        found_channel = true;
+      }
+    }
+
+    if (!found_channel) {
+      LOG(FATAL) << "Could not find any channels";
+    }
+    printer_event_loops.emplace_back(std::move(printer_event_loop));
   }
 
   reader.event_loop_factory()->Run();
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 65da871..88bf15c 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -14,6 +14,8 @@
     name, "",
     "Name to match for printing out channels. Empty means no name filter.");
 
+DEFINE_string(node, "", "Node to print stats out for.");
+
 // define struct to hold all information
 struct ChannelStats {
   // pointer to the channel for which stats are collected
@@ -63,9 +65,23 @@
   aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
   reader.Register(&log_reader_factory);
 
+  const aos::Node *node = nullptr;
+
+  if (aos::configuration::MultiNode(reader.configuration())) {
+    if (FLAGS_node.empty()) {
+      LOG(INFO) << "Need a --node specified.  The log file has:";
+      for (const aos::Node *node : reader.Nodes()) {
+        LOG(INFO) << "  " << node->name()->string_view();
+      }
+      return 1;
+    } else {
+      node = aos::configuration::GetNode(reader.configuration(), FLAGS_node);
+    }
+  }
+
   // Make an eventloop for retrieving stats
   std::unique_ptr<aos::EventLoop> stats_event_loop =
-      log_reader_factory.MakeEventLoop("logstats", reader.node());
+      log_reader_factory.MakeEventLoop("logstats", node);
   stats_event_loop->SkipTimingReport();
   stats_event_loop->SkipAosLog();
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index a5ca084..144890a 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -11,6 +11,7 @@
 #include "aos/configuration.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffer_merge.h"
+#include "aos/util/file.h"
 #include "flatbuffers/flatbuffers.h"
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -24,9 +25,12 @@
 namespace chrono = std::chrono;
 
 DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
-    : fd_(open(std::string(filename).c_str(),
-               O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
-  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+    : filename_(filename) {
+  util::MkdirP(filename, 0777);
+  fd_ = open(std::string(filename).c_str(),
+             O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774);
+  VLOG(1) << "Opened " << filename << " for writing";
+  PCHECK(fd_ != -1) << ": Failed to open " << filename << " for writing";
 }
 
 DetachedBufferWriter::~DetachedBufferWriter() {
@@ -105,6 +109,7 @@
   switch (log_type) {
     case LogType::kLogMessage:
     case LogType::kLogMessageAndDeliveryTime:
+    case LogType::kLogRemoteMessage:
       data_offset =
           fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
       break;
@@ -115,14 +120,30 @@
 
   MessageHeader::Builder message_header_builder(*fbb);
   message_header_builder.add_channel_index(channel_index);
-  message_header_builder.add_queue_index(context.queue_index);
-  message_header_builder.add_monotonic_sent_time(
-      context.monotonic_event_time.time_since_epoch().count());
-  message_header_builder.add_realtime_sent_time(
-      context.realtime_event_time.time_since_epoch().count());
+
+  switch (log_type) {
+    case LogType::kLogRemoteMessage:
+      message_header_builder.add_queue_index(context.remote_queue_index);
+      message_header_builder.add_monotonic_sent_time(
+          context.monotonic_remote_time.time_since_epoch().count());
+      message_header_builder.add_realtime_sent_time(
+          context.realtime_remote_time.time_since_epoch().count());
+      break;
+
+    case LogType::kLogMessage:
+    case LogType::kLogMessageAndDeliveryTime:
+    case LogType::kLogDeliveryTimeOnly:
+      message_header_builder.add_queue_index(context.queue_index);
+      message_header_builder.add_monotonic_sent_time(
+          context.monotonic_event_time.time_since_epoch().count());
+      message_header_builder.add_realtime_sent_time(
+          context.realtime_event_time.time_since_epoch().count());
+      break;
+  }
 
   switch (log_type) {
     case LogType::kLogMessage:
+    case LogType::kLogRemoteMessage:
       message_header_builder.add_data(data_offset);
       break;
 
@@ -143,7 +164,8 @@
 }
 
 SpanReader::SpanReader(std::string_view filename)
-    : fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
+    : filename_(filename),
+      fd_(open(std::string(filename).c_str(), O_RDONLY | O_CLOEXEC)) {
   PCHECK(fd_ != -1) << ": Failed to open " << filename;
 }
 
@@ -224,6 +246,20 @@
   return true;
 }
 
+FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
+  SpanReader span_reader(filename);
+  // Make sure we have enough to read the size.
+  absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
+
+  // Make sure something was read.
+  CHECK(config_data != absl::Span<const uint8_t>());
+
+  // And copy the config so we have it forever.
+  std::vector<uint8_t> data(
+      config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
+  return FlatbufferVector<LogFileHeader>(std::move(data));
+}
+
 MessageReader::MessageReader(std::string_view filename)
     : span_reader_(filename) {
   // Make sure we have enough to read the size.
@@ -253,23 +289,57 @@
       chrono::nanoseconds(result.message().monotonic_sent_time()));
 
   newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-  return result;
+  VLOG(1) << "Read from " << filename().substr(130) << " data "
+          << FlatbufferToJson(result);
+  return std::move(result);
 }
 
-SortedMessageReader::SortedMessageReader(
+SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),
       log_file_header_(FlatbufferDetachedBuffer<LogFileHeader>::Empty()) {
   CHECK(NextLogFile()) << ": filenames is empty.  Need files to read.";
 
+  // Grab any log file header.  They should all match (and we will check as we
+  // open more of them).
   log_file_header_ = CopyFlatBuffer(message_reader_->log_file_header());
 
+  // Setup per channel state.
   channels_.resize(configuration()->channels()->size());
+  for (ChannelData &channel_data : channels_) {
+    channel_data.data.split_reader = this;
+    // Build up the timestamp list.
+    if (configuration::MultiNode(configuration())) {
+      channel_data.timestamps.resize(configuration()->nodes()->size());
+      for (MessageHeaderQueue &queue : channel_data.timestamps) {
+        queue.timestamps = true;
+        queue.split_reader = this;
+      }
+    }
+  }
 
-  QueueMessages();
+  // Build up channels_to_write_ as an optimization to make it fast to figure
+  // out which datastructure to place any new data from a channel on.
+  for (const Channel *channel : *configuration()->channels()) {
+    // This is the main case.  We will only see data on this node.
+    if (configuration::ChannelIsSendableOnNode(channel, node())) {
+      channels_to_write_.emplace_back(
+          &channels_[channels_to_write_.size()].data);
+    } else
+        // If we can't send, but can receive, we should be able to see
+        // timestamps here.
+        if (configuration::ChannelIsReadableOnNode(channel, node())) {
+      channels_to_write_.emplace_back(
+          &(channels_[channels_to_write_.size()]
+                .timestamps[configuration::GetNodeIndex(configuration(),
+                                                        node())]));
+    } else {
+      channels_to_write_.emplace_back(nullptr);
+    }
+  }
 }
 
-bool SortedMessageReader::NextLogFile() {
+bool SplitMessageReader::NextLogFile() {
   if (next_filename_index_ == filenames_.size()) {
     return false;
   }
@@ -279,13 +349,8 @@
   // We can't support the config diverging between two log file headers.  See if
   // they are the same.
   if (next_filename_index_ != 0) {
-    // Since we copied before, we need to copy again to guarantee that things
-    // didn't get re-ordered.
-    const FlatbufferDetachedBuffer<LogFileHeader> new_log_file_header =
-        CopyFlatBuffer(message_reader_->log_file_header());
-    CHECK_EQ(new_log_file_header.size(), log_file_header_.size());
-    CHECK(memcmp(new_log_file_header.data(), log_file_header_.data(),
-                 log_file_header_.size()) == 0)
+    CHECK(CompareFlatBuffer(&log_file_header_.message(),
+                            message_reader_->log_file_header()))
         << ": Header is different between log file chunks "
         << filenames_[next_filename_index_] << " and "
         << filenames_[next_filename_index_ - 1] << ", this is not supported.";
@@ -295,22 +360,170 @@
   return true;
 }
 
-void SortedMessageReader::EmplaceDataBack(
-    FlatbufferVector<MessageHeader> &&new_data) {
-  const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
-      chrono::nanoseconds(new_data.message().monotonic_sent_time()));
-  const size_t channel_index = new_data.message().channel_index();
-  CHECK_LT(channel_index, channels_.size());
+bool SplitMessageReader::QueueMessages(
+    monotonic_clock::time_point oldest_message_time) {
+  // TODO(austin): Once we are happy that everything works, read a 256kb chunk
+  // to reduce the need to re-heap down below.
+  while (true) {
+    // Don't queue if we have enough data already.
+    // When a log file starts, there should be a message from each channel.
+    // Those messages might be very old. Make sure to read a chunk past the
+    // starting time.
+    if (queued_messages_ > 0 &&
+        message_reader_->queue_data_time() > oldest_message_time) {
+      return true;
+    }
 
-  if (channels_[channel_index].data.size() == 0) {
-    channels_[channel_index].oldest_timestamp = timestamp;
-    PushChannelHeap(timestamp, channel_index);
+    if (std::optional<FlatbufferVector<MessageHeader>> msg =
+            message_reader_->ReadMessage()) {
+      const MessageHeader &header = msg.value().message();
+
+      const int channel_index = header.channel_index();
+      channels_to_write_[channel_index]->emplace_back(std::move(msg.value()));
+
+      ++queued_messages_;
+    } else {
+      if (!NextLogFile()) {
+        return false;
+      }
+    }
   }
-  channels_[channel_index].data.emplace_back(std::move(new_data));
+}
+
+void SplitMessageReader::SetTimestampMerger(TimestampMerger *timestamp_merger,
+                                            int channel_index,
+                                            const Node *target_node) {
+  const Node *reinterpreted_target_node =
+      configuration::GetNodeOrDie(configuration(), target_node);
+  const Channel *const channel =
+      configuration()->channels()->Get(channel_index);
+
+  MessageHeaderQueue *message_header_queue = nullptr;
+
+  // Figure out if this log file is from our point of view, or the other node's
+  // point of view.
+  if (node() == reinterpreted_target_node) {
+    if (channels_to_write_[channel_index] != nullptr) {
+      // We already have deduced which is the right channel.  Use
+      // channels_to_write_ here.
+      message_header_queue = channels_to_write_[channel_index];
+    } else {
+      // This means this is data from another node, and will be ignored.
+    }
+  } else {
+    // We are replaying from another node's point of view.  The only interesting
+    // data is data that is forwarded to our node, ie was sent on the other
+    // node.
+    if (configuration::ChannelIsSendableOnNode(channel, node())) {
+      // Data from another node.
+      message_header_queue = &(channels_[channel_index].data);
+    } else {
+      // This is either not sendable on the other node, or is a timestamp and
+      // therefore not interesting.
+    }
+  }
+
+  // If we found one, write it down.  This will be nullptr when there is nothing
+  // relevant on this channel on this node for the target node.  In that case,
+  // we want to drop the message instead of queueing it.
+  if (message_header_queue != nullptr) {
+    message_header_queue->timestamp_merger = timestamp_merger;
+  }
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+           FlatbufferVector<MessageHeader>>
+SplitMessageReader::PopOldest(int channel_index) {
+  CHECK_GT(channels_[channel_index].data.size(), 0u);
+  const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
+      channels_[channel_index].data.front_timestamp();
+  FlatbufferVector<MessageHeader> front =
+      std::move(channels_[channel_index].data.front());
+  channels_[channel_index].data.pop_front();
+  --queued_messages_;
+
+  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
+                         std::move(front));
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+           FlatbufferVector<MessageHeader>>
+SplitMessageReader::PopOldest(int channel, int node_index) {
+  CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
+  const std::tuple<monotonic_clock::time_point, uint32_t> timestamp =
+      channels_[channel].timestamps[node_index].front_timestamp();
+  FlatbufferVector<MessageHeader> front =
+      std::move(channels_[channel].timestamps[node_index].front());
+  channels_[channel].timestamps[node_index].pop_front();
+  --queued_messages_;
+
+  return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
+                         std::move(front));
+}
+
+void SplitMessageReader::MessageHeaderQueue::emplace_back(
+    FlatbufferVector<MessageHeader> &&msg) {
+  CHECK(split_reader != nullptr);
+
+  // If there is no timestamp merger for this queue, nobody is listening.  Drop
+  // the message.  This happens when a log file from another node is replayed,
+  // and the timestamp mergers down stream just don't care.
+  if (timestamp_merger == nullptr) {
+    return;
+  }
+
+  CHECK(timestamps != msg.message().has_data())
+      << ": Got timestamps and data mixed up on a node. "
+      << FlatbufferToJson(msg);
+
+  data_.emplace_back(std::move(msg));
+
+  if (data_.size() == 1u) {
+    // Yup, new data.  Notify.
+    if (timestamps) {
+      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
+    } else {
+      timestamp_merger->Update(split_reader, front_timestamp());
+    }
+  }
+}
+
+void SplitMessageReader::MessageHeaderQueue::pop_front() {
+  data_.pop_front();
+  if (data_.size() != 0u) {
+    // Yup, new data.
+    if (timestamps) {
+      timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
+    } else {
+      timestamp_merger->Update(split_reader, front_timestamp());
+    }
+  }
 }
 
 namespace {
 
+bool SplitMessageReaderHeapCompare(
+    const std::tuple<monotonic_clock::time_point, uint32_t,
+                     SplitMessageReader *>
+        first,
+    const std::tuple<monotonic_clock::time_point, uint32_t,
+                     SplitMessageReader *>
+        second) {
+  if (std::get<0>(first) > std::get<0>(second)) {
+    return true;
+  } else if (std::get<0>(first) == std::get<0>(second)) {
+    if (std::get<1>(first) > std::get<1>(second)) {
+      return true;
+    } else if (std::get<1>(first) == std::get<1>(second)) {
+      return std::get<2>(first) > std::get<2>(second);
+    } else {
+      return false;
+    }
+  } else {
+    return false;
+  }
+}
+
 bool ChannelHeapCompare(
     const std::pair<monotonic_clock::time_point, int> first,
     const std::pair<monotonic_clock::time_point, int> second) {
@@ -325,8 +538,366 @@
 
 }  // namespace
 
-void SortedMessageReader::PushChannelHeap(monotonic_clock::time_point timestamp,
-                                          int channel_index) {
+TimestampMerger::TimestampMerger(
+    const Configuration *configuration,
+    std::vector<SplitMessageReader *> split_message_readers, int channel_index,
+    const Node *target_node, ChannelMerger *channel_merger)
+    : configuration_(configuration),
+      split_message_readers_(std::move(split_message_readers)),
+      channel_index_(channel_index),
+      node_index_(configuration::MultiNode(configuration)
+                      ? configuration::GetNodeIndex(configuration, target_node)
+                      : -1),
+      channel_merger_(channel_merger) {
+  // Tell the readers we care so they know who to notify.
+  for (SplitMessageReader *reader : split_message_readers_) {
+    reader->SetTimestampMerger(this, channel_index, target_node);
+  }
+
+  // And then determine if we need to track timestamps.
+  const Channel *channel = configuration->channels()->Get(channel_index);
+  if (!configuration::ChannelIsSendableOnNode(channel, target_node) &&
+      configuration::ChannelIsReadableOnNode(channel, target_node)) {
+    has_timestamps_ = true;
+  }
+}
+
+void TimestampMerger::PushMessageHeap(
+    std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+    SplitMessageReader *split_message_reader) {
+  DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
+                      [split_message_reader](
+                          const std::tuple<monotonic_clock::time_point,
+                                           uint32_t, SplitMessageReader *>
+                              x) {
+                        return std::get<2>(x) == split_message_reader;
+                      }) == message_heap_.end())
+      << ": Pushing message when it is already in the heap.";
+
+  message_heap_.push_back(std::make_tuple(
+      std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+
+  std::push_heap(message_heap_.begin(), message_heap_.end(),
+                 &SplitMessageReaderHeapCompare);
+
+  // If we are just a data merger, don't wait for timestamps.
+  if (!has_timestamps_) {
+    channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+    pushed_ = true;
+  }
+}
+
+void TimestampMerger::PushTimestampHeap(
+    std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+    SplitMessageReader *split_message_reader) {
+  DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
+                      [split_message_reader](
+                          const std::tuple<monotonic_clock::time_point,
+                                           uint32_t, SplitMessageReader *>
+                              x) {
+                        return std::get<2>(x) == split_message_reader;
+                      }) == timestamp_heap_.end())
+      << ": Pushing timestamp when it is already in the heap.";
+
+  timestamp_heap_.push_back(std::make_tuple(
+      std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+
+  std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                 SplitMessageReaderHeapCompare);
+
+  // If we are a timestamp merger, don't wait for data.  Missing data will be
+  // caught at read time.
+  if (has_timestamps_) {
+    channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+    pushed_ = true;
+  }
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+           FlatbufferVector<MessageHeader>>
+TimestampMerger::PopMessageHeap() {
+  // Pop the oldest message reader pointer off the heap.
+  CHECK_GT(message_heap_.size(), 0u);
+  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+      oldest_message_reader = message_heap_.front();
+
+  std::pop_heap(message_heap_.begin(), message_heap_.end(),
+                &SplitMessageReaderHeapCompare);
+  message_heap_.pop_back();
+
+  // Pop the oldest message.  This re-pushes any messages from the reader to the
+  // message heap.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+      oldest_message =
+          std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
+
+  // Confirm that the time and queue_index we have recorded matches.
+  CHECK_EQ(std::get<0>(oldest_message), std::get<0>(oldest_message_reader));
+  CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
+
+  // Now, keep reading until we have found all duplicates.
+  while (message_heap_.size() > 0u) {
+    // See if it is a duplicate.
+    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+        next_oldest_message_reader = message_heap_.front();
+
+    std::tuple<monotonic_clock::time_point, uint32_t> next_oldest_message_time =
+        std::get<2>(next_oldest_message_reader)->oldest_message(channel_index_);
+
+    if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
+        std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
+      // Pop the message reader pointer.
+      std::pop_heap(message_heap_.begin(), message_heap_.end(),
+                    &SplitMessageReaderHeapCompare);
+      message_heap_.pop_back();
+
+      // Pop the next oldest message.  This re-pushes any messages from the
+      // reader.
+      std::tuple<monotonic_clock::time_point, uint32_t,
+                 FlatbufferVector<MessageHeader>>
+          next_oldest_message = std::get<2>(next_oldest_message_reader)
+                                    ->PopOldest(channel_index_);
+
+      // And make sure the message matches in it's entirety.
+      CHECK(std::get<2>(oldest_message).span() ==
+            std::get<2>(next_oldest_message).span())
+          << ": Data at the same timestamp doesn't match.";
+    } else {
+      break;
+    }
+  }
+
+  return oldest_message;
+}
+
+std::tuple<monotonic_clock::time_point, uint32_t,
+           FlatbufferVector<MessageHeader>>
+TimestampMerger::PopTimestampHeap() {
+  // Pop the oldest message reader pointer off the heap.
+  CHECK_GT(timestamp_heap_.size(), 0u);
+
+  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+      oldest_timestamp_reader = timestamp_heap_.front();
+
+  std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                &SplitMessageReaderHeapCompare);
+  timestamp_heap_.pop_back();
+
+  CHECK(node_index_ != -1) << ": Timestamps in a single node environment";
+
+  // Pop the oldest message.  This re-pushes any timestamps from the reader to
+  // the timestamp heap.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+      oldest_timestamp = std::get<2>(oldest_timestamp_reader)
+                             ->PopOldest(channel_index_, node_index_);
+
+  // Confirm that the time we have recorded matches.
+  CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
+  CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
+
+  // TODO(austin): What if we get duplicate timestamps?
+
+  return oldest_timestamp;
+}
+
+std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
+TimestampMerger::PopOldest() {
+  if (has_timestamps_) {
+    CHECK_GT(message_heap_.size(), 0u)
+        << ": Missing data from source node, no data available to match "
+           "timestamp on "
+        << configuration::CleanedChannelToString(
+               configuration_->channels()->Get(channel_index_));
+
+    std::tuple<monotonic_clock::time_point, uint32_t,
+               FlatbufferVector<MessageHeader>>
+        oldest_timestamp = PopTimestampHeap();
+
+    TimestampMerger::DeliveryTimestamp timestamp;
+    timestamp.monotonic_event_time =
+        monotonic_clock::time_point(chrono::nanoseconds(
+            std::get<2>(oldest_timestamp).message().monotonic_sent_time()));
+    timestamp.realtime_event_time =
+        realtime_clock::time_point(chrono::nanoseconds(
+            std::get<2>(oldest_timestamp).message().realtime_sent_time()));
+
+    // Consistency check.
+    CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
+    CHECK_EQ(std::get<2>(oldest_timestamp).message().queue_index(),
+             std::get<1>(oldest_timestamp));
+
+    monotonic_clock::time_point remote_timestamp_monotonic_time(
+        chrono::nanoseconds(
+            std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
+
+    while (true) {
+      // Ok, now try grabbing data until we find one which matches.
+      std::tuple<monotonic_clock::time_point, uint32_t,
+                 FlatbufferVector<MessageHeader>>
+          oldest_message = PopMessageHeap();
+
+      // Time at which the message was sent (this message is written from the
+      // sending node's perspective.
+      monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
+          std::get<2>(oldest_message).message().monotonic_sent_time()));
+
+      if (remote_monotonic_time < remote_timestamp_monotonic_time) {
+        LOG(INFO) << "Undelivered message, skipping.  Remote time is "
+                  << remote_monotonic_time << " timestamp is "
+                  << remote_timestamp_monotonic_time << " on channel "
+                  << channel_index_;
+        continue;
+      }
+
+      timestamp.monotonic_remote_time = remote_monotonic_time;
+      timestamp.realtime_remote_time =
+          realtime_clock::time_point(chrono::nanoseconds(
+              std::get<2>(oldest_message).message().realtime_sent_time()));
+      timestamp.remote_queue_index =
+          std::get<2>(oldest_message).message().queue_index();
+
+      CHECK_EQ(remote_monotonic_time, remote_timestamp_monotonic_time);
+      CHECK_EQ(timestamp.remote_queue_index, std::get<1>(oldest_timestamp));
+
+      return std::make_tuple(timestamp, std::get<2>(oldest_message));
+    }
+  } else {
+    std::tuple<monotonic_clock::time_point, uint32_t,
+               FlatbufferVector<MessageHeader>>
+        oldest_message = PopMessageHeap();
+
+    TimestampMerger::DeliveryTimestamp timestamp;
+    timestamp.monotonic_event_time =
+        monotonic_clock::time_point(chrono::nanoseconds(
+            std::get<2>(oldest_message).message().monotonic_sent_time()));
+    timestamp.realtime_event_time =
+        realtime_clock::time_point(chrono::nanoseconds(
+            std::get<2>(oldest_message).message().realtime_sent_time()));
+    timestamp.remote_queue_index = 0xffffffff;
+
+    CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
+    CHECK_EQ(std::get<1>(oldest_message),
+             std::get<2>(oldest_message).message().queue_index());
+
+    return std::make_tuple(timestamp, std::get<2>(oldest_message));
+  }
+}
+
+namespace {
+std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
+    const std::vector<std::vector<std::string>> &filenames) {
+  CHECK_GT(filenames.size(), 0u);
+  // Build up all the SplitMessageReaders.
+  std::vector<std::unique_ptr<SplitMessageReader>> result;
+  for (const std::vector<std::string> &filenames : filenames) {
+    result.emplace_back(std::make_unique<SplitMessageReader>(filenames));
+  }
+  return result;
+}
+}  // namespace
+
+ChannelMerger::ChannelMerger(
+    const std::vector<std::vector<std::string>> &filenames)
+    : split_message_readers_(MakeSplitMessageReaders(filenames)),
+      log_file_header_(
+          CopyFlatBuffer(split_message_readers_[0]->log_file_header())) {
+  // Now, confirm that the configuration matches for each and pick a start time.
+  // Also return the list of possible nodes.
+  for (const std::unique_ptr<SplitMessageReader> &reader :
+       split_message_readers_) {
+    CHECK(CompareFlatBuffer(log_file_header_.message().configuration(),
+                            reader->log_file_header()->configuration()))
+        << ": Replaying log files with different configurations isn't "
+           "supported";
+  }
+
+  nodes_ = configuration::GetNodes(configuration());
+}
+
+bool ChannelMerger::SetNode(const Node *target_node) {
+  std::vector<SplitMessageReader *> split_message_readers;
+  for (const std::unique_ptr<SplitMessageReader> &reader :
+       split_message_readers_) {
+    split_message_readers.emplace_back(reader.get());
+  }
+
+  // Go find a log_file_header for this node.
+  {
+    bool found_node = false;
+
+    for (const std::unique_ptr<SplitMessageReader> &reader :
+         split_message_readers_) {
+      if (CompareFlatBuffer(reader->node(), target_node)) {
+        if (!found_node) {
+          found_node = true;
+          log_file_header_ = CopyFlatBuffer(reader->log_file_header());
+        } else {
+          // And then make sure all the other files have matching headers.
+          CHECK(
+              CompareFlatBuffer(log_file_header(), reader->log_file_header()));
+        }
+      }
+    }
+
+    if (!found_node) {
+      LOG(WARNING) << "Failed to find log file for node "
+                   << FlatbufferToJson(target_node);
+      return false;
+    }
+  }
+
+  // Build up all the timestamp mergers.  This connects up all the
+  // SplitMessageReaders.
+  timestamp_mergers_.reserve(configuration()->channels()->size());
+  for (size_t channel_index = 0;
+       channel_index < configuration()->channels()->size(); ++channel_index) {
+    timestamp_mergers_.emplace_back(
+        configuration(), split_message_readers, channel_index,
+        configuration::GetNode(configuration(), target_node), this);
+  }
+
+  // And prime everything.
+  size_t split_message_reader_index = 0;
+  for (std::unique_ptr<SplitMessageReader> &split_message_reader :
+       split_message_readers_) {
+    if (split_message_reader->QueueMessages(
+            split_message_reader->monotonic_start_time())) {
+      split_message_reader_heap_.push_back(std::make_pair(
+          split_message_reader->queue_data_time(), split_message_reader_index));
+
+      std::push_heap(split_message_reader_heap_.begin(),
+                     split_message_reader_heap_.end(), ChannelHeapCompare);
+    }
+    ++split_message_reader_index;
+  }
+
+  node_ = configuration::GetNodeOrDie(configuration(), target_node);
+  return true;
+}
+
+monotonic_clock::time_point ChannelMerger::OldestMessage() const {
+  if (channel_heap_.size() == 0u) {
+    return monotonic_clock::max_time;
+  }
+  return channel_heap_.front().first;
+}
+
+void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
+                                    int channel_index) {
+  // Pop and recreate the heap if it has already been pushed.  And since we are
+  // pushing again, we don't need to clear pushed.
+  if (timestamp_mergers_[channel_index].pushed()) {
+    channel_heap_.erase(std::find_if(
+        channel_heap_.begin(), channel_heap_.end(),
+        [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
+          return x.second == channel_index;
+        }));
+    std::make_heap(channel_heap_.begin(), channel_heap_.end(),
+                   ChannelHeapCompare);
+  }
+
   channel_heap_.push_back(std::make_pair(timestamp, channel_index));
 
   // The default sort puts the newest message first.  Use a custom comparator to
@@ -335,60 +906,65 @@
                  ChannelHeapCompare);
 }
 
-void SortedMessageReader::QueueMessages() {
-  while (true) {
-    // Don't queue if we have enough data already.
-    // When a log file starts, there should be a message from each channel.
-    // Those messages might be very old. Make sure to read a chunk past the
-    // starting time.
-    if (channel_heap_.size() > 0 &&
-        message_reader_->newest_timestamp() >
-            std::max(oldest_message().first, monotonic_start_time()) +
-                message_reader_->max_out_of_order_duration()) {
-      break;
-    }
-
-    if (std::optional<FlatbufferVector<MessageHeader>> msg =
-            message_reader_->ReadMessage()) {
-      EmplaceDataBack(std::move(msg.value()));
-    } else {
-      if (!NextLogFile()) {
-        break;
-      }
-    }
-  }
-}
-
-std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
-SortedMessageReader::PopOldestChannel() {
+std::tuple<TimestampMerger::DeliveryTimestamp, int,
+           FlatbufferVector<MessageHeader>>
+ChannelMerger::PopOldest() {
+  CHECK(channel_heap_.size() > 0);
   std::pair<monotonic_clock::time_point, int> oldest_channel_data =
       channel_heap_.front();
+  int channel_index = oldest_channel_data.second;
   std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
                 &ChannelHeapCompare);
   channel_heap_.pop_back();
+  timestamp_mergers_[channel_index].set_pushed(false);
 
-  struct ChannelData &channel = channels_[oldest_channel_data.second];
+  TimestampMerger *merger = &timestamp_mergers_[channel_index];
 
-  FlatbufferVector<MessageHeader> front = std::move(channel.front());
+  // Merger auto-pushes from here, but doesn't fetch anything new from the log
+  // file.
+  std::tuple<TimestampMerger::DeliveryTimestamp,
+             FlatbufferVector<MessageHeader>>
+      message = merger->PopOldest();
 
-  channel.data.pop_front();
+  QueueMessages(OldestMessage());
 
-  // Re-push it and update the oldest timestamp.
-  if (channel.data.size() != 0) {
-    const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
-        chrono::nanoseconds(channel.front().message().monotonic_sent_time()));
-    PushChannelHeap(timestamp, oldest_channel_data.second);
-    channel.oldest_timestamp = timestamp;
-  } else {
-    channel.oldest_timestamp = monotonic_clock::min_time;
+  return std::make_tuple(std::get<0>(message), channel_index,
+                         std::move(std::get<1>(message)));
+}
+
+void ChannelMerger::QueueMessages(
+    monotonic_clock::time_point oldest_message_time) {
+  // Pop and re-queue readers until they are all caught up.
+  while (true) {
+    if (split_message_reader_heap_.size() == 0) {
+      return;
+    }
+    std::pair<monotonic_clock::time_point, int> oldest_channel_data =
+        split_message_reader_heap_.front();
+
+    // No work to do, bail.
+    if (oldest_channel_data.first > oldest_message_time) {
+      return;
+    }
+
+    // Drop it off the heap.
+    std::pop_heap(split_message_reader_heap_.begin(),
+                  split_message_reader_heap_.end(), &ChannelHeapCompare);
+    split_message_reader_heap_.pop_back();
+
+    // And if there is data left in the log file, push it back on the heap with
+    // the updated time.
+    const int split_message_reader_index = oldest_channel_data.second;
+    if (split_message_readers_[split_message_reader_index]->QueueMessages(
+            oldest_message_time)) {
+      split_message_reader_heap_.push_back(std::make_pair(
+          split_message_readers_[split_message_reader_index]->queue_data_time(),
+          split_message_reader_index));
+
+      std::push_heap(split_message_reader_heap_.begin(),
+                     split_message_reader_heap_.end(), ChannelHeapCompare);
+    }
   }
-
-  if (oldest_channel_data.first > message_reader_->queue_data_time()) {
-    QueueMessages();
-  }
-
-  return std::make_tuple(oldest_channel_data.first, oldest_channel_data.second,
-                         std::move(front));
 }
 
 }  // namespace logger
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 5b2bfa6..9a849b2 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -26,10 +26,11 @@
   // The message originated on another node. Log it and the delivery times
   // together.  The message_gateway is responsible for logging any messages
   // which didn't get delivered.
-  kLogMessageAndDeliveryTime
+  kLogMessageAndDeliveryTime,
+  // The message originated on the other node and should be logged on this node.
+  kLogRemoteMessage
 };
 
-
 // This class manages efficiently writing a sequence of detached buffers to a
 // file.  It queues them up and batches the write operation.
 class DetachedBufferWriter {
@@ -37,6 +38,8 @@
   DetachedBufferWriter(std::string_view filename);
   ~DetachedBufferWriter();
 
+  std::string_view filename() const { return filename_; }
+
   // TODO(austin): Snappy compress the log file if it ends with .snappy!
 
   // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
@@ -51,6 +54,8 @@
   void Flush();
 
  private:
+  const std::string filename_;
+
   int fd_ = -1;
 
   // Size of all the data in the queue.
@@ -68,6 +73,8 @@
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
     int channel_index, LogType log_type);
 
+FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+
 // Class to read chunks out of a log file.
 class SpanReader {
  public:
@@ -75,6 +82,8 @@
 
   ~SpanReader() { close(fd_); }
 
+  std::string_view filename() const { return filename_; }
+
   // Returns a span with the data for a message from the log file, excluding
   // the size.
   absl::Span<const uint8_t> ReadMessage();
@@ -92,6 +101,8 @@
   // Reads a chunk of data into data_.  Returns false if no data was read.
   bool ReadBlock();
 
+  const std::string filename_;
+
   // File descriptor for the log file.
   int fd_ = -1;
 
@@ -138,6 +149,8 @@
  public:
   MessageReader(std::string_view filename);
 
+  std::string_view filename() const { return span_reader_.filename(); }
+
   // Returns the header from the log file.
   const LogFileHeader *log_file_header() const {
     return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(
@@ -177,52 +190,64 @@
   monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
 };
 
-// We need to read a large chunk at a time, then kit it up into parts and
-// sort.
-//
-// We want to read 256 KB chunks at a time.  This is the fastest read size.
-// This leaves us with a fragmentation problem though.
-//
-// The easy answer is to read 256 KB chunks.  Then, malloc and memcpy those
-// chunks into single flatbuffer messages and manage them in a sorted queue.
-// Everything is copied three times (into 256 kb buffer, then into separate
-// buffer, then into sender), but none of it is all that expensive.  We can
-// optimize if it is slow later.
-//
-// As we place the elements in the sorted list of times, keep doing this
-// until we read a message that is newer than the threshold.
-//
-// Then repeat.  Keep filling up the sorted list with 256 KB chunks (need a
-// small state machine so we can resume), and keep pulling messages back out
-// and sending.
-//
-// For sorting, we want to use the fact that each channel is sorted, and
-// then merge sort the channels.  Have a vector of deques, and then hold a
-// sorted list of pointers to those.
-class SortedMessageReader {
- public:
-  SortedMessageReader(const std::vector<std::string> &filenames);
+class TimestampMerger;
 
-  // Returns the header from the log file.
+// A design requirement is that the relevant data for a channel is not more than
+// max_out_of_order_duration out of order. We approach sorting in layers.
+//
+// 1) Split each (maybe chunked) log file into one queue per channel.  Read this
+//    log file looking for data pertaining to a specific node.
+//    (SplitMessageReader)
+// 2) Merge all the data per channel from the different log files into a sorted
+//    list of timestamps and messages. (TimestampMerger)
+// 3) Combine the timestamps and messages. (TimestampMerger)
+// 4) Merge all the channels to produce the next message on a node.
+//    (ChannelMerger)
+// 5) Duplicate this entire stack per node.
+
+// This class splits messages and timestamps up into a queue per channel, and
+// handles reading data from multiple chunks.
+class SplitMessageReader {
+ public:
+  SplitMessageReader(const std::vector<std::string> &filenames);
+
+  // Sets the TimestampMerger that gets notified for each channel.  The node
+  // that the TimestampMerger is merging as needs to be passed in.
+  void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
+                          const Node *target_node);
+
+  // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
+  // max_time if there is nothing in the channel.
+  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+      int channel) {
+    return channels_[channel].data.front_timestamp();
+  }
+
+  // Returns the (timestamp, queue_index) for the oldest delivery time in a
+  // channel, or max_time if there is nothing in the channel.
+  std::tuple<monotonic_clock::time_point, uint32_t> oldest_message(
+      int channel, int destination_node) {
+    return channels_[channel].timestamps[destination_node].front_timestamp();
+  }
+
+  // Returns the timestamp, queue_index, and message for the oldest data on a
+  // channel.  Requeues data as needed.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopOldest(int channel_index);
+
+  // Returns the timestamp, queue_index, and message for the oldest timestamp on
+  // a channel delivered to a node.  Requeues data as needed.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopOldest(int channel, int node_index);
+
+  // Returns the header for the log files.
   const LogFileHeader *log_file_header() const {
     return &log_file_header_.message();
   }
 
-  // Returns a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  const std::pair<monotonic_clock::time_point, int> &oldest_message() const {
-    return channel_heap_.front();
-  }
-
-  // Returns the number of channels with data still in them.
-  size_t active_channel_count() const { return channel_heap_.size(); }
-
-  // Returns the configuration from the log file header.
-  const Configuration *configuration() const {
-    return log_file_header()->configuration();
-  }
-
-  // Returns the start time on both the monotonic and realtime clocks.
+  // Returns the starting time for this set of log files.
   monotonic_clock::time_point monotonic_start_time() {
     return monotonic_clock::time_point(
         std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
@@ -232,74 +257,304 @@
         std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
   }
 
+  // Returns the configuration from the log file header.
+  const Configuration *configuration() const {
+    return log_file_header()->configuration();
+  }
+
   // Returns the node who's point of view this log file is from.  Make sure this
   // is a pointer in the configuration() nodes list so it can be consumed
   // elsewhere.
   const Node *node() const {
     if (configuration()->has_nodes()) {
-      CHECK(log_file_header()->has_node());
-      CHECK(log_file_header()->node()->has_name());
-      return configuration::GetNode(
-          configuration(), log_file_header()->node()->name()->string_view());
+      return configuration::GetNodeOrDie(configuration(),
+                                         log_file_header()->node());
     } else {
       CHECK(!log_file_header()->has_node());
       return nullptr;
     }
   }
 
-  // Pops a pointer to the channel with the oldest message in it, and the
-  // timestamp.
-  std::tuple<monotonic_clock::time_point, int, FlatbufferVector<MessageHeader>>
-  PopOldestChannel();
+  // Returns the timestamp of the newest message read from the log file, and the
+  // timestamp that we need to re-queue data.
+  monotonic_clock::time_point newest_timestamp() const {
+    return message_reader_->newest_timestamp();
+  }
+  monotonic_clock::time_point queue_data_time() const {
+    return message_reader_->queue_data_time();
+  }
+
+
+  // Adds more messages to the sorted list.  This reads enough data such that
+  // oldest_message_time can be replayed safely.  Returns false if the log file
+  // has all been read.
+  bool QueueMessages(monotonic_clock::time_point oldest_message_time);
 
  private:
+  // TODO(austin): Need to copy or refcount the message instead of running
+  // multiple copies of the reader.  Or maybe have a "as_node" index and hide it
+  // inside.
+
   // Moves to the next log file in the list.
   bool NextLogFile();
 
-  // Adds more messages to the sorted list.
-  void QueueMessages();
+  // Filenames of the log files.
+  std::vector<std::string> filenames_;
+  // And the index of the next file to open.
+  size_t next_filename_index_ = 0;
 
-  // Moves the message to the correct channel queue.
-  void EmplaceDataBack(FlatbufferVector<MessageHeader> &&new_data);
-
-  // Pushes a pointer to the channel for the given timestamp to the sorted
-  // channel list.
-  void PushChannelHeap(monotonic_clock::time_point timestamp,
-                       int channel_index);
-
+  // Log file header to report.  This is a copy.
+  FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+  // Current log file being read.
+  std::unique_ptr<MessageReader> message_reader_;
 
   // Datastructure to hold the list of messages, cached timestamp for the
   // oldest message, and sender to send with.
-  struct ChannelData {
-    monotonic_clock::time_point oldest_timestamp = monotonic_clock::min_time;
-    std::deque<FlatbufferVector<MessageHeader>> data;
-    std::unique_ptr<RawSender> raw_sender;
+  struct MessageHeaderQueue {
+    // If true, this is a timestamp queue.
+    bool timestamps = false;
 
-    // Returns the oldest message.
-    const FlatbufferVector<MessageHeader> &front() { return data.front(); }
-
-    // Returns the timestamp for the oldest message.
-    const monotonic_clock::time_point front_timestamp() {
-      return monotonic_clock::time_point(
-          std::chrono::nanoseconds(front().message().monotonic_sent_time()));
+    // Returns a reference to the the oldest message.
+    FlatbufferVector<MessageHeader> &front() {
+      CHECK_GT(data_.size(), 0u);
+      return data_.front();
     }
+
+    // Adds a message to the back of the queue.
+    void emplace_back(FlatbufferVector<MessageHeader> &&msg);
+
+    // Drops the front message.  Invalidates the front() reference.
+    void pop_front();
+
+    // The size of the queue.
+    size_t size() { return data_.size(); }
+
+    // Returns the (timestamp, queue_index) for the oldest message.
+    const std::tuple<monotonic_clock::time_point, uint32_t> front_timestamp() {
+      CHECK_GT(data_.size(), 0u);
+      return std::make_tuple(
+          monotonic_clock::time_point(std::chrono::nanoseconds(
+              front().message().monotonic_sent_time())),
+          front().message().queue_index());
+    }
+
+    // Pointer to the timestamp merger for this queue if available.
+    TimestampMerger *timestamp_merger = nullptr;
+    // Pointer to the reader which feeds this queue.
+    SplitMessageReader *split_reader = nullptr;
+
+   private:
+    // The data.
+    std::deque<FlatbufferVector<MessageHeader>> data_;
   };
 
-  std::vector<std::string> filenames_;
-  size_t next_filename_index_ = 0;
+  // All the queues needed for a channel.  There isn't going to be data in all
+  // of these.
+  struct ChannelData {
+    // The data queue for the channel.
+    MessageHeaderQueue data;
+    // Queues for timestamps for each node.
+    std::vector<MessageHeaderQueue> timestamps;
+  };
 
-  FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
-  std::unique_ptr<MessageReader> message_reader_;
-
-  // TODO(austin): Multithreaded read at some point.  Gotta go faster!
-  // Especially if we start compressing.
-
-  // List of channels and messages for them.
+  // Data for all the channels.
   std::vector<ChannelData> channels_;
 
-  // Heap of channels so we can track which channel to send next.
+  // Once we know the node that this SplitMessageReader will be writing as,
+  // there will be only one MessageHeaderQueue that a specific channel matches.
+  // Precompute this here for efficiency.
+  std::vector<MessageHeaderQueue *> channels_to_write_;
+
+  // Number of messages queued.
+  size_t queued_messages_ = 0;
+};
+
+class ChannelMerger;
+
+// Sorts channels (and timestamps) from multiple log files for a single channel.
+class TimestampMerger {
+ public:
+  TimestampMerger(const Configuration *configuration,
+                  std::vector<SplitMessageReader *> split_message_readers,
+                  int channel_index, const Node *target_node,
+                  ChannelMerger *channel_merger);
+
+  // Metadata used to schedule the message.
+  struct DeliveryTimestamp {
+    monotonic_clock::time_point monotonic_event_time =
+        monotonic_clock::min_time;
+    realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+
+    monotonic_clock::time_point monotonic_remote_time =
+        monotonic_clock::min_time;
+    realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
+    uint32_t remote_queue_index = 0xffffffff;
+  };
+
+  // Pushes SplitMessageReader onto the timestamp heap.  This should only be
+  // called when timestamps are placed in the channel this class is merging for
+  // the reader.
+  void UpdateTimestamp(
+      SplitMessageReader *split_message_reader,
+      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+    PushTimestampHeap(oldest_message_time, split_message_reader);
+  }
+  // Pushes SplitMessageReader onto the message heap.  This should only be
+  // called when data is placed in the channel this class is merging for the
+  // reader.
+  void Update(
+      SplitMessageReader *split_message_reader,
+      std::tuple<monotonic_clock::time_point, uint32_t> oldest_message_time) {
+    PushMessageHeap(oldest_message_time, split_message_reader);
+  }
+
+  // Returns the oldest combined timestamp and data for this channel.
+  std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
+
+  // Tracks if the channel merger has pushed this onto it's heap or not.
+  bool pushed() { return pushed_; }
+  // Sets if this has been pushed to the channel merger heap.  Should only be
+  // called by the channel merger.
+  void set_pushed(bool pushed) { pushed_ = pushed; }
+
+ private:
+  // Pushes messages and timestamps to the corresponding heaps.
+  void PushMessageHeap(
+      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      SplitMessageReader *split_message_reader);
+  void PushTimestampHeap(
+      std::tuple<monotonic_clock::time_point, uint32_t> timestamp,
+      SplitMessageReader *split_message_reader);
+
+  // Pops a message from the message heap.  This automatically triggers the
+  // split message reader to re-fetch any new data.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopMessageHeap();
+  // Pops a message from the timestamp heap.  This automatically triggers the
+  // split message reader to re-fetch any new data.
+  std::tuple<monotonic_clock::time_point, uint32_t,
+             FlatbufferVector<MessageHeader>>
+  PopTimestampHeap();
+
+  const Configuration *configuration_;
+
+  // If true, this is a forwarded channel and timestamps should be matched.
+  bool has_timestamps_ = false;
+
+  // Tracks if the ChannelMerger has pushed this onto it's queue.
+  bool pushed_ = false;
+
+  // The split message readers used for source data.
+  std::vector<SplitMessageReader *> split_message_readers_;
+
+  // The channel to merge.
+  int channel_index_;
+
+  // Our node.
+  int node_index_;
+
+  // Heaps for messages and timestamps.
+  std::vector<
+      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+      message_heap_;
+  std::vector<
+      std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
+      timestamp_heap_;
+
+  // Parent channel merger.
+  ChannelMerger *channel_merger_;
+};
+
+// This class handles constructing all the split message readers, channel
+// mergers, and combining the results.
+class ChannelMerger {
+ public:
+  // Builds a ChannelMerger around a set of log files.  These are of the format:
+  //   {
+  //     {log1_part0, log1_part1, ...},
+  //     {log2}
+  //   }
+  // The inner vector is a list of log file chunks which form up a log file.
+  // The outer vector is a list of log files with subsets of the messages, or
+  // messages from different nodes.
+  ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
+
+  // Returns the nodes that we know how to merge.
+  const std::vector<const Node *> nodes() const;
+  // Sets the node that we will return messages as.  Returns true if the node
+  // has log files and will produce data.  This can only be called once, and
+  // will likely corrupt state if called a second time.
+  bool SetNode(const Node *target_node);
+
+  // Everything else needs the node set before it works.
+
+  // Returns a timestamp for the oldest message in this group of logfiles.
+  monotonic_clock::time_point OldestMessage() const;
+  // Pops the oldest message.
+  std::tuple<TimestampMerger::DeliveryTimestamp, int,
+             FlatbufferVector<MessageHeader>>
+  PopOldest();
+
+  // Returns the config for this set of log files.
+  const Configuration *configuration() const {
+    return log_file_header()->configuration();
+  }
+
+  const LogFileHeader *log_file_header() const {
+    return &log_file_header_.message();
+  }
+
+  // Returns the start times for the configured node's log files.
+  monotonic_clock::time_point monotonic_start_time() {
+    return monotonic_clock::time_point(
+        std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+  }
+  realtime_clock::time_point realtime_start_time() {
+    return realtime_clock::time_point(
+        std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+  }
+
+  // Returns the node set by SetNode above.
+  const Node *node() const { return node_; }
+
+  // Called by the TimestampMerger when new data is available with the provided
+  // timestamp and channel_index.
+  void Update(monotonic_clock::time_point timestamp, int channel_index) {
+    PushChannelHeap(timestamp, channel_index);
+  }
+
+ private:
+  // Queues messages from each SplitMessageReader until enough data is queued
+  // such that we can guarentee all sorting has happened.
+  void QueueMessages(monotonic_clock::time_point oldest_message_time);
+
+  // Pushes the timestamp for new data on the provided channel.
+  void PushChannelHeap(monotonic_clock::time_point timestamp,
+                       int channel_index);
+
+  // All the message readers.
+  std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
+
+  // The log header we are claiming to be.
+  FlatbufferDetachedBuffer<LogFileHeader> log_file_header_;
+
+  // The timestamp mergers which combine data from the split message readers.
+  std::vector<TimestampMerger> timestamp_mergers_;
+
+  // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
 
+  // This holds a heap of split_message_readers sorted by the time at which they
+  // need to have QueueMessages called on them.
+  std::vector<std::pair<monotonic_clock::time_point, int>>
+      split_message_reader_heap_;
+
+  // Configured node.
+  const Node *node_;
+
+  // Cached copy of the list of nodes.
+  std::vector<const Node *> nodes_;
 };
 
 }  // namespace logger
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c501d7b..84e1f43 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -26,12 +26,22 @@
 
 Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
                std::chrono::milliseconds polling_period)
+    : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
+             event_loop, polling_period) {}
+
+Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+               std::chrono::milliseconds polling_period)
     : event_loop_(event_loop),
-      writer_(writer),
+      log_namer_(std::move(log_namer)),
       timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
       polling_period_(polling_period) {
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+  int channel_index = 0;
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     FetcherStruct fs;
+    const bool is_local =
+        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
     const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
@@ -50,28 +60,21 @@
               << configuration::CleanedChannelToString(channel);
 
       if (log_delivery_times) {
-        if (log_message) {
-          VLOG(1) << "  Logging message and delivery times";
-          fs.log_type = LogType::kLogMessageAndDeliveryTime;
-        } else {
-          VLOG(1) << "  Logging delivery times only";
-          fs.log_type = LogType::kLogDeliveryTimeOnly;
-        }
-      } else {
-        // We don't have a particularly great use case right now for logging a
-        // forwarded message, but either not logging the delivery times, or
-        // logging them on another node.  Fail rather than produce bad results.
-        CHECK(configuration::ChannelIsSendableOnNode(channel,
-                                                     event_loop_->node()))
-            << ": Logger only knows how to log remote messages with "
-               "forwarding timestamps.";
-        VLOG(1) << "  Logging message only";
-        fs.log_type = LogType::kLogMessage;
+        VLOG(1) << "  Delivery times";
+        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
       }
+      if (log_message) {
+        VLOG(1) << "  Data";
+        fs.writer = log_namer_->MakeWriter(channel);
+        if (!is_local) {
+          fs.log_type = LogType::kLogRemoteMessage;
+        }
+      }
+      fs.channel_index = channel_index;
+      fs.written = false;
+      fetchers_.emplace_back(std::move(fs));
     }
-
-    fs.written = false;
-    fetchers_.emplace_back(std::move(fs));
+    ++channel_index;
   }
 
   // When things start, we want to log the header, then the most recent messages
@@ -82,9 +85,7 @@
     // so we can capture the latest message on each channel.  This lets us have
     // non periodic messages with configuration that now get logged.
     for (FetcherStruct &f : fetchers_) {
-      if (f.fetcher.get() != nullptr) {
-        f.written = !f.fetcher->Fetch();
-      }
+      f.written = !f.fetcher->Fetch();
     }
 
     // We need to pick a point in time to declare the log file "started".  This
@@ -105,6 +106,11 @@
 }
 
 void Logger::WriteHeader() {
+  for (const Node *node : log_namer_->nodes()) {
+    WriteHeader(node);
+  }
+}
+void Logger::WriteHeader(const Node *node) {
   // Now write the header with this timestamp in it.
   flatbuffers::FlatBufferBuilder fbb;
   fbb.ForceDefaults(1);
@@ -117,7 +123,7 @@
 
   flatbuffers::Offset<Node> node_offset;
   if (event_loop_->node() != nullptr) {
-    node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
+    node_offset = CopyFlatBuffer(node, &fbb);
   }
 
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
@@ -125,7 +131,7 @@
   log_file_header_builder.add_name(string_offset);
 
   // Only add the node if we are running in a multinode configuration.
-  if (event_loop_->node() != nullptr) {
+  if (node != nullptr) {
     log_file_header_builder.add_node(node_offset);
   }
 
@@ -149,15 +155,32 @@
           .count());
 
   fbb.FinishSizePrefixed(log_file_header_builder.Finish());
-  writer_->QueueSizedFlatbuffer(&fbb);
+  log_namer_->WriteHeader(&fbb, node);
 }
 
 void Logger::Rotate(DetachedBufferWriter *writer) {
+  Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
+}
+
+void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
   // Force data up until now to be written.
   DoLogData();
 
   // Swap the writer out, and re-write the header.
-  writer_ = writer;
+  log_namer_ = std::move(log_namer);
+
+  // And then update the writers.
+  for (FetcherStruct &f : fetchers_) {
+    const Channel *channel =
+        event_loop_->configuration()->channels()->Get(f.channel_index);
+    if (f.timestamp_writer != nullptr) {
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+    }
+    if (f.writer != nullptr) {
+      f.writer = log_namer_->MakeWriter(channel);
+    }
+  }
+
   WriteHeader();
 }
 
@@ -173,61 +196,81 @@
     // per iteration, even if it is small.
     last_synchronized_time_ =
         std::min(last_synchronized_time_ + polling_period_, monotonic_now);
-    size_t channel_index = 0;
     // Write each channel to disk, one at a time.
     for (FetcherStruct &f : fetchers_) {
-      // Skip any channels which we aren't supposed to log.
-      if (f.fetcher.get() != nullptr) {
-        while (true) {
-          if (f.written) {
-            if (!f.fetcher->FetchNext()) {
-              VLOG(2) << "No new data on "
-                      << configuration::CleanedChannelToString(
-                             f.fetcher->channel());
-              break;
-            } else {
-              f.written = false;
-            }
+      while (true) {
+        if (f.written) {
+          if (!f.fetcher->FetchNext()) {
+            VLOG(2) << "No new data on "
+                    << configuration::CleanedChannelToString(
+                           f.fetcher->channel());
+            break;
+          } else {
+            f.written = false;
           }
+        }
 
-          CHECK(!f.written);
+        CHECK(!f.written);
 
-          // TODO(james): Write tests to exercise this logic.
-          if (f.fetcher->context().monotonic_event_time <
-              last_synchronized_time_) {
+        // TODO(james): Write tests to exercise this logic.
+        if (f.fetcher->context().monotonic_event_time <
+            last_synchronized_time_) {
+          if (f.writer != nullptr) {
             // Write!
             flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                                max_header_size_);
             fbb.ForceDefaults(1);
 
             fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                               channel_index, f.log_type));
+                                               f.channel_index, f.log_type));
 
-            VLOG(2) << "Writing data for channel "
+            VLOG(1) << "Writing data as node "
+                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                     << configuration::CleanedChannelToString(
-                           f.fetcher->channel());
+                           f.fetcher->channel())
+                    << " to " << f.writer->filename()
+                    << " data "
+                    << FlatbufferToJson(
+                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                               fbb.GetBufferPointer()));
 
             max_header_size_ = std::max(
                 max_header_size_, fbb.GetSize() - f.fetcher->context().size);
-            writer_->QueueSizedFlatbuffer(&fbb);
-
-            f.written = true;
-          } else {
-            break;
+            f.writer->QueueSizedFlatbuffer(&fbb);
           }
+
+          if (f.timestamp_writer != nullptr) {
+            // And now handle timestamps.
+            flatbuffers::FlatBufferBuilder fbb;
+            fbb.ForceDefaults(1);
+
+            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                               f.channel_index,
+                                               LogType::kLogDeliveryTimeOnly));
+
+            VLOG(1) << "Writing timestamps as node "
+                    << FlatbufferToJson(event_loop_->node()) << " for channel "
+                    << configuration::CleanedChannelToString(
+                           f.fetcher->channel())
+                    << " to " << f.timestamp_writer->filename()
+                    << " timestamp "
+                    << FlatbufferToJson(
+                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                               fbb.GetBufferPointer()));
+
+            f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+          }
+
+          f.written = true;
+        } else {
+          break;
         }
       }
-
-      ++channel_index;
     }
 
-    CHECK_EQ(channel_index, fetchers_.size());
-
     // If we missed cycles, we could be pretty far behind.  Spin until we are
     // caught up.
   } while (last_synchronized_time_ + polling_period_ < monotonic_now);
-
-  writer_->Flush();
 }
 
 LogReader::LogReader(std::string_view filename,
@@ -237,41 +280,58 @@
 
 LogReader::LogReader(const std::vector<std::string> &filenames,
                      const Configuration *replay_configuration)
-    : sorted_message_reader_(filenames),
+    : LogReader(std::vector<std::vector<std::string>>{filenames},
+                replay_configuration) {}
+
+LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
+                     const Configuration *replay_configuration)
+    : filenames_(filenames),
+      log_file_header_(ReadHeader(filenames[0][0])),
       replay_configuration_(replay_configuration) {
-  channels_.resize(logged_configuration()->channels()->size());
   MakeRemappedConfig();
+
+  if (!configuration::MultiNode(configuration())) {
+    auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
+    State *state = &(it.first->second);
+
+    state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+  }
 }
 
 LogReader::~LogReader() { Deregister(); }
 
 const Configuration *LogReader::logged_configuration() const {
-  return sorted_message_reader_.configuration();
+  return log_file_header_.message().configuration();
 }
 
 const Configuration *LogReader::configuration() const {
   return remapped_configuration_;
 }
 
-const Node *LogReader::node() const {
+std::vector<const Node *> LogReader::Nodes() const {
   // Because the Node pointer will only be valid if it actually points to memory
   // owned by remapped_configuration_, we need to wait for the
   // remapped_configuration_ to be populated before accessing it.
+  //
+  // Also, note, that when ever a map is changed, the nodes in here are
+  // invalidated.
   CHECK(remapped_configuration_ != nullptr)
       << ": Need to call Register before the node() pointer will be valid.";
-  if (sorted_message_reader_.node() == nullptr) {
-    return nullptr;
-  }
-  return configuration::GetNode(
-      configuration(), sorted_message_reader_.node()->name()->string_view());
+  return configuration::GetNodes(remapped_configuration_);
 }
 
-monotonic_clock::time_point LogReader::monotonic_start_time() {
-  return sorted_message_reader_.monotonic_start_time();
+monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
+  auto it = channel_mergers_.find(node);
+  CHECK(it != channel_mergers_.end())
+      << ": Unknown node " << FlatbufferToJson(node);
+  return it->second.channel_merger->monotonic_start_time();
 }
 
-realtime_clock::time_point LogReader::realtime_start_time() {
-  return sorted_message_reader_.realtime_start_time();
+realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
+  auto it = channel_mergers_.find(node);
+  CHECK(it != channel_mergers_.end())
+      << ": Unknown node " << FlatbufferToJson(node);
+  return it->second.channel_merger->realtime_start_time();
 }
 
 void LogReader::Register() {
@@ -282,126 +342,156 @@
 
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
-  node_event_loop_factory_ =
-      event_loop_factory_->GetNodeEventLoopFactory(node());
-  event_loop_unique_ptr_ =
-      event_loop_factory->MakeEventLoop("log_reader", node());
-  // We don't run timing reports when trying to print out logged data, because
-  // otherwise we would end up printing out the timing reports themselves...
-  // This is only really relevant when we are replaying into a simulation.
-  event_loop_unique_ptr_->SkipTimingReport();
+  // We want to start the log file at the last start time of the log files from
+  // all the nodes.  Compute how long each node's simulation needs to run to
+  // move time to this point.
+  monotonic_clock::duration run_time = monotonic_clock::duration(0);
 
-  Register(event_loop_unique_ptr_.get());
-  event_loop_factory_->RunFor(monotonic_start_time() -
-                              event_loop_->monotonic_now());
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    auto it = channel_mergers_.insert(std::make_pair(node, State{}));
+
+    State *state = &(it.first->second);
+
+    state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
+
+    state->node_event_loop_factory =
+        event_loop_factory_->GetNodeEventLoopFactory(node);
+    state->event_loop_unique_ptr =
+        event_loop_factory->MakeEventLoop("log_reader", node);
+
+    Register(state->event_loop_unique_ptr.get());
+
+    const monotonic_clock::duration startup_time =
+        state->channel_merger->monotonic_start_time() -
+        state->event_loop->monotonic_now();
+    if (startup_time > run_time) {
+      run_time = startup_time;
+    }
+  }
+
+  // Forwarding is tracked per channel.  If it is enabled, we want to turn it
+  // off.  Otherwise messages replayed will get forwarded across to the other
+  // nodes, and also replayed on the other nodes.  This may not satisfy all our
+  // users, but it'll start the discussion.
+  if (configuration::MultiNode(event_loop_factory_->configuration())) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      const Channel *channel = logged_configuration()->channels()->Get(i);
+      const Node *node = configuration::GetNode(
+          configuration(), channel->source_node()->string_view());
+
+      auto state_pair = channel_mergers_.find(node);
+      CHECK(state_pair != channel_mergers_.end());
+      State *state = &(state_pair->second);
+
+      const Channel *remapped_channel =
+          RemapChannel(state->event_loop, channel);
+
+      event_loop_factory_->DisableForwarding(remapped_channel);
+    }
+  }
+
+  event_loop_factory_->RunFor(run_time);
 }
 
 void LogReader::Register(EventLoop *event_loop) {
-  event_loop_ = event_loop;
+  auto state_pair = channel_mergers_.find(event_loop->node());
+  CHECK(state_pair != channel_mergers_.end());
+  State *state = &(state_pair->second);
+
+  state->event_loop = event_loop;
 
   // We don't run timing reports when trying to print out logged data, because
   // otherwise we would end up printing out the timing reports themselves...
   // This is only really relevant when we are replaying into a simulation.
-  // Otherwise we replay the timing report and try to resend it...
-  event_loop_->SkipTimingReport();
-  event_loop_->SkipAosLog();
+  event_loop->SkipTimingReport();
+  event_loop->SkipAosLog();
 
-  for (size_t i = 0; i < channels_.size(); ++i) {
-    const Channel *const original_channel =
-        logged_configuration()->channels()->Get(i);
+  state->channel_merger->SetNode(event_loop->node());
 
-    std::string_view channel_name = original_channel->name()->string_view();
-    std::string_view channel_type = original_channel->type()->string_view();
-    // If the channel is remapped, find the correct channel name to use.
-    if (remapped_channels_.count(i) > 0) {
-      VLOG(2) << "Got remapped channel on "
-              << configuration::CleanedChannelToString(original_channel);
-      channel_name = remapped_channels_[i];
-    }
+  state->channels.resize(logged_configuration()->channels()->size());
 
-    VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
-    const Channel *channel = configuration::GetChannel(
-        event_loop_->configuration(), channel_name, channel_type,
-        event_loop_->name(), event_loop_->node());
+  for (size_t i = 0; i < state->channels.size(); ++i) {
+    const Channel *channel =
+        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
 
-    CHECK(channel != nullptr)
-        << ": Unable to send {\"name\": \"" << channel_name
-        << "\", \"type\": \"" << channel_type
-        << "\"} because it is not in the provided configuration.";
-
-    channels_[i] = event_loop_->MakeRawSender(channel);
+    state->channels[i] = event_loop->MakeRawSender(channel);
   }
 
-  timer_handler_ = event_loop_->AddTimer([this]() {
-    if (sorted_message_reader_.active_channel_count() == 0u) {
-      event_loop_factory_->Exit();
+  state->timer_handler = event_loop->AddTimer([this, state]() {
+    if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
+      --live_nodes_;
+      if (live_nodes_ == 0) {
+        event_loop_factory_->Exit();
+      }
       return;
     }
-    monotonic_clock::time_point channel_timestamp;
+    TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
     FlatbufferVector<MessageHeader> channel_data =
         FlatbufferVector<MessageHeader>::Empty();
 
     std::tie(channel_timestamp, channel_index, channel_data) =
-        sorted_message_reader_.PopOldestChannel();
+        state->channel_merger->PopOldest();
 
     const monotonic_clock::time_point monotonic_now =
-        event_loop_->context().monotonic_event_time;
-    CHECK(monotonic_now == channel_timestamp)
+        state->event_loop->context().monotonic_event_time;
+    CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
         << ": Now " << monotonic_now.time_since_epoch().count()
-        << " trying to send " << channel_timestamp.time_since_epoch().count();
+        << " trying to send "
+        << channel_timestamp.monotonic_event_time.time_since_epoch().count();
 
-    if (channel_timestamp > monotonic_start_time() ||
+    if (channel_timestamp.monotonic_event_time >
+            state->channel_merger->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
       if (!FLAGS_skip_missing_forwarding_entries ||
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
-               "not "
-               "matched?  Use --skip_missing_forwarding_entries to ignore "
+               "not matched?  Use --skip_missing_forwarding_entries to ignore "
                "this.";
 
         // If we have access to the factory, use it to fix the realtime time.
-        if (node_event_loop_factory_ != nullptr) {
-          node_event_loop_factory_->SetRealtimeOffset(
-              monotonic_clock::time_point(chrono::nanoseconds(
-                  channel_data.message().monotonic_sent_time())),
-              realtime_clock::time_point(chrono::nanoseconds(
-                  channel_data.message().realtime_sent_time())));
+        if (state->node_event_loop_factory != nullptr) {
+          state->node_event_loop_factory->SetRealtimeOffset(
+              channel_timestamp.monotonic_event_time,
+              channel_timestamp.realtime_event_time);
         }
 
-        channels_[channel_index]->Send(
+        state->channels[channel_index]->Send(
             channel_data.message().data()->Data(),
             channel_data.message().data()->size(),
-            monotonic_clock::time_point(chrono::nanoseconds(
-                channel_data.message().monotonic_remote_time())),
-            realtime_clock::time_point(chrono::nanoseconds(
-                channel_data.message().realtime_remote_time())),
-            channel_data.message().remote_queue_index());
+            channel_timestamp.monotonic_remote_time,
+            channel_timestamp.realtime_remote_time,
+            channel_timestamp.remote_queue_index);
       }
     } else {
-      LOG(WARNING) << "Not sending data from before the start of the log file. "
-                   << channel_timestamp.time_since_epoch().count() << " start "
-                   << monotonic_start_time().time_since_epoch().count() << " "
-                   << FlatbufferToJson(channel_data);
+      LOG(WARNING)
+          << "Not sending data from before the start of the log file. "
+          << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+          << " start " << monotonic_start_time().time_since_epoch().count()
+          << " " << FlatbufferToJson(channel_data);
     }
 
-    if (sorted_message_reader_.active_channel_count() > 0u) {
-      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+    const monotonic_clock::time_point next_time =
+        state->channel_merger->OldestMessage();
+    if (next_time != monotonic_clock::max_time) {
+      state->timer_handler->Setup(next_time);
     } else {
       // Set a timer up immediately after now to die. If we don't do this, then
       // the senders waiting on the message we just read will never get called.
       if (event_loop_factory_ != nullptr) {
-        timer_handler_->Setup(monotonic_now +
-                              event_loop_factory_->send_delay() +
-                              std::chrono::nanoseconds(1));
+        state->timer_handler->Setup(monotonic_now +
+                                    event_loop_factory_->send_delay() +
+                                    std::chrono::nanoseconds(1));
       }
     }
   });
 
-  if (sorted_message_reader_.active_channel_count() > 0u) {
-    event_loop_->OnRun([this]() {
-      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+  ++live_nodes_;
+
+  if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
+    event_loop->OnRun([state]() {
+      state->timer_handler->Setup(state->channel_merger->OldestMessage());
     });
   }
 }
@@ -409,15 +499,20 @@
 void LogReader::Deregister() {
   // Make sure that things get destroyed in the correct order, rather than
   // relying on getting the order correct in the class definition.
-  for (size_t i = 0; i < channels_.size(); ++i) {
-    channels_[i].reset();
+  for (const Node *node : Nodes()) {
+    auto state_pair = channel_mergers_.find(node);
+    CHECK(state_pair != channel_mergers_.end());
+    State *state = &(state_pair->second);
+    for (size_t i = 0; i < state->channels.size(); ++i) {
+      state->channels[i].reset();
+    }
+    state->event_loop_unique_ptr.reset();
+    state->event_loop = nullptr;
+    state->node_event_loop_factory = nullptr;
   }
 
-  event_loop_unique_ptr_.reset();
-  event_loop_ = nullptr;
   event_loop_factory_unique_ptr_.reset();
   event_loop_factory_ = nullptr;
-  node_event_loop_factory_ = nullptr;
 }
 
 void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -442,13 +537,15 @@
 }
 
 void LogReader::MakeRemappedConfig() {
-  CHECK(!event_loop_)
-      << ": Can't change the mapping after the events are scheduled.";
+  for (std::pair<const Node *const, State> &state : channel_mergers_) {
+    CHECK(!state.second.event_loop)
+        << ": Can't change the mapping after the events are scheduled.";
+  }
 
   // If no remapping occurred and we are using the original config, then there
   // is nothing interesting to do here.
   if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
-    remapped_configuration_ = sorted_message_reader_.configuration();
+    remapped_configuration_ = logged_configuration();
     return;
   }
   // Config to copy Channel definitions from. Use the specified
@@ -526,5 +623,30 @@
   remapped_configuration_ = &remapped_configuration_buffer_->message();
 }
 
+const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
+                                       const Channel *channel) {
+  std::string_view channel_name = channel->name()->string_view();
+  std::string_view channel_type = channel->type()->string_view();
+  const int channel_index =
+      configuration::ChannelIndex(logged_configuration(), channel);
+  // If the channel is remapped, find the correct channel name to use.
+  if (remapped_channels_.count(channel_index) > 0) {
+    VLOG(2) << "Got remapped channel on "
+            << configuration::CleanedChannelToString(channel);
+    channel_name = remapped_channels_[channel_index];
+  }
+
+  VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
+  const Channel *remapped_channel = configuration::GetChannel(
+      event_loop->configuration(), channel_name, channel_type,
+      event_loop->name(), event_loop->node());
+
+  CHECK(remapped_channel != nullptr)
+      << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
+      << channel_type << "\"} because it is not in the provided configuration.";
+
+  return remapped_channel;
+}
+
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 54b55d8..5e3ca52 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -2,8 +2,8 @@
 #define AOS_EVENTS_LOGGER_H_
 
 #include <deque>
-#include <vector>
 #include <string_view>
+#include <vector>
 
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
@@ -16,6 +16,164 @@
 namespace aos {
 namespace logger {
 
+class LogNamer {
+ public:
+  LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
+  virtual ~LogNamer() {}
+
+  virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
+                           const Node *node) = 0;
+  virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
+
+  virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+  const std::vector<const Node *> &nodes() const { return nodes_; }
+
+  const Node *node() const { return node_; }
+
+ protected:
+  const Node *const node_;
+  std::vector<const Node *> nodes_;
+};
+
+class LocalLogNamer : public LogNamer {
+ public:
+  LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
+      : LogNamer(node), writer_(writer) {}
+
+  ~LocalLogNamer() override { writer_->Flush(); }
+
+  void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
+                   const Node *node) override {
+    CHECK_EQ(node, this->node());
+    writer_->WriteSizedFlatbuffer(
+        absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
+  }
+
+  DetachedBufferWriter *MakeWriter(const Channel *channel) override {
+    CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
+    return writer_;
+  }
+
+  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
+    CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
+        << ": Message is not delivered to this node.";
+    CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
+    CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
+                                                              node_))
+        << ": Delivery times aren't logged for this channel on this node.";
+    return writer_;
+  }
+
+ private:
+  DetachedBufferWriter *writer_;
+};
+
+// TODO(austin): Split naming files from making files so we can re-use the
+// naming code to predict the log file names for a provided base name.
+class MultiNodeLogNamer : public LogNamer {
+ public:
+  MultiNodeLogNamer(std::string_view base_name,
+                    const Configuration *configuration, const Node *node)
+      : LogNamer(node),
+        base_name_(base_name),
+        configuration_(configuration),
+        data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
+            base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
+
+  // Writes the header to all log files for a specific node.  This function
+  // needs to be called after all the writers are created.
+  void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
+    if (node == this->node()) {
+      data_writer_->WriteSizedFlatbuffer(
+          absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
+    } else {
+      for (std::pair<const Channel *const,
+                     std::unique_ptr<DetachedBufferWriter>> &data_writer :
+           data_writers_) {
+        if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
+          data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
+              fbb->GetBufferPointer(), fbb->GetSize()));
+        }
+      }
+    }
+  }
+
+  // Makes a data logger for a specific channel.
+  DetachedBufferWriter *MakeWriter(const Channel *channel) {
+    // See if we can read the data on this node at all.
+    const bool is_readable =
+        configuration::ChannelIsReadableOnNode(channel, this->node());
+    if (!is_readable) {
+      return nullptr;
+    }
+
+    // Then, see if we are supposed to log the data here.
+    const bool log_message =
+        configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
+
+    if (!log_message) {
+      return nullptr;
+    }
+
+    // Now, sort out if this is data generated on this node, or not.  It is
+    // generated if it is sendable on this node.
+    if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
+      return data_writer_.get();
+    } else {
+      // Ok, we have data that is being forwarded to us that we are supposed to
+      // log.  It needs to be logged with send timestamps, but be sorted enough
+      // to be able to be processed.
+      CHECK(data_writers_.find(channel) == data_writers_.end());
+
+      // Track that this node is being logged.
+      if (configuration::MultiNode(configuration_)) {
+        const Node *source_node = configuration::GetNode(
+            configuration_, channel->source_node()->string_view());
+        if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
+            nodes_.end()) {
+          nodes_.emplace_back(source_node);
+        }
+      }
+
+      return data_writers_
+          .insert(std::make_pair(
+              channel,
+              std::make_unique<DetachedBufferWriter>(absl::StrCat(
+                  base_name_, "_", channel->source_node()->string_view(),
+                  "_data", channel->name()->string_view(), "/",
+                  channel->type()->string_view(), ".bfbs"))))
+          .first->second.get();
+    }
+  }
+
+  // Makes a timestamp (or timestamp and data) logger for a channel and
+  // forwarding connection.
+  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
+    const bool log_delivery_times =
+        (this->node() == nullptr)
+            ? false
+            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+                  channel, this->node(), this->node());
+    if (!log_delivery_times) {
+      return nullptr;
+    }
+
+    return data_writer_.get();
+  }
+
+  const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ private:
+  const std::string base_name_;
+  const Configuration *const configuration_;
+
+  // File to write both delivery timestamps and local data to.
+  std::unique_ptr<DetachedBufferWriter> data_writer_;
+  // Files to write remote data to.  We want one per channel.
+  std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
+      data_writers_;
+};
+
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
 // configuration that is sent rately on a channel and would affect execution.
@@ -24,18 +182,23 @@
   Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
          std::chrono::milliseconds polling_period =
              std::chrono::milliseconds(100));
+  Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+         std::chrono::milliseconds polling_period =
+             std::chrono::milliseconds(100));
 
   // Rotates the log file with the new writer.  This writes out the header
   // again, but keeps going as if nothing else happened.
   void Rotate(DetachedBufferWriter *writer);
+  void Rotate(std::unique_ptr<LogNamer> log_namer);
 
  private:
   void WriteHeader();
+  void WriteHeader(const Node *node);
 
   void DoLogData();
 
   EventLoop *event_loop_;
-  DetachedBufferWriter *writer_;
+  std::unique_ptr<LogNamer> log_namer_;
 
   // Structure to track both a fetcher, and if the data fetched has been
   // written.  We may want to delay writing data to disk so that we don't let
@@ -45,7 +208,12 @@
     std::unique_ptr<RawFetcher> fetcher;
     bool written = false;
 
-    LogType log_type;
+    int channel_index = -1;
+
+    LogType log_type = LogType::kLogMessage;
+
+    DetachedBufferWriter *writer = nullptr;
+    DetachedBufferWriter *timestamp_writer = nullptr;
   };
 
   std::vector<FetcherStruct> fetchers_;
@@ -65,6 +233,34 @@
   size_t max_header_size_ = 0;
 };
 
+// We end up with one of the following 3 log file types.
+//
+// Single node logged as the source node.
+//   -> Replayed just on the source node.
+//
+// Forwarding timestamps only logged from the perspective of the destination
+// node.
+//   -> Matched with data on source node and logged.
+//
+// Forwarding timestamps with data logged as the destination node.
+//   -> Replayed just as the destination
+//   -> Replayed as the source (Much harder, ordering is not defined)
+//
+// Duplicate data logged. -> CHECK that it matches and explode otherwise.
+//
+// This can be boiled down to a set of constraints and tools.
+//
+// 1) Forwarding timestamps and data need to be logged separately.
+// 2) Any forwarded data logged on the destination node needs to be logged
+//   separately such that it can be sorted.
+//
+// 1) Log reader needs to be able to sort a list of log files.
+// 2) Log reader needs to be able to merge sorted lists of log files.
+// 3) Log reader needs to be able to match timestamps with messages.
+//
+// We also need to be able to generate multiple views of a log file depending on
+// the target.
+
 // Replays all the channels in the logfile to the event loop.
 class LogReader {
  public:
@@ -72,9 +268,23 @@
   // (e.g., to change message rates, or to populate an updated schema), then
   // pass it in here. It must provide all the channels that the original logged
   // config did.
+  //
+  // Log filenames are in the following format:
+  //
+  //   {
+  //     {log1_part0, log1_part1, ...},
+  //     {log2}
+  //   }
+  // The inner vector is a list of log file chunks which form up a log file.
+  // The outer vector is a list of log files with subsets of the messages, or
+  // messages from different nodes.
+  //
+  // If the outer vector isn't provided, it is assumed to be of size 1.
   LogReader(std::string_view filename,
             const Configuration *replay_configuration = nullptr);
-  LogReader(const std::vector<std::string> &filename,
+  LogReader(const std::vector<std::string> &filenames,
+            const Configuration *replay_configuration = nullptr);
+  LogReader(const std::vector<std::vector<std::string>> &filenames,
             const Configuration *replay_configuration = nullptr);
   ~LogReader();
 
@@ -101,19 +311,17 @@
   // Returns the configuration from the log file.
   const Configuration *logged_configuration() const;
   // Returns the configuration being used for replay.
+  // The pointer is invalidated whenever RemapLoggedChannel is called.
   const Configuration *configuration() const;
 
-  const LogFileHeader *log_file_header() const {
-    return sorted_message_reader_.log_file_header();
-  }
-
-  // Returns the node that this log file was created on.  This is a pointer to a
-  // node in the nodes() list inside configuration().
-  const Node *node() const;
+  // Returns the nodes that this log file was created on.  This is a list of
+  // pointers to a node in the nodes() list inside configuration().  The
+  // pointers here are invalidated whenever RemapLoggedChannel is called.
+  std::vector<const Node *> Nodes() const;
 
   // Returns the starting timestamp for the log file.
-  monotonic_clock::time_point monotonic_start_time();
-  realtime_clock::time_point realtime_start_time();
+  monotonic_clock::time_point monotonic_start_time(const Node *node = nullptr);
+  realtime_clock::time_point realtime_start_time(const Node *node = nullptr);
 
   // Causes the logger to publish the provided channel on a different name so
   // that replayed applications can publish on the proper channel name without
@@ -131,31 +339,48 @@
     return event_loop_factory_;
   }
 
-  // TODO(austin): Add the ability to re-publish the fetched messages.  Add 2
-  // options, one which publishes them *now*, and another which publishes them
-  // to the simulated event loop factory back in time where they actually
-  // happened.
-
  private:
+  const Channel *RemapChannel(const EventLoop *event_loop,
+                              const Channel *channel);
+
+  const LogFileHeader *log_file_header() const {
+    return &log_file_header_.message();
+  }
+
   // Queues at least max_out_of_order_duration_ messages into channels_.
   void QueueMessages();
   // Handle constructing a configuration with all the additional remapped
   // channels from calls to RemapLoggedChannel.
   void MakeRemappedConfig();
 
-  // Log chunk reader.
-  SortedMessageReader sorted_message_reader_;
+  const std::vector<std::vector<std::string>> filenames_;
+
+  // This is *a* log file header used to provide the logged config.  The rest of
+  // the header is likely distracting.
+  FlatbufferVector<LogFileHeader> log_file_header_;
+
+  // State per node.
+  struct State {
+    // Log file.
+    std::unique_ptr<ChannelMerger> channel_merger;
+    // Senders.
+    std::vector<std::unique_ptr<RawSender>> channels;
+
+    // Factory (if we are in sim) that this loop was created on.
+    NodeEventLoopFactory *node_event_loop_factory = nullptr;
+    std::unique_ptr<EventLoop> event_loop_unique_ptr;
+    // Event loop.
+    EventLoop *event_loop = nullptr;
+    // And timer used to send messages.
+    TimerHandler *timer_handler;
+  };
+
+  // Map of nodes to States used to hold all the state for all the nodes.
+  std::map<const Node *, State> channel_mergers_;
 
   std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
       remapped_configuration_buffer_;
 
-  std::vector<std::unique_ptr<RawSender>> channels_;
-
-  std::unique_ptr<EventLoop> event_loop_unique_ptr_;
-  NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
-  EventLoop *event_loop_ = nullptr;
-  TimerHandler *timer_handler_;
-
   std::unique_ptr<SimulatedEventLoopFactory> event_loop_factory_unique_ptr_;
   SimulatedEventLoopFactory *event_loop_factory_ = nullptr;
 
@@ -164,6 +389,10 @@
   // to send on instead of the logged channel name.
   std::map<size_t, std::string> remapped_channels_;
 
+  // Number of nodes which still have data to send.  This is used to figure out
+  // when to exit.
+  size_t live_nodes_ = 0;
+
   const Configuration *remapped_configuration_ = nullptr;
   const Configuration *replay_configuration_ = nullptr;
 };
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 55d0ecc..3a969d4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
 #include "glog/logging.h"
+#include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -70,7 +71,7 @@
   // log file.
   reader.Register();
 
-  EXPECT_EQ(reader.node(), nullptr);
+  EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(nullptr));
 
   std::unique_ptr<EventLoop> test_event_loop =
       reader.event_loop_factory()->MakeEventLoop("log_reader");
@@ -135,7 +136,7 @@
   // log file.
   reader.Register();
 
-  EXPECT_EQ(reader.node(), nullptr);
+  EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(nullptr));
 
   std::unique_ptr<EventLoop> test_event_loop =
       reader.event_loop_factory()->MakeEventLoop("log_reader");
@@ -212,7 +213,11 @@
         ping_event_loop_(event_loop_factory_.MakeEventLoop(
             "ping", configuration::GetNode(event_loop_factory_.configuration(),
                                            "pi1"))),
-        ping_(ping_event_loop_.get()) {}
+        ping_(ping_event_loop_.get()),
+        pong_event_loop_(event_loop_factory_.MakeEventLoop(
+            "pong", configuration::GetNode(event_loop_factory_.configuration(),
+                                           "pi2"))),
+        pong_(pong_event_loop_.get()) {}
 
   // Config and factory.
   aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
@@ -221,65 +226,154 @@
   // Event loop and app for Ping
   std::unique_ptr<EventLoop> ping_event_loop_;
   Ping ping_;
+
+  // Event loop and app for Pong
+  std::unique_ptr<EventLoop> pong_event_loop_;
+  Pong pong_;
 };
 
-// Tests that we can startup at all in a multinode configuration.
-TEST_F(MultinodeLoggerTest, MultiNode) {
-  constexpr chrono::seconds kTimeOffset = chrono::seconds(10000);
-  constexpr uint32_t kQueueIndexOffset = 1024;
-  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
-  const ::std::string logfile = tmpdir + "/multi_logfile.bfbs";
-  // Remove it.
-  unlink(logfile.c_str());
+// Counts the number of messages on a channel (returns channel, count) for every
+// message matching matcher()
+std::vector<std::pair<int, int>> CountChannelsMatching(
+    std::string_view filename,
+    std::function<bool(const MessageHeader *)> matcher) {
+  MessageReader message_reader(filename);
+  std::vector<int> counts(
+      message_reader.log_file_header()->configuration()->channels()->size(), 0);
 
-  LOG(INFO) << "Logging data to " << logfile;
+  while (true) {
+    std::optional<FlatbufferVector<MessageHeader>> msg =
+        message_reader.ReadMessage();
+    if (!msg) {
+      break;
+    }
+
+    if (matcher(&msg.value().message())) {
+      counts[msg.value().message().channel_index()]++;
+    }
+  }
+
+  std::vector<std::pair<int, int>> result;
+  int channel = 0;
+  for (size_t i = 0; i < counts.size(); ++i) {
+    if (counts[i] != 0) {
+      result.push_back(std::make_pair(channel, counts[i]));
+    }
+    ++channel;
+  }
+
+  return result;
+}
+
+// Counts the number of messages (channel, count) for all data messages.
+std::vector<std::pair<int, int>> CountChannelsData(std::string_view filename) {
+  return CountChannelsMatching(filename, [](const MessageHeader *msg) {
+    if (msg->has_data()) {
+      CHECK(!msg->has_monotonic_remote_time());
+      CHECK(!msg->has_realtime_remote_time());
+      CHECK(!msg->has_remote_queue_index());
+      return true;
+    }
+    return false;
+  });
+}
+
+// Counts the number of messages (channel, count) for all timestamp messages.
+std::vector<std::pair<int, int>> CountChannelsTimestamp(
+    std::string_view filename) {
+  return CountChannelsMatching(filename, [](const MessageHeader *msg) {
+    if (!msg->has_data()) {
+      CHECK(msg->has_monotonic_remote_time());
+      CHECK(msg->has_realtime_remote_time());
+      CHECK(msg->has_remote_queue_index());
+      return true;
+    }
+    return false;
+  });
+}
+
+// Tests that we can write and read multi-node log files correctly.
+TEST_F(MultinodeLoggerTest, MultiNode) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string logfile_base = tmpdir + "/multi_logfile";
+  const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
+  const ::std::string logfile2 =
+      logfile_base + "_pi2_data/test/aos.examples.Pong.bfbs";
+  const ::std::string logfile3 = logfile_base + "_pi2_data.bfbs";
+
+  // Remove them.
+  unlink(logfile1.c_str());
+  unlink(logfile2.c_str());
+  unlink(logfile3.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile1 << ", " << logfile2 << " and "
+            << logfile3;
 
   {
     const Node *pi1 =
         configuration::GetNode(event_loop_factory_.configuration(), "pi1");
-    std::unique_ptr<EventLoop> pong_event_loop =
-        event_loop_factory_.MakeEventLoop("pong", pi1);
+    const Node *pi2 =
+        configuration::GetNode(event_loop_factory_.configuration(), "pi2");
 
-    std::unique_ptr<aos::RawSender> pong_sender(
-        pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
-            pong_event_loop->configuration(), "/test", "aos.examples.Pong",
-            pong_event_loop->name(), pong_event_loop->node())));
-
-    // Ok, let's fake a remote node.  We use the fancy raw sender Send
-    // method that message_gateway will use to do that.
-    int pong_count = 0;
-    pong_event_loop->MakeWatcher(
-        "/test", [&pong_event_loop, &pong_count, &pong_sender,
-                  kTimeOffset](const examples::Ping &ping) {
-          flatbuffers::FlatBufferBuilder fbb;
-          examples::Pong::Builder pong_builder(fbb);
-          pong_builder.add_value(ping.value());
-          pong_builder.add_initial_send_time(ping.send_time());
-          fbb.Finish(pong_builder.Finish());
-
-          pong_sender->Send(fbb.GetBufferPointer(), fbb.GetSize(),
-                            pong_event_loop->monotonic_now() + kTimeOffset,
-                            pong_event_loop->realtime_now() + kTimeOffset,
-                            kQueueIndexOffset + pong_count);
-          ++pong_count;
-        });
-
-    DetachedBufferWriter writer(logfile);
-    std::unique_ptr<EventLoop> logger_event_loop =
+    std::unique_ptr<EventLoop> pi1_logger_event_loop =
         event_loop_factory_.MakeEventLoop("logger", pi1);
+    std::unique_ptr<LogNamer> pi1_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi1_logger_event_loop->configuration(),
+            pi1_logger_event_loop->node());
+
+    std::unique_ptr<EventLoop> pi2_logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger", pi2);
+    std::unique_ptr<LogNamer> pi2_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi2_logger_event_loop->configuration(),
+            pi2_logger_event_loop->node());
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-    Logger logger(&writer, logger_event_loop.get(),
-                  std::chrono::milliseconds(100));
+    Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
+                      std::chrono::milliseconds(100));
+    Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
+                      std::chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
-  LogReader reader(logfile);
+  {
+    // Confirm that the headers are all for the correct nodes.
+    FlatbufferVector<LogFileHeader> logheader1 = ReadHeader(logfile1);
+    EXPECT_EQ(logheader1.message().node()->name()->string_view(), "pi1");
+    FlatbufferVector<LogFileHeader> logheader2 = ReadHeader(logfile2);
+    EXPECT_EQ(logheader2.message().node()->name()->string_view(), "pi2");
+    FlatbufferVector<LogFileHeader> logheader3 = ReadHeader(logfile3);
+    EXPECT_EQ(logheader3.message().node()->name()->string_view(), "pi2");
 
-  // TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
-  // messages.  This won't work today yet until the log reading code gets
-  // significantly better.
+    // Timing reports, pings
+    EXPECT_THAT(CountChannelsData(logfile1),
+                ::testing::ElementsAre(::testing::Pair(1, 60),
+                                       ::testing::Pair(4, 2001)));
+    // Timestamps for pong
+    EXPECT_THAT(CountChannelsTimestamp(logfile1),
+                ::testing::ElementsAre(::testing::Pair(5, 2001)));
+
+    // Pong data.
+    EXPECT_THAT(CountChannelsData(logfile2),
+                ::testing::ElementsAre(::testing::Pair(5, 2001)));
+    // No timestamps
+    EXPECT_THAT(CountChannelsTimestamp(logfile2), ::testing::ElementsAre());
+
+    // Timing reports and pongs.
+    EXPECT_THAT(CountChannelsData(logfile3),
+                ::testing::ElementsAre(::testing::Pair(3, 60),
+                                       ::testing::Pair(5, 2001)));
+    // And ping timestamps.
+    EXPECT_THAT(CountChannelsTimestamp(logfile3),
+                ::testing::ElementsAre(::testing::Pair(4, 2001)));
+  }
+
+  LogReader reader({std::vector<std::string>{logfile1},
+                    std::vector<std::string>{logfile2},
+                    std::vector<std::string>{logfile3}});
+
   SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
@@ -289,48 +383,138 @@
 
   const Node *pi1 =
       configuration::GetNode(log_reader_factory.configuration(), "pi1");
+  const Node *pi2 =
+      configuration::GetNode(log_reader_factory.configuration(), "pi2");
 
-  ASSERT_NE(reader.node(), nullptr);
-  EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
+  EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
 
   reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
 
-  std::unique_ptr<EventLoop> test_event_loop =
+  std::unique_ptr<EventLoop> pi1_event_loop =
       log_reader_factory.MakeEventLoop("test", pi1);
+  std::unique_ptr<EventLoop> pi2_event_loop =
+      log_reader_factory.MakeEventLoop("test", pi2);
 
-  int ping_count = 10;
-  int pong_count = 10;
+  int pi1_ping_count = 10;
+  int pi2_ping_count = 10;
+  int pi1_pong_count = 10;
+  int pi2_pong_count = 10;
 
   // Confirm that the ping value matches.
-  test_event_loop->MakeWatcher("/test",
-                               [&ping_count](const examples::Ping &ping) {
-                                 EXPECT_EQ(ping.value(), ping_count + 1);
-                                 ++ping_count;
-                               });
-  // Confirm that the ping and pong counts both match, and the value also
-  // matches.
-  test_event_loop->MakeWatcher(
-      "/test", [&test_event_loop, &ping_count, &pong_count,
-                kTimeOffset](const examples::Pong &pong) {
-        EXPECT_EQ(test_event_loop->context().remote_queue_index,
-                  pong_count + kQueueIndexOffset);
-        EXPECT_EQ(test_event_loop->context().monotonic_remote_time,
-                  test_event_loop->monotonic_now() + kTimeOffset);
-        EXPECT_EQ(test_event_loop->context().realtime_remote_time,
-                  test_event_loop->realtime_now() + kTimeOffset);
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
+        VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+                << pi1_event_loop->context().monotonic_remote_time << " -> "
+                << pi1_event_loop->context().monotonic_event_time;
+        EXPECT_EQ(ping.value(), pi1_ping_count + 1);
+        EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+                  pi1_ping_count * chrono::milliseconds(10) +
+                      monotonic_clock::epoch());
+        EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+                  pi1_ping_count * chrono::milliseconds(10) +
+                      realtime_clock::epoch());
+        EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+                  pi1_event_loop->context().monotonic_event_time);
+        EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+                  pi1_event_loop->context().realtime_event_time);
 
-        EXPECT_EQ(pong.value(), pong_count + 1);
-        ++pong_count;
-        EXPECT_EQ(ping_count, pong_count);
+        ++pi1_ping_count;
+      });
+  pi2_event_loop->MakeWatcher(
+      "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
+        VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+                << pi2_event_loop->context().monotonic_remote_time << " -> "
+                << pi2_event_loop->context().monotonic_event_time;
+        EXPECT_EQ(ping.value(), pi2_ping_count + 1);
+
+        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+                  pi2_ping_count * chrono::milliseconds(10) +
+                      monotonic_clock::epoch());
+        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+                  pi2_ping_count * chrono::milliseconds(10) +
+                      realtime_clock::epoch());
+        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
+                      chrono::microseconds(150),
+                  pi2_event_loop->context().monotonic_event_time);
+        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
+                      chrono::microseconds(150),
+                  pi2_event_loop->context().realtime_event_time);
+        ++pi2_ping_count;
       });
 
-  log_reader_factory.RunFor(std::chrono::seconds(100));
-  EXPECT_EQ(ping_count, 2010);
-  EXPECT_EQ(pong_count, 2010);
+  constexpr ssize_t kQueueIndexOffset = 0;
+  // Confirm that the ping and pong counts both match, and the value also
+  // matches.
+  pi1_event_loop->MakeWatcher(
+      "/test", [&pi1_event_loop, &pi1_ping_count,
+                &pi1_pong_count](const examples::Pong &pong) {
+        VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
+                << pi1_event_loop->context().monotonic_remote_time << " -> "
+                << pi1_event_loop->context().monotonic_event_time;
+
+        EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
+                  pi1_pong_count + kQueueIndexOffset);
+        EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+                  chrono::microseconds(200) +
+                      pi1_pong_count * chrono::milliseconds(10) +
+                      monotonic_clock::epoch());
+        EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+                  chrono::microseconds(200) +
+                      pi1_pong_count * chrono::milliseconds(10) +
+                      realtime_clock::epoch());
+
+        EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
+                      chrono::microseconds(150),
+                  pi1_event_loop->context().monotonic_event_time);
+        EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
+                      chrono::microseconds(150),
+                  pi1_event_loop->context().realtime_event_time);
+
+        EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+        ++pi1_pong_count;
+        EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+      });
+  pi2_event_loop->MakeWatcher(
+      "/test", [&pi2_event_loop, &pi2_ping_count,
+                &pi2_pong_count](const examples::Pong &pong) {
+        VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
+                << pi2_event_loop->context().monotonic_remote_time << " -> "
+                << pi2_event_loop->context().monotonic_event_time;
+
+        EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
+                  pi2_pong_count + kQueueIndexOffset - 9);
+
+        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+                  chrono::microseconds(200) +
+                      pi2_pong_count * chrono::milliseconds(10) +
+                      monotonic_clock::epoch());
+        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+                  chrono::microseconds(200) +
+                      pi2_pong_count * chrono::milliseconds(10) +
+                      realtime_clock::epoch());
+
+        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+                  pi2_event_loop->context().monotonic_event_time);
+        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+                  pi2_event_loop->context().realtime_event_time);
+
+        EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+        ++pi2_pong_count;
+        EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+      });
+
+  log_reader_factory.Run();
+  EXPECT_EQ(pi1_ping_count, 2010);
+  EXPECT_EQ(pi2_ping_count, 2010);
+  EXPECT_EQ(pi1_pong_count, 2010);
+  EXPECT_EQ(pi2_pong_count, 2010);
 
   reader.Deregister();
 }
 
+// TODO(austin): We can write a test which recreates a logfile and confirms that
+// we get it back.  That is the ultimate test.
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index be8a402..957cc2b 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -33,9 +33,7 @@
       "num_senders": 20,
       "max_size": 2048
     },
-    /* Forwarded to pi2.
-     * Doesn't matter where timestamps are logged for the test.
-     */
+    /* Forwarded to pi2 */
     {
       "name": "/test",
       "type": "aos.examples.Ping",
@@ -44,7 +42,7 @@
         {
           "name": "pi2",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
           "timestamp_logger_node": "pi1",
           "time_to_live": 5000000
         }
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 01574fa..faa9fe9 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -755,15 +755,8 @@
 
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
-    : configuration_(CHECK_NOTNULL(configuration)) {
-  if (configuration::MultiNode(configuration_)) {
-    for (const Node *node : *configuration->nodes()) {
-      nodes_.emplace_back(node);
-    }
-  } else {
-    nodes_.emplace_back(nullptr);
-  }
-
+    : configuration_(CHECK_NOTNULL(configuration)),
+      nodes_(configuration::GetNodes(configuration_)) {
   for (const Node *node : nodes_) {
     node_factories_.emplace_back(
         new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
@@ -838,4 +831,8 @@
   }
 }
 
+void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
+  bridge_->DisableForwarding(channel);
+}
+
 }  // namespace aos
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 8cff0d7..21b241a 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -98,6 +98,10 @@
   // Returns the configuration used for everything.
   const Configuration *configuration() const { return configuration_; }
 
+  // Disables forwarding for this channel.  This should be used very rarely only
+  // for things like the logger.
+  void DisableForwarding(const Channel *channel);
+
  private:
   const Configuration *const configuration_;
   EventScheduler scheduler_;
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 57f2efd..10ce37d 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -28,6 +28,8 @@
     Schedule();
   }
 
+  const Channel *channel() const { return fetcher_->channel(); }
+
   // Kicks us to re-fetch and schedule the timer.
   void Schedule() {
     if (fetcher_->context().data == nullptr || sent_) {
@@ -161,5 +163,22 @@
 
 SimulatedMessageBridge::~SimulatedMessageBridge() {}
 
+void SimulatedMessageBridge::DisableForwarding(const Channel *channel) {
+  for (std::unique_ptr<std::vector<std::unique_ptr<RawMessageDelayer>>>
+           &delayers : delayers_list_) {
+    if (delayers->size() > 0) {
+      if ((*delayers)[0]->channel() == channel) {
+        for (std::unique_ptr<RawMessageDelayer> &delayer : *delayers) {
+          CHECK(delayer->channel() == channel);
+        }
+
+        // If we clear the delayers list, nothing will be scheduled.  Which is a
+        // success!
+        delayers->clear();
+      }
+    }
+  }
+}
+
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 5d613ab..7aeef64 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -19,12 +19,15 @@
       SimulatedEventLoopFactory *simulated_event_loop_factory);
   ~SimulatedMessageBridge();
 
+  // Disables forwarding for this channel.  This should be used very rarely only
+  // for things like the logger.
+  void DisableForwarding(const Channel *channel);
+
  private:
   // Map of nodes to event loops.  This is a member variable so that the
   // lifetime of the event loops matches the lifetime of the bridge.
   std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
 
-
   // List of delayers used to resend the messages.
   using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
   std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index d9fcab6..e556c0f 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -4,6 +4,7 @@
 #include <array>
 #include <string_view>
 
+#include "absl/types/span.h"
 #include "flatbuffers/flatbuffers.h"
 #include "glog/logging.h"
 
@@ -106,6 +107,11 @@
   virtual const uint8_t *data() const = 0;
   virtual uint8_t *data() = 0;
   virtual size_t size() const = 0;
+
+  absl::Span<uint8_t> span() { return absl::Span<uint8_t>(data(), size()); }
+  absl::Span<const uint8_t> span() const {
+    return absl::Span<const uint8_t>(data(), size());
+  }
 };
 
 // String backed flatbuffer.