blob: 22d8b396ed2b97c544bce5d9e033e86bdf263276 [file] [log] [blame]
#include <stdlib.h>
#include <iostream>
#include <optional>
#include <ostream>
#include <sstream>
#include <string_view>
#include <vector>
#include "flatbuffers/flatbuffers.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "aos/configuration_generated.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/log_reader.h"
#include "aos/events/logging/log_reader_utils.h"
#include "aos/events/logging/log_replayer_config_generated.h"
#include "aos/events/logging/log_replayer_stats_generated.h"
#include "aos/events/logging/log_replayer_stats_schema.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/replay_timing_generated.h"
#include "aos/events/logging/replay_timing_schema.h"
#include "aos/events/shm_event_loop.h"
#include "aos/flatbuffer_merge.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/util/file.h"
// Command-line flags for log_replayer. Each comment below describes how Main()
// in this file consumes the flag.

// Path to a configuration. When empty, Main() copies the configuration stored
// in the log; otherwise it reads the configuration from this path instead.
DEFINE_string(config, "", "If specified, overrides logged configuration.");
// When true, Main() adds a ReplayTiming channel named "/timing" to the config
// and hands a sender on that channel to the LogReader. Note: within this file
// no plot is actually rendered; only the timing channel is wired up.
DEFINE_bool(
plot_timing, true,
"If set, generates a plot of the replay timing--namely, the errors between "
"when we "
"should've sent messages and when we actually sent replayed messages.");
// Only has an effect when --replay_config is also given: Main() then builds a
// message filter from the channels that ChannelsInLog() reports as having
// watchers/fetchers but no senders, and passes it to the LogReader.
DEFINE_bool(skip_sender_channels, true,
"If set, skips replay of the channels applications replay on");
// When true, Main() stops after the config has been prepared (and optionally
// printed); no ShmEventLoop is created and no stats are sent or printed.
DEFINE_bool(skip_replay, false,
"If set, skips actually running the replay. Useful for writing a "
"config without running replay");
// When true, Main() writes the LogReader's final configuration (after channel
// additions, merges and remaps) to stdout as JSON.
DEFINE_bool(
print_config, false,
"If set, prints the config that will be used for replay to stdout as json");
// Path to a JSON file that Main() parses into a ReplayConfig flatbuffer. Main()
// reads its active_nodes, applications and remap_channels fields.
DEFINE_string(
replay_config, "",
"Path to the configuration used for log replay which includes items such "
"as channels to remap, and applications to target for replay. If not set, "
"log_reader will run on shm event loop. ");
// JSON passed to aos::configuration::MergeWithConfig() together with the
// config, after the "/timing" and "/replay" channels have been added.
DEFINE_string(merge_with_config, "",
"A valid json string to be merged with config. This is used to "
"add extra applications needed to run only for log_replayer");
// When true (and replay was not skipped), Main() fetches the LogReplayerStats
// message from "/replay" once the event loop has exited and prints it as JSON.
DEFINE_bool(print_stats, true,
"if set, prints the LogReplayerStats message as JSON to stdout");
namespace aos::logger {
// Replays the log files named on the command line into shared memory.
//
// The work happens in this order:
//   1. Sort the log parts and choose the configuration: the one stored in the
//      log, or the file named by --config.
//   2. Add the "/timing" channel (when --plot_timing) and the "/replay"
//      channel, then merge in --merge_with_config if given.
//   3. Load the optional --replay_config and derive the message filter and
//      channel remaps from it.
//   4. Optionally print the resulting config, and unless --skip_replay is set,
//      run the replay on a ShmEventLoop and publish a LogReplayerStats message
//      when the replay ends.
//
// argc/argv are forwarded to aos::logger::FindLogs() to locate the logs.
// Returns EXIT_SUCCESS; invalid inputs are reported through CHECK failures.
int Main(int argc, char *argv[]) {
  const std::vector<aos::logger::LogFile> logfiles =
      aos::logger::SortParts(aos::logger::FindLogs(argc, argv));

  // This reader exists only to get at the logged configuration.
  aos::logger::LogReader config_reader(logfiles);
  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      FLAGS_config.empty()
          ? CopyFlatBuffer<aos::Configuration>(config_reader.configuration())
          : aos::configuration::ReadConfig(FLAGS_config);

  if (FLAGS_plot_timing) {
    // Go through the effort to add a ReplayTiming channel to ensure that we
    // can capture timing information from the replay.
    const aos::Configuration *raw_config = &config.message();
    aos::ChannelT channel_overrides;
    channel_overrides.max_size = 10000;
    channel_overrides.frequency = 10000;
    config = aos::configuration::AddChannelToConfiguration(
        raw_config, "/timing",
        aos::FlatbufferSpan<reflection::Schema>(
            aos::timing::ReplayTimingSchema()),
        aos::configuration::GetMyNode(raw_config), channel_overrides);
  }

  // Add the LogReplayerStats channel
  const aos::Configuration *raw_config = &config.message();
  aos::ChannelT channel_overrides;
  channel_overrides.max_size = 10000;
  channel_overrides.frequency = 1;
  config = aos::configuration::AddChannelToConfiguration(
      raw_config, "/replay",
      aos::FlatbufferSpan<reflection::Schema>(aos::LogReplayerStatsSchema()),
      aos::configuration::GetMyNode(raw_config), channel_overrides);

  if (!FLAGS_merge_with_config.empty()) {
    config = aos::configuration::MergeWithConfig(&config.message(),
                                                 FLAGS_merge_with_config);
  }

  std::optional<aos::FlatbufferDetachedBuffer<ReplayConfig>> replay_config =
      FLAGS_replay_config.empty()
          ? std::nullopt
          : std::make_optional(aos::JsonToFlatbuffer<ReplayConfig>(
                aos::util::ReadFileToStringOrDie(FLAGS_replay_config.data())));

  // (channel name, channel type) pairs handed to the LogReader below. Stays
  // empty, meaning "no filter", unless a replay config is supplied.
  std::vector<std::pair<std::string, std::string>> message_filter;
  if (FLAGS_skip_sender_channels && replay_config.has_value()) {
    CHECK(replay_config.value().message().has_active_nodes());
    // applications() is dereferenced below. An absent flatbuffer vector is a
    // null pointer, so fail with a clear message instead of crashing.
    CHECK(replay_config.value().message().has_applications())
        << "--replay_config must list applications when "
           "--skip_sender_channels is set";

    std::vector<const Node *> active_nodes;
    for (const auto &node : *replay_config.value().message().active_nodes()) {
      active_nodes.emplace_back(configuration::GetNode(
          &config.message(), node->name()->string_view()));
    }

    // Every application in the replay config is considered, except
    // camera_message_interceptor, which is deliberately left out.
    std::vector<std::string> applications;
    for (const auto &application :
         *replay_config.value().message().applications()) {
      if (application->name()->string_view() != "camera_message_interceptor") {
        applications.emplace_back(application->name()->string_view());
      }
    }

    aos::logger::ChannelsInLogResult channels =
        ChannelsInLog(logfiles, active_nodes, applications);
    for (auto const &channel :
         channels.watchers_and_fetchers_without_senders.value()) {
      message_filter.emplace_back(channel.name, channel.type);
    }
  }

  aos::logger::LogReader reader(
      logfiles, &config.message(),
      message_filter.empty() ? nullptr : &message_filter);

  if (replay_config.has_value() &&
      replay_config.value().message().has_remap_channels()) {
    for (auto const &remap_channel :
         *replay_config.value().message().remap_channels()) {
      auto const &channel = remap_channel->channel();
      // Keep the logged type unless the replay config names a new one.
      std::string_view new_type = remap_channel->has_new_type()
                                      ? remap_channel->new_type()->string_view()
                                      : channel->type()->string_view();
      reader.RemapLoggedChannel(
          channel->name()->string_view(), channel->type()->string_view(),
          remap_channel->prefix()->string_view(), new_type);
    }
  }

  if (FLAGS_print_config) {
    // TODO(Naman): Replace with config writer if it will be cleaner
    std::cout << FlatbufferToJson(reader.configuration()) << std::endl;
  }

  if (!FLAGS_skip_replay) {
    aos::ShmEventLoop event_loop(reader.configuration());

    event_loop.SkipAosLog();
    event_loop.SkipTimingReport();

    aos::Sender<aos::LogReplayerStats> stats_sender =
        event_loop.MakeSender<aos::LogReplayerStats>("/replay");
    auto builder = stats_sender.MakeBuilder();
    // The node name and the replay-config copy are serialized before the
    // LogReplayerStats table is started; the table itself is then filled in
    // by the OnStart/OnEnd callbacks and finished in OnEnd.
    auto node_name = builder.fbb()->CreateString(event_loop.node()->name());
    flatbuffers::Offset<aos::ReplayConfig> replay_config_offset;
    if (replay_config.has_value()) {
      replay_config_offset =
          aos::CopyFlatBuffer(&replay_config.value().message(), builder.fbb());
    }

    auto stats_builder = builder.MakeBuilder<aos::LogReplayerStats>();
    if (replay_config.has_value()) {
      stats_builder.add_replay_config(replay_config_offset);
    }

    reader.Register(&event_loop);

    // Save off the start and end times of replay.
    reader.OnStart(event_loop.node(), [&event_loop, &stats_builder,
                                       &node_name]() {
      stats_builder.add_node(node_name);
      stats_builder.add_realtime_start_time(
          std::chrono::nanoseconds(event_loop.realtime_now().time_since_epoch())
              .count());
      stats_builder.add_monotonic_start_time(
          std::chrono::nanoseconds(
              event_loop.monotonic_now().time_since_epoch())
              .count());
    });

    reader.OnEnd(event_loop.node(), [&event_loop, &stats_builder, &builder]() {
      stats_builder.add_realtime_end_time(
          std::chrono::nanoseconds(event_loop.realtime_now().time_since_epoch())
              .count());
      stats_builder.add_monotonic_end_time(
          std::chrono::nanoseconds(
              event_loop.monotonic_now().time_since_epoch())
              .count());
      builder.CheckOk(builder.Send(stats_builder.Finish()));
    });

    // Registered after the stats callback above, so that Exit() is requested
    // by a separate, later-registered OnEnd handler.
    reader.OnEnd(event_loop.node(), [&event_loop]() { event_loop.Exit(); });

    if (FLAGS_plot_timing) {
      aos::Sender<aos::timing::ReplayTiming> replay_timing_sender =
          event_loop.MakeSender<aos::timing::ReplayTiming>("/timing");
      reader.set_timing_accuracy_sender(event_loop.node(),
                                        std::move(replay_timing_sender));
    }

    event_loop.Run();

    reader.Deregister();

    if (FLAGS_print_stats) {
      // Read back the stats message sent from the OnEnd callback.
      aos::Fetcher<aos::LogReplayerStats> stats_fetcher =
          event_loop.MakeFetcher<aos::LogReplayerStats>("/replay");
      CHECK(stats_fetcher.Fetch()) << "Failed to fetch LogReplayerStats!";
      std::cout << aos::FlatbufferToJson(stats_fetcher.get());
    }
  }

  return EXIT_SUCCESS;
}
} // namespace aos::logger
// Program entry point: installs the usage text, initializes AOS/gflags via
// aos::InitGoogle(), and then delegates to aos::logger::Main().
int main(int argc, char *argv[]) {
  // Help text handed to gflags::SetUsageMessage().
  static constexpr char kUsageMessage[] =
      R"message(Binary to replay the full contents of a logfile into shared memory.
#replay_config should be set in order to replay a set of nodes, applications and channels
#print config and skip replay, if you only want to print the config and not do log replay
Use case #1: log_replayer <log_dir> --print_config --replay_config=<path_to_config> --skip_replay
Use case #2: log_replayer <log_dir> --nofatal_sent_too_fast --replay_config=<path_to_config>
)message";

  gflags::SetUsageMessage(kUsageMessage);
  aos::InitGoogle(&argc, &argv);

  const int exit_code = aos::logger::Main(argc, argv);
  return exit_code;
}