blob: 8662bc88ad441fd912e15557f1b2b397af4ef827 [file] [log] [blame]
#include "aos/events/shm-event-loop.h"

#include <sys/timerfd.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <stdexcept>

#include "aos/logging/logging.h"
#include "aos/queue.h"
11
Parker Schuhe4a70d62017-12-27 20:10:20 -080012namespace aos {
13
14ShmEventLoop::ShmEventLoop() : thread_state_(std::make_shared<ThreadState>()) {}
15
16namespace {
17class ShmFetcher : public RawFetcher {
18 public:
19 explicit ShmFetcher(RawQueue *queue) : queue_(queue) {}
20 ~ShmFetcher() {
21 if (msg_) {
22 queue_->FreeMessage(msg_);
23 }
24 }
25
James Kuszmaulc79768b2019-02-18 15:08:44 -080026 bool FetchNext() override {
27 const FetchValue *msg = static_cast<const FetchValue *>(
28 queue_->ReadMessageIndex(RawQueue::kNonBlock, &index_));
29 // Only update the internal pointer if we got a new message.
30 if (msg != NULL) {
31 queue_->FreeMessage(msg_);
32 msg_ = msg;
33 set_most_recent(msg_);
34 }
35 return msg != NULL;
36 }
37
38 bool Fetch() override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080039 static constexpr Options<RawQueue> kOptions =
40 RawQueue::kFromEnd | RawQueue::kNonBlock;
41 const FetchValue *msg = static_cast<const FetchValue *>(
42 queue_->ReadMessageIndex(kOptions, &index_));
43 // Only update the internal pointer if we got a new message.
44 if (msg != NULL && msg != msg_) {
45 queue_->FreeMessage(msg_);
46 msg_ = msg;
47 set_most_recent(msg_);
48 return true;
49 }
50 // The message has to get freed if we didn't use it (and
51 // RawQueue::FreeMessage is ok to call on NULL).
52 queue_->FreeMessage(msg);
53 return false;
54 }
55
56 private:
57 int index_ = 0;
58 RawQueue *queue_;
59 const FetchValue *msg_ = nullptr;
60};
61
62class ShmSender : public RawSender {
63 public:
64 explicit ShmSender(RawQueue *queue) : queue_(queue) {}
65
James Kuszmaulcd1db352019-05-26 16:42:29 -070066 aos::Message *GetMessage() override {
67 return reinterpret_cast<aos::Message *>(queue_->GetMessage());
Parker Schuhe4a70d62017-12-27 20:10:20 -080068 }
69
James Kuszmaulcd1db352019-05-26 16:42:29 -070070 void Free(aos::Message *msg) override { queue_->FreeMessage(msg); }
Parker Schuhe4a70d62017-12-27 20:10:20 -080071
James Kuszmaulcd1db352019-05-26 16:42:29 -070072 bool Send(aos::Message *msg) override {
Parker Schuhe4a70d62017-12-27 20:10:20 -080073 assert(queue_ != NULL);
Austin Schuh7267c532019-05-19 19:55:53 -070074 {
Austin Schuh7267c532019-05-19 19:55:53 -070075 // TODO(austin): This lets multiple senders reorder messages since time
76 // isn't acquired with a lock held.
James Kuszmaulcd1db352019-05-26 16:42:29 -070077 if (msg->sent_time == monotonic_clock::min_time) {
78 msg->sent_time = monotonic_clock::now();
Austin Schuh7267c532019-05-19 19:55:53 -070079 }
80 }
Parker Schuhe4a70d62017-12-27 20:10:20 -080081 return queue_->WriteMessage(msg, RawQueue::kOverride);
82 }
83
Austin Schuhd681bbd2019-02-02 12:03:32 -080084 const char *name() const override { return queue_->name(); }
85
Parker Schuhe4a70d62017-12-27 20:10:20 -080086 private:
87 RawQueue *queue_;
88};
89} // namespace
90
91namespace internal {
92class WatcherThreadState {
93 public:
94 WatcherThreadState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
95 RawQueue *queue,
96 std::function<void(const aos::Message *message)> watcher)
97 : thread_state_(std::move(thread_state)),
98 queue_(queue),
99 watcher_(std::move(watcher)) {}
100
101 void Run() {
102 thread_state_->WaitForStart();
103
104 if (!thread_state_->is_running()) return;
105
106 int32_t index = 0;
107
108 static constexpr Options<RawQueue> kOptions =
109 RawQueue::kFromEnd | RawQueue::kNonBlock;
110 const void *msg = queue_->ReadMessageIndex(kOptions, &index);
111
112 while (true) {
113 if (msg == nullptr) {
114 msg = queue_->ReadMessageIndex(RawQueue::kBlock, &index);
115 assert(msg != nullptr);
116 }
117
118 {
Neil Balch229001a2018-01-07 18:22:52 -0800119 MutexLocker locker(&thread_state_->mutex_);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800120 if (!thread_state_->is_running()) break;
121
122 watcher_(reinterpret_cast<const Message *>(msg));
123 // watcher_ may have exited the event loop.
124 if (!thread_state_->is_running()) break;
125 }
126 queue_->FreeMessage(msg);
127 msg = nullptr;
128 }
129
130 queue_->FreeMessage(msg);
131 }
132
133 private:
134 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
135 RawQueue *queue_;
136 std::function<void(const Message *message)> watcher_;
137};
Neil Balch229001a2018-01-07 18:22:52 -0800138
139class TimerHandlerState : public TimerHandler {
140 public:
141 TimerHandlerState(std::shared_ptr<ShmEventLoop::ThreadState> thread_state,
142 ::std::function<void()> fn)
143 : thread_state_(std::move(thread_state)), fn_(::std::move(fn)) {
144 fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
145 PCHECK(fd_ != -1);
146 }
147
148 ~TimerHandlerState() {
149 PCHECK(close(fd_) == 0);
150 }
151
152 void Setup(monotonic_clock::time_point base,
153 monotonic_clock::duration repeat_offset) override {
154 struct itimerspec new_value;
155 new_value.it_interval = ::aos::time::to_timespec(repeat_offset);
156 new_value.it_value = ::aos::time::to_timespec(base);
157 PCHECK(timerfd_settime(fd_, TFD_TIMER_ABSTIME, &new_value, nullptr) == 0);
158 }
159
160 void Disable() override {
161 // Disarm the timer by feeding zero values
162 Setup(::aos::monotonic_clock::epoch(), ::aos::monotonic_clock::zero());
163 }
164
165 void Run() {
166 thread_state_->WaitForStart();
167
168 while (true) {
169 uint64_t buf;
170 ssize_t result = read(fd_, &buf, sizeof(buf));
171 PCHECK(result != -1);
172 CHECK_EQ(result, static_cast<int>(sizeof(buf)));
173
174 {
175 MutexLocker locker(&thread_state_->mutex_);
176 if (!thread_state_->is_running()) break;
177 fn_();
178 // fn_ may have exited the event loop.
179 if (!thread_state_->is_running()) break;
180 }
181 }
182 }
183
184 private:
185 std::shared_ptr<ShmEventLoop::ThreadState> thread_state_;
186
187 // File descriptor for the timer
188 int fd_;
189
190 // Function to be run on the thread
191 ::std::function<void()> fn_;
192};
Parker Schuhe4a70d62017-12-27 20:10:20 -0800193} // namespace internal
194
195std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
196 const std::string &path, const QueueTypeInfo &type) {
Parker Schuhe4a70d62017-12-27 20:10:20 -0800197 return std::unique_ptr<RawFetcher>(new ShmFetcher(
198 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
199}
200
201std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
202 const std::string &path, const QueueTypeInfo &type) {
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800203 Take(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800204 return std::unique_ptr<RawSender>(new ShmSender(
205 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length)));
206}
207
208void ShmEventLoop::MakeRawWatcher(
209 const std::string &path, const QueueTypeInfo &type,
210 std::function<void(const Message *message)> watcher) {
211 Take(path);
212 auto *state = new internal::WatcherThreadState(
213 thread_state_,
214 RawQueue::Fetch(path.c_str(), type.size, type.hash, type.queue_length),
215 std::move(watcher));
216
217 std::thread thread([state] {
218 state->Run();
219 delete state;
220 });
221 thread.detach();
222}
223
Neil Balch229001a2018-01-07 18:22:52 -0800224TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
225 internal::TimerHandlerState *timer =
226 new internal::TimerHandlerState(thread_state_, ::std::move(callback));
227
228 ::std::thread t([timer] {
229 timer->Run();
230 delete timer;
231 });
232 t.detach();
233
234 return timer;
235}
236
Parker Schuhe4a70d62017-12-27 20:10:20 -0800237void ShmEventLoop::OnRun(std::function<void()> on_run) {
238 on_run_.push_back(std::move(on_run));
239}
240
241void ShmEventLoop::Run() {
242 set_is_running(true);
243 for (const auto &run : on_run_) run();
Austin Schuha1654ed2019-01-27 17:24:54 -0800244 // TODO(austin): epoll event loop in main thread (if needed), and async safe
245 // quit handler.
Parker Schuhe4a70d62017-12-27 20:10:20 -0800246 thread_state_->Run();
247}
248
249void ShmEventLoop::ThreadState::Run() {
250 MutexLocker locker(&mutex_);
251 loop_running_ = true;
252 if (loop_finished_) ::aos::Die("Cannot restart an ShmEventLoop()");
253 loop_running_cond_.Broadcast();
254 while (loop_running_) {
255 if (loop_running_cond_.Wait()) {
256 ::aos::Die("ShmEventLoop mutex lock problem.\n");
257 }
258 }
259}
260
261void ShmEventLoop::ThreadState::WaitForStart() {
262 MutexLocker locker(&mutex_);
263 while (!(loop_running_ || loop_finished_)) {
264 if (loop_running_cond_.Wait()) {
265 ::aos::Die("ShmEventLoop mutex lock problem.\n");
266 }
267 }
268}
269
270void ShmEventLoop::Exit() {
271 set_is_running(false);
272 thread_state_->Exit();
273}
274
275void ShmEventLoop::ThreadState::Exit() {
276 IPCRecursiveMutexLocker locker(&mutex_);
277 if (locker.owner_died()) ::aos::Die("Owner died");
278 loop_running_ = false;
279 loop_finished_ = true;
280 loop_running_cond_.Broadcast();
281}
282
283ShmEventLoop::~ShmEventLoop() {
284 if (is_running()) {
285 ::aos::Die("ShmEventLoop destroyed while running\n");
286 }
287}
288
289void ShmEventLoop::Take(const std::string &path) {
290 if (is_running()) {
291 ::aos::Die("Cannot add new objects while running.\n");
292 }
Austin Schuh81fc9cc2019-02-02 23:25:47 -0800293
294 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
295 if (prior != taken_.end()) {
296 ::aos::Die("%s is already being used.", path.c_str());
297 } else {
298 taken_.emplace_back(path);
Parker Schuhe4a70d62017-12-27 20:10:20 -0800299 }
300}
301
302} // namespace aos