Fix log sorting for good

Unfortunately, this is really hard to split up, so a bunch of stuff
happens at once.

When a message is sent from node A to node B, add support for sending
its delivery timestamp back from node B to node A for logging.  This is
configured per destination node:

    {
      "name": "/pi1/aos",
      "type": "aos.message_bridge.Timestamp",
      "source_node": "pi1",
      "frequency": 10,
      "max_size": 200,
      "destination_nodes": [
        {
          "name": "pi2",
          "priority": 1,
          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
          "timestamp_logger_nodes": ["pi1"]
        }
      ]
    },

This lets us log enough information on node A for its log to be
self-contained.  We know all the messages we sent to B and when they
arrived, so we can recreate the time offset and replay the node.

This data is then published on
   { "name": "/aos/remote_timestamps/pi2", "type": ".aos.logger.MessageHeader"}

The logger then treats that channel specially and logs the contents
directly, as though the messages had been received on the remote node.

This (among other things) exposes log sorting problems.  Use our fancy
new infinite-precision noncausal filter to estimate time precisely enough
to actually order events.  This gets us down to 2-3 ns of error, limited
by integer precision.

Change-Id: Ia843c5176a2c4efc227e669c07d7bb4c7cbe7c91
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index a123dcc..e599091 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -4,18 +4,22 @@
 #include <chrono>
 #include <deque>
 #include <string_view>
+#include <tuple>
 #include <vector>
 
 #include "Eigen/Dense"
 #include "absl/strings/str_cat.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/eigen_mpq.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
 #include "flatbuffers/flatbuffers.h"
+#include "third_party/gmp/gmpxx.h"
 
 namespace aos {
 namespace logger {
@@ -25,11 +29,23 @@
   LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
   virtual ~LogNamer() {}
 
-  virtual void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
-                           const Node *node) = 0;
+  // Writes `header` to the log file(s) for `node`.
+  virtual void WriteHeader(
+      const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+      const Node *node) = 0;
   virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
 
   virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+  // Makes a writer for the delivery timestamps `node` sends back for messages
+  // forwarded to it on `channel`.  Not every LogNamer supports this.
+  virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel *channel, const Node *node) = 0;
+  // Starts a new part file for each log file of `node`, beginning each new
+  // part with `header`.
+  virtual void Rotate(
+      const Node *node,
+      const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
+          &header) = 0;
   const std::vector<const Node *> &nodes() const { return nodes_; }
 
   const Node *node() const { return node_; }
@@ -41,21 +52,34 @@
 
 class LocalLogNamer : public LogNamer {
  public:
-  LocalLogNamer(DetachedBufferWriter *writer, const Node *node)
-      : LogNamer(node), writer_(writer) {}
+  // Logs everything for `node` into a single series of part files named
+  // <base_name>.part<N>.bfbs.
+  LocalLogNamer(std::string_view base_name, const Node *node)
+      : LogNamer(node), base_name_(base_name), data_writer_(OpenDataWriter()) {}
 
-  ~LocalLogNamer() override { writer_->Flush(); }
-
-  void WriteHeader(flatbuffers::FlatBufferBuilder *fbb,
-                   const Node *node) override {
+  void WriteHeader(
+      const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+      const Node *node) override {
     CHECK_EQ(node, this->node());
-    writer_->WriteSizedFlatbuffer(
-        absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
+    data_writer_->WriteSizedFlatbuffer(header.full_span());
   }
 
   DetachedBufferWriter *MakeWriter(const Channel *channel) override {
     CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
-    return writer_;
+    return data_writer_.get();
+  }
+
+  // Starts the next part file and writes `header` at the start of it.
+  void Rotate(const Node *node,
+              const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
+                  &header) override {
+    CHECK_EQ(node, this->node());
+    ++part_number_;
+    // Move-assign the new part into the existing writer object rather than
+    // replacing data_writer_, so pointers already handed out by MakeWriter()
+    // and MakeTimestampWriter() stay valid.
+    *data_writer_ = std::move(*OpenDataWriter());
+    data_writer_->WriteSizedFlatbuffer(header.full_span());
   }
 
   DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
@@ -65,11 +83,24 @@
     CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
                                                               node_))
         << ": Delivery times aren't logged for this channel on this node.";
-    return writer_;
+    return data_writer_.get();
+  }
+
+  DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel * /*channel*/, const Node * /*node*/) override {
+    LOG(FATAL) << "Can't log forwarded timestamps in a single log file.";
+    return nullptr;
   }
 
  private:
