blob: da89d9b5f08588ee0cd4b40218e16a4ada27ab74 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
Austin Schuh39788ff2019-12-01 18:22:57 -08008DEFINE_bool(timing_reports, true, "Publish timing reports.");
9DEFINE_int32(timing_report_ms, 1000,
10 "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
13
Austin Schuh39788ff2019-12-01 18:22:57 -080014RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
15 : event_loop_(event_loop),
16 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050017 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080018 timing_(event_loop_->ChannelIndex(channel)) {
19 event_loop_->NewSender(this);
20}
21
22RawSender::~RawSender() { event_loop_->DeleteSender(this); }
23
24RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
25 : event_loop_(event_loop),
26 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050027 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080028 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080029 context_.monotonic_event_time = monotonic_clock::min_time;
30 context_.monotonic_remote_time = monotonic_clock::min_time;
31 context_.realtime_event_time = realtime_clock::min_time;
32 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080033 context_.queue_index = 0xffffffff;
34 context_.size = 0;
35 context_.data = nullptr;
36 event_loop_->NewFetcher(this);
37}
38
39RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
40
41TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
42 : event_loop_(event_loop), fn_(std::move(fn)) {}
43
44TimerHandler::~TimerHandler() {}
45
46PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
47 std::function<void(int)> fn,
48 const monotonic_clock::duration interval,
49 const monotonic_clock::duration offset)
50 : event_loop_(event_loop),
51 fn_(std::move(fn)),
52 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
53 event_loop_->OnRun([this]() {
54 const monotonic_clock::time_point monotonic_now =
55 event_loop_->monotonic_now();
56 phased_loop_.Reset(monotonic_now);
57 Reschedule(
58 [this](monotonic_clock::time_point sleep_time) {
59 Schedule(sleep_time);
60 },
61 monotonic_now);
62 // The first time, we'll double count. Reschedule here will count cycles
63 // elapsed before now, and then the reschedule before runing the handler
64 // will count the time that elapsed then. So clear the count here.
65 cycles_elapsed_ = 0;
66 });
67}
68
69PhasedLoopHandler::~PhasedLoopHandler() {}
70
Tyler Chatow67ddb032020-01-12 14:30:04 -080071EventLoop::EventLoop(const Configuration *configuration)
72 : timing_report_(flatbuffers::DetachedBuffer()),
73 configuration_(configuration) {
74 logging::Init();
75}
76
Austin Schuh39788ff2019-12-01 18:22:57 -080077EventLoop::~EventLoop() {
78 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080079 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080080}
81
82int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -080083 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -080084}
85
Brian Silverman5120afb2020-01-31 17:44:35 -080086WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
87 const int channel_index = ChannelIndex(channel);
88 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
89 if (watcher->channel_index() == channel_index) {
90 return watcher.get();
91 }
92 }
93 LOG(FATAL) << "No watcher found for channel";
94}
95
Austin Schuh39788ff2019-12-01 18:22:57 -080096void EventLoop::NewSender(RawSender *sender) {
97 senders_.emplace_back(sender);
98 UpdateTimingReport();
99}
100void EventLoop::DeleteSender(RawSender *sender) {
101 CHECK(!is_running());
102 auto s = std::find(senders_.begin(), senders_.end(), sender);
103 CHECK(s != senders_.end()) << ": Sender not in senders list";
104 senders_.erase(s);
105 UpdateTimingReport();
106}
107
108TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
109 timers_.emplace_back(std::move(timer));
110 UpdateTimingReport();
111 return timers_.back().get();
112}
113
114PhasedLoopHandler *EventLoop::NewPhasedLoop(
115 std::unique_ptr<PhasedLoopHandler> phased_loop) {
116 phased_loops_.emplace_back(std::move(phased_loop));
117 UpdateTimingReport();
118 return phased_loops_.back().get();
119}
120
121void EventLoop::NewFetcher(RawFetcher *fetcher) {
122 fetchers_.emplace_back(fetcher);
123 UpdateTimingReport();
124}
125
126void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
127 CHECK(!is_running());
128 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
129 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
130 fetchers_.erase(f);
131 UpdateTimingReport();
132}
133
134WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
135 watchers_.emplace_back(std::move(watcher));
136
137 UpdateTimingReport();
138
139 return watchers_.back().get();
140}
141
Brian Silverman0fc69932020-01-24 21:54:02 -0800142void EventLoop::TakeWatcher(const Channel *channel) {
143 CHECK(!is_running()) << ": Cannot add new objects while running.";
144 ChannelIndex(channel);
145
146 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800147 << ": " << configuration::CleanedChannelToString(channel)
148 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800149
150 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800151 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800152 << " is already being used.";
153
154 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800155 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800156 << " is not able to be watched on this node. Check your "
157 "configuration.";
158 }
159}
160
161void EventLoop::TakeSender(const Channel *channel) {
162 CHECK(!is_running()) << ": Cannot add new objects while running.";
163 ChannelIndex(channel);
164
165 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800166 << ": Channel " << configuration::CleanedChannelToString(channel)
167 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800168
169 // We don't care if this is a duplicate.
170 taken_senders_.insert(channel);
171}
172
Austin Schuh39788ff2019-12-01 18:22:57 -0800173void EventLoop::SendTimingReport() {
174 // We need to do a fancy dance here to get all the accounting to work right.
175 // We want to copy the memory here, but then send after resetting. Otherwise
176 // the send for the timing report won't be counted in the timing report.
177 //
178 // Also, flatbuffers build from the back end. So place this at the back end
179 // of the buffer. We only have to care because we are using this in a very
180 // raw fashion.
181 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
182 << ": Timing report bigger than the sender size.";
183 std::copy(timing_report_.data(),
184 timing_report_.data() + timing_report_.size(),
185 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
186 timing_report_sender_->size() - timing_report_.size());
187
188 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
189 timer->timing_.ResetTimingReport();
190 }
191 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
192 watcher->ResetReport();
193 }
194 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
195 phased_loop->timing_.ResetTimingReport();
196 }
197 for (RawSender *sender : senders_) {
198 sender->timing_.ResetTimingReport();
199 }
200 for (RawFetcher *fetcher : fetchers_) {
201 fetcher->timing_.ResetTimingReport();
202 }
203 timing_report_sender_->Send(timing_report_.size());
204}
205
206void EventLoop::UpdateTimingReport() {
207 // We need to support senders and fetchers changing while we are setting up
208 // the event loop. Otherwise we can't fetch or send until the loop runs. This
209 // means that on each change, we need to redo all this work. This makes setup
210 // more expensive, but not by all that much on a modern processor.
211
212 // Now, build up a report with everything pre-filled out.
213 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800214 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800215
216 // Pre-fill in the defaults for timers.
217 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
218 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
219 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
220 timing::CreateStatistic(fbb);
221 flatbuffers::Offset<timing::Statistic> handler_time_offset =
222 timing::CreateStatistic(fbb);
223 flatbuffers::Offset<flatbuffers::String> name_offset;
224 if (timer->name().size() != 0) {
225 name_offset = fbb.CreateString(timer->name());
226 }
227
228 timing::Timer::Builder timer_builder(fbb);
229
230 if (timer->name().size() != 0) {
231 timer_builder.add_name(name_offset);
232 }
233 timer_builder.add_wakeup_latency(wakeup_latency_offset);
234 timer_builder.add_handler_time(handler_time_offset);
235 timer_builder.add_count(0);
236 timer_offsets.emplace_back(timer_builder.Finish());
237 }
238
239 // Pre-fill in the defaults for phased_loops.
240 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
241 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
242 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
243 timing::CreateStatistic(fbb);
244 flatbuffers::Offset<timing::Statistic> handler_time_offset =
245 timing::CreateStatistic(fbb);
246 flatbuffers::Offset<flatbuffers::String> name_offset;
247 if (phased_loop->name().size() != 0) {
248 name_offset = fbb.CreateString(phased_loop->name());
249 }
250
251 timing::Timer::Builder timer_builder(fbb);
252
253 if (phased_loop->name().size() != 0) {
254 timer_builder.add_name(name_offset);
255 }
256 timer_builder.add_wakeup_latency(wakeup_latency_offset);
257 timer_builder.add_handler_time(handler_time_offset);
258 timer_builder.add_count(0);
259 phased_loop_offsets.emplace_back(timer_builder.Finish());
260 }
261
262 // Pre-fill in the defaults for watchers.
263 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
264 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
265 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
266 timing::CreateStatistic(fbb);
267 flatbuffers::Offset<timing::Statistic> handler_time_offset =
268 timing::CreateStatistic(fbb);
269
270 timing::Watcher::Builder watcher_builder(fbb);
271
272 watcher_builder.add_channel_index(watcher->channel_index());
273 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
274 watcher_builder.add_handler_time(handler_time_offset);
275 watcher_builder.add_count(0);
276 watcher_offsets.emplace_back(watcher_builder.Finish());
277 }
278
279 // Pre-fill in the defaults for senders.
280 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
281 for (const RawSender *sender : senders_) {
282 flatbuffers::Offset<timing::Statistic> size_offset =
283 timing::CreateStatistic(fbb);
284
285 timing::Sender::Builder sender_builder(fbb);
286
287 sender_builder.add_channel_index(sender->timing_.channel_index);
288 sender_builder.add_size(size_offset);
289 sender_builder.add_count(0);
290 sender_offsets.emplace_back(sender_builder.Finish());
291 }
292
293 // Pre-fill in the defaults for fetchers.
294 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
295 for (RawFetcher *fetcher : fetchers_) {
296 flatbuffers::Offset<timing::Statistic> latency_offset =
297 timing::CreateStatistic(fbb);
298
299 timing::Fetcher::Builder fetcher_builder(fbb);
300
301 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
302 fetcher_builder.add_count(0);
303 fetcher_builder.add_latency(latency_offset);
304 fetcher_offsets.emplace_back(fetcher_builder.Finish());
305 }
306
307 // Then build the final report.
308 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
309 timers_offset;
310 if (timer_offsets.size() > 0) {
311 timers_offset = fbb.CreateVector(timer_offsets);
312 }
313
314 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
315 phased_loops_offset;
316 if (phased_loop_offsets.size() > 0) {
317 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
318 }
319
320 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
321 watchers_offset;
322 if (watcher_offsets.size() > 0) {
323 watchers_offset = fbb.CreateVector(watcher_offsets);
324 }
325
326 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
327 senders_offset;
328 if (sender_offsets.size() > 0) {
329 senders_offset = fbb.CreateVector(sender_offsets);
330 }
331
332 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
333 fetchers_offset;
334 if (fetcher_offsets.size() > 0) {
335 fetchers_offset = fbb.CreateVector(fetcher_offsets);
336 }
337
338 flatbuffers::Offset<flatbuffers::String> name_offset =
339 fbb.CreateString(name());
340
341 timing::Report::Builder report_builder(fbb);
342 report_builder.add_name(name_offset);
343 report_builder.add_pid(GetTid());
344 if (timer_offsets.size() > 0) {
345 report_builder.add_timers(timers_offset);
346 }
347 if (phased_loop_offsets.size() > 0) {
348 report_builder.add_phased_loops(phased_loops_offset);
349 }
350 if (watcher_offsets.size() > 0) {
351 report_builder.add_watchers(watchers_offset);
352 }
353 if (sender_offsets.size() > 0) {
354 report_builder.add_senders(senders_offset);
355 }
356 if (fetcher_offsets.size() > 0) {
357 report_builder.add_fetchers(fetchers_offset);
358 }
359 fbb.Finish(report_builder.Finish());
360
361 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
362
363 // Now that the pointers are stable, pass them to the timers and watchers to
364 // be updated.
365 for (size_t i = 0; i < timers_.size(); ++i) {
366 timers_[i]->timing_.set_timing_report(
367 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
368 i));
369 }
370
371 for (size_t i = 0; i < phased_loops_.size(); ++i) {
372 phased_loops_[i]->timing_.set_timing_report(
373 timing_report_.mutable_message()
374 ->mutable_phased_loops()
375 ->GetMutableObject(i));
376 }
377
378 for (size_t i = 0; i < watchers_.size(); ++i) {
379 watchers_[i]->set_timing_report(
380 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
381 i));
382 }
383
384 for (size_t i = 0; i < senders_.size(); ++i) {
385 senders_[i]->timing_.set_timing_report(
386 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
387 i));
388 }
389
390 for (size_t i = 0; i < fetchers_.size(); ++i) {
391 fetchers_[i]->timing_.set_timing_report(
392 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
393 i));
394 }
395}
396
397void EventLoop::MaybeScheduleTimingReports() {
398 if (FLAGS_timing_reports && !skip_timing_report_) {
399 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
400 // Make a raw sender for the report.
401 const Channel *channel = configuration::GetChannel(
402 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800403 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700404 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
405 "\"type\": \"aos.timing.Report\"} on node "
406 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800407
408 // Since we are using a RawSender, validity isn't checked. So check it
409 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800410 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
411 LOG(FATAL) << "Channel { \"name\": \"/aos"
412 << channel->name()->string_view() << "\", \"type\": \""
413 << channel->type()->string_view()
414 << "\" } is not able to be sent on this node. Check your "
415 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800416 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800417 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
418 << timing::Report::GetFullyQualifiedName()
419 << "\" } not found in config.";
420 timing_report_sender_ = MakeRawSender(channel);
421
422 // Register a handler which sends the report out by copying the raw data
423 // from the prebuilt and subsequently modified report.
424 TimerHandler *timing_reports_timer =
425 AddTimer([this]() { SendTimingReport(); });
426
427 // Set it up to send once per second.
428 timing_reports_timer->set_name("timing_reports");
429 OnRun([this, timing_reports_timer]() {
430 timing_reports_timer->Setup(
431 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
432 std::chrono::milliseconds(FLAGS_timing_report_ms));
433 });
434
435 UpdateTimingReport();
436 }
437}
438
Austin Schuh7d87b672019-12-01 20:23:49 -0800439void EventLoop::ReserveEvents() {
440 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
441}
442
443namespace {
444bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
445 return first->event_time() > second->event_time();
446}
447} // namespace
448
449void EventLoop::AddEvent(EventLoopEvent *event) {
450 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
451 events_.push_back(event);
452 std::push_heap(events_.begin(), events_.end(), CompareEvents);
453}
454
455void EventLoop::RemoveEvent(EventLoopEvent *event) {
456 auto e = std::find(events_.begin(), events_.end(), event);
457 if (e != events_.end()) {
458 events_.erase(e);
459 std::make_heap(events_.begin(), events_.end(), CompareEvents);
460 event->Invalidate();
461 }
462}
463
464EventLoopEvent *EventLoop::PopEvent() {
465 EventLoopEvent *result = events_.front();
466 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
467 events_.pop_back();
468 result->Invalidate();
469 return result;
470}
471
Austin Schuh39788ff2019-12-01 18:22:57 -0800472void WatcherState::set_timing_report(timing::Watcher *watcher) {
473 CHECK_NOTNULL(watcher);
474 watcher_ = watcher;
475 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
476 handler_time_.set_statistic(watcher->mutable_handler_time());
477}
478
479void WatcherState::ResetReport() {
480 wakeup_latency_.Reset();
481 handler_time_.Reset();
482 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800483}
484
485} // namespace aos