copying condition-class branch over from my 2012 repo
diff --git a/aos/atom_code/ipc_lib/aos_sync.c b/aos/atom_code/ipc_lib/aos_sync.c
index 4c14a35..29f3081 100644
--- a/aos/atom_code/ipc_lib/aos_sync.c
+++ b/aos/atom_code/ipc_lib/aos_sync.c
@@ -14,6 +14,7 @@
// this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
// should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
+// (sys_set_robust_list appears to be the function name)
// <http://locklessinc.com/articles/futex_cheat_sheet/> and <http://locklessinc.com/articles/mutex_cv_futex/> are useful
// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
// can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
diff --git a/aos/atom_code/ipc_lib/aos_sync.h b/aos/atom_code/ipc_lib/aos_sync.h
index d28ae3a..80364e9 100644
--- a/aos/atom_code/ipc_lib/aos_sync.h
+++ b/aos/atom_code/ipc_lib/aos_sync.h
@@ -32,8 +32,15 @@
} condition_variable;
// All return -1 for other error (which will be in errno from futex(2)).
+//
+// There is no priority inversion protection.
+// TODO(brians) look at using
+// <http://www.kernel.org/doc/Documentation/pi-futex.txt>
// Returns 1 if interrupted by a signal.
+//
+// One of the highest priority processes blocked on a given mutex will be the
+// one to lock it when it is unlocked.
int mutex_lock(mutex *m) __attribute__((warn_unused_result));
// Returns 2 if it timed out or 1 if interrupted by a signal.
int mutex_lock_timeout(mutex *m, const struct timespec *timeout)
@@ -55,11 +62,14 @@
int futex_wait(mutex *m) __attribute__((warn_unused_result));
// Set the futex and wake up anybody waiting on it.
// Returns the number that were woken or -1.
+//
+// This will always wake up all waiters at the same time.
int futex_set(mutex *m);
// Same as above except lets something other than 1 be used as the final value.
int futex_set_value(mutex *m, mutex value);
// Unsets the futex.
// Returns 0 if it was set before and 1 if it wasn't.
+// Can not fail.
int futex_unset(mutex *m);
// The condition_ functions implement condition variable support. The API is
diff --git a/aos/atom_code/ipc_lib/condition.cc b/aos/atom_code/ipc_lib/condition.cc
new file mode 100644
index 0000000..1524773
--- /dev/null
+++ b/aos/atom_code/ipc_lib/condition.cc
@@ -0,0 +1,54 @@
+#include "aos/common/condition.h"
+
+#include <inttypes.h>
+
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+static_assert(shm_ok<Condition>::value, "Condition should work"
+ " in shared memory");
+
+Condition::Condition() : impl_(0) {}
+
+bool Condition::Wait() {
+ switch (condition_wait(&impl_)) {
+ case 1:
+ return false;
+ case 0:
+ return true;
+ default:
+ if (errno != EINTR) {
+ LOG(FATAL, "condition_wait(%p(=%"PRIu32")) failed because of %d: %s\n",
+ &impl_, impl_, errno, strerror(errno));
+ }
+ return false;
+ }
+}
+bool Condition::WaitNext() {
+ switch (condition_wait_force(&impl_)) {
+ case 1:
+ return false;
+ case 0:
+ return true;
+ default:
+ if (errno != EINTR) {
+ LOG(FATAL, "condition_wait_force(%p(=%"PRIu32")) failed"
+ " because of %d: %s\n", &impl_, impl_, errno, strerror(errno));
+ }
+ return false;
+ }
+}
+
+void Condition::Set() {
+ if (condition_set(&impl_) == -1) {
+ LOG(FATAL, "condition_set(%p(=%"PRIu32")) failed because of %d: %s\n",
+ &impl_, impl_, errno, strerror(errno));
+ }
+}
+void Condition::Unset() {
+ // can not fail
+ condition_unset(&impl_);
+}
+
+} // namespace aos
diff --git a/aos/common/condition.h b/aos/common/condition.h
new file mode 100644
index 0000000..c527f34
--- /dev/null
+++ b/aos/common/condition.h
@@ -0,0 +1,62 @@
+#ifndef AOS_COMMON_CONDITION_H_
+#define AOS_COMMON_CONDITION_H_
+
+#ifdef __VXWORKS__
+#include <semLib.h>
+#endif
+
+#include "aos/aos_core.h"
+#include "aos/common/mutex.h"
+
+namespace aos {
+
+// A condition variable (IPC mechanism where 1 process/task can notify all
+// others that are waiting for something to happen).
+// There are implementations for both the atom and the cRIO.
+// They both LOG(FATAL) if anything weird happens.
+//
+// A condition is either set or unset, and multiple processes/tasks can wait on
+// one for it to be set.
+class Condition {
+ public:
+ // Creates an unset condition.
+ Condition();
+#ifdef __VXWORKS__
+ // Will not make sure that it is either set or unset.
+ ~Condition();
+#endif
+ // Waits for the condition to be set. Will return true immediately if it is
+ // already set.
+ // Returns false if returning before a confirmed condition set, although doing
+ // anything very useful with the return value is difficult because the
+ // condition may have been set (and possibly even unset again) between the
+ // time when the system call to block returned and this function returns.
+ bool Wait();
+ // Waits for the next Set(), regardless of whether or not the condition is
+ // currently set.
+ // Same return value as Wait().
+ bool WaitNext();
+
+ // Sets the condition. Any processes/tasks that are currently Wait()ing will
+ // continue.
+ // All implementations will wake all waiting processes/tasks at once so that
+ // the highest priority one(s) will run before others like usual.
+ void Set();
+ // Unsets the condition.
+ void Unset();
+
+ private:
+#ifdef __VXWORKS__
+ // Always empty. Used to make tasks wait and then gets flushed to unblock all
+ // of them.
+ SEM_ID wait_;
+ // Whether or not the conditon is set.
+ bool set_;
+#else
+ mutex impl_;
+#endif
+};
+
+} // namespace aos
+
+#endif // AOS_COMMON_CONDITION_H_
diff --git a/aos/common/mutex.h b/aos/common/mutex.h
index 035889b..4c06aae 100644
--- a/aos/common/mutex.h
+++ b/aos/common/mutex.h
@@ -64,6 +64,20 @@
Mutex *mutex_;
DISALLOW_COPY_AND_ASSIGN(MutexLocker);
};
+// The inverse of MutexLocker.
+class MutexUnlocker {
+ public:
+ explicit MutexUnlocker(Mutex *mutex) : mutex_(mutex) {
+ mutex_->Unlock();
+ }
+ ~MutexUnlocker() {
+ mutex_->Lock();
+ }
+
+ private:
+ Mutex *mutex_;
+ DISALLOW_COPY_AND_ASSIGN(MutexUnlocker);
+};
} // namespace aos
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
index a5d5e86..6af126b 100644
--- a/aos/common/mutex_test.cpp
+++ b/aos/common/mutex_test.cpp
@@ -1,7 +1,16 @@
#include "aos/common/mutex.h"
+#include <sched.h>
+#include <math.h>
+#include <pthread.h>
+#ifdef __VXWORKS__
+#include <taskLib.h>
+#endif
+
#include "gtest/gtest.h"
+#include "aos/common/condition.h"
+
namespace aos {
namespace testing {
@@ -50,6 +59,116 @@
}
EXPECT_TRUE(test_mutex.TryLock());
}
+TEST_F(MutexTest, MutexUnlocker) {
+ test_mutex.Lock();
+ {
+ aos::MutexUnlocker unlocker(&test_mutex);
+ // If this fails, then something weird is going on and the next line might
+ // hang.
+ ASSERT_TRUE(test_mutex.TryLock());
+ test_mutex.Unlock();
+ }
+ EXPECT_TRUE(test_mutex.TryLock());
+}
+
+// A worker thread for testing the fairness of the mutex implementation.
+class MutexFairnessWorkerThread {
+ public:
+ MutexFairnessWorkerThread(int *cycles, int index,
+ Mutex *mutex, Condition *start)
+ : cycles_(cycles), index_(index), mutex_(mutex), start_(start) {}
+
+ static void *RunStatic(void *self_in) {
+ MutexFairnessWorkerThread *self =
+ static_cast<MutexFairnessWorkerThread *>(self_in);
+ self->Run();
+ delete self;
+ return NULL;
+ }
+
+ static void Reset(int cycles) {
+ cyclesRun = 0;
+ totalCycles = cycles;
+ }
+
+ private:
+ void Run() {
+ cycles_[index_] = 0;
+ start_->Wait();
+ while (cyclesRun < totalCycles) {
+ {
+ MutexLocker locker(mutex_);
+ ++cyclesRun;
+ }
+ ++cycles_[index_];
+ // Otherwise the fitpc implementation tends to just relock in the same
+ // thread.
+ sched_yield();
+ }
+
+#ifdef __VXWORKS__
+ // Without this, all of the "task ... deleted ..." messages come out at
+ // once, and it looks weird and triggers an socat bug (at least for
+ // Squeeze's version 1.7.1.3-1).
+ taskDelay(index_);
+#endif
+ }
+
+ int *cycles_;
+ int index_;
+ Mutex *mutex_;
+ Condition *start_;
+ static int cyclesRun, totalCycles;
+};
+int MutexFairnessWorkerThread::cyclesRun;
+int MutexFairnessWorkerThread::totalCycles;
+// Tests the fairness of the implementation. It does this by repeatedly locking
+// and unlocking a mutex in multiple threads and then checking the standard
+// deviation of the number of times each one locks.
+//
+// It is safe to do this with threads because this is the test so it can change
+// if the implementations ever change to not support that. Fitpc logging calls
+// are not thread-safe, but it doesn't really matter because the only logging
+// call that would get made would be a LOG(FATAL) that would still terminate the
+// process.
+TEST_F(MutexTest, Fairness) {
+ static const int kThreads = 13;
+#ifdef __VXWORKS__
+ static const int kWarmupCycles = 1000, kRunCycles = 60000, kMaxDeviation = 20;
+#else
+ static const int kWarmupCycles = 30000, kRunCycles = 3000000, kMaxDeviation = 10000;
+#endif
+
+ int cycles[kThreads];
+ pthread_t workers[kThreads];
+ Condition start;
+
+ for (int repeats = 0; repeats < 2; ++repeats) {
+ MutexFairnessWorkerThread::Reset(repeats ? kRunCycles : kWarmupCycles);
+ start.Unset();
+ for (int i = 0; i < kThreads; ++i) {
+ MutexFairnessWorkerThread *c = new MutexFairnessWorkerThread(cycles, i,
+ &test_mutex,
+ &start);
+ ASSERT_EQ(0, pthread_create(&workers[i], NULL,
+ MutexFairnessWorkerThread::RunStatic, c));
+ }
+ start.Set();
+ for (int i = 0; i < kThreads; ++i) {
+ ASSERT_EQ(0, pthread_join(workers[i], NULL));
+ }
+ }
+
+ double variance = 0;
+ int expected = kRunCycles / kThreads;
+ for (int i = 0; i < kThreads; ++i) {
+ variance += (cycles[i] - expected) * (cycles[i] - expected);
+ }
+ double deviation = sqrt(variance / kThreads);
+ printf("deviation=%f\n", deviation);
+ ASSERT_GT(deviation, 0);
+ EXPECT_LT(deviation, kMaxDeviation);
+}
} // namespace testing
} // namespace aos