blob: 09a3834bde22adbcc06636149c65d1b5f2cdb900 [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
Austin Schuh39788ff2019-12-01 18:22:57 -08008DEFINE_bool(timing_reports, true, "Publish timing reports.");
9DEFINE_int32(timing_report_ms, 1000,
10 "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070013namespace {
14void CheckAlignment(const Channel *channel) {
15 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
16 LOG(FATAL) << "max_size() (" << channel->max_size()
17 << ") is not a multiple of alignment ("
18 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
19 << configuration::CleanedChannelToString(channel) << ".";
20 }
21}
milind1f1dca32021-07-03 13:50:07 -070022
23std::string_view ErrorToString(const RawSender::Error err) {
24 switch (err) {
25 case RawSender::Error::kOk:
26 return "RawSender::Error::kOk";
27 case RawSender::Error::kMessagesSentTooFast:
28 return "RawSender::Error::kMessagesSentTooFast";
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -070029 case RawSender::Error::kInvalidRedzone:
30 return "RawSender::Error::kInvalidRedzone";
milind1f1dca32021-07-03 13:50:07 -070031 }
32 LOG(FATAL) << "Unknown error given with code " << static_cast<int>(err);
33}
Austin Schuhd54780b2020-10-03 16:26:02 -070034} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080035
milind1f1dca32021-07-03 13:50:07 -070036std::ostream &operator<<(std::ostream &os, const RawSender::Error err) {
37 os << ErrorToString(err);
38 return os;
39}
40
41void RawSender::CheckOk(const RawSender::Error err) {
42 CHECK_EQ(err, Error::kOk) << "Messages were sent too fast on channel: "
43 << configuration::CleanedChannelToString(channel_);
44}
45
Austin Schuh39788ff2019-12-01 18:22:57 -080046RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
47 : event_loop_(event_loop),
48 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050049 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080050 timing_(event_loop_->ChannelIndex(channel)) {
51 event_loop_->NewSender(this);
52}
53
54RawSender::~RawSender() { event_loop_->DeleteSender(this); }
55
milind1f1dca32021-07-03 13:50:07 -070056RawSender::Error RawSender::DoSend(
57 const SharedSpan data, monotonic_clock::time_point monotonic_remote_time,
58 realtime_clock::time_point realtime_remote_time,
59 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070060 return DoSend(data->data(), data->size(), monotonic_remote_time,
61 realtime_remote_time, remote_queue_index, source_boot_uuid);
62}
63
James Kuszmaul93abac12022-04-14 15:05:10 -070064void RawSender::RecordSendResult(const Error error, size_t message_size) {
65 switch (error) {
66 case Error::kOk: {
67 if (timing_.sender) {
68 timing_.size.Add(message_size);
69 timing_.sender->mutate_count(timing_.sender->count() + 1);
70 }
71 break;
72 }
73 case Error::kMessagesSentTooFast:
74 timing_.IncrementError(timing::SendError::MESSAGE_SENT_TOO_FAST);
75 break;
76 case Error::kInvalidRedzone:
77 timing_.IncrementError(timing::SendError::INVALID_REDZONE);
78 break;
79 }
80}
81
Austin Schuh39788ff2019-12-01 18:22:57 -080082RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
83 : event_loop_(event_loop),
84 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050085 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080086 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080087 context_.monotonic_event_time = monotonic_clock::min_time;
88 context_.monotonic_remote_time = monotonic_clock::min_time;
89 context_.realtime_event_time = realtime_clock::min_time;
90 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080091 context_.queue_index = 0xffffffff;
Austin Schuh0debde12022-08-17 16:25:17 -070092 context_.remote_queue_index = 0xffffffffu;
Austin Schuh39788ff2019-12-01 18:22:57 -080093 context_.size = 0;
94 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -070095 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -080096 event_loop_->NewFetcher(this);
97}
98
99RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
100
101TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
102 : event_loop_(event_loop), fn_(std::move(fn)) {}
103
104TimerHandler::~TimerHandler() {}
105
106PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
107 std::function<void(int)> fn,
108 const monotonic_clock::duration interval,
109 const monotonic_clock::duration offset)
110 : event_loop_(event_loop),
111 fn_(std::move(fn)),
112 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
113 event_loop_->OnRun([this]() {
114 const monotonic_clock::time_point monotonic_now =
115 event_loop_->monotonic_now();
116 phased_loop_.Reset(monotonic_now);
117 Reschedule(
118 [this](monotonic_clock::time_point sleep_time) {
119 Schedule(sleep_time);
120 },
121 monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -0700122 // Reschedule here will count cycles elapsed before now, and then the
123 // reschedule before running the handler will count the time that elapsed
124 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -0800125 cycles_elapsed_ = 0;
126 });
127}
128
129PhasedLoopHandler::~PhasedLoopHandler() {}
130
Austin Schuh83c7f702021-01-19 22:36:29 -0800131EventLoop::EventLoop(const Configuration *configuration)
132 : timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -0700133 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -0800134
Austin Schuh39788ff2019-12-01 18:22:57 -0800135EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800136 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -0700137 for (const RawSender *sender : senders_) {
138 LOG(ERROR) << " Sender "
139 << configuration::StrippedChannelToString(sender->channel())
140 << " still open";
141 }
142 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800143 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800144 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800145}
146
Brian Silvermanbf889922021-11-10 12:41:57 -0800147void EventLoop::SkipTimingReport() {
148 skip_timing_report_ = true;
149 timing_report_ = flatbuffers::DetachedBuffer();
150
151 for (size_t i = 0; i < timers_.size(); ++i) {
152 timers_[i]->timing_.set_timing_report(nullptr);
153 }
154
155 for (size_t i = 0; i < phased_loops_.size(); ++i) {
156 phased_loops_[i]->timing_.set_timing_report(nullptr);
157 }
158
159 for (size_t i = 0; i < watchers_.size(); ++i) {
160 watchers_[i]->set_timing_report(nullptr);
161 }
162
163 for (size_t i = 0; i < senders_.size(); ++i) {
164 senders_[i]->timing_.set_timing_report(nullptr);
165 }
166
167 for (size_t i = 0; i < fetchers_.size(); ++i) {
168 fetchers_[i]->timing_.set_timing_report(nullptr);
169 }
170}
171
Austin Schuh39788ff2019-12-01 18:22:57 -0800172int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800173 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800174}
175
Brian Silverman5120afb2020-01-31 17:44:35 -0800176WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
177 const int channel_index = ChannelIndex(channel);
178 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
179 if (watcher->channel_index() == channel_index) {
180 return watcher.get();
181 }
182 }
183 LOG(FATAL) << "No watcher found for channel";
184}
185
Austin Schuh39788ff2019-12-01 18:22:57 -0800186void EventLoop::NewSender(RawSender *sender) {
187 senders_.emplace_back(sender);
188 UpdateTimingReport();
189}
190void EventLoop::DeleteSender(RawSender *sender) {
191 CHECK(!is_running());
192 auto s = std::find(senders_.begin(), senders_.end(), sender);
193 CHECK(s != senders_.end()) << ": Sender not in senders list";
194 senders_.erase(s);
195 UpdateTimingReport();
196}
197
198TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
199 timers_.emplace_back(std::move(timer));
200 UpdateTimingReport();
201 return timers_.back().get();
202}
203
204PhasedLoopHandler *EventLoop::NewPhasedLoop(
205 std::unique_ptr<PhasedLoopHandler> phased_loop) {
206 phased_loops_.emplace_back(std::move(phased_loop));
207 UpdateTimingReport();
208 return phased_loops_.back().get();
209}
210
211void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700212 CheckAlignment(fetcher->channel());
213
Austin Schuh39788ff2019-12-01 18:22:57 -0800214 fetchers_.emplace_back(fetcher);
215 UpdateTimingReport();
216}
217
218void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
219 CHECK(!is_running());
220 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
221 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
222 fetchers_.erase(f);
223 UpdateTimingReport();
224}
225
226WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
227 watchers_.emplace_back(std::move(watcher));
228
229 UpdateTimingReport();
230
231 return watchers_.back().get();
232}
233
Brian Silverman0fc69932020-01-24 21:54:02 -0800234void EventLoop::TakeWatcher(const Channel *channel) {
235 CHECK(!is_running()) << ": Cannot add new objects while running.";
236 ChannelIndex(channel);
237
Austin Schuhd54780b2020-10-03 16:26:02 -0700238 CheckAlignment(channel);
239
Brian Silverman0fc69932020-01-24 21:54:02 -0800240 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800241 << ": " << configuration::CleanedChannelToString(channel)
242 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800243
244 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800245 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800246 << " is already being used.";
247
248 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800249 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800250 << " is not able to be watched on this node. Check your "
251 "configuration.";
252 }
253}
254
255void EventLoop::TakeSender(const Channel *channel) {
256 CHECK(!is_running()) << ": Cannot add new objects while running.";
257 ChannelIndex(channel);
258
Austin Schuhd54780b2020-10-03 16:26:02 -0700259 CheckAlignment(channel);
260
Brian Silverman0fc69932020-01-24 21:54:02 -0800261 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800262 << ": Channel " << configuration::CleanedChannelToString(channel)
263 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800264
265 // We don't care if this is a duplicate.
266 taken_senders_.insert(channel);
267}
268
Austin Schuh39788ff2019-12-01 18:22:57 -0800269void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700270 if (!timing_report_sender_) {
271 // Timing reports are disabled, so nothing for us to do.
272 return;
273 }
274
Austin Schuh39788ff2019-12-01 18:22:57 -0800275 // We need to do a fancy dance here to get all the accounting to work right.
276 // We want to copy the memory here, but then send after resetting. Otherwise
277 // the send for the timing report won't be counted in the timing report.
278 //
279 // Also, flatbuffers build from the back end. So place this at the back end
280 // of the buffer. We only have to care because we are using this in a very
281 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800282 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuh39788ff2019-12-01 18:22:57 -0800283 << ": Timing report bigger than the sender size.";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800284 std::copy(timing_report_.span().data(),
285 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800286 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800287 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800288
289 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
290 timer->timing_.ResetTimingReport();
291 }
292 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
293 watcher->ResetReport();
294 }
295 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
296 phased_loop->timing_.ResetTimingReport();
297 }
298 for (RawSender *sender : senders_) {
299 sender->timing_.ResetTimingReport();
300 }
301 for (RawFetcher *fetcher : fetchers_) {
302 fetcher->timing_.ResetTimingReport();
303 }
milind1f1dca32021-07-03 13:50:07 -0700304 // TODO(milind): If we fail to send, we don't want to reset the timing report.
305 // We would need to move the reset after the send, and then find the correct
306 // timing report and set the reports with it instead of letting the sender do
307 // this. If we failed to send, we wouldn't reset or set the reports, so they
308 // can accumalate until the next send.
309 timing_report_failure_counter_.Count(
310 timing_report_sender_->Send(timing_report_.span().size()));
Austin Schuh39788ff2019-12-01 18:22:57 -0800311}
312
313void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800314 if (skip_timing_report_) {
315 return;
316 }
317
Austin Schuh39788ff2019-12-01 18:22:57 -0800318 // We need to support senders and fetchers changing while we are setting up
319 // the event loop. Otherwise we can't fetch or send until the loop runs. This
320 // means that on each change, we need to redo all this work. This makes setup
321 // more expensive, but not by all that much on a modern processor.
322
323 // Now, build up a report with everything pre-filled out.
324 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800325 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800326
327 // Pre-fill in the defaults for timers.
328 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
329 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
330 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
331 timing::CreateStatistic(fbb);
332 flatbuffers::Offset<timing::Statistic> handler_time_offset =
333 timing::CreateStatistic(fbb);
334 flatbuffers::Offset<flatbuffers::String> name_offset;
335 if (timer->name().size() != 0) {
336 name_offset = fbb.CreateString(timer->name());
337 }
338
339 timing::Timer::Builder timer_builder(fbb);
340
341 if (timer->name().size() != 0) {
342 timer_builder.add_name(name_offset);
343 }
344 timer_builder.add_wakeup_latency(wakeup_latency_offset);
345 timer_builder.add_handler_time(handler_time_offset);
346 timer_builder.add_count(0);
347 timer_offsets.emplace_back(timer_builder.Finish());
348 }
349
350 // Pre-fill in the defaults for phased_loops.
351 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
352 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
353 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
354 timing::CreateStatistic(fbb);
355 flatbuffers::Offset<timing::Statistic> handler_time_offset =
356 timing::CreateStatistic(fbb);
357 flatbuffers::Offset<flatbuffers::String> name_offset;
358 if (phased_loop->name().size() != 0) {
359 name_offset = fbb.CreateString(phased_loop->name());
360 }
361
362 timing::Timer::Builder timer_builder(fbb);
363
364 if (phased_loop->name().size() != 0) {
365 timer_builder.add_name(name_offset);
366 }
367 timer_builder.add_wakeup_latency(wakeup_latency_offset);
368 timer_builder.add_handler_time(handler_time_offset);
369 timer_builder.add_count(0);
370 phased_loop_offsets.emplace_back(timer_builder.Finish());
371 }
372
373 // Pre-fill in the defaults for watchers.
374 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
375 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
376 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
377 timing::CreateStatistic(fbb);
378 flatbuffers::Offset<timing::Statistic> handler_time_offset =
379 timing::CreateStatistic(fbb);
380
381 timing::Watcher::Builder watcher_builder(fbb);
382
383 watcher_builder.add_channel_index(watcher->channel_index());
384 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
385 watcher_builder.add_handler_time(handler_time_offset);
386 watcher_builder.add_count(0);
387 watcher_offsets.emplace_back(watcher_builder.Finish());
388 }
389
390 // Pre-fill in the defaults for senders.
391 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
392 for (const RawSender *sender : senders_) {
393 flatbuffers::Offset<timing::Statistic> size_offset =
394 timing::CreateStatistic(fbb);
395
James Kuszmaul78514332022-04-06 15:08:34 -0700396 std::vector<flatbuffers::Offset<timing::SendErrorCount>>
397 error_count_offsets;
398 for (size_t ii = 0; ii < internal::RawSenderTiming::kNumErrors; ++ii) {
399 error_count_offsets.push_back(timing::CreateSendErrorCount(
400 fbb, timing::EnumValuesSendError()[ii], 0));
401 }
402 const flatbuffers::Offset<
403 flatbuffers::Vector<flatbuffers::Offset<timing::SendErrorCount>>>
404 error_counts_offset = fbb.CreateVector(error_count_offsets);
405
Austin Schuh39788ff2019-12-01 18:22:57 -0800406 timing::Sender::Builder sender_builder(fbb);
407
408 sender_builder.add_channel_index(sender->timing_.channel_index);
409 sender_builder.add_size(size_offset);
James Kuszmaul78514332022-04-06 15:08:34 -0700410 sender_builder.add_error_counts(error_counts_offset);
Austin Schuh39788ff2019-12-01 18:22:57 -0800411 sender_builder.add_count(0);
412 sender_offsets.emplace_back(sender_builder.Finish());
413 }
414
415 // Pre-fill in the defaults for fetchers.
416 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
417 for (RawFetcher *fetcher : fetchers_) {
418 flatbuffers::Offset<timing::Statistic> latency_offset =
419 timing::CreateStatistic(fbb);
420
421 timing::Fetcher::Builder fetcher_builder(fbb);
422
423 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
424 fetcher_builder.add_count(0);
425 fetcher_builder.add_latency(latency_offset);
426 fetcher_offsets.emplace_back(fetcher_builder.Finish());
427 }
428
429 // Then build the final report.
430 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
431 timers_offset;
432 if (timer_offsets.size() > 0) {
433 timers_offset = fbb.CreateVector(timer_offsets);
434 }
435
436 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
437 phased_loops_offset;
438 if (phased_loop_offsets.size() > 0) {
439 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
440 }
441
442 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
443 watchers_offset;
444 if (watcher_offsets.size() > 0) {
445 watchers_offset = fbb.CreateVector(watcher_offsets);
446 }
447
448 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
449 senders_offset;
450 if (sender_offsets.size() > 0) {
451 senders_offset = fbb.CreateVector(sender_offsets);
452 }
453
454 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
455 fetchers_offset;
456 if (fetcher_offsets.size() > 0) {
457 fetchers_offset = fbb.CreateVector(fetcher_offsets);
458 }
459
460 flatbuffers::Offset<flatbuffers::String> name_offset =
461 fbb.CreateString(name());
462
463 timing::Report::Builder report_builder(fbb);
464 report_builder.add_name(name_offset);
465 report_builder.add_pid(GetTid());
466 if (timer_offsets.size() > 0) {
467 report_builder.add_timers(timers_offset);
468 }
469 if (phased_loop_offsets.size() > 0) {
470 report_builder.add_phased_loops(phased_loops_offset);
471 }
472 if (watcher_offsets.size() > 0) {
473 report_builder.add_watchers(watchers_offset);
474 }
475 if (sender_offsets.size() > 0) {
476 report_builder.add_senders(senders_offset);
477 }
478 if (fetcher_offsets.size() > 0) {
479 report_builder.add_fetchers(fetchers_offset);
480 }
milind1f1dca32021-07-03 13:50:07 -0700481 report_builder.add_send_failures(timing_report_failure_counter_.failures());
Austin Schuh39788ff2019-12-01 18:22:57 -0800482 fbb.Finish(report_builder.Finish());
483
484 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
485
486 // Now that the pointers are stable, pass them to the timers and watchers to
487 // be updated.
488 for (size_t i = 0; i < timers_.size(); ++i) {
489 timers_[i]->timing_.set_timing_report(
490 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
491 i));
492 }
493
494 for (size_t i = 0; i < phased_loops_.size(); ++i) {
495 phased_loops_[i]->timing_.set_timing_report(
496 timing_report_.mutable_message()
497 ->mutable_phased_loops()
498 ->GetMutableObject(i));
499 }
500
501 for (size_t i = 0; i < watchers_.size(); ++i) {
502 watchers_[i]->set_timing_report(
503 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
504 i));
505 }
506
507 for (size_t i = 0; i < senders_.size(); ++i) {
508 senders_[i]->timing_.set_timing_report(
509 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
510 i));
511 }
512
513 for (size_t i = 0; i < fetchers_.size(); ++i) {
514 fetchers_[i]->timing_.set_timing_report(
515 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
516 i));
517 }
518}
519
520void EventLoop::MaybeScheduleTimingReports() {
521 if (FLAGS_timing_reports && !skip_timing_report_) {
522 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
523 // Make a raw sender for the report.
524 const Channel *channel = configuration::GetChannel(
525 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800526 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700527 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
528 "\"type\": \"aos.timing.Report\"} on node "
529 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800530
531 // Since we are using a RawSender, validity isn't checked. So check it
532 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800533 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
534 LOG(FATAL) << "Channel { \"name\": \"/aos"
535 << channel->name()->string_view() << "\", \"type\": \""
536 << channel->type()->string_view()
537 << "\" } is not able to be sent on this node. Check your "
538 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800539 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800540 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
541 << timing::Report::GetFullyQualifiedName()
542 << "\" } not found in config.";
543 timing_report_sender_ = MakeRawSender(channel);
544
545 // Register a handler which sends the report out by copying the raw data
546 // from the prebuilt and subsequently modified report.
547 TimerHandler *timing_reports_timer =
548 AddTimer([this]() { SendTimingReport(); });
549
550 // Set it up to send once per second.
551 timing_reports_timer->set_name("timing_reports");
552 OnRun([this, timing_reports_timer]() {
553 timing_reports_timer->Setup(
554 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
555 std::chrono::milliseconds(FLAGS_timing_report_ms));
556 });
557
558 UpdateTimingReport();
559 }
560}
561
Austin Schuh7d87b672019-12-01 20:23:49 -0800562void EventLoop::ReserveEvents() {
563 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
564}
565
566namespace {
567bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700568 if (first->event_time() > second->event_time()) {
569 return true;
570 }
571 if (first->event_time() < second->event_time()) {
572 return false;
573 }
574 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800575}
576} // namespace
577
578void EventLoop::AddEvent(EventLoopEvent *event) {
579 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700580 DCHECK(event->generation() == 0);
581 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800582 events_.push_back(event);
583 std::push_heap(events_.begin(), events_.end(), CompareEvents);
584}
585
586void EventLoop::RemoveEvent(EventLoopEvent *event) {
587 auto e = std::find(events_.begin(), events_.end(), event);
588 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700589 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800590 events_.erase(e);
591 std::make_heap(events_.begin(), events_.end(), CompareEvents);
592 event->Invalidate();
593 }
594}
595
596EventLoopEvent *EventLoop::PopEvent() {
597 EventLoopEvent *result = events_.front();
598 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
599 events_.pop_back();
600 result->Invalidate();
601 return result;
602}
603
Austin Schuh0debde12022-08-17 16:25:17 -0700604void EventLoop::ClearContext() {
605 context_.monotonic_event_time = monotonic_clock::min_time;
606 context_.monotonic_remote_time = monotonic_clock::min_time;
607 context_.realtime_event_time = realtime_clock::min_time;
608 context_.realtime_remote_time = realtime_clock::min_time;
609 context_.queue_index = 0xffffffffu;
610 context_.remote_queue_index = 0xffffffffu;
611 context_.size = 0u;
612 context_.data = nullptr;
613 context_.buffer_index = -1;
614 context_.source_boot_uuid = boot_uuid();
615}
616
Austin Schuha9012be2021-07-21 15:19:11 -0700617void EventLoop::SetTimerContext(
618 monotonic_clock::time_point monotonic_event_time) {
619 context_.monotonic_event_time = monotonic_event_time;
620 context_.monotonic_remote_time = monotonic_clock::min_time;
621 context_.realtime_event_time = realtime_clock::min_time;
622 context_.realtime_remote_time = realtime_clock::min_time;
623 context_.queue_index = 0xffffffffu;
Austin Schuh0debde12022-08-17 16:25:17 -0700624 context_.remote_queue_index = 0xffffffffu;
Austin Schuha9012be2021-07-21 15:19:11 -0700625 context_.size = 0u;
626 context_.data = nullptr;
627 context_.buffer_index = -1;
628 context_.source_boot_uuid = boot_uuid();
629}
630
Austin Schuh39788ff2019-12-01 18:22:57 -0800631void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800632 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800633 if (!watcher) {
634 wakeup_latency_.set_statistic(nullptr);
635 handler_time_.set_statistic(nullptr);
636 } else {
637 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
638 handler_time_.set_statistic(watcher->mutable_handler_time());
639 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800640}
641
642void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800643 if (!watcher_) {
644 return;
645 }
646
Austin Schuh39788ff2019-12-01 18:22:57 -0800647 wakeup_latency_.Reset();
648 handler_time_.Reset();
649 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800650}
651
652} // namespace aos