| #include "glog/logging.h" |
| |
| #include "aos/events/shm_event_loop.h" |
| |
| #include <sys/mman.h> |
| #include <sys/stat.h> |
| #include <sys/timerfd.h> |
| #include <sys/types.h> |
| #include <unistd.h> |
| #include <algorithm> |
| #include <atomic> |
| #include <chrono> |
| #include <stdexcept> |
| |
| #include "aos/events/epoll.h" |
| #include "aos/ipc_lib/lockless_queue.h" |
| #include "aos/realtime.h" |
| #include "aos/util/phased_loop.h" |
| |
| DEFINE_string(shm_base, "/dev/shm/aos", |
| "Directory to place queue backing mmaped files in."); |
| DEFINE_uint32(permissions, 0770, |
| "Permissions to make shared memory files and folders."); |
| |
| namespace aos { |
| |
| std::string ShmFolder(const Channel *channel) { |
| CHECK(channel->has_name()); |
| CHECK_EQ(channel->name()->string_view()[0], '/'); |
| return FLAGS_shm_base + channel->name()->str() + "/"; |
| } |
| std::string ShmPath(const Channel *channel) { |
| CHECK(channel->has_type()); |
| return ShmFolder(channel) + channel->type()->str() + ".v0"; |
| } |
| |
| class MMapedQueue { |
| public: |
| MMapedQueue(const Channel *channel) { |
| std::string path = ShmPath(channel); |
| |
| // TODO(austin): Pull these out into the config if there is a need. |
| config_.num_watchers = 10; |
| config_.num_senders = 10; |
| config_.queue_size = 2 * channel->frequency(); |
| config_.message_data_size = channel->max_size(); |
| |
| size_ = ipc_lib::LocklessQueueMemorySize(config_); |
| |
| MkdirP(path); |
| |
| // There are 2 cases. Either the file already exists, or it does not |
| // already exist and we need to create it. Start by trying to create it. If |
| // that fails, the file has already been created and we can open it |
| // normally.. Once the file has been created it wil never be deleted. |
| fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL, |
| O_CLOEXEC | FLAGS_permissions); |
| if (fd_ == -1 && errno == EEXIST) { |
| VLOG(1) << path << " already created."; |
| // File already exists. |
| fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC); |
| PCHECK(fd_ != -1) << ": Failed to open " << path; |
| while (true) { |
| struct stat st; |
| PCHECK(fstat(fd_, &st) == 0); |
| if (st.st_size != 0) { |
| CHECK_EQ(static_cast<size_t>(st.st_size), size_) |
| << ": Size of " << path |
| << " doesn't match expected size of backing queue file. Did the " |
| "queue definition change?"; |
| break; |
| } else { |
| // The creating process didn't get around to it yet. Give it a bit. |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| VLOG(1) << path << " is zero size, waiting"; |
| } |
| } |
| } else { |
| VLOG(1) << "Created " << path; |
| PCHECK(fd_ != -1) << ": Failed to open " << path; |
| PCHECK(ftruncate(fd_, size_) == 0); |
| } |
| |
| data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); |
| PCHECK(data_ != MAP_FAILED); |
| |
| ipc_lib::InitializeLocklessQueueMemory(memory(), config_); |
| } |
| |
| ~MMapedQueue() { |
| PCHECK(munmap(data_, size_) == 0); |
| PCHECK(close(fd_) == 0); |
| } |
| |
| ipc_lib::LocklessQueueMemory *memory() const { |
| return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_); |
| } |
| |
| const ipc_lib::LocklessQueueConfiguration &config() const { |
| return config_; |
| } |
| |
| private: |
| void MkdirP(absl::string_view path) { |
| struct stat st; |
| auto last_slash_pos = path.find_last_of("/"); |
| |
| std::string folder(last_slash_pos == absl::string_view::npos |
| ? absl::string_view("") |
| : path.substr(0, last_slash_pos)); |
| if (stat(folder.c_str(), &st) == -1) { |
| PCHECK(errno == ENOENT); |
| CHECK_NE(folder, "") << ": Base path doesn't exist"; |
| MkdirP(folder); |
| VLOG(1) << "Creating " << folder; |
| PCHECK(mkdir(folder.c_str(), FLAGS_permissions) == 0); |
| } |
| } |
| |
| ipc_lib::LocklessQueueConfiguration config_; |
| |
| int fd_; |
| |
| size_t size_; |
| void *data_; |
| }; |
| |
| // Returns the portion of the path after the last /. |
| absl::string_view Filename(absl::string_view path) { |
| auto last_slash_pos = path.find_last_of("/"); |
| |
| return last_slash_pos == absl::string_view::npos |
| ? path |
| : path.substr(last_slash_pos + 1, path.size()); |
| } |
| |
| ShmEventLoop::ShmEventLoop(const Configuration *configuration) |
| : EventLoop(configuration), name_(Filename(program_invocation_name)) {} |
| |
| namespace { |
| |
| namespace chrono = ::std::chrono; |
| |
| class ShmFetcher : public RawFetcher { |
| public: |
| explicit ShmFetcher(const Channel *channel) |
| : lockless_queue_memory_(channel), |
| lockless_queue_(lockless_queue_memory_.memory(), |
| lockless_queue_memory_.config()), |
| data_storage_(static_cast<AlignedChar *>(aligned_alloc( |
| alignof(AlignedChar), channel->max_size())), |
| &free) { |
| context_.data = nullptr; |
| // Point the queue index at the next index to read starting now. This |
| // makes it such that FetchNext will read the next message sent after |
| // the fetcher is created. |
| PointAtNextQueueIndex(); |
| } |
| |
| ~ShmFetcher() { data_ = nullptr; } |
| |
| // Points the next message to fetch at the queue index which will be |
| // populated next. |
| void PointAtNextQueueIndex() { |
| actual_queue_index_ = lockless_queue_.LatestQueueIndex(); |
| if (!actual_queue_index_.valid()) { |
| // Nothing in the queue. The next element will show up at the 0th |
| // index in the queue. |
| actual_queue_index_ = |
| ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size()); |
| } else { |
| actual_queue_index_ = actual_queue_index_.Increment(); |
| } |
| } |
| |
| bool FetchNext() override { |
| // TODO(austin): Write a test which starts with nothing in the queue, |
| // and then calls FetchNext() after something is sent. |
| // TODO(austin): Get behind and make sure it dies both here and with |
| // Fetch. |
| ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read( |
| actual_queue_index_.index(), &context_.monotonic_sent_time, |
| &context_.realtime_sent_time, &context_.size, |
| reinterpret_cast<char *>(data_storage_.get())); |
| if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) { |
| context_.queue_index = actual_queue_index_.index(); |
| data_ = reinterpret_cast<char *>(data_storage_.get()) + |
| lockless_queue_.message_data_size() - context_.size; |
| context_.data = data_; |
| actual_queue_index_ = actual_queue_index_.Increment(); |
| } |
| |
| // Make sure the data wasn't modified while we were reading it. This |
| // can only happen if you are reading the last message *while* it is |
| // being written to, which means you are pretty far behind. |
| CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE) |
| << ": Got behind while reading and the last message was modified " |
| "out " |
| "from under us while we were reading it. Don't get so far " |
| "behind."; |
| |
| CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD) |
| << ": The next message is no longer available."; |
| return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD; |
| } |
| |
| bool Fetch() override { |
| const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex(); |
| // actual_queue_index_ is only meaningful if it was set by Fetch or |
| // FetchNext. This happens when valid_data_ has been set. So, only |
| // skip checking if valid_data_ is true. |
| // |
| // Also, if the latest queue index is invalid, we are empty. So there |
| // is nothing to fetch. |
| if ((data_ != nullptr && |
| queue_index == actual_queue_index_.DecrementBy(1u)) || |
| !queue_index.valid()) { |
| return false; |
| } |
| |
| ipc_lib::LocklessQueue::ReadResult read_result = |
| lockless_queue_.Read(queue_index.index(), &context_.monotonic_sent_time, |
| &context_.realtime_sent_time, &context_.size, |
| reinterpret_cast<char *>(data_storage_.get())); |
| if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) { |
| context_.queue_index = queue_index.index(); |
| data_ = reinterpret_cast<char *>(data_storage_.get()) + |
| lockless_queue_.message_data_size() - context_.size; |
| context_.data = data_; |
| actual_queue_index_ = queue_index.Increment(); |
| } |
| |
| // Make sure the data wasn't modified while we were reading it. This |
| // can only happen if you are reading the last message *while* it is |
| // being written to, which means you are pretty far behind. |
| CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE) |
| << ": Got behind while reading and the last message was modified " |
| "out " |
| "from under us while we were reading it. Don't get so far " |
| "behind."; |
| |
| CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW) |
| << ": Queue index went backwards. This should never happen."; |
| |
| // We fell behind between when we read the index and read the value. |
| // This isn't worth recovering from since this means we went to sleep |
| // for a long time in the middle of this function. |
| CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD) |
| << ": The next message is no longer available."; |
| return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD; |
| } |
| |
| bool RegisterWakeup(int priority) { |
| return lockless_queue_.RegisterWakeup(priority); |
| } |
| |
| void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); } |
| |
| private: |
| MMapedQueue lockless_queue_memory_; |
| ipc_lib::LocklessQueue lockless_queue_; |
| |
| ipc_lib::QueueIndex actual_queue_index_ = |
| ipc_lib::LocklessQueue::empty_queue_index(); |
| |
| struct AlignedChar { |
| alignas(32) char data; |
| }; |
| |
| std::unique_ptr<AlignedChar, decltype(&free)> data_storage_; |
| }; |
| |
| class ShmSender : public RawSender { |
| public: |
| explicit ShmSender(const Channel *channel, const ShmEventLoop *shm_event_loop) |
| : RawSender(), |
| shm_event_loop_(shm_event_loop), |
| name_(channel->name()->str()), |
| lockless_queue_memory_(channel), |
| lockless_queue_(lockless_queue_memory_.memory(), |
| lockless_queue_memory_.config()), |
| lockless_queue_sender_(lockless_queue_.MakeSender()) {} |
| |
| void *data() override { return lockless_queue_sender_.Data(); } |
| size_t size() override { return lockless_queue_sender_.size(); } |
| bool Send(size_t size) override { |
| lockless_queue_sender_.Send(size); |
| lockless_queue_.Wakeup(shm_event_loop_->priority()); |
| return true; |
| } |
| |
| bool Send(void *msg, size_t length) override { |
| lockless_queue_sender_.Send(reinterpret_cast<char *>(msg), length); |
| lockless_queue_.Wakeup(shm_event_loop_->priority()); |
| // TODO(austin): Return an error if we send too fast. |
| return true; |
| } |
| |
| const absl::string_view name() const override { return name_; } |
| |
| private: |
| const ShmEventLoop *shm_event_loop_; |
| std::string name_; |
| MMapedQueue lockless_queue_memory_; |
| ipc_lib::LocklessQueue lockless_queue_; |
| ipc_lib::LocklessQueue::Sender lockless_queue_sender_; |
| }; |
| |
| } // namespace |
| |
| namespace internal { |
| |
| // Class to manage the state for a Watcher. |
| class WatcherState { |
| public: |
| WatcherState( |
| const Channel *channel, |
| std::function<void(const Context &context, const void *message)> watcher) |
| : shm_fetcher_(channel), watcher_(watcher) {} |
| |
| ~WatcherState() {} |
| |
| // Points the next message to fetch at the queue index which will be populated |
| // next. |
| void PointAtNextQueueIndex() { shm_fetcher_.PointAtNextQueueIndex(); } |
| |
| // Returns true if there is new data available. |
| bool HasNewData() { |
| if (!has_new_data_) { |
| has_new_data_ = shm_fetcher_.FetchNext(); |
| } |
| |
| return has_new_data_; |
| } |
| |
| // Returns the time of the current data sample. |
| aos::monotonic_clock::time_point event_time() const { |
| return shm_fetcher_.context().monotonic_sent_time; |
| } |
| |
| // Consumes the data by calling the callback. |
| void CallCallback() { |
| CHECK(has_new_data_); |
| watcher_(shm_fetcher_.context(), shm_fetcher_.most_recent_data()); |
| has_new_data_ = false; |
| } |
| |
| // Starts the thread and waits until it is running. |
| bool RegisterWakeup(int priority) { |
| return shm_fetcher_.RegisterWakeup(priority); |
| } |
| |
| void UnregisterWakeup() { return shm_fetcher_.UnregisterWakeup(); } |
| |
| private: |
| bool has_new_data_ = false; |
| |
| ShmFetcher shm_fetcher_; |
| |
| std::function<void(const Context &context, const void *message)> watcher_; |
| }; |
| |
| // Adapter class to adapt a timerfd to a TimerHandler. |
| // The part of the API which is accessed by the TimerHandler interface needs to |
| // be threadsafe. This means Setup and Disable. |
| class TimerHandlerState : public TimerHandler { |
| public: |
| TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn) |
| : shm_event_loop_(shm_event_loop), fn_(::std::move(fn)) { |
| shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() { |
| timerfd_.Read(); |
| fn_(); |
| }); |
| } |
| |
| ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); } |
| |
| void Setup(monotonic_clock::time_point base, |
| monotonic_clock::duration repeat_offset) override { |
| // SetTime is threadsafe already. |
| timerfd_.SetTime(base, repeat_offset); |
| } |
| |
| void Disable() override { |
| // Disable is also threadsafe already. |
| timerfd_.Disable(); |
| } |
| |
| private: |
| ShmEventLoop *shm_event_loop_; |
| |
| TimerFd timerfd_; |
| |
| // Function to be run on the thread |
| ::std::function<void()> fn_; |
| }; |
| |
| // Adapter class to the timerfd and PhasedLoop. |
| // The part of the API which is accessed by the PhasedLoopHandler interface |
| // needs to be threadsafe. This means set_interval_and_offset |
| class PhasedLoopHandler : public ::aos::PhasedLoopHandler { |
| public: |
| PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn, |
| const monotonic_clock::duration interval, |
| const monotonic_clock::duration offset) |
| : shm_event_loop_(shm_event_loop), |
| phased_loop_(interval, shm_event_loop_->monotonic_now(), offset), |
| fn_(::std::move(fn)) { |
| shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() { |
| timerfd_.Read(); |
| // Call the function. To avoid needing a recursive mutex, drop the lock |
| // before running the function. |
| fn_(cycles_elapsed_); |
| Reschedule(); |
| }); |
| } |
| |
| ~PhasedLoopHandler() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); } |
| |
| void set_interval_and_offset( |
| const monotonic_clock::duration interval, |
| const monotonic_clock::duration offset) override { |
| phased_loop_.set_interval_and_offset(interval, offset); |
| } |
| |
| void Startup() { |
| phased_loop_.Reset(shm_event_loop_->monotonic_now()); |
| Reschedule(); |
| } |
| |
| private: |
| // Reschedules the timer. Must be called with the mutex held. |
| void Reschedule() { |
| cycles_elapsed_ = phased_loop_.Iterate(shm_event_loop_->monotonic_now()); |
| timerfd_.SetTime(phased_loop_.sleep_time(), ::aos::monotonic_clock::zero()); |
| } |
| |
| ShmEventLoop *shm_event_loop_; |
| |
| TimerFd timerfd_; |
| time::PhasedLoop phased_loop_; |
| |
| int cycles_elapsed_ = 1; |
| |
| // Function to be run |
| const ::std::function<void(int)> fn_; |
| }; |
| } // namespace internal |
| |
| ::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher( |
| const Channel *channel) { |
| return ::std::unique_ptr<RawFetcher>(new ShmFetcher(channel)); |
| } |
| |
| ::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender( |
| const Channel *channel) { |
| Take(channel); |
| return ::std::unique_ptr<RawSender>(new ShmSender(channel, this)); |
| } |
| |
| void ShmEventLoop::MakeRawWatcher( |
| const Channel *channel, |
| std::function<void(const Context &context, const void *message)> watcher) { |
| Take(channel); |
| |
| ::std::unique_ptr<internal::WatcherState> state( |
| new internal::WatcherState( |
| channel, std::move(watcher))); |
| watchers_.push_back(::std::move(state)); |
| } |
| |
| TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) { |
| ::std::unique_ptr<internal::TimerHandlerState> timer( |
| new internal::TimerHandlerState(this, ::std::move(callback))); |
| |
| timers_.push_back(::std::move(timer)); |
| |
| return timers_.back().get(); |
| } |
| |
| PhasedLoopHandler *ShmEventLoop::AddPhasedLoop( |
| ::std::function<void(int)> callback, |
| const monotonic_clock::duration interval, |
| const monotonic_clock::duration offset) { |
| ::std::unique_ptr<internal::PhasedLoopHandler> phased_loop( |
| new internal::PhasedLoopHandler(this, ::std::move(callback), interval, |
| offset)); |
| |
| phased_loops_.push_back(::std::move(phased_loop)); |
| |
| return phased_loops_.back().get(); |
| } |
| |
| void ShmEventLoop::OnRun(::std::function<void()> on_run) { |
| on_run_.push_back(::std::move(on_run)); |
| } |
| |
| void ShmEventLoop::Run() { |
| std::unique_ptr<ipc_lib::SignalFd> signalfd; |
| |
| if (watchers_.size() > 0) { |
| signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal})); |
| |
| epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() { |
| signalfd_siginfo result = signalfd_ptr->Read(); |
| CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal); |
| |
| // TODO(austin): We should really be checking *everything*, not just |
| // watchers, and calling the oldest thing first. That will improve |
| // determinism a lot. |
| |
| while (true) { |
| // Call the handlers in time order of their messages. |
| aos::monotonic_clock::time_point min_event_time = |
| aos::monotonic_clock::max_time; |
| size_t min_watcher_index = -1; |
| size_t watcher_index = 0; |
| for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) { |
| if (watcher->HasNewData()) { |
| if (watcher->event_time() < min_event_time) { |
| min_watcher_index = watcher_index; |
| min_event_time = watcher->event_time(); |
| } |
| } |
| ++watcher_index; |
| } |
| |
| if (min_event_time == aos::monotonic_clock::max_time) { |
| break; |
| } |
| |
| watchers_[min_watcher_index]->CallCallback(); |
| } |
| }); |
| } |
| |
| // Now, all the threads are up. Lock everything into memory and go RT. |
| if (priority_ != 0) { |
| ::aos::InitRT(); |
| |
| LOG(INFO) << "Setting priority to " << priority_; |
| ::aos::SetCurrentThreadRealtimePriority(priority_); |
| } |
| |
| set_is_running(true); |
| |
| // Now that we are realtime (but before the OnRun handlers run), snap the |
| // queue index. |
| for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) { |
| watcher->PointAtNextQueueIndex(); |
| CHECK(watcher->RegisterWakeup(priority_)); |
| } |
| |
| // Now that we are RT, run all the OnRun handlers. |
| for (const auto &run : on_run_) { |
| run(); |
| } |
| |
| // Start up all the phased loops. |
| for (::std::unique_ptr<internal::PhasedLoopHandler> &phased_loop : |
| phased_loops_) { |
| phased_loop->Startup(); |
| } |
| |
| // And start our main event loop which runs all the timers and handles Quit. |
| epoll_.Run(); |
| |
| // Once epoll exits, there is no useful nonrt work left to do. |
| set_is_running(false); |
| |
| // Nothing time or synchronization critical needs to happen after this point. |
| // Drop RT priority. |
| ::aos::UnsetCurrentThreadRealtimePriority(); |
| |
| for (::std::unique_ptr<internal::WatcherState> &watcher : watchers_) { |
| watcher->UnregisterWakeup(); |
| } |
| |
| if (watchers_.size() > 0) { |
| epoll_.DeleteFd(signalfd->fd()); |
| signalfd.reset(); |
| } |
| } |
| |
| void ShmEventLoop::Exit() { epoll_.Quit(); } |
| |
| ShmEventLoop::~ShmEventLoop() { |
| CHECK(!is_running()) << ": ShmEventLoop destroyed while running"; |
| } |
| |
| void ShmEventLoop::Take(const Channel *channel) { |
| CHECK(!is_running()) << ": Cannot add new objects while running."; |
| |
| // Cheat aggresively. Use the shared memory path as a proxy for a unique |
| // identifier for the channel. |
| const std::string path = ShmPath(channel); |
| |
| const auto prior = ::std::find(taken_.begin(), taken_.end(), path); |
| CHECK(prior == taken_.end()) << ": " << path << " is already being used."; |
| |
| taken_.emplace_back(path); |
| } |
| |
| void ShmEventLoop::SetRuntimeRealtimePriority(int priority) { |
| if (is_running()) { |
| LOG(FATAL) << "Cannot set realtime priority while running."; |
| } |
| priority_ = priority; |
| } |
| |
| } // namespace aos |