-  DetachedBufferWriter *writer_;
+  // Opens the writer for the current part_number_.
+  std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+    return std::make_unique<DetachedBufferWriter>(
+        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+  }
+  const std::string base_name_;
+  size_t part_number_ = 0;
+  std::unique_ptr<DetachedBufferWriter> data_writer_;
 };
 
 // TODO(austin): Split naming files from making files so we can re-use the
@@ -81,29 +111,22 @@
       : LogNamer(node),
         base_name_(base_name),
         configuration_(configuration),
-        data_writer_(std::make_unique<DetachedBufferWriter>(absl::StrCat(
-            base_name_, "_", node->name()->string_view(), "_data.bfbs"))) {}
+        data_writer_(OpenDataWriter()) {}
 
   // Writes the header to all log files for a specific node.  This function
   // needs to be called after all the writers are created.
-  void WriteHeader(flatbuffers::FlatBufferBuilder *fbb, const Node *node) {
-    if (node == this->node()) {
-      data_writer_->WriteSizedFlatbuffer(
-          absl::Span<const uint8_t>(fbb->GetBufferPointer(), fbb->GetSize()));
-    } else {
-      for (std::pair<const Channel *const,
-                     std::unique_ptr<DetachedBufferWriter>> &data_writer :
-           data_writers_) {
-        if (configuration::ChannelIsSendableOnNode(data_writer.first, node)) {
-          data_writer.second->WriteSizedFlatbuffer(absl::Span<const uint8_t>(
-              fbb->GetBufferPointer(), fbb->GetSize()));
-        }
-      }
-    }
-  }
+  void WriteHeader(
+      const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &header,
+      const Node *node) override;
+
+  // Starts new part files for the log files of `node` and writes `header` at
+  // the start of each new part.
+  void Rotate(const Node *node,
+              const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>
+                  &header) override;
 
   // Makes a data logger for a specific channel.
-  DetachedBufferWriter *MakeWriter(const Channel *channel) {
+  DetachedBufferWriter *MakeWriter(const Channel *channel) override {
     // See if we can read the data on this node at all.
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(channel, this->node());
@@ -123,36 +144,66 @@
     // generated if it is sendable on this node.
     if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
       return data_writer_.get();
-    } else {
-      // Ok, we have data that is being forwarded to us that we are supposed to
-      // log.  It needs to be logged with send timestamps, but be sorted enough
-      // to be able to be processed.
-      CHECK(data_writers_.find(channel) == data_writers_.end());
-
-      // Track that this node is being logged.
-      if (configuration::MultiNode(configuration_)) {
-        const Node *source_node = configuration::GetNode(
-            configuration_, channel->source_node()->string_view());
-        if (std::find(nodes_.begin(), nodes_.end(), source_node) ==
-            nodes_.end()) {
-          nodes_.emplace_back(source_node);
-        }
-      }
-
-      return data_writers_
-          .insert(std::make_pair(
-              channel,
-              std::make_unique<DetachedBufferWriter>(absl::StrCat(
-                  base_name_, "_", channel->source_node()->string_view(),
-                  "_data", channel->name()->string_view(), "/",
-                  channel->type()->string_view(), ".bfbs"))))
-          .first->second.get();
     }
+
+    // Ok, we have data that is being forwarded to us that we are supposed to
+    // log.  It needs to be logged with send timestamps, but be sorted enough
+    // to be able to be processed.
+    CHECK(data_writers_.find(channel) == data_writers_.end());
+
+    // Track that this node is being logged.
+    const Node *source_node = configuration::GetNode(
+        configuration_, channel->source_node()->string_view());
+
+    if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
+      nodes_.emplace_back(source_node);
+    }
+
+    DataWriter data_writer;
+    data_writer.node = source_node;
+    // `rotate` (re)opens the file for the writer's current part_number.  Call
+    // it now to open the first part.
+    data_writer.rotate = [this](const Channel *channel,
+                                DataWriter *data_writer) {
+      OpenWriter(channel, data_writer);
+    };
+    data_writer.rotate(channel, &data_writer);
+
+    return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+        .first->second.writer.get();
+  }
+
+  // Makes a writer for the delivery timestamps `node` sends back for messages
+  // forwarded to it on `channel`.  Each channel gets its own part files.
+  DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel *channel, const Node *node) override {
+    // See if we can read the data on this node at all.
+    const bool is_readable =
+        configuration::ChannelIsReadableOnNode(channel, this->node());
+    CHECK(is_readable) << ": "
+                       << configuration::CleanedChannelToString(channel);
+
+    CHECK(data_writers_.find(channel) == data_writers_.end());
+
+    if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
+      nodes_.emplace_back(node);
+    }
+
+    DataWriter data_writer;
+    data_writer.node = node;
+    data_writer.rotate = [this](const Channel *channel,
+                                DataWriter *data_writer) {
+      OpenForwardedTimestampWriter(channel, data_writer);
+    };
+    data_writer.rotate(channel, &data_writer);
+
+    return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+        .first->second.writer.get();
   }
 
   // Makes a timestamp (or timestamp and data) logger for a channel and
   // forwarding connection.
