blob: 67c4472c99b1977438cc2c5c3a5167b331da96c7 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
// Master switch for timing reports: when false, MaybeScheduleTimingReports()
// neither creates the report sender nor schedules the periodic send.
DEFINE_bool(timing_reports, true, "Publish timing reports.");
// Used both as the delay before the first report and as the repeat period of
// the timing report timer.
DEFINE_int32(timing_report_ms, 1000,
             "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070013namespace {
14void CheckAlignment(const Channel *channel) {
15 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
16 LOG(FATAL) << "max_size() (" << channel->max_size()
17 << ") is not a multiple of alignment ("
18 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
19 << configuration::CleanedChannelToString(channel) << ".";
20 }
21}
milind1f1dca32021-07-03 13:50:07 -070022
23std::string_view ErrorToString(const RawSender::Error err) {
24 switch (err) {
25 case RawSender::Error::kOk:
26 return "RawSender::Error::kOk";
27 case RawSender::Error::kMessagesSentTooFast:
28 return "RawSender::Error::kMessagesSentTooFast";
29 }
30 LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
31}
Austin Schuhd54780b2020-10-03 16:26:02 -070032} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080033
milind1f1dca32021-07-03 13:50:07 -070034std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
35 os << ErrorToString(err);
36 return os;
37}
38
39void RawSender::CheckOk(const RawSender::Error err) {
40 CHECK_EQ(err, Error::kOk) << "Messages were sent too fast on channel: "
41 << configuration::CleanedChannelToString(channel_);
42}
43
Austin Schuh39788ff2019-12-01 18:22:57 -080044RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
45 : event_loop_(event_loop),
46 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050047 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080048 timing_(event_loop_->ChannelIndex(channel)) {
49 event_loop_->NewSender(this);
50}
51
52RawSender::~RawSender() { event_loop_->DeleteSender(this); }
53
milind1f1dca32021-07-03 13:50:07 -070054RawSender::Error RawSender::DoSend(
55 const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
56 realtime_clock::time_point realtime_remote_time,
57 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070058 return DoSend(data->data(), data->size(), monotonic_remote_time,
59 realtime_remote_time, remote_queue_index, source_boot_uuid);
60}
61
Austin Schuh39788ff2019-12-01 18:22:57 -080062RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
63 : event_loop_(event_loop),
64 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050065 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080066 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080067 context_.monotonic_event_time = monotonic_clock::min_time;
68 context_.monotonic_remote_time = monotonic_clock::min_time;
69 context_.realtime_event_time = realtime_clock::min_time;
70 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080071 context_.queue_index = 0xffffffff;
72 context_.size = 0;
73 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -070074 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -080075 event_loop_->NewFetcher(this);
76}
77
78RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
79
80TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
81 : event_loop_(event_loop), fn_(std::move(fn)) {}
82
83TimerHandler::~TimerHandler() {}
84
85PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
86 std::function<void(int)> fn,
87 const monotonic_clock::duration interval,
88 const monotonic_clock::duration offset)
89 : event_loop_(event_loop),
90 fn_(std::move(fn)),
91 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
92 event_loop_->OnRun([this]() {
93 const monotonic_clock::time_point monotonic_now =
94 event_loop_->monotonic_now();
95 phased_loop_.Reset(monotonic_now);
96 Reschedule(
97 [this](monotonic_clock::time_point sleep_time) {
98 Schedule(sleep_time);
99 },
100 monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -0700101 // Reschedule here will count cycles elapsed before now, and then the
102 // reschedule before running the handler will count the time that elapsed
103 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -0800104 cycles_elapsed_ = 0;
105 });
106}
107
108PhasedLoopHandler::~PhasedLoopHandler() {}
109
Austin Schuh83c7f702021-01-19 22:36:29 -0800110EventLoop::EventLoop(const Configuration *configuration)
111 : timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -0700112 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -0800113
Austin Schuh39788ff2019-12-01 18:22:57 -0800114EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800115 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700116 for (const RawSender *sender : senders_) {
117 LOG(ERROR) << " Sender "
118 << configuration::StrippedChannelToString(sender->channel())
119 << " still open";
120 }
121 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800122 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800123 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800124}
125
Brian Silvermanbf889922021-11-10 12:41:57 -0800126void EventLoop::SkipTimingReport() {
127 skip_timing_report_ = true;
128 timing_report_ = flatbuffers::DetachedBuffer();
129
130 for (size_t i = 0; i < timers_.size(); ++i) {
131 timers_[i]->timing_.set_timing_report(nullptr);
132 }
133
134 for (size_t i = 0; i < phased_loops_.size(); ++i) {
135 phased_loops_[i]->timing_.set_timing_report(nullptr);
136 }
137
138 for (size_t i = 0; i < watchers_.size(); ++i) {
139 watchers_[i]->set_timing_report(nullptr);
140 }
141
142 for (size_t i = 0; i < senders_.size(); ++i) {
143 senders_[i]->timing_.set_timing_report(nullptr);
144 }
145
146 for (size_t i = 0; i < fetchers_.size(); ++i) {
147 fetchers_[i]->timing_.set_timing_report(nullptr);
148 }
149}
150
Austin Schuh39788ff2019-12-01 18:22:57 -0800151int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800152 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800153}
154
Brian Silverman5120afb2020-01-31 17:44:35 -0800155WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
156 const int channel_index = ChannelIndex(channel);
157 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
158 if (watcher->channel_index() == channel_index) {
159 return watcher.get();
160 }
161 }
162 LOG(FATAL) << "No watcher found for channel";
163}
164
Austin Schuh39788ff2019-12-01 18:22:57 -0800165void EventLoop::NewSender(RawSender *sender) {
166 senders_.emplace_back(sender);
167 UpdateTimingReport();
168}
169void EventLoop::DeleteSender(RawSender *sender) {
170 CHECK(!is_running());
171 auto s = std::find(senders_.begin(), senders_.end(), sender);
172 CHECK(s != senders_.end()) << ": Sender not in senders list";
173 senders_.erase(s);
174 UpdateTimingReport();
175}
176
177TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
178 timers_.emplace_back(std::move(timer));
179 UpdateTimingReport();
180 return timers_.back().get();
181}
182
183PhasedLoopHandler *EventLoop::NewPhasedLoop(
184 std::unique_ptr<PhasedLoopHandler> phased_loop) {
185 phased_loops_.emplace_back(std::move(phased_loop));
186 UpdateTimingReport();
187 return phased_loops_.back().get();
188}
189
190void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700191 CheckAlignment(fetcher->channel());
192
Austin Schuh39788ff2019-12-01 18:22:57 -0800193 fetchers_.emplace_back(fetcher);
194 UpdateTimingReport();
195}
196
197void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
198 CHECK(!is_running());
199 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
200 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
201 fetchers_.erase(f);
202 UpdateTimingReport();
203}
204
205WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
206 watchers_.emplace_back(std::move(watcher));
207
208 UpdateTimingReport();
209
210 return watchers_.back().get();
211}
212
Brian Silverman0fc69932020-01-24 21:54:02 -0800213void EventLoop::TakeWatcher(const Channel *channel) {
214 CHECK(!is_running()) << ": Cannot add new objects while running.";
215 ChannelIndex(channel);
216
Austin Schuhd54780b2020-10-03 16:26:02 -0700217 CheckAlignment(channel);
218
Brian Silverman0fc69932020-01-24 21:54:02 -0800219 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800220 << ": " << configuration::CleanedChannelToString(channel)
221 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800222
223 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800224 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800225 << " is already being used.";
226
227 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800228 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800229 << " is not able to be watched on this node. Check your "
230 "configuration.";
231 }
232}
233
234void EventLoop::TakeSender(const Channel *channel) {
235 CHECK(!is_running()) << ": Cannot add new objects while running.";
236 ChannelIndex(channel);
237
Austin Schuhd54780b2020-10-03 16:26:02 -0700238 CheckAlignment(channel);
239
Brian Silverman0fc69932020-01-24 21:54:02 -0800240 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800241 << ": Channel " << configuration::CleanedChannelToString(channel)
242 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800243
244 // We don't care if this is a duplicate.
245 taken_senders_.insert(channel);
246}
247
Austin Schuh39788ff2019-12-01 18:22:57 -0800248void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700249 if (!timing_report_sender_) {
250 // Timing reports are disabled, so nothing for us to do.
251 return;
252 }
253
Austin Schuh39788ff2019-12-01 18:22:57 -0800254 // We need to do a fancy dance here to get all the accounting to work right.
255 // We want to copy the memory here, but then send after resetting. Otherwise
256 // the send for the timing report won't be counted in the timing report.
257 //
258 // Also, flatbuffers build from the back end. So place this at the back end
259 // of the buffer. We only have to care because we are using this in a very
260 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800261 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuh39788ff2019-12-01 18:22:57 -0800262 << ": Timing report bigger than the sender size.";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800263 std::copy(timing_report_.span().data(),
264 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800265 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800266 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800267
268 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
269 timer->timing_.ResetTimingReport();
270 }
271 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
272 watcher->ResetReport();
273 }
274 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
275 phased_loop->timing_.ResetTimingReport();
276 }
277 for (RawSender *sender : senders_) {
278 sender->timing_.ResetTimingReport();
279 }
280 for (RawFetcher *fetcher : fetchers_) {
281 fetcher->timing_.ResetTimingReport();
282 }
milind1f1dca32021-07-03 13:50:07 -0700283 // TODO(milind): If we fail to send, we don't want to reset the timing report.
284 // We would need to move the reset after the send, and then find the correct
285 // timing report and set the reports with it instead of letting the sender do
286 // this. If we failed to send, we wouldn't reset or set the reports, so they
287 // can accumalate until the next send.
288 timing_report_failure_counter_.Count(
289 timing_report_sender_->Send(timing_report_.span().size()));
Austin Schuh39788ff2019-12-01 18:22:57 -0800290}
291
292void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800293 if (skip_timing_report_) {
294 return;
295 }
296
Austin Schuh39788ff2019-12-01 18:22:57 -0800297 // We need to support senders and fetchers changing while we are setting up
298 // the event loop. Otherwise we can't fetch or send until the loop runs. This
299 // means that on each change, we need to redo all this work. This makes setup
300 // more expensive, but not by all that much on a modern processor.
301
302 // Now, build up a report with everything pre-filled out.
303 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800304 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800305
306 // Pre-fill in the defaults for timers.
307 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
308 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
309 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
310 timing::CreateStatistic(fbb);
311 flatbuffers::Offset<timing::Statistic> handler_time_offset =
312 timing::CreateStatistic(fbb);
313 flatbuffers::Offset<flatbuffers::String> name_offset;
314 if (timer->name().size() != 0) {
315 name_offset = fbb.CreateString(timer->name());
316 }
317
318 timing::Timer::Builder timer_builder(fbb);
319
320 if (timer->name().size() != 0) {
321 timer_builder.add_name(name_offset);
322 }
323 timer_builder.add_wakeup_latency(wakeup_latency_offset);
324 timer_builder.add_handler_time(handler_time_offset);
325 timer_builder.add_count(0);
326 timer_offsets.emplace_back(timer_builder.Finish());
327 }
328
329 // Pre-fill in the defaults for phased_loops.
330 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
331 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
332 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
333 timing::CreateStatistic(fbb);
334 flatbuffers::Offset<timing::Statistic> handler_time_offset =
335 timing::CreateStatistic(fbb);
336 flatbuffers::Offset<flatbuffers::String> name_offset;
337 if (phased_loop->name().size() != 0) {
338 name_offset = fbb.CreateString(phased_loop->name());
339 }
340
341 timing::Timer::Builder timer_builder(fbb);
342
343 if (phased_loop->name().size() != 0) {
344 timer_builder.add_name(name_offset);
345 }
346 timer_builder.add_wakeup_latency(wakeup_latency_offset);
347 timer_builder.add_handler_time(handler_time_offset);
348 timer_builder.add_count(0);
349 phased_loop_offsets.emplace_back(timer_builder.Finish());
350 }
351
352 // Pre-fill in the defaults for watchers.
353 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
354 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
355 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
356 timing::CreateStatistic(fbb);
357 flatbuffers::Offset<timing::Statistic> handler_time_offset =
358 timing::CreateStatistic(fbb);
359
360 timing::Watcher::Builder watcher_builder(fbb);
361
362 watcher_builder.add_channel_index(watcher->channel_index());
363 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
364 watcher_builder.add_handler_time(handler_time_offset);
365 watcher_builder.add_count(0);
366 watcher_offsets.emplace_back(watcher_builder.Finish());
367 }
368
369 // Pre-fill in the defaults for senders.
370 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
371 for (const RawSender *sender : senders_) {
372 flatbuffers::Offset<timing::Statistic> size_offset =
373 timing::CreateStatistic(fbb);
374
375 timing::Sender::Builder sender_builder(fbb);
376
377 sender_builder.add_channel_index(sender->timing_.channel_index);
378 sender_builder.add_size(size_offset);
379 sender_builder.add_count(0);
380 sender_offsets.emplace_back(sender_builder.Finish());
381 }
382
383 // Pre-fill in the defaults for fetchers.
384 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
385 for (RawFetcher *fetcher : fetchers_) {
386 flatbuffers::Offset<timing::Statistic> latency_offset =
387 timing::CreateStatistic(fbb);
388
389 timing::Fetcher::Builder fetcher_builder(fbb);
390
391 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
392 fetcher_builder.add_count(0);
393 fetcher_builder.add_latency(latency_offset);
394 fetcher_offsets.emplace_back(fetcher_builder.Finish());
395 }
396
397 // Then build the final report.
398 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
399 timers_offset;
400 if (timer_offsets.size() > 0) {
401 timers_offset = fbb.CreateVector(timer_offsets);
402 }
403
404 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
405 phased_loops_offset;
406 if (phased_loop_offsets.size() > 0) {
407 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
408 }
409
410 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
411 watchers_offset;
412 if (watcher_offsets.size() > 0) {
413 watchers_offset = fbb.CreateVector(watcher_offsets);
414 }
415
416 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
417 senders_offset;
418 if (sender_offsets.size() > 0) {
419 senders_offset = fbb.CreateVector(sender_offsets);
420 }
421
422 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
423 fetchers_offset;
424 if (fetcher_offsets.size() > 0) {
425 fetchers_offset = fbb.CreateVector(fetcher_offsets);
426 }
427
428 flatbuffers::Offset<flatbuffers::String> name_offset =
429 fbb.CreateString(name());
430
431 timing::Report::Builder report_builder(fbb);
432 report_builder.add_name(name_offset);
433 report_builder.add_pid(GetTid());
434 if (timer_offsets.size() > 0) {
435 report_builder.add_timers(timers_offset);
436 }
437 if (phased_loop_offsets.size() > 0) {
438 report_builder.add_phased_loops(phased_loops_offset);
439 }
440 if (watcher_offsets.size() > 0) {
441 report_builder.add_watchers(watchers_offset);
442 }
443 if (sender_offsets.size() > 0) {
444 report_builder.add_senders(senders_offset);
445 }
446 if (fetcher_offsets.size() > 0) {
447 report_builder.add_fetchers(fetchers_offset);
448 }
milind1f1dca32021-07-03 13:50:07 -0700449 report_builder.add_send_failures(timing_report_failure_counter_.failures());
Austin Schuh39788ff2019-12-01 18:22:57 -0800450 fbb.Finish(report_builder.Finish());
451
452 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
453
454 // Now that the pointers are stable, pass them to the timers and watchers to
455 // be updated.
456 for (size_t i = 0; i < timers_.size(); ++i) {
457 timers_[i]->timing_.set_timing_report(
458 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
459 i));
460 }
461
462 for (size_t i = 0; i < phased_loops_.size(); ++i) {
463 phased_loops_[i]->timing_.set_timing_report(
464 timing_report_.mutable_message()
465 ->mutable_phased_loops()
466 ->GetMutableObject(i));
467 }
468
469 for (size_t i = 0; i < watchers_.size(); ++i) {
470 watchers_[i]->set_timing_report(
471 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
472 i));
473 }
474
475 for (size_t i = 0; i < senders_.size(); ++i) {
476 senders_[i]->timing_.set_timing_report(
477 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
478 i));
479 }
480
481 for (size_t i = 0; i < fetchers_.size(); ++i) {
482 fetchers_[i]->timing_.set_timing_report(
483 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
484 i));
485 }
486}
487
488void EventLoop::MaybeScheduleTimingReports() {
489 if (FLAGS_timing_reports && !skip_timing_report_) {
490 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
491 // Make a raw sender for the report.
492 const Channel *channel = configuration::GetChannel(
493 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800494 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700495 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
496 "\"type\": \"aos.timing.Report\"} on node "
497 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800498
499 // Since we are using a RawSender, validity isn't checked. So check it
500 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800501 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
502 LOG(FATAL) << "Channel { \"name\": \"/aos"
503 << channel->name()->string_view() << "\", \"type\": \""
504 << channel->type()->string_view()
505 << "\" } is not able to be sent on this node. Check your "
506 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800507 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800508 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
509 << timing::Report::GetFullyQualifiedName()
510 << "\" } not found in config.";
511 timing_report_sender_ = MakeRawSender(channel);
512
513 // Register a handler which sends the report out by copying the raw data
514 // from the prebuilt and subsequently modified report.
515 TimerHandler *timing_reports_timer =
516 AddTimer([this]() { SendTimingReport(); });
517
518 // Set it up to send once per second.
519 timing_reports_timer->set_name("timing_reports");
520 OnRun([this, timing_reports_timer]() {
521 timing_reports_timer->Setup(
522 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
523 std::chrono::milliseconds(FLAGS_timing_report_ms));
524 });
525
526 UpdateTimingReport();
527 }
528}
529
Austin Schuh7d87b672019-12-01 20:23:49 -0800530void EventLoop::ReserveEvents() {
531 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
532}
533
534namespace {
535bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700536 if (first->event_time() > second->event_time()) {
537 return true;
538 }
539 if (first->event_time() < second->event_time()) {
540 return false;
541 }
542 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800543}
544} // namespace
545
546void EventLoop::AddEvent(EventLoopEvent *event) {
547 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700548 DCHECK(event->generation() == 0);
549 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800550 events_.push_back(event);
551 std::push_heap(events_.begin(), events_.end(), CompareEvents);
552}
553
554void EventLoop::RemoveEvent(EventLoopEvent *event) {
555 auto e = std::find(events_.begin(), events_.end(), event);
556 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700557 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800558 events_.erase(e);
559 std::make_heap(events_.begin(), events_.end(), CompareEvents);
560 event->Invalidate();
561 }
562}
563
564EventLoopEvent *EventLoop::PopEvent() {
565 EventLoopEvent *result = events_.front();
566 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
567 events_.pop_back();
568 result->Invalidate();
569 return result;
570}
571
Austin Schuha9012be2021-07-21 15:19:11 -0700572void EventLoop::SetTimerContext(
573 monotonic_clock::time_point monotonic_event_time) {
574 context_.monotonic_event_time = monotonic_event_time;
575 context_.monotonic_remote_time = monotonic_clock::min_time;
576 context_.realtime_event_time = realtime_clock::min_time;
577 context_.realtime_remote_time = realtime_clock::min_time;
578 context_.queue_index = 0xffffffffu;
579 context_.size = 0u;
580 context_.data = nullptr;
581 context_.buffer_index = -1;
582 context_.source_boot_uuid = boot_uuid();
583}
584
Austin Schuh39788ff2019-12-01 18:22:57 -0800585void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800586 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800587 if (!watcher) {
588 wakeup_latency_.set_statistic(nullptr);
589 handler_time_.set_statistic(nullptr);
590 } else {
591 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
592 handler_time_.set_statistic(watcher->mutable_handler_time());
593 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800594}
595
596void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800597 if (!watcher_) {
598 return;
599 }
600
Austin Schuh39788ff2019-12-01 18:22:57 -0800601 wakeup_latency_.Reset();
602 handler_time_.Reset();
603 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800604}
605
606} // namespace aos