blob: 5034f0b50db000cb43d7c4463396428c9765c9f8 [file] [log] [blame]
#include "aos/events/shm-event-loop.h"
#include <sys/timerfd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <stdexcept>
#include "aos/events/epoll.h"
#include "aos/init.h"
#include "aos/logging/logging.h"
#include "aos/queue.h"
#include "aos/util/phased_loop.h"
namespace aos {
ShmEventLoop::ShmEventLoop() {}
namespace {
namespace chrono = ::std::chrono;
class ShmFetcher : public RawFetcher {
public:
explicit ShmFetcher(RawQueue *queue) : queue_(queue) {
// Move index_ to point to the end of the queue as it is at construction
// time. Also grab the oldest message but don't expose it to the user yet.
static constexpr Options<RawQueue> kOptions =
RawQueue::kFromEnd | RawQueue::kNonBlock;
msg_ = queue_->ReadMessageIndex(kOptions, &index_);
}
~ShmFetcher() {
if (msg_) {
queue_->FreeMessage(msg_);
}
}
bool FetchNext() override {
const void *msg = queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_);
// Only update the internal pointer if we got a new message.
if (msg != nullptr) {
queue_->FreeMessage(msg_);
msg_ = msg;
set_most_recent(msg_);
}
return msg != nullptr;
}
bool Fetch() override {
static constexpr Options<RawQueue> kOptions =
RawQueue::kFromEnd | RawQueue::kNonBlock;
const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
// Only update the internal pointer if we got a new message.
if (msg != nullptr && msg != msg_) {
queue_->FreeMessage(msg_);
msg_ = msg;
set_most_recent(msg_);
return true;
} else {
// The message has to get freed if we didn't use it (and
// RawQueue::FreeMessage is ok to call on nullptr).
queue_->FreeMessage(msg);
// We have a message from construction time. Give it to the user now.
if (msg_ != nullptr && most_recent() != msg_) {
set_most_recent(msg_);
return true;
} else {
return false;
}
}
}
private:
int index_ = 0;
RawQueue *queue_;
const void *msg_ = nullptr;
};
class ShmSender : public RawSender {
public:
explicit ShmSender(RawQueue *queue) : queue_(queue) {}
::aos::Message *GetMessage() override {
return reinterpret_cast<::aos::Message *>(queue_->GetMessage());
}
void Free(::aos::Message *msg) override { queue_->FreeMessage(msg); }
bool Send(::aos::Message *msg) override {
assert(queue_ != nullptr);
{
// TODO(austin): This lets multiple senders reorder messages since time
// isn't acquired with a lock held.
if (msg->sent_time == monotonic_clock::min_time) {
msg->sent_time = monotonic_clock::now();
}
}
return queue_->WriteMessage(msg, RawQueue::kOverride);
}
const char *name() const override { return queue_->name(); }
private:
RawQueue *queue_;
};
} // namespace
namespace internal {
// Class to manage the state for a Watcher.
class WatcherThreadState {
public:
WatcherThreadState(
ShmEventLoop::ThreadState *thread_state, RawQueue *queue,
::std::function<void(const ::aos::Message *message)> watcher)
: thread_state_(thread_state),
queue_(queue),
index_(0),
watcher_(::std::move(watcher)) {}
~WatcherThreadState() {
// Only kill the thread if it is running.
if (running_) {
// TODO(austin): CHECK that we aren't RT here.
// Try joining. If we fail, we weren't asleep on the condition in the
// queue. So hit it again and again until that's true.
struct timespec end_time;
AOS_PCHECK(clock_gettime(CLOCK_REALTIME, &end_time) == 0);
while (true) {
void *retval = nullptr;
end_time.tv_nsec += 100000000;
if (end_time.tv_nsec > 1000000000L) {
end_time.tv_nsec -= 1000000000L;
++end_time.tv_sec;
}
int ret = pthread_timedjoin_np(pthread_, &retval, &end_time);
if (ret == ETIMEDOUT) continue;
AOS_PCHECK(ret == 0);
break;
}
}
}
// Starts the thread and waits until it is running.
void Start() {
AOS_PCHECK(pthread_create(&pthread_, nullptr, &StaticRun, this) == 0);
IPCRecursiveMutexLocker locker(&thread_started_mutex_);
if (locker.owner_died()) ::aos::Die("Owner died");
while (!running_) {
AOS_CHECK(!thread_started_condition_.Wait());
}
}
void GrabQueueIndex() {
// Right after we are signaled to start, point index to the current index
// so we don't read any messages received before now. Otherwise we will
// get a significantly delayed read.
static constexpr Options<RawQueue> kOptions =
RawQueue::kFromEnd | RawQueue::kNonBlock;
const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
if (msg) {
queue_->FreeMessage(msg);
}
}
private:
// Runs Run given a WatcherThreadState as the argument. This is an adapter
// between pthreads and Run.
static void *StaticRun(void *arg) {
WatcherThreadState *watcher_thread_state =
reinterpret_cast<WatcherThreadState *>(arg);
watcher_thread_state->Run();
return nullptr;
}
// Runs the watcher callback on new messages.
void Run() {
::aos::SetCurrentThreadName(thread_state_->name() + ".watcher");
// Signal the main thread that we are now ready.
thread_state_->MaybeSetCurrentThreadRealtimePriority();
{
IPCRecursiveMutexLocker locker(&thread_started_mutex_);
if (locker.owner_died()) ::aos::Die("Owner died");
running_ = true;
thread_started_condition_.Broadcast();
}
// Wait for the global start before handling events.
thread_state_->WaitForStart();
// Bail immediately if we are supposed to stop.
if (!thread_state_->is_running()) {
::aos::UnsetCurrentThreadRealtimePriority();
return;
}
const void *msg = nullptr;
while (true) {
msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_,
chrono::seconds(1));
// We hit a timeout. Confirm that we should be running and retry. Note,
// is_running is threadsafe (it's an atomic underneath). Worst case, we
// check again in a second.
if (msg == nullptr) {
if (!thread_state_->is_running()) break;
continue;
}
{
// Grab the lock so that only one callback can be called at a time.
MutexLocker locker(&thread_state_->mutex_);
if (!thread_state_->is_running()) break;
watcher_(reinterpret_cast<const Message *>(msg));
// watcher_ may have exited the event loop.
if (!thread_state_->is_running()) break;
}
// Drop the reference.
queue_->FreeMessage(msg);
}
// And drop the last reference.
queue_->FreeMessage(msg);
// Now that everything is cleaned up, drop RT priority before destroying the
// thread.
::aos::UnsetCurrentThreadRealtimePriority();
}
pthread_t pthread_;
ShmEventLoop::ThreadState *thread_state_;
RawQueue *queue_;
int32_t index_;
bool running_ = false;
::std::function<void(const Message *message)> watcher_;
// Mutex and condition variable used to wait until the thread is started
// before going RT.
::aos::Mutex thread_started_mutex_;
::aos::Condition thread_started_condition_{&thread_started_mutex_};
};
// Adapter class to adapt a timerfd to a TimerHandler.
// The part of the API which is accessed by the TimerHandler interface needs to
// be threadsafe. This means Setup and Disable.
class TimerHandlerState : public TimerHandler {
public:
TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
: shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) {
shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
timerfd_.Read();
fn_();
});
}
~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
// SetTime is threadsafe already.
timerfd_.SetTime(base, repeat_offset);
}
void Disable() override {
// Disable is also threadsafe already.
timerfd_.Disable();
}
private:
ShmEventLoop *shm_event_loop_;
TimerFd timerfd_;
// Function to be run on the thread
::std::function<void()> fn_;
};
// Adapter class to the timerfd and PhasedLoop.
// The part of the API which is accessed by the PhasedLoopHandler interface
// needs to be threadsafe. This means set_interval_and_offset
class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
public:
PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: shm_event_loop_(shm_event_loop),
phased_loop_(interval, shm_event_loop_->monotonic_now(), offset),
fn_(::std::move(fn)) {
shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
MutexLocker locker(&shm_event_loop_->thread_state_.mutex_);
{
MutexLocker locker(&mutex_);
timerfd_.Read();
}
// Call the function. To avoid needing a recursive mutex, drop the lock
// before running the function.
fn_(cycles_elapsed_);
{
MutexLocker locker(&mutex_);
Reschedule();
}
});
}
~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
void set_interval_and_offset(
const monotonic_clock::duration interval,
const monotonic_clock::duration offset) override {
MutexLocker locker(&mutex_);
phased_loop_.set_interval_and_offset(interval, offset);
}
void Startup() {
MutexLocker locker(&mutex_);
phased_loop_.Reset(shm_event_loop_->monotonic_now());
Reschedule();
}
private:
// Reschedules the timer. Must be called with the mutex held.
void Reschedule() {
cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now());
timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero());
}
ShmEventLoop *shm_event_loop_;
// Mutex to protect access to the timerfd_ (not strictly necessary), and the
// phased_loop (necessary).
::aos::Mutex mutex_;
TimerFd timerfd_;
time::PhasedLoop phased_loop_;
int cycles_elapsed_ = 1;
// Function to be run
const ::std::function<void(int)> fn_;
};
} // namespace internal
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
const ::std::string &path, const QueueTypeInfo &type) {
return ::std::unique_ptr<RawFetcher>(new ShmFetcher(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const ::std::string &path, const QueueTypeInfo &type) {
Take(path);
return ::std::unique_ptr<RawSender>(new ShmSender(
RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
}
void ShmEventLoop::MakeRawWatcher(
const ::std::string &path, const QueueTypeInfo &type,
::std::function<void(const Message *message)> watcher) {
Take(path);
::std::unique_ptr<internal::WatcherThreadState> state(
new internal::WatcherThreadState(
&thread_state_, RawQueue::Fetch(path.c_str(), type.size, type.hash,
type.queue_length),
std::move(watcher)));
watchers_.push_back(::std::move(state));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
::std::unique_ptr<internal::TimerHandlerState> timer(
new internal::TimerHandlerState(this, ::std::move(callback)));
timers_.push_back(::std::move(timer));
return timers_.back().get();
}
PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset) {
::std::unique_ptr<internal::PhasedLoopHandler> phased_loop(
new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
offset));
phased_loops_.push_back(::std::move(phased_loop));
return phased_loops_.back().get();
}
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
on_run_.push_back(::std::move(on_run));
}
void ShmEventLoop::set_name(const char *name) { thread_state_.name_ = name; }
void ShmEventLoop::Run() {
// Start all the watcher threads.
for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
watcher->Start();
}
::aos::SetCurrentThreadName(thread_state_.name());
// Now, all the threads are up. Lock everything into memory and go RT.
if (thread_state_.priority_ != -1) {
::aos::InitRT();
}
thread_state_.MaybeSetCurrentThreadRealtimePriority();
set_is_running(true);
// Now that we are realtime (but before the OnRun handlers run), snap the
// queue index.
for (::std::unique_ptr<internal::WatcherThreadState> &watcher : watchers_) {
watcher->GrabQueueIndex();
}
// Now that we are RT, run all the OnRun handlers.
for (const auto &run : on_run_) {
run();
}
// Start up all the phased loops.
for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop :
phased_loops_) {
phased_loop->Startup();
}
// TODO(austin): We don't need a separate watcher thread if there are only
// watchers and fetchers. Could lazily create the epoll loop and pick a
// victim watcher to run in this thread.
// Trigger all the threads to start now.
thread_state_.Start();
// And start our main event loop which runs all the timers and handles Quit.
epoll_.Run();
// Once epoll exits, there is no useful nonrt work left to do.
set_is_running(false);
// Signal all the watcher threads to exit. After this point, no more
// callbacks will be handled.
thread_state_.Exit();
// Nothing time or synchronization critical needs to happen after this point.
// Drop RT priority.
::aos::UnsetCurrentThreadRealtimePriority();
// The watcher threads get cleaned up in the destructor.
}
void ShmEventLoop::ThreadState::Start() {
MutexLocker locker(&mutex_);
loop_running_ = true;
if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
loop_running_cond_.Broadcast();
}
void ShmEventLoop::ThreadState::WaitForStart() {
MutexLocker locker(&mutex_);
while (!(loop_running_ || loop_finished_)) {
Condition::WaitResult wait_result =
loop_running_cond_.WaitTimed(chrono::milliseconds(1000));
if (wait_result == Condition::WaitResult::kOwnerDied) {
::aos::Die("ShmEventLoop mutex lock problem.\n");
}
}
}
void ShmEventLoop::ThreadState::MaybeSetCurrentThreadRealtimePriority() {
if (priority_ != -1) {
::aos::SetCurrentThreadRealtimePriority(priority_);
}
}
void ShmEventLoop::Exit() { epoll_.Quit(); }
void ShmEventLoop::ThreadState::Exit() {
IPCRecursiveMutexLocker locker(&mutex_);
if (locker.owner_died()) ::aos::Die("Owner died");
loop_running_ = false;
loop_finished_ = true;
loop_running_cond_.Broadcast();
}
ShmEventLoop::~ShmEventLoop() {
if (is_running()) {
::aos::Die("ShmEventLoop destroyed while running\n");
}
}
void ShmEventLoop::Take(const ::std::string &path) {
if (is_running()) {
::aos::Die("Cannot add new objects while running.\n");
}
const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
if (prior != taken_.end()) {
::aos::Die("%s is already being used.", path.c_str());
} else {
taken_.emplace_back(path);
}
}
} // namespace aos