blob: e696f36aba0dc8c1c75bbacfe710ea10319f7211 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/shm_event_loop.h"
2
3#include <sys/mman.h>
4#include <sys/stat.h>
Austin Schuh39788ff2019-12-01 18:22:57 -08005#include <sys/syscall.h>
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include <sys/types.h>
7#include <unistd.h>
8#include <algorithm>
9#include <atomic>
10#include <chrono>
Austin Schuh39788ff2019-12-01 18:22:57 -080011#include <iterator>
Alex Perrycb7da4b2019-08-28 19:35:56 -070012#include <stdexcept>
13
14#include "aos/events/epoll.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080015#include "aos/events/event_loop_generated.h"
16#include "aos/events/timing_statistics.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070017#include "aos/ipc_lib/lockless_queue.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080018#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070019#include "aos/realtime.h"
Austin Schuh32fd5a72019-12-01 22:20:26 -080020#include "aos/stl_mutex/stl_mutex.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070021#include "aos/util/phased_loop.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080022#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070023
24DEFINE_string(shm_base, "/dev/shm/aos",
25 "Directory to place queue backing mmaped files in.");
26DEFINE_uint32(permissions, 0770,
27 "Permissions to make shared memory files and folders.");
28
29namespace aos {
30
31std::string ShmFolder(const Channel *channel) {
32 CHECK(channel->has_name());
33 CHECK_EQ(channel->name()->string_view()[0], '/');
34 return FLAGS_shm_base + channel->name()->str() + "/";
35}
36std::string ShmPath(const Channel *channel) {
37 CHECK(channel->has_type());
38 return ShmFolder(channel) + channel->type()->str() + ".v0";
39}
40
41class MMapedQueue {
42 public:
43 MMapedQueue(const Channel *channel) {
44 std::string path = ShmPath(channel);
45
Austin Schuh80c7fce2019-12-05 20:48:43 -080046 config_.num_watchers = channel->num_watchers();
47 config_.num_senders = channel->num_senders();
Alex Perrycb7da4b2019-08-28 19:35:56 -070048 config_.queue_size = 2 * channel->frequency();
49 config_.message_data_size = channel->max_size();
50
51 size_ = ipc_lib::LocklessQueueMemorySize(config_);
52
53 MkdirP(path);
54
55 // There are 2 cases. Either the file already exists, or it does not
56 // already exist and we need to create it. Start by trying to create it. If
57 // that fails, the file has already been created and we can open it
58 // normally.. Once the file has been created it wil never be deleted.
59 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
60 O_CLOEXEC | FLAGS_permissions);
61 if (fd_ == -1 && errno == EEXIST) {
62 VLOG(1) << path << " already created.";
63 // File already exists.
64 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
65 PCHECK(fd_ != -1) << ": Failed to open " << path;
66 while (true) {
67 struct stat st;
68 PCHECK(fstat(fd_, &st) == 0);
69 if (st.st_size != 0) {
70 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
71 << ": Size of " << path
72 << " doesn't match expected size of backing queue file. Did the "
73 "queue definition change?";
74 break;
75 } else {
76 // The creating process didn't get around to it yet. Give it a bit.
77 std::this_thread::sleep_for(std::chrono::milliseconds(10));
78 VLOG(1) << path << " is zero size, waiting";
79 }
80 }
81 } else {
82 VLOG(1) << "Created " << path;
83 PCHECK(fd_ != -1) << ": Failed to open " << path;
84 PCHECK(ftruncate(fd_, size_) == 0);
85 }
86
87 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
88 PCHECK(data_ != MAP_FAILED);
89
90 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
91 }
92
93 ~MMapedQueue() {
94 PCHECK(munmap(data_, size_) == 0);
95 PCHECK(close(fd_) == 0);
96 }
97
98 ipc_lib::LocklessQueueMemory *memory() const {
99 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
100 }
101
Austin Schuh39788ff2019-12-01 18:22:57 -0800102 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700103
104 private:
James Kuszmaul3ae42262019-11-08 12:33:41 -0800105 void MkdirP(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700106 struct stat st;
107 auto last_slash_pos = path.find_last_of("/");
108
James Kuszmaul3ae42262019-11-08 12:33:41 -0800109 std::string folder(last_slash_pos == std::string_view::npos
110 ? std::string_view("")
Alex Perrycb7da4b2019-08-28 19:35:56 -0700111 : path.substr(0, last_slash_pos));
112 if (stat(folder.c_str(), &st) == -1) {
113 PCHECK(errno == ENOENT);
114 CHECK_NE(folder, "") << ": Base path doesn't exist";
115 MkdirP(folder);
116 VLOG(1) << "Creating " << folder;
117 PCHECK(mkdir(folder.c_str(), FLAGS_permissions) == 0);
118 }
119 }
120
121 ipc_lib::LocklessQueueConfiguration config_;
122
123 int fd_;
124
125 size_t size_;
126 void *data_;
127};
128
Austin Schuh217a9782019-12-21 23:02:50 -0800129namespace {
130
// Returns the part of `path` after its final '/', or all of `path` when it
// contains no '/'. The result views into `path`'s storage.
std::string_view Filename(std::string_view path) {
  const size_t slash = path.rfind('/');
  if (slash == std::string_view::npos) {
    return path;
  }
  return path.substr(slash + 1);
}
139
Austin Schuh217a9782019-12-21 23:02:50 -0800140const Node *MaybeMyNode(const Configuration *configuration) {
141 if (!configuration->has_nodes()) {
142 return nullptr;
143 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700144
Austin Schuh217a9782019-12-21 23:02:50 -0800145 return configuration::GetMyNode(configuration);
146}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700147
148namespace chrono = ::std::chrono;
149
Austin Schuh39788ff2019-12-01 18:22:57 -0800150} // namespace
151
Austin Schuh217a9782019-12-21 23:02:50 -0800152ShmEventLoop::ShmEventLoop(const Configuration *configuration)
153 : EventLoop(configuration),
154 name_(Filename(program_invocation_name)),
155 node_(MaybeMyNode(configuration)) {}
156
Austin Schuh39788ff2019-12-01 18:22:57 -0800157namespace internal {
158
159class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700160 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800161 explicit SimpleShmFetcher(const Channel *channel)
162 : lockless_queue_memory_(channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700163 lockless_queue_(lockless_queue_memory_.memory(),
164 lockless_queue_memory_.config()),
165 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
166 alignof(AlignedChar), channel->max_size())),
167 &free) {
168 context_.data = nullptr;
169 // Point the queue index at the next index to read starting now. This
170 // makes it such that FetchNext will read the next message sent after
171 // the fetcher is created.
172 PointAtNextQueueIndex();
173 }
174
Austin Schuh39788ff2019-12-01 18:22:57 -0800175 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700176
177 // Points the next message to fetch at the queue index which will be
178 // populated next.
179 void PointAtNextQueueIndex() {
180 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
181 if (!actual_queue_index_.valid()) {
182 // Nothing in the queue. The next element will show up at the 0th
183 // index in the queue.
184 actual_queue_index_ =
185 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
186 } else {
187 actual_queue_index_ = actual_queue_index_.Increment();
188 }
189 }
190
Austin Schuh39788ff2019-12-01 18:22:57 -0800191 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700192 // TODO(austin): Get behind and make sure it dies both here and with
193 // Fetch.
194 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
195 actual_queue_index_.index(), &context_.monotonic_sent_time,
196 &context_.realtime_sent_time, &context_.size,
197 reinterpret_cast<char *>(data_storage_.get()));
198 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
199 context_.queue_index = actual_queue_index_.index();
Austin Schuh39788ff2019-12-01 18:22:57 -0800200 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
201 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700202 actual_queue_index_ = actual_queue_index_.Increment();
203 }
204
205 // Make sure the data wasn't modified while we were reading it. This
206 // can only happen if you are reading the last message *while* it is
207 // being written to, which means you are pretty far behind.
208 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
209 << ": Got behind while reading and the last message was modified "
210 "out "
211 "from under us while we were reading it. Don't get so far "
212 "behind.";
213
214 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
215 << ": The next message is no longer available.";
216 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
217 }
218
Austin Schuh39788ff2019-12-01 18:22:57 -0800219 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700220 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
221 // actual_queue_index_ is only meaningful if it was set by Fetch or
222 // FetchNext. This happens when valid_data_ has been set. So, only
223 // skip checking if valid_data_ is true.
224 //
225 // Also, if the latest queue index is invalid, we are empty. So there
226 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800227 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700228 queue_index == actual_queue_index_.DecrementBy(1u)) ||
229 !queue_index.valid()) {
230 return false;
231 }
232
233 ipc_lib::LocklessQueue::ReadResult read_result =
234 lockless_queue_.Read(queue_index.index(), &context_.monotonic_sent_time,
235 &context_.realtime_sent_time, &context_.size,
236 reinterpret_cast<char *>(data_storage_.get()));
237 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
238 context_.queue_index = queue_index.index();
Austin Schuh39788ff2019-12-01 18:22:57 -0800239 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
240 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700241 actual_queue_index_ = queue_index.Increment();
242 }
243
244 // Make sure the data wasn't modified while we were reading it. This
245 // can only happen if you are reading the last message *while* it is
246 // being written to, which means you are pretty far behind.
247 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
248 << ": Got behind while reading and the last message was modified "
249 "out "
250 "from under us while we were reading it. Don't get so far "
251 "behind.";
252
253 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
254 << ": Queue index went backwards. This should never happen.";
255
256 // We fell behind between when we read the index and read the value.
257 // This isn't worth recovering from since this means we went to sleep
258 // for a long time in the middle of this function.
259 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
260 << ": The next message is no longer available.";
261 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
262 }
263
Austin Schuh39788ff2019-12-01 18:22:57 -0800264 Context context() const { return context_; }
265
Alex Perrycb7da4b2019-08-28 19:35:56 -0700266 bool RegisterWakeup(int priority) {
267 return lockless_queue_.RegisterWakeup(priority);
268 }
269
270 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
271
272 private:
273 MMapedQueue lockless_queue_memory_;
274 ipc_lib::LocklessQueue lockless_queue_;
275
276 ipc_lib::QueueIndex actual_queue_index_ =
277 ipc_lib::LocklessQueue::empty_queue_index();
278
279 struct AlignedChar {
280 alignas(32) char data;
281 };
282
283 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800284
285 Context context_;
286};
287
288class ShmFetcher : public RawFetcher {
289 public:
290 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
291 : RawFetcher(event_loop, channel), simple_shm_fetcher_(channel) {}
292
293 ~ShmFetcher() { context_.data = nullptr; }
294
295 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
296 if (simple_shm_fetcher_.FetchNext()) {
297 context_ = simple_shm_fetcher_.context();
298 return std::make_pair(true, monotonic_clock::now());
299 }
300 return std::make_pair(false, monotonic_clock::min_time);
301 }
302
303 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
304 if (simple_shm_fetcher_.Fetch()) {
305 context_ = simple_shm_fetcher_.context();
306 return std::make_pair(true, monotonic_clock::now());
307 }
308 return std::make_pair(false, monotonic_clock::min_time);
309 }
310
311 private:
312 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700313};
314
315class ShmSender : public RawSender {
316 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800317 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
318 : RawSender(event_loop, channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700319 lockless_queue_memory_(channel),
320 lockless_queue_(lockless_queue_memory_.memory(),
321 lockless_queue_memory_.config()),
322 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
323
Austin Schuh39788ff2019-12-01 18:22:57 -0800324 ~ShmSender() override {}
325
Alex Perrycb7da4b2019-08-28 19:35:56 -0700326 void *data() override { return lockless_queue_sender_.Data(); }
327 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800328 bool DoSend(size_t length) override {
329 lockless_queue_sender_.Send(length);
330 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700331 return true;
332 }
333
Austin Schuh39788ff2019-12-01 18:22:57 -0800334 bool DoSend(const void *msg, size_t length) override {
Austin Schuh4726ce92019-11-29 13:23:18 -0800335 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length);
Austin Schuh39788ff2019-12-01 18:22:57 -0800336 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700337 // TODO(austin): Return an error if we send too fast.
338 return true;
339 }
340
Alex Perrycb7da4b2019-08-28 19:35:56 -0700341 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700342 MMapedQueue lockless_queue_memory_;
343 ipc_lib::LocklessQueue lockless_queue_;
344 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
345};
346
Alex Perrycb7da4b2019-08-28 19:35:56 -0700347// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800348class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700349 public:
350 WatcherState(
Austin Schuh7d87b672019-12-01 20:23:49 -0800351 ShmEventLoop *event_loop, const Channel *channel,
Austin Schuh39788ff2019-12-01 18:22:57 -0800352 std::function<void(const Context &context, const void *message)> fn)
353 : aos::WatcherState(event_loop, channel, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800354 event_loop_(event_loop),
355 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -0800356 simple_shm_fetcher_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700357
Austin Schuh7d87b672019-12-01 20:23:49 -0800358 ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800359
360 void Startup(EventLoop *event_loop) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800361 simple_shm_fetcher_.PointAtNextQueueIndex();
Austin Schuh39788ff2019-12-01 18:22:57 -0800362 CHECK(RegisterWakeup(event_loop->priority()));
363 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700364
Alex Perrycb7da4b2019-08-28 19:35:56 -0700365 // Returns true if there is new data available.
Austin Schuh7d87b672019-12-01 20:23:49 -0800366 bool CheckForNewData() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700367 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800368 has_new_data_ = simple_shm_fetcher_.FetchNext();
Austin Schuh7d87b672019-12-01 20:23:49 -0800369
370 if (has_new_data_) {
371 event_.set_event_time(
372 simple_shm_fetcher_.context().monotonic_sent_time);
373 event_loop_->AddEvent(&event_);
374 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700375 }
376
377 return has_new_data_;
378 }
379
Alex Perrycb7da4b2019-08-28 19:35:56 -0700380 // Consumes the data by calling the callback.
Austin Schuh7d87b672019-12-01 20:23:49 -0800381 void HandleEvent() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700382 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800383 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700384 has_new_data_ = false;
Austin Schuh7d87b672019-12-01 20:23:49 -0800385 CheckForNewData();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700386 }
387
Austin Schuh39788ff2019-12-01 18:22:57 -0800388 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700389 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800390 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700391 }
392
Austin Schuh39788ff2019-12-01 18:22:57 -0800393 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700394
395 private:
396 bool has_new_data_ = false;
397
Austin Schuh7d87b672019-12-01 20:23:49 -0800398 ShmEventLoop *event_loop_;
399 EventHandler<WatcherState> event_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800400 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700401};
402
403// Adapter class to adapt a timerfd to a TimerHandler.
Austin Schuh7d87b672019-12-01 20:23:49 -0800404class TimerHandlerState final : public TimerHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700405 public:
406 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800407 : TimerHandler(shm_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800408 shm_event_loop_(shm_event_loop),
409 event_(this) {
410 shm_event_loop_->epoll_.OnReadable(
411 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700412 }
413
Austin Schuh7d87b672019-12-01 20:23:49 -0800414 ~TimerHandlerState() {
415 Disable();
416 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
417 }
418
419 void HandleEvent() {
420 uint64_t elapsed_cycles = timerfd_.Read();
421 if (elapsed_cycles == 0u) {
422 // We got called before the timer interrupt could happen, but because we
423 // are checking the time, we got called on time. Push the timer out by 1
424 // cycle.
425 elapsed_cycles = 1u;
426 timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
427 }
428
429 Call(monotonic_clock::now, base_);
430
431 base_ += repeat_offset_ * elapsed_cycles;
432
433 if (repeat_offset_ != chrono::seconds(0)) {
434 event_.set_event_time(base_);
435 shm_event_loop_->AddEvent(&event_);
436 }
437 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700438
439 void Setup(monotonic_clock::time_point base,
440 monotonic_clock::duration repeat_offset) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800441 if (event_.valid()) {
442 shm_event_loop_->RemoveEvent(&event_);
443 }
444
Alex Perrycb7da4b2019-08-28 19:35:56 -0700445 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800446 base_ = base;
447 repeat_offset_ = repeat_offset;
Austin Schuh7d87b672019-12-01 20:23:49 -0800448 event_.set_event_time(base_);
449 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700450 }
451
Austin Schuh7d87b672019-12-01 20:23:49 -0800452 void Disable() override {
453 shm_event_loop_->RemoveEvent(&event_);
454 timerfd_.Disable();
455 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700456
457 private:
458 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800459 EventHandler<TimerHandlerState> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700460
461 TimerFd timerfd_;
462
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800463 monotonic_clock::time_point base_;
464 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700465};
466
467// Adapter class to the timerfd and PhasedLoop.
Austin Schuh7d87b672019-12-01 20:23:49 -0800468class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700469 public:
470 PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
471 const monotonic_clock::duration interval,
472 const monotonic_clock::duration offset)
Austin Schuh39788ff2019-12-01 18:22:57 -0800473 : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
Austin Schuh7d87b672019-12-01 20:23:49 -0800474 shm_event_loop_(shm_event_loop),
475 event_(this) {
476 shm_event_loop_->epoll_.OnReadable(
477 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
478 }
479
480 void HandleEvent() {
481 // The return value for read is the number of cycles that have elapsed.
482 // Because we check to see when this event *should* have happened, there are
483 // cases where Read() will return 0, when 1 cycle has actually happened.
484 // This occurs when the timer interrupt hasn't triggered yet. Therefore,
485 // ignore it. Call handles rescheduling and calculating elapsed cycles
486 // without any extra help.
487 timerfd_.Read();
488 event_.Invalidate();
489
490 Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
491 Schedule(sleep_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700492 });
493 }
494
Austin Schuh39788ff2019-12-01 18:22:57 -0800495 ~PhasedLoopHandler() override {
496 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
Austin Schuh7d87b672019-12-01 20:23:49 -0800497 shm_event_loop_->RemoveEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700498 }
499
500 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800501 // Reschedules the timer.
Austin Schuh39788ff2019-12-01 18:22:57 -0800502 void Schedule(monotonic_clock::time_point sleep_time) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800503 if (event_.valid()) {
504 shm_event_loop_->RemoveEvent(&event_);
505 }
506
Austin Schuh39788ff2019-12-01 18:22:57 -0800507 timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
Austin Schuh7d87b672019-12-01 20:23:49 -0800508 event_.set_event_time(sleep_time);
509 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700510 }
511
512 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800513 EventHandler<PhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700514
515 TimerFd timerfd_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700516};
517} // namespace internal
518
519::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
520 const Channel *channel) {
Austin Schuh217a9782019-12-21 23:02:50 -0800521
522 if (node() != nullptr) {
523 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
524 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
525 << "\", \"type\": \"" << channel->type()->string_view()
526 << "\" } is not able to be fetched on this node. Check your "
527 "configuration.";
528 }
529 }
530
Austin Schuh39788ff2019-12-01 18:22:57 -0800531 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700532}
533
534::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
535 const Channel *channel) {
536 Take(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800537
538 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700539}
540
541void ShmEventLoop::MakeRawWatcher(
542 const Channel *channel,
543 std::function<void(const Context &context, const void *message)> watcher) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700544 Take(channel);
545
Austin Schuh217a9782019-12-21 23:02:50 -0800546 if (node() != nullptr) {
547 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
548 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
549 << "\", \"type\": \"" << channel->type()->string_view()
550 << "\" } is not able to be watched on this node. Check your "
551 "configuration.";
552 }
553 }
554
Austin Schuh39788ff2019-12-01 18:22:57 -0800555 NewWatcher(::std::unique_ptr<WatcherState>(
556 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700557}
558
559TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800560 return NewTimer(::std::unique_ptr<TimerHandler>(
561 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700562}
563
564PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
565 ::std::function<void(int)> callback,
566 const monotonic_clock::duration interval,
567 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800568 return NewPhasedLoop(
569 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
570 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700571}
572
573void ShmEventLoop::OnRun(::std::function<void()> on_run) {
574 on_run_.push_back(::std::move(on_run));
575}
576
Austin Schuh7d87b672019-12-01 20:23:49 -0800577void ShmEventLoop::HandleEvent() {
578 // Update all the times for handlers.
579 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
580 internal::WatcherState *watcher =
581 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
582
583 watcher->CheckForNewData();
584 }
585
Austin Schuh39788ff2019-12-01 18:22:57 -0800586 while (true) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800587 if (EventCount() == 0 ||
588 PeekEvent()->event_time() > monotonic_clock::now()) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800589 break;
590 }
591
Austin Schuh7d87b672019-12-01 20:23:49 -0800592 EventLoopEvent *event = PopEvent();
593 event->HandleEvent();
Austin Schuh39788ff2019-12-01 18:22:57 -0800594 }
595}
596
Austin Schuh32fd5a72019-12-01 22:20:26 -0800597// RAII class to mask signals.
598class ScopedSignalMask {
599 public:
600 ScopedSignalMask(std::initializer_list<int> signals) {
601 sigset_t sigset;
602 PCHECK(sigemptyset(&sigset) == 0);
603 for (int signal : signals) {
604 PCHECK(sigaddset(&sigset, signal) == 0);
605 }
606
607 PCHECK(sigprocmask(SIG_BLOCK, &sigset, &old_) == 0);
608 }
609
610 ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }
611
612 private:
613 sigset_t old_;
614};
615
616// Class to manage the static state associated with killing multiple event
617// loops.
618class SignalHandler {
619 public:
620 // Gets the singleton.
621 static SignalHandler *global() {
622 static SignalHandler loop;
623 return &loop;
624 }
625
626 // Handles the signal with the singleton.
627 static void HandleSignal(int) { global()->DoHandleSignal(); }
628
629 // Registers an event loop to receive Exit() calls.
630 void Register(ShmEventLoop *event_loop) {
631 // Block signals while we have the mutex so we never race with the signal
632 // handler.
633 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
634 std::unique_lock<stl_mutex> locker(mutex_);
635 if (event_loops_.size() == 0) {
636 // The first caller registers the signal handler.
637 struct sigaction new_action;
638 sigemptyset(&new_action.sa_mask);
639 // This makes it so that 2 control c's to a stuck process will kill it by
640 // restoring the original signal handler.
641 new_action.sa_flags = SA_RESETHAND;
642 new_action.sa_handler = &HandleSignal;
643
644 PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
645 PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
646 PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
647 }
648
649 event_loops_.push_back(event_loop);
650 }
651
652 // Unregisters an event loop to receive Exit() calls.
653 void Unregister(ShmEventLoop *event_loop) {
654 // Block signals while we have the mutex so we never race with the signal
655 // handler.
656 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
657 std::unique_lock<stl_mutex> locker(mutex_);
658
659 event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
660
661 if (event_loops_.size() == 0u) {
662 // The last caller restores the original signal handlers.
663 PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
664 PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
665 PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
666 }
667 }
668
669 private:
670 void DoHandleSignal() {
671 // We block signals while grabbing the lock, so there should never be a
672 // race. Confirm that this is true using trylock.
673 CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
674 "modifing the event loop list.";
675 for (ShmEventLoop *event_loop : event_loops_) {
676 event_loop->Exit();
677 }
678 mutex_.unlock();
679 }
680
681 // Mutex to protect all state.
682 stl_mutex mutex_;
683 std::vector<ShmEventLoop *> event_loops_;
684 struct sigaction old_action_int_;
685 struct sigaction old_action_hup_;
686 struct sigaction old_action_term_;
687};
688
Alex Perrycb7da4b2019-08-28 19:35:56 -0700689void ShmEventLoop::Run() {
Austin Schuh32fd5a72019-12-01 22:20:26 -0800690 SignalHandler::global()->Register(this);
Austin Schuh39788ff2019-12-01 18:22:57 -0800691
Alex Perrycb7da4b2019-08-28 19:35:56 -0700692 std::unique_ptr<ipc_lib::SignalFd> signalfd;
693
694 if (watchers_.size() > 0) {
695 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
696
697 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
698 signalfd_siginfo result = signalfd_ptr->Read();
699 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
700
701 // TODO(austin): We should really be checking *everything*, not just
702 // watchers, and calling the oldest thing first. That will improve
703 // determinism a lot.
704
Austin Schuh7d87b672019-12-01 20:23:49 -0800705 HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700706 });
707 }
708
Austin Schuh39788ff2019-12-01 18:22:57 -0800709 MaybeScheduleTimingReports();
710
Austin Schuh7d87b672019-12-01 20:23:49 -0800711 ReserveEvents();
712
Austin Schuh39788ff2019-12-01 18:22:57 -0800713 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700714 if (priority_ != 0) {
715 ::aos::InitRT();
716
717 LOG(INFO) << "Setting priority to " << priority_;
718 ::aos::SetCurrentThreadRealtimePriority(priority_);
719 }
720
721 set_is_running(true);
722
723 // Now that we are realtime (but before the OnRun handlers run), snap the
724 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800725 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
726 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700727 }
728
729 // Now that we are RT, run all the OnRun handlers.
730 for (const auto &run : on_run_) {
731 run();
732 }
733
Alex Perrycb7da4b2019-08-28 19:35:56 -0700734 // And start our main event loop which runs all the timers and handles Quit.
735 epoll_.Run();
736
737 // Once epoll exits, there is no useful nonrt work left to do.
738 set_is_running(false);
739
740 // Nothing time or synchronization critical needs to happen after this point.
741 // Drop RT priority.
742 ::aos::UnsetCurrentThreadRealtimePriority();
743
Austin Schuh39788ff2019-12-01 18:22:57 -0800744 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
745 internal::WatcherState *watcher =
746 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700747 watcher->UnregisterWakeup();
748 }
749
750 if (watchers_.size() > 0) {
751 epoll_.DeleteFd(signalfd->fd());
752 signalfd.reset();
753 }
Austin Schuh32fd5a72019-12-01 22:20:26 -0800754
755 SignalHandler::global()->Unregister(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700756}
757
758void ShmEventLoop::Exit() { epoll_.Quit(); }
759
760ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800761 // Trigger any remaining senders or fetchers to be cleared before destroying
762 // the event loop so the book keeping matches.
763 timing_report_sender_.reset();
764
765 // Force everything with a registered fd with epoll to be destroyed now.
766 timers_.clear();
767 phased_loops_.clear();
768 watchers_.clear();
769
Alex Perrycb7da4b2019-08-28 19:35:56 -0700770 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
771}
772
773void ShmEventLoop::Take(const Channel *channel) {
774 CHECK(!is_running()) << ": Cannot add new objects while running.";
775
776 // Cheat aggresively. Use the shared memory path as a proxy for a unique
777 // identifier for the channel.
778 const std::string path = ShmPath(channel);
779
780 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
781 CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
782
783 taken_.emplace_back(path);
784}
785
786void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
787 if (is_running()) {
788 LOG(FATAL) << "Cannot set realtime priority while running.";
789 }
790 priority_ = priority;
791}
792
Austin Schuh39788ff2019-12-01 18:22:57 -0800793pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
794
Alex Perrycb7da4b2019-08-28 19:35:56 -0700795} // namespace aos