blob: dad62cd9eda19d9a5bf541fee0f1f48881d52df5 [file] [log] [blame]
#include "aos/events/shm_event_loop.h"
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iterator>
#include <stdexcept>
#include "aos/events/aos_logging.h"
#include "aos/events/epoll.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
#include "aos/ipc_lib/lockless_queue.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/stl_mutex/stl_mutex.h"
#include "aos/util/file.h"
#include "aos/util/phased_loop.h"
#include "glog/logging.h"
namespace {
// Extracts the basename (the text after the final '/') from a
// null-terminated path.  Paths containing no '/' are returned unchanged.
const char *Filename(const char *path) {
  const std::string_view view(path);
  const auto last_slash = view.rfind('/');
  if (last_slash == std::string_view::npos) {
    return path;
  }
  return path + last_slash + 1;
}
}  // namespace
// Root directory under which the per-channel queue backing files live.
DEFINE_string(shm_base, "/dev/shm/aos",
              "Directory to place queue backing mmaped files in.");
// Unix permission bits applied to the backing files and their directories.
DEFINE_uint32(permissions, 0770,
              "Permissions to make shared memory files and folders.");
// Defaults to the basename of the binary being run.
DEFINE_string(application_name, Filename(program_invocation_name),
              "The application name");
namespace aos {
// Points the queue backing-file directory at <base>/dev/shm/aos.
void SetShmBase(const std::string_view base) {
  std::string new_base(base);
  new_base += "/dev/shm/aos";
  FLAGS_shm_base = std::move(new_base);
}
// Returns the directory holding the backing file for this channel.  Channel
// names are required and must be absolute (start with '/').
std::string ShmFolder(const Channel *channel) {
  CHECK(channel->has_name());
  CHECK_EQ(channel->name()->string_view()[0], '/');
  std::string folder = FLAGS_shm_base;
  folder += channel->name()->str();
  folder += "/";
  return folder;
}
std::string ShmPath(const Channel *channel) {
CHECK(channel->has_type());
return ShmFolder(channel) + channel->type()->str() + ".v1";
}
// Maps (creating it first if needed) the file backing a channel's lockless
// queue into this process.  The mapping is MAP_SHARED, so every process
// opening the same channel sees the same memory.
class MMapedQueue {
 public:
  MMapedQueue(const Channel *channel,
              const std::chrono::seconds channel_storage_duration) {
    std::string path = ShmPath(channel);

    config_.num_watchers = channel->num_watchers();
    config_.num_senders = channel->num_senders();
    // Size the queue to buffer channel_storage_duration worth of messages at
    // the channel's nominal frequency.
    config_.queue_size =
        channel_storage_duration.count() * channel->frequency();
    config_.message_data_size = channel->max_size();

    size_ = ipc_lib::LocklessQueueMemorySize(config_);

    util::MkdirP(path, FLAGS_permissions);

    // There are 2 cases.  Either the file already exists, or it does not
    // already exist and we need to create it.  Start by trying to create it.
    // If that fails, the file has already been created and we can open it
    // normally.  Once the file has been created it will never be deleted.
    //
    // Note: O_CLOEXEC belongs in the flags argument.  Passing it as the mode
    // (third) argument would both fail to mark the fd close-on-exec and
    // request bogus permission bits.
    fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC,
               FLAGS_permissions);
    if (fd_ == -1 && errno == EEXIST) {
      VLOG(1) << path << " already created.";
      // File already exists.
      fd_ = open(path.c_str(), O_RDWR | O_CLOEXEC);
      PCHECK(fd_ != -1) << ": Failed to open " << path;
      // The creator ftruncates the file to its full size after O_CREAT, so a
      // zero-size file means creation is still in progress; poll until it is
      // the expected size.
      while (true) {
        struct stat st;
        PCHECK(fstat(fd_, &st) == 0);
        if (st.st_size != 0) {
          CHECK_EQ(static_cast<size_t>(st.st_size), size_)
              << ": Size of " << path
              << " doesn't match expected size of backing queue file. Did the "
                 "queue definition change?";
          break;
        } else {
          // The creating process didn't get around to it yet.  Give it a bit.
          std::this_thread::sleep_for(std::chrono::milliseconds(10));
          VLOG(1) << path << " is zero size, waiting";
        }
      }
    } else {
      VLOG(1) << "Created " << path;
      PCHECK(fd_ != -1) << ": Failed to open " << path;
      PCHECK(ftruncate(fd_, size_) == 0);
    }

