Add timing reports for both simulation and shm
This involves a major refactor of how event loops are put together.
Lots of functionality for timing reports is in the base event loop
class (though hidden as much as possible to avoid cluttering the
interface). More functionality is now in the base class, with simpler
and more consistent subclasses.
Change-Id: I163ffb8d1b0e29a0231b54e205c2d7c5241d662b
diff --git a/aos/BUILD b/aos/BUILD
index fca684b..2ed8aa9 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -459,8 +459,6 @@
"testdata/config2.json",
"testdata/config3.json",
"testdata/expected.json",
- "//aos/events:config.fb.json",
- "//aos/events:ping.bfbs",
"//aos/events:pingpong_config.json",
"//aos/events:pong.bfbs",
],
diff --git a/aos/actions/BUILD b/aos/actions/BUILD
index 60ae440..6276105 100644
--- a/aos/actions/BUILD
+++ b/aos/actions/BUILD
@@ -48,6 +48,9 @@
":test_action_fbs",
":test_action2_fbs",
],
+ deps = [
+ "//aos/events:config",
+ ],
)
cc_test(
diff --git a/aos/actions/action_test_config_source.json b/aos/actions/action_test_config_source.json
index e9b6f16..e2ea703 100644
--- a/aos/actions/action_test_config_source.json
+++ b/aos/actions/action_test_config_source.json
@@ -16,5 +16,8 @@
"name": "/test_action2",
"type": "aos.common.actions.Status"
}
+ ],
+ "imports": [
+ "../events/aos.json"
]
}
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 9f841df..50a5263 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -52,10 +52,13 @@
],
visibility = ["//visibility:public"],
deps = [
+ ":event_loop_fbs",
+ ":timing_statistics",
"//aos:configuration",
"//aos:configuration_fbs",
"//aos:flatbuffers",
"//aos/time",
+ "//aos/util:phased_loop",
"@com_github_google_flatbuffers//:flatbuffers",
],
)
@@ -94,12 +97,21 @@
)
aos_config(
+ name = "config",
+ src = "aos.json",
+ flatbuffers = [
+ ":event_loop_fbs",
+ ],
+)
+
+aos_config(
name = "pingpong_config",
- src = "config.fb.json",
+ src = "pingpong.json",
flatbuffers = [
":ping_fbs",
":pong_fbs",
],
+ deps = [":config"],
)
cc_library(
@@ -180,7 +192,9 @@
deps = [
":epoll",
":event_loop",
+ ":event_loop_fbs",
":test_message_fbs",
+ ":timing_statistics",
"//aos:realtime",
"//aos/ipc_lib:lockless_queue",
"//aos/ipc_lib:signalfd",
diff --git a/aos/events/aos.json b/aos/events/aos.json
new file mode 100644
index 0000000..80eb4f7
--- /dev/null
+++ b/aos/events/aos.json
@@ -0,0 +1,9 @@
+{
+ "channels": [
+ {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "frequency": 50
+ }
+ ]
+}
diff --git a/aos/events/event_loop.cc b/aos/events/event_loop.cc
index 6891f92..01f6a3e 100644
--- a/aos/events/event_loop.cc
+++ b/aos/events/event_loop.cc
@@ -4,15 +4,390 @@
#include "aos/configuration_generated.h"
#include "glog/logging.h"
+DEFINE_bool(timing_reports, true, "Publish timing reports.");
+DEFINE_int32(timing_report_ms, 1000,
+ "Period in milliseconds to publish timing reports at.");
+
namespace aos {
-void EventLoop::ValidateChannel(const Channel *channel) {
+RawSender::RawSender(EventLoop *event_loop, const Channel *channel)
+ : event_loop_(event_loop),
+ channel_(channel),
+ timing_(event_loop_->ChannelIndex(channel)) {
+ event_loop_->NewSender(this);
+}
+
+RawSender::~RawSender() { event_loop_->DeleteSender(this); }
+
+RawFetcher::RawFetcher(EventLoop *event_loop, const Channel *channel)
+ : event_loop_(event_loop),
+ channel_(channel),
+ timing_(event_loop_->ChannelIndex(channel)) {
+ context_.monotonic_sent_time = monotonic_clock::min_time;
+ context_.realtime_sent_time = realtime_clock::min_time;
+ context_.queue_index = 0xffffffff;
+ context_.size = 0;
+ context_.data = nullptr;
+ event_loop_->NewFetcher(this);
+}
+
+RawFetcher::~RawFetcher() { event_loop_->DeleteFetcher(this); }
+
+TimerHandler::TimerHandler(EventLoop *event_loop, std::function<void()> fn)
+ : event_loop_(event_loop), fn_(std::move(fn)) {}
+
+TimerHandler::~TimerHandler() {}
+
+PhasedLoopHandler::PhasedLoopHandler(EventLoop *event_loop,
+ std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset)
+ : event_loop_(event_loop),
+ fn_(std::move(fn)),
+ phased_loop_(interval, event_loop_->monotonic_now(), offset) {
+ event_loop_->OnRun([this]() {
+ const monotonic_clock::time_point monotonic_now =
+ event_loop_->monotonic_now();
+ phased_loop_.Reset(monotonic_now);
+ Reschedule(
+ [this](monotonic_clock::time_point sleep_time) {
+ Schedule(sleep_time);
+ },
+ monotonic_now);
+ // The first time, we'll double count. Reschedule here will count cycles
+ // elapsed before now, and then the reschedule before runing the handler
+ // will count the time that elapsed then. So clear the count here.
+ cycles_elapsed_ = 0;
+ });
+}
+
+PhasedLoopHandler::~PhasedLoopHandler() {}
+
+EventLoop::~EventLoop() {
+ CHECK_EQ(senders_.size(), 0u) << ": Not all senders destroyed";
+}
+
+int EventLoop::ChannelIndex(const Channel *channel) {
CHECK(configuration_->channels() != nullptr) << ": No channels";
- CHECK(std::find(configuration_->channels()->begin(),
- configuration_->channels()->end(),
- channel) != configuration_->channels()->end())
+ auto c = std::find(configuration_->channels()->begin(),
+ configuration_->channels()->end(), channel);
+ CHECK(c != configuration_->channels()->end())
<< ": Channel pointer not found in configuration()->channels()";
+
+ return std::distance(configuration()->channels()->begin(), c);
+}
+
+void EventLoop::NewSender(RawSender *sender) {
+ senders_.emplace_back(sender);
+ UpdateTimingReport();
+}
+void EventLoop::DeleteSender(RawSender *sender) {
+ CHECK(!is_running());
+ auto s = std::find(senders_.begin(), senders_.end(), sender);
+ CHECK(s != senders_.end()) << ": Sender not in senders list";
+ senders_.erase(s);
+ UpdateTimingReport();
+}
+
+TimerHandler *EventLoop::NewTimer(std::unique_ptr<TimerHandler> timer) {
+ timers_.emplace_back(std::move(timer));
+ UpdateTimingReport();
+ return timers_.back().get();
+}
+
+PhasedLoopHandler *EventLoop::NewPhasedLoop(
+ std::unique_ptr<PhasedLoopHandler> phased_loop) {
+ phased_loops_.emplace_back(std::move(phased_loop));
+ UpdateTimingReport();
+ return phased_loops_.back().get();
+}
+
+void EventLoop::NewFetcher(RawFetcher *fetcher) {
+ fetchers_.emplace_back(fetcher);
+ UpdateTimingReport();
+}
+
+void EventLoop::DeleteFetcher(RawFetcher *fetcher) {
+ CHECK(!is_running());
+ auto f = std::find(fetchers_.begin(), fetchers_.end(), fetcher);
+ CHECK(f != fetchers_.end()) << ": Fetcher not in fetchers list";
+ fetchers_.erase(f);
+ UpdateTimingReport();
+}
+
+WatcherState *EventLoop::NewWatcher(std::unique_ptr<WatcherState> watcher) {
+ watchers_.emplace_back(std::move(watcher));
+
+ UpdateTimingReport();
+
+ return watchers_.back().get();
+}
+
+void EventLoop::SendTimingReport() {
+ // We need to do a fancy dance here to get all the accounting to work right.
+ // We want to copy the memory here, but then send after resetting. Otherwise
+ // the send for the timing report won't be counted in the timing report.
+ //
+ // Also, flatbuffers build from the back end. So place this at the back end
+ // of the buffer. We only have to care because we are using this in a very
+ // raw fashion.
+ CHECK_LE(timing_report_.size(), timing_report_sender_->size())
+ << ": Timing report bigger than the sender size.";
+ std::copy(timing_report_.data(),
+ timing_report_.data() + timing_report_.size(),
+ reinterpret_cast<uint8_t *>(timing_report_sender_->data()) +
+ timing_report_sender_->size() - timing_report_.size());
+
+ for (const std::unique_ptr<TimerHandler> &timer : timers_) {
+ timer->timing_.ResetTimingReport();
+ }
+ for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
+ watcher->ResetReport();
+ }
+ for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
+ phased_loop->timing_.ResetTimingReport();
+ }
+ for (RawSender *sender : senders_) {
+ sender->timing_.ResetTimingReport();
+ }
+ for (RawFetcher *fetcher : fetchers_) {
+ fetcher->timing_.ResetTimingReport();
+ }
+ timing_report_sender_->Send(timing_report_.size());
+}
+
+void EventLoop::UpdateTimingReport() {
+ // We need to support senders and fetchers changing while we are setting up
+ // the event loop. Otherwise we can't fetch or send until the loop runs. This
+ // means that on each change, we need to redo all this work. This makes setup
+ // more expensive, but not by all that much on a modern processor.
+
+ // Now, build up a report with everything pre-filled out.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(1);
+
+ // Pre-fill in the defaults for timers.
+ std::vector<flatbuffers::Offset<timing::Timer>> timer_offsets;
+ for (const std::unique_ptr<TimerHandler> &timer : timers_) {
+ flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
+ timing::CreateStatistic(fbb);
+ flatbuffers::Offset<timing::Statistic> handler_time_offset =
+ timing::CreateStatistic(fbb);
+ flatbuffers::Offset<flatbuffers::String> name_offset;
+ if (timer->name().size() != 0) {
+ name_offset = fbb.CreateString(timer->name());
+ }
+
+ timing::Timer::Builder timer_builder(fbb);
+
+ if (timer->name().size() != 0) {
+ timer_builder.add_name(name_offset);
+ }
+ timer_builder.add_wakeup_latency(wakeup_latency_offset);
+ timer_builder.add_handler_time(handler_time_offset);
+ timer_builder.add_count(0);
+ timer_offsets.emplace_back(timer_builder.Finish());
+ }
+
+ // Pre-fill in the defaults for phased_loops.
+ std::vector<flatbuffers::Offset<timing::Timer>> phased_loop_offsets;
+ for (const std::unique_ptr<PhasedLoopHandler> &phased_loop : phased_loops_) {
+ flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
+ timing::CreateStatistic(fbb);
+ flatbuffers::Offset<timing::Statistic> handler_time_offset =
+ timing::CreateStatistic(fbb);
+ flatbuffers::Offset<flatbuffers::String> name_offset;
+ if (phased_loop->name().size() != 0) {
+ name_offset = fbb.CreateString(phased_loop->name());
+ }
+
+ timing::Timer::Builder timer_builder(fbb);
+
+ if (phased_loop->name().size() != 0) {
+ timer_builder.add_name(name_offset);
+ }
+ timer_builder.add_wakeup_latency(wakeup_latency_offset);
+ timer_builder.add_handler_time(handler_time_offset);
+ timer_builder.add_count(0);
+ phased_loop_offsets.emplace_back(timer_builder.Finish());
+ }
+
+ // Pre-fill in the defaults for watchers.
+ std::vector<flatbuffers::Offset<timing::Watcher>> watcher_offsets;
+ for (const std::unique_ptr<WatcherState> &watcher : watchers_) {
+ flatbuffers::Offset<timing::Statistic> wakeup_latency_offset =
+ timing::CreateStatistic(fbb);
+ flatbuffers::Offset<timing::Statistic> handler_time_offset =
+ timing::CreateStatistic(fbb);
+
+ timing::Watcher::Builder watcher_builder(fbb);
+
+ watcher_builder.add_channel_index(watcher->channel_index());
+ watcher_builder.add_wakeup_latency(wakeup_latency_offset);
+ watcher_builder.add_handler_time(handler_time_offset);
+ watcher_builder.add_count(0);
+ watcher_offsets.emplace_back(watcher_builder.Finish());
+ }
+
+ // Pre-fill in the defaults for senders.
+ std::vector<flatbuffers::Offset<timing::Sender>> sender_offsets;
+ for (const RawSender *sender : senders_) {
+ flatbuffers::Offset<timing::Statistic> size_offset =
+ timing::CreateStatistic(fbb);
+
+ timing::Sender::Builder sender_builder(fbb);
+
+ sender_builder.add_channel_index(sender->timing_.channel_index);
+ sender_builder.add_size(size_offset);
+ sender_builder.add_count(0);
+ sender_offsets.emplace_back(sender_builder.Finish());
+ }
+
+ // Pre-fill in the defaults for fetchers.
+ std::vector<flatbuffers::Offset<timing::Fetcher>> fetcher_offsets;
+ for (RawFetcher *fetcher : fetchers_) {
+ flatbuffers::Offset<timing::Statistic> latency_offset =
+ timing::CreateStatistic(fbb);
+
+ timing::Fetcher::Builder fetcher_builder(fbb);
+
+ fetcher_builder.add_channel_index(fetcher->timing_.channel_index);
+ fetcher_builder.add_count(0);
+ fetcher_builder.add_latency(latency_offset);
+ fetcher_offsets.emplace_back(fetcher_builder.Finish());
+ }
+
+ // Then build the final report.
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
+ timers_offset;
+ if (timer_offsets.size() > 0) {
+ timers_offset = fbb.CreateVector(timer_offsets);
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Timer>>>
+ phased_loops_offset;
+ if (phased_loop_offsets.size() > 0) {
+ phased_loops_offset = fbb.CreateVector(phased_loop_offsets);
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Watcher>>>
+ watchers_offset;
+ if (watcher_offsets.size() > 0) {
+ watchers_offset = fbb.CreateVector(watcher_offsets);
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Sender>>>
+ senders_offset;
+ if (sender_offsets.size() > 0) {
+ senders_offset = fbb.CreateVector(sender_offsets);
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<timing::Fetcher>>>
+ fetchers_offset;
+ if (fetcher_offsets.size() > 0) {
+ fetchers_offset = fbb.CreateVector(fetcher_offsets);
+ }
+
+ flatbuffers::Offset<flatbuffers::String> name_offset =
+ fbb.CreateString(name());
+
+ timing::Report::Builder report_builder(fbb);
+ report_builder.add_name(name_offset);
+ report_builder.add_pid(GetTid());
+ if (timer_offsets.size() > 0) {
+ report_builder.add_timers(timers_offset);
+ }
+ if (phased_loop_offsets.size() > 0) {
+ report_builder.add_phased_loops(phased_loops_offset);
+ }
+ if (watcher_offsets.size() > 0) {
+ report_builder.add_watchers(watchers_offset);
+ }
+ if (sender_offsets.size() > 0) {
+ report_builder.add_senders(senders_offset);
+ }
+ if (fetcher_offsets.size() > 0) {
+ report_builder.add_fetchers(fetchers_offset);
+ }
+ fbb.Finish(report_builder.Finish());
+
+ timing_report_ = FlatbufferDetachedBuffer<timing::Report>(fbb.Release());
+
+ // Now that the pointers are stable, pass them to the timers and watchers to
+ // be updated.
+ for (size_t i = 0; i < timers_.size(); ++i) {
+ timers_[i]->timing_.set_timing_report(
+ timing_report_.mutable_message()->mutable_timers()->GetMutableObject(
+ i));
+ }
+
+ for (size_t i = 0; i < phased_loops_.size(); ++i) {
+ phased_loops_[i]->timing_.set_timing_report(
+ timing_report_.mutable_message()
+ ->mutable_phased_loops()
+ ->GetMutableObject(i));
+ }
+
+ for (size_t i = 0; i < watchers_.size(); ++i) {
+ watchers_[i]->set_timing_report(
+ timing_report_.mutable_message()->mutable_watchers()->GetMutableObject(
+ i));
+ }
+
+ for (size_t i = 0; i < senders_.size(); ++i) {
+ senders_[i]->timing_.set_timing_report(
+ timing_report_.mutable_message()->mutable_senders()->GetMutableObject(
+ i));
+ }
+
+ for (size_t i = 0; i < fetchers_.size(); ++i) {
+ fetchers_[i]->timing_.set_timing_report(
+ timing_report_.mutable_message()->mutable_fetchers()->GetMutableObject(
+ i));
+ }
+}
+
+void EventLoop::MaybeScheduleTimingReports() {
+ if (FLAGS_timing_reports && !skip_timing_report_) {
+ CHECK(!timing_report_sender_) << ": Timing reports already scheduled.";
+ // Make a raw sender for the report.
+ const Channel *channel = configuration::GetChannel(
+ configuration(), "/aos", timing::Report::GetFullyQualifiedName(),
+ name());
+ CHECK(channel != nullptr) << ": Channel { \"name\": \"/aos\", \"type\": \""
+ << timing::Report::GetFullyQualifiedName()
+ << "\" } not found in config.";
+ timing_report_sender_ = MakeRawSender(channel);
+
+ // Register a handler which sends the report out by copying the raw data
+ // from the prebuilt and subsequently modified report.
+ TimerHandler *timing_reports_timer =
+ AddTimer([this]() { SendTimingReport(); });
+
+ // Set it up to send once per second.
+ timing_reports_timer->set_name("timing_reports");
+ OnRun([this, timing_reports_timer]() {
+ timing_reports_timer->Setup(
+ monotonic_now() + std::chrono::milliseconds(FLAGS_timing_report_ms),
+ std::chrono::milliseconds(FLAGS_timing_report_ms));
+ });
+
+ UpdateTimingReport();
+ }
+}
+
+void WatcherState::set_timing_report(timing::Watcher *watcher) {
+ CHECK_NOTNULL(watcher);
+ watcher_ = watcher;
+ wakeup_latency_.set_statistic(watcher->mutable_wakeup_latency());
+ handler_time_.set_statistic(watcher->mutable_handler_time());
+}
+
+void WatcherState::ResetReport() {
+ wakeup_latency_.Reset();
+ handler_time_.Reset();
+ watcher_->mutate_count(0);
}
} // namespace aos
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 20ca72c..4f24dc0 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -7,14 +7,23 @@
#include "aos/configuration.h"
#include "aos/configuration_generated.h"
+#include "aos/events/event_loop_generated.h"
+#include "aos/events/timing_statistics.h"
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/time/time.h"
+#include "aos/util/phased_loop.h"
#include "flatbuffers/flatbuffers.h"
#include "glog/logging.h"
+DECLARE_bool(timing_reports);
+DECLARE_int32(timing_report_ms);
+
namespace aos {
+class EventLoop;
+class WatcherState;
+
// Struct available on Watchers and Fetchers with context about the current
// message.
struct Context {
@@ -34,62 +43,76 @@
// fetchers.
class RawFetcher {
public:
- RawFetcher(const Channel *channel) : channel_(channel) {}
- virtual ~RawFetcher() {}
-
- // Non-blocking fetch of the next message in the queue. Returns true if there
- // was a new message and we got it.
- virtual bool FetchNext() = 0;
-
- // Non-blocking fetch of the latest message:
- virtual bool Fetch() = 0;
-
- // Returns a pointer to data in the most recent message, or nullptr if there
- // is no message.
- const void *most_recent_data() const { return data_; }
-
- const Context &context() const { return context_; }
-
- const Channel *channel() const { return channel_; }
-
- protected:
+ RawFetcher(EventLoop *event_loop, const Channel *channel);
RawFetcher(const RawFetcher &) = delete;
RawFetcher &operator=(const RawFetcher &) = delete;
+ virtual ~RawFetcher();
- void *data_ = nullptr;
+ // Fetches the next message in the queue without blocking. Returns true if
+ // there was a new message and we got it.
+ bool FetchNext();
+
+ // Fetches the latest message without blocking.
+ bool Fetch();
+
+ // Returns the channel this fetcher uses.
+ const Channel *channel() const { return channel_; }
+ // Returns the context for the current message.
+ const Context &context() const { return context_; }
+
+ protected:
+ EventLoop *event_loop() { return event_loop_; }
+
Context context_;
+
+ private:
+ friend class EventLoop;
+ // Implementation
+ virtual std::pair<bool, monotonic_clock::time_point> DoFetchNext() = 0;
+ virtual std::pair<bool, monotonic_clock::time_point> DoFetch() = 0;
+
+ EventLoop *event_loop_;
const Channel *channel_;
+
+ internal::RawFetcherTiming timing_;
};
// Raw version of sender. Sends a block of data. This is used for reflection
// and as a building block to implement typed senders.
class RawSender {
public:
- RawSender(const Channel *channel) : channel_(channel) {}
- virtual ~RawSender() {}
+ RawSender(EventLoop *event_loop, const Channel *channel);
+ RawSender(const RawSender &) = delete;
+ RawSender &operator=(const RawSender &) = delete;
+
+ virtual ~RawSender();
// Sends a message without copying it. The users starts by copying up to
// size() bytes into the data backed by data(). They then call Send to send.
// Returns true on a successful send.
virtual void *data() = 0;
virtual size_t size() = 0;
- virtual bool Send(size_t size) = 0;
+ bool Send(size_t size);
// Sends a single block of data by copying it.
- virtual bool Send(const void *data, size_t size) = 0;
-
- // Returns the name of this sender.
- virtual const std::string_view name() const = 0;
+ bool Send(const void *data, size_t size);
const Channel *channel() const { return channel_; }
protected:
- RawSender(const RawSender &) = delete;
- RawSender &operator=(const RawSender &) = delete;
+ EventLoop *event_loop() { return event_loop_; }
+ private:
+ friend class EventLoop;
+
+ virtual bool DoSend(const void *data, size_t size) = 0;
+ virtual bool DoSend(size_t size) = 0;
+
+ EventLoop *event_loop_;
const Channel *channel_;
-};
+ internal::RawSenderTiming timing_;
+};
// Fetches the newest message from a channel.
// This provides a polling based interface for channels.
@@ -110,9 +133,9 @@
// Returns a pointer to the contained flatbuffer, or nullptr if there is no
// available message.
const T *get() const {
- return fetcher_->most_recent_data() != nullptr
- ? flatbuffers::GetRoot<T>(reinterpret_cast<const char *>(
- fetcher_->most_recent_data()))
+ return fetcher_->context().data != nullptr
+ ? flatbuffers::GetRoot<T>(
+ reinterpret_cast<const char *>(fetcher_->context().data))
: nullptr;
}
@@ -174,6 +197,7 @@
// Constructs an above builder.
Builder MakeBuilder();
+ // Returns the name of the underlying queue.
const Channel *channel() const { return sender_->channel(); }
private:
@@ -185,7 +209,7 @@
// Interface for timers
class TimerHandler {
public:
- virtual ~TimerHandler() {}
+ virtual ~TimerHandler();
// Timer should sleep until base, base + offset, base + offset * 2, ...
// If repeat_offset isn't set, the timer only expires once.
@@ -201,39 +225,76 @@
void set_name(std::string_view name) { name_ = std::string(name); }
const std::string_view name() const { return name_; }
+ protected:
+ TimerHandler(EventLoop *event_loop, std::function<void()> fn);
+
+ void Call(std::function<monotonic_clock::time_point()> get_time,
+ monotonic_clock::time_point event_time);
+
private:
+ friend class EventLoop;
+
+ EventLoop *event_loop_;
+ // The function to call when Call is called.
+ std::function<void()> fn_;
std::string name_;
+
+ internal::TimerTiming timing_;
};
// Interface for phased loops. They are built on timers.
class PhasedLoopHandler {
public:
- virtual ~PhasedLoopHandler() {}
+ virtual ~PhasedLoopHandler();
// Sets the interval and offset. Any changes to interval and offset only take
// effect when the handler finishes running.
- virtual void set_interval_and_offset(
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) = 0;
+ void set_interval_and_offset(const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset) {
+ phased_loop_.set_interval_and_offset(interval, offset);
+ }
// Sets and gets the name of the timer. Set this if you want a descriptive
// name in the timing report.
void set_name(std::string_view name) { name_ = std::string(name); }
const std::string_view name() const { return name_; }
+ protected:
+ void Call(std::function<monotonic_clock::time_point()> get_time,
+ std::function<void(monotonic_clock::time_point)> schedule);
+
+ PhasedLoopHandler(EventLoop *event_loop, std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset);
+
private:
+ friend class EventLoop;
+
+ void Reschedule(std::function<void(monotonic_clock::time_point)> schedule,
+ monotonic_clock::time_point monotonic_now) {
+ cycles_elapsed_ += phased_loop_.Iterate(monotonic_now);
+ schedule(phased_loop_.sleep_time());
+ }
+
+ virtual void Schedule(monotonic_clock::time_point sleep_time) = 0;
+
+ EventLoop *event_loop_;
+ std::function<void(int)> fn_;
std::string name_;
+ time::PhasedLoop phased_loop_;
+
+ int cycles_elapsed_ = 0;
+
+ internal::TimerTiming timing_;
};
-// TODO(austin): Ping pong example apps, and then start doing introspection.
-// TODO(austin): Timing reporter. Publish statistics on latencies of
-// handlers.
class EventLoop {
public:
EventLoop(const Configuration *configuration)
- : configuration_(configuration) {}
+ : timing_report_(flatbuffers::DetachedBuffer()),
+ configuration_(configuration) {}
- virtual ~EventLoop() {}
+ virtual ~EventLoop();
// Current time.
virtual monotonic_clock::time_point monotonic_now() = 0;
@@ -308,6 +369,7 @@
// Sets the scheduler priority to run the event loop at. This may not be
// called after we go into "real-time-mode".
virtual void SetRuntimeRealtimePriority(int priority) = 0;
+ virtual int priority() const = 0;
// Fetches new messages from the provided channel (path, type). Note: this
// channel must be a member of the exact configuration object this was built
@@ -331,19 +393,57 @@
// Will send new messages from channel (path, type).
virtual std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) = 0;
+ // Prevents the event loop from sending a timing report.
+ void SkipTimingReport() { skip_timing_report_ = true; }
+
protected:
void set_is_running(bool value) { is_running_.store(value); }
- // Validates that channel exists inside configuration_.
- void ValidateChannel(const Channel *channel);
+ // Validates that channel exists inside configuration_ and finds its index.
+ int ChannelIndex(const Channel *channel);
// Context available for watchers.
Context context_;
+ friend class RawSender;
+ friend class TimerHandler;
+ friend class RawFetcher;
+ friend class PhasedLoopHandler;
+ friend class WatcherState;
+
+ // Methods used to implement timing reports.
+ void NewSender(RawSender *sender);
+ void DeleteSender(RawSender *sender);
+ TimerHandler *NewTimer(std::unique_ptr<TimerHandler> timer);
+ PhasedLoopHandler *NewPhasedLoop(
+ std::unique_ptr<PhasedLoopHandler> phased_loop);
+ void NewFetcher(RawFetcher *fetcher);
+ void DeleteFetcher(RawFetcher *fetcher);
+ WatcherState *NewWatcher(std::unique_ptr<WatcherState> watcher);
+
+ std::vector<RawSender *> senders_;
+ std::vector<std::unique_ptr<TimerHandler>> timers_;
+ std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
+ std::vector<RawFetcher *> fetchers_;
+ std::vector<std::unique_ptr<WatcherState>> watchers_;
+
+ void SendTimingReport();
+ void UpdateTimingReport();
+ void MaybeScheduleTimingReports();
+
+ std::unique_ptr<RawSender> timing_report_sender_;
+
private:
+ virtual pid_t GetTid() = 0;
+
+ FlatbufferDetachedBuffer<timing::Report> timing_report_;
+
::std::atomic<bool> is_running_{false};
const Configuration *configuration_;
+
+ // If true, don't send out timing reports.
+ bool skip_timing_report_ = false;
};
} // namespace aos
diff --git a/aos/events/event_loop_param_test.cc b/aos/events/event_loop_param_test.cc
index 8defcb5..834ca16 100644
--- a/aos/events/event_loop_param_test.cc
+++ b/aos/events/event_loop_param_test.cc
@@ -57,6 +57,13 @@
auto fetcher = loop2->MakeFetcher<TestMessage>("/test");
EXPECT_FALSE(fetcher.Fetch());
+ EXPECT_EQ(fetcher.get(), nullptr);
+
+ EXPECT_EQ(fetcher.context().monotonic_sent_time, monotonic_clock::min_time);
+ EXPECT_EQ(fetcher.context().realtime_sent_time, realtime_clock::min_time);
+ EXPECT_EQ(fetcher.context().queue_index, 0xffffffffu);
+ EXPECT_EQ(fetcher.context().size, 0u);
+ EXPECT_EQ(fetcher.context().data, nullptr);
aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
@@ -66,6 +73,20 @@
EXPECT_TRUE(fetcher.Fetch());
ASSERT_FALSE(fetcher.get() == nullptr);
EXPECT_EQ(fetcher.get()->value(), 200);
+
+ const chrono::milliseconds kEpsilon(100);
+
+ EXPECT_GE(fetcher.context().monotonic_sent_time,
+ loop2->monotonic_now() - kEpsilon);
+ EXPECT_LE(fetcher.context().monotonic_sent_time,
+ loop2->monotonic_now() + kEpsilon);
+ EXPECT_GE(fetcher.context().realtime_sent_time,
+ loop2->realtime_now() - kEpsilon);
+ EXPECT_LE(fetcher.context().realtime_sent_time,
+ loop2->realtime_now() + kEpsilon);
+ EXPECT_EQ(fetcher.context().queue_index, 0x0u);
+ EXPECT_EQ(fetcher.context().size, 20u);
+ EXPECT_NE(fetcher.context().data, nullptr);
}
// Tests that watcher will receive all messages sent if they are sent after
@@ -474,21 +495,36 @@
// Verify that timer intervals and duration function properly.
TEST_P(AbstractEventLoopTest, TimerIntervalAndDuration) {
+ // Force a slower rate so we are guarenteed to have reports for our timer.
+ FLAGS_timing_report_ms = 2000;
+
const int kCount = 5;
auto loop = MakePrimary();
+ auto loop2 = Make();
+
::std::vector<::aos::monotonic_clock::time_point> times;
::std::vector<::aos::monotonic_clock::time_point> expected_times;
+ Fetcher<timing::Report> report_fetcher =
+ loop2->MakeFetcher<timing::Report>("/aos");
+ EXPECT_FALSE(report_fetcher.Fetch());
+
auto test_timer = loop->AddTimer([this, ×, &expected_times, &loop]() {
times.push_back(loop->monotonic_now());
+ EXPECT_EQ(loop->context().realtime_sent_time, realtime_clock::min_time);
+ EXPECT_EQ(loop->context().queue_index, 0xffffffffu);
+ EXPECT_EQ(loop->context().size, 0u);
+ EXPECT_EQ(loop->context().data, nullptr);
+
expected_times.push_back(loop->context().monotonic_sent_time);
if (times.size() == kCount) {
this->Exit();
}
});
+ test_timer->set_name("Test loop");
- monotonic_clock::time_point start_time = loop->monotonic_now();
+ const monotonic_clock::time_point start_time = loop->monotonic_now();
// TODO(austin): This should be an error... Should be done in OnRun only.
test_timer->Setup(start_time + chrono::seconds(1), chrono::seconds(1));
@@ -540,6 +576,39 @@
EXPECT_LT(expected_times[expected_times.size() / 2], average_time + kEpsilon);
EXPECT_GT(expected_times[expected_times.size() / 2], average_time - kEpsilon);
+
+ // And, since we are here, check that the timing report makes sense.
+ // Start by looking for our event loop's timing.
+ FlatbufferDetachedBuffer<timing::Report> report =
+ FlatbufferDetachedBuffer<timing::Report>::Empty();
+ while (report_fetcher.FetchNext()) {
+ if (report_fetcher->name()->string_view() == "primary") {
+ report = CopyFlatBuffer(report_fetcher.get());
+ }
+ }
+
+ // Confirm that we have the right number of reports, and the contents are
+ // sane.
+ VLOG(1) << FlatbufferToJson(report, true);
+
+ EXPECT_EQ(report.message().name()->string_view(), "primary");
+
+ ASSERT_NE(report.message().senders(), nullptr);
+ EXPECT_EQ(report.message().senders()->size(), 1);
+
+ ASSERT_NE(report.message().timers(), nullptr);
+ EXPECT_EQ(report.message().timers()->size(), 2);
+
+ EXPECT_EQ(report.message().timers()->Get(0)->name()->string_view(),
+ "Test loop");
+ EXPECT_GE(report.message().timers()->Get(0)->count(), 1);
+
+ EXPECT_EQ(report.message().timers()->Get(1)->name()->string_view(),
+ "timing_reports");
+ EXPECT_EQ(report.message().timers()->Get(1)->count(), 1);
+
+ // Make sure there is a single phased loop report with our report in it.
+ ASSERT_EQ(report.message().phased_loops(), nullptr);
}
// Verify that we can change a timer's parameters during execution.
@@ -591,7 +660,7 @@
TEST_P(AbstractEventLoopDeathTest, InvalidChannel) {
auto loop = MakePrimary();
- const Channel *channel = loop->configuration()->channels()->Get(0);
+ const Channel *channel = loop->configuration()->channels()->Get(1);
FlatbufferDetachedBuffer<Channel> channel_copy = CopyFlatBuffer(channel);
@@ -675,26 +744,45 @@
// Tests that a couple phased loops run in a row result in the correct offset
// and period.
TEST_P(AbstractEventLoopTest, PhasedLoopTest) {
+ // Force a slower rate so we are guarenteed to have reports for our phased
+ // loop.
+ FLAGS_timing_report_ms = 2000;
+
const chrono::milliseconds kOffset = chrono::milliseconds(400);
const int kCount = 5;
auto loop1 = MakePrimary();
+ auto loop2 = Make();
+
+ Fetcher<timing::Report> report_fetcher =
+ loop2->MakeFetcher<timing::Report>("/aos");
+ EXPECT_FALSE(report_fetcher.Fetch());
// Collect up a couple of samples.
::std::vector<::aos::monotonic_clock::time_point> times;
::std::vector<::aos::monotonic_clock::time_point> expected_times;
// Run kCount iterations.
- loop1->AddPhasedLoop(
- [×, &expected_times, &loop1, this](int count) {
- EXPECT_EQ(count, 1);
- times.push_back(loop1->monotonic_now());
- expected_times.push_back(loop1->context().monotonic_sent_time);
- if (times.size() == kCount) {
- this->Exit();
- }
- },
- chrono::seconds(1), kOffset);
+ loop1
+ ->AddPhasedLoop(
+ [×, &expected_times, &loop1, this](int count) {
+ EXPECT_EQ(count, 1);
+ times.push_back(loop1->monotonic_now());
+ expected_times.push_back(loop1->context().monotonic_sent_time);
+
+ EXPECT_EQ(loop1->context().realtime_sent_time,
+ realtime_clock::min_time);
+ EXPECT_EQ(loop1->context().queue_index, 0xffffffffu);
+ EXPECT_EQ(loop1->context().size, 0u);
+ EXPECT_EQ(loop1->context().data, nullptr);
+
+ if (times.size() == kCount) {
+ LOG(INFO) << "Exiting";
+ this->Exit();
+ }
+ },
+ chrono::seconds(1), kOffset)
+ ->set_name("Test loop");
// Add a delay to make sure that delay during startup doesn't result in a
// "missed cycle".
@@ -749,6 +837,266 @@
EXPECT_LT(expected_times[expected_times.size() / 2], average_time + kEpsilon);
EXPECT_GT(expected_times[expected_times.size() / 2], average_time - kEpsilon);
+
+ // And, since we are here, check that the timing report makes sense.
+ // Start by looking for our event loop's timing.
+ FlatbufferDetachedBuffer<timing::Report> report =
+ FlatbufferDetachedBuffer<timing::Report>::Empty();
+ while (report_fetcher.FetchNext()) {
+ if (report_fetcher->name()->string_view() == "primary") {
+ report = CopyFlatBuffer(report_fetcher.get());
+ }
+ }
+
+ VLOG(1) << FlatbufferToJson(report, true);
+
+ EXPECT_EQ(report.message().name()->string_view(), "primary");
+
+ ASSERT_NE(report.message().senders(), nullptr);
+ EXPECT_EQ(report.message().senders()->size(), 1);
+
+ ASSERT_NE(report.message().timers(), nullptr);
+ EXPECT_EQ(report.message().timers()->size(), 1);
+
+ // Make sure there is a single phased loop report with our report in it.
+ ASSERT_NE(report.message().phased_loops(), nullptr);
+ ASSERT_EQ(report.message().phased_loops()->size(), 1);
+ EXPECT_EQ(report.message().phased_loops()->Get(0)->name()->string_view(),
+ "Test loop");
+ EXPECT_GE(report.message().phased_loops()->Get(0)->count(), 1);
+}
+
+// Tests that senders count correctly in the timing report.
+TEST_P(AbstractEventLoopTest, SenderTimingReport) {
+ FLAGS_timing_report_ms = 1000;
+ auto loop1 = MakePrimary();
+
+ auto loop2 = Make("watcher_loop");
+ loop2->MakeWatcher("/test", [](const TestMessage &) {});
+
+ auto loop3 = Make();
+
+ Fetcher<timing::Report> report_fetcher =
+ loop3->MakeFetcher<timing::Report>("/aos");
+ EXPECT_FALSE(report_fetcher.Fetch());
+
+ auto sender = loop1->MakeSender<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop1->AddTimer([&sender]() {
+ for (int i = 0; i < 10; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200 + i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ });
+
+ // Quit after 1 timing report, mid way through the next cycle.
+ EndEventLoop(loop1.get(), chrono::milliseconds(2500));
+
+ loop1->OnRun([&test_timer, &loop1]() {
+ test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+ });
+
+ Run();
+
+ // And, since we are here, check that the timing report makes sense.
+ // Start by looking for our event loop's timing.
+ FlatbufferDetachedBuffer<timing::Report> primary_report =
+ FlatbufferDetachedBuffer<timing::Report>::Empty();
+ while (report_fetcher.FetchNext()) {
+ LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+ if (report_fetcher->name()->string_view() == "primary") {
+ primary_report = CopyFlatBuffer(report_fetcher.get());
+ }
+ }
+
+ LOG(INFO) << FlatbufferToJson(primary_report, true);
+
+ EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+ ASSERT_NE(primary_report.message().senders(), nullptr);
+ EXPECT_EQ(primary_report.message().senders()->size(), 2);
+
+ // Confirm that the sender looks sane.
+ EXPECT_EQ(
+ loop1->configuration()
+ ->channels()
+ ->Get(primary_report.message().senders()->Get(0)->channel_index())
+ ->name()
+ ->string_view(),
+ "/test");
+ EXPECT_EQ(primary_report.message().senders()->Get(0)->count(), 10);
+
+ // Confirm that the timing primary_report sender looks sane.
+ EXPECT_EQ(
+ loop1->configuration()
+ ->channels()
+ ->Get(primary_report.message().senders()->Get(1)->channel_index())
+ ->name()
+ ->string_view(),
+ "/aos");
+ EXPECT_EQ(primary_report.message().senders()->Get(1)->count(), 1);
+
+ ASSERT_NE(primary_report.message().timers(), nullptr);
+ EXPECT_EQ(primary_report.message().timers()->size(), 3);
+
+ // Make sure there are no phased loops or watchers.
+ ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+ ASSERT_EQ(primary_report.message().watchers(), nullptr);
+}
+
+// Tests that senders count correctly in the timing report.
+TEST_P(AbstractEventLoopTest, WatcherTimingReport) {
+ FLAGS_timing_report_ms = 1000;
+ auto loop1 = MakePrimary();
+ loop1->MakeWatcher("/test", [](const TestMessage &) {});
+
+ auto loop2 = Make("sender_loop");
+
+ auto loop3 = Make();
+
+ Fetcher<timing::Report> report_fetcher =
+ loop3->MakeFetcher<timing::Report>("/aos");
+ EXPECT_FALSE(report_fetcher.Fetch());
+
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+
+ // Add a timer to actually quit.
+ auto test_timer = loop1->AddTimer([&sender]() {
+ for (int i = 0; i < 10; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200 + i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ });
+
+ // Quit after 1 timing report, mid way through the next cycle.
+ EndEventLoop(loop1.get(), chrono::milliseconds(2500));
+
+ loop1->OnRun([&test_timer, &loop1]() {
+ test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1500));
+ });
+
+ Run();
+
+ // And, since we are here, check that the timing report makes sense.
+ // Start by looking for our event loop's timing.
+ FlatbufferDetachedBuffer<timing::Report> primary_report =
+ FlatbufferDetachedBuffer<timing::Report>::Empty();
+ while (report_fetcher.FetchNext()) {
+ LOG(INFO) << "Report " << FlatbufferToJson(report_fetcher.get());
+ if (report_fetcher->name()->string_view() == "primary") {
+ primary_report = CopyFlatBuffer(report_fetcher.get());
+ }
+ }
+
+ // Check the watcher report.
+ VLOG(1) << FlatbufferToJson(primary_report, true);
+
+ EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+ // Just the timing report timer.
+ ASSERT_NE(primary_report.message().timers(), nullptr);
+ EXPECT_EQ(primary_report.message().timers()->size(), 3);
+
+ // No phased loops
+ ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+
+ ASSERT_NE(primary_report.message().watchers(), nullptr);
+ ASSERT_EQ(primary_report.message().watchers()->size(), 1);
+ EXPECT_EQ(primary_report.message().watchers()->Get(0)->count(), 10);
+}
+
+// Tests that fetchers count correctly in the timing report.
+TEST_P(AbstractEventLoopTest, FetcherTimingReport) {
+ FLAGS_timing_report_ms = 1000;
+ auto loop1 = MakePrimary();
+ auto loop2 = Make("sender_loop");
+
+ auto loop3 = Make();
+
+ Fetcher<timing::Report> report_fetcher =
+ loop3->MakeFetcher<timing::Report>("/aos");
+ EXPECT_FALSE(report_fetcher.Fetch());
+
+ auto sender = loop2->MakeSender<TestMessage>("/test");
+ auto fetcher1 = loop1->MakeFetcher<TestMessage>("/test");
+ auto fetcher2 = loop1->MakeFetcher<TestMessage>("/test");
+ fetcher1.Fetch();
+ fetcher2.Fetch();
+
+ // Add a timer to actually quit.
+ auto test_timer = loop1->AddTimer([&sender]() {
+ for (int i = 0; i < 10; ++i) {
+ aos::Sender<TestMessage>::Builder msg = sender.MakeBuilder();
+ TestMessage::Builder builder = msg.MakeBuilder<TestMessage>();
+ builder.add_value(200 + i);
+ ASSERT_TRUE(msg.Send(builder.Finish()));
+ }
+ });
+
+ auto test_timer2 = loop1->AddTimer([&fetcher1, &fetcher2]() {
+ fetcher1.Fetch();
+ while (fetcher2.FetchNext()) {
+ }
+ });
+
+ // Quit after 1 timing report, mid way through the next cycle.
+ EndEventLoop(loop1.get(), chrono::milliseconds(2500));
+
+ loop1->OnRun([test_timer, test_timer2, &loop1]() {
+ test_timer->Setup(loop1->monotonic_now() + chrono::milliseconds(1400));
+ test_timer2->Setup(loop1->monotonic_now() + chrono::milliseconds(1600));
+ });
+
+ Run();
+
+ // And, since we are here, check that the timing report makes sense.
+ // Start by looking for our event loop's timing.
+ FlatbufferDetachedBuffer<timing::Report> primary_report =
+ FlatbufferDetachedBuffer<timing::Report>::Empty();
+ while (report_fetcher.FetchNext()) {
+ if (report_fetcher->name()->string_view() == "primary") {
+ primary_report = CopyFlatBuffer(report_fetcher.get());
+ }
+ }
+
+ VLOG(1) << FlatbufferToJson(primary_report, true);
+
+ EXPECT_EQ(primary_report.message().name()->string_view(), "primary");
+
+ ASSERT_NE(primary_report.message().senders(), nullptr);
+ EXPECT_EQ(primary_report.message().senders()->size(), 1);
+
+ ASSERT_NE(primary_report.message().timers(), nullptr);
+ EXPECT_EQ(primary_report.message().timers()->size(), 4);
+
+ // Make sure there are no phased loops or watchers.
+ ASSERT_EQ(primary_report.message().phased_loops(), nullptr);
+ ASSERT_EQ(primary_report.message().watchers(), nullptr);
+
+ // Now look at the fetchrs.
+ ASSERT_NE(primary_report.message().fetchers(), nullptr);
+ ASSERT_EQ(primary_report.message().fetchers()->size(), 2);
+
+ EXPECT_EQ(primary_report.message().fetchers()->Get(0)->count(), 1);
+ EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->average(),
+ 0.1);
+ EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->min(),
+ 0.1);
+ EXPECT_GE(primary_report.message().fetchers()->Get(0)->latency()->max(),
+ 0.1);
+ EXPECT_EQ(primary_report.message()
+ .fetchers()
+ ->Get(0)
+ ->latency()
+ ->standard_deviation(),
+ 0.0);
+
+ EXPECT_EQ(primary_report.message().fetchers()->Get(1)->count(), 10);
}
} // namespace testing
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index 8a8f33a..b69f28a 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -18,6 +18,10 @@
: flatbuffer_(JsonToFlatbuffer("{\n"
" \"channels\": [ \n"
" {\n"
+ " \"name\": \"/aos\",\n"
+ " \"type\": \"aos.timing.Report\"\n"
+ " },\n"
+ " {\n"
" \"name\": \"/test\",\n"
" \"type\": \"aos.TestMessage\"\n"
" },\n"
@@ -88,6 +92,7 @@
auto end_timer = loop->AddTimer([this]() { this->Exit(); });
end_timer->Setup(loop->monotonic_now() +
::std::chrono::milliseconds(duration));
+ end_timer->set_name("end");
}
// You can implement all the usual fixture class members here.
diff --git a/aos/events/event_loop_tmpl.h b/aos/events/event_loop_tmpl.h
index bfd9775..f652bb6 100644
--- a/aos/events/event_loop_tmpl.h
+++ b/aos/events/event_loop_tmpl.h
@@ -43,6 +43,180 @@
});
}
+inline bool RawFetcher::FetchNext() {
+ const auto result = DoFetchNext();
+ if (result.first) {
+ timing_.fetcher->mutate_count(timing_.fetcher->count() + 1);
+ const monotonic_clock::time_point monotonic_time = result.second;
+ const float latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_time - context_.monotonic_sent_time)
+ .count();
+ timing_.latency.Add(latency);
+ return true;
+ }
+ return false;
+}
+
+inline bool RawFetcher::Fetch() {
+ const auto result = DoFetch();
+ if (result.first) {
+ timing_.fetcher->mutate_count(timing_.fetcher->count() + 1);
+ const monotonic_clock::time_point monotonic_time = result.second;
+ const float latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_time - context_.monotonic_sent_time)
+ .count();
+ timing_.latency.Add(latency);
+ return true;
+ }
+ return false;
+}
+
+inline bool RawSender::Send(size_t size) {
+ if (DoSend(size)) {
+ timing_.size.Add(size);
+ timing_.sender->mutate_count(timing_.sender->count() + 1);
+ return true;
+ }
+ return false;
+}
+
+inline bool RawSender::Send(const void *data, size_t size) {
+ if (DoSend(data, size)) {
+ timing_.size.Add(size);
+ timing_.sender->mutate_count(timing_.sender->count() + 1);
+ return true;
+ }
+ return false;
+}
+
+inline void TimerHandler::Call(
+ std::function<monotonic_clock::time_point()> get_time,
+ monotonic_clock::time_point event_time) {
+ CHECK_NOTNULL(timing_.timer);
+ const monotonic_clock::time_point monotonic_start_time = get_time();
+
+ event_loop_->context_.monotonic_sent_time = event_time;
+ event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
+ event_loop_->context_.queue_index = 0xffffffffu;
+ event_loop_->context_.size = 0;
+ event_loop_->context_.data = nullptr;
+
+ {
+ const float start_latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_start_time - event_time)
+ .count();
+ timing_.wakeup_latency.Add(start_latency);
+ }
+ timing_.timer->mutate_count(timing_.timer->count() + 1);
+ fn_();
+
+ const monotonic_clock::time_point monotonic_end_time = get_time();
+
+ const float handler_latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_end_time - monotonic_start_time)
+ .count();
+ timing_.handler_time.Add(handler_latency);
+}
+
+inline void PhasedLoopHandler::Call(
+ std::function<monotonic_clock::time_point()> get_time,
+ std::function<void(monotonic_clock::time_point)> schedule) {
+ // Read time directly to save a vtable indirection...
+ const monotonic_clock::time_point monotonic_start_time = get_time();
+
+ // Update the context to hold the desired wakeup time.
+ event_loop_->context_.monotonic_sent_time = phased_loop_.sleep_time();
+ event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
+ event_loop_->context_.queue_index = 0xffffffffu;
+ event_loop_->context_.size = 0;
+ event_loop_->context_.data = nullptr;
+
+ // Compute how many cycles elapsed and schedule the next wakeup.
+ Reschedule(schedule, monotonic_start_time);
+
+ {
+ const float start_latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_start_time - event_loop_->context_.monotonic_sent_time)
+ .count();
+ timing_.wakeup_latency.Add(start_latency);
+ }
+ timing_.timer->mutate_count(timing_.timer->count() + 1);
+
+ // Call the function with the elapsed cycles.
+ fn_(cycles_elapsed_);
+ cycles_elapsed_ = 0;
+
+ const monotonic_clock::time_point monotonic_end_time = get_time();
+
+ const float handler_latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_end_time - monotonic_start_time)
+ .count();
+ timing_.handler_time.Add(handler_latency);
+
+ // If the handler too too long so we blew by the previous deadline, we
+ // want to just try for the next deadline. Rescuedule.
+ if (monotonic_end_time > phased_loop_.sleep_time()) {
+ Reschedule(schedule, monotonic_end_time);
+ }
+}
+
+// Class to automate the timing report generation for watchers.
+class WatcherState {
+ public:
+ WatcherState(
+ EventLoop *event_loop, const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn)
+ : channel_index_(event_loop->ChannelIndex(channel)), fn_(std::move(fn)) {}
+
+ virtual ~WatcherState() {}
+
+ // Calls the callback, measuring time with get_time, with the provided
+ // context.
+ void DoCallCallback(std::function<monotonic_clock::time_point()> get_time,
+ Context context) {
+ const monotonic_clock::time_point monotonic_start_time = get_time();
+ {
+ const float start_latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_start_time - context.monotonic_sent_time)
+ .count();
+ wakeup_latency_.Add(start_latency);
+ }
+ watcher_->mutate_count(watcher_->count() + 1);
+ fn_(context, context.data);
+
+ const monotonic_clock::time_point monotonic_end_time = get_time();
+
+ const float handler_latency =
+ std::chrono::duration_cast<std::chrono::duration<float>>(
+ monotonic_end_time - monotonic_start_time)
+ .count();
+ handler_time_.Add(handler_latency);
+ }
+
+ int channel_index() const { return channel_index_; }
+
+ void set_timing_report(timing::Watcher *watcher);
+ void ResetReport();
+
+ virtual void Startup(EventLoop *event_loop) = 0;
+
+ protected:
+ const int channel_index_;
+
+ std::function<void(const Context &context, const void *message)> fn_;
+
+ internal::TimingStatistic wakeup_latency_;
+ internal::TimingStatistic handler_time_;
+ timing::Watcher *watcher_ = nullptr;
+};
+
} // namespace aos
#endif // AOS_EVENTS_EVENT_LOOP_TMPL_H
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index a4cfa72..b5a530e 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -20,6 +20,10 @@
const ::aos::monotonic_clock::time_point end_time =
monotonic_now() + duration;
is_running_ = true;
+ for (std::function<void()> &on_run : on_run_) {
+ on_run();
+ }
+ on_run_.clear();
while (!events_list_.empty() && is_running_) {
auto iter = events_list_.begin();
::aos::monotonic_clock::time_point next_time = iter->first;
@@ -36,6 +40,10 @@
void EventScheduler::Run() {
is_running_ = true;
+ for (std::function<void()> &on_run : on_run_) {
+ on_run();
+ }
+ on_run_.clear();
while (!events_list_.empty() && is_running_) {
auto iter = events_list_.begin();
now_ = iter->first;
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 66d34b3..2fcf90b 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -25,6 +25,11 @@
Token Schedule(monotonic_clock::time_point time,
std::function<void()> callback);
+ // Schedules a callback when the event scheduler starts.
+ void ScheduleOnRun(std::function<void()> callback) {
+ on_run_.emplace_back(std::move(callback));
+ }
+
Token InvalidToken() { return events_list_.end(); }
// Deschedule an event by its iterator
@@ -50,6 +55,8 @@
// Current execution time.
monotonic_clock::time_point now_ = monotonic_clock::epoch();
+ std::vector<std::function<void()>> on_run_;
+
// Multimap holding times to run functions. These are stored in order, and
// the order is the callback tree.
ChannelType events_list_;
diff --git a/aos/events/logger.cc b/aos/events/logger.cc
index 6b659c0..d39f0a7 100644
--- a/aos/events/logger.cc
+++ b/aos/events/logger.cc
@@ -378,6 +378,9 @@
void LogReader::Register(EventLoop *event_loop) {
event_loop_ = event_loop;
+ // Otherwise we replay the timing report and try to resend it...
+ event_loop_->SkipTimingReport();
+
for (size_t i = 0; i < channels_.size(); ++i) {
CHECK_EQ(configuration()->channels()->Get(i)->name(),
event_loop_->configuration()->channels()->Get(i)->name());
@@ -392,7 +395,7 @@
std::pair<monotonic_clock::time_point, int> oldest_channel_index =
PopOldestChannel();
const monotonic_clock::time_point monotonic_now =
- event_loop_->monotonic_now();
+ event_loop_->context().monotonic_sent_time;
CHECK(monotonic_now == oldest_channel_index.first)
<< ": Now " << monotonic_now.time_since_epoch().count()
<< " trying to send "
diff --git a/aos/events/ping_lib.cc b/aos/events/ping_lib.cc
index 31c7b18..4bbf580 100644
--- a/aos/events/ping_lib.cc
+++ b/aos/events/ping_lib.cc
@@ -7,6 +7,7 @@
#include "glog/logging.h"
DEFINE_int32(sleep_ms, 10, "Time to sleep between pings");
+DEFINE_bool(phased_loop, false, "If true, use a phased loop");
namespace aos {
@@ -14,15 +15,25 @@
Ping::Ping(EventLoop *event_loop)
: event_loop_(event_loop),
- sender_(event_loop_->MakeSender<examples::Ping>("/test")) {
- timer_handle_ = event_loop_->AddTimer([this]() { SendPing(); });
+ sender_(event_loop_->MakeSender<examples::Ping>("/test")),
+ pong_fetcher_(event_loop_->MakeFetcher<examples::Pong>("/test")) {
+ if (FLAGS_phased_loop) {
+ phased_loop_handle_ = event_loop_->AddPhasedLoop(
+ [this](int) { SendPing(); }, chrono::milliseconds(FLAGS_sleep_ms));
+ phased_loop_handle_->set_name("ping");
+ } else {
+ timer_handle_ = event_loop_->AddTimer([this]() { SendPing(); });
+ timer_handle_->set_name("ping");
+ }
event_loop_->MakeWatcher(
"/test", [this](const examples::Pong &pong) { HandlePong(pong); });
event_loop_->OnRun([this]() {
- timer_handle_->Setup(event_loop_->monotonic_now(),
- chrono::milliseconds(FLAGS_sleep_ms));
+ if (!FLAGS_phased_loop) {
+ timer_handle_->Setup(event_loop_->monotonic_now(),
+ chrono::milliseconds(FLAGS_sleep_ms));
+ }
});
event_loop_->SetRuntimeRealtimePriority(5);
@@ -40,6 +51,7 @@
}
void Ping::HandlePong(const examples::Pong &pong) {
+ pong_fetcher_.Fetch();
const aos::monotonic_clock::time_point monotonic_send_time(
chrono::nanoseconds(pong.initial_send_time()));
const aos::monotonic_clock::time_point monotonic_now =
diff --git a/aos/events/ping_lib.h b/aos/events/ping_lib.h
index 2cc4f5e..6be75ed 100644
--- a/aos/events/ping_lib.h
+++ b/aos/events/ping_lib.h
@@ -25,6 +25,8 @@
aos::Sender<examples::Ping> sender_;
// Timer handle which sends the Ping message.
aos::TimerHandler *timer_handle_;
+ aos::PhasedLoopHandler *phased_loop_handle_;
+ aos::Fetcher<examples::Pong> pong_fetcher_;
// Number of pings sent.
int count_ = 0;
};
diff --git a/aos/events/config.fb.json b/aos/events/pingpong.json
similarity index 82%
rename from aos/events/config.fb.json
rename to aos/events/pingpong.json
index 4068243..0cdba93 100644
--- a/aos/events/config.fb.json
+++ b/aos/events/pingpong.json
@@ -8,5 +8,8 @@
"name": "/test",
"type": "aos.examples.Pong"
}
+ ],
+ "imports": [
+ "aos.json"
]
}
diff --git a/aos/events/pingpong_test.cc b/aos/events/pingpong_test.cc
index fca1867..07e6d31 100644
--- a/aos/events/pingpong_test.cc
+++ b/aos/events/pingpong_test.cc
@@ -13,8 +13,8 @@
class PingPongTest : public ::testing::Test {
public:
PingPongTest()
- : config_(aos::configuration::ReadConfig(
- "aos/events/pingpong_config.json")),
+ : config_(
+ aos::configuration::ReadConfig("aos/events/pingpong_config.json")),
event_loop_factory_(&config_.message()),
ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
ping_(ping_event_loop_.get()),
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 9e18544..a8a8e01 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -1,21 +1,24 @@
-#include "glog/logging.h"
-
#include "aos/events/shm_event_loop.h"
#include <sys/mman.h>
#include <sys/stat.h>
-#include <sys/timerfd.h>
+#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
+#include <iterator>
#include <stdexcept>
#include "aos/events/epoll.h"
+#include "aos/events/event_loop_generated.h"
+#include "aos/events/timing_statistics.h"
#include "aos/ipc_lib/lockless_queue.h"
+#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/util/phased_loop.h"
+#include "glog/logging.h"
DEFINE_string(shm_base, "/dev/shm/aos",
"Directory to place queue backing mmaped files in.");
@@ -96,9 +99,7 @@
return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
}
- const ipc_lib::LocklessQueueConfiguration &config() const {
- return config_;
- }
+ const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
private:
void MkdirP(std::string_view path) {
@@ -141,11 +142,14 @@
namespace chrono = ::std::chrono;
-class ShmFetcher : public RawFetcher {
+} // namespace
+
+namespace internal {
+
+class SimpleShmFetcher {
public:
- explicit ShmFetcher(const Channel *channel)
- : RawFetcher(channel),
- lockless_queue_memory_(channel),
+ explicit SimpleShmFetcher(const Channel *channel)
+ : lockless_queue_memory_(channel),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()),
data_storage_(static_cast<AlignedChar *>(aligned_alloc(
@@ -158,7 +162,7 @@
PointAtNextQueueIndex();
}
- ~ShmFetcher() { data_ = nullptr; }
+ ~SimpleShmFetcher() {}
// Points the next message to fetch at the queue index which will be
// populated next.
@@ -174,7 +178,7 @@
}
}
- bool FetchNext() override {
+ bool FetchNext() {
// TODO(austin): Write a test which starts with nothing in the queue,
// and then calls FetchNext() after something is sent.
// TODO(austin): Get behind and make sure it dies both here and with
@@ -185,9 +189,8 @@
reinterpret_cast<char *>(data_storage_.get()));
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = actual_queue_index_.index();
- data_ = reinterpret_cast<char *>(data_storage_.get()) +
- lockless_queue_.message_data_size() - context_.size;
- context_.data = data_;
+ context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = actual_queue_index_.Increment();
}
@@ -205,7 +208,7 @@
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
- bool Fetch() override {
+ bool Fetch() {
const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
// actual_queue_index_ is only meaningful if it was set by Fetch or
// FetchNext. This happens when valid_data_ has been set. So, only
@@ -213,7 +216,7 @@
//
// Also, if the latest queue index is invalid, we are empty. So there
// is nothing to fetch.
- if ((data_ != nullptr &&
+ if ((context_.data != nullptr &&
queue_index == actual_queue_index_.DecrementBy(1u)) ||
!queue_index.valid()) {
return false;
@@ -225,9 +228,8 @@
reinterpret_cast<char *>(data_storage_.get()));
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = queue_index.index();
- data_ = reinterpret_cast<char *>(data_storage_.get()) +
- lockless_queue_.message_data_size() - context_.size;
- context_.data = data_;
+ context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = queue_index.Increment();
}
@@ -251,6 +253,8 @@
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
+ Context context() const { return context_; }
+
bool RegisterWakeup(int priority) {
return lockless_queue_.RegisterWakeup(priority);
}
@@ -269,66 +273,93 @@
};
std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+
+ Context context_;
+};
+
+class ShmFetcher : public RawFetcher {
+ public:
+ explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
+ : RawFetcher(event_loop, channel), simple_shm_fetcher_(channel) {}
+
+ ~ShmFetcher() { context_.data = nullptr; }
+
+ std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+ if (simple_shm_fetcher_.FetchNext()) {
+ context_ = simple_shm_fetcher_.context();
+ return std::make_pair(true, monotonic_clock::now());
+ }
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
+
+ std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+ if (simple_shm_fetcher_.Fetch()) {
+ context_ = simple_shm_fetcher_.context();
+ return std::make_pair(true, monotonic_clock::now());
+ }
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
+
+ private:
+ SimpleShmFetcher simple_shm_fetcher_;
};
class ShmSender : public RawSender {
public:
- explicit ShmSender(const Channel *channel, const ShmEventLoop *shm_event_loop)
- : RawSender(channel),
- shm_event_loop_(shm_event_loop),
- name_(channel->name()->str()),
+ explicit ShmSender(EventLoop *event_loop, const Channel *channel)
+ : RawSender(event_loop, channel),
lockless_queue_memory_(channel),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()),
lockless_queue_sender_(lockless_queue_.MakeSender()) {}
+ ~ShmSender() override {}
+
void *data() override { return lockless_queue_sender_.Data(); }
size_t size() override { return lockless_queue_sender_.size(); }
- bool Send(size_t size) override {
- lockless_queue_sender_.Send(size);
- lockless_queue_.Wakeup(shm_event_loop_->priority());
+ bool DoSend(size_t length) override {
+ lockless_queue_sender_.Send(length);
+ lockless_queue_.Wakeup(event_loop()->priority());
return true;
}
- bool Send(const void *msg, size_t length) override {
+ bool DoSend(const void *msg, size_t length) override {
lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length);
- lockless_queue_.Wakeup(shm_event_loop_->priority());
+ lockless_queue_.Wakeup(event_loop()->priority());
// TODO(austin): Return an error if we send too fast.
return true;
}
- const std::string_view name() const override { return name_; }
-
private:
- const ShmEventLoop *shm_event_loop_;
- std::string name_;
MMapedQueue lockless_queue_memory_;
ipc_lib::LocklessQueue lockless_queue_;
ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
};
-} // namespace
-
-namespace internal {
-
// Class to manage the state for a Watcher.
-class WatcherState {
+class WatcherState : public aos::WatcherState {
public:
WatcherState(
- const Channel *channel,
- std::function<void(const Context &context, const void *message)> watcher)
- : shm_fetcher_(channel), watcher_(watcher) {}
+ EventLoop *event_loop, const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn)
+ : aos::WatcherState(event_loop, channel, std::move(fn)),
+ simple_shm_fetcher_(channel) {}
- ~WatcherState() {}
+ ~WatcherState() override {}
+
+ void Startup(EventLoop *event_loop) override {
+ PointAtNextQueueIndex();
+ CHECK(RegisterWakeup(event_loop->priority()));
+ }
// Points the next message to fetch at the queue index which will be populated
// next.
- void PointAtNextQueueIndex() { shm_fetcher_.PointAtNextQueueIndex(); }
+ void PointAtNextQueueIndex() { simple_shm_fetcher_.PointAtNextQueueIndex(); }
// Returns true if there is new data available.
bool HasNewData() {
if (!has_new_data_) {
- has_new_data_ = shm_fetcher_.FetchNext();
+ has_new_data_ = simple_shm_fetcher_.FetchNext();
}
return has_new_data_;
@@ -336,46 +367,39 @@
// Returns the time of the current data sample.
aos::monotonic_clock::time_point event_time() const {
- return shm_fetcher_.context().monotonic_sent_time;
+ return simple_shm_fetcher_.context().monotonic_sent_time;
}
// Consumes the data by calling the callback.
void CallCallback() {
CHECK(has_new_data_);
- watcher_(shm_fetcher_.context(), shm_fetcher_.most_recent_data());
+ DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
has_new_data_ = false;
}
- // Starts the thread and waits until it is running.
+ // Registers us to receive a signal on event reception.
bool RegisterWakeup(int priority) {
- return shm_fetcher_.RegisterWakeup(priority);
+ return simple_shm_fetcher_.RegisterWakeup(priority);
}
- void UnregisterWakeup() { return shm_fetcher_.UnregisterWakeup(); }
+ void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
private:
bool has_new_data_ = false;
- ShmFetcher shm_fetcher_;
-
- std::function<void(const Context &context, const void *message)> watcher_;
+ SimpleShmFetcher simple_shm_fetcher_;
};
// Adapter class to adapt a timerfd to a TimerHandler.
class TimerHandlerState : public TimerHandler {
public:
TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
- : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+ : TimerHandler(shm_event_loop, std::move(fn)),
+ shm_event_loop_(shm_event_loop) {
shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
const uint64_t elapsed_cycles = timerfd_.Read();
- shm_event_loop_->context_.monotonic_sent_time = base_;
- shm_event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
- shm_event_loop_->context_.queue_index = 0;
- shm_event_loop_->context_.size = 0;
- shm_event_loop_->context_.data = nullptr;
-
- fn_();
+ Call(monotonic_clock::now, base_);
base_ += repeat_offset_ * elapsed_cycles;
});
@@ -390,18 +414,13 @@
repeat_offset_ = repeat_offset;
}
- void Disable() override {
- // Disable is also threadsafe already.
- timerfd_.Disable();
- }
+ void Disable() override { timerfd_.Disable(); }
private:
ShmEventLoop *shm_event_loop_;
TimerFd timerfd_;
- ::std::function<void()> fn_;
-
monotonic_clock::time_point base_;
monotonic_clock::duration repeat_offset_;
};
@@ -412,124 +431,106 @@
PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
- : shm_event_loop_(shm_event_loop),
- phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
- fn_(::std::move(fn)) {
+ : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
+ shm_event_loop_(shm_event_loop) {
shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
timerfd_.Read();
- // Update the context to hold the desired wakeup time.
- shm_event_loop_->context_.monotonic_sent_time = phased_loop_.sleep_time();
- shm_event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
- shm_event_loop_->context_.queue_index = 0;
- shm_event_loop_->context_.size = 0;
- shm_event_loop_->context_.data = nullptr;
-
- // Compute how many cycles elapsed and schedule the next wakeup.
- Reschedule();
-
- // Call the function with the elapsed cycles.
- fn_(cycles_elapsed_);
- cycles_elapsed_ = 0;
-
- const monotonic_clock::time_point monotonic_end_time =
- monotonic_clock::now();
-
- // If the handler too too long so we blew by the previous deadline, we
- // want to just try for the next deadline. Reschedule.
- if (monotonic_end_time > phased_loop_.sleep_time()) {
- Reschedule();
- }
+ Call(monotonic_clock::now,
+ [this](monotonic_clock::time_point sleep_time) {
+ Schedule(sleep_time);
+ });
});
}
- ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
-
- void set_interval_and_offset(
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) override {
- phased_loop_.set_interval_and_offset(interval, offset);
- }
-
- void Startup() {
- phased_loop_.Reset(shm_event_loop_->monotonic_now());
- Reschedule();
- // The first time, we'll double count. Reschedule here will count cycles
- // elapsed before now, and then the reschedule before runing the handler
- // will count the time that elapsed then. So clear the count here.
- cycles_elapsed_ = 0;
+ ~PhasedLoopHandler() override {
+ shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
}
private:
// Reschedules the timer.
- void Reschedule() {
- cycles_elapsed_ += phased_loop_.Iterate(shm_event_loop_->monotonic_now());
- timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
+ void Schedule(monotonic_clock::time_point sleep_time) override {
+ timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
}
ShmEventLoop *shm_event_loop_;
TimerFd timerfd_;
- time::PhasedLoop phased_loop_;
-
- int cycles_elapsed_ = 0;
-
- // Function to be run
- const ::std::function<void(int)> fn_;
};
} // namespace internal
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
const Channel *channel) {
- ValidateChannel(channel);
- return ::std::unique_ptr<RawFetcher>(new ShmFetcher(channel));
+ return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
}
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const Channel *channel) {
- ValidateChannel(channel);
Take(channel);
- return ::std::unique_ptr<RawSender>(new ShmSender(channel, this));
+
+ return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
}
void ShmEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &context, const void *message)> watcher) {
- ValidateChannel(channel);
Take(channel);
- ::std::unique_ptr<internal::WatcherState> state(
- new internal::WatcherState(
- channel, std::move(watcher)));
- watchers_.push_back(::std::move(state));
+ NewWatcher(::std::unique_ptr<WatcherState>(
+ new internal::WatcherState(this, channel, std::move(watcher))));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
- ::std::unique_ptr<internal::TimerHandlerState> timer(
- new internal::TimerHandlerState(this, ::std::move(callback)));
-
- timers_.push_back(::std::move(timer));
-
- return timers_.back().get();
+ return NewTimer(::std::unique_ptr<TimerHandler>(
+ new internal::TimerHandlerState(this, ::std::move(callback))));
}
PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset) {
- ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
- new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
- offset));
-
- phased_loops_.push_back(::std::move(phased_loop));
-
- return phased_loops_.back().get();
+ return NewPhasedLoop(
+ ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
+ this, ::std::move(callback), interval, offset)));
}
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
on_run_.push_back(::std::move(on_run));
}
+void ShmEventLoop::HandleWatcherSignal() {
+ while (true) {
+ // Call the handlers in time order of their messages.
+ aos::monotonic_clock::time_point min_event_time =
+ aos::monotonic_clock::max_time;
+ size_t min_watcher_index = -1;
+ size_t watcher_index = 0;
+ for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+ internal::WatcherState *watcher =
+ reinterpret_cast<internal::WatcherState *>(base_watcher.get());
+
+ if (watcher->HasNewData()) {
+ if (watcher->event_time() < min_event_time) {
+ min_watcher_index = watcher_index;
+ min_event_time = watcher->event_time();
+ }
+ }
+ ++watcher_index;
+ }
+
+ if (min_event_time == aos::monotonic_clock::max_time) {
+ break;
+ }
+
+ reinterpret_cast<internal::WatcherState *>(
+ watchers_[min_watcher_index].get())
+ ->CallCallback();
+ }
+}
+
void ShmEventLoop::Run() {
+ // TODO(austin): Automatically register ^C with a sigaction so 2 in a row send
+ // an actual control C.
+
std::unique_ptr<ipc_lib::SignalFd> signalfd;
if (watchers_.size() > 0) {
@@ -543,32 +544,13 @@
// watchers, and calling the oldest thing first. That will improve
// determinism a lot.
- while (true) {
- // Call the handlers in time order of their messages.
- aos::monotonic_clock::time_point min_event_time =
- aos::monotonic_clock::max_time;
- size_t min_watcher_index = -1;
- size_t watcher_index = 0;
- for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
- if (watcher->HasNewData()) {
- if (watcher->event_time() < min_event_time) {
- min_watcher_index = watcher_index;
- min_event_time = watcher->event_time();
- }
- }
- ++watcher_index;
- }
-
- if (min_event_time == aos::monotonic_clock::max_time) {
- break;
- }
-
- watchers_[min_watcher_index]->CallCallback();
- }
+ HandleWatcherSignal();
});
}
- // Now, all the threads are up. Lock everything into memory and go RT.
+ MaybeScheduleTimingReports();
+
+ // Now, all the callbacks are setup. Lock everything into memory and go RT.
if (priority_ != 0) {
::aos::InitRT();
@@ -580,9 +562,8 @@
// Now that we are realtime (but before the OnRun handlers run), snap the
// queue index.
- for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
- watcher->PointAtNextQueueIndex();
- CHECK(watcher->RegisterWakeup(priority_));
+ for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
+ watcher->Startup(this);
}
// Now that we are RT, run all the OnRun handlers.
@@ -590,12 +571,6 @@
run();
}
- // Start up all the phased loops.
- for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
- phased_loops_) {
- phased_loop->Startup();
- }
-
// And start our main event loop which runs all the timers and handles Quit.
epoll_.Run();
@@ -606,7 +581,9 @@
// Drop RT priority.
::aos::UnsetCurrentThreadRealtimePriority();
- for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+ for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+ internal::WatcherState *watcher =
+ reinterpret_cast<internal::WatcherState *>(base_watcher.get());
watcher->UnregisterWakeup();
}
@@ -619,6 +596,15 @@
void ShmEventLoop::Exit() { epoll_.Quit(); }
ShmEventLoop::~ShmEventLoop() {
+ // Trigger any remaining senders or fetchers to be cleared before destroying
+ // the event loop so the book keeping matches.
+ timing_report_sender_.reset();
+
+ // Force everything with a registered fd with epoll to be destroyed now.
+ timers_.clear();
+ phased_loops_.clear();
+ watchers_.clear();
+
CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
}
@@ -642,4 +628,6 @@
priority_ = priority;
}
+pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
+
} // namespace aos
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index 577a98d..89a2721 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -1,13 +1,11 @@
#ifndef AOS_EVENTS_SHM_EVENT_LOOP_H_
#define AOS_EVENTS_SHM_EVENT_LOOP_H_
-#include <unordered_set>
#include <vector>
#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
-#include "aos/ipc_lib/signalfd.h"
-#include "aos/mutex/mutex.h"
+#include "aos/events/event_loop_generated.h"
namespace aos {
namespace internal {
@@ -15,20 +13,29 @@
class WatcherState;
class TimerHandlerState;
class PhasedLoopHandler;
+class ShmSender;
+class ShmFetcher;
} // namespace internal
// Specialization of EventLoop that is built from queues running out of shared
-// memory. See more details at aos/queue.h
+// memory.
//
-// This object must be interacted with from one thread, but the Senders and
-// Fetchers may be used from multiple threads afterwords (as long as their
+// TODO(austin): Timing reports break multiple threads. Need to add back in a
+// mutex.
+// This object must be interacted with from one thread, but the Senders
+// and Fetchers may be used from multiple threads afterwords (as long as their
// destructors are called back in one thread again)
class ShmEventLoop : public EventLoop {
public:
ShmEventLoop(const Configuration *configuration);
~ShmEventLoop() override;
+ // Runs the event loop until Exit is called, or ^C is caught.
+ void Run();
+ // Exits the event loop. Async safe.
+ void Exit();
+
aos::monotonic_clock::time_point monotonic_now() override {
return aos::monotonic_clock::now();
}
@@ -52,39 +59,39 @@
std::chrono::seconds(0)) override;
void OnRun(std::function<void()> on_run) override;
- void Run();
- void Exit();
-
- // TODO(austin): Add a function to register control-C call.
void SetRuntimeRealtimePriority(int priority) override;
void set_name(const std::string_view name) override {
name_ = std::string(name);
+ UpdateTimingReport();
}
const std::string_view name() const override { return name_; }
- int priority() const { return priority_; }
+ int priority() const override { return priority_; }
private:
friend class internal::WatcherState;
friend class internal::TimerHandlerState;
friend class internal::PhasedLoopHandler;
+ friend class internal::ShmSender;
+ friend class internal::ShmFetcher;
+
+ void HandleWatcherSignal();
// Tracks that we can't have multiple watchers or a sender and a watcher (or
// multiple senders) on a single queue (path).
void Take(const Channel *channel);
+ // Returns the TID of the event loop.
+ pid_t GetTid() override;
+
std::vector<std::function<void()>> on_run_;
int priority_ = 0;
std::string name_;
std::vector<std::string> taken_;
internal::EPoll epoll_;
-
- std::vector<std::unique_ptr<internal::TimerHandlerState>> timers_;
- std::vector<std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
- std::vector<std::unique_ptr<internal::WatcherState>> watchers_;
};
} // namespace aos
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index ba0b80d..1bbca2c 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -30,6 +30,7 @@
unlink((FLAGS_shm_base + "/test1/aos.TestMessage.v0").c_str());
unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v0").c_str());
unlink((FLAGS_shm_base + "/test2/aos.TestMessage.v0").c_str());
+ unlink((FLAGS_shm_base + "/aos/aos.timing.Report.v0").c_str());
}
::std::unique_ptr<EventLoop> Make(std::string_view name) override {
@@ -171,5 +172,7 @@
EXPECT_EQ(times.size(), 2u);
}
+// TODO(austin): Test that missing a deadline with a timer recovers as expected.
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 4819141..a9dd3e5 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -30,11 +30,47 @@
};
class SimulatedFetcher;
+class SimulatedChannel;
+
+class ShmWatcher : public WatcherState {
+ public:
+ ShmWatcher(
+ EventLoop *event_loop, const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn)
+ : WatcherState(event_loop, channel, std::move(fn)),
+ event_loop_(event_loop) {}
+
+ ~ShmWatcher() override;
+
+ void Call(const Context &context) {
+ const monotonic_clock::time_point monotonic_now =
+ event_loop_->monotonic_now();
+ DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
+ }
+
+ void Startup(EventLoop * /*event_loop*/) override {}
+
+ void Schedule(EventScheduler *scheduler,
+ std::shared_ptr<SimulatedMessage> message) {
+ // TODO(austin): Track the token once we schedule in the future.
+ // TODO(austin): Schedule wakeup in the future so we don't have 0 latency.
+ scheduler->Schedule(scheduler->monotonic_now(),
+ [this, message]() { Call(message->context); });
+ }
+
+ void SetSimulatedChannel(SimulatedChannel *channel) {
+ simulated_channel_ = channel;
+ }
+
+ private:
+ EventLoop *event_loop_;
+ SimulatedChannel *simulated_channel_ = nullptr;
+};
class SimulatedChannel {
public:
explicit SimulatedChannel(const Channel *channel, EventScheduler *scheduler)
- : channel_(CopyFlatBuffer(channel)),
+ : channel_(channel),
scheduler_(scheduler),
next_queue_index_(ipc_lib::QueueIndex::Zero(channel->max_size())) {}
@@ -44,12 +80,14 @@
::std::unique_ptr<RawSender> MakeRawSender(EventLoop *event_loop);
// Makes a connected raw fetcher.
- ::std::unique_ptr<RawFetcher> MakeRawFetcher();
+ ::std::unique_ptr<RawFetcher> MakeRawFetcher(EventLoop *event_loop);
// Registers a watcher for the queue.
- void MakeRawWatcher(
- ::std::function<void(const Context &context, const void *message)>
- watcher);
+ void MakeRawWatcher(ShmWatcher *watcher);
+
+ void RemoveWatcher(ShmWatcher *watcher) {
+ watchers_.erase(std::find(watchers_.begin(), watchers_.end(), watcher));
+ }
// Sends the message to all the connected receivers and fetchers.
void Send(std::shared_ptr<SimulatedMessage> message);
@@ -59,21 +97,23 @@
std::shared_ptr<SimulatedMessage> latest_message() { return latest_message_; }
- size_t max_size() const { return channel_.message().max_size(); }
+ size_t max_size() const { return channel()->max_size(); }
const std::string_view name() const {
- return channel_.message().name()->string_view();
+ return channel()->name()->string_view();
}
- const Channel *channel() const { return &channel_.message(); }
+ const Channel *channel() const { return channel_; }
+
+ ::aos::monotonic_clock::time_point monotonic_now() const {
+ return scheduler_->monotonic_now();
+ }
private:
- const FlatbufferDetachedBuffer<Channel> channel_;
+ const Channel *channel_;
// List of all watchers.
- ::std::vector<
- std::function<void(const Context &context, const void *message)>>
- watchers_;
+ ::std::vector<ShmWatcher *> watchers_;
// List of all fetchers.
::std::vector<SimulatedFetcher *> fetchers_;
@@ -83,6 +123,8 @@
ipc_lib::QueueIndex next_queue_index_;
};
+ShmWatcher::~ShmWatcher() { simulated_channel_->RemoveWatcher(this); }
+
namespace {
// Creates a SimulatedMessage with size bytes of storage.
@@ -99,7 +141,7 @@
class SimulatedSender : public RawSender {
public:
SimulatedSender(SimulatedChannel *simulated_channel, EventLoop *event_loop)
- : RawSender(simulated_channel->channel()),
+ : RawSender(event_loop, simulated_channel->channel()),
simulated_channel_(simulated_channel),
event_loop_(event_loop) {}
~SimulatedSender() {}
@@ -113,7 +155,7 @@
size_t size() override { return simulated_channel_->max_size(); }
- bool Send(size_t length) override {
+ bool DoSend(size_t length) override {
CHECK_LE(length, size()) << ": Attempting to send too big a message.";
message_->context.monotonic_sent_time = event_loop_->monotonic_now();
message_->context.realtime_sent_time = event_loop_->realtime_now();
@@ -130,7 +172,7 @@
return true;
}
- bool Send(const void *msg, size_t size) override {
+ bool DoSend(const void *msg, size_t size) override {
CHECK_LE(size, this->size()) << ": Attempting to send too big a message.";
// This is wasteful, but since flatbuffers fill from the back end of the
@@ -145,10 +187,6 @@
return Send(size);
}
- const std::string_view name() const override {
- return simulated_channel_->name();
- }
-
private:
SimulatedChannel *simulated_channel_;
EventLoop *event_loop_;
@@ -161,25 +199,27 @@
class SimulatedFetcher : public RawFetcher {
public:
- explicit SimulatedFetcher(SimulatedChannel *queue)
- : RawFetcher(queue->channel()), queue_(queue) {}
+ explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
+ : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
- bool FetchNext() override {
- if (msgs_.size() == 0) return false;
+ std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+ if (msgs_.size() == 0) {
+ return std::make_pair(false, monotonic_clock::min_time);
+ }
SetMsg(msgs_.front());
msgs_.pop_front();
- return true;
+ return std::make_pair(true, queue_->monotonic_now());
}
- bool Fetch() override {
+ std::pair<bool, monotonic_clock::time_point> DoFetch() override {
if (msgs_.size() == 0) {
if (!msg_ && queue_->latest_message()) {
SetMsg(queue_->latest_message());
- return true;
+ return std::make_pair(true, queue_->monotonic_now());
} else {
- return false;
+ return std::make_pair(false, monotonic_clock::min_time);
}
}
@@ -187,7 +227,7 @@
// latest message from before we started.
SetMsg(msgs_.back());
msgs_.clear();
- return true;
+ return std::make_pair(true, queue_->monotonic_now());
}
private:
@@ -196,7 +236,6 @@
// Updates the state inside RawFetcher to point to the data in msg_.
void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
msg_ = msg;
- data_ = msg_->context.data;
context_ = msg_->context;
}
@@ -216,11 +255,7 @@
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
SimulatedEventLoop *simulated_event_loop,
- ::std::function<void()> fn)
- : scheduler_(scheduler),
- token_(scheduler_->InvalidToken()),
- simulated_event_loop_(simulated_event_loop),
- fn_(fn) {}
+ ::std::function<void()> fn);
~SimulatedTimerHandler() {}
void Setup(monotonic_clock::time_point base,
@@ -243,9 +278,6 @@
EventScheduler *scheduler_;
EventScheduler::Token token_;
- SimulatedEventLoop *simulated_event_loop_;
- // Function to be run on the thread
- ::std::function<void()> fn_;
monotonic_clock::time_point base_;
monotonic_clock::duration repeat_offset_;
};
@@ -256,44 +288,25 @@
SimulatedEventLoop *simulated_event_loop,
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
- const monotonic_clock::duration offset)
- : simulated_timer_handler_(scheduler, simulated_event_loop,
- [this]() { HandleTimerWakeup(); }),
- phased_loop_(interval, simulated_timer_handler_.monotonic_now(),
- offset),
- fn_(fn) {
- // TODO(austin): This assumes time doesn't change between when the
- // constructor is called and when we start running. It's probably a safe
- // assumption.
- Reschedule();
+ const monotonic_clock::duration offset);
+ ~SimulatedPhasedLoopHandler() {
+ if (token_ != scheduler_->InvalidToken()) {
+ scheduler_->Deschedule(token_);
+ token_ = scheduler_->InvalidToken();
+ }
}
- void HandleTimerWakeup() {
- fn_(cycles_elapsed_);
- Reschedule();
- }
+ void HandleTimerWakeup();
- void set_interval_and_offset(
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) override {
- phased_loop_.set_interval_and_offset(interval, offset);
- }
-
- void Reschedule() {
- cycles_elapsed_ =
- phased_loop_.Iterate(simulated_timer_handler_.monotonic_now());
- simulated_timer_handler_.Setup(phased_loop_.sleep_time(),
- ::aos::monotonic_clock::zero());
+ void Schedule(monotonic_clock::time_point sleep_time) override {
+ token_ = scheduler_->Schedule(sleep_time, [this]() { HandleTimerWakeup(); });
}
private:
- SimulatedTimerHandler simulated_timer_handler_;
+ SimulatedEventLoop *simulated_event_loop_;
- time::PhasedLoop phased_loop_;
-
- int cycles_elapsed_ = 1;
-
- ::std::function<void(int)> fn_;
+ EventScheduler *scheduler_;
+ EventScheduler::Token token_;
};
class SimulatedEventLoop : public EventLoop {
@@ -304,15 +317,31 @@
*channels,
const Configuration *configuration,
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- *raw_event_loops)
- : EventLoop(configuration),
+ *raw_event_loops,
+ pid_t tid)
+ : EventLoop(CHECK_NOTNULL(configuration)),
scheduler_(scheduler),
channels_(channels),
- raw_event_loops_(raw_event_loops) {
- raw_event_loops_->push_back(
- std::make_pair(this, [this](bool value) { set_is_running(value); }));
+ raw_event_loops_(raw_event_loops),
+ tid_(tid) {
+ raw_event_loops_->push_back(std::make_pair(this, [this](bool value) {
+ if (!has_setup_) {
+ Setup();
+ has_setup_ = true;
+ }
+ set_is_running(value);
+ }));
}
~SimulatedEventLoop() override {
+ // Trigger any remaining senders or fetchers to be cleared before destroying
+ // the event loop so the book keeping matches.
+ timing_report_sender_.reset();
+
+ // Force everything with a registered fd with epoll to be destroyed now.
+ timers_.clear();
+ phased_loops_.clear();
+ watchers_.clear();
+
for (auto it = raw_event_loops_->begin(); it != raw_event_loops_->end();
++it) {
if (it->first == this) {
@@ -340,21 +369,22 @@
watcher) override;
TimerHandler *AddTimer(::std::function<void()> callback) override {
- timers_.emplace_back(new SimulatedTimerHandler(scheduler_, this, callback));
- return timers_.back().get();
+ CHECK(!is_running());
+ return NewTimer(::std::unique_ptr<TimerHandler>(
+ new SimulatedTimerHandler(scheduler_, this, callback)));
}
PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset =
::std::chrono::seconds(0)) override {
- phased_loops_.emplace_back(new SimulatedPhasedLoopHandler(
- scheduler_, this, callback, interval, offset));
- return phased_loops_.back().get();
+ return NewPhasedLoop(
+ ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
+ scheduler_, this, callback, interval, offset)));
}
void OnRun(::std::function<void()> on_run) override {
- scheduler_->Schedule(scheduler_->monotonic_now(), on_run);
+ scheduler_->ScheduleOnRun(on_run);
}
void set_name(const std::string_view name) override {
@@ -366,43 +396,58 @@
void Take(const Channel *channel);
- void SetRuntimeRealtimePriority(int /*priority*/) override {
+ void SetRuntimeRealtimePriority(int priority) override {
CHECK(!is_running()) << ": Cannot set realtime priority while running.";
+ priority_ = priority;
}
+ int priority() const override { return priority_; }
+
+ void Setup() { MaybeScheduleTimingReports(); }
+
private:
friend class SimulatedTimerHandler;
+ pid_t GetTid() override { return tid_; }
+
EventScheduler *scheduler_;
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
*raw_event_loops_;
absl::btree_set<SimpleChannel> taken_;
- ::std::vector<std::unique_ptr<TimerHandler>> timers_;
- ::std::vector<std::unique_ptr<PhasedLoopHandler>> phased_loops_;
::std::string name_;
+
+ int priority_ = 0;
+
+ bool has_setup_ = false;
+
+ const pid_t tid_;
};
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
- ValidateChannel(channel);
+ ChannelIndex(channel);
Take(channel);
- GetSimulatedChannel(channel)->MakeRawWatcher(watcher);
+ std::unique_ptr<ShmWatcher> shm_watcher(
+ new ShmWatcher(this, channel, std::move(watcher)));
+
+ GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
+ NewWatcher(std::move(shm_watcher));
}
std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
const Channel *channel) {
- ValidateChannel(channel);
+ ChannelIndex(channel);
Take(channel);
return GetSimulatedChannel(channel)->MakeRawSender(this);
}
std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
const Channel *channel) {
- ValidateChannel(channel);
- return GetSimulatedChannel(channel)->MakeRawFetcher();
+ ChannelIndex(channel);
+ return GetSimulatedChannel(channel)->MakeRawFetcher(this);
}
SimulatedChannel *SimulatedEventLoop::GetSimulatedChannel(
@@ -418,10 +463,9 @@
return it->second.get();
}
-void SimulatedChannel::MakeRawWatcher(
- ::std::function<void(const Context &context, const void *message)>
- watcher) {
- watchers_.push_back(watcher);
+void SimulatedChannel::MakeRawWatcher(ShmWatcher *watcher) {
+ watcher->SetSimulatedChannel(this);
+ watchers_.emplace_back(watcher);
}
::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
@@ -429,8 +473,10 @@
return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
}
-::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher() {
- ::std::unique_ptr<SimulatedFetcher> fetcher(new SimulatedFetcher(this));
+::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
+ EventLoop *event_loop) {
+ ::std::unique_ptr<SimulatedFetcher> fetcher(
+ new SimulatedFetcher(event_loop, this));
fetchers_.push_back(fetcher.get());
return ::std::move(fetcher);
}
@@ -443,10 +489,8 @@
latest_message_ = message;
if (scheduler_->is_running()) {
- for (auto &watcher : watchers_) {
- scheduler_->Schedule(scheduler_->monotonic_now(), [watcher, message]() {
- watcher(message->context, message->context.data);
- });
+ for (ShmWatcher *watcher : watchers_) {
+ watcher->Schedule(scheduler_, message);
}
}
for (auto &fetcher : fetchers_) {
@@ -462,6 +506,13 @@
: name(CHECK_NOTNULL(CHECK_NOTNULL(channel)->name())->str()),
type(CHECK_NOTNULL(CHECK_NOTNULL(channel)->type())->str()) {}
+SimulatedTimerHandler::SimulatedTimerHandler(
+ EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
+ ::std::function<void()> fn)
+ : TimerHandler(simulated_event_loop, std::move(fn)),
+ scheduler_(scheduler),
+ token_(scheduler_->InvalidToken()) {}
+
void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) {
Disable();
@@ -486,17 +537,26 @@
} else {
token_ = scheduler_->InvalidToken();
}
- // The scheduler is perfect, so we will always wake up on time.
- simulated_event_loop_->context_.monotonic_sent_time =
- scheduler_->monotonic_now();
- simulated_event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
- simulated_event_loop_->context_.queue_index = 0;
- simulated_event_loop_->context_.size = 0;
- simulated_event_loop_->context_.data = nullptr;
- fn_();
+ Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
}
+SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
+ EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
+ ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+ const monotonic_clock::duration offset)
+ : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
+ simulated_event_loop_(simulated_event_loop),
+ scheduler_(scheduler),
+ token_(scheduler_->InvalidToken()) {}
+
+void SimulatedPhasedLoopHandler::HandleTimerWakeup() {
+ monotonic_clock::time_point monotonic_now =
+ simulated_event_loop_->monotonic_now();
+ Call(
+ [monotonic_now]() { return monotonic_now; },
+ [this](monotonic_clock::time_point sleep_time) { Schedule(sleep_time); });
+}
void SimulatedEventLoop::Take(const Channel *channel) {
CHECK(!is_running()) << ": Cannot add new objects while running.";
@@ -508,13 +568,15 @@
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
- : configuration_(configuration) {}
+ : configuration_(CHECK_NOTNULL(configuration)) {}
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
std::string_view name) {
+ pid_t tid = tid_;
+ ++tid_;
::std::unique_ptr<EventLoop> result(new SimulatedEventLoop(
- &scheduler_, &channels_, configuration_, &raw_event_loops_));
+ &scheduler_, &channels_, configuration_, &raw_event_loops_, tid));
result->set_name(name);
return result;
}
@@ -525,11 +587,9 @@
event_loop.second(true);
}
scheduler_.RunFor(duration);
- if (!scheduler_.is_running()) {
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(false);
- }
+ for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+ raw_event_loops_) {
+ event_loop.second(false);
}
}
@@ -539,11 +599,9 @@
event_loop.second(true);
}
scheduler_.Run();
- if (!scheduler_.is_running()) {
- for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
- raw_event_loops_) {
- event_loop.second(false);
- }
+ for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
+ raw_event_loops_) {
+ event_loop.second(false);
}
}
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 0550204..79f1e46 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -80,6 +80,8 @@
// List of event loops to manage running and not running for.
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
raw_event_loops_;
+
+ pid_t tid_ = 0;
};
} // namespace aos
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index b2eb634..16a1923 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -73,7 +73,10 @@
// Test that running for a time period with no handlers causes time to progress
// correctly.
TEST(SimulatedEventLoopTest, RunForNoHandlers) {
- SimulatedEventLoopFactory simulated_event_loop_factory(nullptr);
+ SimulatedEventLoopTestFactory factory;
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(
+ factory.configuration());
::std::unique_ptr<EventLoop> event_loop =
simulated_event_loop_factory.MakeEventLoop("loop");
@@ -88,7 +91,10 @@
// Test that running for a time with a periodic handler causes time to end
// correctly.
TEST(SimulatedEventLoopTest, RunForTimerHandler) {
- SimulatedEventLoopFactory simulated_event_loop_factory(nullptr);
+ SimulatedEventLoopTestFactory factory;
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(
+ factory.configuration());
::std::unique_ptr<EventLoop> event_loop =
simulated_event_loop_factory.MakeEventLoop("loop");
diff --git a/aos/robot_state/BUILD b/aos/robot_state/BUILD
index 5a9c255..9748e1b 100644
--- a/aos/robot_state/BUILD
+++ b/aos/robot_state/BUILD
@@ -23,4 +23,5 @@
"//aos/robot_state:robot_state_fbs",
],
visibility = ["//visibility:public"],
+ deps = ["//aos/events:config"],
)
diff --git a/aos/robot_state/robot_state_config.json b/aos/robot_state/robot_state_config.json
index 0b2a5cc..37cc160 100644
--- a/aos/robot_state/robot_state_config.json
+++ b/aos/robot_state/robot_state_config.json
@@ -11,5 +11,8 @@
"type": "aos.RobotState",
"frequency": 200
}
+ ],
+ "imports": [
+ "../events/aos.json"
]
}
diff --git a/frc971/control_loops/static_zeroing_single_dof_profiled_subsystem_test.cc b/frc971/control_loops/static_zeroing_single_dof_profiled_subsystem_test.cc
index ffea935..1c04c25 100644
--- a/frc971/control_loops/static_zeroing_single_dof_profiled_subsystem_test.cc
+++ b/frc971/control_loops/static_zeroing_single_dof_profiled_subsystem_test.cc
@@ -348,6 +348,10 @@
" \"type\": \"aos.RobotState\"\n"
" },\n"
" {\n"
+ " \"name\": \"/aos\",\n"
+ " \"type\": \"aos.timing.Report\"\n"
+ " },\n"
+ " {\n"
" \"name\": \"/loop\",\n"
" \"type\": \"frc971.control_loops.zeroing.testing.SubsystemGoal\"\n"
" },\n"
diff --git a/frc971/wpilib/BUILD b/frc971/wpilib/BUILD
index ac9bc87..8d0982a 100644
--- a/frc971/wpilib/BUILD
+++ b/frc971/wpilib/BUILD
@@ -132,6 +132,9 @@
flatbuffers = [
":loop_output_handler_test_fbs",
],
+ deps = [
+ "//aos/events:config",
+ ],
)
cc_library(
diff --git a/frc971/wpilib/loop_output_handler_test_config_source.json b/frc971/wpilib/loop_output_handler_test_config_source.json
index ef07eb0..587a318 100644
--- a/frc971/wpilib/loop_output_handler_test_config_source.json
+++ b/frc971/wpilib/loop_output_handler_test_config_source.json
@@ -5,5 +5,9 @@
"name": "/test",
"type": "frc971.wpilib.LoopOutputHandlerTestOutput"
}
+ ],
+ "imports":
+ [
+ "../../aos/events/aos.json"
]
}