Add timing reports for both simulation and shm
This involves a major refactor of how event loops are put together.
Most of the timing report functionality lives in the base event loop
class (hidden as much as possible to avoid cluttering the interface).
As a result, more of the shared logic now lives in the base class, and
the subclasses are simpler and more consistent.
Change-Id: I163ffb8d1b0e29a0231b54e205c2d7c5241d662b
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 9e18544..a8a8e01 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -1,21 +1,24 @@
-#include "glog/logging.h"
-
#include "aos/events/shm_event_loop.h"
#include <sys/mman.h>
#include <sys/stat.h>
-#include <sys/timerfd.h>
+#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
+#include <iterator>
#include <stdexcept>
#include "aos/events/epoll.h"
+#include "aos/events/event_loop_generated.h"
+#include "aos/events/timing_statistics.h"
#include "aos/ipc_lib/lockless_queue.h"
+#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/util/phased_loop.h"
+#include "glog/logging.h"
DEFINE_string(shm_base, "/dev/shm/aos",
"Directory to place queue backing mmaped files in.");
@@ -96,9 +99,7 @@
return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
}
- const ipc_lib::LocklessQueueConfiguration &config() const {
- return config_;
- }
+ const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
private:
void MkdirP(std::string_view path) {
@@ -141,11 +142,14 @@
namespace chrono = ::std::chrono;
-class ShmFetcher : public RawFetcher {
+} // namespace
+
+namespace internal {
+
+class SimpleShmFetcher {
public:
- explicit ShmFetcher(const Channel *channel)
- : RawFetcher(channel),
- lockless_queue_memory_(channel),
+ explicit SimpleShmFetcher(const Channel *channel)
+ : lockless_queue_memory_(channel),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()),
data_storage_(static_cast<AlignedChar *>(aligned_alloc(
@@ -158,7 +162,7 @@
PointAtNextQueueIndex();
}
- ~ShmFetcher() { data_ = nullptr; }
+ ~SimpleShmFetcher() {}
// Points the next message to fetch at the queue index which will be
// populated next.
@@ -174,7 +178,7 @@
}
}
- bool FetchNext() override {
+ bool FetchNext() {
// TODO(austin): Write a test which starts with nothing in the queue,
// and then calls FetchNext() after something is sent.
// TODO(austin): Get behind and make sure it dies both here and with
@@ -185,9 +189,8 @@
reinterpret_cast<char *>(data_storage_.get()));
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = actual_queue_index_.index();
- data_ = reinterpret_cast<char *>(data_storage_.get()) +
- lockless_queue_.message_data_size() - context_.size;
- context_.data = data_;
+ context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = actual_queue_index_.Increment();
}
@@ -205,7 +208,7 @@
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
- bool Fetch() override {
+ bool Fetch() {
const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
// actual_queue_index_ is only meaningful if it was set by Fetch or
// FetchNext. This happens when valid_data_ has been set. So, only
@@ -213,7 +216,7 @@
//
// Also, if the latest queue index is invalid, we are empty. So there
// is nothing to fetch.
- if ((data_ != nullptr &&
+ if ((context_.data != nullptr &&
queue_index == actual_queue_index_.DecrementBy(1u)) ||
!queue_index.valid()) {
return false;
@@ -225,9 +228,8 @@
reinterpret_cast<char *>(data_storage_.get()));
if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
context_.queue_index = queue_index.index();
- data_ = reinterpret_cast<char *>(data_storage_.get()) +
- lockless_queue_.message_data_size() - context_.size;
- context_.data = data_;
+ context_.data = reinterpret_cast<char *>(data_storage_.get()) +
+ lockless_queue_.message_data_size() - context_.size;
actual_queue_index_ = queue_index.Increment();
}
@@ -251,6 +253,8 @@
return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
}
+ Context context() const { return context_; }
+
bool RegisterWakeup(int priority) {
return lockless_queue_.RegisterWakeup(priority);
}
@@ -269,66 +273,93 @@
};
std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
+
+ Context context_;
+};
+
+class ShmFetcher : public RawFetcher {  // RawFetcher over a SimpleShmFetcher.
+ public:
+  explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
+      : RawFetcher(event_loop, channel), simple_shm_fetcher_(channel) {}
+  // Clears context_.data, which may point into simple_shm_fetcher_'s buffer.
+  ~ShmFetcher() { context_.data = nullptr; }
+  // On success, copies the fetched Context and returns {true, now()}.
+  std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+    if (simple_shm_fetcher_.FetchNext()) {
+      context_ = simple_shm_fetcher_.context();
+      return std::make_pair(true, monotonic_clock::now());
+    }
+    return std::make_pair(false, monotonic_clock::min_time);
+  }
+  // Like DoFetchNext(), but reads via Fetch() (the latest queue index).
+  std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+    if (simple_shm_fetcher_.Fetch()) {
+      context_ = simple_shm_fetcher_.context();
+      return std::make_pair(true, monotonic_clock::now());
+    }
+    return std::make_pair(false, monotonic_clock::min_time);
+  }
+
+ private:
+  SimpleShmFetcher simple_shm_fetcher_;  // Performs the actual queue reads.
};
class ShmSender : public RawSender {
public:
- explicit ShmSender(const Channel *channel, const ShmEventLoop *shm_event_loop)
- : RawSender(channel),
- shm_event_loop_(shm_event_loop),
- name_(channel->name()->str()),
+ explicit ShmSender(EventLoop *event_loop, const Channel *channel)
+ : RawSender(event_loop, channel),
lockless_queue_memory_(channel),
lockless_queue_(lockless_queue_memory_.memory(),
lockless_queue_memory_.config()),
lockless_queue_sender_(lockless_queue_.MakeSender()) {}
+ ~ShmSender() override {}
+
void *data() override { return lockless_queue_sender_.Data(); }
size_t size() override { return lockless_queue_sender_.size(); }
- bool Send(size_t size) override {
- lockless_queue_sender_.Send(size);
- lockless_queue_.Wakeup(shm_event_loop_->priority());
+ bool DoSend(size_t length) override {
+ lockless_queue_sender_.Send(length);
+ lockless_queue_.Wakeup(event_loop()->priority());
return true;
}
- bool Send(const void *msg, size_t length) override {
+ bool DoSend(const void *msg, size_t length) override {
lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length);
- lockless_queue_.Wakeup(shm_event_loop_->priority());
+ lockless_queue_.Wakeup(event_loop()->priority());
// TODO(austin): Return an error if we send too fast.
return true;
}
- const std::string_view name() const override { return name_; }
-
private:
- const ShmEventLoop *shm_event_loop_;
- std::string name_;
MMapedQueue lockless_queue_memory_;
ipc_lib::LocklessQueue lockless_queue_;
ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
};
-} // namespace
-
-namespace internal {
-
// Class to manage the state for a Watcher.
-class WatcherState {
+class WatcherState : public aos::WatcherState {
public:
WatcherState(
- const Channel *channel,
- std::function<void(const Context &context, const void *message)> watcher)
- : shm_fetcher_(channel), watcher_(watcher) {}
+ EventLoop *event_loop, const Channel *channel,
+ std::function<void(const Context &context, const void *message)> fn)
+ : aos::WatcherState(event_loop, channel, std::move(fn)),
+ simple_shm_fetcher_(channel) {}
- ~WatcherState() {}
+ ~WatcherState() override {}
+
+ void Startup(EventLoop *event_loop) override {
+ PointAtNextQueueIndex();
+ CHECK(RegisterWakeup(event_loop->priority()));
+ }
// Points the next message to fetch at the queue index which will be populated
// next.
- void PointAtNextQueueIndex() { shm_fetcher_.PointAtNextQueueIndex(); }
+ void PointAtNextQueueIndex() { simple_shm_fetcher_.PointAtNextQueueIndex(); }
// Returns true if there is new data available.
bool HasNewData() {
if (!has_new_data_) {
- has_new_data_ = shm_fetcher_.FetchNext();
+ has_new_data_ = simple_shm_fetcher_.FetchNext();
}
return has_new_data_;
@@ -336,46 +367,39 @@
// Returns the time of the current data sample.
aos::monotonic_clock::time_point event_time() const {
- return shm_fetcher_.context().monotonic_sent_time;
+ return simple_shm_fetcher_.context().monotonic_sent_time;
}
// Consumes the data by calling the callback.
void CallCallback() {
CHECK(has_new_data_);
- watcher_(shm_fetcher_.context(), shm_fetcher_.most_recent_data());
+ DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
has_new_data_ = false;
}
- // Starts the thread and waits until it is running.
+ // Registers us to receive a signal on event reception.
bool RegisterWakeup(int priority) {
- return shm_fetcher_.RegisterWakeup(priority);
+ return simple_shm_fetcher_.RegisterWakeup(priority);
}
- void UnregisterWakeup() { return shm_fetcher_.UnregisterWakeup(); }
+ void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
private:
bool has_new_data_ = false;
- ShmFetcher shm_fetcher_;
-
- std::function<void(const Context &context, const void *message)> watcher_;
+ SimpleShmFetcher simple_shm_fetcher_;
};
// Adapter class to adapt a timerfd to a TimerHandler.
class TimerHandlerState : public TimerHandler {
public:
TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
- : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
+ : TimerHandler(shm_event_loop, std::move(fn)),
+ shm_event_loop_(shm_event_loop) {
shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
const uint64_t elapsed_cycles = timerfd_.Read();
- shm_event_loop_->context_.monotonic_sent_time = base_;
- shm_event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
- shm_event_loop_->context_.queue_index = 0;
- shm_event_loop_->context_.size = 0;
- shm_event_loop_->context_.data = nullptr;
-
- fn_();
+ Call(monotonic_clock::now, base_);
base_ += repeat_offset_ * elapsed_cycles;
});
@@ -390,18 +414,13 @@
repeat_offset_ = repeat_offset;
}
- void Disable() override {
- // Disable is also threadsafe already.
- timerfd_.Disable();
- }
+ void Disable() override { timerfd_.Disable(); }
private:
ShmEventLoop *shm_event_loop_;
TimerFd timerfd_;
- ::std::function<void()> fn_;
-
monotonic_clock::time_point base_;
monotonic_clock::duration repeat_offset_;
};
@@ -412,124 +431,106 @@
PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
- : shm_event_loop_(shm_event_loop),
- phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
- fn_(::std::move(fn)) {
+ : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
+ shm_event_loop_(shm_event_loop) {
shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
timerfd_.Read();
- // Update the context to hold the desired wakeup time.
- shm_event_loop_->context_.monotonic_sent_time = phased_loop_.sleep_time();
- shm_event_loop_->context_.realtime_sent_time = realtime_clock::min_time;
- shm_event_loop_->context_.queue_index = 0;
- shm_event_loop_->context_.size = 0;
- shm_event_loop_->context_.data = nullptr;
-
- // Compute how many cycles elapsed and schedule the next wakeup.
- Reschedule();
-
- // Call the function with the elapsed cycles.
- fn_(cycles_elapsed_);
- cycles_elapsed_ = 0;
-
- const monotonic_clock::time_point monotonic_end_time =
- monotonic_clock::now();
-
- // If the handler too too long so we blew by the previous deadline, we
- // want to just try for the next deadline. Reschedule.
- if (monotonic_end_time > phased_loop_.sleep_time()) {
- Reschedule();
- }
+ Call(monotonic_clock::now,
+ [this](monotonic_clock::time_point sleep_time) {
+ Schedule(sleep_time);
+ });
});
}
- ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
-
- void set_interval_and_offset(
- const monotonic_clock::duration interval,
- const monotonic_clock::duration offset) override {
- phased_loop_.set_interval_and_offset(interval, offset);
- }
-
- void Startup() {
- phased_loop_.Reset(shm_event_loop_->monotonic_now());
- Reschedule();
- // The first time, we'll double count. Reschedule here will count cycles
- // elapsed before now, and then the reschedule before runing the handler
- // will count the time that elapsed then. So clear the count here.
- cycles_elapsed_ = 0;
+ ~PhasedLoopHandler() override {
+ shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
}
private:
// Reschedules the timer.
- void Reschedule() {
- cycles_elapsed_ += phased_loop_.Iterate(shm_event_loop_->monotonic_now());
- timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
+ void Schedule(monotonic_clock::time_point sleep_time) override {
+ timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
}
ShmEventLoop *shm_event_loop_;
TimerFd timerfd_;
- time::PhasedLoop phased_loop_;
-
- int cycles_elapsed_ = 0;
-
- // Function to be run
- const ::std::function<void(int)> fn_;
};
} // namespace internal
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
const Channel *channel) {
- ValidateChannel(channel);
- return ::std::unique_ptr<RawFetcher>(new ShmFetcher(channel));
+ return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
}
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const Channel *channel) {
- ValidateChannel(channel);
Take(channel);
- return ::std::unique_ptr<RawSender>(new ShmSender(channel, this));
+
+ return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
}
void ShmEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &context, const void *message)> watcher) {
- ValidateChannel(channel);
Take(channel);
- ::std::unique_ptr<internal::WatcherState> state(
- new internal::WatcherState(
- channel, std::move(watcher)));
- watchers_.push_back(::std::move(state));
+ NewWatcher(::std::unique_ptr<WatcherState>(
+ new internal::WatcherState(this, channel, std::move(watcher))));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
- ::std::unique_ptr<internal::TimerHandlerState> timer(
- new internal::TimerHandlerState(this, ::std::move(callback)));
-
- timers_.push_back(::std::move(timer));
-
- return timers_.back().get();
+ return NewTimer(::std::unique_ptr<TimerHandler>(
+ new internal::TimerHandlerState(this, ::std::move(callback))));
}
PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset) {
- ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
- new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
- offset));
-
- phased_loops_.push_back(::std::move(phased_loop));
-
- return phased_loops_.back().get();
+ return NewPhasedLoop(
+ ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
+ this, ::std::move(callback), interval, offset)));
}
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
on_run_.push_back(::std::move(on_run));
}
+// Runs watcher callbacks until no watcher has unread data, always picking the
+// watcher whose pending message has the earliest event_time() next.
+void ShmEventLoop::HandleWatcherSignal() {
+  while (true) {
+    // Call the handlers in time order of their messages.
+    aos::monotonic_clock::time_point min_event_time =
+        aos::monotonic_clock::max_time;
+    size_t min_watcher_index = -1;
+    size_t watcher_index = 0;
+    for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+      // Entries are created as internal::WatcherState in MakeRawWatcher().
+      internal::WatcherState *watcher =
+          static_cast<internal::WatcherState *>(base_watcher.get());
+
+      if (watcher->HasNewData() && watcher->event_time() < min_event_time) {
+        min_watcher_index = watcher_index;
+        min_event_time = watcher->event_time();
+      }
+      ++watcher_index;
+    }
+
+    if (min_event_time == aos::monotonic_clock::max_time) {
+      break;
+    }
+
+    static_cast<internal::WatcherState *>(watchers_[min_watcher_index].get())
+        ->CallCallback();
+  }
+}
+
void ShmEventLoop::Run() {
+ // TODO(austin): Automatically register ^C with a sigaction so 2 in a row send
+ // an actual control C.
+
std::unique_ptr<ipc_lib::SignalFd> signalfd;
if (watchers_.size() > 0) {
@@ -543,32 +544,13 @@
// watchers, and calling the oldest thing first. That will improve
// determinism a lot.
- while (true) {
- // Call the handlers in time order of their messages.
- aos::monotonic_clock::time_point min_event_time =
- aos::monotonic_clock::max_time;
- size_t min_watcher_index = -1;
- size_t watcher_index = 0;
- for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
- if (watcher->HasNewData()) {
- if (watcher->event_time() < min_event_time) {
- min_watcher_index = watcher_index;
- min_event_time = watcher->event_time();
- }
- }
- ++watcher_index;
- }
-
- if (min_event_time == aos::monotonic_clock::max_time) {
- break;
- }
-
- watchers_[min_watcher_index]->CallCallback();
- }
+ HandleWatcherSignal();
});
}
- // Now, all the threads are up. Lock everything into memory and go RT.
+ MaybeScheduleTimingReports();
+
+  // Now, all the callbacks are set up. Lock everything into memory and go RT.
if (priority_ != 0) {
::aos::InitRT();
@@ -580,9 +562,8 @@
// Now that we are realtime (but before the OnRun handlers run), snap the
// queue index.
- for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
- watcher->PointAtNextQueueIndex();
- CHECK(watcher->RegisterWakeup(priority_));
+ for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
+ watcher->Startup(this);
}
// Now that we are RT, run all the OnRun handlers.
@@ -590,12 +571,6 @@
run();
}
- // Start up all the phased loops.
- for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
- phased_loops_) {
- phased_loop->Startup();
- }
-
// And start our main event loop which runs all the timers and handles Quit.
epoll_.Run();
@@ -606,7 +581,9 @@
// Drop RT priority.
::aos::UnsetCurrentThreadRealtimePriority();
- for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) {
+ for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
+ internal::WatcherState *watcher =
+ reinterpret_cast<internal::WatcherState *>(base_watcher.get());
watcher->UnregisterWakeup();
}
@@ -619,6 +596,15 @@
void ShmEventLoop::Exit() { epoll_.Quit(); }
ShmEventLoop::~ShmEventLoop() {
+ // Trigger any remaining senders or fetchers to be cleared before destroying
+ // the event loop so the book keeping matches.
+ timing_report_sender_.reset();
+
+ // Force everything with a registered fd with epoll to be destroyed now.
+ timers_.clear();
+ phased_loops_.clear();
+ watchers_.clear();
+
CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
}
@@ -642,4 +628,6 @@
priority_ = priority;
}
+pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
+
} // namespace aos