blob: ee3365def9c76aa411cf95ceca8c7f263c081e56 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
5#include "glog/logging.h"
6
Austin Schuh39788ff2019-12-01 18:22:57 -08007DEFINE_bool(timing_reports, true, "Publish timing reports.");
8DEFINE_int32(timing_report_ms, 1000,
9 "Period in milliseconds to publish timing reports at.");
10
Austin Schuh54cf95f2019-11-29 13:14:18 -080011namespace aos {
12
Austin Schuh39788ff2019-12-01 18:22:57 -080013RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
14 : event_loop_(event_loop),
15 channel_(channel),
16 timing_(event_loop_->ChannelIndex(channel)) {
17 event_loop_->NewSender(this);
18}
19
20RawSender::~RawSender() { event_loop_->DeleteSender(this); }
21
22RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
23 : event_loop_(event_loop),
24 channel_(channel),
25 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080026 context_.monotonic_event_time = monotonic_clock::min_time;
27 context_.monotonic_remote_time = monotonic_clock::min_time;
28 context_.realtime_event_time = realtime_clock::min_time;
29 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080030 context_.queue_index = 0xffffffff;
31 context_.size = 0;
32 context_.data = nullptr;
33 event_loop_->NewFetcher(this);
34}
35
36RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
37
38TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
39 : event_loop_(event_loop), fn_(std::move(fn)) {}
40
41TimerHandler::~TimerHandler() {}
42
43PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
44 std::function<void(int)> fn,
45 const monotonic_clock::duration interval,
46 const monotonic_clock::duration offset)
47 : event_loop_(event_loop),
48 fn_(std::move(fn)),
49 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
50 event_loop_->OnRun([this]() {
51 const monotonic_clock::time_point monotonic_now =
52 event_loop_->monotonic_now();
53 phased_loop_.Reset(monotonic_now);
54 Reschedule(
55 [this](monotonic_clock::time_point sleep_time) {
56 Schedule(sleep_time);
57 },
58 monotonic_now);
59 // The first time, we'll double count. Reschedule here will count cycles
60 // elapsed before now, and then the reschedule before runing the handler
61 // will count the time that elapsed then. So clear the count here.
62 cycles_elapsed_ = 0;
63 });
64}
65
66PhasedLoopHandler::~PhasedLoopHandler() {}
67
68EventLoop::~EventLoop() {
69 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080070 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080071}
72
73int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuh54cf95f2019-11-29 13:14:18 -080074 CHECK(configuration_->channels() != nullptr) << ": No channels";
75
Austin Schuh39788ff2019-12-01 18:22:57 -080076 auto c = std::find(configuration_->channels()->begin(),
77 configuration_->channels()->end(), channel);
78 CHECK(c != configuration_->channels()->end())
Austin Schuh54cf95f2019-11-29 13:14:18 -080079 << ": Channel pointer not found in configuration()->channels()";
Austin Schuh39788ff2019-12-01 18:22:57 -080080
81 return std::distance(configuration()->channels()->begin(), c);
82}
83
84void EventLoop::NewSender(RawSender *sender) {
85 senders_.emplace_back(sender);
86 UpdateTimingReport();
87}
88void EventLoop::DeleteSender(RawSender *sender) {
89 CHECK(!is_running());
90 auto s = std::find(senders_.begin(), senders_.end(), sender);
91 CHECK(s != senders_.end()) << ": Sender not in senders list";
92 senders_.erase(s);
93 UpdateTimingReport();
94}
95
96TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
97 timers_.emplace_back(std::move(timer));
98 UpdateTimingReport();
99 return timers_.back().get();
100}
101
102PhasedLoopHandler *EventLoop::NewPhasedLoop(
103 std::unique_ptr<PhasedLoopHandler> phased_loop) {
104 phased_loops_.emplace_back(std::move(phased_loop));
105 UpdateTimingReport();
106 return phased_loops_.back().get();
107}
108
109void EventLoop::NewFetcher(RawFetcher *fetcher) {
110 fetchers_.emplace_back(fetcher);
111 UpdateTimingReport();
112}
113
114void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
115 CHECK(!is_running());
116 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
117 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
118 fetchers_.erase(f);
119 UpdateTimingReport();
120}
121
122WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
123 watchers_.emplace_back(std::move(watcher));
124
125 UpdateTimingReport();
126
127 return watchers_.back().get();
128}
129
130void EventLoop::SendTimingReport() {
131 // We need to do a fancy dance here to get all the accounting to work right.
132 // We want to copy the memory here, but then send after resetting. Otherwise
133 // the send for the timing report won't be counted in the timing report.
134 //
135 // Also, flatbuffers build from the back end. So place this at the back end
136 // of the buffer. We only have to care because we are using this in a very
137 // raw fashion.
138 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
139 << ": Timing report bigger than the sender size.";
140 std::copy(timing_report_.data(),
141 timing_report_.data() + timing_report_.size(),
142 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
143 timing_report_sender_->size() - timing_report_.size());
144
145 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
146 timer->timing_.ResetTimingReport();
147 }
148 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
149 watcher->ResetReport();
150 }
151 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
152 phased_loop->timing_.ResetTimingReport();
153 }
154 for (RawSender *sender : senders_) {
155 sender->timing_.ResetTimingReport();
156 }
157 for (RawFetcher *fetcher : fetchers_) {
158 fetcher->timing_.ResetTimingReport();
159 }
160 timing_report_sender_->Send(timing_report_.size());
161}
162
163void EventLoop::UpdateTimingReport() {
164 // We need to support senders and fetchers changing while we are setting up
165 // the event loop. Otherwise we can't fetch or send until the loop runs. This
166 // means that on each change, we need to redo all this work. This makes setup
167 // more expensive, but not by all that much on a modern processor.
168
169 // Now, build up a report with everything pre-filled out.
170 flatbuffers::FlatBufferBuilder fbb;
171 fbb.ForceDefaults(1);
172
173 // Pre-fill in the defaults for timers.
174 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
175 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
176 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
177 timing::CreateStatistic(fbb);
178 flatbuffers::Offset<timing::Statistic> handler_time_offset =
179 timing::CreateStatistic(fbb);
180 flatbuffers::Offset<flatbuffers::String> name_offset;
181 if (timer->name().size() != 0) {
182 name_offset = fbb.CreateString(timer->name());
183 }
184
185 timing::Timer::Builder timer_builder(fbb);
186
187 if (timer->name().size() != 0) {
188 timer_builder.add_name(name_offset);
189 }
190 timer_builder.add_wakeup_latency(wakeup_latency_offset);
191 timer_builder.add_handler_time(handler_time_offset);
192 timer_builder.add_count(0);
193 timer_offsets.emplace_back(timer_builder.Finish());
194 }
195
196 // Pre-fill in the defaults for phased_loops.
197 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
198 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
199 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
200 timing::CreateStatistic(fbb);
201 flatbuffers::Offset<timing::Statistic> handler_time_offset =
202 timing::CreateStatistic(fbb);
203 flatbuffers::Offset<flatbuffers::String> name_offset;
204 if (phased_loop->name().size() != 0) {
205 name_offset = fbb.CreateString(phased_loop->name());
206 }
207
208 timing::Timer::Builder timer_builder(fbb);
209
210 if (phased_loop->name().size() != 0) {
211 timer_builder.add_name(name_offset);
212 }
213 timer_builder.add_wakeup_latency(wakeup_latency_offset);
214 timer_builder.add_handler_time(handler_time_offset);
215 timer_builder.add_count(0);
216 phased_loop_offsets.emplace_back(timer_builder.Finish());
217 }
218
219 // Pre-fill in the defaults for watchers.
220 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
221 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
222 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
223 timing::CreateStatistic(fbb);
224 flatbuffers::Offset<timing::Statistic> handler_time_offset =
225 timing::CreateStatistic(fbb);
226
227 timing::Watcher::Builder watcher_builder(fbb);
228
229 watcher_builder.add_channel_index(watcher->channel_index());
230 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
231 watcher_builder.add_handler_time(handler_time_offset);
232 watcher_builder.add_count(0);
233 watcher_offsets.emplace_back(watcher_builder.Finish());
234 }
235
236 // Pre-fill in the defaults for senders.
237 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
238 for (const RawSender *sender : senders_) {
239 flatbuffers::Offset<timing::Statistic> size_offset =
240 timing::CreateStatistic(fbb);
241
242 timing::Sender::Builder sender_builder(fbb);
243
244 sender_builder.add_channel_index(sender->timing_.channel_index);
245 sender_builder.add_size(size_offset);
246 sender_builder.add_count(0);
247 sender_offsets.emplace_back(sender_builder.Finish());
248 }
249
250 // Pre-fill in the defaults for fetchers.
251 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
252 for (RawFetcher *fetcher : fetchers_) {
253 flatbuffers::Offset<timing::Statistic> latency_offset =
254 timing::CreateStatistic(fbb);
255
256 timing::Fetcher::Builder fetcher_builder(fbb);
257
258 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
259 fetcher_builder.add_count(0);
260 fetcher_builder.add_latency(latency_offset);
261 fetcher_offsets.emplace_back(fetcher_builder.Finish());
262 }
263
264 // Then build the final report.
265 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
266 timers_offset;
267 if (timer_offsets.size() > 0) {
268 timers_offset = fbb.CreateVector(timer_offsets);
269 }
270
271 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
272 phased_loops_offset;
273 if (phased_loop_offsets.size() > 0) {
274 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
275 }
276
277 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
278 watchers_offset;
279 if (watcher_offsets.size() > 0) {
280 watchers_offset = fbb.CreateVector(watcher_offsets);
281 }
282
283 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
284 senders_offset;
285 if (sender_offsets.size() > 0) {
286 senders_offset = fbb.CreateVector(sender_offsets);
287 }
288
289 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
290 fetchers_offset;
291 if (fetcher_offsets.size() > 0) {
292 fetchers_offset = fbb.CreateVector(fetcher_offsets);
293 }
294
295 flatbuffers::Offset<flatbuffers::String> name_offset =
296 fbb.CreateString(name());
297
298 timing::Report::Builder report_builder(fbb);
299 report_builder.add_name(name_offset);
300 report_builder.add_pid(GetTid());
301 if (timer_offsets.size() > 0) {
302 report_builder.add_timers(timers_offset);
303 }
304 if (phased_loop_offsets.size() > 0) {
305 report_builder.add_phased_loops(phased_loops_offset);
306 }
307 if (watcher_offsets.size() > 0) {
308 report_builder.add_watchers(watchers_offset);
309 }
310 if (sender_offsets.size() > 0) {
311 report_builder.add_senders(senders_offset);
312 }
313 if (fetcher_offsets.size() > 0) {
314 report_builder.add_fetchers(fetchers_offset);
315 }
316 fbb.Finish(report_builder.Finish());
317
318 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
319
320 // Now that the pointers are stable, pass them to the timers and watchers to
321 // be updated.
322 for (size_t i = 0; i < timers_.size(); ++i) {
323 timers_[i]->timing_.set_timing_report(
324 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
325 i));
326 }
327
328 for (size_t i = 0; i < phased_loops_.size(); ++i) {
329 phased_loops_[i]->timing_.set_timing_report(
330 timing_report_.mutable_message()
331 ->mutable_phased_loops()
332 ->GetMutableObject(i));
333 }
334
335 for (size_t i = 0; i < watchers_.size(); ++i) {
336 watchers_[i]->set_timing_report(
337 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
338 i));
339 }
340
341 for (size_t i = 0; i < senders_.size(); ++i) {
342 senders_[i]->timing_.set_timing_report(
343 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
344 i));
345 }
346
347 for (size_t i = 0; i < fetchers_.size(); ++i) {
348 fetchers_[i]->timing_.set_timing_report(
349 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
350 i));
351 }
352}
353
354void EventLoop::MaybeScheduleTimingReports() {
355 if (FLAGS_timing_reports && !skip_timing_report_) {
356 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
357 // Make a raw sender for the report.
358 const Channel *channel = configuration::GetChannel(
359 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800360 name(), node());
361
362 // Since we are using a RawSender, validity isn't checked. So check it
363 // ourselves.
364 if (node() != nullptr) {
365 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
366 LOG(FATAL) << "Channel { \"name\": \"/aos"
367 << channel->name()->string_view() << "\", \"type\": \""
368 << channel->type()->string_view()
369 << "\" } is not able to be sent on this node. Check your "
370 "configuration.";
371 }
372 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800373 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
374 << timing::Report::GetFullyQualifiedName()
375 << "\" } not found in config.";
376 timing_report_sender_ = MakeRawSender(channel);
377
378 // Register a handler which sends the report out by copying the raw data
379 // from the prebuilt and subsequently modified report.
380 TimerHandler *timing_reports_timer =
381 AddTimer([this]() { SendTimingReport(); });
382
383 // Set it up to send once per second.
384 timing_reports_timer->set_name("timing_reports");
385 OnRun([this, timing_reports_timer]() {
386 timing_reports_timer->Setup(
387 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
388 std::chrono::milliseconds(FLAGS_timing_report_ms));
389 });
390
391 UpdateTimingReport();
392 }
393}
394
Austin Schuh7d87b672019-12-01 20:23:49 -0800395void EventLoop::ReserveEvents() {
396 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
397}
398
399namespace {
400bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
401 return first->event_time() > second->event_time();
402}
403} // namespace
404
405void EventLoop::AddEvent(EventLoopEvent *event) {
406 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
407 events_.push_back(event);
408 std::push_heap(events_.begin(), events_.end(), CompareEvents);
409}
410
411void EventLoop::RemoveEvent(EventLoopEvent *event) {
412 auto e = std::find(events_.begin(), events_.end(), event);
413 if (e != events_.end()) {
414 events_.erase(e);
415 std::make_heap(events_.begin(), events_.end(), CompareEvents);
416 event->Invalidate();
417 }
418}
419
420EventLoopEvent *EventLoop::PopEvent() {
421 EventLoopEvent *result = events_.front();
422 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
423 events_.pop_back();
424 result->Invalidate();
425 return result;
426}
427
Austin Schuh39788ff2019-12-01 18:22:57 -0800428void WatcherState::set_timing_report(timing::Watcher *watcher) {
429 CHECK_NOTNULL(watcher);
430 watcher_ = watcher;
431 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
432 handler_time_.set_statistic(watcher->mutable_handler_time());
433}
434
435void WatcherState::ResetReport() {
436 wakeup_latency_.Reset();
437 handler_time_.Reset();
438 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800439}
440
441} // namespace aos