Allow non-fatal timestamp solver failures

This refactors the timestamp filters so that timestamp solver failures
can be observed without crashing the application.

This also ensures that the timestamp_extractor code gets at least
somewhat exercised by our tests.
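
For reference, the new logfile_validator entry points are called
roughly as sketched below (a minimal sketch based on the call sites in
timestamp_extractor.cc and multinode_logger_test_lib.cc; the error
handling here is illustrative only):

    #include "aos/events/logging/logfile_validator.h"
    #include "glog/logging.h"

    // Sketch: check that a log's timestamps can be solved before replay.
    const aos::logger::LogFilesContainer log_files(
        aos::logger::SortParts(aos::logger::FindLogs(argc, argv)));
    // Single-node logs trivially pass; multi-node logs run the solver.
    if (!aos::logger::LogIsReadableIfMultiNode(log_files)) {
      LOG(ERROR) << "Timestamp solving failed; replaying would crash.";
    }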

The only behavioral change this should cause for normal users is that
the stack trace will look slightly different when errors occur: the
LOG(FATAL) now fires when evaluating the result of QueueNextTimestamp()
instead of deep inside the solver code (the prior LOG(FATAL)s are
downgraded to LOG(ERROR)s so the messages stay visible).
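
Concretely, QueueNextTimestamp() (and the underlying NextTimestamp())
now return a nested std::optional: the outer layer is empty when the
solver fails and the inner layer is empty at the normal end of the
data. A caller interprets it roughly like this (the estimator variable
name is illustrative):

    auto next_timestamp = multinode_estimator.QueueNextTimestamp();
    if (!next_timestamp.has_value()) {
      // Outer nullopt: the solver failed.  Tools that want the old
      // behavior can LOG(FATAL) here instead of deep in the solver.
      LOG(FATAL) << "Timestamp solver failed.";
    } else if (!next_timestamp.value().has_value()) {
      // Inner nullopt: no error, we simply ran out of timestamps.
    } else {
      // *next_timestamp.value() points at the next (distributed time,
      // per-node BootTimestamp) entry.
    }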

Change-Id: I01e4de8df724c2853cc62ac588668b811563b33d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index a687748..c33606c 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -545,6 +545,24 @@
     ],
 )
 
+cc_library(
+    name = "logfile_validator",
+    srcs = [
+        "logfile_validator.cc",
+    ],
+    hdrs = ["logfile_validator.h"],
+    target_compatible_with = ["@platforms//os:linux"],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":logfile_utils",
+        "//aos:configuration",
+        "//aos/events:simulated_event_loop",
+        "//aos/network:multinode_timestamp_filter",
+        "@com_github_gflags_gflags//:gflags",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
 cc_binary(
     name = "timestamp_extractor",
     srcs = [
@@ -554,6 +572,7 @@
     visibility = ["//visibility:public"],
     deps = [
         ":logfile_utils",
+        ":logfile_validator",
         "//aos:configuration",
         "//aos:init",
         "//aos/events:simulated_event_loop",
@@ -652,6 +671,7 @@
     deps = [
         ":log_reader",
         ":log_writer",
+        ":logfile_validator",
         ":snappy_encoder",
         "//aos/events:message_counter",
         "//aos/events:ping_lib",
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 3cdfba1..e1aa620 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -288,6 +288,8 @@
   // Returns true if we have timestamps in any of the files.
   bool HasTimestamps(std::string_view node_name) const;
 
+  const std::vector<LogFile> &log_files() const { return log_files_; }
+
  private:
   LogFilesContainer(std::optional<const LogSource *> log_source,
                     std::vector<LogFile> log_files);
diff --git a/aos/events/logging/logfile_validator.cc b/aos/events/logging/logfile_validator.cc
new file mode 100644
index 0000000..19d11c8
--- /dev/null
+++ b/aos/events/logging/logfile_validator.cc
@@ -0,0 +1,160 @@
+#include "aos/events/logging/logfile_validator.h"
+
+#include "aos/events/logging/logfile_sorting.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/network/multinode_timestamp_filter.h"
+
+namespace aos::logger {
+bool MultiNodeLogIsReadable(const LogFilesContainer &log_files,
+                            bool skip_order_validation) {
+  const Configuration *config = log_files.config().get();
+
+  CHECK(configuration::MultiNode(config))
+      << ": Timestamps only make sense in a multi-node world.";
+
+  // Now, build up all the TimestampMapper classes to read and sort the data.
+  std::vector<std::unique_ptr<TimestampMapper>> mappers;
+
+  for (const Node *node : configuration::GetNodes(config)) {
+    auto node_name = MaybeNodeName(node);
+    // Only build a mapper for nodes that actually have log parts; otherwise
+    // insert a nullptr placeholder so per-node indexing still lines up.
+    if (log_files.ContainsPartsForNode(node_name)) {
+      // Filter the parts relevant to each node when building the mapper.
+      mappers.emplace_back(std::make_unique<TimestampMapper>(
+          node_name, log_files, TimestampQueueStrategy::kQueueTogether));
+    } else {
+      mappers.emplace_back(nullptr);
+    }
+  }
+
+  // Now, build up the estimator used to solve for time.
+  message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
+      config, config, log_files.boots(), skip_order_validation,
+      std::chrono::seconds(0));
+  multinode_estimator.set_reboot_found(
+      [config](distributed_clock::time_point reboot_time,
+               const std::vector<logger::BootTimestamp> &node_times) {
+        LOG(INFO) << "Rebooted at distributed " << reboot_time;
+        size_t node_index = 0;
+        for (const logger::BootTimestamp &time : node_times) {
+          LOG(INFO) << "  "
+                    << config->nodes()->Get(node_index)->name()->string_view()
+                    << " " << time;
+          ++node_index;
+        }
+      });
+
+  // The destructor's cleanup checks are fatal, so call this helper whenever we
+  // return in order to run those checks here and report a failure non-fatally
+  // instead of crashing in the estimator's destructor.
+  auto preempt_destructor = [&multinode_estimator](bool success) {
+    if (!multinode_estimator.RunDestructorChecks()) {
+      return false;
+    }
+    return success;
+  };
+
+  {
+    std::vector<TimestampMapper *> timestamp_mappers;
+    for (std::unique_ptr<TimestampMapper> &mapper : mappers) {
+      timestamp_mappers.emplace_back(mapper.get());
+    }
+    multinode_estimator.SetTimestampMappers(std::move(timestamp_mappers));
+  }
+
+  // To make things more like the logger and faster, cache the node + channel ->
+  // filter mapping in a set of vectors.
+  std::vector<std::vector<message_bridge::NoncausalOffsetEstimator *>> filters;
+  filters.resize(configuration::NodesCount(config));
+
+  for (const Node *node : configuration::GetNodes(config)) {
+    const size_t node_index = configuration::GetNodeIndex(config, node);
+    filters[node_index].resize(config->channels()->size(), nullptr);
+    for (size_t channel_index = 0; channel_index < config->channels()->size();
+         ++channel_index) {
+      const Channel *channel = config->channels()->Get(channel_index);
+
+      if (!configuration::ChannelIsSendableOnNode(channel, node) &&
+          configuration::ChannelIsReadableOnNode(channel, node)) {
+        // We've got a message which is being forwarded to this node.
+        const Node *source_node = configuration::GetNode(
+            config, channel->source_node()->string_view());
+        filters[node_index][channel_index] =
+            multinode_estimator.GetFilter(node, source_node);
+      }
+    }
+  }
+
+  multinode_estimator.CheckGraph();
+
+  // Now, read all the timestamps for each node.  This is simpler than the
+  // logger on purpose.  It loads in *all* the timestamps in 1 go per node,
+  // ignoring memory usage.
+  for (const Node *node : configuration::GetNodes(config)) {
+    LOG(INFO) << "Reading all data for " << node->name()->string_view();
+    const size_t node_index = configuration::GetNodeIndex(config, node);
+    TimestampMapper *timestamp_mapper = mappers[node_index].get();
+    if (timestamp_mapper == nullptr) {
+      continue;
+    }
+    while (true) {
+      TimestampedMessage *m = timestamp_mapper->Front();
+      if (m == nullptr) {
+        break;
+      }
+      timestamp_mapper->PopFront();
+    }
+  }
+
+  // Don't get clever. Use the first time as the start time.  Note: this is
+  // different than how log_cat and others work.
+  std::optional<std::optional<const std::tuple<distributed_clock::time_point,
+                                               std::vector<BootTimestamp>> *>>
+      next_timestamp = multinode_estimator.QueueNextTimestamp();
+  if (!next_timestamp.has_value() || !next_timestamp.value().has_value()) {
+    return preempt_destructor(false);
+  }
+  LOG(INFO) << "Starting at:";
+  for (const Node *node : configuration::GetNodes(config)) {
+    const size_t node_index = configuration::GetNodeIndex(config, node);
+    LOG(INFO) << "  " << node->name()->string_view() << " -> "
+              << std::get<1>(*next_timestamp.value().value())[node_index].time;
+  }
+
+  std::vector<monotonic_clock::time_point> just_monotonic(
+      std::get<1>(*next_timestamp.value().value()).size());
+  for (size_t i = 0; i < just_monotonic.size(); ++i) {
+    CHECK_EQ(std::get<1>(*next_timestamp.value().value())[i].boot, 0u);
+    just_monotonic[i] = std::get<1>(*next_timestamp.value().value())[i].time;
+  }
+  multinode_estimator.Start(just_monotonic);
+
+  // As we pull off all the timestamps, the time problem is continually solved,
+  // filling in the CSV files.
+  while (true) {
+    std::optional<std::optional<const std::tuple<distributed_clock::time_point,
+                                                 std::vector<BootTimestamp>> *>>
+        next_timestamp = multinode_estimator.QueueNextTimestamp();
+    if (!next_timestamp.has_value()) {
+      return preempt_destructor(false);
+    }
+    if (!next_timestamp.value().has_value()) {
+      break;
+    }
+    multinode_estimator.ObserveTimePassed(
+        std::get<0>(*next_timestamp.value().value()));
+  }
+
+  LOG(INFO) << "Done";
+
+  return preempt_destructor(true);
+}
+
+bool LogIsReadableIfMultiNode(const LogFilesContainer &log_files) {
+  if (aos::configuration::NodesCount(log_files.config().get()) == 1u) {
+    return true;
+  }
+  return MultiNodeLogIsReadable(log_files);
+}
+}  // namespace aos::logger
diff --git a/aos/events/logging/logfile_validator.h b/aos/events/logging/logfile_validator.h
new file mode 100644
index 0000000..814b89e
--- /dev/null
+++ b/aos/events/logging/logfile_validator.h
@@ -0,0 +1,16 @@
+#ifndef AOS_EVENTS_LOGGING_LOGFILE_VALIDATOR_H_
+#define AOS_EVENTS_LOGGING_LOGFILE_VALIDATOR_H_
+#include "aos/events/logging/logfile_sorting.h"
+namespace aos::logger {
+// Attempts to validate that a log is readable without actually running the full
+// LogReader. This aims to allow the user to preempt fatal crashes that can
+// occur when trying to replay a log.
+// Returns true if successful.
+bool MultiNodeLogIsReadable(const LogFilesContainer &log_files,
+                            bool skip_order_validation = false);
+
+// Returns true if the requested log is either a single-node log or if
+// MultiNodeLogIsReadable() returns true.
+bool LogIsReadableIfMultiNode(const LogFilesContainer &log_files);
+}  // namespace aos::logger
+#endif  // AOS_EVENTS_LOGGING_LOGFILE_VALIDATOR_H_
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index 9a905b1..64dd1cd 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -3,6 +3,7 @@
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/log_reader.h"
 #include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logfile_validator.h"
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
 #include "aos/events/simulated_event_loop.h"
@@ -442,6 +443,7 @@
 
     reader.Deregister();
   }
+  CHECK(LogIsReadableIfMultiNode(LogFilesContainer{SortParts(files)}));
   {
     std::vector<std::pair<std::vector<realtime_clock::time_point>,
                           std::vector<realtime_clock::time_point>>>
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index ca7f231..3cf96f6 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -6,8 +6,8 @@
 
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logfile_validator.h"
 #include "aos/init.h"
-#include "aos/network/multinode_timestamp_filter.h"
 
 DECLARE_bool(timestamps_to_csv);
 DEFINE_bool(skip_order_validation, false,
@@ -15,135 +15,9 @@
 
 namespace aos::logger {
 
-namespace chrono = std::chrono;
-
 int Main(int argc, char **argv) {
   const LogFilesContainer log_files(SortParts(FindLogs(argc, argv)));
-  const Configuration *config = log_files.config().get();
-
-  CHECK(configuration::MultiNode(config))
-      << ": Timestamps only make sense in a multi-node world.";
-
-  // Now, build up all the TimestampMapper classes to read and sort the data.
-  std::vector<std::unique_ptr<TimestampMapper>> mappers;
-
-  for (const Node *node : configuration::GetNodes(config)) {
-    auto node_name = MaybeNodeName(node);
-    // Confirm that all the parts are from the same boot if there are enough
-    // parts to not be from the same boot.
-    if (!log_files.ContainsPartsForNode(node_name)) {
-      // Filter the parts relevant to each node when building the mapper.
-      mappers.emplace_back(std::make_unique<TimestampMapper>(
-          node_name, log_files, TimestampQueueStrategy::kQueueTogether));
-    } else {
-      mappers.emplace_back(nullptr);
-    }
-  }
-
-  // Now, build up the estimator used to solve for time.
-  message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
-      config, config, log_files.boots(), FLAGS_skip_order_validation,
-      chrono::seconds(0));
-  multinode_estimator.set_reboot_found(
-      [config](distributed_clock::time_point reboot_time,
-               const std::vector<logger::BootTimestamp> &node_times) {
-        LOG(INFO) << "Rebooted at distributed " << reboot_time;
-        size_t node_index = 0;
-        for (const logger::BootTimestamp &time : node_times) {
-          LOG(INFO) << "  "
-                    << config->nodes()->Get(node_index)->name()->string_view()
-                    << " " << time;
-          ++node_index;
-        }
-      });
-
-  {
-    std::vector<TimestampMapper *> timestamp_mappers;
-    for (std::unique_ptr<TimestampMapper> &mapper : mappers) {
-      timestamp_mappers.emplace_back(mapper.get());
-    }
-    multinode_estimator.SetTimestampMappers(std::move(timestamp_mappers));
-  }
-
-  // To make things more like the logger and faster, cache the node + channel ->
-  // filter mapping in a set of vectors.
-  std::vector<std::vector<message_bridge::NoncausalOffsetEstimator *>> filters;
-  filters.resize(configuration::NodesCount(config));
-
-  for (const Node *node : configuration::GetNodes(config)) {
-    const size_t node_index = configuration::GetNodeIndex(config, node);
-    filters[node_index].resize(config->channels()->size(), nullptr);
-    for (size_t channel_index = 0; channel_index < config->channels()->size();
-         ++channel_index) {
-      const Channel *channel = config->channels()->Get(channel_index);
-
-      if (!configuration::ChannelIsSendableOnNode(channel, node) &&
-          configuration::ChannelIsReadableOnNode(channel, node)) {
-        // We've got a message which is being forwarded to this node.
-        const Node *source_node = configuration::GetNode(
-            config, channel->source_node()->string_view());
-        filters[node_index][channel_index] =
-            multinode_estimator.GetFilter(node, source_node);
-      }
-    }
-  }
-
-  multinode_estimator.CheckGraph();
-
-  // Now, read all the timestamps for each node.  This is simpler than the
-  // logger on purpose.  It loads in *all* the timestamps in 1 go per node,
-  // ignoring memory usage.
-  for (const Node *node : configuration::GetNodes(config)) {
-    LOG(INFO) << "Reading all data for " << node->name()->string_view();
-    const size_t node_index = configuration::GetNodeIndex(config, node);
-    TimestampMapper *timestamp_mapper = mappers[node_index].get();
-    if (timestamp_mapper == nullptr) {
-      continue;
-    }
-    while (true) {
-      TimestampedMessage *m = timestamp_mapper->Front();
-      if (m == nullptr) {
-        break;
-      }
-      timestamp_mapper->PopFront();
-    }
-  }
-
-  // Don't get clever. Use the first time as the start time.  Note: this is
-  // different than how log_cat and others work.
-  std::optional<const std::tuple<distributed_clock::time_point,
-                                 std::vector<BootTimestamp>> *>
-      next_timestamp = multinode_estimator.QueueNextTimestamp();
-  CHECK(next_timestamp);
-  LOG(INFO) << "Starting at:";
-  for (const Node *node : configuration::GetNodes(config)) {
-    const size_t node_index = configuration::GetNodeIndex(config, node);
-    LOG(INFO) << "  " << node->name()->string_view() << " -> "
-              << std::get<1>(*next_timestamp.value())[node_index].time;
-  }
-
-  std::vector<monotonic_clock::time_point> just_monotonic(
-      std::get<1>(*next_timestamp.value()).size());
-  for (size_t i = 0; i < just_monotonic.size(); ++i) {
-    CHECK_EQ(std::get<1>(*next_timestamp.value())[i].boot, 0u);
-    just_monotonic[i] = std::get<1>(*next_timestamp.value())[i].time;
-  }
-  multinode_estimator.Start(just_monotonic);
-
-  // As we pull off all the timestamps, the time problem is continually solved,
-  // filling in the CSV files.
-  while (true) {
-    std::optional<const std::tuple<distributed_clock::time_point,
-                                   std::vector<BootTimestamp>> *>
-        next_timestamp = multinode_estimator.QueueNextTimestamp();
-    if (!next_timestamp) {
-      break;
-    }
-    multinode_estimator.ObserveTimePassed(std::get<0>(*next_timestamp.value()));
-  }
-
-  LOG(INFO) << "Done";
-
+  CHECK(MultiNodeLogIsReadable(log_files, FLAGS_skip_order_validation));
   return 0;
 }
 
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index 65a049f..85ee12c 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -877,20 +877,27 @@
   return updated;
 }
 
-std::tuple<std::vector<BootTimestamp>, Eigen::VectorXd, size_t, size_t>
+std::optional<
+    std::tuple<std::vector<BootTimestamp>, Eigen::VectorXd, size_t, size_t>>
 SolveConstrainedNewton(NewtonSolver *solver, TimestampProblem *problem,
                        const size_t max_iterations) {
   std::vector<size_t> active_constraints;
 
   while (true) {
-    Eigen::VectorXd y;
     size_t iteration;
     size_t solution_node;
-    std::vector<size_t> new_active_constraints;
 
-    std::tie(y, solution_node, iteration, new_active_constraints) =
-        solver->SolveConstrainedNewton(problem, max_iterations,
-                                       active_constraints);
+    auto solver_result = solver->SolveConstrainedNewton(problem, max_iterations,
+                                                        active_constraints);
+    if (!solver_result.has_value()) {
+      return std::nullopt;
+    }
+
+    std::tie(std::ignore, solution_node, iteration, std::ignore) =
+        *solver_result;
+    Eigen::VectorXd y = std::move(std::get<0>(*solver_result));
+    std::vector<size_t> new_active_constraints =
+        std::move(std::get<3>(*solver_result));
 
     std::vector<BootTimestamp> result =
         problem->PackResults(iteration > max_iterations ||
@@ -913,7 +920,7 @@
   }
 }
 
-std::tuple<Eigen::VectorXd, size_t, size_t, std::vector<size_t>>
+std::optional<std::tuple<Eigen::VectorXd, size_t, size_t, std::vector<size_t>>>
 NewtonSolver::SolveConstrainedNewton(
     Problem *problem, const size_t max_iterations,
     const absl::Span<const size_t> original_active_constraints) {
@@ -1197,8 +1204,9 @@
     }
   }
 
-  if (iteration > max_iterations) {
-    LOG(FATAL) << "Failed to converge on solve " << my_solve_number_;
+  if (iteration > max_iterations && FLAGS_crash_on_solve_failure) {
+    LOG(ERROR) << "Failed to converge on solve " << my_solve_number_;
+    return std::nullopt;
   }
 
   return std::make_tuple(std::move(y), solution_node, iteration,
@@ -1345,35 +1353,45 @@
   }
 }
 
-std::optional<const std::tuple<distributed_clock::time_point,
-                               std::vector<BootTimestamp>> *>
+std::optional<std::optional<const std::tuple<distributed_clock::time_point,
+                                             std::vector<BootTimestamp>> *>>
 InterpolatedTimeConverter::QueueNextTimestamp() {
-  std::optional<
-      std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>
+  std::optional<std::optional<
+      std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>>
       next_time = NextTimestamp();
-  if (!next_time) {
-    VLOG(1) << "Last timestamp, calling it quits";
-    at_end_ = true;
+  if (!next_time.has_value()) {
+    VLOG(1) << "Error in processing timestamps.";
     return std::nullopt;
   }
+  if (!next_time.value().has_value()) {
+    VLOG(1) << "Last timestamp, calling it quits";
+    std::optional<std::optional<const std::tuple<distributed_clock::time_point,
+                                                 std::vector<BootTimestamp>> *>>
+        result;
+    result.emplace(std::nullopt);
+        // Sanity check: the outer optional must be engaged (wrapping nullopt).
+    CHECK(result.has_value());
+    at_end_ = true;
+    return result;
+  }
 
-  VLOG(1) << "Fetched next timestamp while solving: " << std::get<0>(*next_time)
-          << " ->";
-  for (BootTimestamp t : std::get<1>(*next_time)) {
+  VLOG(1) << "Fetched next timestamp while solving: "
+          << std::get<0>(**next_time) << " ->";
+  for (BootTimestamp t : std::get<1>(**next_time)) {
     VLOG(1) << "  " << t;
   }
 
-  CHECK_EQ(node_count_, std::get<1>(*next_time).size());
+  CHECK_EQ(node_count_, std::get<1>(**next_time).size());
 
   if (times_.empty()) {
-    for (BootTimestamp t : std::get<1>(*next_time)) {
+    for (BootTimestamp t : std::get<1>(**next_time)) {
       CHECK_EQ(t.boot, 0u);
     }
   } else {
     bool rebooted = false;
     for (size_t i = 0; i < node_count_; ++i) {
       if (std::get<1>(times_.back())[i].boot !=
-          std::get<1>(*next_time)[i].boot) {
+          std::get<1>(**next_time)[i].boot) {
         rebooted = true;
         break;
       }
@@ -1383,15 +1401,15 @@
       if (VLOG_IS_ON(2)) {
         VLOG(2) << "Notified reboot of";
         size_t node_index = 0;
-        for (logger::BootTimestamp t : std::get<1>(*next_time)) {
+        for (logger::BootTimestamp t : std::get<1>(**next_time)) {
           VLOG(2) << "  Node " << node_index << " " << t;
           ++node_index;
         }
       }
-      reboot_found_(std::get<0>(*next_time), std::get<1>(*next_time));
+      reboot_found_(std::get<0>(**next_time), std::get<1>(**next_time));
     }
   }
-  times_.emplace_back(std::move(*next_time));
+  times_.emplace_back(std::move(**next_time));
   return &times_.back();
 }
 
@@ -1693,7 +1711,7 @@
   }
 }
 
-void MultiNodeNoncausalOffsetEstimator::FlushAndClose(bool destructor) {
+bool MultiNodeNoncausalOffsetEstimator::FlushAndClose(bool destructor) {
   // Write out all the data in our filters.
   FlushAllSamples(true);
   if (fp_) {
@@ -1726,6 +1744,7 @@
         }
       }
     }
+    filter_fps_.clear();
   }
   if (sample_fps_.size() != 0) {
     for (std::vector<FILE *> &filter_fp : sample_fps_) {
@@ -1735,6 +1754,7 @@
         }
       }
     }
+    sample_fps_.clear();
   }
 
   if (all_done_) {
@@ -1744,13 +1764,14 @@
         std::optional<std::tuple<BootTimestamp, BootDuration>> next =
             filter.filter->Consume();
         if (next) {
-          skip_order_validation_
-              ? LOG(WARNING)
-              : LOG(FATAL) << "MultiNodeNoncausalOffsetEstimator reported all "
-                              "done, but "
-                           << node_a_index << " -> " << filter.b_index
-                           << " found more data at time " << std::get<0>(*next)
-                           << ".  Time estimation was silently wrong.";
+          LOG(ERROR) << "MultiNodeNoncausalOffsetEstimator reported all "
+                        "done, but "
+                     << node_a_index << " -> " << filter.b_index
+                     << " found more data at time " << std::get<0>(*next)
+                     << ".  Time estimation was silently wrong.";
+          if (!skip_order_validation_) {
+            return false;
+          }
         }
       }
       ++node_a_index;
@@ -1761,14 +1782,21 @@
   if (!node_samples_.empty()) {
     for (NodeSamples &node : node_samples_) {
       for (SingleNodeSamples &timestamps : node.nodes) {
-        CHECK(timestamps.messages.empty());
+        if (!timestamps.messages.empty()) {
+          LOG(ERROR) << "Timestamps still remaining.";
+          return false;
+        }
       }
     }
   }
+  return true;
 }
 
 MultiNodeNoncausalOffsetEstimator::~MultiNodeNoncausalOffsetEstimator() {
-  FlushAndClose(true);
+  const bool success = FlushAndClose(true);
+  if (!non_fatal_destructor_checks_) {
+    CHECK(success);
+  }
 }
 
 UUID MultiNodeNoncausalOffsetEstimator::boot_uuid(size_t node_index,
@@ -2438,7 +2466,8 @@
   return std::make_tuple(candidate_times, boots_all_match);
 }
 
-std::tuple<NoncausalTimestampFilter *, std::vector<BootTimestamp>, int>
+std::optional<
+    std::tuple<NoncausalTimestampFilter *, std::vector<BootTimestamp>, int>>
 MultiNodeNoncausalOffsetEstimator::SimultaneousSolution(
     TimestampProblem *problem,
     const std::vector<CandidateTimes> candidate_times,
@@ -2497,8 +2526,11 @@
 
     if (iterations > kMaxIterations && FLAGS_crash_on_solve_failure) {
       UpdateSolution(std::move(solution));
-      FlushAndClose(false);
-      LOG(FATAL) << "Failed to converge.";
+      if (!FlushAndClose(false)) {
+        return std::nullopt;
+      }
+      LOG(ERROR) << "Failed to converge.";
+      return std::nullopt;
     }
 
     if (!problem->ValidateSolution(solution, true)) {
@@ -2512,13 +2544,16 @@
         problem->Debug();
         if (!skip_order_validation_) {
           UpdateSolution(solution);
-          FlushAndClose(false);
-          LOG(FATAL) << "Bailing, use --skip_order_validation to continue.  "
+          if (!FlushAndClose(false)) {
+            return std::nullopt;
+          }
+          LOG(ERROR) << "Bailing, use --skip_order_validation to continue.  "
                         "Use at your own risk.";
+          return std::nullopt;
         }
       } else {
         // Aw, our initial attempt to solve failed.  Now, let's try again with
-        // constriants.
+        // constraints.
         for (size_t node_index = 0; node_index < base_times.size();
              ++node_index) {
           problem->set_base_clock(node_index, solution[node_index]);
@@ -2530,14 +2565,24 @@
         }
 
         problem->set_points(points);
-        std::tie(solution, solution_y, solution_index, iterations) =
+        auto solver_result =
             SolveConstrainedNewton(&solver, problem, kMaxIterations);
+        if (!solver_result.has_value()) {
+          return std::nullopt;
+        }
+        solution = std::move(std::get<0>(solver_result.value()));
+        solution_y = std::move(std::get<1>(solver_result.value()));
+        std::tie(std::ignore, std::ignore, solution_index, iterations) =
+            *solver_result;
 
         if (iterations > kMaxIterations && FLAGS_crash_on_solve_failure) {
           UpdateSolution(std::move(solution));
-          FlushAndClose(false);
-          LOG(FATAL) << "Failed to converge for problem "
+          if (!FlushAndClose(false)) {
+            return std::nullopt;
+          }
+          LOG(ERROR) << "Failed to converge for problem "
                      << solver.my_solve_number();
+          return std::nullopt;
         }
         if (!problem->ValidateSolution(solution, false)) {
           LOG(WARNING) << "Invalid solution, constraints not met for problem "
@@ -2548,9 +2593,12 @@
           problem->Debug();
           if (!skip_order_validation_) {
             UpdateSolution(solution);
-            FlushAndClose(false);
-            LOG(FATAL) << "Bailing, use --skip_order_validation to continue.  "
+            if (!FlushAndClose(false)) {
+              return std::nullopt;
+            }
+            LOG(ERROR) << "Bailing, use --skip_order_validation to continue.  "
                           "Use at your own risk.";
+            return std::nullopt;
           }
         }
       }
@@ -2563,7 +2611,7 @@
   return std::make_tuple(next_filter, std::move(result_times), solution_index);
 }
 
-void MultiNodeNoncausalOffsetEstimator::CheckInvalidDistance(
+bool MultiNodeNoncausalOffsetEstimator::CheckInvalidDistance(
     const std::vector<BootTimestamp> &result_times,
     const std::vector<BootTimestamp> &solution) {
   // If times are close enough, drop the invalid time.
@@ -2577,7 +2625,7 @@
               << (result_times[i].time - solution[i].time).count() << "ns";
     }
     VLOG(1) << "Ignoring because it is close enough.";
-    return;
+    return true;
   }
   // Somehow the new solution is better *and* worse than the old
   // solution...  This is an internal failure because that means time
@@ -2594,13 +2642,18 @@
     LOG(ERROR) << "Skipping because --skip_order_validation";
   } else {
     UpdateSolution(solution);
-    FlushAndClose(false);
-    LOG(FATAL) << "Please investigate.  Use --max_invalid_distance_ns="
+    if (!FlushAndClose(false)) {
+      return false;
+    }
+    LOG(ERROR) << "Please investigate.  Use --max_invalid_distance_ns="
                << invalid_distance.count() << " to ignore this.";
+    return false;
   }
+  return true;
 }
 
-std::tuple<NoncausalTimestampFilter *, std::vector<BootTimestamp>, int>
+std::optional<
+    std::tuple<NoncausalTimestampFilter *, std::vector<BootTimestamp>, int>>
 MultiNodeNoncausalOffsetEstimator::SequentialSolution(
     TimestampProblem *problem,
     const std::vector<CandidateTimes> candidate_times,
@@ -2704,9 +2757,12 @@
 
     if (iterations > kMaxIterations && FLAGS_crash_on_solve_failure) {
       UpdateSolution(std::move(solution));
-      FlushAndClose(false);
-      LOG(FATAL) << "Failed to converge for problem "
+      if (!FlushAndClose(false)) {
+        return std::nullopt;
+      }
+      LOG(ERROR) << "Failed to converge for problem "
                  << solver.my_solve_number();
+      return std::nullopt;
     }
 
     // Bypass checking if order validation is turned off.  This lets us dump a
@@ -2725,9 +2781,12 @@
         problem->Debug();
         if (!skip_order_validation_) {
           UpdateSolution(solution);
-          FlushAndClose(false);
-          LOG(FATAL) << "Bailing, use --skip_order_validation to continue.  "
+          if (!FlushAndClose(false)) {
+            return std::nullopt;
+          }
+          LOG(ERROR) << "Bailing, use --skip_order_validation to continue.  "
                         "Use at your own risk.";
+          return std::nullopt;
         }
       } else {
         // Seed with our last solution.
@@ -2737,14 +2796,24 @@
         }
         // And now resolve constrained.
         problem->set_points(points);
-        std::tie(solution, solution_y, solution_index, iterations) =
+        auto solver_result =
             SolveConstrainedNewton(&solver, problem, kMaxIterations);
+        if (!solver_result.has_value()) {
+          return std::nullopt;
+        }
+        solution = std::move(std::get<0>(solver_result.value()));
+        solution_y = std::move(std::get<1>(solver_result.value()));
+        std::tie(std::ignore, std::ignore, solution_index, iterations) =
+            *solver_result;
 
         if (iterations > kMaxIterations && FLAGS_crash_on_solve_failure) {
           UpdateSolution(std::move(solution));
-          FlushAndClose(false);
-          LOG(FATAL) << "Failed to converge for problem "
+          if (!FlushAndClose(false)) {
+            return std::nullopt;
+          }
+          LOG(ERROR) << "Failed to converge for problem "
                      << solver.my_solve_number();
+          return std::nullopt;
         }
         if (!problem->ValidateSolution(solution, false)) {
           LOG(WARNING) << "Invalid solution, constraints not met for problem "
@@ -2755,9 +2824,12 @@
           problem->Debug();
           if (!skip_order_validation_) {
             UpdateSolution(solution);
-            FlushAndClose(false);
-            LOG(FATAL) << "Bailing, use --skip_order_validation to continue.  "
+            if (!FlushAndClose(false)) {
+              return std::nullopt;
+            }
+            LOG(ERROR) << "Bailing, use --skip_order_validation to continue.  "
                           "Use at your own risk.";
+            return std::nullopt;
           }
         }
       }
@@ -2791,7 +2863,9 @@
         solution_index = node_a_index;
         break;
       case TimeComparison::kInvalid: {
-        CheckInvalidDistance(result_times, solution);
+        if (!CheckInvalidDistance(result_times, solution)) {
+          return std::nullopt;
+        }
         if (next_node_filter) {
           std::optional<std::tuple<logger::BootTimestamp, logger::BootDuration>>
               result = next_node_filter->Consume();
@@ -2814,18 +2888,18 @@
   return std::make_tuple(next_filter, std::move(result_times), solution_index);
 }
 
-std::tuple<NoncausalTimestampFilter *, std::vector<BootTimestamp>, int>
+std::optional<
+    std::tuple<NoncausalTimestampFilter *, std::vector<BootTimestamp>, int>>
 MultiNodeNoncausalOffsetEstimator::NextSolution(
     TimestampProblem *problem, const std::vector<BootTimestamp> &base_times) {
   // Ok, now solve for the minimum time on each channel.
-  std::vector<BootTimestamp> result_times;
-
   bool boots_all_match = true;
   std::vector<CandidateTimes> candidate_times;
   std::tie(candidate_times, boots_all_match) = MakeCandidateTimes();
 
-  NoncausalTimestampFilter *next_filter = nullptr;
-  size_t solution_index = 0;
+  std::optional<
+      std::tuple<NoncausalTimestampFilter *, std::vector<BootTimestamp>, int>>
+      result;
 
   // We can significantly speed things up if we know that all the boots match by
   // solving for everything at once.  If the boots don't match, the combined min
@@ -2833,21 +2907,21 @@
   // actually using the boot from the candidate times to figure out which
   // interpolation function to use under the hood.
   if (boots_all_match) {
-    std::tie(next_filter, result_times, solution_index) =
+    result =
         SimultaneousSolution(problem, std::move(candidate_times), base_times);
   } else {
     // If all the boots don't match, fall back to the old method of comparing
     // all the solutions individually.
-    std::tie(next_filter, result_times, solution_index) =
+    result =
         SequentialSolution(problem, std::move(candidate_times), base_times);
   }
-  if (VLOG_IS_ON(1)) {
-    VLOG(1) << "Best solution is for node " << solution_index;
-    for (size_t i = 0; i < result_times.size(); ++i) {
-      VLOG(1) << "  " << result_times[i];
+  if (VLOG_IS_ON(1) && result.has_value()) {
+    VLOG(1) << "Best solution is for node " << std::get<2>(*result);
+    for (size_t i = 0; i < std::get<1>(*result).size(); ++i) {
+      VLOG(1) << "  " << std::get<1>(*result)[i];
     }
   }
-  return std::make_tuple(next_filter, std::move(result_times), solution_index);
+  return result;
 }
 
 void MultiNodeNoncausalOffsetEstimator::UpdateSolution(
@@ -2917,19 +2991,24 @@
   }
 }
 
-std::optional<
-    std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>
+std::optional<std::optional<
+    std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>>
 MultiNodeNoncausalOffsetEstimator::NextTimestamp() {
   // TODO(austin): Detect and handle there being fewer nodes in the log file
   // than in replay, or them being in a different order.
   TimestampProblem problem = MakeProblem();
 
   // Ok, now solve for the minimum time on each channel.
-  std::vector<BootTimestamp> result_times;
+  auto next_solution = NextSolution(&problem, last_monotonics_);
+  if (!next_solution.has_value()) {
+    return std::nullopt;
+  }
+  std::vector<BootTimestamp> result_times =
+      std::move(std::get<1>(next_solution.value()));
   NoncausalTimestampFilter *next_filter = nullptr;
   int solution_node_index = 0;
-  std::tie(next_filter, result_times, solution_node_index) =
-      NextSolution(&problem, last_monotonics_);
+  std::tie(next_filter, std::ignore, solution_node_index) =
+      next_solution.value();
 
   CHECK(!all_done_);
 
@@ -2969,7 +3048,13 @@
     if (fp_) {
       fflush(fp_);
     }
-    return std::nullopt;
+    std::optional<std::optional<
+        std::tuple<distributed_clock::time_point, std::vector<BootTimestamp>>>>
+        result;
+    result.emplace(std::nullopt);
+    CHECK(result.has_value());
+    CHECK(!result.value().has_value());
+    return result;
   }
 
   {
@@ -3026,11 +3111,13 @@
         }
         UpdateSolution(std::move(result_times));
         WriteFilter(next_filter, sample);
-        FlushAndClose(false);
-        LOG(FATAL)
+        if (!FlushAndClose(false)) {
+          return std::nullopt;
+        }
+        LOG(ERROR)
             << "Found a solution before the last returned solution on node "
             << solution_node_index;
-        break;
+        return std::nullopt;
       case TimeComparison::kEq:
         WriteFilter(next_filter, sample);
         return NextTimestamp();
@@ -3053,9 +3140,12 @@
         }
         UpdateSolution(std::move(result_times));
         WriteFilter(next_filter, sample);
-        FlushAndClose(false);
-        LOG(FATAL) << "Please investigate.  Use --max_invalid_distance_ns="
+        if (!FlushAndClose(false)) {
+          return std::nullopt;
+        }
+        LOG(ERROR) << "Please investigate.  Use --max_invalid_distance_ns="
                    << invalid_distance.count() << " to ignore this.";
+        return std::nullopt;
       } break;
     }
   }
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index e3bfde2..959fe85 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -121,7 +121,8 @@
   // Returns the solution, the solution_node field from the final derivatives,
   // the number of iterations it took, and a list of the constraints actually
   // used in the solution.
-  std::tuple<Eigen::VectorXd, size_t, size_t, std::vector<size_t>>
+  std::optional<
+      std::tuple<Eigen::VectorXd, size_t, size_t, std::vector<size_t>>>
   SolveConstrainedNewton(Problem *problem, size_t max_iterations,
                          const absl::Span<const size_t> constraint_indices);
 
@@ -354,8 +355,9 @@
 
   // Queues 1 more timestamp in the interpolation list.  This is public for
   // timestamp_extractor so it can hammer on the log until everything is queued.
-  std::optional<const std::tuple<distributed_clock::time_point,
-                                 std::vector<logger::BootTimestamp>> *>
+  // Return type matches that of NextTimestamp().
+  [[nodiscard]] std::optional<std::optional<const std::tuple<
+      distributed_clock::time_point, std::vector<logger::BootTimestamp>> *>>
   QueueNextTimestamp();
 
  private:
@@ -364,8 +366,12 @@
   // A timestamp is a sample of the distributed clock and a corresponding point
   // on every monotonic clock for all the nodes in the factory that this will be
   // hooked up to.
-  virtual std::optional<std::tuple<distributed_clock::time_point,
-                                   std::vector<logger::BootTimestamp>>>
+  // If !NextTimestamp().has_value(), then an error occurred. If
+  // !NextTimestamp().value().has_value(), then there is merely no next
+  // timestamp available.
+  // TODO(james): Swap this to std::expected when available.
+  virtual std::optional<std::optional<std::tuple<
+      distributed_clock::time_point, std::vector<logger::BootTimestamp>>>>
   NextTimestamp() = 0;
 
   // Queues timestamps util the last time in the queue matches the provided
@@ -373,7 +379,9 @@
   template <typename F>
   void QueueUntil(F not_done) {
     while (!at_end_ && (times_.empty() || not_done(times_.back()))) {
-      QueueNextTimestamp();
+      // Check the outer std::optional to watch for errors.
+      CHECK(QueueNextTimestamp().has_value())
+          << ": An error occurred when queueing timestamps.";
     }
 
     CHECK(!times_.empty())
@@ -472,10 +480,6 @@
   void SetTimestampMappers(
       std::vector<logger::TimestampMapper *> timestamp_mappers);
 
-  std::optional<std::tuple<distributed_clock::time_point,
-                           std::vector<logger::BootTimestamp>>>
-  NextTimestamp() override;
-
   UUID boot_uuid(size_t node_index, size_t boot_count) override;
 
   // Checks that all the nodes in the graph are connected.  Needs all filters to
@@ -504,6 +508,14 @@
   // Returns the configuration that we are replaying into.
   const aos::Configuration *configuration() const { return configuration_; }
 
+  // Runs some checks that are normally fatal CHECKs in the destructor.
+  // Returns false if any checks failed. This is used to allow the
+  // logfile_validator to non-fatally identify certain log sorting issues.
+  [[nodiscard]] bool RunDestructorChecks() {
+    non_fatal_destructor_checks_ = true;
+    return FlushAndClose(false);
+  }
+
  private:
   static constexpr int kMaxIterations = 400;
 
@@ -516,35 +528,39 @@
 
   TimestampProblem MakeProblem();
 
+  std::optional<std::optional<std::tuple<distributed_clock::time_point,
+                                         std::vector<logger::BootTimestamp>>>>
+  NextTimestamp() override;
+
   // Returns the list of candidate times to solve for.
   std::tuple<std::vector<CandidateTimes>, bool> MakeCandidateTimes() const;
 
   // Returns the next solution, the filter which has the control point for it
   // (or nullptr if a start time triggered this to be returned), and the node
   // which triggered it.
-  std::tuple<NoncausalTimestampFilter *, std::vector<logger::BootTimestamp>,
-             int>
+  std::optional<std::tuple<NoncausalTimestampFilter *,
+                           std::vector<logger::BootTimestamp>, int>>
   NextSolution(TimestampProblem *problem,
                const std::vector<logger::BootTimestamp> &base_times);
 
   // Returns the solution (if there is one) for the list of candidate times by
   // solving all the problems simultaneously.  They must be from the same boot.
-  std::tuple<NoncausalTimestampFilter *, std::vector<logger::BootTimestamp>,
-             int>
+  std::optional<std::tuple<NoncausalTimestampFilter *,
+                           std::vector<logger::BootTimestamp>, int>>
   SimultaneousSolution(TimestampProblem *problem,
                        const std::vector<CandidateTimes> candidate_times,
                        const std::vector<logger::BootTimestamp> &base_times);
 
   // Returns the solution (if there is one) for the list of candidate times by
   // solving the problems one after another.  They can be from any boot.
-  std::tuple<NoncausalTimestampFilter *, std::vector<logger::BootTimestamp>,
-             int>
+  std::optional<std::tuple<NoncausalTimestampFilter *,
+                           std::vector<logger::BootTimestamp>, int>>
   SequentialSolution(TimestampProblem *problem,
                      const std::vector<CandidateTimes> candidate_times,
                      const std::vector<logger::BootTimestamp> &base_times);
 
-  // Explodes if the invalid distance is too far.
-  void CheckInvalidDistance(
+  // Returns false if the invalid distance is too far.
+  [[nodiscard]] bool CheckInvalidDistance(
       const std::vector<logger::BootTimestamp> &result_times,
       const std::vector<logger::BootTimestamp> &solution);
 
@@ -560,15 +576,18 @@
 
   // Writes everything to disk anc closes it all out in preparation for either
   // destruction or crashing.
-  void FlushAndClose(bool destructor);
+  // Returns true if everything is healthy; returns false if we discovered
+  // sorting issues when closing things out.
+  [[nodiscard]] bool FlushAndClose(bool destructor);
 
   const Configuration *configuration_;
   const Configuration *logged_configuration_;
 
   std::shared_ptr<const logger::Boots> boots_;
 
-  // If true, skip any validation which would trigger if we see evidence that
-  // time estimation between nodes was incorrect.
+  // If true, don't fatally die on any order validation failures which would
+  // trigger if we see evidence that time estimation between nodes was
+  // incorrect.
   const bool skip_order_validation_;
 
   // List of filters for a connection.  The pointer to the first node will be
@@ -598,6 +617,7 @@
 
   bool first_solution_ = true;
   bool all_done_ = false;
+  bool non_fatal_destructor_checks_ = false;
 
   // Optional file pointers to save the results of the noncausal filter in. This
   // lives here so we can give each sample a distributed clock.
diff --git a/aos/network/multinode_timestamp_filter_test.cc b/aos/network/multinode_timestamp_filter_test.cc
index 9b37806..9aa76c0 100644
--- a/aos/network/multinode_timestamp_filter_test.cc
+++ b/aos/network/multinode_timestamp_filter_test.cc
@@ -407,7 +407,7 @@
 
   std::vector<size_t> used_constraints;
   std::tie(y, solution_node, iterations, used_constraints) =
-      solver.SolveConstrainedNewton(&problem, 20, constraints);
+      solver.SolveConstrainedNewton(&problem, 20, constraints).value();
 
   LOG(INFO) << y.transpose();
 
diff --git a/aos/network/testing_time_converter.cc b/aos/network/testing_time_converter.cc
index c175fe6..9dc5fff 100644
--- a/aos/network/testing_time_converter.cc
+++ b/aos/network/testing_time_converter.cc
@@ -22,7 +22,10 @@
 
 TestingTimeConverter::~TestingTimeConverter() {
   if (at_end_) {
-    CHECK(!NextTimestamp()) << ": At the end but there is more data.";
+    auto next_timestamp = NextTimestamp();
+    CHECK(next_timestamp.has_value()) << ": Unexpected error";
+    CHECK(!next_timestamp.value().has_value())
+        << ": At the end but there is more data.";
   }
 }
 
@@ -105,13 +108,17 @@
   ts_.emplace_back(std::make_tuple(time, std::move(times)));
 }
 
-std::optional<std::tuple<distributed_clock::time_point,
-                         std::vector<logger::BootTimestamp>>>
+std::optional<std::optional<std::tuple<distributed_clock::time_point,
+                                       std::vector<logger::BootTimestamp>>>>
 TestingTimeConverter::NextTimestamp() {
   CHECK(!first_) << ": Tried to pull a timestamp before one was added.  This "
                     "is unlikely to be what you want.";
   if (ts_.empty()) {
-    return std::nullopt;
+    std::optional<std::optional<std::tuple<distributed_clock::time_point,
+                                           std::vector<logger::BootTimestamp>>>>
+        result;
+    result.emplace(std::nullopt);
+    return result;
   }
   auto result = ts_.front();
   ts_.pop_front();
diff --git a/aos/network/testing_time_converter.h b/aos/network/testing_time_converter.h
index 20b8e16..ae85cee 100644
--- a/aos/network/testing_time_converter.h
+++ b/aos/network/testing_time_converter.h
@@ -41,8 +41,8 @@
   void AddNextTimestamp(distributed_clock::time_point time,
                         std::vector<logger::BootTimestamp> times);
 
-  std::optional<std::tuple<distributed_clock::time_point,
-                           std::vector<logger::BootTimestamp>>>
+  std::optional<std::optional<std::tuple<distributed_clock::time_point,
+                                         std::vector<logger::BootTimestamp>>>>
   NextTimestamp() override;
 
   void set_boot_uuid(size_t node_index, size_t boot_count, UUID uuid) {