blob: c4c2c8864708b4abe9bb985124b634c35cb93231 [file] [log] [blame]
Parker Schuhe4a70d62017-12-27 20:10:20 -08001#include "aos/events/shm-event-loop.h"
Parker Schuhe4a70d62017-12-27 20:10:20 -08002
Neil Balch229001a2018-01-07 18:22:52 -08003#include <sys/timerfd.h>
Austin Schuh81fc9cc2019-02-02 23:25:47 -08004#include <algorithm>
Parker Schuhe4a70d62017-12-27 20:10:20 -08005#include <atomic>
6#include <chrono>
7#include <stdexcept>
8
Austin Schuh81fc9cc2019-02-02 23:25:47 -08009#include "aos/logging/logging.h"
10#include "aos/queue.h"
11
Parker Schuhe4a70d62017-12-27 20:10:20 -080012namespace aos {
13
14ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
15
16namespace {
17class ShmFetcher : public RawFetcher {
18 public:
19 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
20 ~ShmFetcher() {
21 if (msg_) {
22 queue_->FreeMessage(msg_);
23 }
24 }
25
James Kuszmaulc79768b2019-02-18 15:08:44 -080026 bool FetchNext() override {
27 const FetchValue *msg = static_cast<const FetchValue *>(
28 queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
29 // Only update the internal pointer if we got a new message.
30 if (msg != NULL) {
31 queue_->FreeMessage(msg_);
32 msg_ = msg;
33 set_most_recent(msg_);
34 }
35 return msg != NULL;
36 }
37
38 bool Fetch() override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080039 static constexpr Options<RawQueue> kOptions =
40 RawQueue::kFromEnd | RawQueue::kNonBlock;
41 const FetchValue *msg = static_cast<const FetchValue *>(
42 queue_->ReadMessageIndex(kOptions, &index_));
43 // Only update the internal pointer if we got a new message.
44 if (msg != NULL && msg != msg_) {
45 queue_->FreeMessage(msg_);
46 msg_ = msg;
47 set_most_recent(msg_);
48 return true;
49 }
50 // The message has to get freed if we didn't use it (and
51 // RawQueue::FreeMessage is ok to call on NULL).
52 queue_->FreeMessage(msg);
53 return false;
54 }
55
56 private:
57 int index_ = 0;
58 RawQueue *queue_;
59 const FetchValue *msg_ = nullptr;
60};
61
62class ShmSender : public RawSender {
63 public:
64 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
65
James Kuszmaulcd1db352019-05-26 16:42:29 -070066 aos::Message *GetMessage() override {
67 return reinterpret_cast<aos::Message *>(queue_->GetMessage());
Parker Schuhe4a70d62017-12-27 20:10:20 -080068 }
69
James Kuszmaulcd1db352019-05-26 16:42:29 -070070 void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
Parker Schuhe4a70d62017-12-27 20:10:20 -080071
James Kuszmaulcd1db352019-05-26 16:42:29 -070072 bool Send(aos::Message *msg) override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080073 assert(queue_ != NULL);
Austin Schuh7267c532019-05-19 19:55:53 -070074 {
Austin Schuh7267c532019-05-19 19:55:53 -070075 // TODO(austin): This lets multiple senders reorder messages since time
76 // isn't acquired with a lock held.
James Kuszmaulcd1db352019-05-26 16:42:29 -070077 if (msg->sent_time == monotonic_clock::min_time) {
78 msg->sent_time = monotonic_clock::now();
Austin Schuh7267c532019-05-19 19:55:53 -070079 }
80 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080081 return queue_->WriteMessage(msg, RawQueue::kOverride);
82 }
83
Austin Schuhd681bbd2019-02-02 12:03:32 -080084 const char *name() const override { return queue_->name(); }
85
Parker Schuhe4a70d62017-12-27 20:10:20 -080086 private:
87 RawQueue *queue_;
88};
89} // namespace
90
91namespace internal {
92class WatcherThreadState {
93 public:
94 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
95 RawQueue *queue,
96 std::function<void(const aos::Message *message)> watcher)
97 : thread_state_(std::move(thread_state)),
98 queue_(queue),
Austin Schuh3578a2e2019-05-25 18:17:59 -070099 index_(0),
100 watcher_(std::move(watcher)) {
101 static constexpr Options<RawQueue> kOptions =
102 RawQueue::kFromEnd | RawQueue::kNonBlock;
103 const void *msg = queue_->ReadMessageIndex(kOptions, &index_);
104 if (msg) {
105 queue_->FreeMessage(msg);
106 }
107 }
Parker Schuhe4a70d62017-12-27 20:10:20 -0800108
109 void Run() {
110 thread_state_->WaitForStart();
111
112 if (!thread_state_->is_running()) return;
113
Austin Schuh3578a2e2019-05-25 18:17:59 -0700114 const void *msg = nullptr;
Parker Schuhe4a70d62017-12-27 20:10:20 -0800115 while (true) {
Austin Schuh3578a2e2019-05-25 18:17:59 -0700116 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index_);
117 assert(msg != nullptr);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800118
119 {
Neil Balch229001a2018-01-07 18:22:52 -0800120 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800121 if (!thread_state_->is_running()) break;
122
123 watcher_(reinterpret_cast<const Message *>(msg));
124 // watcher_ may have exited the event loop.
125 if (!thread_state_->is_running()) break;
126 }
127 queue_->FreeMessage(msg);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800128 }
129
130 queue_->FreeMessage(msg);
131 }
132
133 private:
134 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
135 RawQueue *queue_;
Austin Schuh3578a2e2019-05-25 18:17:59 -0700136 int32_t index_;
137
Parker Schuhe4a70d62017-12-27 20:10:20 -0800138 std::function<void(const Message *message)> watcher_;
139};
Neil Balch229001a2018-01-07 18:22:52 -0800140
141class TimerHandlerState : public TimerHandler {
142 public:
143 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
144 ::std::function<void()> fn)
145 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
146 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
147 PCHECK(fd_ != -1);
148 }
149
150 ~TimerHandlerState() {
151 PCHECK(close(fd_) == 0);
152 }
153
154 void Setup(monotonic_clock::time_point base,
155 monotonic_clock::duration repeat_offset) override {
156 struct itimerspec new_value;
157 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
158 new_value.it_value = ::aos::time::to_timespec(base);
159 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
160 }
161
162 void Disable() override {
163 // Disarm the timer by feeding zero values
164 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
165 }
166
167 void Run() {
168 thread_state_->WaitForStart();
169
170 while (true) {
171 uint64_t buf;
172 ssize_t result = read(fd_, &buf, sizeof(buf));
173 PCHECK(result != -1);
174 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
175
176 {
177 MutexLocker locker(&thread_state_->mutex_);
178 if (!thread_state_->is_running()) break;
179 fn_();
180 // fn_ may have exited the event loop.
181 if (!thread_state_->is_running()) break;
182 }
183 }
184 }
185
186 private:
187 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
188
189 // File descriptor for the timer
190 int fd_;
191
192 // Function to be run on the thread
193 ::std::function<void()> fn_;
194};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800195} // namespace internal
196
197std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
198 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800199 return std::unique_ptr<RawFetcher>(new ShmFetcher(
200 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
201}
202
203std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
204 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800205 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800206 return std::unique_ptr<RawSender>(new ShmSender(
207 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
208}
209
210void ShmEventLoop::MakeRawWatcher(
211 const std::string &path, const QueueTypeInfo &type,
212 std::function<void(const Message *message)> watcher) {
213 Take(path);
214 auto *state = new internal::WatcherThreadState(
215 thread_state_,
216 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
217 std::move(watcher));
218
219 std::thread thread([state] {
220 state->Run();
221 delete state;
222 });
223 thread.detach();
224}
225
Neil Balch229001a2018-01-07 18:22:52 -0800226TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
227 internal::TimerHandlerState *timer =
228 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
229
230 ::std::thread t([timer] {
231 timer->Run();
232 delete timer;
233 });
234 t.detach();
235
236 return timer;
237}
238
Parker Schuhe4a70d62017-12-27 20:10:20 -0800239void ShmEventLoop::OnRun(std::function<void()> on_run) {
240 on_run_.push_back(std::move(on_run));
241}
242
243void ShmEventLoop::Run() {
244 set_is_running(true);
245 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800246 // TODO(austin): epoll event loop in main thread (if needed), and async safe
247 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800248 thread_state_->Run();
249}
250
251void ShmEventLoop::ThreadState::Run() {
252 MutexLocker locker(&mutex_);
253 loop_running_ = true;
254 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
255 loop_running_cond_.Broadcast();
256 while (loop_running_) {
257 if (loop_running_cond_.Wait()) {
258 ::aos::Die("ShmEventLoop mutex lock problem.\n");
259 }
260 }
261}
262
263void ShmEventLoop::ThreadState::WaitForStart() {
264 MutexLocker locker(&mutex_);
265 while (!(loop_running_ || loop_finished_)) {
266 if (loop_running_cond_.Wait()) {
267 ::aos::Die("ShmEventLoop mutex lock problem.\n");
268 }
269 }
270}
271
272void ShmEventLoop::Exit() {
273 set_is_running(false);
274 thread_state_->Exit();
275}
276
277void ShmEventLoop::ThreadState::Exit() {
278 IPCRecursiveMutexLocker locker(&mutex_);
279 if (locker.owner_died()) ::aos::Die("Owner died");
280 loop_running_ = false;
281 loop_finished_ = true;
282 loop_running_cond_.Broadcast();
283}
284
285ShmEventLoop::~ShmEventLoop() {
286 if (is_running()) {
287 ::aos::Die("ShmEventLoop destroyed while running\n");
288 }
289}
290
291void ShmEventLoop::Take(const std::string &path) {
292 if (is_running()) {
293 ::aos::Die("Cannot add new objects while running.\n");
294 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800295
296 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
297 if (prior != taken_.end()) {
298 ::aos::Die("%s is already being used.", path.c_str());
299 } else {
300 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800301 }
302}
303
304} // namespace aos