fixed some bugs

The API for the condition variables was broken, so I changed that and
then fixed RawQueue to use it correctly. I also found a bug in the
condition variable implementation using the tests.
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index c867193..eee1d05 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -271,6 +271,23 @@
       ],
     },
     {
+      'target_name': 'condition_test',
+      'type': 'executable',
+      'sources': [
+        'condition_test.cc',
+      ],
+      'dependencies': [
+        '<(EXTERNALS):gtest',
+        'condition',
+        '<(AOS)/common/util/util.gyp:thread',
+        'time',
+        'mutex',
+        '<(AOS)/build/aos.gyp:logging',
+        'queue_testutils',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:core_lib',
+       ],
+    },
+    {
       'target_name': 'die_test',
       'type': 'executable',
       'sources': [
diff --git a/aos/common/condition.h b/aos/common/condition.h
index 956dc6c..346ae54 100644
--- a/aos/common/condition.h
+++ b/aos/common/condition.h
@@ -14,7 +14,8 @@
 // exactly 1 mutex must be associated with each condition variable.
 class Condition {
  public:
-  // m is the mutex that will be associated with this condition variable.
+  // m is the mutex that will be associated with this condition variable. This
+  // object will hold on to a reference to it but does not take ownership.
   explicit Condition(Mutex *m);
 
   // Waits for the condition variable to be signalled, atomically unlocking m at
@@ -25,16 +26,18 @@
   // Signals at most 1 other process currently Wait()ing on this condition
   // variable. Calling this does not require the mutex associated with this
   // condition variable to be locked.
-  // One of the processes with the highest priority level will be woken if there
-  // are multiple ones.
+  // One of the processes with the highest priority level will be woken.
   void Signal();
   // Wakes all processes that are currently Wait()ing on this condition
   // variable. Calling this does not require the mutex associated with this
   // condition variable to be locked.
   void Broadcast();
 
+  // Retrieves the mutex associated with this condition variable.
+  Mutex *m() { return m_; }
+
  private:
-  condition_variable impl_;
+  mutex impl_;
   Mutex *m_;
 };
 
diff --git a/aos/common/condition_test.cc b/aos/common/condition_test.cc
new file mode 100644
index 0000000..ddc12d5
--- /dev/null
+++ b/aos/common/condition_test.cc
@@ -0,0 +1,263 @@
+#include "aos/common/condition.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "gtest/gtest.h"
+
+#include "aos/common/util/thread.h"
+#include "aos/common/time.h"
+#include "aos/common/mutex.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/logging/logging.h"
+
+using ::aos::time::Time;
+using ::aos::common::testing::GlobalCoreInstance;
+
+namespace aos {
+namespace testing {
+
+class ConditionTest : public ::testing::Test {
+ public:
+  struct Shared {
+    Shared() : condition(&mutex) {}
+
+    Mutex mutex;
+    Condition condition;
+  };
+  static_assert(shm_ok<Shared>::value,
+                "it's going to get shared between forked processes");
+
+  ConditionTest() : shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+    new (shared_) Shared();
+  }
+
+  GlobalCoreInstance my_core;
+
+  Shared *const shared_;
+
+  void Settle() {
+    time::SleepFor(::Time::InSeconds(0.009));
+  }
+};
+
+class ConditionTestProcess {
+ public:
+  enum class Action {
+    kWaitLockStart,  // lock, delay, wait, unlock
+    kWait,  // delay, lock, wait, unlock
+    kWaitNoUnlock,  // delay, lock, wait
+    kSignal,  // delay, signal
+    kBroadcast,  // delay, broadcast
+  };
+
+  // This amount gets added to any passed in delay to make the test repeatable.
+  static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.015);
+  static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.06);
+
+  // delay is how long to wait before doing action to condition.
+  // timeout is how long to wait after delay before deciding that it's hung.
+  ConditionTestProcess(const ::Time &delay, Action action, Condition *condition,
+                       const ::Time &timeout = kDefaultTimeout)
+    : delay_(kMinimumDelay + delay), action_(action), condition_(condition),
+      timeout_(delay_ + timeout), child_(-1),
+      shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+    new (shared_) Shared();
+  }
+  ~ConditionTestProcess() {
+    assert(child_ == -1);
+  }
+
+  void Start() {
+    ASSERT_FALSE(shared_->started);
+
+    child_ = fork();
+    if (child_ == 0) {  // in child
+      Run();
+      exit(EXIT_SUCCESS);
+    } else {  // in parent
+      assert(child_ != -1);
+
+      shared_->ready.Lock();
+
+      shared_->started = true;
+    }
+  }
+
+  bool IsFinished() {
+    return shared_->finished;
+  }
+
+  ::testing::AssertionResult Hung() {
+    if (!shared_->started) {
+      ADD_FAILURE();
+      return ::testing::AssertionFailure() << "not started yet";
+    }
+    if (shared_->finished) {
+      Join();
+      return ::testing::AssertionFailure() << "already returned";
+    }
+    if (shared_->delayed) {
+      if (shared_->start_time > ::Time::Now() + timeout_) {
+        Kill();
+        return ::testing::AssertionSuccess() << "already been too long";
+      }
+    } else {
+      shared_->done_delaying.Lock();
+    }
+    time::SleepFor(::Time::InSeconds(0.01));
+    if (!shared_->finished) time::SleepUntil(shared_->start_time + timeout_);
+    if (shared_->finished) {
+      Join();
+      return ::testing::AssertionFailure() << "completed within timeout";
+    } else {
+      Kill();
+      return ::testing::AssertionSuccess() << "took too long";
+    }
+  }
+  ::testing::AssertionResult Test() {
+    Start();
+    return Hung();
+  }
+
+ private:
+  struct Shared {
+    Shared()
+      : started(false), delayed(false), start_time(0, 0), finished(false) {
+      done_delaying.Lock();
+      ready.Lock();
+    }
+
+    bool started;
+    bool delayed;
+    Mutex done_delaying;
+    ::Time start_time;
+    bool finished;
+    Mutex ready;
+  };
+  static_assert(shm_ok<Shared>::value,
+                "it's going to get shared between forked processes");
+
+  void Run() {
+    if (action_ == Action::kWaitLockStart) {
+      shared_->ready.Unlock();
+      condition_->m()->Lock();
+    }
+    time::SleepFor(delay_);
+    shared_->start_time = ::Time::Now();
+    shared_->delayed = true;
+    shared_->done_delaying.Unlock();
+    switch (action_) {
+      case Action::kWait:
+      case Action::kWaitNoUnlock:
+        shared_->ready.Unlock();
+        condition_->m()->Lock();
+      case Action::kWaitLockStart:
+        condition_->Wait();
+        break;
+      case Action::kSignal:
+        shared_->ready.Unlock();
+        condition_->Signal();
+        break;
+      case Action::kBroadcast:
+        shared_->ready.Unlock();
+        condition_->Broadcast();
+        break;
+    }
+    shared_->finished = true;
+    if (action_ == Action::kWait || action_ == Action::kWaitLockStart) {
+      condition_->m()->Unlock();
+    }
+  }
+
+  void Join() {
+    assert(child_ != -1);
+    int status;
+    do {
+      assert(waitpid(child_, &status, 0) == child_);
+    } while (!(WIFEXITED(status) || WIFSIGNALED(status)));
+    child_ = -1;
+  }
+  void Kill() {
+    assert(child_ != -1);
+    assert(kill(child_, SIGTERM) == 0);
+    Join();
+  }
+
+  const ::Time delay_;
+  const Action action_;
+  Condition *const condition_;
+  const ::Time timeout_;
+
+  pid_t child_;
+
+  Shared *const shared_;
+};
+constexpr ::Time ConditionTestProcess::kMinimumDelay;
+constexpr ::Time ConditionTestProcess::kDefaultTimeout;
+
+// Makes sure that the testing framework and everything work for a really simple
+// Wait() and then Signal().
+TEST_F(ConditionTest, Basic) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  thread.Start();
+  Settle();
+  EXPECT_FALSE(thread.IsFinished());
+  shared_->condition.Signal();
+  EXPECT_FALSE(thread.Hung());
+}
+
+// Makes sure that the worker thread locks before it tries to Wait() etc.
+TEST_F(ConditionTest, Locking) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  shared_->mutex.Lock();
+  thread.Start();
+  Settle();
+  // This Signal() shouldn't do anything because the thread should still be
+  // waiting to lock the mutex.
+  shared_->condition.Signal();
+  Settle();
+  shared_->mutex.Unlock();
+  EXPECT_TRUE(thread.Hung());
+}
+
+// Tests that the work thread only catches a Signal() after the mutex gets
+// unlocked.
+TEST_F(ConditionTest, LockFirst) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWait,
+                              &shared_->condition);
+  shared_->mutex.Lock();
+  thread.Start();
+  Settle();
+  shared_->condition.Signal();
+  Settle();
+  EXPECT_FALSE(thread.IsFinished());
+  shared_->mutex.Unlock();
+  Settle();
+  EXPECT_FALSE(thread.IsFinished());
+  shared_->condition.Signal();
+  EXPECT_FALSE(thread.Hung());
+}
+
+// Tests that the mutex gets relocked after Wait() returns.
+TEST_F(ConditionTest, Relocking) {
+  ConditionTestProcess thread(::Time(0, 0),
+                              ConditionTestProcess::Action::kWaitNoUnlock,
+                              &shared_->condition);
+  thread.Start();
+  Settle();
+  shared_->condition.Signal();
+  EXPECT_FALSE(thread.Hung());
+  EXPECT_FALSE(shared_->mutex.TryLock());
+}
+
+}  // namespace testing
+}  // namespace aos
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
index 7a5c2ce..d9327a1 100644
--- a/aos/common/mutex_test.cpp
+++ b/aos/common/mutex_test.cpp
@@ -59,12 +59,13 @@
   }
   EXPECT_TRUE(test_mutex.TryLock());
 }
