Added more multithreaded tests that are useful with ThreadSanitizer (tsan)

Also made various small cleanups while writing and checking these tests.
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index 186599a..0a4bf62 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -276,7 +276,7 @@
       'target_name': 'mutex_test',
       'type': 'executable',
       'sources': [
-        'mutex_test.cpp',
+        'mutex_test.cc',
       ],
       'dependencies': [
         '<(EXTERNALS):gtest',
@@ -284,6 +284,8 @@
         'die',
         '<(AOS)/build/aos.gyp:logging',
         '<(AOS)/common/util/util.gyp:death_test_log_implementation',
+        '<(AOS)/common/util/util.gyp:thread',
+        '<(AOS)/common/common.gyp:time',
       ],
     },
     {
diff --git a/aos/common/mutex_test.cc b/aos/common/mutex_test.cc
new file mode 100644
index 0000000..f55d15d
--- /dev/null
+++ b/aos/common/mutex_test.cc
@@ -0,0 +1,151 @@
+#include "aos/common/mutex.h"
+
+#include <sched.h>
+#include <math.h>
+#include <pthread.h>
+
+#include "gtest/gtest.h"
+
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+#include "aos/common/die.h"
+#include "aos/common/util/death_test_log_implementation.h"
+#include "aos/common/util/thread.h"
+#include "aos/common/time.h"
+
+namespace aos {
+namespace testing {
+
+class MutexTest : public ::testing::Test {
+ public:
+  Mutex test_mutex;
+
+ protected:
+  void SetUp() override {
+    SetDieTestMode(true);
+  }
+};
+
+typedef MutexTest MutexDeathTest;
+
+TEST_F(MutexTest, TryLock) {
+  EXPECT_TRUE(test_mutex.TryLock());
+  EXPECT_FALSE(test_mutex.TryLock());
+}
+
+TEST_F(MutexTest, Lock) {
+  test_mutex.Lock();
+  EXPECT_FALSE(test_mutex.TryLock());
+}
+
+TEST_F(MutexTest, Unlock) {
+  test_mutex.Lock();
+  EXPECT_FALSE(test_mutex.TryLock());
+  test_mutex.Unlock();
+  EXPECT_TRUE(test_mutex.TryLock());
+}
+
+// Sees what happens with multiple unlocks.
+TEST_F(MutexDeathTest, RepeatUnlock) {
+  test_mutex.Lock();
+  test_mutex.Unlock();
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        test_mutex.Unlock();
+      },
+      ".*multiple unlock.*");
+}
+
+// Sees what happens if you unlock without ever locking (or unlocking) it.
+TEST_F(MutexDeathTest, NeverLock) {
+  EXPECT_DEATH(
+      {
+        logging::AddImplementation(new util::DeathTestLogImplementation());
+        test_mutex.Unlock();
+      },
+      ".*multiple unlock.*");
+}
+
+TEST_F(MutexTest, MutexLocker) {
+  {
+    aos::MutexLocker locker(&test_mutex);
+    EXPECT_FALSE(test_mutex.TryLock());
+  }
+  EXPECT_TRUE(test_mutex.TryLock());
+}
+
+TEST_F(MutexTest, MutexUnlocker) {
+  test_mutex.Lock();
+  {
+    aos::MutexUnlocker unlocker(&test_mutex);
+    // If this fails, then something weird is going on and the next line might
+    // hang, so fail immediately.
+    ASSERT_TRUE(test_mutex.TryLock());
+    test_mutex.Unlock();
+  }
+  EXPECT_FALSE(test_mutex.TryLock());
+}
+
+namespace {
+
+class AdderThread : public ::aos::util::Thread {
+ public:
+  AdderThread(int *counter, Mutex *mutex, ::aos::time::Time sleep_before_time,
+              ::aos::time::Time sleep_after_time)
+      : counter_(counter),
+        mutex_(mutex),
+        sleep_before_time_(sleep_before_time),
+        sleep_after_time_(sleep_after_time) {}
+  virtual void Run() override {
+    ::aos::time::SleepFor(sleep_before_time_);
+    MutexLocker locker(mutex_);
+    ++(*counter_);
+    ::aos::time::SleepFor(sleep_after_time_);
+  }
+
+ private:
+  int *const counter_;
+  Mutex *const mutex_;
+  const ::aos::time::Time sleep_before_time_, sleep_after_time_;
+};
+
+}  // namespace
+
+// Verifies that ThreadSanitizer understands that a contended mutex establishes
+// a happens-before relationship (thread 1 blocks while thread 0 holds it).
+TEST_F(MutexTest, ThreadSanitizerContended) {
+  int counter = 0;
+  AdderThread threads[2]{
+      {&counter, &test_mutex, ::aos::time::Time::InSeconds(0),
+       ::aos::time::Time::InSeconds(1)},  // Holds the mutex for 1s.
+      {&counter, &test_mutex, ::aos::time::Time::InMS(200),
+       ::aos::time::Time::InSeconds(0)}, };  // Arrives while it is held.
+  for (auto &c : threads) {
+    c.Start();
+  }
+  for (auto &c : threads) {
+    c.WaitUntilDone();
+  }
+  EXPECT_EQ(2, counter);
+}
+
+// Verifies that ThreadSanitizer understands that an uncontended mutex
+// establishes a happens-before relationship.
+TEST_F(MutexTest, ThreadSanitizerUncontended) {
+  int counter = 0;
+  AdderThread threads[2]{
+      {&counter, &test_mutex, ::aos::time::Time::InSeconds(1),
+       ::aos::time::Time::InSeconds(0)},
+      {&counter, &test_mutex, ::aos::time::Time::InSeconds(0),
+       ::aos::time::Time::InSeconds(0)}, };
+  for (auto &c : threads) {
+    c.Start();
+  }
+  for (auto &c : threads) {
+    c.WaitUntilDone();
+  }
+  EXPECT_EQ(2, counter);
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
deleted file mode 100644
index 2ab3e5c..0000000
--- a/aos/common/mutex_test.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-#include "aos/common/mutex.h"
-
-#include <sched.h>
-#include <math.h>
-#include <pthread.h>
-#ifdef __VXWORKS__
-#include <taskLib.h>
-#endif
-
-#include "gtest/gtest.h"
-
-#include "aos/linux_code/ipc_lib/aos_sync.h"
-#include "aos/common/die.h"
-#include "aos/common/util/death_test_log_implementation.h"
-
-namespace aos {
-namespace testing {
-
-class MutexTest : public ::testing::Test {
- public:
-  Mutex test_mutex;
-
- protected:
-  void SetUp() override {
-    SetDieTestMode(true);
-  }
-};
-
-typedef MutexTest MutexDeathTest;
-
-TEST_F(MutexTest, TryLock) {
-  EXPECT_TRUE(test_mutex.TryLock());
-  EXPECT_FALSE(test_mutex.TryLock());
-}
-
-TEST_F(MutexTest, Lock) {
-  test_mutex.Lock();
-  EXPECT_FALSE(test_mutex.TryLock());
-}
-
-TEST_F(MutexTest, Unlock) {
-  test_mutex.Lock();
-  EXPECT_FALSE(test_mutex.TryLock());
-  test_mutex.Unlock();
-  EXPECT_TRUE(test_mutex.TryLock());
-}
-
-#ifndef __VXWORKS__
-// Sees what happens with multiple unlocks.
-TEST_F(MutexDeathTest, RepeatUnlock) {
-  test_mutex.Lock();
-  test_mutex.Unlock();
-  EXPECT_DEATH(
-      {
-        logging::AddImplementation(new util::DeathTestLogImplementation());
-        test_mutex.Unlock();
-      },
-      ".*multiple unlock.*");
-}
-
-// Sees what happens if you unlock without ever locking (or unlocking) it.
-TEST_F(MutexDeathTest, NeverLock) {
-  EXPECT_DEATH(
-      {
-        logging::AddImplementation(new util::DeathTestLogImplementation());
-        test_mutex.Unlock();
-      },
-      ".*multiple unlock.*");
-}
-#endif
-
-TEST_F(MutexTest, MutexLocker) {
-  {
-    aos::MutexLocker locker(&test_mutex);
-    EXPECT_FALSE(test_mutex.TryLock());
-  }
-  EXPECT_TRUE(test_mutex.TryLock());
-}
-
-TEST_F(MutexTest, MutexUnlocker) {
-  test_mutex.Lock();
-  {
-    aos::MutexUnlocker unlocker(&test_mutex);
-    // If this fails, then something weird is going on and the next line might
-    // hang, so fail immediately.
-    ASSERT_TRUE(test_mutex.TryLock());
-    test_mutex.Unlock();
-  }
-  EXPECT_FALSE(test_mutex.TryLock());
-}
-
-}  // namespace testing
-}  // namespace aos
diff --git a/aos/common/queue_test.cc b/aos/common/queue_test.cc
index dfafe6a..dfd18a2 100644
--- a/aos/common/queue_test.cc
+++ b/aos/common/queue_test.cc
@@ -53,7 +53,7 @@
   usleep(50000);
   my_test_queue.MakeWithBuilder().test_bool(true).test_int(0x971).Send();
   t.Join();
-  EXPECT_LE(t.threaded_test_queue.Age(), time::Time::InMS(55));
+  EXPECT_LE(t.threaded_test_queue.Age(), time::Time::InMS(57));
 }
 
 // Tests that we can send a message with the message pointer and get it back.
diff --git a/aos/common/util/run_command_test.cc b/aos/common/util/run_command_test.cc
index b440e47..daff3c5 100644
--- a/aos/common/util/run_command_test.cc
+++ b/aos/common/util/run_command_test.cc
@@ -2,6 +2,8 @@
 
 #include "gtest/gtest.h"
 
+#include "aos/common/util/thread.h"
+
 namespace aos {
 namespace util {
 namespace testing {
@@ -34,6 +36,26 @@
   EXPECT_EQ(SIGQUIT, WTERMSIG(result));
 }
 
+TEST(RunCommandTest, MultipleThreads) {
+  int result1, result2;
+  util::FunctionThread t1([&result1](util::Thread *) {
+    result1 = RunCommand("true");
+  });
+  util::FunctionThread t2([&result2](util::Thread *) {
+    result2 = RunCommand("true");
+  });
+  t1.Start();
+  t2.Start();
+  t1.WaitUntilDone();
+  t2.WaitUntilDone();
+  ASSERT_NE(-1, result1);
+  ASSERT_NE(-1, result2);
+  ASSERT_TRUE(WIFEXITED(result1));
+  ASSERT_TRUE(WIFEXITED(result2));
+  EXPECT_EQ(0, WEXITSTATUS(result1));
+  EXPECT_EQ(0, WEXITSTATUS(result2));
+}
+
 }  // namespace testing
 }  // namespace util
 }  // namespace aos
