Support nodes rebooting
We have log files which span a reboot. We want to be able to replay the
timeline across that reboot so we can run simulations and do everything
else interesting with those logs.
This requires a bunch of stuff, unfortunately.
The most visible one is that we need to be able to "reboot" a node.
This means we need a way of starting it up and stopping it. There are
now OnStartup and OnShutdown handlers in NodeEventLoopFactory to serve
this purpose, and better application context tracking to make it easier
to start and stop applications through a virtual starter.
This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.
From there, everything else is just a massive amount of plumbing to carry
the BootTimestamp through every layer just short of the user. Boot UUIDs
were put in TimeConverter so everything related to rebooting is kept
together in one place.
Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 5079606..5a9040e 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -79,6 +79,223 @@
}
}
+// Prints out raw log parts to stdout.
+int PrintRaw(int argc, char **argv) {  // main() returns this value directly as its exit status.
+ if (argc != 2) {  // argv[1] is the only log part read below.
+ LOG(FATAL) << "Expected 1 logfile as an argument.";
+ }
+ aos::logger::SpanReader reader(argv[1]);
+ absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
+
+ if (raw_log_file_header_span == absl::Span<const uint8_t>()) {  // An empty span is treated as an empty file.
+ LOG(WARNING) << "Empty log file on " << reader.filename();
+ return 0;  // Only a warning; not reported as a failure.
+ }
+
+ // Now, reproduce the log file header deduplication logic inline so we can
+ // print out all the headers we find.
+ aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader> log_file_header(
+ raw_log_file_header_span);
+ if (!log_file_header.Verify()) {
+ LOG(ERROR) << "Header corrupted on " << reader.filename();
+ return 1;  // Nonzero result when the first header fails verification.
+ }
+ while (true) {  // Handle any further LogFileHeaders that follow the first one.
+ absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
+ if (maybe_header_data == absl::Span<const uint8_t>()) {  // Nothing left to peek at.
+ break;
+ }
+
+ aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
+ maybe_header_data);
+ if (maybe_header.Verify()) {  // Verifies as a LogFileHeader, so treat it as another header.
+ std::cout << aos::FlatbufferToJson(  // Print the header about to be replaced.
+ log_file_header, {.multi_line = FLAGS_pretty,
+ .max_vector_size = static_cast<size_t>(
+ FLAGS_max_vector_size)})
+ << std::endl;
+ LOG(WARNING) << "Found duplicate LogFileHeader in " << reader.filename();
+ log_file_header =  // Keep the most recently found header.
+ aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
+ maybe_header_data);
+
+ reader.ConsumeMessage();
+ } else {
+ break;  // Not a header: ConsumeMessage() is not called, and the message loop below reads on.
+ }
+ }
+
+ // And now use the final sha256 to match the raw_header.
+ std::optional<aos::logger::MessageReader> raw_header_reader;
+ const aos::logger::LogFileHeader *full_header = &log_file_header.message();
+ if (!FLAGS_raw_header.empty()) {  // A separate header file was supplied by flag.
+ raw_header_reader.emplace(FLAGS_raw_header);
+ std::cout << aos::FlatbufferToJson(full_header,
+ {.multi_line = FLAGS_pretty,
+ .max_vector_size = static_cast<size_t>(
+ FLAGS_max_vector_size)})
+ << std::endl;
+ CHECK_EQ(  // The log's configuration_sha256 must equal the Sha256 of the raw header file's span.
+ full_header->configuration_sha256()->string_view(),
+ aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
+ full_header = raw_header_reader->log_file_header();  // Use the header from FLAGS_raw_header from here on.
+ }
+
+ if (!FLAGS_print) {  // Nothing further is printed when FLAGS_print is false.
+ return 0;
+ }
+
+ std::cout << aos::FlatbufferToJson(full_header,
+ {.multi_line = FLAGS_pretty,
+ .max_vector_size = static_cast<size_t>(
+ FLAGS_max_vector_size)})
+ << std::endl;
+ CHECK(full_header->has_configuration())  // The channel list is read from this configuration below.
+ << ": Missing configuration! You may want to provide the path to the "
+ "logged configuration file using the --raw_header flag.";
+
+ while (true) {  // Runs until ReadMessage() hands back an empty span.
+ const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
+ reader.ReadMessage());
+ if (message.span() == absl::Span<const uint8_t>()) {
+ break;
+ }
+ CHECK(message.Verify());  // NOTE(review): if this fails first, the hex-dump CHECK below never prints.
+
+ const auto *const channels = full_header->configuration()->channels();
+ const size_t channel_index = message.message().channel_index();
+ CHECK_LT(channel_index, channels->size());  // Guards the Get() on the next line.
+ const aos::Channel *const channel = channels->Get(channel_index);
+
+ CHECK(message.Verify()) << absl::BytesToHexString(  // Same check again, with the raw bytes as hex on failure.
+ std::string_view(reinterpret_cast<const char *>(message.span().data()),
+ message.span().size()));
+
+ if (message.message().data() != nullptr) {  // Only messages carrying data are checked against the schema.
+ CHECK(channel->has_schema());
+
+ CHECK(flatbuffers::Verify(
+ *channel->schema(), *channel->schema()->root_table(),
+ message.message().data()->data(), message.message().data()->size()))
+ << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
+ << channel->type()->c_str();
+ }
+
+ if (FLAGS_format_raw && message.message().data() != nullptr) {  // Header JSON, then ": ", then the data decoded via the schema.
+ std::cout << aos::configuration::StrippedChannelToString(channel) << " "
+ << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
+ .max_vector_size = 4})  // Fixed limit of 4 here, not FLAGS_max_vector_size.
+ << ": "
+ << aos::FlatbufferToJson(
+ channel->schema(), message.message().data()->data(),
+ {FLAGS_pretty,
+ static_cast<size_t>(FLAGS_max_vector_size)})
+ << std::endl;
+ } else {  // Otherwise print only the MessageHeader as JSON.
+ std::cout << aos::configuration::StrippedChannelToString(channel) << " "
+ << aos::FlatbufferToJson(
+ message, {FLAGS_pretty,
+ static_cast<size_t>(FLAGS_max_vector_size)})
+ << std::endl;
+ }
+ }
+ return 0;
+}
+
+// This class prints out all data from a node on a boot.
+class NodePrinter {  // The constructor registers the watchers; SetStarted() toggles the started state.
+ public:
+ NodePrinter(aos::EventLoop *event_loop, uint64_t *message_print_counter,
+ aos::SimulatedEventLoopFactory *factory,
+ aos::FastStringBuilder *builder)  // All four pointers are stored as-is; this class never deletes them.
+ : factory_(factory),
+ event_loop_(event_loop),
+ message_print_counter_(message_print_counter),
+ node_name_(  // "" when the event loop has no node, otherwise "<node name> ".
+ event_loop_->node() == nullptr  // NOTE(review): mixes event_loop_ and event_loop; both hold the same pointer here.
+ ? ""
+ : std::string(event_loop->node()->name()->string_view()) + " "),
+ builder_(builder) {
+ event_loop_->SkipTimingReport();
+ event_loop_->SkipAosLog();
+
+ const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
+ event_loop_->configuration()->channels();
+
+ for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {  // Consider every channel in the configuration.
+ const aos::Channel *channel = channels->Get(i);
+ const flatbuffers::string_view name = channel->name()->string_view();
+ const flatbuffers::string_view type = channel->type()->string_view();
+ if (name.find(FLAGS_name) != std::string::npos &&  // Substring match on both the channel name and type.
+ type.find(FLAGS_type) != std::string::npos) {
+ if (!aos::configuration::ChannelIsReadableOnNode(channel,  // Skip channels for which this returns false.
+ event_loop_->node())) {
+ continue;
+ }
+ VLOG(1) << "Listening on " << name << " " << type;
+
+ CHECK_NOTNULL(channel->schema());
+ event_loop_->MakeRawWatcher(
+ channel, [this, channel](const aos::Context &context,  // Captures this: the callback reads this object's members.
+ const void * /*message*/) {
+ if (!FLAGS_print) {  // Nothing is printed or counted when FLAGS_print is false.
+ return;
+ }
+
+ if (!FLAGS_fetch && !started_) {  // While started_ is false, print only if FLAGS_fetch is set.
+ return;
+ }
+
+ PrintMessage(node_name_, channel, context, builder_);
+ ++(*message_print_counter_);  // Increments the counter behind the pointer given to the constructor.
+ if (FLAGS_count > 0 && *message_print_counter_ >= FLAGS_count) {  // FLAGS_count <= 0 means no limit.
+ factory_->Exit();
+ }
+ });
+ }
+ }
+ }
+
+ void SetStarted(bool started, aos::monotonic_clock::time_point monotonic_now,
+ aos::realtime_clock::time_point realtime_now) {  // Stores started and prints a banner with both times.
+ started_ = started;
+ if (started_) {
+ std::cout << std::endl;
+ std::cout << (event_loop_->node() != nullptr  // Prefix with "<node name> " when there is a node.
+ ? (event_loop_->node()->name()->str() + " ")
+ : "")
+ << "Log starting at " << realtime_now << " (" << monotonic_now
+ << ")";
+ std::cout << std::endl << std::endl;
+ } else {  // Same banner layout, worded for shutdown.
+ std::cout << std::endl;
+ std::cout << (event_loop_->node() != nullptr
+ ? (event_loop_->node()->name()->str() + " ")
+ : "")
+ << "Log shutting down at " << realtime_now << " ("
+ << monotonic_now << ")";
+ std::cout << std::endl << std::endl;
+ }
+ }
+
+ private:
+ struct MessageInfo {  // NOTE(review): not referenced anywhere else in this class.
+ std::string node_name;
+ std::unique_ptr<aos::RawFetcher> fetcher;
+ };
+
+ aos::SimulatedEventLoopFactory *factory_;  // Exit() is called on this once the FLAGS_count limit is hit.
+ aos::EventLoop *event_loop_;  // Supplies the node, the configuration and the watchers.
+
+ uint64_t *message_print_counter_ = nullptr;  // Incremented once per printed message.
+
+ std::string node_name_;  // Passed to PrintMessage() for every message.
+
+ bool started_ = false;  // Written only by SetStarted().
+
+ aos::FastStringBuilder *builder_;  // Handed to PrintMessage().
+};
+
int main(int argc, char **argv) {
gflags::SetUsageMessage(
"Usage:\n"
@@ -95,138 +312,15 @@
aos::InitGoogle(&argc, &argv);
if (FLAGS_raw) {
- if (argc != 2) {
- LOG(FATAL) << "Expected 1 logfile as an argument.";
- }
- aos::logger::SpanReader reader(argv[1]);
- absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
-
- if (raw_log_file_header_span == absl::Span<const uint8_t>()) {
- LOG(WARNING) << "Empty log file on " << reader.filename();
- return 0;
- }
-
- // Now, reproduce the log file header deduplication logic inline so we can
- // print out all the headers we find.
- aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>
- log_file_header(raw_log_file_header_span);
- if (!log_file_header.Verify()) {
- LOG(ERROR) << "Header corrupted on " << reader.filename();
- return 1;
- }
- while (true) {
- absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
- if (maybe_header_data == absl::Span<const uint8_t>()) {
- break;
- }
-
- aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
- maybe_header_data);
- if (maybe_header.Verify()) {
- std::cout << aos::FlatbufferToJson(
- log_file_header,
- {.multi_line = FLAGS_pretty,
- .max_vector_size =
- static_cast<size_t>(FLAGS_max_vector_size)})
- << std::endl;
- LOG(WARNING) << "Found duplicate LogFileHeader in "
- << reader.filename();
- log_file_header =
- aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
- maybe_header_data);
-
- reader.ConsumeMessage();
- } else {
- break;
- }
- }
-
- // And now use the final sha256 to match the raw_header.
- std::optional<aos::logger::MessageReader> raw_header_reader;
- const aos::logger::LogFileHeader *full_header = &log_file_header.message();
- if (!FLAGS_raw_header.empty()) {
- raw_header_reader.emplace(FLAGS_raw_header);
- std::cout << aos::FlatbufferToJson(
- full_header, {.multi_line = FLAGS_pretty,
- .max_vector_size = static_cast<size_t>(
- FLAGS_max_vector_size)})
- << std::endl;
- CHECK_EQ(
- full_header->configuration_sha256()->string_view(),
- aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
- full_header = raw_header_reader->log_file_header();
- }
-
- if (!FLAGS_print) {
- return 0;
- }
-
- std::cout << aos::FlatbufferToJson(full_header,
- {.multi_line = FLAGS_pretty,
- .max_vector_size = static_cast<size_t>(
- FLAGS_max_vector_size)})
- << std::endl;
- CHECK(full_header->has_configuration())
- << ": Missing configuration! You may want to provide the path to the "
- "logged configuration file using the --raw_header flag.";
-
- while (true) {
- const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
- reader.ReadMessage());
- if (message.span() == absl::Span<const uint8_t>()) {
- break;
- }
- CHECK(message.Verify());
-
- const auto *const channels = full_header->configuration()->channels();
- const size_t channel_index = message.message().channel_index();
- CHECK_LT(channel_index, channels->size());
- const aos::Channel *const channel = channels->Get(channel_index);
-
- CHECK(message.Verify()) << absl::BytesToHexString(std::string_view(
- reinterpret_cast<const char *>(message.span().data()),
- message.span().size()));
-
- if (message.message().data() != nullptr) {
- CHECK(channel->has_schema());
-
- CHECK(flatbuffers::Verify(
- *channel->schema(), *channel->schema()->root_table(),
- message.message().data()->data(), message.message().data()->size()))
- << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
- << channel->type()->c_str();
- }
-
- if (FLAGS_format_raw && message.message().data() != nullptr) {
- std::cout << aos::configuration::StrippedChannelToString(channel) << " "
- << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
- .max_vector_size = 4})
- << ": "
- << aos::FlatbufferToJson(
- channel->schema(), message.message().data()->data(),
- {FLAGS_pretty,
- static_cast<size_t>(FLAGS_max_vector_size)})
- << std::endl;
- } else {
- std::cout << aos::configuration::StrippedChannelToString(channel) << " "
- << aos::FlatbufferToJson(
- message, {FLAGS_pretty,
- static_cast<size_t>(FLAGS_max_vector_size)})
- << std::endl;
- }
- }
- return 0;
+ return PrintRaw(argc, argv);
}
if (argc < 2) {
LOG(FATAL) << "Expected at least 1 logfile as an argument.";
}
- const std::vector<std::string> unsorted_logfiles =
- aos::logger::FindLogs(argc, argv);
-
const std::vector<aos::logger::LogFile> logfiles =
- aos::logger::SortParts(unsorted_logfiles);
+ aos::logger::SortParts(aos::logger::FindLogs(argc, argv));
for (auto &it : logfiles) {
VLOG(1) << it;
@@ -249,32 +343,10 @@
return 0;
}
- aos::FastStringBuilder builder;
-
- aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
- reader.Register(&event_loop_factory);
-
- std::vector<std::unique_ptr<aos::EventLoop>> printer_event_loops;
-
- bool found_channel = false;
-
- uint64_t message_print_counter = 0;
-
- for (const aos::Node *node :
- aos::configuration::GetNodes(event_loop_factory.configuration())) {
- std::unique_ptr<aos::EventLoop> printer_event_loop =
- event_loop_factory.MakeEventLoop("printer", node);
- printer_event_loop->SkipTimingReport();
- printer_event_loop->SkipAosLog();
-
- struct MessageInfo {
- std::string node_name;
- std::unique_ptr<aos::RawFetcher> fetcher;
- };
- std::vector<MessageInfo> messages_before_start;
-
+ {
+ bool found_channel = false;
const flatbuffers::Vector<flatbuffers::Offset<aos::Channel>> *channels =
- printer_event_loop->configuration()->channels();
+ reader.configuration()->channels();
for (flatbuffers::uoffset_t i = 0; i < channels->size(); i++) {
const aos::Channel *channel = channels->Get(i);
@@ -282,91 +354,54 @@
const flatbuffers::string_view type = channel->type()->string_view();
if (name.find(FLAGS_name) != std::string::npos &&
type.find(FLAGS_type) != std::string::npos) {
- if (!aos::configuration::ChannelIsReadableOnNode(
- channel, printer_event_loop->node())) {
- continue;
- }
- VLOG(1) << "Listening on " << name << " " << type;
-
- std::string node_name =
- node == nullptr ? ""
- : std::string(node->name()->string_view()) + " ";
-
- CHECK_NOTNULL(channel->schema());
-
- // Fetch the last message on this channel from before the log start
- // time.
- if (FLAGS_fetch) {
- std::unique_ptr<aos::RawFetcher> fetcher =
- printer_event_loop->MakeRawFetcher(channel);
- if (fetcher->Fetch()) {
- MessageInfo message{.node_name = node_name,
- .fetcher = std::move(fetcher)};
- // Insert it sorted into the vector so we can print in time order
- // instead of channel order at the start.
- auto it = std::lower_bound(
- messages_before_start.begin(), messages_before_start.end(),
- message, [](const MessageInfo &lhs, const MessageInfo &rhs) {
- if (lhs.fetcher->context().monotonic_event_time <
- rhs.fetcher->context().monotonic_event_time) {
- return true;
- }
- if (lhs.fetcher->context().monotonic_event_time >
- rhs.fetcher->context().monotonic_event_time) {
- return false;
- }
- return lhs.fetcher->channel() < rhs.fetcher->channel();
- });
- messages_before_start.insert(it, std::move(message));
- }
- }
-
- printer_event_loop->MakeRawWatcher(
- channel, [channel, node_name, &builder, &event_loop_factory,
- &message_print_counter](const aos::Context &context,
- const void * /*message*/) {
- if (FLAGS_print) {
- PrintMessage(node_name, channel, context, &builder);
- ++message_print_counter;
- if (FLAGS_count > 0 && message_print_counter >= FLAGS_count) {
- event_loop_factory.Exit();
- }
- }
- });
found_channel = true;
}
}
-
- // Print the messages from before the log start time.
- // TODO(austin): Sort between nodes too when it becomes annoying enough.
- for (const MessageInfo &message : messages_before_start) {
- if (FLAGS_print) {
- PrintMessage(message.node_name, message.fetcher->channel(),
- message.fetcher->context(), &builder);
- ++message_print_counter;
- if (FLAGS_count > 0 && message_print_counter >= FLAGS_count) {
- // We are done. Clean up and exit.
- reader.Deregister();
- return 0;
- }
- }
+ if (!found_channel) {
+ LOG(FATAL) << "Could not find any channels";
}
- printer_event_loops.emplace_back(std::move(printer_event_loop));
-
- std::cout << std::endl;
- std::cout << (node != nullptr ? (node->name()->str() + " ") : "")
- << "Log starting at " << reader.realtime_start_time(node) << " ("
- << reader.monotonic_start_time(node) << ")";
- std::cout << std::endl << std::endl;
}
- if (!found_channel) {
- LOG(FATAL) << "Could not find any channels";
- }
+ aos::FastStringBuilder builder;
- if (FLAGS_fetch) {
- // New line to separate fetched messages from non-fetched messages.
- std::cout << std::endl;
+ aos::SimulatedEventLoopFactory event_loop_factory(reader.configuration());
+
+ reader.RegisterWithoutStarting(&event_loop_factory);
+
+ uint64_t message_print_counter = 0;
+
+ std::vector<NodePrinter *> printers;
+ printers.resize(
+ aos::configuration::NodesCount(event_loop_factory.configuration()),
+ nullptr);
+
+ for (const aos::Node *node :
+ aos::configuration::GetNodes(event_loop_factory.configuration())) {
+ size_t node_index = aos::configuration::GetNodeIndex(
+ event_loop_factory.configuration(), node);
+ // Spin up the printer, and hook up the SetStarted method so that it gets
+ // notified when the log starts and stops.
+ aos::NodeEventLoopFactory *node_factory =
+ event_loop_factory.GetNodeEventLoopFactory(node);
+ node_factory->OnStartup([&event_loop_factory, node_factory,
+ &message_print_counter, &builder, &printers,
+ node_index]() {
+ printers[node_index] = node_factory->AlwaysStart<NodePrinter>(
+ "printer", &message_print_counter, &event_loop_factory, &builder);
+ });
+ node_factory->OnShutdown(
+ [&printers, node_index]() { printers[node_index] = nullptr; });
+
+ reader.OnStart(node, [&printers, node_index, node_factory]() {
+ CHECK(printers[node_index]);
+ printers[node_index]->SetStarted(true, node_factory->monotonic_now(),
+ node_factory->realtime_now());
+ });
+ reader.OnEnd(node, [&printers, node_index, node_factory]() {
+ CHECK(printers[node_index]);
+ printers[node_index]->SetStarted(false, node_factory->monotonic_now(),
+ node_factory->realtime_now());
+ });
}
event_loop_factory.Run();