Sort messages between nodes properly

We used to assume the realtime clocks were in sync.  This isn't
realistic.  Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.

Since time is no longer exactly linear with all the adjustments, we need
to redo how events are scheduled.  They can't be converted to the
distributed_clock once.  They need to now be converted every time they
are compared between nodes.

Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/configuration.cc b/aos/configuration.cc
index c4f139c..bdbb463 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -653,9 +653,8 @@
   }
 }
 
-int GetNodeIndex(const Configuration *config, const Node *node) {
-  CHECK(config->has_nodes())
-      << ": Asking for a node from a single node configuration.";
+namespace {
+int GetNodeIndexFromConfig(const Configuration *config, const Node *node) {
   int node_index = 0;
   for (const Node *iterated_node : *config->nodes()) {
     if (iterated_node == node) {
@@ -663,12 +662,39 @@
     }
     ++node_index;
   }
-  LOG(FATAL) << "Node not found in the configuration.";
+  return -1;
+}
+}  // namespace
+
+int GetNodeIndex(const Configuration *config, const Node *node) {
+  if (!MultiNode(config)) {
+    return 0;
+  }
+
+  {
+    int node_index = GetNodeIndexFromConfig(config, node);
+    if (node_index != -1) {
+      return node_index;
+    }
+  }
+
+  const Node *result = GetNode(config, node);
+  CHECK(result != nullptr);
+
+  {
+    int node_index = GetNodeIndexFromConfig(config, node);
+    if (node_index != -1) {
+      return node_index;
+    }
+  }
+
+  LOG(FATAL) << "Node " << FlatbufferToJson(node)
+             << " not found in the configuration.";
 }
 
 std::vector<const Node *> GetNodes(const Configuration *config) {
   std::vector<const Node *> nodes;
-  if (configuration::MultiNode(config)) {
+  if (MultiNode(config)) {
     for (const Node *node : *config->nodes()) {
       nodes.emplace_back(node);
     }
diff --git a/aos/configuration.h b/aos/configuration.h
index a9fbfe3..c756215 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -80,7 +80,8 @@
 // in a single node world.)
 std::vector<const Node *> GetNodes(const Configuration *config);
 
-// Returns the node index for a node.  Note: node needs to exist inside config.
+// Returns the node index for a node.  Note: will be faster if node is a pointer
+// to a node in config, but is not required.
 int GetNodeIndex(const Configuration *config, const Node *node);
 
 // Returns true if we are running in a multinode configuration.
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 26752b1..a9ae7c0 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -9,7 +9,7 @@
 namespace aos {
 
 EventScheduler::Token EventScheduler::Schedule(
-    distributed_clock::time_point time, ::std::function<void()> callback) {
+    monotonic_clock::time_point time, ::std::function<void()> callback) {
   return events_list_.emplace(time, callback);
 }
 
@@ -17,43 +17,29 @@
   events_list_.erase(token);
 }
 
-void EventScheduler::RunFor(distributed_clock::duration duration) {
-  const distributed_clock::time_point end_time =
-      distributed_now() + duration;
-  logging::ScopedLogRestorer prev_logger;
-  is_running_ = true;
-  for (std::function<void()> &on_run : on_run_) {
-    on_run();
+aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
+  if (events_list_.empty()) {
+    return monotonic_clock::max_time;
   }
-  on_run_.clear();
-  while (!events_list_.empty() && is_running_) {
-    auto iter = events_list_.begin();
-    distributed_clock::time_point next_time = iter->first;
-    if (next_time > end_time) {
-      break;
-    }
-    now_ = iter->first;
-    ::std::function<void()> callback = ::std::move(iter->second);
-    events_list_.erase(iter);
-    callback();
-  }
-  now_ = end_time;
+
+  return events_list_.begin()->first;
 }
 
-void EventScheduler::Run() {
-  logging::ScopedLogRestorer prev_logger;
-  is_running_ = true;
+void EventScheduler::CallOldestEvent() {
+  CHECK_GT(events_list_.size(), 0u);
+  auto iter = events_list_.begin();
+  now_ = iter->first;
+
+  ::std::function<void()> callback = ::std::move(iter->second);
+  events_list_.erase(iter);
+  callback();
+}
+
+void EventScheduler::RunOnRun() {
   for (std::function<void()> &on_run : on_run_) {
     on_run();
   }
   on_run_.clear();
-  while (!events_list_.empty() && is_running_) {
-    auto iter = events_list_.begin();
-    now_ = iter->first;
-    ::std::function<void()> callback = ::std::move(iter->second);
-    events_list_.erase(iter);
-    callback();
-  }
 }
 
 std::ostream &operator<<(std::ostream &stream,
@@ -63,4 +49,92 @@
   return stream;
 }
 
+void EventSchedulerScheduler::AddEventScheduler(EventScheduler *scheduler) {
+  CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
+        schedulers_.end());
+  CHECK(scheduler->scheduler_scheduler_ == nullptr);
+
+  schedulers_.emplace_back(scheduler);
+  scheduler->scheduler_scheduler_ = this;
+}
+
+void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
+  distributed_clock::time_point end_time = now_ + duration;
+  logging::ScopedLogRestorer prev_logger;
+  RunOnRun();
+
+  // Run all the sub-event-schedulers.
+  while (is_running_) {
+    std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+        OldestEvent();
+    // No events left, bail.
+    if (std::get<0>(oldest_event) == distributed_clock::max_time ||
+        std::get<0>(oldest_event) > end_time) {
+      is_running_ = false;
+      break;
+    }
+
+    // We get to pick our tradeoffs here.  Either we assume that there are no
+    // backward step changes in our time function for each node, or we have to
+    // let time go backwards.  This backwards time jump should be small, so we
+    // can check for it and bound it.
+    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+        << ": Simulated time went backwards by too much.  Please investigate.";
+    now_ = std::get<0>(oldest_event);
+
+    std::get<1>(oldest_event)->CallOldestEvent();
+  }
+
+  now_ = end_time;
+}
+
+void EventSchedulerScheduler::Run() {
+  logging::ScopedLogRestorer prev_logger;
+  RunOnRun();
+  // Run all the sub-event-schedulers.
+  while (is_running_) {
+    std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+        OldestEvent();
+    // No events left, bail.
+    if (std::get<0>(oldest_event) == distributed_clock::max_time) {
+      break;
+    }
+
+    // We get to pick our tradeoffs here.  Either we assume that there are no
+    // backward step changes in our time function for each node, or we have to
+    // let time go backwards.  This backwards time jump should be small, so we
+    // can check for it and bound it.
+    CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+        << ": Simulated time went backwards by too much.  Please investigate.";
+    now_ = std::get<0>(oldest_event);
+
+    std::get<1>(oldest_event)->CallOldestEvent();
+  }
+
+  is_running_ = false;
+}
+
+std::tuple<distributed_clock::time_point, EventScheduler *>
+EventSchedulerScheduler::OldestEvent() {
+  distributed_clock::time_point min_event_time = distributed_clock::max_time;
+  EventScheduler *min_scheduler = nullptr;
+
+  // TODO(austin): Don't linearly search...  But for N=3, it is probably the
+  // fastest way to do this.
+  for (EventScheduler *scheduler : schedulers_) {
+    const monotonic_clock::time_point monotonic_event_time =
+        scheduler->OldestEvent();
+    if (monotonic_event_time != monotonic_clock::max_time) {
+      const distributed_clock::time_point event_time =
+          scheduler->ToDistributedClock(monotonic_event_time);
+      if (event_time < min_event_time) {
+        min_event_time = event_time;
+        min_scheduler = scheduler;
+      }
+    }
+  }
+
+  return std::make_tuple(min_event_time, min_scheduler);
+}
+
 }  // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 400a307..e6c732b 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -9,6 +9,7 @@
 #include <vector>
 
 #include "aos/events/event_loop.h"
+#include "aos/logging/implementations.h"
 #include "aos/time/time.h"
 #include "glog/logging.h"
 
@@ -42,15 +43,17 @@
 std::ostream &operator<<(std::ostream &stream,
                          const aos::distributed_clock::time_point &now);
 
+class EventSchedulerScheduler;
+
 class EventScheduler {
  public:
   using ChannelType =
-      std::multimap<distributed_clock::time_point, std::function<void()>>;
+      std::multimap<monotonic_clock::time_point, std::function<void()>>;
   using Token = ChannelType::iterator;
 
   // Schedule an event with a callback function
   // Returns an iterator to the event
-  Token Schedule(distributed_clock::time_point time,
+  Token Schedule(monotonic_clock::time_point time,
                  std::function<void()> callback);
 
   // Schedules a callback when the event scheduler starts.
@@ -63,29 +66,132 @@
   // Deschedule an event by its iterator
   void Deschedule(Token token);
 
-  // Runs until exited.
-  void Run();
-  // Runs for a duration.
-  void RunFor(distributed_clock::duration duration);
+  // Runs the OnRun callbacks.
+  void RunOnRun();
 
-  void Exit() { is_running_ = false; }
+  // Returns true if events are being handled.
+  inline bool is_running() const;
 
-  bool is_running() const { return is_running_; }
+  // Returns the timestamp of the next event to trigger.
+  aos::monotonic_clock::time_point OldestEvent();
+  // Handles the next event.
+  void CallOldestEvent();
 
-  distributed_clock::time_point distributed_now() const { return now_; }
+  // Converts a time to the distributed clock for scheduling and cross-node time
+  // measurement.
+  distributed_clock::time_point ToDistributedClock(
+      monotonic_clock::time_point time) const {
+    return distributed_clock::epoch() + time.time_since_epoch() +
+           monotonic_offset_;
+  }
+
+  // Takes the distributed time and converts it to the monotonic clock for this
+  // node.
+  monotonic_clock::time_point FromDistributedClock(
+      distributed_clock::time_point time) const {
+    return monotonic_clock::epoch() + time.time_since_epoch() -
+           monotonic_offset_;
+  }
+
+  // Returns the current monotonic time on this node calculated from the
+  // distributed clock.
+  inline monotonic_clock::time_point monotonic_now() const;
+
+  // Sets the offset between the distributed and monotonic clock.
+  //   distributed = monotonic + offset;
+  void SetDistributedOffset(std::chrono::nanoseconds monotonic_offset) {
+    monotonic_offset_ = monotonic_offset;
+  }
+
+  // Returns the offset used to convert to and from the distributed clock.
+  std::chrono::nanoseconds monotonic_offset() const {
+    return monotonic_offset_;
+  }
 
  private:
+  friend class EventSchedulerScheduler;
   // Current execution time.
-  distributed_clock::time_point now_ = distributed_clock::epoch();
+  monotonic_clock::time_point now_ = monotonic_clock::epoch();
 
+  // Offset to the distributed clock.
+  //   distributed = monotonic + offset;
+  std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+
+  // List of functions to run (once) when running.
   std::vector<std::function<void()>> on_run_;
 
   // Multimap holding times to run functions.  These are stored in order, and
   // the order is the callback tree.
   ChannelType events_list_;
-  bool is_running_ = false;
+
+  // Pointer to the actual scheduler.
+  EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
 };
 
+// We need a heap of heaps...
+//
+// Events in a node have a very well defined progression of time.  It is linear
+// and well represented by the monotonic clock.
+//
+// Events across nodes don't follow this well.  Time skews between the two nodes
+// all the time.  We also don't know the function ahead of time which converts
+// from each node's monotonic clock to the distributed clock (our unified base
+// time which is likely the average time between nodes).
+//
+// This pushes us towards merge sort.  Sorting each node's events with a heap
+// like we used to be doing, and then sorting each of those nodes independently.
+class EventSchedulerScheduler {
+ public:
+  // Adds an event scheduler to the list.
+  void AddEventScheduler(EventScheduler *scheduler);
+
+  // Runs until there are no more events or Exit is called.
+  void Run();
+
+  // Stops running.
+  void Exit() { is_running_ = false; }
+
+  bool is_running() const { return is_running_; }
+
+  // Runs for a duration on the distributed clock.  Time on the distributed
+  // clock should be very representative of time on each node, but won't be
+  // exactly the same.
+  void RunFor(distributed_clock::duration duration);
+
+  // Returns the current distributed time.
+  distributed_clock::time_point distributed_now() const { return now_; }
+
+ private:
+  // Handles running the OnRun functions.
+  void RunOnRun() {
+    CHECK(!is_running_);
+    is_running_ = true;
+    for (EventScheduler *scheduler : schedulers_) {
+      scheduler->RunOnRun();
+    }
+  }
+
+  // Returns the next event time and scheduler on which to run it.
+  std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
+
+  // True if we are running.
+  bool is_running_ = false;
+  // The current time.
+  distributed_clock::time_point now_ = distributed_clock::epoch();
+  // List of schedulers to run in sync.
+  std::vector<EventScheduler *> schedulers_;
+};
+
+inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
+  // Make sure we stay in sync.
+  CHECK_EQ(now_, FromDistributedClock(scheduler_scheduler_->distributed_now()));
+  return now_;
+}
+
+inline bool EventScheduler::is_running() const {
+  return scheduler_scheduler_->is_running();
+}
+
 }  // namespace aos
 
 #endif  // AOS_EVENTS_EVENT_SCHEDULER_H_
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 2b3df8f..2e63f45 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -16,6 +16,7 @@
     srcs = [
         "logfile_utils.cc",
         "logger.cc",
+        "logger_math.cc",
     ],
     hdrs = [
         "logfile_utils.h",
@@ -28,10 +29,12 @@
         "//aos/events:event_loop",
         "//aos/events:simulated_event_loop",
         "//aos/network:team_number",
+        "//aos/network:timestamp_filter",
         "//aos/time",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_google_absl//absl/container:inlined_vector",
         "@com_google_absl//absl/strings",
+        "@org_tuxfamily_eigen//:eigen",
     ],
 )
 
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 03b89d1..44381f3 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -291,7 +291,7 @@
       chrono::nanoseconds(result.message().monotonic_sent_time()));
 
   newest_timestamp_ = std::max(newest_timestamp_, timestamp);
