Merge "Add redzones around lockless queue data"
diff --git a/aos/aos_dump.cc b/aos/aos_dump.cc
index ac82b95..ef61f6b 100644
--- a/aos/aos_dump.cc
+++ b/aos/aos_dump.cc
@@ -14,6 +14,9 @@
             "If true, fetch the current message on the channel first");
 DEFINE_bool(pretty, false,
             "If true, pretty print the messages on multiple lines");
+DEFINE_bool(print_timestamps, true, "If true, timestamps are printed.");
+DEFINE_uint64(count, 0,
+              "If >0, aos_dump will exit after printing this many messages.");
 
 namespace {
 
@@ -29,14 +32,18 @@
       builder, channel->schema(), static_cast<const uint8_t *>(context.data),
       {FLAGS_pretty, static_cast<size_t>(FLAGS_max_vector_size)});
 
-  if (context.monotonic_remote_time != context.monotonic_event_time) {
-    std::cout << context.realtime_remote_time << " ("
-              << context.monotonic_remote_time << ") delivered "
-              << context.realtime_event_time << " ("
-              << context.monotonic_event_time << "): " << *builder << '\n';
+  if (FLAGS_print_timestamps) {
+    if (context.monotonic_remote_time != context.monotonic_event_time) {
+      std::cout << context.realtime_remote_time << " ("
+                << context.monotonic_remote_time << ") delivered "
+                << context.realtime_event_time << " ("
+                << context.monotonic_event_time << "): " << *builder << '\n';
+    } else {
+      std::cout << context.realtime_event_time << " ("
+                << context.monotonic_event_time << "): " << *builder << '\n';
+    }
   } else {
-    std::cout << context.realtime_event_time << " ("
-              << context.monotonic_event_time << "): " << *builder << '\n';
+    std::cout << *builder << '\n';
   }
 }
 
@@ -58,7 +65,7 @@
       aos::configuration::ReadConfig(FLAGS_config);
 
   const aos::Configuration *config_msg = &config.message();
-  ::aos::ShmEventLoop event_loop(config_msg);
+  aos::ShmEventLoop event_loop(config_msg);
   event_loop.SkipTimingReport();
   event_loop.SkipAosLog();
 
@@ -98,6 +105,8 @@
     LOG(FATAL) << "Multiple channels found with same type";
   }
 
+  uint64_t message_count = 0;
+
   aos::FastStringBuilder str_builder;
 
   for (const aos::Channel *channel : found_channels) {
@@ -106,13 +115,22 @@
           event_loop.MakeRawFetcher(channel);
       if (fetcher->Fetch()) {
         PrintMessage(channel, fetcher->context(), &str_builder);
+        ++message_count;
       }
     }
 
+    if (FLAGS_count > 0 && message_count >= FLAGS_count) {
+      return 0;
+    }
+
     event_loop.MakeRawWatcher(
-        channel, [channel, &str_builder](const aos::Context &context,
-                                         const void * /*message*/) {
+        channel, [channel, &str_builder, &event_loop, &message_count](
+                     const aos::Context &context, const void * /*message*/) {
           PrintMessage(channel, context, &str_builder);
+          ++message_count;
+          if (FLAGS_count > 0 && message_count >= FLAGS_count) {
+            event_loop.Exit();
+          }
         });
   }
 
diff --git a/aos/configuration.h b/aos/configuration.h
index 4ada459..fbd9cff 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -57,6 +57,14 @@
                                  const Node *node) {
   return GetChannel(&config.message(), name, type, application_name, node);
 }
+template <typename T>
+inline const Channel *GetChannel(const Configuration *config,
+                                 const std::string_view name,
+                                 const std::string_view application_name,
+                                 const Node *node) {
+  return GetChannel(config, name, T::GetFullyQualifiedName(), application_name,
+                    node);
+}
 // Convenience wrapper for getting a channel from a specified config if you
 // already have the name/type in a Channel object--this is useful if you Channel
 // object you have does not point to memory within config.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 902ca76..5912ac6 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -143,6 +143,7 @@
         ":ping_fbs",
         ":pong_fbs",
         "//aos/network:message_bridge_client_fbs",
+        "//aos/events/logging:logger_fbs",
         "//aos/network:timestamp_fbs",
         "//aos/network:message_bridge_server_fbs",
     ],
@@ -306,6 +307,7 @@
         ":aos_logging",
         ":event_loop",
         ":simple_channel",
+        "//aos/events/logging:logger_fbs",
         "//aos/ipc_lib:index",
         "//aos/network:message_bridge_client_status",
         "//aos/network:message_bridge_server_status",
diff --git a/aos/events/aos_logging.cc b/aos/events/aos_logging.cc
index f831071..d312925 100644
--- a/aos/events/aos_logging.cc
+++ b/aos/events/aos_logging.cc
@@ -19,7 +19,7 @@
           builder.add_message(message_str);
           builder.add_level(
               static_cast<::aos::logging::Level>(message_data.level));
-          builder.add_source(message_data.source);
+          builder.add_source_pid(message_data.source);
           builder.add_name(name_str);
 
           message.Send(builder.Finish());
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index c922f2e..de397e2 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -527,7 +527,10 @@
     CHECK(channel != nullptr)
         << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
         << T::GetFullyQualifiedName() << "\" } not found in config for "
-        << name() << ".";
+        << name()
+        << (configuration::MultiNode(configuration_)
+                ? absl::StrCat(" on node ", node()->name()->string_view())
+                : ".");
 
     if (!configuration::ChannelIsSendableOnNode(channel, node())) {
       LOG(FATAL) << "Channel { \"name\": \"" << channel_name
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 202c772..f5d6c92 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -94,9 +94,12 @@
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
     // backward step changes in our time function for each node, or we have to
-    // let time go backwards.  This backwards time jump should be small, so we
-    // can check for it and bound it.
-    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+    // let time go backwards.  We currently only really see this happen when 2
+    // events are scheduled for "now", time changes, and there is a nanosecond
+    // or two of rounding due to integer math.
+    //
+    // //aos/events/logging:logger_test triggers this.
+    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
         << ": Simulated time went backwards by too much.  Please investigate.";
     now_ = std::get<0>(oldest_event);
 
@@ -120,9 +123,12 @@
 
     // We get to pick our tradeoffs here.  Either we assume that there are no
     // backward step changes in our time function for each node, or we have to
-    // let time go backwards.  This backwards time jump should be small, so we
-    // can check for it and bound it.
-    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+    // let time go backwards.  We currently only really see this happen when 2
+    // events are scheduled for "now", time changes, and there is a nanosecond
+    // or two of rounding due to integer math.
+    //
+    // //aos/events/logging:logger_test triggers this.
+    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
         << ": Simulated time went backwards by too much.  Please investigate.";
     now_ = std::get<0>(oldest_event);
 
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 468d904..c067048 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -196,9 +196,16 @@
 inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
   // Make sure we stay in sync.
   if (monotonic_now_valid_) {
+    // We want time to be smooth, so confirm that it doesn't change too much
+    // while handling an event.
+    //
+    // There are 2 sources of error.  There are numerical precision and integer
+    // rounding problems going from the monotonic clock to the distributed clock
+    // and back again.  Updating the time function to transition between line
+    // segments also introduces a slight jump.
     CHECK_NEAR(monotonic_now_,
                FromDistributedClock(scheduler_scheduler_->distributed_now()),
-               std::chrono::nanoseconds(1));
+               std::chrono::nanoseconds(2));
     return monotonic_now_;
   } else {
     return FromDistributedClock(scheduler_scheduler_->distributed_now());
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 0fb4708..56bef43 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -36,22 +36,29 @@
 cc_library(
     name = "logger",
     srcs = [
+        "log_namer.cc",
         "logger.cc",
         "logger_math.cc",
     ],
     hdrs = [
+        "eigen_mpq.h",
+        "log_namer.h",
         "logger.h",
     ],
     visibility = ["//visibility:public"],
     deps = [
         ":logfile_utils",
         ":logger_fbs",
+        ":uuid",
         "//aos/events:event_loop",
         "//aos/events:simulated_event_loop",
+        "//aos/network:message_bridge_server_fbs",
         "//aos/network:team_number",
         "//aos/network:timestamp_filter",
         "//aos/time",
+        "//third_party/gmp",
         "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_google_absl//absl/strings",
         "@com_google_absl//absl/types:span",
         "@org_tuxfamily_eigen//:eigen",
     ],
@@ -129,6 +136,7 @@
     name = "multinode_pingpong_config",
     src = "multinode_pingpong.json",
     flatbuffers = [
+        ":logger_fbs",
         "//aos/events:ping_fbs",
         "//aos/events:pong_fbs",
         "//aos/network:message_bridge_client_fbs",
@@ -153,3 +161,18 @@
         "//aos/testing:googletest",
     ],
 )
+
+cc_library(
+    name = "uuid",
+    srcs = ["uuid.cc"],
+    hdrs = ["uuid.h"],
+)
+
+cc_test(
+    name = "uuid_test",
+    srcs = ["uuid_test.cc"],
+    deps = [
+        ":uuid",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/events/logging/eigen_mpq.h b/aos/events/logging/eigen_mpq.h
new file mode 100644
index 0000000..1463648
--- /dev/null
+++ b/aos/events/logging/eigen_mpq.h
@@ -0,0 +1,35 @@
+#ifndef AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
+#define AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
+
+#include "Eigen/Dense"
+#include "third_party/gmp/gmpxx.h"
+
+namespace Eigen {
+
+// NumTraits for mpq_class.  This is only really enough to use inverse().
+template <>
+struct NumTraits<mpq_class>
+    : GenericNumTraits<mpq_class> {
+  typedef mpq_class Real;
+  typedef mpq_class Literal;
+  typedef mpq_class NonInteger;
+  typedef mpq_class Nested;
+
+  enum {
+    IsComplex = 0,
+    IsInteger = 0,
+    IsSigned = 1,
+    RequireInitialization = 1,
+    ReadCost = 1,
+    AddCost = 3,
+    MulCost = 9
+  };
+
+  static inline Real dummy_precision() { return mpq_class(0); }
+  static inline Real epsilon() { return mpq_class(0); }
+  static inline int digits10() { return 0; }
+};
+
+}  // namespace Eigen
+
+#endif  // AOS_EVENTS_LOGGING_EIGEN_MPQ_H_
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index b9d53ff..f406d24 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -78,6 +78,11 @@
       LOG(FATAL) << "Expected 1 logfile as an argument.";
     }
     aos::logger::MessageReader reader(argv[1]);
+    std::cout << aos::FlatbufferToJson(reader.log_file_header(),
+                                       {.multi_line = FLAGS_pretty,
+                                        .max_vector_size = static_cast<size_t>(
+                                            FLAGS_max_vector_size)})
+              << std::endl;
 
     while (true) {
       std::optional<aos::FlatbufferVector<aos::logger::MessageHeader>> message =
@@ -117,12 +122,14 @@
     LOG(FATAL) << "Expected at least 1 logfile as an argument.";
   }
 
-  std::vector<std::vector<std::string>> logfiles;
-
+  std::vector<std::string> unsorted_logfiles;
   for (int i = 1; i < argc; ++i) {
-    logfiles.emplace_back(std::vector<std::string>{std::string(argv[i])});
+    unsorted_logfiles.emplace_back(std::string(argv[i]));
   }
 
+  std::vector<std::vector<std::string>> logfiles =
+      aos::logger::SortParts(unsorted_logfiles);
+
   aos::logger::LogReader reader(logfiles);
 
   aos::FastStringBuilder builder;
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
new file mode 100644
index 0000000..def0842
--- /dev/null
+++ b/aos/events/logging/log_namer.cc
@@ -0,0 +1,237 @@
+#include "aos/events/logging/log_namer.h"
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "absl/strings/str_cat.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace logger {
+
+void LogNamer::UpdateHeader(
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+    const UUID &uuid, int parts_index) const {
+  header->mutable_message()->mutate_parts_index(parts_index);
+  CHECK_EQ(uuid.string_view().size(),
+           header->mutable_message()->mutable_parts_uuid()->size());
+  std::copy(uuid.string_view().begin(), uuid.string_view().end(),
+            reinterpret_cast<char *>(
+                header->mutable_message()->mutable_parts_uuid()->Data()));
+}
+
+void LocalLogNamer::WriteHeader(
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+    const Node *node) {
+  CHECK_EQ(node, this->node());
+  UpdateHeader(header, uuid_, part_number_);
+  data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
+  CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
+  return data_writer_.get();
+}
+
+void LocalLogNamer::Rotate(
+    const Node *node,
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+  CHECK(node == this->node());
+  ++part_number_;
+  *data_writer_ = std::move(*OpenDataWriter());
+  UpdateHeader(header, uuid_, part_number_);
+  data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
+    const Channel *channel) {
+  CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
+      << ": Message is not delivered to this node.";
+  CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
+  CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
+                                                            node_))
+      << ": Delivery times aren't logged for this channel on this node.";
+  return data_writer_.get();
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
+    const Channel * /*channel*/, const Node * /*node*/) {
+  LOG(FATAL) << "Can't log forwarded timestamps in a single log file.";
+  return nullptr;  // Not reached; only here to satisfy the return type.
+}
+
+MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
+                                     const Configuration *configuration,
+                                     const Node *node)
+    : LogNamer(node),
+      base_name_(base_name),
+      configuration_(configuration),
+      uuid_(UUID::Random()),
+      data_writer_(OpenDataWriter()) {}
+
+void MultiNodeLogNamer::WriteHeader(
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+    const Node *node) {
+  if (node == this->node()) {
+    UpdateHeader(header, uuid_, part_number_);
+    data_writer_->WriteSizedFlatbuffer(header->full_span());
+  } else {
+    for (std::pair<const Channel *const, DataWriter> &data_writer :
+         data_writers_) {
+      if (node == data_writer.second.node) {
+        UpdateHeader(header, data_writer.second.uuid,
+                     data_writer.second.part_number);
+        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+      }
+    }
+  }
+}
+
+void MultiNodeLogNamer::Rotate(
+    const Node *node,
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+  if (node == this->node()) {
+    ++part_number_;
+    *data_writer_ = std::move(*OpenDataWriter());
+    UpdateHeader(header, uuid_, part_number_);
+    data_writer_->WriteSizedFlatbuffer(header->full_span());
+  } else {
+    for (std::pair<const Channel *const, DataWriter> &data_writer :
+         data_writers_) {
+      if (node == data_writer.second.node) {
+        ++data_writer.second.part_number;
+        data_writer.second.rotate(data_writer.first, &data_writer.second);
+        UpdateHeader(header, data_writer.second.uuid,
+                     data_writer.second.part_number);
+        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+      }
+    }
+  }
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
+  // See if we can read the data on this node at all.
+  const bool is_readable =
+      configuration::ChannelIsReadableOnNode(channel, this->node());
+  if (!is_readable) {
+    return nullptr;
+  }
+
+  // Then, see if we are supposed to log the data here.
+  const bool log_message =
+      configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
+
+  if (!log_message) {
+    return nullptr;
+  }
+
+  // Now, sort out if this is data generated on this node, or not.  It is
+  // generated if it is sendable on this node.
+  if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
+    return data_writer_.get();
+  }
+
+  // Ok, we have data that is being forwarded to us that we are supposed to
+  // log.  It needs to be logged with send timestamps, but be sorted enough
+  // to be able to be processed.
+  CHECK(data_writers_.find(channel) == data_writers_.end());
+
+  // Track that this node is being logged.
+  const Node *source_node = configuration::GetNode(
+      configuration_, channel->source_node()->string_view());
+
+  if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
+    nodes_.emplace_back(source_node);
+  }
+
+  DataWriter data_writer;
+  data_writer.node = source_node;
+  data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+    OpenWriter(channel, data_writer);
+  };
+  data_writer.rotate(channel, &data_writer);
+
+  return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+      .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
+    const Channel *channel, const Node *node) {
+  // See if we can read the data on this node at all.
+  const bool is_readable =
+      configuration::ChannelIsReadableOnNode(channel, this->node());
+  CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
+
+  CHECK(data_writers_.find(channel) == data_writers_.end());
+
+  if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
+    nodes_.emplace_back(node);
+  }
+
+  DataWriter data_writer;
+  data_writer.node = node;
+  data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+    OpenForwardedTimestampWriter(channel, data_writer);
+  };
+  data_writer.rotate(channel, &data_writer);
+
+  return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+      .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
+    const Channel *channel) {
+  const bool log_delivery_times =
+      (this->node() == nullptr)
+          ? false
+          : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+                channel, this->node(), this->node());
+  if (!log_delivery_times) {
+    return nullptr;
+  }
+
+  return data_writer_.get();
+}
+
+void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
+                                                     DataWriter *data_writer) {
+  std::string filename =
+      absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
+                   "/", channel->type()->string_view(), ".part",
+                   data_writer->part_number, ".bfbs");
+
+  if (!data_writer->writer) {
+    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+  } else {
+    *data_writer->writer = DetachedBufferWriter(filename);
+  }
+}
+
+void MultiNodeLogNamer::OpenWriter(const Channel *channel,
+                                   DataWriter *data_writer) {
+  const std::string filename = absl::StrCat(
+      base_name_, "_", channel->source_node()->string_view(), "_data",
+      channel->name()->string_view(), "/", channel->type()->string_view(),
+      ".part", data_writer->part_number, ".bfbs");
+  if (!data_writer->writer) {
+    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+  } else {
+    *data_writer->writer = DetachedBufferWriter(filename);
+  }
+}
+
+std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
+  return std::make_unique<DetachedBufferWriter>(
+      absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
+                   part_number_, ".bfbs"));
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
new file mode 100644
index 0000000..631016b
--- /dev/null
+++ b/aos/events/logging/log_namer.h
@@ -0,0 +1,168 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_NAMER_H_
+#define AOS_EVENTS_LOGGING_LOG_NAMER_H_
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// Interface describing how to name, track, and add headers to log file parts.
+class LogNamer {
+ public:
+  // Constructs a LogNamer with the primary node (i.e. the one the logger runs
+  // on) being node.
+  LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
+  virtual ~LogNamer() {}
+
+  // Writes the header to all log files for a specific node.  This function
+  // needs to be called after all the writers are created.
+  //
+  // Modifies header to contain the uuid and part number for each writer as it
+  // writes it.  Since this is done unconditionally, it does not restore the
+  // previous value at the end.
+  virtual void WriteHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const Node *node) = 0;
+
+  // Returns a writer for writing data logged on this channel (on the node
+  // provided in the constructor).
+  virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
+
+  // Returns a writer for writing timestamps logged on this channel (on the node
+  // provided in the constructor).
+  virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+
+  // Returns a writer for writing timestamps delivered over the special
+  // /aos/remote_timestamps/* channels.  node is the node that the timestamps
+  // are forwarded back from.
+  virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel *channel, const Node *node) = 0;
+
+  // Rotates all log files for the provided node.  The provided header will be
+  // modified and written per WriteHeader above.
+  virtual void Rotate(
+      const Node *node,
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+
+  // Returns all the nodes that data is being written for.
+  const std::vector<const Node *> &nodes() const { return nodes_; }
+
+  // Returns the node the logger is running on.
+  const Node *node() const { return node_; }
+
+ protected:
+  // Modifies the header to have the provided UUID and part id.
+  void UpdateHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const UUID &uuid, int part_id) const;
+
+  const Node *const node_;
+  std::vector<const Node *> nodes_;
+};
+
+// Local log namer is a simple version which only names things
+// "base_name.part#.bfbs" and increments the part number.  It doesn't support
+// any other log type.
+class LocalLogNamer : public LogNamer {
+ public:
+  LocalLogNamer(std::string_view base_name, const Node *node)
+      : LogNamer(node),
+        base_name_(base_name),
+        uuid_(UUID::Random()),
+        data_writer_(OpenDataWriter()) {}
+
+  void WriteHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const Node *node) override;
+
+  DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+  void Rotate(const Node *node,
+              aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+      override;
+
+  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+  DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel * /*channel*/, const Node * /*node*/) override;
+
+ private:
+  // Creates a new data writer with the new part number.
+  std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+    return std::make_unique<DetachedBufferWriter>(
+        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+  }
+
+  const std::string base_name_;
+  const UUID uuid_;
+  size_t part_number_ = 0;
+  std::unique_ptr<DetachedBufferWriter> data_writer_;
+};
+
+// Log namer which uses a config and a base name to name a bunch of files.
+class MultiNodeLogNamer : public LogNamer {
+ public:
+  MultiNodeLogNamer(std::string_view base_name,
+                    const Configuration *configuration, const Node *node);
+
+  void WriteHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const Node *node) override;
+
+  void Rotate(const Node *node,
+              aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+      override;
+
+  DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+  DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel *channel, const Node *node) override;
+
+  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+ private:
+  // Files to write remote data to.  We want one per channel.  Maps the channel
+  // to the writer, Node, and part number.
+  struct DataWriter {
+    std::unique_ptr<DetachedBufferWriter> writer = nullptr;
+    const Node *node;
+    size_t part_number = 0;
+    UUID uuid = UUID::Random();
+    std::function<void(const Channel *, DataWriter *)> rotate;
+  };
+
+  // Opens up a writer for timestamps forwarded back.
+  void OpenForwardedTimestampWriter(const Channel *channel,
+                                    DataWriter *data_writer);
+
+  // Opens up a writer for remote data.
+  void OpenWriter(const Channel *channel, DataWriter *data_writer);
+
+  // Opens the main data writer file for this node responsible for data_writer_.
+  std::unique_ptr<DetachedBufferWriter> OpenDataWriter();
+
+  const std::string base_name_;
+  const Configuration *const configuration_;
+  const UUID uuid_;
+
+  size_t part_number_ = 0;
+
+  // File to write both delivery timestamps and local data to.
+  std::unique_ptr<DetachedBufferWriter> data_writer_;
+
+  std::map<const Channel *, DataWriter> data_writers_;
+};
+
+}  // namespace logger
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGING_LOG_NAMER_H_
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index ade82f9..7f53577 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -37,6 +37,24 @@
 DetachedBufferWriter::~DetachedBufferWriter() {
   Flush();
   PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
+  VLOG(1) << "Closed " << filename_;
+}
+
+DetachedBufferWriter::DetachedBufferWriter(
+    DetachedBufferWriter &&other) {
+  *this = std::move(other);
+}
+
+DetachedBufferWriter &DetachedBufferWriter::operator=(
+    DetachedBufferWriter &&other) {
+  Flush();
+  std::swap(filename_, other.filename_);
+  std::swap(fd_, other.fd_);
+  std::swap(queued_size_, other.queued_size_);
+  std::swap(written_size_, other.written_size_);
+  std::swap(queue_, other.queue_);
+  std::swap(iovec_, other.iovec_);
+  return *this;
 }
 
 void DetachedBufferWriter::QueueSizedFlatbuffer(
@@ -260,19 +278,36 @@
 
 FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
   SpanReader span_reader(filename);
-  // Make sure we have enough to read the size.
   absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
 
   // Make sure something was read.
   CHECK(config_data != absl::Span<const uint8_t>())
       << ": Failed to read header from: " << filename;
 
-  // And copy the config so we have it forever.
+  // And copy the config so we have it forever, removing the size prefix.
   std::vector<uint8_t> data(
       config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
   return FlatbufferVector<LogFileHeader>(std::move(data));
 }
 
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+                                               size_t n) {
+  SpanReader span_reader(filename);
+  absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
+  for (size_t i = 0; i < n + 1; ++i) {
+    data_span = span_reader.ReadMessage();
+
+    // Make sure something was read.
+    CHECK(data_span != absl::Span<const uint8_t>())
+        << ": Failed to read data from: " << filename;
+  }
+
+  // And copy the data so we have it forever.
+  std::vector<uint8_t> data(data_span.begin() + sizeof(flatbuffers::uoffset_t),
+                            data_span.end());
+  return FlatbufferVector<MessageHeader>(std::move(data));
+}
+
 MessageReader::MessageReader(std::string_view filename)
     : span_reader_(filename),
       raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
@@ -290,7 +325,7 @@
       FlatbufferVector<LogFileHeader>(std::move(header_data_copy));
 
   max_out_of_order_duration_ =
-      std::chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
+      chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
 
   VLOG(1) << "Opened " << filename << " as node "
           << FlatbufferToJson(log_file_header()->node());
@@ -323,6 +358,81 @@
   // open more of them).
   log_file_header_ = message_reader_->raw_log_file_header();
 
+  for (size_t i = 1; i < filenames_.size(); ++i) {
+    MessageReader message_reader(filenames_[i]);
+
+    const monotonic_clock::time_point new_monotonic_start_time(
+        chrono::nanoseconds(
+            message_reader.log_file_header()->monotonic_start_time()));
+    const realtime_clock::time_point new_realtime_start_time(
+        chrono::nanoseconds(
+            message_reader.log_file_header()->realtime_start_time()));
+
+    // There are 2 types of part files.  Part files from before time estimation
+    // has started, and part files after.  We don't declare a log file "started"
+    // until time estimation is up.  And once a log file starts, it should never
+    // stop again, and should remain constant.
+    // To compare both types of headers, we mutate our saved copy of the header
+    // to match the next chunk by updating time if we detect a stopped ->
+    // started transition.
+    if (monotonic_start_time() == monotonic_clock::min_time) {
+      CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
+      // We should only be missing the monotonic start time when logging data
+      // for remote nodes.  We don't have a good way to determine the remote
+      // realtime offset, so it shouldn't be filled out.
+      // TODO(austin): If we have a good way, feel free to fill it out.  It
+      // probably won't be better than we could do in post though with the same
+      // data.
+      CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
+      if (new_monotonic_start_time != monotonic_clock::min_time) {
+        // If we finally found our start time, update the header.  Do this once
+        // because it should never change again.
+        log_file_header_.mutable_message()->mutate_monotonic_start_time(
+            new_monotonic_start_time.time_since_epoch().count());
+        log_file_header_.mutable_message()->mutate_realtime_start_time(
+            new_realtime_start_time.time_since_epoch().count());
+      }
+    }
+
+    // We don't have a good way to set the realtime start time on remote nodes.
+    // Confirm it remains consistent.
+    CHECK_EQ(log_file_header_.mutable_message()->has_realtime_start_time(),
+             message_reader.log_file_header()->has_realtime_start_time());
+
+    // Parts index will *not* match unless we set them to match.  We only want
+    // to accept the start time and parts mismatching, so set them.
+    log_file_header_.mutable_message()->mutate_parts_index(
+        message_reader.log_file_header()->parts_index());
+
+    // Now compare that the headers match.
+    if (!CompareFlatBuffer(message_reader.raw_log_file_header(),
+                           log_file_header_)) {
+      if (message_reader.log_file_header()->has_logger_uuid() &&
+          log_file_header_.message().has_logger_uuid() &&
+          message_reader.log_file_header()->logger_uuid()->string_view() !=
+              log_file_header_.message().logger_uuid()->string_view()) {
+        LOG(FATAL) << "Logger UUIDs don't match between log file chunks "
+                   << filenames_[0] << " and " << filenames_[i]
+                   << ", this is not supported.";
+      }
+      if (message_reader.log_file_header()->has_parts_uuid() &&
+          log_file_header_.message().has_parts_uuid() &&
+          message_reader.log_file_header()->parts_uuid()->string_view() !=
+              log_file_header_.message().parts_uuid()->string_view()) {
+        LOG(FATAL) << "Parts UUIDs don't match between log file chunks "
+                   << filenames_[0] << " and " << filenames_[i]
+                   << ", this is not supported.";
+      }
+
+      LOG(FATAL) << "Header is different between log file chunks "
+                 << filenames_[0] << " and " << filenames_[i]
+                 << ", this is not supported.";
+    }
+  }
+  // Put the parts index back to the first log file chunk.
+  log_file_header_.mutable_message()->mutate_parts_index(
+      message_reader_->log_file_header()->parts_index());
+
   // Setup per channel state.
   channels_.resize(configuration()->channels()->size());
   for (ChannelData &channel_data : channels_) {
@@ -368,11 +478,22 @@
   // We can't support the config diverging between two log file headers.  See if
   // they are the same.
   if (next_filename_index_ != 0) {
+    // In order for the headers to identically compare, they need to have the
+    // same parts_index.  Rewrite the saved header with the new parts_index,
+    // compare, and then restore.
+    const int32_t original_parts_index =
+        log_file_header_.message().parts_index();
+    log_file_header_.mutable_message()->mutate_parts_index(
+        message_reader_->log_file_header()->parts_index());
+
     CHECK(CompareFlatBuffer(message_reader_->raw_log_file_header(),
                             log_file_header_))
         << ": Header is different between log file chunks "
         << filenames_[next_filename_index_] << " and "
         << filenames_[next_filename_index_ - 1] << ", this is not supported.";
+
+    log_file_header_.mutable_message()->mutate_parts_index(
+        original_parts_index);
   }
 
   ++next_filename_index_;
@@ -545,10 +666,13 @@
       timestamp = channels_[channel_index].data.front_timestamp();
   FlatbufferVector<MessageHeader> front =
       std::move(channels_[channel_index].data.front());
-  channels_[channel_index].data.pop_front();
+  channels_[channel_index].data.PopFront();
 
-  VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
-          << std::get<0>(timestamp) << " for " << channel_index;
+  VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
+          << std::get<0>(timestamp) << " for "
+          << configuration::StrippedChannelToString(
+                 configuration()->channels()->Get(channel_index))
+          << " (" << channel_index << ")";
 
   QueueMessages(std::get<0>(timestamp));
 
@@ -558,19 +682,21 @@
 
 std::tuple<monotonic_clock::time_point, uint32_t,
            FlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldest(int channel, int node_index) {
+SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
   CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
   const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
       timestamp = channels_[channel].timestamps[node_index].front_timestamp();
   FlatbufferVector<MessageHeader> front =
       std::move(channels_[channel].timestamps[node_index].front());
-  channels_[channel].timestamps[node_index].pop_front();
+  channels_[channel].timestamps[node_index].PopFront();
 
-  VLOG(1) << MaybeNodeName(target_node_) << "Popped " << this << " "
+  VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
           << std::get<0>(timestamp) << " for "
           << configuration::StrippedChannelToString(
                  configuration()->channels()->Get(channel))
-          << " on " << node_index;
+          << " on "
+          << configuration()->nodes()->Get(node_index)->name()->string_view()
+          << " (" << node_index << ")";
 
   QueueMessages(std::get<0>(timestamp));
 
@@ -607,7 +733,7 @@
   return true;
 }
 
-void SplitMessageReader::MessageHeaderQueue::pop_front() {
+void SplitMessageReader::MessageHeaderQueue::PopFront() {
   data_.pop_front();
   if (data_.size() != 0u) {
     // Yup, new data.
@@ -616,6 +742,15 @@
     } else {
       timestamp_merger->Update(split_reader, front_timestamp());
     }
+  } else {
+    // Poke anyways to update the heap.
+    if (timestamps) {
+      timestamp_merger->UpdateTimestamp(
+          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+    } else {
+      timestamp_merger->Update(
+          nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
+    }
   }
 }
 
@@ -687,25 +822,32 @@
     std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
         timestamp,
     SplitMessageReader *split_message_reader) {
-  DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
-                      [split_message_reader](
-                          const std::tuple<monotonic_clock::time_point,
-                                           uint32_t, SplitMessageReader *>
-                              x) {
-                        return std::get<2>(x) == split_message_reader;
-                      }) == message_heap_.end())
-      << ": Pushing message when it is already in the heap.";
+  if (split_message_reader != nullptr) {
+    DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
+                        [split_message_reader](
+                            const std::tuple<monotonic_clock::time_point,
+                                             uint32_t, SplitMessageReader *>
+                                x) {
+                          return std::get<2>(x) == split_message_reader;
+                        }) == message_heap_.end())
+        << ": Pushing message when it is already in the heap.";
 
