Pull multi-node timestamp estimation into a class

There is enough code there and enough interactions to justify an
interface.  Put it behind an interface to make it much more clear and
easier to enforce what updates when and with what guarentees.

This should provide no functional change.

Change-Id: Idd2c1849c2641df828c26860c61e238f47d87f9e
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 654c7ce..671fbea 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -15,6 +15,7 @@
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/logging/uuid.h"
 #include "aos/flatbuffer_merge.h"
+#include "aos/network/multinode_timestamp_filter.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/remote_message_schema.h"
 #include "aos/network/team_number.h"
@@ -27,10 +28,7 @@
             "If true, drop any forwarding entries with missing data.  If "
             "false, CHECK.");
 
-DEFINE_bool(timestamps_to_csv, false,
-            "If true, write all the time synchronization information to a set "
-            "of CSV files in /tmp/.  This should only be needed when debugging "
-            "time synchronization.");
+DECLARE_bool(timestamps_to_csv);
 
 DEFINE_bool(skip_order_validation, false,
             "If true, ignore any out of orderness in replay");
@@ -993,9 +991,6 @@
     LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
                   "is destroyed";
   }
-  if (offset_fp_ != nullptr) {
-    fclose(offset_fp_);
-  }
   // Zero out some buffers. It's easy to do use-after-frees on these, so make
   // it more obvious.
   if (remapped_configuration_buffer_) {
@@ -1051,6 +1046,9 @@
 void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
   event_loop_factory_ = event_loop_factory;
   remapped_configuration_ = event_loop_factory_->configuration();
+  filters_ =
+      std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+          event_loop_factory_);
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
@@ -1110,86 +1108,12 @@
            "you sure that the replay config matches the original config?";
   }
 
-  // We need to now seed our per-node time offsets and get everything set up
-  // to run.
-  const size_t num_nodes = nodes_count();
-
-  // It is easiest to solve for per node offsets with a matrix rather than
-  // trying to solve the equations by hand.  So let's get after it.
-  //
-  // Now, build up the map matrix.
-  //
-  // offset_matrix_ = (map_matrix_ + slope_matrix_) * [ta; tb; tc]
-  map_matrix_ = Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
-      filters_.size() + 1, num_nodes);
-  slope_matrix_ =
-      Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>::Zero(
-          filters_.size() + 1, num_nodes);
-
-  offset_matrix_ =
-      Eigen::Matrix<mpq_class, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
-  valid_matrix_ =
-      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
-  last_valid_matrix_ =
-      Eigen::Matrix<bool, Eigen::Dynamic, 1>::Zero(filters_.size() + 1);
-
-  time_offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
-  time_slope_matrix_ = Eigen::VectorXd::Zero(num_nodes);
-
-  // All times should average out to the distributed clock.
-  for (int i = 0; i < map_matrix_.cols(); ++i) {
-    // 1/num_nodes.
-    map_matrix_(0, i) = mpq_class(1, num_nodes);
-  }
-  valid_matrix_(0) = true;
-
-  {
-    // Now, add the a - b -> sample elements.
-    size_t i = 1;
-    for (std::pair<const std::tuple<const Node *, const Node *>,
-                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
-             &filter : filters_) {
-      const Node *const node_a = std::get<0>(filter.first);
-      const Node *const node_b = std::get<1>(filter.first);
-
-      const size_t node_a_index =
-          configuration::GetNodeIndex(configuration(), node_a);
-      const size_t node_b_index =
-          configuration::GetNodeIndex(configuration(), node_b);
-
-      // -a
-      map_matrix_(i, node_a_index) = mpq_class(-1);
-      // +b
-      map_matrix_(i, node_b_index) = mpq_class(1);
-
-      // -> sample
-      std::get<0>(filter.second)
-          .set_slope_pointer(&slope_matrix_(i, node_a_index));
-      std::get<0>(filter.second).set_offset_pointer(&offset_matrix_(i, 0));
-
-      valid_matrix_(i) = false;
-      std::get<0>(filter.second).set_valid_pointer(&valid_matrix_(i));
-
-      ++i;
-    }
-  }
+  filters_->Initialize(logged_configuration());
 
   for (std::unique_ptr<State> &state : states_) {
     state->SeedSortedMessages();
   }
 
-  // Rank of the map matrix tells you if all the nodes are in communication
-  // with each other, which tells you if the offsets are observable.
-  const size_t connected_nodes =
-      Eigen::FullPivLU<
-          Eigen::Matrix<mpq_class, Eigen::Dynamic, Eigen::Dynamic>>(map_matrix_)
-          .rank();
-
-  // We don't need to support isolated nodes until someone has a real use
-  // case.
-  CHECK_EQ(connected_nodes, num_nodes)
-      << ": There is a node which isn't communicating with the rest.";
-
   // And solve.
   UpdateOffsets();
 
