Make timestamp_to_csv work with multiple boots

The old code used a starting monotonic time to sync all the data on the
same plot.  This doesn't work when reboots are at play.  Rather than
fight that, use the distributed clock for everything.

Rather than push the concept of the distributed clock into the timestamp
filter, pull the concept of logging CSV files completely into the
multinode filter code.  This actually simplifies things a fair amount.

The next step is to take this beautiful set of boot/time correspondences
and plumb it into the simulation code.

Change-Id: Id9336d683a13dd214eeec3af883d0f38a4cfa711
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index ae3c4cc..6a2c95a 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -119,37 +119,38 @@
 
   // Don't get clever. Use the first time as the start time.  Note: this is
   // different than how log_cat and others work.
-  std::optional<
-      std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>
-      next_timestamp = multinode_estimator.NextTimestamp();
+  std::optional<const std::tuple<distributed_clock::time_point,
+                                 std::vector<BootTimestamp>> *>
+      next_timestamp = multinode_estimator.QueueNextTimestamp();
   CHECK(next_timestamp);
   LOG(INFO) << "Starting at:";
   for (const Node *node : configuration::GetNodes(config)) {
     const size_t node_index = configuration::GetNodeIndex(config, node);
     LOG(INFO) << "  " << node->name()->string_view() << " -> "
-              << std::get<1>(*next_timestamp)[node_index].time;
+              << std::get<1>(*next_timestamp.value())[node_index].time;
   }
 
   std::vector<monotonic_clock::time_point> just_monotonic(
-      std::get<1>(*next_timestamp).size());
+      std::get<1>(*next_timestamp.value()).size());
   for (size_t i = 0; i < just_monotonic.size(); ++i) {
-    CHECK_EQ(std::get<1>(*next_timestamp)[i].boot, 0u);
-    just_monotonic[i] = std::get<1>(*next_timestamp)[i].time;
+    CHECK_EQ(std::get<1>(*next_timestamp.value())[i].boot, 0u);
+    just_monotonic[i] = std::get<1>(*next_timestamp.value())[i].time;
   }
   multinode_estimator.Start(just_monotonic);
 
   // As we pull off all the timestamps, the time problem is continually solved,
   // filling in the CSV files.
   while (true) {
-    std::optional<
-        std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>
-        next_timestamp = multinode_estimator.NextTimestamp();
-    // TODO(austin): Figure out how to make the plot work across reboots.
+    std::optional<const std::tuple<distributed_clock::time_point,
+                                   std::vector<BootTimestamp>> *>
+        next_timestamp = multinode_estimator.QueueNextTimestamp();
     if (!next_timestamp) {
       break;
     }
   }
 
+  LOG(INFO) << "Done";
+
   return 0;
 }
 
diff --git a/aos/events/logging/timestamp_plot.gnuplot b/aos/events/logging/timestamp_plot.gnuplot
index 94ef7e4..1028948 100755
--- a/aos/events/logging/timestamp_plot.gnuplot
+++ b/aos/events/logging/timestamp_plot.gnuplot
@@ -9,9 +9,7 @@
 print "Node1: ", node1
 print "Node2: ", node2
 
-node1_start_time = system("grep " . node1 . " /tmp/timestamp_noncausal_starttime.csv | awk '{print $2}'") + 0
 node1_index = int(system("grep -n " . node1 . " /tmp/timestamp_noncausal_starttime.csv | sed 's/:.*//'")) + 1
-node2_start_time = system("grep " . node2 . " /tmp/timestamp_noncausal_starttime.csv | awk '{print $2}'") + 0
 node2_index = int(system("grep -n " . node2 . " /tmp/timestamp_noncausal_starttime.csv | sed 's/:.*//'")) + 1
 
 noncausalfile12 = sprintf("/tmp/timestamp_noncausal_%s_%s.csv", node1, node2)
@@ -32,7 +30,7 @@
      samplefile21 using 1:(-$2) title 'sample 2-1', \
      noncausalfile12 using 1:3 title 'nc 1-2' with lines, \
      noncausalfile21 using 1:(-$3) title 'nc 2-1' with lines, \
-     offsetfile using ((column(node1_index) - node1_start_time + (column(node2_index) - node2_start_time)) / 2):(column(node2_index) - column(node1_index)) title 'filter 2-1' with linespoints
+     offsetfile using 1:(column(node2_index) - column(node1_index)) title 'filter 2-1' with linespoints
 
 if (ARG3 ne "" ) {
      exit
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 0710926..3b5b997 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -329,39 +329,36 @@
   }
 }
 