-  message_heap_.push_back(std::make_tuple(
-      std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+    message_heap_.push_back(std::make_tuple(
+        std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
 
-  std::push_heap(message_heap_.begin(), message_heap_.end(),
-                 &SplitMessageReaderHeapCompare);
+    std::push_heap(message_heap_.begin(), message_heap_.end(),
+                   &SplitMessageReaderHeapCompare);
+  }
 
   // If we are just a data merger, don't wait for timestamps.
   if (!has_timestamps_) {
-    channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
-    pushed_ = true;
+    if (!message_heap_.empty()) {
+      channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
+      pushed_ = true;
+    } else {
+      // Remove ourselves if we are empty.
+      channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+    }
   }
 }
 
@@ -730,26 +872,33 @@
     std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
         timestamp,
     SplitMessageReader *split_message_reader) {
-  DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
-                      [split_message_reader](
-                          const std::tuple<monotonic_clock::time_point,
-                                           uint32_t, SplitMessageReader *>
-                              x) {
-                        return std::get<2>(x) == split_message_reader;
-                      }) == timestamp_heap_.end())
-      << ": Pushing timestamp when it is already in the heap.";
+  if (split_message_reader != nullptr) {
+    DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
+                        [split_message_reader](
+                            const std::tuple<monotonic_clock::time_point,
+                                             uint32_t, SplitMessageReader *>
+                                x) {
+                          return std::get<2>(x) == split_message_reader;
+                        }) == timestamp_heap_.end())
+        << ": Pushing timestamp when it is already in the heap.";
 
-  timestamp_heap_.push_back(std::make_tuple(
-      std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
+    timestamp_heap_.push_back(std::make_tuple(
+        std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
 
-  std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
-                 SplitMessageReaderHeapCompare);
+    std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                   SplitMessageReaderHeapCompare);
+  }
 
   // If we are a timestamp merger, don't wait for data.  Missing data will be
   // caught at read time.
   if (has_timestamps_) {
-    channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
-    pushed_ = true;
+    if (!timestamp_heap_.empty()) {
+      channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
+      pushed_ = true;
+    } else {
+      // Remove ourselves if we are empty.
+      channel_merger_->Update(monotonic_clock::min_time, channel_index_);
+    }
   }
 }
 
@@ -832,50 +981,73 @@
   std::tuple<monotonic_clock::time_point, uint32_t,
              FlatbufferVector<MessageHeader>>
       oldest_timestamp = std::get<2>(oldest_timestamp_reader)
-                             ->PopOldest(channel_index_, node_index_);
+                             ->PopOldestTimestamp(channel_index_, node_index_);
 
   // Confirm that the time we have recorded matches.
   CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
   CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
 
-  // TODO(austin): What if we get duplicate timestamps?
+  // Now, keep reading until we have found all duplicates.
+  while (!timestamp_heap_.empty()) {
+    // See if it is a duplicate.
+    std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+        next_oldest_timestamp_reader = timestamp_heap_.front();
 
-  return oldest_timestamp;
-}
+    std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+        next_oldest_timestamp_time =
+            std::get<2>(next_oldest_timestamp_reader)
+                ->oldest_message(channel_index_, node_index_);
 
-TimestampMerger::DeliveryTimestamp TimestampMerger::OldestTimestamp() const {
-  if (!has_timestamps_ || timestamp_heap_.size() == 0u) {
-    return TimestampMerger::DeliveryTimestamp{};
+    if (std::get<0>(next_oldest_timestamp_time) ==
+            std::get<0>(oldest_timestamp) &&
+        std::get<1>(next_oldest_timestamp_time) ==
+            std::get<1>(oldest_timestamp)) {
+      // Pop the timestamp reader pointer.
+      std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                    &SplitMessageReaderHeapCompare);
+      timestamp_heap_.pop_back();
+
+      // Pop the next oldest timestamp.  This re-pushes any messages from the
+      // reader.
+      std::tuple<monotonic_clock::time_point, uint32_t,
+                 FlatbufferVector<MessageHeader>>
+          next_oldest_timestamp =
+              std::get<2>(next_oldest_timestamp_reader)
+                  ->PopOldestTimestamp(channel_index_, node_index_);
+
+      // And make sure the contents match in their entirety.
+      CHECK(std::get<2>(oldest_timestamp).span() ==
+            std::get<2>(next_oldest_timestamp).span())
+          << ": Data at the same timestamp doesn't match, "
+          << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
+          << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
+          << absl::BytesToHexString(std::string_view(
+                 reinterpret_cast<const char *>(
+                     std::get<2>(oldest_timestamp).span().data()),
+                 std::get<2>(oldest_timestamp).span().size()))
+          << " vs "
+          << absl::BytesToHexString(std::string_view(
+                 reinterpret_cast<const char *>(
+                     std::get<2>(next_oldest_timestamp).span().data()),
+                 std::get<2>(next_oldest_timestamp).span().size()));
+
+    } else {
+      break;
+    }
   }
 
-  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
-      oldest_timestamp_reader = timestamp_heap_.front();
-
-  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-      oldest_timestamp = std::get<2>(oldest_timestamp_reader)
-                             ->oldest_message(channel_index_, node_index_);
-
-  TimestampMerger::DeliveryTimestamp timestamp;
-  timestamp.monotonic_event_time =
-      monotonic_clock::time_point(chrono::nanoseconds(
-          std::get<2>(oldest_timestamp)->monotonic_sent_time()));
-  timestamp.realtime_event_time = realtime_clock::time_point(
-      chrono::nanoseconds(std::get<2>(oldest_timestamp)->realtime_sent_time()));
-
-  timestamp.monotonic_remote_time =
-      monotonic_clock::time_point(chrono::nanoseconds(
-          std::get<2>(oldest_timestamp)->monotonic_remote_time()));
-  timestamp.realtime_remote_time =
-      realtime_clock::time_point(chrono::nanoseconds(
-          std::get<2>(oldest_timestamp)->realtime_remote_time()));
-
-  timestamp.remote_queue_index = std::get<2>(oldest_timestamp)->queue_index();
-  return timestamp;
+  return oldest_timestamp;
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
 TimestampMerger::PopOldest() {
   if (has_timestamps_) {
+    VLOG(1) << "Looking for matching timestamp for "
+            << configuration::StrippedChannelToString(
+                   configuration_->channels()->Get(channel_index_))
+            << " (" << channel_index_ << ") "
+            << " at " << std::get<0>(oldest_timestamp());
+
     // Read the timestamps.
     std::tuple<monotonic_clock::time_point, uint32_t,
                FlatbufferVector<MessageHeader>>
@@ -944,7 +1116,8 @@
                        << " on channel "
                        << configuration::StrippedChannelToString(
                               configuration_->channels()->Get(channel_index_))
-                       << " (" << channel_index_ << ")";
+                       << " (" << channel_index_ << ")"
+                       << (VLOG_IS_ON(1) ? DebugString() : "");
           return std::make_tuple(timestamp,
                                  std::move(std::get<2>(oldest_timestamp)));
         }
@@ -952,6 +1125,10 @@
         timestamp.monotonic_remote_time = remote_monotonic_time;
       }
 
+      VLOG(1) << "Found matching data "
+              << configuration::StrippedChannelToString(
+                     configuration_->channels()->Get(channel_index_))
+              << " (" << channel_index_ << ")";
       std::tuple<monotonic_clock::time_point, uint32_t,
                  FlatbufferVector<MessageHeader>>
           oldest_message = PopMessageHeap();
@@ -1062,10 +1239,27 @@
                   << FlatbufferToJson(reader->node()) << " start_time "
                   << monotonic_start_time();
         } else {
-          // And then make sure all the other files have matching headers.
-          CHECK(CompareFlatBuffer(log_file_header(), reader->log_file_header()))
-              << ": " << FlatbufferToJson(log_file_header()) << " reader "
-              << FlatbufferToJson(reader->log_file_header());
+          // Find the earliest start time.  That way, if we get a full log file
+          // directly from the node, and a partial later, we start with the
+          // full.  Update our header to match that.
+          const monotonic_clock::time_point new_monotonic_start_time(
+              chrono::nanoseconds(
+                  reader->log_file_header()->monotonic_start_time()));
+          const realtime_clock::time_point new_realtime_start_time(
+              chrono::nanoseconds(
+                  reader->log_file_header()->realtime_start_time()));
+
+          if (monotonic_start_time() == monotonic_clock::min_time ||
+              (new_monotonic_start_time != monotonic_clock::min_time &&
+               new_monotonic_start_time < monotonic_start_time())) {
+            log_file_header_.mutable_message()->mutate_monotonic_start_time(
+                new_monotonic_start_time.time_since_epoch().count());
+            log_file_header_.mutable_message()->mutate_realtime_start_time(
+                new_realtime_start_time.time_since_epoch().count());
+            VLOG(1) << "Updated log file " << reader->filename()
+                    << " with node " << FlatbufferToJson(reader->node())
+                    << " start_time " << new_monotonic_start_time;
+          }
         }
       }
     }
@@ -1105,24 +1299,6 @@
   return channel_heap_.front().first;
 }
 
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
-  if (timestamp_heap_.empty()) {
-    return TimestampMerger::DeliveryTimestamp{};
-  }
-  return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
-}
-
-TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
-    int channel) const {
-  // If we didn't find any data for this node, we won't have any mergers. Return
-  // an invalid timestamp in that case.
-  if (timestamp_mergers_.size() <= static_cast<size_t>(channel)) {
-    TimestampMerger::DeliveryTimestamp result;
-    return result;
-  }
-  return timestamp_mergers_[channel].OldestTimestamp();
-}
-
 void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
                                     int channel_index) {
   // Pop and recreate the heap if it has already been pushed.  And since we are
@@ -1142,23 +1318,11 @@
     channel_heap_.erase(channel_iterator);
     std::make_heap(channel_heap_.begin(), channel_heap_.end(),
                    ChannelHeapCompare);
+  }
 
-    if (timestamp_mergers_[channel_index].has_timestamps()) {
-      const auto timestamp_iterator = std::find_if(
-          timestamp_heap_.begin(), timestamp_heap_.end(),
-          [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
-            return x.second == channel_index;
-          });
-      DCHECK(timestamp_iterator != timestamp_heap_.end());
-      if (std::get<0>(*timestamp_iterator) == timestamp) {
-        // It's already in the heap, in the correct spot, so nothing
-        // more for us to do here.
-        return;
-      }
-      timestamp_heap_.erase(timestamp_iterator);
-      std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
-                     ChannelHeapCompare);
-    }
+  if (timestamp == monotonic_clock::min_time) {
+    timestamp_mergers_[channel_index].set_pushed(false);
+    return;
   }
 
   channel_heap_.push_back(std::make_pair(timestamp, channel_index));
@@ -1167,11 +1331,18 @@
   // put the oldest message first.
   std::push_heap(channel_heap_.begin(), channel_heap_.end(),
                  ChannelHeapCompare);
+}
 
-  if (timestamp_mergers_[channel_index].has_timestamps()) {
-    timestamp_heap_.push_back(std::make_pair(timestamp, channel_index));
-    std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
-                   ChannelHeapCompare);
+void ChannelMerger::VerifyHeaps() {
+  std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
+      channel_heap_;
+  std::make_heap(channel_heap.begin(), channel_heap.end(), &ChannelHeapCompare);
+
+  for (size_t i = 0; i < channel_heap_.size(); ++i) {
+    CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
+    CHECK_EQ(
+        std::get<0>(channel_heap[i]),
+        timestamp_mergers_[std::get<1>(channel_heap[i])].channel_merger_time());
   }
 }
 
@@ -1190,17 +1361,6 @@
 
   TimestampMerger *merger = &timestamp_mergers_[channel_index];
 
-  if (merger->has_timestamps()) {
-    CHECK_GT(timestamp_heap_.size(), 0u);
-    std::pair<monotonic_clock::time_point, int> oldest_timestamp_data =
-        timestamp_heap_.front();
-    CHECK(oldest_timestamp_data == oldest_channel_data)
-        << ": Timestamp heap out of sync.";
-    std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
-                  &ChannelHeapCompare);
-    timestamp_heap_.pop_back();
-  }
-
   // Merger handles any queueing needed from here.
   std::tuple<TimestampMerger::DeliveryTimestamp,
              FlatbufferVector<MessageHeader>>
@@ -1210,6 +1370,16 @@
       << ": channel_heap_ was corrupted for " << channel_index << ": "
       << DebugString();
 
+  CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
+      << ": " << MaybeNodeName(log_file_header()->node())
+      << "Messages came off the queue out of order. " << DebugString();
+  last_popped_time_ = std::get<0>(message).monotonic_event_time;
+
+  VLOG(1) << "Popped " << last_popped_time_ << " "
+          << configuration::StrippedChannelToString(
+                 configuration()->channels()->Get(channel_index))
+          << " (" << channel_index << ")";
+
   return std::make_tuple(std::get<0>(message), channel_index,
                          std::move(std::get<1>(message)));
 }
@@ -1217,27 +1387,31 @@
 std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
   std::stringstream ss;
   for (size_t i = 0; i < data_.size(); ++i) {
-    if (timestamps) {
-      ss << "        msg: ";
-    } else {
-      ss << "        timestamp: ";
-    }
-    ss << monotonic_clock::time_point(std::chrono::nanoseconds(
-              data_[i].message().monotonic_sent_time()))
-       << " ("
-       << realtime_clock::time_point(
-              std::chrono::nanoseconds(data_[i].message().realtime_sent_time()))
-       << ") " << data_[i].message().queue_index();
-    if (timestamps) {
-      ss << "  <- remote "
-         << monotonic_clock::time_point(std::chrono::nanoseconds(
-                data_[i].message().monotonic_remote_time()))
+    if (i < 5 || i + 5 > data_.size()) {
+      if (timestamps) {
+        ss << "        msg: ";
+      } else {
+        ss << "        timestamp: ";
+      }
+      ss << monotonic_clock::time_point(
+                chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
          << " ("
-         << realtime_clock::time_point(std::chrono::nanoseconds(
-                data_[i].message().realtime_remote_time()))
-         << ")";
+         << realtime_clock::time_point(
+                chrono::nanoseconds(data_[i].message().realtime_sent_time()))
+         << ") " << data_[i].message().queue_index();
+      if (timestamps) {
+        ss << "  <- remote "
+           << monotonic_clock::time_point(chrono::nanoseconds(
+                  data_[i].message().monotonic_remote_time()))
+           << " ("
+           << realtime_clock::time_point(chrono::nanoseconds(
+                  data_[i].message().realtime_remote_time()))
+           << ")";
+      }
+      ss << "\n";
+    } else if (i == 5) {
+      ss << "        ...\n";
     }
-    ss << "\n";
   }
 
   return ss.str();
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 2b08b59..9534860 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -41,13 +41,20 @@
 class DetachedBufferWriter {
  public:
   DetachedBufferWriter(std::string_view filename);
+  DetachedBufferWriter(DetachedBufferWriter &&other);
+  DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+
   ~DetachedBufferWriter();
 
-  DetachedBufferWriter(const DetachedBufferWriter &) = delete;
+  DetachedBufferWriter &operator=(DetachedBufferWriter &&other);
   DetachedBufferWriter &operator=(const DetachedBufferWriter &) = delete;
 
   std::string_view filename() const { return filename_; }
 
+  // Rewrites a location in a file (relative to the start) to have new data in
+  // it.  The main use case is updating start times after a log file starts.
+  void RewriteLocation(off64_t offset, absl::Span<const uint8_t> data);
+
   // TODO(austin): Snappy compress the log file if it ends with .snappy!
 
   // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
@@ -68,7 +75,7 @@
   size_t total_size() const { return written_size_ + queued_size_; }
 
  private:
-  const std::string filename_;
+  std::string filename_;
 
   int fd_ = -1;
 
@@ -89,6 +96,8 @@
     int channel_index, LogType log_type);
 
 FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+                                               size_t n);
 
 // Class to read chunks out of a log file.
 class SpanReader {
@@ -236,15 +245,15 @@
   void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
                           const Node *target_node);
 
-  // Returns the (timestamp, queue_idex) for the oldest message in a channel, or
-  // max_time if there is nothing in the channel.
+  // Returns the (timestamp, queue_index, message_header) for the oldest message
+  // in a channel, or max_time if there is nothing in the channel.
   std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
   oldest_message(int channel) {
     return channels_[channel].data.front_timestamp();
   }
 
-  // Returns the (timestamp, queue_index) for the oldest delivery time in a
-  // channel, or max_time if there is nothing in the channel.
+  // Returns the (timestamp, queue_index, message_header) for the oldest
+  // delivery time in a channel, or max_time if there is nothing in the channel.
   std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
   oldest_message(int channel, int destination_node) {
     return channels_[channel].timestamps[destination_node].front_timestamp();
@@ -260,7 +269,7 @@
   // a channel delivered to a node.  Requeues data as needed.
   std::tuple<monotonic_clock::time_point, uint32_t,
              FlatbufferVector<MessageHeader>>
-  PopOldest(int channel, int node_index);
+  PopOldestTimestamp(int channel, int node_index);
 
   // Returns the header for the log files.
   const LogFileHeader *log_file_header() const {
@@ -367,7 +376,7 @@
     bool emplace_back(FlatbufferVector<MessageHeader> &&msg);
 
     // Drops the front message.  Invalidates the front() reference.
-    void pop_front();
+    void PopFront();
 
     // The size of the queue.
     size_t size() { return data_.size(); }
@@ -375,15 +384,16 @@
     // Returns a debug string with info about each message in the queue.
     std::string DebugString() const;
 
-    // Returns the (timestamp, queue_index) for the oldest message.
+    // Returns the (timestamp, queue_index, message_header) for the oldest
+    // message.
     const std::tuple<monotonic_clock::time_point, uint32_t,
                      const MessageHeader *>
     front_timestamp() {
-      CHECK_GT(data_.size(), 0u);
+      const MessageHeader &message = front().message();
       return std::make_tuple(
-          monotonic_clock::time_point(std::chrono::nanoseconds(
-              front().message().monotonic_sent_time())),
-          front().message().queue_index(), &front().message());
+          monotonic_clock::time_point(
+              std::chrono::nanoseconds(message.monotonic_sent_time())),
+          message.queue_index(), &message);
     }
 
     // Pointer to the timestamp merger for this queue if available.
@@ -471,9 +481,6 @@
   // The caller can determine what the appropriate action is to recover.
   std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
 
-  // Returns the oldest forwarding timestamp.
-  DeliveryTimestamp OldestTimestamp() const;
-
   // Tracks if the channel merger has pushed this onto it's heap or not.
   bool pushed() { return pushed_; }
   // Sets if this has been pushed to the channel merger heap.  Should only be
@@ -490,6 +497,14 @@
   // called by a SplitMessageReader.
   void NoticeAtEnd();
 
+  aos::monotonic_clock::time_point channel_merger_time() {
+    if (has_timestamps_) {
+      return std::get<0>(timestamp_heap_[0]);
+    } else {
+      return std::get<0>(message_heap_[0]);
+    }
+  }
+
  private:
   // Pushes messages and timestamps to the corresponding heaps.
   void PushMessageHeap(
@@ -576,12 +591,6 @@
              FlatbufferVector<MessageHeader>>
   PopOldest();
 
-  // Returns the oldest timestamp in the timestamp heap.
-  TimestampMerger::DeliveryTimestamp OldestTimestamp() const;
-  // Returns the oldest timestamp in the timestamp heap for a specific channel.
-  TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
-      int channel) const;
-
   // Returns the config for this set of log files.
   const Configuration *configuration() const {
     return log_file_header()->configuration();
@@ -628,6 +637,9 @@
   void PushChannelHeap(monotonic_clock::time_point timestamp,
                        int channel_index);
 
+  // CHECKs that channel_heap_ is a valid heap.
+  void VerifyHeaps();
+
   // All the message readers.
   std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
 
@@ -639,9 +651,6 @@
 
   // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
-  // A heap of just the timestamp channel readers and timestamps for the oldest
-  // data in each.
-  std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap_;
 
   // Configured node.
   const Node *node_;
@@ -650,6 +659,9 @@
 
   // Cached copy of the list of nodes.
   std::vector<const Node *> nodes_;
+
+  // Last time popped.  Used to detect events being returned out of order.
+  monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
 };
 
 // Returns the node name with a trailing space, or an empty string if we are on
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 242638c..caccf03 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -8,13 +8,16 @@
 #include <vector>
 
 #include "Eigen/Dense"
+#include "absl/strings/escaping.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/network/team_number.h"
 #include "aos/time/time.h"
 #include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
 
 DEFINE_bool(skip_missing_forwarding_entries, false,
             "If true, drop any forwarding entries with missing data.  If "
@@ -25,26 +28,98 @@
             "of CSV files in /tmp/.  This should only be needed when debugging "
             "time synchronization.");
 
+DEFINE_bool(skip_order_validation, false,
+            "If true, ignore any out of orderness in replay");
+
 namespace aos {
 namespace logger {
-
 namespace chrono = std::chrono;
 
-Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
-               std::chrono::milliseconds polling_period)
-    : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
-             event_loop, polling_period) {}
 
+Logger::Logger(std::string_view base_name, EventLoop *event_loop,
+               std::chrono::milliseconds polling_period)
+    : Logger(base_name, event_loop, event_loop->configuration(),
+             polling_period) {}
+Logger::Logger(std::string_view base_name, EventLoop *event_loop,
+               const Configuration *configuration,
+               std::chrono::milliseconds polling_period)
+    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
+             event_loop, configuration, polling_period) {}
 Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
                std::chrono::milliseconds polling_period)
+    : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
+             polling_period) {}
+
+Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+               const Configuration *configuration,
+               std::chrono::milliseconds polling_period)
     : event_loop_(event_loop),
+      uuid_(UUID::Random()),
       log_namer_(std::move(log_namer)),
+      configuration_(configuration),
+      name_(network::GetHostname()),
       timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
-      polling_period_(polling_period) {
+      polling_period_(polling_period),
+      server_statistics_fetcher_(
+          configuration::MultiNode(event_loop_->configuration())
+              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
+                    "/aos")
+              : aos::Fetcher<message_bridge::ServerStatistics>()) {
   VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
   int channel_index = 0;
-  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+
+  // Find all the nodes which are logging timestamps on our node.
+  std::set<const Node *> timestamp_logger_nodes;
+  for (const Channel *channel : *configuration_->channels()) {
+    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
+        !channel->has_destination_nodes()) {
+      continue;
+    }
+    for (const Connection *connection : *channel->destination_nodes()) {
+      const Node *other_node = configuration::GetNode(
+          configuration_, connection->name()->string_view());
+
+      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+              connection, event_loop_->node())) {
+        VLOG(1) << "Timestamps are logged from "
+                << FlatbufferToJson(other_node);
+        timestamp_logger_nodes.insert(other_node);
+      }
+    }
+  }
+
+  std::map<const Channel *, const Node *> timestamp_logger_channels;
+
+  // Now that we have all the nodes accumulated, make remote timestamp loggers
+  // for them.
+  for (const Node *node : timestamp_logger_nodes) {
+    const Channel *channel = configuration::GetChannel(
+        configuration_,
+        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+        logger::MessageHeader::GetFullyQualifiedName(), event_loop_->name(),
+        event_loop_->node());
+
+    CHECK(channel != nullptr)
+        << ": Remote timestamps are logged on "
+        << event_loop_->node()->name()->string_view()
+        << " but can't find channel /aos/remote_timestamps/"
+        << node->name()->string_view();
+    timestamp_logger_channels.insert(std::make_pair(channel, node));
+  }
+
+  const size_t our_node_index = configuration::GetNodeIndex(
+      configuration_, event_loop_->node());
+
+  for (const Channel *config_channel : *configuration_->channels()) {
+    // The MakeRawFetcher method needs a channel which is in the event loop
+    // configuration() object, not the configuration_ object.  Go look that up
+    // from the config.
+    const Channel *channel = aos::configuration::GetChannel(
+        event_loop_->configuration(), config_channel->name()->string_view(),
+        config_channel->type()->string_view(), "", event_loop_->node());
+
     FetcherStruct fs;
+    fs.node_index = our_node_index;
     const bool is_local =
         configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
 
@@ -60,7 +135,15 @@
             : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                   channel, event_loop_->node(), event_loop_->node());
 
-    if (log_message || log_delivery_times) {
+    // Now, detect a MessageHeader timestamp logger where we should just log the
+    // contents to a file directly.
+    const bool log_contents = timestamp_logger_channels.find(channel) !=
+                              timestamp_logger_channels.end();
+    const Node *timestamp_node =
+        log_contents ? timestamp_logger_channels.find(channel)->second
+                     : nullptr;
+
+    if (log_message || log_delivery_times || log_contents) {
       fs.fetcher = event_loop->MakeRawFetcher(channel);
       VLOG(1) << "Logging channel "
               << configuration::CleanedChannelToString(channel);
@@ -76,6 +159,14 @@
           fs.log_type = LogType::kLogRemoteMessage;
         }
       }
