redo the aos_sync API and add PI support
Previously, the API used a single, poorly named type for both mutexes and
futexes. That change is hard to split out from adding priority-inheritance
(PI) support, so both are in this commit. PI support is important because we
have many places (e.g. the logging queue) where high-priority and low-priority
code interact heavily.
This change also adds some small pieces of robust-mutex support (detecting that
a lock's previous owner died), but for now every such path results in a
CHECK/assert failure if triggered.
Change-Id: I841ccee52568c32d453ed14f930430debbd8d78e
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index 693f717..5a16c08 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -233,7 +233,6 @@
'<(AOS)/build/aos.gyp:logging_interface',
],
'export_dependent_settings': [
- 'mutex',
'<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:aos_sync',
],
},
@@ -241,14 +240,16 @@
'target_name': 'mutex',
'type': 'static_library',
'sources': [
- '<(AOS)/linux_code/ipc_lib/mutex.cpp',
+ '<(AOS)/linux_code/ipc_lib/mutex.cc',
],
'dependencies': [
'<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:aos_sync',
'<(AOS)/build/aos.gyp:logging_interface',
+ 'die',
],
'export_dependent_settings': [
'<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:aos_sync',
+ 'die',
],
},
{
@@ -265,6 +266,7 @@
'<(AOS)/common/util/util.gyp:death_test_log_implementation',
'<(AOS)/common/util/util.gyp:thread',
'<(AOS)/common/common.gyp:time',
+ 'queue_testutils',
],
},
{
@@ -282,6 +284,7 @@
'<(AOS)/build/aos.gyp:logging',
'queue_testutils',
'<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:core_lib',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:aos_sync',
'die',
'<(AOS)/common/util/util.gyp:thread',
],
diff --git a/aos/common/condition.h b/aos/common/condition.h
index ef51b89..eccc0da 100644
--- a/aos/common/condition.h
+++ b/aos/common/condition.h
@@ -1,18 +1,18 @@
#ifndef AOS_COMMON_CONDITION_H_
#define AOS_COMMON_CONDITION_H_
-#include "aos/common/mutex.h"
#include "aos/linux_code/ipc_lib/aos_sync.h"
namespace aos {
+class Mutex;
+
// A condition variable (IPC mechanism where 1 process/task can notify all
// others that are waiting for something to happen) without the race condition
// where a notification is sent after some process has checked if the thing has
// happened but before it has started listening for notifications.
//
-// This implementation will print debugging information and abort the process
-// if anything weird happens.
+// This implementation will LOG(FATAL) if anything weird happens.
//
// A simple example of the use of a condition variable (adapted from
// pthread_cond(3)):
@@ -41,7 +41,7 @@
// the mutex and the Wait()er(s) relocking it.
//
// Multiple condition variables may be associated with the same mutex but
-// exactly 1 mutex must be associated with each condition variable.
+// exactly one mutex must be associated with each condition variable.
class Condition {
public:
// m is the mutex that will be associated with this condition variable. This
@@ -55,12 +55,15 @@
// NOTE: The relocking of the mutex is not performed atomically with waking
// up.
// Returns false.
- bool Wait();
+ bool Wait() __attribute__((warn_unused_result));
- // Signals at most 1 other process currently Wait()ing on this condition
+ // Signals approximately 1 other process currently Wait()ing on this condition
// variable. Calling this does not require the mutex associated with this
// condition variable to be locked.
// One of the processes with the highest priority level will be woken.
+ // If there aren't any waiting at the time, none will be woken. There is a
+ // small race condition in the Linux implementation that can result in more
+ // than 1 being woken.
void Signal();
// Wakes all processes that are currently Wait()ing on this condition
// variable. Calling this does not require the mutex associated with this
@@ -71,7 +74,7 @@
Mutex *m() { return m_; }
private:
- mutex impl_;
+ aos_condition impl_;
Mutex *m_;
};
diff --git a/aos/common/condition_test.cc b/aos/common/condition_test.cc
index 7b9145d..05990e5 100644
--- a/aos/common/condition_test.cc
+++ b/aos/common/condition_test.cc
@@ -15,6 +15,7 @@
#include "aos/linux_code/ipc_lib/core_lib.h"
#include "aos/common/logging/logging.h"
#include "aos/common/macros.h"
+#include "aos/linux_code/ipc_lib/aos_sync.h"
#include "aos/common/die.h"
#include "aos/common/util/thread.h"
@@ -88,6 +89,9 @@
ConditionTest() : shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
new (shared_) Shared();
}
+ ~ConditionTest() {
+ shared_->~Shared();
+ }
GlobalCoreInstance my_core;
@@ -111,8 +115,8 @@
};
// This amount gets added to any passed in delay to make the test repeatable.
- static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.015);
- static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.09);
+ static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.15);
+ static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.15);
// delay is how long to wait before doing action to condition.
// timeout is how long to wait after delay before deciding that it's hung.
@@ -138,7 +142,7 @@
} else { // in parent
CHECK_NE(child_, -1);
- shared_->ready.Lock();
+ ASSERT_EQ(0, futex_wait(&shared_->ready));
shared_->started = true;
}
@@ -163,7 +167,7 @@
return ::testing::AssertionSuccess() << "already been too long";
}
} else {
- shared_->done_delaying.Lock();
+ CHECK_EQ(0, futex_wait(&shared_->done_delaying));
}
time::SleepFor(::Time::InSeconds(0.01));
if (!shared_->finished) time::SleepUntil(shared_->start_time + timeout_);
@@ -183,35 +187,35 @@
private:
struct Shared {
Shared()
- : started(false), delayed(false), start_time(0, 0), finished(false) {
- done_delaying.Lock();
- ready.Lock();
+ : started(false), delayed(false), done_delaying(0), start_time(0, 0),
+ finished(false), ready(0) {
}
volatile bool started;
volatile bool delayed;
- Mutex done_delaying;
+ aos_futex done_delaying;
::Time start_time;
volatile bool finished;
- Mutex ready;
+ aos_futex ready;
};
static_assert(shm_ok<Shared>::value,
"it's going to get shared between forked processes");
void Run() {
if (action_ == Action::kWaitLockStart) {
- shared_->ready.Unlock();
- condition_->m()->Lock();
+ ASSERT_EQ(1, futex_set(&shared_->ready));
+ ASSERT_FALSE(condition_->m()->Lock());
}
time::SleepFor(delay_);
shared_->start_time = ::Time::Now();
shared_->delayed = true;
- shared_->done_delaying.Unlock();
+ ASSERT_NE(-1, futex_set(&shared_->done_delaying));
if (action_ != Action::kWaitLockStart) {
- shared_->ready.Unlock();
- condition_->m()->Lock();
+ ASSERT_EQ(1, futex_set(&shared_->ready));
+ ASSERT_FALSE(condition_->m()->Lock());
}
- condition_->Wait();
+ // TODO(brians): Test this returning true (aka the owner dying).
+ ASSERT_FALSE(condition_->Wait());
shared_->finished = true;
if (action_ != Action::kWaitNoUnlock) {
condition_->m()->Unlock();
@@ -264,7 +268,7 @@
ConditionTestProcess child(::Time(0, 0),
ConditionTestProcess::Action::kWait,
&shared_->condition);
- shared_->mutex.Lock();
+ ASSERT_FALSE(shared_->mutex.Lock());
child.Start();
Settle();
// This Signal() shouldn't do anything because the child should still be
@@ -281,7 +285,7 @@
ConditionTestProcess child(::Time(0, 0),
ConditionTestProcess::Action::kWait,
&shared_->condition);
- shared_->mutex.Lock();
+ ASSERT_FALSE(shared_->mutex.Lock());
child.Start();
Settle();
shared_->condition.Signal();
@@ -303,7 +307,7 @@
Settle();
shared_->condition.Signal();
EXPECT_FALSE(child.Hung());
- EXPECT_FALSE(shared_->mutex.TryLock());
+ EXPECT_EQ(Mutex::State::kUnlocked, shared_->mutex.TryLock());
}
// Tests that Signal() stops exactly 1 Wait()er.
diff --git a/aos/common/mutex.h b/aos/common/mutex.h
index 251eb3c..db1b012 100644
--- a/aos/common/mutex.h
+++ b/aos/common/mutex.h
@@ -1,87 +1,104 @@
#ifndef AOS_COMMON_MUTEX_H_
#define AOS_COMMON_MUTEX_H_
-#ifdef __VXWORKS__
-#include <semLib.h>
-#endif
-
#include "aos/common/macros.h"
-#include "aos/common/type_traits.h"
#include "aos/linux_code/ipc_lib/aos_sync.h"
+#include "aos/common/die.h"
namespace aos {
class Condition;
-// An abstraction of a mutex that has implementations both for
-// linux and for the cRIO.
-// If there are multiple tasks or processes contending for the mutex,
+// An abstraction of a mutex that is easy to implement for environments other
+// than Linux too.
+// If there are multiple threads or processes contending for the mutex,
// higher priority ones will succeed in locking first,
// and tasks of equal priorities have the same chance of getting the lock.
-// There is no priority inversion protection.
+// To deal with priority inversion, the linux implementation does priority
+// inheritance.
+// Before destroying a mutex, it is important to make sure it isn't locked.
+// Otherwise, the destructor will LOG(FATAL).
class Mutex {
public:
+ enum class State {
+ kLocked, kUnlocked
+ };
+
// Creates an unlocked mutex.
Mutex();
-#ifdef __VXWORKS__
- // Will not make sure that it is either locked or unlocked.
+ // Verifies that it isn't locked.
+ //
+ // This is important because freeing a locked mutex means there is freed
+ // memory in the middle of the robust list, which breaks things horribly.
~Mutex();
-#endif
+
// Locks the mutex. If it fails, it calls LOG(FATAL).
// Returns false.
- bool Lock();
+ bool Lock() __attribute__((warn_unused_result));
// Unlocks the mutex. Fails like Lock.
// Multiple unlocking is undefined.
void Unlock();
// Locks the mutex unless it is already locked.
- // Returns whether it succeeded or not.
+ // Returns the new state of the mutex.
// Doesn't wait for the mutex to be unlocked if it is locked.
- bool TryLock();
+ State TryLock() __attribute__((warn_unused_result));
private:
-#ifdef __VXWORKS__
- typedef SEM_ID ImplementationType;
-#else
- typedef mutex ImplementationType;
-#endif
- ImplementationType impl_;
+ aos_mutex impl_;
friend class Condition; // for access to impl_
-
-#ifdef __VXWORKS__
- DISALLOW_COPY_AND_ASSIGN(Mutex);
-#endif
};
// A class that locks a Mutex when constructed and unlocks it when destructed.
// Designed to be used as a local variable so that
// the mutex will be unlocked when the scope is exited.
+// This one immediately Dies if the previous owner died. This makes it a good
+// choice for mutexes that are only used within a single process, but NOT for
+// mutexes shared by multiple processes. For those, use IPCMutexLocker.
class MutexLocker {
public:
explicit MutexLocker(Mutex *mutex) : mutex_(mutex) {
- mutex_->Lock();
+ if (__builtin_expect(mutex_->Lock(), false)) {
+ ::aos::Die("previous owner of mutex %p died but it shouldn't be able to",
+ mutex_);
+ }
}
~MutexLocker() {
mutex_->Unlock();
}
private:
- Mutex *mutex_;
+ Mutex *const mutex_;
+
DISALLOW_COPY_AND_ASSIGN(MutexLocker);
};
-// The inverse of MutexLocker.
-class MutexUnlocker {
+
+// A version of MutexLocker which reports the previous owner dying instead of
+// immediately calling ::aos::Die. Callers must check owner_died() at least once.
+class IPCMutexLocker {
public:
- explicit MutexUnlocker(Mutex *mutex) : mutex_(mutex) {
+ explicit IPCMutexLocker(Mutex *mutex)
+ : mutex_(mutex), owner_died_(mutex_->Lock()) {}
+ ~IPCMutexLocker() {
+ if (__builtin_expect(!owner_died_checked_, false)) {
+ ::aos::Die("nobody checked if the previous owner of mutex %p died", mutex_);
+ }
mutex_->Unlock();
}
- ~MutexUnlocker() {
- mutex_->Lock();
+
+ // Whether or not the previous owner died. If this is not called at least
+ // once, the destructor will ::aos::Die.
+ __attribute__((warn_unused_result)) bool owner_died() {
+ owner_died_checked_ = true;
+ return __builtin_expect(owner_died_, false);
}
private:
- Mutex *mutex_;
- DISALLOW_COPY_AND_ASSIGN(MutexUnlocker);
+ Mutex *const mutex_;
+ const bool owner_died_;
+ bool owner_died_checked_ = false;
+
+ DISALLOW_COPY_AND_ASSIGN(IPCMutexLocker);
};
} // namespace aos
diff --git a/aos/common/mutex_test.cc b/aos/common/mutex_test.cc
index 6ec62d8..bd2365a 100644
--- a/aos/common/mutex_test.cc
+++ b/aos/common/mutex_test.cc
@@ -11,6 +11,8 @@
#include "aos/common/util/death_test_log_implementation.h"
#include "aos/common/util/thread.h"
#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/linux_code/ipc_lib/core_lib.h"
namespace aos {
namespace testing {
@@ -21,33 +23,44 @@
protected:
void SetUp() override {
+ ::aos::common::testing::EnableTestLogging();
SetDieTestMode(true);
}
};
typedef MutexTest MutexDeathTest;
+typedef MutexTest MutexLockerTest;
+typedef MutexTest MutexLockerDeathTest;
+typedef MutexTest IPCMutexLockerTest;
+typedef MutexTest IPCMutexLockerDeathTest;
TEST_F(MutexTest, TryLock) {
- EXPECT_TRUE(test_mutex.TryLock());
- EXPECT_FALSE(test_mutex.TryLock());
+ EXPECT_EQ(Mutex::State::kLocked, test_mutex.TryLock());
+ EXPECT_EQ(Mutex::State::kUnlocked, test_mutex.TryLock());
+
+ test_mutex.Unlock();
}
TEST_F(MutexTest, Lock) {
- test_mutex.Lock();
- EXPECT_FALSE(test_mutex.TryLock());
+ ASSERT_FALSE(test_mutex.Lock());
+ EXPECT_EQ(Mutex::State::kUnlocked, test_mutex.TryLock());
+
+ test_mutex.Unlock();
}
TEST_F(MutexTest, Unlock) {
- test_mutex.Lock();
- EXPECT_FALSE(test_mutex.TryLock());
+ ASSERT_FALSE(test_mutex.Lock());
+ EXPECT_EQ(Mutex::State::kUnlocked, test_mutex.TryLock());
test_mutex.Unlock();
- EXPECT_TRUE(test_mutex.TryLock());
+ EXPECT_EQ(Mutex::State::kLocked, test_mutex.TryLock());
+
+ test_mutex.Unlock();
}
// Sees what happens with multiple unlocks.
TEST_F(MutexDeathTest, RepeatUnlock) {
logging::Init();
- test_mutex.Lock();
+ ASSERT_FALSE(test_mutex.Lock());
test_mutex.Unlock();
EXPECT_DEATH(
{
@@ -68,24 +81,25 @@
".*multiple unlock.*");
}
-TEST_F(MutexTest, MutexLocker) {
- {
- aos::MutexLocker locker(&test_mutex);
- EXPECT_FALSE(test_mutex.TryLock());
- }
- EXPECT_TRUE(test_mutex.TryLock());
+// Sees what happens with multiple locks.
+TEST_F(MutexDeathTest, RepeatLock) {
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ ASSERT_FALSE(test_mutex.Lock());
+ ASSERT_FALSE(test_mutex.Lock());
+ },
+ ".*multiple lock.*");
}
-TEST_F(MutexTest, MutexUnlocker) {
- test_mutex.Lock();
- {
- aos::MutexUnlocker unlocker(&test_mutex);
- // If this fails, then something weird is going on and the next line might
- // hang, so fail immediately.
- ASSERT_TRUE(test_mutex.TryLock());
- test_mutex.Unlock();
- }
- EXPECT_FALSE(test_mutex.TryLock());
+TEST_F(MutexDeathTest, DestroyLocked) {
+ EXPECT_DEATH(
+ {
+ logging::AddImplementation(new util::DeathTestLogImplementation());
+ Mutex new_mutex;
+ ASSERT_FALSE(new_mutex.Lock());
+ },
+ ".*destroying locked mutex.*");
}
namespace {
@@ -118,7 +132,7 @@
TEST_F(MutexTest, ThreadSanitizerContended) {
int counter = 0;
AdderThread threads[2]{
- {&counter, &test_mutex, ::aos::time::Time::InSeconds(1),
+ {&counter, &test_mutex, ::aos::time::Time::InSeconds(0.2),
::aos::time::Time::InSeconds(0)},
{&counter, &test_mutex, ::aos::time::Time::InSeconds(0),
::aos::time::Time::InSeconds(0)}, };
@@ -136,7 +150,7 @@
TEST_F(MutexTest, ThreadSanitizerUncontended) {
int counter = 0;
AdderThread threads[2]{
- {&counter, &test_mutex, ::aos::time::Time::InSeconds(1),
+ {&counter, &test_mutex, ::aos::time::Time::InSeconds(0.2),
::aos::time::Time::InSeconds(0)},
{&counter, &test_mutex, ::aos::time::Time::InSeconds(0),
::aos::time::Time::InSeconds(0)}, };
@@ -149,5 +163,31 @@
EXPECT_EQ(2, counter);
}
+TEST_F(MutexLockerTest, Basic) {
+ {
+ aos::MutexLocker locker(&test_mutex);
+ EXPECT_EQ(Mutex::State::kUnlocked, test_mutex.TryLock());
+ }
+ EXPECT_EQ(Mutex::State::kLocked, test_mutex.TryLock());
+
+ test_mutex.Unlock();
+}
+
+TEST_F(IPCMutexLockerTest, Basic) {
+ {
+ aos::IPCMutexLocker locker(&test_mutex);
+ EXPECT_EQ(Mutex::State::kUnlocked, test_mutex.TryLock());
+ EXPECT_FALSE(locker.owner_died());
+ }
+ EXPECT_EQ(Mutex::State::kLocked, test_mutex.TryLock());
+
+ test_mutex.Unlock();
+}
+
+TEST_F(IPCMutexLockerDeathTest, NoCheckOwnerDied) {
+ EXPECT_DEATH({ aos::IPCMutexLocker locker(&test_mutex); },
+ "nobody checked if the previous owner of mutex [^ ]+ died.*");
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/common/util/util.gyp b/aos/common/util/util.gyp
index 5008dd8..e9a37c9 100644
--- a/aos/common/util/util.gyp
+++ b/aos/common/util/util.gyp
@@ -93,12 +93,6 @@
'sources': [
'thread.cc',
],
- 'dependencies': [
- '<(AOS)/common/common.gyp:mutex',
- ],
- 'export_dependent_settings': [
- '<(AOS)/common/common.gyp:mutex',
- ],
},
{
'target_name': 'trapezoid_profile',