+std::optional<const std::tuple<distributed_clock::time_point,
+                               std::vector<BootTimestamp>> *>
+InterpolatedTimeConverter::QueueNextTimestamp() {
+  std::optional<
+      std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>
+      next_time = NextTimestamp();
+  if (!next_time) {
+    VLOG(1) << "Last timestamp, calling it quits";
+    at_end_ = true;
+    return std::nullopt;
+  }
+
+  VLOG(1) << "Fetched next timestamp while solving: " << std::get<0>(*next_time)
+          << " ->";
+  for (BootTimestamp t : std::get<1>(*next_time)) {
+    VLOG(1) << "  " << t;
+  }
+
+  // TODO(austin): Figure out how to communicate the reboot up to the factory.
+  CHECK_EQ(node_count_, std::get<1>(*next_time).size());
+  times_.emplace_back(std::move(*next_time));
+  return &times_.back();
+}
+
 void InterpolatedTimeConverter::QueueUntil(
-    std::function<
-        bool(const std::tuple<distributed_clock::time_point,
-                              std::vector<monotonic_clock::time_point>> &)>
+    std::function<bool(const std::tuple<distributed_clock::time_point,
+                                        std::vector<BootTimestamp>> &)>
         not_done) {
   while (!at_end_ && (times_.empty() || not_done(times_.back()))) {
-    std::optional<
-        std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>
-        next_time = NextTimestamp();
-
-    if (!next_time) {
-      VLOG(1) << "Last timestamp, calling it quits";
-      at_end_ = true;
-      break;
-    }
-
-    VLOG(1) << "Fetched next timestamp while solving: "
-            << std::get<0>(*next_time) << " ->";
-    for (BootTimestamp t : std::get<1>(*next_time)) {
-      VLOG(1) << "  " << t;
-    }
-
-    // TODO(austin): Figure out how to communicate the reboot up to the factory.
-    std::vector<monotonic_clock::time_point> just_monotonic(
-        std::get<1>(*next_time).size());
-    for (size_t i = 0; i < just_monotonic.size(); ++i) {
-      CHECK_EQ(std::get<1>(*next_time)[i].boot, 0u);
-      just_monotonic[i] = std::get<1>(*next_time)[i].time;
-    }
-
-    CHECK_EQ(node_count_, std::get<1>(*next_time).size());
-    times_.emplace_back(
-        std::make_tuple(std::get<0>(*next_time), std::move(just_monotonic)));
+    QueueNextTimestamp();
   }
 
   CHECK(!times_.empty())
@@ -391,56 +388,10 @@
   }
 }
 
-distributed_clock::time_point InterpolatedTimeConverter::ToDistributedClock(
-    size_t node_index, monotonic_clock::time_point time) {
-  CHECK_LT(node_index, node_count_);
-  // If there is only one node, time estimation makes no sense.  Just return
-  // unity time.
-  if (node_count_ == 1u) {
-    return distributed_clock::epoch() + time.time_since_epoch();
-  }
-
-  // Make sure there are enough timestamps in the queue.
-  QueueUntil(
-      [time, node_index](
-          const std::tuple<distributed_clock::time_point,
-                           std::vector<monotonic_clock::time_point>> &t) {
-        return std::get<1>(t)[node_index] < time;
-      });
-
-  // Before the beginning needs to have 0 slope otherwise time jumps when
-  // timestamp 2 happens.
-  if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index]) {
-    if (time < std::get<1>(times_[0])[node_index]) {
-      CHECK(!have_popped_)
-          << ": Trying to interpolate time " << time
-          << " but we have forgotten the relevant points already.";
-    }
-    const distributed_clock::time_point result =
-        time - std::get<1>(times_[0])[node_index] + std::get<0>(times_[0]);
-    VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
-            << result;
-    return result;
-  }
-
-  // Now, find the corresponding timestamps.  Search from the back since that's
-  // where most of the times we care about will be.
-  size_t index = times_.size() - 2u;
-  while (index > 0u) {
-    if (std::get<1>(times_[index])[node_index] <= time) {
-      break;
-    }
-    --index;
-  }
-
-  // Interpolate with the two of these.
-  const distributed_clock::time_point d0 = std::get<0>(times_[index]);
-  const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
-
-  const monotonic_clock::time_point t0 = std::get<1>(times_[index])[node_index];
-  const monotonic_clock::time_point t1 =
-      std::get<1>(times_[index + 1])[node_index];
-
+distributed_clock::time_point ToDistributedClock(
+    distributed_clock::time_point d0, distributed_clock::time_point d1,
+    monotonic_clock::time_point t0, monotonic_clock::time_point t1,
+    monotonic_clock::time_point time) {
   const chrono::nanoseconds dt = (t1 - t0);
 
   CHECK_NE(dt.count(), 0u) << " t0 " << t0 << " t1 " << t1 << " d0 " << d0
@@ -457,9 +408,67 @@
       absl::int128((time - t0).count()) * absl::int128((d1 - d0).count());
   numerator += numerator > 0 ? absl::int128(dt.count() / 2)
                              : -absl::int128(dt.count() / 2);
+  return d0 + std::chrono::nanoseconds(
+                  static_cast<int64_t>(numerator / absl::int128(dt.count())));
+}
+
+distributed_clock::time_point InterpolatedTimeConverter::ToDistributedClock(
+    size_t node_index, monotonic_clock::time_point time) {
+  CHECK_LT(node_index, node_count_);
+  // If there is only one node, time estimation makes no sense.  Just return
+  // unity time.
+  if (node_count_ == 1u) {
+    return distributed_clock::epoch() + time.time_since_epoch();
+  }
+
+  // Make sure there are enough timestamps in the queue.
+  QueueUntil(
+      [time, node_index](const std::tuple<distributed_clock::time_point,
+                                          std::vector<BootTimestamp>> &t) {
+        return std::get<1>(t)[node_index].time < time;
+      });
+
+  // Before the beginning needs to have 0 slope otherwise time jumps when
+  // timestamp 2 happens.
+  if (times_.size() == 1u || time < std::get<1>(times_[0])[node_index].time) {
+    if (time < std::get<1>(times_[0])[node_index].time) {
+      CHECK(!have_popped_)
+          << ": Trying to interpolate time " << time
+          << " but we have forgotten the relevant points already.";
+    }
+    const distributed_clock::time_point result =
+        time - std::get<1>(times_[0])[node_index].time + std::get<0>(times_[0]);
+    VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
+            << result;
+    return result;
+  }
+
+  // Now, find the corresponding timestamps.  Search from the back since that's
+  // where most of the times we care about will be.
+  size_t index = times_.size() - 2u;
+  while (index > 0u) {
+    // TODO(austin): Binary search.
+    if (std::get<1>(times_[index])[node_index].time <= time) {
+      break;
+    }
+    --index;
+  }
+
+  // Interpolate with the two of these.
+  const distributed_clock::time_point d0 = std::get<0>(times_[index]);
+  const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
+
+  // TODO(austin): We should extrapolate if the boot changes.
+  CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
+           std::get<1>(times_[index + 1])[node_index].boot);
+  const monotonic_clock::time_point t0 =
+      std::get<1>(times_[index])[node_index].time;
+  const monotonic_clock::time_point t1 =
+      std::get<1>(times_[index + 1])[node_index].time;
+
   const distributed_clock::time_point result =
