blob: a70d6f3d62e59679de4c5002b4bcdb5bea63ca63 [file] [log] [blame]
Parker Schuhe4a70d62017-12-27 20:10:20 -08001#include "aos/events/shm-event-loop.h"
Parker Schuhe4a70d62017-12-27 20:10:20 -08002
Neil Balch229001a2018-01-07 18:22:52 -08003#include <sys/timerfd.h>
Austin Schuh81fc9cc2019-02-02 23:25:47 -08004#include <algorithm>
Parker Schuhe4a70d62017-12-27 20:10:20 -08005#include <atomic>
6#include <chrono>
7#include <stdexcept>
8
Austin Schuh81fc9cc2019-02-02 23:25:47 -08009#include "aos/logging/logging.h"
10#include "aos/queue.h"
11
Parker Schuhe4a70d62017-12-27 20:10:20 -080012namespace aos {
13
// The shared state is held via shared_ptr so the detached watcher/timer
// threads spawned later can safely outlive this object.
ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
15
16namespace {
17class ShmFetcher : public RawFetcher {
18 public:
19 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
20 ~ShmFetcher() {
21 if (msg_) {
22 queue_->FreeMessage(msg_);
23 }
24 }
25
James Kuszmaulc79768b2019-02-18 15:08:44 -080026 bool FetchNext() override {
27 const FetchValue *msg = static_cast<const FetchValue *>(
28 queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
29 // Only update the internal pointer if we got a new message.
30 if (msg != NULL) {
31 queue_->FreeMessage(msg_);
32 msg_ = msg;
33 set_most_recent(msg_);
34 }
35 return msg != NULL;
36 }
37
38 bool Fetch() override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080039 static constexpr Options<RawQueue> kOptions =
40 RawQueue::kFromEnd | RawQueue::kNonBlock;
41 const FetchValue *msg = static_cast<const FetchValue *>(
42 queue_->ReadMessageIndex(kOptions, &index_));
43 // Only update the internal pointer if we got a new message.
44 if (msg != NULL && msg != msg_) {
45 queue_->FreeMessage(msg_);
46 msg_ = msg;
47 set_most_recent(msg_);
48 return true;
49 }
50 // The message has to get freed if we didn't use it (and
51 // RawQueue::FreeMessage is ok to call on NULL).
52 queue_->FreeMessage(msg);
53 return false;
54 }
55
56 private:
57 int index_ = 0;
58 RawQueue *queue_;
59 const FetchValue *msg_ = nullptr;
60};
61
62class ShmSender : public RawSender {
63 public:
64 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
65
66 SendContext *GetContext() override {
67 return reinterpret_cast<SendContext *>(queue_->GetMessage());
68 }
69
70 void Free(SendContext *context) override { queue_->FreeMessage(context); }
71
72 bool Send(SendContext *msg) override {
73 assert(queue_ != NULL);
74 return queue_->WriteMessage(msg, RawQueue::kOverride);
75 }
76
Austin Schuhd681bbd2019-02-02 12:03:32 -080077 const char *name() const override { return queue_->name(); }
78
Parker Schuhe4a70d62017-12-27 20:10:20 -080079 private:
80 RawQueue *queue_;
81};
82} // namespace
83
84namespace internal {
// Runs a queue-watcher callback on its own thread.  Run() blocks until the
// event loop starts, then repeatedly reads messages from the queue and hands
// them to the callback until the loop exits.
class WatcherThreadState {
 public:
  WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
                     RawQueue *queue,
                     std::function<void(const aos::Message *message)> watcher)
      : thread_state_(std::move(thread_state)),
        queue_(queue),
        watcher_(std::move(watcher)) {}

  // Thread entry point; returns when the event loop shuts down.
  void Run() {
    // Don't deliver anything until the loop has officially started.
    thread_state_->WaitForStart();

    // The loop may have already been exited before ever running.
    if (!thread_state_->is_running()) return;

    int32_t index = 0;

    static constexpr Options<RawQueue> kOptions =
        RawQueue::kFromEnd | RawQueue::kNonBlock;
    // First try a non-blocking read so an already-present message is
    // delivered immediately.
    const void *msg = queue_->ReadMessageIndex(kOptions, &index);

    while (true) {
      if (msg == nullptr) {
        // Nothing pending: block until the next message arrives.
        msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
        assert(msg != nullptr);
      }

      {
        // Hold the mutex across the callback so shutdown can't race with an
        // in-progress delivery.
        MutexLocker locker(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;

        watcher_(reinterpret_cast<const Message *>(msg));
        // watcher_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
      // Drop our reference outside the lock, then wait for the next message.
      queue_->FreeMessage(msg);
      msg = nullptr;
    }

    // Free whatever message we were holding when we broke out of the loop
    // (FreeMessage is ok to call on nullptr).
    queue_->FreeMessage(msg);
  }

 private:
  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
  RawQueue *queue_;
  std::function<void(const Message *message)> watcher_;
};
Neil Balch229001a2018-01-07 18:22:52 -0800131
// TimerHandler implementation backed by a Linux timerfd.  A dedicated thread
// blocks reading the fd and invokes the callback on each expiration.
class TimerHandlerState : public TimerHandler {
 public:
  TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
                    ::std::function<void()> fn)
      : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
    fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
    PCHECK(fd_ != -1);
  }

  ~TimerHandlerState() {
    PCHECK(close(fd_) == 0);
  }

  // Arms the timer to fire at |base| (absolute, per TFD_TIMER_ABSTIME) and
  // then every |repeat_offset| after that.
  void Setup(monotonic_clock::time_point base,
             monotonic_clock::duration repeat_offset) override {
    struct itimerspec new_value;
    new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
    new_value.it_value = ::aos::time::to_timespec(base);
    PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
  }

  void Disable() override {
    // Disarm the timer by feeding zero values
    Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
  }

  // Thread entry point; loops until the event loop shuts down.
  void Run() {
    // Don't fire the callback before the loop has started.
    thread_state_->WaitForStart();

    while (true) {
      uint64_t buf;
      // Blocks until expiration; per timerfd(2) the read yields an 8-byte
      // expiration count.
      ssize_t result = read(fd_, &buf, sizeof(buf));
      PCHECK(result != -1);
      CHECK_EQ(result, static_cast<int>(sizeof(buf)));

      {
        // Hold the mutex across the callback so shutdown can't race with it.
        MutexLocker locker(&thread_state_->mutex_);
        if (!thread_state_->is_running()) break;
        fn_();
        // fn_ may have exited the event loop.
        if (!thread_state_->is_running()) break;
      }
    }
  }

 private:
  std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;

  // File descriptor for the timer
  int fd_;

  // Function to be run on the thread
  ::std::function<void()> fn_;
};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800186} // namespace internal
187
188std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
189 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800190 return std::unique_ptr<RawFetcher>(new ShmFetcher(
191 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
192}
193
194std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
195 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800196 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800197 return std::unique_ptr<RawSender>(new ShmSender(
198 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
199}
200
201void ShmEventLoop::MakeRawWatcher(
202 const std::string &path, const QueueTypeInfo &type,
203 std::function<void(const Message *message)> watcher) {
204 Take(path);
205 auto *state = new internal::WatcherThreadState(
206 thread_state_,
207 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
208 std::move(watcher));
209
210 std::thread thread([state] {
211 state->Run();
212 delete state;
213 });
214 thread.detach();
215}
216
Neil Balch229001a2018-01-07 18:22:52 -0800217TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
218 internal::TimerHandlerState *timer =
219 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
220
221 ::std::thread t([timer] {
222 timer->Run();
223 delete timer;
224 });
225 t.detach();
226
227 return timer;
228}
229
Parker Schuhe4a70d62017-12-27 20:10:20 -0800230void ShmEventLoop::OnRun(std::function<void()> on_run) {
231 on_run_.push_back(std::move(on_run));
232}
233
234void ShmEventLoop::Run() {
235 set_is_running(true);
236 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800237 // TODO(austin): epoll event loop in main thread (if needed), and async safe
238 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800239 thread_state_->Run();
240}
241
// Marks the loop as running, releases threads blocked in WaitForStart(), and
// then blocks until Exit() clears loop_running_.
void ShmEventLoop::ThreadState::Run() {
  MutexLocker locker(&mutex_);
  loop_running_ = true;
  // Exit() sets loop_finished_; restarting after that is not supported.
  if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
  // Wake up every thread parked in WaitForStart().
  loop_running_cond_.Broadcast();
  while (loop_running_) {
    // Wait() returning true indicates a lock failure, which is fatal here.
    if (loop_running_cond_.Wait()) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}
253
// Blocks the calling thread until the loop has started running, or has
// already finished (so threads don't hang if the loop exits before starting).
void ShmEventLoop::ThreadState::WaitForStart() {
  MutexLocker locker(&mutex_);
  while (!(loop_running_ || loop_finished_)) {
    // Wait() returning true indicates a lock failure, which is fatal here.
    if (loop_running_cond_.Wait()) {
      ::aos::Die("ShmEventLoop mutex lock problem.\n");
    }
  }
}
262
// Requests that Run() return.  The watcher/timer threads check is_running()
// after each callback, so calling this from inside a callback is expected.
void ShmEventLoop::Exit() {
  set_is_running(false);
  thread_state_->Exit();
}
267
// Marks the loop as finished and wakes everything blocked on the condition.
void ShmEventLoop::ThreadState::Exit() {
  // Recursive locker: watcher/timer callbacks run with mutex_ already held
  // and may call Exit(), so a plain MutexLocker would self-deadlock here.
  IPCRecursiveMutexLocker locker(&mutex_);
  if (locker.owner_died()) ::aos::Die("Owner died");
  loop_running_ = false;
  loop_finished_ = true;
  loop_running_cond_.Broadcast();
}
275
276ShmEventLoop::~ShmEventLoop() {
277 if (is_running()) {
278 ::aos::Die("ShmEventLoop destroyed while running\n");
279 }
280}
281
282void ShmEventLoop::Take(const std::string &path) {
283 if (is_running()) {
284 ::aos::Die("Cannot add new objects while running.\n");
285 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800286
287 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
288 if (prior != taken_.end()) {
289 ::aos::Die("%s is already being used.", path.c_str());
290 } else {
291 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800292 }
293}
294
295} // namespace aos