    data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
    PCHECK(data_ != MAP_FAILED);

    ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
  }

  ~MMapedQueue() {
    PCHECK(munmap(data_, size_) == 0);
    PCHECK(close(fd_) == 0);
  }

  // The mapped lockless queue memory.
  ipc_lib::LocklessQueueMemory *memory() const {
    return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
  }

  // Configuration (watcher/sender counts, queue depth, message size) this
  // mapping was sized for.
  const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }

  // The raw mapped bytes.
  absl::Span<char> GetSharedMemory() const {
    return absl::Span<char>(static_cast<char *>(data_), size_);
  }

 private:
  ipc_lib::LocklessQueueConfiguration config_;

  int fd_;

  size_t size_;
  void *data_;
};
namespace {
// Looks up which node we are in a multi-node config; single-node configs have
// no nodes, so return nullptr there.
const Node *MaybeMyNode(const Configuration *configuration) {
  return configuration->has_nodes() ? configuration::GetMyNode(configuration)
                                    : nullptr;
}

namespace chrono = ::std::chrono;
}  // namespace
// Builds a shared-memory backed event loop for this process.  The name
// defaults to --application_name (the binary's basename).
ShmEventLoop::ShmEventLoop(const Configuration *configuration)
    : EventLoop(configuration),
      name_(FLAGS_application_name),
      node_(MaybeMyNode(configuration)) {
  // In a multi-node configuration we must be able to identify our own node.
  if (configuration->has_nodes()) {
    CHECK(node_ != nullptr) << ": Couldn't find node in config.";
  }
}
namespace internal {
// Reads messages out of a channel's lockless queue and exposes the most
// recently read one through a Context.  Shared implementation for both
// fetchers and watchers.
class SimpleShmFetcher {
 public:
  explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
      : channel_(channel),
        lockless_queue_memory_(
            channel,
            chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                event_loop->configuration()->channel_storage_duration()))),
        lockless_queue_(lockless_queue_memory_.memory(),
                        lockless_queue_memory_.config()),
        // Over-allocate so the data pointer can be aligned to
        // kChannelDataAlignment (see data_storage_start()).
        data_storage_(static_cast<char *>(malloc(channel->max_size() +
                                                 kChannelDataAlignment - 1)),
                      &free) {
    context_.data = nullptr;
    // Point the queue index at the next index to read starting now.  This
    // makes it such that FetchNext will read the next message sent after
    // the fetcher is created.
    PointAtNextQueueIndex();
  }

  ~SimpleShmFetcher() {}

  // Points the next message to fetch at the queue index which will be
  // populated next.
  void PointAtNextQueueIndex() {
    actual_queue_index_ = lockless_queue_.LatestQueueIndex();
    if (!actual_queue_index_.valid()) {
      // Nothing in the queue.  The next element will show up at the 0th
      // index in the queue.
      actual_queue_index_ =
          ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
    } else {
      actual_queue_index_ = actual_queue_index_.Increment();
    }
  }

  // Reads the next unread message (at actual_queue_index_) into context_.
  // Returns true and advances the index on success.
  bool FetchNext() {
    // TODO(austin): Get behind and make sure it dies both here and with
    // Fetch.
    const ipc_lib::LocklessQueue::ReadResult read_result =
        ReadIntoContext(actual_queue_index_);
    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
      actual_queue_index_ = actual_queue_index_.Increment();
    }

    // Make sure the data wasn't modified while we were reading it.  This
    // can only happen if you are reading the last message *while* it is
    // being written to, which means you are pretty far behind.
    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
        << ": Got behind while reading and the last message was modified "
           "out from under us while we were reading it. Don't get so far "
           "behind. "
        << configuration::CleanedChannelToString(channel_);

    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
        << ": The next message is no longer available. "
        << configuration::CleanedChannelToString(channel_);
    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
  }

  // Reads the latest message in the queue into context_, skipping anything
  // between it and the last message read.  Returns true if there was
  // something new.
  bool Fetch() {
    const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
    // actual_queue_index_ is only meaningful if it was set by Fetch or
    // FetchNext.  This happens when valid_data_ has been set.  So, only
    // skip checking if valid_data_ is true.
    //
    // Also, if the latest queue index is invalid, we are empty.  So there
    // is nothing to fetch.
    if ((context_.data != nullptr &&
         queue_index == actual_queue_index_.DecrementBy(1u)) ||
        !queue_index.valid()) {
      return false;
    }

    const ipc_lib::LocklessQueue::ReadResult read_result =
        ReadIntoContext(queue_index);
    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
      actual_queue_index_ = queue_index.Increment();
    }

    // Make sure the data wasn't modified while we were reading it.  This
    // can only happen if you are reading the last message *while* it is
    // being written to, which means you are pretty far behind.
    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
        << ": Got behind while reading and the last message was modified "
           "out from under us while we were reading it. Don't get so far "
           "behind."
        << configuration::CleanedChannelToString(channel_);

    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
        << ": Queue index went backwards. This should never happen. "
        << configuration::CleanedChannelToString(channel_);

    // We fell behind between when we read the index and read the value.
    // This isn't worth recovering from since this means we went to sleep
    // for a long time in the middle of this function.
    CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
        << ": The next message is no longer available. "
        << configuration::CleanedChannelToString(channel_);

    return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
  }

  Context context() const { return context_; }

  bool RegisterWakeup(int priority) {
    return lockless_queue_.RegisterWakeup(priority);
  }

  void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }

  absl::Span<char> GetSharedMemory() const {
    return lockless_queue_memory_.GetSharedMemory();
  }

 private:
  // Reads the message at queue_index into data storage and, on GOOD, fills
  // out context_.  Messages without remote metadata carry sentinel values
  // (0xffffffff / min_time); those fields are patched to mirror the local
  // values so consumers always see them populated.  Does not advance
  // actual_queue_index_ -- callers decide how to move it.
  ipc_lib::LocklessQueue::ReadResult ReadIntoContext(
      ipc_lib::QueueIndex queue_index) {
    const ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
        queue_index.index(), &context_.monotonic_event_time,
        &context_.realtime_event_time, &context_.monotonic_remote_time,
        &context_.realtime_remote_time, &context_.remote_queue_index,
        &context_.size, data_storage_start());
    if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
      context_.queue_index = queue_index.index();
      if (context_.remote_queue_index == 0xffffffffu) {
        context_.remote_queue_index = context_.queue_index;
      }
      if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
        context_.monotonic_remote_time = context_.monotonic_event_time;
      }
      if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
        context_.realtime_remote_time = context_.realtime_event_time;
      }
      // The message payload is packed at the end of the buffer.
      context_.data = data_storage_start() +
                      lockless_queue_.message_data_size() - context_.size;
    }
    return read_result;
  }

  // Start of the kChannelDataAlignment-aligned region inside data_storage_.
  char *data_storage_start() {
    return RoundChannelData(data_storage_.get(), channel_->max_size());
  }

  const Channel *const channel_;
  MMapedQueue lockless_queue_memory_;
  ipc_lib::LocklessQueue lockless_queue_;

  ipc_lib::QueueIndex actual_queue_index_ =
      ipc_lib::LocklessQueue::empty_queue_index();

  std::unique_ptr<char, decltype(&free)> data_storage_;

  Context context_;
};
// RawFetcher implementation backed by a SimpleShmFetcher.
class ShmFetcher : public RawFetcher {
 public:
  explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
      : RawFetcher(event_loop, channel),
        simple_shm_fetcher_(event_loop, channel) {}