-  VLOG(1) << "Read from " << filename() << " data " << FlatbufferToJson(result);
+  VLOG(2) << "Read from " << filename() << " data " << FlatbufferToJson(result);
   return std::move(result);
 }
 
@@ -435,8 +435,14 @@
       }
     } else {
       if (!NextLogFile()) {
-        VLOG(1) << "End of log file.";
+        VLOG(1) << "End of log file " << filenames_.back();
         at_end_ = true;
+        for (MessageHeaderQueue *queue : channels_to_write_) {
+          if (queue == nullptr || queue->timestamp_merger == nullptr) {
+            continue;
+          }
+          queue->timestamp_merger->NoticeAtEnd();
+        }
         return false;
       }
     }
@@ -800,15 +806,40 @@
   return oldest_timestamp;
 }
 
+TimestampMerger::DeliveryTimestamp TimestampMerger::OldestTimestamp() const {
+  if (!has_timestamps_ || timestamp_heap_.size() == 0u) {
+    return TimestampMerger::DeliveryTimestamp{};
+  }
+
+  std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
+      oldest_timestamp_reader = timestamp_heap_.front();
+
+  std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
+      oldest_timestamp = std::get<2>(oldest_timestamp_reader)
+                             ->oldest_message(channel_index_, node_index_);
+
+  TimestampMerger::DeliveryTimestamp timestamp;
+  timestamp.monotonic_event_time =
+      monotonic_clock::time_point(chrono::nanoseconds(
+          std::get<2>(oldest_timestamp)->monotonic_sent_time()));
+  timestamp.realtime_event_time = realtime_clock::time_point(
+      chrono::nanoseconds(std::get<2>(oldest_timestamp)->realtime_sent_time()));
+
+  timestamp.monotonic_remote_time =
+      monotonic_clock::time_point(chrono::nanoseconds(
+          std::get<2>(oldest_timestamp)->monotonic_remote_time()));
+  timestamp.realtime_remote_time =
+      realtime_clock::time_point(chrono::nanoseconds(
+          std::get<2>(oldest_timestamp)->realtime_remote_time()));
+
+  timestamp.remote_queue_index = std::get<2>(oldest_timestamp)->queue_index();
+  return timestamp;
+}
+
 std::tuple<TimestampMerger::DeliveryTimestamp, FlatbufferVector<MessageHeader>>
 TimestampMerger::PopOldest() {
   if (has_timestamps_) {
-    CHECK_GT(message_heap_.size(), 0u)
-        << ": Missing data from source node, no data available to match "
-           "timestamp on "
-        << configuration::CleanedChannelToString(
-               configuration_->channels()->Get(channel_index_));
-
+    // Read the timestamps.
     std::tuple<monotonic_clock::time_point, uint32_t,
                FlatbufferVector<MessageHeader>>
         oldest_timestamp = PopTimestampHeap();
@@ -830,6 +861,15 @@
         chrono::nanoseconds(
             std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
 
+    // See if we have any data.  If not, pass the problem up the chain.
+    if (message_heap_.size() == 0u) {
+      VLOG(1) << "No data to match timestamp on "
+              << configuration::CleanedChannelToString(
+                     configuration_->channels()->Get(channel_index_));
+      return std::make_tuple(timestamp,
+                             std::move(std::get<2>(oldest_timestamp)));
+    }
+
     while (true) {
       {
         // Ok, now try grabbing data until we find one which matches.
@@ -842,16 +882,16 @@
             std::get<2>(oldest_message_ref)->monotonic_sent_time()));
 
         if (remote_monotonic_time < remote_timestamp_monotonic_time) {
-          LOG(INFO) << "Undelivered message, skipping.  Remote time is "
-                    << remote_monotonic_time << " timestamp is "
-                    << remote_timestamp_monotonic_time << " on channel "
-                    << channel_index_;
+          VLOG(1) << "Undelivered message, skipping.  Remote time is "
+                  << remote_monotonic_time << " timestamp is "
+                  << remote_timestamp_monotonic_time << " on channel "
+                  << channel_index_;
           PopMessageHeap();
           continue;
         } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
-          LOG(INFO) << "Data not found.  Remote time should be "
-                    << remote_timestamp_monotonic_time << " on channel "
-                    << channel_index_;
+          VLOG(1) << "Data not found.  Remote time should be "
+                  << remote_timestamp_monotonic_time << " on channel "
+                  << channel_index_;
           return std::make_tuple(timestamp,
                                  std::move(std::get<2>(oldest_timestamp)));
         }
@@ -902,6 +942,8 @@
   }
 }
 
+void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
+
 namespace {
 std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
     const std::vector<std::vector<std::string>> &filenames) {
@@ -997,6 +1039,18 @@
   return channel_heap_.front().first;
 }
 
+TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
+  if (timestamp_heap_.size() == 0u) {
+    return TimestampMerger::DeliveryTimestamp{};
+  }
+  return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
+}
+
+TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestampForChannel(
+    int channel) const {
+  return timestamp_mergers_[channel].OldestTimestamp();
+}
+
 void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
                                     int channel_index) {
   // Pop and recreate the heap if it has already been pushed.  And since we are
@@ -1009,6 +1063,16 @@
         }));
     std::make_heap(channel_heap_.begin(), channel_heap_.end(),
                    ChannelHeapCompare);
+
+    if (timestamp_mergers_[channel_index].has_timestamps()) {
+      timestamp_heap_.erase(std::find_if(
+          timestamp_heap_.begin(), timestamp_heap_.end(),
+          [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
+            return x.second == channel_index;
+          }));
+      std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                     ChannelHeapCompare);
+    }
   }
 
   channel_heap_.push_back(std::make_pair(timestamp, channel_index));
@@ -1017,22 +1081,40 @@
   // put the oldest message first.
   std::push_heap(channel_heap_.begin(), channel_heap_.end(),
                  ChannelHeapCompare);
+
+  if (timestamp_mergers_[channel_index].has_timestamps()) {
+    timestamp_heap_.push_back(std::make_pair(timestamp, channel_index));
+    std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                   ChannelHeapCompare);
+  }
 }
 
 std::tuple<TimestampMerger::DeliveryTimestamp, int,
            FlatbufferVector<MessageHeader>>
 ChannelMerger::PopOldest() {
-  CHECK(channel_heap_.size() > 0);
+  CHECK_GT(channel_heap_.size(), 0u);
   std::pair<monotonic_clock::time_point, int> oldest_channel_data =
       channel_heap_.front();
   int channel_index = oldest_channel_data.second;
   std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
                 &ChannelHeapCompare);
   channel_heap_.pop_back();
+
   timestamp_mergers_[channel_index].set_pushed(false);
 
   TimestampMerger *merger = &timestamp_mergers_[channel_index];
 
+  if (merger->has_timestamps()) {
+    CHECK_GT(timestamp_heap_.size(), 0u);
+    std::pair<monotonic_clock::time_point, int> oldest_timestamp_data =
+        timestamp_heap_.front();
+    CHECK(oldest_timestamp_data == oldest_channel_data)
+        << ": Timestamp heap out of sync.";
+    std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
+                  &ChannelHeapCompare);
+    timestamp_heap_.pop_back();
+  }
+
   // Merger handles any queueing needed from here.
   std::tuple<TimestampMerger::DeliveryTimestamp,
              FlatbufferVector<MessageHeader>>
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index e5e0175..4ab4dca 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -302,6 +302,10 @@
   std::string DebugString(int channel) const;
   std::string DebugString(int channel, int node_index) const;
 
