blob: 382c773fcc8ad3c8ea3f3b862e50c0a3a4996fb1 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/shm_event_loop.h"
2
3#include <sys/mman.h>
4#include <sys/stat.h>
Austin Schuh39788ff2019-12-01 18:22:57 -08005#include <sys/syscall.h>
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include <sys/types.h>
7#include <unistd.h>
8#include <algorithm>
9#include <atomic>
10#include <chrono>
Austin Schuh39788ff2019-12-01 18:22:57 -080011#include <iterator>
Alex Perrycb7da4b2019-08-28 19:35:56 -070012#include <stdexcept>
13
14#include "aos/events/epoll.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080015#include "aos/events/event_loop_generated.h"
16#include "aos/events/timing_statistics.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070017#include "aos/ipc_lib/lockless_queue.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080018#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070019#include "aos/realtime.h"
Austin Schuh32fd5a72019-12-01 22:20:26 -080020#include "aos/stl_mutex/stl_mutex.h"
Austin Schuhfccb2d02020-01-26 16:11:19 -080021#include "aos/util/file.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070022#include "aos/util/phased_loop.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080023#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070024
namespace {

// Strips everything up to and including the final '/' from |path| and
// returns a pointer to what is left (the basename). When |path| has no '/',
// |path| itself is returned. The result aliases |path|, which must be null
// terminated and must outlive the returned pointer.
const char *Filename(const char *path) {
  const std::string_view whole_path(path);
  const size_t slash_index = whole_path.rfind('/');
  if (slash_index == std::string_view::npos) {
    return path;
  }
  return path + slash_index + 1;
}

}  // namespace
38
// Root directory for the queue backing files. ShmFolder() builds each
// channel's directory by appending the channel name to this value.
DEFINE_string(shm_base, "/dev/shm/aos",
              "Directory to place queue backing mmaped files in.");
// Mode bits passed both to util::MkdirP() and to open() when the shared
// memory directories/files are created.
DEFINE_uint32(permissions, 0770,
              "Permissions to make shared memory files and folders.");
// Defaults to the basename of program_invocation_name (everything after the
// last '/'). ShmEventLoop copies this into its name_.
DEFINE_string(application_name, Filename(program_invocation_name),
              "The application name");
