Add timestamp_extractor to simplify timestamp extraction

This is a simpler program which has fewer moving parts.
Sometimes, --skip_order_validation with log_cat isn't aggressive enough
to make the log readable because the log is too badly corrupted.
timestamp_extractor is good enough to parse those logs.

Change-Id: I8e5250cd149ef2f1b86384d61064d49da41f42c2
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 827aa99..b88478d 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -208,7 +208,6 @@
         "//aos:configuration",
         "//aos:init",
         "//aos:json_to_flatbuffer",
-        "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
         "@com_github_gflags_gflags//:gflags",
         "@com_github_google_glog//:glog",
@@ -216,6 +215,24 @@
 )
 
 cc_binary(
+    name = "timestamp_extractor",
+    srcs = [
+        "timestamp_extractor.cc",
+    ],
+    target_compatible_with = ["@platforms//os:linux"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":logfile_utils",
+        "//aos:configuration",
+        "//aos:init",
+        "//aos/events:simulated_event_loop",
+        "//aos/network:multinode_timestamp_filter",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_binary(
     name = "log_edit",
     srcs = [
         "log_edit.cc",
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index a8975d2..493f09a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1111,7 +1111,7 @@
   remapped_configuration_ = event_loop_factory_->configuration();
   filters_ =
       std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
-          event_loop_factory_, logged_configuration(),
+          event_loop_factory_->configuration(), logged_configuration(),
           FLAGS_skip_order_validation,
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
@@ -1264,15 +1264,6 @@
 
   if (FLAGS_timestamps_to_csv) {
     filters_->Start(event_loop_factory);
-    std::fstream s("/tmp/timestamp_noncausal_starttime.csv", s.trunc | s.out);
-    CHECK(s.is_open());
-    for (std::unique_ptr<State> &state : states_) {
-      s << state->event_loop()->node()->name()->string_view() << ", "
-        << std::setprecision(12) << std::fixed
-        << chrono::duration<double>(state->monotonic_now().time_since_epoch())
-               .count()
-        << "\n";
-    }
   }
 }
 
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
new file mode 100644
index 0000000..5ec7025
--- /dev/null
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -0,0 +1,177 @@
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "aos/events/logging/logfile_sorting.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/init.h"
+#include "aos/network/multinode_timestamp_filter.h"
+#include "gflags/gflags.h"
+
+DECLARE_bool(timestamps_to_csv);
+DEFINE_bool(skip_order_validation, false,
+            "If true, ignore any out of orderness in replay");
+
+namespace aos::logger {
+
+namespace chrono = std::chrono;
+
+std::string LogFileVectorToString(std::vector<logger::LogFile> log_files) {
+  std::stringstream ss;
+  for (const auto f : log_files) {
+    ss << f << "\n";
+  }
+  return ss.str();
+}
+
+int Main(int argc, char **argv) {
+  const std::vector<std::string> unsorted_logfiles = FindLogs(argc, argv);
+  const std::vector<LogFile> log_files = SortParts(unsorted_logfiles);
+
+  CHECK_GT(log_files.size(), 0u);
+  // Validate that we have the same config everywhere.  This will be true if
+  // all the parts were sorted together and the configs match.
+  const Configuration *config = nullptr;
+  for (const LogFile &log_file : log_files) {
+    if (config == nullptr) {
+      config = log_file.config.get();
+    } else {
+      CHECK_EQ(config, log_file.config.get());
+    }
+  }
+
+  CHECK(configuration::MultiNode(config))
+      << ": Timestamps only make sense in a multi-node world.";
+
+  // Now, build up all the TimestampMapper classes to read and sort the data.
+  std::vector<std::unique_ptr<TimestampMapper>> mappers;
+
+  for (const Node *node : configuration::GetNodes(config)) {
+    std::vector<LogParts> filtered_parts =
+        FilterPartsForNode(log_files, node->name()->string_view());
+
+    // Confirm that all the parts are from the same boot whenever there is
+    // more than one part to compare.
+    if (!filtered_parts.empty()) {
+      for (size_t i = 1; i < filtered_parts.size(); ++i) {
+        CHECK_EQ(filtered_parts[i].source_boot_uuid,
+                 filtered_parts[0].source_boot_uuid)
+            << ": Found parts from different boots "
+            << LogFileVectorToString(log_files);
+      }
+
+      // Filter the parts relevant to each node when building the mapper.
+      mappers.emplace_back(
+          std::make_unique<TimestampMapper>(std::move(filtered_parts)));
+    } else {
+      mappers.emplace_back(nullptr);
+    }
+  }
+
+  // Now, build up the estimator used to solve for time.
+  message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
+      config, config, FLAGS_skip_order_validation, chrono::seconds(0));
+
+  // To make things more like the logger and faster, cache the node + channel ->
+  // filter mapping in a set of vectors.
+  std::vector<std::vector<message_bridge::NoncausalOffsetEstimator *>> filters;
+  filters.resize(configuration::NodesCount(config));
+
+  for (const Node *node : configuration::GetNodes(config)) {
+    const size_t node_index = configuration::GetNodeIndex(config, node);
+    filters[node_index].resize(config->channels()->size(), nullptr);
+    for (size_t channel_index = 0; channel_index < config->channels()->size();
+         ++channel_index) {
+      const Channel *channel = config->channels()->Get(channel_index);
+
+      if (!configuration::ChannelIsSendableOnNode(channel, node) &&
+          configuration::ChannelIsReadableOnNode(channel, node)) {
+        // We've got a message which is being forwarded to this node.
+        const Node *source_node = configuration::GetNode(
+            config, channel->source_node()->string_view());
+        filters[node_index][channel_index] =
+            multinode_estimator.GetFilter(node, source_node);
+      }
+    }
+  }
+
+  // Now, read all the timestamps for each node.  This is simpler than the
+  // logger on purpose.  It loads in *all* the timestamps in 1 go per node,
+  // ignoring memory usage.
+  for (const Node *node : configuration::GetNodes(config)) {
+    LOG(INFO) << "Reading all data for " << node->name()->string_view();
+    const size_t node_index = configuration::GetNodeIndex(config, node);
+    TimestampMapper *timestamp_mapper = mappers[node_index].get();
+    if (timestamp_mapper == nullptr) {
+      continue;
+    }
+    while (true) {
+      TimestampedMessage *m = timestamp_mapper->Front();
+      if (m == nullptr) {
+        break;
+      }
+
+      if (m->monotonic_remote_time != monotonic_clock::min_time) {
+        // Got a forwarding timestamp!
+        message_bridge::NoncausalOffsetEstimator *filter =
+            filters[node_index][m->channel_index];
+        CHECK(filter != nullptr);
+
+        filter->Sample(node, m->monotonic_event_time, m->monotonic_remote_time);
+
+        // Call the correct method depending on if we are the forward or
+        // reverse direction here.
+        if (m->monotonic_timestamp_time != monotonic_clock::min_time) {
+          // TODO(austin): This assumes that this timestamp is only logged on
+          // the node which sent the data.  That is correct for now, but should
+          // be explicitly checked somewhere.
+          filter->ReverseSample(node, m->monotonic_event_time,
+                                m->monotonic_timestamp_time);
+        }
+      }
+      timestamp_mapper->PopFront();
+    }
+  }
+
+  // Don't get clever: use the first time as the start time.  Note: this is
+  // different from how log_cat and others work.
+  std::optional<std::tuple<distributed_clock::time_point,
+                           std::vector<monotonic_clock::time_point>>>
+      next_timestamp = multinode_estimator.NextTimestamp();
+  CHECK(next_timestamp);
+  LOG(INFO) << "Starting at:";
+  for (const Node *node : configuration::GetNodes(config)) {
+    const size_t node_index = configuration::GetNodeIndex(config, node);
+    LOG(INFO) << "  " << node->name()->string_view() << " -> "
+              << std::get<1>(*next_timestamp)[node_index];
+  }
+
+  multinode_estimator.Start(std::get<1>(*next_timestamp));
+
+  // As we pull off all the timestamps, the time problem is continually solved,
+  // filling in the CSV files.
+  while (true) {
+    std::optional<std::tuple<distributed_clock::time_point,
+                             std::vector<monotonic_clock::time_point>>>
+        next_timestamp = multinode_estimator.NextTimestamp();
+    if (!next_timestamp) {
+      break;
+    }
+  }
+
+  return 0;
+}
+
+}  // namespace aos::logger
+
+int main(int argc, char **argv) {
+  FLAGS_timestamps_to_csv = true;
+  gflags::SetUsageMessage(
+      "Usage:\n"
+      "  timestamp_extractor [args] logfile1 logfile2 ...\n\nThis program "
+      "dumps out all the timestamps from a set of log files for plotting.  Use "
+      "--skip_order_validation to skip any time estimation problems we find.");
+  aos::InitGoogle(&argc, &argv);
+
+  return aos::logger::Main(argc, argv);
+}
diff --git a/aos/events/logging/timestamp_plot.gnuplot b/aos/events/logging/timestamp_plot.gnuplot
index 566448a..7ad62da 100755
--- a/aos/events/logging/timestamp_plot.gnuplot
+++ b/aos/events/logging/timestamp_plot.gnuplot
@@ -28,6 +28,6 @@
      samplefile21 using 1:(-$2) title 'sample 2-1', \
      noncausalfile12 using 1:3 title 'nc 1-2' with lines, \
      noncausalfile21 using 1:(-$3) title 'nc 2-1' with lines, \
-     offsetfile using ((column(node1_index) - node1_start_time + (column(node2_index) - node2_start_time)) / 2):(column(node2_index) - column(node1_index)) title 'filter 2-1' with lines
+     offsetfile using ((column(node1_index) - node1_start_time + (column(node2_index) - node2_start_time)) / 2):(column(node2_index) - column(node1_index)) title 'filter 2-1' with linespoints
 
 pause -1
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 0d32e3d..b1a1600 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -85,27 +85,32 @@
   }
 
   if (VLOG_IS_ON(1)) {
-    std::vector<double> gradient(n, 0.0);
-    Cost(result.data(), gradient.data());
-
-    // High precision formatter for the gradient.
-    struct MyFormatter {
-      void operator()(std::string *out, double i) const {
-        std::stringstream ss;
-        ss << std::setprecision(12) << std::fixed << i;
-        out->append(ss.str());
-      }
-    };
-
-    VLOG(1) << std::setprecision(12) << std::fixed << "Found minimum at f("
-            << absl::StrJoin(result, ", ") << ") -> " << minf << " grad ["
-            << absl::StrJoin(gradient, ", ", MyFormatter()) << "] after "
-            << cost_call_count_ << " cycles for node " << solution_node_ << ".";
+    PrintSolution(result);
   }
   nlopt_destroy(opt);
   return result;
 }
 
