blob: 2c3a2155755aa335d037d8d99adabcf7188da99e [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
5#include "glog/logging.h"
6
Austin Schuh39788ff2019-12-01 18:22:57 -08007DEFINE_bool(timing_reports, true, "Publish timing reports.");
8DEFINE_int32(timing_report_ms, 1000,
9 "Period in milliseconds to publish timing reports at.");
10
Austin Schuh54cf95f2019-11-29 13:14:18 -080011namespace aos {
12
Austin Schuh39788ff2019-12-01 18:22:57 -080013RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
14 : event_loop_(event_loop),
15 channel_(channel),
16 timing_(event_loop_->ChannelIndex(channel)) {
17 event_loop_->NewSender(this);
18}
19
20RawSender::~RawSender() { event_loop_->DeleteSender(this); }
21
22RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
23 : event_loop_(event_loop),
24 channel_(channel),
25 timing_(event_loop_->ChannelIndex(channel)) {
26 context_.monotonic_sent_time = monotonic_clock::min_time;
27 context_.realtime_sent_time = realtime_clock::min_time;
28 context_.queue_index = 0xffffffff;
29 context_.size = 0;
30 context_.data = nullptr;
31 event_loop_->NewFetcher(this);
32}
33
34RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
35
36TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
37 : event_loop_(event_loop), fn_(std::move(fn)) {}
38
39TimerHandler::~TimerHandler() {}
40
41PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
42 std::function<void(int)> fn,
43 const monotonic_clock::duration interval,
44 const monotonic_clock::duration offset)
45 : event_loop_(event_loop),
46 fn_(std::move(fn)),
47 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
48 event_loop_->OnRun([this]() {
49 const monotonic_clock::time_point monotonic_now =
50 event_loop_->monotonic_now();
51 phased_loop_.Reset(monotonic_now);
52 Reschedule(
53 [this](monotonic_clock::time_point sleep_time) {
54 Schedule(sleep_time);
55 },
56 monotonic_now);
57 // The first time, we'll double count. Reschedule here will count cycles
58 // elapsed before now, and then the reschedule before runing the handler
59 // will count the time that elapsed then. So clear the count here.
60 cycles_elapsed_ = 0;
61 });
62}
63
64PhasedLoopHandler::~PhasedLoopHandler() {}
65
66EventLoop::~EventLoop() {
67 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080068 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080069}
70
71int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuh54cf95f2019-11-29 13:14:18 -080072 CHECK(configuration_->channels() != nullptr) << ": No channels";
73
Austin Schuh39788ff2019-12-01 18:22:57 -080074 auto c = std::find(configuration_->channels()->begin(),
75 configuration_->channels()->end(), channel);
76 CHECK(c != configuration_->channels()->end())
Austin Schuh54cf95f2019-11-29 13:14:18 -080077 << ": Channel pointer not found in configuration()->channels()";
Austin Schuh39788ff2019-12-01 18:22:57 -080078
79 return std::distance(configuration()->channels()->begin(), c);
80}
81
82void EventLoop::NewSender(RawSender *sender) {
83 senders_.emplace_back(sender);
84 UpdateTimingReport();
85}
86void EventLoop::DeleteSender(RawSender *sender) {
87 CHECK(!is_running());
88 auto s = std::find(senders_.begin(), senders_.end(), sender);
89 CHECK(s != senders_.end()) << ": Sender not in senders list";
90 senders_.erase(s);
91 UpdateTimingReport();
92}
93
94TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
95 timers_.emplace_back(std::move(timer));
96 UpdateTimingReport();
97 return timers_.back().get();
98}
99
100PhasedLoopHandler *EventLoop::NewPhasedLoop(
101 std::unique_ptr<PhasedLoopHandler> phased_loop) {
102 phased_loops_.emplace_back(std::move(phased_loop));
103 UpdateTimingReport();
104 return phased_loops_.back().get();
105}
106
107void EventLoop::NewFetcher(RawFetcher *fetcher) {
108 fetchers_.emplace_back(fetcher);
109 UpdateTimingReport();
110}
111
112void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
113 CHECK(!is_running());
114 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
115 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
116 fetchers_.erase(f);
117 UpdateTimingReport();
118}
119
120WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
121 watchers_.emplace_back(std::move(watcher));
122
123 UpdateTimingReport();
124
125 return watchers_.back().get();
126}
127
128void EventLoop::SendTimingReport() {
129 // We need to do a fancy dance here to get all the accounting to work right.
130 // We want to copy the memory here, but then send after resetting. Otherwise
131 // the send for the timing report won't be counted in the timing report.
132 //
133 // Also, flatbuffers build from the back end. So place this at the back end
134 // of the buffer. We only have to care because we are using this in a very
135 // raw fashion.
136 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
137 << ": Timing report bigger than the sender size.";
138 std::copy(timing_report_.data(),
139 timing_report_.data() + timing_report_.size(),
140 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
141 timing_report_sender_->size() - timing_report_.size());
142
143 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
144 timer->timing_.ResetTimingReport();
145 }
146 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
147 watcher->ResetReport();
148 }
149 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
150 phased_loop->timing_.ResetTimingReport();
151 }
152 for (RawSender *sender : senders_) {
153 sender->timing_.ResetTimingReport();
154 }
155 for (RawFetcher *fetcher : fetchers_) {
156 fetcher->timing_.ResetTimingReport();
157 }
158 timing_report_sender_->Send(timing_report_.size());
159}
160
161void EventLoop::UpdateTimingReport() {
162 // We need to support senders and fetchers changing while we are setting up
163 // the event loop. Otherwise we can't fetch or send until the loop runs. This
164 // means that on each change, we need to redo all this work. This makes setup
165 // more expensive, but not by all that much on a modern processor.
166
167 // Now, build up a report with everything pre-filled out.
168 flatbuffers::FlatBufferBuilder fbb;
169 fbb.ForceDefaults(1);
170
171 // Pre-fill in the defaults for timers.
172 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
173 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
174 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
175 timing::CreateStatistic(fbb);
176 flatbuffers::Offset<timing::Statistic> handler_time_offset =
177 timing::CreateStatistic(fbb);
178 flatbuffers::Offset<flatbuffers::String> name_offset;
179 if (timer->name().size() != 0) {
180 name_offset = fbb.CreateString(timer->name());
181 }
182
183 timing::Timer::Builder timer_builder(fbb);
184
185 if (timer->name().size() != 0) {
186 timer_builder.add_name(name_offset);
187 }
188 timer_builder.add_wakeup_latency(wakeup_latency_offset);
189 timer_builder.add_handler_time(handler_time_offset);
190 timer_builder.add_count(0);
191 timer_offsets.emplace_back(timer_builder.Finish());
192 }
193
194 // Pre-fill in the defaults for phased_loops.
195 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
196 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
197 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
198 timing::CreateStatistic(fbb);
199 flatbuffers::Offset<timing::Statistic> handler_time_offset =
200 timing::CreateStatistic(fbb);
201 flatbuffers::Offset<flatbuffers::String> name_offset;
202 if (phased_loop->name().size() != 0) {
203 name_offset = fbb.CreateString(phased_loop->name());
204 }
205
206 timing::Timer::Builder timer_builder(fbb);
207
208 if (phased_loop->name().size() != 0) {
209 timer_builder.add_name(name_offset);
210 }
211 timer_builder.add_wakeup_latency(wakeup_latency_offset);
212 timer_builder.add_handler_time(handler_time_offset);
213 timer_builder.add_count(0);
214 phased_loop_offsets.emplace_back(timer_builder.Finish());
215 }
216
217 // Pre-fill in the defaults for watchers.
218 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
219 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
220 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
221 timing::CreateStatistic(fbb);
222 flatbuffers::Offset<timing::Statistic> handler_time_offset =
223 timing::CreateStatistic(fbb);
224
225 timing::Watcher::Builder watcher_builder(fbb);
226
227 watcher_builder.add_channel_index(watcher->channel_index());
228 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
229 watcher_builder.add_handler_time(handler_time_offset);
230 watcher_builder.add_count(0);
231 watcher_offsets.emplace_back(watcher_builder.Finish());
232 }
233
234 // Pre-fill in the defaults for senders.
235 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
236 for (const RawSender *sender : senders_) {
237 flatbuffers::Offset<timing::Statistic> size_offset =
238 timing::CreateStatistic(fbb);
239
240 timing::Sender::Builder sender_builder(fbb);
241
242 sender_builder.add_channel_index(sender->timing_.channel_index);
243 sender_builder.add_size(size_offset);
244 sender_builder.add_count(0);
245 sender_offsets.emplace_back(sender_builder.Finish());
246 }
247
248 // Pre-fill in the defaults for fetchers.
249 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
250 for (RawFetcher *fetcher : fetchers_) {
251 flatbuffers::Offset<timing::Statistic> latency_offset =
252 timing::CreateStatistic(fbb);
253
254 timing::Fetcher::Builder fetcher_builder(fbb);
255
256 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
257 fetcher_builder.add_count(0);
258 fetcher_builder.add_latency(latency_offset);
259 fetcher_offsets.emplace_back(fetcher_builder.Finish());
260 }
261
262 // Then build the final report.
263 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
264 timers_offset;
265 if (timer_offsets.size() > 0) {
266 timers_offset = fbb.CreateVector(timer_offsets);
267 }
268
269 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
270 phased_loops_offset;
271 if (phased_loop_offsets.size() > 0) {
272 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
273 }
274
275 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
276 watchers_offset;
277 if (watcher_offsets.size() > 0) {
278 watchers_offset = fbb.CreateVector(watcher_offsets);
279 }
280
281 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
282 senders_offset;
283 if (sender_offsets.size() > 0) {
284 senders_offset = fbb.CreateVector(sender_offsets);
285 }
286
287 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
288 fetchers_offset;
289 if (fetcher_offsets.size() > 0) {
290 fetchers_offset = fbb.CreateVector(fetcher_offsets);
291 }
292
293 flatbuffers::Offset<flatbuffers::String> name_offset =
294 fbb.CreateString(name());
295
296 timing::Report::Builder report_builder(fbb);
297 report_builder.add_name(name_offset);
298 report_builder.add_pid(GetTid());
299 if (timer_offsets.size() > 0) {
300 report_builder.add_timers(timers_offset);
301 }
302 if (phased_loop_offsets.size() > 0) {
303 report_builder.add_phased_loops(phased_loops_offset);
304 }
305 if (watcher_offsets.size() > 0) {
306 report_builder.add_watchers(watchers_offset);
307 }
308 if (sender_offsets.size() > 0) {
309 report_builder.add_senders(senders_offset);
310 }
311 if (fetcher_offsets.size() > 0) {
312 report_builder.add_fetchers(fetchers_offset);
313 }
314 fbb.Finish(report_builder.Finish());
315
316 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
317
318 // Now that the pointers are stable, pass them to the timers and watchers to
319 // be updated.
320 for (size_t i = 0; i < timers_.size(); ++i) {
321 timers_[i]->timing_.set_timing_report(
322 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
323 i));
324 }
325
326 for (size_t i = 0; i < phased_loops_.size(); ++i) {
327 phased_loops_[i]->timing_.set_timing_report(
328 timing_report_.mutable_message()
329 ->mutable_phased_loops()
330 ->GetMutableObject(i));
331 }
332
333 for (size_t i = 0; i < watchers_.size(); ++i) {
334 watchers_[i]->set_timing_report(
335 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
336 i));
337 }
338
339 for (size_t i = 0; i < senders_.size(); ++i) {
340 senders_[i]->timing_.set_timing_report(
341 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
342 i));
343 }
344
345 for (size_t i = 0; i < fetchers_.size(); ++i) {
346 fetchers_[i]->timing_.set_timing_report(
347 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
348 i));
349 }
350}
351
352void EventLoop::MaybeScheduleTimingReports() {
353 if (FLAGS_timing_reports && !skip_timing_report_) {
354 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
355 // Make a raw sender for the report.
356 const Channel *channel = configuration::GetChannel(
357 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
358 name());
359 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
360 << timing::Report::GetFullyQualifiedName()
361 << "\" } not found in config.";
362 timing_report_sender_ = MakeRawSender(channel);
363
364 // Register a handler which sends the report out by copying the raw data
365 // from the prebuilt and subsequently modified report.
366 TimerHandler *timing_reports_timer =
367 AddTimer([this]() { SendTimingReport(); });
368
369 // Set it up to send once per second.
370 timing_reports_timer->set_name("timing_reports");
371 OnRun([this, timing_reports_timer]() {
372 timing_reports_timer->Setup(
373 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
374 std::chrono::milliseconds(FLAGS_timing_report_ms));
375 });
376
377 UpdateTimingReport();
378 }
379}
380
Austin Schuh7d87b672019-12-01 20:23:49 -0800381void EventLoop::ReserveEvents() {
382 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
383}
384
385namespace {
386bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
387 return first->event_time() > second->event_time();
388}
389} // namespace
390
391void EventLoop::AddEvent(EventLoopEvent *event) {
392 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
393 events_.push_back(event);
394 std::push_heap(events_.begin(), events_.end(), CompareEvents);
395}
396
397void EventLoop::RemoveEvent(EventLoopEvent *event) {
398 auto e = std::find(events_.begin(), events_.end(), event);
399 if (e != events_.end()) {
400 events_.erase(e);
401 std::make_heap(events_.begin(), events_.end(), CompareEvents);
402 event->Invalidate();
403 }
404}
405
406EventLoopEvent *EventLoop::PopEvent() {
407 EventLoopEvent *result = events_.front();
408 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
409 events_.pop_back();
410 result->Invalidate();
411 return result;
412}
413
Austin Schuh39788ff2019-12-01 18:22:57 -0800414void WatcherState::set_timing_report(timing::Watcher *watcher) {
415 CHECK_NOTNULL(watcher);
416 watcher_ = watcher;
417 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
418 handler_time_.set_statistic(watcher->mutable_handler_time());
419}
420
421void WatcherState::ResetReport() {
422 wakeup_latency_.Reset();
423 handler_time_.Reset();
424 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800425}
426
427} // namespace aos