-      d0 + std::chrono::nanoseconds(
-               static_cast<int64_t>(numerator / absl::int128(dt.count())));
+      message_bridge::ToDistributedClock(d0, d1, t0, t1, time);
+
   VLOG(2) << "ToDistributedClock(" << node_index << ", " << time << ") -> "
           << result;
   return result;
@@ -477,7 +486,7 @@
   // Make sure there are enough timestamps in the queue.
   QueueUntil(
       [time](const std::tuple<distributed_clock::time_point,
-                              std::vector<monotonic_clock::time_point>> &t) {
+                              std::vector<BootTimestamp>> &t) {
         return std::get<0>(t) < time;
       });
 
@@ -488,7 +497,7 @@
           << " but we have forgotten the relevant points already.";
     }
     monotonic_clock::time_point result =
-        time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index];
+        time - std::get<0>(times_[0]) + std::get<1>(times_[0])[node_index].time;
     VLOG(2) << "FromDistributedClock(" << node_index << ", " << time << ") -> "
             << result;
     return result;
@@ -508,9 +517,12 @@
   const distributed_clock::time_point d0 = std::get<0>(times_[index]);
   const distributed_clock::time_point d1 = std::get<0>(times_[index + 1]);
 
-  const monotonic_clock::time_point t0 = std::get<1>(times_[index])[node_index];
+  CHECK_EQ(std::get<1>(times_[index])[node_index].boot,
+           std::get<1>(times_[index + 1])[node_index].boot);
+  const monotonic_clock::time_point t0 =
+      std::get<1>(times_[index])[node_index].time;
   const monotonic_clock::time_point t1 =
-      std::get<1>(times_[index + 1])[node_index];
+      std::get<1>(times_[index + 1])[node_index].time;
 
   const chrono::nanoseconds dd = d1 - d0;
 
@@ -559,14 +571,48 @@
       fprintf(fp_, ", %s", node->name()->c_str());
     }
     fprintf(fp_, "\n");