+void TimestampProblem::PrintSolution(const std::vector<double> &solution) {
+  const size_t n = filters_.size() - 1u;
+  std::vector<double> gradient(n, 0.0);
+  const double minf = Cost(solution.data(), gradient.data());
+
+  // High precision formatter for the gradient.
+  struct MyFormatter {
+    void operator()(std::string *out, double i) const {
+      std::stringstream ss;
+      ss << std::setprecision(12) << std::fixed << i;
+      out->append(ss.str());
+    }
+  };
+
+  LOG(INFO) << std::setprecision(12) << std::fixed << "Found minimum at f("
+            << absl::StrJoin(solution, ", ") << ") -> " << minf << " grad ["
+            << absl::StrJoin(gradient, ", ", MyFormatter()) << "] after "
+            << cost_call_count_ << " cycles for node " << solution_node_ << ".";
+}
+
 std::vector<monotonic_clock::time_point> TimestampProblem::DoubleToMonotonic(
     const double *r) const {
   std::vector<monotonic_clock::time_point> result(filters_.size());
@@ -432,14 +437,14 @@
   return result;
 }
 MultiNodeNoncausalOffsetEstimator::MultiNodeNoncausalOffsetEstimator(
-    SimulatedEventLoopFactory *event_loop_factory,
+    const Configuration *configuration,
     const Configuration *logged_configuration, bool skip_order_validation,
     chrono::nanoseconds time_estimation_buffer_seconds)
     : InterpolatedTimeConverter(!configuration::MultiNode(logged_configuration)
                                     ? 1u
                                     : logged_configuration->nodes()->size(),
                                 time_estimation_buffer_seconds),
