Teach ShmEventLoop how to validate some multithreading use cases

This helps catch bugs in callers who do these limited kinds of
multithreading.

Change-Id: Ie04790b2c46f0401430ed4c18a2d10845329623b
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 63e1cb9..cee1dfa 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -477,9 +477,13 @@
     simple_shm_fetcher_.RetrieveData();
   }
 
-  ~ShmFetcher() { context_.data = nullptr; }
+  ~ShmFetcher() override {
+    shm_event_loop()->CheckCurrentThread();
+    context_.data = nullptr;
+  }
 
   std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
+    shm_event_loop()->CheckCurrentThread();
     if (simple_shm_fetcher_.FetchNext()) {
       context_ = simple_shm_fetcher_.context();
       return std::make_pair(true, monotonic_clock::now());
@@ -488,6 +492,7 @@
   }
 
   std::pair<bool, monotonic_clock::time_point> DoFetch() override {
+    shm_event_loop()->CheckCurrentThread();
     if (simple_shm_fetcher_.Fetch()) {
       context_ = simple_shm_fetcher_.context();
       return std::make_pair(true, monotonic_clock::now());
@@ -500,6 +505,10 @@
   }
 
  private:
+  const ShmEventLoop *shm_event_loop() const {
+    return static_cast<const ShmEventLoop *>(event_loop());
+  }
+
   SimpleShmFetcher simple_shm_fetcher_;
 };
 
@@ -517,7 +526,7 @@
             channel)),
         wake_upper_(lockless_queue_memory_.queue()) {}
 
-  ~ShmSender() override {}
+  ~ShmSender() override { shm_event_loop()->CheckCurrentThread(); }
 
   static ipc_lib::LocklessQueueSender VerifySender(
       std::optional<ipc_lib::LocklessQueueSender> sender,
@@ -530,13 +539,20 @@
                << ", too many senders.";
   }
 
-  void *data() override { return lockless_queue_sender_.Data(); }
-  size_t size() override { return lockless_queue_sender_.size(); }
+  void *data() override {
+    shm_event_loop()->CheckCurrentThread();
+    return lockless_queue_sender_.Data();
+  }
+  size_t size() override {
+    shm_event_loop()->CheckCurrentThread();
+    return lockless_queue_sender_.size();
+  }
   bool DoSend(size_t length,
               aos::monotonic_clock::time_point monotonic_remote_time,
               aos::realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
               const UUID &remote_boot_uuid) override {
+    shm_event_loop()->CheckCurrentThread();
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
@@ -556,6 +572,7 @@
               aos::realtime_clock::time_point realtime_remote_time,
               uint32_t remote_queue_index,
               const UUID &remote_boot_uuid) override {
+    shm_event_loop()->CheckCurrentThread();
     CHECK_LE(length, static_cast<size_t>(channel()->max_size()))
         << ": Sent too big a message on "
         << configuration::CleanedChannelToString(channel());
@@ -574,9 +591,16 @@
     return lockless_queue_memory_.GetMutableSharedMemory();
   }
 
-  int buffer_index() override { return lockless_queue_sender_.buffer_index(); }
+  int buffer_index() override {
+    shm_event_loop()->CheckCurrentThread();
+    return lockless_queue_sender_.buffer_index();
+  }
 
  private:
+  const ShmEventLoop *shm_event_loop() const {
+    return static_cast<const ShmEventLoop *>(event_loop());
+  }
+
   MMappedQueue lockless_queue_memory_;
   ipc_lib::LocklessQueueSender lockless_queue_sender_;
   ipc_lib::LocklessQueueWakeUpper wake_upper_;
@@ -599,9 +623,13 @@
     }
   }
 
-  ~ShmWatcherState() override { event_loop_->RemoveEvent(&event_); }
+  ~ShmWatcherState() override {
+    event_loop_->CheckCurrentThread();
+    event_loop_->RemoveEvent(&event_);
+  }
 
   void Startup(EventLoop *event_loop) override {
+    event_loop_->CheckCurrentThread();
     simple_shm_fetcher_.PointAtNextQueueIndex();
     CHECK(RegisterWakeup(event_loop->priority()));
   }
@@ -666,6 +694,7 @@
   }
 
   ~ShmTimerHandler() {
+    shm_event_loop_->CheckCurrentThread();
     Disable();
     shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
   }
@@ -705,6 +734,7 @@
 
   void Setup(monotonic_clock::time_point base,
              monotonic_clock::duration repeat_offset) override {
+    shm_event_loop_->CheckCurrentThread();
     if (event_.valid()) {
       shm_event_loop_->RemoveEvent(&event_);
     }
@@ -717,6 +747,7 @@
   }
 
   void Disable() override {
+    shm_event_loop_->CheckCurrentThread();
     shm_event_loop_->RemoveEvent(&event_);
     timerfd_.Disable();
     disabled_ = true;
@@ -766,6 +797,7 @@
   }
 
   ~ShmPhasedLoopHandler() override {
+    shm_event_loop_->CheckCurrentThread();
     shm_event_loop_->epoll_.DeleteFd(timerfd_.fd());
     shm_event_loop_->RemoveEvent(&event_);
   }
