blob: 08b0064b3e4e8a2a5918f89d215c3cc9cdb77523 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
Austin Schuh39788ff2019-12-01 18:22:57 -08008DEFINE_bool(timing_reports, true, "Publish timing reports.");
9DEFINE_int32(timing_report_ms, 1000,
10 "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070013namespace {
14void CheckAlignment(const Channel *channel) {
15 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
16 LOG(FATAL) << "max_size() (" << channel->max_size()
17 << ") is not a multiple of alignment ("
18 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
19 << configuration::CleanedChannelToString(channel) << ".";
20 }
21}
milind1f1dca32021-07-03 13:50:07 -070022
23std::string_view ErrorToString(const RawSender::Error err) {
24 switch (err) {
25 case RawSender::Error::kOk:
26 return "RawSender::Error::kOk";
27 case RawSender::Error::kMessagesSentTooFast:
28 return "RawSender::Error::kMessagesSentTooFast";
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070029 case RawSender::Error::kInvalidRedzone:
30 return "RawSender::Error::kInvalidRedzone";
milind1f1dca32021-07-03 13:50:07 -070031 }
32 LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
33}
Austin Schuhd54780b2020-10-03 16:26:02 -070034} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080035
milind1f1dca32021-07-03 13:50:07 -070036std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
37 os << ErrorToString(err);
38 return os;
39}
40
41void RawSender::CheckOk(const RawSender::Error err) {
42 CHECK_EQ(err, Error::kOk) << "Messages were sent too fast on channel: "
43 << configuration::CleanedChannelToString(channel_);
44}
45
Austin Schuh39788ff2019-12-01 18:22:57 -080046RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
47 : event_loop_(event_loop),
48 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050049 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080050 timing_(event_loop_->ChannelIndex(channel)) {
51 event_loop_->NewSender(this);
52}
53
54RawSender::~RawSender() { event_loop_->DeleteSender(this); }
55
milind1f1dca32021-07-03 13:50:07 -070056RawSender::Error RawSender::DoSend(
57 const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
58 realtime_clock::time_point realtime_remote_time,
59 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070060 return DoSend(data->data(), data->size(), monotonic_remote_time,
61 realtime_remote_time, remote_queue_index, source_boot_uuid);
62}
63
Austin Schuh39788ff2019-12-01 18:22:57 -080064RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
65 : event_loop_(event_loop),
66 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050067 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080068 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080069 context_.monotonic_event_time = monotonic_clock::min_time;
70 context_.monotonic_remote_time = monotonic_clock::min_time;
71 context_.realtime_event_time = realtime_clock::min_time;
72 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080073 context_.queue_index = 0xffffffff;
74 context_.size = 0;
75 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -070076 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -080077 event_loop_->NewFetcher(this);
78}
79
80RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
81
82TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
83 : event_loop_(event_loop), fn_(std::move(fn)) {}
84
85TimerHandler::~TimerHandler() {}
86
87PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
88 std::function<void(int)> fn,
89 const monotonic_clock::duration interval,
90 const monotonic_clock::duration offset)
91 : event_loop_(event_loop),
92 fn_(std::move(fn)),
93 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
94 event_loop_->OnRun([this]() {
95 const monotonic_clock::time_point monotonic_now =
96 event_loop_->monotonic_now();
97 phased_loop_.Reset(monotonic_now);
98 Reschedule(
99 [this](monotonic_clock::time_point sleep_time) {
100 Schedule(sleep_time);
101 },
102 monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -0700103 // Reschedule here will count cycles elapsed before now, and then the
104 // reschedule before running the handler will count the time that elapsed
105 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -0800106 cycles_elapsed_ = 0;
107 });
108}
109
110PhasedLoopHandler::~PhasedLoopHandler() {}
111
Austin Schuh83c7f702021-01-19 22:36:29 -0800112EventLoop::EventLoop(const Configuration *configuration)
113 : timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -0700114 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -0800115
Austin Schuh39788ff2019-12-01 18:22:57 -0800116EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800117 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700118 for (const RawSender *sender : senders_) {
119 LOG(ERROR) << " Sender "
120 << configuration::StrippedChannelToString(sender->channel())
121 << " still open";
122 }
123 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800124 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800125 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800126}
127
Brian Silvermanbf889922021-11-10 12:41:57 -0800128void EventLoop::SkipTimingReport() {
129 skip_timing_report_ = true;
130 timing_report_ = flatbuffers::DetachedBuffer();
131
132 for (size_t i = 0; i < timers_.size(); ++i) {
133 timers_[i]->timing_.set_timing_report(nullptr);
134 }
135
136 for (size_t i = 0; i < phased_loops_.size(); ++i) {
137 phased_loops_[i]->timing_.set_timing_report(nullptr);
138 }
139
140 for (size_t i = 0; i < watchers_.size(); ++i) {
141 watchers_[i]->set_timing_report(nullptr);
142 }
143
144 for (size_t i = 0; i < senders_.size(); ++i) {
145 senders_[i]->timing_.set_timing_report(nullptr);
146 }
147
148 for (size_t i = 0; i < fetchers_.size(); ++i) {
149 fetchers_[i]->timing_.set_timing_report(nullptr);
150 }
151}
152
Austin Schuh39788ff2019-12-01 18:22:57 -0800153int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800154 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800155}
156
Brian Silverman5120afb2020-01-31 17:44:35 -0800157WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
158 const int channel_index = ChannelIndex(channel);
159 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
160 if (watcher->channel_index() == channel_index) {
161 return watcher.get();
162 }
163 }
164 LOG(FATAL) << "No watcher found for channel";
165}
166
Austin Schuh39788ff2019-12-01 18:22:57 -0800167void EventLoop::NewSender(RawSender *sender) {
168 senders_.emplace_back(sender);
169 UpdateTimingReport();
170}
171void EventLoop::DeleteSender(RawSender *sender) {
172 CHECK(!is_running());
173 auto s = std::find(senders_.begin(), senders_.end(), sender);
174 CHECK(s != senders_.end()) << ": Sender not in senders list";
175 senders_.erase(s);
176 UpdateTimingReport();
177}
178
179TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
180 timers_.emplace_back(std::move(timer));
181 UpdateTimingReport();
182 return timers_.back().get();
183}
184
185PhasedLoopHandler *EventLoop::NewPhasedLoop(
186 std::unique_ptr<PhasedLoopHandler> phased_loop) {
187 phased_loops_.emplace_back(std::move(phased_loop));
188 UpdateTimingReport();
189 return phased_loops_.back().get();
190}
191
192void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700193 CheckAlignment(fetcher->channel());
194
Austin Schuh39788ff2019-12-01 18:22:57 -0800195 fetchers_.emplace_back(fetcher);
196 UpdateTimingReport();
197}
198
199void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
200 CHECK(!is_running());
201 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
202 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
203 fetchers_.erase(f);
204 UpdateTimingReport();
205}
206
207WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
208 watchers_.emplace_back(std::move(watcher));
209
210 UpdateTimingReport();
211
212 return watchers_.back().get();
213}
214
Brian Silverman0fc69932020-01-24 21:54:02 -0800215void EventLoop::TakeWatcher(const Channel *channel) {
216 CHECK(!is_running()) << ": Cannot add new objects while running.";
217 ChannelIndex(channel);
218
Austin Schuhd54780b2020-10-03 16:26:02 -0700219 CheckAlignment(channel);
220
Brian Silverman0fc69932020-01-24 21:54:02 -0800221 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800222 << ": " << configuration::CleanedChannelToString(channel)
223 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800224
225 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800226 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800227 << " is already being used.";
228
229 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800230 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800231 << " is not able to be watched on this node. Check your "
232 "configuration.";
233 }
234}
235
236void EventLoop::TakeSender(const Channel *channel) {
237 CHECK(!is_running()) << ": Cannot add new objects while running.";
238 ChannelIndex(channel);
239
Austin Schuhd54780b2020-10-03 16:26:02 -0700240 CheckAlignment(channel);
241
Brian Silverman0fc69932020-01-24 21:54:02 -0800242 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800243 << ": Channel " << configuration::CleanedChannelToString(channel)
244 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800245
246 // We don't care if this is a duplicate.
247 taken_senders_.insert(channel);
248}
249
Austin Schuh39788ff2019-12-01 18:22:57 -0800250void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700251 if (!timing_report_sender_) {
252 // Timing reports are disabled, so nothing for us to do.
253 return;
254 }
255
Austin Schuh39788ff2019-12-01 18:22:57 -0800256 // We need to do a fancy dance here to get all the accounting to work right.
257 // We want to copy the memory here, but then send after resetting. Otherwise
258 // the send for the timing report won't be counted in the timing report.
259 //
260 // Also, flatbuffers build from the back end. So place this at the back end
261 // of the buffer. We only have to care because we are using this in a very
262 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800263 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuh39788ff2019-12-01 18:22:57 -0800264 << ": Timing report bigger than the sender size.";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800265 std::copy(timing_report_.span().data(),
266 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800267 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800268 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800269
270 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
271 timer->timing_.ResetTimingReport();
272 }
273 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
274 watcher->ResetReport();
275 }
276 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
277 phased_loop->timing_.ResetTimingReport();
278 }
279 for (RawSender *sender : senders_) {
280 sender->timing_.ResetTimingReport();
281 }
282 for (RawFetcher *fetcher : fetchers_) {
283 fetcher->timing_.ResetTimingReport();
284 }
milind1f1dca32021-07-03 13:50:07 -0700285 // TODO(milind): If we fail to send, we don't want to reset the timing report.
286 // We would need to move the reset after the send, and then find the correct
287 // timing report and set the reports with it instead of letting the sender do
288 // this. If we failed to send, we wouldn't reset or set the reports, so they
289 // can accumalate until the next send.
290 timing_report_failure_counter_.Count(
291 timing_report_sender_->Send(timing_report_.span().size()));
Austin Schuh39788ff2019-12-01 18:22:57 -0800292}
293
294void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800295 if (skip_timing_report_) {
296 return;
297 }
298
Austin Schuh39788ff2019-12-01 18:22:57 -0800299 // We need to support senders and fetchers changing while we are setting up
300 // the event loop. Otherwise we can't fetch or send until the loop runs. This
301 // means that on each change, we need to redo all this work. This makes setup
302 // more expensive, but not by all that much on a modern processor.
303
304 // Now, build up a report with everything pre-filled out.
305 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800306 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800307
308 // Pre-fill in the defaults for timers.
309 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
310 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
311 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
312 timing::CreateStatistic(fbb);
313 flatbuffers::Offset<timing::Statistic> handler_time_offset =
314 timing::CreateStatistic(fbb);
315 flatbuffers::Offset<flatbuffers::String> name_offset;
316 if (timer->name().size() != 0) {
317 name_offset = fbb.CreateString(timer->name());
318 }
319
320 timing::Timer::Builder timer_builder(fbb);
321
322 if (timer->name().size() != 0) {
323 timer_builder.add_name(name_offset);
324 }
325 timer_builder.add_wakeup_latency(wakeup_latency_offset);
326 timer_builder.add_handler_time(handler_time_offset);
327 timer_builder.add_count(0);
328 timer_offsets.emplace_back(timer_builder.Finish());
329 }
330
331 // Pre-fill in the defaults for phased_loops.
332 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
333 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
334 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
335 timing::CreateStatistic(fbb);
336 flatbuffers::Offset<timing::Statistic> handler_time_offset =
337 timing::CreateStatistic(fbb);
338 flatbuffers::Offset<flatbuffers::String> name_offset;
339 if (phased_loop->name().size() != 0) {
340 name_offset = fbb.CreateString(phased_loop->name());
341 }
342
343 timing::Timer::Builder timer_builder(fbb);
344
345 if (phased_loop->name().size() != 0) {
346 timer_builder.add_name(name_offset);
347 }
348 timer_builder.add_wakeup_latency(wakeup_latency_offset);
349 timer_builder.add_handler_time(handler_time_offset);
350 timer_builder.add_count(0);
351 phased_loop_offsets.emplace_back(timer_builder.Finish());
352 }
353
354 // Pre-fill in the defaults for watchers.
355 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
356 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
357 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
358 timing::CreateStatistic(fbb);
359 flatbuffers::Offset<timing::Statistic> handler_time_offset =
360 timing::CreateStatistic(fbb);
361
362 timing::Watcher::Builder watcher_builder(fbb);
363
364 watcher_builder.add_channel_index(watcher->channel_index());
365 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
366 watcher_builder.add_handler_time(handler_time_offset);
367 watcher_builder.add_count(0);
368 watcher_offsets.emplace_back(watcher_builder.Finish());
369 }
370
371 // Pre-fill in the defaults for senders.
372 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
373 for (const RawSender *sender : senders_) {
374 flatbuffers::Offset<timing::Statistic> size_offset =
375 timing::CreateStatistic(fbb);
376
377 timing::Sender::Builder sender_builder(fbb);
378
379 sender_builder.add_channel_index(sender->timing_.channel_index);
380 sender_builder.add_size(size_offset);
381 sender_builder.add_count(0);
382 sender_offsets.emplace_back(sender_builder.Finish());
383 }
384
385 // Pre-fill in the defaults for fetchers.
386 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
387 for (RawFetcher *fetcher : fetchers_) {
388 flatbuffers::Offset<timing::Statistic> latency_offset =
389 timing::CreateStatistic(fbb);
390
391 timing::Fetcher::Builder fetcher_builder(fbb);
392
393 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
394 fetcher_builder.add_count(0);
395 fetcher_builder.add_latency(latency_offset);
396 fetcher_offsets.emplace_back(fetcher_builder.Finish());
397 }
398
399 // Then build the final report.
400 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
401 timers_offset;
402 if (timer_offsets.size() > 0) {
403 timers_offset = fbb.CreateVector(timer_offsets);
404 }
405
406 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
407 phased_loops_offset;
408 if (phased_loop_offsets.size() > 0) {
409 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
410 }
411
412 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
413 watchers_offset;
414 if (watcher_offsets.size() > 0) {
415 watchers_offset = fbb.CreateVector(watcher_offsets);
416 }
417
418 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
419 senders_offset;
420 if (sender_offsets.size() > 0) {
421 senders_offset = fbb.CreateVector(sender_offsets);
422 }
423
424 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
425 fetchers_offset;
426 if (fetcher_offsets.size() > 0) {
427 fetchers_offset = fbb.CreateVector(fetcher_offsets);
428 }
429
430 flatbuffers::Offset<flatbuffers::String> name_offset =
431 fbb.CreateString(name());
432
433 timing::Report::Builder report_builder(fbb);
434 report_builder.add_name(name_offset);
435 report_builder.add_pid(GetTid());
436 if (timer_offsets.size() > 0) {
437 report_builder.add_timers(timers_offset);
438 }
439 if (phased_loop_offsets.size() > 0) {
440 report_builder.add_phased_loops(phased_loops_offset);
441 }
442 if (watcher_offsets.size() > 0) {
443 report_builder.add_watchers(watchers_offset);
444 }
445 if (sender_offsets.size() > 0) {
446 report_builder.add_senders(senders_offset);
447 }
448 if (fetcher_offsets.size() > 0) {
449 report_builder.add_fetchers(fetchers_offset);
450 }
milind1f1dca32021-07-03 13:50:07 -0700451 report_builder.add_send_failures(timing_report_failure_counter_.failures());
Austin Schuh39788ff2019-12-01 18:22:57 -0800452 fbb.Finish(report_builder.Finish());
453
454 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
455
456 // Now that the pointers are stable, pass them to the timers and watchers to
457 // be updated.
458 for (size_t i = 0; i < timers_.size(); ++i) {
459 timers_[i]->timing_.set_timing_report(
460 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
461 i));
462 }
463
464 for (size_t i = 0; i < phased_loops_.size(); ++i) {
465 phased_loops_[i]->timing_.set_timing_report(
466 timing_report_.mutable_message()
467 ->mutable_phased_loops()
468 ->GetMutableObject(i));
469 }
470
471 for (size_t i = 0; i < watchers_.size(); ++i) {
472 watchers_[i]->set_timing_report(
473 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
474 i));
475 }
476
477 for (size_t i = 0; i < senders_.size(); ++i) {
478 senders_[i]->timing_.set_timing_report(
479 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
480 i));
481 }
482
483 for (size_t i = 0; i < fetchers_.size(); ++i) {
484 fetchers_[i]->timing_.set_timing_report(
485 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
486 i));
487 }
488}
489
490void EventLoop::MaybeScheduleTimingReports() {
491 if (FLAGS_timing_reports && !skip_timing_report_) {
492 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
493 // Make a raw sender for the report.
494 const Channel *channel = configuration::GetChannel(
495 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800496 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700497 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
498 "\"type\": \"aos.timing.Report\"} on node "
499 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800500
501 // Since we are using a RawSender, validity isn't checked. So check it
502 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800503 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
504 LOG(FATAL) << "Channel { \"name\": \"/aos"
505 << channel->name()->string_view() << "\", \"type\": \""
506 << channel->type()->string_view()
507 << "\" } is not able to be sent on this node. Check your "
508 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800509 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800510 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
511 << timing::Report::GetFullyQualifiedName()
512 << "\" } not found in config.";
513 timing_report_sender_ = MakeRawSender(channel);
514
515 // Register a handler which sends the report out by copying the raw data
516 // from the prebuilt and subsequently modified report.
517 TimerHandler *timing_reports_timer =
518 AddTimer([this]() { SendTimingReport(); });
519
520 // Set it up to send once per second.
521 timing_reports_timer->set_name("timing_reports");
522 OnRun([this, timing_reports_timer]() {
523 timing_reports_timer->Setup(
524 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
525 std::chrono::milliseconds(FLAGS_timing_report_ms));
526 });
527
528 UpdateTimingReport();
529 }
530}
531
Austin Schuh7d87b672019-12-01 20:23:49 -0800532void EventLoop::ReserveEvents() {
533 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
534}
535
536namespace {
537bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700538 if (first->event_time() > second->event_time()) {
539 return true;
540 }
541 if (first->event_time() < second->event_time()) {
542 return false;
543 }
544 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800545}
546} // namespace
547
548void EventLoop::AddEvent(EventLoopEvent *event) {
549 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700550 DCHECK(event->generation() == 0);
551 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800552 events_.push_back(event);
553 std::push_heap(events_.begin(), events_.end(), CompareEvents);
554}
555
556void EventLoop::RemoveEvent(EventLoopEvent *event) {
557 auto e = std::find(events_.begin(), events_.end(), event);
558 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700559 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800560 events_.erase(e);
561 std::make_heap(events_.begin(), events_.end(), CompareEvents);
562 event->Invalidate();
563 }
564}
565
566EventLoopEvent *EventLoop::PopEvent() {
567 EventLoopEvent *result = events_.front();
568 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
569 events_.pop_back();
570 result->Invalidate();
571 return result;
572}
573
Austin Schuha9012be2021-07-21 15:19:11 -0700574void EventLoop::SetTimerContext(
575 monotonic_clock::time_point monotonic_event_time) {
576 context_.monotonic_event_time = monotonic_event_time;
577 context_.monotonic_remote_time = monotonic_clock::min_time;
578 context_.realtime_event_time = realtime_clock::min_time;
579 context_.realtime_remote_time = realtime_clock::min_time;
580 context_.queue_index = 0xffffffffu;
581 context_.size = 0u;
582 context_.data = nullptr;
583 context_.buffer_index = -1;
584 context_.source_boot_uuid = boot_uuid();
585}
586
Austin Schuh39788ff2019-12-01 18:22:57 -0800587void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800588 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800589 if (!watcher) {
590 wakeup_latency_.set_statistic(nullptr);
591 handler_time_.set_statistic(nullptr);
592 } else {
593 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
594 handler_time_.set_statistic(watcher->mutable_handler_time());
595 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800596}
597
598void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800599 if (!watcher_) {
600 return;
601 }
602
Austin Schuh39788ff2019-12-01 18:22:57 -0800603 wakeup_latency_.Reset();
604 handler_time_.Reset();
605 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800606}
607
608} // namespace aos