-  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) {
+  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
     const bool log_delivery_times =
         (this->node() == nullptr)
             ? false
@@ -168,40 +215,101 @@
   const std::vector<const Node *> &nodes() const { return nodes_; }
 
  private:
+  // State for one per-channel log file: the writer, the Node it belongs to,
+  // and the current part number.
+  struct DataWriter {
+    std::unique_ptr<DetachedBufferWriter> writer = nullptr;
+    const Node *node = nullptr;
+    size_t part_number = 0;
+    // Reopens `writer` for the current part_number.
+    std::function<void(const Channel *, DataWriter *)> rotate;
+  };
+
+  // (Re)opens data_writer->writer for the forwarded timestamps on `channel`
+  // at data_writer->part_number.  An existing writer object is reassigned in
+  // place so pointers previously handed out to callers stay valid.
+  void OpenForwardedTimestampWriter(const Channel *channel,
+                                    DataWriter *data_writer) {
+    std::string filename =
+        absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
+                     "/", channel->type()->string_view(), ".part",
+                     data_writer->part_number, ".bfbs");
+
+    if (!data_writer->writer) {
+      data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+    } else {
+      *data_writer->writer = DetachedBufferWriter(filename);
+    }
+  }
+
+  // (Re)opens data_writer->writer for remote data on `channel` at
+  // data_writer->part_number, in the same way as OpenForwardedTimestampWriter.
+  void OpenWriter(const Channel *channel, DataWriter *data_writer) {
+    const std::string filename = absl::StrCat(
+        base_name_, "_", channel->source_node()->string_view(), "_data",
+        channel->name()->string_view(), "/", channel->type()->string_view(),
+        ".part", data_writer->part_number, ".bfbs");
+    if (!data_writer->writer) {
+      data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+    } else {
+      *data_writer->writer = DetachedBufferWriter(filename);
+    }
+  }
+
+  // Opens the part_number_ file for local data and delivery timestamps.
+  std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+    return std::make_unique<DetachedBufferWriter>(
+        absl::StrCat(base_name_, "_", node()->name()->string_view(),
+                     "_data.part", part_number_, ".bfbs"));
+  }
+
   const std::string base_name_;
   const Configuration *const configuration_;
 
+  size_t part_number_ = 0;
+
   // File to write both delivery timestamps and local data to.
   std::unique_ptr<DetachedBufferWriter> data_writer_;
-  // Files to write remote data to.  We want one per channel.
-  std::map<const Channel *, std::unique_ptr<DetachedBufferWriter>>
-      data_writers_;
-};
 
+  // Files to write remote data to.  We want one per channel.
+  std::map<const Channel *, DataWriter> data_writers_;
+};
 
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
-// configuration that is sent rately on a channel and would affect execution.
+// configuration that is sent rarely on a channel and would affect execution.
 class Logger {
  public:
-  Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
+  Logger(std::string_view base_name, EventLoop *event_loop,
          std::chrono::milliseconds polling_period =
              std::chrono::milliseconds(100));
   Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
          std::chrono::milliseconds polling_period =
              std::chrono::milliseconds(100));
 
-  // Rotates the log file with the new writer.  This writes out the header
-  // again, but keeps going as if nothing else happened.
-  void Rotate(DetachedBufferWriter *writer);
-  void Rotate(std::unique_ptr<LogNamer> log_namer);
+  // Rotates the log file(s), triggering new part files to be written for each
+  // log file.
+  void Rotate();
 
  private:
   void WriteHeader();
-  void WriteHeader(const Node *node);
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+      const Node *node);
+
+  bool MaybeUpdateTimestamp(
+      const Node *node, int node_index,
+      aos::monotonic_clock::time_point monotonic_start_time,
+      aos::realtime_clock::time_point realtime_start_time);
 
   void DoLogData();
 
