blob: 01f6a3ea4fcf4fdcca497304019c0dc8e33e814b [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
5#include "glog/logging.h"
6
Austin Schuh39788ff2019-12-01 18:22:57 -08007DEFINE_bool(timing_reports, true, "Publish timing reports.");
8DEFINE_int32(timing_report_ms, 1000,
9 "Period in milliseconds to publish timing reports at.");
10
Austin Schuh54cf95f2019-11-29 13:14:18 -080011namespace aos {
12
Austin Schuh39788ff2019-12-01 18:22:57 -080013RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
14 : event_loop_(event_loop),
15 channel_(channel),
16 timing_(event_loop_->ChannelIndex(channel)) {
17 event_loop_->NewSender(this);
18}
19
20RawSender::~RawSender() { event_loop_->DeleteSender(this); }
21
22RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
23 : event_loop_(event_loop),
24 channel_(channel),
25 timing_(event_loop_->ChannelIndex(channel)) {
26 context_.monotonic_sent_time = monotonic_clock::min_time;
27 context_.realtime_sent_time = realtime_clock::min_time;
28 context_.queue_index = 0xffffffff;
29 context_.size = 0;
30 context_.data = nullptr;
31 event_loop_->NewFetcher(this);
32}
33
34RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
35
36TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
37 : event_loop_(event_loop), fn_(std::move(fn)) {}
38
39TimerHandler::~TimerHandler() {}
40
41PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
42 std::function<void(int)> fn,
43 const monotonic_clock::duration interval,
44 const monotonic_clock::duration offset)
45 : event_loop_(event_loop),
46 fn_(std::move(fn)),
47 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
48 event_loop_->OnRun([this]() {
49 const monotonic_clock::time_point monotonic_now =
50 event_loop_->monotonic_now();
51 phased_loop_.Reset(monotonic_now);
52 Reschedule(
53 [this](monotonic_clock::time_point sleep_time) {
54 Schedule(sleep_time);
55 },
56 monotonic_now);
57 // The first time, we'll double count. Reschedule here will count cycles
58 // elapsed before now, and then the reschedule before runing the handler
59 // will count the time that elapsed then. So clear the count here.
60 cycles_elapsed_ = 0;
61 });
62}
63
64PhasedLoopHandler::~PhasedLoopHandler() {}
65
66EventLoop::~EventLoop() {
67 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
68}
69
70int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuh54cf95f2019-11-29 13:14:18 -080071 CHECK(configuration_->channels() != nullptr) << ": No channels";
72
Austin Schuh39788ff2019-12-01 18:22:57 -080073 auto c = std::find(configuration_->channels()->begin(),
74 configuration_->channels()->end(), channel);
75 CHECK(c != configuration_->channels()->end())
Austin Schuh54cf95f2019-11-29 13:14:18 -080076 << ": Channel pointer not found in configuration()->channels()";
Austin Schuh39788ff2019-12-01 18:22:57 -080077
78 return std::distance(configuration()->channels()->begin(), c);
79}
80
81void EventLoop::NewSender(RawSender *sender) {
82 senders_.emplace_back(sender);
83 UpdateTimingReport();
84}
85void EventLoop::DeleteSender(RawSender *sender) {
86 CHECK(!is_running());
87 auto s = std::find(senders_.begin(), senders_.end(), sender);
88 CHECK(s != senders_.end()) << ": Sender not in senders list";
89 senders_.erase(s);
90 UpdateTimingReport();
91}
92
93TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
94 timers_.emplace_back(std::move(timer));
95 UpdateTimingReport();
96 return timers_.back().get();
97}
98
99PhasedLoopHandler *EventLoop::NewPhasedLoop(
100 std::unique_ptr<PhasedLoopHandler> phased_loop) {
101 phased_loops_.emplace_back(std::move(phased_loop));
102 UpdateTimingReport();
103 return phased_loops_.back().get();
104}
105
106void EventLoop::NewFetcher(RawFetcher *fetcher) {
107 fetchers_.emplace_back(fetcher);
108 UpdateTimingReport();
109}
110
111void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
112 CHECK(!is_running());
113 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
114 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
115 fetchers_.erase(f);
116 UpdateTimingReport();
117}
118
119WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
120 watchers_.emplace_back(std::move(watcher));
121
122 UpdateTimingReport();
123
124 return watchers_.back().get();
125}
126
127void EventLoop::SendTimingReport() {
128 // We need to do a fancy dance here to get all the accounting to work right.
129 // We want to copy the memory here, but then send after resetting. Otherwise
130 // the send for the timing report won't be counted in the timing report.
131 //
132 // Also, flatbuffers build from the back end. So place this at the back end
133 // of the buffer. We only have to care because we are using this in a very
134 // raw fashion.
135 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
136 << ": Timing report bigger than the sender size.";
137 std::copy(timing_report_.data(),
138 timing_report_.data() + timing_report_.size(),
139 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
140 timing_report_sender_->size() - timing_report_.size());
141
142 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
143 timer->timing_.ResetTimingReport();
144 }
145 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
146 watcher->ResetReport();
147 }
148 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
149 phased_loop->timing_.ResetTimingReport();
150 }
151 for (RawSender *sender : senders_) {
152 sender->timing_.ResetTimingReport();
153 }
154 for (RawFetcher *fetcher : fetchers_) {
155 fetcher->timing_.ResetTimingReport();
156 }
157 timing_report_sender_->Send(timing_report_.size());
158}
159
160void EventLoop::UpdateTimingReport() {
161 // We need to support senders and fetchers changing while we are setting up
162 // the event loop. Otherwise we can't fetch or send until the loop runs. This
163 // means that on each change, we need to redo all this work. This makes setup
164 // more expensive, but not by all that much on a modern processor.
165
166 // Now, build up a report with everything pre-filled out.
167 flatbuffers::FlatBufferBuilder fbb;
168 fbb.ForceDefaults(1);
169
170 // Pre-fill in the defaults for timers.
171 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
172 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
173 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
174 timing::CreateStatistic(fbb);
175 flatbuffers::Offset<timing::Statistic> handler_time_offset =
176 timing::CreateStatistic(fbb);
177 flatbuffers::Offset<flatbuffers::String> name_offset;
178 if (timer->name().size() != 0) {
179 name_offset = fbb.CreateString(timer->name());
180 }
181
182 timing::Timer::Builder timer_builder(fbb);
183
184 if (timer->name().size() != 0) {
185 timer_builder.add_name(name_offset);
186 }
187 timer_builder.add_wakeup_latency(wakeup_latency_offset);
188 timer_builder.add_handler_time(handler_time_offset);
189 timer_builder.add_count(0);
190 timer_offsets.emplace_back(timer_builder.Finish());
191 }
192
193 // Pre-fill in the defaults for phased_loops.
194 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
195 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
196 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
197 timing::CreateStatistic(fbb);
198 flatbuffers::Offset<timing::Statistic> handler_time_offset =
199 timing::CreateStatistic(fbb);
200 flatbuffers::Offset<flatbuffers::String> name_offset;
201 if (phased_loop->name().size() != 0) {
202 name_offset = fbb.CreateString(phased_loop->name());
203 }
204
205 timing::Timer::Builder timer_builder(fbb);
206
207 if (phased_loop->name().size() != 0) {
208 timer_builder.add_name(name_offset);
209 }
210 timer_builder.add_wakeup_latency(wakeup_latency_offset);
211 timer_builder.add_handler_time(handler_time_offset);
212 timer_builder.add_count(0);
213 phased_loop_offsets.emplace_back(timer_builder.Finish());
214 }
215
216 // Pre-fill in the defaults for watchers.
217 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
218 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
219 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
220 timing::CreateStatistic(fbb);
221 flatbuffers::Offset<timing::Statistic> handler_time_offset =
222 timing::CreateStatistic(fbb);
223
224 timing::Watcher::Builder watcher_builder(fbb);
225
226 watcher_builder.add_channel_index(watcher->channel_index());
227 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
228 watcher_builder.add_handler_time(handler_time_offset);
229 watcher_builder.add_count(0);
230 watcher_offsets.emplace_back(watcher_builder.Finish());
231 }
232
233 // Pre-fill in the defaults for senders.
234 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
235 for (const RawSender *sender : senders_) {
236 flatbuffers::Offset<timing::Statistic> size_offset =
237 timing::CreateStatistic(fbb);
238
239 timing::Sender::Builder sender_builder(fbb);
240
241 sender_builder.add_channel_index(sender->timing_.channel_index);
242 sender_builder.add_size(size_offset);
243 sender_builder.add_count(0);
244 sender_offsets.emplace_back(sender_builder.Finish());
245 }
246
247 // Pre-fill in the defaults for fetchers.
248 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
249 for (RawFetcher *fetcher : fetchers_) {
250 flatbuffers::Offset<timing::Statistic> latency_offset =
251 timing::CreateStatistic(fbb);
252
253 timing::Fetcher::Builder fetcher_builder(fbb);
254
255 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
256 fetcher_builder.add_count(0);
257 fetcher_builder.add_latency(latency_offset);
258 fetcher_offsets.emplace_back(fetcher_builder.Finish());
259 }
260
261 // Then build the final report.
262 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
263 timers_offset;
264 if (timer_offsets.size() > 0) {
265 timers_offset = fbb.CreateVector(timer_offsets);
266 }
267
268 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
269 phased_loops_offset;
270 if (phased_loop_offsets.size() > 0) {
271 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
272 }
273
274 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
275 watchers_offset;
276 if (watcher_offsets.size() > 0) {
277 watchers_offset = fbb.CreateVector(watcher_offsets);
278 }
279
280 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
281 senders_offset;
282 if (sender_offsets.size() > 0) {
283 senders_offset = fbb.CreateVector(sender_offsets);
284 }
285
286 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
287 fetchers_offset;
288 if (fetcher_offsets.size() > 0) {
289 fetchers_offset = fbb.CreateVector(fetcher_offsets);
290 }
291
292 flatbuffers::Offset<flatbuffers::String> name_offset =
293 fbb.CreateString(name());
294
295 timing::Report::Builder report_builder(fbb);
296 report_builder.add_name(name_offset);
297 report_builder.add_pid(GetTid());
298 if (timer_offsets.size() > 0) {
299 report_builder.add_timers(timers_offset);
300 }
301 if (phased_loop_offsets.size() > 0) {
302 report_builder.add_phased_loops(phased_loops_offset);
303 }
304 if (watcher_offsets.size() > 0) {
305 report_builder.add_watchers(watchers_offset);
306 }
307 if (sender_offsets.size() > 0) {
308 report_builder.add_senders(senders_offset);
309 }
310 if (fetcher_offsets.size() > 0) {
311 report_builder.add_fetchers(fetchers_offset);
312 }
313 fbb.Finish(report_builder.Finish());
314
315 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
316
317 // Now that the pointers are stable, pass them to the timers and watchers to
318 // be updated.
319 for (size_t i = 0; i < timers_.size(); ++i) {
320 timers_[i]->timing_.set_timing_report(
321 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
322 i));
323 }
324
325 for (size_t i = 0; i < phased_loops_.size(); ++i) {
326 phased_loops_[i]->timing_.set_timing_report(
327 timing_report_.mutable_message()
328 ->mutable_phased_loops()
329 ->GetMutableObject(i));
330 }
331
332 for (size_t i = 0; i < watchers_.size(); ++i) {
333 watchers_[i]->set_timing_report(
334 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
335 i));
336 }
337
338 for (size_t i = 0; i < senders_.size(); ++i) {
339 senders_[i]->timing_.set_timing_report(
340 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
341 i));
342 }
343
344 for (size_t i = 0; i < fetchers_.size(); ++i) {
345 fetchers_[i]->timing_.set_timing_report(
346 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
347 i));
348 }
349}
350
351void EventLoop::MaybeScheduleTimingReports() {
352 if (FLAGS_timing_reports && !skip_timing_report_) {
353 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
354 // Make a raw sender for the report.
355 const Channel *channel = configuration::GetChannel(
356 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
357 name());
358 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
359 << timing::Report::GetFullyQualifiedName()
360 << "\" } not found in config.";
361 timing_report_sender_ = MakeRawSender(channel);
362
363 // Register a handler which sends the report out by copying the raw data
364 // from the prebuilt and subsequently modified report.
365 TimerHandler *timing_reports_timer =
366 AddTimer([this]() { SendTimingReport(); });
367
368 // Set it up to send once per second.
369 timing_reports_timer->set_name("timing_reports");
370 OnRun([this, timing_reports_timer]() {
371 timing_reports_timer->Setup(
372 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
373 std::chrono::milliseconds(FLAGS_timing_report_ms));
374 });
375
376 UpdateTimingReport();
377 }
378}
379
380void WatcherState::set_timing_report(timing::Watcher *watcher) {
381 CHECK_NOTNULL(watcher);
382 watcher_ = watcher;
383 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
384 handler_time_.set_statistic(watcher->mutable_handler_time());
385}
386
387void WatcherState::ResetReport() {
388 wakeup_latency_.Reset();
389 handler_time_.Reset();
390 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800391}
392
393} // namespace aos