  // Drop the context's data pointer; the backing storage is owned by
  // simple_shm_fetcher_ and goes away with it.
  ~ShmFetcher() { context_.data = nullptr; }

  std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
    if (!simple_shm_fetcher_.FetchNext()) {
      return std::make_pair(false, monotonic_clock::min_time);
    }
    context_ = simple_shm_fetcher_.context();
    return std::make_pair(true, monotonic_clock::now());
  }

  std::pair<bool, monotonic_clock::time_point> DoFetch() override {
    if (!simple_shm_fetcher_.Fetch()) {
      return std::make_pair(false, monotonic_clock::min_time);
    }
    context_ = simple_shm_fetcher_.context();
    return std::make_pair(true, monotonic_clock::now());
  }

 private:
  SimpleShmFetcher simple_shm_fetcher_;
};
// RawSender implementation which publishes into a channel's shared-memory
// lockless queue and wakes up registered watchers after each send.
class ShmSender : public RawSender {
 public:
  explicit ShmSender(EventLoop *event_loop, const Channel *channel)
      : RawSender(event_loop, channel),
        // Map (or create) the queue backing file, sized to hold
        // channel_storage_duration (rounded up to whole seconds) of messages.
        lockless_queue_memory_(
            channel,
            chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                event_loop->configuration()->channel_storage_duration()))),
        lockless_queue_(lockless_queue_memory_.memory(),
                        lockless_queue_memory_.config()),
        lockless_queue_sender_(lockless_queue_.MakeSender()) {}

  ~ShmSender() override {}

  // Scratch buffer inside the queue for building a message in place (the
  // zero-copy send path used by the length-only DoSend below).
  void *data() override { return lockless_queue_sender_.Data(); }
  size_t size() override { return lockless_queue_sender_.size(); }

  // Sends the message which was built in place in data().  The remote
  // time/index arguments are forwarded into the queue metadata.
  bool DoSend(size_t length,
              aos::monotonic_clock::time_point monotonic_remote_time,
              aos::realtime_clock::time_point realtime_remote_time,
              uint32_t remote_queue_index) override {
    lockless_queue_sender_.Send(
        length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
        &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
    // Poke anyone blocked on this channel, at the loop's priority.
    lockless_queue_.Wakeup(event_loop()->priority());
    return true;
  }

  // Copying overload: sends length bytes starting at msg.
  bool DoSend(const void *msg, size_t length,
              aos::monotonic_clock::time_point monotonic_remote_time,
              aos::realtime_clock::time_point realtime_remote_time,
              uint32_t remote_queue_index) override {
    lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length,
                                monotonic_remote_time, realtime_remote_time,
                                remote_queue_index, &monotonic_sent_time_,
                                &realtime_sent_time_, &sent_queue_index_);
    lockless_queue_.Wakeup(event_loop()->priority());
    // TODO(austin): Return an error if we send too fast.
    return true;
  }

  absl::Span<char> GetSharedMemory() const {
    return lockless_queue_memory_.GetSharedMemory();
  }

 private:
  MMapedQueue lockless_queue_memory_;
  ipc_lib::LocklessQueue lockless_queue_;
  ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
};
// Class to manage the state for a Watcher.
class WatcherState : public aos::WatcherState {
 public:
  WatcherState(
      ShmEventLoop *event_loop, const Channel *channel,
      std::function<void(const Context &context, const void *message)> fn)
      : aos::WatcherState(event_loop, channel, std::move(fn)),
        event_loop_(event_loop),
        event_(this),
        simple_shm_fetcher_(event_loop, channel) {}