+      if (log_contents) {
+        VLOG(1) << "Timestamp logger channel "
+                << configuration::CleanedChannelToString(channel);
+        fs.contents_writer =
+            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+        fs.node_index =
+            configuration::GetNodeIndex(configuration_, timestamp_node);
+      }
       fs.channel_index = channel_index;
       fs.written = false;
       fetchers_.emplace_back(std::move(fs));
@@ -83,63 +174,207 @@
     ++channel_index;
   }
 
-  // When things start, we want to log the header, then the most recent messages
-  // available on each fetcher to capture the previous state, then start
-  // polling.
-  event_loop_->OnRun([this, polling_period]() {
-    // Grab data from each channel right before we declare the log file started
-    // so we can capture the latest message on each channel.  This lets us have
-    // non periodic messages with configuration that now get logged.
-    for (FetcherStruct &f : fetchers_) {
-      f.written = !f.fetcher->Fetch();
-    }
+  node_state_.resize(configuration::MultiNode(configuration_)
+                         ? configuration_->nodes()->size()
+                         : 1u);
 
-    // We need to pick a point in time to declare the log file "started".  This
-    // starts here.  It needs to be after everything is fetched so that the
-    // fetchers are all pointed at the most recent message before the start
-    // time.
-    monotonic_start_time_ = event_loop_->monotonic_now();
-    realtime_start_time_ = event_loop_->realtime_now();
-    last_synchronized_time_ = monotonic_start_time_;
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index =
+        configuration::GetNodeIndex(configuration_, node);
 
-    LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
-              << " start_time " << monotonic_start_time_;
+    node_state_[node_index].log_file_header = MakeHeader(node);
+  }
 
-    WriteHeader();
-
-    timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
-                          polling_period);
-  });
+  // When things start, we want to log the header, then the most recent
+  // messages available on each fetcher to capture the previous state, then
+  // start polling.
+  event_loop_->OnRun([this]() { StartLogging(); });
 }
 
-// TODO(austin): Set the remote start time to the first time we see a remote
-// message when we are logging those messages separate?  Need to signal what to
-// do, or how to get a good timestamp.
+Logger::~Logger() {
+  // If we are replaying a log file, or in simulation, we want to force the last
+  // bit of data to be logged.  The easiest way to deal with this is to poll
+  // everything as we go to destroy the class, ie, shut down the logger, and
+  // write it to disk.
+  DoLogData();
+}
+
+void Logger::StartLogging() {
+  // Grab data from each channel right before we declare the log file started
+  // so we can capture the latest message on each channel.  This lets us have
+  // non periodic messages with configuration that now get logged.
+  for (FetcherStruct &f : fetchers_) {
+    f.written = !f.fetcher->Fetch();
+  }
+
+  // Clear out any old timestamps in case we are re-starting logging.
+  for (size_t i = 0; i < node_state_.size(); ++i) {
+    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time);
+  }
+
+  WriteHeader();
+
+  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+            << " start_time " << last_synchronized_time_;
+
+  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
+                        polling_period_);
+}
+
 void Logger::WriteHeader() {
+  if (configuration::MultiNode(configuration_)) {
+    server_statistics_fetcher_.Fetch();
+  }
+
+  aos::monotonic_clock::time_point monotonic_start_time =
+      event_loop_->monotonic_now();
+  aos::realtime_clock::time_point realtime_start_time =
+      event_loop_->realtime_now();
+
+  // We need to pick a point in time to declare the log file "started".  This
+  // starts here.  It needs to be after everything is fetched so that the
+  // fetchers are all pointed at the most recent message before the start
+  // time.
+  last_synchronized_time_ = monotonic_start_time;
+
   for (const Node *node : log_namer_->nodes()) {
-    WriteHeader(node);
+    const int node_index =
+        configuration::GetNodeIndex(configuration_, node);
+    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
+                         realtime_start_time);
+    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
   }
 }
 
-void Logger::WriteHeader(const Node *node) {
+void Logger::WriteMissingTimestamps() {
+  if (configuration::MultiNode(configuration_)) {
+    server_statistics_fetcher_.Fetch();
+  } else {
+    return;
+  }
+
+  if (server_statistics_fetcher_.get() == nullptr) {
+    return;
+  }
+
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index =
+        configuration::GetNodeIndex(configuration_, node);
+    if (MaybeUpdateTimestamp(
+            node, node_index,
+            server_statistics_fetcher_.context().monotonic_event_time,
+            server_statistics_fetcher_.context().realtime_event_time)) {
+      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+    }
+  }
+}
+
+void Logger::SetStartTime(size_t node_index,
+                          aos::monotonic_clock::time_point monotonic_start_time,
+                          aos::realtime_clock::time_point realtime_start_time) {
+  node_state_[node_index].monotonic_start_time = monotonic_start_time;
+  node_state_[node_index].realtime_start_time = realtime_start_time;
+  node_state_[node_index]
+      .log_file_header.mutable_message()
+      ->mutate_monotonic_start_time(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              monotonic_start_time.time_since_epoch())
+              .count());
+  if (node_state_[node_index]
+          .log_file_header.mutable_message()
+          ->has_realtime_start_time()) {
+    node_state_[node_index]
+        .log_file_header.mutable_message()
+        ->mutate_realtime_start_time(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                realtime_start_time.time_since_epoch())
+                .count());
+  }
+}
+
+bool Logger::MaybeUpdateTimestamp(
+    const Node *node, int node_index,
+    aos::monotonic_clock::time_point monotonic_start_time,
+    aos::realtime_clock::time_point realtime_start_time) {
+  // Bail early if the start times are already set.
+  if (node_state_[node_index].monotonic_start_time !=
+      monotonic_clock::min_time) {
+    return false;
+  }
+  if (configuration::MultiNode(configuration_)) {
+    if (event_loop_->node() == node) {
+      // There are no offsets to compute for ourself, so always succeed.
+      SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+      return true;
+    } else if (server_statistics_fetcher_.get() != nullptr) {
+      // We must be a remote node now.  Look for the connection and see if it is
+      // connected.
+
+      for (const message_bridge::ServerConnection *connection :
+           *server_statistics_fetcher_->connections()) {
+        if (connection->node()->name()->string_view() !=
+            node->name()->string_view()) {
+          continue;
+        }
+
+        if (connection->state() != message_bridge::State::CONNECTED) {
+          VLOG(1) << node->name()->string_view()
+                  << " is not connected, can't start it yet.";
+          break;
+        }
+
+        if (!connection->has_monotonic_offset()) {
+          VLOG(1) << "Missing monotonic offset for setting start time for node "
+                  << aos::FlatbufferToJson(node);
+          break;
+        }
+
+        VLOG(1) << "Updating start time for " << aos::FlatbufferToJson(node);
+
+        // Found it and it is connected.  Compensate and go.
+        monotonic_start_time +=
+            std::chrono::nanoseconds(connection->monotonic_offset());
+
+        SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+        return true;
+      }
+    }
+  } else {
+    SetStartTime(node_index, monotonic_start_time, realtime_start_time);
+    return true;
+  }
+  return false;
+}
+
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
+    const Node *node) {
   // Now write the header with this timestamp in it.
   flatbuffers::FlatBufferBuilder fbb;
   fbb.ForceDefaults(true);
 
+  // TODO(austin): Compress this much more efficiently.  There are a bunch of
+  // duplicated schemas.
   flatbuffers::Offset<aos::Configuration> configuration_offset =
-      CopyFlatBuffer(event_loop_->configuration(), &fbb);
+      CopyFlatBuffer(configuration_, &fbb);
 
-  flatbuffers::Offset<flatbuffers::String> string_offset =
-      fbb.CreateString(network::GetHostname());
+  flatbuffers::Offset<flatbuffers::String> name_offset =
+      fbb.CreateString(name_);
+
+  flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
+      fbb.CreateString(uuid_.string_view());
+
+  flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
+      fbb.CreateString("00000000-0000-4000-8000-000000000000");
 
   flatbuffers::Offset<Node> node_offset;
-  if (event_loop_->node() != nullptr) {
+
+  if (configuration::MultiNode(configuration_)) {
     node_offset = CopyFlatBuffer(node, &fbb);
   }
 
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
 
-  log_file_header_builder.add_name(string_offset);
+  log_file_header_builder.add_name(name_offset);
 
   // Only add the node if we are running in a multinode configuration.
   if (node != nullptr) {
@@ -158,41 +393,138 @@
 
   log_file_header_builder.add_monotonic_start_time(
       std::chrono::duration_cast<std::chrono::nanoseconds>(
-          monotonic_start_time_.time_since_epoch())
+          monotonic_clock::min_time.time_since_epoch())
           .count());
-  log_file_header_builder.add_realtime_start_time(
-      std::chrono::duration_cast<std::chrono::nanoseconds>(
-          realtime_start_time_.time_since_epoch())
-          .count());
-
-  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
-  log_namer_->WriteHeader(&fbb, node);
-}
-
-void Logger::Rotate(DetachedBufferWriter *writer) {
-  Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
-}
-
-void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
-  // Force data up until now to be written.
-  DoLogData();
-
-  // Swap the writer out, and re-write the header.
-  log_namer_ = std::move(log_namer);
-
-  // And then update the writers.
-  for (FetcherStruct &f : fetchers_) {
-    const Channel *channel =
-        event_loop_->configuration()->channels()->Get(f.channel_index);
-    if (f.timestamp_writer != nullptr) {
-      f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
-    }
-    if (f.writer != nullptr) {
-      f.writer = log_namer_->MakeWriter(channel);
-    }
+  if (node == event_loop_->node()) {
+    log_file_header_builder.add_realtime_start_time(
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            realtime_clock::min_time.time_since_epoch())
+            .count());
   }
 
-  WriteHeader();
+  log_file_header_builder.add_logger_uuid(logger_uuid_offset);
+
+  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
+  log_file_header_builder.add_parts_index(0);
+
+  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+  return fbb.Release();
+}
+
+void Logger::Rotate() {
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index =
+        configuration::GetNodeIndex(configuration_, node);
+    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+  }
+}
+
+void Logger::LogUntil(monotonic_clock::time_point t) {
+  WriteMissingTimestamps();
+
+  // Write each channel to disk, one at a time.
+  for (FetcherStruct &f : fetchers_) {
+    while (true) {
+      if (f.written) {
+        if (!f.fetcher->FetchNext()) {
+          VLOG(2) << "No new data on "
+                  << configuration::CleanedChannelToString(
+                         f.fetcher->channel());
+          break;
+        } else {
+          f.written = false;
+        }
+      }
+
+      CHECK(!f.written);
+
+      // TODO(james): Write tests to exercise this logic.
+      if (f.fetcher->context().monotonic_event_time < t) {
+        if (f.writer != nullptr) {
+          // Write!
+          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                             max_header_size_);
+          fbb.ForceDefaults(true);
+
+          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                             f.channel_index, f.log_type));
+
+          VLOG(2) << "Writing data as node "
+                  << FlatbufferToJson(event_loop_->node()) << " for channel "
+                  << configuration::CleanedChannelToString(f.fetcher->channel())
+                  << " to " << f.writer->filename() << " data "
+                  << FlatbufferToJson(
+                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                             fbb.GetBufferPointer()));
+
+          max_header_size_ = std::max(
+              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+          f.writer->QueueSizedFlatbuffer(&fbb);
+        }
+
+        if (f.timestamp_writer != nullptr) {
+          // And now handle timestamps.
+          flatbuffers::FlatBufferBuilder fbb;
+          fbb.ForceDefaults(true);
+
+          fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                             f.channel_index,
+                                             LogType::kLogDeliveryTimeOnly));
+
+          VLOG(2) << "Writing timestamps as node "
+                  << FlatbufferToJson(event_loop_->node()) << " for channel "
+                  << configuration::CleanedChannelToString(f.fetcher->channel())
+                  << " to " << f.timestamp_writer->filename() << " timestamp "
+                  << FlatbufferToJson(
+                         flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                             fbb.GetBufferPointer()));
+
+          f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+        }
+
+        if (f.contents_writer != nullptr) {
+          // And now handle the special message contents channel.  Copy the
+          // message into a FlatBufferBuilder and save it to disk.
+          // TODO(austin): We can be more efficient here when we start to
+          // care...
+          flatbuffers::FlatBufferBuilder fbb;
+          fbb.ForceDefaults(true);
+
+          const MessageHeader *msg =
+              flatbuffers::GetRoot<MessageHeader>(f.fetcher->context().data);
+
+          logger::MessageHeader::Builder message_header_builder(fbb);
+
+          // Note: this must match the same order as MessageBridgeServer and
+          // PackMessage.  We want identical headers to have identical
+          // on-the-wire formats to make comparing them easier.
+          message_header_builder.add_channel_index(msg->channel_index());
+
+          message_header_builder.add_queue_index(msg->queue_index());
+          message_header_builder.add_monotonic_sent_time(
+              msg->monotonic_sent_time());
+          message_header_builder.add_realtime_sent_time(
+              msg->realtime_sent_time());
+
+          message_header_builder.add_monotonic_remote_time(
+              msg->monotonic_remote_time());
+          message_header_builder.add_realtime_remote_time(
+              msg->realtime_remote_time());
+          message_header_builder.add_remote_queue_index(
+              msg->remote_queue_index());
+
+          fbb.FinishSizePrefixed(message_header_builder.Finish());
+
+          f.contents_writer->QueueSizedFlatbuffer(&fbb);
+        }
+
+        f.written = true;
+      } else {
+        break;
+      }
+    }
+  }
+  last_synchronized_time_ = t;
 }
 
 void Logger::DoLogData() {
@@ -205,83 +537,107 @@
   do {
     // Move the sync point up by at most polling_period.  This forces one sync
     // per iteration, even if it is small.
-    last_synchronized_time_ =
-        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
-    // Write each channel to disk, one at a time.
-    for (FetcherStruct &f : fetchers_) {
-      while (true) {
-        if (f.written) {
-          if (!f.fetcher->FetchNext()) {
-            VLOG(2) << "No new data on "
-                    << configuration::CleanedChannelToString(
-                           f.fetcher->channel());
-            break;
-          } else {
-            f.written = false;
-          }
-        }
-
-        CHECK(!f.written);
-
-        // TODO(james): Write tests to exercise this logic.
-        if (f.fetcher->context().monotonic_event_time <
-            last_synchronized_time_) {
-          if (f.writer != nullptr) {
-            // Write!
-            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
-                                               max_header_size_);
-            fbb.ForceDefaults(true);
-
-            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                               f.channel_index, f.log_type));
-
-            VLOG(2) << "Writing data as node "
-                    << FlatbufferToJson(event_loop_->node()) << " for channel "
-                    << configuration::CleanedChannelToString(
-                           f.fetcher->channel())
-                    << " to " << f.writer->filename() << " data "
-                    << FlatbufferToJson(
-                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                               fbb.GetBufferPointer()));
-
-            max_header_size_ = std::max(
-                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
-            f.writer->QueueSizedFlatbuffer(&fbb);
-          }
-
-          if (f.timestamp_writer != nullptr) {
-            // And now handle timestamps.
-            flatbuffers::FlatBufferBuilder fbb;
-            fbb.ForceDefaults(true);
-
-            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                               f.channel_index,
-                                               LogType::kLogDeliveryTimeOnly));
-
-            VLOG(2) << "Writing timestamps as node "
-                    << FlatbufferToJson(event_loop_->node()) << " for channel "
-                    << configuration::CleanedChannelToString(
-                           f.fetcher->channel())
-                    << " to " << f.timestamp_writer->filename() << " timestamp "
-                    << FlatbufferToJson(
-                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                               fbb.GetBufferPointer()));
-
-            f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
-          }
-
-          f.written = true;
-        } else {
-          break;
-        }
-      }
-    }
+    LogUntil(
+        std::min(last_synchronized_time_ + polling_period_, monotonic_now));
 
     // If we missed cycles, we could be pretty far behind.  Spin until we are
     // caught up.
   } while (last_synchronized_time_ + polling_period_ < monotonic_now);
 }
 
+std::vector<std::vector<std::string>> SortParts(
+    const std::vector<std::string> &parts) {
+  // Start by grouping all parts by UUID, and extracting the part index.
+  std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
+
+  // Sort part files without UUIDs and part indexes as well.  Extract everything
+  // useful from the log in the first pass, then sort later.
+  struct LogPart {
+    std::string filename;
+    monotonic_clock::time_point start_time;
+    monotonic_clock::time_point first_message_time;
+  };
+
+  std::vector<LogPart> old_parts;
+
+  for (const std::string &part : parts) {
+    FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
+
+    // Looks like an old log.  No UUID, index, and also single node.  We have
+    // little to no multi-node log files in the wild without part UUIDs and
+    // indexes which we care much about.
+    if (!log_header.message().has_parts_uuid() &&
+        !log_header.message().has_parts_index() &&
+        !log_header.message().has_node()) {
+      LogPart log_part;
+      log_part.filename = part;
+      log_part.start_time = monotonic_clock::time_point(
+          chrono::nanoseconds(log_header.message().monotonic_start_time()));
+      FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
+      log_part.first_message_time = monotonic_clock::time_point(
+          chrono::nanoseconds(first_message.message().monotonic_sent_time()));
+      old_parts.emplace_back(std::move(log_part));
+      continue;
+    }
+
+    CHECK(log_header.message().has_parts_uuid());
+    CHECK(log_header.message().has_parts_index());
+
+    const std::string parts_uuid = log_header.message().parts_uuid()->str();
+    auto it = parts_list.find(parts_uuid);
+    if (it == parts_list.end()) {
+      it = parts_list
+               .insert(std::make_pair(
+                   parts_uuid, std::vector<std::pair<std::string, int>>{}))
+               .first;
+    }
+    it->second.emplace_back(
+        std::make_pair(part, log_header.message().parts_index()));
+  }
+
+  CHECK_NE(old_parts.empty(), parts_list.empty())
+      << ": Can't have a mix of old and new parts.";
+
+  if (!old_parts.empty()) {
+    // Confirm they all have the same start time.  Old loggers always used the
+    // same start time.
+    for (const LogPart &p : old_parts) {
+      CHECK_EQ(old_parts[0].start_time, p.start_time);
+    }
+    // Sort by the oldest message in each file.
+    std::sort(old_parts.begin(), old_parts.end(),
+              [](const LogPart &a, const LogPart &b) {
+                return a.first_message_time < b.first_message_time;
+              });
+
+    // Produce the final form.
+    std::vector<std::string> sorted_old_parts;
+    sorted_old_parts.reserve(old_parts.size());
+    for (LogPart &p : old_parts) {
+      sorted_old_parts.emplace_back(std::move(p.filename));
+    }
+    return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
+  }
+
+  // Now, sort them and produce the final vector form.
+  std::vector<std::vector<std::string>> result;
+  result.reserve(parts_list.size());
+  for (auto &part : parts_list) {
+    std::sort(part.second.begin(), part.second.end(),
+              [](const std::pair<std::string, int> &a,
+                 const std::pair<std::string, int> &b) {
+                return a.second < b.second;
+              });
+    std::vector<std::string> result_line;
+    result_line.reserve(part.second.size());
+    for (std::pair<std::string, int> &p : part.second) {
+      result_line.emplace_back(std::move(p.first));
+    }
+    result.emplace_back(std::move(result_line));
+  }
+  return result;
+}
+
 LogReader::LogReader(std::string_view filename,
                      const Configuration *replay_configuration)
     : LogReader(std::vector<std::string>{std::string(filename)},
@@ -302,7 +658,8 @@
   if (replay_configuration) {
     CHECK_EQ(configuration::MultiNode(configuration()),
              configuration::MultiNode(replay_configuration))
-        << ": Log file and replay config need to both be multi or single node.";
+        << ": Log file and replay config need to both be multi or single "
+           "node.";
   }
 
   if (!configuration::MultiNode(configuration())) {
@@ -312,12 +669,13 @@
     if (replay_configuration) {
       CHECK_EQ(logged_configuration()->nodes()->size(),
                replay_configuration->nodes()->size())
-          << ": Log file and replay config need to have matching nodes lists.";
+          << ": Log file and replay config need to have matching nodes "
+             "lists.";
       for (const Node *node : *logged_configuration()->nodes()) {
         if (configuration::GetNode(replay_configuration, node) == nullptr) {
-          LOG(FATAL)
-              << "Found node " << FlatbufferToJson(node)
-              << " in logged config that is not present in the replay config.";
+          LOG(FATAL) << "Found node " << FlatbufferToJson(node)
+                     << " in logged config that is not present in the replay "
+                        "config.";
         }
       }
     }
@@ -335,8 +693,8 @@
   if (offset_fp_ != nullptr) {
     fclose(offset_fp_);
   }
-  // Zero out some buffers. It's easy to do use-after-frees on these, so make it
-  // more obvious.
+  // Zero out some buffers. It's easy to do use-after-frees on these, so make
+  // it more obvious.
   if (remapped_configuration_buffer_) {
     remapped_configuration_buffer_->Wipe();
   }
@@ -352,8 +710,8 @@
 }
 
 std::vector<const Node *> LogReader::Nodes() const {
-  // Because the Node pointer will only be valid if it actually points to memory
-  // owned by remapped_configuration_, we need to wait for the
+  // Because the Node pointer will only be valid if it actually points to
+  // memory owned by remapped_configuration_, we need to wait for the
   // remapped_configuration_ to be populated before accessing it.
   //
   // Also, note, that when ever a map is changed, the nodes in here are
@@ -404,38 +762,45 @@
            "you sure that the replay config matches the original config?";
   }
 
-  // We need to now seed our per-node time offsets and get everything set up to
-  // run.
-  const size_t num_nodes = !configuration::MultiNode(logged_configuration())
-                               ? 1u
-                               : logged_configuration()->nodes()->size();
+  // We need to now seed our per-node time offsets and get everything set up
+  // to run.
+  const size_t num_nodes = nodes_count();
 
   // It is easiest to solve for per node offsets with a matrix rather than
   // trying to solve the equations by hand.  So let's get after it.
   //
   // Now, build up the map matrix.
   //
-  // sample_matrix_ = map_matrix_ * offset_matrix_
-  map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);
+  // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
+  map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+      filters_.size() + 1, num_nodes);
+  slope_matrix_ =
+      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+          filters_.size() + 1, num_nodes);
 
-  sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
-  offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+  offset_matrix_ =
+      Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+  valid_matrix_ =
+      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
+  last_valid_matrix_ =
+      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
 
-  // And the base offset matrix, which will be a copy of the initial offset
-  // matrix.
-  base_offset_matrix_ =
-      Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
-          num_nodes);
+  time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
+  time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
 
-  // All offsets should sum to 0.  Add that as the first constraint in our least
-  // squares.
-  map_matrix_.row(0).setOnes();
+  // All times should average out to the distributed clock.
+  for (int i = 0; i < map_matrix_.cols(); ++i) {
+    // 1/num_nodes.
+    map_matrix_(0, i) = mpq_class(1, num_nodes);
+  }
+  valid_matrix_(0) = true;
 
   {
     // Now, add the a - b -> sample elements.
     size_t i = 1;
     for (std::pair<const std::tuple<const Node *, const Node *>,
-                   message_bridge::ClippedAverageFilter> &filter : filters_) {
+                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
+             &filter : filters_) {
       const Node *const node_a = std::get<0>(filter.first);
       const Node *const node_b = std::get<1>(filter.first);
 
@@ -444,136 +809,68 @@
       const size_t node_b_index =
           configuration::GetNodeIndex(configuration(), node_b);
 
-      // +a
-      map_matrix_(i, node_a_index) = 1.0;
-      // -b
-      map_matrix_(i, node_b_index) = -1.0;
+      // -a
+      map_matrix_(i, node_a_index) = mpq_class(-1);
+      // +b
+      map_matrix_(i, node_b_index) = mpq_class(1);
 
       // -> sample
-      filter.second.set_sample_pointer(&sample_matrix_(i, 0));
+      std::get<0>(filter.second)
+          .set_slope_pointer(&slope_matrix_(i, node_a_index));
+      std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
+
+      valid_matrix_(i) = false;
+      std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
 
       ++i;
     }
   }
 
-  // Rank of the map matrix tells you if all the nodes are in communication with
-  // each other, which tells you if the offsets are observable.
-  const size_t connected_nodes =
-      Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
-          map_matrix_)
-          .rank();
-
-  // We don't need to support isolated nodes until someone has a real use case.
-  CHECK_EQ(connected_nodes, num_nodes)
-      << ": There is a node which isn't communicating with the rest.";
-
-  // Now, iterate through all the timestamps from all the nodes and seed
-  // everything.
-  for (std::unique_ptr<State> &state : states_) {
-    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
-      TimestampMerger::DeliveryTimestamp timestamp =
-          state->OldestTimestampForChannel(i);
-      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
-        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
-      }
-    }
-  }
-
-  // Make sure all the samples have been seeded.
-  for (int i = 1; i < sample_matrix_.cols(); ++i) {
-    // The seeding logic is pretty basic right now because we don't have great
-    // use cases yet.  It wants to see data from every node.  Blow up for now,
-    // and once we have a reason to do something different, update this logic.
-    // Maybe read further in the log file?  Or seed off the realtime time?
-    CHECK_NE(sample_matrix_(i, 0), 0.0)
-        << ": Sample " << i << " is not seeded.";
-  }
-
-  // And solve.
-  offset_matrix_ = SolveOffsets();
-
-  // Save off the base offsets so we can work in deltas from here out.  That
-  // will significantly simplify the numerical precision problems.
-  for (size_t i = 0; i < num_nodes; ++i) {
-    base_offset_matrix_(i, 0) =
-        std::chrono::duration_cast<std::chrono::nanoseconds>(
-            std::chrono::duration<double>(offset_matrix_(i, 0)));
-  }
-
-  {
-    // Shift everything so we never could (reasonably) require the distributed
-    // clock to have a large backwards jump in time.  This makes it so the boot
-    // time on the node up the longest will essentially start matching the
-    // distributed clock.
-    const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
-    for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
-      base_offset_matrix_(i, 0) += offset;
-    }
-  }
-
-  {
-    // Re-compute the samples and setup all the filters so that they
-    // subtract this base offset.
-
-    size_t i = 1;
-    for (std::pair<const std::tuple<const Node *, const Node *>,
-                   message_bridge::ClippedAverageFilter> &filter : filters_) {
-      CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));
-
-      const Node *const node_a = std::get<0>(filter.first);
-      const Node *const node_b = std::get<1>(filter.first);
-
-      const size_t node_a_index =
-          configuration::GetNodeIndex(configuration(), node_a);
-      const size_t node_b_index =
-          configuration::GetNodeIndex(configuration(), node_b);
-
-      filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
-                                    base_offset_matrix_(node_b_index));
-
-      ++i;
-    }
-  }
-
-  // Now, iterate again through all the offsets now that we have set the base
-  // offset to something sane.  This will seed everything with an accurate
-  // initial offset.
-  for (std::unique_ptr<State> &state : states_) {
-    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
-      TimestampMerger::DeliveryTimestamp timestamp =
-          state->OldestTimestampForChannel(i);
-      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
-        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
-      }
-    }
-  }
-
   for (std::unique_ptr<State> &state : states_) {
     state->SeedSortedMessages();
   }
 
+  // Rank of the map matrix tells you if all the nodes are in communication
+  // with each other, which tells you if the offsets are observable.
+  const size_t connected_nodes =
+      Eigen::FullPivLU<
+          Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
+          .rank();
+
+  // We don't need to support isolated nodes until someone has a real use
+  // case.
+  CHECK_EQ(connected_nodes, num_nodes)
+      << ": There is a node which isn't communicating with the rest.";
+
+  // And solve.
   UpdateOffsets();
 
-  // We want to start the log file at the last start time of the log files from
-  // all the nodes.  Compute how long each node's simulation needs to run to
-  // move time to this point.
+  // We want to start the log file at the last start time of the log files
+  // from all the nodes.  Compute how long each node's simulation needs to run
+  // to move time to this point.
   distributed_clock::time_point start_time = distributed_clock::min_time;
 
+  // TODO(austin): We want an "OnStart" callback for each node rather than
+  // running until the last node.
+
   for (std::unique_ptr<State> &state : states_) {
-    // Setup the realtime clock to have something sane in it now.
-    state->SetRealtimeOffset(state->monotonic_start_time(),
-                             state->realtime_start_time());
-    // And start computing the start time on the distributed clock now that that
-    // works.
+    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+            << MaybeNodeName(state->event_loop()->node()) << "now "
+            << state->monotonic_now();
+    // And start computing the start time on the distributed clock now that
+    // that works.
     start_time = std::max(
         start_time, state->ToDistributedClock(state->monotonic_start_time()));
   }
