Use the new solver to compute time

Now that all the infrastructure exists, hook it up.  Track which nodes
are connected, if there are any orphaned nodes, and everything else the
old code used to do.

This doesn't yet handle single directions going and coming.

Change-Id: I658347797384f7608870d231a3ebbb2c05dad1dc
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 5d21ae4..0d8fe0b 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -307,6 +307,7 @@
         "//aos/events:ping_lib",
         "//aos/events:pong_lib",
         "//aos/events:simulated_event_loop",
+        "//aos/network:testing_time_converter",
         "//aos/testing:googletest",
         "//aos/testing:tmpdir",
     ],
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 00322e1..2fe5473 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -766,8 +766,8 @@
   // Only set it if this node delivers to the peer timestamp_mapper.  Otherwise
   // we could needlessly save data.
   if (node_data->any_delivered) {
-    LOG(INFO) << "Registering on node " << node() << " for peer node "
-              << timestamp_mapper->node();
+    VLOG(1) << "Registering on node " << node() << " for peer node "
+            << timestamp_mapper->node();
     CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
 
     timestamp_mapper->nodes_data_[node()].peer = this;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index d4f4abd..2e0f5db 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1101,7 +1101,8 @@
   remapped_configuration_ = event_loop_factory_->configuration();
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
-          event_loop_factory_);
+          event_loop_factory_, logged_configuration(),
+          FLAGS_skip_order_validation);
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
@@ -1130,6 +1131,7 @@
 
     state->SetChannelCount(logged_configuration()->channels()->size());
   }
+  event_loop_factory_->SetTimeConverter(filters_.get());
 
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
@@ -1161,15 +1163,12 @@
            "you sure that the replay config matches the original config?";
   }
 
-  filters_->Initialize(logged_configuration());
+  filters_->CheckGraph();
 
   for (std::unique_ptr<State> &state : states_) {
     state->SeedSortedMessages();
   }
 
-  // And solve.
-  UpdateOffsets();
-
   // We want to start the log file at the last start time of the log files
   // from all the nodes.  Compute how long each node's simulation needs to run
   // to move time to this point.
@@ -1191,6 +1190,9 @@
         start_time, state->ToDistributedClock(state->monotonic_start_time()));
   }
 
+  // TODO(austin): If a node doesn't have a start time, we might not queue
+  // enough.  If this happens, we'll explode with a frozen error eventually.
+
   CHECK_GE(start_time, distributed_clock::epoch())
       << ": Hmm, we have a node starting before the start of time.  Offset "
          "everything.";
@@ -1255,14 +1257,6 @@
   }
 }
 
-void LogReader::UpdateOffsets() {
-  filters_->UpdateOffsets();
-
-  if (VLOG_IS_ON(1)) {
-    filters_->LogFit("Offset is");
-  }
-}
-
 message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
     const Node *node_a, const Node *node_b) {
   if (filters_) {
@@ -1344,9 +1338,6 @@
       }
       return;
     }
-    if (VLOG_IS_ON(1)) {
-      filters_->LogFit("Offset was");
-    }
 
     bool update_time;
     TimestampedMessage timestamped_message = state->PopOldest(&update_time);
@@ -1388,8 +1379,13 @@
           // Confirm that the message was sent on the sending node before the
           // destination node (this node).  As a proxy, do this by making sure
           // that time on the source node is past when the message was sent.
+          //
+          // TODO(austin): <= means that the cause message (which we know) could
+          // happen after the effect even though we know they are at the same
+          // time.  I doubt anyone will notice for a bit, but we should really
+          // fix that.
           if (!FLAGS_skip_order_validation) {
-            CHECK_LT(
+            CHECK_LE(
                 timestamped_message.monotonic_remote_time,
                 state->monotonic_remote_now(timestamped_message.channel_index))
                 << state->event_loop()->node()->name()->string_view() << " to "
@@ -1401,7 +1397,7 @@
                        logged_configuration()->channels()->Get(
                            timestamped_message.channel_index))
                 << " " << state->DebugString();
-          } else if (timestamped_message.monotonic_remote_time >=
+          } else if (timestamped_message.monotonic_remote_time >
                      state->monotonic_remote_now(
                          timestamped_message.channel_index)) {
             LOG(WARNING)
@@ -1492,7 +1488,6 @@
                        return state->monotonic_now();
                      });
 