+  void WriteMissingTimestamps();
+
+  void StartLogging();
+
+  // Fetches from each channel until all the data is logged.
+  void LogUntil(monotonic_clock::time_point t);
+
   EventLoop *event_loop_;
   std::unique_ptr<LogNamer> log_namer_;
 
@@ -219,6 +319,12 @@
 
     DetachedBufferWriter *writer = nullptr;
     DetachedBufferWriter *timestamp_writer = nullptr;
+    // NOTE(review): writer_node/timestamp_node presumably name the node each
+    // writer logs for; confirm where these new fields are filled in.
+    DetachedBufferWriter *contents_writer = nullptr;
+    const Node *writer_node = nullptr;
+    const Node *timestamp_node = nullptr;
+    int node_index = 0;
   };
 
   std::vector<FetcherStruct> fetchers_;
@@ -236,6 +340,27 @@
   // Max size that the header has consumed.  This much extra data will be
   // reserved in the builder to avoid reallocating.
   size_t max_header_size_ = 0;
+
+  // Fetcher for all the statistics from all the nodes.
+  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+  // Sets the start time for a specific node.
+  void SetStartTime(size_t node_index,
+                    aos::monotonic_clock::time_point monotonic_start_time,
+                    aos::realtime_clock::time_point realtime_start_time);
+
+  // Start times and log file header tracked for one node.
+  struct NodeState {
+    aos::monotonic_clock::time_point monotonic_start_time =
+        aos::monotonic_clock::min_time;
+    aos::realtime_clock::time_point realtime_start_time =
+        aos::realtime_clock::min_time;
+
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
+        aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+  };
+  // Per-node state; presumably indexed like SetStartTime()'s node_index.
+  std::vector<NodeState> node_state_;
 };
 
 // We end up with one of the following 3 log file types.
@@ -365,13 +488,27 @@
   // channels from calls to RemapLoggedChannel.
   void MakeRemappedConfig();
 
+  // Returns the number of nodes in the logged configuration, or 1 if it is
+  // not a multi-node configuration.
+  size_t nodes_count() const {
+    return !configuration::MultiNode(logged_configuration())
+               ? 1u
+               : logged_configuration()->nodes()->size();
+  }
+
   const std::vector<std::vector<std::string>> filenames_;
 
   // This is *a* log file header used to provide the logged config.  The rest of
   // the header is likely distracting.
   FlatbufferVector<LogFileHeader> log_file_header_;
 
-  Eigen::Matrix<double, Eigen::Dynamic, 1> SolveOffsets();
+  // Solves for each node's time as a linear function of the distributed clock
+  // time t: [ta; tb; ...] = tuple[0] * t + tuple[1]
+  std::tuple<Eigen::Matrix<double, Eigen::Dynamic, 1>,
+             Eigen::Matrix<double, Eigen::Dynamic, 1>>
+  SolveOffsets();
+
+  void LogFit(std::string_view prefix);
 
   // State per node.
   class State {
@@ -392,13 +527,6 @@
     // OldestMessageTime.
     void SeedSortedMessages();
 
-    // Updates the timestamp filter with the timestamp.  Returns true if the
-    // provided timestamp was actually a forwarding timestamp and used, and
-    // false otherwise.
-    bool MaybeUpdateTimestamp(
-        const TimestampMerger::DeliveryTimestamp &channel_timestamp,
-        int channel_index);
-
     // Returns the starting time for this node.
     monotonic_clock::time_point monotonic_start_time() const {
       return channel_merger_->monotonic_start_time();
@@ -416,13 +544,6 @@
     void set_event_loop(EventLoop *event_loop) { event_loop_ = event_loop; }
     EventLoop *event_loop() { return event_loop_; }
 
-    // Returns the oldest timestamp for the provided channel.  This should only
-    // be called before SeedSortedMessages();
-    TimestampMerger::DeliveryTimestamp OldestTimestampForChannel(
-        size_t channel) {
-      return channel_merger_->OldestTimestampForChannel(channel);
-    }
-
     // Sets the current realtime offset from the monotonic clock for this node
     // (if we are on a simulated event loop).
     void SetRealtimeOffset(monotonic_clock::time_point monotonic_time,
@@ -440,6 +561,12 @@
       return node_event_loop_factory_->ToDistributedClock(time);
     }
 
+    // Converts `time` on the distributed clock to this node's monotonic clock.
+    monotonic_clock::time_point FromDistributedClock(
+        distributed_clock::time_point time) {
+      return node_event_loop_factory_->FromDistributedClock(time);
+    }
+
     // Sets the offset (and slope) from the distributed clock.
     void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
                               double distributed_slope) {
@@ -453,6 +579,24 @@
       return channel_target_event_loop_factory_[channel_index]->monotonic_now();
     }
 
+    // Converts `time` on the remote node for channel `channel_index` to the
+    // distributed clock.
+    distributed_clock::time_point RemoteToDistributedClock(
+        size_t channel_index, monotonic_clock::time_point time) {
+      return channel_target_event_loop_factory_[channel_index]
+          ->ToDistributedClock(time);
+    }
+
+    // Returns the remote node for channel `channel_index`.
+    const Node *remote_node(size_t channel_index) {
+      return channel_target_event_loop_factory_[channel_index]->node();
+    }
+
+    // Returns the current monotonic time on this node.
+    monotonic_clock::time_point monotonic_now() {
+      return node_event_loop_factory_->monotonic_now();
+    }
+
     // Sets the node we will be merging as, and returns true if there is any
     // data on it.
     bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
@@ -461,10 +601,9 @@
     void SetChannelCount(size_t count);
 
     // Sets the sender, filter, and target factory for a channel.
-    void SetChannel(
-        size_t channel, std::unique_ptr<RawSender> sender,
-        std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
-        NodeEventLoopFactory *channel_target_event_loop_factory);
+    void SetChannel(size_t channel, std::unique_ptr<RawSender> sender,
+                    message_bridge::NoncausalOffsetEstimator *filter,
+                    NodeEventLoopFactory *channel_target_event_loop_factory);
 
     // Returns if we have read all the messages from all the logs.
     bool at_end() const { return channel_merger_->at_end(); }
@@ -493,14 +632,35 @@
       }
 
