blob: e368df25f86bdbc723641ade251d98b89ececc9c [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
5#include "glog/logging.h"
6
Austin Schuh39788ff2019-12-01 18:22:57 -08007DEFINE_bool(timing_reports, true, "Publish timing reports.");
8DEFINE_int32(timing_report_ms, 1000,
9 "Period in milliseconds to publish timing reports at.");
10
Austin Schuh54cf95f2019-11-29 13:14:18 -080011namespace aos {
12
Austin Schuh39788ff2019-12-01 18:22:57 -080013RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
14 : event_loop_(event_loop),
15 channel_(channel),
16 timing_(event_loop_->ChannelIndex(channel)) {
17 event_loop_->NewSender(this);
18}
19
20RawSender::~RawSender() { event_loop_->DeleteSender(this); }
21
22RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
23 : event_loop_(event_loop),
24 channel_(channel),
25 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080026 context_.monotonic_event_time = monotonic_clock::min_time;
27 context_.monotonic_remote_time = monotonic_clock::min_time;
28 context_.realtime_event_time = realtime_clock::min_time;
29 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080030 context_.queue_index = 0xffffffff;
31 context_.size = 0;
32 context_.data = nullptr;
33 event_loop_->NewFetcher(this);
34}
35
36RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
37
38TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
39 : event_loop_(event_loop), fn_(std::move(fn)) {}
40
41TimerHandler::~TimerHandler() {}
42
43PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
44 std::function<void(int)> fn,
45 const monotonic_clock::duration interval,
46 const monotonic_clock::duration offset)
47 : event_loop_(event_loop),
48 fn_(std::move(fn)),
49 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
50 event_loop_->OnRun([this]() {
51 const monotonic_clock::time_point monotonic_now =
52 event_loop_->monotonic_now();
53 phased_loop_.Reset(monotonic_now);
54 Reschedule(
55 [this](monotonic_clock::time_point sleep_time) {
56 Schedule(sleep_time);
57 },
58 monotonic_now);
59 // The first time, we'll double count. Reschedule here will count cycles
60 // elapsed before now, and then the reschedule before runing the handler
61 // will count the time that elapsed then. So clear the count here.
62 cycles_elapsed_ = 0;
63 });
64}
65
66PhasedLoopHandler::~PhasedLoopHandler() {}
67
68EventLoop::~EventLoop() {
69 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -080070 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -080071}
72
73int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuh54cf95f2019-11-29 13:14:18 -080074 CHECK(configuration_->channels() != nullptr) << ": No channels";
75
Austin Schuh39788ff2019-12-01 18:22:57 -080076 auto c = std::find(configuration_->channels()->begin(),
77 configuration_->channels()->end(), channel);
78 CHECK(c != configuration_->channels()->end())
Austin Schuh54cf95f2019-11-29 13:14:18 -080079 << ": Channel pointer not found in configuration()->channels()";
Austin Schuh39788ff2019-12-01 18:22:57 -080080
81 return std::distance(configuration()->channels()->begin(), c);
82}
83
84void EventLoop::NewSender(RawSender *sender) {
85 senders_.emplace_back(sender);
86 UpdateTimingReport();
87}
88void EventLoop::DeleteSender(RawSender *sender) {
89 CHECK(!is_running());
90 auto s = std::find(senders_.begin(), senders_.end(), sender);
91 CHECK(s != senders_.end()) << ": Sender not in senders list";
92 senders_.erase(s);
93 UpdateTimingReport();
94}
95
96TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
97 timers_.emplace_back(std::move(timer));
98 UpdateTimingReport();
99 return timers_.back().get();
100}
101
102PhasedLoopHandler *EventLoop::NewPhasedLoop(
103 std::unique_ptr<PhasedLoopHandler> phased_loop) {
104 phased_loops_.emplace_back(std::move(phased_loop));
105 UpdateTimingReport();
106 return phased_loops_.back().get();
107}
108
109void EventLoop::NewFetcher(RawFetcher *fetcher) {
110 fetchers_.emplace_back(fetcher);
111 UpdateTimingReport();
112}
113
114void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
115 CHECK(!is_running());
116 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
117 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
118 fetchers_.erase(f);
119 UpdateTimingReport();
120}
121
122WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
123 watchers_.emplace_back(std::move(watcher));
124
125 UpdateTimingReport();
126
127 return watchers_.back().get();
128}
129
Brian Silverman0fc69932020-01-24 21:54:02 -0800130void EventLoop::TakeWatcher(const Channel *channel) {
131 CHECK(!is_running()) << ": Cannot add new objects while running.";
132 ChannelIndex(channel);
133
134 CHECK(taken_senders_.find(channel) == taken_senders_.end())
135 << ": " << FlatbufferToJson(channel) << " is already being used.";
136
137 auto result = taken_watchers_.insert(channel);
138 CHECK(result.second) << ": " << FlatbufferToJson(channel)
139 << " is already being used.";
140
141 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
142 LOG(FATAL) << ": " << FlatbufferToJson(channel)
143 << " is not able to be watched on this node. Check your "
144 "configuration.";
145 }
146}
147
148void EventLoop::TakeSender(const Channel *channel) {
149 CHECK(!is_running()) << ": Cannot add new objects while running.";
150 ChannelIndex(channel);
151
152 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
153 << ": Channel " << FlatbufferToJson(channel) << " is already being used.";
154
155 // We don't care if this is a duplicate.
156 taken_senders_.insert(channel);
157}
158
Austin Schuh39788ff2019-12-01 18:22:57 -0800159void EventLoop::SendTimingReport() {
160 // We need to do a fancy dance here to get all the accounting to work right.
161 // We want to copy the memory here, but then send after resetting. Otherwise
162 // the send for the timing report won't be counted in the timing report.
163 //
164 // Also, flatbuffers build from the back end. So place this at the back end
165 // of the buffer. We only have to care because we are using this in a very
166 // raw fashion.
167 CHECK_LE(timing_report_.size(), timing_report_sender_->size())
168 << ": Timing report bigger than the sender size.";
169 std::copy(timing_report_.data(),
170 timing_report_.data() + timing_report_.size(),
171 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
172 timing_report_sender_->size() - timing_report_.size());
173
174 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
175 timer->timing_.ResetTimingReport();
176 }
177 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
178 watcher->ResetReport();
179 }
180 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
181 phased_loop->timing_.ResetTimingReport();
182 }
183 for (RawSender *sender : senders_) {
184 sender->timing_.ResetTimingReport();
185 }
186 for (RawFetcher *fetcher : fetchers_) {
187 fetcher->timing_.ResetTimingReport();
188 }
189 timing_report_sender_->Send(timing_report_.size());
190}
191
192void EventLoop::UpdateTimingReport() {
193 // We need to support senders and fetchers changing while we are setting up
194 // the event loop. Otherwise we can't fetch or send until the loop runs. This
195 // means that on each change, we need to redo all this work. This makes setup
196 // more expensive, but not by all that much on a modern processor.
197
198 // Now, build up a report with everything pre-filled out.
199 flatbuffers::FlatBufferBuilder fbb;
200 fbb.ForceDefaults(1);
201
202 // Pre-fill in the defaults for timers.
203 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
204 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
205 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
206 timing::CreateStatistic(fbb);
207 flatbuffers::Offset<timing::Statistic> handler_time_offset =
208 timing::CreateStatistic(fbb);
209 flatbuffers::Offset<flatbuffers::String> name_offset;
210 if (timer->name().size() != 0) {
211 name_offset = fbb.CreateString(timer->name());
212 }
213
214 timing::Timer::Builder timer_builder(fbb);
215
216 if (timer->name().size() != 0) {
217 timer_builder.add_name(name_offset);
218 }
219 timer_builder.add_wakeup_latency(wakeup_latency_offset);
220 timer_builder.add_handler_time(handler_time_offset);
221 timer_builder.add_count(0);
222 timer_offsets.emplace_back(timer_builder.Finish());
223 }
224
225 // Pre-fill in the defaults for phased_loops.
226 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
227 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
228 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
229 timing::CreateStatistic(fbb);
230 flatbuffers::Offset<timing::Statistic> handler_time_offset =
231 timing::CreateStatistic(fbb);
232 flatbuffers::Offset<flatbuffers::String> name_offset;
233 if (phased_loop->name().size() != 0) {
234 name_offset = fbb.CreateString(phased_loop->name());
235 }
236
237 timing::Timer::Builder timer_builder(fbb);
238
239 if (phased_loop->name().size() != 0) {
240 timer_builder.add_name(name_offset);
241 }
242 timer_builder.add_wakeup_latency(wakeup_latency_offset);
243 timer_builder.add_handler_time(handler_time_offset);
244 timer_builder.add_count(0);
245 phased_loop_offsets.emplace_back(timer_builder.Finish());
246 }
247
248 // Pre-fill in the defaults for watchers.
249 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
250 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
251 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
252 timing::CreateStatistic(fbb);
253 flatbuffers::Offset<timing::Statistic> handler_time_offset =
254 timing::CreateStatistic(fbb);
255
256 timing::Watcher::Builder watcher_builder(fbb);
257
258 watcher_builder.add_channel_index(watcher->channel_index());
259 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
260 watcher_builder.add_handler_time(handler_time_offset);
261 watcher_builder.add_count(0);
262 watcher_offsets.emplace_back(watcher_builder.Finish());
263 }
264
265 // Pre-fill in the defaults for senders.
266 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
267 for (const RawSender *sender : senders_) {
268 flatbuffers::Offset<timing::Statistic> size_offset =
269 timing::CreateStatistic(fbb);
270
271 timing::Sender::Builder sender_builder(fbb);
272
273 sender_builder.add_channel_index(sender->timing_.channel_index);
274 sender_builder.add_size(size_offset);
275 sender_builder.add_count(0);
276 sender_offsets.emplace_back(sender_builder.Finish());
277 }
278
279 // Pre-fill in the defaults for fetchers.
280 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
281 for (RawFetcher *fetcher : fetchers_) {
282 flatbuffers::Offset<timing::Statistic> latency_offset =
283 timing::CreateStatistic(fbb);
284
285 timing::Fetcher::Builder fetcher_builder(fbb);
286
287 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
288 fetcher_builder.add_count(0);
289 fetcher_builder.add_latency(latency_offset);
290 fetcher_offsets.emplace_back(fetcher_builder.Finish());
291 }
292
293 // Then build the final report.
294 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
295 timers_offset;
296 if (timer_offsets.size() > 0) {
297 timers_offset = fbb.CreateVector(timer_offsets);
298 }
299
300 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
301 phased_loops_offset;
302 if (phased_loop_offsets.size() > 0) {
303 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
304 }
305
306 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
307 watchers_offset;
308 if (watcher_offsets.size() > 0) {
309 watchers_offset = fbb.CreateVector(watcher_offsets);
310 }
311
312 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
313 senders_offset;
314 if (sender_offsets.size() > 0) {
315 senders_offset = fbb.CreateVector(sender_offsets);
316 }
317
318 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
319 fetchers_offset;
320 if (fetcher_offsets.size() > 0) {
321 fetchers_offset = fbb.CreateVector(fetcher_offsets);
322 }
323
324 flatbuffers::Offset<flatbuffers::String> name_offset =
325 fbb.CreateString(name());
326
327 timing::Report::Builder report_builder(fbb);
328 report_builder.add_name(name_offset);
329 report_builder.add_pid(GetTid());
330 if (timer_offsets.size() > 0) {
331 report_builder.add_timers(timers_offset);
332 }
333 if (phased_loop_offsets.size() > 0) {
334 report_builder.add_phased_loops(phased_loops_offset);
335 }
336 if (watcher_offsets.size() > 0) {
337 report_builder.add_watchers(watchers_offset);
338 }
339 if (sender_offsets.size() > 0) {
340 report_builder.add_senders(senders_offset);
341 }
342 if (fetcher_offsets.size() > 0) {
343 report_builder.add_fetchers(fetchers_offset);
344 }
345 fbb.Finish(report_builder.Finish());
346
347 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
348
349 // Now that the pointers are stable, pass them to the timers and watchers to
350 // be updated.
351 for (size_t i = 0; i < timers_.size(); ++i) {
352 timers_[i]->timing_.set_timing_report(
353 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
354 i));
355 }
356
357 for (size_t i = 0; i < phased_loops_.size(); ++i) {
358 phased_loops_[i]->timing_.set_timing_report(
359 timing_report_.mutable_message()
360 ->mutable_phased_loops()
361 ->GetMutableObject(i));
362 }
363
364 for (size_t i = 0; i < watchers_.size(); ++i) {
365 watchers_[i]->set_timing_report(
366 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
367 i));
368 }
369
370 for (size_t i = 0; i < senders_.size(); ++i) {
371 senders_[i]->timing_.set_timing_report(
372 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
373 i));
374 }
375
376 for (size_t i = 0; i < fetchers_.size(); ++i) {
377 fetchers_[i]->timing_.set_timing_report(
378 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
379 i));
380 }
381}
382
383void EventLoop::MaybeScheduleTimingReports() {
384 if (FLAGS_timing_reports && !skip_timing_report_) {
385 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
386 // Make a raw sender for the report.
387 const Channel *channel = configuration::GetChannel(
388 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800389 name(), node());
390
391 // Since we are using a RawSender, validity isn't checked. So check it
392 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800393 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
394 LOG(FATAL) << "Channel { \"name\": \"/aos"
395 << channel->name()->string_view() << "\", \"type\": \""
396 << channel->type()->string_view()
397 << "\" } is not able to be sent on this node. Check your "
398 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800399 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800400 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
401 << timing::Report::GetFullyQualifiedName()
402 << "\" } not found in config.";
403 timing_report_sender_ = MakeRawSender(channel);
404
405 // Register a handler which sends the report out by copying the raw data
406 // from the prebuilt and subsequently modified report.
407 TimerHandler *timing_reports_timer =
408 AddTimer([this]() { SendTimingReport(); });
409
410 // Set it up to send once per second.
411 timing_reports_timer->set_name("timing_reports");
412 OnRun([this, timing_reports_timer]() {
413 timing_reports_timer->Setup(
414 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
415 std::chrono::milliseconds(FLAGS_timing_report_ms));
416 });
417
418 UpdateTimingReport();
419 }
420}
421
Austin Schuh7d87b672019-12-01 20:23:49 -0800422void EventLoop::ReserveEvents() {
423 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
424}
425
426namespace {
427bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
428 return first->event_time() > second->event_time();
429}
430} // namespace
431
432void EventLoop::AddEvent(EventLoopEvent *event) {
433 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
434 events_.push_back(event);
435 std::push_heap(events_.begin(), events_.end(), CompareEvents);
436}
437
438void EventLoop::RemoveEvent(EventLoopEvent *event) {
439 auto e = std::find(events_.begin(), events_.end(), event);
440 if (e != events_.end()) {
441 events_.erase(e);
442 std::make_heap(events_.begin(), events_.end(), CompareEvents);
443 event->Invalidate();
444 }
445}
446
447EventLoopEvent *EventLoop::PopEvent() {
448 EventLoopEvent *result = events_.front();
449 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
450 events_.pop_back();
451 result->Invalidate();
452 return result;
453}
454
Austin Schuh39788ff2019-12-01 18:22:57 -0800455void WatcherState::set_timing_report(timing::Watcher *watcher) {
456 CHECK_NOTNULL(watcher);
457 watcher_ = watcher;
458 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
459 handler_time_.set_statistic(watcher->mutable_handler_time());
460}
461
462void WatcherState::ResetReport() {
463 wakeup_latency_.Reset();
464 handler_time_.Reset();
465 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800466}
467
468} // namespace aos