  ~WatcherState() override { event_loop_->RemoveEvent(&event_); }

  void Startup(EventLoop *event_loop) override {
    // Watchers only see messages sent after startup; skip anything earlier.
    simple_shm_fetcher_.PointAtNextQueueIndex();
    CHECK(RegisterWakeup(event_loop->priority()));
  }

  // Returns true if there is new data available.  When a new message is
  // found, it is queued onto the event loop's event heap at its send time.
  bool CheckForNewData() {
    if (has_new_data_) {
      return true;
    }
    has_new_data_ = simple_shm_fetcher_.FetchNext();
    if (has_new_data_) {
      event_.set_event_time(
          simple_shm_fetcher_.context().monotonic_event_time);
      event_loop_->AddEvent(&event_);
    }
    return has_new_data_;
  }

  // Consumes the data by calling the callback, then immediately looks for the
  // next message so it gets queued as an event.
  void HandleEvent() {
    CHECK(has_new_data_);
    DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
    has_new_data_ = false;
    CheckForNewData();
  }

  // Registers us to receive a signal on event reception.
  bool RegisterWakeup(int priority) {
    return simple_shm_fetcher_.RegisterWakeup(priority);
  }

  void UnregisterWakeup() { simple_shm_fetcher_.UnregisterWakeup(); }