-  CHECK_GE(start_time, distributed_clock::epoch());
+
+  CHECK_GE(start_time, distributed_clock::epoch())
+      << ": Hmm, we have a node starting before the start of time.  Offset "
+         "everything.";
 
   // Forwarding is tracked per channel.  If it is enabled, we want to turn it
   // off.  Otherwise messages replayed will get forwarded across to the other
-  // nodes, and also replayed on the other nodes.  This may not satisfy all our
-  // users, but it'll start the discussion.
+  // nodes, and also replayed on the other nodes.  This may not satisfy all
+  // our users, but it'll start the discussion.
   if (configuration::MultiNode(event_loop_factory_->configuration())) {
     for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
       const Channel *channel = logged_configuration()->channels()->Get(i);
@@ -598,34 +895,177 @@
   // to timestamps on log files where the timestamp log file starts before the
   // data.  In this case, it is reasonable to expect missing data.
   ignore_missing_data_ = true;
-  VLOG(1) << "Running until start time: " << start_time;
+  VLOG(1) << "Running until " << start_time << " in Register";
   event_loop_factory_->RunFor(start_time.time_since_epoch());
   VLOG(1) << "At start time";
   // Now that we are running for real, missing data means that the log file is
   // corrupted or went wrong.
   ignore_missing_data_ = false;
-}
 
-void LogReader::UpdateOffsets() {
-  // TODO(austin): Evaluate less accurate inverses.  We might be able to
-  // do some tricks to keep the accuracy up.
-  offset_matrix_ = SolveOffsets();
-
-  size_t node_index = 0;
   for (std::unique_ptr<State> &state : states_) {
-    state->SetDistributedOffset(-offset(node_index), 1.0);
-    ++node_index;
+    // Make the RT clock be correct before handing it to the user.
+    if (state->realtime_start_time() != realtime_clock::min_time) {
+      state->SetRealtimeOffset(state->monotonic_start_time(),
+                               state->realtime_start_time());
+    }
+    VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+            << MaybeNodeName(state->event_loop()->node()) << "now "
+            << state->monotonic_now();
+  }
+
+  if (FLAGS_timestamps_to_csv) {
+    for (std::pair<const std::tuple<const Node *, const Node *>,
+                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
+             &filter : filters_) {
+      const Node *const node_a = std::get<0>(filter.first);
+      const Node *const node_b = std::get<1>(filter.first);
+
+      std::get<0>(filter.second)
+          .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
+                               ->monotonic_now());
+      std::get<0>(filter.second)
+          .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
+                               ->monotonic_now());
+    }
   }
 }
 
-std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
+// Re-solves the clock fit and passes each State its offset and slope.
+void LogReader::UpdateOffsets() {
+  VLOG(2) << "Samples are " << offset_matrix_;
+  VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
+  std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
+  const Eigen::IOFormat full_precision(Eigen::FullPrecision, 0, ", ", ";\n",
+                                       "[", "]", "[", "]");
+  VLOG(1) << "First slope "
+          << time_slope_matrix_.transpose().format(full_precision) << " offset "
+          << time_offset_matrix_.transpose().format(full_precision);
+
+  // offset(i) and slope(i) are looked up with the same index as states_[i].
+  for (size_t i = 0; i < states_.size(); ++i) {
+    State *const node_state = states_[i].get();
+    node_state->SetDistributedOffset(offset(i), slope(i));
+    VLOG(1) << "Offset for node " << i << " "
+            << MaybeNodeName(node_state->event_loop()->node()) << "is "
+            << aos::distributed_clock::time_point(offset(i)) << " slope "
+            << std::setprecision(9) << std::fixed << slope(i);
+  }
+  if (VLOG_IS_ON(1)) {
+    LogFit("Offset is");
+  }
+}
+
+// Logs (at VLOG(1)) every node's clock, then for each estimator with data the
+// slope/offset recovered from the solved per-node fit next to the estimator's
+// own fit, plus its first two timestamps in each direction converted across
+// the distributed clock.  |prefix| is passed to the estimator's LogFit().
+void LogReader::LogFit(std::string_view prefix) {
+  for (std::unique_ptr<State> &state : states_) {
+    VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
+            << state->monotonic_now() << " distributed "
+            << event_loop_factory_->distributed_now();
+  }
+
+  for (auto &filter : filters_) {
+    message_bridge::NoncausalOffsetEstimator *estimator =
+        &std::get<0>(filter.second);
+    const auto &a_times = estimator->a_timestamps();
+    const auto &b_times = estimator->b_timestamps();
+
+    if (a_times.size() == 0 && b_times.size() == 0) {
+      continue;
+    }
+
+    if (VLOG_IS_ON(1)) {
+      estimator->LogFit(prefix);
+    }
+
+    const Node *const node_a = std::get<0>(filter.first);
+    const Node *const node_b = std::get<1>(filter.first);
+    const size_t node_a_index =
+        configuration::GetNodeIndex(configuration(), node_a);
+    const size_t node_b_index =
+        configuration::GetNodeIndex(configuration(), node_b);
+
+    const double recovered_slope =
+        slope(node_b_index) / slope(node_a_index) - 1.0;
+    const int64_t recovered_offset =
+        offset(node_b_index).count() - offset(node_a_index).count() *
+                                           slope(node_b_index) /
+                                           slope(node_a_index);
+
+    VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
+            << " (error " << recovered_slope - estimator->fit().slope() << ") "
+            << " offset " << std::setprecision(20) << recovered_offset
+            << " (error "
+            << recovered_offset - estimator->fit().offset().count() << ")";
+
+    // Everything below reads elements [0] and [1] of both lists.  Skip the
+    // dump rather than index out of range when either direction is shorter.
+    if (a_times.size() < 2u || b_times.size() < 2u) {
+      continue;
+    }
+
+    const aos::distributed_clock::time_point a0 =
+        states_[node_a_index]->ToDistributedClock(std::get<0>(a_times[0]));
+    const aos::distributed_clock::time_point a1 =
+        states_[node_a_index]->ToDistributedClock(std::get<0>(a_times[1]));
+
+    VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
+            << std::get<0>(a_times[0]) << " -> " << a0
+            << " distributed -> " << node_b->name()->string_view() << " "
+            << states_[node_b_index]->FromDistributedClock(a0) << " should be "
+            << aos::monotonic_clock::time_point(
+                   std::chrono::nanoseconds(static_cast<int64_t>(
+                       std::get<0>(a_times[0]).time_since_epoch().count() *
+                       (1.0 + estimator->fit().slope()))) +
+                   estimator->fit().offset())
+            << ((a0 <= event_loop_factory_->distributed_now())
+                    ? ""
+                    : " After now, investigate");
+    VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
+            << std::get<0>(a_times[1]) << " -> " << a1
+            << " distributed -> " << node_b->name()->string_view() << " "
+            << states_[node_b_index]->FromDistributedClock(a1) << " should be "
+            << aos::monotonic_clock::time_point(
+                   std::chrono::nanoseconds(static_cast<int64_t>(
+                       std::get<0>(a_times[1]).time_since_epoch().count() *
+                       (1.0 + estimator->fit().slope()))) +
+                   estimator->fit().offset())
+            << ((event_loop_factory_->distributed_now() <= a1)
+                    ? ""
+                    : " Before now, investigate");
+
+    const aos::distributed_clock::time_point b0 =
+        states_[node_b_index]->ToDistributedClock(std::get<0>(b_times[0]));
+    const aos::distributed_clock::time_point b1 =
+        states_[node_b_index]->ToDistributedClock(std::get<0>(b_times[1]));
+
+    VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
+            << std::get<0>(b_times[0]) << " -> " << b0
+            << " distributed -> " << node_a->name()->string_view() << " "
+            << states_[node_a_index]->FromDistributedClock(b0)
+            << ((b0 <= event_loop_factory_->distributed_now())
+                    ? ""
+                    : " After now, investigate");
+    VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
+            << std::get<0>(b_times[1]) << " -> " << b1
+            << " distributed -> " << node_a->name()->string_view() << " "
+            << states_[node_a_index]->FromDistributedClock(b1)
+            << ((event_loop_factory_->distributed_now() <= b1)
+                    ? ""
+                    : " Before now, investigate");
+  }
+}
+
+message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
     const Node *node_a, const Node *node_b) {
   CHECK_NE(node_a, node_b);
   CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
   CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
 
   if (node_a > node_b) {
-    return std::make_pair(std::get<0>(GetFilter(node_b, node_a)), false);
+    return GetFilter(node_b, node_a);
   }
 
   auto tuple = std::make_tuple(node_a, node_b);
@@ -633,53 +1073,27 @@
   auto it = filters_.find(tuple);
 
   if (it == filters_.end()) {
-    auto &x = filters_
-                  .insert(std::make_pair(
-                      tuple, message_bridge::ClippedAverageFilter()))
-                  .first->second;
+    auto &x =
+        filters_
+            .insert(std::make_pair(
+                tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
+                           node_a, node_b))))
+            .first->second;
     if (FLAGS_timestamps_to_csv) {
-      std::string fwd_name =
-          absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
-                       node_b->name()->string_view());
-      x.SetFwdCsvFileName(fwd_name);
-      std::string rev_name =
-          absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
-                       node_a->name()->string_view());
-      x.SetRevCsvFileName(rev_name);
+      std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
+          "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
+          node_b->name()->string_view()));
+      std::get<0>(x).SetRevCsvFileName(absl::StrCat(
+          "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
+          node_a->name()->string_view()));
     }
 
-    return std::make_tuple(&x, true);
+    return &std::get<0>(x);
   } else {
-    return std::make_tuple(&(it->second), true);
+    return &std::get<0>(it->second);
   }
 }
 
-bool LogReader::State::MaybeUpdateTimestamp(
-    const TimestampMerger::DeliveryTimestamp &channel_timestamp,
-    int channel_index) {
-  if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
-    CHECK(std::get<0>(filters_[channel_index]) == nullptr);
-    return false;
-  }
-
-  // Got a forwarding timestamp!
-  CHECK(std::get<0>(filters_[channel_index]) != nullptr);
-
-  // Call the correct method depending on if we are the forward or reverse
-  // direction here.
-  if (std::get<1>(filters_[channel_index])) {
-    std::get<0>(filters_[channel_index])
-        ->FwdSample(channel_timestamp.monotonic_event_time,
-                    channel_timestamp.monotonic_event_time -
-                        channel_timestamp.monotonic_remote_time);
-  } else {
-    std::get<0>(filters_[channel_index])
-        ->RevSample(channel_timestamp.monotonic_event_time,
-                    channel_timestamp.monotonic_event_time -
-                        channel_timestamp.monotonic_remote_time);
-  }
-  return true;
-}
 
 void LogReader::Register(EventLoop *event_loop) {
   State *state =
@@ -702,10 +1116,8 @@
     const Channel *channel =
         RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
 
-    std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
-        std::make_tuple(nullptr, false);
-
     NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;
+    message_bridge::NoncausalOffsetEstimator *filter = nullptr;
 
     if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
         configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
@@ -730,31 +1142,48 @@
   }
 
   state->set_timer_handler(event_loop->AddTimer([this, state]() {
+    VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
+            << "at " << state->event_loop()->context().monotonic_event_time
+            << " now " << state->monotonic_now();
     if (state->OldestMessageTime() == monotonic_clock::max_time) {
       --live_nodes_;
-      VLOG(1) << "Node down!";
+      VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
       if (live_nodes_ == 0) {
         event_loop_factory_->Exit();
       }
       return;
     }
-    bool update_offsets = false;
     TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
     FlatbufferVector<MessageHeader> channel_data =
         FlatbufferVector<MessageHeader>::Empty();
 
-    bool dummy_update_time = false;
+    if (VLOG_IS_ON(1)) {
+      LogFit("Offset was");
+    }
+
+    bool update_time;
     std::tie(channel_timestamp, channel_index, channel_data) =
-        state->PopOldest(&dummy_update_time);
+        state->PopOldest(&update_time);
 
     const monotonic_clock::time_point monotonic_now =
         state->event_loop()->context().monotonic_event_time;
-    CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
-        << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
-        << monotonic_now << " trying to send "
-        << channel_timestamp.monotonic_event_time << " failure "
-        << state->DebugString();
+    if (!FLAGS_skip_order_validation) {
+      CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+          << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+          << monotonic_now << " trying to send "
+          << channel_timestamp.monotonic_event_time << " failure "
+          << state->DebugString();
+    } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+      LOG(WARNING) << "Check failed: monotonic_now == "
+                      "channel_timestamp.monotonic_event_time) ("
+                   << monotonic_now << " vs. "
+                   << channel_timestamp.monotonic_event_time
+                   << "): " << FlatbufferToJson(state->event_loop()->node())
+                   << " Now " << monotonic_now << " trying to send "
+                   << channel_timestamp.monotonic_event_time << " failure "
+                   << state->DebugString();
+    }
 
     if (channel_timestamp.monotonic_event_time >
             state->monotonic_start_time() ||
@@ -764,17 +1193,39 @@
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
-               "not matched?  Use --skip_missing_forwarding_entries to ignore "
+               "not matched?  Use --skip_missing_forwarding_entries to "
+               "ignore "
                "this.";
 
-        if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
+        if (update_time) {
           // Confirm that the message was sent on the sending node before the
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
-          CHECK_LT(channel_timestamp.monotonic_remote_time,
-                   state->monotonic_remote_now(channel_index));
-
-          update_offsets = true;
+          if (!FLAGS_skip_order_validation) {
+            CHECK_LT(channel_timestamp.monotonic_remote_time,
+                     state->monotonic_remote_now(channel_index))
+                << state->event_loop()->node()->name()->string_view() << " to "
+                << state->remote_node(channel_index)->name()->string_view()
+                << " " << state->DebugString();
+          } else if (channel_timestamp.monotonic_remote_time >=
+                     state->monotonic_remote_now(channel_index)) {
+            LOG(WARNING)
+                << "Check failed: channel_timestamp.monotonic_remote_time < "
+                   "state->monotonic_remote_now(channel_index) ("
+                << channel_timestamp.monotonic_remote_time << " vs. "
+                << state->monotonic_remote_now(channel_index) << ") "
+                << state->event_loop()->node()->name()->string_view() << " to "
+                << state->remote_node(channel_index)->name()->string_view()
+                << " currently " << channel_timestamp.monotonic_event_time
+                << " ("
+                << state->ToDistributedClock(
+                       channel_timestamp.monotonic_event_time)
+                << ") remote event time "
+                << channel_timestamp.monotonic_remote_time << " ("
+                << state->RemoteToDistributedClock(
+                       channel_index, channel_timestamp.monotonic_remote_time)
+                << ") " << state->DebugString();
+          }
 
           if (FLAGS_timestamps_to_csv) {
             if (offset_fp_ == nullptr) {
@@ -789,13 +1240,14 @@
                     std::chrono::duration_cast<std::chrono::duration<double>>(
                         channel_timestamp.realtime_event_time - first_time_)
                         .count());
-            for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
-              fprintf(
-                  offset_fp_, ", %.9f",
-                  offset_matrix_(i, 0) +
-                      std::chrono::duration_cast<std::chrono::duration<double>>(
-                          base_offset_matrix_(i, 0))
-                          .count());
+            for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
+              fprintf(offset_fp_, ", %.9f",
+                      time_offset_matrix_(i, 0) +
+                          time_slope_matrix_(i, 0) *
+                              chrono::duration<double>(
+                                  event_loop_factory_->distributed_now()
+                                      .time_since_epoch())
+                                  .count());
             }
             fprintf(offset_fp_, "\n");
           }
@@ -805,21 +1257,26 @@
         state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
                                  channel_timestamp.realtime_event_time);
 
+        VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
+                << channel_timestamp.monotonic_event_time;
+        // TODO(austin): std::move channel_data in and make that efficient in
+        // simulation.
         state->Send(channel_index, channel_data.message().data()->Data(),
                     channel_data.message().data()->size(),
                     channel_timestamp.monotonic_remote_time,
                     channel_timestamp.realtime_remote_time,
                     channel_timestamp.remote_queue_index);
-      } else if (state->at_end()) {
+      } else if (state->at_end() && !ignore_missing_data_) {
         // We are at the end of the log file and found missing data.  Finish
-        // reading the rest of the log file and call it quits.  We don't want to
-        // replay partial data.
+        // reading the rest of the log file and call it quits.  We don't want
+        // to replay partial data.
         while (state->OldestMessageTime() != monotonic_clock::max_time) {
           bool update_time_dummy;
           state->PopOldest(&update_time_dummy);
         }
+      } else {
+        CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
       }
-
     } else {
       LOG(WARNING)
           << "Not sending data from before the start of the log file. "
@@ -830,22 +1287,100 @@
 
     const monotonic_clock::time_point next_time = state->OldestMessageTime();
     if (next_time != monotonic_clock::max_time) {
+      VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+              << "wakeup for " << next_time << "("
+              << state->ToDistributedClock(next_time)
+              << " distributed), now is " << state->monotonic_now();
       state->Setup(next_time);
     } else {
-      // Set a timer up immediately after now to die. If we don't do this, then
-      // the senders waiting on the message we just read will never get called.
+      VLOG(1) << MaybeNodeName(state->event_loop()->node())
+              << "No next message, scheduling shutdown";
+      // Set a timer up immediately after now to die. If we don't do this,
+      // then the senders waiting on the message we just read will never get
+      // called.
       if (event_loop_factory_ != nullptr) {
         state->Setup(monotonic_now + event_loop_factory_->send_delay() +
                      std::chrono::nanoseconds(1));
       }
     }
 
-    // Once we make this call, the current time changes.  So do everything which
-    // involves time before changing it.  That especially includes sending the
-    // message.
-    if (update_offsets) {
+    // Once we make this call, the current time changes.  So do everything
+    // which involves time before changing it.  That especially includes
+    // sending the message.
+    if (update_time) {
+      VLOG(1) << MaybeNodeName(state->event_loop()->node())
+              << "updating offsets";
+
+      std::vector<aos::monotonic_clock::time_point> before_times;
+      before_times.resize(states_.size());
+      std::transform(states_.begin(), states_.end(), before_times.begin(),
+                     [](const std::unique_ptr<State> &state) {
+                       return state->monotonic_now();
+                     });
+
+      for (size_t i = 0; i < states_.size(); ++i) {
+        VLOG(1) << MaybeNodeName(
+                       states_[i]->event_loop()->node())
+                << "before " << states_[i]->monotonic_now();
+      }
+
       UpdateOffsets();
+      VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
+              << state->monotonic_now();
+
+      for (size_t i = 0; i < states_.size(); ++i) {
+        VLOG(1) << MaybeNodeName(
+                       states_[i]->event_loop()->node())
+                << "after " << states_[i]->monotonic_now();
+      }
+
+      // TODO(austin): We should be perfect.
+      const std::chrono::nanoseconds kTolerance{3};
+      if (!FLAGS_skip_order_validation) {
+        CHECK_GE(next_time, state->monotonic_now())
+            << ": Time skipped the next event.";
+        // No node's clock may move by more than kTolerance across the update.
+        for (size_t i = 0; i < states_.size(); ++i) {
+          CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
+              << ": Time changed too much on node "
+              << MaybeNodeName(states_[i]->event_loop()->node());
+          CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
+              << ": Time changed too much on node "
+              << MaybeNodeName(states_[i]->event_loop()->node());
+        }
+      } else {
+        // Validation is disabled: warn on the same violations rather than die.
+        if (next_time < state->monotonic_now()) {
+          LOG(WARNING) << "Check failed: next_time >= state->monotonic_now() ("
+                       << next_time << " vs. " << state->monotonic_now()
+                       << "): Time skipped the next event.";
+        }
+        for (size_t i = 0; i < states_.size(); ++i) {
+          if (states_[i]->monotonic_now() < before_times[i] - kTolerance) {
+            LOG(WARNING) << "Check failed: "
+                            "states_[i]->monotonic_now() "
+                            ">= before_times[i] - kTolerance ("
+                         << states_[i]->monotonic_now() << " vs. "
+                         << before_times[i] - kTolerance
+                         << ") : Time changed too much on node "
+                         << MaybeNodeName(states_[i]->event_loop()->node());
+          }
+          if (states_[i]->monotonic_now() > before_times[i] + kTolerance) {
+            LOG(WARNING) << "Check failed: "
+                            "states_[i]->monotonic_now() "
+                            "<= before_times[i] + kTolerance ("
+                         << states_[i]->monotonic_now() << " vs. "
+                         << before_times[i] + kTolerance
+                         << ") : Time changed too much on node "
+                         << MaybeNodeName(states_[i]->event_loop()->node());
+          }
+        }
+      }
     }
+
+    VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
+            << state->event_loop()->context().monotonic_event_time << " now "
+            << state->monotonic_now();
   }));
 
   ++live_nodes_;
@@ -942,8 +1477,8 @@
     new_name_builder.add_name(name_offset);
     new_name_fbb.Finish(new_name_builder.Finish());
     const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
-    // Retrieve the channel that we want to copy, confirming that it is actually
-    // present in base_config.
+    // Retrieve the channel that we want to copy, confirming that it is
+    // actually present in base_config.
     const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
         base_config, logged_configuration()->channels()->Get(pair.first), "",
         nullptr));
@@ -1020,7 +1555,7 @@
 
 void LogReader::State::SetChannel(
     size_t channel, std::unique_ptr<RawSender> sender,
-    std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
+    message_bridge::NoncausalOffsetEstimator *filter,
     NodeEventLoopFactory *channel_target_event_loop_factory) {
   channels_[channel] = std::move(sender);
   filters_[channel] = filter;
@@ -1034,21 +1569,27 @@
   CHECK_GT(sorted_messages_.size(), 0u);
 
   std::tuple<TimestampMerger::DeliveryTimestamp, int,
-             FlatbufferVector<MessageHeader>>
+             FlatbufferVector<MessageHeader>,
+             message_bridge::NoncausalOffsetEstimator *>
       result = std::move(sorted_messages_.front());
-  VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+  VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
           << std::get<0>(result).monotonic_event_time;
   sorted_messages_.pop_front();
   SeedSortedMessages();
 
-  *update_time = false;
+  if (std::get<3>(result) != nullptr) {
+    *update_time = std::get<3>(result)->Pop(
+        event_loop_->node(), std::get<0>(result).monotonic_event_time);
+  } else {
+    *update_time = false;
+  }
   return std::make_tuple(std::get<0>(result), std::get<1>(result),
                          std::move(std::get<2>(result)));
 }
 
 monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
   if (sorted_messages_.size() > 0) {
-    VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+    VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
             << std::get<0>(sorted_messages_.front()).monotonic_event_time;
     return std::get<0>(sorted_messages_.front()).monotonic_event_time;
   }
@@ -1081,11 +1622,26 @@
     FlatbufferVector<MessageHeader> channel_data =
         FlatbufferVector<MessageHeader>::Empty();
 
+    message_bridge::NoncausalOffsetEstimator *filter = nullptr;
+
     std::tie(channel_timestamp, channel_index, channel_data) =
         channel_merger_->PopOldest();
 
+    // Skip any messages without forwarding information.
+    if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+      // Got a forwarding timestamp!
+      filter = filters_[channel_index];
+
+      CHECK(filter != nullptr);
+
+      // Call the correct method depending on if we are the forward or
+      // reverse direction here.
+      filter->Sample(event_loop_->node(),
+                     channel_timestamp.monotonic_event_time,
+                     channel_timestamp.monotonic_remote_time);
+    }
     sorted_messages_.emplace_back(channel_timestamp, channel_index,
-                                  std::move(channel_data));
+                                  std::move(channel_data), filter);
   }
 }
 
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index f1af17e..2905dfa 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -15,9 +15,11 @@
 
 table LogFileHeader {
   // Time this log file started on the monotonic clock in nanoseconds.
-  monotonic_start_time:long;
+  // If this isn't known (the log file is being recorded from another node
+  // where we don't know the time offset), both timestamps will be min_time.
+  monotonic_start_time:int64 = -9223372036854775808;
   // Time this log file started on the realtime clock in nanoseconds.
-  realtime_start_time:long;
+  realtime_start_time:int64 = -9223372036854775808;
 
   // Messages are not written in order to disk.  They will be out of order by
   // at most this duration (in nanoseconds).  If the log reader buffers until
@@ -33,6 +35,36 @@
 
   // The current node, if known and running in a multi-node configuration.
   node:Node;
+
+  // All UUIDs are uuid4.
+
+  // A log is made up of a bunch of log files and parts.  These build up
+  // a tree.  Every .bfbs file has a LogFileHeader at the start.
+  //
+  //  /-- basename_pi1_data.part0.bfbs, basename_pi1_data.part1.bfbs, etc.
+  // ---- basename_timestamps/pi1/aos/remote_timestamps/pi2/aos.logger.MessageHeader.part0.bfbs, etc.
+  //  \-- basename_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs, etc.
+
+  // All log files and parts from a single run of a logger executable will have
+  // the same uuid.  This should be all the files generated on a single node.
+  // Used to correlate files recorded together.
+  logger_uuid:string;
+
+  // Part files which go together all have headers.  When creating a log file
+  // with multiple parts, the logger should stop writing to part n-1 as soon
+  // as it starts writing to part n, and write messages as though there was
+  // just 1 big file.  Therefore, part files won't be self standing, since
+  // they won't have data fetched at the beginning.
+
+  // If data is logged before the time offset can be established to the other
+  // node, the start time will be monotonic_clock::min_time, and a new part file
+  // will be created when the start time is known.
+
+  // All the parts which go together have the same uuid.
+  parts_uuid:string;
+  // And the parts_index corresponds to which part this is in the sequence.  The
+  // index should start at 0.
+  parts_index:int32;
 }
 
 // Table holding a message.
@@ -58,10 +90,12 @@
 
   // Time this message was sent on the monotonic clock of the remote node in
   // nanoseconds.
-  monotonic_remote_time:long = -9223372036854775808;
+  monotonic_remote_time:int64 = -9223372036854775808;
   // Time this message was sent on the realtime clock of the remote node in
   // nanoseconds.
-  realtime_remote_time:long = -9223372036854775808;
+  realtime_remote_time:int64 = -9223372036854775808;
   // Queue index of this message on the remote node.
-  remote_queue_index:uint = 4294967295;
+  remote_queue_index:uint32 = 4294967295;
 }
+
+root_type MessageHeader;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index a123dcc..32c2022 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -4,207 +4,94 @@
 #include <chrono>
 #include <deque>
 #include <string_view>
+#include <tuple>
 #include <vector>
 
 #include "Eigen/Dense"
 #include "absl/strings/str_cat.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/eigen_mpq.h"
