Allow non-fatal timestamp solver failures
This refactors the timestamp filters so that it is possible to observe
timestamp solver failures without having the application crash.
This also makes it so that the timestamp_extractor code gets at least
somewhat exercised by our tests.
The only behavioral change this should cause for normal users is that
the stack trace will look slightly different when errors occur, because
the LOG(FATAL) now occurs when evaluating the result of QueueNextTimestamp()
instead of deep in the code (the prior LOG(FATAL)'s are replaced by
LOG(ERROR)'s to keep them visible).
Change-Id: I01e4de8df724c2853cc62ac588668b811563b33d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index a687748..c33606c 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -545,6 +545,24 @@
],
)
+cc_library(
+ name = "logfile_validator",
+ srcs = [
+ "logfile_validator.cc",
+ ],
+ hdrs = ["logfile_validator.h"],
+ target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":logfile_utils",
+ "//aos:configuration",
+ "//aos/events:simulated_event_loop",
+ "//aos/network:multinode_timestamp_filter",
+ "@com_github_gflags_gflags//:gflags",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
cc_binary(
name = "timestamp_extractor",
srcs = [
@@ -554,6 +572,7 @@
visibility = ["//visibility:public"],
deps = [
":logfile_utils",
+ ":logfile_validator",
"//aos:configuration",
"//aos:init",
"//aos/events:simulated_event_loop",
@@ -652,6 +671,7 @@
deps = [
":log_reader",
":log_writer",
+ ":logfile_validator",
":snappy_encoder",
"//aos/events:message_counter",
"//aos/events:ping_lib",
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 3cdfba1..e1aa620 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -288,6 +288,8 @@
// Returns true if we have timestamps in any of the files.
bool HasTimestamps(std::string_view node_name) const;
+ const std::vector<LogFile> &log_files() const { return log_files_; }
+
private:
LogFilesContainer(std::optional<const LogSource *> log_source,
std::vector<LogFile> log_files);
diff --git a/aos/events/logging/logfile_validator.cc b/aos/events/logging/logfile_validator.cc
new file mode 100644
index 0000000..19d11c8
--- /dev/null
+++ b/aos/events/logging/logfile_validator.cc
@@ -0,0 +1,162 @@
+
+#include "aos/events/logging/logfile_validator.h"
+
+#include "aos/events/logging/logfile_sorting.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logfile_validator.h"
+#include "aos/network/multinode_timestamp_filter.h"
+
+namespace aos::logger {
+bool MultiNodeLogIsReadable(const LogFilesContainer &log_files,
+ bool skip_order_validation) {
+ const Configuration *config = log_files.config().get();
+
+ CHECK(configuration::MultiNode(config))
+ << ": Timestamps only make sense in a multi-node world.";
+
+ // Now, build up all the TimestampMapper classes to read and sort the data.
+ std::vector<std::unique_ptr<TimestampMapper>> mappers;
+
+ for (const Node *node : configuration::GetNodes(config)) {
+ auto node_name = MaybeNodeName(node);
+ // Confirm that all the parts are from the same boot if there are enough
+ // parts to not be from the same boot.
+ if (log_files.ContainsPartsForNode(node_name)) {
+ // Filter the parts relevant to each node when building the mapper.
+ mappers.emplace_back(std::make_unique<TimestampMapper>(
+ node_name, log_files, TimestampQueueStrategy::kQueueTogether));
+ } else {
+ mappers.emplace_back(nullptr);
+ }
+ }
+
+ // Now, build up the estimator used to solve for time.
+ message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
+ config, config, log_files.boots(), skip_order_validation,
+ std::chrono::seconds(0));
+ multinode_estimator.set_reboot_found(
+ [config](distributed_clock::time_point reboot_time,
+ const std::vector<logger::BootTimestamp> &node_times) {
+ LOG(INFO) << "Rebooted at distributed " << reboot_time;
+ size_t node_index = 0;
+ for (const logger::BootTimestamp &time : node_times) {
+ LOG(INFO) << " "
+ << config->nodes()->Get(node_index)->name()->string_view()
+ << " " << time;
+ ++node_index;
+ }
+ });
+
+ // Because RAII doesn't let us do non-fatal/non-exception things, use this
+ // when returning to handle certain cleanup-related checks that would normally
+ // happen fatally in the estimator destructor.
+ auto preempt_destructor = [&multinode_estimator](bool success) {
+ if (!multinode_estimator.RunDestructorChecks()) {
+ return false;
+ }
+ return success;
+ };
+
+ {
+ std::vector<TimestampMapper *> timestamp_mappers;
+ for (std::unique_ptr<TimestampMapper> &mapper : mappers) {
+ timestamp_mappers.emplace_back(mapper.get());
+ }
+ multinode_estimator.SetTimestampMappers(std::move(timestamp_mappers));
+ }
+
+ // To make things more like the logger and faster, cache the node + channel ->
+ // filter mapping in a set of vectors.
+ std::vector<std::vector<message_bridge::NoncausalOffsetEstimator *>> filters;
+ filters.resize(configuration::NodesCount(config));
+
+ for (const Node *node : configuration::GetNodes(config)) {
+ const size_t node_index = configuration::GetNodeIndex(config, node);
+ filters[node_index].resize(config->channels()->size(), nullptr);
+ for (size_t channel_index = 0; channel_index < config->channels()->size();
+ ++channel_index) {
+ const Channel *channel = config->channels()->Get(channel_index);
+
+ if (!configuration::ChannelIsSendableOnNode(channel, node) &&
+ configuration::ChannelIsReadableOnNode(channel, node)) {
+ // We've got a message which is being forwarded to this node.
+ const Node *source_node = configuration::GetNode(
+ config, channel->source_node()->string_view());
+ filters[node_index][channel_index] =
+ multinode_estimator.GetFilter(node, source_node);
+ }
+ }
+ }
+
+ multinode_estimator.CheckGraph();
+
+ // Now, read all the timestamps for each node. This is simpler than the
+ // logger on purpose. It loads in *all* the timestamps in 1 go per node,
+ // ignoring memory usage.
+ for (const Node *node : configuration::GetNodes(config)) {
+ LOG(INFO) << "Reading all data for " << node->name()->string_view();
+ const size_t node_index = configuration::GetNodeIndex(config, node);
+ TimestampMapper *timestamp_mapper = mappers[node_index].get();
+ if (timestamp_mapper == nullptr) {
+ continue;
+ }
+ while (true) {
+ TimestampedMessage *m = timestamp_mapper->Front();
+ if (m == nullptr) {
+ break;
+ }
+ timestamp_mapper->PopFront();
+ }
+ }
+
+ // Don't get clever. Use the first time as the start time. Note: this is
+ // different than how log_cat and others work.
+ std::optional<std::optional<const std::tuple<distributed_clock::time_point,
+ std::vector<BootTimestamp>> *>>
+ next_timestamp = multinode_estimator.QueueNextTimestamp();
+ if (!next_timestamp.has_value() || !next_timestamp.value().has_value()) {
+ return preempt_destructor(false);
+ }
+ LOG(INFO) << "Starting at:";
+ for (const Node *node : configuration::GetNodes(config)) {
+ const size_t node_index = configuration::GetNodeIndex(config, node);
+ LOG(INFO) << " " << node->name()->string_view() << " -> "
+ << std::get<1>(*next_timestamp.value().value())[node_index].time;
+ }
+
+ std::vector<monotonic_clock::time_point> just_monotonic(
+ std::get<1>(*next_timestamp.value().value()).size());
+ for (size_t i = 0; i < just_monotonic.size(); ++i) {
+ CHECK_EQ(std::get<1>(*next_timestamp.value().value())[i].boot, 0u);
+ just_monotonic[i] = std::get<1>(*next_timestamp.value().value())[i].time;
+ }
+ multinode_estimator.Start(just_monotonic);
+
+ // As we pull off all the timestamps, the time problem is continually solved,
+ // filling in the CSV files.
+ while (true) {
+ std::optional<std::optional<const std::tuple<distributed_clock::time_point,
+ std::vector<BootTimestamp>> *>>
+ next_timestamp = multinode_estimator.QueueNextTimestamp();
+ if (!next_timestamp.has_value()) {
+ return preempt_destructor(false);
+ }
+ if (!next_timestamp.value().has_value()) {
+ break;
+ }
+ multinode_estimator.ObserveTimePassed(
+ std::get<0>(*next_timestamp.value().value()));
+ }
+
+ LOG(INFO) << "Done";
+
+ return preempt_destructor(true);
+}
+
+bool LogIsReadableIfMultiNode(const LogFilesContainer &log_files) {
+ if (aos::configuration::NodesCount(log_files.config().get()) == 1u) {
+ return true;
+ }
+ return MultiNodeLogIsReadable(log_files);
+}
+} // namespace aos::logger
diff --git a/aos/events/logging/logfile_validator.h b/aos/events/logging/logfile_validator.h
new file mode 100644
index 0000000..814b89e
--- /dev/null
+++ b/aos/events/logging/logfile_validator.h
@@ -0,0 +1,16 @@
+#ifndef AOS_EVENTS_LOGGING_LOGFILE_VALIDATOR_H_
+#define AOS_EVENTS_LOGGING_LOGFILE_VALIDATOR_H_
+#include "aos/events/logging/logfile_sorting.h"
+namespace aos::logger {
+// Attempts to validate that a log is readable without actually running the full
+// LogReader. This aims to allow the user to preempt fatal crashes that can
+// occur when trying to replay a log.
+// Returns true if successful.
+bool MultiNodeLogIsReadable(const LogFilesContainer &log_files,
+ bool skip_order_validation = false);
+
+// Returns true if the requested log is either a single-node log or if the
+// MultiNodeLogIsReadable() returns true.
+bool LogIsReadableIfMultiNode(const LogFilesContainer &log_files);
+} // namespace aos::logger
+#endif // AOS_EVENTS_LOGGING_LOGFILE_VALIDATOR_H_
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index 9a905b1..64dd1cd 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -3,6 +3,7 @@
#include "aos/events/event_loop.h"
#include "aos/events/logging/log_reader.h"
#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logfile_validator.h"
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
@@ -442,6 +443,7 @@
reader.Deregister();
}
+ CHECK(LogIsReadableIfMultiNode(LogFilesContainer{SortParts(files)}));
{
std::vector<std::pair<std::vector<realtime_clock::time_point>,
std::vector<realtime_clock::time_point>>>
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index ca7f231..3cf96f6 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -6,8 +6,8 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logfile_validator.h"
#include "aos/init.h"
-#include "aos/network/multinode_timestamp_filter.h"
DECLARE_bool(timestamps_to_csv);
DEFINE_bool(skip_order_validation, false,
@@ -15,135 +15,9 @@
namespace aos::logger {
-namespace chrono = std::chrono;
-
int Main(int argc, char **argv) {
const LogFilesContainer log_files(SortParts(FindLogs(argc, argv)));
- const Configuration *config = log_files.config().get();
-
- CHECK(configuration::MultiNode(config))
- << ": Timestamps only make sense in a multi-node world.";
-
- // Now, build up all the TimestampMapper classes to read and sort the data.
- std::vector<std::unique_ptr<TimestampMapper>> mappers;
-
- for (const Node *node : configuration::GetNodes(config)) {
- auto node_name = MaybeNodeName(node);
- // Confirm that all the parts are from the same boot if there are enough
- // parts to not be from the same boot.
- if (!log_files.ContainsPartsForNode(node_name)) {
- // Filter the parts relevant to each node when building the mapper.
- mappers.emplace_back(std::make_unique<TimestampMapper>(
- node_name, log_files, TimestampQueueStrategy::kQueueTogether));
- } else {
- mappers.emplace_back(nullptr);
- }
- }
-
- // Now, build up the estimator used to solve for time.
- message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
- config, config, log_files.boots(), FLAGS_skip_order_validation,
- chrono::seconds(0));
- multinode_estimator.set_reboot_found(
- [config](distributed_clock::time_point reboot_time,
- const std::vector<logger::BootTimestamp> &node_times) {
- LOG(INFO) << "Rebooted at distributed " << reboot_time;
- size_t node_index = 0;
- for (const logger::BootTimestamp &time : node_times) {
- LOG(INFO) << " "
- << config->nodes()->Get(node_index)->name()->string_view()
- << " " << time;
- ++node_index;
- }
- });
-
- {
- std::vector<TimestampMapper *> timestamp_mappers;
- for (std::unique_ptr<TimestampMapper> &mapper : mappers) {
- timestamp_mappers.emplace_back(mapper.get());
- }
- multinode_estimator.SetTimestampMappers(std::move(timestamp_mappers));
- }
-
- // To make things more like the logger and faster, cache the node + channel ->
- // filter mapping in a set of vectors.
- std::vector<std::vector<message_bridge::NoncausalOffsetEstimator *>> filters;
- filters.resize(configuration::NodesCount(config));
-
- for (const Node *node : configuration::GetNodes(config)) {
- const size_t node_index = configuration::GetNodeIndex(config, node);
- filters[node_index].resize(config->channels()->size(), nullptr);
- for (size_t channel_index = 0; channel_index < config->channels()->size();
- ++channel_index) {
- const Channel *channel = config->channels()->Get(channel_index);
-
- if (!configuration::ChannelIsSendableOnNode(channel, node) &&
- configuration::ChannelIsReadableOnNode(channel, node)) {
- // We've got a message which is being forwarded to this node.
- const Node *source_node = configuration::GetNode(
- config, channel->source_node()->string_view());
- filters[node_index][channel_index] =
- multinode_estimator.GetFilter(node, source_node);
- }
- }
- }
-
- multinode_estimator.CheckGraph();
-
- // Now, read all the timestamps for each node. This is simpler than the
- // logger on purpose. It loads in *all* the timestamps in 1 go per node,
- // ignoring memory usage.
- for (const Node *node : configuration::GetNodes(config)) {
- LOG(INFO) << "Reading all data for " << node->name()->string_view();
- const size_t node_index = configuration::GetNodeIndex(config, node);
- TimestampMapper *timestamp_mapper = mappers[node_index].get();
- if (timestamp_mapper == nullptr) {
- continue;
- }
- while (true) {
- TimestampedMessage *m = timestamp_mapper->Front();
- if (m == nullptr) {
- break;
- }
- timestamp_mapper->PopFront();
- }
- }
-
- // Don't get clever. Use the first time as the start time. Note: this is
- // different than how log_cat and others work.
- std::optional<const std::tuple<distributed_clock::time_point,
- std::vector<BootTimestamp>> *>
- next_timestamp = multinode_estimator.QueueNextTimestamp();
- CHECK(next_timestamp);
- LOG(INFO) << "Starting at:";
- for (const Node *node : configuration::GetNodes(config)) {
- const size_t node_index = configuration::GetNodeIndex(config, node);
- LOG(INFO) << " " << node->name()->string_view() << " -> "
- << std::get<1>(*next_timestamp.value())[node_index].time;
- }
-
- std::vector<monotonic_clock::time_point> just_monotonic(
- std::get<1>(*next_timestamp.value()).size());
- for (size_t i = 0; i < just_monotonic.size(); ++i) {
- CHECK_EQ(std::get<1>(*next_timestamp.value())[i].boot, 0u);
- just_monotonic[i] = std::get<1>(*next_timestamp.value())[i].time;
- }
- multinode_estimator.Start(just_monotonic);
-
- // As we pull off all the timestamps, the time problem is continually solved,
- // filling in the CSV files.
- while (true) {
- std::optional<const std::tuple<distributed_clock::time_point,
- std::vector<BootTimestamp>> *>
- next_timestamp = multinode_estimator.QueueNextTimestamp();
- if (!next_timestamp) {
- break;
- }
- multinode_estimator.ObserveTimePassed(std::get<0>(*next_timestamp.value()));
- }
-
- LOG(INFO) << "Done";
-
+ CHECK(MultiNodeLogIsReadable(log_files, FLAGS_skip_order_validation));
return 0;
}