blob: 945ebc04a0d45a3f6a722f520d4aaa533e18b512 [file] [log] [blame]
#include "aos/events/shm-event-loop.h"

#include <errno.h>
#include <sys/timerfd.h>

#include <atomic>
#include <chrono>
#include <stdexcept>

#include "aos/logging/logging.h"
#include "aos/queue.h"
9
10namespace aos {
11
12ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
13
14namespace {
15class ShmFetcher : public RawFetcher {
16 public:
17 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
18 ~ShmFetcher() {
19 if (msg_) {
20 queue_->FreeMessage(msg_);
21 }
22 }
23
24 bool Fetch() {
25 static constexpr Options<RawQueue> kOptions =
26 RawQueue::kFromEnd | RawQueue::kNonBlock;
27 const FetchValue *msg = static_cast<const FetchValue *>(
28 queue_->ReadMessageIndex(kOptions, &index_));
29 // Only update the internal pointer if we got a new message.
30 if (msg != NULL && msg != msg_) {
31 queue_->FreeMessage(msg_);
32 msg_ = msg;
33 set_most_recent(msg_);
34 return true;
35 }
36 // The message has to get freed if we didn't use it (and
37 // RawQueue::FreeMessage is ok to call on NULL).
38 queue_->FreeMessage(msg);
39 return false;
40 }
41
42 private:
43 int index_ = 0;
44 RawQueue *queue_;
45 const FetchValue *msg_ = nullptr;
46};
47
48class ShmSender : public RawSender {
49 public:
50 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
51
52 SendContext *GetContext() override {
53 return reinterpret_cast<SendContext *>(queue_->GetMessage());
54 }
55
56 void Free(SendContext *context) override { queue_->FreeMessage(context); }
57
58 bool Send(SendContext *msg) override {
59 assert(queue_ != NULL);
60 return queue_->WriteMessage(msg, RawQueue::kOverride);
61 }
62
Austin Schuhd681bbd2019-02-02 12:03:32 -080063 const char *name() const override { return queue_->name(); }
64
Parker Schuhe4a70d62017-12-27 20:10:20 -080065 private:
66 RawQueue *queue_;
67};
68} // namespace
69
70namespace internal {
71class WatcherThreadState {
72 public:
73 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
74 RawQueue *queue,
75 std::function<void(const aos::Message *message)> watcher)
76 : thread_state_(std::move(thread_state)),
77 queue_(queue),
78 watcher_(std::move(watcher)) {}
79
80 void Run() {
81 thread_state_->WaitForStart();
82
83 if (!thread_state_->is_running()) return;
84
85 int32_t index = 0;
86
87 static constexpr Options<RawQueue> kOptions =
88 RawQueue::kFromEnd | RawQueue::kNonBlock;
89 const void *msg = queue_->ReadMessageIndex(kOptions, &index);
90
91 while (true) {
92 if (msg == nullptr) {
93 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
94 assert(msg != nullptr);
95 }
96
97 {
Neil Balch229001a2018-01-07 18:22:52 -080098 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -080099 if (!thread_state_->is_running()) break;
100
101 watcher_(reinterpret_cast<const Message *>(msg));
102 // watcher_ may have exited the event loop.
103 if (!thread_state_->is_running()) break;
104 }
105 queue_->FreeMessage(msg);
106 msg = nullptr;
107 }
108
109 queue_->FreeMessage(msg);
110 }
111
112 private:
113 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
114 RawQueue *queue_;
115 std::function<void(const Message *message)> watcher_;
116};
Neil Balch229001a2018-01-07 18:22:52 -0800117
118class TimerHandlerState : public TimerHandler {
119 public:
120 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
121 ::std::function<void()> fn)
122 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
123 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
124 PCHECK(fd_ != -1);
125 }
126
127 ~TimerHandlerState() {
128 PCHECK(close(fd_) == 0);
129 }
130
131 void Setup(monotonic_clock::time_point base,
132 monotonic_clock::duration repeat_offset) override {
133 struct itimerspec new_value;
134 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
135 new_value.it_value = ::aos::time::to_timespec(base);
136 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
137 }
138
139 void Disable() override {
140 // Disarm the timer by feeding zero values
141 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
142 }
143
144 void Run() {
145 thread_state_->WaitForStart();
146
147 while (true) {
148 uint64_t buf;
149 ssize_t result = read(fd_, &buf, sizeof(buf));
150 PCHECK(result != -1);
151 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
152
153 {
154 MutexLocker locker(&thread_state_->mutex_);
155 if (!thread_state_->is_running()) break;
156 fn_();
157 // fn_ may have exited the event loop.
158 if (!thread_state_->is_running()) break;
159 }
160 }
161 }
162
163 private:
164 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
165
166 // File descriptor for the timer
167 int fd_;
168
169 // Function to be run on the thread
170 ::std::function<void()> fn_;
171};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800172} // namespace internal
173
174std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
175 const std::string &path, const QueueTypeInfo &type) {
176 Take(path);
177 return std::unique_ptr<RawFetcher>(new ShmFetcher(
178 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
179}
180
181std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
182 const std::string &path, const QueueTypeInfo &type) {
183 return std::unique_ptr<RawSender>(new ShmSender(
184 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
185}
186
187void ShmEventLoop::MakeRawWatcher(
188 const std::string &path, const QueueTypeInfo &type,
189 std::function<void(const Message *message)> watcher) {
190 Take(path);
191 auto *state = new internal::WatcherThreadState(
192 thread_state_,
193 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
194 std::move(watcher));
195
196 std::thread thread([state] {
197 state->Run();
198 delete state;
199 });
200 thread.detach();
201}
202
Neil Balch229001a2018-01-07 18:22:52 -0800203TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
204 internal::TimerHandlerState *timer =
205 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
206
207 ::std::thread t([timer] {
208 timer->Run();
209 delete timer;
210 });
211 t.detach();
212
213 return timer;
214}
215
Parker Schuhe4a70d62017-12-27 20:10:20 -0800216void ShmEventLoop::OnRun(std::function<void()> on_run) {
217 on_run_.push_back(std::move(on_run));
218}
219
220void ShmEventLoop::Run() {
221 set_is_running(true);
222 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800223 // TODO(austin): epoll event loop in main thread (if needed), and async safe
224 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800225 thread_state_->Run();
226}
227
228void ShmEventLoop::ThreadState::Run() {
229 MutexLocker locker(&mutex_);
230 loop_running_ = true;
231 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
232 loop_running_cond_.Broadcast();
233 while (loop_running_) {
234 if (loop_running_cond_.Wait()) {
235 ::aos::Die("ShmEventLoop mutex lock problem.\n");
236 }
237 }
238}
239
240void ShmEventLoop::ThreadState::WaitForStart() {
241 MutexLocker locker(&mutex_);
242 while (!(loop_running_ || loop_finished_)) {
243 if (loop_running_cond_.Wait()) {
244 ::aos::Die("ShmEventLoop mutex lock problem.\n");
245 }
246 }
247}
248
249void ShmEventLoop::Exit() {
250 set_is_running(false);
251 thread_state_->Exit();
252}
253
254void ShmEventLoop::ThreadState::Exit() {
255 IPCRecursiveMutexLocker locker(&mutex_);
256 if (locker.owner_died()) ::aos::Die("Owner died");
257 loop_running_ = false;
258 loop_finished_ = true;
259 loop_running_cond_.Broadcast();
260}
261
262ShmEventLoop::~ShmEventLoop() {
263 if (is_running()) {
264 ::aos::Die("ShmEventLoop destroyed while running\n");
265 }
266}
267
268void ShmEventLoop::Take(const std::string &path) {
269 if (is_running()) {
270 ::aos::Die("Cannot add new objects while running.\n");
271 }
272 if (!taken_.emplace(path).second) {
273 ::aos::Die("%s already has a listener / watcher.", path.c_str());
274 }
275}
276
277} // namespace aos