blob: 2d932ccd81bbe2fa4a88af0a4dc05c2839b2cd03 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh070019a2022-12-20 22:23:09 -08006#include "aos/realtime.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08007#include "glog/logging.h"
8
Austin Schuh39788ff2019-12-01 18:22:57 -08009DEFINE_bool(timing_reports, true, "Publish timing reports.");
10DEFINE_int32(timing_report_ms, 1000,
11 "Period in milliseconds to publish timing reports at.");
12
Austin Schuh54cf95f2019-11-29 13:14:18 -080013namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070014namespace {
15void CheckAlignment(const Channel *channel) {
16 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
17 LOG(FATAL) << "max_size() (" << channel->max_size()
18 << ") is not a multiple of alignment ("
19 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
20 << configuration::CleanedChannelToString(channel) << ".";
21 }
22}
milind1f1dca32021-07-03 13:50:07 -070023
24std::string_view ErrorToString(const RawSender::Error err) {
25 switch (err) {
26 case RawSender::Error::kOk:
27 return "RawSender::Error::kOk";
28 case RawSender::Error::kMessagesSentTooFast:
29 return "RawSender::Error::kMessagesSentTooFast";
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070030 case RawSender::Error::kInvalidRedzone:
31 return "RawSender::Error::kInvalidRedzone";
milind1f1dca32021-07-03 13:50:07 -070032 }
33 LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
34}
Austin Schuhd54780b2020-10-03 16:26:02 -070035} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080036
Austin Schuhe0ab4de2023-05-03 08:05:08 -070037std::pair<SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(size_t size) {
38 AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
39 malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
40
41 absl::Span<uint8_t> mutable_span(
42 reinterpret_cast<uint8_t *>(RoundChannelData(span->data(), size)), size);
43 // Use the placement new operator to construct an actual absl::Span in place.
44 new (span) AlignedOwningSpan(mutable_span);
45
46 return std::make_pair(
47 SharedSpan(std::shared_ptr<AlignedOwningSpan>(span,
48 [](AlignedOwningSpan *s) {
49 s->~AlignedOwningSpan();
50 free(s);
51 }),
52 &span->span),
53 mutable_span);
54}
55
milind1f1dca32021-07-03 13:50:07 -070056std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
57 os << ErrorToString(err);
58 return os;
59}
60
61void RawSender::CheckOk(const RawSender::Error err) {
62 CHECK_EQ(err, Error::kOk) << "Messages were sent too fast on channel: "
63 << configuration::CleanedChannelToString(channel_);
64}
65
Austin Schuh39788ff2019-12-01 18:22:57 -080066RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
67 : event_loop_(event_loop),
68 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050069 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080070 timing_(event_loop_->ChannelIndex(channel)) {
71 event_loop_->NewSender(this);
72}
73
74RawSender::~RawSender() { event_loop_->DeleteSender(this); }
75
milind1f1dca32021-07-03 13:50:07 -070076RawSender::Error RawSender::DoSend(
77 const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
78 realtime_clock::time_point realtime_remote_time,
79 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070080 return DoSend(data->data(), data->size(), monotonic_remote_time,
81 realtime_remote_time, remote_queue_index, source_boot_uuid);
82}
83
James Kuszmaul93abac12022-04-14 15:05:10 -070084void RawSender::RecordSendResult(const Error error, size_t message_size) {
85 switch (error) {
86 case Error::kOk: {
87 if (timing_.sender) {
88 timing_.size.Add(message_size);
89 timing_.sender->mutate_count(timing_.sender->count() + 1);
90 }
91 break;
92 }
93 case Error::kMessagesSentTooFast:
94 timing_.IncrementError(timing::SendError::MESSAGE_SENT_TOO_FAST);
95 break;
96 case Error::kInvalidRedzone:
97 timing_.IncrementError(timing::SendError::INVALID_REDZONE);
98 break;
99 }
100}
101
Austin Schuh39788ff2019-12-01 18:22:57 -0800102RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
103 : event_loop_(event_loop),
104 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -0500105 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -0800106 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -0800107 context_.monotonic_event_time = monotonic_clock::min_time;
108 context_.monotonic_remote_time = monotonic_clock::min_time;
109 context_.realtime_event_time = realtime_clock::min_time;
110 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -0800111 context_.queue_index = 0xffffffff;
Austin Schuh0debde12022-08-17 16:25:17 -0700112 context_.remote_queue_index = 0xffffffffu;
Austin Schuh39788ff2019-12-01 18:22:57 -0800113 context_.size = 0;
114 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -0700115 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -0800116 event_loop_->NewFetcher(this);
117}
118
119RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
120
121TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
122 : event_loop_(event_loop), fn_(std::move(fn)) {}
123
124TimerHandler::~TimerHandler() {}
125
126PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
127 std::function<void(int)> fn,
128 const monotonic_clock::duration interval,
129 const monotonic_clock::duration offset)
130 : event_loop_(event_loop),
131 fn_(std::move(fn)),
132 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
133 event_loop_->OnRun([this]() {
134 const monotonic_clock::time_point monotonic_now =
135 event_loop_->monotonic_now();
136 phased_loop_.Reset(monotonic_now);
James Kuszmaul20dcc7c2023-01-20 11:06:31 -0800137 Reschedule(monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -0700138 // Reschedule here will count cycles elapsed before now, and then the
139 // reschedule before running the handler will count the time that elapsed
140 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -0800141 cycles_elapsed_ = 0;
142 });
143}
144
145PhasedLoopHandler::~PhasedLoopHandler() {}
146
Austin Schuh83c7f702021-01-19 22:36:29 -0800147EventLoop::EventLoop(const Configuration *configuration)
148 : timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -0700149 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -0800150
Austin Schuh39788ff2019-12-01 18:22:57 -0800151EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800152 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700153 for (const RawSender *sender : senders_) {
154 LOG(ERROR) << " Sender "
155 << configuration::StrippedChannelToString(sender->channel())
156 << " still open";
157 }
158 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800159 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800160 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800161}
162
Brian Silvermanbf889922021-11-10 12:41:57 -0800163void EventLoop::SkipTimingReport() {
164 skip_timing_report_ = true;
165 timing_report_ = flatbuffers::DetachedBuffer();
166
167 for (size_t i = 0; i < timers_.size(); ++i) {
168 timers_[i]->timing_.set_timing_report(nullptr);
169 }
170
171 for (size_t i = 0; i < phased_loops_.size(); ++i) {
172 phased_loops_[i]->timing_.set_timing_report(nullptr);
173 }
174
175 for (size_t i = 0; i < watchers_.size(); ++i) {
176 watchers_[i]->set_timing_report(nullptr);
177 }
178
179 for (size_t i = 0; i < senders_.size(); ++i) {
180 senders_[i]->timing_.set_timing_report(nullptr);
181 }
182
183 for (size_t i = 0; i < fetchers_.size(); ++i) {
184 fetchers_[i]->timing_.set_timing_report(nullptr);
185 }
186}
187
Austin Schuh39788ff2019-12-01 18:22:57 -0800188int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800189 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800190}
191
Brian Silverman5120afb2020-01-31 17:44:35 -0800192WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
193 const int channel_index = ChannelIndex(channel);
194 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
195 if (watcher->channel_index() == channel_index) {
196 return watcher.get();
197 }
198 }
199 LOG(FATAL) << "No watcher found for channel";
200}
201
Austin Schuh39788ff2019-12-01 18:22:57 -0800202void EventLoop::NewSender(RawSender *sender) {
203 senders_.emplace_back(sender);
204 UpdateTimingReport();
205}
206void EventLoop::DeleteSender(RawSender *sender) {
207 CHECK(!is_running());
208 auto s = std::find(senders_.begin(), senders_.end(), sender);
209 CHECK(s != senders_.end()) << ": Sender not in senders list";
210 senders_.erase(s);
211 UpdateTimingReport();
212}
213
214TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
215 timers_.emplace_back(std::move(timer));
216 UpdateTimingReport();
217 return timers_.back().get();
218}
219
220PhasedLoopHandler *EventLoop::NewPhasedLoop(
221 std::unique_ptr<PhasedLoopHandler> phased_loop) {
222 phased_loops_.emplace_back(std::move(phased_loop));
223 UpdateTimingReport();
224 return phased_loops_.back().get();
225}
226
227void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700228 CheckAlignment(fetcher->channel());
229
Austin Schuh39788ff2019-12-01 18:22:57 -0800230 fetchers_.emplace_back(fetcher);
231 UpdateTimingReport();
232}
233
234void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
235 CHECK(!is_running());
236 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
237 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
238 fetchers_.erase(f);
239 UpdateTimingReport();
240}
241
242WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
243 watchers_.emplace_back(std::move(watcher));
244
245 UpdateTimingReport();
246
247 return watchers_.back().get();
248}
249
Brian Silverman0fc69932020-01-24 21:54:02 -0800250void EventLoop::TakeWatcher(const Channel *channel) {
251 CHECK(!is_running()) << ": Cannot add new objects while running.";
252 ChannelIndex(channel);
253
Austin Schuhd54780b2020-10-03 16:26:02 -0700254 CheckAlignment(channel);
255
Brian Silverman0fc69932020-01-24 21:54:02 -0800256 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800257 << ": " << configuration::CleanedChannelToString(channel)
milind-u5dbdba42023-02-04 17:48:43 -0800258 << " is already being used for sending. Can't make a watcher on the "
259 "same event loop.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800260
261 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800262 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800263 << " is already being used.";
264
265 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800266 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800267 << " is not able to be watched on this node. Check your "
268 "configuration.";
269 }
270}
271
272void EventLoop::TakeSender(const Channel *channel) {
273 CHECK(!is_running()) << ": Cannot add new objects while running.";
274 ChannelIndex(channel);
275
Austin Schuhd54780b2020-10-03 16:26:02 -0700276 CheckAlignment(channel);
277
Brian Silverman0fc69932020-01-24 21:54:02 -0800278 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800279 << ": Channel " << configuration::CleanedChannelToString(channel)
280 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800281
282 // We don't care if this is a duplicate.
283 taken_senders_.insert(channel);
284}
285
Austin Schuh39788ff2019-12-01 18:22:57 -0800286void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700287 if (!timing_report_sender_) {
288 // Timing reports are disabled, so nothing for us to do.
289 return;
290 }
291
Austin Schuh39788ff2019-12-01 18:22:57 -0800292 // We need to do a fancy dance here to get all the accounting to work right.
293 // We want to copy the memory here, but then send after resetting. Otherwise
294 // the send for the timing report won't be counted in the timing report.
295 //
296 // Also, flatbuffers build from the back end. So place this at the back end
297 // of the buffer. We only have to care because we are using this in a very
298 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800299 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuh39788ff2019-12-01 18:22:57 -0800300 << ": Timing report bigger than the sender size.";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800301 std::copy(timing_report_.span().data(),
302 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800303 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800304 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800305
306 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
307 timer->timing_.ResetTimingReport();
308 }
309 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
310 watcher->ResetReport();
311 }
312 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
313 phased_loop->timing_.ResetTimingReport();
314 }
315 for (RawSender *sender : senders_) {
316 sender->timing_.ResetTimingReport();
317 }
318 for (RawFetcher *fetcher : fetchers_) {
319 fetcher->timing_.ResetTimingReport();
320 }
milind1f1dca32021-07-03 13:50:07 -0700321 // TODO(milind): If we fail to send, we don't want to reset the timing report.
322 // We would need to move the reset after the send, and then find the correct
323 // timing report and set the reports with it instead of letting the sender do
324 // this. If we failed to send, we wouldn't reset or set the reports, so they
325 // can accumalate until the next send.
326 timing_report_failure_counter_.Count(
327 timing_report_sender_->Send(timing_report_.span().size()));
Austin Schuh39788ff2019-12-01 18:22:57 -0800328}
329
330void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800331 if (skip_timing_report_) {
332 return;
333 }
334
Austin Schuh39788ff2019-12-01 18:22:57 -0800335 // We need to support senders and fetchers changing while we are setting up
336 // the event loop. Otherwise we can't fetch or send until the loop runs. This
337 // means that on each change, we need to redo all this work. This makes setup
338 // more expensive, but not by all that much on a modern processor.
339
340 // Now, build up a report with everything pre-filled out.
341 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800342 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800343
344 // Pre-fill in the defaults for timers.
345 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
346 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
347 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
348 timing::CreateStatistic(fbb);
349 flatbuffers::Offset<timing::Statistic> handler_time_offset =
350 timing::CreateStatistic(fbb);
351 flatbuffers::Offset<flatbuffers::String> name_offset;
352 if (timer->name().size() != 0) {
353 name_offset = fbb.CreateString(timer->name());
354 }
355
356 timing::Timer::Builder timer_builder(fbb);
357
358 if (timer->name().size() != 0) {
359 timer_builder.add_name(name_offset);
360 }
361 timer_builder.add_wakeup_latency(wakeup_latency_offset);
362 timer_builder.add_handler_time(handler_time_offset);
363 timer_builder.add_count(0);
364 timer_offsets.emplace_back(timer_builder.Finish());
365 }
366
367 // Pre-fill in the defaults for phased_loops.
368 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
369 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
370 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
371 timing::CreateStatistic(fbb);
372 flatbuffers::Offset<timing::Statistic> handler_time_offset =
373 timing::CreateStatistic(fbb);
374 flatbuffers::Offset<flatbuffers::String> name_offset;
375 if (phased_loop->name().size() != 0) {
376 name_offset = fbb.CreateString(phased_loop->name());
377 }
378
379 timing::Timer::Builder timer_builder(fbb);
380
381 if (phased_loop->name().size() != 0) {
382 timer_builder.add_name(name_offset);
383 }
384 timer_builder.add_wakeup_latency(wakeup_latency_offset);
385 timer_builder.add_handler_time(handler_time_offset);
386 timer_builder.add_count(0);
387 phased_loop_offsets.emplace_back(timer_builder.Finish());
388 }
389
390 // Pre-fill in the defaults for watchers.
391 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
392 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
393 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
394 timing::CreateStatistic(fbb);
395 flatbuffers::Offset<timing::Statistic> handler_time_offset =
396 timing::CreateStatistic(fbb);
397
398 timing::Watcher::Builder watcher_builder(fbb);
399
400 watcher_builder.add_channel_index(watcher->channel_index());
401 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
402 watcher_builder.add_handler_time(handler_time_offset);
403 watcher_builder.add_count(0);
404 watcher_offsets.emplace_back(watcher_builder.Finish());
405 }
406
407 // Pre-fill in the defaults for senders.
408 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700409 for (RawSender *sender : senders_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800410 flatbuffers::Offset<timing::Statistic> size_offset =
411 timing::CreateStatistic(fbb);
412
James Kuszmaul78514332022-04-06 15:08:34 -0700413 const flatbuffers::Offset<
414 flatbuffers::Vector<flatbuffers::Offset<timing::SendErrorCount>>>
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700415 error_counts_offset = sender->timing_.error_counter.Initialize(&fbb);
James Kuszmaul78514332022-04-06 15:08:34 -0700416
Austin Schuh39788ff2019-12-01 18:22:57 -0800417 timing::Sender::Builder sender_builder(fbb);
418
419 sender_builder.add_channel_index(sender->timing_.channel_index);
420 sender_builder.add_size(size_offset);
James Kuszmaul78514332022-04-06 15:08:34 -0700421 sender_builder.add_error_counts(error_counts_offset);
Austin Schuh39788ff2019-12-01 18:22:57 -0800422 sender_builder.add_count(0);
423 sender_offsets.emplace_back(sender_builder.Finish());
424 }
425
426 // Pre-fill in the defaults for fetchers.
427 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
428 for (RawFetcher *fetcher : fetchers_) {
429 flatbuffers::Offset<timing::Statistic> latency_offset =
430 timing::CreateStatistic(fbb);
431
432 timing::Fetcher::Builder fetcher_builder(fbb);
433
434 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
435 fetcher_builder.add_count(0);
436 fetcher_builder.add_latency(latency_offset);
437 fetcher_offsets.emplace_back(fetcher_builder.Finish());
438 }
439
440 // Then build the final report.
441 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
442 timers_offset;
443 if (timer_offsets.size() > 0) {
444 timers_offset = fbb.CreateVector(timer_offsets);
445 }
446
447 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
448 phased_loops_offset;
449 if (phased_loop_offsets.size() > 0) {
450 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
451 }
452
453 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
454 watchers_offset;
455 if (watcher_offsets.size() > 0) {
456 watchers_offset = fbb.CreateVector(watcher_offsets);
457 }
458
459 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
460 senders_offset;
461 if (sender_offsets.size() > 0) {
462 senders_offset = fbb.CreateVector(sender_offsets);
463 }
464
465 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
466 fetchers_offset;
467 if (fetcher_offsets.size() > 0) {
468 fetchers_offset = fbb.CreateVector(fetcher_offsets);
469 }
470
471 flatbuffers::Offset<flatbuffers::String> name_offset =
472 fbb.CreateString(name());
473
474 timing::Report::Builder report_builder(fbb);
475 report_builder.add_name(name_offset);
476 report_builder.add_pid(GetTid());
477 if (timer_offsets.size() > 0) {
478 report_builder.add_timers(timers_offset);
479 }
480 if (phased_loop_offsets.size() > 0) {
481 report_builder.add_phased_loops(phased_loops_offset);
482 }
483 if (watcher_offsets.size() > 0) {
484 report_builder.add_watchers(watchers_offset);
485 }
486 if (sender_offsets.size() > 0) {
487 report_builder.add_senders(senders_offset);
488 }
489 if (fetcher_offsets.size() > 0) {
490 report_builder.add_fetchers(fetchers_offset);
491 }
milind1f1dca32021-07-03 13:50:07 -0700492 report_builder.add_send_failures(timing_report_failure_counter_.failures());
Austin Schuh39788ff2019-12-01 18:22:57 -0800493 fbb.Finish(report_builder.Finish());
494
495 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
496
497 // Now that the pointers are stable, pass them to the timers and watchers to
498 // be updated.
499 for (size_t i = 0; i < timers_.size(); ++i) {
500 timers_[i]->timing_.set_timing_report(
501 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
502 i));
503 }
504
505 for (size_t i = 0; i < phased_loops_.size(); ++i) {
506 phased_loops_[i]->timing_.set_timing_report(
507 timing_report_.mutable_message()
508 ->mutable_phased_loops()
509 ->GetMutableObject(i));
510 }
511
512 for (size_t i = 0; i < watchers_.size(); ++i) {
513 watchers_[i]->set_timing_report(
514 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
515 i));
516 }
517
518 for (size_t i = 0; i < senders_.size(); ++i) {
519 senders_[i]->timing_.set_timing_report(
520 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
521 i));
522 }
523
524 for (size_t i = 0; i < fetchers_.size(); ++i) {
525 fetchers_[i]->timing_.set_timing_report(
526 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
527 i));
528 }
529}
530
531void EventLoop::MaybeScheduleTimingReports() {
532 if (FLAGS_timing_reports && !skip_timing_report_) {
533 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
534 // Make a raw sender for the report.
535 const Channel *channel = configuration::GetChannel(
536 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800537 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700538 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
539 "\"type\": \"aos.timing.Report\"} on node "
540 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800541
542 // Since we are using a RawSender, validity isn't checked. So check it
543 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800544 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
545 LOG(FATAL) << "Channel { \"name\": \"/aos"
546 << channel->name()->string_view() << "\", \"type\": \""
547 << channel->type()->string_view()
548 << "\" } is not able to be sent on this node. Check your "
549 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800550 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800551 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
552 << timing::Report::GetFullyQualifiedName()
553 << "\" } not found in config.";
554 timing_report_sender_ = MakeRawSender(channel);
555
556 // Register a handler which sends the report out by copying the raw data
557 // from the prebuilt and subsequently modified report.
558 TimerHandler *timing_reports_timer =
559 AddTimer([this]() { SendTimingReport(); });
560
561 // Set it up to send once per second.
562 timing_reports_timer->set_name("timing_reports");
563 OnRun([this, timing_reports_timer]() {
564 timing_reports_timer->Setup(
565 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
566 std::chrono::milliseconds(FLAGS_timing_report_ms));
567 });
568
569 UpdateTimingReport();
570 }
571}
572
Austin Schuh7d87b672019-12-01 20:23:49 -0800573void EventLoop::ReserveEvents() {
574 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
575}
576
577namespace {
578bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700579 if (first->event_time() > second->event_time()) {
580 return true;
581 }
582 if (first->event_time() < second->event_time()) {
583 return false;
584 }
585 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800586}
587} // namespace
588
589void EventLoop::AddEvent(EventLoopEvent *event) {
590 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700591 DCHECK(event->generation() == 0);
592 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800593 events_.push_back(event);
594 std::push_heap(events_.begin(), events_.end(), CompareEvents);
595}
596
597void EventLoop::RemoveEvent(EventLoopEvent *event) {
598 auto e = std::find(events_.begin(), events_.end(), event);
599 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700600 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800601 events_.erase(e);
602 std::make_heap(events_.begin(), events_.end(), CompareEvents);
603 event->Invalidate();
604 }
605}
606
607EventLoopEvent *EventLoop::PopEvent() {
608 EventLoopEvent *result = events_.front();
609 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
610 events_.pop_back();
611 result->Invalidate();
612 return result;
613}
614
Austin Schuh0debde12022-08-17 16:25:17 -0700615void EventLoop::ClearContext() {
616 context_.monotonic_event_time = monotonic_clock::min_time;
617 context_.monotonic_remote_time = monotonic_clock::min_time;
618 context_.realtime_event_time = realtime_clock::min_time;
619 context_.realtime_remote_time = realtime_clock::min_time;
620 context_.queue_index = 0xffffffffu;
621 context_.remote_queue_index = 0xffffffffu;
622 context_.size = 0u;
623 context_.data = nullptr;
624 context_.buffer_index = -1;
625 context_.source_boot_uuid = boot_uuid();
626}
627
Austin Schuha9012be2021-07-21 15:19:11 -0700628void EventLoop::SetTimerContext(
629 monotonic_clock::time_point monotonic_event_time) {
630 context_.monotonic_event_time = monotonic_event_time;
631 context_.monotonic_remote_time = monotonic_clock::min_time;
632 context_.realtime_event_time = realtime_clock::min_time;
633 context_.realtime_remote_time = realtime_clock::min_time;
634 context_.queue_index = 0xffffffffu;
Austin Schuh0debde12022-08-17 16:25:17 -0700635 context_.remote_queue_index = 0xffffffffu;
Austin Schuha9012be2021-07-21 15:19:11 -0700636 context_.size = 0u;
637 context_.data = nullptr;
638 context_.buffer_index = -1;
639 context_.source_boot_uuid = boot_uuid();
640}
641
Austin Schuh070019a2022-12-20 22:23:09 -0800642cpu_set_t EventLoop::DefaultAffinity() { return aos::DefaultAffinity(); }
643
Austin Schuh39788ff2019-12-01 18:22:57 -0800644void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800645 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800646 if (!watcher) {
647 wakeup_latency_.set_statistic(nullptr);
648 handler_time_.set_statistic(nullptr);
649 } else {
650 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
651 handler_time_.set_statistic(watcher->mutable_handler_time());
652 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800653}
654
655void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800656 if (!watcher_) {
657 return;
658 }
659
Austin Schuh39788ff2019-12-01 18:22:57 -0800660 wakeup_latency_.Reset();
661 handler_time_.Reset();
662 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800663}
664
665} // namespace aos