blob: 4de405283c50c25c6589d53965553385bb365aaf [file] [log] [blame]
Naman Guptae7fe3552022-11-23 13:52:03 -08001#include <stdlib.h>
2
3#include <iostream>
4#include <optional>
5#include <ostream>
6#include <sstream>
7#include <string_view>
8#include <vector>
9
10#include "aos/configuration_generated.h"
11#include "aos/events/event_loop.h"
12#include "aos/events/logging/log_reader.h"
13#include "aos/events/logging/log_reader_utils.h"
14#include "aos/events/logging/log_replayer_config_generated.h"
15#include "aos/events/logging/logfile_sorting.h"
16#include "aos/events/logging/logfile_utils.h"
17#include "aos/events/logging/replay_timing_generated.h"
18#include "aos/events/logging/replay_timing_schema.h"
19#include "aos/events/shm_event_loop.h"
20#include "aos/flatbuffer_merge.h"
21#include "aos/init.h"
22#include "aos/json_to_flatbuffer.h"
23#include "aos/util/file.h"
24#include "flatbuffers/flatbuffers.h"
25#include "gflags/gflags.h"
26#include "glog/logging.h"
27
28DEFINE_string(config, "", "If specified, overrides logged configuration.");
29DEFINE_bool(
30 plot_timing, true,
31 "If set, generates a plot of the replay timing--namely, the errors between "
32 "when we "
33 "should've sent messages and when we actually sent replayed messages.");
34DEFINE_bool(skip_sender_channels, true,
35 "If set, skips replay of the channels applications replay on");
36DEFINE_bool(skip_replay, false,
37 "If set, skips actually running the replay. Useful for writing a "
38 "config without running replay");
39DEFINE_bool(
40 print_config, false,
41 "If set, prints the config that will be used for replay to stdout as json");
42DEFINE_string(
43 replay_config, "",
44 "Path to the configuration used for log replay which includes items such "
45 "as channels to remap, and applications to target for replay. If not set, "
46 "log_reader will run on shm event loop. ");
47DEFINE_string(merge_with_config, "",
48 "A valid json string to be merged with config. This is used to "
49 "add extra applications needed to run only for log_replayer");
50
51namespace aos::logger {
52
53int Main(int argc, char *argv[]) {
54 const std::vector<std::string> unsorted_logfiles =
55 aos::logger::FindLogs(argc, argv);
56
57 const std::vector<aos::logger::LogFile> logfiles =
58 aos::logger::SortParts(unsorted_logfiles);
59
60 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
61 FLAGS_config.empty()
62 ? aos::FlatbufferDetachedBuffer<aos::Configuration>::Empty()
63 : aos::configuration::ReadConfig(FLAGS_config);
64
65 if (FLAGS_plot_timing) {
66 aos::logger::LogReader config_reader(logfiles);
67
68 // Go through the effort to add a ReplayTiming channel to ensure that we
69 // can capture timing information from the replay.
70 const aos::Configuration *raw_config = FLAGS_config.empty()
71 ? config_reader.configuration()
72 : &config.message();
73 const std::string channel_node =
74 aos::configuration::MultiNode(raw_config)
75 ? absl::StrFormat("\"source_node\": \"%s%s",
76 aos::configuration::GetMyNode(raw_config)
77 ->name()
78 ->string_view(),
79 "\",")
80 : "";
81 config = aos::configuration::MergeWithConfig(
82 raw_config,
83 aos::configuration::AddSchema(
84 absl::StrFormat(
85 "{ \"channels\": [{ \"name\": \"/timing\", \"type\": "
86 "\"aos.timing.ReplayTiming\", \"max_size\": 10000, "
87 "\"frequency\": 10000, %s %s",
88 channel_node, "\"num_senders\": 2 }]}"),
89 {aos::FlatbufferVector<reflection::Schema>(
90 aos::FlatbufferSpan<reflection::Schema>(
91 aos::timing::ReplayTimingSchema()))}));
92 }
93
94 if (!FLAGS_merge_with_config.empty()) {
95 config = aos::configuration::MergeWithConfig(&config.message(),
96 FLAGS_merge_with_config);
97 }
98
99 std::optional<aos::FlatbufferDetachedBuffer<ReplayConfig>> replay_config =
100 FLAGS_replay_config.empty()
101 ? std::nullopt
102 : std::make_optional(aos::JsonToFlatbuffer<ReplayConfig>(
103 aos::util::ReadFileToStringOrDie(FLAGS_replay_config.data())));
104
105 std::vector<std::pair<std::string_view, std::string_view>> message_filter;
106 if (FLAGS_skip_sender_channels && replay_config.has_value()) {
107 CHECK(replay_config.value().message().has_active_nodes());
108 std::vector<const Node *> active_nodes;
109 for (const auto &node : *replay_config.value().message().active_nodes()) {
110 active_nodes.emplace_back(configuration::GetNode(
111 &config.message(), node->name()->string_view()));
112 }
113
114 std::vector<std::string> applications;
115 for (const auto &application :
116 *replay_config.value().message().applications()) {
117 if (application->name()->string_view() != "camera_message_interceptor") {
118 applications.emplace_back(application->name()->string_view());
119 }
120 }
121
122 aos::logger::ChannelsInLogResult channels =
123 ChannelsInLog(logfiles, active_nodes, applications);
124 for (auto const &channel :
125 channels.watchers_and_fetchers_without_senders.value()) {
126 message_filter.emplace_back(std::make_pair(channel.name, channel.type));
127 }
128 }
129
130 aos::logger::LogReader reader(
131 logfiles, &config.message(),
132 message_filter.empty() ? nullptr : &message_filter);
133
134 if (replay_config.has_value()) {
135 for (auto const &remap_channel :
136 *replay_config.value().message().remap_channels()) {
137 auto const &channel = remap_channel->channel();
138 std::string_view new_type = remap_channel->has_new_type()
139 ? remap_channel->new_type()->string_view()
140 : channel->type()->string_view();
141 reader.RemapLoggedChannel(
142 channel->name()->string_view(), channel->type()->string_view(),
143 remap_channel->prefix()->string_view(), new_type);
144 }
145 }
146
147 if (FLAGS_print_config) {
148 // TODO(Naman): Replace with config writer if it will be cleaner
149 std::cout << FlatbufferToJson(reader.configuration()) << std::endl;
150 }
151
152 if (!FLAGS_skip_replay) {
153 aos::ShmEventLoop event_loop(reader.configuration());
154
155 event_loop.SkipAosLog();
156 event_loop.SkipTimingReport();
157
158 reader.Register(&event_loop);
159 reader.OnEnd(event_loop.node(), [&event_loop]() { event_loop.Exit(); });
160
161 if (FLAGS_plot_timing) {
162 aos::Sender<aos::timing::ReplayTiming> replay_timing_sender =
163 event_loop.MakeSender<aos::timing::ReplayTiming>("/timing");
164 reader.set_timing_accuracy_sender(event_loop.node(),
165 std::move(replay_timing_sender));
166 }
167
168 event_loop.Run();
169
170 reader.Deregister();
171 }
172
173 return EXIT_SUCCESS;
174}
175
176} // namespace aos::logger
177
178int main(int argc, char *argv[]) {
179 gflags::SetUsageMessage(
180 R"message(Binary to replay the full contents of a logfile into shared memory.
181 #replay_config should be set in order to replay a set of nodes, applications and channels
182 #print config and skip replay, if you only want to print the config and not do log replay
183 Use case #1: log_replayer <log_dir> --print_config --replay_config=<path_to_config> --skip_replay
184 Use case #2: log_replayer <log_dir> --nofatal_sent_too_fast --replay_config=<path_to_config>
185 )message");
186
187 aos::InitGoogle(&argc, &argv);
188 return aos::logger::Main(argc, argv);
189}