blob: a8e227e825d143c0b4127edf5ba3bca6c3f3f2ec [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/shm_event_loop.h"
2
3#include <sys/mman.h>
4#include <sys/stat.h>
Austin Schuh39788ff2019-12-01 18:22:57 -08005#include <sys/syscall.h>
Alex Perrycb7da4b2019-08-28 19:35:56 -07006#include <sys/types.h>
7#include <unistd.h>
8#include <algorithm>
9#include <atomic>
10#include <chrono>
Austin Schuh39788ff2019-12-01 18:22:57 -080011#include <iterator>
Alex Perrycb7da4b2019-08-28 19:35:56 -070012#include <stdexcept>
13
14#include "aos/events/epoll.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080015#include "aos/events/event_loop_generated.h"
16#include "aos/events/timing_statistics.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070017#include "aos/ipc_lib/lockless_queue.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080018#include "aos/ipc_lib/signalfd.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070019#include "aos/realtime.h"
Austin Schuh32fd5a72019-12-01 22:20:26 -080020#include "aos/stl_mutex/stl_mutex.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070021#include "aos/util/phased_loop.h"
Austin Schuh39788ff2019-12-01 18:22:57 -080022#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070023
24DEFINE_string(shm_base, "/dev/shm/aos",
25 "Directory to place queue backing mmaped files in.");
26DEFINE_uint32(permissions, 0770,
27 "Permissions to make shared memory files and folders.");
28
29namespace aos {
30
31std::string ShmFolder(const Channel *channel) {
32 CHECK(channel->has_name());
33 CHECK_EQ(channel->name()->string_view()[0], '/');
34 return FLAGS_shm_base + channel->name()->str() + "/";
35}
36std::string ShmPath(const Channel *channel) {
37 CHECK(channel->has_type());
Austin Schuhad154822019-12-27 15:45:13 -080038 return ShmFolder(channel) + channel->type()->str() + ".v1";
Alex Perrycb7da4b2019-08-28 19:35:56 -070039}
40
41class MMapedQueue {
42 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -080043 MMapedQueue(const Channel *channel,
44 const std::chrono::seconds channel_storage_duration) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070045 std::string path = ShmPath(channel);
46
Austin Schuh80c7fce2019-12-05 20:48:43 -080047 config_.num_watchers = channel->num_watchers();
48 config_.num_senders = channel->num_senders();
Austin Schuhaa79e4e2019-12-29 20:43:32 -080049 config_.queue_size =
50 channel_storage_duration.count() * channel->frequency();
Alex Perrycb7da4b2019-08-28 19:35:56 -070051 config_.message_data_size = channel->max_size();
52
53 size_ = ipc_lib::LocklessQueueMemorySize(config_);
54
55 MkdirP(path);
56
57 // There are 2 cases. Either the file already exists, or it does not
58 // already exist and we need to create it. Start by trying to create it. If
59 // that fails, the file has already been created and we can open it
60 // normally.. Once the file has been created it wil never be deleted.
61 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
62 O_CLOEXEC | FLAGS_permissions);
63 if (fd_ == -1 && errno == EEXIST) {
64 VLOG(1) << path << " already created.";
65 // File already exists.
66 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
67 PCHECK(fd_ != -1) << ": Failed to open " << path;
68 while (true) {
69 struct stat st;
70 PCHECK(fstat(fd_, &st) == 0);
71 if (st.st_size != 0) {
72 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
73 << ": Size of " << path
74 << " doesn't match expected size of backing queue file. Did the "
75 "queue definition change?";
76 break;
77 } else {
78 // The creating process didn't get around to it yet. Give it a bit.
79 std::this_thread::sleep_for(std::chrono::milliseconds(10));
80 VLOG(1) << path << " is zero size, waiting";
81 }
82 }
83 } else {
84 VLOG(1) << "Created " << path;
85 PCHECK(fd_ != -1) << ": Failed to open " << path;
86 PCHECK(ftruncate(fd_, size_) == 0);
87 }
88
89 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
90 PCHECK(data_ != MAP_FAILED);
91
92 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
93 }
94
95 ~MMapedQueue() {
96 PCHECK(munmap(data_, size_) == 0);
97 PCHECK(close(fd_) == 0);
98 }
99
100 ipc_lib::LocklessQueueMemory *memory() const {
101 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
102 }
103
Austin Schuh39788ff2019-12-01 18:22:57 -0800104 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700105
106 private:
James Kuszmaul3ae42262019-11-08 12:33:41 -0800107 void MkdirP(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700108 auto last_slash_pos = path.find_last_of("/");
109
James Kuszmaul3ae42262019-11-08 12:33:41 -0800110 std::string folder(last_slash_pos == std::string_view::npos
111 ? std::string_view("")
Alex Perrycb7da4b2019-08-28 19:35:56 -0700112 : path.substr(0, last_slash_pos));
Austin Schuh8ec76182019-12-23 16:28:00 -0800113 if (folder.empty()) return;
114 MkdirP(folder);
115 VLOG(1) << "Creating " << folder;
116 const int result = mkdir(folder.c_str(), FLAGS_permissions);
117 if (result == -1 && errno == EEXIST) {
118 VLOG(1) << "Already exists";
119 return;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700120 }
Austin Schuh8ec76182019-12-23 16:28:00 -0800121 PCHECK(result == 0) << ": Error creating " << folder;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700122 }
123
124 ipc_lib::LocklessQueueConfiguration config_;
125
126 int fd_;
127
128 size_t size_;
129 void *data_;
130};
131
Austin Schuh217a9782019-12-21 23:02:50 -0800132namespace {
133
// Strips everything up to and including the final '/' from path.  A path
// without any '/' is returned unchanged.
std::string_view Filename(std::string_view path) {
  const auto slash = path.rfind('/');
  if (slash == std::string_view::npos) {
    return path;
  }
  path.remove_prefix(slash + 1);
  return path;
}
142
Austin Schuh217a9782019-12-21 23:02:50 -0800143const Node *MaybeMyNode(const Configuration *configuration) {
144 if (!configuration->has_nodes()) {
145 return nullptr;
146 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700147
Austin Schuh217a9782019-12-21 23:02:50 -0800148 return configuration::GetMyNode(configuration);
149}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700150
151namespace chrono = ::std::chrono;
152
Austin Schuh39788ff2019-12-01 18:22:57 -0800153} // namespace
154
Austin Schuh217a9782019-12-21 23:02:50 -0800155ShmEventLoop::ShmEventLoop(const Configuration *configuration)
156 : EventLoop(configuration),
157 name_(Filename(program_invocation_name)),
158 node_(MaybeMyNode(configuration)) {}
159
Austin Schuh39788ff2019-12-01 18:22:57 -0800160namespace internal {
161
162class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700163 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800164 explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhf5652592019-12-29 16:26:15 -0800165 : channel_(channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800166 lockless_queue_memory_(
167 channel,
168 chrono::duration_cast<chrono::seconds>(chrono::nanoseconds(
169 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700170 lockless_queue_(lockless_queue_memory_.memory(),
171 lockless_queue_memory_.config()),
172 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
173 alignof(AlignedChar), channel->max_size())),
174 &free) {
175 context_.data = nullptr;
176 // Point the queue index at the next index to read starting now. This
177 // makes it such that FetchNext will read the next message sent after
178 // the fetcher is created.
179 PointAtNextQueueIndex();
180 }
181
Austin Schuh39788ff2019-12-01 18:22:57 -0800182 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700183
184 // Points the next message to fetch at the queue index which will be
185 // populated next.
186 void PointAtNextQueueIndex() {
187 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
188 if (!actual_queue_index_.valid()) {
189 // Nothing in the queue. The next element will show up at the 0th
190 // index in the queue.
191 actual_queue_index_ =
192 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
193 } else {
194 actual_queue_index_ = actual_queue_index_.Increment();
195 }
196 }
197
Austin Schuh39788ff2019-12-01 18:22:57 -0800198 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700199 // TODO(austin): Get behind and make sure it dies both here and with
200 // Fetch.
201 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
Austin Schuhad154822019-12-27 15:45:13 -0800202 actual_queue_index_.index(), &context_.monotonic_event_time,
203 &context_.realtime_event_time, &context_.monotonic_remote_time,
204 &context_.realtime_remote_time, &context_.remote_queue_index,
205 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700206 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
207 context_.queue_index = actual_queue_index_.index();
Austin Schuhad154822019-12-27 15:45:13 -0800208 if (context_.remote_queue_index == 0xffffffffu) {
209 context_.remote_queue_index = context_.queue_index;
210 }
211 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
212 context_.monotonic_remote_time = context_.monotonic_event_time;
213 }
214 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
215 context_.realtime_remote_time = context_.realtime_event_time;
216 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800217 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
218 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700219 actual_queue_index_ = actual_queue_index_.Increment();
220 }
221
222 // Make sure the data wasn't modified while we were reading it. This
223 // can only happen if you are reading the last message *while* it is
224 // being written to, which means you are pretty far behind.
225 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
226 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800227 "out from under us while we were reading it. Don't get so far "
228 "behind. "
229 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700230
231 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800232 << ": The next message is no longer available. "
233 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700234 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
235 }
236
Austin Schuh39788ff2019-12-01 18:22:57 -0800237 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700238 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
239 // actual_queue_index_ is only meaningful if it was set by Fetch or
240 // FetchNext. This happens when valid_data_ has been set. So, only
241 // skip checking if valid_data_ is true.
242 //
243 // Also, if the latest queue index is invalid, we are empty. So there
244 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800245 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700246 queue_index == actual_queue_index_.DecrementBy(1u)) ||
247 !queue_index.valid()) {
248 return false;
249 }
250
Austin Schuhad154822019-12-27 15:45:13 -0800251 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
252 queue_index.index(), &context_.monotonic_event_time,
253 &context_.realtime_event_time, &context_.monotonic_remote_time,
254 &context_.realtime_remote_time, &context_.remote_queue_index,
255 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700256 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
257 context_.queue_index = queue_index.index();
Austin Schuhad154822019-12-27 15:45:13 -0800258 if (context_.remote_queue_index == 0xffffffffu) {
259 context_.remote_queue_index = context_.queue_index;
260 }
261 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
262 context_.monotonic_remote_time = context_.monotonic_event_time;
263 }
264 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
265 context_.realtime_remote_time = context_.realtime_event_time;
266 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800267 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
268 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700269 actual_queue_index_ = queue_index.Increment();
270 }
271
272 // Make sure the data wasn't modified while we were reading it. This
273 // can only happen if you are reading the last message *while* it is
274 // being written to, which means you are pretty far behind.
275 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
276 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800277 "out from under us while we were reading it. Don't get so far "
278 "behind."
279 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700280
281 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
Austin Schuhf5652592019-12-29 16:26:15 -0800282 << ": Queue index went backwards. This should never happen. "
283 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700284
285 // We fell behind between when we read the index and read the value.
286 // This isn't worth recovering from since this means we went to sleep
287 // for a long time in the middle of this function.
288 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800289 << ": The next message is no longer available. "
290 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700291 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
292 }
293
Austin Schuh39788ff2019-12-01 18:22:57 -0800294 Context context() const { return context_; }
295
Alex Perrycb7da4b2019-08-28 19:35:56 -0700296 bool RegisterWakeup(int priority) {
297 return lockless_queue_.RegisterWakeup(priority);
298 }
299
300 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
301
302 private:
Austin Schuhf5652592019-12-29 16:26:15 -0800303 const Channel *const channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700304 MMapedQueue lockless_queue_memory_;
305 ipc_lib::LocklessQueue lockless_queue_;
306
307 ipc_lib::QueueIndex actual_queue_index_ =
308 ipc_lib::LocklessQueue::empty_queue_index();
309
310 struct AlignedChar {
311 alignas(32) char data;
312 };
313
314 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800315
316 Context context_;
317};
318
319class ShmFetcher : public RawFetcher {
320 public:
321 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800322 : RawFetcher(event_loop, channel),
323 simple_shm_fetcher_(event_loop, channel) {}
Austin Schuh39788ff2019-12-01 18:22:57 -0800324
325 ~ShmFetcher() { context_.data = nullptr; }
326
327 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
328 if (simple_shm_fetcher_.FetchNext()) {
329 context_ = simple_shm_fetcher_.context();
330 return std::make_pair(true, monotonic_clock::now());
331 }
332 return std::make_pair(false, monotonic_clock::min_time);
333 }
334
335 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
336 if (simple_shm_fetcher_.Fetch()) {
337 context_ = simple_shm_fetcher_.context();
338 return std::make_pair(true, monotonic_clock::now());
339 }
340 return std::make_pair(false, monotonic_clock::min_time);
341 }
342
343 private:
344 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700345};
346
347class ShmSender : public RawSender {
348 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800349 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
350 : RawSender(event_loop, channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800351 lockless_queue_memory_(
352 channel,
353 chrono::duration_cast<chrono::seconds>(chrono::nanoseconds(
354 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700355 lockless_queue_(lockless_queue_memory_.memory(),
356 lockless_queue_memory_.config()),
357 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
358
Austin Schuh39788ff2019-12-01 18:22:57 -0800359 ~ShmSender() override {}
360
Alex Perrycb7da4b2019-08-28 19:35:56 -0700361 void *data() override { return lockless_queue_sender_.Data(); }
362 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuhad154822019-12-27 15:45:13 -0800363 bool DoSend(size_t length,
364 aos::monotonic_clock::time_point monotonic_remote_time,
365 aos::realtime_clock::time_point realtime_remote_time,
366 uint32_t remote_queue_index) override {
367 lockless_queue_sender_.Send(
368 length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
369 &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800370 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700371 return true;
372 }
373
Austin Schuhad154822019-12-27 15:45:13 -0800374 bool DoSend(const void *msg, size_t length,
375 aos::monotonic_clock::time_point monotonic_remote_time,
376 aos::realtime_clock::time_point realtime_remote_time,
377 uint32_t remote_queue_index) override {
378 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length,
379 monotonic_remote_time, realtime_remote_time,
380 remote_queue_index, &monotonic_sent_time_,
381 &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800382 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700383 // TODO(austin): Return an error if we send too fast.
384 return true;
385 }
386
Alex Perrycb7da4b2019-08-28 19:35:56 -0700387 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700388 MMapedQueue lockless_queue_memory_;
389 ipc_lib::LocklessQueue lockless_queue_;
390 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
391};
392
Alex Perrycb7da4b2019-08-28 19:35:56 -0700393// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800394class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700395 public:
396 WatcherState(
Austin Schuh7d87b672019-12-01 20:23:49 -0800397 ShmEventLoop *event_loop, const Channel *channel,
Austin Schuh39788ff2019-12-01 18:22:57 -0800398 std::function<void(const Context &context, const void *message)> fn)
399 : aos::WatcherState(event_loop, channel, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800400 event_loop_(event_loop),
401 event_(this),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800402 simple_shm_fetcher_(event_loop, channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700403
Austin Schuh7d87b672019-12-01 20:23:49 -0800404 ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800405
406 void Startup(EventLoop *event_loop) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800407 simple_shm_fetcher_.PointAtNextQueueIndex();
Austin Schuh39788ff2019-12-01 18:22:57 -0800408 CHECK(RegisterWakeup(event_loop->priority()));
409 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700410
Alex Perrycb7da4b2019-08-28 19:35:56 -0700411 // Returns true if there is new data available.
Austin Schuh7d87b672019-12-01 20:23:49 -0800412 bool CheckForNewData() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700413 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800414 has_new_data_ = simple_shm_fetcher_.FetchNext();
Austin Schuh7d87b672019-12-01 20:23:49 -0800415
416 if (has_new_data_) {
417 event_.set_event_time(
Austin Schuhad154822019-12-27 15:45:13 -0800418 simple_shm_fetcher_.context().monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800419 event_loop_->AddEvent(&event_);
420 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700421 }
422
423 return has_new_data_;
424 }
425
Alex Perrycb7da4b2019-08-28 19:35:56 -0700426 // Consumes the data by calling the callback.
Austin Schuh7d87b672019-12-01 20:23:49 -0800427 void HandleEvent() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700428 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800429 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700430 has_new_data_ = false;
Austin Schuh7d87b672019-12-01 20:23:49 -0800431 CheckForNewData();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700432 }
433
Austin Schuh39788ff2019-12-01 18:22:57 -0800434 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700435 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800436 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700437 }
438
Austin Schuh39788ff2019-12-01 18:22:57 -0800439 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700440
441 private:
442 bool has_new_data_ = false;
443
Austin Schuh7d87b672019-12-01 20:23:49 -0800444 ShmEventLoop *event_loop_;
445 EventHandler<WatcherState> event_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800446 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700447};
448
449// Adapter class to adapt a timerfd to a TimerHandler.
Austin Schuh7d87b672019-12-01 20:23:49 -0800450class TimerHandlerState final : public TimerHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700451 public:
452 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800453 : TimerHandler(shm_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800454 shm_event_loop_(shm_event_loop),
455 event_(this) {
456 shm_event_loop_->epoll_.OnReadable(
457 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700458 }
459
Austin Schuh7d87b672019-12-01 20:23:49 -0800460 ~TimerHandlerState() {
461 Disable();
462 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
463 }
464
465 void HandleEvent() {
466 uint64_t elapsed_cycles = timerfd_.Read();
467 if (elapsed_cycles == 0u) {
468 // We got called before the timer interrupt could happen, but because we
469 // are checking the time, we got called on time. Push the timer out by 1
470 // cycle.
471 elapsed_cycles = 1u;
472 timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
473 }
474
475 Call(monotonic_clock::now, base_);
476
477 base_ += repeat_offset_ * elapsed_cycles;
478
479 if (repeat_offset_ != chrono::seconds(0)) {
480 event_.set_event_time(base_);
481 shm_event_loop_->AddEvent(&event_);
482 }
483 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700484
485 void Setup(monotonic_clock::time_point base,
486 monotonic_clock::duration repeat_offset) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800487 if (event_.valid()) {
488 shm_event_loop_->RemoveEvent(&event_);
489 }
490
Alex Perrycb7da4b2019-08-28 19:35:56 -0700491 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800492 base_ = base;
493 repeat_offset_ = repeat_offset;
Austin Schuh7d87b672019-12-01 20:23:49 -0800494 event_.set_event_time(base_);
495 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700496 }
497
Austin Schuh7d87b672019-12-01 20:23:49 -0800498 void Disable() override {
499 shm_event_loop_->RemoveEvent(&event_);
500 timerfd_.Disable();
501 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700502
503 private:
504 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800505 EventHandler<TimerHandlerState> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700506
507 TimerFd timerfd_;
508
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800509 monotonic_clock::time_point base_;
510 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700511};
512
513// Adapter class to the timerfd and PhasedLoop.
Austin Schuh7d87b672019-12-01 20:23:49 -0800514class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700515 public:
516 PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
517 const monotonic_clock::duration interval,
518 const monotonic_clock::duration offset)
Austin Schuh39788ff2019-12-01 18:22:57 -0800519 : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
Austin Schuh7d87b672019-12-01 20:23:49 -0800520 shm_event_loop_(shm_event_loop),
521 event_(this) {
522 shm_event_loop_->epoll_.OnReadable(
523 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
524 }
525
526 void HandleEvent() {
527 // The return value for read is the number of cycles that have elapsed.
528 // Because we check to see when this event *should* have happened, there are
529 // cases where Read() will return 0, when 1 cycle has actually happened.
530 // This occurs when the timer interrupt hasn't triggered yet. Therefore,
531 // ignore it. Call handles rescheduling and calculating elapsed cycles
532 // without any extra help.
533 timerfd_.Read();
534 event_.Invalidate();
535
536 Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
537 Schedule(sleep_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700538 });
539 }
540
Austin Schuh39788ff2019-12-01 18:22:57 -0800541 ~PhasedLoopHandler() override {
542 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
Austin Schuh7d87b672019-12-01 20:23:49 -0800543 shm_event_loop_->RemoveEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700544 }
545
546 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800547 // Reschedules the timer.
Austin Schuh39788ff2019-12-01 18:22:57 -0800548 void Schedule(monotonic_clock::time_point sleep_time) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800549 if (event_.valid()) {
550 shm_event_loop_->RemoveEvent(&event_);
551 }
552
Austin Schuh39788ff2019-12-01 18:22:57 -0800553 timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
Austin Schuh7d87b672019-12-01 20:23:49 -0800554 event_.set_event_time(sleep_time);
555 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700556 }
557
558 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800559 EventHandler<PhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700560
561 TimerFd timerfd_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700562};
563} // namespace internal
564
565::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
566 const Channel *channel) {
Austin Schuhca4828c2019-12-28 14:21:35 -0800567 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
568 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
569 << "\", \"type\": \"" << channel->type()->string_view()
570 << "\" } is not able to be fetched on this node. Check your "
571 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800572 }
573
Austin Schuh39788ff2019-12-01 18:22:57 -0800574 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700575}
576
577::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
578 const Channel *channel) {
579 Take(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800580
581 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700582}
583
584void ShmEventLoop::MakeRawWatcher(
585 const Channel *channel,
586 std::function<void(const Context &context, const void *message)> watcher) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700587 Take(channel);
588
Austin Schuhca4828c2019-12-28 14:21:35 -0800589 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
590 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
591 << "\", \"type\": \"" << channel->type()->string_view()
592 << "\" } is not able to be watched on this node. Check your "
593 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800594 }
595
Austin Schuh39788ff2019-12-01 18:22:57 -0800596 NewWatcher(::std::unique_ptr<WatcherState>(
597 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700598}
599
600TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800601 return NewTimer(::std::unique_ptr<TimerHandler>(
602 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700603}
604
605PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
606 ::std::function<void(int)> callback,
607 const monotonic_clock::duration interval,
608 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800609 return NewPhasedLoop(
610 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
611 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700612}
613
614void ShmEventLoop::OnRun(::std::function<void()> on_run) {
615 on_run_.push_back(::std::move(on_run));
616}
617
Austin Schuh7d87b672019-12-01 20:23:49 -0800618void ShmEventLoop::HandleEvent() {
619 // Update all the times for handlers.
620 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
621 internal::WatcherState *watcher =
622 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
623
624 watcher->CheckForNewData();
625 }
626
Austin Schuh39788ff2019-12-01 18:22:57 -0800627 while (true) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800628 if (EventCount() == 0 ||
629 PeekEvent()->event_time() > monotonic_clock::now()) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800630 break;
631 }
632
Austin Schuh7d87b672019-12-01 20:23:49 -0800633 EventLoopEvent *event = PopEvent();
634 event->HandleEvent();
Austin Schuh39788ff2019-12-01 18:22:57 -0800635 }
636}
637
Austin Schuh32fd5a72019-12-01 22:20:26 -0800638// RAII class to mask signals.
639class ScopedSignalMask {
640 public:
641 ScopedSignalMask(std::initializer_list<int> signals) {
642 sigset_t sigset;
643 PCHECK(sigemptyset(&sigset) == 0);
644 for (int signal : signals) {
645 PCHECK(sigaddset(&sigset, signal) == 0);
646 }
647
648 PCHECK(sigprocmask(SIG_BLOCK, &sigset, &old_) == 0);
649 }
650
651 ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }
652
653 private:
654 sigset_t old_;
655};
656
657// Class to manage the static state associated with killing multiple event
658// loops.
659class SignalHandler {
660 public:
661 // Gets the singleton.
662 static SignalHandler *global() {
663 static SignalHandler loop;
664 return &loop;
665 }
666
667 // Handles the signal with the singleton.
668 static void HandleSignal(int) { global()->DoHandleSignal(); }
669
670 // Registers an event loop to receive Exit() calls.
671 void Register(ShmEventLoop *event_loop) {
672 // Block signals while we have the mutex so we never race with the signal
673 // handler.
674 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
675 std::unique_lock<stl_mutex> locker(mutex_);
676 if (event_loops_.size() == 0) {
677 // The first caller registers the signal handler.
678 struct sigaction new_action;
679 sigemptyset(&new_action.sa_mask);
680 // This makes it so that 2 control c's to a stuck process will kill it by
681 // restoring the original signal handler.
682 new_action.sa_flags = SA_RESETHAND;
683 new_action.sa_handler = &HandleSignal;
684
685 PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
686 PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
687 PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
688 }
689
690 event_loops_.push_back(event_loop);
691 }
692
693 // Unregisters an event loop to receive Exit() calls.
694 void Unregister(ShmEventLoop *event_loop) {
695 // Block signals while we have the mutex so we never race with the signal
696 // handler.
697 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
698 std::unique_lock<stl_mutex> locker(mutex_);
699
700 event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
701
702 if (event_loops_.size() == 0u) {
703 // The last caller restores the original signal handlers.
704 PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
705 PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
706 PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
707 }
708 }
709
710 private:
711 void DoHandleSignal() {
712 // We block signals while grabbing the lock, so there should never be a
713 // race. Confirm that this is true using trylock.
714 CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
715 "modifing the event loop list.";
716 for (ShmEventLoop *event_loop : event_loops_) {
717 event_loop->Exit();
718 }
719 mutex_.unlock();
720 }
721
722 // Mutex to protect all state.
723 stl_mutex mutex_;
724 std::vector<ShmEventLoop *> event_loops_;
725 struct sigaction old_action_int_;
726 struct sigaction old_action_hup_;
727 struct sigaction old_action_term_;
728};
729
Alex Perrycb7da4b2019-08-28 19:35:56 -0700730void ShmEventLoop::Run() {
Austin Schuh32fd5a72019-12-01 22:20:26 -0800731 SignalHandler::global()->Register(this);
Austin Schuh39788ff2019-12-01 18:22:57 -0800732
Alex Perrycb7da4b2019-08-28 19:35:56 -0700733 std::unique_ptr<ipc_lib::SignalFd> signalfd;
734
735 if (watchers_.size() > 0) {
736 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
737
738 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
739 signalfd_siginfo result = signalfd_ptr->Read();
740 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
741
742 // TODO(austin): We should really be checking *everything*, not just
743 // watchers, and calling the oldest thing first. That will improve
744 // determinism a lot.
745
Austin Schuh7d87b672019-12-01 20:23:49 -0800746 HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700747 });
748 }
749
Austin Schuh39788ff2019-12-01 18:22:57 -0800750 MaybeScheduleTimingReports();
751
Austin Schuh7d87b672019-12-01 20:23:49 -0800752 ReserveEvents();
753
Austin Schuh39788ff2019-12-01 18:22:57 -0800754 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700755 if (priority_ != 0) {
756 ::aos::InitRT();
757
758 LOG(INFO) << "Setting priority to " << priority_;
759 ::aos::SetCurrentThreadRealtimePriority(priority_);
760 }
761
762 set_is_running(true);
763
764 // Now that we are realtime (but before the OnRun handlers run), snap the
765 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800766 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
767 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700768 }
769
770 // Now that we are RT, run all the OnRun handlers.
771 for (const auto &run : on_run_) {
772 run();
773 }
774
Alex Perrycb7da4b2019-08-28 19:35:56 -0700775 // And start our main event loop which runs all the timers and handles Quit.
776 epoll_.Run();
777
778 // Once epoll exits, there is no useful nonrt work left to do.
779 set_is_running(false);
780
781 // Nothing time or synchronization critical needs to happen after this point.
782 // Drop RT priority.
783 ::aos::UnsetCurrentThreadRealtimePriority();
784
Austin Schuh39788ff2019-12-01 18:22:57 -0800785 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
786 internal::WatcherState *watcher =
787 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700788 watcher->UnregisterWakeup();
789 }
790
791 if (watchers_.size() > 0) {
792 epoll_.DeleteFd(signalfd->fd());
793 signalfd.reset();
794 }
Austin Schuh32fd5a72019-12-01 22:20:26 -0800795
796 SignalHandler::global()->Unregister(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700797}
798
799void ShmEventLoop::Exit() { epoll_.Quit(); }
800
801ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800802 // Trigger any remaining senders or fetchers to be cleared before destroying
803 // the event loop so the book keeping matches.
804 timing_report_sender_.reset();
805
806 // Force everything with a registered fd with epoll to be destroyed now.
807 timers_.clear();
808 phased_loops_.clear();
809 watchers_.clear();
810
Alex Perrycb7da4b2019-08-28 19:35:56 -0700811 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
812}
813
814void ShmEventLoop::Take(const Channel *channel) {
815 CHECK(!is_running()) << ": Cannot add new objects while running.";
816
817 // Cheat aggresively. Use the shared memory path as a proxy for a unique
818 // identifier for the channel.
819 const std::string path = ShmPath(channel);
820
821 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
822 CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
823
824 taken_.emplace_back(path);
825}
826
827void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
828 if (is_running()) {
829 LOG(FATAL) << "Cannot set realtime priority while running.";
830 }
831 priority_ = priority;
832}
833
Austin Schuh39788ff2019-12-01 18:22:57 -0800834pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
835
Alex Perrycb7da4b2019-08-28 19:35:56 -0700836} // namespace aos