-    // Returns a debug string for the channel merger.
-    std::string DebugString() const { return channel_merger_->DebugString(); }
+    // Returns a debug string listing the first and last few sorted messages,
+    // followed by the channel merger's debug string.
+    std::string DebugString() const {
+      // Number of messages to print from each end of sorted_messages_.
+      constexpr size_t kEdgeCount = 7;
+      std::stringstream messages;
+      size_t i = 0;
+      for (const auto &message : sorted_messages_) {
+        if (i < kEdgeCount || i + kEdgeCount >= sorted_messages_.size()) {
+          messages << "sorted_messages[" << i
+                   << "]: " << std::get<0>(message).monotonic_event_time << " "
+                   << configuration::StrippedChannelToString(
+                          event_loop_->configuration()->channels()->Get(
+                              std::get<2>(message).message().channel_index()))
+                   << "\n";
+        } else if (i == kEdgeCount) {
+          messages << "...\n";
+        }
+        ++i;
+      }
+      return messages.str() + channel_merger_->DebugString();
+    }
 
    private:
     // Log file.
     std::unique_ptr<ChannelMerger> channel_merger_;
 
     std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
-                          FlatbufferVector<MessageHeader>>>
+                          FlatbufferVector<MessageHeader>,
+                          message_bridge::NoncausalOffsetEstimator *>>
         sorted_messages_;
 
     // Senders.
@@ -518,8 +675,6 @@
     // This corresponds to the object which is shared among all the channels
-    // going between 2 nodes.  The second element in the tuple indicates if this
-    // is the primary direction or not.
-    std::vector<std::tuple<message_bridge::ClippedAverageFilter *, bool>>
-        filters_;
+    // going between 2 nodes.
+    std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
 
     // List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
     // channel) which correspond to the originating node.
@@ -532,8 +688,7 @@
   // Creates the requested filter if it doesn't exist, regardless of whether
-  // these nodes can actually communicate directly.  The second return value
-  // reports if this is the primary direction or not.
-  std::tuple<message_bridge::ClippedAverageFilter *, bool> GetFilter(
-      const Node *node_a, const Node *node_b);
+  // these nodes can actually communicate directly.
+  message_bridge::NoncausalOffsetEstimator *GetFilter(const Node *node_a,
+                                                      const Node *node_b);
 
   // FILE to write offsets to (if populated).
   FILE *offset_fp_ = nullptr;
@@ -544,32 +700,82 @@
   // List of filters for a connection.  The pointer to the first node will be
   // less than the second node.
   std::map<std::tuple<const Node *, const Node *>,
-           message_bridge::ClippedAverageFilter>
+           std::tuple<message_bridge::NoncausalOffsetEstimator>>
       filters_;
 
   // Returns the offset from the monotonic clock for a node to the distributed