+    filter_fps_.resize(NodesCount());
+    for (auto &filter_fp : filter_fps_) {
+      filter_fp.resize(NodesCount(), nullptr);
+    }
+    sample_fps_.resize(NodesCount());
+    for (auto &sample_fp : sample_fps_) {
+      sample_fp.resize(NodesCount(), nullptr);
+    }
+
+    node_samples_.resize(NodesCount());
+    for (NodeSamples &node_samples : node_samples_) {
+      node_samples.nodes.resize(NodesCount());
+    }
+
+    source_node_index_ = configuration::SourceNodeIndex(logged_configuration);
   }
 }
 
 MultiNodeNoncausalOffsetEstimator::~MultiNodeNoncausalOffsetEstimator() {
+  FlushAllSamples(true);
   if (fp_) {
     fclose(fp_);
     fp_ = NULL;
   }
+  if (filter_fps_.size() != 0) {
+    for (std::vector<FILE *> &filter_fp : filter_fps_) {
+      for (FILE *&fp : filter_fp) {
+        if (fp != nullptr) {
+          fclose(fp);
+        }
+      }
+    }
+  }
+  if (sample_fps_.size() != 0) {
+    for (std::vector<FILE *> &filter_fp : sample_fps_) {
+      for (FILE *&fp : filter_fp) {
+        if (fp != nullptr) {
+          fclose(fp);
+        }
+      }
+    }
+  }
   if (all_done_) {
     size_t node_a_index = 0;
     for (const auto &filters : filters_per_node_) {
@@ -586,6 +632,15 @@
       ++node_a_index;
     }
   }
+
+  // Make sure everything is flushed to disk.
+  if (!node_samples_.empty()) {
+    for (NodeSamples &node : node_samples_) {
+      for (SingleNodeSamples &timestamps : node.nodes) {
+        CHECK (timestamps.messages.empty());
+      }
+    }
+  }
 }
 
 void MultiNodeNoncausalOffsetEstimator::Start(
@@ -599,21 +654,6 @@
 
 void MultiNodeNoncausalOffsetEstimator::Start(
     std::vector<monotonic_clock::time_point> times) {
-  for (std::pair<const std::tuple<const Node *, const Node *>,
-                 message_bridge::NoncausalOffsetEstimator> &filter : filters_) {
-    const Node *const node_a = std::get<0>(filter.first);
-    const size_t node_a_index =
-        configuration::GetNodeIndex(configuration_, node_a);
-    const Node *const node_b = std::get<1>(filter.first);
-    const size_t node_b_index =
-        configuration::GetNodeIndex(configuration_, node_b);
-
-    // TODO(austin): Write everything from this file instead.  That lets us use
-    // the distributed clock for everything very nicely.
-    filter.second.SetFirstFwdTime(times[node_a_index]);
-    filter.second.SetFirstRevTime(times[node_b_index]);
-  }
-
   std::fstream s("/tmp/timestamp_noncausal_starttime.csv", s.trunc | s.out);
   CHECK(s.is_open());
   for (const Node *node : configuration::GetNodes(configuration())) {
@@ -657,16 +697,6 @@
                                                  node_b_index);
     filters_per_node_[node_b_index].emplace_back(x.GetFilter(node_b),
                                                  node_a_index);
-
-    if (FLAGS_timestamps_to_csv) {
-      x.SetFwdCsvFileName(absl::StrCat("/tmp/timestamp_noncausal_",
-                                       node_a->name()->string_view(), "_",
-                                       node_b->name()->string_view()));
-      x.SetRevCsvFileName(absl::StrCat("/tmp/timestamp_noncausal_",
-                                       node_b->name()->string_view(), "_",
-                                       node_a->name()->string_view()));
-    }
-
     return &x;
   } else {
     return &it->second;
@@ -720,12 +750,38 @@
               filter->Sample(node, msg->monotonic_event_time,
                              msg->monotonic_remote_time);
 
+              if (!node_samples_.empty()) {
+                const size_t sending_node_index =
+                    source_node_index_[msg->channel_index];
+                // The message went from node sending_node_index to
+                // node_index.  monotonic_remote_time is the time it was sent,
+                // and monotonic_event_time was the time it was received.
+                node_samples_[node_index]
+                    .nodes[sending_node_index]
+                    .messages.emplace(std::make_pair(
+                        msg->monotonic_event_time, msg->monotonic_remote_time));
+              }
+
               if (msg->monotonic_timestamp_time != BootTimestamp::min_time()) {
                 // TODO(austin): This assumes that this timestamp is only logged
                 // on the node which sent the data.  That is correct for now,
                 // but should be explicitly checked somewhere.
                 filter->ReverseSample(node, msg->monotonic_event_time,
                                       msg->monotonic_timestamp_time);
+
+                if (!node_samples_.empty()) {
+                  const size_t sending_node_index =
+                      source_node_index_[msg->channel_index];
+                  // The timestamp then went back from node node_index to
+                  // sending_node_index.  monotonic_event_time is the time it
+                  // was sent, and monotonic_timestamp_time was the time it was
+                  // received.
+                  node_samples_[sending_node_index]
+                      .nodes[node_index]
+                      .messages.emplace(
+                          std::make_pair(msg->monotonic_timestamp_time,
+                                         msg->monotonic_event_time));
+                }
               }
             }
           });
@@ -1327,6 +1383,8 @@
     return std::nullopt;
   }
 
