Generalize futex observers for the compare_exchanges in lockless_queue

This is necessary to get lockless_queue_death_test running on aarch64.

Change-Id: I621fe55fac4d43a8e40d053d4f069d618e738f7d
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index c24fcee..c124fe0 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -12,6 +12,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
+        ":shm_observers",
         "//aos:macros",
         "//aos:thread_local",
         "//aos/util:compiler_memory_barrier",
@@ -146,6 +147,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
+        ":shm_observers",
         "@com_github_google_glog//:glog",
     ],
 )
@@ -225,6 +227,7 @@
         ":event",
         ":lockless_queue",
         ":queue_racer",
+        ":shm_observers",
         ":signalfd",
         "//aos/events:epoll",
         "//aos/libc:aos_strsignal",
@@ -359,3 +362,13 @@
         "//aos/time",
     ],
 )
+
+cc_library(
+    name = "shm_observers",
+    srcs = [
+        "shm_observers.cc",
+    ],
+    hdrs = [
+        "shm_observers.h",
+    ],
+)
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index 6407a57..861a6fe 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -20,6 +20,8 @@
 #include <cstdint>
 #include <cstring>
 
+#include "aos/ipc_lib/shm_observers.h"
+
 #ifdef AOS_SANITIZER_thread
 #include <sanitizer/tsan_interface_atomic.h>
 #endif
@@ -33,7 +35,7 @@
 #include "aos/util/compiler_memory_barrier.h"
 #include "glog/logging.h"
 
-using ::aos::linux_code::ipc_lib::FutexAccessorObserver;
+using ::aos::linux_code::ipc_lib::RunShmObservers;
 
 // This code was originally based on
 // <https://www.akkadia.org/drepper/futex.pdf>, but is has since evolved a lot.
@@ -680,34 +682,6 @@
   my_robust_list::Init();
 }
 
