blob: bd6d37c71317369d85bbf01b0bf81262876a9a7b [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/shm_event_loop.h"
2
3#include <sys/mman.h>
4#include <sys/stat.h>
Austin Schuh39788ff2019-12-01 18:22:57 -08005#include <sys/syscall.h>
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include <sys/types.h>
7#include <unistd.h>
8#include <algorithm>
9#include <atomic>
10#include <chrono>
Austin Schuh39788ff2019-12-01 18:22:57 -080011#include <iterator>
Alex Perrycb7da4b2019-08-28 19:35:56 -070012#include <stdexcept>
13
14#include "aos/events/epoll.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080015#include "aos/events/event_loop_generated.h"
16#include "aos/events/timing_statistics.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070017#include "aos/ipc_lib/lockless_queue.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080018#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070019#include "aos/realtime.h"
20#include "aos/util/phased_loop.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080021#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070022
23DEFINE_string(shm_base, "/dev/shm/aos",
24 "Directory to place queue backing mmaped files in.");
25DEFINE_uint32(permissions, 0770,
26 "Permissions to make shared memory files and folders.");
27
28namespace aos {
29
30std::string ShmFolder(const Channel *channel) {
31 CHECK(channel->has_name());
32 CHECK_EQ(channel->name()->string_view()[0], '/');
33 return FLAGS_shm_base + channel->name()->str() + "/";
34}
35std::string ShmPath(const Channel *channel) {
36 CHECK(channel->has_type());
37 return ShmFolder(channel) + channel->type()->str() + ".v0";
38}
39
40class MMapedQueue {
41 public:
42 MMapedQueue(const Channel *channel) {
43 std::string path = ShmPath(channel);
44
Austin Schuh80c7fce2019-12-05 20:48:43 -080045 config_.num_watchers = channel->num_watchers();
46 config_.num_senders = channel->num_senders();
Alex Perrycb7da4b2019-08-28 19:35:56 -070047 config_.queue_size = 2 * channel->frequency();
48 config_.message_data_size = channel->max_size();
49
50 size_ = ipc_lib::LocklessQueueMemorySize(config_);
51
52 MkdirP(path);
53
54 // There are 2 cases. Either the file already exists, or it does not
55 // already exist and we need to create it. Start by trying to create it. If
56 // that fails, the file has already been created and we can open it
57 // normally.. Once the file has been created it wil never be deleted.
58 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
59 O_CLOEXEC | FLAGS_permissions);
60 if (fd_ == -1 && errno == EEXIST) {
61 VLOG(1) << path << " already created.";
62 // File already exists.
63 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
64 PCHECK(fd_ != -1) << ": Failed to open " << path;
65 while (true) {
66 struct stat st;
67 PCHECK(fstat(fd_, &st) == 0);
68 if (st.st_size != 0) {
69 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
70 << ": Size of " << path
71 << " doesn't match expected size of backing queue file. Did the "
72 "queue definition change?";
73 break;
74 } else {
75 // The creating process didn't get around to it yet. Give it a bit.
76 std::this_thread::sleep_for(std::chrono::milliseconds(10));
77 VLOG(1) << path << " is zero size, waiting";
78 }
79 }
80 } else {
81 VLOG(1) << "Created " << path;
82 PCHECK(fd_ != -1) << ": Failed to open " << path;
83 PCHECK(ftruncate(fd_, size_) == 0);
84 }
85
86 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
87 PCHECK(data_ != MAP_FAILED);
88
89 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
90 }
91
92 ~MMapedQueue() {
93 PCHECK(munmap(data_, size_) == 0);
94 PCHECK(close(fd_) == 0);
95 }
96
97 ipc_lib::LocklessQueueMemory *memory() const {
98 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
99 }
100
Austin Schuh39788ff2019-12-01 18:22:57 -0800101 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700102
103 private:
James Kuszmaul3ae42262019-11-08 12:33:41 -0800104 void MkdirP(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700105 struct stat st;
106 auto last_slash_pos = path.find_last_of("/");
107
James Kuszmaul3ae42262019-11-08 12:33:41 -0800108 std::string folder(last_slash_pos == std::string_view::npos
109 ? std::string_view("")
Alex Perrycb7da4b2019-08-28 19:35:56 -0700110 : path.substr(0, last_slash_pos));
111 if (stat(folder.c_str(), &st) == -1) {
112 PCHECK(errno == ENOENT);
113 CHECK_NE(folder, "") << ": Base path doesn't exist";
114 MkdirP(folder);
115 VLOG(1) << "Creating " << folder;
116 PCHECK(mkdir(folder.c_str(), FLAGS_permissions) == 0);
117 }
118 }
119
120 ipc_lib::LocklessQueueConfiguration config_;
121
122 int fd_;
123
124 size_t size_;
125 void *data_;
126};
127
128// Returns the portion of the path after the last /.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800129std::string_view Filename(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700130 auto last_slash_pos = path.find_last_of("/");
131
James Kuszmaul3ae42262019-11-08 12:33:41 -0800132 return last_slash_pos == std::string_view::npos
Alex Perrycb7da4b2019-08-28 19:35:56 -0700133 ? path
134 : path.substr(last_slash_pos + 1, path.size());
135}
136
137ShmEventLoop::ShmEventLoop(const Configuration *configuration)
138 : EventLoop(configuration), name_(Filename(program_invocation_name)) {}
139
140namespace {
141
142namespace chrono = ::std::chrono;
143
Austin Schuh39788ff2019-12-01 18:22:57 -0800144} // namespace
145
146namespace internal {
147
148class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700149 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800150 explicit SimpleShmFetcher(const Channel *channel)
151 : lockless_queue_memory_(channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700152 lockless_queue_(lockless_queue_memory_.memory(),
153 lockless_queue_memory_.config()),
154 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
155 alignof(AlignedChar), channel->max_size())),
156 &free) {
157 context_.data = nullptr;
158 // Point the queue index at the next index to read starting now. This
159 // makes it such that FetchNext will read the next message sent after
160 // the fetcher is created.
161 PointAtNextQueueIndex();
162 }
163
Austin Schuh39788ff2019-12-01 18:22:57 -0800164 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700165
166 // Points the next message to fetch at the queue index which will be
167 // populated next.
168 void PointAtNextQueueIndex() {
169 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
170 if (!actual_queue_index_.valid()) {
171 // Nothing in the queue. The next element will show up at the 0th
172 // index in the queue.
173 actual_queue_index_ =
174 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
175 } else {
176 actual_queue_index_ = actual_queue_index_.Increment();
177 }
178 }
179
Austin Schuh39788ff2019-12-01 18:22:57 -0800180 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700181 // TODO(austin): Write a test which starts with nothing in the queue,
182 // and then calls FetchNext() after something is sent.
183 // TODO(austin): Get behind and make sure it dies both here and with
184 // Fetch.
185 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
186 actual_queue_index_.index(), &context_.monotonic_sent_time,
187 &context_.realtime_sent_time, &context_.size,
188 reinterpret_cast<char *>(data_storage_.get()));
189 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
190 context_.queue_index = actual_queue_index_.index();
Austin Schuh39788ff2019-12-01 18:22:57 -0800191 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
192 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700193 actual_queue_index_ = actual_queue_index_.Increment();
194 }
195
196 // Make sure the data wasn't modified while we were reading it. This
197 // can only happen if you are reading the last message *while* it is
198 // being written to, which means you are pretty far behind.
199 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
200 << ": Got behind while reading and the last message was modified "
201 "out "
202 "from under us while we were reading it. Don't get so far "
203 "behind.";
204
205 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
206 << ": The next message is no longer available.";
207 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
208 }
209
Austin Schuh39788ff2019-12-01 18:22:57 -0800210 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700211 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
212 // actual_queue_index_ is only meaningful if it was set by Fetch or
213 // FetchNext. This happens when valid_data_ has been set. So, only
214 // skip checking if valid_data_ is true.
215 //
216 // Also, if the latest queue index is invalid, we are empty. So there
217 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800218 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700219 queue_index == actual_queue_index_.DecrementBy(1u)) ||
220 !queue_index.valid()) {
221 return false;
222 }
223
224 ipc_lib::LocklessQueue::ReadResult read_result =
225 lockless_queue_.Read(queue_index.index(), &context_.monotonic_sent_time,
226 &context_.realtime_sent_time, &context_.size,
227 reinterpret_cast<char *>(data_storage_.get()));
228 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
229 context_.queue_index = queue_index.index();
Austin Schuh39788ff2019-12-01 18:22:57 -0800230 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
231 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700232 actual_queue_index_ = queue_index.Increment();
233 }
234
235 // Make sure the data wasn't modified while we were reading it. This
236 // can only happen if you are reading the last message *while* it is
237 // being written to, which means you are pretty far behind.
238 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
239 << ": Got behind while reading and the last message was modified "
240 "out "
241 "from under us while we were reading it. Don't get so far "
242 "behind.";
243
244 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
245 << ": Queue index went backwards. This should never happen.";
246
247 // We fell behind between when we read the index and read the value.
248 // This isn't worth recovering from since this means we went to sleep
249 // for a long time in the middle of this function.
250 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
251 << ": The next message is no longer available.";
252 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
253 }
254
Austin Schuh39788ff2019-12-01 18:22:57 -0800255 Context context() const { return context_; }
256
Alex Perrycb7da4b2019-08-28 19:35:56 -0700257 bool RegisterWakeup(int priority) {
258 return lockless_queue_.RegisterWakeup(priority);
259 }
260
261 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
262
263 private:
264 MMapedQueue lockless_queue_memory_;
265 ipc_lib::LocklessQueue lockless_queue_;
266
267 ipc_lib::QueueIndex actual_queue_index_ =
268 ipc_lib::LocklessQueue::empty_queue_index();
269
270 struct AlignedChar {
271 alignas(32) char data;
272 };
273
274 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800275
276 Context context_;
277};
278
279class ShmFetcher : public RawFetcher {
280 public:
281 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
282 : RawFetcher(event_loop, channel), simple_shm_fetcher_(channel) {}
283
284 ~ShmFetcher() { context_.data = nullptr; }
285
286 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
287 if (simple_shm_fetcher_.FetchNext()) {
288 context_ = simple_shm_fetcher_.context();
289 return std::make_pair(true, monotonic_clock::now());
290 }
291 return std::make_pair(false, monotonic_clock::min_time);
292 }
293
294 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
295 if (simple_shm_fetcher_.Fetch()) {
296 context_ = simple_shm_fetcher_.context();
297 return std::make_pair(true, monotonic_clock::now());
298 }
299 return std::make_pair(false, monotonic_clock::min_time);
300 }
301
302 private:
303 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700304};
305
306class ShmSender : public RawSender {
307 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800308 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
309 : RawSender(event_loop, channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700310 lockless_queue_memory_(channel),
311 lockless_queue_(lockless_queue_memory_.memory(),
312 lockless_queue_memory_.config()),
313 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
314
Austin Schuh39788ff2019-12-01 18:22:57 -0800315 ~ShmSender() override {}
316
Alex Perrycb7da4b2019-08-28 19:35:56 -0700317 void *data() override { return lockless_queue_sender_.Data(); }
318 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800319 bool DoSend(size_t length) override {
320 lockless_queue_sender_.Send(length);
321 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700322 return true;
323 }
324
Austin Schuh39788ff2019-12-01 18:22:57 -0800325 bool DoSend(const void *msg, size_t length) override {
Austin Schuh4726ce92019-11-29 13:23:18 -0800326 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length);
Austin Schuh39788ff2019-12-01 18:22:57 -0800327 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700328 // TODO(austin): Return an error if we send too fast.
329 return true;
330 }
331
Alex Perrycb7da4b2019-08-28 19:35:56 -0700332 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700333 MMapedQueue lockless_queue_memory_;
334 ipc_lib::LocklessQueue lockless_queue_;
335 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
336};
337
Alex Perrycb7da4b2019-08-28 19:35:56 -0700338// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800339class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700340 public:
341 WatcherState(
Austin Schuh39788ff2019-12-01 18:22:57 -0800342 EventLoop *event_loop, const Channel *channel,
343 std::function<void(const Context &context, const void *message)> fn)
344 : aos::WatcherState(event_loop, channel, std::move(fn)),
345 simple_shm_fetcher_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700346
Austin Schuh39788ff2019-12-01 18:22:57 -0800347 ~WatcherState() override {}
348
349 void Startup(EventLoop *event_loop) override {
350 PointAtNextQueueIndex();
351 CHECK(RegisterWakeup(event_loop->priority()));
352 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700353
354 // Points the next message to fetch at the queue index which will be populated
355 // next.
Austin Schuh39788ff2019-12-01 18:22:57 -0800356 void PointAtNextQueueIndex() { simple_shm_fetcher_.PointAtNextQueueIndex(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700357
358 // Returns true if there is new data available.
359 bool HasNewData() {
360 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800361 has_new_data_ = simple_shm_fetcher_.FetchNext();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700362 }
363
364 return has_new_data_;
365 }
366
367 // Returns the time of the current data sample.
368 aos::monotonic_clock::time_point event_time() const {
Austin Schuh39788ff2019-12-01 18:22:57 -0800369 return simple_shm_fetcher_.context().monotonic_sent_time;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700370 }
371
372 // Consumes the data by calling the callback.
373 void CallCallback() {
374 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800375 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700376 has_new_data_ = false;
377 }
378
Austin Schuh39788ff2019-12-01 18:22:57 -0800379 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700380 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800381 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700382 }
383
Austin Schuh39788ff2019-12-01 18:22:57 -0800384 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700385
386 private:
387 bool has_new_data_ = false;
388
Austin Schuh39788ff2019-12-01 18:22:57 -0800389 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700390};
391
392// Adapter class to adapt a timerfd to a TimerHandler.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700393class TimerHandlerState : public TimerHandler {
394 public:
395 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800396 : TimerHandler(shm_event_loop, std::move(fn)),
397 shm_event_loop_(shm_event_loop) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700398 shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800399 const uint64_t elapsed_cycles = timerfd_.Read();
400
Austin Schuh39788ff2019-12-01 18:22:57 -0800401 Call(monotonic_clock::now, base_);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800402
403 base_ += repeat_offset_ * elapsed_cycles;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700404 });
405 }
406
407 ~TimerHandlerState() { shm_event_loop_->epoll_.DeleteFd(timerfd_.fd()); }
408
409 void Setup(monotonic_clock::time_point base,
410 monotonic_clock::duration repeat_offset) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700411 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800412 base_ = base;
413 repeat_offset_ = repeat_offset;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700414 }
415
Austin Schuh39788ff2019-12-01 18:22:57 -0800416 void Disable() override { timerfd_.Disable(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700417
418 private:
419 ShmEventLoop *shm_event_loop_;
420
421 TimerFd timerfd_;
422
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800423 monotonic_clock::time_point base_;
424 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700425};
426
427// Adapter class to the timerfd and PhasedLoop.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700428class PhasedLoopHandler : public ::aos::PhasedLoopHandler {
429 public:
430 PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
431 const monotonic_clock::duration interval,
432 const monotonic_clock::duration offset)
Austin Schuh39788ff2019-12-01 18:22:57 -0800433 : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
434 shm_event_loop_(shm_event_loop) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700435 shm_event_loop_->epoll_.OnReadable(timerfd_.fd(), [this]() {
436 timerfd_.Read();
Austin Schuh39788ff2019-12-01 18:22:57 -0800437 Call(monotonic_clock::now,
438 [this](monotonic_clock::time_point sleep_time) {
439 Schedule(sleep_time);
440 });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700441 });
442 }
443
Austin Schuh39788ff2019-12-01 18:22:57 -0800444 ~PhasedLoopHandler() override {
445 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700446 }
447
448 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800449 // Reschedules the timer.
Austin Schuh39788ff2019-12-01 18:22:57 -0800450 void Schedule(monotonic_clock::time_point sleep_time) override {
451 timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700452 }
453
454 ShmEventLoop *shm_event_loop_;
455
456 TimerFd timerfd_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700457};
458} // namespace internal
459
460::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
461 const Channel *channel) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800462 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700463}
464
465::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
466 const Channel *channel) {
467 Take(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800468
469 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700470}
471
472void ShmEventLoop::MakeRawWatcher(
473 const Channel *channel,
474 std::function<void(const Context &context, const void *message)> watcher) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700475 Take(channel);
476
Austin Schuh39788ff2019-12-01 18:22:57 -0800477 NewWatcher(::std::unique_ptr<WatcherState>(
478 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700479}
480
481TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800482 return NewTimer(::std::unique_ptr<TimerHandler>(
483 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700484}
485
486PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
487 ::std::function<void(int)> callback,
488 const monotonic_clock::duration interval,
489 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800490 return NewPhasedLoop(
491 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
492 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700493}
494
495void ShmEventLoop::OnRun(::std::function<void()> on_run) {
496 on_run_.push_back(::std::move(on_run));
497}
498
Austin Schuh39788ff2019-12-01 18:22:57 -0800499void ShmEventLoop::HandleWatcherSignal() {
500 while (true) {
501 // Call the handlers in time order of their messages.
502 aos::monotonic_clock::time_point min_event_time =
503 aos::monotonic_clock::max_time;
504 size_t min_watcher_index = -1;
505 size_t watcher_index = 0;
506 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
507 internal::WatcherState *watcher =
508 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
509
510 if (watcher->HasNewData()) {
511 if (watcher->event_time() < min_event_time) {
512 min_watcher_index = watcher_index;
513 min_event_time = watcher->event_time();
514 }
515 }
516 ++watcher_index;
517 }
518
519 if (min_event_time == aos::monotonic_clock::max_time) {
520 break;
521 }
522
523 reinterpret_cast<internal::WatcherState *>(
524 watchers_[min_watcher_index].get())
525 ->CallCallback();
526 }
527}
528
Alex Perrycb7da4b2019-08-28 19:35:56 -0700529void ShmEventLoop::Run() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800530 // TODO(austin): Automatically register ^C with a sigaction so 2 in a row send
531 // an actual control C.
532
Alex Perrycb7da4b2019-08-28 19:35:56 -0700533 std::unique_ptr<ipc_lib::SignalFd> signalfd;
534
535 if (watchers_.size() > 0) {
536 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
537
538 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
539 signalfd_siginfo result = signalfd_ptr->Read();
540 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
541
542 // TODO(austin): We should really be checking *everything*, not just
543 // watchers, and calling the oldest thing first. That will improve
544 // determinism a lot.
545
Austin Schuh39788ff2019-12-01 18:22:57 -0800546 HandleWatcherSignal();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700547 });
548 }
549
Austin Schuh39788ff2019-12-01 18:22:57 -0800550 MaybeScheduleTimingReports();
551
552 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700553 if (priority_ != 0) {
554 ::aos::InitRT();
555
556 LOG(INFO) << "Setting priority to " << priority_;
557 ::aos::SetCurrentThreadRealtimePriority(priority_);
558 }
559
560 set_is_running(true);
561
562 // Now that we are realtime (but before the OnRun handlers run), snap the
563 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800564 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
565 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700566 }
567
568 // Now that we are RT, run all the OnRun handlers.
569 for (const auto &run : on_run_) {
570 run();
571 }
572
Alex Perrycb7da4b2019-08-28 19:35:56 -0700573 // And start our main event loop which runs all the timers and handles Quit.
574 epoll_.Run();
575
576 // Once epoll exits, there is no useful nonrt work left to do.
577 set_is_running(false);
578
579 // Nothing time or synchronization critical needs to happen after this point.
580 // Drop RT priority.
581 ::aos::UnsetCurrentThreadRealtimePriority();
582
Austin Schuh39788ff2019-12-01 18:22:57 -0800583 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
584 internal::WatcherState *watcher =
585 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700586 watcher->UnregisterWakeup();
587 }
588
589 if (watchers_.size() > 0) {
590 epoll_.DeleteFd(signalfd->fd());
591 signalfd.reset();
592 }
593}
594
595void ShmEventLoop::Exit() { epoll_.Quit(); }
596
597ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800598 // Trigger any remaining senders or fetchers to be cleared before destroying
599 // the event loop so the book keeping matches.
600 timing_report_sender_.reset();
601
602 // Force everything with a registered fd with epoll to be destroyed now.
603 timers_.clear();
604 phased_loops_.clear();
605 watchers_.clear();
606
Alex Perrycb7da4b2019-08-28 19:35:56 -0700607 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
608}
609
610void ShmEventLoop::Take(const Channel *channel) {
611 CHECK(!is_running()) << ": Cannot add new objects while running.";
612
613 // Cheat aggresively. Use the shared memory path as a proxy for a unique
614 // identifier for the channel.
615 const std::string path = ShmPath(channel);
616
617 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
618 CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
619
620 taken_.emplace_back(path);
621}
622
623void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
624 if (is_running()) {
625 LOG(FATAL) << "Cannot set realtime priority while running.";
626 }
627 priority_ = priority;
628}
629
Austin Schuh39788ff2019-12-01 18:22:57 -0800630pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
631
Alex Perrycb7da4b2019-08-28 19:35:56 -0700632} // namespace aos