blob: 832b27b2c3f7df40d9f42465c4f91e3c9a435523 [file] [log] [blame]
#include "aos/events/shm-event-loop.h"

#include <errno.h>
#include <sys/timerfd.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <stdexcept>

#include "aos/common/logging/logging.h"
#include "aos/common/queue.h"
9
10namespace aos {
11
12ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
13
14namespace {
15class ShmFetcher : public RawFetcher {
16 public:
17 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
18 ~ShmFetcher() {
19 if (msg_) {
20 queue_->FreeMessage(msg_);
21 }
22 }
23
24 bool Fetch() {
25 static constexpr Options<RawQueue> kOptions =
26 RawQueue::kFromEnd | RawQueue::kNonBlock;
27 const FetchValue *msg = static_cast<const FetchValue *>(
28 queue_->ReadMessageIndex(kOptions, &index_));
29 // Only update the internal pointer if we got a new message.
30 if (msg != NULL && msg != msg_) {
31 queue_->FreeMessage(msg_);
32 msg_ = msg;
33 set_most_recent(msg_);
34 return true;
35 }
36 // The message has to get freed if we didn't use it (and
37 // RawQueue::FreeMessage is ok to call on NULL).
38 queue_->FreeMessage(msg);
39 return false;
40 }
41
42 private:
43 int index_ = 0;
44 RawQueue *queue_;
45 const FetchValue *msg_ = nullptr;
46};
47
48class ShmSender : public RawSender {
49 public:
50 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
51
52 SendContext *GetContext() override {
53 return reinterpret_cast<SendContext *>(queue_->GetMessage());
54 }
55
56 void Free(SendContext *context) override { queue_->FreeMessage(context); }
57
58 bool Send(SendContext *msg) override {
59 assert(queue_ != NULL);
60 return queue_->WriteMessage(msg, RawQueue::kOverride);
61 }
62
63 private:
64 RawQueue *queue_;
65};
66} // namespace
67
68namespace internal {
69class WatcherThreadState {
70 public:
71 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
72 RawQueue *queue,
73 std::function<void(const aos::Message *message)> watcher)
74 : thread_state_(std::move(thread_state)),
75 queue_(queue),
76 watcher_(std::move(watcher)) {}
77
78 void Run() {
79 thread_state_->WaitForStart();
80
81 if (!thread_state_->is_running()) return;
82
83 int32_t index = 0;
84
85 static constexpr Options<RawQueue> kOptions =
86 RawQueue::kFromEnd | RawQueue::kNonBlock;
87 const void *msg = queue_->ReadMessageIndex(kOptions, &index);
88
89 while (true) {
90 if (msg == nullptr) {
91 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
92 assert(msg != nullptr);
93 }
94
95 {
Neil Balch229001a2018-01-07 18:22:52 -080096 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -080097 if (!thread_state_->is_running()) break;
98
99 watcher_(reinterpret_cast<const Message *>(msg));
100 // watcher_ may have exited the event loop.
101 if (!thread_state_->is_running()) break;
102 }
103 queue_->FreeMessage(msg);
104 msg = nullptr;
105 }
106
107 queue_->FreeMessage(msg);
108 }
109
110 private:
111 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
112 RawQueue *queue_;
113 std::function<void(const Message *message)> watcher_;
114};
Neil Balch229001a2018-01-07 18:22:52 -0800115
116class TimerHandlerState : public TimerHandler {
117 public:
118 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
119 ::std::function<void()> fn)
120 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
121 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
122 PCHECK(fd_ != -1);
123 }
124
125 ~TimerHandlerState() {
126 PCHECK(close(fd_) == 0);
127 }
128
129 void Setup(monotonic_clock::time_point base,
130 monotonic_clock::duration repeat_offset) override {
131 struct itimerspec new_value;
132 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
133 new_value.it_value = ::aos::time::to_timespec(base);
134 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
135 }
136
137 void Disable() override {
138 // Disarm the timer by feeding zero values
139 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
140 }
141
142 void Run() {
143 thread_state_->WaitForStart();
144
145 while (true) {
146 uint64_t buf;
147 ssize_t result = read(fd_, &buf, sizeof(buf));
148 PCHECK(result != -1);
149 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
150
151 {
152 MutexLocker locker(&thread_state_->mutex_);
153 if (!thread_state_->is_running()) break;
154 fn_();
155 // fn_ may have exited the event loop.
156 if (!thread_state_->is_running()) break;
157 }
158 }
159 }
160
161 private:
162 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
163
164 // File descriptor for the timer
165 int fd_;
166
167 // Function to be run on the thread
168 ::std::function<void()> fn_;
169};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800170} // namespace internal
171
172std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
173 const std::string &path, const QueueTypeInfo &type) {
174 Take(path);
175 return std::unique_ptr<RawFetcher>(new ShmFetcher(
176 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
177}
178
179std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
180 const std::string &path, const QueueTypeInfo &type) {
181 return std::unique_ptr<RawSender>(new ShmSender(
182 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
183}
184
185void ShmEventLoop::MakeRawWatcher(
186 const std::string &path, const QueueTypeInfo &type,
187 std::function<void(const Message *message)> watcher) {
188 Take(path);
189 auto *state = new internal::WatcherThreadState(
190 thread_state_,
191 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
192 std::move(watcher));
193
194 std::thread thread([state] {
195 state->Run();
196 delete state;
197 });
198 thread.detach();
199}
200
Neil Balch229001a2018-01-07 18:22:52 -0800201TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
202 internal::TimerHandlerState *timer =
203 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
204
205 ::std::thread t([timer] {
206 timer->Run();
207 delete timer;
208 });
209 t.detach();
210
211 return timer;
212}
213
Parker Schuhe4a70d62017-12-27 20:10:20 -0800214void ShmEventLoop::OnRun(std::function<void()> on_run) {
215 on_run_.push_back(std::move(on_run));
216}
217
218void ShmEventLoop::Run() {
219 set_is_running(true);
220 for (const auto &run : on_run_) run();
221 thread_state_->Run();
222}
223
224void ShmEventLoop::ThreadState::Run() {
225 MutexLocker locker(&mutex_);
226 loop_running_ = true;
227 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
228 loop_running_cond_.Broadcast();
229 while (loop_running_) {
230 if (loop_running_cond_.Wait()) {
231 ::aos::Die("ShmEventLoop mutex lock problem.\n");
232 }
233 }
234}
235
236void ShmEventLoop::ThreadState::WaitForStart() {
237 MutexLocker locker(&mutex_);
238 while (!(loop_running_ || loop_finished_)) {
239 if (loop_running_cond_.Wait()) {
240 ::aos::Die("ShmEventLoop mutex lock problem.\n");
241 }
242 }
243}
244
245void ShmEventLoop::Exit() {
246 set_is_running(false);
247 thread_state_->Exit();
248}
249
250void ShmEventLoop::ThreadState::Exit() {
251 IPCRecursiveMutexLocker locker(&mutex_);
252 if (locker.owner_died()) ::aos::Die("Owner died");
253 loop_running_ = false;
254 loop_finished_ = true;
255 loop_running_cond_.Broadcast();
256}
257
258ShmEventLoop::~ShmEventLoop() {
259 if (is_running()) {
260 ::aos::Die("ShmEventLoop destroyed while running\n");
261 }
262}
263
264void ShmEventLoop::Take(const std::string &path) {
265 if (is_running()) {
266 ::aos::Die("Cannot add new objects while running.\n");
267 }
268 if (!taken_.emplace(path).second) {
269 ::aos::Die("%s already has a listener / watcher.", path.c_str());
270 }
271}
272
273} // namespace aos