@@ -773,6 +805,7 @@
  private:
   // Reschedules the timer.
   void Schedule(monotonic_clock::time_point sleep_time) override {
+    shm_event_loop_->CheckCurrentThread();
     if (event_.valid()) {
       shm_event_loop_->RemoveEvent(&event_);
     }
@@ -792,6 +825,7 @@
 
 ::std::unique_ptr<RawFetcher> ShmEventLoop::MakeRawFetcher(
     const Channel *channel) {
+  CheckCurrentThread();
   if (!configuration::ChannelIsReadableOnNode(channel, node())) {
     LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
                << "\", \"type\": \"" << channel->type()->string_view()
@@ -805,6 +839,7 @@
 
 ::std::unique_ptr<RawSender> ShmEventLoop::MakeRawSender(
     const Channel *channel) {
+  CheckCurrentThread();
   TakeSender(channel);
 
   return ::std::unique_ptr<RawSender>(new ShmSender(shm_base_, this, channel));
@@ -813,6 +848,7 @@
 void ShmEventLoop::MakeRawWatcher(
     const Channel *channel,
     std::function<void(const Context &context, const void *message)> watcher) {
+  CheckCurrentThread();
   TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(
@@ -822,6 +858,7 @@
 void ShmEventLoop::MakeRawNoArgWatcher(
     const Channel *channel,
     std::function<void(const Context &context)> watcher) {
+  CheckCurrentThread();
   TakeWatcher(channel);
 
   NewWatcher(::std::unique_ptr<WatcherState>(new ShmWatcherState(
@@ -831,6 +868,7 @@
 }
 
 TimerHandler *ShmEventLoop::AddTimer(::std::function<void()> callback) {
+  CheckCurrentThread();
   return NewTimer(::std::unique_ptr<TimerHandler>(
       new ShmTimerHandler(this, ::std::move(callback))));
 }
@@ -839,14 +877,28 @@
     ::std::function<void(int)> callback,
     const monotonic_clock::duration interval,
     const monotonic_clock::duration offset) {
+  CheckCurrentThread();
   return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
       new ShmPhasedLoopHandler(this, ::std::move(callback), interval, offset)));
 }
 
 void ShmEventLoop::OnRun(::std::function<void()> on_run) {
+  CheckCurrentThread();
   on_run_.push_back(::std::move(on_run));
 }
 
+void ShmEventLoop::CheckCurrentThread() const {
+  if (__builtin_expect(check_mutex_ != nullptr, false)) {
+    CHECK(check_mutex_->is_locked())
+        << ": The configured mutex is not locked while calling a "
+           "ShmEventLoop function";
+  }
+  if (__builtin_expect(!!check_tid_, false)) {
+    CHECK_EQ(syscall(SYS_gettid), *check_tid_)
+        << ": Being called from the wrong thread";
+  }
+}
+
 // This is a bit tricky because watchers can generate new events at any time (as
 // long as it's in the past). We want to check the watchers at least once before
 // declaring there are no events to handle, and we want to check them again if
@@ -1021,6 +1073,7 @@
 };
 
 void ShmEventLoop::Run() {
+  CheckCurrentThread();
   SignalHandler::global()->Register(this);
 
   if (watchers_.size() > 0) {
@@ -1100,6 +1153,7 @@
 void ShmEventLoop::Exit() { epoll_.Quit(); }
 
 ShmEventLoop::~ShmEventLoop() {
+  CheckCurrentThread();
   // Force everything with a registered fd with epoll to be destroyed now.
   timers_.clear();
   phased_loops_.clear();
@@ -1109,6 +1163,7 @@
 }
 
 void ShmEventLoop::SetRuntimeRealtimePriority(int priority) {
+  CheckCurrentThread();
   if (is_running()) {
     LOG(FATAL) << "Cannot set realtime priority while running.";
   }
@@ -1116,6 +1171,7 @@
 }
 
 void ShmEventLoop::SetRuntimeAffinity(const cpu_set_t &cpuset) {
+  CheckCurrentThread();
   if (is_running()) {
     LOG(FATAL) << "Cannot set affinity while running.";
   }
@@ -1123,18 +1179,21 @@
 }
 
 void ShmEventLoop::set_name(const std::string_view name) {
+  CheckCurrentThread();
   name_ = std::string(name);
   UpdateTimingReport();
 }
 
 absl::Span<const char> ShmEventLoop::GetWatcherSharedMemory(
     const Channel *channel) {
+  CheckCurrentThread();
   ShmWatcherState *const watcher_state =
       static_cast<ShmWatcherState *>(GetWatcherState(channel));
   return watcher_state->GetSharedMemory();
 }
 
 int ShmEventLoop::NumberBuffers(const Channel *channel) {
+  CheckCurrentThread();
   return MakeQueueConfiguration(
              channel, chrono::ceil<chrono::seconds>(chrono::nanoseconds(
                           configuration()->channel_storage_duration())))
@@ -1143,14 +1202,19 @@
 
 absl::Span<char> ShmEventLoop::GetShmSenderSharedMemory(
     const aos::RawSender *sender) const {
+  CheckCurrentThread();
   return static_cast<const ShmSender *>(sender)->GetSharedMemory();
 }
 
 absl::Span<const char> ShmEventLoop::GetShmFetcherPrivateMemory(
     const aos::RawFetcher *fetcher) const {
+  CheckCurrentThread();
   return static_cast<const ShmFetcher *>(fetcher)->GetPrivateMemory();
 }
 
-pid_t ShmEventLoop::GetTid() { return syscall(SYS_gettid); }
+pid_t ShmEventLoop::GetTid() {
+  CheckCurrentThread();
+  return syscall(SYS_gettid);
+}
 
 }  // namespace aos