+  // Returns true if all the messages have been queued from the last log file in
+  // the list of log files chunks.
+  bool at_end() const { return at_end_; }
+
  private:
   // TODO(austin): Need to copy or refcount the message instead of running
   // multiple copies of the reader.  Or maybe have a "as_node" index and hide it
@@ -441,6 +445,9 @@
   // The caller can determine what the appropriate action is to recover.
   std::tuple<DeliveryTimestamp, FlatbufferVector<MessageHeader>> PopOldest();
 
+  // Returns the oldest forwarding timestamp.
+  DeliveryTimestamp OldestTimestamp() const;
+
   // Tracks if the channel merger has pushed this onto it's heap or not.
   bool pushed() { return pushed_; }
   // Sets if this has been pushed to the channel merger heap.  Should only be
@@ -450,6 +457,13 @@
   // Returns a debug string with the heaps printed out.
   std::string DebugString() const;
 
+  // Returns true if we have timestamps.
+  bool has_timestamps() const { return has_timestamps_; }
+
+  // Records that one of the log files ran out of data.  This should only be
+  // called by a SplitMessageReader.
+  void NoticeAtEnd();
+
  private:
   // Pushes messages and timestamps to the corresponding heaps.
   void PushMessageHeap(
@@ -536,6 +550,12 @@
              FlatbufferVector<MessageHeader>>
   PopOldest();
 
+  // Returns the oldest timestamp in the timestamp heap.
+  TimestampMerger::DeliveryTimestamp OldestTimestamp() const;
+  // Returns the oldest timestamp in the timestamp heap for a specific channel.
+  TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
+      int channel) const;
+
   // Returns the config for this set of log files.
   const Configuration *configuration() const {
     return log_file_header()->configuration();
@@ -568,6 +588,15 @@
   // debugging what went wrong.
   std::string DebugString() const;
 
+  // Returns true if one of the log files has finished reading everything.  When
+  // log file chunks are involved, this means that the last chunk in a log file
+  // has been read.  It is acceptable to be missing data at this point in time.
+  bool at_end() const { return at_end_; }
+
+  // Marks that one of the log files is at the end.  This should only be called
+  // by timestamp mergers.
+  void NoticeAtEnd() { at_end_ = true; }
+
  private:
   // Pushes the timestamp for new data on the provided channel.
   void PushChannelHeap(monotonic_clock::time_point timestamp,
@@ -584,10 +613,15 @@
 
   // A heap of the channel readers and timestamps for the oldest data in each.
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+  // A heap of just the timestamp channel readers and timestamps for the oldest
+  // data in each.
+  std::vector<std::pair<monotonic_clock::time_point, int>> timestamp_heap_;
 
   // Configured node.
   const Node *node_;
 
+  bool at_end_ = false;
+
   // Cached copy of the list of nodes.
   std::vector<const Node *> nodes_;
 };
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c7f64ae..57c2c2c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -7,6 +7,7 @@
 #include <sys/uio.h>
 #include <vector>
 
+#include "Eigen/Dense"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logger_generated.h"
@@ -19,6 +20,11 @@
             "If true, drop any forwarding entries with missing data.  If "
             "false, CHECK.");
 
+DEFINE_bool(timestamps_to_csv, false,
+            "If true, write all the time synchronization information to a set "
+            "of CSV files in /tmp/.  This should only be needed when debugging "
+            "time synchronization.");
+
 namespace aos {
 namespace logger {
 
@@ -109,12 +115,12 @@
 // TODO(austin): Set the remote start time to the first time we see a remote
 // message when we are logging those messages separate?  Need to signal what to
 // do, or how to get a good timestamp.
-
 void Logger::WriteHeader() {
   for (const Node *node : log_namer_->nodes()) {
     WriteHeader(node);
   }
 }
+
 void Logger::WriteHeader(const Node *node) {
   // Now write the header with this timestamp in it.
   flatbuffers::FlatBufferBuilder fbb;
@@ -294,14 +300,20 @@
   MakeRemappedConfig();
 
   if (!configuration::MultiNode(configuration())) {
-    auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
-    State *state = &(it.first->second);
+    states_.emplace_back(std::make_unique<State>());
+    State *state = states_[0].get();
 
     state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+  } else {
+    states_.resize(configuration()->nodes()->size());
   }
 }
 
-LogReader::~LogReader() { Deregister(); }
+LogReader::~LogReader() { Deregister();
+  if (offset_fp_ != nullptr) {
+    fclose(offset_fp_);
+  }
+}
 
 const Configuration *LogReader::logged_configuration() const {
   return log_file_header_.message().configuration();
@@ -324,17 +336,19 @@
 }
 
 monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
-  auto it = channel_mergers_.find(node);
-  CHECK(it != channel_mergers_.end())
-      << ": Unknown node " << FlatbufferToJson(node);
-  return it->second.channel_merger->monotonic_start_time();
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  return state->channel_merger->monotonic_start_time();
 }
 
 realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
-  auto it = channel_mergers_.find(node);
-  CHECK(it != channel_mergers_.end())
-      << ": Unknown node " << FlatbufferToJson(node);
-  return it->second.channel_merger->realtime_start_time();
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  return state->channel_merger->realtime_start_time();
 }
 
 void LogReader::Register() {
@@ -347,9 +361,10 @@
   event_loop_factory_ = event_loop_factory;
 
   for (const Node *node : configuration::GetNodes(configuration())) {
-    auto it = channel_mergers_.insert(std::make_pair(node, State{}));
-
-    State *state = &(it.first->second);
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    states_[node_index] = std::make_unique<State>();
+    State *state = states_[node_index].get();
 
     state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
 
@@ -361,47 +376,169 @@
     Register(state->event_loop_unique_ptr.get());
   }
 
-  // Basic idea is that we want to
-  //   1) Find the node which booted first.
-  //   2) Setup the clocks so that each clock is at the time it would be at when
-  //      the first node booted.
+  // We need to now seed our per-node time offsets and get everything set up to
+  // run.
+  const size_t num_nodes = !configuration::MultiNode(logged_configuration())
+                               ? 1u
+                               : logged_configuration()->nodes()->size();
 
-  realtime_clock::time_point earliest_boot_time = realtime_clock::max_time;
-  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
-    State *state = &(state_pair.second);
+  // It is easiest to solve for per node offsets with a matrix rather than
+  // trying to solve the equations by hand.  So let's get after it.
+  //
+  // Now, build up the map matrix.
+  //
+  // sample_matrix_ = map_matrix_ * offset_matrix_
+  map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);
 
-    const realtime_clock::time_point boot_time =
-        state->channel_merger->realtime_start_time() -
-        state->channel_merger->monotonic_start_time().time_since_epoch();
+  sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
+  offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
 
-    if (boot_time < earliest_boot_time) {
-      earliest_boot_time = boot_time;
+  // And the base offset matrix, which will be a copy of the initial offset
+  // matrix.
+  base_offset_matrix_ =
+      Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
+          num_nodes);
+
+  // All offsets should sum to 0.  Add that as the first constraint in our least
+  // squares.
+  map_matrix_.row(0).setOnes();
+
+  {
+    // Now, add the a - b -> sample elements.
+    size_t i = 1;
+    for (std::pair<const std::tuple<const Node *, const Node *>,
+                   message_bridge::ClippedAverageFilter> &filter : filters_) {
+      const Node *const node_a = std::get<0>(filter.first);
+      const Node *const node_b = std::get<1>(filter.first);
+
+      const size_t node_a_index =
+          configuration::GetNodeIndex(configuration(), node_a);
+      const size_t node_b_index =
+          configuration::GetNodeIndex(configuration(), node_b);
+
+      // +a
+      map_matrix_(i, node_a_index) = 1.0;
+      // -b
+      map_matrix_(i, node_b_index) = -1.0;
+
+      // -> sample
+      filter.second.set_sample_pointer(&sample_matrix_(i, 0));
+
+      ++i;
     }
   }
 
+  // Rank of the map matrix tells you if all the nodes are in communication with
+  // each other, which tells you if the offsets are observable.
+  const size_t connected_nodes =
+      Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
+          map_matrix_)
+          .rank();
+
+  // We don't need to support isolated nodes until someone has a real use case.
+  CHECK_EQ(connected_nodes, num_nodes)
+      << ": There is a node which isn't communicating with the rest.";
+
+  // Now, iterate through all the timestamps from all the nodes and seed
+  // everything.
+  for (std::unique_ptr<State> &state : states_) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      TimestampMerger::DeliveryTimestamp timestamp =
+          state->channel_merger->OldestTimestampForChannel(i);
+      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
+        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
+      }
+    }
+  }
+
+  // Make sure all the samples have been seeded.
+  for (int i = 1; i < sample_matrix_.cols(); ++i) {
+    // The seeding logic is pretty basic right now because we don't have great
+    // use cases yet.  It wants to see data from every node.  Blow up for now,
+    // and once we have a reason to do something different, update this logic.
+    // Maybe read further in the log file?  Or seed off the realtime time?
+    CHECK_NE(sample_matrix_(i, 0), 0.0)
+        << ": Sample " << i << " is not seeded.";
+  }
+
+  // And solve.
+  offset_matrix_ = SolveOffsets();
+
+  // Save off the base offsets so we can work in deltas from here out.  That
+  // will significantly simplify the numerical precision problems.
+  for (size_t i = 0; i < num_nodes; ++i) {
+    base_offset_matrix_(i, 0) =
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            std::chrono::duration<double>(offset_matrix_(i, 0)));
+  }
+
+  {
+    // Shift everything so we never could (reasonably) require the distributed
+    // clock to have a large backwards jump in time.  This makes it so the boot
+    // time on the node up the longest will essentially start matching the
+    // distributed clock.
+    const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
+    for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
+      base_offset_matrix_(i, 0) += offset;
+    }
+  }
+
+  {
+    // Re-compute the samples and setup all the filters so that they
+    // subtract this base offset.
+
+    size_t i = 1;
+    for (std::pair<const std::tuple<const Node *, const Node *>,
+                   message_bridge::ClippedAverageFilter> &filter : filters_) {
+      CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));
+
+      const Node *const node_a = std::get<0>(filter.first);
+      const Node *const node_b = std::get<1>(filter.first);
+
+      const size_t node_a_index =
+          configuration::GetNodeIndex(configuration(), node_a);
+      const size_t node_b_index =
+          configuration::GetNodeIndex(configuration(), node_b);
+
+      filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
+                                    base_offset_matrix_(node_b_index));
+
+      ++i;
+    }
+  }
+
+  // Now, iterate again through all the offsets now that we have set the base
+  // offset to something sane.  This will seed everything with an accurate
+  // initial offset.
+  for (std::unique_ptr<State> &state : states_) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      TimestampMerger::DeliveryTimestamp timestamp =
+          state->channel_merger->OldestTimestampForChannel(i);
+      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
+        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
+      }
+    }
+  }
+
+  UpdateOffsets();
+
   // We want to start the log file at the last start time of the log files from
   // all the nodes.  Compute how long each node's simulation needs to run to
   // move time to this point.
