blob: cde538a67143f0e4610f100fc2f4d9c825d9af3b [file] [log] [blame]
#include "aos/events/shm_event_loop.h"

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>

#include "aos/events/epoll.h"
#include "aos/events/event_loop_generated.h"
#include "aos/events/timing_statistics.h"
#include "aos/ipc_lib/lockless_queue.h"
#include "aos/ipc_lib/signalfd.h"
#include "aos/realtime.h"
#include "aos/stl_mutex/stl_mutex.h"
#include "aos/util/phased_loop.h"
#include "glog/logging.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070023
24DEFINE_string(shm_base, "/dev/shm/aos",
25 "Directory to place queue backing mmaped files in.");
26DEFINE_uint32(permissions, 0770,
27 "Permissions to make shared memory files and folders.");
28
29namespace aos {
30
31std::string ShmFolder(const Channel *channel) {
32 CHECK(channel->has_name());
33 CHECK_EQ(channel->name()->string_view()[0], '/');
34 return FLAGS_shm_base + channel->name()->str() + "/";
35}
36std::string ShmPath(const Channel *channel) {
37 CHECK(channel->has_type());
Austin Schuhad154822019-12-27 15:45:13 -080038 return ShmFolder(channel) + channel->type()->str() + ".v1";
Alex Perrycb7da4b2019-08-28 19:35:56 -070039}
40
41class MMapedQueue {
42 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -080043 MMapedQueue(const Channel *channel,
44 const std::chrono::seconds channel_storage_duration) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070045 std::string path = ShmPath(channel);
46
Austin Schuh80c7fce2019-12-05 20:48:43 -080047 config_.num_watchers = channel->num_watchers();
48 config_.num_senders = channel->num_senders();
Austin Schuhaa79e4e2019-12-29 20:43:32 -080049 config_.queue_size =
50 channel_storage_duration.count() * channel->frequency();
Alex Perrycb7da4b2019-08-28 19:35:56 -070051 config_.message_data_size = channel->max_size();
52
53 size_ = ipc_lib::LocklessQueueMemorySize(config_);
54
55 MkdirP(path);
56
57 // There are 2 cases. Either the file already exists, or it does not
58 // already exist and we need to create it. Start by trying to create it. If
59 // that fails, the file has already been created and we can open it
60 // normally.. Once the file has been created it wil never be deleted.
61 fd_ = open(path.c_str(), O_RDWR | O_CREAT | O_EXCL,
62 O_CLOEXEC | FLAGS_permissions);
63 if (fd_ == -1 && errno == EEXIST) {
64 VLOG(1) << path << " already created.";
65 // File already exists.
66 fd_ = open(path.c_str(), O_RDWR, O_CLOEXEC);
67 PCHECK(fd_ != -1) << ": Failed to open " << path;
68 while (true) {
69 struct stat st;
70 PCHECK(fstat(fd_, &st) == 0);
71 if (st.st_size != 0) {
72 CHECK_EQ(static_cast<size_t>(st.st_size), size_)
73 << ": Size of " << path
74 << " doesn't match expected size of backing queue file. Did the "
75 "queue definition change?";
76 break;
77 } else {
78 // The creating process didn't get around to it yet. Give it a bit.
79 std::this_thread::sleep_for(std::chrono::milliseconds(10));
80 VLOG(1) << path << " is zero size, waiting";
81 }
82 }
83 } else {
84 VLOG(1) << "Created " << path;
85 PCHECK(fd_ != -1) << ": Failed to open " << path;
86 PCHECK(ftruncate(fd_, size_) == 0);
87 }
88
89 data_ = mmap(NULL, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
90 PCHECK(data_ != MAP_FAILED);
91
92 ipc_lib::InitializeLocklessQueueMemory(memory(), config_);
93 }
94
95 ~MMapedQueue() {
96 PCHECK(munmap(data_, size_) == 0);
97 PCHECK(close(fd_) == 0);
98 }
99
100 ipc_lib::LocklessQueueMemory *memory() const {
101 return reinterpret_cast<ipc_lib::LocklessQueueMemory *>(data_);
102 }
103
Austin Schuh39788ff2019-12-01 18:22:57 -0800104 const ipc_lib::LocklessQueueConfiguration &config() const { return config_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700105
106 private:
James Kuszmaul3ae42262019-11-08 12:33:41 -0800107 void MkdirP(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700108 auto last_slash_pos = path.find_last_of("/");
109
James Kuszmaul3ae42262019-11-08 12:33:41 -0800110 std::string folder(last_slash_pos == std::string_view::npos
111 ? std::string_view("")
Alex Perrycb7da4b2019-08-28 19:35:56 -0700112 : path.substr(0, last_slash_pos));
Austin Schuh8ec76182019-12-23 16:28:00 -0800113 if (folder.empty()) return;
114 MkdirP(folder);
115 VLOG(1) << "Creating " << folder;
116 const int result = mkdir(folder.c_str(), FLAGS_permissions);
117 if (result == -1 && errno == EEXIST) {
118 VLOG(1) << "Already exists";
119 return;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700120 }
Austin Schuh8ec76182019-12-23 16:28:00 -0800121 PCHECK(result == 0) << ": Error creating " << folder;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700122 }
123
124 ipc_lib::LocklessQueueConfiguration config_;
125
126 int fd_;
127
128 size_t size_;
129 void *data_;
130};
131
Austin Schuh217a9782019-12-21 23:02:50 -0800132namespace {
133
Alex Perrycb7da4b2019-08-28 19:35:56 -0700134// Returns the portion of the path after the last /.
James Kuszmaul3ae42262019-11-08 12:33:41 -0800135std::string_view Filename(std::string_view path) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700136 auto last_slash_pos = path.find_last_of("/");
137
James Kuszmaul3ae42262019-11-08 12:33:41 -0800138 return last_slash_pos == std::string_view::npos
Alex Perrycb7da4b2019-08-28 19:35:56 -0700139 ? path
140 : path.substr(last_slash_pos + 1, path.size());
141}
142
Austin Schuh217a9782019-12-21 23:02:50 -0800143const Node *MaybeMyNode(const Configuration *configuration) {
144 if (!configuration->has_nodes()) {
145 return nullptr;
146 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700147
Austin Schuh217a9782019-12-21 23:02:50 -0800148 return configuration::GetMyNode(configuration);
149}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700150
151namespace chrono = ::std::chrono;
152
Austin Schuh39788ff2019-12-01 18:22:57 -0800153} // namespace
154
Austin Schuh217a9782019-12-21 23:02:50 -0800155ShmEventLoop::ShmEventLoop(const Configuration *configuration)
156 : EventLoop(configuration),
157 name_(Filename(program_invocation_name)),
Austin Schuh15649d62019-12-28 16:36:38 -0800158 node_(MaybeMyNode(configuration)) {
159 if (configuration->has_nodes()) {
160 CHECK(node_ != nullptr) << ": Couldn't find node in config.";
161 }
162}
Austin Schuh217a9782019-12-21 23:02:50 -0800163
Austin Schuh39788ff2019-12-01 18:22:57 -0800164namespace internal {
165
166class SimpleShmFetcher {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700167 public:
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800168 explicit SimpleShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhf5652592019-12-29 16:26:15 -0800169 : channel_(channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800170 lockless_queue_memory_(
171 channel,
172 chrono::duration_cast<chrono::seconds>(chrono::nanoseconds(
173 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700174 lockless_queue_(lockless_queue_memory_.memory(),
175 lockless_queue_memory_.config()),
176 data_storage_(static_cast<AlignedChar *>(aligned_alloc(
177 alignof(AlignedChar), channel->max_size())),
178 &free) {
179 context_.data = nullptr;
180 // Point the queue index at the next index to read starting now. This
181 // makes it such that FetchNext will read the next message sent after
182 // the fetcher is created.
183 PointAtNextQueueIndex();
184 }
185
Austin Schuh39788ff2019-12-01 18:22:57 -0800186 ~SimpleShmFetcher() {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700187
188 // Points the next message to fetch at the queue index which will be
189 // populated next.
190 void PointAtNextQueueIndex() {
191 actual_queue_index_ = lockless_queue_.LatestQueueIndex();
192 if (!actual_queue_index_.valid()) {
193 // Nothing in the queue. The next element will show up at the 0th
194 // index in the queue.
195 actual_queue_index_ =
196 ipc_lib::QueueIndex::Zero(lockless_queue_.queue_size());
197 } else {
198 actual_queue_index_ = actual_queue_index_.Increment();
199 }
200 }
201
Austin Schuh39788ff2019-12-01 18:22:57 -0800202 bool FetchNext() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700203 // TODO(austin): Get behind and make sure it dies both here and with
204 // Fetch.
205 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
Austin Schuhad154822019-12-27 15:45:13 -0800206 actual_queue_index_.index(), &context_.monotonic_event_time,
207 &context_.realtime_event_time, &context_.monotonic_remote_time,
208 &context_.realtime_remote_time, &context_.remote_queue_index,
209 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700210 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
211 context_.queue_index = actual_queue_index_.index();
Austin Schuhad154822019-12-27 15:45:13 -0800212 if (context_.remote_queue_index == 0xffffffffu) {
213 context_.remote_queue_index = context_.queue_index;
214 }
215 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
216 context_.monotonic_remote_time = context_.monotonic_event_time;
217 }
218 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
219 context_.realtime_remote_time = context_.realtime_event_time;
220 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800221 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
222 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700223 actual_queue_index_ = actual_queue_index_.Increment();
224 }
225
226 // Make sure the data wasn't modified while we were reading it. This
227 // can only happen if you are reading the last message *while* it is
228 // being written to, which means you are pretty far behind.
229 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
230 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800231 "out from under us while we were reading it. Don't get so far "
232 "behind. "
233 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700234
235 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800236 << ": The next message is no longer available. "
237 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700238 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
239 }
240
Austin Schuh39788ff2019-12-01 18:22:57 -0800241 bool Fetch() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700242 const ipc_lib::QueueIndex queue_index = lockless_queue_.LatestQueueIndex();
243 // actual_queue_index_ is only meaningful if it was set by Fetch or
244 // FetchNext. This happens when valid_data_ has been set. So, only
245 // skip checking if valid_data_ is true.
246 //
247 // Also, if the latest queue index is invalid, we are empty. So there
248 // is nothing to fetch.
Austin Schuh39788ff2019-12-01 18:22:57 -0800249 if ((context_.data != nullptr &&
Alex Perrycb7da4b2019-08-28 19:35:56 -0700250 queue_index == actual_queue_index_.DecrementBy(1u)) ||
251 !queue_index.valid()) {
252 return false;
253 }
254
Austin Schuhad154822019-12-27 15:45:13 -0800255 ipc_lib::LocklessQueue::ReadResult read_result = lockless_queue_.Read(
256 queue_index.index(), &context_.monotonic_event_time,
257 &context_.realtime_event_time, &context_.monotonic_remote_time,
258 &context_.realtime_remote_time, &context_.remote_queue_index,
259 &context_.size, reinterpret_cast<char *>(data_storage_.get()));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700260 if (read_result == ipc_lib::LocklessQueue::ReadResult::GOOD) {
261 context_.queue_index = queue_index.index();
Austin Schuhad154822019-12-27 15:45:13 -0800262 if (context_.remote_queue_index == 0xffffffffu) {
263 context_.remote_queue_index = context_.queue_index;
264 }
265 if (context_.monotonic_remote_time == aos::monotonic_clock::min_time) {
266 context_.monotonic_remote_time = context_.monotonic_event_time;
267 }
268 if (context_.realtime_remote_time == aos::realtime_clock::min_time) {
269 context_.realtime_remote_time = context_.realtime_event_time;
270 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800271 context_.data = reinterpret_cast<char *>(data_storage_.get()) +
272 lockless_queue_.message_data_size() - context_.size;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700273 actual_queue_index_ = queue_index.Increment();
274 }
275
276 // Make sure the data wasn't modified while we were reading it. This
277 // can only happen if you are reading the last message *while* it is
278 // being written to, which means you are pretty far behind.
279 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::OVERWROTE)
280 << ": Got behind while reading and the last message was modified "
Austin Schuhf5652592019-12-29 16:26:15 -0800281 "out from under us while we were reading it. Don't get so far "
282 "behind."
283 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700284
285 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::NOTHING_NEW)
Austin Schuhf5652592019-12-29 16:26:15 -0800286 << ": Queue index went backwards. This should never happen. "
287 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700288
289 // We fell behind between when we read the index and read the value.
290 // This isn't worth recovering from since this means we went to sleep
291 // for a long time in the middle of this function.
292 CHECK(read_result != ipc_lib::LocklessQueue::ReadResult::TOO_OLD)
Austin Schuhf5652592019-12-29 16:26:15 -0800293 << ": The next message is no longer available. "
294 << configuration::CleanedChannelToString(channel_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700295 return read_result == ipc_lib::LocklessQueue::ReadResult::GOOD;
296 }
297
Austin Schuh39788ff2019-12-01 18:22:57 -0800298 Context context() const { return context_; }
299
Alex Perrycb7da4b2019-08-28 19:35:56 -0700300 bool RegisterWakeup(int priority) {
301 return lockless_queue_.RegisterWakeup(priority);
302 }
303
304 void UnregisterWakeup() { lockless_queue_.UnregisterWakeup(); }
305
306 private:
Austin Schuhf5652592019-12-29 16:26:15 -0800307 const Channel *const channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700308 MMapedQueue lockless_queue_memory_;
309 ipc_lib::LocklessQueue lockless_queue_;
310
311 ipc_lib::QueueIndex actual_queue_index_ =
312 ipc_lib::LocklessQueue::empty_queue_index();
313
314 struct AlignedChar {
315 alignas(32) char data;
316 };
317
318 std::unique_ptr<AlignedChar, decltype(&free)> data_storage_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800319
320 Context context_;
321};
322
323class ShmFetcher : public RawFetcher {
324 public:
325 explicit ShmFetcher(EventLoop *event_loop, const Channel *channel)
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800326 : RawFetcher(event_loop, channel),
327 simple_shm_fetcher_(event_loop, channel) {}
Austin Schuh39788ff2019-12-01 18:22:57 -0800328
329 ~ShmFetcher() { context_.data = nullptr; }
330
331 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
332 if (simple_shm_fetcher_.FetchNext()) {
333 context_ = simple_shm_fetcher_.context();
334 return std::make_pair(true, monotonic_clock::now());
335 }
336 return std::make_pair(false, monotonic_clock::min_time);
337 }
338
339 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
340 if (simple_shm_fetcher_.Fetch()) {
341 context_ = simple_shm_fetcher_.context();
342 return std::make_pair(true, monotonic_clock::now());
343 }
344 return std::make_pair(false, monotonic_clock::min_time);
345 }
346
347 private:
348 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700349};
350
351class ShmSender : public RawSender {
352 public:
Austin Schuh39788ff2019-12-01 18:22:57 -0800353 explicit ShmSender(EventLoop *event_loop, const Channel *channel)
354 : RawSender(event_loop, channel),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800355 lockless_queue_memory_(
356 channel,
357 chrono::duration_cast<chrono::seconds>(chrono::nanoseconds(
358 event_loop->configuration()->channel_storage_duration()))),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700359 lockless_queue_(lockless_queue_memory_.memory(),
360 lockless_queue_memory_.config()),
361 lockless_queue_sender_(lockless_queue_.MakeSender()) {}
362
Austin Schuh39788ff2019-12-01 18:22:57 -0800363 ~ShmSender() override {}
364
Alex Perrycb7da4b2019-08-28 19:35:56 -0700365 void *data() override { return lockless_queue_sender_.Data(); }
366 size_t size() override { return lockless_queue_sender_.size(); }
Austin Schuhad154822019-12-27 15:45:13 -0800367 bool DoSend(size_t length,
368 aos::monotonic_clock::time_point monotonic_remote_time,
369 aos::realtime_clock::time_point realtime_remote_time,
370 uint32_t remote_queue_index) override {
371 lockless_queue_sender_.Send(
372 length, monotonic_remote_time, realtime_remote_time, remote_queue_index,
373 &monotonic_sent_time_, &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800374 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700375 return true;
376 }
377
Austin Schuhad154822019-12-27 15:45:13 -0800378 bool DoSend(const void *msg, size_t length,
379 aos::monotonic_clock::time_point monotonic_remote_time,
380 aos::realtime_clock::time_point realtime_remote_time,
381 uint32_t remote_queue_index) override {
382 lockless_queue_sender_.Send(reinterpret_cast<const char *>(msg), length,
383 monotonic_remote_time, realtime_remote_time,
384 remote_queue_index, &monotonic_sent_time_,
385 &realtime_sent_time_, &sent_queue_index_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800386 lockless_queue_.Wakeup(event_loop()->priority());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700387 // TODO(austin): Return an error if we send too fast.
388 return true;
389 }
390
Alex Perrycb7da4b2019-08-28 19:35:56 -0700391 private:
Alex Perrycb7da4b2019-08-28 19:35:56 -0700392 MMapedQueue lockless_queue_memory_;
393 ipc_lib::LocklessQueue lockless_queue_;
394 ipc_lib::LocklessQueue::Sender lockless_queue_sender_;
395};
396
Alex Perrycb7da4b2019-08-28 19:35:56 -0700397// Class to manage the state for a Watcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800398class WatcherState : public aos::WatcherState {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700399 public:
400 WatcherState(
Austin Schuh7d87b672019-12-01 20:23:49 -0800401 ShmEventLoop *event_loop, const Channel *channel,
Austin Schuh39788ff2019-12-01 18:22:57 -0800402 std::function<void(const Context &context, const void *message)> fn)
403 : aos::WatcherState(event_loop, channel, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800404 event_loop_(event_loop),
405 event_(this),
Austin Schuhaa79e4e2019-12-29 20:43:32 -0800406 simple_shm_fetcher_(event_loop, channel) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -0700407
Austin Schuh7d87b672019-12-01 20:23:49 -0800408 ~WatcherState() override { event_loop_->RemoveEvent(&event_); }
Austin Schuh39788ff2019-12-01 18:22:57 -0800409
410 void Startup(EventLoop *event_loop) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800411 simple_shm_fetcher_.PointAtNextQueueIndex();
Austin Schuh39788ff2019-12-01 18:22:57 -0800412 CHECK(RegisterWakeup(event_loop->priority()));
413 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700414
Alex Perrycb7da4b2019-08-28 19:35:56 -0700415 // Returns true if there is new data available.
Austin Schuh7d87b672019-12-01 20:23:49 -0800416 bool CheckForNewData() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700417 if (!has_new_data_) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800418 has_new_data_ = simple_shm_fetcher_.FetchNext();
Austin Schuh7d87b672019-12-01 20:23:49 -0800419
420 if (has_new_data_) {
421 event_.set_event_time(
Austin Schuhad154822019-12-27 15:45:13 -0800422 simple_shm_fetcher_.context().monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800423 event_loop_->AddEvent(&event_);
424 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700425 }
426
427 return has_new_data_;
428 }
429
Alex Perrycb7da4b2019-08-28 19:35:56 -0700430 // Consumes the data by calling the callback.
Austin Schuh7d87b672019-12-01 20:23:49 -0800431 void HandleEvent() {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700432 CHECK(has_new_data_);
Austin Schuh39788ff2019-12-01 18:22:57 -0800433 DoCallCallback(monotonic_clock::now, simple_shm_fetcher_.context());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700434 has_new_data_ = false;
Austin Schuh7d87b672019-12-01 20:23:49 -0800435 CheckForNewData();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700436 }
437
Austin Schuh39788ff2019-12-01 18:22:57 -0800438 // Registers us to receive a signal on event reception.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700439 bool RegisterWakeup(int priority) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800440 return simple_shm_fetcher_.RegisterWakeup(priority);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700441 }
442
Austin Schuh39788ff2019-12-01 18:22:57 -0800443 void UnregisterWakeup() { return simple_shm_fetcher_.UnregisterWakeup(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700444
445 private:
446 bool has_new_data_ = false;
447
Austin Schuh7d87b672019-12-01 20:23:49 -0800448 ShmEventLoop *event_loop_;
449 EventHandler<WatcherState> event_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800450 SimpleShmFetcher simple_shm_fetcher_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700451};
452
453// Adapter class to adapt a timerfd to a TimerHandler.
Austin Schuh7d87b672019-12-01 20:23:49 -0800454class TimerHandlerState final : public TimerHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700455 public:
456 TimerHandlerState(ShmEventLoop *shm_event_loop, ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -0800457 : TimerHandler(shm_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -0800458 shm_event_loop_(shm_event_loop),
459 event_(this) {
460 shm_event_loop_->epoll_.OnReadable(
461 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700462 }
463
Austin Schuh7d87b672019-12-01 20:23:49 -0800464 ~TimerHandlerState() {
465 Disable();
466 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
467 }
468
469 void HandleEvent() {
470 uint64_t elapsed_cycles = timerfd_.Read();
471 if (elapsed_cycles == 0u) {
472 // We got called before the timer interrupt could happen, but because we
473 // are checking the time, we got called on time. Push the timer out by 1
474 // cycle.
475 elapsed_cycles = 1u;
476 timerfd_.SetTime(base_ + repeat_offset_, repeat_offset_);
477 }
478
479 Call(monotonic_clock::now, base_);
480
481 base_ += repeat_offset_ * elapsed_cycles;
482
483 if (repeat_offset_ != chrono::seconds(0)) {
484 event_.set_event_time(base_);
485 shm_event_loop_->AddEvent(&event_);
486 }
487 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700488
489 void Setup(monotonic_clock::time_point base,
490 monotonic_clock::duration repeat_offset) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800491 if (event_.valid()) {
492 shm_event_loop_->RemoveEvent(&event_);
493 }
494
Alex Perrycb7da4b2019-08-28 19:35:56 -0700495 timerfd_.SetTime(base, repeat_offset);
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800496 base_ = base;
497 repeat_offset_ = repeat_offset;
Austin Schuh7d87b672019-12-01 20:23:49 -0800498 event_.set_event_time(base_);
499 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700500 }
501
Austin Schuh7d87b672019-12-01 20:23:49 -0800502 void Disable() override {
503 shm_event_loop_->RemoveEvent(&event_);
504 timerfd_.Disable();
505 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700506
507 private:
508 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800509 EventHandler<TimerHandlerState> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700510
511 TimerFd timerfd_;
512
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800513 monotonic_clock::time_point base_;
514 monotonic_clock::duration repeat_offset_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700515};
516
517// Adapter class to the timerfd and PhasedLoop.
Austin Schuh7d87b672019-12-01 20:23:49 -0800518class PhasedLoopHandler final : public ::aos::PhasedLoopHandler {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700519 public:
520 PhasedLoopHandler(ShmEventLoop *shm_event_loop, ::std::function<void(int)> fn,
521 const monotonic_clock::duration interval,
522 const monotonic_clock::duration offset)
Austin Schuh39788ff2019-12-01 18:22:57 -0800523 : aos::PhasedLoopHandler(shm_event_loop, std::move(fn), interval, offset),
Austin Schuh7d87b672019-12-01 20:23:49 -0800524 shm_event_loop_(shm_event_loop),
525 event_(this) {
526 shm_event_loop_->epoll_.OnReadable(
527 timerfd_.fd(), [this]() { shm_event_loop_->HandleEvent(); });
528 }
529
530 void HandleEvent() {
531 // The return value for read is the number of cycles that have elapsed.
532 // Because we check to see when this event *should* have happened, there are
533 // cases where Read() will return 0, when 1 cycle has actually happened.
534 // This occurs when the timer interrupt hasn't triggered yet. Therefore,
535 // ignore it. Call handles rescheduling and calculating elapsed cycles
536 // without any extra help.
537 timerfd_.Read();
538 event_.Invalidate();
539
540 Call(monotonic_clock::now, [this](monotonic_clock::time_point sleep_time) {
541 Schedule(sleep_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700542 });
543 }
544
Austin Schuh39788ff2019-12-01 18:22:57 -0800545 ~PhasedLoopHandler() override {
546 shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
Austin Schuh7d87b672019-12-01 20:23:49 -0800547 shm_event_loop_->RemoveEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700548 }
549
550 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800551 // Reschedules the timer.
Austin Schuh39788ff2019-12-01 18:22:57 -0800552 void Schedule(monotonic_clock::time_point sleep_time) override {
Austin Schuh7d87b672019-12-01 20:23:49 -0800553 if (event_.valid()) {
554 shm_event_loop_->RemoveEvent(&event_);
555 }
556
Austin Schuh39788ff2019-12-01 18:22:57 -0800557 timerfd_.SetTime(sleep_time, ::aos::monotonic_clock::zero());
Austin Schuh7d87b672019-12-01 20:23:49 -0800558 event_.set_event_time(sleep_time);
559 shm_event_loop_->AddEvent(&event_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700560 }
561
562 ShmEventLoop *shm_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800563 EventHandler<PhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700564
565 TimerFd timerfd_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700566};
567} // namespace internal
568
569::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
570 const Channel *channel) {
Austin Schuhca4828c2019-12-28 14:21:35 -0800571 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
572 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
573 << "\", \"type\": \"" << channel->type()->string_view()
574 << "\" } is not able to be fetched on this node. Check your "
575 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800576 }
577
Austin Schuh39788ff2019-12-01 18:22:57 -0800578 return ::std::unique_ptr<RawFetcher>(new internal::ShmFetcher(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700579}
580
581::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
582 const Channel *channel) {
583 Take(channel);
Austin Schuh39788ff2019-12-01 18:22:57 -0800584
585 return ::std::unique_ptr<RawSender>(new internal::ShmSender(this, channel));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700586}
587
588void ShmEventLoop::MakeRawWatcher(
589 const Channel *channel,
590 std::function<void(const Context &context, const void *message)> watcher) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700591 Take(channel);
592
Austin Schuhca4828c2019-12-28 14:21:35 -0800593 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
594 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
595 << "\", \"type\": \"" << channel->type()->string_view()
596 << "\" } is not able to be watched on this node. Check your "
597 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800598 }
599
Austin Schuh39788ff2019-12-01 18:22:57 -0800600 NewWatcher(::std::unique_ptr<WatcherState>(
601 new internal::WatcherState(this, channel, std::move(watcher))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700602}
603
604TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800605 return NewTimer(::std::unique_ptr<TimerHandler>(
606 new internal::TimerHandlerState(this, ::std::move(callback))));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700607}
608
609PhasedLoopHandler *ShmEventLoop::AddPhasedLoop(
610 ::std::function<void(int)> callback,
611 const monotonic_clock::duration interval,
612 const monotonic_clock::duration offset) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800613 return NewPhasedLoop(
614 ::std::unique_ptr<PhasedLoopHandler>(new internal::PhasedLoopHandler(
615 this, ::std::move(callback), interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700616}
617
618void ShmEventLoop::OnRun(::std::function<void()> on_run) {
619 on_run_.push_back(::std::move(on_run));
620}
621
Austin Schuh7d87b672019-12-01 20:23:49 -0800622void ShmEventLoop::HandleEvent() {
623 // Update all the times for handlers.
624 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
625 internal::WatcherState *watcher =
626 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
627
628 watcher->CheckForNewData();
629 }
630
Austin Schuh39788ff2019-12-01 18:22:57 -0800631 while (true) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800632 if (EventCount() == 0 ||
633 PeekEvent()->event_time() > monotonic_clock::now()) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800634 break;
635 }
636
Austin Schuh7d87b672019-12-01 20:23:49 -0800637 EventLoopEvent *event = PopEvent();
638 event->HandleEvent();
Austin Schuh39788ff2019-12-01 18:22:57 -0800639 }
640}
641
Austin Schuh32fd5a72019-12-01 22:20:26 -0800642// RAII class to mask signals.
643class ScopedSignalMask {
644 public:
645 ScopedSignalMask(std::initializer_list<int> signals) {
646 sigset_t sigset;
647 PCHECK(sigemptyset(&sigset) == 0);
648 for (int signal : signals) {
649 PCHECK(sigaddset(&sigset, signal) == 0);
650 }
651
652 PCHECK(sigprocmask(SIG_BLOCK, &sigset, &old_) == 0);
653 }
654
655 ~ScopedSignalMask() { PCHECK(sigprocmask(SIG_SETMASK, &old_, nullptr) == 0); }
656
657 private:
658 sigset_t old_;
659};
660
661// Class to manage the static state associated with killing multiple event
662// loops.
663class SignalHandler {
664 public:
665 // Gets the singleton.
666 static SignalHandler *global() {
667 static SignalHandler loop;
668 return &loop;
669 }
670
671 // Handles the signal with the singleton.
672 static void HandleSignal(int) { global()->DoHandleSignal(); }
673
674 // Registers an event loop to receive Exit() calls.
675 void Register(ShmEventLoop *event_loop) {
676 // Block signals while we have the mutex so we never race with the signal
677 // handler.
678 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
679 std::unique_lock<stl_mutex> locker(mutex_);
680 if (event_loops_.size() == 0) {
681 // The first caller registers the signal handler.
682 struct sigaction new_action;
683 sigemptyset(&new_action.sa_mask);
684 // This makes it so that 2 control c's to a stuck process will kill it by
685 // restoring the original signal handler.
686 new_action.sa_flags = SA_RESETHAND;
687 new_action.sa_handler = &HandleSignal;
688
689 PCHECK(sigaction(SIGINT, &new_action, &old_action_int_) == 0);
690 PCHECK(sigaction(SIGHUP, &new_action, &old_action_hup_) == 0);
691 PCHECK(sigaction(SIGTERM, &new_action, &old_action_term_) == 0);
692 }
693
694 event_loops_.push_back(event_loop);
695 }
696
697 // Unregisters an event loop to receive Exit() calls.
698 void Unregister(ShmEventLoop *event_loop) {
699 // Block signals while we have the mutex so we never race with the signal
700 // handler.
701 ScopedSignalMask mask({SIGINT, SIGHUP, SIGTERM});
702 std::unique_lock<stl_mutex> locker(mutex_);
703
704 event_loops_.erase(std::find(event_loops_.begin(), event_loops_.end(), event_loop));
705
706 if (event_loops_.size() == 0u) {
707 // The last caller restores the original signal handlers.
708 PCHECK(sigaction(SIGINT, &old_action_int_, nullptr) == 0);
709 PCHECK(sigaction(SIGHUP, &old_action_hup_, nullptr) == 0);
710 PCHECK(sigaction(SIGTERM, &old_action_term_, nullptr) == 0);
711 }
712 }
713
714 private:
715 void DoHandleSignal() {
716 // We block signals while grabbing the lock, so there should never be a
717 // race. Confirm that this is true using trylock.
718 CHECK(mutex_.try_lock()) << ": sigprocmask failed to block signals while "
719 "modifing the event loop list.";
720 for (ShmEventLoop *event_loop : event_loops_) {
721 event_loop->Exit();
722 }
723 mutex_.unlock();
724 }
725
726 // Mutex to protect all state.
727 stl_mutex mutex_;
728 std::vector<ShmEventLoop *> event_loops_;
729 struct sigaction old_action_int_;
730 struct sigaction old_action_hup_;
731 struct sigaction old_action_term_;
732};
733
Alex Perrycb7da4b2019-08-28 19:35:56 -0700734void ShmEventLoop::Run() {
Austin Schuh32fd5a72019-12-01 22:20:26 -0800735 SignalHandler::global()->Register(this);
Austin Schuh39788ff2019-12-01 18:22:57 -0800736
Alex Perrycb7da4b2019-08-28 19:35:56 -0700737 std::unique_ptr<ipc_lib::SignalFd> signalfd;
738
739 if (watchers_.size() > 0) {
740 signalfd.reset(new ipc_lib::SignalFd({ipc_lib::kWakeupSignal}));
741
742 epoll_.OnReadable(signalfd->fd(), [signalfd_ptr = signalfd.get(), this]() {
743 signalfd_siginfo result = signalfd_ptr->Read();
744 CHECK_EQ(result.ssi_signo, ipc_lib::kWakeupSignal);
745
746 // TODO(austin): We should really be checking *everything*, not just
747 // watchers, and calling the oldest thing first. That will improve
748 // determinism a lot.
749
Austin Schuh7d87b672019-12-01 20:23:49 -0800750 HandleEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700751 });
752 }
753
Austin Schuh39788ff2019-12-01 18:22:57 -0800754 MaybeScheduleTimingReports();
755
Austin Schuh7d87b672019-12-01 20:23:49 -0800756 ReserveEvents();
757
Austin Schuh39788ff2019-12-01 18:22:57 -0800758 // Now, all the callbacks are setup. Lock everything into memory and go RT.
Alex Perrycb7da4b2019-08-28 19:35:56 -0700759 if (priority_ != 0) {
760 ::aos::InitRT();
761
762 LOG(INFO) << "Setting priority to " << priority_;
763 ::aos::SetCurrentThreadRealtimePriority(priority_);
764 }
765
766 set_is_running(true);
767
768 // Now that we are realtime (but before the OnRun handlers run), snap the
769 // queue index.
Austin Schuh39788ff2019-12-01 18:22:57 -0800770 for (::std::unique_ptr<WatcherState> &watcher : watchers_) {
771 watcher->Startup(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700772 }
773
774 // Now that we are RT, run all the OnRun handlers.
775 for (const auto &run : on_run_) {
776 run();
777 }
778
Alex Perrycb7da4b2019-08-28 19:35:56 -0700779 // And start our main event loop which runs all the timers and handles Quit.
780 epoll_.Run();
781
782 // Once epoll exits, there is no useful nonrt work left to do.
783 set_is_running(false);
784
785 // Nothing time or synchronization critical needs to happen after this point.
786 // Drop RT priority.
787 ::aos::UnsetCurrentThreadRealtimePriority();
788
Austin Schuh39788ff2019-12-01 18:22:57 -0800789 for (::std::unique_ptr<WatcherState> &base_watcher : watchers_) {
790 internal::WatcherState *watcher =
791 reinterpret_cast<internal::WatcherState *>(base_watcher.get());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700792 watcher->UnregisterWakeup();
793 }
794
795 if (watchers_.size() > 0) {
796 epoll_.DeleteFd(signalfd->fd());
797 signalfd.reset();
798 }
Austin Schuh32fd5a72019-12-01 22:20:26 -0800799
800 SignalHandler::global()->Unregister(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700801}
802
803void ShmEventLoop::Exit() { epoll_.Quit(); }
804
805ShmEventLoop::~ShmEventLoop() {
Austin Schuh39788ff2019-12-01 18:22:57 -0800806 // Trigger any remaining senders or fetchers to be cleared before destroying
807 // the event loop so the book keeping matches.
808 timing_report_sender_.reset();
809
810 // Force everything with a registered fd with epoll to be destroyed now.
811 timers_.clear();
812 phased_loops_.clear();
813 watchers_.clear();
814
Alex Perrycb7da4b2019-08-28 19:35:56 -0700815 CHECK(!is_running()) << ": ShmEventLoop destroyed while running";
816}
817
818void ShmEventLoop::Take(const Channel *channel) {
819 CHECK(!is_running()) << ": Cannot add new objects while running.";
820
821 // Cheat aggresively. Use the shared memory path as a proxy for a unique
822 // identifier for the channel.
823 const std::string path = ShmPath(channel);
824
825 const auto prior = ::std::find(taken_.begin(), taken_.end(), path);
826 CHECK(prior == taken_.end()) << ": " << path << " is already being used.";
827
828 taken_.emplace_back(path);
829}
830
831void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
832 if (is_running()) {
833 LOG(FATAL) << "Cannot set realtime priority while running.";
834 }
835 priority_ = priority;
836}
837
Austin Schuh39788ff2019-12-01 18:22:57 -0800838pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
839
Alex Perrycb7da4b2019-08-28 19:35:56 -0700840} // namespace aos