LogReplayer to replay log with selected channels on active nodes
Log replayer checks whether the channels specified in the replayer JSON
config are available to replay and remaps the channels that need to be
remapped. It plots a timing report comparing the replay times with the
actual send times in the log, and prints either the specified config or
the logged config to stdout, depending on the flags given. Finally, it
replays the messages using a shared memory event loop to keep it realtime.
Change-Id: I9afbd5c2dfd309b544baffacfbc10bc5892943fd
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 87ebfc5..364cabb 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -27,6 +27,17 @@
visibility = ["//visibility:public"],
)
+flatbuffer_cc_library(
+ name = "replay_config_fbs",  # C++ bindings for the ReplayConfig schema
+ srcs = ["log_replayer_config.fbs"],
+ gen_reflections = True,
+ includes = [
+ "//aos:configuration_fbs_includes",  # the schema includes configuration.fbs
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
+)
+
cc_library(
name = "boot_timestamp",
srcs = ["boot_timestamp.cc"],
@@ -38,6 +49,51 @@
],
)
+cc_binary(
+ name = "log_replayer",  # Binary that replays a logfile into shared memory
+ srcs = [
+ "log_replayer.cc",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = [
+ ":log_reader",
+ ":log_reader_utils",  # ChannelsInLog(), used to build the message filter
+ ":replay_config_fbs",  # ReplayConfig, parsed from --replay_config
+ ":replay_timing_fbs",
+ ":replay_timing_schema",  # schema for the /timing channel added to the config
+ "//aos:configuration",
+ "//aos:init",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:shm_event_loop",
+ "@com_github_gflags_gflags//:gflags",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_library(
+ name = "log_reader_utils",  # Helpers listing the channels applications use in a log
+ srcs = [
+ "log_reader_utils.cc",
+ ],
+ hdrs = [
+ "log_reader_utils.h",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
+ deps = [
+ ":log_reader",  # log_reader_utils.h includes log_reader.h
+ "//aos:configuration",
+ "//aos:flatbuffer_merge",
+ "//aos:flatbuffers",
+ "//aos:uuid",
+ "//aos/containers:resizeable_buffer",
+ "//aos/events:event_loop",
+ "//aos/util:file",
+ "@com_google_absl//absl/strings",  # absl::StrJoin
+ ],
+)
+
cc_library(
name = "logfile_utils",
srcs = [
diff --git a/aos/events/logging/log_reader_utils.cc b/aos/events/logging/log_reader_utils.cc
new file mode 100644
index 0000000..295f092
--- /dev/null
+++ b/aos/events/logging/log_reader_utils.cc
@@ -0,0 +1,232 @@
+#include "aos/events/logging/log_reader_utils.h"
+
+#include "absl/strings/str_join.h"
+
+namespace {
+
+struct ChannelsExtractorAccumulator {
+ // Channel indices collected from timing reports; sets keep each index unique
+ std::set<int> senders;
+ std::set<int> watchers;
+ std::set<int> fetchers;
+
+ // One entry per requested node: names of the applications whose timing
+ // reports have already been handled on that node
+ std::vector<std::set<std::string>> observed_applications;
+
+ // One entry per requested node: names of the requested applications whose
+ // timing reports have not been handled on that node yet
+ std::vector<std::set<std::string>> remaining_applications;
+};
+
+// Processes one timing report for the requested node at index `nodes_index`.
+// The first report from each requested application records the channel
+// indices it sends, watches and fetches (per `options`) into `results`. A
+// repeated report from an observed application stops the simulation, or aborts
+// if some requested applications have not reported on this node yet.
+void HandleChannelsInApplications(
+    const aos::timing::Report &report, const size_t nodes_index,
+    aos::SimulatedEventLoopFactory *factory,
+    const aos::Configuration *logged_configuration,
+    const aos::logger::ChannelsInLogOptions &options,
+    ChannelsExtractorAccumulator *results) {
+  const std::string name = report.name()->str();
+  std::set<std::string> &observed = results->observed_applications[nodes_index];
+  std::set<std::string> &remaining =
+      results->remaining_applications[nodes_index];
+  if (observed.count(name) > 0) {
+    if (!remaining.empty()) {
+      LOG(FATAL) << "Didn't see timing reports for every application! "
+                 << absl::StrJoin(remaining, ", ");
+    } else {
+      factory->Exit();
+    }
+  }
+  if (remaining.count(name) == 0) {
+    return;
+  }
+  observed.insert(name);
+  remaining.erase(name);
+
+  // Channel indices are zero-based: 0 is valid, only negatives are rejected.
+  if (options.get_senders && report.has_senders()) {
+    for (const aos::timing::Sender *sender : *report.senders()) {
+      CHECK_LE(0, sender->channel_index());
+      CHECK_LT(static_cast<size_t>(sender->channel_index()),
+               logged_configuration->channels()->size());
+      results->senders.insert(sender->channel_index());
+    }
+  }
+  if (options.get_watchers && report.has_watchers()) {
+    for (const aos::timing::Watcher *watcher : *report.watchers()) {
+      CHECK_LE(0, watcher->channel_index());
+      CHECK_LT(static_cast<size_t>(watcher->channel_index()),
+               factory->configuration()->channels()->size());
+      results->watchers.insert(watcher->channel_index());
+    }
+  }
+  if (options.get_fetchers && report.has_fetchers()) {
+    for (const aos::timing::Fetcher *fetcher : *report.fetchers()) {
+      CHECK_LE(0, fetcher->channel_index());
+      CHECK_LT(static_cast<size_t>(fetcher->channel_index()),
+               factory->configuration()->channels()->size());
+      results->fetchers.insert(fetcher->channel_index());
+    }
+  }
+}
+
+class ChannelsExtractor {  // Watches one node's timing reports during replay
+ public:
+ ChannelsExtractor(aos::EventLoop *node_event_loop,
+ ChannelsExtractorAccumulator *results,
+ const size_t node_index,
+ aos::SimulatedEventLoopFactory *factory,
+ const aos::Configuration *logged_configuration,
+ const aos::logger::ChannelsInLogOptions options) {
+ // Skip the timing report because we don't want the reader to generate a
+ // timing report of its own
+ node_event_loop->SkipTimingReport();
+ node_event_loop->SkipAosLog();
+
+ // Watch aos::timing::Report on "/aos" and hand every report, together with
+ // the state captured here, to HandleChannelsInApplications()
+ node_event_loop->MakeWatcher(
+ "/aos", [results, node_index, factory, logged_configuration,
+ options](const aos::timing::Report &report) {
+ HandleChannelsInApplications(report, node_index, factory,
+ logged_configuration, options, results);
+ });
+ }
+};
+
+} // namespace
+namespace aos::logger {
+
+// Replays `log_files` in simulation and collects the channels that the given
+// `applications` send, watch and fetch on the given `nodes`, as selected by
+// `options`. The channel indices come from the applications' timing reports.
+// Result fields that were not requested are left unset, and
+// watchers_and_fetchers_without_senders is only filled in when all three kinds
+// are requested.
+ChannelsInLogResult ChannelsInLog(
+    const std::vector<aos::logger::LogFile> &log_files,
+    const std::vector<const aos::Node *> &nodes,
+    const std::vector<std::string> &applications,
+    const aos::logger::ChannelsInLogOptions options) {
+  // Make a log_reader object to make event loop and look into channels and
+  // configuration
+  LogReader reader(log_files);
+  aos::SimulatedEventLoopFactory factory(reader.configuration());
+  reader.RegisterWithoutStarting(&factory);
+
+  ChannelsExtractorAccumulator results;
+  // Size the per-node bookkeeping up front so the callbacks below can index it.
+  results.observed_applications.resize(nodes.size());
+  results.remaining_applications.assign(
+      nodes.size(),
+      std::set<std::string>(applications.begin(), applications.end()));
+
+  // Make watchers for every node in "nodes" and for all applications in
+  // "applications" that ran for that node
+  for (size_t ii = 0; ii < nodes.size(); ++ii) {
+    const aos::Node *const node = nodes[ii];
+
+    aos::NodeEventLoopFactory *node_factory = factory.GetNodeEventLoopFactory(
+        aos::configuration::GetNode(factory.configuration(), node));
+
+    reader.OnStart(node,
+                   [node_factory, &results, ii, &factory, &reader, &options]() {
+                     node_factory->AlwaysStart<ChannelsExtractor>(
+                         "channels_extractor", &results, ii, &factory,
+                         reader.logged_configuration(), options);
+                   });
+  }
+
+  // Replay the log; the timing report watcher calls factory.Exit() to stop it.
+  factory.Run();
+  reader.Deregister();
+
+  // Report requested applications whose timing reports were never handled.
+  for (size_t ii = 0; ii < nodes.size(); ++ii) {
+    if (!results.remaining_applications[ii].empty()) {
+      LOG(INFO) << "Didn't find all applications requested on "
+                << nodes[ii]->name()->string_view()
+                << ": remaining applications: "
+                << absl::StrJoin(results.remaining_applications[ii], ", ");
+    }
+  }
+
+  // Unpacks the channels at `indices` in `config` into their object API form.
+  const auto unpack_channels = [](const aos::Configuration *config,
+                                  const std::set<int> &indices) {
+    std::vector<aos::ChannelT> unpacked;
+    unpacked.reserve(indices.size());
+    for (const int index : indices) {
+      unpacked.emplace_back();
+      config->channels()->Get(index)->UnPackTo(&unpacked.back());
+    }
+    return unpacked;
+  };
+
+  ChannelsInLogResult channels;
+
+  if (options.get_senders) {
+    channels.senders =
+        unpack_channels(reader.logged_configuration(), results.senders);
+  }
+  if (options.get_watchers) {
+    channels.watchers =
+        unpack_channels(reader.configuration(), results.watchers);
+  }
+  if (options.get_fetchers) {
+    channels.fetchers =
+        unpack_channels(reader.configuration(), results.fetchers);
+  }
+
+  if (options.get_senders && options.get_watchers && options.get_fetchers) {
+    // Union of the watched and fetched channels that none of the applications
+    // send on. std::set::count is used for membership because
+    // std::binary_search has to step through a set's iterators linearly.
+    std::set<int> watchers_and_fetchers_without_senders;
+    for (const int watcher : results.watchers) {
+      if (results.senders.count(watcher) == 0) {
+        watchers_and_fetchers_without_senders.insert(watcher);
+      }
+    }
+    for (const int fetcher : results.fetchers) {
+      if (results.senders.count(fetcher) == 0) {
+        watchers_and_fetchers_without_senders.insert(fetcher);
+      }
+    }
+    channels.watchers_and_fetchers_without_senders = unpack_channels(
+        reader.configuration(), watchers_and_fetchers_without_senders);
+  }
+
+  return channels;
+}
+
+std::vector<aos::ChannelT> SenderChannelsInLog(
+    const std::vector<aos::logger::LogFile> &log_files,
+    const std::vector<const aos::Node *> &nodes,
+    const std::vector<std::string> &applications) {
+  const ChannelsInLogOptions opts{true, false, false};  // senders only
+  return ChannelsInLog(log_files, nodes, applications, opts).senders.value();
+}
+
+std::vector<aos::ChannelT> WatcherChannelsInLog(
+    const std::vector<aos::logger::LogFile> &log_files,
+    const std::vector<const aos::Node *> &nodes,
+    const std::vector<std::string> &applications) {
+  const ChannelsInLogOptions opts{false, true, false};  // watchers only
+  return ChannelsInLog(log_files, nodes, applications, opts).watchers.value();
+}
+
+std::vector<aos::ChannelT> FetcherChannelsInLog(
+    const std::vector<aos::logger::LogFile> &log_files,
+    const std::vector<const aos::Node *> &nodes,
+    const std::vector<std::string> &applications) {
+  const ChannelsInLogOptions opts{false, false, true};  // fetchers only
+  return ChannelsInLog(log_files, nodes, applications, opts).fetchers.value();
+}
+
+} // namespace aos::logger
diff --git a/aos/events/logging/log_reader_utils.h b/aos/events/logging/log_reader_utils.h
new file mode 100644
index 0000000..0c8ed07
--- /dev/null
+++ b/aos/events/logging/log_reader_utils.h
@@ -0,0 +1,54 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_READER_UTILS_H_
+#define AOS_EVENTS_LOGGING_LOG_READER_UTILS_H_
+
+#include "aos/events/logging/log_reader.h"
+
+namespace aos::logger {
+
+// Channels found by ChannelsInLog(), grouped by how they are used. A list is
+// only set if requested; the last one needs all three kinds to be requested.
+struct ChannelsInLogResult {
+ std::optional<std::vector<aos::ChannelT>> senders;
+ std::optional<std::vector<aos::ChannelT>> watchers;
+ std::optional<std::vector<aos::ChannelT>> fetchers;
+ std::optional<std::vector<aos::ChannelT>>
+ watchers_and_fetchers_without_senders;
+}; // struct ChannelsInLogResult
+
+// Selects which kinds of channels ChannelsInLog() extracts from the log
+struct ChannelsInLogOptions {
+ bool get_senders = false;  // channels the applications send on
+ bool get_watchers = false;  // channels the applications watch
+ bool get_fetchers = false;  // channels the applications fetch from
+}; // struct ChannelsInLogOptions
+
+// Reads the first ~1 second of timing reports in a logfile and generates lists
+// of the channels that the specified applications send on, watch and fetch
+// (as selected by options) on the specified nodes.
+ChannelsInLogResult ChannelsInLog(
+ const std::vector<aos::logger::LogFile> &log_files,
+ const std::vector<const aos::Node *> &nodes,
+ const std::vector<std::string> &applications,
+ const ChannelsInLogOptions options = ChannelsInLogOptions{true, true,
+ true});
+// Wrapper for ChannelsInLog() that returns only the sender channels
+std::vector<aos::ChannelT> SenderChannelsInLog(
+ const std::vector<aos::logger::LogFile> &log_files,
+ const std::vector<const aos::Node *> &nodes,
+ const std::vector<std::string> &applications);
+
+// Wrapper for ChannelsInLog() that returns only the watcher channels
+std::vector<aos::ChannelT> WatcherChannelsInLog(
+ const std::vector<aos::logger::LogFile> &log_files,
+ const std::vector<const aos::Node *> &nodes,
+ const std::vector<std::string> &applications);
+
+// Wrapper for ChannelsInLog() that returns only the fetcher channels
+std::vector<aos::ChannelT> FetcherChannelsInLog(
+ const std::vector<aos::logger::LogFile> &log_files,
+ const std::vector<const aos::Node *> &nodes,
+ const std::vector<std::string> &applications);
+
+} // namespace aos::logger
+
+#endif // AOS_EVENTS_LOGGING_LOG_READER_UTILS_H_
diff --git a/aos/events/logging/log_replayer.cc b/aos/events/logging/log_replayer.cc
new file mode 100644
index 0000000..4de4052
--- /dev/null
+++ b/aos/events/logging/log_replayer.cc
@@ -0,0 +1,189 @@
+#include <stdlib.h>
+
+#include <iostream>
+#include <optional>
+#include <ostream>
+#include <sstream>
+#include <string_view>
+#include <vector>
+
+#include "aos/configuration_generated.h"
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/log_reader.h"
+#include "aos/events/logging/log_reader_utils.h"
+#include "aos/events/logging/log_replayer_config_generated.h"
+#include "aos/events/logging/logfile_sorting.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/replay_timing_generated.h"
+#include "aos/events/logging/replay_timing_schema.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/init.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/util/file.h"
+#include "flatbuffers/flatbuffers.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+DEFINE_string(config, "", "If specified, overrides logged configuration.");
+DEFINE_bool(
+    plot_timing, true,
+    "If set, generates a plot of the replay timing--namely, the errors between "
+    "when we "
+    "should've sent messages and when we actually sent replayed messages.");
+DEFINE_bool(skip_sender_channels, true,
+            "If set, skips replay of the channels the applications send on");
+DEFINE_bool(skip_replay, false,
+            "If set, skips actually running the replay. Useful for writing a "
+            "config without running replay");
+DEFINE_bool(
+    print_config, false,
+    "If set, prints the config that will be used for replay to stdout as json");
+DEFINE_string(
+    replay_config, "",
+    "Path to the configuration used for log replay which includes items such "
+    "as channels to remap, and applications to target for replay. If not set, "
+    "log_reader will run on shm event loop. ");
+DEFINE_string(merge_with_config, "",
+              "A valid json string to be merged with config. This is used to "
+              "add extra applications needed to run only for log_replayer");
+
+namespace aos::logger {
+
+// Replays the logs given on the command line into shared memory, optionally
+// filtering and remapping channels as described by --replay_config.
+int Main(int argc, char *argv[]) {
+  const std::vector<std::string> unsorted_logfiles =
+      aos::logger::FindLogs(argc, argv);
+  const std::vector<aos::logger::LogFile> logfiles =
+      aos::logger::SortParts(unsorted_logfiles);
+
+  // Start from the logged configuration unless --config overrides it. An empty
+  // config must not be used: it is merged, searched and given to LogReader.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      FLAGS_config.empty()
+          ? aos::CopyFlatBuffer<aos::Configuration>(
+                aos::logger::LogReader(logfiles).configuration())
+          : aos::configuration::ReadConfig(FLAGS_config);
+
+  if (FLAGS_plot_timing) {
+    // Go through the effort to add a ReplayTiming channel to ensure that we
+    // can capture timing information from the replay.
+    const aos::Configuration *raw_config = &config.message();
+    const std::string channel_node =
+        aos::configuration::MultiNode(raw_config)
+            ? absl::StrFormat("\"source_node\": \"%s%s",
+                              aos::configuration::GetMyNode(raw_config)
+                                  ->name()
+                                  ->string_view(),
+                              "\",")
+            : "";
+    config = aos::configuration::MergeWithConfig(
+        raw_config,
+        aos::configuration::AddSchema(
+            absl::StrFormat(
+                "{ \"channels\": [{ \"name\": \"/timing\", \"type\": "
+                "\"aos.timing.ReplayTiming\", \"max_size\": 10000, "
+                "\"frequency\": 10000, %s %s",
+                channel_node, "\"num_senders\": 2 }]}"),
+            {aos::FlatbufferVector<reflection::Schema>(
+                aos::FlatbufferSpan<reflection::Schema>(
+                    aos::timing::ReplayTimingSchema()))}));
+  }
+
+  if (!FLAGS_merge_with_config.empty()) {
+    config = aos::configuration::MergeWithConfig(&config.message(),
+                                                 FLAGS_merge_with_config);
+  }
+
+  std::optional<aos::FlatbufferDetachedBuffer<ReplayConfig>> replay_config =
+      FLAGS_replay_config.empty()
+          ? std::nullopt
+          : std::make_optional(aos::JsonToFlatbuffer<ReplayConfig>(
+                aos::util::ReadFileToStringOrDie(FLAGS_replay_config.data())));
+
+  // message_filter holds string_views into `channels`, so `channels` must
+  // outlive `reader` instead of being scoped to the if statement below.
+  aos::logger::ChannelsInLogResult channels;
+  std::vector<std::pair<std::string_view, std::string_view>> message_filter;
+  if (FLAGS_skip_sender_channels && replay_config.has_value()) {
+    const ReplayConfig &replay = replay_config.value().message();
+    CHECK(replay.has_active_nodes());
+    std::vector<const Node *> active_nodes;
+    for (const auto &node : *replay.active_nodes()) {
+      active_nodes.emplace_back(configuration::GetNode(
+          &config.message(), node->name()->string_view()));
+    }
+    // applications is optional in the schema; treat a missing list as empty.
+    std::vector<std::string> applications;
+    if (replay.has_applications()) {
+      for (const auto &application : *replay.applications()) {
+        const std::string_view name = application->name()->string_view();
+        if (name != "camera_message_interceptor") {
+          applications.emplace_back(name);
+        }
+      }
+    }
+    channels = ChannelsInLog(logfiles, active_nodes, applications);
+    for (auto const &channel :
+         channels.watchers_and_fetchers_without_senders.value()) {
+      // Views must point at the strings owned by `channels`, not at copies.
+      message_filter.emplace_back(channel.name, channel.type);
+    }
+  }
+
+  aos::logger::LogReader reader(
+      logfiles, &config.message(),
+      message_filter.empty() ? nullptr : &message_filter);
+
+  // remap_channels is optional as well, so only walk it when it is present.
+  if (replay_config.has_value() &&
+      replay_config.value().message().has_remap_channels()) {
+    for (auto const &remap_channel :
+         *replay_config.value().message().remap_channels()) {
+      auto const &channel = remap_channel->channel();
+      std::string_view new_type = remap_channel->has_new_type()
+                                      ? remap_channel->new_type()->string_view()
+                                      : channel->type()->string_view();
+      reader.RemapLoggedChannel(
+          channel->name()->string_view(), channel->type()->string_view(),
+          remap_channel->prefix()->string_view(), new_type);
+    }
+  }
+
+  if (FLAGS_print_config) {
+    // TODO(Naman): Replace with config writer if it will be cleaner
+    std::cout << FlatbufferToJson(reader.configuration()) << std::endl;
+  }
+
+  if (!FLAGS_skip_replay) {
+    aos::ShmEventLoop event_loop(reader.configuration());
+    event_loop.SkipAosLog();
+    event_loop.SkipTimingReport();
+    reader.Register(&event_loop);
+    reader.OnEnd(event_loop.node(), [&event_loop]() { event_loop.Exit(); });
+    if (FLAGS_plot_timing) {
+      reader.set_timing_accuracy_sender(
+          event_loop.node(),
+          event_loop.MakeSender<aos::timing::ReplayTiming>("/timing"));
+    }
+
+    event_loop.Run();
+    reader.Deregister();
+  }
+  return EXIT_SUCCESS;
+}
+
+} // namespace aos::logger
+
+int main(int argc, char *argv[]) {  // Entry point of the log_replayer binary
+ gflags::SetUsageMessage(
+ R"message(Binary to replay the full contents of a logfile into shared memory.
+ #replay_config should be set in order to replay a set of nodes, applications and channels
+ #print config and skip replay, if you only want to print the config and not do log replay
+ Use case #1: log_replayer <log_dir> --print_config --replay_config=<path_to_config> --skip_replay
+ Use case #2: log_replayer <log_dir> --nofatal_sent_too_fast --replay_config=<path_to_config>
+ )message");
+
+ aos::InitGoogle(&argc, &argv);  // NOTE(review): presumably parses the flags - confirm
+ return aos::logger::Main(argc, argv);  // Main() passes argc/argv on to FindLogs()
+}
diff --git a/aos/events/logging/log_replayer_config.fbs b/aos/events/logging/log_replayer_config.fbs
new file mode 100644
index 0000000..13a3cd9
--- /dev/null
+++ b/aos/events/logging/log_replayer_config.fbs
@@ -0,0 +1,29 @@
+include "../../configuration.fbs";
+
+namespace aos;
+
+// Describes one logged channel to remap during replay; used inside ReplayConfig
+table RemapChannel {
+ // table Channel defined in configuration.fbs
+ // The logged channel to remap; log_replayer reads its name and type
+ channel:Channel(id : 0);
+ // A prefix used for remapping the channel from /channel_name to
+ // /prefix/channel_name
+ prefix:string(id : 1);
+ // New type for the remapped channel; the original type is kept when unset
+ new_type:string(id : 2);
+}
+
+// A flatbuffer message to store the replay config.
+// It is used to replay a log for specific applications and active nodes, and
+// to remap channels if needed
+table ReplayConfig {
+ // Applications whose channels log_replayer looks up in the logged timing reports
+ applications:[Application](id : 0);
+ // Nodes on which the applications above are looked up for replay
+ active_nodes:[Node](id : 1);
+ // Channels in the config which need to be remapped
+ remap_channels:[RemapChannel](id : 2);
+}
+
+root_type ReplayConfig;