Merge in the rewritten queue code and clean up the Linux IPC code
This change has been around for over a year now, and it's time to merge
it in, because it makes that code much easier to deal with.
diff --git a/aos/linux_code/camera/Buffers.cpp b/aos/linux_code/camera/Buffers.cpp
index e1d22b6..19c1d45 100644
--- a/aos/linux_code/camera/Buffers.cpp
+++ b/aos/linux_code/camera/Buffers.cpp
@@ -61,24 +61,21 @@
}
void Buffers::Release() {
- if (message_ != NULL) {
- queue_->FreeMessage(message_);
- message_ = NULL;
- }
+ message_.reset();
}
-const void *Buffers::GetNext(bool block,
- uint32_t *bytesused, timeval *timestamp, uint32_t *sequence) {
+const void *Buffers::GetNext(bool block, uint32_t *bytesused,
+ timeval *timestamp, uint32_t *sequence) {
Release();
// TODO(brians) make sure the camera reader process hasn't died
do {
if (block) {
- message_ = static_cast<const Message *>(queue_->ReadMessage(
- RawQueue::kPeek | RawQueue::kBlock));
+ message_.reset(static_cast<const Message *>(
+ queue_->ReadMessage(RawQueue::kPeek | RawQueue::kBlock)));
} else {
static int index = 0;
- message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
- RawQueue::kBlock, &index));
+ message_.reset(static_cast<const Message *>(
+ queue_->ReadMessageIndex(RawQueue::kBlock, &index)));
}
} while (block && message_ == NULL);
if (message_ != NULL) {
@@ -132,9 +129,12 @@
return myfds[0];
}
-Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
+Buffers::Buffers()
+ : server_(CreateSocket(connect)),
+ fd_(FetchFD()),
+ queue_(RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1)),
+ message_(queue_) {
MMap();
- queue_ = RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
}
Buffers::~Buffers() {
diff --git a/aos/linux_code/camera/Buffers.h b/aos/linux_code/camera/Buffers.h
index b447468..aedd79f 100644
--- a/aos/linux_code/camera/Buffers.h
+++ b/aos/linux_code/camera/Buffers.h
@@ -8,6 +8,7 @@
#include "aos/linux_code/ipc_lib/queue.h"
#include "aos/common/type_traits.h"
+#include "aos/linux_code/ipc_lib/unique_message_ptr.h"
namespace aos {
namespace camera {
@@ -17,6 +18,7 @@
// It has to do a lot of the same things as all the other ones, but it gets
// the information from different places (some of it gets sent out by it).
friend class Reader;
+
// Not an abstract name so that an existing one can just be unlinked without
// disturbing it if necessary (like with shm_link).
static const std::string kFDServerName;
@@ -50,14 +52,17 @@
uint32_t sequence;
};
static_assert(shm_ok<Message>::value, "it's going through queues");
- // The current one. Sometimes NULL.
- const Message *message_;
- static const std::string kQueueName;
+
// NULL for the Reader one.
- RawQueue *queue_;
+ RawQueue *const queue_;
+ // The current one. Sometimes NULL.
+ unique_message_ptr<const Message> message_;
+
+ static const std::string kQueueName;
// Make the actual mmap calls.
// Called by Buffers() automatically.
void MMap();
+
public:
Buffers();
// Will clean everything up.
@@ -89,4 +94,3 @@
} // namespace aos
#endif
-
diff --git a/aos/linux_code/camera/camera.gyp b/aos/linux_code/camera/camera.gyp
index e4f1e04..fabd52f 100644
--- a/aos/linux_code/camera/camera.gyp
+++ b/aos/linux_code/camera/camera.gyp
@@ -47,9 +47,11 @@
'dependencies': [
'<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
'<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:scoped_message_ptr',
],
'export_dependent_settings': [
'<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:scoped_message_ptr',
],
},
{
diff --git a/aos/linux_code/ipc_lib/aos_sync.c b/aos/linux_code/ipc_lib/aos_sync.c
index 52ebed1..a607001 100644
--- a/aos/linux_code/ipc_lib/aos_sync.c
+++ b/aos/linux_code/ipc_lib/aos_sync.c
@@ -18,13 +18,19 @@
return result;
}
-// this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
-// should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
+// this code is based on something that appears to be based on
+// <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful
+// information
+// should probably use
+// <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it
+// becomes available
// (sys_set_robust_list appears to be the function name)
// <http://locklessinc.com/articles/futex_cheat_sheet/> and
// <http://locklessinc.com/articles/mutex_cv_futex/> are useful
-// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
-// can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009
+// (fairly recent compared to everything else...)
+// can't use PRIVATE futex operations because they use the pid (or something) as
+// part of the hash
//
// Remember that EAGAIN and EWOUDBLOCK are the same! (ie if you get EAGAIN from
// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
@@ -60,7 +66,6 @@
if (c == 1) c = xchg(m, 2);
while (c) {
/* Wait in the kernel */
- //printf("sync here %d\n", __LINE__);
if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
if (signals_fail && errno == EINTR) {
return 1;
@@ -69,7 +74,6 @@
return 2;
}
}
- //printf("sync here %d\n", __LINE__);
c = xchg(m, 2);
}
return 0;
@@ -148,6 +152,8 @@
mutex_unlock(m);
while (1) {
+ // Wait in the kernel iff the value of it doesn't change (ie somebody else
+ // does a wake) from before we unlocked the mutex.
if (sys_futex(c, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
// If it failed for some reason other than somebody else doing a wake
// before we actually made it to sleep.
@@ -161,8 +167,12 @@
abort();
}
}
+ // Relock the mutex now that we're done waiting.
// Simplified mutex_lock that always leaves it
// contended in case anybody else got requeued.
+ // If we got requeued above, this will just succeed the first time because
+ // the person waking us from the above wait (changed to be on the mutex
+ // instead of the condition) will have just set it to 0.
while (xchg(m, 2) != 0) {
if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1) {
// Try again if it was because of a signal or somebody else unlocked it
@@ -180,7 +190,11 @@
}
void condition_signal(mutex *c) {
+ // This will cause anybody else who is in between unlocking the mutex and
+ // going to sleep in the kernel to not go to sleep and return immediately
+ // instead.
__sync_fetch_and_add(c, 1);
+ // Wake at most 1 person who is waiting in the kernel.
if (sys_futex(c, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
" failed with %d: %s\n",
@@ -192,7 +206,9 @@
void condition_broadcast(mutex *c, mutex *m) {
__sync_fetch_and_add(c, 1);
- // Wake 1 waiter and requeue the rest.
+ // Wake at most 1 waiter and requeue the rest.
+ // Everybody else is going to have to wait for the 1st person to take the
+ // mutex anyways.
if (sys_futex_requeue(c, FUTEX_REQUEUE, 1, INT_MAX, m) == -1) {
fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
" failed with %d: %s\n",
diff --git a/aos/linux_code/ipc_lib/aos_sync.h b/aos/linux_code/ipc_lib/aos_sync.h
index 7a81ca3..0d55246 100644
--- a/aos/linux_code/ipc_lib/aos_sync.h
+++ b/aos/linux_code/ipc_lib/aos_sync.h
@@ -48,6 +48,8 @@
// They are designed for signalling when something happens (possibly to
// multiple listeners). A mutex manipulated with them can only be set or unset.
//
+// Another name for this kind of synchronization mechanism is a "notification".
+//
// They are different from the condition_ functions in that they do NOT work
// correctly as standard condition variables. While it is possible to keep
// track of the "condition" using the value part of the futex_* functions, the
diff --git a/aos/linux_code/ipc_lib/condition.cc b/aos/linux_code/ipc_lib/condition.cc
index b764026..0ba4145 100644
--- a/aos/linux_code/ipc_lib/condition.cc
+++ b/aos/linux_code/ipc_lib/condition.cc
@@ -6,8 +6,8 @@
namespace aos {
-static_assert(shm_ok<Condition>::value, "Condition should work"
- " in shared memory");
+static_assert(shm_ok<Condition>::value,
+ "Condition should work in shared memory");
Condition::Condition(Mutex *m) : impl_(), m_(m) {}
diff --git a/aos/linux_code/ipc_lib/core_lib.c b/aos/linux_code/ipc_lib/core_lib.c
index cc1ccbb..2bd6c25 100644
--- a/aos/linux_code/ipc_lib/core_lib.c
+++ b/aos/linux_code/ipc_lib/core_lib.c
@@ -5,7 +5,7 @@
#include "aos/linux_code/ipc_lib/shared_mem.h"
-static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
+static uint8_t aos_8max(uint8_t l, uint8_t r) {
return (l > r) ? l : r;
}
void *shm_malloc_aligned(size_t length, uint8_t alignment) {
diff --git a/aos/linux_code/ipc_lib/ipc_lib.gyp b/aos/linux_code/ipc_lib/ipc_lib.gyp
index df7b658..08e8df8 100644
--- a/aos/linux_code/ipc_lib/ipc_lib.gyp
+++ b/aos/linux_code/ipc_lib/ipc_lib.gyp
@@ -60,6 +60,7 @@
'core_lib',
'<(AOS)/common/common.gyp:queue_testutils',
'<(AOS)/common/common.gyp:time',
+ '<(AOS)/common/common.gyp:die',
],
},
{
@@ -76,6 +77,22 @@
'core_lib',
'<(AOS)/common/common.gyp:die',
],
+ 'variables': {
+ 'is_special_test': 1,
+ },
+ },
+ {
+ 'target_name': 'scoped_message_ptr',
+ 'type': 'static_library',
+ 'sources': [
+ #'scoped_message_ptr.h',
+ ],
+ 'dependencies': [
+ 'queue',
+ ],
+ 'export_dependent_settings': [
+ 'queue',
+ ],
},
],
}
diff --git a/aos/linux_code/ipc_lib/ipc_stress_test.cc b/aos/linux_code/ipc_lib/ipc_stress_test.cc
index 1b55e82..4a9d93f 100644
--- a/aos/linux_code/ipc_lib/ipc_stress_test.cc
+++ b/aos/linux_code/ipc_lib/ipc_stress_test.cc
@@ -7,7 +7,6 @@
#include <libgen.h>
#include <assert.h>
-#include <vector>
#include <string>
#include "aos/common/time.h"
@@ -22,6 +21,10 @@
// stderr output from each test run and only prints it out (not interleaved with
// the output from any other run) if the test fails.
//
+// They have to be run in separate processes because (in addition to various
+// parts of our code not being thread-safe...) gtest does not like multiple
+// threads.
+//
// It's written in C++ for performance. We need actual OS-level parallelism for
// this to work, which means that Ruby's out because it doesn't have good
// support for doing that. My Python implementation ended up pretty heavily disk
@@ -33,14 +36,14 @@
// arguments to pass to it.
// Using --gtest_filter is a bad idea because it seems to result in a lot of
// swapping which causes everything to be disk-bound (at least for me).
-static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+static const char * kTests[][] = {
{"queue_test"},
{"condition_test"},
{"mutex_test"},
{"raw_queue_test"},
};
// These arguments get inserted before any per-test arguments.
-static const ::std::vector< ::std::string> kDefaultArgs = {
+static const char *kDefaultArgs[] = {
"--gtest_repeat=30",
"--gtest_shuffle",
};
@@ -75,7 +78,7 @@
// Gets called after each child forks to run a test.
void __attribute__((noreturn)) DoRunTest(
- Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+ Shared *shared, const ::std::array<const char *> &test, int pipes[2]) {
if (close(pipes[0]) == -1) {
Die("close(%d) of read end of pipe failed with %d: %s\n",
pipes[0], errno, strerror(errno));
@@ -95,6 +98,7 @@
size_t size = test.size();
size_t default_size = kDefaultArgs.size();
+ // There's no chance to free this because we either exec or Die.
const char **args = new const char *[size + default_size + 1];
// The actual executable to run.
::std::string executable;
@@ -120,7 +124,9 @@
void DoRun(Shared *shared) {
int iterations = 0;
// An iterator pointing to a random one of the tests.
- auto test = kTests.begin() + (getpid() % kTests.size());
+ // We randomize based on PID because otherwise they all end up running the
+ // same test at the same time for the whole test.
+ const char *(*test)[] = &kTests[getpid() % kTestsLength];
int pipes[2];
while (time::Time::Now() < shared->stop_time) {
if (pipe(pipes) == -1) {
@@ -212,7 +218,10 @@
new (shared) Shared(time::Time::Now() + kTestTime);
char *temp = strdup(argv[0]);
- shared->path = strdup(dirname(temp));
+ if (asprintf(const_cast<char **>(&shared->path),
+ "%s/../tests", dirname(temp)) == -1) {
+ Die("asprintf failed with %d: %s\n", errno, strerror(errno));
+ }
free(temp);
for (int i = 0; i < kTesters; ++i) {
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index e67f22c..8a2f15b 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -38,6 +38,7 @@
struct RawQueue::MessageHeader {
int ref_count;
int index; // in pool_
+ // Gets the message header immediately preceding msg.
static MessageHeader *Get(const void *msg) {
return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
@@ -50,8 +51,8 @@
memcpy(this, &temp, sizeof(*this));
}
};
-static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
- " is to stick it in shared memory");
+static_assert(shm_ok<RawQueue::MessageHeader>::value,
+ "the whole point is to stick it in shared memory");
struct RawQueue::ReadData {
bool writable_start;
@@ -73,7 +74,7 @@
}
RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
- : readable_(&data_lock_), writable_(&data_lock_) {
+ : readable_(&data_lock_), writable_(&data_lock_) {
const size_t name_size = strlen(name) + 1;
char *temp = static_cast<char *>(shm_malloc(name_size));
memcpy(temp, name, name_size);
diff --git a/aos/linux_code/ipc_lib/queue_test.cc b/aos/linux_code/ipc_lib/queue_test.cc
index c87bd7b..0a982cc 100644
--- a/aos/linux_code/ipc_lib/queue_test.cc
+++ b/aos/linux_code/ipc_lib/queue_test.cc
@@ -15,6 +15,7 @@
#include "aos/common/queue_testutils.h"
#include "aos/common/time.h"
#include "aos/common/logging/logging.h"
+#include "aos/common/die.h"
using ::testing::AssertionResult;
using ::testing::AssertionSuccess;
@@ -136,6 +137,9 @@
void SetUp() override {
::testing::Test::SetUp();
+
+ SetDieTestMode(true);
+
fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
static bool registered = false;
if (!registered) {