blob: 5c7e49d43f7fb162c0258b60a9b46e24fc7e1693 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
Austin Schuh39788ff2019-12-01 18:22:57 -08008DEFINE_bool(timing_reports, true, "Publish timing reports.");
9DEFINE_int32(timing_report_ms, 1000,
10 "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
13
Austin Schuh39788ff2019-12-01 18:22:57 -080014RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
15 : event_loop_(event_loop),
16 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050017 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080018 timing_(event_loop_->ChannelIndex(channel)) {
19 event_loop_->NewSender(this);
20}
21
22RawSender::~RawSender() { event_loop_->DeleteSender(this); }
23
24RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
25 : event_loop_(event_loop),
26 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050027 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080028 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080029 context_.monotonic_event_time = monotonic_clock::min_time;
30 context_.monotonic_remote_time = monotonic_clock::min_time;
31 context_.realtime_event_time = realtime_clock::min_time;
32 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080033 context_.queue_index = 0xffffffff;
34 context_.size = 0;
35 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -070036 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -080037 event_loop_->NewFetcher(this);
38}
39
40RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
41
42TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
43 : event_loop_(event_loop), fn_(std::move(fn)) {}
44
45TimerHandler::~TimerHandler() {}
46
47PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
48 std::function<void(int)> fn,
49 const monotonic_clock::duration interval,
50 const monotonic_clock::duration offset)
51 : event_loop_(event_loop),
52 fn_(std::move(fn)),
53 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
54 event_loop_->OnRun([this]() {
55 const monotonic_clock::time_point monotonic_now =
56 event_loop_->monotonic_now();
57 phased_loop_.Reset(monotonic_now);
58 Reschedule(
59 [this](monotonic_clock::time_point sleep_time) {
60 Schedule(sleep_time);
61 },
62 monotonic_now);
63 // The first time, we'll double count. Reschedule here will count cycles
64 // elapsed before now, and then the reschedule before runing the handler
65 // will count the time that elapsed then. So clear the count here.
66 cycles_elapsed_ = 0;
67 });
68}
69
70PhasedLoopHandler::~PhasedLoopHandler() {}
71
Tyler Chatow67ddb032020-01-12 14:30:04 -080072EventLoop::EventLoop(const Configuration *configuration)
73 : timing_report_(flatbuffers::DetachedBuffer()),
74 configuration_(configuration) {
75 logging::Init();
76}
77
Austin Schuh39788ff2019-12-01 18:22:57 -080078EventLoop::~EventLoop() {
79 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080080 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080081}
82
83int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -080084 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -080085}
86
Brian Silverman5120afb2020-01-31 17:44:35 -080087WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
88 const int channel_index = ChannelIndex(channel);
89 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
90 if (watcher->channel_index() == channel_index) {
91 return watcher.get();
92 }
93 }
94 LOG(FATAL) << "No watcher found for channel";
95}
96
Austin Schuh39788ff2019-12-01 18:22:57 -080097void EventLoop::NewSender(RawSender *sender) {
98 senders_.emplace_back(sender);
99 UpdateTimingReport();
100}
101void EventLoop::DeleteSender(RawSender *sender) {
102 CHECK(!is_running());
103 auto s = std::find(senders_.begin(), senders_.end(), sender);
104 CHECK(s != senders_.end()) << ": Sender not in senders list";
105 senders_.erase(s);
106 UpdateTimingReport();
107}
108
109TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
110 timers_.emplace_back(std::move(timer));
111 UpdateTimingReport();
112 return timers_.back().get();
113}
114
115PhasedLoopHandler *EventLoop::NewPhasedLoop(
116 std::unique_ptr<PhasedLoopHandler> phased_loop) {
117 phased_loops_.emplace_back(std::move(phased_loop));
118 UpdateTimingReport();
119 return phased_loops_.back().get();
120}
121
122void EventLoop::NewFetcher(RawFetcher *fetcher) {
123 fetchers_.emplace_back(fetcher);
124 UpdateTimingReport();
125}
126
127void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
128 CHECK(!is_running());
129 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
130 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
131 fetchers_.erase(f);
132 UpdateTimingReport();
133}
134
135WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
136 watchers_.emplace_back(std::move(watcher));
137
138 UpdateTimingReport();
139
140 return watchers_.back().get();
141}
142
Brian Silverman0fc69932020-01-24 21:54:02 -0800143void EventLoop::TakeWatcher(const Channel *channel) {
144 CHECK(!is_running()) << ": Cannot add new objects while running.";
145 ChannelIndex(channel);
146
147 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800148 << ": " << configuration::CleanedChannelToString(channel)
149 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800150
151 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800152 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800153 << " is already being used.";
154
155 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800156 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800157 << " is not able to be watched on this node. Check your "
158 "configuration.";
159 }
160}
161
162void EventLoop::TakeSender(const Channel *channel) {
163 CHECK(!is_running()) << ": Cannot add new objects while running.";
164 ChannelIndex(channel);
165
166 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800167 << ": Channel " << configuration::CleanedChannelToString(channel)
168 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800169
170 // We don't care if this is a duplicate.
171 taken_senders_.insert(channel);
172}
173
Austin Schuh39788ff2019-12-01 18:22:57 -0800174void EventLoop::SendTimingReport() {
175 // We need to do a fancy dance here to get all the accounting to work right.
176 // We want to copy the memory here, but then send after resetting. Otherwise
177 // the send for the timing report won't be counted in the timing report.
178 //
179 // Also, flatbuffers build from the back end. So place this at the back end
180 // of the buffer. We only have to care because we are using this in a very
181 // raw fashion.
182 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
183 << ": Timing report bigger than the sender size.";
184 std::copy(timing_report_.data(),
185 timing_report_.data() + timing_report_.size(),
186 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
187 timing_report_sender_->size() - timing_report_.size());
188
189 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
190 timer->timing_.ResetTimingReport();
191 }
192 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
193 watcher->ResetReport();
194 }
195 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
196 phased_loop->timing_.ResetTimingReport();
197 }
198 for (RawSender *sender : senders_) {
199 sender->timing_.ResetTimingReport();
200 }
201 for (RawFetcher *fetcher : fetchers_) {
202 fetcher->timing_.ResetTimingReport();
203 }
204 timing_report_sender_->Send(timing_report_.size());
205}
206
207void EventLoop::UpdateTimingReport() {
208 // We need to support senders and fetchers changing while we are setting up
209 // the event loop. Otherwise we can't fetch or send until the loop runs. This
210 // means that on each change, we need to redo all this work. This makes setup
211 // more expensive, but not by all that much on a modern processor.
212
213 // Now, build up a report with everything pre-filled out.
214 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800215 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800216
217 // Pre-fill in the defaults for timers.
218 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
219 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
220 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
221 timing::CreateStatistic(fbb);
222 flatbuffers::Offset<timing::Statistic> handler_time_offset =
223 timing::CreateStatistic(fbb);
224 flatbuffers::Offset<flatbuffers::String> name_offset;
225 if (timer->name().size() != 0) {
226 name_offset = fbb.CreateString(timer->name());
227 }
228
229 timing::Timer::Builder timer_builder(fbb);
230
231 if (timer->name().size() != 0) {
232 timer_builder.add_name(name_offset);
233 }
234 timer_builder.add_wakeup_latency(wakeup_latency_offset);
235 timer_builder.add_handler_time(handler_time_offset);
236 timer_builder.add_count(0);
237 timer_offsets.emplace_back(timer_builder.Finish());
238 }
239
240 // Pre-fill in the defaults for phased_loops.
241 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
242 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
243 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
244 timing::CreateStatistic(fbb);
245 flatbuffers::Offset<timing::Statistic> handler_time_offset =
246 timing::CreateStatistic(fbb);
247 flatbuffers::Offset<flatbuffers::String> name_offset;
248 if (phased_loop->name().size() != 0) {
249 name_offset = fbb.CreateString(phased_loop->name());
250 }
251
252 timing::Timer::Builder timer_builder(fbb);
253
254 if (phased_loop->name().size() != 0) {
255 timer_builder.add_name(name_offset);
256 }
257 timer_builder.add_wakeup_latency(wakeup_latency_offset);
258 timer_builder.add_handler_time(handler_time_offset);
259 timer_builder.add_count(0);
260 phased_loop_offsets.emplace_back(timer_builder.Finish());
261 }
262
263 // Pre-fill in the defaults for watchers.
264 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
265 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
266 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
267 timing::CreateStatistic(fbb);
268 flatbuffers::Offset<timing::Statistic> handler_time_offset =
269 timing::CreateStatistic(fbb);
270
271 timing::Watcher::Builder watcher_builder(fbb);
272
273 watcher_builder.add_channel_index(watcher->channel_index());
274 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
275 watcher_builder.add_handler_time(handler_time_offset);
276 watcher_builder.add_count(0);
277 watcher_offsets.emplace_back(watcher_builder.Finish());
278 }
279
280 // Pre-fill in the defaults for senders.
281 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
282 for (const RawSender *sender : senders_) {
283 flatbuffers::Offset<timing::Statistic> size_offset =
284 timing::CreateStatistic(fbb);
285
286 timing::Sender::Builder sender_builder(fbb);
287
288 sender_builder.add_channel_index(sender->timing_.channel_index);
289 sender_builder.add_size(size_offset);
290 sender_builder.add_count(0);
291 sender_offsets.emplace_back(sender_builder.Finish());
292 }
293
294 // Pre-fill in the defaults for fetchers.
295 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
296 for (RawFetcher *fetcher : fetchers_) {
297 flatbuffers::Offset<timing::Statistic> latency_offset =
298 timing::CreateStatistic(fbb);
299
300 timing::Fetcher::Builder fetcher_builder(fbb);
301
302 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
303 fetcher_builder.add_count(0);
304 fetcher_builder.add_latency(latency_offset);
305 fetcher_offsets.emplace_back(fetcher_builder.Finish());
306 }
307
308 // Then build the final report.
309 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
310 timers_offset;
311 if (timer_offsets.size() > 0) {
312 timers_offset = fbb.CreateVector(timer_offsets);
313 }
314
315 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
316 phased_loops_offset;
317 if (phased_loop_offsets.size() > 0) {
318 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
319 }
320
321 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
322 watchers_offset;
323 if (watcher_offsets.size() > 0) {
324 watchers_offset = fbb.CreateVector(watcher_offsets);
325 }
326
327 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
328 senders_offset;
329 if (sender_offsets.size() > 0) {
330 senders_offset = fbb.CreateVector(sender_offsets);
331 }
332
333 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
334 fetchers_offset;
335 if (fetcher_offsets.size() > 0) {
336 fetchers_offset = fbb.CreateVector(fetcher_offsets);
337 }
338
339 flatbuffers::Offset<flatbuffers::String> name_offset =
340 fbb.CreateString(name());
341
342 timing::Report::Builder report_builder(fbb);
343 report_builder.add_name(name_offset);
344 report_builder.add_pid(GetTid());
345 if (timer_offsets.size() > 0) {
346 report_builder.add_timers(timers_offset);
347 }
348 if (phased_loop_offsets.size() > 0) {
349 report_builder.add_phased_loops(phased_loops_offset);
350 }
351 if (watcher_offsets.size() > 0) {
352 report_builder.add_watchers(watchers_offset);
353 }
354 if (sender_offsets.size() > 0) {
355 report_builder.add_senders(senders_offset);
356 }
357 if (fetcher_offsets.size() > 0) {
358 report_builder.add_fetchers(fetchers_offset);
359 }
360 fbb.Finish(report_builder.Finish());
361
362 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
363
364 // Now that the pointers are stable, pass them to the timers and watchers to
365 // be updated.
366 for (size_t i = 0; i < timers_.size(); ++i) {
367 timers_[i]->timing_.set_timing_report(
368 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
369 i));
370 }
371
372 for (size_t i = 0; i < phased_loops_.size(); ++i) {
373 phased_loops_[i]->timing_.set_timing_report(
374 timing_report_.mutable_message()
375 ->mutable_phased_loops()
376 ->GetMutableObject(i));
377 }
378
379 for (size_t i = 0; i < watchers_.size(); ++i) {
380 watchers_[i]->set_timing_report(
381 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
382 i));
383 }
384
385 for (size_t i = 0; i < senders_.size(); ++i) {
386 senders_[i]->timing_.set_timing_report(
387 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
388 i));
389 }
390
391 for (size_t i = 0; i < fetchers_.size(); ++i) {
392 fetchers_[i]->timing_.set_timing_report(
393 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
394 i));
395 }
396}
397
398void EventLoop::MaybeScheduleTimingReports() {
399 if (FLAGS_timing_reports && !skip_timing_report_) {
400 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
401 // Make a raw sender for the report.
402 const Channel *channel = configuration::GetChannel(
403 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800404 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700405 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
406 "\"type\": \"aos.timing.Report\"} on node "
407 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800408
409 // Since we are using a RawSender, validity isn't checked. So check it
410 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800411 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
412 LOG(FATAL) << "Channel { \"name\": \"/aos"
413 << channel->name()->string_view() << "\", \"type\": \""
414 << channel->type()->string_view()
415 << "\" } is not able to be sent on this node. Check your "
416 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800417 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800418 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
419 << timing::Report::GetFullyQualifiedName()
420 << "\" } not found in config.";
421 timing_report_sender_ = MakeRawSender(channel);
422
423 // Register a handler which sends the report out by copying the raw data
424 // from the prebuilt and subsequently modified report.
425 TimerHandler *timing_reports_timer =
426 AddTimer([this]() { SendTimingReport(); });
427
428 // Set it up to send once per second.
429 timing_reports_timer->set_name("timing_reports");
430 OnRun([this, timing_reports_timer]() {
431 timing_reports_timer->Setup(
432 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
433 std::chrono::milliseconds(FLAGS_timing_report_ms));
434 });
435
436 UpdateTimingReport();
437 }
438}
439
Austin Schuh7d87b672019-12-01 20:23:49 -0800440void EventLoop::ReserveEvents() {
441 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
442}
443
444namespace {
445bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700446 if (first->event_time() > second->event_time()) {
447 return true;
448 }
449 if (first->event_time() < second->event_time()) {
450 return false;
451 }
452 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800453}
454} // namespace
455
456void EventLoop::AddEvent(EventLoopEvent *event) {
457 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700458 DCHECK(event->generation() == 0);
459 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800460 events_.push_back(event);
461 std::push_heap(events_.begin(), events_.end(), CompareEvents);
462}
463
464void EventLoop::RemoveEvent(EventLoopEvent *event) {
465 auto e = std::find(events_.begin(), events_.end(), event);
466 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700467 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800468 events_.erase(e);
469 std::make_heap(events_.begin(), events_.end(), CompareEvents);
470 event->Invalidate();
471 }
472}
473
474EventLoopEvent *EventLoop::PopEvent() {
475 EventLoopEvent *result = events_.front();
476 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
477 events_.pop_back();
478 result->Invalidate();
479 return result;
480}
481
Austin Schuh39788ff2019-12-01 18:22:57 -0800482void WatcherState::set_timing_report(timing::Watcher *watcher) {
483 CHECK_NOTNULL(watcher);
484 watcher_ = watcher;
485 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
486 handler_time_.set_statistic(watcher->mutable_handler_time());
487}
488
489void WatcherState::ResetReport() {
490 wakeup_latency_.Reset();
491 handler_time_.Reset();
492 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800493}
494
495} // namespace aos