Handle start times before the start of a log file
We have seen log files where the start time is before any data arrives
after a node reboots. The side effect of this is that we try to call the
startup callback at the start time, but the reboot only officially
happens at the first message, so we can't actually start at the start
time. The world then explodes.
Add a test reproducing it, and then fix it.
Change-Id: I7de986a6f7df0d050a0c8164827d27c799221fbf
Signed-off-by: Austin Schuh <austin.linux@gmail.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index fc4da55..6d28343 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -141,7 +141,7 @@
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/types:span",
- "@snappy//:snappy",
+ "@snappy",
],
)
@@ -387,6 +387,21 @@
)
aos_config(
+ name = "multinode_pingpong_split3_config",
+ src = "multinode_pingpong_split3.json",
+ flatbuffers = [
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ "//aos/network:remote_message_fbs",
+ "//aos/network:timestamp_fbs",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = ["//aos/events:config"],
+)
+
+aos_config(
name = "multinode_pingpong_combined_config",
src = "multinode_pingpong_combined.json",
flatbuffers = [
@@ -411,6 +426,7 @@
}),
data = [
":multinode_pingpong_combined_config",
+ ":multinode_pingpong_split3_config",
":multinode_pingpong_split_config",
"//aos/events:pingpong_config",
],
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d8f8b04..35ef498 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -724,7 +724,16 @@
state->monotonic_remote_now(timestamped_message.channel_index);
if (!FLAGS_skip_order_validation) {
CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
- monotonic_remote_now.boot);
+ monotonic_remote_now.boot)
+ << state->event_loop()->node()->name()->string_view() << " to "
+ << state->remote_node(timestamped_message.channel_index)
+ ->name()
+ ->string_view()
+ << " while trying to send a message on "
+ << configuration::CleanedChannelToString(
+ logged_configuration()->channels()->Get(
+ timestamped_message.channel_index))
+ << " " << timestamped_message << " " << state->DebugString();
CHECK_LE(timestamped_message.monotonic_remote_time,
monotonic_remote_now)
<< state->event_loop()->node()->name()->string_view() << " to "
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 1f5c9a4..6e4b9b4 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -20,6 +20,9 @@
// Maps the node index to a set of all boots for that node.
std::vector<std::vector<std::string>> boots;
+
+ // TODO(austin): Aggregated start time should live here. This is a property
+ // of sorting!
};
// Datastructure to hold ordered parts.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index a0ae78a..b58933e 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -456,6 +456,97 @@
return ostream;
}
+struct LoggerState {
+ static LoggerState MakeLogger(NodeEventLoopFactory *node,
+ SimulatedEventLoopFactory *factory,
+ CompressionParams params,
+ const Configuration *configuration = nullptr) {
+ if (configuration == nullptr) {
+ configuration = factory->configuration();
+ }
+ return {node->MakeEventLoop("logger"),
+ {},
+ configuration,
+ configuration::GetNode(configuration, node->node()),
+ nullptr,
+ params};
+ }
+
+ void StartLogger(std::string logfile_base) {
+ CHECK(!logfile_base.empty());
+
+ logger = std::make_unique<Logger>(event_loop.get(), configuration);
+ logger->set_polling_period(std::chrono::milliseconds(100));
+ logger->set_name(
+ absl::StrCat("name_prefix_", event_loop->node()->name()->str()));
+ event_loop->OnRun([this, logfile_base]() {
+ std::unique_ptr<MultiNodeLogNamer> namer =
+ std::make_unique<MultiNodeLogNamer>(logfile_base, configuration,
+ event_loop.get(), node);
+ namer->set_extension(params.extension);
+ namer->set_encoder_factory(params.encoder_factory);
+ log_namer = namer.get();
+
+ logger->StartLogging(std::move(namer));
+ });
+ }
+
+ std::unique_ptr<EventLoop> event_loop;
+ std::unique_ptr<Logger> logger;
+ const Configuration *configuration;
+ const Node *node;
+ MultiNodeLogNamer *log_namer;
+ CompressionParams params;
+
+ void AppendAllFilenames(std::vector<std::string> *filenames) {
+ for (const std::string &file : log_namer->all_filenames()) {
+ const std::string_view separator =
+ log_namer->base_name().back() == '/' ? "" : "_";
+ filenames->emplace_back(
+ absl::StrCat(log_namer->base_name(), separator, file));
+ }
+ }
+
+ ~LoggerState() {
+ if (logger) {
+ for (const std::string &file : log_namer->all_filenames()) {
+ LOG(INFO) << "Wrote to " << file;
+ }
+ }
+ }
+};
+
+void ConfirmReadable(const std::vector<std::string> &files) {
+ {
+ LogReader reader(SortParts(files));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ reader.Register(&log_reader_factory);
+
+ log_reader_factory.Run();
+
+ reader.Deregister();
+ }
+ {
+ LogReader reader(SortParts(files));
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ reader.RegisterWithoutStarting(&log_reader_factory);
+ if (configuration::MultiNode(log_reader_factory.configuration())) {
+ for (const aos::Node *node :
+ *log_reader_factory.configuration()->nodes()) {
+ reader.OnStart(node, [node]() {
+ LOG(INFO) << "Starting " << node->name()->string_view();
+ });
+ }
+ }
+
+ log_reader_factory.Run();
+
+ reader.Deregister();
+ }
+}
+
class MultinodeLoggerTest : public ::testing::TestWithParam<
std::tuple<ConfigParams, CompressionParams>> {
public:
@@ -722,28 +813,14 @@
return absl::StrCat(".bfbs", std::get<1>(GetParam()).extension);
}
- struct LoggerState {
- std::unique_ptr<EventLoop> event_loop;
- std::unique_ptr<Logger> logger;
- const Configuration *configuration;
- const Node *node;
- MultiNodeLogNamer *log_namer;
- };
-
LoggerState MakeLogger(NodeEventLoopFactory *node,
SimulatedEventLoopFactory *factory = nullptr,
const Configuration *configuration = nullptr) {
if (factory == nullptr) {
factory = &event_loop_factory_;
}
- if (configuration == nullptr) {
- configuration = factory->configuration();
- }
- return {node->MakeEventLoop("logger"),
- {},
- configuration,
- configuration::GetNode(configuration, node->node()),
- nullptr};
+ return LoggerState::MakeLogger(node, factory, std::get<1>(GetParam()),
+ configuration);
}
void StartLogger(LoggerState *logger, std::string logfile_base = "") {
@@ -754,23 +831,7 @@
logfile_base = logfile_base2_;
}
}
-
- logger->logger = std::make_unique<Logger>(logger->event_loop.get(),
- logger->configuration);
- logger->logger->set_polling_period(std::chrono::milliseconds(100));
- logger->logger->set_name(absl::StrCat(
- "name_prefix_", logger->event_loop->node()->name()->str()));
- logger->event_loop->OnRun([logger, logfile_base]() {
- std::unique_ptr<MultiNodeLogNamer> namer =
- std::make_unique<MultiNodeLogNamer>(
- logfile_base, logger->configuration, logger->event_loop.get(),
- logger->node);
- namer->set_extension(std::get<1>(GetParam()).extension);
- namer->set_encoder_factory(std::get<1>(GetParam()).encoder_factory);
- logger->log_namer = namer.get();
-
- logger->logger->StartLogging(std::move(namer));
- });
+ logger->StartLogger(logfile_base);
}
void VerifyParts(const std::vector<LogFile> &sorted_parts,
@@ -844,17 +905,6 @@
EXPECT_THAT(sorted_parts[1].corrupted, ::testing::Eq(corrupted_parts));
}
- void ConfirmReadable(const std::vector<std::string> &files) {
- LogReader reader(SortParts(files));
-
- SimulatedEventLoopFactory log_reader_factory(reader.configuration());
- reader.Register(&log_reader_factory);
-
- log_reader_factory.Run();
-
- reader.Deregister();
- }
-
void AddExtension(std::string_view extension) {
std::transform(logfiles_.begin(), logfiles_.end(), logfiles_.begin(),
[extension](const std::string &in) {
@@ -1042,12 +1092,11 @@
sorted_log_files[0].config;
// Timing reports, pings
- EXPECT_THAT(
- CountChannelsData(config, logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
- 1),
- std::make_tuple("/test", "aos.examples.Ping", 1)))
+ EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos",
+ "aos.message_bridge.ServerStatistics", 1),
+ std::make_tuple("/test", "aos.examples.Ping", 1)))
<< " : " << logfiles_[2];
EXPECT_THAT(
CountChannelsData(config, logfiles_[3]),
@@ -2189,8 +2238,7 @@
// up a clock difference between 2 nodes and looking at the resulting parts.
TEST_P(MultinodeLoggerTest, LoggerStartTime) {
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(1000)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
{
LoggerState pi1_logger = MakeLogger(pi1_);
LoggerState pi2_logger = MakeLogger(pi2_);
@@ -2228,8 +2276,7 @@
util::UnlinkRecursive(tmp_dir_ + "/renamefolder");
util::UnlinkRecursive(tmp_dir_ + "/new-good");
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(1000)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
logfile_base1_ = tmp_dir_ + "/renamefolder/multi_logfile1";
logfile_base2_ = tmp_dir_ + "/renamefolder/multi_logfile2";
logfiles_ = MakeLogFiles(logfile_base1_, logfile_base2_);
@@ -2475,8 +2522,7 @@
TEST_P(MultinodeLoggerTest, OneDirectionWithNegativeSlope) {
pi1_->Disconnect(pi2_->node());
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(1000)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
time_converter_.AddMonotonic(
{chrono::milliseconds(10000),
@@ -2501,8 +2547,7 @@
TEST_P(MultinodeLoggerTest, OneDirectionWithPositiveSlope) {
pi1_->Disconnect(pi2_->node());
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(500)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(500)});
time_converter_.AddMonotonic(
{chrono::milliseconds(10000),
@@ -2526,8 +2571,7 @@
// error than an out of order error.
TEST_P(MultinodeLoggerTest, DuplicateLogFiles) {
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(1000)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
{
LoggerState pi1_logger = MakeLogger(pi1_);
@@ -2552,8 +2596,7 @@
pi1_->Disconnect(pi2_->node());
pi2_->Disconnect(pi1_->node());
time_converter_.AddMonotonic(
- {BootTimestamp::epoch(),
- BootTimestamp::epoch() + chrono::seconds(1000)});
+ {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
{
LoggerState pi1_logger = MakeLogger(pi1_);
@@ -2650,8 +2693,8 @@
MakeLogger(log_reader_factory.GetNodeEventLoopFactory("pi2"),
&log_reader_factory, reader.logged_configuration());
- StartLogger(&pi1_logger, tmp_dir_ + "/relogged1");
- StartLogger(&pi2_logger, tmp_dir_ + "/relogged2");
+ pi1_logger.StartLogger(tmp_dir_ + "/relogged1");
+ pi2_logger.StartLogger(tmp_dir_ + "/relogged2");
log_reader_factory.Run();
@@ -2675,6 +2718,148 @@
}
}
+// Tests that we properly replay a log where the start time for a node is before
+// any data on the node. This can happen if the logger starts before data is
+// published. While the scenario below is a bit convoluted, we have seen logs
+// like this generated out in the wild.
+TEST(MultinodeRebootLoggerTest, StartTimeBeforeData) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(ArtifactPath(
+ "aos/events/logging/multinode_pingpong_split3_config.json"));
+ message_bridge::TestingTimeConverter time_converter(
+ configuration::NodesCount(&config.message()));
+ SimulatedEventLoopFactory event_loop_factory(&config.message());
+ event_loop_factory.SetTimeConverter(&time_converter);
+ NodeEventLoopFactory *const pi1 =
+ event_loop_factory.GetNodeEventLoopFactory("pi1");
+ const size_t pi1_index = configuration::GetNodeIndex(
+ event_loop_factory.configuration(), pi1->node());
+ NodeEventLoopFactory *const pi2 =
+ event_loop_factory.GetNodeEventLoopFactory("pi2");
+ const size_t pi2_index = configuration::GetNodeIndex(
+ event_loop_factory.configuration(), pi2->node());
+ NodeEventLoopFactory *const pi3 =
+ event_loop_factory.GetNodeEventLoopFactory("pi3");
+ const size_t pi3_index = configuration::GetNodeIndex(
+ event_loop_factory.configuration(), pi3->node());
+
+ const std::string kLogfile1_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile1/";
+ const std::string kLogfile2_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile2.1/";
+ const std::string kLogfile2_2 =
+ aos::testing::TestTmpDir() + "/multi_logfile2.2/";
+ const std::string kLogfile3_1 =
+ aos::testing::TestTmpDir() + "/multi_logfile3/";
+ util::UnlinkRecursive(kLogfile1_1);
+ util::UnlinkRecursive(kLogfile2_1);
+ util::UnlinkRecursive(kLogfile2_2);
+ util::UnlinkRecursive(kLogfile3_1);
+ const UUID pi1_boot0 = UUID::Random();
+ const UUID pi2_boot0 = UUID::Random();
+ const UUID pi2_boot1 = UUID::Random();
+ const UUID pi3_boot0 = UUID::Random();
+ {
+ CHECK_EQ(pi1_index, 0u);
+ CHECK_EQ(pi2_index, 1u);
+ CHECK_EQ(pi3_index, 2u);
+
+ time_converter.set_boot_uuid(pi1_index, 0, pi1_boot0);
+ time_converter.set_boot_uuid(pi2_index, 0, pi2_boot0);
+ time_converter.set_boot_uuid(pi2_index, 1, pi2_boot1);
+ time_converter.set_boot_uuid(pi3_index, 0, pi3_boot0);
+
+ time_converter.AddNextTimestamp(
+ distributed_clock::epoch(),
+ {BootTimestamp::epoch(), BootTimestamp::epoch(),
+ BootTimestamp::epoch()});
+ const chrono::nanoseconds reboot_time = chrono::milliseconds(20000);
+ time_converter.AddNextTimestamp(
+ distributed_clock::epoch() + reboot_time,
+ {BootTimestamp::epoch() + reboot_time,
+ BootTimestamp{
+ .boot = 1,
+ .time = monotonic_clock::epoch() + chrono::milliseconds(1323)},
+ BootTimestamp::epoch() + reboot_time});
+ }
+
+ // Make everything perfectly quiet.
+ event_loop_factory.SkipTimingReport();
+ event_loop_factory.DisableStatistics();
+
+ std::vector<std::string> filenames;
+ {
+ LoggerState pi1_logger = LoggerState::MakeLogger(
+ pi1, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ LoggerState pi3_logger = LoggerState::MakeLogger(
+ pi3, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ {
+ // And now start the logger.
+ LoggerState pi2_logger = LoggerState::MakeLogger(
+ pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+
+ event_loop_factory.RunFor(chrono::milliseconds(1000));
+
+ pi1_logger.StartLogger(kLogfile1_1);
+ pi3_logger.StartLogger(kLogfile3_1);
+ pi2_logger.StartLogger(kLogfile2_1);
+
+ event_loop_factory.RunFor(chrono::milliseconds(10000));
+
+ // Now that we've got a start time in the past, turn on data.
+ event_loop_factory.EnableStatistics();
+ std::unique_ptr<aos::EventLoop> ping_event_loop =
+ pi1->MakeEventLoop("ping");
+ Ping ping(ping_event_loop.get());
+
+ pi2->AlwaysStart<Pong>("pong");
+
+ event_loop_factory.RunFor(chrono::milliseconds(3000));
+
+ pi2_logger.AppendAllFilenames(&filenames);
+
+ // Stop logging on pi2 before rebooting and completely shut off all
+ // messages on pi2.
+ pi2->DisableStatistics();
+ pi1->Disconnect(pi2->node());
+ pi2->Disconnect(pi1->node());
+ }
+ event_loop_factory.RunFor(chrono::milliseconds(7000));
+ // pi2 now reboots.
+ {
+ event_loop_factory.RunFor(chrono::milliseconds(1000));
+
+ // Start logging again on pi2 after it is up.
+ LoggerState pi2_logger = LoggerState::MakeLogger(
+ pi2, &event_loop_factory, SupportedCompressionAlgorithms()[0]);
+ pi2_logger.StartLogger(kLogfile2_2);
+
+ event_loop_factory.RunFor(chrono::milliseconds(10000));
+ // And, now that we have a start time in the log, turn data back on.
+ pi2->EnableStatistics();
+ pi1->Connect(pi2->node());
+ pi2->Connect(pi1->node());
+
+ pi2->AlwaysStart<Pong>("pong");
+ std::unique_ptr<aos::EventLoop> ping_event_loop =
+ pi1->MakeEventLoop("ping");
+ Ping ping(ping_event_loop.get());
+
+ event_loop_factory.RunFor(chrono::milliseconds(3000));
+
+ pi2_logger.AppendAllFilenames(&filenames);
+ }
+
+ pi1_logger.AppendAllFilenames(&filenames);
+ pi3_logger.AppendAllFilenames(&filenames);
+ }
+
+ // Confirm that we can parse the result. LogReader has enough internal CHECKs
+ // to confirm the right thing happened.
+ const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ ConfirmReadable(filenames);
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/multinode_pingpong_split3.json b/aos/events/logging/multinode_pingpong_split3.json
new file mode 100644
index 0000000..151bac3
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong_split3.json
@@ -0,0 +1,258 @@
+{
+ "channels": [
+ {
+ "name": "/pi1/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi1",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi2",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi3",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ /* Logged on pi1 locally */
+ {
+ "name": "/pi1/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi3",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi3"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "logger": "LOCAL_LOGGER",
+ "source_node": "pi3"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi2"],
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ },
+ {
+ "name": "pi3",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1"],
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ }
+ ]
+ },
+ {
+ "name": "/pi3/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1"],
+ "source_node": "pi3",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi3"]
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi3/pi1/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi3/aos/remote_timestamps/pi1/pi3/aos/aos-message_bridge-Timestamp",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi3"
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "num_senders": 2,
+ "source_node": "pi1",
+ "frequency": 150
+ },
+ /* Forwarded to pi2 */
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": [
+ "pi1"
+ ],
+ "time_to_live": 5000000
+ }
+ ],
+ "frequency": 150
+ },
+ /* Forwarded back to pi1.
+ * The message is logged both on the sending node and the receiving node
+ * (to make it easier to look at the results for now).
+ *
+ * The timestamps are logged on the receiving node.
+ */
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1"],
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ],
+ "frequency": 150
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/pi1/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/pi2/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/pi3/aos"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ },
+ {
+ "name": "pi3",
+ "hostname": "raspberrypi3",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 0cfc19b..f7f5a40 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -33,6 +33,7 @@
// all the parts were sorted together and the configs match.
const Configuration *config = nullptr;
for (const LogFile &log_file : log_files) {
+ VLOG(1) << log_file;
if (config == nullptr) {
config = log_file.config.get();
} else {
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index edfa582..4d18d6b 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -1299,8 +1299,36 @@
++node_a_index;
continue;
}
- VLOG(1) << "Trying " << next_node_time << " " << next_node_duration
- << " for node " << node_a_index;
+
+ // We want to make sure we solve explicitly for the start time for each
+ // log. This is useless (though not all that expensive) if it is in the
+ // middle of a set of data since we are just adding an extra point in the
+ // middle of a line, but very useful if the start time is before any
+ // points and we need to force a node to reboot.
+ //
+ // We can only do this meaningfully if there are data points on this node
+ // before or after this node to solve for.
+ const size_t next_boot = last_monotonics_[node_a_index].boot + 1;
+ if (next_boot < boots_->boots[node_a_index].size() &&
+ timestamp_mappers_[node_a_index] != nullptr) {
+ BootTimestamp next_start_time = BootTimestamp{
+ .boot = next_boot,
+ .time = timestamp_mappers_[node_a_index]->monotonic_start_time(
+ next_boot)};
+ if (next_start_time < next_node_time) {
+ VLOG(1) << "Candidate for node " << node_a_index
+ << " is the next startup time, " << next_start_time;
+ next_node_time = next_start_time;
+ next_node_filter = nullptr;
+ }
+ }
+
+ if (next_node_filter != nullptr) {
+ VLOG(1) << "Trying " << next_node_time << " " << next_node_duration
+ << " for node " << node_a_index;
+ } else {
+ VLOG(1) << "Trying " << next_node_time << " for node " << node_a_index;
+ }
// TODO(austin): If we start supporting only having 1 direction of
// timestamps, we might need to change our assumptions around
@@ -1358,7 +1386,9 @@
VLOG(1) << " " << solution[i];
}
}
+
if (result_times.empty()) {
+ // This is the first solution candidate, so don't bother comparing.
result_times = std::move(solution);
next_filter = next_node_filter;
solution_index = node_a_index;
@@ -1393,12 +1423,14 @@
<< "ns";
}
VLOG(1) << "Ignoring because it is close enough.";
- std::optional<
- std::tuple<logger::BootTimestamp, logger::BootDuration>>
- result = next_node_filter->Consume();
- CHECK(result);
- next_node_filter->Pop(std::get<0>(*result) -
- time_estimation_buffer_seconds_);
+ if (next_node_filter) {
+ std::optional<
+ std::tuple<logger::BootTimestamp, logger::BootDuration>>
+ result = next_node_filter->Consume();
+ CHECK(result);
+ next_node_filter->Pop(std::get<0>(*result) -
+ time_estimation_buffer_seconds_);
+ }
break;
}
// Somehow the new solution is better *and* worse than the old
@@ -1415,12 +1447,14 @@
}
if (skip_order_validation_) {
- std::optional<
- std::tuple<logger::BootTimestamp, logger::BootDuration>>
- result = next_node_filter->Consume();
- CHECK(result);
- next_node_filter->Pop(std::get<0>(*result) -
- time_estimation_buffer_seconds_);
+ if (next_node_filter) {
+ std::optional<
+ std::tuple<logger::BootTimestamp, logger::BootDuration>>
+ result = next_node_filter->Consume();
+ CHECK(result);
+ next_node_filter->Pop(std::get<0>(*result) -
+ time_estimation_buffer_seconds_);
+ }
LOG(ERROR) << "Skipping because --skip_order_validation";
break;
} else {
@@ -1458,7 +1492,7 @@
CHECK(!all_done_);
// All done.
- if (next_filter == nullptr) {
+ if (result_times.empty()) {
if (first_solution_) {
VLOG(1) << "No more timestamps and the first solution.";
// If this is our first time, there is no solution. Instead of giving up
@@ -1496,7 +1530,6 @@
return std::nullopt;
}
-
std::tuple<logger::BootTimestamp, logger::BootDuration> sample;
if (first_solution_) {
first_solution_ = false;
@@ -1508,11 +1541,17 @@
time = BootTimestamp::epoch();
}
}
- sample = *next_filter->Consume();
- next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
+ if (next_filter) {
+ // This isn't a start time because we have a corresponding filter.
+ sample = *next_filter->Consume();
+ next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
+ }
} else {
- sample = *next_filter->Consume();
- next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
+ if (next_filter) {
+ // This isn't a start time because we have a corresponding filter.
+ sample = *next_filter->Consume();
+ next_filter->Pop(std::get<0>(sample) - time_estimation_buffer_seconds_);
+ }
// We found a good sample, so consume it. If it is a duplicate, we still
// want to consume it. But, if this is the first time around, we want to
// re-solve by recursing (once) to pickup the better base.
@@ -1583,7 +1622,7 @@
}
}
- if (filter_fps_.size() > 0) {
+ if (filter_fps_.size() > 0 && next_filter) {
const int node_a_index =
configuration::GetNodeIndex(configuration(), next_filter->node_a());
const int node_b_index =
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index 8d776b8..7de8041 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -331,6 +331,9 @@
private:
TimestampProblem MakeProblem();
+ // Returns the next solution, the filter which has the control point for it
+ // (or nullptr if a start time triggered this to be returned), and the node
+ // which triggered it.
std::tuple<NoncausalTimestampFilter *, std::vector<logger::BootTimestamp>,
int>
NextSolution(TimestampProblem *problem,