blob: c29d8207b5642fcadd36e910549f00cd6ce6ceb0 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh070019a2022-12-20 22:23:09 -08006#include "aos/realtime.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08007#include "glog/logging.h"
8
Austin Schuh39788ff2019-12-01 18:22:57 -08009DEFINE_bool(timing_reports, true, "Publish timing reports.");
10DEFINE_int32(timing_report_ms, 1000,
11 "Period in milliseconds to publish timing reports at.");
12
Austin Schuh54cf95f2019-11-29 13:14:18 -080013namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070014namespace {
15void CheckAlignment(const Channel *channel) {
16 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
17 LOG(FATAL) << "max_size() (" << channel->max_size()
18 << ") is not a multiple of alignment ("
19 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
20 << configuration::CleanedChannelToString(channel) << ".";
21 }
22}
milind1f1dca32021-07-03 13:50:07 -070023
24std::string_view ErrorToString(const RawSender::Error err) {
25 switch (err) {
26 case RawSender::Error::kOk:
27 return "RawSender::Error::kOk";
28 case RawSender::Error::kMessagesSentTooFast:
29 return "RawSender::Error::kMessagesSentTooFast";
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070030 case RawSender::Error::kInvalidRedzone:
31 return "RawSender::Error::kInvalidRedzone";
milind1f1dca32021-07-03 13:50:07 -070032 }
33 LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
34}
Austin Schuhd54780b2020-10-03 16:26:02 -070035} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080036
milind1f1dca32021-07-03 13:50:07 -070037std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
38 os << ErrorToString(err);
39 return os;
40}
41
42void RawSender::CheckOk(const RawSender::Error err) {
43 CHECK_EQ(err, Error::kOk) << "Messages were sent too fast on channel: "
44 << configuration::CleanedChannelToString(channel_);
45}
46
Austin Schuh39788ff2019-12-01 18:22:57 -080047RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
48 : event_loop_(event_loop),
49 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050050 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080051 timing_(event_loop_->ChannelIndex(channel)) {
52 event_loop_->NewSender(this);
53}
54
55RawSender::~RawSender() { event_loop_->DeleteSender(this); }
56
milind1f1dca32021-07-03 13:50:07 -070057RawSender::Error RawSender::DoSend(
58 const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
59 realtime_clock::time_point realtime_remote_time,
60 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070061 return DoSend(data->data(), data->size(), monotonic_remote_time,
62 realtime_remote_time, remote_queue_index, source_boot_uuid);
63}
64
James Kuszmaul93abac12022-04-14 15:05:10 -070065void RawSender::RecordSendResult(const Error error, size_t message_size) {
66 switch (error) {
67 case Error::kOk: {
68 if (timing_.sender) {
69 timing_.size.Add(message_size);
70 timing_.sender->mutate_count(timing_.sender->count() + 1);
71 }
72 break;
73 }
74 case Error::kMessagesSentTooFast:
75 timing_.IncrementError(timing::SendError::MESSAGE_SENT_TOO_FAST);
76 break;
77 case Error::kInvalidRedzone:
78 timing_.IncrementError(timing::SendError::INVALID_REDZONE);
79 break;
80 }
81}
82
Austin Schuh39788ff2019-12-01 18:22:57 -080083RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
84 : event_loop_(event_loop),
85 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050086 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080087 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080088 context_.monotonic_event_time = monotonic_clock::min_time;
89 context_.monotonic_remote_time = monotonic_clock::min_time;
90 context_.realtime_event_time = realtime_clock::min_time;
91 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080092 context_.queue_index = 0xffffffff;
Austin Schuh0debde12022-08-17 16:25:17 -070093 context_.remote_queue_index = 0xffffffffu;
Austin Schuh39788ff2019-12-01 18:22:57 -080094 context_.size = 0;
95 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -070096 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -080097 event_loop_->NewFetcher(this);
98}
99
100RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
101
102TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
103 : event_loop_(event_loop), fn_(std::move(fn)) {}
104
105TimerHandler::~TimerHandler() {}
106
107PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
108 std::function<void(int)> fn,
109 const monotonic_clock::duration interval,
110 const monotonic_clock::duration offset)
111 : event_loop_(event_loop),
112 fn_(std::move(fn)),
113 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
114 event_loop_->OnRun([this]() {
115 const monotonic_clock::time_point monotonic_now =
116 event_loop_->monotonic_now();
117 phased_loop_.Reset(monotonic_now);
James Kuszmaul20dcc7c2023-01-20 11:06:31 -0800118 Reschedule(monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -0700119 // Reschedule here will count cycles elapsed before now, and then the
120 // reschedule before running the handler will count the time that elapsed
121 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -0800122 cycles_elapsed_ = 0;
123 });
124}
125
126PhasedLoopHandler::~PhasedLoopHandler() {}
127
Austin Schuh83c7f702021-01-19 22:36:29 -0800128EventLoop::EventLoop(const Configuration *configuration)
129 : timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -0700130 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -0800131
Austin Schuh39788ff2019-12-01 18:22:57 -0800132EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800133 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700134 for (const RawSender *sender : senders_) {
135 LOG(ERROR) << " Sender "
136 << configuration::StrippedChannelToString(sender->channel())
137 << " still open";
138 }
139 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800140 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800141 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800142}
143
Brian Silvermanbf889922021-11-10 12:41:57 -0800144void EventLoop::SkipTimingReport() {
145 skip_timing_report_ = true;
146 timing_report_ = flatbuffers::DetachedBuffer();
147
148 for (size_t i = 0; i < timers_.size(); ++i) {
149 timers_[i]->timing_.set_timing_report(nullptr);
150 }
151
152 for (size_t i = 0; i < phased_loops_.size(); ++i) {
153 phased_loops_[i]->timing_.set_timing_report(nullptr);
154 }
155
156 for (size_t i = 0; i < watchers_.size(); ++i) {
157 watchers_[i]->set_timing_report(nullptr);
158 }
159
160 for (size_t i = 0; i < senders_.size(); ++i) {
161 senders_[i]->timing_.set_timing_report(nullptr);
162 }
163
164 for (size_t i = 0; i < fetchers_.size(); ++i) {
165 fetchers_[i]->timing_.set_timing_report(nullptr);
166 }
167}
168
Austin Schuh39788ff2019-12-01 18:22:57 -0800169int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800170 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800171}
172
Brian Silverman5120afb2020-01-31 17:44:35 -0800173WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
174 const int channel_index = ChannelIndex(channel);
175 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
176 if (watcher->channel_index() == channel_index) {
177 return watcher.get();
178 }
179 }
180 LOG(FATAL) << "No watcher found for channel";
181}
182
Austin Schuh39788ff2019-12-01 18:22:57 -0800183void EventLoop::NewSender(RawSender *sender) {
184 senders_.emplace_back(sender);
185 UpdateTimingReport();
186}
187void EventLoop::DeleteSender(RawSender *sender) {
188 CHECK(!is_running());
189 auto s = std::find(senders_.begin(), senders_.end(), sender);
190 CHECK(s != senders_.end()) << ": Sender not in senders list";
191 senders_.erase(s);
192 UpdateTimingReport();
193}
194
195TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
196 timers_.emplace_back(std::move(timer));
197 UpdateTimingReport();
198 return timers_.back().get();
199}
200
201PhasedLoopHandler *EventLoop::NewPhasedLoop(
202 std::unique_ptr<PhasedLoopHandler> phased_loop) {
203 phased_loops_.emplace_back(std::move(phased_loop));
204 UpdateTimingReport();
205 return phased_loops_.back().get();
206}
207
208void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700209 CheckAlignment(fetcher->channel());
210
Austin Schuh39788ff2019-12-01 18:22:57 -0800211 fetchers_.emplace_back(fetcher);
212 UpdateTimingReport();
213}
214
215void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
216 CHECK(!is_running());
217 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
218 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
219 fetchers_.erase(f);
220 UpdateTimingReport();
221}
222
223WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
224 watchers_.emplace_back(std::move(watcher));
225
226 UpdateTimingReport();
227
228 return watchers_.back().get();
229}
230
Brian Silverman0fc69932020-01-24 21:54:02 -0800231void EventLoop::TakeWatcher(const Channel *channel) {
232 CHECK(!is_running()) << ": Cannot add new objects while running.";
233 ChannelIndex(channel);
234
Austin Schuhd54780b2020-10-03 16:26:02 -0700235 CheckAlignment(channel);
236
Brian Silverman0fc69932020-01-24 21:54:02 -0800237 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800238 << ": " << configuration::CleanedChannelToString(channel)
milind-u5dbdba42023-02-04 17:48:43 -0800239 << " is already being used for sending. Can't make a watcher on the "
240 "same event loop.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800241
242 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800243 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800244 << " is already being used.";
245
246 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800247 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800248 << " is not able to be watched on this node. Check your "
249 "configuration.";
250 }
251}
252
253void EventLoop::TakeSender(const Channel *channel) {
254 CHECK(!is_running()) << ": Cannot add new objects while running.";
255 ChannelIndex(channel);
256
Austin Schuhd54780b2020-10-03 16:26:02 -0700257 CheckAlignment(channel);
258
Brian Silverman0fc69932020-01-24 21:54:02 -0800259 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800260 << ": Channel " << configuration::CleanedChannelToString(channel)
261 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800262
263 // We don't care if this is a duplicate.
264 taken_senders_.insert(channel);
265}
266
Austin Schuh39788ff2019-12-01 18:22:57 -0800267void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700268 if (!timing_report_sender_) {
269 // Timing reports are disabled, so nothing for us to do.
270 return;
271 }
272
Austin Schuh39788ff2019-12-01 18:22:57 -0800273 // We need to do a fancy dance here to get all the accounting to work right.
274 // We want to copy the memory here, but then send after resetting. Otherwise
275 // the send for the timing report won't be counted in the timing report.
276 //
277 // Also, flatbuffers build from the back end. So place this at the back end
278 // of the buffer. We only have to care because we are using this in a very
279 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800280 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuh39788ff2019-12-01 18:22:57 -0800281 << ": Timing report bigger than the sender size.";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800282 std::copy(timing_report_.span().data(),
283 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800284 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800285 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800286
287 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
288 timer->timing_.ResetTimingReport();
289 }
290 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
291 watcher->ResetReport();
292 }
293 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
294 phased_loop->timing_.ResetTimingReport();
295 }
296 for (RawSender *sender : senders_) {
297 sender->timing_.ResetTimingReport();
298 }
299 for (RawFetcher *fetcher : fetchers_) {
300 fetcher->timing_.ResetTimingReport();
301 }
milind1f1dca32021-07-03 13:50:07 -0700302 // TODO(milind): If we fail to send, we don't want to reset the timing report.
303 // We would need to move the reset after the send, and then find the correct
304 // timing report and set the reports with it instead of letting the sender do
305 // this. If we failed to send, we wouldn't reset or set the reports, so they
306 // can accumalate until the next send.
307 timing_report_failure_counter_.Count(
308 timing_report_sender_->Send(timing_report_.span().size()));
Austin Schuh39788ff2019-12-01 18:22:57 -0800309}
310
311void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800312 if (skip_timing_report_) {
313 return;
314 }
315
Austin Schuh39788ff2019-12-01 18:22:57 -0800316 // We need to support senders and fetchers changing while we are setting up
317 // the event loop. Otherwise we can't fetch or send until the loop runs. This
318 // means that on each change, we need to redo all this work. This makes setup
319 // more expensive, but not by all that much on a modern processor.
320
321 // Now, build up a report with everything pre-filled out.
322 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800323 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800324
325 // Pre-fill in the defaults for timers.
326 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
327 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
328 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
329 timing::CreateStatistic(fbb);
330 flatbuffers::Offset<timing::Statistic> handler_time_offset =
331 timing::CreateStatistic(fbb);
332 flatbuffers::Offset<flatbuffers::String> name_offset;
333 if (timer->name().size() != 0) {
334 name_offset = fbb.CreateString(timer->name());
335 }
336
337 timing::Timer::Builder timer_builder(fbb);
338
339 if (timer->name().size() != 0) {
340 timer_builder.add_name(name_offset);
341 }
342 timer_builder.add_wakeup_latency(wakeup_latency_offset);
343 timer_builder.add_handler_time(handler_time_offset);
344 timer_builder.add_count(0);
345 timer_offsets.emplace_back(timer_builder.Finish());
346 }
347
348 // Pre-fill in the defaults for phased_loops.
349 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
350 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
351 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
352 timing::CreateStatistic(fbb);
353 flatbuffers::Offset<timing::Statistic> handler_time_offset =
354 timing::CreateStatistic(fbb);
355 flatbuffers::Offset<flatbuffers::String> name_offset;
356 if (phased_loop->name().size() != 0) {
357 name_offset = fbb.CreateString(phased_loop->name());
358 }
359
360 timing::Timer::Builder timer_builder(fbb);
361
362 if (phased_loop->name().size() != 0) {
363 timer_builder.add_name(name_offset);
364 }
365 timer_builder.add_wakeup_latency(wakeup_latency_offset);
366 timer_builder.add_handler_time(handler_time_offset);
367 timer_builder.add_count(0);
368 phased_loop_offsets.emplace_back(timer_builder.Finish());
369 }
370
371 // Pre-fill in the defaults for watchers.
372 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
373 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
374 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
375 timing::CreateStatistic(fbb);
376 flatbuffers::Offset<timing::Statistic> handler_time_offset =
377 timing::CreateStatistic(fbb);
378
379 timing::Watcher::Builder watcher_builder(fbb);
380
381 watcher_builder.add_channel_index(watcher->channel_index());
382 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
383 watcher_builder.add_handler_time(handler_time_offset);
384 watcher_builder.add_count(0);
385 watcher_offsets.emplace_back(watcher_builder.Finish());
386 }
387
388 // Pre-fill in the defaults for senders.
389 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700390 for (RawSender *sender : senders_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800391 flatbuffers::Offset<timing::Statistic> size_offset =
392 timing::CreateStatistic(fbb);
393
James Kuszmaul78514332022-04-06 15:08:34 -0700394 const flatbuffers::Offset<
395 flatbuffers::Vector<flatbuffers::Offset<timing::SendErrorCount>>>
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700396 error_counts_offset = sender->timing_.error_counter.Initialize(&fbb);
James Kuszmaul78514332022-04-06 15:08:34 -0700397
Austin Schuh39788ff2019-12-01 18:22:57 -0800398 timing::Sender::Builder sender_builder(fbb);
399
400 sender_builder.add_channel_index(sender->timing_.channel_index);
401 sender_builder.add_size(size_offset);
James Kuszmaul78514332022-04-06 15:08:34 -0700402 sender_builder.add_error_counts(error_counts_offset);
Austin Schuh39788ff2019-12-01 18:22:57 -0800403 sender_builder.add_count(0);
404 sender_offsets.emplace_back(sender_builder.Finish());
405 }
406
407 // Pre-fill in the defaults for fetchers.
408 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
409 for (RawFetcher *fetcher : fetchers_) {
410 flatbuffers::Offset<timing::Statistic> latency_offset =
411 timing::CreateStatistic(fbb);
412
413 timing::Fetcher::Builder fetcher_builder(fbb);
414
415 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
416 fetcher_builder.add_count(0);
417 fetcher_builder.add_latency(latency_offset);
418 fetcher_offsets.emplace_back(fetcher_builder.Finish());
419 }
420
421 // Then build the final report.
422 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
423 timers_offset;
424 if (timer_offsets.size() > 0) {
425 timers_offset = fbb.CreateVector(timer_offsets);
426 }
427
428 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
429 phased_loops_offset;
430 if (phased_loop_offsets.size() > 0) {
431 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
432 }
433
434 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
435 watchers_offset;
436 if (watcher_offsets.size() > 0) {
437 watchers_offset = fbb.CreateVector(watcher_offsets);
438 }
439
440 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
441 senders_offset;
442 if (sender_offsets.size() > 0) {
443 senders_offset = fbb.CreateVector(sender_offsets);
444 }
445
446 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
447 fetchers_offset;
448 if (fetcher_offsets.size() > 0) {
449 fetchers_offset = fbb.CreateVector(fetcher_offsets);
450 }
451
452 flatbuffers::Offset<flatbuffers::String> name_offset =
453 fbb.CreateString(name());
454
455 timing::Report::Builder report_builder(fbb);
456 report_builder.add_name(name_offset);
457 report_builder.add_pid(GetTid());
458 if (timer_offsets.size() > 0) {
459 report_builder.add_timers(timers_offset);
460 }
461 if (phased_loop_offsets.size() > 0) {
462 report_builder.add_phased_loops(phased_loops_offset);
463 }
464 if (watcher_offsets.size() > 0) {
465 report_builder.add_watchers(watchers_offset);
466 }
467 if (sender_offsets.size() > 0) {
468 report_builder.add_senders(senders_offset);
469 }
470 if (fetcher_offsets.size() > 0) {
471 report_builder.add_fetchers(fetchers_offset);
472 }
milind1f1dca32021-07-03 13:50:07 -0700473 report_builder.add_send_failures(timing_report_failure_counter_.failures());
Austin Schuh39788ff2019-12-01 18:22:57 -0800474 fbb.Finish(report_builder.Finish());
475
476 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
477
478 // Now that the pointers are stable, pass them to the timers and watchers to
479 // be updated.
480 for (size_t i = 0; i < timers_.size(); ++i) {
481 timers_[i]->timing_.set_timing_report(
482 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
483 i));
484 }
485
486 for (size_t i = 0; i < phased_loops_.size(); ++i) {
487 phased_loops_[i]->timing_.set_timing_report(
488 timing_report_.mutable_message()
489 ->mutable_phased_loops()
490 ->GetMutableObject(i));
491 }
492
493 for (size_t i = 0; i < watchers_.size(); ++i) {
494 watchers_[i]->set_timing_report(
495 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
496 i));
497 }
498
499 for (size_t i = 0; i < senders_.size(); ++i) {
500 senders_[i]->timing_.set_timing_report(
501 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
502 i));
503 }
504
505 for (size_t i = 0; i < fetchers_.size(); ++i) {
506 fetchers_[i]->timing_.set_timing_report(
507 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
508 i));
509 }
510}
511
512void EventLoop::MaybeScheduleTimingReports() {
513 if (FLAGS_timing_reports && !skip_timing_report_) {
514 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
515 // Make a raw sender for the report.
516 const Channel *channel = configuration::GetChannel(
517 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800518 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700519 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
520 "\"type\": \"aos.timing.Report\"} on node "
521 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800522
523 // Since we are using a RawSender, validity isn't checked. So check it
524 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800525 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
526 LOG(FATAL) << "Channel { \"name\": \"/aos"
527 << channel->name()->string_view() << "\", \"type\": \""
528 << channel->type()->string_view()
529 << "\" } is not able to be sent on this node. Check your "
530 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800531 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800532 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
533 << timing::Report::GetFullyQualifiedName()
534 << "\" } not found in config.";
535 timing_report_sender_ = MakeRawSender(channel);
536
537 // Register a handler which sends the report out by copying the raw data
538 // from the prebuilt and subsequently modified report.
539 TimerHandler *timing_reports_timer =
540 AddTimer([this]() { SendTimingReport(); });
541
542 // Set it up to send once per second.
543 timing_reports_timer->set_name("timing_reports");
544 OnRun([this, timing_reports_timer]() {
545 timing_reports_timer->Setup(
546 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
547 std::chrono::milliseconds(FLAGS_timing_report_ms));
548 });
549
550 UpdateTimingReport();
551 }
552}
553
Austin Schuh7d87b672019-12-01 20:23:49 -0800554void EventLoop::ReserveEvents() {
555 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
556}
557
558namespace {
559bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700560 if (first->event_time() > second->event_time()) {
561 return true;
562 }
563 if (first->event_time() < second->event_time()) {
564 return false;
565 }
566 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800567}
568} // namespace
569
570void EventLoop::AddEvent(EventLoopEvent *event) {
571 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700572 DCHECK(event->generation() == 0);
573 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800574 events_.push_back(event);
575 std::push_heap(events_.begin(), events_.end(), CompareEvents);
576}
577
578void EventLoop::RemoveEvent(EventLoopEvent *event) {
579 auto e = std::find(events_.begin(), events_.end(), event);
580 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700581 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800582 events_.erase(e);
583 std::make_heap(events_.begin(), events_.end(), CompareEvents);
584 event->Invalidate();
585 }
586}
587
588EventLoopEvent *EventLoop::PopEvent() {
589 EventLoopEvent *result = events_.front();
590 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
591 events_.pop_back();
592 result->Invalidate();
593 return result;
594}
595
Austin Schuh0debde12022-08-17 16:25:17 -0700596void EventLoop::ClearContext() {
597 context_.monotonic_event_time = monotonic_clock::min_time;
598 context_.monotonic_remote_time = monotonic_clock::min_time;
599 context_.realtime_event_time = realtime_clock::min_time;
600 context_.realtime_remote_time = realtime_clock::min_time;
601 context_.queue_index = 0xffffffffu;
602 context_.remote_queue_index = 0xffffffffu;
603 context_.size = 0u;
604 context_.data = nullptr;
605 context_.buffer_index = -1;
606 context_.source_boot_uuid = boot_uuid();
607}
608
Austin Schuha9012be2021-07-21 15:19:11 -0700609void EventLoop::SetTimerContext(
610 monotonic_clock::time_point monotonic_event_time) {
611 context_.monotonic_event_time = monotonic_event_time;
612 context_.monotonic_remote_time = monotonic_clock::min_time;
613 context_.realtime_event_time = realtime_clock::min_time;
614 context_.realtime_remote_time = realtime_clock::min_time;
615 context_.queue_index = 0xffffffffu;
Austin Schuh0debde12022-08-17 16:25:17 -0700616 context_.remote_queue_index = 0xffffffffu;
Austin Schuha9012be2021-07-21 15:19:11 -0700617 context_.size = 0u;
618 context_.data = nullptr;
619 context_.buffer_index = -1;
620 context_.source_boot_uuid = boot_uuid();
621}
622
Austin Schuh070019a2022-12-20 22:23:09 -0800623cpu_set_t EventLoop::DefaultAffinity() { return aos::DefaultAffinity(); }
624
Austin Schuh39788ff2019-12-01 18:22:57 -0800625void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800626 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800627 if (!watcher) {
628 wakeup_latency_.set_statistic(nullptr);
629 handler_time_.set_statistic(nullptr);
630 } else {
631 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
632 handler_time_.set_statistic(watcher->mutable_handler_time());
633 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800634}
635
636void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800637 if (!watcher_) {
638 return;
639 }
640
Austin Schuh39788ff2019-12-01 18:22:57 -0800641 wakeup_latency_.Reset();
642 handler_time_.Reset();
643 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800644}
645
646} // namespace aos