blob: ba71b975e97172bfeee36ae9ba1b167bb16e6b0a [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "glog/logging.h"
4
Austin Schuh54cf95f2019-11-29 13:14:18 -08005#include "aos/configuration.h"
6#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08007#include "aos/logging/implementations.h"
Austin Schuh070019a2022-12-20 22:23:09 -08008#include "aos/realtime.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08009
Austin Schuh39788ff2019-12-01 18:22:57 -080010DEFINE_bool(timing_reports, true, "Publish timing reports.");
11DEFINE_int32(timing_report_ms, 1000,
12 "Period in milliseconds to publish timing reports at.");
13
Austin Schuh54cf95f2019-11-29 13:14:18 -080014namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070015namespace {
16void CheckAlignment(const Channel *channel) {
17 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
18 LOG(FATAL) << "max_size() (" << channel->max_size()
19 << ") is not a multiple of alignment ("
20 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
21 << configuration::CleanedChannelToString(channel) << ".";
22 }
23}
milind1f1dca32021-07-03 13:50:07 -070024
25std::string_view ErrorToString(const RawSender::Error err) {
26 switch (err) {
27 case RawSender::Error::kOk:
28 return "RawSender::Error::kOk";
29 case RawSender::Error::kMessagesSentTooFast:
30 return "RawSender::Error::kMessagesSentTooFast";
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070031 case RawSender::Error::kInvalidRedzone:
32 return "RawSender::Error::kInvalidRedzone";
milind1f1dca32021-07-03 13:50:07 -070033 }
34 LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
35}
Austin Schuhd54780b2020-10-03 16:26:02 -070036} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080037
James Kuszmaul762e8692023-07-31 14:57:53 -070038std::optional<std::string> EventLoop::default_version_string_;
39
Austin Schuhe0ab4de2023-05-03 08:05:08 -070040std::pair<SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(size_t size) {
41 AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
42 malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
43
44 absl::Span<uint8_t> mutable_span(
45 reinterpret_cast<uint8_t *>(RoundChannelData(span->data(), size)), size);
46 // Use the placement new operator to construct an actual absl::Span in place.
47 new (span) AlignedOwningSpan(mutable_span);
48
49 return std::make_pair(
50 SharedSpan(std::shared_ptr<AlignedOwningSpan>(span,
51 [](AlignedOwningSpan *s) {
52 s->~AlignedOwningSpan();
53 free(s);
54 }),
55 &span->span),
56 mutable_span);
57}
58
milind1f1dca32021-07-03 13:50:07 -070059std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
60 os << ErrorToString(err);
61 return os;
62}
63
64void RawSender::CheckOk(const RawSender::Error err) {
Austin Schuhd7fbf022023-09-01 13:46:28 -070065 if (err != Error::kOk) {
66 event_loop_->SendTimingReport();
67 CHECK_EQ(err, Error::kOk)
68 << "Messages were sent too fast on channel: "
69 << configuration::CleanedChannelToString(channel_);
70 }
milind1f1dca32021-07-03 13:50:07 -070071}
72
Austin Schuh39788ff2019-12-01 18:22:57 -080073RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
74 : event_loop_(event_loop),
75 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050076 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080077 timing_(event_loop_->ChannelIndex(channel)) {
78 event_loop_->NewSender(this);
79}
80
81RawSender::~RawSender() { event_loop_->DeleteSender(this); }
82
milind1f1dca32021-07-03 13:50:07 -070083RawSender::Error RawSender::DoSend(
84 const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
85 realtime_clock::time_point realtime_remote_time,
86 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070087 return DoSend(data->data(), data->size(), monotonic_remote_time,
88 realtime_remote_time, remote_queue_index, source_boot_uuid);
89}
90
James Kuszmaul93abac12022-04-14 15:05:10 -070091void RawSender::RecordSendResult(const Error error, size_t message_size) {
92 switch (error) {
93 case Error::kOk: {
94 if (timing_.sender) {
95 timing_.size.Add(message_size);
96 timing_.sender->mutate_count(timing_.sender->count() + 1);
97 }
98 break;
99 }
100 case Error::kMessagesSentTooFast:
101 timing_.IncrementError(timing::SendError::MESSAGE_SENT_TOO_FAST);
102 break;
103 case Error::kInvalidRedzone:
104 timing_.IncrementError(timing::SendError::INVALID_REDZONE);
105 break;
106 }
107}
108
Austin Schuh39788ff2019-12-01 18:22:57 -0800109RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
110 : event_loop_(event_loop),
111 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -0500112 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -0800113 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -0800114 context_.monotonic_event_time = monotonic_clock::min_time;
115 context_.monotonic_remote_time = monotonic_clock::min_time;
116 context_.realtime_event_time = realtime_clock::min_time;
117 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -0800118 context_.queue_index = 0xffffffff;
Austin Schuh0debde12022-08-17 16:25:17 -0700119 context_.remote_queue_index = 0xffffffffu;
Austin Schuh39788ff2019-12-01 18:22:57 -0800120 context_.size = 0;
121 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -0700122 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -0800123 event_loop_->NewFetcher(this);
124}
125
126RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
127
128TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
129 : event_loop_(event_loop), fn_(std::move(fn)) {}
130
131TimerHandler::~TimerHandler() {}
132
133PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
134 std::function<void(int)> fn,
135 const monotonic_clock::duration interval,
136 const monotonic_clock::duration offset)
137 : event_loop_(event_loop),
138 fn_(std::move(fn)),
139 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
140 event_loop_->OnRun([this]() {
141 const monotonic_clock::time_point monotonic_now =
142 event_loop_->monotonic_now();
143 phased_loop_.Reset(monotonic_now);
James Kuszmaul20dcc7c2023-01-20 11:06:31 -0800144 Reschedule(monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -0700145 // Reschedule here will count cycles elapsed before now, and then the
146 // reschedule before running the handler will count the time that elapsed
147 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -0800148 cycles_elapsed_ = 0;
149 });
150}
151
152PhasedLoopHandler::~PhasedLoopHandler() {}
153
Austin Schuh83c7f702021-01-19 22:36:29 -0800154EventLoop::EventLoop(const Configuration *configuration)
James Kuszmaul762e8692023-07-31 14:57:53 -0700155 : version_string_(default_version_string_),
156 timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -0700157 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -0800158
Austin Schuh39788ff2019-12-01 18:22:57 -0800159EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800160 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700161 for (const RawSender *sender : senders_) {
162 LOG(ERROR) << " Sender "
163 << configuration::StrippedChannelToString(sender->channel())
164 << " still open";
165 }
166 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800167 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800168 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800169}
170
Brian Silvermanbf889922021-11-10 12:41:57 -0800171void EventLoop::SkipTimingReport() {
172 skip_timing_report_ = true;
173 timing_report_ = flatbuffers::DetachedBuffer();
174
175 for (size_t i = 0; i < timers_.size(); ++i) {
176 timers_[i]->timing_.set_timing_report(nullptr);
177 }
178
179 for (size_t i = 0; i < phased_loops_.size(); ++i) {
180 phased_loops_[i]->timing_.set_timing_report(nullptr);
181 }
182
183 for (size_t i = 0; i < watchers_.size(); ++i) {
184 watchers_[i]->set_timing_report(nullptr);
185 }
186
187 for (size_t i = 0; i < senders_.size(); ++i) {
188 senders_[i]->timing_.set_timing_report(nullptr);
189 }
190
191 for (size_t i = 0; i < fetchers_.size(); ++i) {
192 fetchers_[i]->timing_.set_timing_report(nullptr);
193 }
194}
195
Austin Schuh39788ff2019-12-01 18:22:57 -0800196int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800197 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800198}
199
Brian Silverman5120afb2020-01-31 17:44:35 -0800200WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
201 const int channel_index = ChannelIndex(channel);
202 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
203 if (watcher->channel_index() == channel_index) {
204 return watcher.get();
205 }
206 }
207 LOG(FATAL) << "No watcher found for channel";
208}
209
Austin Schuh39788ff2019-12-01 18:22:57 -0800210void EventLoop::NewSender(RawSender *sender) {
211 senders_.emplace_back(sender);
212 UpdateTimingReport();
213}
214void EventLoop::DeleteSender(RawSender *sender) {
215 CHECK(!is_running());
216 auto s = std::find(senders_.begin(), senders_.end(), sender);
217 CHECK(s != senders_.end()) << ": Sender not in senders list";
218 senders_.erase(s);
219 UpdateTimingReport();
220}
221
222TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
223 timers_.emplace_back(std::move(timer));
224 UpdateTimingReport();
225 return timers_.back().get();
226}
227
228PhasedLoopHandler *EventLoop::NewPhasedLoop(
229 std::unique_ptr<PhasedLoopHandler> phased_loop) {
230 phased_loops_.emplace_back(std::move(phased_loop));
231 UpdateTimingReport();
232 return phased_loops_.back().get();
233}
234
235void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700236 CheckAlignment(fetcher->channel());
237
Austin Schuh39788ff2019-12-01 18:22:57 -0800238 fetchers_.emplace_back(fetcher);
239 UpdateTimingReport();
240}
241
242void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
243 CHECK(!is_running());
244 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
245 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
246 fetchers_.erase(f);
247 UpdateTimingReport();
248}
249
250WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
251 watchers_.emplace_back(std::move(watcher));
252
253 UpdateTimingReport();
254
255 return watchers_.back().get();
256}
257
Brian Silverman0fc69932020-01-24 21:54:02 -0800258void EventLoop::TakeWatcher(const Channel *channel) {
259 CHECK(!is_running()) << ": Cannot add new objects while running.";
260 ChannelIndex(channel);
261
Austin Schuhd54780b2020-10-03 16:26:02 -0700262 CheckAlignment(channel);
263
Brian Silverman0fc69932020-01-24 21:54:02 -0800264 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800265 << ": " << configuration::CleanedChannelToString(channel)
milind-u5dbdba42023-02-04 17:48:43 -0800266 << " is already being used for sending. Can't make a watcher on the "
267 "same event loop.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800268
269 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800270 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800271 << " is already being used.";
272
273 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800274 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800275 << " is not able to be watched on this node. Check your "
276 "configuration.";
277 }
278}
279
280void EventLoop::TakeSender(const Channel *channel) {
281 CHECK(!is_running()) << ": Cannot add new objects while running.";
282 ChannelIndex(channel);
283
Austin Schuhd54780b2020-10-03 16:26:02 -0700284 CheckAlignment(channel);
285
Brian Silverman0fc69932020-01-24 21:54:02 -0800286 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800287 << ": Channel " << configuration::CleanedChannelToString(channel)
288 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800289
290 // We don't care if this is a duplicate.
291 taken_senders_.insert(channel);
292}
293
Austin Schuh39788ff2019-12-01 18:22:57 -0800294void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700295 if (!timing_report_sender_) {
296 // Timing reports are disabled, so nothing for us to do.
297 return;
298 }
299
Austin Schuh39788ff2019-12-01 18:22:57 -0800300 // We need to do a fancy dance here to get all the accounting to work right.
301 // We want to copy the memory here, but then send after resetting. Otherwise
302 // the send for the timing report won't be counted in the timing report.
303 //
304 // Also, flatbuffers build from the back end. So place this at the back end
305 // of the buffer. We only have to care because we are using this in a very
306 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800307 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuhf9402e32023-07-10 12:55:50 -0700308 << ": Timing report bigger than the sender size for " << name() << ".";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800309 std::copy(timing_report_.span().data(),
310 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800311 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800312 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800313
314 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
315 timer->timing_.ResetTimingReport();
316 }
317 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
318 watcher->ResetReport();
319 }
320 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
321 phased_loop->timing_.ResetTimingReport();
322 }
323 for (RawSender *sender : senders_) {
324 sender->timing_.ResetTimingReport();
325 }
326 for (RawFetcher *fetcher : fetchers_) {
327 fetcher->timing_.ResetTimingReport();
328 }
milind1f1dca32021-07-03 13:50:07 -0700329 // TODO(milind): If we fail to send, we don't want to reset the timing report.
330 // We would need to move the reset after the send, and then find the correct
331 // timing report and set the reports with it instead of letting the sender do
332 // this. If we failed to send, we wouldn't reset or set the reports, so they
333 // can accumalate until the next send.
334 timing_report_failure_counter_.Count(
335 timing_report_sender_->Send(timing_report_.span().size()));
Austin Schuh39788ff2019-12-01 18:22:57 -0800336}
337
338void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800339 if (skip_timing_report_) {
340 return;
341 }
342
Austin Schuh39788ff2019-12-01 18:22:57 -0800343 // We need to support senders and fetchers changing while we are setting up
344 // the event loop. Otherwise we can't fetch or send until the loop runs. This
345 // means that on each change, we need to redo all this work. This makes setup
346 // more expensive, but not by all that much on a modern processor.
347
348 // Now, build up a report with everything pre-filled out.
349 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800350 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800351
352 // Pre-fill in the defaults for timers.
353 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
354 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
355 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
356 timing::CreateStatistic(fbb);
357 flatbuffers::Offset<timing::Statistic> handler_time_offset =
358 timing::CreateStatistic(fbb);
359 flatbuffers::Offset<flatbuffers::String> name_offset;
360 if (timer->name().size() != 0) {
361 name_offset = fbb.CreateString(timer->name());
362 }
363
364 timing::Timer::Builder timer_builder(fbb);
365
366 if (timer->name().size() != 0) {
367 timer_builder.add_name(name_offset);
368 }
369 timer_builder.add_wakeup_latency(wakeup_latency_offset);
370 timer_builder.add_handler_time(handler_time_offset);
371 timer_builder.add_count(0);
372 timer_offsets.emplace_back(timer_builder.Finish());
373 }
374
375 // Pre-fill in the defaults for phased_loops.
376 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
377 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
378 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
379 timing::CreateStatistic(fbb);
380 flatbuffers::Offset<timing::Statistic> handler_time_offset =
381 timing::CreateStatistic(fbb);
382 flatbuffers::Offset<flatbuffers::String> name_offset;
383 if (phased_loop->name().size() != 0) {
384 name_offset = fbb.CreateString(phased_loop->name());
385 }
386
387 timing::Timer::Builder timer_builder(fbb);
388
389 if (phased_loop->name().size() != 0) {
390 timer_builder.add_name(name_offset);
391 }
392 timer_builder.add_wakeup_latency(wakeup_latency_offset);
393 timer_builder.add_handler_time(handler_time_offset);
394 timer_builder.add_count(0);
395 phased_loop_offsets.emplace_back(timer_builder.Finish());
396 }
397
398 // Pre-fill in the defaults for watchers.
399 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
400 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
401 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
402 timing::CreateStatistic(fbb);
403 flatbuffers::Offset<timing::Statistic> handler_time_offset =
404 timing::CreateStatistic(fbb);
405
406 timing::Watcher::Builder watcher_builder(fbb);
407
408 watcher_builder.add_channel_index(watcher->channel_index());
409 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
410 watcher_builder.add_handler_time(handler_time_offset);
411 watcher_builder.add_count(0);
412 watcher_offsets.emplace_back(watcher_builder.Finish());
413 }
414
415 // Pre-fill in the defaults for senders.
416 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700417 for (RawSender *sender : senders_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800418 flatbuffers::Offset<timing::Statistic> size_offset =
419 timing::CreateStatistic(fbb);
420
James Kuszmaul78514332022-04-06 15:08:34 -0700421 const flatbuffers::Offset<
422 flatbuffers::Vector<flatbuffers::Offset<timing::SendErrorCount>>>
James Kuszmaulcc94ed42022-08-24 11:36:17 -0700423 error_counts_offset = sender->timing_.error_counter.Initialize(&fbb);
James Kuszmaul78514332022-04-06 15:08:34 -0700424
Austin Schuh39788ff2019-12-01 18:22:57 -0800425 timing::Sender::Builder sender_builder(fbb);
426
427 sender_builder.add_channel_index(sender->timing_.channel_index);
428 sender_builder.add_size(size_offset);
James Kuszmaul78514332022-04-06 15:08:34 -0700429 sender_builder.add_error_counts(error_counts_offset);
Austin Schuh39788ff2019-12-01 18:22:57 -0800430 sender_builder.add_count(0);
431 sender_offsets.emplace_back(sender_builder.Finish());
432 }
433
434 // Pre-fill in the defaults for fetchers.
435 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
436 for (RawFetcher *fetcher : fetchers_) {
437 flatbuffers::Offset<timing::Statistic> latency_offset =
438 timing::CreateStatistic(fbb);
439
440 timing::Fetcher::Builder fetcher_builder(fbb);
441
442 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
443 fetcher_builder.add_count(0);
444 fetcher_builder.add_latency(latency_offset);
445 fetcher_offsets.emplace_back(fetcher_builder.Finish());
446 }
447
448 // Then build the final report.
449 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
450 timers_offset;
451 if (timer_offsets.size() > 0) {
452 timers_offset = fbb.CreateVector(timer_offsets);
453 }
454
455 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
456 phased_loops_offset;
457 if (phased_loop_offsets.size() > 0) {
458 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
459 }
460
461 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
462 watchers_offset;
463 if (watcher_offsets.size() > 0) {
464 watchers_offset = fbb.CreateVector(watcher_offsets);
465 }
466
467 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
468 senders_offset;
469 if (sender_offsets.size() > 0) {
470 senders_offset = fbb.CreateVector(sender_offsets);
471 }
472
473 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
474 fetchers_offset;
475 if (fetcher_offsets.size() > 0) {
476 fetchers_offset = fbb.CreateVector(fetcher_offsets);
477 }
478
479 flatbuffers::Offset<flatbuffers::String> name_offset =
480 fbb.CreateString(name());
481
James Kuszmaul762e8692023-07-31 14:57:53 -0700482 const flatbuffers::Offset<flatbuffers::String> version_offset =
483 version_string_.has_value() ? fbb.CreateString(version_string_.value())
484 : flatbuffers::Offset<flatbuffers::String>();
485
Austin Schuh39788ff2019-12-01 18:22:57 -0800486 timing::Report::Builder report_builder(fbb);
487 report_builder.add_name(name_offset);
James Kuszmaul762e8692023-07-31 14:57:53 -0700488 report_builder.add_version(version_offset);
Austin Schuh39788ff2019-12-01 18:22:57 -0800489 report_builder.add_pid(GetTid());
490 if (timer_offsets.size() > 0) {
491 report_builder.add_timers(timers_offset);
492 }
493 if (phased_loop_offsets.size() > 0) {
494 report_builder.add_phased_loops(phased_loops_offset);
495 }
496 if (watcher_offsets.size() > 0) {
497 report_builder.add_watchers(watchers_offset);
498 }
499 if (sender_offsets.size() > 0) {
500 report_builder.add_senders(senders_offset);
501 }
502 if (fetcher_offsets.size() > 0) {
503 report_builder.add_fetchers(fetchers_offset);
504 }
milind1f1dca32021-07-03 13:50:07 -0700505 report_builder.add_send_failures(timing_report_failure_counter_.failures());
Austin Schuh39788ff2019-12-01 18:22:57 -0800506 fbb.Finish(report_builder.Finish());
507
508 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
509
510 // Now that the pointers are stable, pass them to the timers and watchers to
511 // be updated.
512 for (size_t i = 0; i < timers_.size(); ++i) {
513 timers_[i]->timing_.set_timing_report(
514 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
515 i));
516 }
517
518 for (size_t i = 0; i < phased_loops_.size(); ++i) {
519 phased_loops_[i]->timing_.set_timing_report(
520 timing_report_.mutable_message()
521 ->mutable_phased_loops()
522 ->GetMutableObject(i));
523 }
524
525 for (size_t i = 0; i < watchers_.size(); ++i) {
526 watchers_[i]->set_timing_report(
527 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
528 i));
529 }
530
531 for (size_t i = 0; i < senders_.size(); ++i) {
532 senders_[i]->timing_.set_timing_report(
533 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
534 i));
535 }
536
537 for (size_t i = 0; i < fetchers_.size(); ++i) {
538 fetchers_[i]->timing_.set_timing_report(
539 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
540 i));
541 }
542}
543
544void EventLoop::MaybeScheduleTimingReports() {
545 if (FLAGS_timing_reports && !skip_timing_report_) {
546 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
547 // Make a raw sender for the report.
548 const Channel *channel = configuration::GetChannel(
549 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800550 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700551 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
552 "\"type\": \"aos.timing.Report\"} on node "
553 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800554
555 // Since we are using a RawSender, validity isn't checked. So check it
556 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800557 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
558 LOG(FATAL) << "Channel { \"name\": \"/aos"
559 << channel->name()->string_view() << "\", \"type\": \""
560 << channel->type()->string_view()
561 << "\" } is not able to be sent on this node. Check your "
562 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800563 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800564 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
565 << timing::Report::GetFullyQualifiedName()
566 << "\" } not found in config.";
567 timing_report_sender_ = MakeRawSender(channel);
568
569 // Register a handler which sends the report out by copying the raw data
570 // from the prebuilt and subsequently modified report.
571 TimerHandler *timing_reports_timer =
572 AddTimer([this]() { SendTimingReport(); });
573
574 // Set it up to send once per second.
575 timing_reports_timer->set_name("timing_reports");
576 OnRun([this, timing_reports_timer]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700577 timing_reports_timer->Schedule(
Austin Schuh39788ff2019-12-01 18:22:57 -0800578 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
579 std::chrono::milliseconds(FLAGS_timing_report_ms));
580 });
581
582 UpdateTimingReport();
583 }
584}
585
Austin Schuh7d87b672019-12-01 20:23:49 -0800586void EventLoop::ReserveEvents() {
587 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
588}
589
590namespace {
591bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700592 if (first->event_time() > second->event_time()) {
593 return true;
594 }
595 if (first->event_time() < second->event_time()) {
596 return false;
597 }
598 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800599}
600} // namespace
601
602void EventLoop::AddEvent(EventLoopEvent *event) {
603 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700604 DCHECK(event->generation() == 0);
605 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800606 events_.push_back(event);
607 std::push_heap(events_.begin(), events_.end(), CompareEvents);
608}
609
610void EventLoop::RemoveEvent(EventLoopEvent *event) {
611 auto e = std::find(events_.begin(), events_.end(), event);
612 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700613 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800614 events_.erase(e);
615 std::make_heap(events_.begin(), events_.end(), CompareEvents);
616 event->Invalidate();
617 }
618}
619
620EventLoopEvent *EventLoop::PopEvent() {
621 EventLoopEvent *result = events_.front();
622 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
623 events_.pop_back();
624 result->Invalidate();
625 return result;
626}
627
Austin Schuh0debde12022-08-17 16:25:17 -0700628void EventLoop::ClearContext() {
629 context_.monotonic_event_time = monotonic_clock::min_time;
630 context_.monotonic_remote_time = monotonic_clock::min_time;
631 context_.realtime_event_time = realtime_clock::min_time;
632 context_.realtime_remote_time = realtime_clock::min_time;
633 context_.queue_index = 0xffffffffu;
634 context_.remote_queue_index = 0xffffffffu;
635 context_.size = 0u;
636 context_.data = nullptr;
637 context_.buffer_index = -1;
638 context_.source_boot_uuid = boot_uuid();
639}
640
Austin Schuha9012be2021-07-21 15:19:11 -0700641void EventLoop::SetTimerContext(
642 monotonic_clock::time_point monotonic_event_time) {
643 context_.monotonic_event_time = monotonic_event_time;
644 context_.monotonic_remote_time = monotonic_clock::min_time;
645 context_.realtime_event_time = realtime_clock::min_time;
646 context_.realtime_remote_time = realtime_clock::min_time;
647 context_.queue_index = 0xffffffffu;
Austin Schuh0debde12022-08-17 16:25:17 -0700648 context_.remote_queue_index = 0xffffffffu;
Austin Schuha9012be2021-07-21 15:19:11 -0700649 context_.size = 0u;
650 context_.data = nullptr;
651 context_.buffer_index = -1;
652 context_.source_boot_uuid = boot_uuid();
653}
654
Austin Schuh070019a2022-12-20 22:23:09 -0800655cpu_set_t EventLoop::DefaultAffinity() { return aos::DefaultAffinity(); }
656
James Kuszmaul762e8692023-07-31 14:57:53 -0700657void EventLoop::SetDefaultVersionString(std::string_view version) {
658 default_version_string_ = version;
659}
660
661void EventLoop::SetVersionString(std::string_view version) {
662 CHECK(!is_running())
663 << ": Can't do things that might alter the timing report while running.";
664 version_string_ = version;
665
666 UpdateTimingReport();
667}
668
Austin Schuh39788ff2019-12-01 18:22:57 -0800669void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800670 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800671 if (!watcher) {
672 wakeup_latency_.set_statistic(nullptr);
673 handler_time_.set_statistic(nullptr);
674 } else {
675 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
676 handler_time_.set_statistic(watcher->mutable_handler_time());
677 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800678}
679
680void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800681 if (!watcher_) {
682 return;
683 }
684
Austin Schuh39788ff2019-12-01 18:22:57 -0800685 wakeup_latency_.Reset();
686 handler_time_.Reset();
687 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800688}
689
690} // namespace aos