blob: 639e337991d08c2de1d74196d77716598d018e4b [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
// Command line flags controlling timing reports.
//
// --timing_reports is consulted by EventLoop::MaybeScheduleTimingReports():
// when false, no timing report sender or timer is created.
DEFINE_bool(timing_reports, true, "Publish timing reports.");
// --timing_report_ms is used both as the delay before the first report and as
// the repeat interval of the timer that sends reports.
DEFINE_int32(timing_report_ms, 1000,
             "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070013namespace {
14void CheckAlignment(const Channel *channel) {
15 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
16 LOG(FATAL) << "max_size() (" << channel->max_size()
17 << ") is not a multiple of alignment ("
18 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
19 << configuration::CleanedChannelToString(channel) << ".";
20 }
21}
22} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080023
Austin Schuh39788ff2019-12-01 18:22:57 -080024RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
25 : event_loop_(event_loop),
26 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050027 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080028 timing_(event_loop_->ChannelIndex(channel)) {
29 event_loop_->NewSender(this);
30}
31
32RawSender::~RawSender() { event_loop_->DeleteSender(this); }
33
34RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
35 : event_loop_(event_loop),
36 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050037 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080038 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080039 context_.monotonic_event_time = monotonic_clock::min_time;
40 context_.monotonic_remote_time = monotonic_clock::min_time;
41 context_.realtime_event_time = realtime_clock::min_time;
42 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080043 context_.queue_index = 0xffffffff;
44 context_.size = 0;
45 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -070046 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -080047 event_loop_->NewFetcher(this);
48}
49
50RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
51
52TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
53 : event_loop_(event_loop), fn_(std::move(fn)) {}
54
55TimerHandler::~TimerHandler() {}
56
57PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
58 std::function<void(int)> fn,
59 const monotonic_clock::duration interval,
60 const monotonic_clock::duration offset)
61 : event_loop_(event_loop),
62 fn_(std::move(fn)),
63 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
64 event_loop_->OnRun([this]() {
65 const monotonic_clock::time_point monotonic_now =
66 event_loop_->monotonic_now();
67 phased_loop_.Reset(monotonic_now);
68 Reschedule(
69 [this](monotonic_clock::time_point sleep_time) {
70 Schedule(sleep_time);
71 },
72 monotonic_now);
73 // The first time, we'll double count. Reschedule here will count cycles
74 // elapsed before now, and then the reschedule before runing the handler
75 // will count the time that elapsed then. So clear the count here.
76 cycles_elapsed_ = 0;
77 });
78}
79
80PhasedLoopHandler::~PhasedLoopHandler() {}
81
Tyler Chatow67ddb032020-01-12 14:30:04 -080082EventLoop::EventLoop(const Configuration *configuration)
83 : timing_report_(flatbuffers::DetachedBuffer()),
84 configuration_(configuration) {
85 logging::Init();
86}
87
Austin Schuh39788ff2019-12-01 18:22:57 -080088EventLoop::~EventLoop() {
89 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080090 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080091}
92
93int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -080094 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -080095}
96
Brian Silverman5120afb2020-01-31 17:44:35 -080097WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
98 const int channel_index = ChannelIndex(channel);
99 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
100 if (watcher->channel_index() == channel_index) {
101 return watcher.get();
102 }
103 }
104 LOG(FATAL) << "No watcher found for channel";
105}
106
Austin Schuh39788ff2019-12-01 18:22:57 -0800107void EventLoop::NewSender(RawSender *sender) {
108 senders_.emplace_back(sender);
109 UpdateTimingReport();
110}
111void EventLoop::DeleteSender(RawSender *sender) {
112 CHECK(!is_running());
113 auto s = std::find(senders_.begin(), senders_.end(), sender);
114 CHECK(s != senders_.end()) << ": Sender not in senders list";
115 senders_.erase(s);
116 UpdateTimingReport();
117}
118
119TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
120 timers_.emplace_back(std::move(timer));
121 UpdateTimingReport();
122 return timers_.back().get();
123}
124
125PhasedLoopHandler *EventLoop::NewPhasedLoop(
126 std::unique_ptr<PhasedLoopHandler> phased_loop) {
127 phased_loops_.emplace_back(std::move(phased_loop));
128 UpdateTimingReport();
129 return phased_loops_.back().get();
130}
131
132void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700133 CheckAlignment(fetcher->channel());
134
Austin Schuh39788ff2019-12-01 18:22:57 -0800135 fetchers_.emplace_back(fetcher);
136 UpdateTimingReport();
137}
138
139void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
140 CHECK(!is_running());
141 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
142 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
143 fetchers_.erase(f);
144 UpdateTimingReport();
145}
146
147WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
148 watchers_.emplace_back(std::move(watcher));
149
150 UpdateTimingReport();
151
152 return watchers_.back().get();
153}
154
Brian Silverman0fc69932020-01-24 21:54:02 -0800155void EventLoop::TakeWatcher(const Channel *channel) {
156 CHECK(!is_running()) << ": Cannot add new objects while running.";
157 ChannelIndex(channel);
158
Austin Schuhd54780b2020-10-03 16:26:02 -0700159 CheckAlignment(channel);
160
Brian Silverman0fc69932020-01-24 21:54:02 -0800161 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800162 << ": " << configuration::CleanedChannelToString(channel)
163 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800164
165 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800166 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800167 << " is already being used.";
168
169 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800170 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800171 << " is not able to be watched on this node. Check your "
172 "configuration.";
173 }
174}
175
176void EventLoop::TakeSender(const Channel *channel) {
177 CHECK(!is_running()) << ": Cannot add new objects while running.";
178 ChannelIndex(channel);
179
Austin Schuhd54780b2020-10-03 16:26:02 -0700180 CheckAlignment(channel);
181
Brian Silverman0fc69932020-01-24 21:54:02 -0800182 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800183 << ": Channel " << configuration::CleanedChannelToString(channel)
184 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800185
186 // We don't care if this is a duplicate.
187 taken_senders_.insert(channel);
188}
189
Austin Schuh39788ff2019-12-01 18:22:57 -0800190void EventLoop::SendTimingReport() {
191 // We need to do a fancy dance here to get all the accounting to work right.
192 // We want to copy the memory here, but then send after resetting. Otherwise
193 // the send for the timing report won't be counted in the timing report.
194 //
195 // Also, flatbuffers build from the back end. So place this at the back end
196 // of the buffer. We only have to care because we are using this in a very
197 // raw fashion.
198 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
199 << ": Timing report bigger than the sender size.";
200 std::copy(timing_report_.data(),
201 timing_report_.data() + timing_report_.size(),
202 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
203 timing_report_sender_->size() - timing_report_.size());
204
205 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
206 timer->timing_.ResetTimingReport();
207 }
208 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
209 watcher->ResetReport();
210 }
211 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
212 phased_loop->timing_.ResetTimingReport();
213 }
214 for (RawSender *sender : senders_) {
215 sender->timing_.ResetTimingReport();
216 }
217 for (RawFetcher *fetcher : fetchers_) {
218 fetcher->timing_.ResetTimingReport();
219 }
220 timing_report_sender_->Send(timing_report_.size());
221}
222
223void EventLoop::UpdateTimingReport() {
224 // We need to support senders and fetchers changing while we are setting up
225 // the event loop. Otherwise we can't fetch or send until the loop runs. This
226 // means that on each change, we need to redo all this work. This makes setup
227 // more expensive, but not by all that much on a modern processor.
228
229 // Now, build up a report with everything pre-filled out.
230 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800231 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800232
233 // Pre-fill in the defaults for timers.
234 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
235 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
236 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
237 timing::CreateStatistic(fbb);
238 flatbuffers::Offset<timing::Statistic> handler_time_offset =
239 timing::CreateStatistic(fbb);
240 flatbuffers::Offset<flatbuffers::String> name_offset;
241 if (timer->name().size() != 0) {
242 name_offset = fbb.CreateString(timer->name());
243 }
244
245 timing::Timer::Builder timer_builder(fbb);
246
247 if (timer->name().size() != 0) {
248 timer_builder.add_name(name_offset);
249 }
250 timer_builder.add_wakeup_latency(wakeup_latency_offset);
251 timer_builder.add_handler_time(handler_time_offset);
252 timer_builder.add_count(0);
253 timer_offsets.emplace_back(timer_builder.Finish());
254 }
255
256 // Pre-fill in the defaults for phased_loops.
257 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
258 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
259 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
260 timing::CreateStatistic(fbb);
261 flatbuffers::Offset<timing::Statistic> handler_time_offset =
262 timing::CreateStatistic(fbb);
263 flatbuffers::Offset<flatbuffers::String> name_offset;
264 if (phased_loop->name().size() != 0) {
265 name_offset = fbb.CreateString(phased_loop->name());
266 }
267
268 timing::Timer::Builder timer_builder(fbb);
269
270 if (phased_loop->name().size() != 0) {
271 timer_builder.add_name(name_offset);
272 }
273 timer_builder.add_wakeup_latency(wakeup_latency_offset);
274 timer_builder.add_handler_time(handler_time_offset);
275 timer_builder.add_count(0);
276 phased_loop_offsets.emplace_back(timer_builder.Finish());
277 }
278
279 // Pre-fill in the defaults for watchers.
280 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
281 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
282 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
283 timing::CreateStatistic(fbb);
284 flatbuffers::Offset<timing::Statistic> handler_time_offset =
285 timing::CreateStatistic(fbb);
286
287 timing::Watcher::Builder watcher_builder(fbb);
288
289 watcher_builder.add_channel_index(watcher->channel_index());
290 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
291 watcher_builder.add_handler_time(handler_time_offset);
292 watcher_builder.add_count(0);
293 watcher_offsets.emplace_back(watcher_builder.Finish());
294 }
295
296 // Pre-fill in the defaults for senders.
297 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
298 for (const RawSender *sender : senders_) {
299 flatbuffers::Offset<timing::Statistic> size_offset =
300 timing::CreateStatistic(fbb);
301
302 timing::Sender::Builder sender_builder(fbb);
303
304 sender_builder.add_channel_index(sender->timing_.channel_index);
305 sender_builder.add_size(size_offset);
306 sender_builder.add_count(0);
307 sender_offsets.emplace_back(sender_builder.Finish());
308 }
309
310 // Pre-fill in the defaults for fetchers.
311 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
312 for (RawFetcher *fetcher : fetchers_) {
313 flatbuffers::Offset<timing::Statistic> latency_offset =
314 timing::CreateStatistic(fbb);
315
316 timing::Fetcher::Builder fetcher_builder(fbb);
317
318 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
319 fetcher_builder.add_count(0);
320 fetcher_builder.add_latency(latency_offset);
321 fetcher_offsets.emplace_back(fetcher_builder.Finish());
322 }
323
324 // Then build the final report.
325 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
326 timers_offset;
327 if (timer_offsets.size() > 0) {
328 timers_offset = fbb.CreateVector(timer_offsets);
329 }
330
331 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
332 phased_loops_offset;
333 if (phased_loop_offsets.size() > 0) {
334 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
335 }
336
337 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
338 watchers_offset;
339 if (watcher_offsets.size() > 0) {
340 watchers_offset = fbb.CreateVector(watcher_offsets);
341 }
342
343 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
344 senders_offset;
345 if (sender_offsets.size() > 0) {
346 senders_offset = fbb.CreateVector(sender_offsets);
347 }
348
349 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
350 fetchers_offset;
351 if (fetcher_offsets.size() > 0) {
352 fetchers_offset = fbb.CreateVector(fetcher_offsets);
353 }
354
355 flatbuffers::Offset<flatbuffers::String> name_offset =
356 fbb.CreateString(name());
357
358 timing::Report::Builder report_builder(fbb);
359 report_builder.add_name(name_offset);
360 report_builder.add_pid(GetTid());
361 if (timer_offsets.size() > 0) {
362 report_builder.add_timers(timers_offset);
363 }
364 if (phased_loop_offsets.size() > 0) {
365 report_builder.add_phased_loops(phased_loops_offset);
366 }
367 if (watcher_offsets.size() > 0) {
368 report_builder.add_watchers(watchers_offset);
369 }
370 if (sender_offsets.size() > 0) {
371 report_builder.add_senders(senders_offset);
372 }
373 if (fetcher_offsets.size() > 0) {
374 report_builder.add_fetchers(fetchers_offset);
375 }
376 fbb.Finish(report_builder.Finish());
377
378 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
379
380 // Now that the pointers are stable, pass them to the timers and watchers to
381 // be updated.
382 for (size_t i = 0; i < timers_.size(); ++i) {
383 timers_[i]->timing_.set_timing_report(
384 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
385 i));
386 }
387
388 for (size_t i = 0; i < phased_loops_.size(); ++i) {
389 phased_loops_[i]->timing_.set_timing_report(
390 timing_report_.mutable_message()
391 ->mutable_phased_loops()
392 ->GetMutableObject(i));
393 }
394
395 for (size_t i = 0; i < watchers_.size(); ++i) {
396 watchers_[i]->set_timing_report(
397 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
398 i));
399 }
400
401 for (size_t i = 0; i < senders_.size(); ++i) {
402 senders_[i]->timing_.set_timing_report(
403 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
404 i));
405 }
406
407 for (size_t i = 0; i < fetchers_.size(); ++i) {
408 fetchers_[i]->timing_.set_timing_report(
409 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
410 i));
411 }
412}
413
414void EventLoop::MaybeScheduleTimingReports() {
415 if (FLAGS_timing_reports && !skip_timing_report_) {
416 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
417 // Make a raw sender for the report.
418 const Channel *channel = configuration::GetChannel(
419 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800420 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700421 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
422 "\"type\": \"aos.timing.Report\"} on node "
423 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800424
425 // Since we are using a RawSender, validity isn't checked. So check it
426 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800427 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
428 LOG(FATAL) << "Channel { \"name\": \"/aos"
429 << channel->name()->string_view() << "\", \"type\": \""
430 << channel->type()->string_view()
431 << "\" } is not able to be sent on this node. Check your "
432 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800433 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800434 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
435 << timing::Report::GetFullyQualifiedName()
436 << "\" } not found in config.";
437 timing_report_sender_ = MakeRawSender(channel);
438
439 // Register a handler which sends the report out by copying the raw data
440 // from the prebuilt and subsequently modified report.
441 TimerHandler *timing_reports_timer =
442 AddTimer([this]() { SendTimingReport(); });
443
444 // Set it up to send once per second.
445 timing_reports_timer->set_name("timing_reports");
446 OnRun([this, timing_reports_timer]() {
447 timing_reports_timer->Setup(
448 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
449 std::chrono::milliseconds(FLAGS_timing_report_ms));
450 });
451
452 UpdateTimingReport();
453 }
454}
455
Austin Schuh7d87b672019-12-01 20:23:49 -0800456void EventLoop::ReserveEvents() {
457 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
458}
459
460namespace {
461bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700462 if (first->event_time() > second->event_time()) {
463 return true;
464 }
465 if (first->event_time() < second->event_time()) {
466 return false;
467 }
468 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800469}
470} // namespace
471
472void EventLoop::AddEvent(EventLoopEvent *event) {
473 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700474 DCHECK(event->generation() == 0);
475 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800476 events_.push_back(event);
477 std::push_heap(events_.begin(), events_.end(), CompareEvents);
478}
479
480void EventLoop::RemoveEvent(EventLoopEvent *event) {
481 auto e = std::find(events_.begin(), events_.end(), event);
482 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700483 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800484 events_.erase(e);
485 std::make_heap(events_.begin(), events_.end(), CompareEvents);
486 event->Invalidate();
487 }
488}
489
490EventLoopEvent *EventLoop::PopEvent() {
491 EventLoopEvent *result = events_.front();
492 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
493 events_.pop_back();
494 result->Invalidate();
495 return result;
496}
497
Austin Schuh39788ff2019-12-01 18:22:57 -0800498void WatcherState::set_timing_report(timing::Watcher *watcher) {
499 CHECK_NOTNULL(watcher);
500 watcher_ = watcher;
501 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
502 handler_time_.set_statistic(watcher->mutable_handler_time());
503}
504
505void WatcherState::ResetReport() {
506 wakeup_latency_.Reset();
507 handler_time_.Reset();
508 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800509}
510
511} // namespace aos