blob: 5db831924e0d6031de5a29b69f2b937ef9095af8 [file] [log] [blame]
#ifndef AOS_EVENTS_SHM_EVENT_LOOP_H_
#define AOS_EVENTS_SHM_EVENT_LOOP_H_
#include <unordered_set>
#include <vector>
#include "aos/condition.h"
#include "aos/events/epoll.h"
#include "aos/events/event-loop.h"
#include "aos/mutex/mutex.h"
namespace aos {
namespace internal {
class WatcherThreadState;
class TimerHandlerState;
class PhasedLoopHandler;
} // namespace internal
// Specialization of EventLoop that is built from queues running out of shared
// memory. See more details at aos/queue.h
//
// This object must be interacted with from one thread, but the Senders and
// Fetchers may be used from multiple threads afterwords (as long as their
// destructors are called back in one thread again)
class ShmEventLoop : public EventLoop {
public:
ShmEventLoop();
~ShmEventLoop() override;
::aos::monotonic_clock::time_point monotonic_now() override {
return ::aos::monotonic_clock::now();
}
::std::unique_ptr<RawSender> MakeRawSender(
const ::std::string &path, const QueueTypeInfo &type) override;
::std::unique_ptr<RawFetcher> MakeRawFetcher(
const ::std::string &path, const QueueTypeInfo &type) override;
void MakeRawWatcher(
const ::std::string &path, const QueueTypeInfo &type,
::std::function<void(const aos::Message *message)> watcher) override;
TimerHandler *AddTimer(::std::function<void()> callback) override;
::aos::PhasedLoopHandler *AddPhasedLoop(
::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset =
::std::chrono::seconds(0)) override;
void OnRun(::std::function<void()> on_run) override;
void Run();
void Exit();
// TODO(austin): Add a function to register control-C call.
void SetRuntimeRealtimePriority(int priority) override {
if (is_running()) {
::aos::Die("Cannot set realtime priority while running.");
}
thread_state_.priority_ = priority;
}
void set_name(const char *name) override;
private:
friend class internal::WatcherThreadState;
friend class internal::TimerHandlerState;
friend class internal::PhasedLoopHandler;
// This ThreadState ensures that two watchers in the same loop cannot be
// triggered concurrently. Because watchers block threads indefinitely, this
// has to be shared_ptr in case the EventLoop is destroyed before the thread
// receives any new events.
class ThreadState {
public:
void WaitForStart();
bool is_running() { return loop_running_; }
void Start();
void Exit();
void MaybeSetCurrentThreadRealtimePriority();
const ::std::string &name() const { return name_; }
private:
friend class internal::WatcherThreadState;
friend class internal::TimerHandlerState;
friend class internal::PhasedLoopHandler;
friend class ShmEventLoop;
// This mutex ensures that only one watch event happens at a time.
::aos::Mutex mutex_;
// Block on this until the loop starts.
::aos::Condition loop_running_cond_{&mutex_};
// Used to notify watchers that the loop is done.
::std::atomic<bool> loop_running_{false};
bool loop_finished_ = false;
int priority_ = -1;
// Immutable after Start is called.
::std::string name_;
};
// Tracks that we can't have multiple watchers or a sender and a watcher (or
// multiple senders) on a single queue (path).
void Take(const ::std::string &path);
::std::vector<::std::function<void()>> on_run_;
ThreadState thread_state_;
::std::vector<::std::string> taken_;
internal::EPoll epoll_;
::std::vector<::std::unique_ptr<internal::TimerHandlerState>> timers_;
::std::vector<::std::unique_ptr<internal::PhasedLoopHandler>> phased_loops_;
::std::vector<::std::unique_ptr<internal::WatcherThreadState>> watchers_;
};
} // namespace aos
#endif // AOS_EVENTS_SHM_EVENT_LOOP_H_