-      event_loop_factory_(event_loop_factory),
+      configuration_(configuration),
       logged_configuration_(logged_configuration),
       skip_order_validation_(skip_order_validation) {
   filters_per_node_.resize(NodesCount());
@@ -484,15 +489,37 @@
 
 void MultiNodeNoncausalOffsetEstimator::Start(
     SimulatedEventLoopFactory *factory) {
+  std::vector<monotonic_clock::time_point> times;
+  for (const Node *node : configuration::GetNodes(factory->configuration())) {
+    times.emplace_back(factory->GetNodeEventLoopFactory(node)->monotonic_now());
+  }
+  Start(times);
+}
+
+void MultiNodeNoncausalOffsetEstimator::Start(
+    std::vector<monotonic_clock::time_point> times) {
   for (std::pair<const std::tuple<const Node *, const Node *>,
                  message_bridge::NoncausalOffsetEstimator> &filter : filters_) {
     const Node *const node_a = std::get<0>(filter.first);
+    const size_t node_a_index =
+        configuration::GetNodeIndex(configuration_, node_a);
     const Node *const node_b = std::get<1>(filter.first);
+    const size_t node_b_index =
+        configuration::GetNodeIndex(configuration_, node_b);
 
-    filter.second.SetFirstFwdTime(
-        factory->GetNodeEventLoopFactory(node_a)->monotonic_now());
-    filter.second.SetFirstRevTime(
-        factory->GetNodeEventLoopFactory(node_b)->monotonic_now());
+    filter.second.SetFirstFwdTime(times[node_a_index]);
+    filter.second.SetFirstRevTime(times[node_b_index]);
+  }
+
+  std::fstream s("/tmp/timestamp_noncausal_starttime.csv", s.trunc | s.out);
+  CHECK(s.is_open());
+  for (const Node *node : configuration::GetNodes(configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    s << node->name()->string_view() << ", " << std::setprecision(12)
+      << std::fixed
+      << chrono::duration<double>(times[node_index].time_since_epoch()).count()
+      << "\n";
   }
 }
 
@@ -894,13 +921,16 @@
       // Bypass checking if order validation is turned off.  This lets us dump a
       // CSV file so we can view the problem and figure out what to do.  The
       // results won't make sense.
-      if (!skip_order_validation_ && !problem->ValidateSolution(solution)) {
+      if (!problem->ValidateSolution(solution)) {
         LOG(WARNING) << "Invalid solution, constraints not met.";
         for (size_t i = 0; i < solution.size(); ++i) {
           LOG(INFO) << "  " << solution[i];
         }
         problem->Debug();
-        LOG(FATAL) << "Bailing";
+        if (!skip_order_validation_) {
+          LOG(FATAL) << "Bailing, use --skip_order_validation to continue.  "
+                        "Use at your own risk.";
+        }
       }
 
       if (VLOG_IS_ON(1)) {
@@ -932,7 +962,7 @@
         case TimeComparison::kInvalid: {
           // If times are close enough, drop the invalid time.
           if (InvalidDistance(result_times, solution) <
-              chrono::nanoseconds(500)) {
+                  chrono::nanoseconds(500)) {
             VLOG(1) << "Times can't be compared by "
                     << InvalidDistance(result_times, solution).count() << "ns";
             for (size_t i = 0; i < result_times.size(); ++i) {
@@ -955,9 +985,10 @@
                       << " -> " << (result_times[i] - solution[i]).count()
                       << "ns";
           }
-          FLAGS_v = 1;
-
-          // Attempting solution for each node.
+          // Since we found a problem with the solution, solve one problem per
+          // node, starting at the problem point.  This will show us any
+          // inconsistencies due to the problem phrasing and which node we
+          // solved from.
           for (size_t a_index = 0; a_index < solution.size(); ++a_index) {
             if (!problem->live(a_index)) {
               continue;
@@ -972,21 +1003,29 @@
 
             problem->set_solution_node(a_index);
             problem->Debug();
-            std::vector<monotonic_clock::time_point> resolve_solution =
-                problem->Solve();
+            const std::vector<double> resolve_solution_double =
+                problem->SolveDouble();
+            problem->PrintSolution(resolve_solution_double);
 
-            if (VLOG_IS_ON(1)) {
-              VLOG(1) << "Candidate solution for resolved node " << a_index
+            const std::vector<monotonic_clock::time_point> resolve_solution =
+                problem->DoubleToMonotonic(resolve_solution_double.data());
+
+            LOG(INFO) << "Candidate solution for resolved node " << a_index
                       << " is";
-              for (size_t i = 0; i < resolve_solution.size(); ++i) {
-                VLOG(1) << "  " << resolve_solution[i] << " vs original "
+            for (size_t i = 0; i < resolve_solution.size(); ++i) {
+              LOG(INFO) << "  " << resolve_solution[i] << " vs original "
                         << solution[i] << " -> "
                         << (resolve_solution[i] - solution[i]).count();
-              }
             }
           }
 
-          LOG(FATAL) << "Please investigate.";
+          if (skip_order_validation_) {
+            next_node_filter->Consume();
+            LOG(ERROR) << "Skipping because --skip_order_validation";
+            break;
+          } else {
+            LOG(FATAL) << "Please investigate.";
+          }
         } break;
       }
       ++node_a_index;
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 73b4ecb..d3bba2c 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -110,6 +110,10 @@
     return count;
   }
 
+  // LOG(INFO)'s the provided solution, showing the arguments, the minimum
+  // value, and the gradient.
+  void PrintSolution(const std::vector<double> &solution);
+
  private:
   // Static trampoline for nlopt.  n is the number of constraints, time_offsets
   // is input solution to solve for, grad is the gradient to fill out (if not
@@ -287,7 +291,7 @@
     : public InterpolatedTimeConverter {
  public:
   MultiNodeNoncausalOffsetEstimator(
-      SimulatedEventLoopFactory *event_loop_factory,
+      const Configuration *configuration,
       const Configuration *logged_configuration, bool skip_order_validation,
       std::chrono::nanoseconds time_estimation_buffer_seconds);
 
@@ -308,6 +312,7 @@
 
   // Captures the start time.
   void Start(SimulatedEventLoopFactory *factory);
+  void Start(std::vector<monotonic_clock::time_point> times);
 
   // Returns the number of nodes.
   size_t NodesCount() const {
@@ -320,9 +325,7 @@
   }
 
   // Returns the configuration that we are replaying into.
-  const aos::Configuration *configuration() const {
-    return event_loop_factory_->configuration();
-  }
+  const aos::Configuration *configuration() const { return configuration_; }
 
  private:
   TimestampProblem MakeProblem();
@@ -332,7 +335,7 @@
   NextSolution(TimestampProblem *problem,
                const std::vector<aos::monotonic_clock::time_point> &base_times);
 
-  SimulatedEventLoopFactory *event_loop_factory_;
+  const Configuration *configuration_;
   const Configuration *logged_configuration_;
 
   // If true, skip any validation which would trigger if we see evidance that