blob: 76b83c9981d986e9b5d0c2d084d2bdd1142a64ae [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
5#include "glog/logging.h"
6
Austin Schuh39788ff2019-12-01 18:22:57 -08007DEFINE_bool(timing_reports, true, "Publish timing reports.");
8DEFINE_int32(timing_report_ms, 1000,
9 "Period in milliseconds to publish timing reports at.");
10
Austin Schuh54cf95f2019-11-29 13:14:18 -080011namespace aos {
12
Austin Schuh39788ff2019-12-01 18:22:57 -080013RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
14 : event_loop_(event_loop),
15 channel_(channel),
16 timing_(event_loop_->ChannelIndex(channel)) {
17 event_loop_->NewSender(this);
18}
19
20RawSender::~RawSender() { event_loop_->DeleteSender(this); }
21
22RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
23 : event_loop_(event_loop),
24 channel_(channel),
25 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080026 context_.monotonic_event_time = monotonic_clock::min_time;
27 context_.monotonic_remote_time = monotonic_clock::min_time;
28 context_.realtime_event_time = realtime_clock::min_time;
29 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080030 context_.queue_index = 0xffffffff;
31 context_.size = 0;
32 context_.data = nullptr;
33 event_loop_->NewFetcher(this);
34}
35
36RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
37
38TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
39 : event_loop_(event_loop), fn_(std::move(fn)) {}
40
41TimerHandler::~TimerHandler() {}
42
43PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
44 std::function<void(int)> fn,
45 const monotonic_clock::duration interval,
46 const monotonic_clock::duration offset)
47 : event_loop_(event_loop),
48 fn_(std::move(fn)),
49 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
50 event_loop_->OnRun([this]() {
51 const monotonic_clock::time_point monotonic_now =
52 event_loop_->monotonic_now();
53 phased_loop_.Reset(monotonic_now);
54 Reschedule(
55 [this](monotonic_clock::time_point sleep_time) {
56 Schedule(sleep_time);
57 },
58 monotonic_now);
59 // The first time, we'll double count. Reschedule here will count cycles
60 // elapsed before now, and then the reschedule before runing the handler
61 // will count the time that elapsed then. So clear the count here.
62 cycles_elapsed_ = 0;
63 });
64}
65
66PhasedLoopHandler::~PhasedLoopHandler() {}
67
68EventLoop::~EventLoop() {
69 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080070 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080071}
72
73int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -080074 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -080075}
76
Brian Silverman5120afb2020-01-31 17:44:35 -080077WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
78 const int channel_index = ChannelIndex(channel);
79 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
80 if (watcher->channel_index() == channel_index) {
81 return watcher.get();
82 }
83 }
84 LOG(FATAL) << "No watcher found for channel";
85}
86
Austin Schuh39788ff2019-12-01 18:22:57 -080087void EventLoop::NewSender(RawSender *sender) {
88 senders_.emplace_back(sender);
89 UpdateTimingReport();
90}
91void EventLoop::DeleteSender(RawSender *sender) {
92 CHECK(!is_running());
93 auto s = std::find(senders_.begin(), senders_.end(), sender);
94 CHECK(s != senders_.end()) << ": Sender not in senders list";
95 senders_.erase(s);
96 UpdateTimingReport();
97}
98
99TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
100 timers_.emplace_back(std::move(timer));
101 UpdateTimingReport();
102 return timers_.back().get();
103}
104
105PhasedLoopHandler *EventLoop::NewPhasedLoop(
106 std::unique_ptr<PhasedLoopHandler> phased_loop) {
107 phased_loops_.emplace_back(std::move(phased_loop));
108 UpdateTimingReport();
109 return phased_loops_.back().get();
110}
111
112void EventLoop::NewFetcher(RawFetcher *fetcher) {
113 fetchers_.emplace_back(fetcher);
114 UpdateTimingReport();
115}
116
117void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
118 CHECK(!is_running());
119 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
120 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
121 fetchers_.erase(f);
122 UpdateTimingReport();
123}
124
125WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
126 watchers_.emplace_back(std::move(watcher));
127
128 UpdateTimingReport();
129
130 return watchers_.back().get();
131}
132
Brian Silverman0fc69932020-01-24 21:54:02 -0800133void EventLoop::TakeWatcher(const Channel *channel) {
134 CHECK(!is_running()) << ": Cannot add new objects while running.";
135 ChannelIndex(channel);
136
137 CHECK(taken_senders_.find(channel) == taken_senders_.end())
138 << ": " << FlatbufferToJson(channel) << " is already being used.";
139
140 auto result = taken_watchers_.insert(channel);
141 CHECK(result.second) << ": " << FlatbufferToJson(channel)
142 << " is already being used.";
143
144 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
145 LOG(FATAL) << ": " << FlatbufferToJson(channel)
146 << " is not able to be watched on this node. Check your "
147 "configuration.";
148 }
149}
150
151void EventLoop::TakeSender(const Channel *channel) {
152 CHECK(!is_running()) << ": Cannot add new objects while running.";
153 ChannelIndex(channel);
154
155 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
156 << ": Channel " << FlatbufferToJson(channel) << " is already being used.";
157
158 // We don't care if this is a duplicate.
159 taken_senders_.insert(channel);
160}
161
Austin Schuh39788ff2019-12-01 18:22:57 -0800162void EventLoop::SendTimingReport() {
163 // We need to do a fancy dance here to get all the accounting to work right.
164 // We want to copy the memory here, but then send after resetting. Otherwise
165 // the send for the timing report won't be counted in the timing report.
166 //
167 // Also, flatbuffers build from the back end. So place this at the back end
168 // of the buffer. We only have to care because we are using this in a very
169 // raw fashion.
170 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
171 << ": Timing report bigger than the sender size.";
172 std::copy(timing_report_.data(),
173 timing_report_.data() + timing_report_.size(),
174 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
175 timing_report_sender_->size() - timing_report_.size());
176
177 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
178 timer->timing_.ResetTimingReport();
179 }
180 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
181 watcher->ResetReport();
182 }
183 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
184 phased_loop->timing_.ResetTimingReport();
185 }
186 for (RawSender *sender : senders_) {
187 sender->timing_.ResetTimingReport();
188 }
189 for (RawFetcher *fetcher : fetchers_) {
190 fetcher->timing_.ResetTimingReport();
191 }
192 timing_report_sender_->Send(timing_report_.size());
193}
194
195void EventLoop::UpdateTimingReport() {
196 // We need to support senders and fetchers changing while we are setting up
197 // the event loop. Otherwise we can't fetch or send until the loop runs. This
198 // means that on each change, we need to redo all this work. This makes setup
199 // more expensive, but not by all that much on a modern processor.
200
201 // Now, build up a report with everything pre-filled out.
202 flatbuffers::FlatBufferBuilder fbb;
203 fbb.ForceDefaults(1);
204
205 // Pre-fill in the defaults for timers.
206 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
207 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
208 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
209 timing::CreateStatistic(fbb);
210 flatbuffers::Offset<timing::Statistic> handler_time_offset =
211 timing::CreateStatistic(fbb);
212 flatbuffers::Offset<flatbuffers::String> name_offset;
213 if (timer->name().size() != 0) {
214 name_offset = fbb.CreateString(timer->name());
215 }
216
217 timing::Timer::Builder timer_builder(fbb);
218
219 if (timer->name().size() != 0) {
220 timer_builder.add_name(name_offset);
221 }
222 timer_builder.add_wakeup_latency(wakeup_latency_offset);
223 timer_builder.add_handler_time(handler_time_offset);
224 timer_builder.add_count(0);
225 timer_offsets.emplace_back(timer_builder.Finish());
226 }
227
228 // Pre-fill in the defaults for phased_loops.
229 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
230 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
231 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
232 timing::CreateStatistic(fbb);
233 flatbuffers::Offset<timing::Statistic> handler_time_offset =
234 timing::CreateStatistic(fbb);
235 flatbuffers::Offset<flatbuffers::String> name_offset;
236 if (phased_loop->name().size() != 0) {
237 name_offset = fbb.CreateString(phased_loop->name());
238 }
239
240 timing::Timer::Builder timer_builder(fbb);
241
242 if (phased_loop->name().size() != 0) {
243 timer_builder.add_name(name_offset);
244 }
245 timer_builder.add_wakeup_latency(wakeup_latency_offset);
246 timer_builder.add_handler_time(handler_time_offset);
247 timer_builder.add_count(0);
248 phased_loop_offsets.emplace_back(timer_builder.Finish());
249 }
250
251 // Pre-fill in the defaults for watchers.
252 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
253 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
254 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
255 timing::CreateStatistic(fbb);
256 flatbuffers::Offset<timing::Statistic> handler_time_offset =
257 timing::CreateStatistic(fbb);
258
259 timing::Watcher::Builder watcher_builder(fbb);
260
261 watcher_builder.add_channel_index(watcher->channel_index());
262 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
263 watcher_builder.add_handler_time(handler_time_offset);
264 watcher_builder.add_count(0);
265 watcher_offsets.emplace_back(watcher_builder.Finish());
266 }
267
268 // Pre-fill in the defaults for senders.
269 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
270 for (const RawSender *sender : senders_) {
271 flatbuffers::Offset<timing::Statistic> size_offset =
272 timing::CreateStatistic(fbb);
273
274 timing::Sender::Builder sender_builder(fbb);
275
276 sender_builder.add_channel_index(sender->timing_.channel_index);
277 sender_builder.add_size(size_offset);
278 sender_builder.add_count(0);
279 sender_offsets.emplace_back(sender_builder.Finish());
280 }
281
282 // Pre-fill in the defaults for fetchers.
283 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
284 for (RawFetcher *fetcher : fetchers_) {
285 flatbuffers::Offset<timing::Statistic> latency_offset =
286 timing::CreateStatistic(fbb);
287
288 timing::Fetcher::Builder fetcher_builder(fbb);
289
290 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
291 fetcher_builder.add_count(0);
292 fetcher_builder.add_latency(latency_offset);
293 fetcher_offsets.emplace_back(fetcher_builder.Finish());
294 }
295
296 // Then build the final report.
297 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
298 timers_offset;
299 if (timer_offsets.size() > 0) {
300 timers_offset = fbb.CreateVector(timer_offsets);
301 }
302
303 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
304 phased_loops_offset;
305 if (phased_loop_offsets.size() > 0) {
306 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
307 }
308
309 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
310 watchers_offset;
311 if (watcher_offsets.size() > 0) {
312 watchers_offset = fbb.CreateVector(watcher_offsets);
313 }
314
315 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
316 senders_offset;
317 if (sender_offsets.size() > 0) {
318 senders_offset = fbb.CreateVector(sender_offsets);
319 }
320
321 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
322 fetchers_offset;
323 if (fetcher_offsets.size() > 0) {
324 fetchers_offset = fbb.CreateVector(fetcher_offsets);
325 }
326
327 flatbuffers::Offset<flatbuffers::String> name_offset =
328 fbb.CreateString(name());
329
330 timing::Report::Builder report_builder(fbb);
331 report_builder.add_name(name_offset);
332 report_builder.add_pid(GetTid());
333 if (timer_offsets.size() > 0) {
334 report_builder.add_timers(timers_offset);
335 }
336 if (phased_loop_offsets.size() > 0) {
337 report_builder.add_phased_loops(phased_loops_offset);
338 }
339 if (watcher_offsets.size() > 0) {
340 report_builder.add_watchers(watchers_offset);
341 }
342 if (sender_offsets.size() > 0) {
343 report_builder.add_senders(senders_offset);
344 }
345 if (fetcher_offsets.size() > 0) {
346 report_builder.add_fetchers(fetchers_offset);
347 }
348 fbb.Finish(report_builder.Finish());
349
350 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
351
352 // Now that the pointers are stable, pass them to the timers and watchers to
353 // be updated.
354 for (size_t i = 0; i < timers_.size(); ++i) {
355 timers_[i]->timing_.set_timing_report(
356 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
357 i));
358 }
359
360 for (size_t i = 0; i < phased_loops_.size(); ++i) {
361 phased_loops_[i]->timing_.set_timing_report(
362 timing_report_.mutable_message()
363 ->mutable_phased_loops()
364 ->GetMutableObject(i));
365 }
366
367 for (size_t i = 0; i < watchers_.size(); ++i) {
368 watchers_[i]->set_timing_report(
369 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
370 i));
371 }
372
373 for (size_t i = 0; i < senders_.size(); ++i) {
374 senders_[i]->timing_.set_timing_report(
375 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
376 i));
377 }
378
379 for (size_t i = 0; i < fetchers_.size(); ++i) {
380 fetchers_[i]->timing_.set_timing_report(
381 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
382 i));
383 }
384}
385
386void EventLoop::MaybeScheduleTimingReports() {
387 if (FLAGS_timing_reports && !skip_timing_report_) {
388 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
389 // Make a raw sender for the report.
390 const Channel *channel = configuration::GetChannel(
391 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800392 name(), node());
393
394 // Since we are using a RawSender, validity isn't checked. So check it
395 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800396 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
397 LOG(FATAL) << "Channel { \"name\": \"/aos"
398 << channel->name()->string_view() << "\", \"type\": \""
399 << channel->type()->string_view()
400 << "\" } is not able to be sent on this node. Check your "
401 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800402 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800403 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
404 << timing::Report::GetFullyQualifiedName()
405 << "\" } not found in config.";
406 timing_report_sender_ = MakeRawSender(channel);
407
408 // Register a handler which sends the report out by copying the raw data
409 // from the prebuilt and subsequently modified report.
410 TimerHandler *timing_reports_timer =
411 AddTimer([this]() { SendTimingReport(); });
412
413 // Set it up to send once per second.
414 timing_reports_timer->set_name("timing_reports");
415 OnRun([this, timing_reports_timer]() {
416 timing_reports_timer->Setup(
417 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
418 std::chrono::milliseconds(FLAGS_timing_report_ms));
419 });
420
421 UpdateTimingReport();
422 }
423}
424
Austin Schuh7d87b672019-12-01 20:23:49 -0800425void EventLoop::ReserveEvents() {
426 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
427}
428
429namespace {
430bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
431 return first->event_time() > second->event_time();
432}
433} // namespace
434
435void EventLoop::AddEvent(EventLoopEvent *event) {
436 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
437 events_.push_back(event);
438 std::push_heap(events_.begin(), events_.end(), CompareEvents);
439}
440
441void EventLoop::RemoveEvent(EventLoopEvent *event) {
442 auto e = std::find(events_.begin(), events_.end(), event);
443 if (e != events_.end()) {
444 events_.erase(e);
445 std::make_heap(events_.begin(), events_.end(), CompareEvents);
446 event->Invalidate();
447 }
448}
449
450EventLoopEvent *EventLoop::PopEvent() {
451 EventLoopEvent *result = events_.front();
452 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
453 events_.pop_back();
454 result->Invalidate();
455 return result;
456}
457
Austin Schuh39788ff2019-12-01 18:22:57 -0800458void WatcherState::set_timing_report(timing::Watcher *watcher) {
459 CHECK_NOTNULL(watcher);
460 watcher_ = watcher;
461 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
462 handler_time_.set_statistic(watcher->mutable_handler_time());
463}
464
465void WatcherState::ResetReport() {
466 wakeup_latency_.Reset();
467 handler_time_.Reset();
468 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800469}
470
471} // namespace aos