diff --git a/aos/common/util/thread.cc b/aos/common/util/thread.cc
index fab62eb..dbb638e 100644
--- a/aos/common/util/thread.cc
+++ b/aos/common/util/thread.cc
@@ -1,7 +1,8 @@
 #include "aos/common/util/thread.h"
 
 #include <pthread.h>
-#include <assert.h>
+
+#include "aos/common/logging/logging.h"
 
 namespace aos {
 namespace util {
@@ -10,24 +11,30 @@
 
 Thread::~Thread() {
   if (started_ && !joined_) {
-    assert(false);
+    CHECK(false);
   }
 }
 
 void Thread::Start() {
-  assert(!started_);
+  CHECK(!started_);
   started_ = true;
-  assert(pthread_create(&thread_, NULL, &Thread::StaticRun, this) == 0);
+  CHECK(pthread_create(&thread_, NULL, &Thread::StaticRun, this) == 0);
 }
 
 void Thread::Join() {
-  assert(!joined_ && started_);
+  CHECK(!joined_ && started_);
   joined_ = true;
   {
     MutexLocker locker(&should_terminate_mutex_);
     should_terminate_ = true;
   }
-  assert(pthread_join(thread_, NULL) == 0);
+  CHECK(pthread_join(thread_, NULL) == 0);
+}
+
+void Thread::WaitUntilDone() {
+  CHECK(!joined_ && started_);
+  joined_ = true;
+  CHECK(pthread_join(thread_, NULL) == 0);
 }
 
 void *Thread::StaticRun(void *self) {
diff --git a/aos/common/util/thread.h b/aos/common/util/thread.h
index ab6f09c..97123d5 100644
--- a/aos/common/util/thread.h
+++ b/aos/common/util/thread.h
@@ -1,7 +1,10 @@
 #ifndef AOS_COMMON_UTIL_THREAD_H_
 #define AOS_COMMON_UTIL_THREAD_H_
 
+#include <functional>
+
 #include "aos/common/mutex.h"
+#include "aos/common/macros.h"
 
 namespace aos {
 namespace util {
@@ -20,6 +23,9 @@
   // Asks the code to stop and then waits until it has done so.
   void Join();
 
+  // Waits until the code has stopped. Does not ask it to do so.
+  void WaitUntilDone();
+
  protected:
   // Subclasses need to call this periodically if they are going to loop to
   // check whether they have been asked to stop.
@@ -42,6 +48,21 @@
   bool joined_;
   bool should_terminate_;
   Mutex should_terminate_mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(Thread);
+};
+
+// A Thread whose Run() calls a ::std::function, passing a pointer to itself.
+class FunctionThread : public Thread {
+ public:
+  // Stores a copy of function; Run() invokes it as function(this).
+  explicit FunctionThread(::std::function<void(FunctionThread *)> function)
+      : function_(function) {}
+
+ private:
+  virtual void Run() override { function_(this); }
+
+  const ::std::function<void(FunctionThread *)> function_;
 };
 
 }  // namespace util
diff --git a/aos/common/util/util.gyp b/aos/common/util/util.gyp
index 7c0adf3..fb29d7f 100644
--- a/aos/common/util/util.gyp
+++ b/aos/common/util/util.gyp
@@ -20,6 +20,7 @@
         'run_command',
         '<(EXTERNALS):gtest',
         '<(AOS)/build/aos.gyp:logging',
+        'thread',
       ],
     },
     {
diff --git a/aos/linux_code/ipc_lib/aos_sync.c b/aos/linux_code/ipc_lib/aos_sync.c
index 204322f..e6c6ba1 100644
--- a/aos/linux_code/ipc_lib/aos_sync.c
+++ b/aos/linux_code/ipc_lib/aos_sync.c
@@ -11,13 +11,17 @@
 
 #include "aos/common/logging/logging.h"
 
-// TODO(brians): Inline these in the new PI version.
+// TODO(brians): Inline this in the new PI version.
 #define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
-static inline uint32_t xchg(mutex *pointer, uint32_t value) {
-  uint32_t result;
-  __atomic_exchange(pointer, &value, &result, __ATOMIC_SEQ_CST);
-  return result;
-}
+
+// Selects the ARM EABI register-based syscall paths in the sys_futex_*
+// functions. This is a literal 0 or 1 (not "defined(...)") because a "defined"
+// operator produced by macro expansion inside #if has undefined behavior.
+#ifdef __ARM_EABI__
+#define ARM_EABI_INLINE_SYSCALL 1
+#else
+#define ARM_EABI_INLINE_SYSCALL 0
+#endif
 
 // this code is based on something that appears to be based on
 //   <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful
@@ -33,6 +30,10 @@
 // can't use PRIVATE futex operations because they use the pid (or something) as
 //   part of the hash
 //
+// ThreadSanitizer understands how these mutexes etc work. It appears to be able
+// to figure out the happens-before relationship from the __ATOMIC_SEQ_CST
+// atomic primitives.
+//
 // Remember that EAGAIN and EWOUDBLOCK are the same! (ie if you get EAGAIN from
 // FUTEX_WAIT, the docs call it EWOULDBLOCK...)
 //
@@ -54,7 +55,7 @@
 
 static inline int sys_futex_wait(mutex *addr1, int val1,
                                  const struct timespec *timeout) {
-#ifdef __ARM_EABI__
+#if ARM_EABI_INLINE_SYSCALL
   register mutex *addr1_reg __asm__("r0") = addr1;
   register int op_reg __asm__("r1") = FUTEX_WAIT;
   register int val1_reg __asm__("r2") = val1;
@@ -75,7 +76,7 @@
 }
 
 static inline int sys_futex_wake(mutex *addr1, int val1) {
-#ifdef __ARM_EABI__
+#if ARM_EABI_INLINE_SYSCALL
   register mutex *addr1_reg __asm__("r0") = addr1;
   register int op_reg __asm__("r1") = FUTEX_WAKE;
   register int val1_reg __asm__("r2") = val1;
@@ -96,7 +97,7 @@
 
 static inline int sys_futex_requeue(mutex *addr1, int num_wake,
     int num_requeue, mutex *m) {
-#ifdef __ARM_EABI__
+#if ARM_EABI_INLINE_SYSCALL
   register mutex *addr1_reg __asm__("r0") = addr1;
   register int op_reg __asm__("r1") = FUTEX_REQUEUE;
   register int num_wake_reg __asm__("r2") = num_wake;
@@ -124,7 +125,7 @@
   c = cmpxchg(m, 0, 1);
   if (!c) return 0;
   /* The lock is now contended */
-  if (c == 1) c = xchg(m, 2);
+  if (c == 1) c = __atomic_exchange_n(m, 2, __ATOMIC_SEQ_CST);
   while (c) {
     /* Wait in the kernel */
     const int ret = sys_futex_wait(m, 2, timeout);
@@ -136,7 +137,7 @@
         return 2;
       }
     }
-    c = xchg(m, 2);
+    c = __atomic_exchange_n(m, 2, __ATOMIC_SEQ_CST);
   }
   return 0;
 }
@@ -152,7 +153,7 @@
 
 void mutex_unlock(mutex *m) {
   /* Unlock, and if not contended then exit. */
-  switch (xchg(m, 0)) {
+  switch (__atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST)) {
     case 0:
       LOG(FATAL, "multiple unlock of %p\n", m);
     case 1:
@@ -192,7 +193,7 @@
   return 0;
 }
 int futex_set_value(mutex *m, mutex value) {
-  xchg(m, value);
+  __atomic_store_n(m, value, __ATOMIC_SEQ_CST);
   const int r = sys_futex_wake(m, INT_MAX - 4096);
   if (__builtin_expect((unsigned int)r > (unsigned int)-4096, 0)) {
     errno = -r;
@@ -205,7 +206,7 @@
   return futex_set_value(m, 1);
 }
 int futex_unset(mutex *m) {
-  return !xchg(m, 0);
+  return !__atomic_exchange_n(m, 0, __ATOMIC_SEQ_CST);
 }
 
 void condition_wait(mutex *c, mutex *m) {
@@ -233,7 +234,7 @@
     // If we got requeued above, this will just succeed the first time because
     // the person waking us from the above wait (changed to be on the mutex
     // instead of the condition) will have just set it to 0.
-    while (xchg(m, 2) != 0) {
+    while (__atomic_exchange_n(m, 2, __ATOMIC_SEQ_CST) != 0) {
       const int ret = sys_futex_wait(m, 2, NULL);
       if (ret != 0) {
         // Try again if it was because of a signal or somebody else unlocked it
diff --git a/aos/linux_code/ipc_lib/ipc_lib.gyp b/aos/linux_code/ipc_lib/ipc_lib.gyp
index 3c0c9fa..edca758 100644
--- a/aos/linux_code/ipc_lib/ipc_lib.gyp
+++ b/aos/linux_code/ipc_lib/ipc_lib.gyp
@@ -65,6 +65,7 @@
         '<(AOS)/common/common.gyp:queue_testutils',
         '<(AOS)/common/common.gyp:time',
         '<(AOS)/common/common.gyp:die',
+        '<(AOS)/common/util/util.gyp:thread',
       ],
     },
     {
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 7c19b30..0447bef 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -62,20 +62,10 @@
   }
 
   void ref_count_sub() {
-    // TODO(brians): Take the #ifdef out once clang can handle the
-    // __atomic_*_fetch variants which could be more efficient.
-#ifdef __clang__
-    __atomic_fetch_sub(&ref_count_, 1, __ATOMIC_RELAXED);
-#else
     __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
-#endif
   }
   void ref_count_add() {
-#ifdef __clang__
-    __atomic_fetch_add(&ref_count_, 1, __ATOMIC_RELAXED);
-#else
     __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED);
-#endif
   }
 
  private:
diff --git a/aos/linux_code/ipc_lib/raw_queue_test.cc b/aos/linux_code/ipc_lib/raw_queue_test.cc
index 4196286..65f2308 100644
--- a/aos/linux_code/ipc_lib/raw_queue_test.cc
+++ b/aos/linux_code/ipc_lib/raw_queue_test.cc
@@ -16,6 +16,7 @@
 #include "aos/common/time.h"
 #include "aos/common/logging/logging.h"
 #include "aos/common/die.h"
+#include "aos/common/util/thread.h"
 
 using ::testing::AssertionResult;
 using ::testing::AssertionSuccess;
@@ -925,5 +926,28 @@
   EXPECT_EQ(free_before + 2, queue->FreeMessages());
 }
 
+// Makes sure that ThreadSanitizer doesn't catch any issues freeing from
+// multiple threads at once.
+TEST_F(RawQueueTest, MultiThreadedFree) {
+  RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+  PushMessage(queue, 971);
+  int free_before = queue->FreeMessages();
+
+  const void *const message1 = queue->ReadMessage(RawQueue::kPeek);
+  const void *const message2 = queue->ReadMessage(RawQueue::kPeek);
+  EXPECT_EQ(free_before, queue->FreeMessages());
+  util::FunctionThread t1([message1, queue](util::Thread *) {
+    queue->FreeMessage(message1);
+  });
+  util::FunctionThread t2([message2, queue](util::Thread *) {
+    queue->FreeMessage(message2);
+  });
+  t1.Start();
+  t2.Start();
+  t1.WaitUntilDone();
+  t2.WaitUntilDone();
+  EXPECT_EQ(free_before, queue->FreeMessages());
+}
+
 }  // namespace testing
 }  // namespace aos