-  // clock.  distributed = monotonic + offset;
-  std::chrono::nanoseconds offset(int node_index) const {
-    CHECK_LT(node_index, offset_matrix_.rows())
+  // clock.  monotonic = distributed * slope() + offset();
+  double slope(int node_index) const {
+    CHECK_LT(node_index, time_slope_matrix_.rows())
         << ": Got too high of a node index.";
-    return -std::chrono::duration_cast<std::chrono::nanoseconds>(
-               std::chrono::duration<double>(offset_matrix_(node_index))) -
-           base_offset_matrix_(node_index);
+    return time_slope_matrix_(node_index);
+  }
+  std::chrono::nanoseconds offset(int node_index) const {
+    CHECK_LT(node_index, time_offset_matrix_.rows())
+        << ": Got too high of a node index.";
+    return std::chrono::duration_cast<std::chrono::nanoseconds>(
+        std::chrono::duration<double>(time_offset_matrix_(node_index)));
   }
 
   // Updates the offset matrix solution and sets the per-node distributed
   // offsets in the factory.
   void UpdateOffsets();
 
-  // sample_matrix_ = map_matrix_ * offset_matrix_
-  Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
-  Eigen::Matrix<double, Eigen::Dynamic, 1> sample_matrix_;
-  Eigen::Matrix<double, Eigen::Dynamic, 1> offset_matrix_;
+  // We have 2 types of equations to do a least squares regression over to fully
+  // constrain our time function.
+  //
+  // One is simple.  The distributed clock is the average of all the clocks.
+  //   (ta + tb + tc + td) / num_nodes = t_distributed
+  //
+  // The second is a bit more complicated.  Our basic time conversion function
+  // is:
+  //   tb = ta + (ta * slope + offset)
+  // We can rewrite this as follows:
+  //   tb - (1 + slope) * ta = offset
+  //
+  // From here, we have enough equations to solve for t{a,b,c,...}.  We want to
+  // take as an input the offsets and slope, and solve for the per-node times as
+  // a function of the distributed clock.
+  //
+  // We need to massage our equations to make this work.  If we solve for the
+  // per-node times at two set distributed clock times, we will be able to
+  // recreate the linear function (we know it is linear).  We can do a similar
+  // thing by breaking our equation up into:
+  //
+  // [1/3  1/3  1/3  ] [ta]   [t_distributed]
+  // [ 1  -1-m1  0   ] [tb] = [oab]
+  // [ 1    0  -1-m2 ] [tc]   [oac]
+  //
+  // NOTE(review): these rows put -1-m on tb/tc, while the equation
+  // tb - (1 + slope) * ta = offset puts (1 + slope) on ta; confirm which form
+  // UpdateOffsets() actually builds.
+  //
+  // This solves to:
+  //
+  // [ta]   [ a00 a01 a02]   [t_distributed]
+  // [tb] = [ a10 a11 a12] * [oab]
+  // [tc]   [ a20 a21 a22]   [oac]
+  //
+  // and can be split into:
+  //
+  // [ta]   [ a00 ]                   [a01 a02]
+  // [tb] = [ a10 ] * t_distributed + [a11 a12] * [oab]
+  // [tc]   [ a20 ]                   [a21 a22]   [oac]
+  //
+  // (map_matrix_ + slope_matrix_) * [ta; tb; tc] = [offset_matrix_];
+  // offset_matrix_ will be in nanoseconds.
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> map_matrix_;
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic> slope_matrix_;
+  Eigen::Matrix<mpq_class, Eigen::Dynamic, 1> offset_matrix_;
+  // Matrix tracking which offsets are valid.
+  Eigen::Matrix<bool, Eigen::Dynamic, 1> valid_matrix_;
+  // Matrix tracking the last valid matrix we used to determine connected nodes.
+  Eigen::Matrix<bool, Eigen::Dynamic, 1> last_valid_matrix_;
+  size_t cached_valid_node_count_ = 0;
 
-  // Base offsets.  The actual offset is the sum of this and the offset matrix.
-  // This removes some of the dynamic range challenges from the double above.
-  Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>
-      base_offset_matrix_;
+  // [ta; tb; tc] = time_slope_matrix_ * t + time_offset_matrix_;
+  // t is in seconds.
+  Eigen::Matrix<double, Eigen::Dynamic, 1> time_slope_matrix_;
+  Eigen::Matrix<double, Eigen::Dynamic, 1> time_offset_matrix_;
 
   std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
       remapped_configuration_buffer_;