-      UpdateOffsets();
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
               << state->monotonic_now();
 
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e3f5380..4213066 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -741,10 +741,6 @@
   // less than the second node.
   std::unique_ptr<message_bridge::MultiNodeNoncausalOffsetEstimator> filters_;
 
-  // Updates the offset matrix solution and sets the per-node distributed
-  // offsets in the factory.
-  void UpdateOffsets();
-
   std::unique_ptr<FlatbufferDetachedBuffer<Configuration>>
       remapped_configuration_buffer_;
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 05bafe4..16fb2ab 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -7,6 +7,7 @@
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/remote_message_generated.h"
+#include "aos/network/testing_time_converter.h"
 #include "aos/network/timestamp_generated.h"
 #include "aos/testing/tmpdir.h"
 #include "aos/util/file.h"
@@ -427,11 +428,16 @@
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
             "aos/events/logging/multinode_pingpong_config.json")),
+        time_converter_(configuration::NodesCount(&config_.message())),
         event_loop_factory_(&config_.message()),
         pi1_(
             configuration::GetNode(event_loop_factory_.configuration(), "pi1")),
+        pi1_index_(configuration::GetNodeIndex(
+            event_loop_factory_.configuration(), pi1_)),
         pi2_(
             configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
+        pi2_index_(configuration::GetNodeIndex(
+            event_loop_factory_.configuration(), pi2_)),
         tmp_dir_(aos::testing::TestTmpDir()),
         logfile_base1_(tmp_dir_ + "/multi_logfile1"),
         logfile_base2_(tmp_dir_ + "/multi_logfile2"),
@@ -474,6 +480,8 @@
         ping_(ping_event_loop_.get()),
         pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
         pong_(pong_event_loop_.get()) {
+    event_loop_factory_.SetTimeConverter(&time_converter_);
+
     // Go through and remove the logfiles if they already exist.
     for (const auto file : logfiles_) {
       unlink(file.c_str());
@@ -637,10 +645,13 @@
 
   // Config and factory.
   aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  message_bridge::TestingTimeConverter time_converter_;
   SimulatedEventLoopFactory event_loop_factory_;
 
-  const Node *pi1_;
-  const Node *pi2_;
+  const Node *const pi1_;
+  const size_t pi1_index_;
+  const Node *const pi2_;
+  const size_t pi2_index_;
 
   std::string tmp_dir_;
   std::string logfile_base1_;
@@ -723,6 +734,7 @@
 
 // Tests that we can write and read simple multi-node log files.
 TEST_F(MultinodeLoggerTest, SimpleMultiNode) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1054,6 +1066,7 @@
 // Test that if we feed the replay with a mismatched node list that we die on
 // the LogReader constructor.
 TEST_F(MultinodeLoggerDeathTest, MultiNodeBadReplayConfig) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1086,6 +1099,7 @@
 // Tests that we can read log files where they don't start at the same monotonic
 // time.
 TEST_F(MultinodeLoggerTest, StaggeredStart) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1187,58 +1201,78 @@
 // match correctly.  While we are here, also test that different ending times
 // also is readable.
 TEST_F(MultinodeLoggerTest, MismatchedClocks) {
+  // TODO(austin): Negate...
+  const chrono::nanoseconds initial_pi2_offset = chrono::seconds(1000);
+
+  time_converter_.AddMonotonic({monotonic_clock::epoch(),
+                                monotonic_clock::epoch() + initial_pi2_offset});
+  // Wait for 95 ms, (~0.1 seconds - 1/2 of the ping/pong period), and set the
+  // skew to be 200 uS/s
+  const chrono::nanoseconds startup_sleep1 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(95),
+       chrono::milliseconds(95) - chrono::nanoseconds(200) * 95});
+  // Run another 200 ms to have one logger start first.
+  const chrono::nanoseconds startup_sleep2 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(200), chrono::milliseconds(200)});
+  // Slew one way then the other at the same 200 uS/S slew rate.  Make sure we
+  // go far enough to cause problems if this isn't accounted for.
+  const chrono::nanoseconds logger_run1 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(20000),
+       chrono::milliseconds(20000) - chrono::nanoseconds(200) * 20000});
+  const chrono::nanoseconds logger_run2 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(40000),
+       chrono::milliseconds(40000) + chrono::nanoseconds(200) * 40000});
+  const chrono::nanoseconds logger_run3 = time_converter_.AddMonotonic(
+      {chrono::milliseconds(400), chrono::milliseconds(400)});
+
   {
     LoggerState pi2_logger = MakeLogger(pi2_);
 
+    NodeEventLoopFactory *pi1 =
+        event_loop_factory_.GetNodeEventLoopFactory(pi1_);
     NodeEventLoopFactory *pi2 =
         event_loop_factory_.GetNodeEventLoopFactory(pi2_);
     LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
               << pi2->realtime_now() << " distributed "
               << pi2->ToDistributedClock(pi2->monotonic_now());
 
-    const chrono::nanoseconds initial_pi2_offset = -chrono::seconds(1000);
-    chrono::nanoseconds pi2_offset = initial_pi2_offset;
-
-    pi2->SetDistributedOffset(-pi2_offset, 1.0);
     LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
               << pi2->realtime_now() << " distributed "
               << pi2->ToDistributedClock(pi2->monotonic_now());
 
-    for (int i = 0; i < 95; ++i) {
-      pi2_offset += chrono::nanoseconds(200);
-      pi2->SetDistributedOffset(-pi2_offset, 1.0);
-      event_loop_factory_.RunFor(chrono::milliseconds(1));
-    }
+    event_loop_factory_.RunFor(startup_sleep1);
 
     StartLogger(&pi2_logger);
 
-    event_loop_factory_.RunFor(chrono::milliseconds(200));
+    event_loop_factory_.RunFor(startup_sleep2);
 
     {
       // Run pi1's logger for only part of the time.
       LoggerState pi1_logger = MakeLogger(pi1_);
 
       StartLogger(&pi1_logger);
+      event_loop_factory_.RunFor(logger_run1);
 
-      for (int i = 0; i < 20000; ++i) {
-        pi2_offset += chrono::nanoseconds(200);
-        pi2->SetDistributedOffset(-pi2_offset, 1.0);
-        event_loop_factory_.RunFor(chrono::milliseconds(1));
-      }
+      // Make sure we slewed time far enough so that the difference is greater
+      // than the network delay.  This confirms that if we sort incorrectly, it
+      // would show in the results.
+      EXPECT_LT(
+          (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+          -event_loop_factory_.send_delay() -
+              event_loop_factory_.network_delay());
 
-      EXPECT_GT(pi2_offset - initial_pi2_offset,
-                event_loop_factory_.send_delay() +
-                    event_loop_factory_.network_delay());
+      event_loop_factory_.RunFor(logger_run2);
 
-      for (int i = 0; i < 40000; ++i) {
-        pi2_offset -= chrono::nanoseconds(200);
-        pi2->SetDistributedOffset(-pi2_offset, 1.0);
-        event_loop_factory_.RunFor(chrono::milliseconds(1));
-      }
+      // And now check that we went far enough the other way to make sure we
+      // cover both problems.
+      EXPECT_GT(
+          (pi2->monotonic_now() - pi1->monotonic_now()) - initial_pi2_offset,
+          event_loop_factory_.send_delay() +
+              event_loop_factory_.network_delay());
     }
 
     // And log a bit more on pi2.
-    event_loop_factory_.RunFor(chrono::milliseconds(400));
+    event_loop_factory_.RunFor(logger_run3);
   }
 
   LogReader reader(SortParts(logfiles_));
@@ -1341,6 +1375,7 @@
 
 // Tests that we can sort a bunch of parts into the pre-determined sorted parts.
 TEST_F(MultinodeLoggerTest, SortParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1361,6 +1396,7 @@
 // Tests that we can sort a bunch of parts with an empty part.  We should ignore
 // it and remove it from the sorted list.
 TEST_F(MultinodeLoggerTest, SortEmptyParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1388,6 +1424,7 @@
 // Tests that we can sort a bunch of parts with an empty .xz file in there.  The
 // empty file should be ignored.
 TEST_F(MultinodeLoggerTest, SortEmptyCompressedParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1416,6 +1453,7 @@
 // Tests that we can sort a bunch of parts with the end missing off a compressed
 // file.  We should use the part we can read.
 TEST_F(MultinodeLoggerTest, SortTruncatedCompressedParts) {
+  time_converter_.StartEqual();
   // Make a bunch of parts.
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
@@ -1447,6 +1485,7 @@
 
 // Tests that if we remap a remapped channel, it shows up correctly.
 TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1513,6 +1552,7 @@
 // This should be enough that we can then re-run the logger and get a valid log
 // back.
 TEST_F(MultinodeLoggerTest, MessageHeader) {
+  time_converter_.StartEqual();
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
@@ -1741,15 +1781,13 @@
 // Tests that we properly populate and extract the logger_start time by setting
 // up a clock difference between 2 nodes and looking at the resulting parts.
 TEST_F(MultinodeLoggerTest, LoggerStartTime) {
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(1000)});
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
     LoggerState pi2_logger = MakeLogger(pi2_);
 
-    NodeEventLoopFactory *pi2 =
-        event_loop_factory_.GetNodeEventLoopFactory(pi2_);
-
-    pi2->SetDistributedOffset(chrono::seconds(1000), 1.0);
-
     StartLogger(&pi1_logger);
     StartLogger(&pi2_logger);
 
@@ -1785,6 +1823,7 @@
 // This should be enough that we can then re-run the logger and get a valid log
 // back.
 TEST_F(MultinodeLoggerDeathTest, RemoteReboot) {
+  time_converter_.StartEqual();
   std::string pi2_boot1;
   std::string pi2_boot2;
   {
@@ -1828,8 +1867,13 @@
 // unavailable.
 TEST_F(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
   event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
-  event_loop_factory_.GetNodeEventLoopFactory(pi2_)->SetDistributedOffset(
-      chrono::seconds(1000), 0.99999);
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(1000)});
+
+  time_converter_.AddMonotonic(
+      {chrono::milliseconds(10000),
+       chrono::milliseconds(10000) - chrono::milliseconds(1)});
   {
     LoggerState pi1_logger = MakeLogger(pi1_);
 
@@ -1849,8 +1893,36 @@
 // unavailable.
 TEST_F(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
   event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
-  event_loop_factory_.GetNodeEventLoopFactory(pi2_)->SetDistributedOffset(
-      chrono::seconds(500), 1.00001);
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(500)});
+
+  time_converter_.AddMonotonic(
+      {chrono::milliseconds(10000),
+       chrono::milliseconds(10000) + chrono::milliseconds(1)});
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+  }
+
+  // Confirm that we can parse the result.  LogReader has enough internal CHECKs
+  // to confirm the right thing happened.
+  ConfirmReadable(pi1_single_direction_logfiles_);
+}
+
+// Tests that we properly handle a dead node.  Do this by just disconnecting it
+// and only using one nodes of logs.
+TEST_F(MultinodeLoggerTest, DeadNode) {
+  event_loop_factory_.GetNodeEventLoopFactory(pi1_)->Disconnect(pi2_);
+  event_loop_factory_.GetNodeEventLoopFactory(pi2_)->Disconnect(pi1_);
+  time_converter_.AddMonotonic(
+      {monotonic_clock::epoch(),
+       monotonic_clock::epoch() + chrono::seconds(1000)});
   {
     LoggerState pi1_logger = MakeLogger(pi1_);