+#include "aos/events/logging/log_namer.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
 #include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
 
 namespace aos {
 namespace logger {
 
-class LogNamer {
- public:
-  LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
-  virtual ~LogNamer() {}
-
-  virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
-                           const Node *node) = 0;
-  virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
-
-  virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
-  const std::vector<const Node *> &nodes() const { return nodes_; }
-
-  const Node *node() const { return node_; }
-
- protected:
-  const Node *const node_;
-  std::vector<const Node *> nodes_;
-};
-
-class LocalLogNamer : public LogNamer {
- public:
-  LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
-      : LogNamer(node), writer_(writer) {}
-
-  ~LocalLogNamer() override { writer_->Flush(); }
-
-  void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
-                   const Node *node) override {
-    CHECK_EQ(node, this->node());
-    writer_->WriteSizedFlatbuffer(
-        absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
-  }
-
-  DetachedBufferWriter *MakeWriter(const Channel *channel) override {
-    CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
-    return writer_;
-  }
-
-  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
-    CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
-        << ": Message is not delivered to this node.";
-    CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
-    CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
-                                                              node_))
-        << ": Delivery times aren't logged for this channel on this node.";
-    return writer_;
-  }
-
- private:
-  DetachedBufferWriter *writer_;
-};
-
-// TODO(austin): Split naming files from making files so we can re-use the
-// naming code to predict the log file names for a provided base name.
-class MultiNodeLogNamer : public LogNamer {
- public:
-  MultiNodeLogNamer(std::string_view base_name,
-                    const Configuration *configuration, const Node *node)
-      : LogNamer(node),
-        base_name_(base_name),
-        configuration_(configuration),
-        data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
-            base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
-
-  // Writes the header to all log files for a specific node.  This function
-  // needs to be called after all the writers are created.
-  void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
-    if (node == this->node()) {
-      data_writer_->WriteSizedFlatbuffer(
-          absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
-    } else {
-      for (std::pair<const Channel *const,
-                     std::unique_ptr<DetachedBufferWriter>> &data_writer :
-           data_writers_) {
-        if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
-          data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
-              fbb->GetBufferPointer(), fbb->GetSize()));
-        }
-      }
-    }
-  }
-
-  // Makes a data logger for a specific channel.
-  DetachedBufferWriter *MakeWriter(const Channel *channel) {
-    // See if we can read the data on this node at all.
-    const bool is_readable =
-        configuration::ChannelIsReadableOnNode(channel, this->node());
-    if (!is_readable) {
-      return nullptr;
-    }
-
-    // Then, see if we are supposed to log the data here.
-    const bool log_message =
-        configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
-
-    if (!log_message) {
-      return nullptr;
-    }
-
-    // Now, sort out if this is data generated on this node, or not.  It is
-    // generated if it is sendable on this node.
-    if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
-      return data_writer_.get();
-    } else {
-      // Ok, we have data that is being forwarded to us that we are supposed to
-      // log.  It needs to be logged with send timestamps, but be sorted enough
-      // to be able to be processed.
-      CHECK(data_writers_.find(channel) == data_writers_.end());
-
-      // Track that this node is being logged.
-      if (configuration::MultiNode(configuration_)) {
-        const Node *source_node = configuration::GetNode(
-            configuration_, channel->source_node()->string_view());
-        if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
-            nodes_.end()) {
-          nodes_.emplace_back(source_node);
-        }
-      }
-
-      return data_writers_
-          .insert(std::make_pair(
-              channel,
-              std::make_unique<DetachedBufferWriter>(absl::StrCat(
-                  base_name_, "_", channel->source_node()->string_view(),
-                  "_data", channel->name()->string_view(), "/",
-                  channel->type()->string_view(), ".bfbs"))))
-          .first->second.get();
-    }
-  }
-
-  // Makes a timestamp (or timestamp and data) logger for a channel and
-  // forwarding connection.
-  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
-    const bool log_delivery_times =
-        (this->node() == nullptr)
-            ? false
-            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                  channel, this->node(), this->node());
-    if (!log_delivery_times) {
-      return nullptr;
-    }
-
-    return data_writer_.get();
-  }
-
-  const std::vector<const Node *> &nodes() const { return nodes_; }
-
- private:
-  const std::string base_name_;
-  const Configuration *const configuration_;
-
-  // File to write both delivery timestamps and local data to.
-  std::unique_ptr<DetachedBufferWriter> data_writer_;
-  // Files to write remote data to.  We want one per channel.
-  std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
-      data_writers_;
-};
-
-
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
 // configuration that is sent rately on a channel and would affect execution.
 class Logger {
  public:
-  Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+  // Constructs a logger.
+  //   base_name/log_namer: Object used to write data to disk in one or more log
+  //     files.  If a base_name is passed in, a LocalLogNamer is wrapped
+  //     around it.
+  //   event_loop: The event loop used to read the messages.
+  //   polling_period: The period used to poll the data.
+  //   configuration: When provided, this is the configuration to log, and the
+  //     configuration to use for the channel list to log.  If not provided,
+  //     this becomes the configuration from the event loop.
+  Logger(std::string_view base_name, EventLoop *event_loop,
+         std::chrono::milliseconds polling_period =
+             std::chrono::milliseconds(100));
+  Logger(std::string_view base_name, EventLoop *event_loop,
+         const Configuration *configuration,
          std::chrono::milliseconds polling_period =
              std::chrono::milliseconds(100));
   Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
          std::chrono::milliseconds polling_period =
              std::chrono::milliseconds(100));
+  Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+         const Configuration *configuration,
+         std::chrono::milliseconds polling_period =
+             std::chrono::milliseconds(100));
+  ~Logger();
 
-  // Rotates the log file with the new writer.  This writes out the header
-  // again, but keeps going as if nothing else happened.
-  void Rotate(DetachedBufferWriter *writer);
-  void Rotate(std::unique_ptr<LogNamer> log_namer);
+  // Overrides the name in the log file header.
+  void set_name(std::string_view name) { name_ = name; }
+
+  // Rotates the log file(s), triggering new part files to be written for each
+  // log file.
+  void Rotate();
 
  private:
   void WriteHeader();
-  void WriteHeader(const Node *node);
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+      const Node *node);
+
+  bool MaybeUpdateTimestamp(
+      const Node *node, int node_index,
+      aos::monotonic_clock::time_point monotonic_start_time,
+      aos::realtime_clock::time_point realtime_start_time);
 
   void DoLogData();
 
+  void WriteMissingTimestamps();
+
+  void StartLogging();
+
+  // Fetches from each channel until all the data is logged.
+  void LogUntil(monotonic_clock::time_point t);
+
   EventLoop *event_loop_;
+  const UUID uuid_;
   std::unique_ptr<LogNamer> log_namer_;
 
+  // The configuration to place at the top of the log file.
+  const Configuration *configuration_;
+
+  // Name to save in the log file.  Defaults to hostname.
+  std::string name_;
+
   // Structure to track both a fetcher, and if the data fetched has been
   // written.  We may want to delay writing data to disk so that we don't let
   // data get too far out of order when written to disk so we can avoid making
@@ -219,6 +106,10 @@
 
     DetachedBufferWriter *writer = nullptr;
     DetachedBufferWriter *timestamp_writer = nullptr;
+    DetachedBufferWriter *contents_writer = nullptr;
+    const Node *writer_node = nullptr;
+    const Node *timestamp_node = nullptr;
+    int node_index = 0;
   };
 
   std::vector<FetcherStruct> fetchers_;
@@ -236,8 +127,31 @@
   // Max size that the header has consumed.  This much extra data will be
   // reserved in the builder to avoid reallocating.
   size_t max_header_size_ = 0;
+
+  // Fetcher for all the statistics from all the nodes.
+  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+  // Sets the start time for a specific node.
+  void SetStartTime(size_t node_index,
+                    aos::monotonic_clock::time_point monotonic_start_time,
+                    aos::realtime_clock::time_point realtime_start_time);
+
+  struct NodeState {  // Start times default to min_time; header starts empty.
+    aos::monotonic_clock::time_point monotonic_start_time =
+        aos::monotonic_clock::min_time;
+    aos::realtime_clock::time_point realtime_start_time =
+        aos::realtime_clock::min_time;
+
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
+        aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+  };
+  std::vector<NodeState> node_state_;
 };
 
+// Takes a bunch of parts and sorts them based on parts_uuid and parts_index.
+std::vector<std::vector<std::string>> SortParts(
+    const std::vector<std::string> &parts);
+
 // We end up with one of the following 3 log file types.
 //
 // Single node logged as the source node.
@@ -313,7 +227,11 @@
   // gets called.
   void Deregister();
 
-  // Returns the configuration from the log file.
+  // Returns the configuration being used for replay from the log file.
+  // Note that this may be different from the configuration actually used for
+  // handling events. You should generally only use this to create a
+  // SimulatedEventLoopFactory, and then get the configuration from there for
+  // everything else.
   const Configuration *logged_configuration() const;
   // Returns the configuration being used for replay.
   // The pointer is invalidated whenever RemapLoggedChannel is called.
@@ -355,6 +273,10 @@
     return &log_file_header_.message();
   }
 
+  std::string_view name() const {  // Name stored in the log file header.
+    return log_file_header()->name()->string_view();
+  }  // NOTE(review): name() is dereferenced unchecked; confirm always set.
+
  private:
   const Channel *RemapChannel(const EventLoop *event_loop,
                               const Channel *channel);
@@ -365,13 +287,25 @@
   // channels from calls to RemapLoggedChannel.
   void MakeRemappedConfig();
 
+  // Returns the number of nodes in the logged config; 1 when single-node.
+  size_t nodes_count() const {
+    return !configuration::MultiNode(logged_configuration())
+               ? 1u
+               : logged_configuration()->nodes()->size();
+  }
+
   const std::vector<std::vector<std::string>> filenames_;
 
   // This is *a* log file header used to provide the logged config.  The rest of
   // the header is likely distracting.
   FlatbufferVector<LogFileHeader> log_file_header_;
 
-  Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
+  // Returns [ta; tb; ...] = tuple[0] * t + tuple[1]
+  std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+             Eigen::Matrix<double, Eigen::Dynamic, 1>>
+  SolveOffsets();
+
+  void LogFit(std::string_view prefix);
 
   // State per node.
   class State {
@@ -392,13 +326,6 @@
     // OldestMessageTime.
     void SeedSortedMessages();
 
-    // Updates the timestamp filter with the timestamp.  Returns true if the
-    // provided timestamp was actually a forwarding timestamp and used, and
-    // false otherwise.
-    bool MaybeUpdateTimestamp(
-        const TimestampMerger::DeliveryTimestamp &channel_timestamp,
-        int channel_index);
-
     // Returns the starting time for this node.
     monotonic_clock::time_point monotonic_start_time() const {
       return channel_merger_->monotonic_start_time();
@@ -416,13 +343,6 @@
     void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
     EventLoop *event_loop() { return event_loop_; }
 
-    // Returns the oldest timestamp for the provided channel.  This should only
-    // be called before SeedSortedMessages();
-    TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
-        size_t channel) {
-      return channel_merger_->OldestTimestampForChannel(channel);
-    }
-
     // Sets the current realtime offset from the monotonic clock for this node
     // (if we are on a simulated event loop).
     void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
@@ -440,6 +360,11 @@
       return node_event_loop_factory_->ToDistributedClock(time);
     }
 
+    monotonic_clock::time_point FromDistributedClock(
+        distributed_clock::time_point time) {  // Uses this node's factory.
+      return node_event_loop_factory_->FromDistributedClock(time);
+    }
+
     // Sets the offset (and slope) from the distributed clock.
     void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
                               double distributed_slope) {
@@ -453,6 +378,20 @@
       return channel_target_event_loop_factory_[channel_index]->monotonic_now();
     }
 
+    distributed_clock::time_point RemoteToDistributedClock(
+        size_t channel_index, monotonic_clock::time_point time) {
+      return channel_target_event_loop_factory_[channel_index]
+          ->ToDistributedClock(time);  // NOTE(review): null if not forwarded?
+    }
+
+    const Node *remote_node(size_t channel_index) {
+      return channel_target_event_loop_factory_[channel_index]->node();
+    }  // NOTE(review): factory entry may be null for non-forwarded channels.
+
+    monotonic_clock::time_point monotonic_now() {  // From this node's factory.
+      return node_event_loop_factory_->monotonic_now();
+    }
+
     // Sets the node we will be merging as, and returns true if there is any
     // data on it.
     bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
@@ -461,10 +400,9 @@
     void SetChannelCount(size_t count);
 
     // Sets the sender, filter, and target factory for a channel.
-    void SetChannel(
-        size_t channel, std::unique_ptr<RawSender> sender,
-        std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
-        NodeEventLoopFactory *channel_target_event_loop_factory);
+    void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+                    message_bridge::NoncausalOffsetEstimator *filter,
+                    NodeEventLoopFactory *channel_target_event_loop_factory);
 
     // Returns if we have read all the messages from all the logs.
     bool at_end() const { return channel_merger_->at_end(); }
@@ -493,14 +431,32 @@
     }
 
     // Returns a debug string for the channel merger.
-    std::string DebugString() const { return channel_merger_->DebugString(); }
+    std::string DebugString() const {  // Queue summary, then merger dump.
+      std::stringstream messages;
+      size_t i = 0;  // Only the first 7 and last 6 entries are printed.
+      for (const auto &message : sorted_messages_) {
+        if (i < 7 || i + 7 > sorted_messages_.size()) {
+          messages << "sorted_messages[" << i
+                   << "]: " << std::get<0>(message).monotonic_event_time << " "
+                   << configuration::StrippedChannelToString(
+                          event_loop_->configuration()->channels()->Get(
+                              std::get<2>(message).message().channel_index()))
+                   << "\n";
+        } else if (i == 7) {  // Elision marker, emitted at most once.
+          messages << "...\n";
+        }
+        ++i;
+      }
+      return messages.str() + channel_merger_->DebugString();
+    }
 
    private:
     // Log file.
     std::unique_ptr<ChannelMerger> channel_merger_;
 
     std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
-                          FlatbufferVector<MessageHeader>>>
+                          FlatbufferVector<MessageHeader>,
+                          message_bridge::NoncausalOffsetEstimator *>>
         sorted_messages_;
 
     // Senders.
@@ -518,8 +474,7 @@
     // This corresponds to the object which is shared among all the channels
     // going between 2 nodes.  The second element in the tuple indicates if this
     // is the primary direction or not.
-    std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
-        filters_;
+    std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
@@ -532,8 +487,8 @@
   // Creates the requested filter if it doesn't exist, regardless of whether
   // these nodes can actually communicate directly.  The second return value
   // reports if this is the primary direction or not.
-  std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
-      const Node *node_a, const Node *node_b);
+  message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
+                                                      const Node *node_b);
 
   // FILE to write offsets to (if populated).
   FILE *offset_fp_ = nullptr;
@@ -544,32 +499,79 @@
   // List of filters for a connection.  The pointer to the first node will be
   // less than the second node.
   std::map<std::tuple<const Node *, const Node *>,
-           message_bridge::ClippedAverageFilter>
+           std::tuple<message_bridge::NoncausalOffsetEstimator>>
       filters_;
 
   // Returns the offset from the monotonic clock for a node to the distributed
-  // clock.  distributed = monotonic + offset;
-  std::chrono::nanoseconds offset(int node_index) const {
-    CHECK_LT(node_index, offset_matrix_.rows())
+  // clock.  monotonic = distributed * slope() + offset();
+  double slope(int node_index) const {
+    CHECK_LT(node_index, time_slope_matrix_.rows())
         << ": Got too high of a node index.";
-    return -std::chrono::duration_cast<std::chrono::nanoseconds>(
-               std::chrono::duration<double>(offset_matrix_(node_index))) -
-           base_offset_matrix_(node_index);
+    return time_slope_matrix_(node_index);
+  }
+  std::chrono::nanoseconds offset(int node_index) const {
+    CHECK_LT(node_index, time_offset_matrix_.rows())
+        << ": Got too high of a node index.";
+    return std::chrono::duration_cast<std::chrono::nanoseconds>(
+        std::chrono::duration<double>(time_offset_matrix_(node_index)));
   }
 
   // Updates the offset matrix solution and sets the per-node distributed
   // offsets in the factory.
   void UpdateOffsets();
 
-  // sample_matrix_ = map_matrix_ * offset_matrix_
-  Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
-  Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
-  Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
+  // We have 2 types of equations to do a least squares regression over to fully
+  // constrain our time function.
+  //
+  // One is simple.  The distributed clock is the average of all the clocks.
+  //   (ta + tb + tc + td) / num_nodes = t_distributed
+  //
+  // The second is a bit more complicated.  Our basic time conversion function
+  // is:
+  //   tb = ta + (ta * slope + offset)
+  // We can rewrite this as follows
+  //   tb - (1 + slope) * ta = offset
+  //
+  // From here, we have enough equations to solve for t{a,b,c,...}  We want to
+  // take as an input the offsets and slope, and solve for the per-node times as
+  // a function of the distributed clock.
+  //
+  // We need to massage our equations to make this work.  If we solve for the
+  // per-node times at two set distributed clock times, we will be able to
+  // recreate the linear function (we know it is linear).  We can do a similar
+  // thing by breaking our equation up into:
+  //
+  // [1/3  1/3  1/3  ] [ta]   [t_distributed]
+  // [ 1  -1-m1  0   ] [tb] = [oab]
+  // [ 1    0  -1-m2 ] [tc]   [oac]
+  //
+  // This solves to:
+  //
+  // [ta]   [ a00 a01 a02]   [t_distributed]
+  // [tb] = [ a10 a11 a12] * [oab]
+  // [tc]   [ a20 a21 a22]   [oac]
+  //
+  // and can be split into:
+  //
+  // [ta]   [ a00 ]                   [a01 a02]
+  // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
+  // [tc]   [ a20 ]                   [a21 a22]   [oac]
+  //
+  // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
+  // offset_matrix_ will be in nanoseconds.
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
+  // Matrix tracking which offsets are valid.
+  Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
+  // Matrix tracking the last valid matrix we used to determine connected nodes.
+  Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
+  size_t cached_valid_node_count_ = 0;
 
-  // Base offsets.  The actual offset is the sum of this and the offset matrix.
-  // This removes some of the dynamic range challenges from the double above.
-  Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
-      base_offset_matrix_;
+  // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix_;
+  // t is in seconds.
+  Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
+  Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
 
   std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
       remapped_configuration_buffer_;
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index 2575e85..f95fa17 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -25,11 +25,8 @@
   std::unique_ptr<aos::logger::DetachedBufferWriter> writer;
   std::unique_ptr<aos::logger::LogNamer> log_namer;
   if (event_loop.node() == nullptr) {
-    writer = std::make_unique<aos::logger::DetachedBufferWriter>(
-        aos::logging::GetLogName("fbs_log"));
-
-    log_namer = std::make_unique<aos::logger::LocalLogNamer>(writer.get(),
-                                                             event_loop.node());
+    log_namer = std::make_unique<aos::logger::LocalLogNamer>(
+        aos::logging::GetLogName("fbs_log"), event_loop.node());
   } else {
     log_namer = std::make_unique<aos::logger::MultiNodeLogNamer>(
         aos::logging::GetLogName("fbs_log"), event_loop.configuration(),
@@ -41,5 +38,7 @@
 
   event_loop.Run();
 
+  LOG(INFO) << "Shutting down";
+
   return 0;
 }
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
index 313894b..c333f55 100644
--- a/aos/events/logging/logger_math.cc
+++ b/aos/events/logging/logger_math.cc
@@ -2,14 +2,187 @@
 
 #include "Eigen/Dense"
 
+#include "third_party/gmp/gmpxx.h"
+
 namespace aos {
 namespace logger {
 
+namespace {
+// Converts a matrix of exact rationals to doubles, one element at a time.
+Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> ToDouble(
+    const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> &in) {
+  // Every element is assigned below, so there is no need to zero fill first.
+  Eigen::MatrixXd result(in.rows(), in.cols());
+  for (int i = 0; i < in.rows(); ++i) {
+    for (int j = 0; j < in.cols(); ++j) {
+      result(i, j) = in(i, j).get_d();
+    }
+  }
+  return result;
+}
+
+// Least squares solves mpq_map * [ta; tb; ...] = mpq_offsets exactly, and
+// returns the per-node {slope, offset scaled by 1e-9} converted to doubles.
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+           Eigen::Matrix<double, Eigen::Dynamic, 1>>
+Solve(const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> &mpq_map,
+      const Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> &mpq_offsets) {
+  aos::monotonic_clock::time_point start_time = aos::monotonic_clock::now();
+  // Least squares solve for the slopes and offsets.
+  const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> inv =
+      (mpq_map.transpose() * mpq_map).inverse() * mpq_map.transpose();
+  aos::monotonic_clock::time_point end_time = aos::monotonic_clock::now();
+  VLOG(1) << "Took "
+          << std::chrono::duration<double>(end_time - start_time).count()
+          << " seconds to invert";
+  // Column 0 of inv multiplies row 0 of the right hand side (t_distributed).
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_slope =
+      inv.block(0, 0, inv.rows(), 1);
+  // Over-determined systems have more offset rows than nodes (inv.rows()).
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_solution_offset =
+      inv.block(0, 1, inv.rows(), inv.cols() - 1) *
+      mpq_offsets.block(1, 0, mpq_offsets.rows() - 1, 1);
+  mpq_solution_offset *= mpq_class(1, 1000000000);
+  return std::make_tuple(ToDouble(mpq_solution_slope),
+                         ToDouble(mpq_solution_offset));
+}
+}  // namespace
+
 // This is slow to compile, so we put it in a separate file.  More parallelism
 // and less change.
-Eigen::Matrix<double, Eigen::Dynamic, 1> LogReader::SolveOffsets() {
-  return map_matrix_.bdcSvd(Eigen::ComputeThinU | Eigen::ComputeThinV)
-      .solve(sample_matrix_);
+std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+           Eigen::Matrix<double, Eigen::Dynamic, 1>>
+LogReader::SolveOffsets() {
+  // TODO(austin): Split this out and unit test it a bit better.  When we do
+  // partial node subsets and also try to optimize this again would be a good
+  // time.
+  //
+  // TODO(austin): CHECK that the number doesn't change over time.  We can freak
+  // out if that happens.
+
+  // Count the usable equations: row 0 plus each row marked valid.
+  int nonzero_offset_count = 1;
+  for (int i = 1; i < valid_matrix_.rows(); ++i) {
+    if (valid_matrix_(i) != 0) {
+      ++nonzero_offset_count;
+    }
+  }
+
+  Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
+                           "]");
+
+  // If there are missing rows, we can't solve the original problem and instead
+  // need to filter the matrix to remove the missing rows and solve a simplified
+  // problem.  What this means practically is that we might have pairs of nodes
+  // which are communicating, but we don't have timestamps between.  But we can
+  // have multiple paths in our graph between 2 nodes, so we can still solve
+  // time without the missing timestamp.
+  //
+  // In the following example, we can drop any of the last 3 rows, and still
+  // solve.
+  //
+  // [1/3  1/3  1/3  ] [ta]   [t_distributed]
+  // [ 1  -1-m1  0   ] [tb] = [oab]
+  // [ 1    0  -1-m2 ] [tc]   [oac]
+  // [ 0    1  -1-m3 ]        [obc]
+  if (nonzero_offset_count != offset_matrix_.rows()) {
+    Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+        Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
+            nonzero_offset_count, map_matrix_.cols());
+    Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> mpq_offsets =
+        Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(nonzero_offset_count);
+
+    std::vector<bool> valid_nodes(nodes_count(), false);
+
+    size_t destination_row = 0;
+    for (int j = 0; j < map_matrix_.cols(); ++j) {
+      mpq_map(0, j) = mpq_class(1, map_matrix_.cols());
+    }
+    mpq_offsets(0) = mpq_class(0);
+    ++destination_row;
+
+    for (int i = 1; i < offset_matrix_.rows(); ++i) {
+      // Row 0 (all times average to the distributed time) was filled in above.
+      // Now pack every valid row in right after it, skipping the invalid ones.
+      if (valid_matrix_(i)) {
+        mpq_offsets(destination_row) = mpq_class(offset_matrix_(i));
+
+        for (int j = 0; j < map_matrix_.cols(); ++j) {
+          mpq_map(destination_row, j) = map_matrix_(i, j) + slope_matrix_(i, j);
+          if (mpq_map(destination_row, j) != 0) {
+            valid_nodes[j] = true;
+          }
+        }
+
+        ++destination_row;
+      }
+    }
+
+    VLOG(1) << "Filtered map " << ToDouble(mpq_map).format(HeavyFmt);
+    VLOG(1) << "Filtered offsets " << ToDouble(mpq_offsets).format(HeavyFmt);
+
+    // Compute (and cache) the current connectivity.  If we have N nodes
+    // configured, but logs only from one of them, we want to assume that the
+    // rest of the nodes match the distributed clock exactly.
+    //
+    // If data shows up later for them, we will CHECK when time jumps.
+    //
+    // TODO(austin): Once we have more info on what cases are reasonable, we can
+    // open up the restrictions.
+    if (valid_matrix_ != last_valid_matrix_) {
+      Eigen::FullPivLU<Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>
+          full_piv(mpq_map);
+      const size_t connected_nodes = full_piv.rank();
+
+      size_t valid_node_count = 0;
+      for (size_t i = 0; i < valid_nodes.size(); ++i) {
+        const bool valid_node = valid_nodes[i];
+        if (valid_node) {
+          ++valid_node_count;
+        } else {
+          LOG(WARNING)
+              << "Node "
+              << logged_configuration()->nodes()->Get(i)->name()->string_view()
+              << " has no observations, setting to distributed clock.";
+        }
+      }
+
+      // Confirm that the set of nodes we have connected matches the rank.
+      // Otherwise a<->b and c<->d would count as 4 but really is 3.
+      CHECK_EQ(std::max(static_cast<size_t>(1u), valid_node_count),
+               connected_nodes)
+          << ": Ambiguous nodes.";
+
+      last_valid_matrix_ = valid_matrix_;
+      cached_valid_node_count_ = valid_node_count;
+    }
+
+    // There are 2 cases.  Either all the nodes are connected with each other by
+    // actual data, or we have isolated nodes.  We want to force the isolated
+    // nodes to match the distributed clock exactly, and to solve for the other
+    // nodes.
+    if (cached_valid_node_count_ == 0) {
+      // Cheat.  If there are no valid nodes, the slopes are 1, and offset is 0,
+      // ie, just be the distributed clock.
+      return std::make_tuple(
+          Eigen::Matrix<double, Eigen::Dynamic, 1>::Ones(nodes_count()),
+          Eigen::Matrix<double, Eigen::Dynamic, 1>::Zero(nodes_count()));
+    } else {
+      // TODO(austin): Solve just the nodes we know about.  This is harder and
+      // there are no logs which require this yet to test on.
+      CHECK_EQ(cached_valid_node_count_, nodes_count())
+          << ": TODO(austin): Handle partial valid nodes";
+
+      return Solve(mpq_map, mpq_offsets);
+    }
+  } else {
+    const Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> mpq_map =
+        map_matrix_ + slope_matrix_;
+    VLOG(1) << "map " << (map_matrix_ + slope_matrix_).format(HeavyFmt);
+    VLOG(1) << "offsets " << offset_matrix_.format(HeavyFmt);
+
+    return Solve(mpq_map, offset_matrix_);
+  }
 }
 
 }  // namespace logger
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 3b10f9d..47fe9e2 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -4,6 +4,7 @@
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/util/file.h"
 #include "glog/logging.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
@@ -42,20 +43,20 @@
 // the config.
 TEST_F(LoggerTest, Starts) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
-  const ::std::string logfile = tmpdir + "/logfile.bfbs";
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string logfile = base_name + ".part0.bfbs";
   // Remove it.
   unlink(logfile.c_str());
 
   LOG(INFO) << "Logging data to " << logfile;
 
   {
-    DetachedBufferWriter writer(logfile);
     std::unique_ptr<EventLoop> logger_event_loop =
         event_loop_factory_.MakeEventLoop("logger");
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-    Logger logger(&writer, logger_event_loop.get(),
+    Logger logger(base_name, logger_event_loop.get(),
                   std::chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
@@ -101,8 +102,9 @@
 // Tests that we can read and write rotated log files.
 TEST_F(LoggerTest, RotatedLogFile) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
-  const ::std::string logfile0 = tmpdir + "/logfile0.bfbs";
-  const ::std::string logfile1 = tmpdir + "/logfile1.bfbs";
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string logfile0 = base_name + ".part0.bfbs";
+  const ::std::string logfile1 = base_name + ".part1.bfbs";
   // Remove it.
   unlink(logfile0.c_str());
   unlink(logfile1.c_str());
@@ -110,20 +112,36 @@
   LOG(INFO) << "Logging data to " << logfile0 << " and " << logfile1;
 
   {
-    DetachedBufferWriter writer0(logfile0);
-    DetachedBufferWriter writer1(logfile1);
     std::unique_ptr<EventLoop> logger_event_loop =
         event_loop_factory_.MakeEventLoop("logger");
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-    Logger logger(&writer0, logger_event_loop.get(),
-                  std::chrono::milliseconds(100));
+    Logger logger(
+        std::make_unique<LocalLogNamer>(base_name, logger_event_loop->node()),
+        logger_event_loop.get(), std::chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
-    logger.Rotate(&writer1);
+    logger.Rotate();
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
   }
 
+  {
+    // Confirm that the UUIDs match for both the parts and the logger, and the
+    // parts_index increments.
+    std::vector<FlatbufferVector<LogFileHeader>> log_header;
+    for (std::string_view f : {logfile0, logfile1}) {
+      log_header.emplace_back(ReadHeader(f));
+    }
+
+    EXPECT_EQ(log_header[0].message().logger_uuid()->string_view(),
+              log_header[1].message().logger_uuid()->string_view());
+    EXPECT_EQ(log_header[0].message().parts_uuid()->string_view(),
+              log_header[1].message().parts_uuid()->string_view());
+
+    EXPECT_EQ(log_header[0].message().parts_index(), 0);
+    EXPECT_EQ(log_header[1].message().parts_index(), 1);
+  }
+
   // Even though it doesn't make any difference here, exercise the logic for
   // passing in a separate config.
   LogReader reader(std::vector<std::string>{logfile0, logfile1},
@@ -166,7 +184,8 @@
 // Tests that a large number of messages per second doesn't overwhelm writev.
 TEST_F(LoggerTest, ManyMessages) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
-  const ::std::string logfile = tmpdir + "/logfile.bfbs";
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string logfile = base_name + ".part0.bfbs";
   // Remove the log file.
   unlink(logfile.c_str());
 
@@ -174,7 +193,6 @@
   ping_.set_quiet(true);
 
   {
-    DetachedBufferWriter writer(logfile);
     std::unique_ptr<EventLoop> logger_event_loop =
         event_loop_factory_.MakeEventLoop("logger");
 
@@ -198,7 +216,7 @@
                            chrono::microseconds(50));
     });
 
-    Logger logger(&writer, logger_event_loop.get(),
+    Logger logger(base_name, logger_event_loop.get(),
                   std::chrono::milliseconds(100));
 
     event_loop_factory_.RunFor(chrono::milliseconds(1000));
@@ -217,9 +235,35 @@
             configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
         tmp_dir_(getenv("TEST_TMPDIR")),
         logfile_base_(tmp_dir_ + "/multi_logfile"),
-        logfiles_({logfile_base_ + "_pi1_data.bfbs",
-                   logfile_base_ + "_pi2_data/test/aos.examples.Pong.bfbs",
-                   logfile_base_ + "_pi2_data.bfbs"}),
+        logfiles_(
+            {logfile_base_ + "_pi1_data.part0.bfbs",
+             logfile_base_ + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
+             logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
+             logfile_base_ + "_pi2_data.part0.bfbs",
+             logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+                             "aos.logger.MessageHeader.part0.bfbs",
+             logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+                             "aos.logger.MessageHeader.part1.bfbs",
+             logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+                             "aos.logger.MessageHeader.part0.bfbs",
+             logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+                             "aos.logger.MessageHeader.part1.bfbs",
+             logfile_base_ +
+                 "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
+             logfile_base_ +
+                 "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+             logfile_base_ +
+                 "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
+             logfile_base_ +
+                 "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"}),
+        structured_logfiles_{
+            std::vector<std::string>{logfiles_[0]},
+            std::vector<std::string>{logfiles_[1], logfiles_[2]},
+            std::vector<std::string>{logfiles_[3]},
+            std::vector<std::string>{logfiles_[4], logfiles_[5]},
+            std::vector<std::string>{logfiles_[6], logfiles_[7]},
+            std::vector<std::string>{logfiles_[8], logfiles_[9]},
+            std::vector<std::string>{logfiles_[10], logfiles_[11]}},
         ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
         ping_(ping_event_loop_.get()),
         pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
@@ -259,7 +303,9 @@
 
   std::string tmp_dir_;
   std::string logfile_base_;
-  std::array<std::string, 3> logfiles_;
+  std::vector<std::string> logfiles_;
+
+  std::vector<std::vector<std::string>> structured_logfiles_;
 
   std::unique_ptr<EventLoop> ping_event_loop_;
   Ping ping_;
@@ -346,47 +392,137 @@
   }
 
   {
-    // Confirm that the headers are all for the correct nodes.
-    FlatbufferVector<LogFileHeader> logheader1 = ReadHeader(logfiles_[0]);
-    EXPECT_EQ(logheader1.message().node()->name()->string_view(), "pi1");
-    FlatbufferVector<LogFileHeader> logheader2 = ReadHeader(logfiles_[1]);
-    EXPECT_EQ(logheader2.message().node()->name()->string_view(), "pi2");
-    FlatbufferVector<LogFileHeader> logheader3 = ReadHeader(logfiles_[2]);
-    EXPECT_EQ(logheader3.message().node()->name()->string_view(), "pi2");
+    std::set<std::string> logfile_uuids;
+    std::set<std::string> parts_uuids;
+    // Confirm that we have the expected number of UUIDs for both the logfile
+    // UUIDs and parts UUIDs.
+    std::vector<FlatbufferVector<LogFileHeader>> log_header;
+    for (std::string_view f : logfiles_) {
+      log_header.emplace_back(ReadHeader(f));
+      logfile_uuids.insert(log_header.back().message().logger_uuid()->str());
+      parts_uuids.insert(log_header.back().message().parts_uuid()->str());
+    }
 
+    EXPECT_EQ(logfile_uuids.size(), 2u);
+    EXPECT_EQ(parts_uuids.size(), 7u);
+
+    // And confirm everything is on the correct node.
+    EXPECT_EQ(log_header[0].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[1].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[3].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[4].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
+
+    // And the parts index matches.
+    EXPECT_EQ(log_header[0].message().parts_index(), 0);
+    EXPECT_EQ(log_header[1].message().parts_index(), 0);
+    EXPECT_EQ(log_header[2].message().parts_index(), 1);
+    EXPECT_EQ(log_header[3].message().parts_index(), 0);
+    EXPECT_EQ(log_header[4].message().parts_index(), 0);
+    EXPECT_EQ(log_header[5].message().parts_index(), 1);
+    EXPECT_EQ(log_header[6].message().parts_index(), 0);
+    EXPECT_EQ(log_header[7].message().parts_index(), 1);
+    EXPECT_EQ(log_header[8].message().parts_index(), 0);
+    EXPECT_EQ(log_header[9].message().parts_index(), 1);
+    EXPECT_EQ(log_header[10].message().parts_index(), 0);
+    EXPECT_EQ(log_header[11].message().parts_index(), 1);
+  }
+
+  {
     using ::testing::UnorderedElementsAre;
 
     // Timing reports, pings
-    EXPECT_THAT(CountChannelsData(logfiles_[0]),
-                UnorderedElementsAre(
-                    std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
-                    std::make_tuple("/test", "aos.examples.Ping", 2001)));
+    EXPECT_THAT(
+        CountChannelsData(logfiles_[0]),
+        UnorderedElementsAre(
+            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+            std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
+            std::make_tuple("/test", "aos.examples.Ping", 2001)));
     // Timestamps for pong
-    EXPECT_THAT(CountChannelsTimestamp(logfiles_[0]),
-                UnorderedElementsAre(
-                    std::make_tuple("/test", "aos.examples.Pong", 2001)));
+    EXPECT_THAT(
+        CountChannelsTimestamp(logfiles_[0]),
+        UnorderedElementsAre(
+            std::make_tuple("/test", "aos.examples.Pong", 2001),
+            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)));
 
     // Pong data.
     EXPECT_THAT(CountChannelsData(logfiles_[1]),
                 UnorderedElementsAre(
-                    std::make_tuple("/test", "aos.examples.Pong", 2001)));
+                    std::make_tuple("/test", "aos.examples.Pong", 101)));
+    EXPECT_THAT(CountChannelsData(logfiles_[2]),
+                UnorderedElementsAre(
+                    std::make_tuple("/test", "aos.examples.Pong", 1900)));
 
     // No timestamps
     EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), UnorderedElementsAre());
+    EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]), UnorderedElementsAre());
 
     // Timing reports and pongs.
