| #ifndef AOS_EVENTS_EVENT_LOOP_TMPL_H_ |
| #define AOS_EVENTS_EVENT_LOOP_TMPL_H_ |
| |
| #include <cinttypes> |
| #include <cstdint> |
| #include <type_traits> |
| |
| #include "aos/events/event_loop.h" |
| #include "glog/logging.h" |
| |
| namespace aos { |
| namespace event_loop_internal { |
| |
| // From a watch functor, specializations of this will extract the message type |
| // of the template argument. If T is not a valid message type, there will be no |
| // matching specialization. |
| // |
| // This is just the forward declaration, which will be used by one of the |
| // following specializations to match valid argument types. |
| template <class T> |
| struct watch_message_type_trait; |
| |
| // From a watch functor, this will extract the message type of the argument. |
| // This is the template specialization. |
| template <class ClassType, class ReturnType, class A1> |
| struct watch_message_type_trait<ReturnType (ClassType::*)(A1) const> { |
| using message_type = typename std::decay<A1>::type; |
| }; |
| |
| } // namespace event_loop_internal |
| |
| template <typename T> |
| typename Sender<T>::Builder Sender<T>::MakeBuilder() { |
| return Builder(sender_.get(), sender_->fbb_allocator()); |
| } |
| |
| template <typename Watch> |
| void EventLoop::MakeWatcher(const std::string_view channel_name, Watch &&w) { |
| using MessageType = |
| typename event_loop_internal::watch_message_type_trait<decltype( |
| &Watch::operator())>::message_type; |
| const Channel *channel = configuration::GetChannel( |
| configuration_, channel_name, MessageType::GetFullyQualifiedName(), |
| name(), node()); |
| |
| CHECK(channel != nullptr) |
| << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \"" |
| << MessageType::GetFullyQualifiedName() |
| << "\" } not found in config for application " << name() << "."; |
| |
| MakeRawWatcher(channel, |
| [this, w](const Context &context, const void *message) { |
| context_ = context; |
| w(*flatbuffers::GetRoot<MessageType>( |
| reinterpret_cast<const char *>(message))); |
| }); |
| } |
| |
| template <typename MessageType> |
| void EventLoop::MakeNoArgWatcher(const std::string_view channel_name, |
| std::function<void()> w) { |
| const Channel *channel = configuration::GetChannel( |
| configuration_, channel_name, MessageType::GetFullyQualifiedName(), |
| name(), node()); |
| CHECK(channel != nullptr) |
| << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \"" |
| << MessageType::GetFullyQualifiedName() |
| << "\" } not found in config for application " << name() << "."; |
| MakeRawNoArgWatcher(channel, [this, w](const Context &context) { |
| context_ = context; |
| w(); |
| }); |
| } |
| |
| inline bool RawFetcher::FetchNext() { |
| const auto result = DoFetchNext(); |
| if (result.first) { |
| if (timing_.fetcher) { |
| timing_.fetcher->mutate_count(timing_.fetcher->count() + 1); |
| } |
| const monotonic_clock::time_point monotonic_time = result.second; |
| ftrace_.FormatMessage( |
| "%.*s: fetch next: now=%" PRId64 " event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>(monotonic_time.time_since_epoch().count()), |
| static_cast<int64_t>( |
| context_.monotonic_event_time.time_since_epoch().count()), |
| context_.queue_index); |
| const float latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_time - context_.monotonic_event_time) |
| .count(); |
| timing_.latency.Add(latency); |
| return true; |
| } |
| ftrace_.FormatMessage( |
| "%.*s: fetch next: still event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>( |
| context_.monotonic_event_time.time_since_epoch().count()), |
| context_.queue_index); |
| return false; |
| } |
| |
| inline bool RawFetcher::Fetch() { |
| const auto result = DoFetch(); |
| if (result.first) { |
| if (timing_.fetcher) { |
| timing_.fetcher->mutate_count(timing_.fetcher->count() + 1); |
| } |
| const monotonic_clock::time_point monotonic_time = result.second; |
| ftrace_.FormatMessage( |
| "%.*s: fetch latest: now=%" PRId64 " event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>(monotonic_time.time_since_epoch().count()), |
| static_cast<int64_t>( |
| context_.monotonic_event_time.time_since_epoch().count()), |
| context_.queue_index); |
| const float latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_time - context_.monotonic_event_time) |
| .count(); |
| timing_.latency.Add(latency); |
| return true; |
| } |
| ftrace_.FormatMessage( |
| "%.*s: fetch latest: still event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>( |
| context_.monotonic_event_time.time_since_epoch().count()), |
| context_.queue_index); |
| return false; |
| } |
| |
| inline RawSender::Error RawSender::Send(size_t size) { |
| return Send(size, monotonic_clock::min_time, realtime_clock::min_time, |
| 0xffffffffu, event_loop_->boot_uuid()); |
| } |
| |
| inline RawSender::Error RawSender::Send( |
| size_t size, aos::monotonic_clock::time_point monotonic_remote_time, |
| aos::realtime_clock::time_point realtime_remote_time, |
| uint32_t remote_queue_index, const UUID &uuid) { |
| const auto err = DoSend(size, monotonic_remote_time, realtime_remote_time, |
| remote_queue_index, uuid); |
| if (err == RawSender::Error::kOk) { |
| if (timing_.sender) { |
| timing_.size.Add(size); |
| timing_.sender->mutate_count(timing_.sender->count() + 1); |
| } |
| ftrace_.FormatMessage( |
| "%.*s: sent internal: event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>(monotonic_sent_time().time_since_epoch().count()), |
| sent_queue_index()); |
| } |
| return err; |
| } |
| |
| inline RawSender::Error RawSender::Send(const void *data, size_t size) { |
| return Send(data, size, monotonic_clock::min_time, realtime_clock::min_time, |
| 0xffffffffu, event_loop_->boot_uuid()); |
| } |
| |
| inline RawSender::Error RawSender::Send( |
| const void *data, size_t size, |
| aos::monotonic_clock::time_point monotonic_remote_time, |
| aos::realtime_clock::time_point realtime_remote_time, |
| uint32_t remote_queue_index, const UUID &uuid) { |
| const auto err = DoSend(data, size, monotonic_remote_time, |
| realtime_remote_time, remote_queue_index, uuid); |
| if (err == RawSender::Error::kOk) { |
| if (timing_.sender) { |
| timing_.size.Add(size); |
| timing_.sender->mutate_count(timing_.sender->count() + 1); |
| } |
| ftrace_.FormatMessage( |
| "%.*s: sent external: event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>(monotonic_sent_time().time_since_epoch().count()), |
| sent_queue_index()); |
| } |
| return err; |
| } |
| |
| inline RawSender::Error RawSender::Send(const SharedSpan data) { |
| return Send(std::move(data), monotonic_clock::min_time, |
| realtime_clock::min_time, 0xffffffffu, event_loop_->boot_uuid()); |
| } |
| |
| inline RawSender::Error RawSender::Send( |
| const SharedSpan data, |
| aos::monotonic_clock::time_point monotonic_remote_time, |
| aos::realtime_clock::time_point realtime_remote_time, |
| uint32_t remote_queue_index, const UUID &uuid) { |
| const size_t size = data->size(); |
| const auto err = DoSend(std::move(data), monotonic_remote_time, |
| realtime_remote_time, remote_queue_index, uuid); |
| if (err == Error::kOk) { |
| if (timing_.sender) { |
| timing_.size.Add(size); |
| timing_.sender->mutate_count(timing_.sender->count() + 1); |
| } |
| ftrace_.FormatMessage( |
| "%.*s: sent shared: event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>(monotonic_sent_time().time_since_epoch().count()), |
| sent_queue_index()); |
| } |
| return err; |
| } |
| |
| inline monotonic_clock::time_point TimerHandler::Call( |
| std::function<monotonic_clock::time_point()> get_time, |
| monotonic_clock::time_point event_time) { |
| const monotonic_clock::time_point monotonic_start_time = get_time(); |
| |
| event_loop_->SetTimerContext(event_time); |
| |
| ftrace_.FormatMessage( |
| "timer: %.*s: start now=%" PRId64 " event=%" PRId64, |
| static_cast<int>(name_.size()), name_.data(), |
| static_cast<int64_t>(monotonic_start_time.time_since_epoch().count()), |
| static_cast<int64_t>(event_time.time_since_epoch().count())); |
| if (timing_.timer) { |
| const float start_latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_start_time - event_time) |
| .count(); |
| timing_.wakeup_latency.Add(start_latency); |
| timing_.timer->mutate_count(timing_.timer->count() + 1); |
| } |
| fn_(); |
| |
| const monotonic_clock::time_point monotonic_end_time = get_time(); |
| ftrace_.FormatMessage( |
| "timer: %.*s: end now=%" PRId64, static_cast<int>(name_.size()), |
| name_.data(), |
| static_cast<int64_t>(monotonic_end_time.time_since_epoch().count())); |
| |
| const float handler_latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_end_time - monotonic_start_time) |
| .count(); |
| timing_.handler_time.Add(handler_latency); |
| return monotonic_start_time; |
| } |
| |
| inline void PhasedLoopHandler::Call( |
| std::function<monotonic_clock::time_point()> get_time, |
| std::function<void(monotonic_clock::time_point)> schedule) { |
| // Read time directly to save a vtable indirection... |
| const monotonic_clock::time_point monotonic_start_time = get_time(); |
| |
| // Update the context to hold the desired wakeup time. |
| event_loop_->SetTimerContext(phased_loop_.sleep_time()); |
| |
| // Compute how many cycles elapsed |
| cycles_elapsed_ += phased_loop_.Iterate(monotonic_start_time); |
| |
| ftrace_.FormatMessage( |
| "phased: %.*s: start now=%" PRId64 " event=%" PRId64 " cycles=%d", |
| static_cast<int>(name_.size()), name_.data(), |
| static_cast<int64_t>(monotonic_start_time.time_since_epoch().count()), |
| static_cast<int64_t>( |
| phased_loop_.sleep_time().time_since_epoch().count()), |
| cycles_elapsed_); |
| if (timing_.timer) { |
| const float start_latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_start_time - event_loop_->context_.monotonic_event_time) |
| .count(); |
| timing_.wakeup_latency.Add(start_latency); |
| timing_.timer->mutate_count(timing_.timer->count() + 1); |
| } |
| |
| // Call the function with the elapsed cycles. |
| fn_(cycles_elapsed_); |
| cycles_elapsed_ = 0; |
| |
| // Schedule the next wakeup. |
| schedule(phased_loop_.sleep_time()); |
| |
| const monotonic_clock::time_point monotonic_end_time = get_time(); |
| ftrace_.FormatMessage( |
| "phased: %.*s: end now=%" PRId64, static_cast<int>(name_.size()), |
| name_.data(), |
| static_cast<int64_t>(monotonic_end_time.time_since_epoch().count())); |
| |
| const float handler_latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_end_time - monotonic_start_time) |
| .count(); |
| timing_.handler_time.Add(handler_latency); |
| |
| // If the handler took too long so we blew by the previous deadline, we |
| // want to just try for the next deadline. Reschedule. |
| if (monotonic_end_time > phased_loop_.sleep_time()) { |
| Reschedule(schedule, monotonic_end_time); |
| } |
| } |
| |
| // Class to automate the timing report generation for watchers. |
| class WatcherState { |
| public: |
| WatcherState( |
| EventLoop *event_loop, const Channel *channel, |
| std::function<void(const Context &context, const void *message)> fn) |
| : channel_index_(event_loop->ChannelIndex(channel)), |
| ftrace_prefix_(configuration::StrippedChannelToString(channel)), |
| fn_(std::move(fn)) {} |
| |
| virtual ~WatcherState() {} |
| |
| // Calls the callback, measuring time with get_time, with the provided |
| // context. |
| void DoCallCallback(std::function<monotonic_clock::time_point()> get_time, |
| Context context) noexcept { |
| if (context.data) { |
| CheckChannelDataAlignment(context.data, context.size); |
| } |
| const monotonic_clock::time_point monotonic_start_time = get_time(); |
| ftrace_.FormatMessage( |
| "%.*s: watcher start: now=%" PRId64 " event=%" PRId64 " queue=%" PRIu32, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>(monotonic_start_time.time_since_epoch().count()), |
| static_cast<int64_t>( |
| context.monotonic_event_time.time_since_epoch().count()), |
| context.queue_index); |
| if (watcher_) { |
| const float start_latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_start_time - context.monotonic_event_time) |
| .count(); |
| wakeup_latency_.Add(start_latency); |
| watcher_->mutate_count(watcher_->count() + 1); |
| } |
| fn_(context, context.data); |
| |
| const monotonic_clock::time_point monotonic_end_time = get_time(); |
| ftrace_.FormatMessage( |
| "%.*s: watcher end: now=%" PRId64, |
| static_cast<int>(ftrace_prefix_.size()), ftrace_prefix_.data(), |
| static_cast<int64_t>(monotonic_end_time.time_since_epoch().count())); |
| |
| const float handler_latency = |
| std::chrono::duration_cast<std::chrono::duration<float>>( |
| monotonic_end_time - monotonic_start_time) |
| .count(); |
| handler_time_.Add(handler_latency); |
| } |
| |
| int channel_index() const { return channel_index_; } |
| |
| void set_timing_report(timing::Watcher *watcher); |
| void ResetReport(); |
| |
| virtual void Startup(EventLoop *event_loop) = 0; |
| |
| protected: |
| const int channel_index_; |
| const std::string ftrace_prefix_; |
| |
| const std::function<void(const Context &context, const void *message)> fn_; |
| |
| internal::TimingStatistic wakeup_latency_; |
| internal::TimingStatistic handler_time_; |
| timing::Watcher *watcher_ = nullptr; |
| |
| Ftrace ftrace_; |
| }; |
| |
| template <typename T> |
| RawSender::Error Sender<T>::Send( |
| const NonSizePrefixedFlatbuffer<T> &flatbuffer) { |
| return sender_->Send(flatbuffer.span().data(), flatbuffer.span().size()); |
| } |
| |
| template <typename T> |
| RawSender::Error Sender<T>::SendDetached(FlatbufferDetachedBuffer<T> detached) { |
| CHECK_EQ(static_cast<void *>(detached.span().data() + detached.span().size() - |
| sender_->size()), |
| sender_->data()) |
| << ": May only send the buffer detached from this Sender"; |
| return sender_->Send(detached.span().size()); |
| } |
| |
| } // namespace aos |
| |
| #endif // AOS_EVENTS_EVENT_LOOP_TMPL_H |