blob: de941c3069ee690368f2014de3cfa5ccb8d9396d [file] [log] [blame]
#include "aos/events/shm_event_loop.h"

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <iterator>
#include <stdexcept>
#include <thread>
#include <vector>

#include "aos/events/epoll.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
#include "aos/ipc_lib/lockless_queue.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/stl_mutex/stl_mutex.h"
#include "aos/util/phased_loop.h"
#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070023
24DEFINE_string(shm_base, "/dev/shm/aos",
25 "Directory to place queue backing mmaped files in.");
26DEFINE_uint32(permissions, 0770,
27 "Permissions to make shared memory files and folders.");
28
29namespace aos {
30
31std::string ShmFolder(const Channel *channel) {
32 CHECK(channel->has_name());
33 CHECK_EQ(channel->name()->string_view()[0], '/');
34 return FLAGS_shm_base + channel->name()->str() + "/";
35}
36std::string ShmPath(const Channel *channel) {
37 CHECK(channel->has_type());
Austin Schuhad154822019-12-27 15:45:13 -080038 return ShmFolder(channel) + channel->type()->str() + ".v1";
Alex Perrycb7da4b2019-08-28 19:35:56 -070039}
40
41class MMapedQueue {
42 public:
43 MMapedQueue(const Channel *channel) {
44 std::string path = ShmPath(channel);
45
Austin Schuh80c7fce2019-12-05 20:48:43 -080046 config_.num_watchers = channel->num_watchers();
47 config_.num_senders = channel->num_senders();
Alex Perrycb7da4b2019-08-28 19:35:56 -070048 config_.queue_size = 2 * channel->frequency();
49 config_.message_data_size = channel->max_size();
50
51 size_ = ipc_lib::LocklessQueueMemorySize(config_);
52
53 MkdirP(path);
54
55 // There are 2 cases. Either the file already exists, or it does not
56 // already exist and we need to create it. Start by trying to create it. If
57 // that fails, the file has already been created and we can open it
58 // normally.. Once the file has been created it wil never be deleted.
59 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
60 O_CLOEXEC | FLAGS_permissions);
61 if (fd_ == -1 && errno == EEXIST) {
62 VLOG(1) << path << " already created.";
63 // File already exists.
64 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
65 PCHECK(fd_ != -1) << ": Failed to open " << path;
66 while (true) {
67 struct stat st;
68 PCHECK(fstat(fd_, &st) == 0);
69 if (st.st_size != 0) {
70 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
71 << ": Size of " << path
72 << " doesn't match expected size of backing queue file. Did the "
73 "queue definition change?";
74 break;
75 } else {
76 // The creating process didn't get around to it yet. Give it a bit.
77 std::this_thread::sleep_for(std::chrono::milliseconds(10));
78 VLOG(1) << path << " is zero size, waiting";
79 }
80 }
81 } else {
82 VLOG(1) << "Created " << path;
83 PCHECK(fd_ != -1) << ": Failed to open " << path;
84 PCHECK(ftruncate(fd_, size_) == 0);
85 }
86
87 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
88 PCHECK(data_ != MAP_FAILED);
89
90 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
91 }
92
93 ~MMapedQueue() {
94 PCHECK(munmap(data_, size_) == 0);
95 PCHECK(close(fd_) == 0);
96 }
97
98 ipc_lib::LocklessQueueMemory *memory() const {
99 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
100 }
101
Austin Schuh39788ff2019-12-01 18:22:57 -0800102 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700103
104 private:
James Kuszmaul3ae42262019-11-08 12:33:41 -0800105 void MkdirP(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700106 auto last_slash_pos = path.find_last_of("/");
107
James Kuszmaul3ae42262019-11-08 12:33:41 -0800108 std::string folder(last_slash_pos == std::string_view::npos
109 ? std::string_view("")
Alex Perrycb7da4b2019-08-28 19:35:56 -0700110 : path.substr(0, last_slash_pos));
Austin Schuh8ec76182019-12-23 16:28:00 -0800111 if (folder.empty()) return;
112 MkdirP(folder);
113 VLOG(1) << "Creating " << folder;
114 const int result = mkdir(folder.c_str(), FLAGS_permissions);
115 if (result == -1 && errno == EEXIST) {
116 VLOG(1) << "Already exists";
117 return;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700118 }
Austin Schuh8ec76182019-12-23 16:28:00 -0800119 PCHECK(result == 0) << ": Error creating " << folder;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700120 }
121
122 ipc_lib::LocklessQueueConfiguration config_;
123
124 int fd_;
125
126 size_t size_;
127 void *data_;
128};
129
Austin Schuh217a9782019-12-21 23:02:50 -0800130namespace {
131
// Returns the final component of |path|, i.e. everything after the last '/'.
// A path without any '/' is returned unchanged; a path ending in '/' yields an
// empty view. The result aliases |path|'s storage.
std::string_view Filename(std::string_view path) {
  const std::string_view::size_type slash = path.rfind('/');
  if (slash == std::string_view::npos) {
    return path;
  }
  return path.substr(slash + 1);
}
140
Austin Schuh217a9782019-12-21 23:02:50 -0800141const Node *MaybeMyNode(const Configuration *configuration) {
142 if (!configuration->has_nodes()) {
143 return nullptr;
144 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700145
Austin Schuh217a9782019-12-21 23:02:50 -0800146 return configuration::GetMyNode(configuration);
147}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700148
// Short alias for the standard chrono namespace used throughout this file.
namespace chrono = std::chrono;
150
Austin Schuh39788ff2019-12-01 18:22:57 -0800151} // namespace
152
Austin Schuh217a9782019-12-21 23:02:50 -0800153ShmEventLoop::ShmEventLoop(const Configuration *configuration)
154 : EventLoop(configuration),
155 name_(Filename(program_invocation_name)),
156 node_(MaybeMyNode(configuration)) {}
157
Austin Schuh39788ff2019-12-01 18:22:57 -0800158namespace internal {
159
160class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700161 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800162 explicit SimpleShmFetcher(const Channel *channel)
Austin Schuhf5652592019-12-29 16:26:15 -0800163 : channel_(channel),
164 lockless_queue_memory_(channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700165 lockless_queue_(lockless_queue_memory_.memory(),
166 lockless_queue_memory_.config()),
167 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
168 alignof(AlignedChar), channel->max_size())),
169 &free) {
170 context_.data = nullptr;
171 // Point the queue index at the next index to read starting now. This
172 // makes it such that FetchNext will read the next message sent after
173 // the fetcher is created.
174 PointAtNextQueueIndex();
175 }
176
Austin Schuh39788ff2019-12-01 18:22:57 -0800177 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700178
179 // Points the next message to fetch at the queue index which will be
180 // populated next.
181 void PointAtNextQueueIndex() {
182 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
183 if (!actual_queue_index_.valid()) {
184 // Nothing in the queue. The next element will show up at the 0th
185 // index in the queue.
186 actual_queue_index_ =
187 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
188 } else {
189 actual_queue_index_ = actual_queue_index_.Increment();
190 }
191 }
192
Austin Schuh39788ff2019-12-01 18:22:57 -0800193 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700194 // TODO(austin): Get behind and make sure it dies both here and with
195 // Fetch.
196 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
Austin Schuhad154822019-12-27 15:45:13 -0800197 actual_queue_index_.index(), &context_.monotonic_event_time,
198 &context_.realtime_event_time, &context_.monotonic_remote_time,
199 &context_.realtime_remote_time, &context_.remote_queue_index,
200 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700201 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
202 context_.queue_index = actual_queue_index_.index();
Austin Schuhad154822019-12-27 15:45:13 -0800203 if (context_.remote_queue_index == 0xffffffffu) {
204 context_.remote_queue_index = context_.queue_index;
205 }
206 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
207 context_.monotonic_remote_time = context_.monotonic_event_time;
208 }
209 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
210 context_.realtime_remote_time = context_.realtime_event_time;
211 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800212 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
213 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700214 actual_queue_index_ = actual_queue_index_.Increment();
215 }
216
217 // Make sure the data wasn't modified while we were reading it. This
218 // can only happen if you are reading the last message *while* it is
219 // being written to, which means you are pretty far behind.
220 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
221 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800222 "out from under us while we were reading it. Don't get so far "
223 "behind. "
224 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700225
226 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800227 << ": The next message is no longer available. "
228 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700229 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
230 }
231
Austin Schuh39788ff2019-12-01 18:22:57 -0800232 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700233 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
234 // actual_queue_index_ is only meaningful if it was set by Fetch or
235 // FetchNext. This happens when valid_data_ has been set. So, only
236 // skip checking if valid_data_ is true.
237 //
238 // Also, if the latest queue index is invalid, we are empty. So there
239 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800240 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700241 queue_index == actual_queue_index_.DecrementBy(1u)) ||
242 !queue_index.valid()) {
243 return false;
244 }
245
Austin Schuhad154822019-12-27 15:45:13 -0800246 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
247 queue_index.index(), &context_.monotonic_event_time,
248 &context_.realtime_event_time, &context_.monotonic_remote_time,
249 &context_.realtime_remote_time, &context_.remote_queue_index,
250 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700251 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
252 context_.queue_index = queue_index.index();
Austin Schuhad154822019-12-27 15:45:13 -0800253 if (context_.remote_queue_index == 0xffffffffu) {
254 context_.remote_queue_index = context_.queue_index;
255 }
256 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
257 context_.monotonic_remote_time = context_.monotonic_event_time;
258 }
259 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
260 context_.realtime_remote_time = context_.realtime_event_time;
261 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800262 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
263 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700264 actual_queue_index_ = queue_index.Increment();
265 }
266
267 // Make sure the data wasn't modified while we were reading it. This
268 // can only happen if you are reading the last message *while* it is
269 // being written to, which means you are pretty far behind.
270 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
271 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800272 "out from under us while we were reading it. Don't get so far "
273 "behind."
274 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700275
276 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
Austin Schuhf5652592019-12-29 16:26:15 -0800277 << ": Queue index went backwards. This should never happen. "
278 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700279
280 // We fell behind between when we read the index and read the value.
281 // This isn't worth recovering from since this means we went to sleep
282 // for a long time in the middle of this function.
283 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800284 << ": The next message is no longer available. "
285 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700286 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
287 }
288
Austin Schuh39788ff2019-12-01 18:22:57 -0800289 Context context() const { return context_; }
290
Alex Perrycb7da4b2019-08-28 19:35:56 -0700291 bool RegisterWakeup(int priority) {
292 return lockless_queue_.RegisterWakeup(priority);
293 }
294
295 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
296
297 private:
Austin Schuhf5652592019-12-29 16:26:15 -0800298 const Channel *const channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700299 MMapedQueue lockless_queue_memory_;
300 ipc_lib::LocklessQueue lockless_queue_;
301
302 ipc_lib::QueueIndex actual_queue_index_ =
303 ipc_lib::LocklessQueue::empty_queue_index();
304
305 struct AlignedChar {
306 alignas(32) char data;
307 };
308
309 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800310
311 Context context_;
312};
313
314class ShmFetcher : public RawFetcher {
315 public:
316 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
317 : RawFetcher(event_loop, channel), simple_shm_fetcher_(channel) {}
318
319 ~ShmFetcher() { context_.data = nullptr; }
320
321 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
322 if (simple_shm_fetcher_.FetchNext()) {
323 context_ = simple_shm_fetcher_.context();
324 return std::make_pair(true, monotonic_clock::now());
325 }
326 return std::make_pair(false, monotonic_clock::min_time);
327 }
328
329 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
330 if (simple_shm_fetcher_.Fetch()) {
331 context_ = simple_shm_fetcher_.context();
332 return std::make_pair(true, monotonic_clock::now());
333 }
334 return std::make_pair(false, monotonic_clock::min_time);
335 }
336
337 private:
338 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700339};
340
341class ShmSender : public RawSender {
342 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800343 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
344 : RawSender(event_loop, channel),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700345 lockless_queue_memory_(channel),
346 lockless_queue_(lockless_queue_memory_.memory(),
347 lockless_queue_memory_.config()),
348 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
349
Austin Schuh39788ff2019-12-01 18:22:57 -0800350 ~ShmSender() override {}
351
Alex Perrycb7da4b2019-08-28 19:35:56 -0700352 void *data() override { return lockless_queue_sender_.Data(); }
353 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuhad154822019-12-27 15:45:13 -0800354 bool DoSend(size_t length,
355 aos::monotonic_clock::time_point monotonic_remote_time,
356 aos::realtime_clock::time_point realtime_remote_time,
357 uint32_t remote_queue_index) override {
358 lockless_queue_sender_.Send(
359 length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
360 &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800361 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700362 return true;
363 }
364
Austin Schuhad154822019-12-27 15:45:13 -0800365 bool DoSend(const void *msg, size_t length,
366 aos::monotonic_clock::time_point monotonic_remote_time,
367 aos::realtime_clock::time_point realtime_remote_time,
368 uint32_t remote_queue_index) override {
369 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length,
370 monotonic_remote_time, realtime_remote_time,
371 remote_queue_index, &monotonic_sent_time_,
372 &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800373 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700374 // TODO(austin): Return an error if we send too fast.
375 return true;
376 }
377
Alex Perrycb7da4b2019-08-28 19:35:56 -0700378 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700379 MMapedQueue lockless_queue_memory_;
380 ipc_lib::LocklessQueue lockless_queue_;
381 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
382};
383
Alex Perrycb7da4b2019-08-28 19:35:56 -0700384// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800385class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700386 public:
387 WatcherState(
Austin Schuh7d87b672019-12-01 20:23:49 -0800388 ShmEventLoop *event_loop, const Channel *channel,
Austin Schuh39788ff2019-12-01 18:22:57 -0800389 std::function<void(const Context &context, const void *message)> fn)
390 : aos::WatcherState(event_loop, channel, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800391 event_loop_(event_loop),
392 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -0800393 simple_shm_fetcher_(channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700394
Austin Schuh7d87b672019-12-01 20:23:49 -0800395 ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800396
397 void Startup(EventLoop *event_loop) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800398 simple_shm_fetcher_.PointAtNextQueueIndex();
Austin Schuh39788ff2019-12-01 18:22:57 -0800399 CHECK(RegisterWakeup(event_loop->priority()));
400 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700401
Alex Perrycb7da4b2019-08-28 19:35:56 -0700402 // Returns true if there is new data available.
Austin Schuh7d87b672019-12-01 20:23:49 -0800403 bool CheckForNewData() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700404 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800405 has_new_data_ = simple_shm_fetcher_.FetchNext();
Austin Schuh7d87b672019-12-01 20:23:49 -0800406
407 if (has_new_data_) {
408 event_.set_event_time(
Austin Schuhad154822019-12-27 15:45:13 -0800409 simple_shm_fetcher_.context().monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800410 event_loop_->AddEvent(&event_);
411 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700412 }
413
414 return has_new_data_;
415 }
416
Alex Perrycb7da4b2019-08-28 19:35:56 -0700417 // Consumes the data by calling the callback.
Austin Schuh7d87b672019-12-01 20:23:49 -0800418 void HandleEvent() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700419 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800420 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700421 has_new_data_ = false;
Austin Schuh7d87b672019-12-01 20:23:49 -0800422 CheckForNewData();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700423 }
424
Austin Schuh39788ff2019-12-01 18:22:57 -0800425 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700426 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800427 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700428 }
429
Austin Schuh39788ff2019-12-01 18:22:57 -0800430 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700431
432 private:
433 bool has_new_data_ = false;
434
Austin Schuh7d87b672019-12-01 20:23:49 -0800435 ShmEventLoop *event_loop_;
436 EventHandler<WatcherState> event_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800437 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700438};
439
440// Adapter class to adapt a timerfd to a TimerHandler.
Austin Schuh7d87b672019-12-01 20:23:49 -0800441class TimerHandlerState final : public TimerHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700442 public:
443 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800444 : TimerHandler(shm_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800445 shm_event_loop_(shm_event_loop),
446 event_(this) {
447 shm_event_loop_->epoll_.OnReadable(
448 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700449 }
450
Austin Schuh7d87b672019-12-01 20:23:49 -0800451 ~TimerHandlerState() {
452 Disable();
453 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
454 }
455
456 void HandleEvent() {
457 uint64_t elapsed_cycles = timerfd_.Read();
458 if (elapsed_cycles == 0u) {
459 // We got called before the timer interrupt could happen, but because we
460 // are checking the time, we got called on time. Push the timer out by 1
461 // cycle.
462 elapsed_cycles = 1u;
463 timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
464 }
465
466 Call(monotonic_clock::now, base_);
467
468 base_ += repeat_offset_ * elapsed_cycles;
469
470 if (repeat_offset_ != chrono::seconds(0)) {
471 event_.set_event_time(base_);
472 shm_event_loop_->AddEvent(&event_);
473 }
474 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700475
476 void Setup(monotonic_clock::time_point base,
477 monotonic_clock::duration repeat_offset) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800478 if (event_.valid()) {
479 shm_event_loop_->RemoveEvent(&event_);
480 }
481
Alex Perrycb7da4b2019-08-28 19:35:56 -0700482 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800483 base_ = base;
484 repeat_offset_ = repeat_offset;
Austin Schuh7d87b672019-12-01 20:23:49 -0800485 event_.set_event_time(base_);
486 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700487 }
488
Austin Schuh7d87b672019-12-01 20:23:49 -0800489 void Disable() override {
490 shm_event_loop_->RemoveEvent(&event_);
491 timerfd_.Disable();
492 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700493
494 private:
495 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800496 EventHandler<TimerHandlerState> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700497
498 TimerFd timerfd_;
499
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800500 monotonic_clock::time_point base_;
501 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700502};
503
504// Adapter class to the timerfd and PhasedLoop.
Austin Schuh7d87b672019-12-01 20:23:49 -0800505class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700506 public:
507 PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
508 const monotonic_clock::duration interval,
509 const monotonic_clock::duration offset)
Austin Schuh39788ff2019-12-01 18:22:57 -0800510 : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
Austin Schuh7d87b672019-12-01 20:23:49 -0800511 shm_event_loop_(shm_event_loop),
512 event_(this) {
513 shm_event_loop_->epoll_.OnReadable(
514 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
515 }
516
517 void HandleEvent() {
518 // The return value for read is the number of cycles that have elapsed.
519 // Because we check to see when this event *should* have happened, there are
520 // cases where Read() will return 0, when 1 cycle has actually happened.
521 // This occurs when the timer interrupt hasn't triggered yet. Therefore,
522 // ignore it. Call handles rescheduling and calculating elapsed cycles
523 // without any extra help.
524 timerfd_.Read();
525 event_.Invalidate();
526
527 Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
528 Schedule(sleep_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700529 });
530 }
531
Austin Schuh39788ff2019-12-01 18:22:57 -0800532 ~PhasedLoopHandler() override {
533 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
Austin Schuh7d87b672019-12-01 20:23:49 -0800534 shm_event_loop_->RemoveEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700535 }
536
537 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800538 // Reschedules the timer.
Austin Schuh39788ff2019-12-01 18:22:57 -0800539 void Schedule(monotonic_clock::time_point sleep_time) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800540 if (event_.valid()) {
541 shm_event_loop_->RemoveEvent(&event_);
542 }
543
Austin Schuh39788ff2019-12-01 18:22:57 -0800544 timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
Austin Schuh7d87b672019-12-01 20:23:49 -0800545 event_.set_event_time(sleep_time);
546 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700547 }
548
549 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800550 EventHandler<PhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700551
552 TimerFd timerfd_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700553};
554} // namespace internal
555
556::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
557 const Channel *channel) {
Austin Schuhca4828c2019-12-28 14:21:35 -0800558 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
559 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
560 << "\", \"type\": \"" << channel->type()->string_view()
561 << "\" } is not able to be fetched on this node. Check your "
562 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800563 }
564
Austin Schuh39788ff2019-12-01 18:22:57 -0800565 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700566}
567
568::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
569 const Channel *channel) {
570 Take(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800571
572 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700573}
574
575void ShmEventLoop::MakeRawWatcher(
576 const Channel *channel,
577 std::function<void(const Context &context, const void *message)> watcher) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700578 Take(channel);
579
Austin Schuhca4828c2019-12-28 14:21:35 -0800580 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
581 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
582 << "\", \"type\": \"" << channel->type()->string_view()
583 << "\" } is not able to be watched on this node. Check your "
584 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800585 }
586
Austin Schuh39788ff2019-12-01 18:22:57 -0800587 NewWatcher(::std::unique_ptr<WatcherState>(
588 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700589}
590
591TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800592 return NewTimer(::std::unique_ptr<TimerHandler>(
593 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700594}
595
596PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
597 ::std::function<void(int)> callback,
598 const monotonic_clock::duration interval,
599 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800600 return NewPhasedLoop(
601 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
602 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700603}
604
605void ShmEventLoop::OnRun(::std::function<void()> on_run) {
606 on_run_.push_back(::std::move(on_run));
607}
608
Austin Schuh7d87b672019-12-01 20:23:49 -0800609void ShmEventLoop::HandleEvent() {
610 // Update all the times for handlers.
611 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
612 internal::WatcherState *watcher =
613 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
614
615 watcher->CheckForNewData();
616 }
617
Austin Schuh39788ff2019-12-01 18:22:57 -0800618 while (true) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800619 if (EventCount() == 0 ||
620 PeekEvent()->event_time() > monotonic_clock::now()) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800621 break;
622 }
623
Austin Schuh7d87b672019-12-01 20:23:49 -0800624 EventLoopEvent *event = PopEvent();
625 event->HandleEvent();
Austin Schuh39788ff2019-12-01 18:22:57 -0800626 }
627}
628
Austin Schuh32fd5a72019-12-01 22:20:26 -0800629// RAII class to mask signals.
630class ScopedSignalMask {
631 public:
632 ScopedSignalMask(std::initializer_list<int> signals) {
633 sigset_t sigset;
634 PCHECK(sigemptyset(&sigset) == 0);
635 for (int signal : signals) {
636 PCHECK(sigaddset(&sigset, signal) == 0);
637 }
638
639 PCHECK(sigprocmask(SIG_BLOCK, &sigset, &old_) == 0);
640 }
641
642 ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }
643
644 private:
645 sigset_t old_;
646};
647
648// Class to manage the static state associated with killing multiple event
649// loops.
650class SignalHandler {
651 public:
652 // Gets the singleton.
653 static SignalHandler *global() {
654 static SignalHandler loop;
655 return &loop;
656 }
657
658 // Handles the signal with the singleton.
659 static void HandleSignal(int) { global()->DoHandleSignal(); }
660
661 // Registers an event loop to receive Exit() calls.
662 void Register(ShmEventLoop *event_loop) {
663 // Block signals while we have the mutex so we never race with the signal
664 // handler.
665 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
666 std::unique_lock<stl_mutex> locker(mutex_);
667 if (event_loops_.size() == 0) {
668 // The first caller registers the signal handler.
669 struct sigaction new_action;
670 sigemptyset(&new_action.sa_mask);
671 // This makes it so that 2 control c's to a stuck process will kill it by
672 // restoring the original signal handler.
673 new_action.sa_flags = SA_RESETHAND;
674 new_action.sa_handler = &HandleSignal;
675
676 PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
677 PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
678 PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
679 }
680
681 event_loops_.push_back(event_loop);
682 }
683
684 // Unregisters an event loop to receive Exit() calls.
685 void Unregister(ShmEventLoop *event_loop) {
686 // Block signals while we have the mutex so we never race with the signal
687 // handler.
688 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
689 std::unique_lock<stl_mutex> locker(mutex_);
690
691 event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
692
693 if (event_loops_.size() == 0u) {
694 // The last caller restores the original signal handlers.
695 PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
696 PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
697 PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
698 }
699 }
700
701 private:
702 void DoHandleSignal() {
703 // We block signals while grabbing the lock, so there should never be a
704 // race. Confirm that this is true using trylock.
705 CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
706 "modifing the event loop list.";
707 for (ShmEventLoop *event_loop : event_loops_) {
708 event_loop->Exit();
709 }
710 mutex_.unlock();
711 }
712
713 // Mutex to protect all state.
714 stl_mutex mutex_;
715 std::vector<ShmEventLoop *> event_loops_;
716 struct sigaction old_action_int_;
717 struct sigaction old_action_hup_;
718 struct sigaction old_action_term_;
719};
720
Alex Perrycb7da4b2019-08-28 19:35:56 -0700721void ShmEventLoop::Run() {
Austin Schuh32fd5a72019-12-01 22:20:26 -0800722 SignalHandler::global()->Register(this);
Austin Schuh39788ff2019-12-01 18:22:57 -0800723
Alex Perrycb7da4b2019-08-28 19:35:56 -0700724 std::unique_ptr<ipc_lib::SignalFd> signalfd;
725
726 if (watchers_.size() > 0) {
727 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
728
729 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
730 signalfd_siginfo result = signalfd_ptr->Read();
731 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
732
733 // TODO(austin): We should really be checking *everything*, not just
734 // watchers, and calling the oldest thing first. That will improve
735 // determinism a lot.
736
Austin Schuh7d87b672019-12-01 20:23:49 -0800737 HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700738 });
739 }
740
Austin Schuh39788ff2019-12-01 18:22:57 -0800741 MaybeScheduleTimingReports();
742
Austin Schuh7d87b672019-12-01 20:23:49 -0800743 ReserveEvents();
744
Austin Schuh39788ff2019-12-01 18:22:57 -0800745 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700746 if (priority_ != 0) {
747 ::aos::InitRT();
748
749 LOG(INFO) << "Setting priority to " << priority_;
750 ::aos::SetCurrentThreadRealtimePriority(priority_);
751 }
752
753 set_is_running(true);
754
755 // Now that we are realtime (but before the OnRun handlers run), snap the
756 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800757 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
758 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700759 }
760
761 // Now that we are RT, run all the OnRun handlers.
762 for (const auto &run : on_run_) {
763 run();
764 }
765
Alex Perrycb7da4b2019-08-28 19:35:56 -0700766 // And start our main event loop which runs all the timers and handles Quit.
767 epoll_.Run();
768
769 // Once epoll exits, there is no useful nonrt work left to do.
770 set_is_running(false);
771
772 // Nothing time or synchronization critical needs to happen after this point.
773 // Drop RT priority.
774 ::aos::UnsetCurrentThreadRealtimePriority();
775
Austin Schuh39788ff2019-12-01 18:22:57 -0800776 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
777 internal::WatcherState *watcher =
778 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700779 watcher->UnregisterWakeup();
780 }
781
782 if (watchers_.size() > 0) {
783 epoll_.DeleteFd(signalfd->fd());
784 signalfd.reset();
785 }
Austin Schuh32fd5a72019-12-01 22:20:26 -0800786
787 SignalHandler::global()->Unregister(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700788}
789
790void ShmEventLoop::Exit() { epoll_.Quit(); }
791
792ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800793 // Trigger any remaining senders or fetchers to be cleared before destroying
794 // the event loop so the book keeping matches.
795 timing_report_sender_.reset();
796
797 // Force everything with a registered fd with epoll to be destroyed now.
798 timers_.clear();
799 phased_loops_.clear();
800 watchers_.clear();
801
Alex Perrycb7da4b2019-08-28 19:35:56 -0700802 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
803}
804
805void ShmEventLoop::Take(const Channel *channel) {
806 CHECK(!is_running()) << ": Cannot add new objects while running.";
807
808 // Cheat aggresively. Use the shared memory path as a proxy for a unique
809 // identifier for the channel.
810 const std::string path = ShmPath(channel);
811
812 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
813 CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
814
815 taken_.emplace_back(path);
816}
817
818void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
819 if (is_running()) {
820 LOG(FATAL) << "Cannot set realtime priority while running.";
821 }
822 priority_ = priority;
823}
824
Austin Schuh39788ff2019-12-01 18:22:57 -0800825pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
826
Alex Perrycb7da4b2019-08-28 19:35:56 -0700827} // namespace aos