Alex Perrycb7da4b2019-08-28 19:35:56 -070045
46namespace aos {
47
Austin Schuhcdab6192019-12-29 17:47:46 -080048void SetShmBase(const std::string_view base) {
49 FLAGS_shm_base = std::string(base) + "/dev/shm/aos";
50}
51
Alex Perrycb7da4b2019-08-28 19:35:56 -070052std::string ShmFolder(const Channel *channel) {
53 CHECK(channel->has_name());
54 CHECK_EQ(channel->name()->string_view()[0], '/');
55 return FLAGS_shm_base + channel->name()->str() + "/";
56}
57std::string ShmPath(const Channel *channel) {
58 CHECK(channel->has_type());
Austin Schuhad154822019-12-27 15:45:13 -080059 return ShmFolder(channel) + channel->type()->str() + ".v1";
Alex Perrycb7da4b2019-08-28 19:35:56 -070060}
61
62class MMapedQueue {
63 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -080064 MMapedQueue(const Channel *channel,
65 const std::chrono::seconds channel_storage_duration) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070066 std::string path = ShmPath(channel);
67
Austin Schuh80c7fce2019-12-05 20:48:43 -080068 config_.num_watchers = channel->num_watchers();
69 config_.num_senders = channel->num_senders();
Austin Schuhaa79e4e2019-12-29 20:43:32 -080070 config_.queue_size =
71 channel_storage_duration.count() * channel->frequency();
Alex Perrycb7da4b2019-08-28 19:35:56 -070072 config_.message_data_size = channel->max_size();
73
74 size_ = ipc_lib::LocklessQueueMemorySize(config_);
75
Austin Schuhfccb2d02020-01-26 16:11:19 -080076 util::MkdirP(path, FLAGS_permissions);
Alex Perrycb7da4b2019-08-28 19:35:56 -070077
78 // There are 2 cases. Either the file already exists, or it does not
79 // already exist and we need to create it. Start by trying to create it. If
80 // that fails, the file has already been created and we can open it
81 // normally.. Once the file has been created it wil never be deleted.
82 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
83 O_CLOEXEC | FLAGS_permissions);
84 if (fd_ == -1 && errno == EEXIST) {
85 VLOG(1) << path << " already created.";
86 // File already exists.
87 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
88 PCHECK(fd_ != -1) << ": Failed to open " << path;
89 while (true) {
90 struct stat st;
91 PCHECK(fstat(fd_, &st) == 0);
92 if (st.st_size != 0) {
93 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
94 << ": Size of " << path
95 << " doesn't match expected size of backing queue file. Did the "
96 "queue definition change?";
97 break;
98 } else {
99 // The creating process didn't get around to it yet. Give it a bit.
100 std::this_thread::sleep_for(std::chrono::milliseconds(10));
101 VLOG(1) << path << " is zero size, waiting";
102 }
103 }
104 } else {
105 VLOG(1) << "Created " << path;
106 PCHECK(fd_ != -1) << ": Failed to open " << path;
107 PCHECK(ftruncate(fd_, size_) == 0);
108 }
109
110 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
111 PCHECK(data_ != MAP_FAILED);
112
113 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
114 }
115
116 ~MMapedQueue() {
117 PCHECK(munmap(data_, size_) == 0);
118 PCHECK(close(fd_) == 0);
119 }
120
121 ipc_lib::LocklessQueueMemory *memory() const {
122 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
123 }
124
Austin Schuh39788ff2019-12-01 18:22:57 -0800125 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700126
127 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700128 ipc_lib::LocklessQueueConfiguration config_;
129
130 int fd_;
131
132 size_t size_;
133 void *data_;
134};
135
Austin Schuh217a9782019-12-21 23:02:50 -0800136namespace {
137
Austin Schuh217a9782019-12-21 23:02:50 -0800138const Node *MaybeMyNode(const Configuration *configuration) {
139 if (!configuration->has_nodes()) {
140 return nullptr;
141 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700142
Austin Schuh217a9782019-12-21 23:02:50 -0800143 return configuration::GetMyNode(configuration);
144}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700145
146namespace chrono = ::std::chrono;
147
Austin Schuh39788ff2019-12-01 18:22:57 -0800148} // namespace
149
Austin Schuh217a9782019-12-21 23:02:50 -0800150ShmEventLoop::ShmEventLoop(const Configuration *configuration)
151 : EventLoop(configuration),
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800152 name_(FLAGS_application_name),
Austin Schuh15649d62019-12-28 16:36:38 -0800153 node_(MaybeMyNode(configuration)) {
154 if (configuration->has_nodes()) {
155 CHECK(node_ != nullptr) << ": Couldn't find node in config.";
156 }
157}
Austin Schuh217a9782019-12-21 23:02:50 -0800158
Austin Schuh39788ff2019-12-01 18:22:57 -0800159namespace internal {
160
161class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700162 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800163 explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhf5652592019-12-29 16:26:15 -0800164 : channel_(channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800165 lockless_queue_memory_(
166 channel,
Brian Silverman587da252020-01-01 17:00:47 -0800167 chrono::ceil<chrono::seconds>(chrono::nanoseconds(
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800168 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700169 lockless_queue_(lockless_queue_memory_.memory(),
170 lockless_queue_memory_.config()),
171 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
172 alignof(AlignedChar), channel->max_size())),
173 &free) {
174 context_.data = nullptr;
175 // Point the queue index at the next index to read starting now. This
176 // makes it such that FetchNext will read the next message sent after
177 // the fetcher is created.
178 PointAtNextQueueIndex();
179 }
180
Austin Schuh39788ff2019-12-01 18:22:57 -0800181 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700182
183 // Points the next message to fetch at the queue index which will be
184 // populated next.
185 void PointAtNextQueueIndex() {
186 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
187 if (!actual_queue_index_.valid()) {
188 // Nothing in the queue. The next element will show up at the 0th
189 // index in the queue.
190 actual_queue_index_ =
191 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
192 } else {
193 actual_queue_index_ = actual_queue_index_.Increment();
194 }
195 }
196
Austin Schuh39788ff2019-12-01 18:22:57 -0800197 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700198 // TODO(austin): Get behind and make sure it dies both here and with
199 // Fetch.
200 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
Austin Schuhad154822019-12-27 15:45:13 -0800201 actual_queue_index_.index(), &context_.monotonic_event_time,
202 &context_.realtime_event_time, &context_.monotonic_remote_time,
203 &context_.realtime_remote_time, &context_.remote_queue_index,
204 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700205 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
206 context_.queue_index = actual_queue_index_.index();
Austin Schuhad154822019-12-27 15:45:13 -0800207 if (context_.remote_queue_index == 0xffffffffu) {
208 context_.remote_queue_index = context_.queue_index;
209 }
210 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
211 context_.monotonic_remote_time = context_.monotonic_event_time;
212 }
213 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
214 context_.realtime_remote_time = context_.realtime_event_time;
215 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800216 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
217 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700218 actual_queue_index_ = actual_queue_index_.Increment();
219 }
220
221 // Make sure the data wasn't modified while we were reading it. This
222 // can only happen if you are reading the last message *while* it is
223 // being written to, which means you are pretty far behind.
224 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
225 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800226 "out from under us while we were reading it. Don't get so far "
227 "behind. "
228 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700229
230 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800231 << ": The next message is no longer available. "
232 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700233 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
234 }
235
Austin Schuh39788ff2019-12-01 18:22:57 -0800236 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700237 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
238 // actual_queue_index_ is only meaningful if it was set by Fetch or
239 // FetchNext. This happens when valid_data_ has been set. So, only
240 // skip checking if valid_data_ is true.
241 //
242 // Also, if the latest queue index is invalid, we are empty. So there
243 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800244 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700245 queue_index == actual_queue_index_.DecrementBy(1u)) ||
246 !queue_index.valid()) {
247 return false;
248 }
249
Austin Schuhad154822019-12-27 15:45:13 -0800250 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
251 queue_index.index(), &context_.monotonic_event_time,
252 &context_.realtime_event_time, &context_.monotonic_remote_time,
253 &context_.realtime_remote_time, &context_.remote_queue_index,
254 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700255 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
256 context_.queue_index = queue_index.index();
Austin Schuhad154822019-12-27 15:45:13 -0800257 if (context_.remote_queue_index == 0xffffffffu) {
258 context_.remote_queue_index = context_.queue_index;
259 }
260 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
261 context_.monotonic_remote_time = context_.monotonic_event_time;
262 }
263 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
264 context_.realtime_remote_time = context_.realtime_event_time;
265 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800266 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
267 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700268 actual_queue_index_ = queue_index.Increment();
269 }
270
271 // Make sure the data wasn't modified while we were reading it. This
272 // can only happen if you are reading the last message *while* it is
273 // being written to, which means you are pretty far behind.
274 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
275 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800276 "out from under us while we were reading it. Don't get so far "
277 "behind."
278 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700279
280 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
Austin Schuhf5652592019-12-29 16:26:15 -0800281 << ": Queue index went backwards. This should never happen. "
282 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700283
284 // We fell behind between when we read the index and read the value.
285 // This isn't worth recovering from since this means we went to sleep
286 // for a long time in the middle of this function.
287 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800288 << ": The next message is no longer available. "
289 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700290 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
291 }
292
Austin Schuh39788ff2019-12-01 18:22:57 -0800293 Context context() const { return context_; }
294
Alex Perrycb7da4b2019-08-28 19:35:56 -0700295 bool RegisterWakeup(int priority) {
296 return lockless_queue_.RegisterWakeup(priority);
297 }
298
299 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
300
301 private:
Austin Schuhf5652592019-12-29 16:26:15 -0800302 const Channel *const channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700303 MMapedQueue lockless_queue_memory_;
304 ipc_lib::LocklessQueue lockless_queue_;
305
306 ipc_lib::QueueIndex actual_queue_index_ =
307 ipc_lib::LocklessQueue::empty_queue_index();
308
309 struct AlignedChar {
Brian Silverman0fc69932020-01-24 21:54:02 -0800310 // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
311 // cache lines.
312 // V4L2 requires 64 byte alignment for USERPTR.
313 alignas(64) char data;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700314 };
315
316 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800317
318 Context context_;
319};
320
321class ShmFetcher : public RawFetcher {
322 public:
323 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800324 : RawFetcher(event_loop, channel),
325 simple_shm_fetcher_(event_loop, channel) {}
Austin Schuh39788ff2019-12-01 18:22:57 -0800326
327 ~ShmFetcher() { context_.data = nullptr; }
328
329 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
330 if (simple_shm_fetcher_.FetchNext()) {
331 context_ = simple_shm_fetcher_.context();
332 return std::make_pair(true, monotonic_clock::now());
333 }
334 return std::make_pair(false, monotonic_clock::min_time);
335 }
336
337 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
338 if (simple_shm_fetcher_.Fetch()) {
339 context_ = simple_shm_fetcher_.context();
340 return std::make_pair(true, monotonic_clock::now());
341 }
342 return std::make_pair(false, monotonic_clock::min_time);
343 }
344
345 private:
346 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700347};
348
349class ShmSender : public RawSender {
350 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800351 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
352 : RawSender(event_loop, channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800353 lockless_queue_memory_(
354 channel,
Brian Silverman587da252020-01-01 17:00:47 -0800355 chrono::ceil<chrono::seconds>(chrono::nanoseconds(
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800356 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700357 lockless_queue_(lockless_queue_memory_.memory(),
358 lockless_queue_memory_.config()),
359 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
360
Austin Schuh39788ff2019-12-01 18:22:57 -0800361 ~ShmSender() override {}
362
Alex Perrycb7da4b2019-08-28 19:35:56 -0700363 void *data() override { return lockless_queue_sender_.Data(); }
364 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuhad154822019-12-27 15:45:13 -0800365 bool DoSend(size_t length,
366 aos::monotonic_clock::time_point monotonic_remote_time,
367 aos::realtime_clock::time_point realtime_remote_time,
368 uint32_t remote_queue_index) override {
369 lockless_queue_sender_.Send(
370 length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
371 &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800372 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700373 return true;
374 }
375
Austin Schuhad154822019-12-27 15:45:13 -0800376 bool DoSend(const void *msg, size_t length,
377 aos::monotonic_clock::time_point monotonic_remote_time,
378 aos::realtime_clock::time_point realtime_remote_time,
379 uint32_t remote_queue_index) override {
380 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length,
381 monotonic_remote_time, realtime_remote_time,
382 remote_queue_index, &monotonic_sent_time_,
383 &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800384 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700385 // TODO(austin): Return an error if we send too fast.
386 return true;
387 }
388
Alex Perrycb7da4b2019-08-28 19:35:56 -0700389 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700390 MMapedQueue lockless_queue_memory_;
391 ipc_lib::LocklessQueue lockless_queue_;
392 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
393};
394
Alex Perrycb7da4b2019-08-28 19:35:56 -0700395// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800396class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700397 public:
398 WatcherState(
Austin Schuh7d87b672019-12-01 20:23:49 -0800399 ShmEventLoop *event_loop, const Channel *channel,
Austin Schuh39788ff2019-12-01 18:22:57 -0800400 std::function<void(const Context &context, const void *message)> fn)
401 : aos::WatcherState(event_loop, channel, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800402 event_loop_(event_loop),
403 event_(this),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800404 simple_shm_fetcher_(event_loop, channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700405
Austin Schuh7d87b672019-12-01 20:23:49 -0800406 ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800407
408 void Startup(EventLoop *event_loop) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800409 simple_shm_fetcher_.PointAtNextQueueIndex();
Austin Schuh39788ff2019-12-01 18:22:57 -0800410 CHECK(RegisterWakeup(event_loop->priority()));
411 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700412
Alex Perrycb7da4b2019-08-28 19:35:56 -0700413 // Returns true if there is new data available.
Austin Schuh7d87b672019-12-01 20:23:49 -0800414 bool CheckForNewData() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700415 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800416 has_new_data_ = simple_shm_fetcher_.FetchNext();
Austin Schuh7d87b672019-12-01 20:23:49 -0800417
418 if (has_new_data_) {
419 event_.set_event_time(
Austin Schuhad154822019-12-27 15:45:13 -0800420 simple_shm_fetcher_.context().monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800421 event_loop_->AddEvent(&event_);
422 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700423 }
424
425 return has_new_data_;
426 }
427
Alex Perrycb7da4b2019-08-28 19:35:56 -0700428 // Consumes the data by calling the callback.
Austin Schuh7d87b672019-12-01 20:23:49 -0800429 void HandleEvent() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700430 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800431 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700432 has_new_data_ = false;
Austin Schuh7d87b672019-12-01 20:23:49 -0800433 CheckForNewData();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700434 }
435
Austin Schuh39788ff2019-12-01 18:22:57 -0800436 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700437 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800438 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700439 }
440
Austin Schuh39788ff2019-12-01 18:22:57 -0800441 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700442
443 private:
444 bool has_new_data_ = false;
445
Austin Schuh7d87b672019-12-01 20:23:49 -0800446 ShmEventLoop *event_loop_;
447 EventHandler<WatcherState> event_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800448 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700449};
450
451// Adapter class to adapt a timerfd to a TimerHandler.
Austin Schuh7d87b672019-12-01 20:23:49 -0800452class TimerHandlerState final : public TimerHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700453 public:
454 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800455 : TimerHandler(shm_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800456 shm_event_loop_(shm_event_loop),
457 event_(this) {
458 shm_event_loop_->epoll_.OnReadable(
459 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700460 }
461
Austin Schuh7d87b672019-12-01 20:23:49 -0800462 ~TimerHandlerState() {
463 Disable();
464 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
465 }
466
467 void HandleEvent() {
468 uint64_t elapsed_cycles = timerfd_.Read();
469 if (elapsed_cycles == 0u) {
470 // We got called before the timer interrupt could happen, but because we
471 // are checking the time, we got called on time. Push the timer out by 1
472 // cycle.
473 elapsed_cycles = 1u;
474 timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
475 }
476
477 Call(monotonic_clock::now, base_);
478
479 base_ += repeat_offset_ * elapsed_cycles;
480
481 if (repeat_offset_ != chrono::seconds(0)) {
482 event_.set_event_time(base_);
483 shm_event_loop_->AddEvent(&event_);
484 }
485 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700486
487 void Setup(monotonic_clock::time_point base,
488 monotonic_clock::duration repeat_offset) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800489 if (event_.valid()) {
490 shm_event_loop_->RemoveEvent(&event_);
491 }
492
Alex Perrycb7da4b2019-08-28 19:35:56 -0700493 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800494 base_ = base;
495 repeat_offset_ = repeat_offset;
Austin Schuh7d87b672019-12-01 20:23:49 -0800496 event_.set_event_time(base_);
497 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700498 }
499
Austin Schuh7d87b672019-12-01 20:23:49 -0800500 void Disable() override {
501 shm_event_loop_->RemoveEvent(&event_);
502 timerfd_.Disable();
503 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700504
505 private:
506 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800507 EventHandler<TimerHandlerState> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700508
509 TimerFd timerfd_;
510
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800511 monotonic_clock::time_point base_;
512 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700513};
514
// Adapter class to the timerfd and PhasedLoop. The timerfd wakes the epoll
// loop; event_ holds the next scheduled wakeup in the event loop's event
// queue. Rescheduling goes through Schedule(), which the base class's Call()
// invokes with the next sleep time.
class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
 public:
  PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
                    const monotonic_clock::duration interval,
                    const monotonic_clock::duration offset)
      : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
        shm_event_loop_(shm_event_loop),
        event_(this) {
    // A readable timerfd just kicks the loop's central dispatcher; the
    // dispatcher decides what is due based on event times.
    shm_event_loop_->epoll_.OnReadable(
        timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
  }

  void HandleEvent() {
    // The return value for read is the number of cycles that have elapsed.
    // Because we check to see when this event *should* have happened, there are
    // cases where Read() will return 0, when 1 cycle has actually happened.
    // This occurs when the timer interrupt hasn't triggered yet.  Therefore,
    // ignore it.  Call handles rescheduling and calculating elapsed cycles
    // without any extra help.
    timerfd_.Read();
    // Mark event_ as not scheduled (presumably it was just popped off the
    // queue by ShmEventLoop::HandleEvent), so Schedule() below does not try
    // to remove it again. Must happen before Call().
    event_.Invalidate();

    Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
      Schedule(sleep_time);
    });
  }

  ~PhasedLoopHandler() override {
    shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
    shm_event_loop_->RemoveEvent(&event_);
  }

 private:
  // Reschedules the timer: drops any pending event_, arms the timerfd for
  // |sleep_time| with a zero repeat interval, and queues event_ for the same
  // time.
  void Schedule(monotonic_clock::time_point sleep_time) override {
    if (event_.valid()) {
      shm_event_loop_->RemoveEvent(&event_);
    }

    timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
    event_.set_event_time(sleep_time);
    shm_event_loop_->AddEvent(&event_);
  }

  ShmEventLoop *shm_event_loop_;
  EventHandler<PhasedLoopHandler> event_;

  TimerFd timerfd_;
};
565} // namespace internal
566
567::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
568 const Channel *channel) {
Austin Schuhca4828c2019-12-28 14:21:35 -0800569 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
570 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
571 << "\", \"type\": \"" << channel->type()->string_view()
572 << "\" } is not able to be fetched on this node. Check your "
573 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800574 }
575
Austin Schuh39788ff2019-12-01 18:22:57 -0800576 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700577}
578
579::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
580 const Channel *channel) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800581 TakeSender(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800582
583 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700584}
585
586void ShmEventLoop::MakeRawWatcher(
587 const Channel *channel,
588 std::function<void(const Context &context, const void *message)> watcher) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800589 TakeWatcher(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800590
Austin Schuh39788ff2019-12-01 18:22:57 -0800591 NewWatcher(::std::unique_ptr<WatcherState>(
592 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700593}
594
595TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800596 return NewTimer(::std::unique_ptr<TimerHandler>(
597 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700598}
599
600PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
601 ::std::function<void(int)> callback,
602 const monotonic_clock::duration interval,
603 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800604 return NewPhasedLoop(
605 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
606 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700607}
608
609void ShmEventLoop::OnRun(::std::function<void()> on_run) {
610 on_run_.push_back(::std::move(on_run));
611}
612
Austin Schuh7d87b672019-12-01 20:23:49 -0800613void ShmEventLoop::HandleEvent() {
614 // Update all the times for handlers.
615 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
616 internal::WatcherState *watcher =
617 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
618
619 watcher->CheckForNewData();
620 }
621
Austin Schuh39788ff2019-12-01 18:22:57 -0800622 while (true) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800623 if (EventCount() == 0 ||
624 PeekEvent()->event_time() > monotonic_clock::now()) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800625 break;
626 }
627
Austin Schuh7d87b672019-12-01 20:23:49 -0800628 EventLoopEvent *event = PopEvent();
629 event->HandleEvent();
Austin Schuh39788ff2019-12-01 18:22:57 -0800630 }
631}
632
Austin Schuh32fd5a72019-12-01 22:20:26 -0800633// RAII class to mask signals.
634class ScopedSignalMask {
635 public:
636 ScopedSignalMask(std::initializer_list<int> signals) {
637 sigset_t sigset;
638 PCHECK(sigemptyset(&sigset) == 0);
639 for (int signal : signals) {
640 PCHECK(sigaddset(&sigset, signal) == 0);
641 }
642
643 PCHECK(sigprocmask(SIG_BLOCK, &sigset, &old_) == 0);
644 }
645
646 ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }
647
648 private:
649 sigset_t old_;
650};
651
652// Class to manage the static state associated with killing multiple event
653// loops.
654class SignalHandler {
655 public:
656 // Gets the singleton.
657 static SignalHandler *global() {
658 static SignalHandler loop;
659 return &loop;
660 }
661
662 // Handles the signal with the singleton.
663 static void HandleSignal(int) { global()->DoHandleSignal(); }
664
665 // Registers an event loop to receive Exit() calls.
666 void Register(ShmEventLoop *event_loop) {
667 // Block signals while we have the mutex so we never race with the signal
668 // handler.
669 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
670 std::unique_lock<stl_mutex> locker(mutex_);
671 if (event_loops_.size() == 0) {
672 // The first caller registers the signal handler.
673 struct sigaction new_action;
674 sigemptyset(&new_action.sa_mask);
675 // This makes it so that 2 control c's to a stuck process will kill it by
676 // restoring the original signal handler.
677 new_action.sa_flags = SA_RESETHAND;
678 new_action.sa_handler = &HandleSignal;
679
680 PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
681 PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
682 PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
683 }
684
685 event_loops_.push_back(event_loop);
686 }
687
688 // Unregisters an event loop to receive Exit() calls.
689 void Unregister(ShmEventLoop *event_loop) {
690 // Block signals while we have the mutex so we never race with the signal
691 // handler.
692 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
693 std::unique_lock<stl_mutex> locker(mutex_);
694
695 event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
696
697 if (event_loops_.size() == 0u) {
698 // The last caller restores the original signal handlers.
699 PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
700 PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
701 PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
702 }
703 }
704
705 private:
706 void DoHandleSignal() {
707 // We block signals while grabbing the lock, so there should never be a
708 // race. Confirm that this is true using trylock.
709 CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
710 "modifing the event loop list.";
711 for (ShmEventLoop *event_loop : event_loops_) {
712 event_loop->Exit();
713 }
714 mutex_.unlock();
715 }
716
717 // Mutex to protect all state.
718 stl_mutex mutex_;
719 std::vector<ShmEventLoop *> event_loops_;
720 struct sigaction old_action_int_;
721 struct sigaction old_action_hup_;
722 struct sigaction old_action_term_;
723};
724
Alex Perrycb7da4b2019-08-28 19:35:56 -0700725void ShmEventLoop::Run() {
Austin Schuh32fd5a72019-12-01 22:20:26 -0800726 SignalHandler::global()->Register(this);
Austin Schuh39788ff2019-12-01 18:22:57 -0800727
Alex Perrycb7da4b2019-08-28 19:35:56 -0700728 std::unique_ptr<ipc_lib::SignalFd> signalfd;
729
730 if (watchers_.size() > 0) {
731 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
732
733 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
734 signalfd_siginfo result = signalfd_ptr->Read();
735 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
736
737 // TODO(austin): We should really be checking *everything*, not just
738 // watchers, and calling the oldest thing first. That will improve
739 // determinism a lot.
740
Austin Schuh7d87b672019-12-01 20:23:49 -0800741 HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700742 });
743 }
744
Austin Schuh39788ff2019-12-01 18:22:57 -0800745 MaybeScheduleTimingReports();
746
Austin Schuh7d87b672019-12-01 20:23:49 -0800747 ReserveEvents();
748
James Kuszmaul57c2baa2020-01-19 14:52:52 -0800749 aos::SetCurrentThreadName(name_.substr(0, 16));
Austin Schuh39788ff2019-12-01 18:22:57 -0800750 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700751 if (priority_ != 0) {
752 ::aos::InitRT();
753
754 LOG(INFO) << "Setting priority to " << priority_;
755 ::aos::SetCurrentThreadRealtimePriority(priority_);
756 }
757
758 set_is_running(true);
759
760 // Now that we are realtime (but before the OnRun handlers run), snap the
761 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800762 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
763 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700764 }
765
766 // Now that we are RT, run all the OnRun handlers.
767 for (const auto &run : on_run_) {
768 run();
769 }
770
Alex Perrycb7da4b2019-08-28 19:35:56 -0700771 // And start our main event loop which runs all the timers and handles Quit.
772 epoll_.Run();
773
774 // Once epoll exits, there is no useful nonrt work left to do.
775 set_is_running(false);
776
777 // Nothing time or synchronization critical needs to happen after this point.
778 // Drop RT priority.
779 ::aos::UnsetCurrentThreadRealtimePriority();
780
Austin Schuh39788ff2019-12-01 18:22:57 -0800781 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
782 internal::WatcherState *watcher =
783 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700784 watcher->UnregisterWakeup();
785 }
786
787 if (watchers_.size() > 0) {
788 epoll_.DeleteFd(signalfd->fd());
789 signalfd.reset();
790 }
Austin Schuh32fd5a72019-12-01 22:20:26 -0800791
792 SignalHandler::global()->Unregister(this);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800793
794 // Trigger any remaining senders or fetchers to be cleared before destroying
795 // the event loop so the book keeping matches. Do this in the thread that
796 // created the timing reporter.
797 timing_report_sender_.reset();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700798}
799
800void ShmEventLoop::Exit() { epoll_.Quit(); }
801
802ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800803 // Force everything with a registered fd with epoll to be destroyed now.
804 timers_.clear();
805 phased_loops_.clear();
806 watchers_.clear();
807
Alex Perrycb7da4b2019-08-28 19:35:56 -0700808 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
809}
810
Alex Perrycb7da4b2019-08-28 19:35:56 -0700811void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
812 if (is_running()) {
813 LOG(FATAL) << "Cannot set realtime priority while running.";
814 }
815 priority_ = priority;
816}
817
James Kuszmaul57c2baa2020-01-19 14:52:52 -0800818void ShmEventLoop::set_name(const std::string_view name) {
819 name_ = std::string(name);
820 UpdateTimingReport();
821}
822
Austin Schuh39788ff2019-12-01 18:22:57 -0800823pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
824
Alex Perrycb7da4b2019-08-28 19:35:56 -0700825} // namespace aos