@@ -1265,184 +1189,24 @@
   }
 
   if (FLAGS_timestamps_to_csv) {
-    for (std::pair<const std::tuple<const Node *, const Node *>,
-                   std::tuple<message_bridge::NoncausalOffsetEstimator>>
-             &filter : filters_) {
-      const Node *const node_a = std::get<0>(filter.first);
-      const Node *const node_b = std::get<1>(filter.first);
-
-      std::get<0>(filter.second)
-          .SetFirstFwdTime(event_loop_factory_->GetNodeEventLoopFactory(node_a)
-                               ->monotonic_now());
-      std::get<0>(filter.second)
-          .SetFirstRevTime(event_loop_factory_->GetNodeEventLoopFactory(node_b)
-                               ->monotonic_now());
-    }
+    filters_->Start(event_loop_factory);
   }
 }
 
 void LogReader::UpdateOffsets() {
-  VLOG(2) << "Samples are " << offset_matrix_;
-  VLOG(2) << "Map is " << (map_matrix_ + slope_matrix_);
-  std::tie(time_slope_matrix_, time_offset_matrix_) = SolveOffsets();
-  Eigen::IOFormat HeavyFmt(Eigen::FullPrecision, 0, ", ", ";\n", "[", "]", "[",
-                           "]");
-  VLOG(1) << "First slope " << time_slope_matrix_.transpose().format(HeavyFmt)
-          << " offset " << time_offset_matrix_.transpose().format(HeavyFmt);
-
-  size_t node_index = 0;
-  for (std::unique_ptr<State> &state : states_) {
-    state->SetDistributedOffset(offset(node_index), slope(node_index));
-    VLOG(1) << "Offset for node " << node_index << " "
-            << MaybeNodeName(state->event_loop()->node()) << "is "
-            << aos::distributed_clock::time_point(offset(node_index))
-            << " slope " << std::setprecision(9) << std::fixed
-            << slope(node_index);
-    ++node_index;
-  }
+  filters_->UpdateOffsets();
 
   if (VLOG_IS_ON(1)) {
-    LogFit("Offset is");
-  }
-}
-
-void LogReader::LogFit(std::string_view prefix) {
-  for (std::unique_ptr<State> &state : states_) {
-    VLOG(1) << MaybeNodeName(state->event_loop()->node()) << " now "
-            << state->monotonic_now() << " distributed "
-            << event_loop_factory_->distributed_now();
-  }
-
-  for (std::pair<const std::tuple<const Node *, const Node *>,
-                 std::tuple<message_bridge::NoncausalOffsetEstimator>> &filter :
-       filters_) {
-    message_bridge::NoncausalOffsetEstimator *estimator =
-        &std::get<0>(filter.second);
-
-    if (estimator->a_timestamps().size() == 0 &&
-        estimator->b_timestamps().size() == 0) {
-      continue;
-    }
-
-    if (VLOG_IS_ON(1)) {
-      estimator->LogFit(prefix);
-    }
-
-    const Node *const node_a = std::get<0>(filter.first);
-    const Node *const node_b = std::get<1>(filter.first);
-
-    const size_t node_a_index =
-        configuration::GetNodeIndex(configuration(), node_a);
-    const size_t node_b_index =
-        configuration::GetNodeIndex(configuration(), node_b);
-
-    const double recovered_slope =
-        slope(node_b_index) / slope(node_a_index) - 1.0;
-    const int64_t recovered_offset =
-        offset(node_b_index).count() - offset(node_a_index).count() *
-                                           slope(node_b_index) /
-                                           slope(node_a_index);
-
-    VLOG(1) << "Recovered slope " << std::setprecision(20) << recovered_slope
-            << " (error " << recovered_slope - estimator->fit().slope() << ") "
-            << " offset " << std::setprecision(20) << recovered_offset
-            << " (error "
-            << recovered_offset - estimator->fit().offset().count() << ")";
-
-    const aos::distributed_clock::time_point a0 =
-        states_[node_a_index]->ToDistributedClock(
-            std::get<0>(estimator->a_timestamps()[0]));
-    const aos::distributed_clock::time_point a1 =
-        states_[node_a_index]->ToDistributedClock(
-            std::get<0>(estimator->a_timestamps()[1]));
-
-    VLOG(1) << node_a->name()->string_view() << " timestamps()[0] = "
-            << std::get<0>(estimator->a_timestamps()[0]) << " -> " << a0
-            << " distributed -> " << node_b->name()->string_view() << " "
-            << states_[node_b_index]->FromDistributedClock(a0) << " should be "
-            << aos::monotonic_clock::time_point(
-                   std::chrono::nanoseconds(static_cast<int64_t>(
-                       std::get<0>(estimator->a_timestamps()[0])
-                           .time_since_epoch()
-                           .count() *
-                       (1.0 + estimator->fit().slope()))) +
-                   estimator->fit().offset())
-            << ((a0 <= event_loop_factory_->distributed_now())
-                    ? ""
-                    : " After now, investigate");
-    VLOG(1) << node_a->name()->string_view() << " timestamps()[1] = "
-            << std::get<0>(estimator->a_timestamps()[1]) << " -> " << a1
-            << " distributed -> " << node_b->name()->string_view() << " "
-            << states_[node_b_index]->FromDistributedClock(a1) << " should be "
-            << aos::monotonic_clock::time_point(
-                   std::chrono::nanoseconds(static_cast<int64_t>(
-                       std::get<0>(estimator->a_timestamps()[1])
-                           .time_since_epoch()
-                           .count() *
-                       (1.0 + estimator->fit().slope()))) +
-                   estimator->fit().offset())
-            << ((event_loop_factory_->distributed_now() <= a1)
-                    ? ""
-                    : " Before now, investigate");
-
-    const aos::distributed_clock::time_point b0 =
-        states_[node_b_index]->ToDistributedClock(
-            std::get<0>(estimator->b_timestamps()[0]));
-    const aos::distributed_clock::time_point b1 =
-        states_[node_b_index]->ToDistributedClock(
-            std::get<0>(estimator->b_timestamps()[1]));
-
-    VLOG(1) << node_b->name()->string_view() << " timestamps()[0] = "
-            << std::get<0>(estimator->b_timestamps()[0]) << " -> " << b0
-            << " distributed -> " << node_a->name()->string_view() << " "
-            << states_[node_a_index]->FromDistributedClock(b0)
-            << ((b0 <= event_loop_factory_->distributed_now())
-                    ? ""
-                    : " After now, investigate");
-    VLOG(1) << node_b->name()->string_view() << " timestamps()[1] = "
-            << std::get<0>(estimator->b_timestamps()[1]) << " -> " << b1
-            << " distributed -> " << node_a->name()->string_view() << " "
-            << states_[node_a_index]->FromDistributedClock(b1)
-            << ((event_loop_factory_->distributed_now() <= b1)
-                    ? ""
-                    : " Before now, investigate");
+    filters_->LogFit("Offset is");
   }
 }
 
 message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
     const Node *node_a, const Node *node_b) {
-  CHECK_NE(node_a, node_b);
-  CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
-  CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
-
-  if (node_a > node_b) {
-    return GetFilter(node_b, node_a);
+  if (filters_) {
+    return filters_->GetFilter(node_a, node_b);
   }
-
-  auto tuple = std::make_tuple(node_a, node_b);
-
-  auto it = filters_.find(tuple);
-
-  if (it == filters_.end()) {
-    auto &x =
-        filters_
-            .insert(std::make_pair(
-                tuple, std::make_tuple(message_bridge::NoncausalOffsetEstimator(
-                           node_a, node_b))))
-            .first->second;
-    if (FLAGS_timestamps_to_csv) {
-      std::get<0>(x).SetFwdCsvFileName(absl::StrCat(
-          "/tmp/timestamp_noncausal_", node_a->name()->string_view(), "_",
-          node_b->name()->string_view()));
-      std::get<0>(x).SetRevCsvFileName(absl::StrCat(
-          "/tmp/timestamp_noncausal_", node_b->name()->string_view(), "_",
-          node_a->name()->string_view()));
-    }
-
-    return &std::get<0>(x);
-  } else {
-    return &std::get<0>(it->second);
-  }
+  return nullptr;
 }
 
 void LogReader::Register(EventLoop *event_loop) {
@@ -1519,7 +1283,7 @@
       return;
     }
     if (VLOG_IS_ON(1)) {
-      LogFit("Offset was");
+      filters_->LogFit("Offset was");
     }
 
     bool update_time;
@@ -1601,31 +1365,6 @@
                        timestamped_message.monotonic_remote_time)
                 << ") " << state->DebugString();
           }
-
-          if (FLAGS_timestamps_to_csv) {
-            if (offset_fp_ == nullptr) {
-              offset_fp_ = fopen("/tmp/offsets.csv", "w");
-              fprintf(
-                  offset_fp_,
-                  "# time_since_start, offset node 0, offset node 1, ...\n");
-              first_time_ = timestamped_message.realtime_event_time;
-            }
-
-            fprintf(offset_fp_, "%.9f",
-                    std::chrono::duration_cast<std::chrono::duration<double>>(
-                        timestamped_message.realtime_event_time - first_time_)
-                        .count());
-            for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
-              fprintf(offset_fp_, ", %.9f",
-                      time_offset_matrix_(i, 0) +
-                          time_slope_matrix_(i, 0) *
-                              chrono::duration<double>(
-                                  event_loop_factory_->distributed_now()
-                                      .time_since_epoch())
-                                  .count());
-            }
-            fprintf(offset_fp_, "\n");
-          }
         }
 
         // If we have access to the factory, use it to fix the realtime time.
@@ -1681,6 +1420,8 @@
     // which involves time before changing it.  That especially includes
     // sending the message.
     if (update_time) {
+      filters_->LogFit("");
+
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
               << "updating offsets";