  absl::Span<char> GetSharedMemory() const {
    return simple_shm_fetcher_.GetSharedMemory();
  }

 private:
  bool has_new_data_ = false;

  ShmEventLoop *event_loop_;
  EventHandler<WatcherState> event_;
  SimpleShmFetcher simple_shm_fetcher_;
};
// Adapter class to adapt a timerfd to a TimerHandler.
class TimerHandlerState final : public TimerHandler {
 public:
  TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
      : TimerHandler(shm_event_loop, std::move(fn)),
        shm_event_loop_(shm_event_loop),
        event_(this) {
    shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
      // The timer may fire spurriously. HandleEvent on the event loop will
      // call the callback if it is needed. It may also have called it when
      // processing some other event, and the kernel decided to deliver this
      // wakeup anyways.
      timerfd_.Read();
      shm_event_loop_->HandleEvent();
    });
  }

  ~TimerHandlerState() {
    Disable();
    shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
  }

  // Runs the callback (via Call) and schedules the next cycle for repeating
  // timers.  Invoked by ShmEventLoop::HandleEvent once this event's time has
  // arrived.
  void HandleEvent() {
    // The event must have been popped off the heap before being dispatched.
    CHECK(!event_.valid());

    const auto monotonic_now = Call(monotonic_clock::now, base_);
    if (event_.valid()) {
      // If someone called Setup inside Call, rescheduling is already taken care
      // of.  Bail.
      return;
    }

    if (repeat_offset_ == chrono::seconds(0)) {
      // One-shot timer; nothing more scheduled.
      timerfd_.Disable();
    } else {
      // Compute how many cycles have elapsed and schedule the next iteration
      // for the next iteration in the future.  The subtraction of 1ns makes
      // this a ceiling-style division, clamped at 0 so base_ never moves
      // backwards.
      const int elapsed_cycles =
          std::max<int>(0, (monotonic_now - base_ + repeat_offset_ -
                            std::chrono::nanoseconds(1)) /
                               repeat_offset_);
      base_ += repeat_offset_ * elapsed_cycles;

      // Update the heap and schedule the timerfd wakeup.
      event_.set_event_time(base_);
      shm_event_loop_->AddEvent(&event_);
      timerfd_.SetTime(base_, chrono::seconds(0));
    }
  }

  void Setup(monotonic_clock::time_point base,
             monotonic_clock::duration repeat_offset) override {
    // Drop any pending heap entry first so this timer only ever has one.
    if (event_.valid()) {
      shm_event_loop_->RemoveEvent(&event_);
    }

    timerfd_.SetTime(base, repeat_offset);
    base_ = base;
    repeat_offset_ = repeat_offset;
    event_.set_event_time(base_);
    shm_event_loop_->AddEvent(&event_);
  }

  void Disable() override {
    shm_event_loop_->RemoveEvent(&event_);
    timerfd_.Disable();
  }

 private:
  ShmEventLoop *shm_event_loop_;

  EventHandler<TimerHandlerState> event_;

  TimerFd timerfd_;

  // Time of the first (or most recently dispatched) cycle, and the period
  // between cycles (0 => one-shot).
  monotonic_clock::time_point base_;
  monotonic_clock::duration repeat_offset_;
};
// Adapter class to the timerfd and PhasedLoop.
class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
 public:
  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
                    const monotonic_clock::duration interval,
                    const monotonic_clock::duration offset)
      : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
        shm_event_loop_(shm_event_loop),
        event_(this) {
    // Wake the event loop whenever the timerfd expires; dispatching happens
    // through the event heap in HandleEvent.
    shm_event_loop_->epoll_.OnReadable(
        timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
  }

  // Invoked by ShmEventLoop::HandleEvent when this loop's scheduled time has
  // arrived; runs the callback and reschedules via Schedule().
  void HandleEvent() {
    // The return value for read is the number of cycles that have elapsed.
    // Because we check to see when this event *should* have happened, there are
    // cases where Read() will return 0, when 1 cycle has actually happened.
    // This occurs when the timer interrupt hasn't triggered yet.  Therefore,
    // ignore it.  Call handles rescheduling and calculating elapsed cycles
    // without any extra help.
    timerfd_.Read();
    event_.Invalidate();

    Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
      Schedule(sleep_time);
    });
  }

  ~PhasedLoopHandler() override {
    shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
    shm_event_loop_->RemoveEvent(&event_);
  }

 private:
  // Reschedules the timer.  Drops any stale heap entry first so this handler
  // only ever has one pending event.
  void Schedule(monotonic_clock::time_point sleep_time) override {
    if (event_.valid()) {
      shm_event_loop_->RemoveEvent(&event_);
    }

    timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
    event_.set_event_time(sleep_time);
    shm_event_loop_->AddEvent(&event_);
  }

  ShmEventLoop *shm_event_loop_;

  EventHandler<PhasedLoopHandler> event_;

  TimerFd timerfd_;
};
} // namespace internal
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
    const Channel *channel) {
  // Fail loudly rather than hand back a fetcher which can never see data on
  // this node.
  const bool readable =
      configuration::ChannelIsReadableOnNode(channel, node());
  if (!readable) {
    LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
               << "\", \"type\": \"" << channel->type()->string_view()
               << "\" } is not able to be fetched on this node. Check your "
                  "configuration.";
  }

  return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
}
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const Channel *channel) {
TakeSender(channel);
return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
}
void ShmEventLoop::MakeRawWatcher(
    const Channel *channel,
    std::function<void(const Context &context, const void *message)> watcher) {
  // Update the event loop's watcher bookkeeping for this channel first.
  TakeWatcher(channel);

  ::std::unique_ptr<WatcherState> state(
      new internal::WatcherState(this, channel, std::move(watcher)));
  NewWatcher(::std::move(state));
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
  // Timers are driven by a timerfd registered with epoll.
  auto timer = ::std::unique_ptr<TimerHandler>(
      new internal::TimerHandlerState(this, ::std::move(callback)));
  return NewTimer(::std::move(timer));
}
PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
    ::std::function<void(int)> callback,
    const monotonic_clock::duration interval,
    const monotonic_clock::duration offset) {
  auto handler = ::std::unique_ptr<PhasedLoopHandler>(
      new internal::PhasedLoopHandler(this, ::std::move(callback), interval,
                                      offset));
  return NewPhasedLoop(::std::move(handler));
}
// Queues a handler to run once Run() has gone realtime (see Run()).
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
  on_run_.emplace_back(::std::move(on_run));
}
void ShmEventLoop::HandleEvent() {
  // Give every watcher a chance to enqueue any message which arrived since we
  // last looked.
  for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
    internal::WatcherState *watcher =
        reinterpret_cast<internal::WatcherState *>(base_watcher.get());
    watcher->CheckForNewData();
  }

  // Dispatch, oldest first, every event whose scheduled time has passed.
  while (EventCount() != 0 &&
         PeekEvent()->event_time() <= monotonic_clock::now()) {
    EventLoopEvent *event = PopEvent();
    event->HandleEvent();
  }
}
// RAII class to mask signals.  Blocks the given signals on construction and
// restores the previous mask on destruction.
class ScopedSignalMask {
 public:
  ScopedSignalMask(std::initializer_list<int> signals) {
    sigset_t to_block;
    PCHECK(sigemptyset(&to_block) == 0);
    for (const int signal : signals) {
      PCHECK(sigaddset(&to_block, signal) == 0);
    }
    // Save the previous mask so the destructor can restore it exactly.
    PCHECK(sigprocmask(SIG_BLOCK, &to_block, &old_) == 0);
  }

  ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }

 private:
  sigset_t old_;
};
// Class to manage the static state associated with killing multiple event
// loops.
class SignalHandler {
 public:
  // Gets the singleton.
  static SignalHandler *global() {
    static SignalHandler loop;
    return &loop;
  }

