blob: 2ffad3b465d3e53bd67ac48c2c52eb0038f733ef [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
Austin Schuh39788ff2019-12-01 18:22:57 -08008DEFINE_bool(timing_reports, true, "Publish timing reports.");
9DEFINE_int32(timing_report_ms, 1000,
10 "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
13
Austin Schuh39788ff2019-12-01 18:22:57 -080014RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
15 : event_loop_(event_loop),
16 channel_(channel),
17 timing_(event_loop_->ChannelIndex(channel)) {
18 event_loop_->NewSender(this);
19}
20
21RawSender::~RawSender() { event_loop_->DeleteSender(this); }
22
23RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
24 : event_loop_(event_loop),
25 channel_(channel),
26 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080027 context_.monotonic_event_time = monotonic_clock::min_time;
28 context_.monotonic_remote_time = monotonic_clock::min_time;
29 context_.realtime_event_time = realtime_clock::min_time;
30 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080031 context_.queue_index = 0xffffffff;
32 context_.size = 0;
33 context_.data = nullptr;
34 event_loop_->NewFetcher(this);
35}
36
37RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
38
39TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
40 : event_loop_(event_loop), fn_(std::move(fn)) {}
41
42TimerHandler::~TimerHandler() {}
43
44PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
45 std::function<void(int)> fn,
46 const monotonic_clock::duration interval,
47 const monotonic_clock::duration offset)
48 : event_loop_(event_loop),
49 fn_(std::move(fn)),
50 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
51 event_loop_->OnRun([this]() {
52 const monotonic_clock::time_point monotonic_now =
53 event_loop_->monotonic_now();
54 phased_loop_.Reset(monotonic_now);
55 Reschedule(
56 [this](monotonic_clock::time_point sleep_time) {
57 Schedule(sleep_time);
58 },
59 monotonic_now);
60 // The first time, we'll double count. Reschedule here will count cycles
61 // elapsed before now, and then the reschedule before runing the handler
62 // will count the time that elapsed then. So clear the count here.
63 cycles_elapsed_ = 0;
64 });
65}
66
67PhasedLoopHandler::~PhasedLoopHandler() {}
68
Tyler Chatow67ddb032020-01-12 14:30:04 -080069EventLoop::EventLoop(const Configuration *configuration)
70 : timing_report_(flatbuffers::DetachedBuffer()),
71 configuration_(configuration) {
72 logging::Init();
73}
74
Austin Schuh39788ff2019-12-01 18:22:57 -080075EventLoop::~EventLoop() {
76 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080077 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080078}
79
80int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -080081 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -080082}
83
Brian Silverman5120afb2020-01-31 17:44:35 -080084WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
85 const int channel_index = ChannelIndex(channel);
86 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
87 if (watcher->channel_index() == channel_index) {
88 return watcher.get();
89 }
90 }
91 LOG(FATAL) << "No watcher found for channel";
92}
93
Austin Schuh39788ff2019-12-01 18:22:57 -080094void EventLoop::NewSender(RawSender *sender) {
95 senders_.emplace_back(sender);
96 UpdateTimingReport();
97}
98void EventLoop::DeleteSender(RawSender *sender) {
99 CHECK(!is_running());
100 auto s = std::find(senders_.begin(), senders_.end(), sender);
101 CHECK(s != senders_.end()) << ": Sender not in senders list";
102 senders_.erase(s);
103 UpdateTimingReport();
104}
105
106TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
107 timers_.emplace_back(std::move(timer));
108 UpdateTimingReport();
109 return timers_.back().get();
110}
111
112PhasedLoopHandler *EventLoop::NewPhasedLoop(
113 std::unique_ptr<PhasedLoopHandler> phased_loop) {
114 phased_loops_.emplace_back(std::move(phased_loop));
115 UpdateTimingReport();
116 return phased_loops_.back().get();
117}
118
119void EventLoop::NewFetcher(RawFetcher *fetcher) {
120 fetchers_.emplace_back(fetcher);
121 UpdateTimingReport();
122}
123
124void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
125 CHECK(!is_running());
126 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
127 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
128 fetchers_.erase(f);
129 UpdateTimingReport();
130}
131
132WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
133 watchers_.emplace_back(std::move(watcher));
134
135 UpdateTimingReport();
136
137 return watchers_.back().get();
138}
139
Brian Silverman0fc69932020-01-24 21:54:02 -0800140void EventLoop::TakeWatcher(const Channel *channel) {
141 CHECK(!is_running()) << ": Cannot add new objects while running.";
142 ChannelIndex(channel);
143
144 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800145 << ": " << configuration::CleanedChannelToString(channel)
146 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800147
148 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800149 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800150 << " is already being used.";
151
152 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800153 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800154 << " is not able to be watched on this node. Check your "
155 "configuration.";
156 }
157}
158
159void EventLoop::TakeSender(const Channel *channel) {
160 CHECK(!is_running()) << ": Cannot add new objects while running.";
161 ChannelIndex(channel);
162
163 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800164 << ": Channel " << configuration::CleanedChannelToString(channel)
165 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800166
167 // We don't care if this is a duplicate.
168 taken_senders_.insert(channel);
169}
170
Austin Schuh39788ff2019-12-01 18:22:57 -0800171void EventLoop::SendTimingReport() {
172 // We need to do a fancy dance here to get all the accounting to work right.
173 // We want to copy the memory here, but then send after resetting. Otherwise
174 // the send for the timing report won't be counted in the timing report.
175 //
176 // Also, flatbuffers build from the back end. So place this at the back end
177 // of the buffer. We only have to care because we are using this in a very
178 // raw fashion.
179 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
180 << ": Timing report bigger than the sender size.";
181 std::copy(timing_report_.data(),
182 timing_report_.data() + timing_report_.size(),
183 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
184 timing_report_sender_->size() - timing_report_.size());
185
186 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
187 timer->timing_.ResetTimingReport();
188 }
189 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
190 watcher->ResetReport();
191 }
192 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
193 phased_loop->timing_.ResetTimingReport();
194 }
195 for (RawSender *sender : senders_) {
196 sender->timing_.ResetTimingReport();
197 }
198 for (RawFetcher *fetcher : fetchers_) {
199 fetcher->timing_.ResetTimingReport();
200 }
201 timing_report_sender_->Send(timing_report_.size());
202}
203
204void EventLoop::UpdateTimingReport() {
205 // We need to support senders and fetchers changing while we are setting up
206 // the event loop. Otherwise we can't fetch or send until the loop runs. This
207 // means that on each change, we need to redo all this work. This makes setup
208 // more expensive, but not by all that much on a modern processor.
209
210 // Now, build up a report with everything pre-filled out.
211 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800212 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800213
214 // Pre-fill in the defaults for timers.
215 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
216 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
217 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
218 timing::CreateStatistic(fbb);
219 flatbuffers::Offset<timing::Statistic> handler_time_offset =
220 timing::CreateStatistic(fbb);
221 flatbuffers::Offset<flatbuffers::String> name_offset;
222 if (timer->name().size() != 0) {
223 name_offset = fbb.CreateString(timer->name());
224 }
225
226 timing::Timer::Builder timer_builder(fbb);
227
228 if (timer->name().size() != 0) {
229 timer_builder.add_name(name_offset);
230 }
231 timer_builder.add_wakeup_latency(wakeup_latency_offset);
232 timer_builder.add_handler_time(handler_time_offset);
233 timer_builder.add_count(0);
234 timer_offsets.emplace_back(timer_builder.Finish());
235 }
236
237 // Pre-fill in the defaults for phased_loops.
238 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
239 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
240 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
241 timing::CreateStatistic(fbb);
242 flatbuffers::Offset<timing::Statistic> handler_time_offset =
243 timing::CreateStatistic(fbb);
244 flatbuffers::Offset<flatbuffers::String> name_offset;
245 if (phased_loop->name().size() != 0) {
246 name_offset = fbb.CreateString(phased_loop->name());
247 }
248
249 timing::Timer::Builder timer_builder(fbb);
250
251 if (phased_loop->name().size() != 0) {
252 timer_builder.add_name(name_offset);
253 }
254 timer_builder.add_wakeup_latency(wakeup_latency_offset);
255 timer_builder.add_handler_time(handler_time_offset);
256 timer_builder.add_count(0);
257 phased_loop_offsets.emplace_back(timer_builder.Finish());
258 }
259
260 // Pre-fill in the defaults for watchers.
261 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
262 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
263 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
264 timing::CreateStatistic(fbb);
265 flatbuffers::Offset<timing::Statistic> handler_time_offset =
266 timing::CreateStatistic(fbb);
267
268 timing::Watcher::Builder watcher_builder(fbb);
269
270 watcher_builder.add_channel_index(watcher->channel_index());
271 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
272 watcher_builder.add_handler_time(handler_time_offset);
273 watcher_builder.add_count(0);
274 watcher_offsets.emplace_back(watcher_builder.Finish());
275 }
276
277 // Pre-fill in the defaults for senders.
278 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
279 for (const RawSender *sender : senders_) {
280 flatbuffers::Offset<timing::Statistic> size_offset =
281 timing::CreateStatistic(fbb);
282
283 timing::Sender::Builder sender_builder(fbb);
284
285 sender_builder.add_channel_index(sender->timing_.channel_index);
286 sender_builder.add_size(size_offset);
287 sender_builder.add_count(0);
288 sender_offsets.emplace_back(sender_builder.Finish());
289 }
290
291 // Pre-fill in the defaults for fetchers.
292 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
293 for (RawFetcher *fetcher : fetchers_) {
294 flatbuffers::Offset<timing::Statistic> latency_offset =
295 timing::CreateStatistic(fbb);
296
297 timing::Fetcher::Builder fetcher_builder(fbb);
298
299 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
300 fetcher_builder.add_count(0);
301 fetcher_builder.add_latency(latency_offset);
302 fetcher_offsets.emplace_back(fetcher_builder.Finish());
303 }
304
305 // Then build the final report.
306 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
307 timers_offset;
308 if (timer_offsets.size() > 0) {
309 timers_offset = fbb.CreateVector(timer_offsets);
310 }
311
312 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
313 phased_loops_offset;
314 if (phased_loop_offsets.size() > 0) {
315 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
316 }
317
318 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
319 watchers_offset;
320 if (watcher_offsets.size() > 0) {
321 watchers_offset = fbb.CreateVector(watcher_offsets);
322 }
323
324 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
325 senders_offset;
326 if (sender_offsets.size() > 0) {
327 senders_offset = fbb.CreateVector(sender_offsets);
328 }
329
330 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
331 fetchers_offset;
332 if (fetcher_offsets.size() > 0) {
333 fetchers_offset = fbb.CreateVector(fetcher_offsets);
334 }
335
336 flatbuffers::Offset<flatbuffers::String> name_offset =
337 fbb.CreateString(name());
338
339 timing::Report::Builder report_builder(fbb);
340 report_builder.add_name(name_offset);
341 report_builder.add_pid(GetTid());
342 if (timer_offsets.size() > 0) {
343 report_builder.add_timers(timers_offset);
344 }
345 if (phased_loop_offsets.size() > 0) {
346 report_builder.add_phased_loops(phased_loops_offset);
347 }
348 if (watcher_offsets.size() > 0) {
349 report_builder.add_watchers(watchers_offset);
350 }
351 if (sender_offsets.size() > 0) {
352 report_builder.add_senders(senders_offset);
353 }
354 if (fetcher_offsets.size() > 0) {
355 report_builder.add_fetchers(fetchers_offset);
356 }
357 fbb.Finish(report_builder.Finish());
358
359 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
360
361 // Now that the pointers are stable, pass them to the timers and watchers to
362 // be updated.
363 for (size_t i = 0; i < timers_.size(); ++i) {
364 timers_[i]->timing_.set_timing_report(
365 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
366 i));
367 }
368
369 for (size_t i = 0; i < phased_loops_.size(); ++i) {
370 phased_loops_[i]->timing_.set_timing_report(
371 timing_report_.mutable_message()
372 ->mutable_phased_loops()
373 ->GetMutableObject(i));
374 }
375
376 for (size_t i = 0; i < watchers_.size(); ++i) {
377 watchers_[i]->set_timing_report(
378 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
379 i));
380 }
381
382 for (size_t i = 0; i < senders_.size(); ++i) {
383 senders_[i]->timing_.set_timing_report(
384 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
385 i));
386 }
387
388 for (size_t i = 0; i < fetchers_.size(); ++i) {
389 fetchers_[i]->timing_.set_timing_report(
390 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
391 i));
392 }
393}
394
395void EventLoop::MaybeScheduleTimingReports() {
396 if (FLAGS_timing_reports && !skip_timing_report_) {
397 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
398 // Make a raw sender for the report.
399 const Channel *channel = configuration::GetChannel(
400 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800401 name(), node());
402
403 // Since we are using a RawSender, validity isn't checked. So check it
404 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800405 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
406 LOG(FATAL) << "Channel { \"name\": \"/aos"
407 << channel->name()->string_view() << "\", \"type\": \""
408 << channel->type()->string_view()
409 << "\" } is not able to be sent on this node. Check your "
410 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800411 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800412 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
413 << timing::Report::GetFullyQualifiedName()
414 << "\" } not found in config.";
415 timing_report_sender_ = MakeRawSender(channel);
416
417 // Register a handler which sends the report out by copying the raw data
418 // from the prebuilt and subsequently modified report.
419 TimerHandler *timing_reports_timer =
420 AddTimer([this]() { SendTimingReport(); });
421
422 // Set it up to send once per second.
423 timing_reports_timer->set_name("timing_reports");
424 OnRun([this, timing_reports_timer]() {
425 timing_reports_timer->Setup(
426 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
427 std::chrono::milliseconds(FLAGS_timing_report_ms));
428 });
429
430 UpdateTimingReport();
431 }
432}
433
Austin Schuh7d87b672019-12-01 20:23:49 -0800434void EventLoop::ReserveEvents() {
435 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
436}
437
438namespace {
439bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
440 return first->event_time() > second->event_time();
441}
442} // namespace
443
444void EventLoop::AddEvent(EventLoopEvent *event) {
445 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
446 events_.push_back(event);
447 std::push_heap(events_.begin(), events_.end(), CompareEvents);
448}
449
450void EventLoop::RemoveEvent(EventLoopEvent *event) {
451 auto e = std::find(events_.begin(), events_.end(), event);
452 if (e != events_.end()) {
453 events_.erase(e);
454 std::make_heap(events_.begin(), events_.end(), CompareEvents);
455 event->Invalidate();
456 }
457}
458
459EventLoopEvent *EventLoop::PopEvent() {
460 EventLoopEvent *result = events_.front();
461 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
462 events_.pop_back();
463 result->Invalidate();
464 return result;
465}
466
Austin Schuh39788ff2019-12-01 18:22:57 -0800467void WatcherState::set_timing_report(timing::Watcher *watcher) {
468 CHECK_NOTNULL(watcher);
469 watcher_ = watcher;
470 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
471 handler_time_.set_statistic(watcher->mutable_handler_time());
472}
473
474void WatcherState::ResetReport() {
475 wakeup_latency_.Reset();
476 handler_time_.Reset();
477 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800478}
479
480} // namespace aos