-  monotonic_clock::duration run_time = monotonic_clock::duration(0);
+  distributed_clock::time_point start_time = distributed_clock::min_time;
 
-  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
-    State *state = &(state_pair.second);
-
-    const realtime_clock::time_point boot_time =
-        state->channel_merger->realtime_start_time() -
-        state->channel_merger->monotonic_start_time().time_since_epoch();
-
-    // And start each node's clocks so the realtime clocks line up for the start
-    // times.  This will let us start using it, but isn't good enough.
-    state->node_event_loop_factory->SetMonotonicNow(
-        monotonic_clock::time_point(earliest_boot_time - boot_time));
+  for (std::unique_ptr<State> &state : states_) {
+    // Setup the realtime clock to have something sane in it now.
     state->node_event_loop_factory->SetRealtimeOffset(
         state->channel_merger->monotonic_start_time(),
         state->channel_merger->realtime_start_time());
-    run_time =
-        std::max(run_time, state->channel_merger->monotonic_start_time() -
-                               state->node_event_loop_factory->monotonic_now());
+    // And start computing the start time on the distributed clock now that that
+    // works.
+    start_time = std::max(start_time,
+                          state->node_event_loop_factory->ToDistributedClock(
+                              state->channel_merger->monotonic_start_time()));
   }
+  CHECK_GE(start_time, distributed_clock::epoch());
 
   // Forwarding is tracked per channel.  If it is enabled, we want to turn it
   // off.  Otherwise messages replayed will get forwarded across to the other
@@ -413,9 +550,8 @@
       const Node *node = configuration::GetNode(
           configuration(), channel->source_node()->string_view());
 
-      auto state_pair = channel_mergers_.find(node);
-      CHECK(state_pair != channel_mergers_.end());
-      State *state = &(state_pair->second);
+      State *state =
+          states_[configuration::GetNodeIndex(configuration(), node)].get();
 
       const Channel *remapped_channel =
           RemapChannel(state->event_loop, channel);
@@ -428,16 +564,90 @@
   // to timestamps on log files where the timestamp log file starts before the
   // data.  In this case, it is reasonable to expect missing data.
   ignore_missing_data_ = true;
-  event_loop_factory_->RunFor(run_time);
+  event_loop_factory_->RunFor(start_time.time_since_epoch());
   // Now that we are running for real, missing data means that the log file is
   // corrupted or went wrong.
   ignore_missing_data_ = false;
 }
 
+void LogReader::UpdateOffsets() {
+  // TODO(austin): Evaluate less accurate inverses.  We might be able to
+  // do some tricks to keep the accuracy up.
+  offset_matrix_ = SolveOffsets();
+
+  size_t node_index = 0;
+  for (std::unique_ptr<State> &state : states_) {
+    state->node_event_loop_factory->SetDistributedOffset(offset(node_index));
+    ++node_index;
+  }
+}
+
+std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
+    const Node *node_a, const Node *node_b) {
+  CHECK_NE(node_a, node_b);
+  CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
+  CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
+
+  if (node_a > node_b) {
+    return std::make_pair(std::get<0>(GetFilter(node_b, node_a)), false);
+  }
+
+  auto tuple = std::make_tuple(node_a, node_b);
+
+  auto it = filters_.find(tuple);
+
+  if (it == filters_.end()) {
+    auto &x = filters_
+                  .insert(std::make_pair(
+                      tuple, message_bridge::ClippedAverageFilter()))
+                  .first->second;
+    if (FLAGS_timestamps_to_csv) {
+      std::string fwd_name =
+          absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
+                       node_b->name()->string_view(), ".csv");
+      x.fwd_fp = fopen(fwd_name.c_str(), "w");
+      std::string rev_name =
+          absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
+                       node_a->name()->string_view(), ".csv");
+      x.rev_fp = fopen(rev_name.c_str(), "w");
+    }
+
+    return std::make_tuple(&x, true);
+  } else {
+    return std::make_tuple(&(it->second), true);
+  }
+}
+
+bool LogReader::State::MaybeUpdateTimestamp(
+    const TimestampMerger::DeliveryTimestamp &channel_timestamp,
+    int channel_index) {
+  if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
+    return false;
+  }
+
+  // Got a forwarding timestamp!
+  CHECK(std::get<0>(filters[channel_index]) != nullptr);
+
+  // Call the correct method depending on if we are the forward or reverse
+  // direction here.
+  if (std::get<1>(filters[channel_index])) {
+    std::get<0>(filters[channel_index])
+        ->FwdSample(channel_timestamp.monotonic_event_time,
+                    channel_timestamp.monotonic_event_time -
+                        channel_timestamp.monotonic_remote_time);
+  } else {
+    std::get<0>(filters[channel_index])
+        ->RevSample(channel_timestamp.monotonic_event_time,
+                    channel_timestamp.monotonic_event_time -
+                        channel_timestamp.monotonic_remote_time);
+  }
+  return true;
+}
+
 void LogReader::Register(EventLoop *event_loop) {
-  auto state_pair = channel_mergers_.find(event_loop->node());
-  CHECK(state_pair != channel_mergers_.end());
-  State *state = &(state_pair->second);
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
+          .get();
 
   state->event_loop = event_loop;
 
@@ -450,12 +660,29 @@
   state->channel_merger->SetNode(event_loop->node());
 
   state->channels.resize(logged_configuration()->channels()->size());
+  state->filters.resize(state->channels.size());
+
+  state->channel_target_event_loop_factory.resize(state->channels.size());
 
   for (size_t i = 0; i < state->channels.size(); ++i) {
     const Channel *channel =
         RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
 
     state->channels[i] = event_loop->MakeRawSender(channel);
+
+    state->filters[i] = std::make_tuple(nullptr, false);
+
+    if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
+        configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
+      const Node *target_node = configuration::GetNode(
+          event_loop->configuration(), channel->source_node()->string_view());
+      state->filters[i] = GetFilter(event_loop->node(), target_node);
+
+      if (event_loop_factory_ != nullptr) {
+        state->channel_target_event_loop_factory[i] =
+            event_loop_factory_->GetNodeEventLoopFactory(target_node);
+      }
+    }
   }
 
   state->timer_handler = event_loop->AddTimer([this, state]() {
@@ -466,6 +693,7 @@
       }
       return;
     }
+    bool update_offsets = false;
     TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
     FlatbufferVector<MessageHeader> channel_data =
@@ -477,20 +705,60 @@
     const monotonic_clock::time_point monotonic_now =
         state->event_loop->context().monotonic_event_time;
     CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
-        << ": Now " << monotonic_now.time_since_epoch().count()
-        << " trying to send "
-        << channel_timestamp.monotonic_event_time.time_since_epoch().count();
+        << ": " << FlatbufferToJson(state->event_loop->node()) << " Now "
+        << monotonic_now << " trying to send "
+        << channel_timestamp.monotonic_event_time << " failure "
+        << state->channel_merger->DebugString();
 
     if (channel_timestamp.monotonic_event_time >
             state->channel_merger->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
-      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries) ||
+      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
+           !state->channel_merger->at_end()) ||
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
                "not matched?  Use --skip_missing_forwarding_entries to ignore "
                "this.";
 
+        if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
+          // Confirm that the message was sent on the sending node before the
+          // destination node (this node).  As a proxy, do this by making sure
+          // that time on the source node is past when the message was sent.
+          CHECK_LT(channel_timestamp.monotonic_remote_time,
+                   state->channel_target_event_loop_factory[channel_index]
+                       ->monotonic_now());
+
+          update_offsets = true;
+
+          if (FLAGS_timestamps_to_csv) {
+            if (offset_fp_ == nullptr) {
+              offset_fp_ = fopen("/tmp/offsets.csv", "w");
+              fprintf(
+                  offset_fp_,
+                  "# time_since_start, offset node 0, offset node 1, ...\n");
+              first_time_ = channel_timestamp.realtime_event_time;
+            }
+
+            fprintf(offset_fp_, "%.9f",
+                    std::chrono::duration_cast<std::chrono::duration<double>>(
+                        channel_timestamp.realtime_event_time - first_time_)
+                        .count());
+            for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
+              fprintf(
+                  offset_fp_, ", %.9f",
+                  offset_matrix_(i, 0) +
+                      std::chrono::duration_cast<std::chrono::duration<double>>(
+                          base_offset_matrix_(i, 0))
+                          .count());
+            }
+            fprintf(offset_fp_, "\n");
+          }
+
+        } else {
+          CHECK(std::get<0>(state->filters[channel_index]) == nullptr);
+        }
+
         // If we have access to the factory, use it to fix the realtime time.
         if (state->node_event_loop_factory != nullptr) {
           state->node_event_loop_factory->SetRealtimeOffset(
@@ -504,7 +772,16 @@
             channel_timestamp.monotonic_remote_time,
             channel_timestamp.realtime_remote_time,
             channel_timestamp.remote_queue_index);
+      } else if (state->channel_merger->at_end()) {
+        // We are at the end of the log file and found missing data.  Finish
+        // reading the rest of the log file and call it quits.  We don't want to
+        // replay partial data.
+        while (state->channel_merger->OldestMessage() !=
+               monotonic_clock::max_time) {
+          state->channel_merger->PopOldest();
+        }
       }
+
     } else {
       LOG(WARNING)
           << "Not sending data from before the start of the log file. "
@@ -526,6 +803,13 @@
                                     std::chrono::nanoseconds(1));
       }
     }
+
+    // Once we make this call, the current time changes.  So do everything which
+    // involves time before changing it.  That especially includes sending the
+    // message.
+    if (update_offsets) {
+      UpdateOffsets();
+    }
   });
 
   ++live_nodes_;