  // Handles the signal with the singleton.  Installed for SIGINT/SIGHUP/
  // SIGTERM while any event loop is registered.
  static void HandleSignal(int) { global()->DoHandleSignal(); }

  // Registers an event loop to receive Exit() calls.
  void Register(ShmEventLoop *event_loop) {
    // Block signals while we have the mutex so we never race with the signal
    // handler.
    ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
    std::unique_lock<stl_mutex> locker(mutex_);
    if (event_loops_.size() == 0) {
      // The first caller registers the signal handler.
      struct sigaction new_action;
      sigemptyset(&new_action.sa_mask);
      // This makes it so that 2 control c's to a stuck process will kill it by
      // restoring the original signal handler.
      new_action.sa_flags = SA_RESETHAND;
      new_action.sa_handler = &HandleSignal;

      PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
      PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
      PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
    }

    event_loops_.push_back(event_loop);
  }

  // Unregisters an event loop to receive Exit() calls.
  void Unregister(ShmEventLoop *event_loop) {
    // Block signals while we have the mutex so we never race with the signal
    // handler.
    ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
    std::unique_lock<stl_mutex> locker(mutex_);

    event_loops_.erase(
        std::find(event_loops_.begin(), event_loops_.end(), event_loop));

    if (event_loops_.size() == 0u) {
      // The last caller restores the original signal handlers.
      PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
      PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
      PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
    }
  }

