blob: e84763ee4f5a86c044b8551142e199f249d02b1b [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "glog/logging.h"
4
Austin Schuh54cf95f2019-11-29 13:14:18 -08005#include "aos/configuration.h"
6#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08007#include "aos/logging/implementations.h"
Austin Schuh070019a2022-12-20 22:23:09 -08008#include "aos/realtime.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08009
Austin Schuh39788ff2019-12-01 18:22:57 -080010DEFINE_bool(timing_reports, true, "Publish timing reports.");
11DEFINE_int32(timing_report_ms, 1000,
12 "Period in milliseconds to publish timing reports at.");
13
Austin Schuh54cf95f2019-11-29 13:14:18 -080014namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070015namespace {
16void CheckAlignment(const Channel *channel) {
17 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
18 LOG(FATAL) << "max_size() (" << channel->max_size()
19 << ") is not a multiple of alignment ("
20 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
21 << configuration::CleanedChannelToString(channel) << ".";
22 }
23}
milind1f1dca32021-07-03 13:50:07 -070024
25std::string_view ErrorToString(const RawSender::Error err) {
26 switch (err) {
27 case RawSender::Error::kOk:
28 return "RawSender::Error::kOk";
29 case RawSender::Error::kMessagesSentTooFast:
30 return "RawSender::Error::kMessagesSentTooFast";
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070031 case RawSender::Error::kInvalidRedzone:
32 return "RawSender::Error::kInvalidRedzone";
milind1f1dca32021-07-03 13:50:07 -070033 }
34 LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
35}
Austin Schuhd54780b2020-10-03 16:26:02 -070036} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080037
Austin Schuhe0ab4de2023-05-03 08:05:08 -070038std::pair<SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(size_t size) {
39 AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
40 malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
41
42 absl::Span<uint8_t> mutable_span(
43 reinterpret_cast<uint8_t *>(RoundChannelData(span->data(), size)), size);
44 // Use the placement new operator to construct an actual absl::Span in place.
45 new (span) AlignedOwningSpan(mutable_span);
46
47 return std::make_pair(
48 SharedSpan(std::shared_ptr<AlignedOwningSpan>(span,
49 [](AlignedOwningSpan *s) {
50 s->~AlignedOwningSpan();
51 free(s);
52 }),
53 &span->span),
54 mutable_span);
55}
56
milind1f1dca32021-07-03 13:50:07 -070057std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
58 os << ErrorToString(err);
59 return os;
60}
61
62void RawSender::CheckOk(const RawSender::Error err) {
63 CHECK_EQ(err, Error::kOk) << "Messages were sent too fast on channel: "
64 << configuration::CleanedChannelToString(channel_);
65}
66
Austin Schuh39788ff2019-12-01 18:22:57 -080067RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
68 : event_loop_(event_loop),
69 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050070 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080071 timing_(event_loop_->ChannelIndex(channel)) {
72 event_loop_->NewSender(this);
73}
74
75RawSender::~RawSender() { event_loop_->DeleteSender(this); }
76
milind1f1dca32021-07-03 13:50:07 -070077RawSender::Error RawSender::DoSend(
78 const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
79 realtime_clock::time_point realtime_remote_time,
80 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070081 return DoSend(data->data(), data->size(), monotonic_remote_time,
82 realtime_remote_time, remote_queue_index, source_boot_uuid);
83}
84
James Kuszmaul93abac12022-04-14 15:05:10 -070085void RawSender::RecordSendResult(const Error error, size_t message_size) {
86 switch (error) {
87 case Error::kOk: {
88 if (timing_.sender) {
89 timing_.size.Add(message_size);
90 timing_.sender->mutate_count(timing_.sender->count() + 1);
91 }
92 break;
93 }
94 case Error::kMessagesSentTooFast:
95 timing_.IncrementError(timing::SendError::MESSAGE_SENT_TOO_FAST);
96 break;
97 case Error::kInvalidRedzone:
98 timing_.IncrementError(timing::SendError::INVALID_REDZONE);
99 break;
100 }
101}
102
Austin Schuh39788ff2019-12-01 18:22:57 -0800103RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
104 : event_loop_(event_loop),
105 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -0500106 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -0800107 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -0800108 context_.monotonic_event_time = monotonic_clock::min_time;
109 context_.monotonic_remote_time = monotonic_clock::min_time;
110 context_.realtime_event_time = realtime_clock::min_time;
111 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -0800112 context_.queue_index = 0xffffffff;
Austin Schuh0debde12022-08-17 16:25:17 -0700113 context_.remote_queue_index = 0xffffffffu;
Austin Schuh39788ff2019-12-01 18:22:57 -0800114 context_.size = 0;
115 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -0700116 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -0800117 event_loop_->NewFetcher(this);
118}
119
120RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
121
122TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
123 : event_loop_(event_loop), fn_(std::move(fn)) {}
124
125TimerHandler::~TimerHandler() {}
126
127PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
128 std::function<void(int)> fn,
129 const monotonic_clock::duration interval,
130 const monotonic_clock::duration offset)
131 : event_loop_(event_loop),
132 fn_(std::move(fn)),
133 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
134 event_loop_->OnRun([this]() {
135 const monotonic_clock::time_point monotonic_now =
136 event_loop_->monotonic_now();
137 phased_loop_.Reset(monotonic_now);
James Kuszmaul20dcc7c2023-01-20 11:06:31 -0800138 Reschedule(monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -0700139 // Reschedule here will count cycles elapsed before now, and then the
140 // reschedule before running the handler will count the time that elapsed
141 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -0800142 cycles_elapsed_ = 0;
143 });
144}
145
146PhasedLoopHandler::~PhasedLoopHandler() {}
147
Austin Schuh83c7f702021-01-19 22:36:29 -0800148EventLoop::EventLoop(const Configuration *configuration)
149 : timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -0700150 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -0800151
Austin Schuh39788ff2019-12-01 18:22:57 -0800152EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800153 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700154 for (const RawSender *sender : senders_) {
155 LOG(ERROR) << " Sender "
156 << configuration::StrippedChannelToString(sender->channel())
157 << " still open";
158 }
159 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800160 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800161 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800162}
163
Brian Silvermanbf889922021-11-10 12:41:57 -0800164void EventLoop::SkipTimingReport() {
165 skip_timing_report_ = true;
166 timing_report_ = flatbuffers::DetachedBuffer();
167
168 for (size_t i = 0; i < timers_.size(); ++i) {
169 timers_[i]->timing_.set_timing_report(nullptr);
170 }
171
172 for (size_t i = 0; i < phased_loops_.size(); ++i) {
173 phased_loops_[i]->timing_.set_timing_report(nullptr);
174 }
175
176 for (size_t i = 0; i < watchers_.size(); ++i) {
177 watchers_[i]->set_timing_report(nullptr);
178 }
179
180 for (size_t i = 0; i < senders_.size(); ++i) {
181 senders_[i]->timing_.set_timing_report(nullptr);
182 }
183
184 for (size_t i = 0; i < fetchers_.size(); ++i) {
185 fetchers_[i]->timing_.set_timing_report(nullptr);
186 }
187}
188
Austin Schuh39788ff2019-12-01 18:22:57 -0800189int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800190 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800191}
192
Brian Silverman5120afb2020-01-31 17:44:35 -0800193WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
194 const int channel_index = ChannelIndex(channel);
195 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
196 if (watcher->channel_index() == channel_index) {
197 return watcher.get();
198 }
199 }
200 LOG(FATAL) << "No watcher found for channel";
201}
202
Austin Schuh39788ff2019-12-01 18:22:57 -0800203void EventLoop::NewSender(RawSender *sender) {
204 senders_.emplace_back(sender);
205 UpdateTimingReport();
206}
207void EventLoop::DeleteSender(RawSender *sender) {
208 CHECK(!is_running());
209 auto s = std::find(senders_.begin(), senders_.end(), sender);
210 CHECK(s != senders_.end()) << ": Sender not in senders list";
211 senders_.erase(s);
212 UpdateTimingReport();
213}
214
215TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
216 timers_.emplace_back(std::move(timer));
217 UpdateTimingReport();
218 return timers_.back().get();
219}
220
221PhasedLoopHandler *EventLoop::NewPhasedLoop(
222 std::unique_ptr<PhasedLoopHandler> phased_loop) {
223 phased_loops_.emplace_back(std::move(phased_loop));
224 UpdateTimingReport();
225 return phased_loops_.back().get();
226}
227
228void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700229 CheckAlignment(fetcher->channel());
230
Austin Schuh39788ff2019-12-01 18:22:57 -0800231 fetchers_.emplace_back(fetcher);
232 UpdateTimingReport();
233}
234
235void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
236 CHECK(!is_running());
237 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
238 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
239 fetchers_.erase(f);
240 UpdateTimingReport();
241}
242
243WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
244 watchers_.emplace_back(std::move(watcher));
245
246 UpdateTimingReport();
247
248 return watchers_.back().get();
249}
250
Brian Silverman0fc69932020-01-24 21:54:02 -0800251void EventLoop::TakeWatcher(const Channel *channel) {
252 CHECK(!is_running()) << ": Cannot add new objects while running.";
253 ChannelIndex(channel);
254
Austin Schuhd54780b2020-10-03 16:26:02 -0700255 CheckAlignment(channel);
256
Brian Silverman0fc69932020-01-24 21:54:02 -0800257 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800258 << ": " << configuration::CleanedChannelToString(channel)
milind-u5dbdba42023-02-04 17:48:43 -0800259 << " is already being used for sending. Can't make a watcher on the "
260 "same event loop.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800261
262 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800263 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800264 << " is already being used.";
265
266 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800267 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800268 << " is not able to be watched on this node. Check your "
269 "configuration.";
270 }
271}
272
273void EventLoop::TakeSender(const Channel *channel) {
274 CHECK(!is_running()) << ": Cannot add new objects while running.";
275 ChannelIndex(channel);
276
Austin Schuhd54780b2020-10-03 16:26:02 -0700277 CheckAlignment(channel);
278
Brian Silverman0fc69932020-01-24 21:54:02 -0800279 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800280 << ": Channel " << configuration::CleanedChannelToString(channel)
281 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800282
283 // We don't care if this is a duplicate.
284 taken_senders_.insert(channel);
285}
286
Austin Schuh39788ff2019-12-01 18:22:57 -0800287void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700288 if (!timing_report_sender_) {
289 // Timing reports are disabled, so nothing for us to do.
290 return;
291 }
292
Austin Schuh39788ff2019-12-01 18:22:57 -0800293 // We need to do a fancy dance here to get all the accounting to work right.
294 // We want to copy the memory here, but then send after resetting. Otherwise
295 // the send for the timing report won't be counted in the timing report.
296 //
297 // Also, flatbuffers build from the back end. So place this at the back end
298 // of the buffer. We only have to care because we are using this in a very
299 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800300 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuh39788ff2019-12-01 18:22:57 -0800301 << ": Timing report bigger than the sender size.";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800302 std::copy(timing_report_.span().data(),
303 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800304 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800305 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800306
307 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
308 timer->timing_.ResetTimingReport();
309 }
310 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
311 watcher->ResetReport();
312 }
313 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
314 phased_loop->timing_.ResetTimingReport();
315 }
316 for (RawSender *sender : senders_) {
317 sender->timing_.ResetTimingReport();
318 }
319 for (RawFetcher *fetcher : fetchers_) {
320 fetcher->timing_.ResetTimingReport();
321 }
milind1f1dca32021-07-03 13:50:07 -0700322 // TODO(milind): If we fail to send, we don't want to reset the timing report.
323 // We would need to move the reset after the send, and then find the correct
324 // timing report and set the reports with it instead of letting the sender do
325 // this. If we failed to send, we wouldn't reset or set the reports, so they
326 // can accumalate until the next send.
327 timing_report_failure_counter_.Count(
328 timing_report_sender_->Send(timing_report_.span().size()));
Austin Schuh39788ff2019-12-01 18:22:57 -0800329}
330
331void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800332 if (skip_timing_report_) {
333 return;
334 }
335
Austin Schuh39788ff2019-12-01 18:22:57 -0800336 // We need to support senders and fetchers changing while we are setting up
337 // the event loop. Otherwise we can't fetch or send until the loop runs. This
338 // means that on each change, we need to redo all this work. This makes setup
339 // more expensive, but not by all that much on a modern processor.
340
341 // Now, build up a report with everything pre-filled out.
342 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800343 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800344
345 // Pre-fill in the defaults for timers.
346 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
347 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
348 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
349 timing::CreateStatistic(fbb);
350 flatbuffers::Offset<timing::Statistic> handler_time_offset =
351 timing::CreateStatistic(fbb);
352 flatbuffers::Offset<flatbuffers::String> name_offset;
353 if (timer->name().size() != 0) {
354 name_offset = fbb.CreateString(timer->name());
355 }
356
357 timing::Timer::Builder timer_builder(fbb);
358
359 if (timer->name().size() != 0) {
360 timer_builder.add_name(name_offset);
361 }
362 timer_builder.add_wakeup_latency(wakeup_latency_offset);
363 timer_builder.add_handler_time(handler_time_offset);
364 timer_builder.add_count(0);
365 timer_offsets.emplace_back(timer_builder.Finish());
366 }
367
368 // Pre-fill in the defaults for phased_loops.
369 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
370 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
371 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
372 timing::CreateStatistic(fbb);
373 flatbuffers::Offset<timing::Statistic> handler_time_offset =
374 timing::CreateStatistic(fbb);
375 flatbuffers::Offset<flatbuffers::String> name_offset;
376 if (phased_loop->name().size() != 0) {
377 name_offset = fbb.CreateString(phased_loop->name());
378 }
379
380 timing::Timer::Builder timer_builder(fbb);
381
382 if (phased_loop->name().size() != 0) {
383 timer_builder.add_name(name_offset);
384 }
385 timer_builder.add_wakeup_latency(wakeup_latency_offset);
386 timer_builder.add_handler_time(handler_time_offset);
387 timer_builder.add_count(0);
388 phased_loop_offsets.emplace_back(timer_builder.Finish());
389 }
390
391 // Pre-fill in the defaults for watchers.
392 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
393 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
394 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
395 timing::CreateStatistic(fbb);
396 flatbuffers::Offset<timing::Statistic> handler_time_offset =
397 timing::CreateStatistic(fbb);
398
399 timing::Watcher::Builder watcher_builder(fbb);
400
401 watcher_builder.add_channel_index(watcher->channel_index());
402 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
403 watcher_builder.add_handler_time(handler_time_offset);
404 watcher_builder.add_count(0);
405 watcher_offsets.emplace_back(watcher_builder.Finish());
406 }
407
408 // Pre-fill in the defaults for senders.
409 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700410 for (RawSender *sender : senders_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800411 flatbuffers::Offset<timing::Statistic> size_offset =
412 timing::CreateStatistic(fbb);
413
James Kuszmaul78514332022-04-06 15:08:34 -0700414 const flatbuffers::Offset<
415 flatbuffers::Vector<flatbuffers::Offset<timing::SendErrorCount>>>
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700416 error_counts_offset = sender->timing_.error_counter.Initialize(&fbb);
James Kuszmaul78514332022-04-06 15:08:34 -0700417
Austin Schuh39788ff2019-12-01 18:22:57 -0800418 timing::Sender::Builder sender_builder(fbb);
419
420 sender_builder.add_channel_index(sender->timing_.channel_index);
421 sender_builder.add_size(size_offset);
James Kuszmaul78514332022-04-06 15:08:34 -0700422 sender_builder.add_error_counts(error_counts_offset);
Austin Schuh39788ff2019-12-01 18:22:57 -0800423 sender_builder.add_count(0);
424 sender_offsets.emplace_back(sender_builder.Finish());
425 }
426
427 // Pre-fill in the defaults for fetchers.
428 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
429 for (RawFetcher *fetcher : fetchers_) {
430 flatbuffers::Offset<timing::Statistic> latency_offset =
431 timing::CreateStatistic(fbb);
432
433 timing::Fetcher::Builder fetcher_builder(fbb);
434
435 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
436 fetcher_builder.add_count(0);
437 fetcher_builder.add_latency(latency_offset);
438 fetcher_offsets.emplace_back(fetcher_builder.Finish());
439 }
440
441 // Then build the final report.
442 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
443 timers_offset;
444 if (timer_offsets.size() > 0) {
445 timers_offset = fbb.CreateVector(timer_offsets);
446 }
447
448 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
449 phased_loops_offset;
450 if (phased_loop_offsets.size() > 0) {
451 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
452 }
453
454 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
455 watchers_offset;
456 if (watcher_offsets.size() > 0) {
457 watchers_offset = fbb.CreateVector(watcher_offsets);
458 }
459
460 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
461 senders_offset;
462 if (sender_offsets.size() > 0) {
463 senders_offset = fbb.CreateVector(sender_offsets);
464 }
465
466 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
467 fetchers_offset;
468 if (fetcher_offsets.size() > 0) {
469 fetchers_offset = fbb.CreateVector(fetcher_offsets);
470 }
471
472 flatbuffers::Offset<flatbuffers::String> name_offset =
473 fbb.CreateString(name());
474
475 timing::Report::Builder report_builder(fbb);
476 report_builder.add_name(name_offset);
477 report_builder.add_pid(GetTid());
478 if (timer_offsets.size() > 0) {
479 report_builder.add_timers(timers_offset);
480 }
481 if (phased_loop_offsets.size() > 0) {
482 report_builder.add_phased_loops(phased_loops_offset);
483 }
484 if (watcher_offsets.size() > 0) {
485 report_builder.add_watchers(watchers_offset);
486 }
487 if (sender_offsets.size() > 0) {
488 report_builder.add_senders(senders_offset);
489 }
490 if (fetcher_offsets.size() > 0) {
491 report_builder.add_fetchers(fetchers_offset);
492 }
milind1f1dca32021-07-03 13:50:07 -0700493 report_builder.add_send_failures(timing_report_failure_counter_.failures());
Austin Schuh39788ff2019-12-01 18:22:57 -0800494 fbb.Finish(report_builder.Finish());
495
496 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
497
498 // Now that the pointers are stable, pass them to the timers and watchers to
499 // be updated.
500 for (size_t i = 0; i < timers_.size(); ++i) {
501 timers_[i]->timing_.set_timing_report(
502 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
503 i));
504 }
505
506 for (size_t i = 0; i < phased_loops_.size(); ++i) {
507 phased_loops_[i]->timing_.set_timing_report(
508 timing_report_.mutable_message()
509 ->mutable_phased_loops()
510 ->GetMutableObject(i));
511 }
512
513 for (size_t i = 0; i < watchers_.size(); ++i) {
514 watchers_[i]->set_timing_report(
515 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
516 i));
517 }
518
519 for (size_t i = 0; i < senders_.size(); ++i) {
520 senders_[i]->timing_.set_timing_report(
521 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
522 i));
523 }
524
525 for (size_t i = 0; i < fetchers_.size(); ++i) {
526 fetchers_[i]->timing_.set_timing_report(
527 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
528 i));
529 }
530}
531
532void EventLoop::MaybeScheduleTimingReports() {
533 if (FLAGS_timing_reports && !skip_timing_report_) {
534 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
535 // Make a raw sender for the report.
536 const Channel *channel = configuration::GetChannel(
537 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800538 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700539 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
540 "\"type\": \"aos.timing.Report\"} on node "
541 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800542
543 // Since we are using a RawSender, validity isn't checked. So check it
544 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800545 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
546 LOG(FATAL) << "Channel { \"name\": \"/aos"
547 << channel->name()->string_view() << "\", \"type\": \""
548 << channel->type()->string_view()
549 << "\" } is not able to be sent on this node. Check your "
550 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800551 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800552 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
553 << timing::Report::GetFullyQualifiedName()
554 << "\" } not found in config.";
555 timing_report_sender_ = MakeRawSender(channel);
556
557 // Register a handler which sends the report out by copying the raw data
558 // from the prebuilt and subsequently modified report.
559 TimerHandler *timing_reports_timer =
560 AddTimer([this]() { SendTimingReport(); });
561
562 // Set it up to send once per second.
563 timing_reports_timer->set_name("timing_reports");
564 OnRun([this, timing_reports_timer]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700565 timing_reports_timer->Schedule(
Austin Schuh39788ff2019-12-01 18:22:57 -0800566 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
567 std::chrono::milliseconds(FLAGS_timing_report_ms));
568 });
569
570 UpdateTimingReport();
571 }
572}
573
Austin Schuh7d87b672019-12-01 20:23:49 -0800574void EventLoop::ReserveEvents() {
575 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
576}
577
578namespace {
579bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700580 if (first->event_time() > second->event_time()) {
581 return true;
582 }
583 if (first->event_time() < second->event_time()) {
584 return false;
585 }
586 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800587}
588} // namespace
589
590void EventLoop::AddEvent(EventLoopEvent *event) {
591 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700592 DCHECK(event->generation() == 0);
593 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800594 events_.push_back(event);
595 std::push_heap(events_.begin(), events_.end(), CompareEvents);
596}
597
598void EventLoop::RemoveEvent(EventLoopEvent *event) {
599 auto e = std::find(events_.begin(), events_.end(), event);
600 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700601 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800602 events_.erase(e);
603 std::make_heap(events_.begin(), events_.end(), CompareEvents);
604 event->Invalidate();
605 }
606}
607
608EventLoopEvent *EventLoop::PopEvent() {
609 EventLoopEvent *result = events_.front();
610 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
611 events_.pop_back();
612 result->Invalidate();
613 return result;
614}
615
Austin Schuh0debde12022-08-17 16:25:17 -0700616void EventLoop::ClearContext() {
617 context_.monotonic_event_time = monotonic_clock::min_time;
618 context_.monotonic_remote_time = monotonic_clock::min_time;
619 context_.realtime_event_time = realtime_clock::min_time;
620 context_.realtime_remote_time = realtime_clock::min_time;
621 context_.queue_index = 0xffffffffu;
622 context_.remote_queue_index = 0xffffffffu;
623 context_.size = 0u;
624 context_.data = nullptr;
625 context_.buffer_index = -1;
626 context_.source_boot_uuid = boot_uuid();
627}
628
Austin Schuha9012be2021-07-21 15:19:11 -0700629void EventLoop::SetTimerContext(
630 monotonic_clock::time_point monotonic_event_time) {
631 context_.monotonic_event_time = monotonic_event_time;
632 context_.monotonic_remote_time = monotonic_clock::min_time;
633 context_.realtime_event_time = realtime_clock::min_time;
634 context_.realtime_remote_time = realtime_clock::min_time;
635 context_.queue_index = 0xffffffffu;
Austin Schuh0debde12022-08-17 16:25:17 -0700636 context_.remote_queue_index = 0xffffffffu;
Austin Schuha9012be2021-07-21 15:19:11 -0700637 context_.size = 0u;
638 context_.data = nullptr;
639 context_.buffer_index = -1;
640 context_.source_boot_uuid = boot_uuid();
641}
642
Austin Schuh070019a2022-12-20 22:23:09 -0800643cpu_set_t EventLoop::DefaultAffinity() { return aos::DefaultAffinity(); }
644
Austin Schuh39788ff2019-12-01 18:22:57 -0800645void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800646 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800647 if (!watcher) {
648 wakeup_latency_.set_statistic(nullptr);
649 handler_time_.set_statistic(nullptr);
650 } else {
651 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
652 handler_time_.set_statistic(watcher->mutable_handler_time());
653 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800654}
655
656void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800657 if (!watcher_) {
658 return;
659 }
660
Austin Schuh39788ff2019-12-01 18:22:57 -0800661 wakeup_latency_.Reset();
662 handler_time_.Reset();
663 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800664}
665
666} // namespace aos