Teach ShmEventLoop how to validate some multithreading use cases
This helps catch bugs in callers who do these limited kinds of
multithreading.
Change-Id: Ie04790b2c46f0401430ed4c18a2d10845329623b
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 63e1cb9..cee1dfa 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -477,9 +477,13 @@
simple_shm_fetcher_.RetrieveData();
}
- ~ShmFetcher() { context_.data = nullptr; }
+ ~ShmFetcher() override {
+ shm_event_loop()->CheckCurrentThread();
+ context_.data = nullptr;
+ }
std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+ shm_event_loop()->CheckCurrentThread();
if (simple_shm_fetcher_.FetchNext()) {
context_ = simple_shm_fetcher_.context();
return std::make_pair(true, monotonic_clock::now());
@@ -488,6 +492,7 @@
}
std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+ shm_event_loop()->CheckCurrentThread();
if (simple_shm_fetcher_.Fetch()) {
context_ = simple_shm_fetcher_.context();
return std::make_pair(true, monotonic_clock::now());
@@ -500,6 +505,10 @@
}
private:
+ const ShmEventLoop *shm_event_loop() const {
+ return static_cast<const ShmEventLoop *>(event_loop());
+ }
+
SimpleShmFetcher simple_shm_fetcher_;
};
@@ -517,7 +526,7 @@
channel)),
wake_upper_(lockless_queue_memory_.queue()) {}
- ~ShmSender() override {}
+ ~ShmSender() override { shm_event_loop()->CheckCurrentThread(); }
static ipc_lib::LocklessQueueSender VerifySender(
std::optional<ipc_lib::LocklessQueueSender> sender,
@@ -530,13 +539,20 @@
<< ", too many senders.";
}
- void *data() override { return lockless_queue_sender_.Data(); }
- size_t size() override { return lockless_queue_sender_.size(); }
+ void *data() override {
+ shm_event_loop()->CheckCurrentThread();
+ return lockless_queue_sender_.Data();
+ }
+ size_t size() override {
+ shm_event_loop()->CheckCurrentThread();
+ return lockless_queue_sender_.size();
+ }
bool DoSend(size_t length,
aos::monotonic_clock::time_point monotonic_remote_time,
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
const UUID &remote_boot_uuid) override {
+ shm_event_loop()->CheckCurrentThread();
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
@@ -556,6 +572,7 @@
aos::realtime_clock::time_point realtime_remote_time,
uint32_t remote_queue_index,
const UUID &remote_boot_uuid) override {
+ shm_event_loop()->CheckCurrentThread();
CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
<< ": Sent too big a message on "
<< configuration::CleanedChannelToString(channel());
@@ -574,9 +591,16 @@
return lockless_queue_memory_.GetMutableSharedMemory();
}
- int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+ int buffer_index() override {
+ shm_event_loop()->CheckCurrentThread();
+ return lockless_queue_sender_.buffer_index();
+ }
private:
+ const ShmEventLoop *shm_event_loop() const {
+ return static_cast<const ShmEventLoop *>(event_loop());
+ }
+
MMappedQueue lockless_queue_memory_;
ipc_lib::LocklessQueueSender lockless_queue_sender_;
ipc_lib::LocklessQueueWakeUpper wake_upper_;
@@ -599,9 +623,13 @@
}
}
- ~ShmWatcherState() override { event_loop_->RemoveEvent(&event_); }
+ ~ShmWatcherState() override {
+ event_loop_->CheckCurrentThread();
+ event_loop_->RemoveEvent(&event_);
+ }
void Startup(EventLoop *event_loop) override {
+ event_loop_->CheckCurrentThread();
simple_shm_fetcher_.PointAtNextQueueIndex();
CHECK(RegisterWakeup(event_loop->priority()));
}
@@ -666,6 +694,7 @@
}
~ShmTimerHandler() {
+ shm_event_loop_->CheckCurrentThread();
Disable();
shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
}
@@ -705,6 +734,7 @@
void Setup(monotonic_clock::time_point base,
monotonic_clock::duration repeat_offset) override {
+ shm_event_loop_->CheckCurrentThread();
if (event_.valid()) {
shm_event_loop_->RemoveEvent(&event_);
}
@@ -717,6 +747,7 @@
}
void Disable() override {
+ shm_event_loop_->CheckCurrentThread();
shm_event_loop_->RemoveEvent(&event_);
timerfd_.Disable();
disabled_ = true;
@@ -766,6 +797,7 @@
}
~ShmPhasedLoopHandler() override {
+ shm_event_loop_->CheckCurrentThread();
shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
shm_event_loop_->RemoveEvent(&event_);
}
@@ -773,6 +805,7 @@
private:
// Reschedules the timer.
void Schedule(monotonic_clock::time_point sleep_time) override {
+ shm_event_loop_->CheckCurrentThread();
if (event_.valid()) {
shm_event_loop_->RemoveEvent(&event_);
}
@@ -792,6 +825,7 @@
::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
const Channel *channel) {
+ CheckCurrentThread();
if (!configuration::ChannelIsReadableOnNode(channel, node())) {
LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
<< "\", \"type\": \"" << channel->type()->string_view()
@@ -805,6 +839,7 @@
::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
const Channel *channel) {
+ CheckCurrentThread();
TakeSender(channel);
return ::std::unique_ptr<RawSender>(new ShmSender(shm_base_, this, channel));
@@ -813,6 +848,7 @@
void ShmEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &context, const void *message)> watcher) {
+ CheckCurrentThread();
TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(
@@ -822,6 +858,7 @@
void ShmEventLoop::MakeRawNoArgWatcher(
const Channel *channel,
std::function<void(const Context &context)> watcher) {
+ CheckCurrentThread();
TakeWatcher(channel);
NewWatcher(::std::unique_ptr<WatcherState>(new ShmWatcherState(
@@ -831,6 +868,7 @@
}
TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+ CheckCurrentThread();
return NewTimer(::std::unique_ptr<TimerHandler>(
new ShmTimerHandler(this, ::std::move(callback))));
}
@@ -839,14 +877,28 @@
::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset) {
+ CheckCurrentThread();
return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
new ShmPhasedLoopHandler(this, ::std::move(callback), interval, offset)));
}
void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+ CheckCurrentThread();
on_run_.push_back(::std::move(on_run));
}
+void ShmEventLoop::CheckCurrentThread() const {
+ if (__builtin_expect(check_mutex_ != nullptr, false)) {
+ CHECK(check_mutex_->is_locked())
+ << ": The configured mutex is not locked while calling a "
+ "ShmEventLoop function";
+ }
+ if (__builtin_expect(!!check_tid_, false)) {
+ CHECK_EQ(syscall(SYS_gettid), *check_tid_)
+ << ": Being called from the wrong thread";
+ }
+}
+
// This is a bit tricky because watchers can generate new events at any time (as
// long as it's in the past). We want to check the watchers at least once before
// declaring there are no events to handle, and we want to check them again if
@@ -1021,6 +1073,7 @@
};
void ShmEventLoop::Run() {
+ CheckCurrentThread();
SignalHandler::global()->Register(this);
if (watchers_.size() > 0) {
@@ -1100,6 +1153,7 @@
void ShmEventLoop::Exit() { epoll_.Quit(); }
ShmEventLoop::~ShmEventLoop() {
+ CheckCurrentThread();
// Force everything with a registered fd with epoll to be destroyed now.
timers_.clear();
phased_loops_.clear();
@@ -1109,6 +1163,7 @@
}
void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
+ CheckCurrentThread();
if (is_running()) {
LOG(FATAL) << "Cannot set realtime priority while running.";
}
@@ -1116,6 +1171,7 @@
}
void ShmEventLoop::SetRuntimeAffinity(const cpu_set_t &cpuset) {
+ CheckCurrentThread();
if (is_running()) {
LOG(FATAL) << "Cannot set affinity while running.";
}
@@ -1123,18 +1179,21 @@
}
void ShmEventLoop::set_name(const std::string_view name) {
+ CheckCurrentThread();
name_ = std::string(name);
UpdateTimingReport();
}
absl::Span<const char> ShmEventLoop::GetWatcherSharedMemory(
const Channel *channel) {
+ CheckCurrentThread();
ShmWatcherState *const watcher_state =
static_cast<ShmWatcherState *>(GetWatcherState(channel));
return watcher_state->GetSharedMemory();
}
int ShmEventLoop::NumberBuffers(const Channel *channel) {
+ CheckCurrentThread();
return MakeQueueConfiguration(
channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
configuration()->channel_storage_duration())))
@@ -1143,14 +1202,19 @@
absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
const aos::RawSender *sender) const {
+ CheckCurrentThread();
return static_cast<const ShmSender *>(sender)->GetSharedMemory();
}
absl::Span<const char> ShmEventLoop::GetShmFetcherPrivateMemory(
const aos::RawFetcher *fetcher) const {
+ CheckCurrentThread();
return static_cast<const ShmFetcher *>(fetcher)->GetPrivateMemory();
}
-pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
+pid_t ShmEventLoop::GetTid() {
+ CheckCurrentThread();
+ return syscall(SYS_gettid);
+}
} // namespace aos