+
+  std::tuple<logger::BootTimestamp, logger::BootDuration> sample;
   if (first_solution_) {
     first_solution_ = false;
 
@@ -1337,9 +1395,9 @@
         time = BootTimestamp::epoch();
       }
     }
-    next_filter->Consume();
+    sample = *next_filter->Consume();
   } else {
-    next_filter->Consume();
+    sample = *next_filter->Consume();
     // We found a good sample, so consume it.  If it is a duplicate, we still
     // want to consume it.  But, if this is the first time around, we want to
     // re-solve by recursing (once) to pickup the better base.
@@ -1410,6 +1468,35 @@
     }
   }
 
+  if (filter_fps_.size() > 0) {
+    const int node_a_index =
+        configuration::GetNodeIndex(configuration(), next_filter->node_a());
+    const int node_b_index =
+        configuration::GetNodeIndex(configuration(), next_filter->node_b());
+
+    FILE *fp = filter_fps_[node_a_index][node_b_index];
+    if (fp == nullptr) {
+      fp = filter_fps_[node_a_index][node_b_index] = fopen(
+          absl::StrCat("/tmp/timestamp_noncausal_",
+                       next_filter->node_a()->name()->string_view(), "_",
+                       next_filter->node_b()->name()->string_view(), ".csv")
+              .c_str(),
+          "w");
+      fprintf(fp, "time_since_start,sample_ns,filtered_offset\n");
+    }
+
+    fprintf(fp, "%.9f, %.9f, %.9f\n",
+            std::chrono::duration_cast<std::chrono::duration<double>>(
+                last_distributed_.time_since_epoch())
+                .count(),
+            std::chrono::duration_cast<std::chrono::duration<double>>(
+                std::get<0>(sample).time.time_since_epoch())
+                .count(),
+            std::chrono::duration_cast<std::chrono::duration<double>>(
+                std::get<1>(sample).duration)
+                .count());
+  }
+
   if (fp_) {
     fprintf(
         fp_, "%.9f",
@@ -1420,9 +1507,101 @@
     }
     fprintf(fp_, "\n");
   }
-
+  FlushAllSamples(false);
   return std::make_tuple(last_distributed_, last_monotonics_);
 }
 
+void MultiNodeNoncausalOffsetEstimator::FlushAllSamples(bool finish) {
+  size_t node_index = 0;
+  for (NodeSamples &node_samples : node_samples_) {
+    size_t sending_node_index = 0;
+    for (SingleNodeSamples &samples : node_samples.nodes) {
+      if (samples.messages.size() == 0) {
+        ++sending_node_index;
+        continue;
+      }
+
+      FILE *samples_fp = sample_fps_[node_index][sending_node_index];
+      if (samples_fp == nullptr) {
+        samples_fp = sample_fps_[node_index][sending_node_index] =
+            fopen(absl::StrCat("/tmp/timestamp_noncausal_",
+                               logged_configuration()
+                                   ->nodes()
+                                   ->Get(node_index)
+                                   ->name()
+                                   ->string_view(),
+                               "_",
+                               logged_configuration()
+                                   ->nodes()
+                                   ->Get(sending_node_index)
+                                   ->name()
+                                   ->string_view(),
+                               "_samples.csv")
+                      .c_str(),
+                  "w");
+        fprintf(samples_fp,
+                "time_since_start,sample_ns,monotonic,monotonic+offset("
+                "remote)\n");
+      }
+
+      auto times_it = times_.begin();
+      while (!samples.messages.empty() && times_it != times_.end()) {
+        const std::pair<BootTimestamp, BootTimestamp> &message =
+            *samples.messages.begin();
+        auto next = times_it + 1;
+        while (next != times_.end()) {
+          if (std::get<1>(*next)[node_index] < message.first) {
+            times_it = next;
+            next = times_it + 1;
+          } else {
+            break;
+          }
+        }
+
+        distributed_clock::time_point distributed;
+        const distributed_clock::time_point d0 = std::get<0>(*times_it);
+        const BootTimestamp t0 = std::get<1>(*times_it)[node_index];
+        if (next == times_.end()) {
+          if (!finish) {
+            break;
+          }
+          CHECK_EQ(t0.boot, message.first.boot);
+          distributed = message.first.time - t0.time + d0;
+        } else {
+          const distributed_clock::time_point d1 = std::get<0>(*next);
+          const BootTimestamp t1 = std::get<1>(*next)[node_index];
+          if (t0.boot == t1.boot) {
+            distributed = ::aos::message_bridge::ToDistributedClock(
+                d0, d1, t0.time, t1.time, message.first.time);
+          } else if (t0.boot == message.first.boot) {
+            distributed = message.first.time - t0.time + d0;
+          } else if (t1.boot == message.first.boot) {
+            distributed = message.first.time - t1.time + d1;
+          } else {
+            LOG(FATAL) << "Boots don't match";
+          }
+        }
+        fprintf(samples_fp, "%.9f, %.9f, %.9f, %.9f\n",
+                std::chrono::duration_cast<std::chrono::duration<double>>(
+                    distributed.time_since_epoch())
+                    .count(),
+                std::chrono::duration_cast<std::chrono::duration<double>>(
+                    message.second.time - message.first.time)
+                    .count(),
+                std::chrono::duration_cast<std::chrono::duration<double>>(
+                    message.first.time.time_since_epoch())
+                    .count(),
+                std::chrono::duration_cast<std::chrono::duration<double>>(
+                    message.second.time.time_since_epoch())
+                    .count());
+
+        samples.messages.erase(samples.messages.begin());
+      }
+      ++sending_node_index;
+    }
+    ++node_index;
+  }
+}
+
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 1ff44a4..a52db7d 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -6,6 +6,7 @@
 #include <string_view>
 
 #include "Eigen/Dense"