+
 TEST_F(MutexTest, MutexUnlocker) {
   test_mutex.Lock();
   {
     aos::MutexUnlocker unlocker(&test_mutex);
     // If this fails, then something weird is going on and the next line might
-    // hang.
+    // hang, so fail immediately.
     ASSERT_TRUE(test_mutex.TryLock());
     test_mutex.Unlock();
   }
@@ -122,6 +123,7 @@
 };
 int MutexFairnessWorkerThread::cyclesRun;
 int MutexFairnessWorkerThread::totalCycles;
+
 // Tests the fairness of the implementation. It does this by repeatedly locking
 // and unlocking a mutex in multiple threads and then checking the standard
 // deviation of the number of times each one locks.
diff --git a/aos/common/queue_testutils.cc b/aos/common/queue_testutils.cc
index 1d47e62..629c9be 100644
--- a/aos/common/queue_testutils.cc
+++ b/aos/common/queue_testutils.cc
@@ -1,6 +1,7 @@
 #include "aos/common/queue_testutils.h"
 
 #include <string.h>
+#include <sys/mman.h>
 
 #include "gtest/gtest.h"
 
@@ -110,15 +111,18 @@
 
 Once<void> enable_test_logging_once(DoEnableTestLogging);
 
+const size_t kCoreSize = 0x100000;
+
 }  // namespace
 
 GlobalCoreInstance::GlobalCoreInstance() {
-  const size_t kCoreSize = 0x100000;
   global_core = &global_core_data_;
   global_core->owner = 1;
-  void *memory = malloc(kCoreSize);
-  assert(memory != NULL);
-  memset(memory, 0, kCoreSize);
+  // Use mmap(2) manually so that we can pass MAP_SHARED so that forked
+  // processes can still communicate using the "shared" memory.
+  void *memory = mmap(NULL, kCoreSize, PROT_READ | PROT_WRITE,
+                      MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+  assert(memory != MAP_FAILED);
 
   assert(aos_core_use_address_as_shared_mem(memory, kCoreSize) == 0);
 
@@ -126,7 +130,7 @@
 }
 
 GlobalCoreInstance::~GlobalCoreInstance() {
-  free(global_core->mem_struct);
+  assert(munmap(global_core->mem_struct, kCoreSize) == 0);
   global_core = NULL;
 }