blob: 6980a9e4c921334bb90705f9b70f772514b724a4 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/shm_event_loop.h"
2
#include <fcntl.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <iterator>
#include <stdexcept>
#include <thread>
13
14#include "aos/events/epoll.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080015#include "aos/events/event_loop_generated.h"
16#include "aos/events/timing_statistics.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070017#include "aos/ipc_lib/lockless_queue.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080018#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070019#include "aos/realtime.h"
Austin Schuh32fd5a72019-12-01 22:20:26 -080020#include "aos/stl_mutex/stl_mutex.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070021#include "aos/util/phased_loop.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080022#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070023
24DEFINE_string(shm_base, "/dev/shm/aos",
25 "Directory to place queue backing mmaped files in.");
26DEFINE_uint32(permissions, 0770,
27 "Permissions to make shared memory files and folders.");
28
29namespace aos {
30
31std::string ShmFolder(const Channel *channel) {
32 CHECK(channel->has_name());
33 CHECK_EQ(channel->name()->string_view()[0], '/');
34 return FLAGS_shm_base + channel->name()->str() + "/";
35}
36std::string ShmPath(const Channel *channel) {
37 CHECK(channel->has_type());
38 return ShmFolder(channel) + channel->type()->str() + ".v0";
39}
40
41class MMapedQueue {
42 public:
43 MMapedQueue(const Channel *channel) {
44 std::string path = ShmPath(channel);
45
Austin Schuh80c7fce2019-12-05 20:48:43 -080046 config_.num_watchers = channel->num_watchers();
47 config_.num_senders = channel->num_senders();
Alex Perrycb7da4b2019-08-28 19:35:56 -070048 config_.queue_size = 2 * channel->frequency();
49 config_.message_data_size = channel->max_size();
50
51 size_ = ipc_lib::LocklessQueueMemorySize(config_);
52
53 MkdirP(path);
54
55 // There are 2 cases. Either the file already exists, or it does not
56 // already exist and we need to create it. Start by trying to create it. If
57 // that fails, the file has already been created and we can open it
58 // normally.. Once the file has been created it wil never be deleted.
59 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
60 O_CLOEXEC | FLAGS_permissions);
61 if (fd_ == -1 && errno == EEXIST) {
62 VLOG(1) << path << " already created.";
63 // File already exists.
64 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
65 PCHECK(fd_ != -1) << ": Failed to open " << path;
66 while (true) {
67 struct stat st;
68 PCHECK(fstat(fd_, &st) == 0);
69 if (st.st_size != 0) {
70 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
71 << ": Size of " << path
72 << " doesn't match expected size of backing queue file. Did the "
73 "queue definition change?";
74 break;
75 } else {
76 // The creating process didn't get around to it yet. Give it a bit.
77 std::this_thread::sleep_for(std::chrono::milliseconds(10));
78 VLOG(1) << path << " is zero size, waiting";
79 }
80 }
81 } else {
82 VLOG(1) << "Created " << path;
83 PCHECK(fd_ != -1) << ": Failed to open " << path;
84 PCHECK(ftruncate(fd_, size_) == 0);
85 }
86
87 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
88 PCHECK(data_ != MAP_FAILED);
89
90 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
91 }
92
93 ~MMapedQueue() {
94 PCHECK(munmap(data_, size_) == 0);
95 PCHECK(close(fd_) == 0);
96 }
97
98 ipc_lib::LocklessQueueMemory *memory() const {
99 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
100 }
101
Austin Schuh39788ff2019-12-01 18:22:57 -0800102 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700103
104 private:
James Kuszmaul3ae42262019-11-08 12:33:41 -0800105 void MkdirP(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700106 auto last_slash_pos = path.find_last_of("/");
107
James Kuszmaul3ae42262019-11-08 12:33:41 -0800108 std::string folder(last_slash_pos == std::string_view::npos
109 ? std::string_view("")
Alex Perrycb7da4b2019-08-28 19:35:56 -0700110 : path.substr(0, last_slash_pos));
Austin Schuh8ec76182019-12-23 16:28:00 -0800111 if (folder.empty()) return;
112 MkdirP(folder);
113 VLOG(1) << "Creating " << folder;
114 const int result = mkdir(folder.c_str(), FLAGS_permissions);
115 if (result == -1 && errno == EEXIST) {
116 VLOG(1) << "Already exists";
117 return;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700118 }
Austin Schuh8ec76182019-12-23 16:28:00 -0800119 PCHECK(result == 0) << ": Error creating " << folder;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700120 }
121
122 ipc_lib::LocklessQueueConfiguration config_;
123
124 int fd_;
125
126 size_t size_;
127 void *data_;
128};
129
// Returns the basename of `path`: everything after the final '/', or the whole
// of `path` when it contains no '/'.  The result views into `path`.
std::string_view Filename(std::string_view path) {
  const std::string_view::size_type slash = path.rfind('/');
  if (slash == std::string_view::npos) {
    return path;
  }
  return path.substr(slash + 1);
}
138
139ShmEventLoop::ShmEventLoop(const Configuration *configuration)
140 : EventLoop(configuration), name_(Filename(program_invocation_name)) {}
141
namespace {

// File-local shorthand for the standard chrono namespace (used by the timer
// adapter below).
namespace chrono = ::std::chrono;

}  // namespace
147
148namespace internal {
149
150class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700151 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800152 explicit SimpleShmFetcher(const Channel *channel)
153 : lockless_queue_memory_(channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700154 lockless_queue_(lockless_queue_memory_.memory(),
155 lockless_queue_memory_.config()),
156 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
157 alignof(AlignedChar), channel->max_size())),
158 &free) {
159 context_.data = nullptr;
160 // Point the queue index at the next index to read starting now. This
161 // makes it such that FetchNext will read the next message sent after
162 // the fetcher is created.
163 PointAtNextQueueIndex();
164 }
165
Austin Schuh39788ff2019-12-01 18:22:57 -0800166 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700167
168 // Points the next message to fetch at the queue index which will be
169 // populated next.
170 void PointAtNextQueueIndex() {
171 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
172 if (!actual_queue_index_.valid()) {
173 // Nothing in the queue. The next element will show up at the 0th
174 // index in the queue.
175 actual_queue_index_ =
176 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
177 } else {
178 actual_queue_index_ = actual_queue_index_.Increment();
179 }
180 }
181
Austin Schuh39788ff2019-12-01 18:22:57 -0800182 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700183 // TODO(austin): Get behind and make sure it dies both here and with
184 // Fetch.
185 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
186 actual_queue_index_.index(), &context_.monotonic_sent_time,
187 &context_.realtime_sent_time, &context_.size,
188 reinterpret_cast<char *>(data_storage_.get()));
189 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
190 context_.queue_index = actual_queue_index_.index();
Austin Schuh39788ff2019-12-01 18:22:57 -0800191 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
192 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700193 actual_queue_index_ = actual_queue_index_.Increment();
194 }
195
196 // Make sure the data wasn't modified while we were reading it. This
197 // can only happen if you are reading the last message *while* it is
198 // being written to, which means you are pretty far behind.
199 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
200 << ": Got behind while reading and the last message was modified "
201 "out "
202 "from under us while we were reading it. Don't get so far "
203 "behind.";
204
205 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
206 << ": The next message is no longer available.";
207 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
208 }
209
Austin Schuh39788ff2019-12-01 18:22:57 -0800210 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700211 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
212 // actual_queue_index_ is only meaningful if it was set by Fetch or
213 // FetchNext. This happens when valid_data_ has been set. So, only
214 // skip checking if valid_data_ is true.
215 //
216 // Also, if the latest queue index is invalid, we are empty. So there
217 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800218 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700219 queue_index == actual_queue_index_.DecrementBy(1u)) ||
220 !queue_index.valid()) {
221 return false;
222 }
223
224 ipc_lib::LocklessQueue::ReadResult read_result =
225 lockless_queue_.Read(queue_index.index(), &context_.monotonic_sent_time,
226 &context_.realtime_sent_time, &context_.size,
227 reinterpret_cast<char *>(data_storage_.get()));
228 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
229 context_.queue_index = queue_index.index();
Austin Schuh39788ff2019-12-01 18:22:57 -0800230 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
231 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700232 actual_queue_index_ = queue_index.Increment();
233 }
234
235 // Make sure the data wasn't modified while we were reading it. This
236 // can only happen if you are reading the last message *while* it is
237 // being written to, which means you are pretty far behind.
238 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
239 << ": Got behind while reading and the last message was modified "
240 "out "
241 "from under us while we were reading it. Don't get so far "
242 "behind.";
243
244 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
245 << ": Queue index went backwards. This should never happen.";
246
247 // We fell behind between when we read the index and read the value.
248 // This isn't worth recovering from since this means we went to sleep
249 // for a long time in the middle of this function.
250 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
251 << ": The next message is no longer available.";
252 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
253 }
254
Austin Schuh39788ff2019-12-01 18:22:57 -0800255 Context context() const { return context_; }
256
Alex Perrycb7da4b2019-08-28 19:35:56 -0700257 bool RegisterWakeup(int priority) {
258 return lockless_queue_.RegisterWakeup(priority);
259 }
260
261 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
262
263 private:
264 MMapedQueue lockless_queue_memory_;
265 ipc_lib::LocklessQueue lockless_queue_;
266
267 ipc_lib::QueueIndex actual_queue_index_ =
268 ipc_lib::LocklessQueue::empty_queue_index();
269
270 struct AlignedChar {
271 alignas(32) char data;
272 };
273
274 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800275
276 Context context_;
277};
278
279class ShmFetcher : public RawFetcher {
280 public:
281 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
282 : RawFetcher(event_loop, channel), simple_shm_fetcher_(channel) {}
283
284 ~ShmFetcher() { context_.data = nullptr; }
285
286 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
287 if (simple_shm_fetcher_.FetchNext()) {
288 context_ = simple_shm_fetcher_.context();
289 return std::make_pair(true, monotonic_clock::now());
290 }
291 return std::make_pair(false, monotonic_clock::min_time);
292 }
293
294 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
295 if (simple_shm_fetcher_.Fetch()) {
296 context_ = simple_shm_fetcher_.context();
297 return std::make_pair(true, monotonic_clock::now());
298 }
299 return std::make_pair(false, monotonic_clock::min_time);
300 }
301
302 private:
303 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700304};
305
306class ShmSender : public RawSender {
307 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800308 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
309 : RawSender(event_loop, channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700310 lockless_queue_memory_(channel),
311 lockless_queue_(lockless_queue_memory_.memory(),
312 lockless_queue_memory_.config()),
313 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
314
Austin Schuh39788ff2019-12-01 18:22:57 -0800315 ~ShmSender() override {}
316
Alex Perrycb7da4b2019-08-28 19:35:56 -0700317 void *data() override { return lockless_queue_sender_.Data(); }
318 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800319 bool DoSend(size_t length) override {
320 lockless_queue_sender_.Send(length);
321 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700322 return true;
323 }
324
Austin Schuh39788ff2019-12-01 18:22:57 -0800325 bool DoSend(const void *msg, size_t length) override {
Austin Schuh4726ce92019-11-29 13:23:18 -0800326 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length);
Austin Schuh39788ff2019-12-01 18:22:57 -0800327 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700328 // TODO(austin): Return an error if we send too fast.
329 return true;
330 }
331
Alex Perrycb7da4b2019-08-28 19:35:56 -0700332 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700333 MMapedQueue lockless_queue_memory_;
334 ipc_lib::LocklessQueue lockless_queue_;
335 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
336};
337
Alex Perrycb7da4b2019-08-28 19:35:56 -0700338// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800339class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700340 public:
341 WatcherState(
Austin Schuh7d87b672019-12-01 20:23:49 -0800342 ShmEventLoop *event_loop, const Channel *channel,
Austin Schuh39788ff2019-12-01 18:22:57 -0800343 std::function<void(const Context &context, const void *message)> fn)
344 : aos::WatcherState(event_loop, channel, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800345 event_loop_(event_loop),
346 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -0800347 simple_shm_fetcher_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700348
Austin Schuh7d87b672019-12-01 20:23:49 -0800349 ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800350
351 void Startup(EventLoop *event_loop) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800352 simple_shm_fetcher_.PointAtNextQueueIndex();
Austin Schuh39788ff2019-12-01 18:22:57 -0800353 CHECK(RegisterWakeup(event_loop->priority()));
354 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700355
Alex Perrycb7da4b2019-08-28 19:35:56 -0700356 // Returns true if there is new data available.
Austin Schuh7d87b672019-12-01 20:23:49 -0800357 bool CheckForNewData() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700358 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800359 has_new_data_ = simple_shm_fetcher_.FetchNext();
Austin Schuh7d87b672019-12-01 20:23:49 -0800360
361 if (has_new_data_) {
362 event_.set_event_time(
363 simple_shm_fetcher_.context().monotonic_sent_time);
364 event_loop_->AddEvent(&event_);
365 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700366 }
367
368 return has_new_data_;
369 }
370
Alex Perrycb7da4b2019-08-28 19:35:56 -0700371 // Consumes the data by calling the callback.
Austin Schuh7d87b672019-12-01 20:23:49 -0800372 void HandleEvent() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700373 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800374 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700375 has_new_data_ = false;
Austin Schuh7d87b672019-12-01 20:23:49 -0800376 CheckForNewData();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700377 }
378
Austin Schuh39788ff2019-12-01 18:22:57 -0800379 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700380 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800381 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700382 }
383
Austin Schuh39788ff2019-12-01 18:22:57 -0800384 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700385
386 private:
387 bool has_new_data_ = false;
388
Austin Schuh7d87b672019-12-01 20:23:49 -0800389 ShmEventLoop *event_loop_;
390 EventHandler<WatcherState> event_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800391 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700392};
393
394// Adapter class to adapt a timerfd to a TimerHandler.
Austin Schuh7d87b672019-12-01 20:23:49 -0800395class TimerHandlerState final : public TimerHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700396 public:
397 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800398 : TimerHandler(shm_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800399 shm_event_loop_(shm_event_loop),
400 event_(this) {
401 shm_event_loop_->epoll_.OnReadable(
402 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700403 }
404
Austin Schuh7d87b672019-12-01 20:23:49 -0800405 ~TimerHandlerState() {
406 Disable();
407 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
408 }
409
410 void HandleEvent() {
411 uint64_t elapsed_cycles = timerfd_.Read();
412 if (elapsed_cycles == 0u) {
413 // We got called before the timer interrupt could happen, but because we
414 // are checking the time, we got called on time. Push the timer out by 1
415 // cycle.
416 elapsed_cycles = 1u;
417 timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
418 }
419
420 Call(monotonic_clock::now, base_);
421
422 base_ += repeat_offset_ * elapsed_cycles;
423
424 if (repeat_offset_ != chrono::seconds(0)) {
425 event_.set_event_time(base_);
426 shm_event_loop_->AddEvent(&event_);
427 }
428 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700429
430 void Setup(monotonic_clock::time_point base,
431 monotonic_clock::duration repeat_offset) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800432 if (event_.valid()) {
433 shm_event_loop_->RemoveEvent(&event_);
434 }
435
Alex Perrycb7da4b2019-08-28 19:35:56 -0700436 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800437 base_ = base;
438 repeat_offset_ = repeat_offset;
Austin Schuh7d87b672019-12-01 20:23:49 -0800439 event_.set_event_time(base_);
440 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700441 }
442
Austin Schuh7d87b672019-12-01 20:23:49 -0800443 void Disable() override {
444 shm_event_loop_->RemoveEvent(&event_);
445 timerfd_.Disable();
446 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700447
448 private:
449 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800450 EventHandler<TimerHandlerState> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700451
452 TimerFd timerfd_;
453
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800454 monotonic_clock::time_point base_;
455 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700456};
457
458// Adapter class to the timerfd and PhasedLoop.
Austin Schuh7d87b672019-12-01 20:23:49 -0800459class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700460 public:
461 PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
462 const monotonic_clock::duration interval,
463 const monotonic_clock::duration offset)
Austin Schuh39788ff2019-12-01 18:22:57 -0800464 : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
Austin Schuh7d87b672019-12-01 20:23:49 -0800465 shm_event_loop_(shm_event_loop),
466 event_(this) {
467 shm_event_loop_->epoll_.OnReadable(
468 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
469 }
470
471 void HandleEvent() {
472 // The return value for read is the number of cycles that have elapsed.
473 // Because we check to see when this event *should* have happened, there are
474 // cases where Read() will return 0, when 1 cycle has actually happened.
475 // This occurs when the timer interrupt hasn't triggered yet. Therefore,
476 // ignore it. Call handles rescheduling and calculating elapsed cycles
477 // without any extra help.
478 timerfd_.Read();
479 event_.Invalidate();
480
481 Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
482 Schedule(sleep_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700483 });
484 }
485
Austin Schuh39788ff2019-12-01 18:22:57 -0800486 ~PhasedLoopHandler() override {
487 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
Austin Schuh7d87b672019-12-01 20:23:49 -0800488 shm_event_loop_->RemoveEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700489 }
490
491 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800492 // Reschedules the timer.
Austin Schuh39788ff2019-12-01 18:22:57 -0800493 void Schedule(monotonic_clock::time_point sleep_time) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800494 if (event_.valid()) {
495 shm_event_loop_->RemoveEvent(&event_);
496 }
497
Austin Schuh39788ff2019-12-01 18:22:57 -0800498 timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
Austin Schuh7d87b672019-12-01 20:23:49 -0800499 event_.set_event_time(sleep_time);
500 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700501 }
502
503 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800504 EventHandler<PhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700505
506 TimerFd timerfd_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700507};
508} // namespace internal
509
510::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
511 const Channel *channel) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800512 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700513}
514
515::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
516 const Channel *channel) {
517 Take(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800518
519 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700520}
521
522void ShmEventLoop::MakeRawWatcher(
523 const Channel *channel,
524 std::function<void(const Context &context, const void *message)> watcher) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700525 Take(channel);
526
Austin Schuh39788ff2019-12-01 18:22:57 -0800527 NewWatcher(::std::unique_ptr<WatcherState>(
528 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700529}
530
531TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800532 return NewTimer(::std::unique_ptr<TimerHandler>(
533 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700534}
535
536PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
537 ::std::function<void(int)> callback,
538 const monotonic_clock::duration interval,
539 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800540 return NewPhasedLoop(
541 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
542 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700543}
544
545void ShmEventLoop::OnRun(::std::function<void()> on_run) {
546 on_run_.push_back(::std::move(on_run));
547}
548
Austin Schuh7d87b672019-12-01 20:23:49 -0800549void ShmEventLoop::HandleEvent() {
550 // Update all the times for handlers.
551 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
552 internal::WatcherState *watcher =
553 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
554
555 watcher->CheckForNewData();
556 }
557
Austin Schuh39788ff2019-12-01 18:22:57 -0800558 while (true) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800559 if (EventCount() == 0 ||
560 PeekEvent()->event_time() > monotonic_clock::now()) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800561 break;
562 }
563
Austin Schuh7d87b672019-12-01 20:23:49 -0800564 EventLoopEvent *event = PopEvent();
565 event->HandleEvent();
Austin Schuh39788ff2019-12-01 18:22:57 -0800566 }
567}
568
Austin Schuh32fd5a72019-12-01 22:20:26 -0800569// RAII class to mask signals.
570class ScopedSignalMask {
571 public:
572 ScopedSignalMask(std::initializer_list<int> signals) {
573 sigset_t sigset;
574 PCHECK(sigemptyset(&sigset) == 0);
575 for (int signal : signals) {
576 PCHECK(sigaddset(&sigset, signal) == 0);
577 }
578
579 PCHECK(sigprocmask(SIG_BLOCK, &sigset, &old_) == 0);
580 }
581
582 ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }
583
584 private:
585 sigset_t old_;
586};
587
588// Class to manage the static state associated with killing multiple event
589// loops.
590class SignalHandler {
591 public:
592 // Gets the singleton.
593 static SignalHandler *global() {
594 static SignalHandler loop;
595 return &loop;
596 }
597
598 // Handles the signal with the singleton.
599 static void HandleSignal(int) { global()->DoHandleSignal(); }
600
601 // Registers an event loop to receive Exit() calls.
602 void Register(ShmEventLoop *event_loop) {
603 // Block signals while we have the mutex so we never race with the signal
604 // handler.
605 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
606 std::unique_lock<stl_mutex> locker(mutex_);
607 if (event_loops_.size() == 0) {
608 // The first caller registers the signal handler.
609 struct sigaction new_action;
610 sigemptyset(&new_action.sa_mask);
611 // This makes it so that 2 control c's to a stuck process will kill it by
612 // restoring the original signal handler.
613 new_action.sa_flags = SA_RESETHAND;
614 new_action.sa_handler = &HandleSignal;
615
616 PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
617 PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
618 PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
619 }
620
621 event_loops_.push_back(event_loop);
622 }
623
624 // Unregisters an event loop to receive Exit() calls.
625 void Unregister(ShmEventLoop *event_loop) {
626 // Block signals while we have the mutex so we never race with the signal
627 // handler.
628 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
629 std::unique_lock<stl_mutex> locker(mutex_);
630
631 event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
632
633 if (event_loops_.size() == 0u) {
634 // The last caller restores the original signal handlers.
635 PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
636 PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
637 PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
638 }
639 }
640
641 private:
642 void DoHandleSignal() {
643 // We block signals while grabbing the lock, so there should never be a
644 // race. Confirm that this is true using trylock.
645 CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
646 "modifing the event loop list.";
647 for (ShmEventLoop *event_loop : event_loops_) {
648 event_loop->Exit();
649 }
650 mutex_.unlock();
651 }
652
653 // Mutex to protect all state.
654 stl_mutex mutex_;
655 std::vector<ShmEventLoop *> event_loops_;
656 struct sigaction old_action_int_;
657 struct sigaction old_action_hup_;
658 struct sigaction old_action_term_;
659};
660
Alex Perrycb7da4b2019-08-28 19:35:56 -0700661void ShmEventLoop::Run() {
Austin Schuh32fd5a72019-12-01 22:20:26 -0800662 SignalHandler::global()->Register(this);
Austin Schuh39788ff2019-12-01 18:22:57 -0800663
Alex Perrycb7da4b2019-08-28 19:35:56 -0700664 std::unique_ptr<ipc_lib::SignalFd> signalfd;
665
666 if (watchers_.size() > 0) {
667 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
668
669 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
670 signalfd_siginfo result = signalfd_ptr->Read();
671 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
672
673 // TODO(austin): We should really be checking *everything*, not just
674 // watchers, and calling the oldest thing first. That will improve
675 // determinism a lot.
676
Austin Schuh7d87b672019-12-01 20:23:49 -0800677 HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700678 });
679 }
680
Austin Schuh39788ff2019-12-01 18:22:57 -0800681 MaybeScheduleTimingReports();
682
Austin Schuh7d87b672019-12-01 20:23:49 -0800683 ReserveEvents();
684
Austin Schuh39788ff2019-12-01 18:22:57 -0800685 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700686 if (priority_ != 0) {
687 ::aos::InitRT();
688
689 LOG(INFO) << "Setting priority to " << priority_;
690 ::aos::SetCurrentThreadRealtimePriority(priority_);
691 }
692
693 set_is_running(true);
694
695 // Now that we are realtime (but before the OnRun handlers run), snap the
696 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800697 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
698 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700699 }
700
701 // Now that we are RT, run all the OnRun handlers.
702 for (const auto &run : on_run_) {
703 run();
704 }
705
Alex Perrycb7da4b2019-08-28 19:35:56 -0700706 // And start our main event loop which runs all the timers and handles Quit.
707 epoll_.Run();
708
709 // Once epoll exits, there is no useful nonrt work left to do.
710 set_is_running(false);
711
712 // Nothing time or synchronization critical needs to happen after this point.
713 // Drop RT priority.
714 ::aos::UnsetCurrentThreadRealtimePriority();
715
Austin Schuh39788ff2019-12-01 18:22:57 -0800716 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
717 internal::WatcherState *watcher =
718 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700719 watcher->UnregisterWakeup();
720 }
721
722 if (watchers_.size() > 0) {
723 epoll_.DeleteFd(signalfd->fd());
724 signalfd.reset();
725 }
Austin Schuh32fd5a72019-12-01 22:20:26 -0800726
727 SignalHandler::global()->Unregister(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700728}
729
730void ShmEventLoop::Exit() { epoll_.Quit(); }
731
732ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800733 // Trigger any remaining senders or fetchers to be cleared before destroying
734 // the event loop so the book keeping matches.
735 timing_report_sender_.reset();
736
737 // Force everything with a registered fd with epoll to be destroyed now.
738 timers_.clear();
739 phased_loops_.clear();
740 watchers_.clear();
741
Alex Perrycb7da4b2019-08-28 19:35:56 -0700742 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
743}
744
745void ShmEventLoop::Take(const Channel *channel) {
746 CHECK(!is_running()) << ": Cannot add new objects while running.";
747
748 // Cheat aggresively. Use the shared memory path as a proxy for a unique
749 // identifier for the channel.
750 const std::string path = ShmPath(channel);
751
752 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
753 CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
754
755 taken_.emplace_back(path);
756}
757
758void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
759 if (is_running()) {
760 LOG(FATAL) << "Cannot set realtime priority while running.";
761 }
762 priority_ = priority;
763}
764
Austin Schuh39788ff2019-12-01 18:22:57 -0800765pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
766
Alex Perrycb7da4b2019-08-28 19:35:56 -0700767} // namespace aos