blob: 51341bb78ddf89d53477defd62cd07dda635e62a [file] [log] [blame]
Austin Schuh54cf95f2019-11-29 13:14:18 -08001#include "aos/events/event_loop.h"
2
3#include "aos/configuration.h"
4#include "aos/configuration_generated.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08005#include "aos/logging/implementations.h"
Austin Schuh54cf95f2019-11-29 13:14:18 -08006#include "glog/logging.h"
7
Austin Schuh39788ff2019-12-01 18:22:57 -08008DEFINE_bool(timing_reports, true, "Publish timing reports.");
9DEFINE_int32(timing_report_ms, 1000,
10 "Period in milliseconds to publish timing reports at.");
11
Austin Schuh54cf95f2019-11-29 13:14:18 -080012namespace aos {
Austin Schuhd54780b2020-10-03 16:26:02 -070013namespace {
14void CheckAlignment(const Channel *channel) {
15 if (channel->max_size() % alignof(flatbuffers::largest_scalar_t) != 0) {
16 LOG(FATAL) << "max_size() (" << channel->max_size()
17 << ") is not a multiple of alignment ("
18 << alignof(flatbuffers::largest_scalar_t) << ") for channel "
19 << configuration::CleanedChannelToString(channel) << ".";
20 }
21}
22} // namespace
Austin Schuh54cf95f2019-11-29 13:14:18 -080023
Austin Schuh39788ff2019-12-01 18:22:57 -080024RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
25 : event_loop_(event_loop),
26 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050027 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080028 timing_(event_loop_->ChannelIndex(channel)) {
29 event_loop_->NewSender(this);
30}
31
32RawSender::~RawSender() { event_loop_->DeleteSender(this); }
33
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070034bool RawSender::DoSend(const SharedSpan data,
35 monotonic_clock::time_point monotonic_remote_time,
36 realtime_clock::time_point realtime_remote_time,
37 uint32_t remote_queue_index,
38 const UUID &source_boot_uuid) {
39 return DoSend(data->data(), data->size(), monotonic_remote_time,
40 realtime_remote_time, remote_queue_index, source_boot_uuid);
41}
42
Austin Schuh39788ff2019-12-01 18:22:57 -080043RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
44 : event_loop_(event_loop),
45 channel_(channel),
Brian Silverman79ec7fc2020-06-08 20:11:22 -050046 ftrace_prefix_(configuration::StrippedChannelToString(channel)),
Austin Schuh39788ff2019-12-01 18:22:57 -080047 timing_(event_loop_->ChannelIndex(channel)) {
Austin Schuhad154822019-12-27 15:45:13 -080048 context_.monotonic_event_time = monotonic_clock::min_time;
49 context_.monotonic_remote_time = monotonic_clock::min_time;
50 context_.realtime_event_time = realtime_clock::min_time;
51 context_.realtime_remote_time = realtime_clock::min_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080052 context_.queue_index = 0xffffffff;
53 context_.size = 0;
54 context_.data = nullptr;
Brian Silverman4f4e0612020-08-12 19:54:41 -070055 context_.buffer_index = -1;
Austin Schuh39788ff2019-12-01 18:22:57 -080056 event_loop_->NewFetcher(this);
57}
58
59RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
60
61TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
62 : event_loop_(event_loop), fn_(std::move(fn)) {}
63
64TimerHandler::~TimerHandler() {}
65
66PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
67 std::function<void(int)> fn,
68 const monotonic_clock::duration interval,
69 const monotonic_clock::duration offset)
70 : event_loop_(event_loop),
71 fn_(std::move(fn)),
72 phased_loop_(interval, event_loop_->monotonic_now(), offset) {
73 event_loop_->OnRun([this]() {
74 const monotonic_clock::time_point monotonic_now =
75 event_loop_->monotonic_now();
76 phased_loop_.Reset(monotonic_now);
77 Reschedule(
78 [this](monotonic_clock::time_point sleep_time) {
79 Schedule(sleep_time);
80 },
81 monotonic_now);
Milind Upadhyay42589bb2021-05-19 20:05:16 -070082 // Reschedule here will count cycles elapsed before now, and then the
83 // reschedule before running the handler will count the time that elapsed
84 // then. So clear the count here.
Austin Schuh39788ff2019-12-01 18:22:57 -080085 cycles_elapsed_ = 0;
86 });
87}
88
89PhasedLoopHandler::~PhasedLoopHandler() {}
90
Austin Schuh83c7f702021-01-19 22:36:29 -080091EventLoop::EventLoop(const Configuration *configuration)
92 : timing_report_(flatbuffers::DetachedBuffer()),
Austin Schuh56196432020-10-24 20:15:21 -070093 configuration_(configuration) {}
Tyler Chatow67ddb032020-01-12 14:30:04 -080094
Austin Schuh39788ff2019-12-01 18:22:57 -080095EventLoop::~EventLoop() {
Brian Silvermanbf889922021-11-10 12:41:57 -080096 if (!senders_.empty()) {
Austin Schuh58646e22021-08-23 23:51:46 -070097 for (const RawSender *sender : senders_) {
98 LOG(ERROR) << " Sender "
99 << configuration::StrippedChannelToString(sender->channel())
100 << " still open";
101 }
102 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800103 CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
Austin Schuh7d87b672019-12-01 20:23:49 -0800104 CHECK_EQ(events_.size(), 0u) << ": Not all events unregistered";
Austin Schuh39788ff2019-12-01 18:22:57 -0800105}
106
Brian Silvermanbf889922021-11-10 12:41:57 -0800107void EventLoop::SkipTimingReport() {
108 skip_timing_report_ = true;
109 timing_report_ = flatbuffers::DetachedBuffer();
110
111 for (size_t i = 0; i < timers_.size(); ++i) {
112 timers_[i]->timing_.set_timing_report(nullptr);
113 }
114
115 for (size_t i = 0; i < phased_loops_.size(); ++i) {
116 phased_loops_[i]->timing_.set_timing_report(nullptr);
117 }
118
119 for (size_t i = 0; i < watchers_.size(); ++i) {
120 watchers_[i]->set_timing_report(nullptr);
121 }
122
123 for (size_t i = 0; i < senders_.size(); ++i) {
124 senders_[i]->timing_.set_timing_report(nullptr);
125 }
126
127 for (size_t i = 0; i < fetchers_.size(); ++i) {
128 fetchers_[i]->timing_.set_timing_report(nullptr);
129 }
130}
131
Austin Schuh39788ff2019-12-01 18:22:57 -0800132int EventLoop::ChannelIndex(const Channel *channel) {
Austin Schuhc9e10ec2020-01-26 16:08:28 -0800133 return configuration::ChannelIndex(configuration_, channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800134}
135
Brian Silverman5120afb2020-01-31 17:44:35 -0800136WatcherState *EventLoop::GetWatcherState(const Channel *channel) {
137 const int channel_index = ChannelIndex(channel);
138 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
139 if (watcher->channel_index() == channel_index) {
140 return watcher.get();
141 }
142 }
143 LOG(FATAL) << "No watcher found for channel";
144}
145
Austin Schuh39788ff2019-12-01 18:22:57 -0800146void EventLoop::NewSender(RawSender *sender) {
147 senders_.emplace_back(sender);
148 UpdateTimingReport();
149}
150void EventLoop::DeleteSender(RawSender *sender) {
151 CHECK(!is_running());
152 auto s = std::find(senders_.begin(), senders_.end(), sender);
153 CHECK(s != senders_.end()) << ": Sender not in senders list";
154 senders_.erase(s);
155 UpdateTimingReport();
156}
157
158TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
159 timers_.emplace_back(std::move(timer));
160 UpdateTimingReport();
161 return timers_.back().get();
162}
163
164PhasedLoopHandler *EventLoop::NewPhasedLoop(
165 std::unique_ptr<PhasedLoopHandler> phased_loop) {
166 phased_loops_.emplace_back(std::move(phased_loop));
167 UpdateTimingReport();
168 return phased_loops_.back().get();
169}
170
171void EventLoop::NewFetcher(RawFetcher *fetcher) {
Austin Schuhd54780b2020-10-03 16:26:02 -0700172 CheckAlignment(fetcher->channel());
173
Austin Schuh39788ff2019-12-01 18:22:57 -0800174 fetchers_.emplace_back(fetcher);
175 UpdateTimingReport();
176}
177
178void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
179 CHECK(!is_running());
180 auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
181 CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
182 fetchers_.erase(f);
183 UpdateTimingReport();
184}
185
186WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
187 watchers_.emplace_back(std::move(watcher));
188
189 UpdateTimingReport();
190
191 return watchers_.back().get();
192}
193
Brian Silverman0fc69932020-01-24 21:54:02 -0800194void EventLoop::TakeWatcher(const Channel *channel) {
195 CHECK(!is_running()) << ": Cannot add new objects while running.";
196 ChannelIndex(channel);
197
Austin Schuhd54780b2020-10-03 16:26:02 -0700198 CheckAlignment(channel);
199
Brian Silverman0fc69932020-01-24 21:54:02 -0800200 CHECK(taken_senders_.find(channel) == taken_senders_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800201 << ": " << configuration::CleanedChannelToString(channel)
202 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800203
204 auto result = taken_watchers_.insert(channel);
Austin Schuh8072f922020-02-16 21:51:47 -0800205 CHECK(result.second) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800206 << " is already being used.";
207
208 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
Austin Schuh8072f922020-02-16 21:51:47 -0800209 LOG(FATAL) << ": " << configuration::CleanedChannelToString(channel)
Brian Silverman0fc69932020-01-24 21:54:02 -0800210 << " is not able to be watched on this node. Check your "
211 "configuration.";
212 }
213}
214
215void EventLoop::TakeSender(const Channel *channel) {
216 CHECK(!is_running()) << ": Cannot add new objects while running.";
217 ChannelIndex(channel);
218
Austin Schuhd54780b2020-10-03 16:26:02 -0700219 CheckAlignment(channel);
220
Brian Silverman0fc69932020-01-24 21:54:02 -0800221 CHECK(taken_watchers_.find(channel) == taken_watchers_.end())
Austin Schuh8072f922020-02-16 21:51:47 -0800222 << ": Channel " << configuration::CleanedChannelToString(channel)
223 << " is already being used.";
Brian Silverman0fc69932020-01-24 21:54:02 -0800224
225 // We don't care if this is a duplicate.
226 taken_senders_.insert(channel);
227}
228
Austin Schuh39788ff2019-12-01 18:22:57 -0800229void EventLoop::SendTimingReport() {
Brian Silvermance418d02021-11-03 11:25:52 -0700230 if (!timing_report_sender_) {
231 // Timing reports are disabled, so nothing for us to do.
232 return;
233 }
234
Austin Schuh39788ff2019-12-01 18:22:57 -0800235 // We need to do a fancy dance here to get all the accounting to work right.
236 // We want to copy the memory here, but then send after resetting. Otherwise
237 // the send for the timing report won't be counted in the timing report.
238 //
239 // Also, flatbuffers build from the back end. So place this at the back end
240 // of the buffer. We only have to care because we are using this in a very
241 // raw fashion.
Austin Schuhadd6eb32020-11-09 21:24:26 -0800242 CHECK_LE(timing_report_.span().size(), timing_report_sender_->size())
Austin Schuh39788ff2019-12-01 18:22:57 -0800243 << ": Timing report bigger than the sender size.";
Austin Schuhadd6eb32020-11-09 21:24:26 -0800244 std::copy(timing_report_.span().data(),
245 timing_report_.span().data() + timing_report_.span().size(),
Austin Schuh39788ff2019-12-01 18:22:57 -0800246 reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
Austin Schuhadd6eb32020-11-09 21:24:26 -0800247 timing_report_sender_->size() - timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800248
249 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
250 timer->timing_.ResetTimingReport();
251 }
252 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
253 watcher->ResetReport();
254 }
255 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
256 phased_loop->timing_.ResetTimingReport();
257 }
258 for (RawSender *sender : senders_) {
259 sender->timing_.ResetTimingReport();
260 }
261 for (RawFetcher *fetcher : fetchers_) {
262 fetcher->timing_.ResetTimingReport();
263 }
Austin Schuhadd6eb32020-11-09 21:24:26 -0800264 timing_report_sender_->Send(timing_report_.span().size());
Austin Schuh39788ff2019-12-01 18:22:57 -0800265}
266
267void EventLoop::UpdateTimingReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800268 if (skip_timing_report_) {
269 return;
270 }
271
Austin Schuh39788ff2019-12-01 18:22:57 -0800272 // We need to support senders and fetchers changing while we are setting up
273 // the event loop. Otherwise we can't fetch or send until the loop runs. This
274 // means that on each change, we need to redo all this work. This makes setup
275 // more expensive, but not by all that much on a modern processor.
276
277 // Now, build up a report with everything pre-filled out.
278 flatbuffers::FlatBufferBuilder fbb;
Austin Schuhd7b15da2020-02-17 15:06:11 -0800279 fbb.ForceDefaults(true);
Austin Schuh39788ff2019-12-01 18:22:57 -0800280
281 // Pre-fill in the defaults for timers.
282 std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
283 for (const std::unique_ptr<TimerHandler> &timer : timers_) {
284 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
285 timing::CreateStatistic(fbb);
286 flatbuffers::Offset<timing::Statistic> handler_time_offset =
287 timing::CreateStatistic(fbb);
288 flatbuffers::Offset<flatbuffers::String> name_offset;
289 if (timer->name().size() != 0) {
290 name_offset = fbb.CreateString(timer->name());
291 }
292
293 timing::Timer::Builder timer_builder(fbb);
294
295 if (timer->name().size() != 0) {
296 timer_builder.add_name(name_offset);
297 }
298 timer_builder.add_wakeup_latency(wakeup_latency_offset);
299 timer_builder.add_handler_time(handler_time_offset);
300 timer_builder.add_count(0);
301 timer_offsets.emplace_back(timer_builder.Finish());
302 }
303
304 // Pre-fill in the defaults for phased_loops.
305 std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
306 for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
307 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
308 timing::CreateStatistic(fbb);
309 flatbuffers::Offset<timing::Statistic> handler_time_offset =
310 timing::CreateStatistic(fbb);
311 flatbuffers::Offset<flatbuffers::String> name_offset;
312 if (phased_loop->name().size() != 0) {
313 name_offset = fbb.CreateString(phased_loop->name());
314 }
315
316 timing::Timer::Builder timer_builder(fbb);
317
318 if (phased_loop->name().size() != 0) {
319 timer_builder.add_name(name_offset);
320 }
321 timer_builder.add_wakeup_latency(wakeup_latency_offset);
322 timer_builder.add_handler_time(handler_time_offset);
323 timer_builder.add_count(0);
324 phased_loop_offsets.emplace_back(timer_builder.Finish());
325 }
326
327 // Pre-fill in the defaults for watchers.
328 std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
329 for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
330 flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
331 timing::CreateStatistic(fbb);
332 flatbuffers::Offset<timing::Statistic> handler_time_offset =
333 timing::CreateStatistic(fbb);
334
335 timing::Watcher::Builder watcher_builder(fbb);
336
337 watcher_builder.add_channel_index(watcher->channel_index());
338 watcher_builder.add_wakeup_latency(wakeup_latency_offset);
339 watcher_builder.add_handler_time(handler_time_offset);
340 watcher_builder.add_count(0);
341 watcher_offsets.emplace_back(watcher_builder.Finish());
342 }
343
344 // Pre-fill in the defaults for senders.
345 std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
346 for (const RawSender *sender : senders_) {
347 flatbuffers::Offset<timing::Statistic> size_offset =
348 timing::CreateStatistic(fbb);
349
350 timing::Sender::Builder sender_builder(fbb);
351
352 sender_builder.add_channel_index(sender->timing_.channel_index);
353 sender_builder.add_size(size_offset);
354 sender_builder.add_count(0);
355 sender_offsets.emplace_back(sender_builder.Finish());
356 }
357
358 // Pre-fill in the defaults for fetchers.
359 std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
360 for (RawFetcher *fetcher : fetchers_) {
361 flatbuffers::Offset<timing::Statistic> latency_offset =
362 timing::CreateStatistic(fbb);
363
364 timing::Fetcher::Builder fetcher_builder(fbb);
365
366 fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
367 fetcher_builder.add_count(0);
368 fetcher_builder.add_latency(latency_offset);
369 fetcher_offsets.emplace_back(fetcher_builder.Finish());
370 }
371
372 // Then build the final report.
373 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
374 timers_offset;
375 if (timer_offsets.size() > 0) {
376 timers_offset = fbb.CreateVector(timer_offsets);
377 }
378
379 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
380 phased_loops_offset;
381 if (phased_loop_offsets.size() > 0) {
382 phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
383 }
384
385 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
386 watchers_offset;
387 if (watcher_offsets.size() > 0) {
388 watchers_offset = fbb.CreateVector(watcher_offsets);
389 }
390
391 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
392 senders_offset;
393 if (sender_offsets.size() > 0) {
394 senders_offset = fbb.CreateVector(sender_offsets);
395 }
396
397 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
398 fetchers_offset;
399 if (fetcher_offsets.size() > 0) {
400 fetchers_offset = fbb.CreateVector(fetcher_offsets);
401 }
402
403 flatbuffers::Offset<flatbuffers::String> name_offset =
404 fbb.CreateString(name());
405
406 timing::Report::Builder report_builder(fbb);
407 report_builder.add_name(name_offset);
408 report_builder.add_pid(GetTid());
409 if (timer_offsets.size() > 0) {
410 report_builder.add_timers(timers_offset);
411 }
412 if (phased_loop_offsets.size() > 0) {
413 report_builder.add_phased_loops(phased_loops_offset);
414 }
415 if (watcher_offsets.size() > 0) {
416 report_builder.add_watchers(watchers_offset);
417 }
418 if (sender_offsets.size() > 0) {
419 report_builder.add_senders(senders_offset);
420 }
421 if (fetcher_offsets.size() > 0) {
422 report_builder.add_fetchers(fetchers_offset);
423 }
424 fbb.Finish(report_builder.Finish());
425
426 timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
427
428 // Now that the pointers are stable, pass them to the timers and watchers to
429 // be updated.
430 for (size_t i = 0; i < timers_.size(); ++i) {
431 timers_[i]->timing_.set_timing_report(
432 timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
433 i));
434 }
435
436 for (size_t i = 0; i < phased_loops_.size(); ++i) {
437 phased_loops_[i]->timing_.set_timing_report(
438 timing_report_.mutable_message()
439 ->mutable_phased_loops()
440 ->GetMutableObject(i));
441 }
442
443 for (size_t i = 0; i < watchers_.size(); ++i) {
444 watchers_[i]->set_timing_report(
445 timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
446 i));
447 }
448
449 for (size_t i = 0; i < senders_.size(); ++i) {
450 senders_[i]->timing_.set_timing_report(
451 timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
452 i));
453 }
454
455 for (size_t i = 0; i < fetchers_.size(); ++i) {
456 fetchers_[i]->timing_.set_timing_report(
457 timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
458 i));
459 }
460}
461
462void EventLoop::MaybeScheduleTimingReports() {
463 if (FLAGS_timing_reports && !skip_timing_report_) {
464 CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
465 // Make a raw sender for the report.
466 const Channel *channel = configuration::GetChannel(
467 configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
Austin Schuhbca6cf02019-12-22 17:28:34 -0800468 name(), node());
Austin Schuh196a4452020-03-15 23:12:03 -0700469 CHECK(channel != nullptr) << ": Failed to look up {\"name\": \"/aos\", "
470 "\"type\": \"aos.timing.Report\"} on node "
471 << FlatbufferToJson(node());
Austin Schuhbca6cf02019-12-22 17:28:34 -0800472
473 // Since we are using a RawSender, validity isn't checked. So check it
474 // ourselves.
Austin Schuhca4828c2019-12-28 14:21:35 -0800475 if (!configuration::ChannelIsSendableOnNode(channel, node())) {
476 LOG(FATAL) << "Channel { \"name\": \"/aos"
477 << channel->name()->string_view() << "\", \"type\": \""
478 << channel->type()->string_view()
479 << "\" } is not able to be sent on this node. Check your "
480 "configuration.";
Austin Schuhbca6cf02019-12-22 17:28:34 -0800481 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800482 CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
483 << timing::Report::GetFullyQualifiedName()
484 << "\" } not found in config.";
485 timing_report_sender_ = MakeRawSender(channel);
486
487 // Register a handler which sends the report out by copying the raw data
488 // from the prebuilt and subsequently modified report.
489 TimerHandler *timing_reports_timer =
490 AddTimer([this]() { SendTimingReport(); });
491
492 // Set it up to send once per second.
493 timing_reports_timer->set_name("timing_reports");
494 OnRun([this, timing_reports_timer]() {
495 timing_reports_timer->Setup(
496 monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
497 std::chrono::milliseconds(FLAGS_timing_report_ms));
498 });
499
500 UpdateTimingReport();
501 }
502}
503
Austin Schuh7d87b672019-12-01 20:23:49 -0800504void EventLoop::ReserveEvents() {
505 events_.reserve(timers_.size() + phased_loops_.size() + watchers_.size());
506}
507
508namespace {
509bool CompareEvents(const EventLoopEvent *first, const EventLoopEvent *second) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700510 if (first->event_time() > second->event_time()) {
511 return true;
512 }
513 if (first->event_time() < second->event_time()) {
514 return false;
515 }
516 return first->generation() > second->generation();
Austin Schuh7d87b672019-12-01 20:23:49 -0800517}
518} // namespace
519
520void EventLoop::AddEvent(EventLoopEvent *event) {
521 DCHECK(std::find(events_.begin(), events_.end(), event) == events_.end());
Brian Silvermanbd405c02020-06-23 16:25:23 -0700522 DCHECK(event->generation() == 0);
523 event->set_generation(++event_generation_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800524 events_.push_back(event);
525 std::push_heap(events_.begin(), events_.end(), CompareEvents);
526}
527
528void EventLoop::RemoveEvent(EventLoopEvent *event) {
529 auto e = std::find(events_.begin(), events_.end(), event);
530 if (e != events_.end()) {
Brian Silvermanbd405c02020-06-23 16:25:23 -0700531 DCHECK(event->generation() != 0);
Austin Schuh7d87b672019-12-01 20:23:49 -0800532 events_.erase(e);
533 std::make_heap(events_.begin(), events_.end(), CompareEvents);
534 event->Invalidate();
535 }
536}
537
538EventLoopEvent *EventLoop::PopEvent() {
539 EventLoopEvent *result = events_.front();
540 std::pop_heap(events_.begin(), events_.end(), CompareEvents);
541 events_.pop_back();
542 result->Invalidate();
543 return result;
544}
545
Austin Schuha9012be2021-07-21 15:19:11 -0700546void EventLoop::SetTimerContext(
547 monotonic_clock::time_point monotonic_event_time) {
548 context_.monotonic_event_time = monotonic_event_time;
549 context_.monotonic_remote_time = monotonic_clock::min_time;
550 context_.realtime_event_time = realtime_clock::min_time;
551 context_.realtime_remote_time = realtime_clock::min_time;
552 context_.queue_index = 0xffffffffu;
553 context_.size = 0u;
554 context_.data = nullptr;
555 context_.buffer_index = -1;
556 context_.source_boot_uuid = boot_uuid();
557}
558
Austin Schuh39788ff2019-12-01 18:22:57 -0800559void WatcherState::set_timing_report(timing::Watcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800560 watcher_ = watcher;
Brian Silvermanbf889922021-11-10 12:41:57 -0800561 if (!watcher) {
562 wakeup_latency_.set_statistic(nullptr);
563 handler_time_.set_statistic(nullptr);
564 } else {
565 wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
566 handler_time_.set_statistic(watcher->mutable_handler_time());
567 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800568}
569
570void WatcherState::ResetReport() {
Brian Silvermanbf889922021-11-10 12:41:57 -0800571 if (!watcher_) {
572 return;
573 }
574
Austin Schuh39788ff2019-12-01 18:22:57 -0800575 wakeup_latency_.Reset();
576 handler_time_.Reset();
577 watcher_->mutate_count(0);
Austin Schuh54cf95f2019-11-29 13:14:18 -0800578}
579
580} // namespace aos