@@ -540,10 +824,7 @@
 void LogReader::Deregister() {
   // Make sure that things get destroyed in the correct order, rather than
   // relying on getting the order correct in the class definition.
-  for (const Node *node : Nodes()) {
-    auto state_pair = channel_mergers_.find(node);
-    CHECK(state_pair != channel_mergers_.end());
-    State *state = &(state_pair->second);
+  for (std::unique_ptr<State> &state : states_) {
     for (size_t i = 0; i < state->channels.size(); ++i) {
       state->channels[i].reset();
     }
@@ -578,8 +859,8 @@
 }
 
 void LogReader::MakeRemappedConfig() {
-  for (std::pair<const Node *const, State> &state : channel_mergers_) {
-    CHECK(!state.second.event_loop)
+  for (std::unique_ptr<State> &state : states_) {
+    CHECK(!state->event_loop)
         << ": Can't change the mapping after the events are scheduled.";
   }
 
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e0350bb..34dbd24 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -1,15 +1,19 @@
 #ifndef AOS_EVENTS_LOGGER_H_
 #define AOS_EVENTS_LOGGER_H_
 
+#include <chrono>
 #include <deque>
 #include <string_view>
 #include <vector>
 
+#include "Eigen/Dense"
+#include "absl/strings/str_cat.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
 #include "flatbuffers/flatbuffers.h"
 
@@ -174,6 +178,7 @@
       data_writers_;
 };
 
+
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
 // configuration that is sent rately on a channel and would affect execution.
@@ -359,6 +364,8 @@
   // the header is likely distracting.
   FlatbufferVector<LogFileHeader> log_file_header_;
 
+  Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
+
   // State per node.
   struct State {
     // Log file.
@@ -373,10 +380,68 @@
     EventLoop *event_loop = nullptr;
     // And timer used to send messages.
     TimerHandler *timer_handler;
+
+    // Updates the timestamp filter with the timestamp.  Returns true if the
+    // provided timestamp was actually a forwarding timestamp and used, and
+    // false otherwise.
+    bool MaybeUpdateTimestamp(
+        const TimestampMerger::DeliveryTimestamp &channel_timestamp,
+        int channel_index);
+
+    // Filters (or nullptr if it isn't a forwarded channel) for each channel.
+    // This corresponds to the object which is shared among all the channels
+    // going between 2 nodes.  The second element in the tuple indicates if this
+    // is the primary direction or not.
+    std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
+        filters;
+
+    // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
+    // channel) which correspond to the originating node.
+    std::vector<NodeEventLoopFactory *> channel_target_event_loop_factory;
   };
 
-  // Map of nodes to States used to hold all the state for all the nodes.
-  std::map<const Node *, State> channel_mergers_;
+  // Node index -> State.
+  std::vector<std::unique_ptr<State>> states_;
+
+  // Creates the requested filter if it doesn't exist, regardless of whether
+  // these nodes can actually communicate directly.  The second return value
+  // reports if this is the primary direction or not.
+  std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
+      const Node *node_a, const Node *node_b);
+
+  // FILE to write offsets to (if populated).
+  FILE *offset_fp_ = nullptr;
+  // Timestamp of the first piece of data used for the horizontal axis on the
+  // plot.
+  aos::realtime_clock::time_point first_time_;
+
+  // List of filters for a connection.  The pointer to the first node will be
+  // less than the second node.
+  std::map<std::tuple<const Node *, const Node *>,
+           message_bridge::ClippedAverageFilter>
+      filters_;
+
+  // Returns the offset from the monotonic clock for a node to the distributed
+  // clock.  distributed = monotonic + offset;
+  std::chrono::nanoseconds offset(int node_index) const {
+    return -std::chrono::duration_cast<std::chrono::nanoseconds>(
+               std::chrono::duration<double>(offset_matrix_(node_index))) -
+           base_offset_matrix_(node_index);
+  }
+
+  // Updates the offset matrix solution and sets the per-node distributed
+  // offsets in the factory.
+  void UpdateOffsets();
+
+  // sample_matrix_ = map_matrix_ * offset_matrix_
+  Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+  Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
+  Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
+
+  // Base offsets.  The actual offset is the sum of this and the offset matrix.
+  // This removes some of the dynamic range challenges from the double above.
+  Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
+      base_offset_matrix_;
 
   std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
       remapped_configuration_buffer_;
diff --git a/aos/events/logging/logger_math.cc b/aos/events/logging/logger_math.cc
new file mode 100644
index 0000000..313894b
--- /dev/null
+++ b/aos/events/logging/logger_math.cc
@@ -0,0 +1,16 @@
+#include "aos/events/logging/logger.h"
+
+#include "Eigen/Dense"
+
+namespace aos {
+namespace logger {
+
+// This is slow to compile, so we put it in a separate file.  More parallelism
+// and less change.
+Eigen::Matrix<double, Eigen::Dynamic, 1> LogReader::SolveOffsets() {
+  return map_matrix_.bdcSvd(Eigen::ComputeThinU | Eigen::ComputeThinV)
+      .solve(sample_matrix_);
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 9f2969e..9e69ae4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -325,10 +325,10 @@
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
     Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
 
     Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
@@ -547,12 +547,12 @@
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
     Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
 
     event_loop_factory_.RunFor(chrono::milliseconds(200));
 
     Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
-                      std::chrono::milliseconds(100));
+                      chrono::milliseconds(100));
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
@@ -638,12 +638,11 @@
 
   reader.Deregister();
 }
-// TODO(austin): We can write a test which recreates a logfile and confirms that
-// we get it back.  That is the ultimate test.
 
-// Tests that we can read log files where the monotonic clocks don't match
-// correctly.
-TEST_F(MultinodeLoggerTest, MissmatchingTimeStart) {
+// Tests that we can read log files where the monotonic clocks drift and don't
+// match correctly.  While we are here, also test that different ending times
+// also is readable.
+TEST_F(MultinodeLoggerTest, MismatchedClocks) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
   const ::std::string logfile_base = tmpdir + "/multi_logfile";
   const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
@@ -659,47 +658,76 @@
   LOG(INFO) << "Logging data to " << logfile1 << " and " << logfile3;
 
   {
-   NodeEventLoopFactory *pi2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_);
-   LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
-             << pi2->realtime_now() << " distributed "
-             << pi2->ToDistributedClock(pi2->monotonic_now());
+    NodeEventLoopFactory *pi2 =
+        event_loop_factory_.GetNodeEventLoopFactory(pi2_);
+    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+              << pi2->realtime_now() << " distributed "
+              << pi2->ToDistributedClock(pi2->monotonic_now());
 
-   pi2->SetMonotonicNow(pi2->monotonic_now() + std::chrono::seconds(1000));
-   LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
-             << pi2->realtime_now() << " distributed "
-             << pi2->ToDistributedClock(pi2->monotonic_now());
+    const chrono::nanoseconds initial_pi2_offset = -chrono::seconds(1000);
+    chrono::nanoseconds pi2_offset = initial_pi2_offset;
 
-   std::unique_ptr<EventLoop> ping_event_loop =
-       event_loop_factory_.MakeEventLoop("ping", pi1_);
-   Ping ping(ping_event_loop.get());
-   std::unique_ptr<EventLoop> pong_event_loop =
-       event_loop_factory_.MakeEventLoop("pong", pi2_);
-   Pong pong(pong_event_loop.get());
+    pi2->SetDistributedOffset(pi2_offset);
+    LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+              << pi2->realtime_now() << " distributed "
+              << pi2->ToDistributedClock(pi2->monotonic_now());
 
-   std::unique_ptr<EventLoop> pi1_logger_event_loop =
-       event_loop_factory_.MakeEventLoop("logger", pi1_);
-   std::unique_ptr<LogNamer> pi1_log_namer =
-       std::make_unique<MultiNodeLogNamer>(
-           logfile_base, pi1_logger_event_loop->configuration(),
-           pi1_logger_event_loop->node());
+    std::unique_ptr<EventLoop> ping_event_loop =
+        event_loop_factory_.MakeEventLoop("ping", pi1_);
+    Ping ping(ping_event_loop.get());
+    std::unique_ptr<EventLoop> pong_event_loop =
+        event_loop_factory_.MakeEventLoop("pong", pi2_);
+    Pong pong(pong_event_loop.get());
 
-   std::unique_ptr<EventLoop> pi2_logger_event_loop =
-       event_loop_factory_.MakeEventLoop("logger", pi2_);
-   std::unique_ptr<LogNamer> pi2_log_namer =
-       std::make_unique<MultiNodeLogNamer>(
-           logfile_base, pi2_logger_event_loop->configuration(),
-           pi2_logger_event_loop->node());
+    std::unique_ptr<EventLoop> pi2_logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger", pi2_);
+    std::unique_ptr<LogNamer> pi2_log_namer =
+        std::make_unique<MultiNodeLogNamer>(
+            logfile_base, pi2_logger_event_loop->configuration(),
+            pi2_logger_event_loop->node());
 
-   event_loop_factory_.RunFor(chrono::milliseconds(95));
+    for (int i = 0; i < 95; ++i) {
+      pi2_offset += chrono::nanoseconds(200);
+      pi2->SetDistributedOffset(pi2_offset);
+      event_loop_factory_.RunFor(chrono::milliseconds(1));
+    }
 
-   Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
-                     std::chrono::milliseconds(100));
+    Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
+                      chrono::milliseconds(100));
 
-   event_loop_factory_.RunFor(chrono::milliseconds(200));
+    event_loop_factory_.RunFor(chrono::milliseconds(200));
 
-   Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
-                     std::chrono::milliseconds(100));
-   event_loop_factory_.RunFor(chrono::milliseconds(20000));
+    {
+      // Run pi1's logger for only part of the time.
+      std::unique_ptr<EventLoop> pi1_logger_event_loop =
+          event_loop_factory_.MakeEventLoop("logger", pi1_);
+      std::unique_ptr<LogNamer> pi1_log_namer =
+          std::make_unique<MultiNodeLogNamer>(
+              logfile_base, pi1_logger_event_loop->configuration(),
+              pi1_logger_event_loop->node());
+
+      Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
+                        chrono::milliseconds(100));
+
+      for (int i = 0; i < 20000; ++i) {
+        pi2_offset += chrono::nanoseconds(200);
+        pi2->SetDistributedOffset(pi2_offset);
+        event_loop_factory_.RunFor(chrono::milliseconds(1));
+      }
+
+      EXPECT_GT(pi2_offset - initial_pi2_offset,
+                event_loop_factory_.send_delay() +
+                    event_loop_factory_.network_delay());
+
+      for (int i = 0; i < 40000; ++i) {
+        pi2_offset -= chrono::nanoseconds(200);
+        pi2->SetDistributedOffset(pi2_offset);
+        event_loop_factory_.RunFor(chrono::milliseconds(1));
+      }
+    }
+
+    // And log a bit more on pi2.
+    event_loop_factory_.RunFor(chrono::milliseconds(400));
   }
 
   LogReader reader(
@@ -785,14 +813,17 @@
       });
 
   log_reader_factory.Run();