-    EXPECT_THAT(CountChannelsData(logfiles_[2]),
-                UnorderedElementsAre(
-                    std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
-                    std::make_tuple("/test", "aos.examples.Pong", 2001)));
+    EXPECT_THAT(
+        CountChannelsData(logfiles_[3]),
+        UnorderedElementsAre(
+            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
+            std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
+            std::make_tuple("/test", "aos.examples.Pong", 2001)));
     // And ping timestamps.
-    EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]),
-                UnorderedElementsAre(
-                    std::make_tuple("/test", "aos.examples.Ping", 2001)));
+    EXPECT_THAT(
+        CountChannelsTimestamp(logfiles_[3]),
+        UnorderedElementsAre(
+            std::make_tuple("/test", "aos.examples.Ping", 2001),
+            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)));
+
+    // Timestamps from pi2 on pi1, and the other way.
+    EXPECT_THAT(CountChannelsData(logfiles_[4]), UnorderedElementsAre());
+    EXPECT_THAT(CountChannelsData(logfiles_[5]), UnorderedElementsAre());
+    EXPECT_THAT(CountChannelsData(logfiles_[6]), UnorderedElementsAre());
+    EXPECT_THAT(CountChannelsData(logfiles_[7]), UnorderedElementsAre());
+    EXPECT_THAT(
+        CountChannelsTimestamp(logfiles_[4]),
+        UnorderedElementsAre(
+            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 10),
+            std::make_tuple("/test", "aos.examples.Ping", 101)));
+    EXPECT_THAT(
+        CountChannelsTimestamp(logfiles_[5]),
+        UnorderedElementsAre(
+            std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 190),
+            std::make_tuple("/test", "aos.examples.Ping", 1900)));
+    EXPECT_THAT(CountChannelsTimestamp(logfiles_[6]),
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+    EXPECT_THAT(CountChannelsTimestamp(logfiles_[7]),
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
+
+    // And then test that the remotely logged timestamp data files only have
+    // timestamps in them.
+    EXPECT_THAT(CountChannelsTimestamp(logfiles_[8]), UnorderedElementsAre());
+    EXPECT_THAT(CountChannelsTimestamp(logfiles_[9]), UnorderedElementsAre());
+    EXPECT_THAT(CountChannelsTimestamp(logfiles_[10]), UnorderedElementsAre());
+    EXPECT_THAT(CountChannelsTimestamp(logfiles_[11]), UnorderedElementsAre());
+
+    EXPECT_THAT(CountChannelsData(logfiles_[8]),
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi1/aos", "aos.message_bridge.Timestamp", 10)));
+    EXPECT_THAT(CountChannelsData(logfiles_[9]),
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi1/aos", "aos.message_bridge.Timestamp", 190)));
+
+    EXPECT_THAT(CountChannelsData(logfiles_[10]),
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+    EXPECT_THAT(CountChannelsData(logfiles_[11]),
+                UnorderedElementsAre(std::make_tuple(
+                    "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
   }
 
-  LogReader reader({std::vector<std::string>{logfiles_[0]},
-                    std::vector<std::string>{logfiles_[2]}});
+  LogReader reader(structured_logfiles_);
 
   SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -400,6 +536,13 @@
   const Node *pi2 =
       configuration::GetNode(log_reader_factory.configuration(), "pi2");
 
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+  LOG(INFO) << "now pi1 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+  LOG(INFO) << "now pi2 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
   EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
 
   reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
@@ -417,7 +560,7 @@
   // Confirm that the ping value matches.
   pi1_event_loop->MakeWatcher(
       "/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
-        VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+        VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
                 << pi1_event_loop->context().monotonic_remote_time << " -> "
                 << pi1_event_loop->context().monotonic_event_time;
         EXPECT_EQ(ping.value(), pi1_ping_count + 1);
@@ -436,7 +579,7 @@
       });
   pi2_event_loop->MakeWatcher(
       "/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
-        VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+        VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
                 << pi2_event_loop->context().monotonic_remote_time << " -> "
                 << pi2_event_loop->context().monotonic_event_time;
         EXPECT_EQ(ping.value(), pi2_ping_count + 1);
@@ -555,11 +698,8 @@
         }
       )");
 
-  EXPECT_DEATH(LogReader({std::vector<std::string>{logfiles_[0]},
-                          std::vector<std::string>{logfiles_[2]}},
-                         &extra_nodes_config.message()),
+  EXPECT_DEATH(LogReader(structured_logfiles_, &extra_nodes_config.message()),
                "Log file and replay config need to have matching nodes lists.");
-  ;
 }
 
 // Tests that we can read log files where they don't start at the same monotonic
@@ -580,8 +720,7 @@
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
-  LogReader reader({std::vector<std::string>{logfiles_[0]},
-                    std::vector<std::string>{logfiles_[2]}});
+  LogReader reader(structured_logfiles_);
 
   SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -721,21 +860,27 @@
     event_loop_factory_.RunFor(chrono::milliseconds(400));
   }
 
-  LogReader reader({std::vector<std::string>{logfiles_[0]},
-                    std::vector<std::string>{logfiles_[2]}});
+  LogReader reader(structured_logfiles_);
 
   SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
   log_reader_factory.set_send_delay(chrono::microseconds(0));
 
-  // This sends out the fetched messages and advances time to the start of the
-  // log file.
-  reader.Register(&log_reader_factory);
-
   const Node *pi1 =
       configuration::GetNode(log_reader_factory.configuration(), "pi1");
   const Node *pi2 =
       configuration::GetNode(log_reader_factory.configuration(), "pi2");
 
+  // This sends out the fetched messages and advances time to the start of the
+  // log file.
+  reader.Register(&log_reader_factory);
+
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+  LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+  LOG(INFO) << "now pi1 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+  LOG(INFO) << "now pi2 "
+            << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
   LOG(INFO) << "Done registering (pi1) "
             << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
             << " "
@@ -813,6 +958,31 @@
   reader.Deregister();
 }
 
+// Tests that SortParts() on all the log files reproduces structured_logfiles_.
+TEST_F(MultinodeLoggerTest, SortParts) {
+  // Log on both nodes for a while so there are a bunch of parts to sort.
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(2000));
+  }
+
+  const std::vector<std::vector<std::string>> sorted_parts =
+      SortParts(logfiles_);
+
+  // Each inner vector has to equal one of the expected lists element by
+  // element, so the parts within a list must be in order.  The order of the
+  // lists themselves is deliberately not checked.
+  EXPECT_THAT(sorted_parts,
+              ::testing::UnorderedElementsAreArray(structured_logfiles_));
+}
+
 // TODO(austin): We can write a test which recreates a logfile and confirms that
 // we get it back.  That is the ultimate test.
 
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
index a78b42c..6ac51e7 100644
--- a/aos/events/logging/multinode_pingpong.json
+++ b/aos/events/logging/multinode_pingpong.json
@@ -60,27 +60,43 @@
     {
       "name": "/pi1/aos",
       "type": "aos.message_bridge.Timestamp",
-      "logger": "NOT_LOGGED",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi2"],
       "source_node": "pi1",
       "destination_nodes": [
         {
           "name": "pi2",
-          "timestamp_logger": "NOT_LOGGED"
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi1"]
         }
       ]
     },
     {
       "name": "/pi2/aos",
       "type": "aos.message_bridge.Timestamp",
-      "logger": "NOT_LOGGED",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": ["pi1"],
       "source_node": "pi2",
       "destination_nodes": [
         {
           "name": "pi1",
-          "timestamp_logger": "NOT_LOGGED"
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"]
         }
       ]
     },
+    {
+      "name": "/pi1/aos/remote_timestamps/pi2",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos/remote_timestamps/pi1",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi2"
+    },
     /* Forwarded to pi2 */
     {
       "name": "/test",
diff --git a/aos/events/logging/uuid.cc b/aos/events/logging/uuid.cc
new file mode 100644
index 0000000..9298c8b
--- /dev/null
+++ b/aos/events/logging/uuid.cc
@@ -0,0 +1,60 @@
+#include "aos/events/logging/uuid.h"
+
+#include <array>
+#include <random>
+#include <string_view>
+
+namespace aos {
+namespace {
+// Maps a value in [0, 15] to the matching lowercase ASCII hex digit.
+char ToHex(int val) {
+  // Digits and letters live in separate ASCII runs, so pick the character
+  // which val == 0 would map to in the right run and offset from there.
+  const int base = (val < 10) ? '0' : ('a' - 10);
+  return base + val;
+}
+}  // namespace
+
+// Returns a freshly generated random (version 4) UUID.
+//
+// The result is the canonical 36 character lowercase text form
+// "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx": every x is a random hex digit, the
+// literal 4 marks the version, and y is one of 8, 9, a or b.
+UUID UUID::Random() {
+  // Draw every digit straight from the random_device.  Seeding a mt19937 with
+  // a single 32 bit rd() result would allow at most 2^32 distinct UUIDs, which
+  // makes collisions likely after only tens of thousands of them.
+  std::random_device rd;
+
+  std::uniform_int_distribution<> dis(0, 15);
+  std::uniform_int_distribution<> dis2(8, 11);
+
+  // UUID4 is implemented per https://www.cryptosys.net/pki/uuid-rfc4122.html
+  // 'x' and 'y' are placeholders; every other character is copied verbatim.
+  static constexpr std::string_view kLayout =
+      "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
+
+  UUID result;
+  static_assert(kLayout.size() == std::tuple_size_v<decltype(UUID::data_)>,
+                "The layout must cover every character of the UUID");
+
+  // Fill in the text left to right, one character per layout position.
+  for (size_t i = 0; i < kLayout.size(); ++i) {
+    switch (kLayout[i]) {
+      case 'x':
+        result.data_[i] = ToHex(dis(rd));
+        break;
+      case 'y':
+        // Restricting this digit to [8, b] sets the variant bits to 10.
+        result.data_[i] = ToHex(dis2(rd));
+        break;
+      default:
+        result.data_[i] = kLayout[i];
+        break;
+    }
+  }
+
+  return result;
+}
+
+}  // namespace aos
diff --git a/aos/events/logging/uuid.h b/aos/events/logging/uuid.h
new file mode 100644
index 0000000..b81b811
--- /dev/null
+++ b/aos/events/logging/uuid.h
@@ -0,0 +1,36 @@
+#ifndef AOS_EVENTS_LOGGING_UUID_H_
+#define AOS_EVENTS_LOGGING_UUID_H_
+
+#include <array>
+#include <random>
+#include <string_view>
+
+namespace aos {
+
+// Class to generate and hold a UUID.
+class UUID {
+ public:
+  // Creates a new random UUID (also known as a version 4 UUID).
+  static UUID Random();
+
+  // Views the textual form.  The view points into this object.
+  std::string_view string_view() const {
+    return {data_.data(), data_.size()};
+  }
+
+  // UUIDs are equal exactly when all of their characters match.
+  bool operator==(const UUID &other) const { return data_ == other.data_; }
+  bool operator!=(const UUID &other) const { return !(*this == other); }
+
+ private:
+  // Leaves data_ unset.  Private so that Random() is the only factory, and
+  // it overwrites every character before handing the UUID out.
+  UUID() {}
+
+  // Fixed size storage for the text.  Not null terminated.
+  std::array<char, 36> data_;
+};
+
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGING_UUID_H_
diff --git a/aos/events/logging/uuid_test.cc b/aos/events/logging/uuid_test.cc
new file mode 100644
index 0000000..d0320de
--- /dev/null
+++ b/aos/events/logging/uuid_test.cc
@@ -0,0 +1,18 @@
+#include "aos/events/logging/uuid.h"
+
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace testing {
+
+// Smoke test: a random UUID can be viewed as a string, and two consecutively
+// generated UUIDs differ.  Not exhaustive, and it doesn't measure randomness.
+TEST(UUIDTest, GetOne) {
+  LOG(INFO) << UUID::Random().string_view();
+
+  EXPECT_NE(UUID::Random(), UUID::Random());
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index e56d331..970b7e3 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -35,7 +35,9 @@
         {
           "name": "pi2",
           "priority": 1,
-          "time_to_live": 5000000
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi1"]
         },
         {
           "name": "pi3",
@@ -55,7 +57,9 @@
         {
           "name": "pi1",
           "priority": 1,
-          "time_to_live": 5000000
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"]
         }
       ]
     },
@@ -111,6 +115,18 @@
       "frequency": 2
     },
     {
+      "name": "/pi1/aos/remote_timestamps/pi2",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/aos/remote_timestamps/pi1",
+      "type": "aos.logger.MessageHeader",
+      "logger": "NOT_LOGGED",
+      "source_node": "pi2"
+    },
+    {
       "name": "/pi1/aos",
       "type": "aos.timing.Report",
       "source_node": "pi1",
@@ -142,8 +158,9 @@
         {
           "name": "pi2",
           "priority": 1,
-          "timestamp_logger": "LOCAL_LOGGER",
-          "time_to_live": 5000000
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi1"]
         }
       ]
     },
@@ -155,8 +172,9 @@
         {
           "name": "pi1",
           "priority": 1,
-          "timestamp_logger": "LOCAL_LOGGER",
-          "time_to_live": 5000000
+          "time_to_live": 5000000,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"]
         }
       ]
     },
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 1bb0313..29f01a4 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -118,10 +118,10 @@
   return config;
 }
 
-class MMapedQueue {
+class MMappedQueue {
  public:
-  MMapedQueue(std::string_view shm_base, const Channel *channel,
-              std::chrono::seconds channel_storage_duration)
+  MMappedQueue(std::string_view shm_base, const Channel *channel,
+               std::chrono::seconds channel_storage_duration)
       : config_(MakeQueueConfiguration(channel, channel_storage_duration)) {
     std::string path = ShmPath(shm_base, channel);
 
@@ -172,7 +172,7 @@
     ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
   }
 
-  ~MMapedQueue() {
+  ~MMappedQueue() {
     PCHECK(munmap(data_, size_) == 0);
     PCHECK(munmap(const_cast<void *>(const_data_), size_) == 0);
   }
@@ -450,7 +450,7 @@
 
   aos::ShmEventLoop *event_loop_;
   const Channel *const channel_;
-  MMapedQueue lockless_queue_memory_;
+  MMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueReader reader_;
   // This being nullopt indicates we're not looking for wakeups right now.
   std::optional<ipc_lib::LocklessQueueWatcher> watcher_;
@@ -567,7 +567,7 @@
   int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
 
  private:
-  MMapedQueue lockless_queue_memory_;
+  MMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueSender lockless_queue_sender_;
   ipc_lib::LocklessQueueWakeUpper wake_upper_;
 };
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index b2e54bb..a7301bf 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -741,20 +741,27 @@
   DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
 
   msgs_.pop_front();
+  if (token_ != scheduler_->InvalidToken()) {
+    scheduler_->Deschedule(token_);
+    token_ = scheduler_->InvalidToken();
+  }
   if (msgs_.size() != 0) {
     event_.set_event_time(msgs_.front()->context.monotonic_event_time);
     simulated_event_loop_->AddEvent(&event_);
 
     DoSchedule(event_.event_time());
-  } else {
-    token_ = scheduler_->InvalidToken();
   }
 }
 
 void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
-  token_ =
-      scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
-                           [this]() { simulated_event_loop_->HandleEvent(); });
+  CHECK(token_ == scheduler_->InvalidToken())
+      << ": May not schedule multiple times";
+  token_ = scheduler_->Schedule(
+      event_time + simulated_event_loop_->send_delay(), [this]() {
+        DCHECK(token_ != scheduler_->InvalidToken());
+        token_ = scheduler_->InvalidToken();
+        simulated_event_loop_->HandleEvent();
+      });
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -817,13 +824,11 @@
       simulated_event_loop_->monotonic_now();
   base_ = base;
   repeat_offset_ = repeat_offset;
-  if (base < monotonic_now) {
-    token_ = scheduler_->Schedule(
-        monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
-  } else {
-    token_ = scheduler_->Schedule(
-        base, [this]() { simulated_event_loop_->HandleEvent(); });
-  }
+  token_ = scheduler_->Schedule(std::max(base, monotonic_now), [this]() {
+    DCHECK(token_ != scheduler_->InvalidToken());
+    token_ = scheduler_->InvalidToken();
+    simulated_event_loop_->HandleEvent();
+  });
   event_.set_event_time(base_);
   simulated_event_loop_->AddEvent(&event_);
 }
@@ -835,15 +840,20 @@
   if (simulated_event_loop_->log_impl_ != nullptr) {
     logging::SetImplementation(simulated_event_loop_->log_impl_);
   }
+  if (token_ != scheduler_->InvalidToken()) {
+    scheduler_->Deschedule(token_);
+    token_ = scheduler_->InvalidToken();
+  }
   if (repeat_offset_ != ::aos::monotonic_clock::zero()) {
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
-    token_ = scheduler_->Schedule(
-        base_, [this]() { simulated_event_loop_->HandleEvent(); });
+    token_ = scheduler_->Schedule(base_, [this]() {
+      DCHECK(token_ != scheduler_->InvalidToken());
+      token_ = scheduler_->InvalidToken();
+      simulated_event_loop_->HandleEvent();
+    });
     event_.set_event_time(base_);
     simulated_event_loop_->AddEvent(&event_);
-  } else {
-    token_ = scheduler_->InvalidToken();
   }
 
   Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
@@ -889,8 +899,15 @@
 
 void SimulatedPhasedLoopHandler::Schedule(
     monotonic_clock::time_point sleep_time) {
-  token_ = scheduler_->Schedule(
-      sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+  if (token_ != scheduler_->InvalidToken()) {
+    scheduler_->Deschedule(token_);
+    token_ = scheduler_->InvalidToken();
+  }
+  token_ = scheduler_->Schedule(sleep_time, [this]() {
+    DCHECK(token_ != scheduler_->InvalidToken());
+    token_ = scheduler_->InvalidToken();
+    simulated_event_loop_->HandleEvent();
+  });
   event_.set_event_time(sleep_time);
   simulated_event_loop_->AddEvent(&event_);
 }
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 44ba3e5..7de0540 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,7 @@
 #include <string_view>
 
 #include "aos/events/event_loop_param_test.h"
+#include "aos/events/logging/logger_generated.h"
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/test_message_generated.h"
@@ -290,6 +291,11 @@
       simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
   MessageCounter<examples::Pong> pi2_pong_counter(
       pi2_pong_counter_event_loop.get(), "/test");
+  aos::Fetcher<message_bridge::Timestamp> pi1_on_pi2_timestamp_fetcher =
+      pi2_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
+          "/pi1/aos");
+  aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+      pi2_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
 
   std::unique_ptr<EventLoop> pi3_pong_counter_event_loop =
       simulated_event_loop_factory.MakeEventLoop("pi3_pong_counter", pi3);
@@ -298,6 +304,14 @@
       simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
   MessageCounter<examples::Pong> pi1_pong_counter(
       pi1_pong_counter_event_loop.get(), "/test");
+  aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+      pi1_pong_counter_event_loop->MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<message_bridge::Timestamp> pi1_on_pi1_timestamp_fetcher =
+      pi1_pong_counter_event_loop->MakeFetcher<message_bridge::Timestamp>(
+          "/aos");
+
+  std::unique_ptr<EventLoop> pi1_remote_timestamp =
+      simulated_event_loop_factory.MakeEventLoop("pi1_remote_timestamp", pi1);
 
   // Count timestamps.
   MessageCounter<message_bridge::Timestamp> pi1_on_pi1_timestamp_counter(
@@ -315,6 +329,12 @@
   MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
       pi3_pong_counter_event_loop.get(), "/pi3/aos");
 
+  // Count remote timestamps
+  MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+      pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+  MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+      pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
   // Wait to let timestamp estimation start up before looking for the results.
   simulated_event_loop_factory.RunFor(chrono::milliseconds(500));
 
@@ -429,6 +449,98 @@
         ++pi3_client_statistics_count;
       });
 
