blob: 781c963f819d39aa672a08b34cac1f3ffce2c319 [file] [log] [blame]
Parker Schuhe4a70d62017-12-27 20:10:20 -08001#include "aos/events/shm-event-loop.h"
Parker Schuhe4a70d62017-12-27 20:10:20 -08002
Neil Balch229001a2018-01-07 18:22:52 -08003#include <sys/timerfd.h>
Austin Schuh81fc9cc2019-02-02 23:25:47 -08004#include <algorithm>
Parker Schuhe4a70d62017-12-27 20:10:20 -08005#include <atomic>
6#include <chrono>
7#include <stdexcept>
8
Austin Schuh81fc9cc2019-02-02 23:25:47 -08009#include "aos/logging/logging.h"
10#include "aos/queue.h"
11
Parker Schuhe4a70d62017-12-27 20:10:20 -080012namespace aos {
13
14ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
15
16namespace {
17class ShmFetcher : public RawFetcher {
18 public:
19 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
20 ~ShmFetcher() {
21 if (msg_) {
22 queue_->FreeMessage(msg_);
23 }
24 }
25
26 bool Fetch() {
27 static constexpr Options<RawQueue> kOptions =
28 RawQueue::kFromEnd | RawQueue::kNonBlock;
29 const FetchValue *msg = static_cast<const FetchValue *>(
30 queue_->ReadMessageIndex(kOptions, &index_));
31 // Only update the internal pointer if we got a new message.
32 if (msg != NULL && msg != msg_) {
33 queue_->FreeMessage(msg_);
34 msg_ = msg;
35 set_most_recent(msg_);
36 return true;
37 }
38 // The message has to get freed if we didn't use it (and
39 // RawQueue::FreeMessage is ok to call on NULL).
40 queue_->FreeMessage(msg);
41 return false;
42 }
43
44 private:
45 int index_ = 0;
46 RawQueue *queue_;
47 const FetchValue *msg_ = nullptr;
48};
49
50class ShmSender : public RawSender {
51 public:
52 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
53
54 SendContext *GetContext() override {
55 return reinterpret_cast<SendContext *>(queue_->GetMessage());
56 }
57
58 void Free(SendContext *context) override { queue_->FreeMessage(context); }
59
60 bool Send(SendContext *msg) override {
61 assert(queue_ != NULL);
62 return queue_->WriteMessage(msg, RawQueue::kOverride);
63 }
64
Austin Schuhd681bbd2019-02-02 12:03:32 -080065 const char *name() const override { return queue_->name(); }
66
Parker Schuhe4a70d62017-12-27 20:10:20 -080067 private:
68 RawQueue *queue_;
69};
70} // namespace
71
72namespace internal {
73class WatcherThreadState {
74 public:
75 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
76 RawQueue *queue,
77 std::function<void(const aos::Message *message)> watcher)
78 : thread_state_(std::move(thread_state)),
79 queue_(queue),
80 watcher_(std::move(watcher)) {}
81
82 void Run() {
83 thread_state_->WaitForStart();
84
85 if (!thread_state_->is_running()) return;
86
87 int32_t index = 0;
88
89 static constexpr Options<RawQueue> kOptions =
90 RawQueue::kFromEnd | RawQueue::kNonBlock;
91 const void *msg = queue_->ReadMessageIndex(kOptions, &index);
92
93 while (true) {
94 if (msg == nullptr) {
95 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
96 assert(msg != nullptr);
97 }
98
99 {
Neil Balch229001a2018-01-07 18:22:52 -0800100 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800101 if (!thread_state_->is_running()) break;
102
103 watcher_(reinterpret_cast<const Message *>(msg));
104 // watcher_ may have exited the event loop.
105 if (!thread_state_->is_running()) break;
106 }
107 queue_->FreeMessage(msg);
108 msg = nullptr;
109 }
110
111 queue_->FreeMessage(msg);
112 }
113
114 private:
115 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
116 RawQueue *queue_;
117 std::function<void(const Message *message)> watcher_;
118};
Neil Balch229001a2018-01-07 18:22:52 -0800119
120class TimerHandlerState : public TimerHandler {
121 public:
122 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
123 ::std::function<void()> fn)
124 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
125 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
126 PCHECK(fd_ != -1);
127 }
128
129 ~TimerHandlerState() {
130 PCHECK(close(fd_) == 0);
131 }
132
133 void Setup(monotonic_clock::time_point base,
134 monotonic_clock::duration repeat_offset) override {
135 struct itimerspec new_value;
136 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
137 new_value.it_value = ::aos::time::to_timespec(base);
138 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
139 }
140
141 void Disable() override {
142 // Disarm the timer by feeding zero values
143 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
144 }
145
146 void Run() {
147 thread_state_->WaitForStart();
148
149 while (true) {
150 uint64_t buf;
151 ssize_t result = read(fd_, &buf, sizeof(buf));
152 PCHECK(result != -1);
153 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
154
155 {
156 MutexLocker locker(&thread_state_->mutex_);
157 if (!thread_state_->is_running()) break;
158 fn_();
159 // fn_ may have exited the event loop.
160 if (!thread_state_->is_running()) break;
161 }
162 }
163 }
164
165 private:
166 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
167
168 // File descriptor for the timer
169 int fd_;
170
171 // Function to be run on the thread
172 ::std::function<void()> fn_;
173};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800174} // namespace internal
175
176std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
177 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800178 return std::unique_ptr<RawFetcher>(new ShmFetcher(
179 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
180}
181
182std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
183 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800184 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800185 return std::unique_ptr<RawSender>(new ShmSender(
186 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
187}
188
189void ShmEventLoop::MakeRawWatcher(
190 const std::string &path, const QueueTypeInfo &type,
191 std::function<void(const Message *message)> watcher) {
192 Take(path);
193 auto *state = new internal::WatcherThreadState(
194 thread_state_,
195 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
196 std::move(watcher));
197
198 std::thread thread([state] {
199 state->Run();
200 delete state;
201 });
202 thread.detach();
203}
204
Neil Balch229001a2018-01-07 18:22:52 -0800205TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
206 internal::TimerHandlerState *timer =
207 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
208
209 ::std::thread t([timer] {
210 timer->Run();
211 delete timer;
212 });
213 t.detach();
214
215 return timer;
216}
217
Parker Schuhe4a70d62017-12-27 20:10:20 -0800218void ShmEventLoop::OnRun(std::function<void()> on_run) {
219 on_run_.push_back(std::move(on_run));
220}
221
222void ShmEventLoop::Run() {
223 set_is_running(true);
224 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800225 // TODO(austin): epoll event loop in main thread (if needed), and async safe
226 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800227 thread_state_->Run();
228}
229
230void ShmEventLoop::ThreadState::Run() {
231 MutexLocker locker(&mutex_);
232 loop_running_ = true;
233 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
234 loop_running_cond_.Broadcast();
235 while (loop_running_) {
236 if (loop_running_cond_.Wait()) {
237 ::aos::Die("ShmEventLoop mutex lock problem.\n");
238 }
239 }
240}
241
242void ShmEventLoop::ThreadState::WaitForStart() {
243 MutexLocker locker(&mutex_);
244 while (!(loop_running_ || loop_finished_)) {
245 if (loop_running_cond_.Wait()) {
246 ::aos::Die("ShmEventLoop mutex lock problem.\n");
247 }
248 }
249}
250
251void ShmEventLoop::Exit() {
252 set_is_running(false);
253 thread_state_->Exit();
254}
255
256void ShmEventLoop::ThreadState::Exit() {
257 IPCRecursiveMutexLocker locker(&mutex_);
258 if (locker.owner_died()) ::aos::Die("Owner died");
259 loop_running_ = false;
260 loop_finished_ = true;
261 loop_running_cond_.Broadcast();
262}
263
264ShmEventLoop::~ShmEventLoop() {
265 if (is_running()) {
266 ::aos::Die("ShmEventLoop destroyed while running\n");
267 }
268}
269
270void ShmEventLoop::Take(const std::string &path) {
271 if (is_running()) {
272 ::aos::Die("Cannot add new objects while running.\n");
273 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800274
275 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
276 if (prior != taken_.end()) {
277 ::aos::Die("%s is already being used.", path.c_str());
278 } else {
279 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800280 }
281}
282
283} // namespace aos