+#include "absl/container/btree_set.h"
 #include "aos/configuration.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/simulated_event_loop.h"
@@ -165,6 +166,7 @@
 
   // Converts a time to the distributed clock for scheduling and cross-node
   // time measurement.
+  // TODO(austin): Need to pass in boot.
   distributed_clock::time_point ToDistributedClock(
       size_t node_index, monotonic_clock::time_point time) override;
 
@@ -176,6 +178,12 @@
   // Called whenever time passes this point and we can forget about it.
   void ObserveTimePassed(distributed_clock::time_point time) override;
 
+  // Queues 1 more timestamp in the interpolation list.  This is public for
+  // timestamp_extractor so it can hammer on the log until everything is queued.
+  std::optional<const std::tuple<distributed_clock::time_point,
+                                 std::vector<logger::BootTimestamp>> *>
+  QueueNextTimestamp();
+
  private:
   // Returns the next timestamp, or nullopt if there isn't one. It is assumed
   // that if there isn't one, there never will be one.
@@ -191,22 +199,22 @@
   void QueueUntil(
       std::function<
           bool(const std::tuple<distributed_clock::time_point,
-                                std::vector<monotonic_clock::time_point>> &)>
+                                std::vector<logger::BootTimestamp>> &)>
           not_done);
 
   // The number of nodes to enforce.
   const size_t node_count_;
 
+ protected:
   // List of timestamps.
   std::deque<std::tuple<distributed_clock::time_point,
-                        std::vector<monotonic_clock::time_point>>>
+                        std::vector<logger::BootTimestamp>>>
       times_;
 
   // If true, we have popped data from times_, so anything before the start is
   // unknown.
   bool have_popped_ = false;
 
- protected:
   // The amount of time to buffer when estimating.  We care so we don't throw
   // data out of our queue too soon.  This time is indicative of how much to
   // buffer everywhere, so let's latch onto it as well until proven that there
@@ -238,6 +246,13 @@
     const std::vector<logger::BootTimestamp> &ta,
     const std::vector<logger::BootTimestamp> &tb);
 
+// Interpolates a monotonic time to a distributed time without loss of
+// precision.  Implements (d1 - d0) / (t1 - t0) * (time - t0) + d0;
+distributed_clock::time_point ToDistributedClock(
+    distributed_clock::time_point d0, distributed_clock::time_point d1,
+    monotonic_clock::time_point t0, monotonic_clock::time_point t1,
+    monotonic_clock::time_point time);
+
 // Class to hold a NoncausalOffsetEstimator per pair of communicating nodes, and
 // to estimate and set the overall time of all nodes.
 //
@@ -316,6 +331,9 @@
   NextSolution(TimestampProblem *problem,
                const std::vector<logger::BootTimestamp> &base_times);
 
+  // Writes all samples to disk.
+  void FlushAllSamples(bool finish);
+
   const Configuration *configuration_;
   const Configuration *logged_configuration_;
 
@@ -349,7 +367,39 @@
   bool first_solution_ = true;
   bool all_done_ = false;
 
+  // Optional file pointers to save the results of the noncausal filter in. This
+  // lives here so we can give each sample a distributed clock.
+  std::vector<std::vector<FILE *>> filter_fps_;
+  // Optional file pointers to save all the samples into.
+  std::vector<std::vector<FILE *>> sample_fps_;
+
   FILE *fp_ = NULL;
+
+  struct SingleNodeSamples {
+    struct CompareTimestamps {
+      bool operator()(
+          const std::pair<logger::BootTimestamp, logger::BootTimestamp> &a,
+          const std::pair<logger::BootTimestamp, logger::BootTimestamp> &b)
+          const {
+        return a.first < b.first;
+      }
+    };
+
+    // Delivered, sent timestamps for each message.
+    absl::btree_set<std::pair<logger::BootTimestamp, logger::BootTimestamp>,
+                    CompareTimestamps>
+        messages;
+  };
+
+  struct NodeSamples {
+    // List of nodes sending.
+    std::vector<SingleNodeSamples> nodes;
+  };
+
+  // List of nodes where data is delivered.
+  std::vector<NodeSamples> node_samples_;
+  // Mapping from channel to the node_index of the source node.
+  std::vector<size_t> source_node_index_;
 };
 
 }  // namespace message_bridge