-  EXPECT_EQ(pi1_ping_count, 2030);
-  EXPECT_EQ(pi2_ping_count, 2030);
-  EXPECT_EQ(pi1_pong_count, 2030);
-  EXPECT_EQ(pi2_pong_count, 2030);
+  EXPECT_EQ(pi1_ping_count, 6030);
+  EXPECT_EQ(pi2_ping_count, 6030);
+  EXPECT_EQ(pi1_pong_count, 6030);
+  EXPECT_EQ(pi2_pong_count, 6030);
 
   reader.Deregister();
 }
 
+// TODO(austin): We can write a test which recreates a logfile and confirms that
+// we get it back.  That is the ultimate test.
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index 80ffe76..c9bc53e 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -25,14 +25,6 @@
       "max_size": 2048
     },
     {
-      "name": "/aos/roborio",
-      "type": "aos.logging.LogMessageFbs",
-      "source_node": "roborio",
-      "frequency": 200,
-      "num_senders": 20,
-      "max_size": 2048
-    },
-    {
       "name": "/aos/pi1",
       "type": "aos.message_bridge.ServerStatistics",
       "source_node": "pi1",
@@ -51,12 +43,6 @@
       "frequency": 2
     },
     {
-      "name": "/aos/roborio",
-      "type": "aos.message_bridge.ServerStatistics",
-      "source_node": "roborio",
-      "frequency": 2
-    },
-    {
       "name": "/aos/pi1",
       "type": "aos.message_bridge.ClientStatistics",
       "source_node": "pi1",
@@ -75,12 +61,6 @@
       "frequency": 2
     },
     {
-      "name": "/aos/roborio",
-      "type": "aos.message_bridge.ClientStatistics",
-      "source_node": "roborio",
-      "frequency": 2
-    },
-    {
       "name": "/aos/pi1",
       "type": "aos.timing.Report",
       "source_node": "pi1",
@@ -105,14 +85,6 @@
       "max_size": 2048
     },
     {
-      "name": "/aos/roborio",
-      "type": "aos.timing.Report",
-      "source_node": "roborio",
-      "frequency": 50,
-      "num_senders": 20,
-      "max_size": 2048
-    },
-    {
       "name": "/test",
       "type": "aos.examples.Ping",
       "source_node": "pi1",
@@ -199,16 +171,6 @@
     {
       "match": {
         "name": "/aos",
-        "type": "aos.logging.LogMessageFbs",
-        "source_node": "roborio"
-      },
-      "rename": {
-        "name": "/aos/roborio"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
         "type": "aos.timing.Report",
         "source_node": "pi1"
       },
@@ -239,16 +201,6 @@
     {
       "match": {
         "name": "/aos",
-        "type": "aos.timing.Report",
-        "source_node": "roborio"
-      },
-      "rename": {
-        "name": "/aos/roborio"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
         "type": "aos.message_bridge.ServerStatistics",
         "source_node": "pi1"
       },
@@ -279,16 +231,6 @@
     {
       "match": {
         "name": "/aos",
-        "type": "aos.message_bridge.ServerStatistics",
-        "source_node": "roborio"
-      },
-      "rename": {
-        "name": "/aos/roborio"
-      }
-    },
-    {
-      "match": {
-        "name": "/aos",
         "type": "aos.message_bridge.ClientStatistics",
         "source_node": "pi1"
       },
@@ -315,16 +257,6 @@
       "rename": {
         "name": "/aos/pi3"
       }
-    },
-    {
-      "match": {
-        "name": "/aos",
-        "type": "aos.message_bridge.ClientStatistics",
-        "source_node": "roborio"
-      },
-      "rename": {
-        "name": "/aos/roborio"
-      }
     }
   ],
   "nodes": [
@@ -342,11 +274,6 @@
       "name": "pi3",
       "hostname": "raspberrypi3",
       "port": 9971
-    },
-    {
-      "name": "roborio",
-      "hostname": "roboRIO-6971-FRC",
-      "port": 9971
     }
   ],
   "applications": [
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index faa9fe9..cf58b46 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -36,7 +36,6 @@
  public:
   SimulatedWatcher(
       SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
-      NodeEventLoopFactory *node_event_loop_factory,
       const Channel *channel,
       std::function<void(const Context &context, const void *message)> fn);
 
@@ -60,7 +59,6 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedWatcher> event_;
   EventScheduler *scheduler_;
-  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
   SimulatedChannel *simulated_channel_ = nullptr;
 };
@@ -272,7 +270,6 @@
 class SimulatedTimerHandler : public TimerHandler {
  public:
   explicit SimulatedTimerHandler(EventScheduler *scheduler,
-                                 NodeEventLoopFactory *node_event_loop_factory,
                                  SimulatedEventLoop *simulated_event_loop,
                                  ::std::function<void()> fn);
   ~SimulatedTimerHandler() { Disable(); }
@@ -288,7 +285,6 @@
   SimulatedEventLoop *simulated_event_loop_;
   EventHandler<SimulatedTimerHandler> event_;
   EventScheduler *scheduler_;
-  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 
   monotonic_clock::time_point base_;
@@ -298,7 +294,6 @@
 class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
  public:
   SimulatedPhasedLoopHandler(EventScheduler *scheduler,
-                             NodeEventLoopFactory *node_event_loop_factory,
                              SimulatedEventLoop *simulated_event_loop,
                              ::std::function<void(int)> fn,
                              const monotonic_clock::duration interval,
@@ -314,7 +309,6 @@
   EventHandler<SimulatedPhasedLoopHandler> event_;
 
   EventScheduler *scheduler_;
-  NodeEventLoopFactory *node_event_loop_factory_;
   EventScheduler::Token token_;
 };
 
@@ -387,17 +381,17 @@
 
   TimerHandler *AddTimer(::std::function<void()> callback) override {
     CHECK(!is_running());
-    return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
-        scheduler_, node_event_loop_factory_, this, callback)));
+    return NewTimer(::std::unique_ptr<TimerHandler>(
+        new SimulatedTimerHandler(scheduler_, this, callback)));
   }
 
   PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
                                    const monotonic_clock::duration interval,
                                    const monotonic_clock::duration offset =
                                        ::std::chrono::seconds(0)) override {
-    return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
-        new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
-                                       this, callback, interval, offset)));
+    return NewPhasedLoop(
+        ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
+            scheduler_, this, callback, interval, offset)));
   }
 
   void OnRun(::std::function<void()> on_run) override {
@@ -483,8 +477,8 @@
     std::function<void(const Context &channel, const void *message)> watcher) {
   TakeWatcher(channel);
 
-  std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
-      this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
+  std::unique_ptr<SimulatedWatcher> shm_watcher(
+      new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
 
   GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
   NewWatcher(std::move(shm_watcher));
@@ -526,13 +520,12 @@
 
 SimulatedWatcher::SimulatedWatcher(
     SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
-    NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
+    const Channel *channel,
     std::function<void(const Context &context, const void *message)> fn)
     : WatcherState(simulated_event_loop, channel, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
-      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedWatcher::~SimulatedWatcher() {
@@ -594,10 +587,9 @@
 }
 
 void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
-  token_ = scheduler_->Schedule(
-      node_event_loop_factory_->ToDistributedClock(
-          event_time + simulated_event_loop_->send_delay()),
-      [this]() { simulated_event_loop_->HandleEvent(); });
+  token_ =
+      scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
+                           [this]() { simulated_event_loop_->HandleEvent(); });
 }
 
 void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -643,13 +635,12 @@
 }
 
 SimulatedTimerHandler::SimulatedTimerHandler(
-    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
-    SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
+    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
+    ::std::function<void()> fn)
     : TimerHandler(simulated_event_loop, std::move(fn)),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
-      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -661,12 +652,10 @@
   repeat_offset_ = repeat_offset;
   if (base < monotonic_now) {
     token_ = scheduler_->Schedule(
-        node_event_loop_factory_->ToDistributedClock(monotonic_now),
-        [this]() { simulated_event_loop_->HandleEvent(); });
+        monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
   } else {
     token_ = scheduler_->Schedule(
-        node_event_loop_factory_->ToDistributedClock(base),
-        [this]() { simulated_event_loop_->HandleEvent(); });
+        base, [this]() { simulated_event_loop_->HandleEvent(); });
   }
   event_.set_event_time(base_);
   simulated_event_loop_->AddEvent(&event_);
@@ -683,8 +672,7 @@
     // Reschedule.
     while (base_ <= monotonic_now) base_ += repeat_offset_;
     token_ = scheduler_->Schedule(
-        node_event_loop_factory_->ToDistributedClock(base_),
-        [this]() { simulated_event_loop_->HandleEvent(); });
+        base_, [this]() { simulated_event_loop_->HandleEvent(); });
     event_.set_event_time(base_);
     simulated_event_loop_->AddEvent(&event_);
   } else {
@@ -703,15 +691,13 @@
 }
 
 SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
-    EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
-    SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
-    const monotonic_clock::duration interval,
+    EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
+    ::std::function<void(int)> fn, const monotonic_clock::duration interval,
     const monotonic_clock::duration offset)
     : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
       simulated_event_loop_(simulated_event_loop),
       event_(this),
       scheduler_(scheduler),
-      node_event_loop_factory_(node_event_loop_factory),
       token_(scheduler_->InvalidToken()) {}
 
 SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -737,29 +723,27 @@
 void SimulatedPhasedLoopHandler::Schedule(
     monotonic_clock::time_point sleep_time) {
   token_ = scheduler_->Schedule(
-      node_event_loop_factory_->ToDistributedClock(sleep_time),
-      [this]() { simulated_event_loop_->HandleEvent(); });
+      sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
   event_.set_event_time(sleep_time);
   simulated_event_loop_->AddEvent(&event_);
 }
 
 NodeEventLoopFactory::NodeEventLoopFactory(
-    EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
-    const Node *node,
+    EventSchedulerScheduler *scheduler_scheduler,
+    SimulatedEventLoopFactory *factory, const Node *node,
     std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
         *raw_event_loops)