 private:
  // Runs in signal-handler context; tells every registered loop to Exit().
  void DoHandleSignal() {
    // We block signals while grabbing the lock, so there should never be a
    // race.  Confirm that this is true using trylock.
    CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
                                "modifing the event loop list.";
    for (ShmEventLoop *event_loop : event_loops_) {
      event_loop->Exit();
    }
    mutex_.unlock();
  }

  // Mutex to protect all state.
  stl_mutex mutex_;
  std::vector<ShmEventLoop *> event_loops_;
  // Saved handlers to restore once the last loop unregisters.
  struct sigaction old_action_int_;
  struct sigaction old_action_hup_;
  struct sigaction old_action_term_;
};
void ShmEventLoop::Run() {
  // Receive Exit() on SIGINT/SIGHUP/SIGTERM for the duration of Run().
  SignalHandler::global()->Register(this);

  // Senders wake watchers via kWakeupSignal; route it through a signalfd so
  // epoll can dispatch it like any other readable fd.
  std::unique_ptr<ipc_lib::SignalFd> signalfd;

  if (watchers_.size() > 0) {
    signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));

    epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
      signalfd_siginfo result = signalfd_ptr->Read();
      CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);

      // TODO(austin): We should really be checking *everything*, not just
      // watchers, and calling the oldest thing first.  That will improve
      // determinism a lot.

      HandleEvent();
    });
  }

  MaybeScheduleTimingReports();

  // Pre-size the event heap so AddEvent never allocates while realtime.
  ReserveEvents();

  {
    AosLogToFbs aos_logger;
    if (!skip_logger_) {
      aos_logger.Initialize(MakeSender<logging::LogMessageFbs>("/aos"));
    }

    // Truncated because thread names are limited (16 bytes on Linux).
    aos::SetCurrentThreadName(name_.substr(0, 16));

    // Now, all the callbacks are setup.  Lock everything into memory and go
    // RT.  Priority 0 means "don't go realtime".
    if (priority_ != 0) {
      ::aos::InitRT();

      LOG(INFO) << "Setting priority to " << priority_;
      ::aos::SetCurrentThreadRealtimePriority(priority_);
    }

    set_is_running(true);

    // Now that we are realtime (but before the OnRun handlers run), snap the
    // queue index.
    for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
      watcher->Startup(this);
    }

    // Now that we are RT, run all the OnRun handlers.
    for (const auto &run : on_run_) {
      run();
    }

    // And start our main event loop which runs all the timers and handles
    // Quit.
    epoll_.Run();

    // Once epoll exits, there is no useful nonrt work left to do.
    set_is_running(false);

    // Nothing time or synchronization critical needs to happen after this
    // point.  Drop RT priority.
    ::aos::UnsetCurrentThreadRealtimePriority();
  }

  // Stop receiving wakeup signals for our watchers.
  for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
    internal::WatcherState *watcher =
        reinterpret_cast<internal::WatcherState *>(base_watcher.get());
    watcher->UnregisterWakeup();
  }

  if (watchers_.size() > 0) {
    epoll_.DeleteFd(signalfd->fd());
    signalfd.reset();
  }

  SignalHandler::global()->Unregister(this);

  // Trigger any remaining senders or fetchers to be cleared before destroying
  // the event loop so the book keeping matches.  Do this in the thread that
  // created the timing reporter.
  timing_report_sender_.reset();
}
// Stops epoll_.Run(); also invoked by SignalHandler when SIGINT/SIGHUP/
// SIGTERM arrive.
void ShmEventLoop::Exit() { epoll_.Quit(); }
ShmEventLoop::~ShmEventLoop() {
  // Force everything with a registered fd with epoll to be destroyed now.
  // The handler destructors deregister their fds from epoll_, so they must
  // run before epoll_ itself is torn down.
  timers_.clear();
  phased_loops_.clear();
  watchers_.clear();

  CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
}
// Sets the RT priority which Run() will apply before going realtime; must be
// called before Run().
void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
  if (is_running()) {
    LOG(FATAL) << "Cannot set realtime priority while running.";
  }
  priority_ = priority;
}
// Renames the loop and refreshes the timing report to carry the new name.
void ShmEventLoop::set_name(const std::string_view name) {
  name_.assign(name.data(), name.size());
  UpdateTimingReport();
}
// Exposes the shared-memory region backing the given channel's watcher.
absl::Span<char> ShmEventLoop::GetWatcherSharedMemory(const Channel *channel) {
  auto *const watcher =
      static_cast<internal::WatcherState *>(GetWatcherState(channel));
  return watcher->GetSharedMemory();
}
// Exposes the shared-memory region backing the given sender's queue.
absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
    const aos::RawSender *sender) const {
  const auto *shm_sender = static_cast<const internal::ShmSender *>(sender);
  return shm_sender->GetSharedMemory();
}
// Returns the calling thread's kernel thread id via the raw gettid syscall.
pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
} // namespace aos