blob: c9ff6fde6bb965f7986f83c7859d3d5b2d98735c [file] [log] [blame]
Parker Schuhe4a70d62017-12-27 20:10:20 -08001#include "aos/events/shm-event-loop.h"
Parker Schuhe4a70d62017-12-27 20:10:20 -08002
Neil Balch229001a2018-01-07 18:22:52 -08003#include <sys/timerfd.h>
Austin Schuh81fc9cc2019-02-02 23:25:47 -08004#include <algorithm>
Parker Schuhe4a70d62017-12-27 20:10:20 -08005#include <atomic>
6#include <chrono>
7#include <stdexcept>
8
Austin Schuh81fc9cc2019-02-02 23:25:47 -08009#include "aos/logging/logging.h"
10#include "aos/queue.h"
11
Parker Schuhe4a70d62017-12-27 20:10:20 -080012namespace aos {
13
14ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
15
16namespace {
17class ShmFetcher : public RawFetcher {
18 public:
19 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
20 ~ShmFetcher() {
21 if (msg_) {
22 queue_->FreeMessage(msg_);
23 }
24 }
25
James Kuszmaulc79768b2019-02-18 15:08:44 -080026 bool FetchNext() override {
27 const FetchValue *msg = static_cast<const FetchValue *>(
28 queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
29 // Only update the internal pointer if we got a new message.
30 if (msg != NULL) {
31 queue_->FreeMessage(msg_);
32 msg_ = msg;
33 set_most_recent(msg_);
34 }
35 return msg != NULL;
36 }
37
38 bool Fetch() override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080039 static constexpr Options<RawQueue> kOptions =
40 RawQueue::kFromEnd | RawQueue::kNonBlock;
41 const FetchValue *msg = static_cast<const FetchValue *>(
42 queue_->ReadMessageIndex(kOptions, &index_));
43 // Only update the internal pointer if we got a new message.
44 if (msg != NULL && msg != msg_) {
45 queue_->FreeMessage(msg_);
46 msg_ = msg;
47 set_most_recent(msg_);
48 return true;
49 }
50 // The message has to get freed if we didn't use it (and
51 // RawQueue::FreeMessage is ok to call on NULL).
52 queue_->FreeMessage(msg);
53 return false;
54 }
55
56 private:
57 int index_ = 0;
58 RawQueue *queue_;
59 const FetchValue *msg_ = nullptr;
60};
61
62class ShmSender : public RawSender {
63 public:
64 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
65
66 SendContext *GetContext() override {
67 return reinterpret_cast<SendContext *>(queue_->GetMessage());
68 }
69
70 void Free(SendContext *context) override { queue_->FreeMessage(context); }
71
72 bool Send(SendContext *msg) override {
73 assert(queue_ != NULL);
Austin Schuh7267c532019-05-19 19:55:53 -070074 {
75 ::aos::Message *aos_msg = reinterpret_cast<Message *>(msg);
76 // TODO(austin): This lets multiple senders reorder messages since time
77 // isn't acquired with a lock held.
78 if (aos_msg->sent_time == monotonic_clock::min_time) {
79 aos_msg->sent_time = monotonic_clock::now();
80 }
81 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080082 return queue_->WriteMessage(msg, RawQueue::kOverride);
83 }
84
Austin Schuhd681bbd2019-02-02 12:03:32 -080085 const char *name() const override { return queue_->name(); }
86
Parker Schuhe4a70d62017-12-27 20:10:20 -080087 private:
88 RawQueue *queue_;
89};
90} // namespace
91
92namespace internal {
93class WatcherThreadState {
94 public:
95 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
96 RawQueue *queue,
97 std::function<void(const aos::Message *message)> watcher)
98 : thread_state_(std::move(thread_state)),
99 queue_(queue),
100 watcher_(std::move(watcher)) {}
101
102 void Run() {
103 thread_state_->WaitForStart();
104
105 if (!thread_state_->is_running()) return;
106
107 int32_t index = 0;
108
109 static constexpr Options<RawQueue> kOptions =
110 RawQueue::kFromEnd | RawQueue::kNonBlock;
111 const void *msg = queue_->ReadMessageIndex(kOptions, &index);
112
113 while (true) {
114 if (msg == nullptr) {
115 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
116 assert(msg != nullptr);
117 }
118
119 {
Neil Balch229001a2018-01-07 18:22:52 -0800120 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800121 if (!thread_state_->is_running()) break;
122
123 watcher_(reinterpret_cast<const Message *>(msg));
124 // watcher_ may have exited the event loop.
125 if (!thread_state_->is_running()) break;
126 }
127 queue_->FreeMessage(msg);
128 msg = nullptr;
129 }
130
131 queue_->FreeMessage(msg);
132 }
133
134 private:
135 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
136 RawQueue *queue_;
137 std::function<void(const Message *message)> watcher_;
138};
Neil Balch229001a2018-01-07 18:22:52 -0800139
140class TimerHandlerState : public TimerHandler {
141 public:
142 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
143 ::std::function<void()> fn)
144 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
145 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
146 PCHECK(fd_ != -1);
147 }
148
149 ~TimerHandlerState() {
150 PCHECK(close(fd_) == 0);
151 }
152
153 void Setup(monotonic_clock::time_point base,
154 monotonic_clock::duration repeat_offset) override {
155 struct itimerspec new_value;
156 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
157 new_value.it_value = ::aos::time::to_timespec(base);
158 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
159 }
160
161 void Disable() override {
162 // Disarm the timer by feeding zero values
163 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
164 }
165
166 void Run() {
167 thread_state_->WaitForStart();
168
169 while (true) {
170 uint64_t buf;
171 ssize_t result = read(fd_, &buf, sizeof(buf));
172 PCHECK(result != -1);
173 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
174
175 {
176 MutexLocker locker(&thread_state_->mutex_);
177 if (!thread_state_->is_running()) break;
178 fn_();
179 // fn_ may have exited the event loop.
180 if (!thread_state_->is_running()) break;
181 }
182 }
183 }
184
185 private:
186 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
187
188 // File descriptor for the timer
189 int fd_;
190
191 // Function to be run on the thread
192 ::std::function<void()> fn_;
193};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800194} // namespace internal
195
196std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
197 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800198 return std::unique_ptr<RawFetcher>(new ShmFetcher(
199 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
200}
201
202std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
203 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800204 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800205 return std::unique_ptr<RawSender>(new ShmSender(
206 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
207}
208
209void ShmEventLoop::MakeRawWatcher(
210 const std::string &path, const QueueTypeInfo &type,
211 std::function<void(const Message *message)> watcher) {
212 Take(path);
213 auto *state = new internal::WatcherThreadState(
214 thread_state_,
215 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
216 std::move(watcher));
217
218 std::thread thread([state] {
219 state->Run();
220 delete state;
221 });
222 thread.detach();
223}
224
Neil Balch229001a2018-01-07 18:22:52 -0800225TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
226 internal::TimerHandlerState *timer =
227 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
228
229 ::std::thread t([timer] {
230 timer->Run();
231 delete timer;
232 });
233 t.detach();
234
235 return timer;
236}
237
Parker Schuhe4a70d62017-12-27 20:10:20 -0800238void ShmEventLoop::OnRun(std::function<void()> on_run) {
239 on_run_.push_back(std::move(on_run));
240}
241
242void ShmEventLoop::Run() {
243 set_is_running(true);
244 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800245 // TODO(austin): epoll event loop in main thread (if needed), and async safe
246 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800247 thread_state_->Run();
248}
249
250void ShmEventLoop::ThreadState::Run() {
251 MutexLocker locker(&mutex_);
252 loop_running_ = true;
253 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
254 loop_running_cond_.Broadcast();
255 while (loop_running_) {
256 if (loop_running_cond_.Wait()) {
257 ::aos::Die("ShmEventLoop mutex lock problem.\n");
258 }
259 }
260}
261
262void ShmEventLoop::ThreadState::WaitForStart() {
263 MutexLocker locker(&mutex_);
264 while (!(loop_running_ || loop_finished_)) {
265 if (loop_running_cond_.Wait()) {
266 ::aos::Die("ShmEventLoop mutex lock problem.\n");
267 }
268 }
269}
270
271void ShmEventLoop::Exit() {
272 set_is_running(false);
273 thread_state_->Exit();
274}
275
276void ShmEventLoop::ThreadState::Exit() {
277 IPCRecursiveMutexLocker locker(&mutex_);
278 if (locker.owner_died()) ::aos::Die("Owner died");
279 loop_running_ = false;
280 loop_finished_ = true;
281 loop_running_cond_.Broadcast();
282}
283
284ShmEventLoop::~ShmEventLoop() {
285 if (is_running()) {
286 ::aos::Die("ShmEventLoop destroyed while running\n");
287 }
288}
289
290void ShmEventLoop::Take(const std::string &path) {
291 if (is_running()) {
292 ::aos::Die("Cannot add new objects while running.\n");
293 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800294
295 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
296 if (prior != taken_.end()) {
297 ::aos::Die("%s is already being used.", path.c_str());
298 } else {
299 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800300 }
301}
302
303} // namespace aos