-    : scheduler_(scheduler),
-      factory_(factory),
-      node_(node),
-      raw_event_loops_(raw_event_loops) {}
+    : factory_(factory), node_(node), raw_event_loops_(raw_event_loops) {
+  scheduler_scheduler->AddEventScheduler(&scheduler_);
+}
 
 SimulatedEventLoopFactory::SimulatedEventLoopFactory(
     const Configuration *configuration)
     : configuration_(CHECK_NOTNULL(configuration)),
       nodes_(configuration::GetNodes(configuration_)) {
   for (const Node *node : nodes_) {
-    node_factories_.emplace_back(
-        new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
+    node_factories_.emplace_back(new NodeEventLoopFactory(
+        &scheduler_scheduler_, this, node, &raw_event_loops_));
   }
 
   if (configuration::MultiNode(configuration)) {
@@ -797,11 +781,14 @@
 
 ::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
     std::string_view name) {
+  CHECK(!scheduler_.is_running())
+      << ": Can't create an event loop while running";
+
   pid_t tid = tid_;
   ++tid_;
   ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
-      scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
-      node_, tid));
+      &scheduler_, this, &channels_, factory_->configuration(),
+      raw_event_loops_, node_, tid));
   result->set_name(name);
   result->set_send_delay(factory_->send_delay());
   return std::move(result);
@@ -812,7 +799,7 @@
        raw_event_loops_) {
     event_loop.second(true);
   }
-  scheduler_.RunFor(duration);
+  scheduler_scheduler_.RunFor(duration);
   for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
        raw_event_loops_) {
     event_loop.second(false);
@@ -824,7 +811,7 @@
        raw_event_loops_) {
     event_loop.second(true);
   }
-  scheduler_.Run();
+  scheduler_scheduler_.Run();
   for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
        raw_event_loops_) {
     event_loop.second(false);
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 9dd3d1f..a7f7920 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -78,7 +78,7 @@
 
   // Stops executing all event loops.  Meant to be called from within an event
   // loop handler.
-  void Exit() { scheduler_.Exit(); }
+  void Exit() { scheduler_scheduler_.Exit(); }
 
   const std::vector<const Node *> &nodes() const { return nodes_; }
 
@@ -92,7 +92,7 @@
 
   // Returns the clock used to synchronize the nodes.
   distributed_clock::time_point distributed_now() const {
-    return scheduler_.distributed_now();
+    return scheduler_scheduler_.distributed_now();
   }
 
   // Returns the configuration used for everything.
@@ -104,7 +104,7 @@
 
  private:
   const Configuration *const configuration_;
-  EventScheduler scheduler_;
+  EventSchedulerScheduler scheduler_scheduler_;
   // List of event loops to manage running and not running for.
   // The function is a callback used to set and clear the running bool on each
   // event loop.
@@ -149,28 +149,28 @@
   // node.
   std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
 
+  // TODO(austin): Private for the following?
+
   // Converts a time to the distributed clock for scheduling and cross-node time
   // measurement.
   inline distributed_clock::time_point ToDistributedClock(
       monotonic_clock::time_point time) const;
 
-  // Note: use this very very carefully.  It can cause massive problems.  This
-  // needs to go away as we properly handle time drifting between nodes.
-  void SetMonotonicNow(monotonic_clock::time_point monotonic_now) {
-    monotonic_clock::duration offset = (monotonic_now - this->monotonic_now());
-    monotonic_offset_ += offset;
-    realtime_offset_ -= offset;
+  // Sets the offset between the monotonic clock and the central distributed
+  // clock.  distributed_clock = monotonic_clock + offset.
+  void SetDistributedOffset(std::chrono::nanoseconds monotonic_offset) {
+    scheduler_.SetDistributedOffset(monotonic_offset);
   }
 
  private:
   friend class SimulatedEventLoopFactory;
   NodeEventLoopFactory(
-      EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
-      const Node *node,
+      EventSchedulerScheduler *scheduler_scheduler,
+      SimulatedEventLoopFactory *factory, const Node *node,
       std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
           *raw_event_loops);
 
-  EventScheduler *const scheduler_;
+  EventScheduler scheduler_;
   SimulatedEventLoopFactory *const factory_;
 
   const Node *const node_;
@@ -178,7 +178,6 @@
   std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
       *const raw_event_loops_;
 
-  std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
   std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
 
   // Map from name, type to queue.
@@ -189,8 +188,8 @@
 };
 
 inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
-  return monotonic_clock::time_point(
-      factory_->distributed_now().time_since_epoch() + monotonic_offset_);
+  // TODO(austin): Confirm that time never goes backwards?
+  return scheduler_.FromDistributedClock(factory_->distributed_now());
 }
 
 inline realtime_clock::time_point NodeEventLoopFactory::realtime_now() const {
@@ -200,8 +199,7 @@
 
 inline distributed_clock::time_point NodeEventLoopFactory::ToDistributedClock(
     monotonic_clock::time_point time) const {
-  return distributed_clock::time_point(time.time_since_epoch() -
-                                       monotonic_offset_);
+  return scheduler_.ToDistributedClock(time);
 }
 
 }  // namespace aos
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index de5cbae..0b5dbec 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -64,30 +64,33 @@
 // Test that creating an event and running the scheduler runs the event.
 TEST(EventSchedulerTest, ScheduleEvent) {
   int counter = 0;
+  EventSchedulerScheduler scheduler_scheduler;
   EventScheduler scheduler;
+  scheduler_scheduler.AddEventScheduler(&scheduler);
 
-  scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+  scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
                      [&counter]() { counter += 1; });
-  scheduler.Run();
+  scheduler_scheduler.Run();
   EXPECT_EQ(counter, 1);
   auto token =
-      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(2),
+      scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(2),
                          [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
-  scheduler.Run();
+  scheduler_scheduler.Run();
   EXPECT_EQ(counter, 1);
 }
 
 // Test that descheduling an already scheduled event doesn't run the event.
 TEST(EventSchedulerTest, DescheduleEvent) {
   int counter = 0;
+  EventSchedulerScheduler scheduler_scheduler;
   EventScheduler scheduler;
+  scheduler_scheduler.AddEventScheduler(&scheduler);
 
-  auto token =
-      scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
-                         [&counter]() { counter += 1; });
+  auto token = scheduler.Schedule(monotonic_clock::epoch() + chrono::seconds(1),
+                                  [&counter]() { counter += 1; });
   scheduler.Deschedule(token);
-  scheduler.Run();
+  scheduler_scheduler.Run();
   EXPECT_EQ(counter, 0);
 }
 
diff --git a/aos/network/BUILD b/aos/network/BUILD
index a334a47..11eebdb 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -271,8 +271,8 @@
 
 cc_library(
     name = "web_proxy",
-    hdrs = ["web_proxy.h"],
     srcs = ["web_proxy.cc"],
+    hdrs = ["web_proxy.h"],
     copts = [
         "-DWEBRTC_POSIX",
         "-Wno-unused-parameter",
@@ -283,8 +283,8 @@
         ":web_proxy_utils",
         "//aos/events:shm_event_loop",
         "//aos/seasocks:seasocks_logger",
-        "//third_party/seasocks",
         "//third_party:webrtc",
+        "//third_party/seasocks",
         "@com_github_google_glog//:glog",
     ],
 )
@@ -321,3 +321,22 @@
         "//aos/events:pingpong_config.json",
     ],
 )
