blob: c12579246e6f6ebcf062a21f021bcf58e45a74b8 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/shm_event_loop.h"
2
3#include <sys/mman.h>
4#include <sys/stat.h>
Austin Schuh39788ff2019-12-01 18:22:57 -08005#include <sys/syscall.h>
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include <sys/types.h>
7#include <unistd.h>
8#include <algorithm>
9#include <atomic>
10#include <chrono>
Austin Schuh39788ff2019-12-01 18:22:57 -080011#include <iterator>
Alex Perrycb7da4b2019-08-28 19:35:56 -070012#include <stdexcept>
13
14#include "aos/events/epoll.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080015#include "aos/events/event_loop_generated.h"
16#include "aos/events/timing_statistics.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070017#include "aos/ipc_lib/lockless_queue.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080018#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070019#include "aos/realtime.h"
Austin Schuh32fd5a72019-12-01 22:20:26 -080020#include "aos/stl_mutex/stl_mutex.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070021#include "aos/util/phased_loop.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080022#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070023
namespace {

// Returns a pointer to the basename of `path`: the characters after its final
// '/', or `path` itself when it contains no '/'. The result points into
// `path` (no copy is made), so `path` must be a NUL-terminated string that
// outlives the returned pointer.
const char *Filename(const char *path) {
  const std::string_view whole(path);
  const std::string_view::size_type slash = whole.rfind('/');
  if (slash == std::string_view::npos) {
    return path;
  }
  return path + slash + 1;
}

}  // namespace
37
Alex Perrycb7da4b2019-08-28 19:35:56 -070038DEFINE_string(shm_base, "/dev/shm/aos",
39 "Directory to place queue backing mmaped files in.");
40DEFINE_uint32(permissions, 0770,
41 "Permissions to make shared memory files and folders.");
Austin Schuhe84c3ed2019-12-14 15:29:48 -080042DEFINE_string(application_name, Filename(program_invocation_name),
43 "The application name");
Alex Perrycb7da4b2019-08-28 19:35:56 -070044
45namespace aos {
46
Austin Schuhcdab6192019-12-29 17:47:46 -080047void SetShmBase(const std::string_view base) {
48 FLAGS_shm_base = std::string(base) + "/dev/shm/aos";
49}
50
Alex Perrycb7da4b2019-08-28 19:35:56 -070051std::string ShmFolder(const Channel *channel) {
52 CHECK(channel->has_name());
53 CHECK_EQ(channel->name()->string_view()[0], '/');
54 return FLAGS_shm_base + channel->name()->str() + "/";
55}
56std::string ShmPath(const Channel *channel) {
57 CHECK(channel->has_type());
Austin Schuhad154822019-12-27 15:45:13 -080058 return ShmFolder(channel) + channel->type()->str() + ".v1";
Alex Perrycb7da4b2019-08-28 19:35:56 -070059}
60
61class MMapedQueue {
62 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -080063 MMapedQueue(const Channel *channel,
64 const std::chrono::seconds channel_storage_duration) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070065 std::string path = ShmPath(channel);
66
Austin Schuh80c7fce2019-12-05 20:48:43 -080067 config_.num_watchers = channel->num_watchers();
68 config_.num_senders = channel->num_senders();
Austin Schuhaa79e4e2019-12-29 20:43:32 -080069 config_.queue_size =
70 channel_storage_duration.count() * channel->frequency();
Alex Perrycb7da4b2019-08-28 19:35:56 -070071 config_.message_data_size = channel->max_size();
72
73 size_ = ipc_lib::LocklessQueueMemorySize(config_);
74
75 MkdirP(path);
76
77 // There are 2 cases. Either the file already exists, or it does not
78 // already exist and we need to create it. Start by trying to create it. If
79 // that fails, the file has already been created and we can open it
80 // normally.. Once the file has been created it wil never be deleted.
81 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
82 O_CLOEXEC | FLAGS_permissions);
83 if (fd_ == -1 && errno == EEXIST) {
84 VLOG(1) << path << " already created.";
85 // File already exists.
86 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
87 PCHECK(fd_ != -1) << ": Failed to open " << path;
88 while (true) {
89 struct stat st;
90 PCHECK(fstat(fd_, &st) == 0);
91 if (st.st_size != 0) {
92 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
93 << ": Size of " << path
94 << " doesn't match expected size of backing queue file. Did the "
95 "queue definition change?";
96 break;
97 } else {
98 // The creating process didn't get around to it yet. Give it a bit.
99 std::this_thread::sleep_for(std::chrono::milliseconds(10));
100 VLOG(1) << path << " is zero size, waiting";
101 }
102 }
103 } else {
104 VLOG(1) << "Created " << path;
105 PCHECK(fd_ != -1) << ": Failed to open " << path;
106 PCHECK(ftruncate(fd_, size_) == 0);
107 }
108
109 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
110 PCHECK(data_ != MAP_FAILED);
111
112 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
113 }
114
115 ~MMapedQueue() {
116 PCHECK(munmap(data_, size_) == 0);
117 PCHECK(close(fd_) == 0);
118 }
119
120 ipc_lib::LocklessQueueMemory *memory() const {
121 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
122 }
123
Austin Schuh39788ff2019-12-01 18:22:57 -0800124 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700125
Brian Silverman5120afb2020-01-31 17:44:35 -0800126 absl::Span<char> GetSharedMemory() const {
127 return absl::Span<char>(static_cast<char *>(data_), size_);
128 }
129
Alex Perrycb7da4b2019-08-28 19:35:56 -0700130 private:
James Kuszmaul3ae42262019-11-08 12:33:41 -0800131 void MkdirP(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700132 auto last_slash_pos = path.find_last_of("/");
133
James Kuszmaul3ae42262019-11-08 12:33:41 -0800134 std::string folder(last_slash_pos == std::string_view::npos
135 ? std::string_view("")
Alex Perrycb7da4b2019-08-28 19:35:56 -0700136 : path.substr(0, last_slash_pos));
Austin Schuh8ec76182019-12-23 16:28:00 -0800137 if (folder.empty()) return;
138 MkdirP(folder);
139 VLOG(1) << "Creating " << folder;
140 const int result = mkdir(folder.c_str(), FLAGS_permissions);
141 if (result == -1 && errno == EEXIST) {
142 VLOG(1) << "Already exists";
143 return;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700144 }
Austin Schuh8ec76182019-12-23 16:28:00 -0800145 PCHECK(result == 0) << ": Error creating " << folder;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700146 }
147
148 ipc_lib::LocklessQueueConfiguration config_;
149
150 int fd_;
151
152 size_t size_;
153 void *data_;
154};
155
Austin Schuh217a9782019-12-21 23:02:50 -0800156namespace {
157
Austin Schuh217a9782019-12-21 23:02:50 -0800158const Node *MaybeMyNode(const Configuration *configuration) {
159 if (!configuration->has_nodes()) {
160 return nullptr;
161 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700162
Austin Schuh217a9782019-12-21 23:02:50 -0800163 return configuration::GetMyNode(configuration);
164}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700165
166namespace chrono = ::std::chrono;
167
Austin Schuh39788ff2019-12-01 18:22:57 -0800168} // namespace
169
Austin Schuh217a9782019-12-21 23:02:50 -0800170ShmEventLoop::ShmEventLoop(const Configuration *configuration)
171 : EventLoop(configuration),
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800172 name_(FLAGS_application_name),
Austin Schuh15649d62019-12-28 16:36:38 -0800173 node_(MaybeMyNode(configuration)) {
174 if (configuration->has_nodes()) {
175 CHECK(node_ != nullptr) << ": Couldn't find node in config.";
176 }
177}
Austin Schuh217a9782019-12-21 23:02:50 -0800178
Austin Schuh39788ff2019-12-01 18:22:57 -0800179namespace internal {
180
181class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700182 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800183 explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhf5652592019-12-29 16:26:15 -0800184 : channel_(channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800185 lockless_queue_memory_(
186 channel,
Brian Silverman587da252020-01-01 17:00:47 -0800187 chrono::ceil<chrono::seconds>(chrono::nanoseconds(
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800188 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700189 lockless_queue_(lockless_queue_memory_.memory(),
190 lockless_queue_memory_.config()),
191 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
192 alignof(AlignedChar), channel->max_size())),
193 &free) {
194 context_.data = nullptr;
195 // Point the queue index at the next index to read starting now. This
196 // makes it such that FetchNext will read the next message sent after
197 // the fetcher is created.
198 PointAtNextQueueIndex();
199 }
200
Austin Schuh39788ff2019-12-01 18:22:57 -0800201 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700202
203 // Points the next message to fetch at the queue index which will be
204 // populated next.
205 void PointAtNextQueueIndex() {
206 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
207 if (!actual_queue_index_.valid()) {
208 // Nothing in the queue. The next element will show up at the 0th
209 // index in the queue.
210 actual_queue_index_ =
211 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
212 } else {
213 actual_queue_index_ = actual_queue_index_.Increment();
214 }
215 }
216
Austin Schuh39788ff2019-12-01 18:22:57 -0800217 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700218 // TODO(austin): Get behind and make sure it dies both here and with
219 // Fetch.
220 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
Austin Schuhad154822019-12-27 15:45:13 -0800221 actual_queue_index_.index(), &context_.monotonic_event_time,
222 &context_.realtime_event_time, &context_.monotonic_remote_time,
223 &context_.realtime_remote_time, &context_.remote_queue_index,
224 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700225 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
226 context_.queue_index = actual_queue_index_.index();
Austin Schuhad154822019-12-27 15:45:13 -0800227 if (context_.remote_queue_index == 0xffffffffu) {
228 context_.remote_queue_index = context_.queue_index;
229 }
230 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
231 context_.monotonic_remote_time = context_.monotonic_event_time;
232 }
233 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
234 context_.realtime_remote_time = context_.realtime_event_time;
235 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800236 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
237 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700238 actual_queue_index_ = actual_queue_index_.Increment();
239 }
240
241 // Make sure the data wasn't modified while we were reading it. This
242 // can only happen if you are reading the last message *while* it is
243 // being written to, which means you are pretty far behind.
244 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
245 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800246 "out from under us while we were reading it. Don't get so far "
247 "behind. "
248 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700249
250 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800251 << ": The next message is no longer available. "
252 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700253 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
254 }
255
Austin Schuh39788ff2019-12-01 18:22:57 -0800256 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700257 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
258 // actual_queue_index_ is only meaningful if it was set by Fetch or
259 // FetchNext. This happens when valid_data_ has been set. So, only
260 // skip checking if valid_data_ is true.
261 //
262 // Also, if the latest queue index is invalid, we are empty. So there
263 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800264 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700265 queue_index == actual_queue_index_.DecrementBy(1u)) ||
266 !queue_index.valid()) {
267 return false;
268 }
269
Austin Schuhad154822019-12-27 15:45:13 -0800270 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
271 queue_index.index(), &context_.monotonic_event_time,
272 &context_.realtime_event_time, &context_.monotonic_remote_time,
273 &context_.realtime_remote_time, &context_.remote_queue_index,
274 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700275 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
276 context_.queue_index = queue_index.index();
Austin Schuhad154822019-12-27 15:45:13 -0800277 if (context_.remote_queue_index == 0xffffffffu) {
278 context_.remote_queue_index = context_.queue_index;
279 }
280 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
281 context_.monotonic_remote_time = context_.monotonic_event_time;
282 }
283 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
284 context_.realtime_remote_time = context_.realtime_event_time;
285 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800286 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
287 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700288 actual_queue_index_ = queue_index.Increment();
289 }
290
291 // Make sure the data wasn't modified while we were reading it. This
292 // can only happen if you are reading the last message *while* it is
293 // being written to, which means you are pretty far behind.
294 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
295 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800296 "out from under us while we were reading it. Don't get so far "
297 "behind."
298 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700299
300 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
Austin Schuhf5652592019-12-29 16:26:15 -0800301 << ": Queue index went backwards. This should never happen. "
302 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700303
304 // We fell behind between when we read the index and read the value.
305 // This isn't worth recovering from since this means we went to sleep
306 // for a long time in the middle of this function.
307 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800308 << ": The next message is no longer available. "
309 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700310 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
311 }
312
Austin Schuh39788ff2019-12-01 18:22:57 -0800313 Context context() const { return context_; }
314
Alex Perrycb7da4b2019-08-28 19:35:56 -0700315 bool RegisterWakeup(int priority) {
316 return lockless_queue_.RegisterWakeup(priority);
317 }
318
319 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
320
Brian Silverman5120afb2020-01-31 17:44:35 -0800321 absl::Span<char> GetSharedMemory() const {
322 return lockless_queue_memory_.GetSharedMemory();
323 }
324
Alex Perrycb7da4b2019-08-28 19:35:56 -0700325 private:
Austin Schuhf5652592019-12-29 16:26:15 -0800326 const Channel *const channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700327 MMapedQueue lockless_queue_memory_;
328 ipc_lib::LocklessQueue lockless_queue_;
329
330 ipc_lib::QueueIndex actual_queue_index_ =
331 ipc_lib::LocklessQueue::empty_queue_index();
332
333 struct AlignedChar {
Brian Silverman0fc69932020-01-24 21:54:02 -0800334 // Cortex-A72 (Raspberry Pi 4) and Cortex-A53 (Xavier AGX) both have 64 byte
335 // cache lines.
336 // V4L2 requires 64 byte alignment for USERPTR.
337 alignas(64) char data;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700338 };
339
340 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800341
342 Context context_;
343};
344
345class ShmFetcher : public RawFetcher {
346 public:
347 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800348 : RawFetcher(event_loop, channel),
349 simple_shm_fetcher_(event_loop, channel) {}
Austin Schuh39788ff2019-12-01 18:22:57 -0800350
351 ~ShmFetcher() { context_.data = nullptr; }
352
353 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
354 if (simple_shm_fetcher_.FetchNext()) {
355 context_ = simple_shm_fetcher_.context();
356 return std::make_pair(true, monotonic_clock::now());
357 }
358 return std::make_pair(false, monotonic_clock::min_time);
359 }
360
361 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
362 if (simple_shm_fetcher_.Fetch()) {
363 context_ = simple_shm_fetcher_.context();
364 return std::make_pair(true, monotonic_clock::now());
365 }
366 return std::make_pair(false, monotonic_clock::min_time);
367 }
368
369 private:
370 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700371};
372
373class ShmSender : public RawSender {
374 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800375 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
376 : RawSender(event_loop, channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800377 lockless_queue_memory_(
378 channel,
Brian Silverman587da252020-01-01 17:00:47 -0800379 chrono::ceil<chrono::seconds>(chrono::nanoseconds(
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800380 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700381 lockless_queue_(lockless_queue_memory_.memory(),
382 lockless_queue_memory_.config()),
383 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
384
Austin Schuh39788ff2019-12-01 18:22:57 -0800385 ~ShmSender() override {}
386
Alex Perrycb7da4b2019-08-28 19:35:56 -0700387 void *data() override { return lockless_queue_sender_.Data(); }
388 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuhad154822019-12-27 15:45:13 -0800389 bool DoSend(size_t length,
390 aos::monotonic_clock::time_point monotonic_remote_time,
391 aos::realtime_clock::time_point realtime_remote_time,
392 uint32_t remote_queue_index) override {
393 lockless_queue_sender_.Send(
394 length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
395 &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800396 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700397 return true;
398 }
399
Austin Schuhad154822019-12-27 15:45:13 -0800400 bool DoSend(const void *msg, size_t length,
401 aos::monotonic_clock::time_point monotonic_remote_time,
402 aos::realtime_clock::time_point realtime_remote_time,
403 uint32_t remote_queue_index) override {
404 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length,
405 monotonic_remote_time, realtime_remote_time,
406 remote_queue_index, &monotonic_sent_time_,
407 &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800408 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700409 // TODO(austin): Return an error if we send too fast.
410 return true;
411 }
412
Brian Silverman5120afb2020-01-31 17:44:35 -0800413 absl::Span<char> GetSharedMemory() const {
414 return lockless_queue_memory_.GetSharedMemory();
415 }
416
Alex Perrycb7da4b2019-08-28 19:35:56 -0700417 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700418 MMapedQueue lockless_queue_memory_;
419 ipc_lib::LocklessQueue lockless_queue_;
420 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
421};
422
Alex Perrycb7da4b2019-08-28 19:35:56 -0700423// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800424class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700425 public:
426 WatcherState(
Austin Schuh7d87b672019-12-01 20:23:49 -0800427 ShmEventLoop *event_loop, const Channel *channel,
Austin Schuh39788ff2019-12-01 18:22:57 -0800428 std::function<void(const Context &context, const void *message)> fn)
429 : aos::WatcherState(event_loop, channel, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800430 event_loop_(event_loop),
431 event_(this),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800432 simple_shm_fetcher_(event_loop, channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700433
Austin Schuh7d87b672019-12-01 20:23:49 -0800434 ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800435
436 void Startup(EventLoop *event_loop) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800437 simple_shm_fetcher_.PointAtNextQueueIndex();
Austin Schuh39788ff2019-12-01 18:22:57 -0800438 CHECK(RegisterWakeup(event_loop->priority()));
439 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700440
Alex Perrycb7da4b2019-08-28 19:35:56 -0700441 // Returns true if there is new data available.
Austin Schuh7d87b672019-12-01 20:23:49 -0800442 bool CheckForNewData() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700443 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800444 has_new_data_ = simple_shm_fetcher_.FetchNext();
Austin Schuh7d87b672019-12-01 20:23:49 -0800445
446 if (has_new_data_) {
447 event_.set_event_time(
Austin Schuhad154822019-12-27 15:45:13 -0800448 simple_shm_fetcher_.context().monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800449 event_loop_->AddEvent(&event_);
450 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700451 }
452
453 return has_new_data_;
454 }
455
Alex Perrycb7da4b2019-08-28 19:35:56 -0700456 // Consumes the data by calling the callback.
Austin Schuh7d87b672019-12-01 20:23:49 -0800457 void HandleEvent() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700458 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800459 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700460 has_new_data_ = false;
Austin Schuh7d87b672019-12-01 20:23:49 -0800461 CheckForNewData();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700462 }
463
Austin Schuh39788ff2019-12-01 18:22:57 -0800464 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700465 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800466 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700467 }
468
Austin Schuh39788ff2019-12-01 18:22:57 -0800469 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700470
Brian Silverman5120afb2020-01-31 17:44:35 -0800471 absl::Span<char> GetSharedMemory() const {
472 return simple_shm_fetcher_.GetSharedMemory();
473 }
474
Alex Perrycb7da4b2019-08-28 19:35:56 -0700475 private:
476 bool has_new_data_ = false;
477
Austin Schuh7d87b672019-12-01 20:23:49 -0800478 ShmEventLoop *event_loop_;
479 EventHandler<WatcherState> event_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800480 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700481};
482
483// Adapter class to adapt a timerfd to a TimerHandler.
Austin Schuh7d87b672019-12-01 20:23:49 -0800484class TimerHandlerState final : public TimerHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700485 public:
486 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800487 : TimerHandler(shm_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800488 shm_event_loop_(shm_event_loop),
489 event_(this) {
490 shm_event_loop_->epoll_.OnReadable(
491 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700492 }
493
Austin Schuh7d87b672019-12-01 20:23:49 -0800494 ~TimerHandlerState() {
495 Disable();
496 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
497 }
498
499 void HandleEvent() {
500 uint64_t elapsed_cycles = timerfd_.Read();
501 if (elapsed_cycles == 0u) {
502 // We got called before the timer interrupt could happen, but because we
503 // are checking the time, we got called on time. Push the timer out by 1
504 // cycle.
505 elapsed_cycles = 1u;
506 timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
507 }
508
509 Call(monotonic_clock::now, base_);
510
511 base_ += repeat_offset_ * elapsed_cycles;
512
513 if (repeat_offset_ != chrono::seconds(0)) {
514 event_.set_event_time(base_);
515 shm_event_loop_->AddEvent(&event_);
516 }
517 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700518
519 void Setup(monotonic_clock::time_point base,
520 monotonic_clock::duration repeat_offset) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800521 if (event_.valid()) {
522 shm_event_loop_->RemoveEvent(&event_);
523 }
524
Alex Perrycb7da4b2019-08-28 19:35:56 -0700525 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800526 base_ = base;
527 repeat_offset_ = repeat_offset;
Austin Schuh7d87b672019-12-01 20:23:49 -0800528 event_.set_event_time(base_);
529 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700530 }
531
Austin Schuh7d87b672019-12-01 20:23:49 -0800532 void Disable() override {
533 shm_event_loop_->RemoveEvent(&event_);
534 timerfd_.Disable();
535 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700536
537 private:
538 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800539 EventHandler<TimerHandlerState> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700540
541 TimerFd timerfd_;
542
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800543 monotonic_clock::time_point base_;
544 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700545};
546
547// Adapter class to the timerfd and PhasedLoop.
Austin Schuh7d87b672019-12-01 20:23:49 -0800548class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700549 public:
550 PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
551 const monotonic_clock::duration interval,
552 const monotonic_clock::duration offset)
Austin Schuh39788ff2019-12-01 18:22:57 -0800553 : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
Austin Schuh7d87b672019-12-01 20:23:49 -0800554 shm_event_loop_(shm_event_loop),
555 event_(this) {
556 shm_event_loop_->epoll_.OnReadable(
557 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
558 }
559
560 void HandleEvent() {
561 // The return value for read is the number of cycles that have elapsed.
562 // Because we check to see when this event *should* have happened, there are
563 // cases where Read() will return 0, when 1 cycle has actually happened.
564 // This occurs when the timer interrupt hasn't triggered yet. Therefore,
565 // ignore it. Call handles rescheduling and calculating elapsed cycles
566 // without any extra help.
567 timerfd_.Read();
568 event_.Invalidate();
569
570 Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
571 Schedule(sleep_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700572 });
573 }
574
Austin Schuh39788ff2019-12-01 18:22:57 -0800575 ~PhasedLoopHandler() override {
576 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
Austin Schuh7d87b672019-12-01 20:23:49 -0800577 shm_event_loop_->RemoveEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700578 }
579
580 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800581 // Reschedules the timer.
Austin Schuh39788ff2019-12-01 18:22:57 -0800582 void Schedule(monotonic_clock::time_point sleep_time) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800583 if (event_.valid()) {
584 shm_event_loop_->RemoveEvent(&event_);
585 }
586
Austin Schuh39788ff2019-12-01 18:22:57 -0800587 timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
Austin Schuh7d87b672019-12-01 20:23:49 -0800588 event_.set_event_time(sleep_time);
589 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700590 }
591
592 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800593 EventHandler<PhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700594
595 TimerFd timerfd_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700596};
597} // namespace internal
598
599::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
600 const Channel *channel) {
Austin Schuhca4828c2019-12-28 14:21:35 -0800601 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
602 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
603 << "\", \"type\": \"" << channel->type()->string_view()
604 << "\" } is not able to be fetched on this node. Check your "
605 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800606 }
607
Austin Schuh39788ff2019-12-01 18:22:57 -0800608 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700609}
610
611::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
612 const Channel *channel) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800613 TakeSender(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800614
615 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700616}
617
618void ShmEventLoop::MakeRawWatcher(
619 const Channel *channel,
620 std::function<void(const Context &context, const void *message)> watcher) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800621 TakeWatcher(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800622
Austin Schuh39788ff2019-12-01 18:22:57 -0800623 NewWatcher(::std::unique_ptr<WatcherState>(
624 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700625}
626
627TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800628 return NewTimer(::std::unique_ptr<TimerHandler>(
629 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700630}
631
632PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
633 ::std::function<void(int)> callback,
634 const monotonic_clock::duration interval,
635 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800636 return NewPhasedLoop(
637 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
638 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700639}
640
641void ShmEventLoop::OnRun(::std::function<void()> on_run) {
642 on_run_.push_back(::std::move(on_run));
643}
644
Austin Schuh7d87b672019-12-01 20:23:49 -0800645void ShmEventLoop::HandleEvent() {
646 // Update all the times for handlers.
647 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
648 internal::WatcherState *watcher =
649 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
650
651 watcher->CheckForNewData();
652 }
653
Austin Schuh39788ff2019-12-01 18:22:57 -0800654 while (true) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800655 if (EventCount() == 0 ||
656 PeekEvent()->event_time() > monotonic_clock::now()) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800657 break;
658 }
659
Austin Schuh7d87b672019-12-01 20:23:49 -0800660 EventLoopEvent *event = PopEvent();
661 event->HandleEvent();
Austin Schuh39788ff2019-12-01 18:22:57 -0800662 }
663}
664
Austin Schuh32fd5a72019-12-01 22:20:26 -0800665// RAII class to mask signals.
666class ScopedSignalMask {
667 public:
668 ScopedSignalMask(std::initializer_list<int> signals) {
669 sigset_t sigset;
670 PCHECK(sigemptyset(&sigset) == 0);
671 for (int signal : signals) {
672 PCHECK(sigaddset(&sigset, signal) == 0);
673 }
674
675 PCHECK(sigprocmask(SIG_BLOCK, &sigset, &old_) == 0);
676 }
677
678 ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }
679
680 private:
681 sigset_t old_;
682};
683
684// Class to manage the static state associated with killing multiple event
685// loops.
686class SignalHandler {
687 public:
688 // Gets the singleton.
689 static SignalHandler *global() {
690 static SignalHandler loop;
691 return &loop;
692 }
693
694 // Handles the signal with the singleton.
695 static void HandleSignal(int) { global()->DoHandleSignal(); }
696
697 // Registers an event loop to receive Exit() calls.
698 void Register(ShmEventLoop *event_loop) {
699 // Block signals while we have the mutex so we never race with the signal
700 // handler.
701 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
702 std::unique_lock<stl_mutex> locker(mutex_);
703 if (event_loops_.size() == 0) {
704 // The first caller registers the signal handler.
705 struct sigaction new_action;
706 sigemptyset(&new_action.sa_mask);
707 // This makes it so that 2 control c's to a stuck process will kill it by
708 // restoring the original signal handler.
709 new_action.sa_flags = SA_RESETHAND;
710 new_action.sa_handler = &HandleSignal;
711
712 PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
713 PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
714 PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
715 }
716
717 event_loops_.push_back(event_loop);
718 }
719
720 // Unregisters an event loop to receive Exit() calls.
721 void Unregister(ShmEventLoop *event_loop) {
722 // Block signals while we have the mutex so we never race with the signal
723 // handler.
724 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
725 std::unique_lock<stl_mutex> locker(mutex_);
726
Brian Silverman5120afb2020-01-31 17:44:35 -0800727 event_loops_.erase(
728 std::find(event_loops_.begin(), event_loops_.end(), event_loop));
Austin Schuh32fd5a72019-12-01 22:20:26 -0800729
730 if (event_loops_.size() == 0u) {
731 // The last caller restores the original signal handlers.
732 PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
733 PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
734 PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
735 }
736 }
737
738 private:
739 void DoHandleSignal() {
740 // We block signals while grabbing the lock, so there should never be a
741 // race. Confirm that this is true using trylock.
742 CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
743 "modifing the event loop list.";
744 for (ShmEventLoop *event_loop : event_loops_) {
745 event_loop->Exit();
746 }
747 mutex_.unlock();
748 }
749
750 // Mutex to protect all state.
751 stl_mutex mutex_;
752 std::vector<ShmEventLoop *> event_loops_;
753 struct sigaction old_action_int_;
754 struct sigaction old_action_hup_;
755 struct sigaction old_action_term_;
756};
757
Alex Perrycb7da4b2019-08-28 19:35:56 -0700758void ShmEventLoop::Run() {
Austin Schuh32fd5a72019-12-01 22:20:26 -0800759 SignalHandler::global()->Register(this);
Austin Schuh39788ff2019-12-01 18:22:57 -0800760
Alex Perrycb7da4b2019-08-28 19:35:56 -0700761 std::unique_ptr<ipc_lib::SignalFd> signalfd;
762
763 if (watchers_.size() > 0) {
764 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
765
766 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
767 signalfd_siginfo result = signalfd_ptr->Read();
768 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
769
770 // TODO(austin): We should really be checking *everything*, not just
771 // watchers, and calling the oldest thing first. That will improve
772 // determinism a lot.
773
Austin Schuh7d87b672019-12-01 20:23:49 -0800774 HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700775 });
776 }
777
Austin Schuh39788ff2019-12-01 18:22:57 -0800778 MaybeScheduleTimingReports();
779
Austin Schuh7d87b672019-12-01 20:23:49 -0800780 ReserveEvents();
781
James Kuszmaul57c2baa2020-01-19 14:52:52 -0800782 aos::SetCurrentThreadName(name_.substr(0, 16));
Austin Schuh39788ff2019-12-01 18:22:57 -0800783 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700784 if (priority_ != 0) {
785 ::aos::InitRT();
786
787 LOG(INFO) << "Setting priority to " << priority_;
788 ::aos::SetCurrentThreadRealtimePriority(priority_);
789 }
790
791 set_is_running(true);
792
793 // Now that we are realtime (but before the OnRun handlers run), snap the
794 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800795 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
796 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700797 }
798
799 // Now that we are RT, run all the OnRun handlers.
800 for (const auto &run : on_run_) {
801 run();
802 }
803
Alex Perrycb7da4b2019-08-28 19:35:56 -0700804 // And start our main event loop which runs all the timers and handles Quit.
805 epoll_.Run();
806
807 // Once epoll exits, there is no useful nonrt work left to do.
808 set_is_running(false);
809
810 // Nothing time or synchronization critical needs to happen after this point.
811 // Drop RT priority.
812 ::aos::UnsetCurrentThreadRealtimePriority();
813
Austin Schuh39788ff2019-12-01 18:22:57 -0800814 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
815 internal::WatcherState *watcher =
816 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700817 watcher->UnregisterWakeup();
818 }
819
820 if (watchers_.size() > 0) {
821 epoll_.DeleteFd(signalfd->fd());
822 signalfd.reset();
823 }
Austin Schuh32fd5a72019-12-01 22:20:26 -0800824
825 SignalHandler::global()->Unregister(this);
Austin Schuhe84c3ed2019-12-14 15:29:48 -0800826
827 // Trigger any remaining senders or fetchers to be cleared before destroying
828 // the event loop so the book keeping matches. Do this in the thread that
829 // created the timing reporter.
830 timing_report_sender_.reset();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700831}
832
833void ShmEventLoop::Exit() { epoll_.Quit(); }
834
835ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800836 // Force everything with a registered fd with epoll to be destroyed now.
837 timers_.clear();
838 phased_loops_.clear();
839 watchers_.clear();
840
Alex Perrycb7da4b2019-08-28 19:35:56 -0700841 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
842}
843
Alex Perrycb7da4b2019-08-28 19:35:56 -0700844void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
845 if (is_running()) {
846 LOG(FATAL) << "Cannot set realtime priority while running.";
847 }
848 priority_ = priority;
849}
850
James Kuszmaul57c2baa2020-01-19 14:52:52 -0800851void ShmEventLoop::set_name(const std::string_view name) {
852 name_ = std::string(name);
853 UpdateTimingReport();
854}
855
Brian Silverman5120afb2020-01-31 17:44:35 -0800856absl::Span<char> ShmEventLoop::GetWatcherSharedMemory(const Channel *channel) {
857 internal::WatcherState *const watcher_state =
858 static_cast<internal::WatcherState *>(GetWatcherState(channel));
859 return watcher_state->GetSharedMemory();
860}
861
862absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
863 const aos::RawSender *sender) const {
864 return static_cast<const internal::ShmSender *>(sender)->GetSharedMemory();
865}
866
Austin Schuh39788ff2019-12-01 18:22:57 -0800867pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
868
Alex Perrycb7da4b2019-08-28 19:35:56 -0700869} // namespace aos