Sort messages between nodes properly

We used to assume the realtime clocks were in sync.  This isn't
realistic.  Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
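
To sketch the idea (this is the observation model only, not the actual
ClippedAverageFilter math, which also clips and averages): a message
from node a to node b observes fwd = receive_time_b - send_time_a, and
a message the other way observes rev = receive_time_a - send_time_b.
Assuming a roughly symmetric link:

  #include <chrono>

  // Hypothetical helper, for illustration only.
  struct Observation {
    std::chrono::nanoseconds offset;   // clock b relative to clock a
    std::chrono::nanoseconds latency;  // one-way network latency
  };

  // fwd = latency + offset and rev = latency - offset, so the two
  // directions separate the two unknowns.
  Observation Decompose(std::chrono::nanoseconds fwd,
                        std::chrono::nanoseconds rev) {
    return {(fwd - rev) / 2, (fwd + rev) / 2};
  }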

Since time is no longer exactly linear with all the adjustments, we need
to redo how events are scheduled.  They can't be converted to the
distributed_clock once.  They now need to be converted every time they
are compared between nodes.
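
For example (a hypothetical comparison helper; ToDistributedClock is
the real conversion on NodeEventLoopFactory), comparing events across
nodes now looks like:

  // Convert both event times to the distributed clock at comparison
  // time, since the conversion changes as the offset estimates update.
  bool HappensBefore(NodeEventLoopFactory *node_a,
                     monotonic_clock::time_point time_a,
                     NodeEventLoopFactory *node_b,
                     monotonic_clock::time_point time_b) {
    return node_a->ToDistributedClock(time_a) <
           node_b->ToDistributedClock(time_b);
  }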

Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c7f64ae..57c2c2c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -7,6 +7,7 @@
 #include <sys/uio.h>
 #include <vector>
 
+#include "Eigen/Dense"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/logger_generated.h"
@@ -19,6 +20,11 @@
             "If true, drop any forwarding entries with missing data.  If "
             "false, CHECK.");
 
+DEFINE_bool(timestamps_to_csv, false,
+            "If true, write all the time synchronization information to a set "
+            "of CSV files in /tmp/.  This should only be needed when debugging "
+            "time synchronization.");
+
 namespace aos {
 namespace logger {
 
@@ -109,12 +115,12 @@
 // TODO(austin): Set the remote start time to the first time we see a remote
 // message when we are logging those messages separately?  Need to signal what to
 // do, or how to get a good timestamp.
-
 void Logger::WriteHeader() {
   for (const Node *node : log_namer_->nodes()) {
     WriteHeader(node);
   }
 }
+
 void Logger::WriteHeader(const Node *node) {
   // Now write the header with this timestamp in it.
   flatbuffers::FlatBufferBuilder fbb;
@@ -294,14 +300,20 @@
   MakeRemappedConfig();
 
   if (!configuration::MultiNode(configuration())) {
-    auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
-    State *state = &(it.first->second);
+    states_.emplace_back(std::make_unique<State>());
+    State *state = states_[0].get();
 
     state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+  } else {
+    states_.resize(configuration()->nodes()->size());
   }
 }
 
-LogReader::~LogReader() { Deregister(); }
+LogReader::~LogReader() {
+  Deregister();
+  if (offset_fp_ != nullptr) {
+    fclose(offset_fp_);
+  }
+}
 
 const Configuration *LogReader::logged_configuration() const {
   return log_file_header_.message().configuration();
@@ -324,17 +336,19 @@
 }
 
 monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
-  auto it = channel_mergers_.find(node);
-  CHECK(it != channel_mergers_.end())
-      << ": Unknown node " << FlatbufferToJson(node);
-  return it->second.channel_merger->monotonic_start_time();
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  return state->channel_merger->monotonic_start_time();
 }
 
 realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
-  auto it = channel_mergers_.find(node);
-  CHECK(it != channel_mergers_.end())
-      << ": Unknown node " << FlatbufferToJson(node);
-  return it->second.channel_merger->realtime_start_time();
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), node)].get();
+  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+  return state->channel_merger->realtime_start_time();
 }
 
 void LogReader::Register() {
@@ -347,9 +361,10 @@
   event_loop_factory_ = event_loop_factory;
 
   for (const Node *node : configuration::GetNodes(configuration())) {
-    auto it = channel_mergers_.insert(std::make_pair(node, State{}));
-
-    State *state = &(it.first->second);
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    states_[node_index] = std::make_unique<State>();
+    State *state = states_[node_index].get();
 
     state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
 
@@ -361,47 +376,169 @@
     Register(state->event_loop_unique_ptr.get());
   }
 
-  // Basic idea is that we want to
-  //   1) Find the node which booted first.
-  //   2) Setup the clocks so that each clock is at the time it would be at when
-  //      the first node booted.
+  // We now need to seed our per-node time offsets and get everything set up
+  // to run.
+  const size_t num_nodes = !configuration::MultiNode(logged_configuration())
+                               ? 1u
+                               : logged_configuration()->nodes()->size();
 
-  realtime_clock::time_point earliest_boot_time = realtime_clock::max_time;
-  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
-    State *state = &(state_pair.second);
+  // It is easiest to solve for the per-node offsets with a matrix rather than
+  // trying to solve the equations by hand.  So let's get after it.
+  //
+  // Now, build up the map matrix.
+  //
+  // sample_matrix_ = map_matrix_ * offset_matrix_
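+  //
+  // For example (shapes for illustration only), with three nodes and filters
+  // between nodes (0, 1) and (0, 2):
+  //
+  //   [ 0        ]   [ 1  1  1 ]   [ offset0 ]
+  //   [ sample01 ] = [ 1 -1  0 ] * [ offset1 ]
+  //   [ sample02 ]   [ 1  0 -1 ]   [ offset2 ]
+  //
+  // where the first row is the "offsets sum to 0" constraint added below.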
+  map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);
 
-    const realtime_clock::time_point boot_time =
-        state->channel_merger->realtime_start_time() -
-        state->channel_merger->monotonic_start_time().time_since_epoch();
+  sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
+  offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);
 
-    if (boot_time < earliest_boot_time) {
-      earliest_boot_time = boot_time;
+  // And the base offset matrix, which will be a copy of the initial offset
+  // matrix.
+  base_offset_matrix_ =
+      Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
+          num_nodes);
+
+  // All offsets should sum to 0.  Add that as the first constraint in our least
+  // squares.
+  map_matrix_.row(0).setOnes();
+
+  {
+    // Now, add the a - b -> sample elements.
+    size_t i = 1;
+    for (std::pair<const std::tuple<const Node *, const Node *>,
+                   message_bridge::ClippedAverageFilter> &filter : filters_) {
+      const Node *const node_a = std::get<0>(filter.first);
+      const Node *const node_b = std::get<1>(filter.first);
+
+      const size_t node_a_index =
+          configuration::GetNodeIndex(configuration(), node_a);
+      const size_t node_b_index =
+          configuration::GetNodeIndex(configuration(), node_b);
+
+      // +a
+      map_matrix_(i, node_a_index) = 1.0;
+      // -b
+      map_matrix_(i, node_b_index) = -1.0;
+
+      // -> sample
+      filter.second.set_sample_pointer(&sample_matrix_(i, 0));
+
+      ++i;
     }
   }
 
+  // The rank of the map matrix tells you whether all the nodes are in
+  // communication with each other, which tells you whether the offsets are
+  // observable.
+  const size_t connected_nodes =
+      Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
+          map_matrix_)
+          .rank();
+
+  // We don't need to support isolated nodes until someone has a real use case.
+  CHECK_EQ(connected_nodes, num_nodes)
+      << ": There is a node which isn't communicating with the rest.";
+
+  // Now, iterate through all the timestamps from all the nodes and seed
+  // everything.
+  for (std::unique_ptr<State> &state : states_) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      TimestampMerger::DeliveryTimestamp timestamp =
+          state->channel_merger->OldestTimestampForChannel(i);
+      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
+        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
+      }
+    }
+  }
+
+  // Make sure all the samples have been seeded.
+  for (int i = 1; i < sample_matrix_.rows(); ++i) {
+    // The seeding logic is pretty basic right now because we don't have great
+    // use cases yet.  It wants to see data from every node.  Blow up for now,
+    // and once we have a reason to do something different, update this logic.
+    // Maybe read further in the log file?  Or seed off the realtime time?
+    CHECK_NE(sample_matrix_(i, 0), 0.0)
+        << ": Sample " << i << " is not seeded.";
+  }
+
+  // And solve.
+  offset_matrix_ = SolveOffsets();
+
+  // Save off the base offsets so we can work in deltas from here out.  That
+  // will significantly simplify the numerical precision problems.
+  for (size_t i = 0; i < num_nodes; ++i) {
+    base_offset_matrix_(i, 0) =
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            std::chrono::duration<double>(offset_matrix_(i, 0)));
+  }
+
+  {
+    // Shift everything so the distributed clock should never (reasonably) need
+    // to jump backwards in time.  This makes the boot time of the node that
+    // has been up the longest essentially start matching the distributed
+    // clock.
+    const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
+    for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
+      base_offset_matrix_(i, 0) += offset;
+    }
+  }
+
+  {
+    // Re-compute the samples and set up all the filters so that they
+    // subtract this base offset.
+
+    size_t i = 1;
+    for (std::pair<const std::tuple<const Node *, const Node *>,
+                   message_bridge::ClippedAverageFilter> &filter : filters_) {
+      CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));
+
+      const Node *const node_a = std::get<0>(filter.first);
+      const Node *const node_b = std::get<1>(filter.first);
+
+      const size_t node_a_index =
+          configuration::GetNodeIndex(configuration(), node_a);
+      const size_t node_b_index =
+          configuration::GetNodeIndex(configuration(), node_b);
+
+      filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
+                                    base_offset_matrix_(node_b_index));
+
+      ++i;
+    }
+  }
+
+  // Now, iterate again through all the timestamps now that we have set the
+  // base offset to something sane.  This will seed everything with an
+  // accurate initial offset.
+  for (std::unique_ptr<State> &state : states_) {
+    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+      TimestampMerger::DeliveryTimestamp timestamp =
+          state->channel_merger->OldestTimestampForChannel(i);
+      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
+        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
+      }
+    }
+  }
+
+  UpdateOffsets();
+
   // We want to start the log file at the last start time of the log files from
   // all the nodes.  Compute how long each node's simulation needs to run to
   // move time to this point.
-  monotonic_clock::duration run_time = monotonic_clock::duration(0);
+  distributed_clock::time_point start_time = distributed_clock::min_time;
 
-  for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
-    State *state = &(state_pair.second);
-
-    const realtime_clock::time_point boot_time =
-        state->channel_merger->realtime_start_time() -
-        state->channel_merger->monotonic_start_time().time_since_epoch();
-
-    // And start each node's clocks so the realtime clocks line up for the start
-    // times.  This will let us start using it, but isn't good enough.
-    state->node_event_loop_factory->SetMonotonicNow(
-        monotonic_clock::time_point(earliest_boot_time - boot_time));
+  for (std::unique_ptr<State> &state : states_) {
+    // Set up the realtime clock to have something sane in it now.
     state->node_event_loop_factory->SetRealtimeOffset(
         state->channel_merger->monotonic_start_time(),
         state->channel_merger->realtime_start_time());
-    run_time =
-        std::max(run_time, state->channel_merger->monotonic_start_time() -
-                               state->node_event_loop_factory->monotonic_now());
+    // And start computing the start time on the distributed clock now that
+    // that conversion works.
+    start_time = std::max(start_time,
+                          state->node_event_loop_factory->ToDistributedClock(
+                              state->channel_merger->monotonic_start_time()));
   }
+  CHECK_GE(start_time, distributed_clock::epoch());
 
   // Forwarding is tracked per channel.  If it is enabled, we want to turn it
   // off.  Otherwise messages replayed will get forwarded across to the other
@@ -413,9 +550,8 @@
       const Node *node = configuration::GetNode(
           configuration(), channel->source_node()->string_view());
 
-      auto state_pair = channel_mergers_.find(node);
-      CHECK(state_pair != channel_mergers_.end());
-      State *state = &(state_pair->second);
+      State *state =
+          states_[configuration::GetNodeIndex(configuration(), node)].get();
 
       const Channel *remapped_channel =
           RemapChannel(state->event_loop, channel);
@@ -428,16 +564,90 @@
   // to timestamps on log files where the timestamp log file starts before the
   // data.  In this case, it is reasonable to expect missing data.
   ignore_missing_data_ = true;
-  event_loop_factory_->RunFor(run_time);
+  event_loop_factory_->RunFor(start_time.time_since_epoch());
   // Now that we are running for real, missing data means that the log file is
   // corrupted or went wrong.
   ignore_missing_data_ = false;
 }
 
+void LogReader::UpdateOffsets() {
+  // TODO(austin): Evaluate less accurate inverses.  We might be able to
+  // do some tricks to keep the accuracy up.
+  offset_matrix_ = SolveOffsets();
+
+  size_t node_index = 0;
+  for (std::unique_ptr<State> &state : states_) {
+    state->node_event_loop_factory->SetDistributedOffset(offset(node_index));
+    ++node_index;
+  }
+}
+
+std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
+    const Node *node_a, const Node *node_b) {
+  CHECK_NE(node_a, node_b);
+  CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
+  CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);
+
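+  // Filters are keyed with the lower node pointer first.  The bool in the
+  // returned tuple records whether the caller's (node_a, node_b) order matched
+  // the key order (true, the forward direction) or was flipped (false).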
+  if (node_a > node_b) {
+    return std::make_tuple(std::get<0>(GetFilter(node_b, node_a)), false);
+  }
+
+  auto tuple = std::make_tuple(node_a, node_b);
+
+  auto it = filters_.find(tuple);
+
+  if (it == filters_.end()) {
+    auto &x = filters_
+                  .insert(std::make_pair(
+                      tuple, message_bridge::ClippedAverageFilter()))
+                  .first->second;
+    if (FLAGS_timestamps_to_csv) {
+      std::string fwd_name =
+          absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
+                       node_b->name()->string_view(), ".csv");
+      x.fwd_fp = fopen(fwd_name.c_str(), "w");
+      std::string rev_name =
+          absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
+                       node_a->name()->string_view(), ".csv");
+      x.rev_fp = fopen(rev_name.c_str(), "w");
+    }
+
+    return std::make_tuple(&x, true);
+  } else {
+    return std::make_tuple(&(it->second), true);
+  }
+}
+
+bool LogReader::State::MaybeUpdateTimestamp(
+    const TimestampMerger::DeliveryTimestamp &channel_timestamp,
+    int channel_index) {
+  if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
+    return false;
+  }
+
+  // Got a forwarding timestamp!
+  CHECK(std::get<0>(filters[channel_index]) != nullptr);
+
+  // Call the correct method depending on whether we are the forward or
+  // reverse direction here.
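+  // Either way the sample is (local event time) - (remote send time), which
+  // observes the network latency plus or minus the clock offset between the
+  // two nodes.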
+  if (std::get<1>(filters[channel_index])) {
+    std::get<0>(filters[channel_index])
+        ->FwdSample(channel_timestamp.monotonic_event_time,
+                    channel_timestamp.monotonic_event_time -
+                        channel_timestamp.monotonic_remote_time);
+  } else {
+    std::get<0>(filters[channel_index])
+        ->RevSample(channel_timestamp.monotonic_event_time,
+                    channel_timestamp.monotonic_event_time -
+                        channel_timestamp.monotonic_remote_time);
+  }
+  return true;
+}
+
 void LogReader::Register(EventLoop *event_loop) {
-  auto state_pair = channel_mergers_.find(event_loop->node());
-  CHECK(state_pair != channel_mergers_.end());
-  State *state = &(state_pair->second);
+  State *state =
+      states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
+          .get();
 
   state->event_loop = event_loop;
 
@@ -450,12 +660,29 @@
   state->channel_merger->SetNode(event_loop->node());
 
   state->channels.resize(logged_configuration()->channels()->size());
+  state->filters.resize(state->channels.size());
+
+  state->channel_target_event_loop_factory.resize(state->channels.size());
 
   for (size_t i = 0; i < state->channels.size(); ++i) {
     const Channel *channel =
         RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
 
     state->channels[i] = event_loop->MakeRawSender(channel);
+
+    state->filters[i] = std::make_tuple(nullptr, false);
+
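+    // A channel which is readable but not sendable on this node was forwarded
+    // here, so set up a filter between this node and the channel's source
+    // node.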
+    if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
+        configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
+      const Node *target_node = configuration::GetNode(
+          event_loop->configuration(), channel->source_node()->string_view());
+      state->filters[i] = GetFilter(event_loop->node(), target_node);
+
+      if (event_loop_factory_ != nullptr) {
+        state->channel_target_event_loop_factory[i] =
+            event_loop_factory_->GetNodeEventLoopFactory(target_node);
+      }
+    }
   }
 
   state->timer_handler = event_loop->AddTimer([this, state]() {
@@ -466,6 +693,7 @@
       }
       return;
     }
+    bool update_offsets = false;
     TimestampMerger::DeliveryTimestamp channel_timestamp;
     int channel_index;
     FlatbufferVector<MessageHeader> channel_data =
@@ -477,20 +705,60 @@
     const monotonic_clock::time_point monotonic_now =
         state->event_loop->context().monotonic_event_time;
     CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
-        << ": Now " << monotonic_now.time_since_epoch().count()
-        << " trying to send "
-        << channel_timestamp.monotonic_event_time.time_since_epoch().count();
+        << ": " << FlatbufferToJson(state->event_loop->node()) << " Now "
+        << monotonic_now << " trying to send "
+        << channel_timestamp.monotonic_event_time << " failure "
+        << state->channel_merger->DebugString();
 
     if (channel_timestamp.monotonic_event_time >
             state->channel_merger->monotonic_start_time() ||
         event_loop_factory_ != nullptr) {
-      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries) ||
+      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
+           !state->channel_merger->at_end()) ||
           channel_data.message().data() != nullptr) {
         CHECK(channel_data.message().data() != nullptr)
             << ": Got a message without data.  Forwarding entry which was "
                "not matched?  Use --skip_missing_forwarding_entries to ignore "
                "this.";
 
+        if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
+          // Confirm that the message was sent on the source node before it was
+          // received on the destination node (this node).  As a proxy, do this
+          // by making sure that time on the source node is past when the
+          // message was sent.
+          CHECK_LT(channel_timestamp.monotonic_remote_time,
+                   state->channel_target_event_loop_factory[channel_index]
+                       ->monotonic_now());
+
+          update_offsets = true;
+
+          if (FLAGS_timestamps_to_csv) {
+            if (offset_fp_ == nullptr) {
+              offset_fp_ = fopen("/tmp/offsets.csv", "w");
+              fprintf(
+                  offset_fp_,
+                  "# time_since_start, offset node 0, offset node 1, ...\n");
+              first_time_ = channel_timestamp.realtime_event_time;
+            }
+
+            fprintf(offset_fp_, "%.9f",
+                    std::chrono::duration_cast<std::chrono::duration<double>>(
+                        channel_timestamp.realtime_event_time - first_time_)
+                        .count());
+            for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
+              fprintf(
+                  offset_fp_, ", %.9f",
+                  offset_matrix_(i, 0) +
+                      std::chrono::duration_cast<std::chrono::duration<double>>(
+                          base_offset_matrix_(i, 0))
+                          .count());
+            }
+            fprintf(offset_fp_, "\n");
+          }
+
+        } else {
+          CHECK(std::get<0>(state->filters[channel_index]) == nullptr);
+        }
+
         // If we have access to the factory, use it to fix the realtime time.
         if (state->node_event_loop_factory != nullptr) {
           state->node_event_loop_factory->SetRealtimeOffset(
@@ -504,7 +772,16 @@
             channel_timestamp.monotonic_remote_time,
             channel_timestamp.realtime_remote_time,
             channel_timestamp.remote_queue_index);
+      } else if (state->channel_merger->at_end()) {
+        // We are at the end of the log file and found missing data.  Finish
+        // reading the rest of the log file and call it quits.  We don't want to
+        // replay partial data.
+        while (state->channel_merger->OldestMessage() !=
+               monotonic_clock::max_time) {
+          state->channel_merger->PopOldest();
+        }
       }
+
     } else {
       LOG(WARNING)
           << "Not sending data from before the start of the log file. "
@@ -526,6 +803,13 @@
                                     std::chrono::nanoseconds(1));
       }
     }
+
+    // Once we make this call, the current time changes, so do everything which
+    // involves time before making it.  That especially includes sending the
+    // message.
+    if (update_offsets) {
+      UpdateOffsets();
+    }
   });
 
   ++live_nodes_;
@@ -540,10 +824,7 @@
 void LogReader::Deregister() {
   // Make sure that things get destroyed in the correct order, rather than
   // relying on getting the order correct in the class definition.
-  for (const Node *node : Nodes()) {
-    auto state_pair = channel_mergers_.find(node);
-    CHECK(state_pair != channel_mergers_.end());
-    State *state = &(state_pair->second);
+  for (std::unique_ptr<State> &state : states_) {
     for (size_t i = 0; i < state->channels.size(); ++i) {
       state->channels[i].reset();
     }
@@ -578,8 +859,8 @@
 }
 
 void LogReader::MakeRemappedConfig() {
-  for (std::pair<const Node *const, State> &state : channel_mergers_) {
-    CHECK(!state.second.event_loop)
+  for (std::unique_ptr<State> &state : states_) {
+    CHECK(!state->event_loop)
         << ": Can't change the mapping after the events are scheduled.";
   }