blob: 295f0924b3844a1d9c8dc9f544a2555b6788cc8b [file] [log] [blame]
Naman Guptae7fe3552022-11-23 13:52:03 -08001#include "aos/events/logging/log_reader_utils.h"
2
3#include "absl/strings/str_join.h"
4
5namespace {
6
7struct ChannelsExtractorAccumulator {
8 // A set of senders, watchers and fetchers to have unique channels
9 std::set<int> senders;
10 std::set<int> watchers;
11 std::set<int> fetchers;
12
13 // observed_applications are all the applications for which timing reports
14 // have been found
15 std::vector<std::set<std::string>> observed_applications;
16
17 // remaining_applications are the vector of applications that have not been
18 // found on the nodes specified
19 std::vector<std::set<std::string>> remaining_applications;
20};
21
22void HandleChannelsInApplications(
23 const aos::timing::Report &report, const size_t nodes_index,
24 aos::SimulatedEventLoopFactory *factory,
25 const aos::Configuration *logged_configuration,
26 const aos::logger::ChannelsInLogOptions &options,
27 ChannelsExtractorAccumulator *results) {
28 std::string name = report.name()->str();
29 if (results->observed_applications[nodes_index].count(name) > 0) {
30 if (!results->remaining_applications[nodes_index].empty()) {
31 LOG(FATAL) << "Didn't see timing reports for every application! "
32 << absl::StrJoin(results->remaining_applications[nodes_index],
33 ", ");
34 } else {
35 factory->Exit();
36 }
37 }
38 if (results->remaining_applications[nodes_index].count(name) == 0) {
39 return;
40 }
41 results->observed_applications[nodes_index].insert(name);
42 results->remaining_applications[nodes_index].erase(name);
43
44 if (options.get_senders) {
45 if (report.has_senders()) {
46 for (const aos::timing::Sender *sender : *report.senders()) {
47 CHECK_LT(0, sender->channel_index());
48 CHECK_LT(static_cast<size_t>(sender->channel_index()),
49 logged_configuration->channels()->size());
50 results->senders.insert(sender->channel_index());
51 }
52 }
53 }
54
55 if (options.get_watchers) {
56 if (report.has_watchers()) {
57 for (const aos::timing::Watcher *watcher : *report.watchers()) {
58 CHECK_LT(0, watcher->channel_index());
59 CHECK_LT(static_cast<size_t>(watcher->channel_index()),
60 factory->configuration()->channels()->size());
61 results->watchers.insert(watcher->channel_index());
62 }
63 }
64 }
65
66 if (options.get_fetchers) {
67 if (report.has_fetchers()) {
68 for (const aos::timing::Fetcher *fetcher : *report.fetchers()) {
69 CHECK_LT(0, fetcher->channel_index());
70 CHECK_LT(static_cast<size_t>(fetcher->channel_index()),
71 factory->configuration()->channels()->size());
72 results->fetchers.insert(fetcher->channel_index());
73 }
74 }
75 }
76}
77
78class ChannelsExtractor {
79 public:
80 ChannelsExtractor(aos::EventLoop *node_event_loop,
81 ChannelsExtractorAccumulator *results,
82 const size_t node_index,
83 aos::SimulatedEventLoopFactory *factory,
84 const aos::Configuration *logged_configuration,
85 const aos::logger::ChannelsInLogOptions options) {
86 // skip timing report because we don't want the reader to generate a timing
87 // report
88 node_event_loop->SkipTimingReport();
89 node_event_loop->SkipAosLog();
90
91 // This is the watcher which looks for applications and then records the
92 // respective channels
93 node_event_loop->MakeWatcher(
94 "/aos", [results, node_index, factory, logged_configuration,
95 options](const aos::timing::Report &report) {
96 HandleChannelsInApplications(report, node_index, factory,
97 logged_configuration, options, results);
98 });
99 }
100};
101
102} // namespace
103namespace aos::logger {
104
105ChannelsInLogResult ChannelsInLog(
106 const std::vector<aos::logger::LogFile> &log_files,
107 const std::vector<const aos::Node *> &nodes,
108 const std::vector<std::string> &applications,
109 const aos::logger::ChannelsInLogOptions options) {
110 // Make a log_reader object to make event loop and look into channels and
111 // configuration
112 LogReader reader(log_files);
113 aos::SimulatedEventLoopFactory factory(reader.configuration());
114 reader.RegisterWithoutStarting(&factory);
115
116 ChannelsExtractorAccumulator results;
117
118 // Make watchers for every node in "nodes" and for all applications in
119 // "applications" that ran for that node
120 for (size_t ii = 0; ii < nodes.size(); ++ii) {
121 results.observed_applications.push_back({});
122 results.remaining_applications.push_back(
123 std::set<std::string>(applications.begin(), applications.end()));
124
125 const aos::Node *const node = nodes.at(ii);
126
127 aos::NodeEventLoopFactory *node_factory = factory.GetNodeEventLoopFactory(
128 aos::configuration::GetNode(factory.configuration(), node));
129
130 reader.OnStart(node,
131 [node_factory, &results, ii, &factory, &reader, &options]() {
132 node_factory->AlwaysStart<ChannelsExtractor>(
133 "channels_extractor", &results, ii, &factory,
134 reader.logged_configuration(), options);
135 });
136 }
137
138 factory.Run();
139 reader.Deregister();
140
141 for (size_t ii = 0; ii < nodes.size(); ++ii) {
142 if (!results.remaining_applications[ii].empty()) {
143 LOG(INFO) << "Didn't find all applications requested on "
144 << nodes[ii]->name()->string_view()
145 << ": remaining applications: "
146 << absl::StrJoin(results.remaining_applications[ii], ", ");
147 }
148 }
149
150 ChannelsInLogResult channels;
151
152 if (options.get_senders) {
153 channels.senders = std::make_optional<std::vector<aos::ChannelT>>({});
154 for (const int index : results.senders) {
155 channels.senders.value().push_back({});
156 reader.logged_configuration()->channels()->Get(index)->UnPackTo(
157 &channels.senders.value().back());
158 }
159 }
160
161 if (options.get_watchers) {
162 channels.watchers = std::make_optional<std::vector<aos::ChannelT>>({});
163 for (const int index : results.watchers) {
164 channels.watchers.value().push_back({});
165 reader.configuration()->channels()->Get(index)->UnPackTo(
166 &channels.watchers.value().back());
167 }
168 }
169
170 if (options.get_fetchers) {
171 channels.fetchers = std::make_optional<std::vector<aos::ChannelT>>({});
172 for (const int index : results.fetchers) {
173 channels.fetchers.value().push_back({});
174 reader.configuration()->channels()->Get(index)->UnPackTo(
175 &channels.fetchers.value().back());
176 }
177 }
178
179 if (options.get_senders && options.get_watchers && options.get_fetchers) {
180 channels.watchers_and_fetchers_without_senders =
181 std::make_optional<std::vector<aos::ChannelT>>({});
182 std::set<int> watchers_and_fetchers_without_senders;
183 // TODO(EricS) probably a better way to optimize this symmetric diff algo
184 for (const int watcher : results.watchers) {
185 if (!std::binary_search(results.senders.begin(), results.senders.end(),
186 watcher)) {
187 watchers_and_fetchers_without_senders.insert(watcher);
188 }
189 }
190
191 for (const int fetcher : results.fetchers) {
192 if (!std::binary_search(results.senders.begin(), results.senders.end(),
193 fetcher)) {
194 watchers_and_fetchers_without_senders.insert(fetcher);
195 }
196 }
197
198 for (const int index : watchers_and_fetchers_without_senders) {
199 channels.watchers_and_fetchers_without_senders.value().push_back({});
200 reader.configuration()->channels()->Get(index)->UnPackTo(
201 &channels.watchers_and_fetchers_without_senders.value().back());
202 }
203 }
204
205 return channels;
206}
207
208std::vector<aos::ChannelT> SenderChannelsInLog(
209 const std::vector<aos::logger::LogFile> &log_files,
210 const std::vector<const aos::Node *> &nodes,
211 const std::vector<std::string> &applications) {
212 return ChannelsInLog(log_files, nodes, applications, {true, false, false})
213 .senders.value();
214}
215
216std::vector<aos::ChannelT> WatcherChannelsInLog(
217 const std::vector<aos::logger::LogFile> &log_files,
218 const std::vector<const aos::Node *> &nodes,
219 const std::vector<std::string> &applications) {
220 return ChannelsInLog(log_files, nodes, applications, {false, true, false})
221 .watchers.value();
222}
223
224std::vector<aos::ChannelT> FetcherChannelsInLog(
225 const std::vector<aos::logger::LogFile> &log_files,
226 const std::vector<const aos::Node *> &nodes,
227 const std::vector<std::string> &applications) {
228 return ChannelsInLog(log_files, nodes, applications, {false, false, true})
229 .fetchers.value();
230}
231
232} // namespace aos::logger