+  // Find the channel index for both the /pi1/aos Timestamp channel and Ping
+  // channel.
+  const size_t pi1_timestamp_channel =
+      configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
+                                  pi1_on_pi2_timestamp_fetcher.channel());
+  const size_t ping_timestamp_channel =
+      configuration::ChannelIndex(pi1_pong_counter_event_loop->configuration(),
+                                  ping_on_pi2_fetcher.channel());
+
+  for (const Channel *channel :
+       *pi1_pong_counter_event_loop->configuration()->channels()) {
+    VLOG(1) << "Channel "
+            << configuration::ChannelIndex(
+                   pi1_pong_counter_event_loop->configuration(), channel)
+            << " " << configuration::CleanedChannelToString(channel);
+  }
+
+  // For each remote timestamp we get back, confirm that it is either a ping
+  // message, or a timestamp we sent out.  Also confirm that the timestamps are
+  // correct.
+  pi1_remote_timestamp->MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+       &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+       &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+        const aos::realtime_clock::time_point header_realtime_sent_time(
+            chrono::nanoseconds(header.realtime_sent_time()));
+        const aos::monotonic_clock::time_point header_monotonic_remote_time(
+            chrono::nanoseconds(header.monotonic_remote_time()));
+        const aos::realtime_clock::time_point header_realtime_remote_time(
+            chrono::nanoseconds(header.realtime_remote_time()));
+
+        const Context *pi1_context = nullptr;
+        const Context *pi2_context = nullptr;
+
+        if (header.channel_index() == pi1_timestamp_channel) {
+          // Find the forwarded message.
+          while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+                 header_monotonic_sent_time) {
+            ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+          }
+
+          // And the source message.
+          while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+                 header_monotonic_remote_time) {
+            ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+          }
+
+          pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+          pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+        } else if (header.channel_index() == ping_timestamp_channel) {
+          // Find the forwarded message.
+          while (ping_on_pi2_fetcher.context().monotonic_event_time <
+                 header_monotonic_sent_time) {
+            ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+          }
+
+          // And the source message.
+          while (ping_on_pi1_fetcher.context().monotonic_event_time <
+                 header_monotonic_remote_time) {
+            ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+          }
+
+          pi1_context = &ping_on_pi1_fetcher.context();
+          pi2_context = &ping_on_pi2_fetcher.context();
+        } else {
+          LOG(FATAL) << "Unknown channel";
+        }
+
+        // Confirm the forwarded message has matching timestamps to the
+        // timestamps we got back.
+        EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi2_context->monotonic_event_time,
+                  header_monotonic_sent_time);
+        EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+        EXPECT_EQ(pi2_context->realtime_remote_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi2_context->monotonic_remote_time,
+                  header_monotonic_remote_time);
+
+        // Confirm the forwarded message also matches the source message.
+        EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi1_context->monotonic_event_time,
+                  header_monotonic_remote_time);
+        EXPECT_EQ(pi1_context->realtime_event_time,
+                  header_realtime_remote_time);
+      });
+
   simulated_event_loop_factory.RunFor(chrono::seconds(10) -
                                       chrono::milliseconds(500) +
                                       chrono::milliseconds(5));
@@ -451,6 +563,10 @@
   EXPECT_EQ(pi1_client_statistics_count, 95);
   EXPECT_EQ(pi2_client_statistics_count, 95);
   EXPECT_EQ(pi3_client_statistics_count, 95);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1101);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1101);
 }
 
 // Tests that an offset between nodes can be recovered and shows up in
@@ -605,6 +721,12 @@
   MessageCounter<message_bridge::Timestamp> pi3_on_pi3_timestamp_counter(
       pi3_pong_counter_event_loop.get(), "/pi3/aos");
 
+  // Count remote timestamps
+  MessageCounter<logger::MessageHeader> remote_timestamps_pi2_on_pi1(
+      pi1_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi2");
+  MessageCounter<logger::MessageHeader> remote_timestamps_pi1_on_pi2(
+      pi2_pong_counter_event_loop.get(), "/aos/remote_timestamps/pi1");
+
   MessageCounter<message_bridge::ServerStatistics>
       pi1_server_statistics_counter(pi1_pong_counter_event_loop.get(),
                                     "/pi1/aos");
@@ -646,6 +768,10 @@
   EXPECT_EQ(pi1_client_statistics_counter.count(), 0u);
   EXPECT_EQ(pi2_client_statistics_counter.count(), 0u);
   EXPECT_EQ(pi3_client_statistics_counter.count(), 0u);
+
+  // Also confirm that remote timestamps are being forwarded correctly.
+  EXPECT_EQ(remote_timestamps_pi2_on_pi1.count(), 1001);
+  EXPECT_EQ(remote_timestamps_pi1_on_pi2.count(), 1001);
 }
 
 }  // namespace testing
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 0f6e8bc..825c830 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -19,7 +19,9 @@
                     std::unique_ptr<aos::RawFetcher> fetcher,
                     std::unique_ptr<aos::RawSender> sender,
                     ServerConnection *server_connection, int client_index,
-                    MessageBridgeClientStatus *client_status)
+                    MessageBridgeClientStatus *client_status,
+                    size_t channel_index,
+                    aos::Sender<logger::MessageHeader> *timestamp_logger)
       : fetch_node_factory_(fetch_node_factory),
         send_node_factory_(send_node_factory),
         send_event_loop_(send_event_loop),
@@ -28,7 +30,9 @@
         server_connection_(server_connection),
         client_status_(client_status),
         client_index_(client_index),
-        client_connection_(client_status_->GetClientConnection(client_index)) {
+        client_connection_(client_status_->GetClientConnection(client_index)),
+        channel_index_(channel_index),
+        timestamp_logger_(timestamp_logger) {
     timer_ = send_event_loop_->AddTimer([this]() { Send(); });
 
     Schedule();
@@ -101,6 +105,34 @@
 
     client_connection_->mutate_received_packets(
         client_connection_->received_packets() + 1);
+
+    if (timestamp_logger_) {
+      aos::Sender<logger::MessageHeader>::Builder builder =
+          timestamp_logger_->MakeBuilder();
+
+      logger::MessageHeader::Builder message_header_builder =
+          builder.MakeBuilder<logger::MessageHeader>();
+
+      message_header_builder.add_channel_index(channel_index_);
+
+      // Swap the remote and sent metrics.  They are from the sender's
+      // perspective, not the receiver's perspective.
+      message_header_builder.add_monotonic_remote_time(
+          fetcher_->context().monotonic_event_time.time_since_epoch().count());
+      message_header_builder.add_realtime_remote_time(
+          fetcher_->context().realtime_event_time.time_since_epoch().count());
+      message_header_builder.add_remote_queue_index(
+          fetcher_->context().queue_index);
+
+      message_header_builder.add_monotonic_sent_time(
+          sender_->monotonic_sent_time().time_since_epoch().count());
+      message_header_builder.add_realtime_sent_time(
+          sender_->realtime_sent_time().time_since_epoch().count());
+      message_header_builder.add_queue_index(sender_->sent_queue_index());
+
+      builder.Send(message_header_builder.Finish());
+    }
+
     sent_ = true;
     Schedule();
   }
@@ -136,6 +168,9 @@
   MessageBridgeClientStatus *client_status_ = nullptr;
   int client_index_;
   ClientConnection *client_connection_ = nullptr;
+
+  size_t channel_index_;
+  aos::Sender<logger::MessageHeader> *timestamp_logger_ = nullptr;
 };
 
 SimulatedMessageBridge::SimulatedMessageBridge(
@@ -208,6 +243,13 @@
           destination_event_loop->second.client_status.FindClientIndex(
               channel->source_node()->string_view());
 
+      const size_t destination_node_index = configuration::GetNodeIndex(
+          simulated_event_loop_factory->configuration(), destination_node);
+
+      const bool delivery_time_is_logged =
+          configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+              connection, source_event_loop->second.event_loop->node());
+
       delayers->emplace_back(std::make_unique<RawMessageDelayer>(
           simulated_event_loop_factory->GetNodeEventLoopFactory(node),
           simulated_event_loop_factory->GetNodeEventLoopFactory(
@@ -216,7 +258,13 @@
           source_event_loop->second.event_loop->MakeRawFetcher(channel),
           destination_event_loop->second.event_loop->MakeRawSender(channel),
           server_connection, client_index,
-          &destination_event_loop->second.client_status));
+          &destination_event_loop->second.client_status,
+          configuration::ChannelIndex(
+              source_event_loop->second.event_loop->configuration(), channel),
+          delivery_time_is_logged
+              ? &source_event_loop->second
+                     .timestamp_loggers[destination_node_index]
+              : nullptr));
     }
 
     const Channel *const timestamp_channel = configuration::GetChannel(
@@ -272,5 +320,46 @@
   }
 }
 
+// Takes ownership of the event loop and creates one remote timestamp sender
+// for each node whose delivery timestamps get logged back on our node.
+SimulatedMessageBridge::State::State(
+    std::unique_ptr<aos::EventLoop> &&new_event_loop)
+    : event_loop(std::move(new_event_loop)),
+      server_status(event_loop.get()),
+      client_status(event_loop.get()) {
+  const auto *const config = event_loop->configuration();
+  const auto *const self = event_loop->node();
+  // One default constructed sender slot per node, indexed by node index.
+  timestamp_loggers.resize(config->nodes()->size());
+
+  for (const Channel *channel : *config->channels()) {
+    CHECK(channel->has_source_node());
+
+    // Only channels which we send and which get forwarded matter here.
+    if (!configuration::ChannelIsSendableOnNode(channel, self) ||
+        !channel->has_destination_nodes()) {
+      continue;
+    }
+
+    for (const Connection *connection : *channel->destination_nodes()) {
+      // Skip connections whose delivery times aren't logged on our node.
+      if (!configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+                                                               self)) {
+        continue;
+      }
+
+      const auto remote_name = connection->name()->string_view();
+      const size_t remote_index = configuration::GetNodeIndex(
+          config, configuration::GetNode(config, remote_name));
+      // Many channels can share a destination; only make its sender once.
+      auto &timestamp_sender = timestamp_loggers[remote_index];
+      if (!timestamp_sender) {
+        timestamp_sender = event_loop->MakeSender<logger::MessageHeader>(
+            absl::StrCat("/aos/remote_timestamps/", remote_name));
+      }
+    }
+  }
+}
+
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 5784bb3..2a4da63 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -2,6 +2,7 @@
 #define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
 
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/message_bridge_client_status.h"
 #include "aos/network/message_bridge_server_status.h"
@@ -32,16 +33,15 @@
 
  private:
   struct State {
-    State(std::unique_ptr<aos::EventLoop> &&new_event_loop)
-        : event_loop(std::move(new_event_loop)),
-          server_status(event_loop.get()),
-          client_status(event_loop.get()) {}
+    State(std::unique_ptr<aos::EventLoop> &&new_event_loop);
 
     State(const State &state) = delete;
 
     std::unique_ptr<aos::EventLoop> event_loop;
     MessageBridgeServerStatus server_status;
     MessageBridgeClientStatus client_status;
+
+    std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers;
   };
   // Map of nodes to event loops.  This is a member variable so that the
   // lifetime of the event loops matches the lifetime of the bridge.
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 392a7eb..bd095b8 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -328,6 +328,14 @@
 
   virtual ~SizePrefixedFlatbufferDetachedBuffer() override {}
 
+  static SizePrefixedFlatbufferDetachedBuffer<T> Empty() {
+    flatbuffers::FlatBufferBuilder fbb;
+    fbb.ForceDefaults(true);
+    const auto end = fbb.EndTable(fbb.StartTable());
+    fbb.FinishSizePrefixed(flatbuffers::Offset<flatbuffers::Table>(end));
+    return SizePrefixedFlatbufferDetachedBuffer<T>(fbb.Release());
+  }
+
   // Returns references to the buffer, and the data.
   const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
   const uint8_t *data() const override {
@@ -340,6 +348,13 @@
     return buffer_.size() - sizeof(flatbuffers::uoffset_t);
   }
 
+  absl::Span<uint8_t> full_span() {
+    return absl::Span<uint8_t>(buffer_.data(), buffer_.size());
+  }
+  absl::Span<const uint8_t> full_span() const {
+    return absl::Span<const uint8_t>(buffer_.data(), buffer_.size());
+  }
+
  private:
   flatbuffers::DetachedBuffer buffer_;
 };
diff --git a/aos/ipc_lib/signalfd.cc b/aos/ipc_lib/signalfd.cc
index 363bf4a..051a654 100644
--- a/aos/ipc_lib/signalfd.cc
+++ b/aos/ipc_lib/signalfd.cc
@@ -2,6 +2,9 @@
 
 #include <signal.h>
 #include <sys/types.h>
+#if __has_feature(memory_sanitizer)
+#include <sanitizer/msan_interface.h>
+#endif
 #include <unistd.h>
 #include <initializer_list>
 
@@ -9,6 +12,50 @@
 
 namespace aos {
 namespace ipc_lib {
+namespace {
+
+// Wrapper which propagates msan information.
+// TODO(Brian): Drop this once we have <https://reviews.llvm.org/D82411> to
+// intercept this function natively.
+int wrapped_sigandset(sigset_t *dest, const sigset_t *left,
+                      const sigset_t *right) {
+#if __has_feature(memory_sanitizer)
+  // Both operands get read, so have msan validate whichever are present.
+  for (const sigset_t *operand : {left, right}) {
+    if (operand != nullptr) {
+      __msan_check_mem_is_initialized(operand, sizeof(*operand));
+    }
+  }
+#endif
+  const int result = sigandset(dest, left, right);
+#if __has_feature(memory_sanitizer)
+  if (result == 0 && dest != nullptr) {
+    __msan_unpoison(dest, sizeof(*dest));
+  }
+#endif
+  return result;
+}
+
+// Wrapper which propagates msan information.
+// TODO(Brian): Drop this once we have
+// <https://reviews.llvm.org/rG89ae290b58e20fc5f56b7bfae4b34e7fef06e1b1> to
+// intercept this function natively.
+int wrapped_pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset) {
+#if __has_feature(memory_sanitizer)
+  // The requested mask gets read, so have msan validate it when present.
+  if (set != nullptr) __msan_check_mem_is_initialized(set, sizeof(*set));
+#endif
+  const int result = pthread_sigmask(how, set, oldset);
+#if __has_feature(memory_sanitizer)
+  // msan can't see the write to *oldset, so mark it initialized ourselves,
+  // but only when the call succeeded.
+  const bool filled_oldset = (result == 0) && (oldset != nullptr);
+  if (filled_oldset) __msan_unpoison(oldset, sizeof(*oldset));
+#endif
+  return result;
+}
+
+}  // namespace
 
 SignalFd::SignalFd(::std::initializer_list<unsigned int> signals) {
   // Build up the mask with the provided signals.
@@ -23,7 +70,7 @@
   // signalfd gets them. Record which ones we actually blocked, so we can
   // unblock just those later.
   sigset_t old_mask;
-  CHECK_EQ(0, pthread_sigmask(SIG_BLOCK, &blocked_mask_, &old_mask));
+  CHECK_EQ(0, wrapped_pthread_sigmask(SIG_BLOCK, &blocked_mask_, &old_mask));
   for (int signal : signals) {
     if (sigismember(&old_mask, signal)) {
       CHECK_EQ(0, sigdelset(&blocked_mask_, signal));
@@ -35,9 +82,9 @@
   // Unwind the constructor. Unblock the signals and close the fd. Verify nobody
   // else unblocked the signals we're supposed to unblock in the meantime.
   sigset_t old_mask;
-  CHECK_EQ(0, pthread_sigmask(SIG_UNBLOCK, &blocked_mask_, &old_mask));
+  CHECK_EQ(0, wrapped_pthread_sigmask(SIG_UNBLOCK, &blocked_mask_, &old_mask));
   sigset_t unblocked_mask;
-  CHECK_EQ(0, sigandset(&unblocked_mask, &blocked_mask_, &old_mask));
+  CHECK_EQ(0, wrapped_sigandset(&unblocked_mask, &blocked_mask_, &old_mask));
   if (memcmp(&unblocked_mask, &blocked_mask_, sizeof(unblocked_mask)) != 0) {
     LOG(FATAL) << "Some other code unblocked one or more of our signals";
   }
diff --git a/aos/logging/context.cc b/aos/logging/context.cc
index 72c1970..84e4e80 100644
--- a/aos/logging/context.cc
+++ b/aos/logging/context.cc
@@ -4,6 +4,9 @@
 #define _GNU_SOURCE /* See feature_test_macros(7) */
 #endif
 
+#if __has_feature(memory_sanitizer)
+#include <sanitizer/msan_interface.h>
+#endif
 #include <string.h>
 #include <sys/prctl.h>
 #include <sys/types.h>
@@ -16,8 +19,8 @@
 extern char *program_invocation_name;
 extern char *program_invocation_short_name;
 
-#include "aos/die.h"
 #include "aos/complex_thread_local.h"
+#include "aos/die.h"
 
 namespace aos {
 namespace logging {
@@ -39,6 +42,10 @@
   if (prctl(PR_GET_NAME, thread_name_array) != 0) {
     PDie("prctl(PR_GET_NAME, %p) failed", thread_name_array);
   }
+#if __has_feature(memory_sanitizer)
+  // msan doesn't understand PR_GET_NAME, so help it along.
+  __msan_unpoison(thread_name_array, sizeof(thread_name_array));
+#endif
   thread_name_array[sizeof(thread_name_array) - 1] = '\0';
   ::std::string thread_name(thread_name_array);
 
@@ -67,8 +74,7 @@
 ::std::atomic<LogImplementation *> global_top_implementation(NULL);
 
 Context::Context()
-    : implementation(global_top_implementation.load()),
-      sequence(0) {
+    : implementation(global_top_implementation.load()), sequence(0) {
   cork_data.Reset();
 }
 
@@ -77,8 +83,7 @@
   if (my_context.created()) {
     ::std::string my_name = GetMyName();
     if (my_name.size() + 1 > sizeof(Context::name)) {
-      Die("logging: process/thread name '%s' is too long\n",
-          my_name.c_str());
+      Die("logging: process/thread name '%s' is too long\n", my_name.c_str());
     }
     strcpy(my_context->name, my_name.c_str());
     my_context->name_size = my_name.size();
@@ -98,9 +103,7 @@
   return my_context.get();
 }
 
-void Context::Delete() {
-  delete_current_context = true;
-}
+void Context::Delete() { delete_current_context = true; }
 
 }  // namespace internal
 }  // namespace logging
diff --git a/aos/logging/log_message.fbs b/aos/logging/log_message.fbs
index 789724f..7e246cf 100644
--- a/aos/logging/log_message.fbs
+++ b/aos/logging/log_message.fbs
@@ -18,7 +18,7 @@
   level:Level (id: 1);
 
   // Pid of the process creating the log message
-  source:int (id:2);
+  source_pid:int (id:2);
 
   // Application name
   name:string (id:3);
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 65aff91..98a271c 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -268,6 +268,7 @@
     name = "message_bridge_test_common_config",
     src = "message_bridge_test_common.json",
     flatbuffers = [
+        "//aos/events/logging:logger_fbs",
         "//aos/events:ping_fbs",
         "//aos/events:pong_fbs",
         "//aos/network:message_bridge_client_fbs",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index b371181..00040e1 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -78,6 +78,8 @@
   logger::MessageHeader::Builder message_header_builder(fbb);
   message_header_builder.add_channel_index(0);
   message_header_builder.add_monotonic_sent_time(0);
+  message_header_builder.add_realtime_sent_time(0);
+  message_header_builder.add_queue_index(0);
   message_header_builder.add_monotonic_remote_time(0);
   message_header_builder.add_realtime_remote_time(0);
   message_header_builder.add_remote_queue_index(0);
@@ -250,6 +252,10 @@
         message_header->channel_index());
     message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
         message_header->monotonic_sent_time());
+    message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
+        message_header->realtime_sent_time());
+    message_reception_reply_.mutable_message()->mutate_queue_index(
+        message_header->queue_index());
 
     // And capture the relevant data needed to generate the forwarding
     // MessageHeader.
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 2832339..de836ae 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -1,5 +1,6 @@
 #include "aos/network/message_bridge_server_lib.h"
 
+#include "absl/strings/str_cat.h"
 #include "absl/types/span.h"
 #include "aos/events/logging/logger.h"
 #include "aos/events/logging/logger_generated.h"
@@ -85,14 +86,55 @@
   // and flushes.  Whee.
 }
 
-void ChannelState::HandleDelivery(sctp_assoc_t /*rcv_assoc_id*/,
-                                  uint16_t /*ssn*/,
+void ChannelState::HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/,
                                   absl::Span<const uint8_t> data) {
   const logger::MessageHeader *message_header =
       flatbuffers::GetRoot<logger::MessageHeader>(data.data());
+  for (Peer &peer : peers_) {
+    if (peer.sac_assoc_id == rcv_assoc_id) {
+      if (peer.timestamp_logger != nullptr) {
+        // TODO(austin): Need to implement reliable sending of the delivery
+        // timestamps.  Track what made it, and retry what didn't.
+        //
+        // This needs to be munged and cleaned up to match the timestamp
+        // standard.
+
+        aos::Sender<logger::MessageHeader>::Builder builder =
+            peer.timestamp_logger->MakeBuilder();
+
+        logger::MessageHeader::Builder message_header_builder =
+            builder.MakeBuilder<logger::MessageHeader>();
+
+        message_header_builder.add_channel_index(
+            message_header->channel_index());
+
+        message_header_builder.add_queue_index(
+            message_header->remote_queue_index());
+        message_header_builder.add_monotonic_sent_time(
+            message_header->monotonic_remote_time());
+        message_header_builder.add_realtime_sent_time(
+            message_header->realtime_remote_time());
+
+        // Swap the remote and sent metrics.  They are from the sender's
+        // perspective, not the receiver's perspective.
+        message_header_builder.add_monotonic_remote_time(
+            message_header->monotonic_sent_time());
+        message_header_builder.add_realtime_remote_time(
+            message_header->realtime_sent_time());
+        message_header_builder.add_remote_queue_index(
+            message_header->queue_index());
+
+        builder.Send(message_header_builder.Finish());
+      }
+      break;
+    }
+  }
+
   while (sent_messages_.size() > 0u) {
     if (sent_messages_.begin()->message().monotonic_sent_time() ==
-        message_header->monotonic_sent_time()) {
+            message_header->monotonic_sent_time() &&
+        sent_messages_.begin()->message().queue_index() ==
+            message_header->queue_index()) {
       sent_messages_.pop_front();
       continue;
     }
@@ -124,11 +166,12 @@
   // time out eventually.  Need to sort that out.
 }
 
-void ChannelState::AddPeer(const Connection *connection, int node_index,
-                           ServerConnection *server_connection_statistics,
-                           bool logged_remotely) {
+void ChannelState::AddPeer(
+    const Connection *connection, int node_index,
+    ServerConnection *server_connection_statistics, bool logged_remotely,
+    aos::Sender<logger::MessageHeader> *timestamp_logger) {
   peers_.emplace_back(connection, node_index, server_connection_statistics,
-                      logged_remotely);
+                      logged_remotely, timestamp_logger);
 }
 
 int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
@@ -168,6 +211,7 @@
   CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
 
   int32_t max_size = 0;
+  timestamp_loggers_.resize(event_loop->configuration()->nodes()->size());
 
   // Seed up all the per-node connection state.
   // We are making the assumption here that every connection is bidirectional
@@ -217,13 +261,30 @@
       for (const Connection *connection : *channel->destination_nodes()) {
         const Node *other_node = configuration::GetNode(
             event_loop_->configuration(), connection->name()->string_view());
+        const size_t other_node_index = configuration::GetNodeIndex(
+            event_loop_->configuration(), other_node);
+
+        const bool delivery_time_is_logged =
+            configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+                connection, event_loop_->node());
+
+        // Conditionally create the timestamp logger if we are supposed to log
+        // timestamps from it.
+        if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
+          timestamp_loggers_[other_node_index] =
+              event_loop_->MakeSender<logger::MessageHeader>(
+                  absl::StrCat("/aos/remote_timestamps/",
+                               connection->name()->string_view()));
+        }
         state->AddPeer(
             connection,
             configuration::GetNodeIndex(event_loop_->configuration(),
                                         connection->name()->string_view()),
             server_status_.FindServerConnection(
                 connection->name()->string_view()),
-            configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
+            configuration::ChannelMessageIsLoggedOnNode(channel, other_node),
+            delivery_time_is_logged ? &timestamp_loggers_[other_node_index]
+                                    : nullptr);
       }
 
       // Don't subscribe to timestamps on the timestamp channel.  Those get
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index b6a4f05..ad384f4 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -32,10 +32,12 @@
   struct Peer {
     Peer(const Connection *new_connection, int new_node_index,
          ServerConnection *new_server_connection_statistics,
-         bool new_logged_remotely)
+         bool new_logged_remotely,
+         aos::Sender<logger::MessageHeader> *new_timestamp_logger)
         : connection(new_connection),
           node_index(new_node_index),
           server_connection_statistics(new_server_connection_statistics),
+          timestamp_logger(new_timestamp_logger),
           logged_remotely(new_logged_remotely) {}
 
     // Valid if != 0.
@@ -45,6 +47,7 @@
     const aos::Connection *connection;
     const int node_index;
     ServerConnection *server_connection_statistics;
+    aos::Sender<logger::MessageHeader> *timestamp_logger = nullptr;
 
     // If true, this message will be logged on a receiving node.  We need to
     // keep it around to log it locally if that fails.
@@ -60,7 +63,8 @@
   // Adds a new peer.
   void AddPeer(const Connection *connection, int node_index,
                ServerConnection *server_connection_statistics,
-               bool logged_remotely);
+               bool logged_remotely,
+               aos::Sender<logger::MessageHeader> *timestamp_logger);
 
   // Returns true if this channel has the same name and type as the other
   // channel.
@@ -111,6 +115,7 @@
   // Event loop to schedule everything on.
   aos::ShmEventLoop *event_loop_;
 
+  std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers_;
   SctpServer server_;
 
   MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 8d52292..aa0a034 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -3,6 +3,7 @@
 #include <chrono>
 #include <thread>
 
+#include "absl/strings/str_cat.h"
 #include "aos/events/ping_generated.h"
 #include "aos/events/pong_generated.h"
 #include "aos/network/message_bridge_client_lib.h"
@@ -10,11 +11,22 @@
 #include "aos/network/team_number.h"
 
 namespace aos {
+void SetShmBase(const std::string_view base);
+
 namespace message_bridge {
 namespace testing {
 
 namespace chrono = std::chrono;
 
+void DoSetShmBase(const std::string_view node) {
+  const char *tmpdir_c_str = getenv("TEST_TMPDIR");  // nullptr when unset.
+  if (tmpdir_c_str != nullptr) {
+    aos::SetShmBase(absl::StrCat(tmpdir_c_str, "/", node));  // <tmpdir>/<node>
+  } else {
+    aos::SetShmBase(absl::StrCat("/dev/shm/", node));  // /dev/shm/<node>
+  }
+}
+
 // Test that we can send a ping message over sctp and receive it.
 TEST(MessageBridgeTest, PingPong) {
   // This is rather annoying to set up.  We need to start up a client and
@@ -44,9 +56,11 @@
       aos::configuration::ReadConfig(
           "aos/network/message_bridge_test_client_config.json");
 
+  DoSetShmBase("pi1");
   FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
   FLAGS_override_hostname = "raspberrypi";
+
   aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
   MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
 
@@ -60,9 +74,22 @@
   aos::Sender<examples::Ping> ping_sender =
       ping_event_loop.MakeSender<examples::Ping>("/test");
 
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
+      pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+          "/pi1/aos/remote_timestamps/pi2");
+
+  // Fetchers for confirming the remote timestamps made it.
+  aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+      ping_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
+      ping_event_loop.MakeFetcher<Timestamp>("/aos");
+
   // Now do it for "raspberrypi2", the client.
   FLAGS_application_name = "pi2_message_bridge_client";
   FLAGS_override_hostname = "raspberrypi2";
+  DoSetShmBase("pi2");
+
   aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
   MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
 
@@ -80,11 +107,24 @@
 
   aos::Fetcher<ClientStatistics> client_statistics_fetcher =
       test_event_loop.MakeFetcher<ClientStatistics>("/aos");
+  aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
+      test_event_loop.MakeFetcher<logger::MessageHeader>(
+          "/pi2/aos/remote_timestamps/pi1");
+
+  // Event loop for fetching data delivered to pi2 from pi1 to match up
+  // messages.
+  aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
+  aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
+      delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
+  aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+      delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
+  EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
+  EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
 
   // Count the pongs.
   int pong_count = 0;
   pong_event_loop.MakeWatcher(
-      "/test2", [&pong_count](const examples::Ping &ping) {
+      "/test", [&pong_count](const examples::Ping &ping) {
         ++pong_count;
         LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
       });
@@ -191,11 +231,11 @@
 
   ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
     EXPECT_TRUE(timestamp.has_offsets());
-    LOG(INFO) << FlatbufferToJson(&timestamp);
+    LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
   });
   pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
     EXPECT_TRUE(timestamp.has_offsets());
-    LOG(INFO) << FlatbufferToJson(&timestamp);
+    LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
   });
 
   // Run for 5 seconds to make sure we have time to estimate the offset.
@@ -206,6 +246,96 @@
     quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
   });
 