-FutexAccessorObserver before_observer = nullptr, after_observer = nullptr;
-
-// RAII class which runs before_observer during construction and after_observer
-// during destruction.
-class RunObservers {
- public:
-  template <class T>
-  RunObservers(T *address, bool write)
-      : address_(static_cast<void *>(
-            const_cast<typename ::std::remove_cv<T>::type *>(address))),
-        write_(write) {
-    if (__builtin_expect(before_observer != nullptr, false)) {
-      before_observer(address_, write_);
-    }
-  }
-  ~RunObservers() {
-    if (__builtin_expect(after_observer != nullptr, false)) {
-      after_observer(address_, write_);
-    }
-  }
-
- private:
-  void *const address_;
-  const bool write_;
-
-  DISALLOW_COPY_AND_ASSIGN(RunObservers);
-};
-
 // Finishes the locking of a mutex by potentially clearing FUTEX_OWNER_DIED in
 // the futex and returning the correct value.
 inline int mutex_finish_lock(aos_mutex *m) {
@@ -726,7 +700,7 @@
 // own my_robust_list::Adder.
 inline int mutex_do_get(aos_mutex *m, bool signals_fail,
                         const struct timespec *timeout, uint32_t tid) {
-  RunObservers run_observers(m, true);
+  RunShmObservers run_observers(m, true);
   if (kPrintOperations) {
     printf("%" PRId32 ": %p do_get\n", tid, m);
   }
@@ -794,7 +768,7 @@
 // number_requeue is the number of waiters to requeue (probably INT_MAX or 0). 1
 // will always be woken.
 void condition_wake(aos_condition *c, aos_mutex *m, int number_requeue) {
-  RunObservers run_observers(c, true);
+  RunShmObservers run_observers(c, true);
   // Make it so that anybody just going to sleep won't.
   // This is where we might accidentally wake more than just 1 waiter with 1
   // signal():
@@ -838,7 +812,7 @@
 int mutex_grab(aos_mutex *m) { return mutex_get(m, false, NULL); }
 
 void mutex_unlock(aos_mutex *m) {
-  RunObservers run_observers(m, true);
+  RunShmObservers run_observers(m, true);
   const uint32_t tid = get_tid();
   if (kPrintOperations) {
     printf("%" PRId32 ": %p unlock\n", tid, m);
@@ -874,7 +848,7 @@
 }
 
 int mutex_trylock(aos_mutex *m) {
-  RunObservers run_observers(m, true);
+  RunShmObservers run_observers(m, true);
   const uint32_t tid = get_tid();
   if (kPrintOperations) {
     printf("%" PRId32 ": %p trylock\n", tid, m);
@@ -929,14 +903,14 @@
   }
   my_robust_list::Adder adder(m);
   {
-    RunObservers run_observers(m, true);
+    RunShmObservers run_observers(m, true);
     CHECK(compare_and_swap(&m->futex, 0, tid));
   }
   adder.Add();
 }
 
 void death_notification_release(aos_mutex *m) {
-  RunObservers run_observers(m, true);
+  RunShmObservers run_observers(m, true);
 
 #ifndef NDEBUG
   // Verify it's "locked", like it should be.
@@ -961,7 +935,7 @@
 }
 
 int condition_wait(aos_condition *c, aos_mutex *m, struct timespec *end_time) {
-  RunObservers run_observers(c, false);
+  RunShmObservers run_observers(c, false);
   const uint32_t tid = get_tid();
   const uint32_t wait_start = __atomic_load_n(c, __ATOMIC_SEQ_CST);
 
@@ -1041,7 +1015,7 @@
 }
 
 int futex_wait_timeout(aos_futex *m, const struct timespec *timeout) {
-  RunObservers run_observers(m, false);
+  RunShmObservers run_observers(m, false);
   const int ret = sys_futex_wait(FUTEX_WAIT, m, 0, timeout);
   if (ret != 0) {
     if (ret == -EINTR) {
@@ -1060,7 +1034,7 @@
 int futex_wait(aos_futex *m) { return futex_wait_timeout(m, NULL); }
 
 int futex_set_value(aos_futex *m, uint32_t value) {
-  RunObservers run_observers(m, false);
+  RunShmObservers run_observers(m, false);
   ANNOTATE_HAPPENS_BEFORE(m);
   __atomic_store_n(m, value, __ATOMIC_SEQ_CST);
   const int r = sys_futex_wake(m, INT_MAX - 4096);
@@ -1084,15 +1058,6 @@
 namespace linux_code {
 namespace ipc_lib {
 
-// Sets functions to run befor eand after all futex operations.
-// This is important when doing robustness testing because the memory has to be
-// made writable for the whole futex operation, otherwise it never succeeds.
-void SetFutexAccessorObservers(FutexAccessorObserver before,
-                               FutexAccessorObserver after) {
-  before_observer = before;
-  after_observer = after;
-}
-
 // Sets an extra offset between mutexes and the value we use for them in the
 // robust list (only the forward pointers). This is used to work around a kernel
 // bug by keeping a second set of mutexes which is always writable so the kernel
diff --git a/aos/ipc_lib/aos_sync.h b/aos/ipc_lib/aos_sync.h
index a61fcc1..c5fede1 100644
--- a/aos/ipc_lib/aos_sync.h
+++ b/aos/ipc_lib/aos_sync.h
@@ -182,12 +182,6 @@
 namespace linux_code {
 namespace ipc_lib {
 
-typedef void (*FutexAccessorObserver)(void *address, bool write);
-
-// Set functions which get called before and after all futex operations.
-void SetFutexAccessorObservers(FutexAccessorObserver before,
-                               FutexAccessorObserver after);
-
 // Set the offset to use for putting addresses into the robust list.
 // This is necessary to work around a kernel bug where it hangs when trying to
 // deal with a futex on the robust list when its memory has been changed to
diff --git a/aos/ipc_lib/index.h b/aos/ipc_lib/index.h
index a47121e..c6a485f 100644
--- a/aos/ipc_lib/index.h
+++ b/aos/ipc_lib/index.h
@@ -2,9 +2,11 @@
 #define AOS_IPC_LIB_INDEX_H_
 
 #include <sys/types.h>
+
 #include <atomic>
 #include <string>
 
+#include "aos/ipc_lib/shm_observers.h"
 #include "glog/logging.h"
 
 namespace aos {
@@ -155,6 +157,7 @@
   // Swaps expected for index atomically.  Returns true on success, false
   // otherwise.
   inline bool CompareAndExchangeStrong(QueueIndex expected, QueueIndex index) {
+    linux_code::ipc_lib::RunShmObservers run_observers(&index_, true);
     return index_.compare_exchange_strong(expected.index_, index.index_,
                                           ::std::memory_order_acq_rel);
   }
@@ -242,6 +245,7 @@
   // Swaps expected for index atomically.  Returns true on success, false
   // otherwise.
   bool CompareAndExchangeStrong(Index expected, Index index) {
+    linux_code::ipc_lib::RunShmObservers run_observers(&index_, true);
     return index_.compare_exchange_strong(expected.index_, index.index_,
                                           ::std::memory_order_acq_rel);
   }
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 3ed2ce5..35ca5ca 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -14,6 +14,7 @@
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/lockless_queue.h"
 #include "aos/ipc_lib/lockless_queue_memory.h"
+#include "aos/ipc_lib/shm_observers.h"
 #include "aos/libc/aos_strsignal.h"
 #include "aos/realtime.h"
 #include "aos/testing/prevent_exit.h"
@@ -332,7 +333,7 @@
     InstallHandler(SIGSEGV, segv_handler, &old_segv_handler);
     InstallHandler(SIGTRAP, trap_handler, &old_trap_handler);
     CHECK_EQ(old_trap_handler.sa_handler, SIG_DFL);
-    linux_code::ipc_lib::SetFutexAccessorObservers(futex_before, futex_after);
+    linux_code::ipc_lib::SetShmAccessorObservers(futex_before, futex_after);
 
     ShmProtectOrDie(PROT_READ);
     my_global_state->state = DieAtState::kRunning;
diff --git a/aos/ipc_lib/shm_observers.cc b/aos/ipc_lib/shm_observers.cc
new file mode 100644
index 0000000..04a6214
--- /dev/null
+++ b/aos/ipc_lib/shm_observers.cc
@@ -0,0 +1,17 @@
+#include "aos/ipc_lib/shm_observers.h"
+
+namespace aos {
+namespace linux_code {
+namespace ipc_lib {
+
+ShmAccessorObserver before_observer = nullptr, after_observer = nullptr;
+
+void SetShmAccessorObservers(ShmAccessorObserver before,
+                             ShmAccessorObserver after) {
+  before_observer = before;
+  after_observer = after;
+}
+
+}  // namespace ipc_lib
+}  // namespace linux_code
+}  // namespace aos
diff --git a/aos/ipc_lib/shm_observers.h b/aos/ipc_lib/shm_observers.h
new file mode 100644
index 0000000..5780699
--- /dev/null
+++ b/aos/ipc_lib/shm_observers.h
@@ -0,0 +1,52 @@
+#ifndef AOS_IPC_LIB_SHM_OBSERVERS_H_
+#define AOS_IPC_LIB_SHM_OBSERVERS_H_
+
+#include <type_traits>
+
+namespace aos {
+namespace linux_code {
+namespace ipc_lib {
+
+typedef void (*ShmAccessorObserver)(void *address, bool write);
+
+extern ShmAccessorObserver before_observer, after_observer;
+
+// Sets functions to run before and after SHM write operations which may
+// involve multiple instructions. This is important when doing robustness
+// testing because the memory has to be made writable for the whole operation,
+// otherwise it never succeeds.
+void SetShmAccessorObservers(ShmAccessorObserver before,
+                             ShmAccessorObserver after);
+
+// RAII class which runs before_observer during construction and after_observer
+// during destruction.
+class RunShmObservers {
+ public:
+  template <class T>
+  RunShmObservers(T *address, bool write)
+      : address_(static_cast<void *>(
+            const_cast<typename ::std::remove_cv<T>::type *>(address))),
+        write_(write) {
+    if (__builtin_expect(before_observer != nullptr, false)) {
+      before_observer(address_, write_);
+    }
+  }
+  ~RunShmObservers() {
+    if (__builtin_expect(after_observer != nullptr, false)) {
+      after_observer(address_, write_);
+    }
+  }
+
+  RunShmObservers(const RunShmObservers &) = delete;
+  RunShmObservers &operator=(const RunShmObservers &) = delete;
+
+ private:
+  void *const address_;
+  const bool write_;
+};
+
+}  // namespace ipc_lib
+}  // namespace linux_code
+}  // namespace aos
+
+#endif  // AOS_IPC_LIB_SHM_OBSERVERS_H_