diff --git a/aos/network/timestamp_filter.cc b/aos/network/timestamp_filter.cc
index e342bc9..59af3a0 100644
--- a/aos/network/timestamp_filter.cc
+++ b/aos/network/timestamp_filter.cc
@@ -35,15 +35,6 @@
           "sample_contribution, time_contribution\n");
 }
 
-void PrintNoncausalTimestampFilterHeader(FILE *fp) {
-  fprintf(fp, "time_since_start,sample_ns,filtered_offset\n");
-}
-
-void PrintNoncausalTimestampFilterSamplesHeader(FILE *fp) {
-  fprintf(fp,
-          "time_since_start,sample_ns,monotonic,monotonic+offset(remote)\n");
-}
-
 void NormalizeTimestamps(monotonic_clock::time_point *ta_base, double *ta) {
   chrono::nanoseconds ta_digits(static_cast<int64_t>(std::floor(*ta)));
   *ta_base += ta_digits;
@@ -472,26 +463,9 @@
   }
 }
 
-NoncausalTimestampFilter::SingleFilter::~SingleFilter() {
-  CHECK_EQ(timestamps_.size(), 0u)
-      << ": Parent didn't pop all timestamps before being destroyed";
-}
+NoncausalTimestampFilter::SingleFilter::~SingleFilter() {}
 
-NoncausalTimestampFilter::~NoncausalTimestampFilter() {
-  for (auto &f : filters_) {
-    while (f.filter.timestamps_size() > 0u) {
-      MaybeWriteTimestamp(f.filter.timestamp(0));
-      f.filter.PopFront();
-    }
-  }
-  if (fp_) {
-    fclose(fp_);
-  }
-
-  if (samples_fp_) {
-    fclose(samples_fp_);
-  }
-}
+NoncausalTimestampFilter::~NoncausalTimestampFilter() {}
 
 std::tuple<monotonic_clock::time_point, chrono::nanoseconds> TrimTuple(
     std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds, bool>
@@ -499,26 +473,6 @@
   return std::make_tuple(std::get<0>(t), std::get<1>(t));
 }
 
-void NoncausalTimestampFilter::FlushSavedSamples() {
-  for (const std::tuple<aos::monotonic_clock::time_point,
-                        std::chrono::nanoseconds> &sample : saved_samples_) {
-    fprintf(samples_fp_, "%.9f, %.9f, %.9f, %.9f\n",
-            chrono::duration_cast<chrono::duration<double>>(
-                std::get<0>(sample) - first_time_)
-                .count(),
-            chrono::duration_cast<chrono::duration<double>>(std::get<1>(sample))
-                .count(),
-            chrono::duration_cast<chrono::duration<double>>(
-                std::get<0>(sample).time_since_epoch())
-                .count(),
-            chrono::duration_cast<chrono::duration<double>>(
-                (std::get<0>(sample) + std::get<1>(sample)).time_since_epoch())
-                .count());
-  }
-  fflush(samples_fp_);
-  saved_samples_.clear();
-}
-
 std::pair<std::tuple<logger::BootTimestamp, logger::BootDuration>,
           std::tuple<logger::BootTimestamp, logger::BootDuration>>
 NoncausalTimestampFilter::FindTimestamps(logger::BootTimestamp ta_base,
@@ -878,14 +832,6 @@
 
 void NoncausalTimestampFilter::Sample(logger::BootTimestamp monotonic_now_all,
                                       logger::BootDuration sample_ns) {
-  if (samples_fp_) {
-    saved_samples_.emplace_back(
-        std::make_pair(monotonic_now_all.time, sample_ns.duration));
-    if (first_time_ != aos::monotonic_clock::min_time) {
-      FlushSavedSamples();
-    }
-  }
-
   filter(monotonic_now_all.boot, sample_ns.boot)
       ->Sample(monotonic_now_all.time, sample_ns.duration);
 }
@@ -1178,7 +1124,6 @@
   // drop it off the list.  Hence the >=
   while (f->timestamps_size() >= 2 &&
          time.time >= std::get<0>(f->timestamp(1))) {
-    MaybeWriteTimestamp(f->timestamp(0));
     f->PopFront();
     removed = true;
   }
@@ -1292,27 +1237,6 @@
   }
 }
 