+  // Find the channel index for both the /pi1/aos Timestamp channel and Ping
+  // channel.
+  const size_t pi1_timestamp_channel = configuration::ChannelIndex(
+      pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
+  const size_t ping_timestamp_channel =
+      configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
+                                  ping_on_pi2_fetcher.channel());
+
+  for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
+    VLOG(1) << "Channel "
+            << configuration::ChannelIndex(ping_event_loop.configuration(),
+                                           channel)
+            << " " << configuration::CleanedChannelToString(channel);
+  }
+
+  // For each remote timestamp we get back, confirm that it is either a ping
+  // message, or a timestamp we sent out.  Also confirm that the timestamps are
+  // correct.
+  ping_event_loop.MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+       &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+       &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+        const aos::realtime_clock::time_point header_realtime_sent_time(
+            chrono::nanoseconds(header.realtime_sent_time()));
+        const aos::monotonic_clock::time_point header_monotonic_remote_time(
+            chrono::nanoseconds(header.monotonic_remote_time()));
+        const aos::realtime_clock::time_point header_realtime_remote_time(
+            chrono::nanoseconds(header.realtime_remote_time()));
+
+        const Context *pi1_context = nullptr;
+        const Context *pi2_context = nullptr;
+
+        if (header.channel_index() == pi1_timestamp_channel) {
+          // Find the forwarded message.
+          while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+                 header_monotonic_sent_time) {
+            ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+          }
+
+          // And the source message.
+          while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+                 header_monotonic_remote_time) {
+            ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+          }
+
+          pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+          pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+        } else if (header.channel_index() == ping_timestamp_channel) {
+          // Find the forwarded message.
+          while (ping_on_pi2_fetcher.context().monotonic_event_time <
+                 header_monotonic_sent_time) {
+            ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+          }
+
+          // And the source message.
+          while (ping_on_pi1_fetcher.context().monotonic_event_time <
+                 header_monotonic_remote_time) {
+            ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+          }
+
+          pi1_context = &ping_on_pi1_fetcher.context();
+          pi2_context = &ping_on_pi2_fetcher.context();
+        } else {
+          LOG(FATAL) << "Unknown channel";
+        }
+
+        // Confirm the forwarded message has matching timestamps to the
+        // timestamps we got back.
+        EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi2_context->monotonic_event_time,
+                  header_monotonic_sent_time);
+        EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+        EXPECT_EQ(pi2_context->realtime_remote_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi2_context->monotonic_remote_time,
+                  header_monotonic_remote_time);
+
+        // Confirm the forwarded message also matches the source message.
+        EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi1_context->monotonic_event_time,
+                  header_monotonic_remote_time);
+        EXPECT_EQ(pi1_context->realtime_event_time,
+                  header_realtime_remote_time);
+      });
+
   // Start everything up.  Pong is the only thing we don't know how to wait on,
   // so start it first.
   std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
@@ -261,6 +391,10 @@
   EXPECT_GE(pi2_server_statistics_count, 2);
   EXPECT_GE(pi1_client_statistics_count, 2);
   EXPECT_GE(pi2_client_statistics_count, 2);
+
+  // Confirm we got timestamps back!
+  EXPECT_TRUE(message_header_fetcher1.Fetch());
+  EXPECT_TRUE(message_header_fetcher2.Fetch());
 }
 
 // Test that the client disconnecting triggers the server offsets on both sides
@@ -290,6 +424,7 @@
   FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
   FLAGS_override_hostname = "raspberrypi";
+  DoSetShmBase("pi1");
   aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
   MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
 
@@ -305,6 +440,7 @@
 
   // Now do it for "raspberrypi2", the client.
   FLAGS_override_hostname = "raspberrypi2";
+  DoSetShmBase("pi2");
   FLAGS_application_name = "pi2_message_bridge_server";
   aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
   MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
@@ -496,6 +632,7 @@
   FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
   FLAGS_override_hostname = "raspberrypi";
+  DoSetShmBase("pi1");
   aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
   MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
 
@@ -513,6 +650,7 @@
 
   // Now do it for "raspberrypi2", the client.
   FLAGS_override_hostname = "raspberrypi2";
+  DoSetShmBase("pi2");
   FLAGS_application_name = "pi2_message_bridge_client";
   aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
   MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a8475b1..3ac2796 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -25,7 +25,9 @@
       "destination_nodes": [
         {
           "name": "pi2",
-          "priority": 1
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi1"]
         }
       ]
     },
@@ -38,25 +40,13 @@
       "destination_nodes": [
         {
           "name": "pi1",
-          "priority": 1
+          "priority": 1,
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi2"]
         }
       ]
     },
     {
-      "name": "/pi1_forwarded/aos",
-      "type": "aos.message_bridge.Timestamp",
-      "source_node": "pi2",
-      "frequency": 10,
-      "max_size": 200
-    },
-    {
-      "name": "/pi2_forwarded/aos",
-      "type": "aos.message_bridge.Timestamp",
-      "source_node": "pi1",
-      "frequency": 10,
-      "max_size": 200
-    },
-    {
       "name": "/pi1/aos",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "pi1",
@@ -81,6 +71,18 @@
       "frequency": 2
     },
     {
+      "name": "/pi1/aos/remote_timestamps/pi2",
+      "type": "aos.logger.MessageHeader",
+      "source_node": "pi1",
+      "frequency": 10
+    },
+    {
+      "name": "/pi2/aos/remote_timestamps/pi1",
+      "type": "aos.logger.MessageHeader",
+      "source_node": "pi2",
+      "frequency": 10
+    },
+    {
       "name": "/pi1/aos",
       "type": "aos.timing.Report",
       "source_node": "pi1",
@@ -110,11 +112,6 @@
       ]
     },
     {
-      "name": "/test2",
-      "type": "aos.examples.Ping",
-      "source_node": "pi2"
-    },
-    {
       "name": "/test",
       "type": "aos.examples.Pong",
       "source_node": "pi2",
@@ -148,34 +145,6 @@
       "rename": {
         "name": "/pi2/aos"
       }
-    },
-    {
-      "match": {
-        "name": "/test",
-        "type": "aos.examples.Ping",
-        "source_node": "pi2"
-      },
-      "rename": {
-        "name": "/test2"
-      }
-    },
-    {
-      "match": {
-        "name": "/pi1/aos*",
-        "source_node": "pi2"
-      },
-      "rename": {
-        "name": "/pi1_forwarded/aos"
-      }
-    },
-    {
-      "match": {
-        "name": "/pi2/aos*",
-        "source_node": "pi1"
-      },
-      "rename": {
-        "name": "/pi2_forwarded/aos"
-      }
     }
   ]
 }
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index ba8f9bd..116607c 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -49,7 +49,7 @@
   VLOG(1) << "  " << this << " Sample at " << monotonic_now << " is "
           << sample_ns.count() << "ns, Base is " << base_offset_.count();
   CHECK_GE(monotonic_now, last_time_)
-      << ": Being asked to filter backwards in time!";
+      << ": " << this << " Being asked to filter backwards in time!";
   // Compute the sample offset as a double (seconds), taking into account the
   // base offset.
   const double sample =
@@ -380,7 +380,7 @@
   const double hard_max = fwd_.offset();
   const double hard_min = -rev_.offset();
   const double average = (hard_max + hard_min) / 2.0;
-  VLOG(1) << "  Max(fwd) " << hard_max << " min(rev) " << hard_min;
+  VLOG(1) << this << "  Max(fwd) " << hard_max << " min(rev) " << hard_min;
   // We don't want to clip the offset to the hard min/max.  We really want to
   // keep it within a band around the middle.  ratio of 0.3 means stay within
   // +- 0.15 of the middle of the hard min and max.
@@ -390,7 +390,7 @@
 
   // Update regardless for the first sample from both the min and max.
   if (*last_time == aos::monotonic_clock::min_time) {
-    VLOG(1) << "  No last time " << average;
+    VLOG(1) << this << "  No last time " << average;
     offset_ = average;
     offset_velocity_ = 0.0;
   } else {
@@ -415,7 +415,7 @@
             (offset_velocity_ -
              (fwd_.filtered_velocity() - rev_.filtered_velocity()) / 2.0);
 
-    VLOG(1) << "  last time " << offset_;
+    VLOG(1) << this << "  last time " << offset_;
   }
   *last_time = monotonic_now;
 
@@ -424,14 +424,14 @@
     // reverse samples.
     if (!MissingSamples()) {
       *sample_pointer_ = offset_;
-      VLOG(1) << "Updating sample to " << offset_;
+      VLOG(1) << this << " Updating sample to " << offset_;
     } else {
-      VLOG(1) << "Don't have both samples.";
+      VLOG(1) << this << " Don't have both samples.";
       if (last_fwd_time_ == aos::monotonic_clock::min_time) {
-        VLOG(1) << " Missing forward";
+        VLOG(1) << this << " Missing forward";
       }
       if (last_rev_time_ == aos::monotonic_clock::min_time) {
-        VLOG(1) << " Missing reverse";
+        VLOG(1) << this << " Missing reverse";
       }
     }
   }
@@ -657,6 +657,8 @@
 void NoncausalOffsetEstimator::Sample(
     const Node *node, aos::monotonic_clock::time_point node_delivered_time,
     aos::monotonic_clock::time_point other_node_sent_time) {
+  VLOG(1) << "Sample delivered " << node_delivered_time << " sent "
+          << other_node_sent_time << " to " << node->name()->string_view();
   if (node == node_a_) {
     if (a_.Sample(node_delivered_time,
                   other_node_sent_time - node_delivered_time)) {
@@ -740,11 +742,16 @@
   }
   fit_ = AverageFits(a_.FitLine(), b_.FitLine());
   if (offset_pointer_) {
+    VLOG(1) << "Setting offset to " << fit_.mpq_offset();
     *offset_pointer_ = fit_.mpq_offset();
   }
   if (slope_pointer_) {
+    VLOG(1) << "Setting slope to " << fit_.mpq_slope();
     *slope_pointer_ = -fit_.mpq_slope();
   }
+  if (valid_pointer_) {
+    *valid_pointer_ = true;
+  }
 
   if (VLOG_IS_ON(1)) {
     LogFit("Refitting to");
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 6994a8a..10f436e 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -408,6 +408,9 @@
   void set_slope_pointer(mpq_class *slope_pointer) {
     slope_pointer_ = slope_pointer;
   }
+  void set_valid_pointer(bool *valid_pointer) {
+    valid_pointer_ = valid_pointer;
+  }
 
   // Returns the data points from each filter.
   const std::deque<
@@ -441,6 +444,7 @@
 
   mpq_class *offset_pointer_ = nullptr;
   mpq_class *slope_pointer_ = nullptr;
+  bool *valid_pointer_ = nullptr;
 
   Line fit_{std::chrono::nanoseconds(0), 0.0};
 
diff --git a/aos/realtime.cc b/aos/realtime.cc
index ff05f32..a0af97a 100644
--- a/aos/realtime.cc
+++ b/aos/realtime.cc
@@ -74,10 +74,12 @@
   WriteCoreDumps();
   PCHECK(mlockall(MCL_CURRENT | MCL_FUTURE) == 0);
 
+#if !__has_feature(address_sanitizer)
   // Don't give freed memory back to the OS.
   CHECK_EQ(1, mallopt(M_TRIM_THRESHOLD, -1));
   // Don't use mmap for large malloc chunks.
   CHECK_EQ(1, mallopt(M_MMAP_MAX, 0));
+#endif
 
   if (&FLAGS_tcmalloc_release_rate) {
     // Tell tcmalloc not to return memory.
diff --git a/aos/seasocks/gen_embedded.bzl b/aos/seasocks/gen_embedded.bzl
index 93cdc39..5bdb80d 100644
--- a/aos/seasocks/gen_embedded.bzl
+++ b/aos/seasocks/gen_embedded.bzl
@@ -16,7 +16,7 @@
     attrs = {
         "srcs": attr.label_list(
             mandatory = True,
-            non_empty = True,
+            allow_empty = False,
             allow_files = True,
         ),
         "_gen_embedded": attr.label(
diff --git a/debian/m4.BUILD b/debian/m4.BUILD
index 0abafda..50f83cd 100644
--- a/debian/m4.BUILD
+++ b/debian/m4.BUILD
@@ -3,3 +3,9 @@
     srcs = ["usr/bin/m4"],
     visibility = ["//visibility:public"],
 )
+
+filegroup(
+    name = "lib",
+    srcs = glob(["usr/lib/**"]),
+    visibility = ["//visibility:public"],
+)
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index 3c206b3..1f362f3 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -63,10 +63,8 @@
 
     if (!FLAGS_output_file.empty()) {
       unlink(FLAGS_output_file.c_str());
-      log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
-          FLAGS_output_file);
       logger_event_loop_ = MakeEventLoop("logger");
-      logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
                                                       logger_event_loop_.get());
     }
 
diff --git a/third_party/gmp/BUILD b/third_party/gmp/BUILD
index e4f7029..06cfa51 100644
--- a/third_party/gmp/BUILD
+++ b/third_party/gmp/BUILD
@@ -4,6 +4,7 @@
     ":mpn.bzl",
     "architecture_includes",
     "config_include_from_architecture",
+    "current_directory",
     "file_from_architecture",
     "mparam_path",
     "mpn_cc_library",
@@ -66,10 +67,10 @@
     "-Wno-unused-parameter",
     "-Wno-missing-field-initializers",
     "-DHAVE_CONFIG_H",
-    "-Ithird_party/gmp",
-    "-I$(GENDIR)/third_party/gmp",
-    "-Ithird_party/gmp/mpn",
-    "-I$(GENDIR)/third_party/gmp/mpn",
+    "-I" + current_directory(),
+    "-I$(GENDIR)/" + current_directory(),
+    "-I" + current_directory() + "/mpn",
+    "-I$(GENDIR)/" + current_directory() + "/mpn",
     "-Wno-cast-align",
     "-Wno-dangling-else",
     "-Wno-cast-qual",
@@ -926,17 +927,20 @@
 
 genrule(
     name = "call_m4",
-    srcs = native.glob([
+    srcs = glob([
         "**/*.m4",
         "**/*.asm",
     ]) + ["config.m4"],
     outs = ["tests/call.s"],
-    cmd = "cd third_party/gmp/tests; ../../../$(location @m4_v1.4.18//:bin) -I ../../../$(GENDIR)/third_party/gmp/tests -DHAVE_CONFIG_H  -DPIC " + cpu_select({
+    cmd = "ROOT=$$(pwd); cd ./" + current_directory() + "/tests; LD_LIBRARY_PATH=$${ROOT}/external/m4_v1.4.18/usr/lib/x86_64-linux-gnu/ $${ROOT}/$(location @m4_v1.4.18//:bin) -I $${ROOT}/$(GENDIR)/" + current_directory() + "/tests -DHAVE_CONFIG_H -DPIC " + cpu_select({
               "arm": "arm32call.asm",
               "amd64": "amd64call.asm",
           }) +
-          " > ../../../$@",
-    tools = ["@m4_v1.4.18//:bin"],
+          " > $${ROOT}/$@",
+    tools = [
+        "@m4_v1.4.18//:bin",
+        "@m4_v1.4.18//:lib",
+    ],
 )
 
 cc_library(
diff --git a/third_party/gmp/mpn.bzl b/third_party/gmp/mpn.bzl
index 8f50773..039725f 100644
--- a/third_party/gmp/mpn.bzl
+++ b/third_party/gmp/mpn.bzl
@@ -48,6 +48,9 @@
     "sec_pi1_div_r": ["sec_pi1_div"],
 }
 
+def current_directory():
+    return native.package_name()
+
 def mpn_cc_library(
         name,
         srcs,
@@ -60,9 +63,9 @@
         hdrs = hdrs,
         copts = copts + [
             "-DHAVE_CONFIG_H",
-            "-Ithird_party/gmp/mpn",
-            "-Ithird_party/gmp",
-            "-I$(GENDIR)/third_party/gmp",
+            "-I" + current_directory() + "/mpn",
+            "-I" + current_directory(),
+            "-I$(GENDIR)/" + current_directory(),
             "-D__GMP_WITHIN_GMP",
             "-DOPERATION_" + name,
         ],
@@ -87,17 +90,19 @@
 
     out = ctx.actions.declare_file("mpn/" + ctx.attr.operation + ".s")
 
+    ruledir = ctx.label.workspace_root + "/" + ctx.label.package
     ctx.actions.run_shell(
         inputs = [ctx.files.files[0]] + ctx.files.deps,
         outputs = [out],
         progress_message = "Generating " + out.short_path,
-        tools = [ctx.executable._m4],
-        command = "&&".join([
-            "cd third_party/gmp/mpn",
-            "echo '#define OPERATION_" + ctx.attr.operation + " 1' > ../../../" + out.path,
-            "../../../" + ctx.executable._m4.path + " -I ../../../" + ctx.var["GENDIR"] + "/third_party/gmp/mpn/ " +
+        tools = [ctx.executable._m4] + ctx.attr._m4_lib.files.to_list(),
+        command = " && ".join([
+            "ROOT=$(pwd)",
+            "cd ./" + ruledir + "/mpn",
+            "echo '#define OPERATION_" + ctx.attr.operation + " 1' > ${ROOT}/" + out.path,
+            "LD_LIBRARY_PATH=${ROOT}/external/m4_v1.4.18/usr/lib/x86_64-linux-gnu/ ${ROOT}/" + ctx.executable._m4.path + " -I ${ROOT}/" + ctx.var["GENDIR"] + "/" + ruledir + "/mpn" +
             " -DHAVE_CONFIG_H -D__GMP_WITHIN_GMP -DOPERATION_" + ctx.attr.operation +
-            " -DPIC ../../../" + ctx.files.files[0].path + " >> ../../../" + out.path,
+            " -DPIC ${ROOT}/" + ctx.files.files[0].path + " >> ${ROOT}/" + out.path,
         ]),
     )
 
@@ -120,6 +125,10 @@
             cfg = "host",
             executable = True,
         ),
+        "_m4_lib": attr.label(
+            default = "@m4_v1.4.18//:lib",
+            cfg = "host",
+        ),
     },
     implementation = _m4_mpn_function_impl,
 )
@@ -140,7 +149,7 @@
 def architecture_includes(architecture_paths):
     result = dict()
     for key in architecture_paths:
-        result[key] = ["-Ithird_party/gmp/mpn/" + p for p in architecture_paths[key]]
+        result[key] = ["-I" + current_directory() + "/mpn/" + p for p in architecture_paths[key]]
     return select(result)
 
 def file_from_architecture(architecture_paths, f):
@@ -152,7 +161,7 @@
 def config_include_from_architecture(architecture_paths):
     result = dict()
     for key in architecture_paths:
-        result[key] = ["-Ithird_party/gmp/config/" + architecture_paths[key][0] + "/"]
+        result[key] = ["-I" + current_directory() + "/config/" + architecture_paths[key][0] + "/"]
     return select(result)
 
 def mpn_m4_cc_library(name, architecture_paths):
diff --git a/tools/build_rules/BUILD b/tools/build_rules/BUILD
index f96fb3a..8ad7018 100644
--- a/tools/build_rules/BUILD
+++ b/tools/build_rules/BUILD
@@ -3,3 +3,10 @@
     srcs = ["quiet_success.sh"],
     visibility = ["//visibility:public"],
 )
+
+py_binary(
+    name = "jinja2_generator",
+    srcs = ["jinja2_generator.py"],
+    visibility = ["//visibility:public"],
+    deps = ["@python_jinja2"],
+)
diff --git a/y2020/generate_pi_config.py b/tools/build_rules/jinja2_generator.py
similarity index 96%
rename from y2020/generate_pi_config.py
rename to tools/build_rules/jinja2_generator.py
index afd1fca..34f3354 100644
--- a/y2020/generate_pi_config.py
+++ b/tools/build_rules/jinja2_generator.py
@@ -1,7 +1,6 @@
 #!/usr/bin/python3
 
 import argparse
-from pathlib import Path
 import json
 import sys
 
diff --git a/tools/build_rules/template.bzl b/tools/build_rules/template.bzl
new file mode 100644
index 0000000..d4807e8
--- /dev/null
+++ b/tools/build_rules/template.bzl
@@ -0,0 +1,42 @@
+def _jinja2_template_impl(ctx):
+    """Runs the jinja2 generator on `src` to produce the output file.
+
+    The output file is declared with the same name as the target. The generator
+    is invoked with three arguments: the template path, the stringified
+    `parameters` dict, and the output path.
+    """
+    out = ctx.actions.declare_file(ctx.attr.name)
+
+    # Hand the arguments over as an argv list instead of splicing them into a
+    # shell command line, so paths or parameter values containing quotes or
+    # spaces reach the generator unmodified.
+    ctx.actions.run(
+        inputs = [ctx.file.src],
+        executable = ctx.executable._jinja2,
+        arguments = [ctx.file.src.path, str(ctx.attr.parameters), out.path],
+        progress_message = "Generating " + out.short_path,
+        outputs = [out],
+    )
+
+    return [DefaultInfo(files = depset([out])), OutputGroupInfo(out = depset([out]))]
+
+jinja2_template = rule(
+    attrs = {
+        "src": attr.label(
+            mandatory = True,
+            allow_single_file = True,
+            doc = """The jinja2 template file to expand.""",
+        ),
+        "parameters": attr.string_dict(
+            mandatory = True,
+            doc = """The parameters to supply to Jinja2.""",
+        ),
+        "_jinja2": attr.label(
+            default = "//tools/build_rules:jinja2_generator",
+            cfg = "host",
+            executable = True,
+        ),
+    },
+    implementation = _jinja2_template_impl,
+    doc = """Expands a jinja2 template given parameters.""",
+)
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index 3689970..f7b69e4 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -15,7 +15,7 @@
               "Name of the logfile to read from.");
 DEFINE_string(config, "y2019/config.json",
               "Name of the config file to replay using.");
-DEFINE_string(output_file, "/tmp/replayed.bfbs",
+DEFINE_string(output_file, "/tmp/replayed",
               "Name of the logfile to write replayed data to.");
 DEFINE_int32(team, 971, "Team number to use for logfile replay.");
 int main(int argc, char **argv) {
@@ -37,13 +37,12 @@
                             "frc971.control_loops.drivetrain.Output");
   reader.Register();
 
-  aos::logger::DetachedBufferWriter file_writer(FLAGS_output_file);
   std::unique_ptr<aos::EventLoop> log_writer_event_loop =
       reader.event_loop_factory()->MakeEventLoop("log_writer");
   log_writer_event_loop->SkipTimingReport();
   log_writer_event_loop->SkipAosLog();
   CHECK(nullptr == log_writer_event_loop->node());
-  aos::logger::Logger writer(&file_writer, log_writer_event_loop.get());
+  aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
       reader.event_loop_factory()->MakeEventLoop("drivetrain");
diff --git a/y2020/BUILD b/y2020/BUILD
index 2d3f0e5..38832b8 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -2,7 +2,7 @@
 load("//aos:config.bzl", "aos_config")
 load("@com_google_protobuf//:protobuf.bzl", "cc_proto_library")
 load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
-load("//y2020:config_gen.bzl", "generate_pi_config")
+load("//tools/build_rules:template.bzl", "jinja2_template")
 
 robot_downloader(
     binaries = [
@@ -168,6 +168,7 @@
             "//y2020/vision/sift:sift_fbs",
             "//y2020/vision/sift:sift_training_fbs",
             "//y2020/vision:vision_fbs",
+            "//aos/events/logging:logger_fbs",
         ],
         visibility = ["//visibility:public"],
         deps = [
@@ -181,11 +182,30 @@
         "pi2",
         "pi3",
         "pi4",
-        "laptop",
     ]
 ]
 
 aos_config(
+    name = "config_laptop",
+    src = "y2020_laptop.json",
+    flatbuffers = [
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
+        "//aos/network:timestamp_fbs",
+        "//y2020/vision/sift:sift_fbs",
+        "//y2020/vision/sift:sift_training_fbs",
+        "//y2020/vision:vision_fbs",
+        "//aos/events/logging:logger_fbs",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        "//aos/events:config",
+        "//aos/robot_state:config",
+        "//frc971/control_loops/drivetrain:config",
+    ],
+)
+
+aos_config(
     name = "config_roborio",
     src = "y2020_roborio.json",
     flatbuffers = [
@@ -247,10 +267,11 @@
     ],
 )
 
-py_binary(
-    name = "generate_pi_config",
-    srcs = ["generate_pi_config.py"],
-    deps = ["@python_jinja2"],
-)
-
-[generate_pi_config(num) for num in range(1, 5)]
+[
+    jinja2_template(
+        name = "y2020_pi" + str(num) + ".json",
+        src = "y2020_pi_template.json",
+        parameters = {"NUM": str(num)},
+    )
+    for num in range(1, 5)
+]
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 5a97943..0dd5a1a 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -46,11 +46,10 @@
     node = aos::configuration::GetNode(reader.configuration(), "roborio");
   }
 
-  aos::logger::DetachedBufferWriter file_writer(FLAGS_output_file);
   std::unique_ptr<aos::EventLoop> log_writer_event_loop =
       reader.event_loop_factory()->MakeEventLoop("log_writer", node);
   log_writer_event_loop->SkipTimingReport();
-  aos::logger::Logger writer(&file_writer, log_writer_event_loop.get());
+  aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
       reader.event_loop_factory()->MakeEventLoop("drivetrain", node);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index a7360c1..ae9f887 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -135,10 +135,8 @@
     set_battery_voltage(12.0);
 
     if (!FLAGS_output_file.empty()) {
-      log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
-          FLAGS_output_file);
       logger_event_loop_ = MakeEventLoop("logger", roborio_);
-      logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
                                                       logger_event_loop_.get());
     }
 
@@ -399,7 +397,6 @@
   double turret_velocity_ = 0.0;  // rad / sec
 
   std::unique_ptr<aos::EventLoop> logger_event_loop_;
-  std::unique_ptr<aos::logger::DetachedBufferWriter> log_buffer_writer_;
   std::unique_ptr<aos::logger::Logger> logger_;
 };
 
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index 1d42d23..8a0589c 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -439,10 +439,8 @@
 
     if (!FLAGS_output_file.empty()) {
       unlink(FLAGS_output_file.c_str());
-      log_buffer_writer_ = std::make_unique<aos::logger::DetachedBufferWriter>(
-          FLAGS_output_file);
       logger_event_loop_ = MakeEventLoop("logger", roborio_);
-      logger_ = std::make_unique<aos::logger::Logger>(log_buffer_writer_.get(),
+      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
                                                       logger_event_loop_.get());
     }
   }
@@ -550,7 +548,6 @@
   SuperstructureSimulation superstructure_plant_;
 
   std::unique_ptr<aos::EventLoop> logger_event_loop_;
-  std::unique_ptr<aos::logger::DetachedBufferWriter> log_buffer_writer_;
   std::unique_ptr<aos::logger::Logger> logger_;
 };
 
diff --git a/y2020/y2020_laptop.json b/y2020/y2020_laptop.json
index 44c4705..0797110 100644
--- a/y2020/y2020_laptop.json
+++ b/y2020/y2020_laptop.json
@@ -175,6 +175,51 @@
       ]
     },
     {
+      "name": "/laptop/aos/remote_timestamps/roborio",
+      "type": "aos.logger.MessageHeader",
+      "source_node": "laptop",
+      "logger": "NOT_LOGGED",
+      "frequency": 20,
+      "num_senders": 2,
+      "max_size": 300
+    },
+    {
+      "name": "/laptop/aos/remote_timestamps/pi1",
+      "type": "aos.logger.MessageHeader",
+      "source_node": "laptop",
+      "logger": "NOT_LOGGED",
+      "frequency": 20,
+      "num_senders": 2,
+      "max_size": 300
+    },
+    {
+      "name": "/laptop/aos/remote_timestamps/pi2",
+      "type": "aos.logger.MessageHeader",
+      "source_node": "laptop",
+      "logger": "NOT_LOGGED",
+      "frequency": 20,
+      "num_senders": 2,
+      "max_size": 300
+    },
+    {
+      "name": "/laptop/aos/remote_timestamps/pi3",
+      "type": "aos.logger.MessageHeader",
+      "source_node": "laptop",
+      "logger": "NOT_LOGGED",
+      "frequency": 20,
+      "num_senders": 2,
+      "max_size": 300
+    },
+    {
+      "name": "/laptop/aos/remote_timestamps/pi4",
+      "type": "aos.logger.MessageHeader",
+      "source_node": "laptop",
+      "logger": "NOT_LOGGED",
+      "frequency": 20,
+      "num_senders": 2,
+      "max_size": 300
+    },
+    {
       "name": "/pi1/camera",
       "type": "frc971.vision.CameraImage",
       "source_node": "pi1",