+
+cc_library(
+    name = "timestamp_filter",
+    hdrs = ["timestamp_filter.h"],
+    deps = [
+        "//aos/time",
+    ],
+)
+
+cc_test(
+    name = "timestamp_filter_test",
+    srcs = [
+        "timestamp_filter_test.cc",
+    ],
+    deps = [
+        ":timestamp_filter",
+        "//aos/testing:googletest",
+    ],
+)
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
new file mode 100644
index 0000000..b4280f6
--- /dev/null
+++ b/aos/network/timestamp_filter.h
@@ -0,0 +1,289 @@
+#ifndef AOS_EVENTS_LOGGING_TIMESTAMP_FILTER_H_
+#define AOS_EVENTS_LOGGING_TIMESTAMP_FILTER_H_
+
+#include <stdio.h>
+#include <algorithm>
+#include <chrono>
+#include <cmath>
+
+#include "aos/time/time.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class handles filtering differences between clocks across a network.
+//
+// The basic concept is that network latencies are highly asymmetric.  They will
+// have a good and well-defined minima, and have large maxima.  We also want
+// to guarantee that the filtered line is *always* below the samples.
+//
+// In order to preserve precision, an initial offset (base_offset_) is
+// subtracted from everything.  This should make all the offsets near 0, so we
+// have enough precision to represent nanoseconds, and to not have to worry too
+// much about precision when solving for the global offset.
+class TimestampFilter {
+ public:
+  // Updates with a new sample.  monotonic_now is the timestamp of the sample on
+  // the destination node, and sample_ns is destination_time - source_time.
+  void Sample(aos::monotonic_clock::time_point monotonic_now,
+              std::chrono::nanoseconds sample_ns) {
+    // Compute the sample offset as a double (seconds), taking into account the
+    // base offset.
+    const double sample =
+        std::chrono::duration_cast<std::chrono::duration<double>>(sample_ns -
+                                                                  base_offset_)
+            .count();
+
+    // This is our first sample.  Just use it.
+    if (last_time_ == aos::monotonic_clock::min_time) {
+      offset_ = sample;
+    } else {
+      // Took less time to transmit, so clamp to it.
+      if (sample < offset_) {
+        offset_ = sample;
+      } else {
+        // We split things up into 2 portions.
+        //  1) Each sample has information.  Correct some using it.
+        //  2) We want to keep a decent time constant if the sample rate slows.
+        //     Take time since the last sample into account.
+
+        // Time constant for the low pass filter in seconds.
+        constexpr double kTau = 0.5;
+
+        constexpr double kClampNegative = -0.0003;
+
+        {
+          // 1)
+          constexpr double kAlpha = 0.005;
+          // This formulation is more numerically precise.
+          // Clamp to kClampNegative ms to reduce the effect of wildly large
+          // samples.
+          offset_ =
+              offset_ - kAlpha * std::max(offset_ - sample, kClampNegative);
+        }
+
+        {
+          // 2)
+          //
+          // 1-e^(t/tau) -> alpha
+          const double alpha = -std::expm1(
+              -std::chrono::duration_cast<std::chrono::duration<double>>(
+                   monotonic_now - last_time_)
+                   .count() /
+              kTau);
+
+          // Clamp to kClampNegative ms to reduce the effect of wildly large
+          // samples.
+          offset_ =
+              offset_ - alpha * std::max(offset_ - sample, kClampNegative);
+        }
+      }
+    }
+
+    last_time_ = monotonic_now;
+  }
+
+  // Updates the base_offset, and compensates offset while we are here.
+  void set_base_offset(std::chrono::nanoseconds base_offset) {
+    offset_ -= std::chrono::duration_cast<std::chrono::duration<double>>(
+                   base_offset - base_offset_)
+                   .count();
+    base_offset_ = base_offset;
+    // Clear everything out to avoid any numerical precision problems.
+    last_time_ = aos::monotonic_clock::min_time;
+  }
+
+  double offset() const { return offset_; }
+
+  std::chrono::nanoseconds base_offset() const { return base_offset_; }
+
+  double base_offset_double() const {
+    return std::chrono::duration_cast<std::chrono::duration<double>>(
+               base_offset_)
+        .count();
+  }
+
+  bool has_sample() const {
+    return last_time_ != aos::monotonic_clock::min_time;
+  }
+
+ private:
+  double offset_ = 0;
+
+  aos::monotonic_clock::time_point last_time_ = aos::monotonic_clock::min_time;
+  std::chrono::nanoseconds base_offset_{0};
+};
+
+// This class combines the a -> b offsets with the b -> a offsets and
+// aggressively filters the results.
+struct ClippedAverageFilter {
+  // If not nullptr, timestamps will get written to these two files for
+  // debugging.
+  FILE *fwd_fp = nullptr;
+  FILE *rev_fp = nullptr;
+
+  ~ClippedAverageFilter() {
+    if (fwd_fp != nullptr) {
+      fclose(fwd_fp);
+    }
+    if (rev_fp != nullptr) {
+      fclose(rev_fp);
+    }
+  }
+
+  // Adds a forward sample.  sample_ns = destination - source;  Forward samples
+  // are from A -> B.
+  void FwdSample(aos::monotonic_clock::time_point monotonic_now,
+                 std::chrono::nanoseconds sample_ns) {
+    fwd_.Sample(monotonic_now, sample_ns);
+    Update(monotonic_now, &last_fwd_time_);
+
+    if (fwd_fp != nullptr) {
+      if (first_fwd_time_ == aos::monotonic_clock::min_time) {
+        first_fwd_time_ = monotonic_now;
+      }
+      fprintf(
+          fwd_fp, "%f, %f, %f, %f\n",
+          std::chrono::duration_cast<std::chrono::duration<double>>(
+              monotonic_now - first_fwd_time_)
+              .count(),
+          std::chrono::duration_cast<std::chrono::duration<double>>(sample_ns)
+              .count(),
+          fwd_.offset() + fwd_.base_offset_double(),
+          std::chrono::duration_cast<std::chrono::duration<double>>(offset())
+              .count());
+    }
+  }
+
+  // Adds a reverse sample.  sample_ns = destination - source;  Reverse samples
+  // are B -> A.
+  void RevSample(aos::monotonic_clock::time_point monotonic_now,
+                 std::chrono::nanoseconds sample_ns) {
+    rev_.Sample(monotonic_now, sample_ns);
+    Update(monotonic_now, &last_rev_time_);
+
+    if (rev_fp != nullptr) {
+      if (first_rev_time_ == aos::monotonic_clock::min_time) {
+        first_rev_time_ = monotonic_now;
+      }
+      fprintf(
+          rev_fp, "%f, %f, %f, %f\n",
+          std::chrono::duration_cast<std::chrono::duration<double>>(
+              monotonic_now - first_rev_time_)
+              .count(),
+          std::chrono::duration_cast<std::chrono::duration<double>>(sample_ns)
+              .count(),
+          rev_.offset() + rev_.base_offset_double(),
+          std::chrono::duration_cast<std::chrono::duration<double>>(offset())
+              .count());
+    }
+  }
+
+  // Returns the overall filtered offset, offseta - offsetb.
+  std::chrono::nanoseconds offset() const {
+    return std::chrono::duration_cast<std::chrono::nanoseconds>(
+               std::chrono::duration<double>(offset_)) +
+           base_offset_;
+  }
+
+  // Returns the offset as a double.
+  double offset_double() const {
+    return std::chrono::duration_cast<std::chrono::duration<double>>(offset())
+        .count();
+  }
+
+  // Sets the sample pointer.  This address gets set every time the sample
+  // changes.  Makes it really easy to place in a matrix and solve.
+  void set_sample_pointer(double *sample_pointer) {
+    sample_pointer_ = sample_pointer;
+  }
+
+  double *sample_pointer() { return sample_pointer_; }
+
+  // Sets the base offset.  This is used to reduce the dynamic range needed from
+  // the double to something manageable.  It is subtracted from offset_.
+  void set_base_offset(std::chrono::nanoseconds base_offset) {
+    offset_ -= std::chrono::duration_cast<std::chrono::duration<double>>(
+                   base_offset - base_offset_)
+                   .count();
+    fwd_.set_base_offset(base_offset);
+    rev_.set_base_offset(-base_offset);
+    base_offset_ = base_offset;
+    last_fwd_time_ = aos::monotonic_clock::min_time;
+    last_rev_time_ = aos::monotonic_clock::min_time;
+  }
+
+ private:
+  // Updates the offset estimate given the current time, and a pointer to the
+  // variable holding the last time.
+  void Update(aos::monotonic_clock::time_point monotonic_now,
+              aos::monotonic_clock::time_point *last_time) {
+    // ta = t + offseta
+    // tb = t + offsetb
+    // fwd sample => ta - tb + network -> offseta - offsetb + network
+    // rev sample => tb - ta + network -> offsetb - offseta + network
+    const double hard_max = fwd_.offset();
+    const double hard_min = -rev_.offset();
+    const double average = (hard_max + hard_min) / 2.0;
+    LOG(INFO) << "max " << hard_max << " min " << hard_min;
+    // We don't want to clip the offset to the hard min/max.  We really want to
+    // keep it within a band around the middle.  ratio of 0.5 means stay within
+    // +- 0.25 of the middle of the hard min and max.
+    constexpr double kBand = 0.5;
+    const double max = average + kBand / 2.0 * (hard_max - hard_min);
+    const double min = average - kBand / 2.0 * (hard_max - hard_min);
+
+    // Update regardless for the first sample from both the min and max.
+    if (*last_time == aos::monotonic_clock::min_time) {
+      offset_ = average;
+    } else {
+      // Do just a time constant based update.  We can afford to be slow here
+      // for smoothness.
+      constexpr double kTau = 10.0;
+      const double alpha = -std::expm1(
+          -std::chrono::duration_cast<std::chrono::duration<double>>(
+               monotonic_now - *last_time)
+               .count() /
+          kTau);
+
+      // Clamp it such that it remains in the min/max bounds.
+      offset_ = std::clamp(offset_ - alpha * (offset_ - average), min, max);
+    }
+    *last_time = monotonic_now;
+
+    if (sample_pointer_ != nullptr) {
+      *sample_pointer_ = offset_;
+    }
+  }
+
+  // Filters for both the forward and reverse directions.
+  TimestampFilter fwd_;
+  TimestampFilter rev_;
+
+  // Base offset in nanoseconds.  This is subtracted from all calculations, and
+  // added back to the result when reporting.
+  std::chrono::nanoseconds base_offset_ = std::chrono::nanoseconds(0);
+  // Dynamic part of the offset.
+  double offset_ = 0;
+
+  // Last time we had a sample for a direction.
+  aos::monotonic_clock::time_point last_fwd_time_ =
+      aos::monotonic_clock::min_time;
+  aos::monotonic_clock::time_point last_rev_time_ =
+      aos::monotonic_clock::min_time;
+
+  // First times used for plotting.
+  aos::monotonic_clock::time_point first_fwd_time_ =
+      aos::monotonic_clock::min_time;
+  aos::monotonic_clock::time_point first_rev_time_ =
+      aos::monotonic_clock::min_time;
+
+  // Pointer to copy the sample to when it is updated.
+  double *sample_pointer_ = nullptr;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGING_TIMESTAMP_FILTER_H_
diff --git a/aos/network/timestamp_filter_test.cc b/aos/network/timestamp_filter_test.cc
new file mode 100644
index 0000000..d06923e
--- /dev/null
+++ b/aos/network/timestamp_filter_test.cc
@@ -0,0 +1,76 @@
+#include "aos/network/timestamp_filter.h"
+
+#include <chrono>
+
+#include "aos/macros.h"
+#include "gtest/gtest.h"
+
+namespace aos {
+namespace message_bridge {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+// Tests that adding samples tracks more negative offsets down quickly, and
+// slowly comes back up.
+TEST(TimestampFilterTest, Sample) {
+  TimestampFilter filter;
+
+  EXPECT_EQ(filter.offset(), 0.0);
+  EXPECT_EQ(filter.base_offset(), chrono::seconds(0));
+  EXPECT_FALSE(filter.has_sample());
+
+  filter.Sample(aos::monotonic_clock::epoch() + chrono::seconds(1),
+                chrono::milliseconds(-100));
+
+  EXPECT_EQ(filter.offset(), -0.1);
+  EXPECT_EQ(filter.base_offset(), chrono::seconds(0));
+  EXPECT_TRUE(filter.has_sample());
+
+  // Further negative -> follow the min down exactly.
+  filter.Sample(aos::monotonic_clock::epoch() + chrono::seconds(2),
+                chrono::milliseconds(-1000));
+
+  EXPECT_EQ(filter.offset(), -1.0);
+  EXPECT_EQ(filter.base_offset(), chrono::seconds(0));
+  EXPECT_TRUE(filter.has_sample());
+
+  // Positive now goes positive, but slower.
+  filter.Sample(aos::monotonic_clock::epoch() + chrono::seconds(3),
+                chrono::milliseconds(0));
+
+  EXPECT_GT(filter.offset(), -0.9999);
+  EXPECT_LT(filter.offset(), -0.999);
+  EXPECT_EQ(filter.base_offset(), chrono::seconds(0));
+  EXPECT_TRUE(filter.has_sample());
+}
+
+// Tests that ClippedAverageFilter tracks between the two filters.
+TEST(ClippedAverageFilterTest, Sample) {
+  ClippedAverageFilter filter;
+
+  // Pass in a sample in both the forward and reverse direction.  We should
+  // expect that the offset should be smack down the middle.
+  filter.FwdSample(aos::monotonic_clock::epoch() + chrono::seconds(1),
+                   chrono::milliseconds(101));
+
+  filter.RevSample(aos::monotonic_clock::epoch() + chrono::seconds(1),
+                   chrono::milliseconds(-100));
+
+  EXPECT_EQ(filter.offset(), chrono::microseconds(100500));
+
+  // Confirm the base offset works too.
+  filter.set_base_offset(chrono::milliseconds(100));
+
+  filter.FwdSample(aos::monotonic_clock::epoch() + chrono::seconds(1),
+                   chrono::milliseconds(101));
+
+  filter.RevSample(aos::monotonic_clock::epoch() + chrono::seconds(1),
+                   chrono::milliseconds(-100));
+
+  EXPECT_EQ(filter.offset(), chrono::microseconds(100500));
+}
+
+}  // namespace testing
+}  // namespace message_bridge
+}  // namespace aos