-void NoncausalTimestampFilter::SetFirstTime(
-    aos::monotonic_clock::time_point time) {
-  first_time_ = time;
-  if (fp_) {
-    fp_ = freopen(NULL, "wb", fp_);
-    PrintNoncausalTimestampFilterHeader(fp_);
-  }
-  if (samples_fp_) {
-    samples_fp_ = freopen(NULL, "wb", samples_fp_);
-    PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
-    FlushSavedSamples();
-  }
-}
-
-void NoncausalTimestampFilter::SetCsvFileName(std::string_view name) {
-  fp_ = fopen(absl::StrCat(name, ".csv").c_str(), "w");
-  samples_fp_ = fopen(absl::StrCat(name, "_samples.csv").c_str(), "w");
-  PrintNoncausalTimestampFilterHeader(fp_);
-  PrintNoncausalTimestampFilterSamplesHeader(samples_fp_);
-}
-
 void NoncausalTimestampFilter::SingleFilter::PopFront() {
   // If we drop data, we shouldn't add anything before that point.
   frozen_time_ = std::max(frozen_time_, std::get<0>(timestamp(0)));
@@ -1323,24 +1247,6 @@
   }
 }
 
-void NoncausalTimestampFilter::MaybeWriteTimestamp(
-    std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
-        timestamp) {
-  if (fp_ && first_time_ != aos::monotonic_clock::min_time) {
-    fprintf(fp_, "%.9f, %.9f, %.9f\n",
-            std::chrono::duration_cast<std::chrono::duration<double>>(
-                std::get<0>(timestamp) - first_time_)
-                .count(),
-            std::chrono::duration_cast<std::chrono::duration<double>>(
-                std::get<0>(timestamp).time_since_epoch())
-                .count(),
-            std::chrono::duration_cast<std::chrono::duration<double>>(
-                std::get<1>(timestamp))
-                .count());
-    fflush(fp_);
-  }
-}
-
 void NoncausalOffsetEstimator::Sample(
     const Node *node, logger::BootTimestamp node_delivered_time,
     logger::BootTimestamp other_node_sent_time) {
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index bb0b9a1..166e811 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -254,15 +254,8 @@
     node_b_ = other.node_b_;
     other.node_b_ = nullptr;
 
-    saved_samples_ = std::move(other.saved_samples_);
-    fp_ = other.fp_;
-    other.fp_ = nullptr;
-    samples_fp_ = other.samples_fp_;
-    other.samples_fp_ = nullptr;
-
     filters_ = std::move(other.filters_);
     current_filter_ = other.current_filter_;
-    first_time_ = other.first_time_;
     return *this;
   }
   NoncausalTimestampFilter(const NoncausalTimestampFilter &) = delete;
@@ -313,11 +306,6 @@
     }
   }
 
-  // Sets the starting point and filename to log samples to.  These functions
-  // are only used when doing CSV file logging to debug the filter.
-  void SetFirstTime(aos::monotonic_clock::time_point time);
-  void SetCsvFileName(std::string_view name);
-
   // Marks all line segments up until the provided time on the provided node as
   // used.
   void FreezeUntil(logger::BootTimestamp node_monotonic_now,
@@ -601,26 +589,9 @@
   // Removes the oldest timestamp.
   void PopFront();
 
-  // Writes a timestamp to the file if it is reasonable.
-  void MaybeWriteTimestamp(
-      std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>
-          timestamp);
-
-  // Writes any saved timestamps to file.
-  void FlushSavedSamples();
-
   const Node *node_a_;
   const Node *node_b_;
 
-  // Holds any timestamps from before the start of the log to be flushed when we
-  // know when the log starts.
-  std::vector<
-      std::tuple<aos::monotonic_clock::time_point, std::chrono::nanoseconds>>
-      saved_samples_;
-
-  FILE *fp_ = nullptr;
-  FILE *samples_fp_ = nullptr;
-
   // Returns a debug string with the nodes this filter represents.
   std::string NodeNames() const;
 
@@ -699,8 +670,6 @@
   std::vector<BootFilter> filters_;
 
   size_t current_filter_ = 0;
-
-  aos::monotonic_clock::time_point first_time_ = aos::monotonic_clock::min_time;
 };
 
 // This class holds 2 NoncausalTimestampFilter's and handles averaging the
@@ -744,15 +713,6 @@
   // Returns true if any points were popped.
   bool Pop(const Node *node, logger::BootTimestamp node_monotonic_now);
 
-  void SetFirstFwdTime(monotonic_clock::time_point time) {
-    a_.SetFirstTime(time);
-  }
-  void SetFwdCsvFileName(std::string_view name) { a_.SetCsvFileName(name); }
-  void SetFirstRevTime(monotonic_clock::time_point time) {
-    b_.SetFirstTime(time);
-  }
-  void SetRevCsvFileName(std::string_view name) { b_.SetCsvFileName(name); }
-
  private:
   NoncausalTimestampFilter a_;
   NoncausalTimestampFilter b_;