glibusb wants a nice condition variable class
diff --git a/aos/atom_code/atom_code.gyp b/aos/atom_code/atom_code.gyp
index 37caccd..51dcaec 100644
--- a/aos/atom_code/atom_code.gyp
+++ b/aos/atom_code/atom_code.gyp
@@ -7,7 +7,7 @@
'<(AOS)/atom_code/init.cc',
],
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:shared_mem',
'<(AOS)/common/common.gyp:die',
'<(AOS)/build/aos.gyp:logging',
],
diff --git a/aos/atom_code/camera/Buffers.cpp b/aos/atom_code/camera/Buffers.cpp
index f580f89..9cd1a7d 100644
--- a/aos/atom_code/camera/Buffers.cpp
+++ b/aos/atom_code/camera/Buffers.cpp
@@ -15,7 +15,6 @@
};
const std::string Buffers::kFDServerName("/tmp/aos_fd_server");
const std::string Buffers::kQueueName("CameraBufferQueue");
-const aos_type_sig Buffers::kSignature{sizeof(Message), 971, 1};
int Buffers::CreateSocket(int (*bind_connect)(int, const sockaddr *, socklen_t)) {
union af_unix_sockaddr {
@@ -66,7 +65,7 @@
void Buffers::Release() {
if (message_ != NULL) {
- aos_queue_free_msg(queue_, message_);
+ queue_->FreeMessage(message_);
message_ = NULL;
}
}
@@ -77,11 +76,12 @@
// TODO(brians) make sure the camera reader process hasn't died
do {
if (block) {
- message_ = static_cast<const Message *>(aos_queue_read_msg(queue_, PEEK | BLOCK));
+ message_ = static_cast<const Message *>(queue_->ReadMessage(
+ RawQueue::kPeek | RawQueue::kBlock));
} else {
static int index = 0;
- message_ = static_cast<const Message *>(aos_queue_read_msg_index(queue_, BLOCK,
- &index));
+ message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
+ RawQueue::kBlock, &index));
}
} while (block && message_ == NULL);
if (message_ != NULL) {
@@ -137,7 +137,7 @@
}
Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
MMap();
- queue_ = aos_fetch_queue(kQueueName.c_str(), &kSignature);
+ queue_ = RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
}
Buffers::~Buffers() {
@@ -157,6 +157,5 @@
}
}
-} // namespace camera
-} // namespace aos
-
+} // namespace camera
+} // namespace aos
diff --git a/aos/atom_code/camera/Buffers.h b/aos/atom_code/camera/Buffers.h
index 6b54188..080e9eb 100644
--- a/aos/atom_code/camera/Buffers.h
+++ b/aos/atom_code/camera/Buffers.h
@@ -53,9 +53,8 @@
// The current one. Sometimes NULL.
const Message *message_;
static const std::string kQueueName;
- static const aos_type_sig kSignature;
// NULL for the Reader one.
- aos_queue *queue_;
+ RawQueue *queue_;
// Make the actual mmap calls.
// Called by Buffers() automatically.
void MMap();
diff --git a/aos/atom_code/camera/Reader.cpp b/aos/atom_code/camera/Reader.cpp
index 5f30cfe..95f6128 100644
--- a/aos/atom_code/camera/Reader.cpp
+++ b/aos/atom_code/camera/Reader.cpp
@@ -17,6 +17,7 @@
#include "aos/atom_code/camera/V4L2.h"
#include "aos/atom_code/camera/Buffers.h"
#include "aos/common/logging/logging.h"
+#include "aos/atom_code/ipc_lib/queue.h"
#define CLEAR(x) memset(&(x), 0, sizeof(x))
@@ -31,8 +32,7 @@
// the bound socket listening for fd requests
int server_fd_;
- static const aos_type_sig kRecycleSignature;
- aos_queue *queue_, *recycle_queue_;
+ RawQueue *queue_, *recycle_queue_;
// the number of buffers currently queued in v4l2
uint32_t queued_;
public:
@@ -52,10 +52,11 @@
dev_name, errno, strerror(errno));
}
- queue_ = aos_fetch_queue_recycle(Buffers::kQueueName.c_str(), &Buffers::kSignature,
- &kRecycleSignature, &recycle_queue_);
+ queue_ = RawQueue::Fetch(Buffers::kQueueName.c_str(),
+ sizeof(Buffers::Message), 971, 1,
+ 1, Buffers::kNumBuffers, &recycle_queue_);
// read off any existing recycled messages
- while (aos_queue_read_msg(recycle_queue_, NON_BLOCK) != NULL);
+ while (recycle_queue_->ReadMessage(RawQueue::kNonBlock) != NULL);
queued_ = 0;
InitServer();
@@ -140,10 +141,11 @@
read = static_cast<const Buffers::Message *>(
// we block waiting for one if we can't dequeue one without leaving
// the driver <= 2 (to be safe)
- aos_queue_read_msg(recycle_queue_, (queued_ <= 2) ? BLOCK : NON_BLOCK));
+ recycle_queue_->ReadMessage((queued_ <= 2) ?
+ RawQueue::kBlock : RawQueue::kNonBlock));
if (read != NULL) {
buf.index = read->index;
- aos_queue_free_msg(recycle_queue_, read);
+ recycle_queue_->FreeMessage(read);
QueueBuffer(&buf);
}
} while (read != NULL);
@@ -163,7 +165,7 @@
}
Buffers::Message *const msg = static_cast<Buffers::Message *>(
- aos_queue_get_msg(queue_));
+ queue_->GetMessage());
if (msg == NULL) {
LOG(WARNING,
"couldn't get a message to send buf #%" PRIu32 " from queue %p."
@@ -175,7 +177,7 @@
msg->bytesused = buf.bytesused;
memcpy(&msg->timestamp, &buf.timestamp, sizeof(msg->timestamp));
msg->sequence = buf.sequence;
- if (aos_queue_write_msg_free(queue_, msg, OVERRIDE) == -1) {
+ if (!queue_->WriteMessage(msg, RawQueue::kOverride)) {
LOG(WARNING,
"sending message %p with buf #%" PRIu32 " to queue %p failed."
" re-queueing now\n", msg, buf.index, queue_);
@@ -405,8 +407,6 @@
}
};
const char *const Reader::dev_name = "/dev/video0";
-const aos_type_sig Reader::kRecycleSignature{
- sizeof(Buffers::Message), 1, Buffers::kNumBuffers};
} // namespace camera
} // namespace aos
diff --git a/aos/atom_code/camera/camera.gyp b/aos/atom_code/camera/camera.gyp
index 9931806..f1743be 100644
--- a/aos/atom_code/camera/camera.gyp
+++ b/aos/atom_code/camera/camera.gyp
@@ -45,11 +45,11 @@
'Buffers.cpp',
],
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
'<(AOS)/build/aos.gyp:logging',
],
'export_dependent_settings': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
},
{
@@ -74,6 +74,7 @@
'buffers',
'<(AOS)/atom_code/atom_code.gyp:init',
'<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
},
],
diff --git a/aos/atom_code/core/BinaryLogReader.cpp b/aos/atom_code/core/BinaryLogReader.cpp
index 9451fd1..3e96cc5 100644
--- a/aos/atom_code/core/BinaryLogReader.cpp
+++ b/aos/atom_code/core/BinaryLogReader.cpp
@@ -97,7 +97,7 @@
output->sequence = msg->sequence;
memcpy(output_strings, msg->name, name_size);
memcpy(output_strings + name_size, msg->message, message_size);
- condition_set(&output->marker);
+ futex_set(&output->marker);
logging::atom::Free(msg);
}
diff --git a/aos/atom_code/core/LogFileCommon.h b/aos/atom_code/core/LogFileCommon.h
index afb86b0..3798b06 100644
--- a/aos/atom_code/core/LogFileCommon.h
+++ b/aos/atom_code/core/LogFileCommon.h
@@ -28,13 +28,13 @@
// A lot of the fields don't have comments because they're the same as the
// identically named fields in LogMessage.
struct __attribute__((aligned)) LogFileMessageHeader {
- // gets condition_set once this one has been written
+ // gets futex_set once this one has been written
// for readers keeping up with a live writer
//
// gets initialized to 0 by ftruncate
//
// there will be something here after the last log on a "page" set to 2
- // (by the condition_set) to indicate that the next log is on the next page
+ // (by the futex_set) to indicate that the next log is on the next page
mutex marker;
static_assert(sizeof(marker) == 4, "mutex changed size!");
log_level level;
@@ -132,8 +132,8 @@
sizeof(mutex) > kPageSize) {
char *const temp = current_;
MapNextPage();
- if (condition_set_value(reinterpret_cast<mutex *>(&temp[position_]), 2) == -1) {
- fprintf(stderr, "LogFileCommon: condition_set_value(%p, 2) failed with %d: %s."
+ if (futex_set_value(reinterpret_cast<mutex *>(&temp[position_]), 2) == -1) {
+ fprintf(stderr, "LogFileCommon: futex_set_value(%p, 2) failed with %d: %s."
" readers will hang\n", &temp[position_], errno, strerror(errno));
}
Unmap(temp);
@@ -152,7 +152,7 @@
do {
r = reinterpret_cast<LogFileMessageHeader *>(&current_[position_]);
if (wait) {
- if (condition_wait(&r->marker) != 0) continue;
+ if (futex_wait(&r->marker) != 0) continue;
}
if (r->marker == 2) {
Unmap(current_);
diff --git a/aos/atom_code/core/LogStreamer.cpp b/aos/atom_code/core/LogStreamer.cpp
index d3d4928..20c5987 100644
--- a/aos/atom_code/core/LogStreamer.cpp
+++ b/aos/atom_code/core/LogStreamer.cpp
@@ -31,7 +31,7 @@
int index = 0;
while (true) {
- const LogMessage *const msg = ReadNext(BLOCK, &index);
+ const LogMessage *const msg = ReadNext(RawQueue::kBlock, &index);
if (msg == NULL) continue;
internal::PrintMessage(stdout, *msg);
diff --git a/aos/atom_code/core/core.gyp b/aos/atom_code/core/core.gyp
index 74b3232..1aaebfb 100644
--- a/aos/atom_code/core/core.gyp
+++ b/aos/atom_code/core/core.gyp
@@ -32,6 +32,7 @@
'<(AOS)/build/aos.gyp:logging',
'<(AOS)/atom_code/atom_code.gyp:init',
'<(AOS)/common/common.gyp:time',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
},
{
diff --git a/aos/atom_code/ipc_lib/aos_sync.c b/aos/atom_code/ipc_lib/aos_sync.c
index a9a4779..1980a4d 100644
--- a/aos/atom_code/ipc_lib/aos_sync.c
+++ b/aos/atom_code/ipc_lib/aos_sync.c
@@ -1,135 +1,197 @@
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
#include <stdio.h>
#include <linux/futex.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <errno.h>
-#include "aos_sync.h"
-#include "cmpxchg.h"
#include <stdint.h>
#include <limits.h>
#include <string.h>
+#include <inttypes.h>
+
+#include "cmpxchg.h"
// this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
// should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
-// <http://locklessinc.com/articles/futex_cheat_sheet/> and <http://locklessinc.com/articles/mutex_cv_futex/> are useful
+// (sys_set_robust_list appears to be the function name)
+// <http://locklessinc.com/articles/futex_cheat_sheet/> and
+// <http://locklessinc.com/articles/mutex_cv_futex/> are useful
// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
// can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
//
+// Remember that EAGAIN and EWOULDBLOCK are the same! (ie if you get EAGAIN from
+// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+//
// Values for a mutex:
// 0 = unlocked
// 1 = locked, not contended
// 2 = locked, probably contended
-// Values for a condition:
+// Values for a "futex":
// 0 = unset
// 1 = set
-static inline int sys_futex(mutex *addr1, int op, int val1, const struct timespec *timeout,
- void *addr2, int val3) {
- return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+static inline int sys_futex(mutex *addr1, int op, int val1,
+ const struct timespec *timeout, void *addr2, int val3) {
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+static inline int sys_futex_requeue(mutex *addr1, int op, int num_wake,
+ int num_requeue, mutex *m) {
+ return syscall(SYS_futex, addr1, op, num_wake, num_requeue, m);
+}
+static inline int sys_futex_op(mutex *addr1, int op, int num_waiters1,
+ int num_waiters2, mutex *addr2, int op_args_etc) {
+ return syscall(SYS_futex, addr1, op, num_waiters1,
+ num_waiters2, addr2, op_args_etc);
}
static inline int mutex_get(mutex *m, uint8_t signals_fail, const
- struct timespec *timeout) {
- int c;
- c = cmpxchg(m, 0, 1);
- if (!c) return 0;
- /* The lock is now contended */
- if (c == 1) c = xchg(m, 2);
- while (c) {
- /* Wait in the kernel */
- //printf("sync here %d\n", __LINE__);
- if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
- if (signals_fail && errno == EINTR) {
- return 1;
- }
- if (timeout != NULL && errno == ETIMEDOUT) {
- return 2;
- }
- }
- //printf("sync here %d\n", __LINE__);
- c = xchg(m, 2);
- }
- return 0;
+ struct timespec *timeout) {
+ int c;
+ c = cmpxchg(m, 0, 1);
+ if (!c) return 0;
+ /* The lock is now contended */
+ if (c == 1) c = xchg(m, 2);
+ while (c) {
+ /* Wait in the kernel */
+ //printf("sync here %d\n", __LINE__);
+ if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
+ if (signals_fail && errno == EINTR) {
+ return 1;
+ }
+ if (timeout != NULL && errno == ETIMEDOUT) {
+ return 2;
+ }
+ }
+ //printf("sync here %d\n", __LINE__);
+ c = xchg(m, 2);
+ }
+ return 0;
}
int mutex_lock(mutex *m) {
- return mutex_get(m, 1, NULL);
+ return mutex_get(m, 1, NULL);
}
int mutex_lock_timeout(mutex *m, const struct timespec *timeout) {
- return mutex_get(m, 1, timeout);
+ return mutex_get(m, 1, timeout);
}
int mutex_grab(mutex *m) {
- return mutex_get(m, 0, NULL);
+ return mutex_get(m, 0, NULL);
}
-int mutex_unlock(mutex *m) {
- /* Unlock, and if not contended then exit. */
- //printf("mutex_unlock(%p) => %d \n",m,*m);
- switch (xchg(m, 0)) {
- case 0:
- fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
- printf("see stderr\n");
- abort();
- case 1:
- //printf("mutex_unlock return(%p) => %d \n",m,*m);
- return 0;
- case 2:
- if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
- fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
- m, errno, strerror(errno));
- return -1;
- } else {
- return 0;
- }
- default:
- fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
- m);
- printf("see stderr\n");
- abort();
- }
+void mutex_unlock(mutex *m) {
+ /* Unlock, and if not contended then exit. */
+ //printf("mutex_unlock(%p) => %d \n",m,*m);
+ switch (xchg(m, 0)) {
+ case 0:
+ fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
+ printf("see stderr\n");
+ abort();
+ case 1:
+ //printf("mutex_unlock return(%p) => %d \n",m,*m);
+ break;
+ case 2:
+ if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+ fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
+ m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ } else {
+ break;
+ }
+ default:
+ fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
+ m);
+ printf("see stderr\n");
+ abort();
+ }
}
int mutex_trylock(mutex *m) {
- /* Try to take the lock, if is currently unlocked */
- unsigned c = cmpxchg(m, 0, 1);
- if (!c) return 0;
- return 1;
+ /* Try to take the lock, if is currently unlocked */
+ unsigned c = cmpxchg(m, 0, 1);
+ if (!c) return 0;
+ return 1;
}
-int condition_wait(mutex *m) {
- if (*m) {
- return 0;
- }
- if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) == -1) {
- if (errno == EINTR) {
- return 1;
- } else if (errno != EWOULDBLOCK) {
- return -1;
- }
- }
- return 0;
+int futex_wait(mutex *m) {
+ if (*m) {
+ return 0;
+ }
+ if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) == -1) {
+ if (errno == EINTR) {
+ return 1;
+ } else if (errno != EWOULDBLOCK) {
+ return -1;
+ }
+ }
+ return 0;
}
-int condition_wait_force(mutex *m) {
- while (1) {
- if (sys_futex(m, FUTEX_WAIT, *m, NULL, NULL, 0) == -1) {
- if (errno != EWOULDBLOCK) { // if it was an actual problem
- if (errno == EINTR) {
- return 1;
- } else {
- return -1;
- }
- }
- } else {
- return 0;
- }
- }
+int futex_set_value(mutex *m, mutex value) {
+ xchg(m, value);
+ return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
-inline int condition_set_value(mutex *m, mutex value) {
- xchg(m, value);
- return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+int futex_set(mutex *m) {
+ return futex_set_value(m, 1);
}
-int condition_set(mutex *m) {
- return condition_set_value(m, 1);
-}
-int condition_unset(mutex *m) {
- return !xchg(m, 0);
+int futex_unset(mutex *m) {
+ return !xchg(m, 0);
}
+void condition_wait(mutex *c, mutex *m) {
+ const mutex wait_start = *c;
+
+ mutex_unlock(m);
+
+ while (1) {
+ if (sys_futex(c, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
+ // If it failed for some reason other than somebody else doing a wake
+ // before we actually made it to sleep.
+ if (__builtin_expect(*c == wait_start, 0)) {
+ // Try again if it was because of a signal.
+ if (errno == EINTR) continue;
+ fprintf(stderr, "FUTEX_WAIT(%p, %"PRIu32", NULL, NULL, 0) failed"
+ " with %d: %s\n",
+ c, wait_start, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+ }
+ // Simplified mutex_lock that always leaves it
+ // contended in case anybody else got requeued.
+ while (xchg(m, 2) != 0) {
+ if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1) {
+ // Try again if it was because of a signal or somebody else unlocked it
+ // before we went to sleep.
+ if (errno == EINTR || errno == EWOULDBLOCK) continue;
+ fprintf(stderr, "sync: FUTEX_WAIT(%p, 2, NULL, NULL, 0)"
+ " failed with %d: %s\n",
+ m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+ }
+ return;
+ }
+}
+
+void condition_signal(mutex *c) {
+ __sync_fetch_and_add(c, 1);
+ if (sys_futex(c, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+ fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
+ " failed with %d: %s\n",
+ c, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+}
+
+void condition_broadcast(mutex *c, mutex *m) {
+ __sync_fetch_and_add(c, 1);
+ // Wake 1 waiter and requeue the rest.
+ if (sys_futex_requeue(c, FUTEX_REQUEUE, 1, INT_MAX, m) == -1) {
+ fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
+ " failed with %d: %s\n",
+ c, m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+}
diff --git a/aos/atom_code/ipc_lib/aos_sync.h b/aos/atom_code/ipc_lib/aos_sync.h
index 3c66264..6d4bf55 100644
--- a/aos/atom_code/ipc_lib/aos_sync.h
+++ b/aos/atom_code/ipc_lib/aos_sync.h
@@ -1,5 +1,5 @@
-#ifndef AOS_IPC_LIB_SYNC_H_
-#define AOS_IPC_LIB_SYNC_H_
+#ifndef AOS_ATOM_CODE_IPC_LIB_SYNC_H_
+#define AOS_ATOM_CODE_IPC_LIB_SYNC_H_
#include <stdlib.h>
#include <signal.h>
@@ -14,50 +14,83 @@
// and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
// list the interesting ones
-// Have to align structs containing it to to sizeof(int).
+// Have to align structs containing it to sizeof(int).
// Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
-// Valid initial values for use with condition_ functions are 0 (unset) and 1 (set).
+// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
+// No initialization is necessary for use as c with the condition_ functions.
// The value should not be changed after multiple processes have started
// accessing an instance except through the functions declared in this file.
typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
// All return -1 for other error (which will be in errno from futex(2)).
+//
+// There is no priority inversion protection.
+// TODO(brians) look at using
+// <http://www.kernel.org/doc/Documentation/pi-futex.txt>
// Returns 1 if interrupted by a signal.
+//
+// One of the highest priority processes blocked on a given mutex will be the
+// one to lock it when it is unlocked.
int mutex_lock(mutex *m) __attribute__((warn_unused_result));
// Returns 2 if it timed out or 1 if interrupted by a signal.
int mutex_lock_timeout(mutex *m, const struct timespec *timeout)
__attribute__((warn_unused_result));
// Ignores signals. Can not fail.
int mutex_grab(mutex *m);
-// Returns 1 for multiple unlocking and -1 if something bad happened and
-// whoever's waiting didn't get woken up.
-int mutex_unlock(mutex *m);
+// abort(3)s for multiple unlocking or if waking a waiter fails.
+void mutex_unlock(mutex *m);
// Returns 0 when successful in locking the mutex and 1 if somebody else has it
// locked.
int mutex_trylock(mutex *m) __attribute__((warn_unused_result));
-// The condition_ functions are similar to the mutex_ ones but different.
+// The futex_ functions are similar to the mutex_ ones but different.
// They are designed for signalling when something happens (possibly to
// multiple listeners). A mutex manipulated with them can only be set or unset.
+//
+// They are different from the condition_ functions in that they do NOT work
+// correctly as standard condition variables. While it is possible to keep
+// track of the "condition" using the value part of the futex_* functions, the
+// obvious implementation has basically the same race condition that condition
+// variables are designed to prevent between somebody else grabbing the mutex
+// and changing whether it's set or not and the futex_ function changing the
+// futex's value.
-// Wait for the condition to be set. Will return immediately if it's already set.
-// Returns 0 if successful or it was already set, 1 if interrupted by a signal, or -1.
-int condition_wait(mutex *m) __attribute__((warn_unused_result));
-// Will wait for the next condition_set, even if the condition is already set.
-// Returns 0 if successful, 1 if interrupted by a signal, or -1.
-int condition_wait_force(mutex *m) __attribute__((warn_unused_result));
-// Set the condition and wake up anybody waiting on it.
+// Wait for the futex to be set. Will return immediately if it's already set.
+// Returns 0 if successful or it was already set, 1 if interrupted by a signal,
+// or -1.
+int futex_wait(mutex *m) __attribute__((warn_unused_result));
+// Set the futex and wake up anybody waiting on it.
// Returns the number that were woken or -1.
-int condition_set(mutex *m);
+//
+// This will always wake up all waiters at the same time and set the value to 1.
+int futex_set(mutex *m);
// Same as above except lets something other than 1 be used as the final value.
-int condition_set_value(mutex *m, mutex value);
-// Unsets the condition.
+int futex_set_value(mutex *m, mutex value);
+// Unsets the futex (sets the value to 0).
// Returns 0 if it was set before and 1 if it wasn't.
-int condition_unset(mutex *m);
+// Can not fail.
+int futex_unset(mutex *m);
+
+// The condition_ functions implement condition variable support. The API is
+// similar to the pthreads api and works the same way. The same m argument must
+// be passed in for all calls to all of the condition_ functions with a given c.
+
+// Wait for the condition variable to be signalled. m will be unlocked
+// atomically with actually starting to wait. m is guaranteed to be locked when
+// this function returns.
+// NOTE: The relocking of m is not atomic with stopping the actual wait and
+// other process(es) may lock (+unlock) the mutex first.
+void condition_wait(mutex *c, mutex *m);
+// If any other processes are condition_waiting on c, wake 1 of them. Does not
+// require m to be locked.
+void condition_signal(mutex *c);
+// Wakes all processes that are condition_waiting on c. Does not require m to be
+// locked.
+void condition_broadcast(mutex *c, mutex *m);
#ifdef __cplusplus
}
#endif // __cplusplus
-#endif
+#endif // AOS_ATOM_CODE_IPC_LIB_SYNC_H_
diff --git a/aos/atom_code/ipc_lib/binheap.c b/aos/atom_code/ipc_lib/binheap.c
deleted file mode 100644
index 8eb024d..0000000
--- a/aos/atom_code/ipc_lib/binheap.c
+++ /dev/null
@@ -1,125 +0,0 @@
-#include "binheap.h"
-#include <stdlib.h>
-#include <stdio.h>
-
-#ifndef TESTING_ASSERT
-#define TESTING_ASSERT(...)
-#endif
-#define Error(x) TESTING_ASSERT(0, x)
-
-#define MinData (0)
-
-void Initialize( int Elements, PriorityQueue H )
-{
- H->Capacity = Elements - 1;
- H->Size = 0;
- H->Elements[ 0 ] = MinData;
-}
-
-int Insert( ElementType X, PriorityQueue H )
-{
- int i;
-
- if( IsFull( H ) )
- {
- return -1;
- }
-
- for( i = ++H->Size; H->Elements[ i / 2 ] > X; i /= 2 )
- H->Elements[ i ] = H->Elements[ i / 2 ];
- H->Elements[ i ] = X;
- return 0;
-}
-
-void Remove( ElementType X, PriorityQueue H )
-{
- int i, Child, removed = 0;
- ElementType LastElement;
-
- for ( i = 1; i <= H->Size; ++i )
- {
- if( H->Elements[ i ] == X )
- {
- removed = i;
- break;
- }
- }
- if( removed == 0 )
- {
- fprintf(stderr, "could not find element %d to remove. not removing any\n", X);
- return;
- }
-
- LastElement = H->Elements[ H->Size-- ];
-
- for( i = removed; i * 2 <= H->Size; i = Child )
- {
- /* Find smaller child */
- Child = i * 2;
- if( Child != H->Size && H->Elements[ Child + 1 ]
- < H->Elements[ Child ] )
- Child++;
-
- /* Percolate one level */
- if( LastElement > H->Elements[ Child ] )
- H->Elements[ i ] = H->Elements[ Child ];
- else
- break;
- }
- H->Elements[ i ] = LastElement;
-}
-
-ElementType DeleteMin( PriorityQueue H )
-{
- int i, Child;
- ElementType MinElement, LastElement;
-
- if( IsEmpty( H ) )
- {
- Error( "Priority queue is empty" );
- return H->Elements[ 0 ];
- }
- MinElement = H->Elements[ 1 ];
- LastElement = H->Elements[ H->Size-- ];
-
- for( i = 1; i * 2 <= H->Size; i = Child )
- {
- /* Find smaller child */
- Child = i * 2;
- if( Child != H->Size && H->Elements[ Child + 1 ]
- < H->Elements[ Child ] )
- Child++;
-
- /* Percolate one level */
- if( LastElement > H->Elements[ Child ] )
- H->Elements[ i ] = H->Elements[ Child ];
- else
- break;
- }
- H->Elements[ i ] = LastElement;
- return MinElement;
-}
-
-ElementType GetMin( PriorityQueue H )
-{
- if( !IsEmpty( H ) )
- return H->Elements[ 1 ];
- Error( "Priority Queue is Empty" );
- return H->Elements[ 0 ];
-}
-
-int IsEmpty( PriorityQueue H )
-{
- return H->Size == 0;
-}
-
-int IsFull( PriorityQueue H )
-{
- return H->Size == H->Capacity;
-}
-
-int GetSize( PriorityQueue H )
-{
- return H->Size;
-}
-
diff --git a/aos/atom_code/ipc_lib/binheap.h b/aos/atom_code/ipc_lib/binheap.h
deleted file mode 100644
index 8c26f9f..0000000
--- a/aos/atom_code/ipc_lib/binheap.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#ifndef _BinHeap_H
-#define _BinHeap_H
-
-#include <stdint.h>
-
-typedef uint8_t ElementType;
-struct HeapStruct;
-typedef struct HeapStruct *PriorityQueue;
-
-struct HeapStruct
-{
- int Capacity;
- int Size;
- ElementType *Elements;
-};
-
-// Elements is the number allocated at H->Elements
-void Initialize( int Elements, PriorityQueue H );
-// 0 if successful, -1 if full
-int Insert( ElementType X, PriorityQueue H );
-void Remove( ElementType X, PriorityQueue H );
-ElementType DeleteMin( PriorityQueue H );
-ElementType GetMin( PriorityQueue H );
-int IsEmpty( PriorityQueue H );
-int IsFull( PriorityQueue H );
-int GetSize( PriorityQueue H );
-
-#endif
-
diff --git a/aos/atom_code/ipc_lib/binheap_test.cpp b/aos/atom_code/ipc_lib/binheap_test.cpp
deleted file mode 100644
index 62eecd4..0000000
--- a/aos/atom_code/ipc_lib/binheap_test.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-extern "C" {
-#include "binheap.h"
-}
-
-#include <gtest/gtest.h>
-
-class BinHeapTest : public testing::Test{
- protected:
- static const int TEST_ELEMENTS = 57;
- PriorityQueue queue;
- virtual void SetUp(){
- queue = new HeapStruct();
- queue->Elements = new uint8_t[TEST_ELEMENTS];
- Initialize(TEST_ELEMENTS, queue);
- }
- virtual void TearDown(){
- delete[] queue->Elements;
- delete queue;
- }
-};
-
-std::ostream& operator<< (std::ostream& o, uint8_t c){
- return o<<(int)c;
-}
-
-testing::AssertionResult Contains(PriorityQueue queue, const uint8_t expected[], size_t length){
- for(size_t i = 0; i < length; ++i){
- //printf("expected[%d]=%d\n", i, expected[i]);
- if(DeleteMin(queue) != expected[i]){
- return testing::AssertionFailure() << "queue[" << i << "] != " << expected[i];
- }
- }
- if(!IsEmpty(queue))
- return testing::AssertionFailure() << "queue is longer than " << length;
- return ::testing::AssertionSuccess();
-}
-
-TEST_F(BinHeapTest, SingleElement){
- Insert(87, queue);
- EXPECT_EQ(87, DeleteMin(queue));
- EXPECT_TRUE(IsEmpty(queue));
-}
-TEST_F(BinHeapTest, MultipleElements){
- Insert(54, queue);
- Insert(1, queue);
- Insert(0, queue);
- Insert(255, queue);
- Insert(123, queue);
- uint8_t expected[] = {0, 1, 54, 123, 255};
- EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
-}
-TEST_F(BinHeapTest, Removals){
- Insert(54, queue);
- Insert(1, queue);
- Insert(0, queue);
- Insert(255, queue);
- Insert(123, queue);
- Remove(255, queue);
- Remove(0, queue);
- Insert(222, queue);
- Insert(67, queue);
- uint8_t expected[] = {1, 54, 67, 123, 222};
- EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
-}
-
diff --git a/aos/atom_code/ipc_lib/cmpxchg.h b/aos/atom_code/ipc_lib/cmpxchg.h
index acb4a3c..715c57d 100644
--- a/aos/atom_code/ipc_lib/cmpxchg.h
+++ b/aos/atom_code/ipc_lib/cmpxchg.h
@@ -9,10 +9,10 @@
#define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
/*#define xchg(ptr, n) ({typeof(*ptr) r; \
- do{ \
- r = *ptr; \
- }while(!__sync_bool_compare_and_swap(ptr, r, n)); \
- r; \
+ do{ \
+ r = *ptr; \
+ }while(!__sync_bool_compare_and_swap(ptr, r, n)); \
+ r; \
})*/
# define LOCK "lock;"
@@ -24,7 +24,7 @@
/*static inline void set_64bit(volatile unsigned long *ptr, unsigned long val)
{
- *ptr = val;
+ *ptr = val;
}
#define _set_64bit set_64bit*/
@@ -32,37 +32,37 @@
/*
* Note: no "lock" prefix even on SMP: xchg always implies lock anyway
* Note 2: xchg has side effect, so that attribute volatile is necessary,
- * but generally the primitive is invalid, *ptr is output argument. --ANK
+ * but generally the primitive is invalid, *ptr is output argument. --ANK
*/
static inline unsigned long __xchg(unsigned long x, volatile void * ptr, int size)
{
- switch (size) {
- case 1:
- __asm__ __volatile__("xchgb %b0,%1"
- :"=q" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- case 2:
- __asm__ __volatile__("xchgw %w0,%1"
- :"=r" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- case 4:
- __asm__ __volatile__("xchgl %k0,%1"
- :"=r" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- case 8:
- __asm__ __volatile__("xchg %0,%1"
- :"=r" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- }
- return x;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__("xchgb %b0,%1"
+ :"=q" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 2:
+ __asm__ __volatile__("xchgw %w0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 4:
+ __asm__ __volatile__("xchgl %k0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 8:
+ __asm__ __volatile__("xchg %0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ }
+ return x;
}
/*
@@ -76,78 +76,78 @@
#define __HAVE_ARCH_CMPXCHG 1
static inline unsigned long __cmpxchg(volatile void *ptr, unsigned long old,
- unsigned long new, int size)
+ unsigned long new, int size)
{
- int32_t prev;
- switch (size) {
- case 1:
- __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
- : "=a"(prev)
- : "q"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 2:
- __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 4:
- __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %k1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 8:
- __asm__ __volatile__("lock; cmpxchg %1,%2"
- : "=a"(prev)
- : "q"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- }
- return old;
+ int32_t prev;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 2:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 4:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %k1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 8:
+ __asm__ __volatile__("lock; cmpxchg %1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ }
+ return old;
}
/*
static inline unsigned long __cmpxchg_local(volatile void *ptr,
- unsigned long old, unsigned long new, int size)
+ unsigned long old, unsigned long new, int size)
{
- unsigned long prev;
- switch (size) {
- case 1:
- __asm__ __volatile__("cmpxchgb %b1,%2"
- : "=a"(prev)
- : "q"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 2:
- __asm__ __volatile__("cmpxchgw %w1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 4:
- __asm__ __volatile__("cmpxchgl %k1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 8:
- __asm__ __volatile__("cmpxchgq %1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- }
- return old;
+ unsigned long prev;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__("cmpxchgb %b1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 2:
+ __asm__ __volatile__("cmpxchgw %w1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 4:
+ __asm__ __volatile__("cmpxchgl %k1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 8:
+ __asm__ __volatile__("cmpxchgq %1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ }
+ return old;
}*/
#define cmpxchg(ptr,o,n)\
- ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
- (unsigned long)(n),sizeof(*(ptr))))
+ ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+ (unsigned long)(n),sizeof(*(ptr))))
/*#define cmpxchg_local(ptr,o,n)\
- ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
- (unsigned long)(n),sizeof(*(ptr))))*/
+ ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+ (unsigned long)(n),sizeof(*(ptr))))*/
#endif
#endif
diff --git a/aos/atom_code/ipc_lib/condition.cc b/aos/atom_code/ipc_lib/condition.cc
new file mode 100644
index 0000000..b764026
--- /dev/null
+++ b/aos/atom_code/ipc_lib/condition.cc
@@ -0,0 +1,26 @@
+#include "aos/common/condition.h"
+
+#include <inttypes.h>
+
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+static_assert(shm_ok<Condition>::value, "Condition should work"
+ " in shared memory");
+
+Condition::Condition(Mutex *m) : impl_(), m_(m) {}
+
+void Condition::Wait() {
+ condition_wait(&impl_, &m_->impl_);
+}
+
+void Condition::Signal() {
+ condition_signal(&impl_);
+}
+
+void Condition::Broadcast() {
+ condition_broadcast(&impl_, &m_->impl_);
+}
+
+} // namespace aos
diff --git a/aos/atom_code/ipc_lib/core_lib.c b/aos/atom_code/ipc_lib/core_lib.c
index 53587d8..bbd2f5b 100644
--- a/aos/atom_code/ipc_lib/core_lib.c
+++ b/aos/atom_code/ipc_lib/core_lib.c
@@ -1,49 +1,43 @@
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
#include <stdio.h>
#include <stdlib.h>
-#include "shared_mem.h"
-#include "core_lib.h"
-#include <time.h>
-void init_shared_mem_core(aos_shm_core *shm_core) {
- clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
- shm_core->queues.alloc_flag = 0;
- shm_core->msg_alloc_lock = 0;
- shm_core->queues.queue_list = NULL;
- shm_core->queues.alloc_lock = 0;
-}
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+
static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
- return (l > r) ? l : r;
+ return (l > r) ? l : r;
}
void *shm_malloc_aligned(size_t length, uint8_t alignment) {
- // minimum alignments from <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
- if (length <= 1) {
- alignment = aos_8max(alignment, 1);
- } else if (length <= 2) {
- alignment = aos_8max(alignment, 2);
- } else if (length <= 4) {
- alignment = aos_8max(alignment, 4);
- } else if (length <= 8) {
- alignment = aos_8max(alignment, 8);
- } else if (length <= 16) {
- alignment = aos_8max(alignment, 16);
- } else {
- alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
- }
+ // minimum alignments from <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+ if (length <= 1) {
+ alignment = aos_8max(alignment, 1);
+ } else if (length <= 2) {
+ alignment = aos_8max(alignment, 2);
+ } else if (length <= 4) {
+ alignment = aos_8max(alignment, 4);
+ } else if (length <= 8) {
+ alignment = aos_8max(alignment, 8);
+ } else if (length <= 16) {
+ alignment = aos_8max(alignment, 16);
+ } else {
+ alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
+ }
- void *msg = NULL;
- aos_shm_core *shm_core = global_core->mem_struct;
- mutex_grab(&shm_core->msg_alloc_lock);
- shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
- const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
- shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
- msg = shm_core->msg_alloc;
- if (msg <= global_core->shared_mem) {
- fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
- printf("if you didn't see the stderr output just then, you should\n");
- abort();
- }
- //printf("alloc %p\n", msg);
- mutex_unlock(&shm_core->msg_alloc_lock);
- return msg;
+ void *msg = NULL;
+ aos_shm_core *shm_core = global_core->mem_struct;
+ mutex_grab(&shm_core->msg_alloc_lock);
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+ const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+ msg = shm_core->msg_alloc;
+ if (msg <= global_core->shared_mem) {
+ fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+ printf("if you didn't see the stderr output just then, you should have\n");
+ abort();
+ }
+ //printf("alloc %p\n", msg);
+ mutex_unlock(&shm_core->msg_alloc_lock);
+ return msg;
}
diff --git a/aos/atom_code/ipc_lib/core_lib.h b/aos/atom_code/ipc_lib/core_lib.h
index f72ae4c..5674220 100644
--- a/aos/atom_code/ipc_lib/core_lib.h
+++ b/aos/atom_code/ipc_lib/core_lib.h
@@ -1,44 +1,14 @@
#ifndef _AOS_CORE_LIB_H_
#define _AOS_CORE_LIB_H_
-// defined in shared_mem.c
-#ifdef __cplusplus
-extern "C" {
-#endif // __cplusplus
-extern struct aos_core *global_core;
-#ifdef __cplusplus
-}
-#endif // __cplusplus
-
-#include "aos_sync.h"
-#include "queue.h"
#include <stdint.h>
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
-struct aos_queue_list_t;
-typedef struct aos_queue_hash_t {
- int alloc_flag;
- mutex alloc_lock;
- struct aos_queue_list_t *queue_list;
-} aos_queue_hash;
-
-typedef struct aos_shm_core_t {
- // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
- // this shared memory area
- struct timespec identifier;
- // gets 0-initialized at the start (as part of shared memory) and
- // the owner sets as soon as it finishes setting stuff up
- mutex creation_condition;
- mutex msg_alloc_lock;
- void *msg_alloc;
- aos_queue_hash queues;
-} aos_shm_core;
-
-void init_shared_mem_core(aos_shm_core *shm_core);
-
void *shm_malloc_aligned(size_t length, uint8_t alignment)
__attribute__((alloc_size(1)));
static void *shm_malloc(size_t length) __attribute__((alloc_size(1)));
diff --git a/aos/atom_code/ipc_lib/ipc_lib.gyp b/aos/atom_code/ipc_lib/ipc_lib.gyp
index 4614680..f947d5e 100644
--- a/aos/atom_code/ipc_lib/ipc_lib.gyp
+++ b/aos/atom_code/ipc_lib/ipc_lib.gyp
@@ -1,41 +1,81 @@
{
'targets': [
{
- 'target_name': 'ipc_lib',
+ 'target_name': 'aos_sync',
'type': 'static_library',
'sources': [
'aos_sync.c',
- 'binheap.c',
+ ],
+ },
+ {
+ 'target_name': 'core_lib',
+ 'type': 'static_library',
+ 'sources': [
'core_lib.c',
- 'queue.c',
+ ],
+ 'dependencies': [
+ 'aos_sync',
+ 'shared_mem',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'shared_mem',
+ 'type': 'static_library',
+ 'sources': [
'shared_mem.c',
],
'dependencies': [
+ 'aos_sync',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'queue',
+ 'type': 'static_library',
+ 'sources': [
+ 'queue.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/common/common.gyp:condition',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
# TODO(brians): fix this once there's a nice logging interface to use
# '<(AOS)/build/aos.gyp:logging',
],
},
{
- 'target_name': 'binheap_test',
+ 'target_name': 'raw_queue_test',
'type': 'executable',
'sources': [
- 'binheap_test.cpp',
+ 'queue_test.cc',
],
'dependencies': [
'<(EXTERNALS):gtest',
- 'ipc_lib',
+ 'queue',
+ '<(AOS)/build/aos.gyp:logging',
+ 'core_lib',
+ '<(AOS)/common/common.gyp:queue_testutils',
+ '<(AOS)/common/common.gyp:time',
],
},
{
- 'target_name': 'ipc_queue_test',
+ 'target_name': 'ipc_stress_test',
'type': 'executable',
'sources': [
- 'queue_test.cpp',
+ 'ipc_stress_test.cc',
],
'dependencies': [
'<(EXTERNALS):gtest',
- 'ipc_lib',
- '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/common/common.gyp:time',
+ '<(AOS)/common/common.gyp:queue_testutils',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
+ '<(AOS)/common/common.gyp:die',
],
},
],
diff --git a/aos/atom_code/ipc_lib/ipc_stress_test.cc b/aos/atom_code/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..38c425f
--- /dev/null
+++ b/aos/atom_code/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,248 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <libgen.h>
+#include <assert.h>
+
+#include <vector>
+#include <string>
+
+#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/mutex.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/die.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// It's written in C++ for performance. We need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound, which is a bad way to test CPU contention.
+
+namespace aos {
+
+// Each test is represented by the name of the test binary and then any
+// arguments to pass to it.
+// Using --gtest_filter is a bad idea because it seems to result in a lot of
+// swapping which causes everything to be disk-bound (at least for me).
+static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+ {"queue_test"},
+ {"condition_test"},
+ {"mutex_test"},
+ {"raw_queue_test"},
+};
+// These arguments get inserted before any per-test arguments.
+static const ::std::vector< ::std::string> kDefaultArgs = {
+ "--gtest_repeat=30",
+ "--gtest_shuffle",
+};
+
+// How many test processes to run at a time.
+static const int kTesters = 100;
+// How long to test for.
+static constexpr time::Time kTestTime = time::Time::InSeconds(30);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+ Shared(const time::Time &stop_time)
+ : stop_time(stop_time), total_iterations(0) {}
+
+ // Synchronizes access to stdout/stderr to avoid interleaving failure
+ // messages.
+ Mutex output_mutex;
+
+ // When to stop.
+ time::Time stop_time;
+
+ // The total number of iterations. Updated by each child as it finishes.
+ int total_iterations;
+ // Synchronizes writes to total_iterations.
+ Mutex total_iterations_mutex;
+
+ const char *path;  // Directory the test binaries are exec'd from. Not set by the constructor; Main() fills it in.
+};
+static_assert(shm_ok<Shared>::value,
+ "it's going to get shared between forked processes");
+
+// Gets called after each child forks to run a test. Points stdout and stderr at
+// the write end of pipes and then execs the test binary, so it never returns.
+void __attribute__((noreturn)) DoRunTest(
+    Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+  if (close(pipes[0]) == -1) {
+    Die("close(%d) of read end of pipe failed with %d: %s\n",
+        pipes[0], errno, strerror(errno));
+  }
+  if (close(STDIN_FILENO) == -1) {
+    Die("close(STDIN_FILENO(=%d)) failed with %d: %s\n",
+        STDIN_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+    Die("dup2(%d, STDOUT_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDOUT_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDERR_FILENO) == -1) {
+    Die("dup2(%d, STDERR_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDERR_FILENO, errno, strerror(errno));
+  }
+
+  const char **args = new const char *[test.size() + kDefaultArgs.size() + 1];
+  // The actual executable to run.
+  ::std::string executable;
+  int i = 0;
+  for (const ::std::string &c : test) {
+    if (i == 0) {
+      executable = ::std::string(shared->path) + "/" + c;
+      args[0] = executable.c_str();
+      for (const ::std::string &ci : kDefaultArgs) {
+        args[++i] = ci.c_str();
+      }
+    } else {
+      args[i] = c.c_str();
+    }
+    ++i;
+  }
+  args[i] = NULL;  // i counts every argument stored, default args included.
+  execv(executable.c_str(), const_cast<char *const *>(args));
+  Die("execv(%s, %p) failed with %d: %s\n",
+      executable.c_str(), args, errno, strerror(errno));
+}
+
+// Runs tests one after another (cycling through kTests) until
+// shared->stop_time, printing the captured output of any run that fails.
+void DoRun(Shared *shared) {
+  int iterations = 0;
+  // An iterator pointing to a random one of the tests.
+  auto test = kTests.begin() + (getpid() % kTests.size());
+  int pipes[2];
+  while (time::Time::Now() < shared->stop_time) {
+    if (pipe(pipes) == -1) {
+      Die("pipe(%p) failed with %d: %s\n", &pipes, errno, strerror(errno));
+    }
+    switch (fork()) {
+      case 0:  // in runner
+        DoRunTest(shared, *test, pipes);
+      case -1:
+        Die("fork() failed with %d: %s\n", errno, strerror(errno));
+    }
+
+    if (close(pipes[1]) == -1) {
+      Die("close(%d) of write end of pipe failed with %d: %s\n",
+          pipes[1], errno, strerror(errno));
+    }
+
+    ::std::string output;
+    char buffer[2048];
+    while (true) {
+      ssize_t ret = read(pipes[0], &buffer, sizeof(buffer));
+      if (ret == 0) {  // EOF
+        if (close(pipes[0]) == -1) {
+          Die("close(%d) of pipe at EOF failed with %d: %s\n",
+              pipes[0], errno, strerror(errno));
+        }
+        break;
+      } else if (ret == -1) {
+        if (errno == EINTR) continue;  // nothing was read; just try again
+        Die("read(%d, %p, %zu) failed with %d: %s\n",
+            pipes[0], &buffer, sizeof(buffer), errno, strerror(errno));
+      }
+      output += ::std::string(buffer, ret);
+    }
+
+    int status;
+    while (wait(&status) == -1) {
+      if (errno != EINTR) {
+        Die("wait(%p) in child failed with %d: %s\n",
+            &status, errno, strerror(errno));
+      }
+    }
+    if (WIFEXITED(status)) {
+      if (WEXITSTATUS(status) != 0) {
+        MutexLocker sync(&shared->output_mutex);
+        fprintf(stderr, "Test %s exited with status %d. output:\n",
+                test->at(0).c_str(), WEXITSTATUS(status));
+        fputs(output.c_str(), stderr);
+      }
+    } else if (WIFSIGNALED(status)) {
+      MutexLocker sync(&shared->output_mutex);
+      fprintf(stderr, "Test %s terminated by signal %d: %s.\n",
+              test->at(0).c_str(),
+              WTERMSIG(status), strsignal(WTERMSIG(status)));
+      fputs(output.c_str(), stderr);
+    } else {
+      assert(WIFSTOPPED(status));
+      Die("Test %s was stopped.\n", test->at(0).c_str());
+    }
+
+    ++test;
+    if (test == kTests.end()) test = kTests.begin();
+    ++iterations;
+  }
+  {
+    MutexLocker sync(&shared->total_iterations_mutex);
+    shared->total_iterations += iterations;
+  }
+}
+
+// Forks off one tester process that runs DoRun() and then exits.
+void Run(Shared *shared) {
+  const pid_t pid = fork();
+  if (pid == 0) {  // in child
+    DoRun(shared);
+    _exit(EXIT_SUCCESS);
+  } else if (pid == -1) {
+    Die("fork() of child failed with %d: %s\n", errno, strerror(errno));
+  }
+}
+
+// Forks kTesters tester processes, waits for all of them, and prints how many
+// test runs completed. Returns EXIT_FAILURE unless every tester exited with 0.
+int Main(int argc, char **argv) {
+  assert(argc >= 1);
+
+  ::aos::common::testing::GlobalCoreInstance global_core;
+
+  Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+  new (shared) Shared(time::Time::Now() + kTestTime);
+
+  char *temp = strdup(argv[0]);
+  shared->path = strdup(dirname(temp));
+  free(temp);
+
+  for (int i = 0; i < kTesters; ++i) Run(shared);
+
+  bool error = false;
+  for (int i = 0; i < kTesters; ++i) {
+    int status;
+    if (wait(&status) == -1) {
+      if (errno != EINTR) {
+        Die("wait(%p) failed with %d: %s\n", &status, errno, strerror(errno));
+      }
+      --i;  // Nothing was reaped and status is unset, so redo this slot.
+      continue;
+    }
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+      error = true;
+    }
+  }
+
+  printf("Ran a total of %d tests.\n", shared->total_iterations);
+  if (error) {
+    printf("A child had a problem during the test.\n");
+  }
+  return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+} // namespace aos
+
+int main(int argc, char **argv) {
+ return ::aos::Main(argc, argv);
+}
diff --git a/aos/atom_code/ipc_lib/mutex.cpp b/aos/atom_code/ipc_lib/mutex.cpp
index 4f908dd..47fc92a 100644
--- a/aos/atom_code/ipc_lib/mutex.cpp
+++ b/aos/atom_code/ipc_lib/mutex.cpp
@@ -26,10 +26,7 @@
}
void Mutex::Unlock() {
- if (mutex_unlock(&impl_) != 0) {
- LOG(FATAL, "mutex_unlock(%p(=%" PRIu32 ")) failed because of %d: %s\n",
- &impl_, impl_, errno, strerror(errno));
- }
+ mutex_unlock(&impl_);
}
bool Mutex::TryLock() {
diff --git a/aos/atom_code/ipc_lib/queue.c b/aos/atom_code/ipc_lib/queue.c
deleted file mode 100644
index 2e45326..0000000
--- a/aos/atom_code/ipc_lib/queue.c
+++ /dev/null
@@ -1,508 +0,0 @@
-#include "aos/atom_code/ipc_lib/queue.h"
-#include "aos/atom_code/ipc_lib/queue_internal.h"
-
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-
-#include "aos/common/logging/logging.h"
-
-#define READ_DEBUG 0
-#define WRITE_DEBUG 0
-#define REF_DEBUG 0
-
-static inline aos_msg_header *get_header(void *msg) {
- return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
-}
-static inline aos_queue *aos_core_alloc_queue() {
- return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
-}
-static inline void *aos_alloc_msg(aos_msg_pool *pool) {
- return shm_malloc(pool->msg_length);
-}
-
-// actually free the given message
-static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
-#if REF_DEBUG
- if (pool->pool_lock == 0) {
- //LOG(WARNING, "unprotected\n");
- }
-#endif
- aos_msg_header *header = get_header(msg);
- if (pool->pool[header->index] != header) { // if something's messed up
- fprintf(stderr, "queue: something is very very wrong with queue %p."
- " pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
- queue, pool->pool, header->index, header);
- printf("queue: see stderr\n");
- abort();
- }
-#if REF_DEBUG
- printf("ref free: %p\n", msg);
-#endif
- --pool->used;
-
- if (queue->recycle != NULL) {
- void *const new_msg = aos_queue_get_msg(queue->recycle);
- if (new_msg == NULL) {
- fprintf(stderr, "queue: couldn't get a message"
- " for recycle queue %p\n", queue->recycle);
- } else {
- // Take a message from recycle_queue and switch its
- // header with the one being freed, which effectively
- // switches which queue each message belongs to.
- aos_msg_header *const new_header = get_header(new_msg);
- // also switch the messages between the pools
- pool->pool[header->index] = new_header;
- if (mutex_lock(&queue->recycle->pool.pool_lock)) {
- return -1;
- }
- queue->recycle->pool.pool[new_header->index] = header;
- // swap the information in both headers
- header_swap(header, new_header);
- // don't unlock the other pool until all of its messages are valid
- mutex_unlock(&queue->recycle->pool.pool_lock);
- // use the header for new_msg which is now for this pool
- header = new_header;
- if (aos_queue_write_msg_free(queue->recycle,
- (void *)msg, OVERRIDE) != 0) {
- printf("queue: warning aos_queue_write_msg("
- "%p(=queue(=%p)->recycle), %p, OVERRIDE)"
- " failed\n",
- queue->recycle, queue, msg);
- }
- msg = new_msg;
- }
- }
-
- // where the one we're freeing was
- int index = header->index;
- header->index = -1;
- if (index != pool->used) { // if we're not freeing the one on the end
- // put the last one where the one we're freeing was
- header = pool->pool[index] = pool->pool[pool->used];
- // put the one we're freeing at the end
- pool->pool[pool->used] = get_header(msg);
- // update the former last one's index
- header->index = index;
- }
- return 0;
-}
-// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
-static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
- if (msg == NULL) {
- return 0;
- }
-
- int rv = 0;
- if (mutex_lock(&pool->pool_lock)) {
- return -1;
- }
- aos_msg_header *const header = get_header(msg);
- header->ref_count --;
- assert(header->ref_count >= 0);
-#if REF_DEBUG
- printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
-#endif
- if (header->ref_count == 0) {
- rv = aos_free_msg(pool, msg, queue);
- }
- mutex_unlock(&pool->pool_lock);
- return rv;
-}
-
-static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
- if (sig1->length != sig2->length) {
- //LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
- return 0;
- }
- if (sig1->queue_length != sig2->queue_length) {
- //LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
- return 0;
- }
- if (sig1->hash != sig2->hash) {
- //LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
- return 0;
- }
- //LOG(DEBUG, "signature match\n");
- return 1;
-}
-static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
- aos_queue *const queue = aos_core_alloc_queue();
- aos_msg_pool *const pool = &queue->pool;
- pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
- pool->length = 0;
- pool->used = 0;
- pool->msg_length = sig->length + sizeof(aos_msg_header);
- pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
- aos_ring_buf *const buf = &queue->buf;
- buf->length = sig->queue_length + 1;
- if (buf->length < 2) { // TODO(brians) when could this happen?
- buf->length = 2;
- }
- buf->data = shm_malloc(buf->length * sizeof(void *));
- buf->start = 0;
- buf->end = 0;
- buf->msgs = 0;
- buf->writable = 1;
- buf->readable = 0;
- buf->buff_lock = 0;
- pool->pool_lock = 0;
- queue->recycle = NULL;
- return queue;
-}
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
- //LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
- mutex_grab(&global_core->mem_struct->queues.alloc_lock);
- aos_queue_list *list = global_core->mem_struct->queues.queue_list;
- aos_queue_list *last = NULL;
- while (list != NULL) {
- // if we found a matching queue
- if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
- mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
- return list->queue;
- } else {
- //LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
- }
- last = list;
- list = list->next;
- }
- list = shm_malloc(sizeof(aos_queue_list));
- if (last == NULL) {
- global_core->mem_struct->queues.queue_list = list;
- } else {
- last->next = list;
- }
- list->sig = *sig;
- const size_t name_size = strlen(name) + 1;
- list->name = shm_malloc(name_size);
- memcpy(list->name, name, name_size);
- //LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
- list->queue = aos_create_queue(sig);
- //LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
- list->next = NULL;
- mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
- return list->queue;
-}
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
- const aos_type_sig *recycle_sig, aos_queue **recycle) {
- if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
- *recycle = NULL;
- return NULL;
- }
- aos_queue *const r = aos_fetch_queue(name, sig);
- r->recycle = aos_fetch_queue(name, recycle_sig);
- if (r == r->recycle) {
- fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
- printf("see stderr\n");
- abort();
- }
- *recycle = r->recycle;
- return r;
-}
-
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
-#if WRITE_DEBUG
- printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
-#endif
- int rv = 0;
- if (msg == NULL || msg < (void *)global_core->mem_struct ||
- msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
- fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
- msg, queue);
- printf("see stderr\n");
- abort();
- }
- aos_ring_buf *const buf = &queue->buf;
- if (mutex_lock(&buf->buff_lock)) {
-#if WRITE_DEBUG
- printf("queue: locking buff_lock of %p failed\n", buf);
-#endif
- return -1;
- }
- int new_end = (buf->end + 1) % buf->length;
- while (new_end == buf->start) {
- if (opts & NON_BLOCK) {
-#if WRITE_DEBUG
- printf("queue: not blocking on %p. returning -1\n", queue);
-#endif
- mutex_unlock(&buf->buff_lock);
- return -1;
- } else if (opts & OVERRIDE) {
-#if WRITE_DEBUG
- printf("queue: overriding on %p\n", queue);
-#endif
- // avoid leaking the message that we're going to overwrite
- msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
- buf->start = (buf->start + 1) % buf->length;
- } else { // BLOCK
- mutex_unlock(&buf->buff_lock);
-#if WRITE_DEBUG
- printf("queue: going to wait for writable(=%p) of %p\n",
- &buf->writable, queue);
-#endif
- if (condition_wait(&buf->writable)) {
-#if WRITE_DEBUG
- printf("queue: waiting for writable(=%p) of %p failed\n",
- &buf->writable, queue);
-#endif
- return -1;
- }
-#if WRITE_DEBUG
- printf("queue: going to re-lock buff_lock of %p to write\n", queue);
-#endif
- if (mutex_lock(&buf->buff_lock)) {
-#if WRITE_DEBUG
- printf("queue: error locking buff_lock of %p\n", queue);
-#endif
- return -1;
- }
- }
- new_end = (buf->end + 1) % buf->length;
- }
- buf->data[buf->end] = msg;
- ++buf->msgs;
- buf->end = new_end;
- mutex_unlock(&buf->buff_lock);
-#if WRITE_DEBUG
- printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
-#endif
- condition_set(&buf->readable);
- if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
- condition_unset(&buf->writable);
- }
-#if WRITE_DEBUG
- printf("queue: write returning %d on queue %p\n", rv, queue);
-#endif
- return rv;
-}
-
-int aos_queue_free_msg(aos_queue *queue, const void *msg) {
- // TODO(brians) get rid of this
- void *msg_temp;
- memcpy(&msg_temp, &msg, sizeof(msg_temp));
- return msg_ref_dec(msg_temp, &queue->pool, queue);
-}
-// Deals with setting/unsetting readable and writable.
-// Should be called after buff_lock has been unlocked.
-// read is whether or not this read call read one off the queue
-static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
- if (read) {
- condition_set(&buf->writable);
- if (buf->start == buf->end) {
- condition_unset(&buf->readable);
- }
- }
-}
-// Returns with buff_lock locked and a readable message in buf.
-// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
-static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
- aos_queue *const queue, int *index) {
-#if !READ_DEBUG
- (void)queue;
-#endif
- if (mutex_lock(&buf->buff_lock)) {
-#if READ_DEBUG
- printf("queue: couldn't lock buff_lock of %p\n", queue);
-#endif
- return -1;
- }
- while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
- mutex_unlock(&buf->buff_lock);
- if (opts & NON_BLOCK) {
-#if READ_DEBUG
- printf("queue: not going to block waiting on %p\n", queue);
-#endif
- return -1;
- } else { // BLOCK
-#if READ_DEBUG
- printf("queue: going to wait for readable(=%p) of %p\n",
- &buf->readable, queue);
-#endif
- // wait for a message to become readable
- if ((index == NULL) ? condition_wait(&buf->readable) :
- condition_wait_force(&buf->readable)) {
-#if READ_DEBUG
- printf("queue: waiting for readable(=%p) of %p failed\n",
- &buf->readable, queue);
-#endif
- return -1;
- }
- }
-#if READ_DEBUG
- printf("queue: going to re-lock buff_lock of %p to read\n", queue);
-#endif
- if (mutex_lock(&buf->buff_lock)) {
-#if READ_DEBUG
- printf("couldn't re-lock buff_lock of %p\n", queue);
-#endif
- return -1;
- }
- }
-#if READ_DEBUG
- printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
-#endif
- return 0;
-}
-// handles reading with PEEK
-static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
- void *ret;
- if (opts & FROM_END) {
- int pos = buf->end - 1;
- if (pos < 0) { // if it needs to wrap
- pos = buf->length - 1;
- }
-#if READ_DEBUG
- printf("queue: reading from line %d: %d\n", __LINE__, pos);
-#endif
- ret = buf->data[pos];
- } else {
-#if READ_DEBUG
- printf("queue: reading from line %d: %d\n", __LINE__, start);
-#endif
- ret = buf->data[start];
- }
- aos_msg_header *const header = get_header(ret);
- header->ref_count ++;
-#if REF_DEBUG
- printf("ref inc count: %p\n", ret);
-#endif
- return ret;
-}
-const void *aos_queue_read_msg(aos_queue *queue, int opts) {
-#if READ_DEBUG
- printf("queue: read_msg(%p, %d)\n", queue, opts);
-#endif
- void *msg = NULL;
- aos_ring_buf *const buf = &queue->buf;
- if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
-#if READ_DEBUG
- printf("queue: common returned -1 for %p\n", queue);
-#endif
- return NULL;
- }
- if (opts & PEEK) {
- msg = read_msg_peek(buf, opts, buf->start);
- } else {
- if (opts & FROM_END) {
- while (1) {
-#if READ_DEBUG
- printf("queue: start of c2 of %p\n", queue);
-#endif
- // This loop pulls each message out of the buffer.
- const int pos = buf->start;
- buf->start = (buf->start + 1) % buf->length;
- // if this is the last one
- if (buf->start == buf->end) {
-#if READ_DEBUG
- printf("queue: reading from c2: %d\n", pos);
-#endif
- msg = buf->data[pos];
- break;
- }
- // it's not going to be in the queue any more
- msg_ref_dec(buf->data[pos], &queue->pool, queue);
- }
- } else {
-#if READ_DEBUG
- printf("queue: reading from d2: %d\n", buf->start);
-#endif
- msg = buf->data[buf->start];
- buf->start = (buf->start + 1) % buf->length;
- }
- }
- mutex_unlock(&buf->buff_lock);
- aos_read_msg_common_end(buf, !(opts & PEEK));
-#if READ_DEBUG
- printf("queue: read returning %p\n", msg);
-#endif
- return msg;
-}
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
-#if READ_DEBUG
- printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
-#endif
- void *msg = NULL;
- aos_ring_buf *const buf = &queue->buf;
- if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
-#if READ_DEBUG
- printf("queue: common returned -1\n");
-#endif
- return NULL;
- }
- // TODO(parker): Handle integer wrap on the index.
- const int offset = buf->msgs - *index;
- int my_start = buf->end - offset;
- if (offset >= buf->length) { // if we're behind the available messages
- // catch index up to the last available message
- *index += buf->start - my_start;
- // and that's the one we're going to read
- my_start = buf->start;
- }
- if (my_start < 0) { // if we want to read off the end of the buffer
- // unwrap where we're going to read from
- my_start += buf->length;
- }
- if (opts & PEEK) {
- msg = read_msg_peek(buf, opts, my_start);
- } else {
- if (opts & FROM_END) {
-#if READ_DEBUG
- printf("queue: start of c1 of %p\n", queue);
-#endif
- int pos = buf->end - 1;
- if (pos < 0) { // if it wrapped
- pos = buf->length - 1; // unwrap it
- }
-#if READ_DEBUG
- printf("queue: reading from c1: %d\n", pos);
-#endif
- msg = buf->data[pos];
- *index = buf->msgs;
- } else {
-#if READ_DEBUG
- printf("queue: reading from d1: %d\n", my_start);
-#endif
- msg = buf->data[my_start];
- ++(*index);
- }
- aos_msg_header *const header = get_header(msg);
- ++header->ref_count;
-#if REF_DEBUG
- printf("ref_inc_count: %p\n", msg);
-#endif
- }
- mutex_unlock(&buf->buff_lock);
- // this function never consumes one off the queue
- aos_read_msg_common_end(buf, 0);
- return msg;
-}
-static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
- if (mutex_lock(&pool->pool_lock)) {
- return NULL;
- }
- void *msg;
- if (pool->length - pool->used > 0) {
- msg = pool->pool[pool->used];
- } else {
- if (pool->length >= pool->mem_length) {
- LOG(FATAL, "overused pool %p\n", pool);
- }
- msg = pool->pool[pool->length] = aos_alloc_msg(pool);
- ++pool->length;
- }
- aos_msg_header *const header = msg;
- msg = (uint8_t *)msg + sizeof(aos_msg_header);
- header->ref_count = 1;
-#if REF_DEBUG
- printf("ref alloc: %p\n", msg);
-#endif
- header->index = pool->used;
- ++pool->used;
- mutex_unlock(&pool->pool_lock);
- return msg;
-}
-void *aos_queue_get_msg(aos_queue *queue) {
- return aos_pool_get_msg(&queue->pool);
-}
-
diff --git a/aos/atom_code/ipc_lib/queue.cc b/aos/atom_code/ipc_lib/queue.cc
new file mode 100644
index 0000000..7ab7b6c
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue.cc
@@ -0,0 +1,491 @@
+#include "aos/atom_code/ipc_lib/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
+namespace aos {
+namespace {
+
+static_assert(shm_ok<RawQueue>::value,
+ "RawQueue instances go into shared memory");
+
+const bool kReadDebug = false;
+const bool kWriteDebug = false;
+const bool kRefDebug = false;
+const bool kFetchDebug = false;
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them or who leak one when
+// they get killed).
+const int kExtraMessages = 20;
+
+} // namespace
+
+const int RawQueue::kPeek;
+const int RawQueue::kFromEnd;
+const int RawQueue::kNonBlock;
+const int RawQueue::kBlock;
+const int RawQueue::kOverride;
+
+struct RawQueue::MessageHeader {
+ int ref_count;
+ int index; // in pool_
+ static MessageHeader *Get(const void *msg) {
+ return reinterpret_cast<MessageHeader *>(
+ static_cast<uint8_t *>(const_cast<void *>(msg)) -
+ sizeof(MessageHeader));
+ }
+ void Swap(MessageHeader *other) {
+ MessageHeader temp;
+ memcpy(&temp, other, sizeof(temp));
+ memcpy(other, this, sizeof(*other));
+ memcpy(this, &temp, sizeof(*this));
+ }
+};
+static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
+ " is to stick it in shared memory");
+
+struct RawQueue::ReadData {
+ bool writable_start;
+};
+
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't
+// have to lock/unlock pool_lock_
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
+ MutexLocker locker(&pool_lock_);
+ MessageHeader *header = MessageHeader::Get(msg);
+ --header->ref_count;
+ assert(header->ref_count >= 0);
+ if (kRefDebug) {
+ printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+ }
+ if (header->ref_count == 0) {
+ DoFreeMessage(msg);
+ }
+}
+
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+ : readable_(&data_lock_), writable_(&data_lock_) {
+ const size_t name_size = strlen(name) + 1;
+ char *temp = static_cast<char *>(shm_malloc(name_size));
+ memcpy(temp, name, name_size);
+ name_ = temp;
+ length_ = length;
+ hash_ = hash;
+ queue_length_ = queue_length;
+
+ next_ = NULL;
+ recycle_ = NULL;
+
+ if (kFetchDebug) {
+ printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
+ name, length, hash, queue_length);
+ }
+
+ data_length_ = queue_length + 1;
+ if (data_length_ < 2) { // TODO(brians) when could this happen?
+ data_length_ = 2;
+ }
+ data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+ data_start_ = 0;
+ data_end_ = 0;
+ messages_ = 0;
+
+ mem_length_ = queue_length + kExtraMessages;
+ pool_length_ = 0;
+ messages_used_ = 0;
+ msg_length_ = length + sizeof(MessageHeader);
+ pool_ = static_cast<MessageHeader **>(
+ shm_malloc(sizeof(MessageHeader *) * mem_length_));
+
+ if (kFetchDebug) {
+ printf("made queue %s\n", name);
+ }
+}
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+ int queue_length) {
+ if (kFetchDebug) {
+ printf("fetching queue %s\n", name);
+ }
+ if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
+ return NULL;
+ }
+ RawQueue *current = static_cast<RawQueue *>(
+ global_core->mem_struct->queues.queue_list);
+ if (current != NULL) {
+ while (true) {
+ // If we found a matching queue.
+ if (strcmp(current->name_, name) == 0 && current->length_ == length &&
+ current->hash_ == hash && current->queue_length_ == queue_length) {
+ mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ return current;
+ } else {
+ if (kFetchDebug) {
+ printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
+ strcmp(current->name_, name), name);
+ }
+ }
+ // If this is the last one.
+ if (current->next_ == NULL) break;
+ current = current->next_;
+ }
+ }
+
+ RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
+ RawQueue(name, length, hash, queue_length);
+ if (current == NULL) { // if we don't already have one
+ global_core->mem_struct->queues.queue_list = r;
+ } else {
+ current->next_ = r;
+ }
+
+ mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ return r;
+}
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_length, RawQueue **recycle) {
+ RawQueue *r = Fetch(name, length, hash, queue_length);
+ r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
+ if (r == r->recycle_) {
+ fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
+ printf("see stderr\n");
+ r->recycle_ = NULL;
+ abort();
+ }
+ *recycle = r->recycle_;
+ return r;
+}
+
+void RawQueue::DoFreeMessage(const void *msg) {
+ MessageHeader *header = MessageHeader::Get(msg);
+ if (pool_[header->index] != header) { // if something's messed up
+ fprintf(stderr, "queue: something is very very wrong with queue %p."
+ " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
+ this, pool_, header->index, header);
+ printf("queue: see stderr\n");
+ abort();
+ }
+ if (kRefDebug) {
+ printf("ref free: %p\n", msg);
+ }
+ --messages_used_;
+
+ if (recycle_ != NULL) {
+ void *const new_msg = recycle_->GetMessage();
+ if (new_msg == NULL) {
+ fprintf(stderr, "queue: couldn't get a message"
+ " for recycle queue %p\n", recycle_);
+ } else {
+ // Take a message from recycle_ and switch its
+ // header with the one being freed, which effectively
+ // switches which queue each message belongs to.
+ MessageHeader *const new_header = MessageHeader::Get(new_msg);
+ // Also switch the messages between the pools.
+ pool_[header->index] = new_header;
+ {
+ MutexLocker locker(&recycle_->pool_lock_);
+ recycle_->pool_[new_header->index] = header;
+ // Swap the information in both headers.
+ header->Swap(new_header);
+ // Don't unlock the other pool until all of its messages are valid.
+ }
+ // use the header for new_msg which is now for this pool
+ header = new_header;
+ if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+ fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n", recycle_, msg);
+ printf("see stderr\n");
+ abort();
+ }
+ msg = new_msg;
+ }
+ }
+
+ // Where the one we're freeing was.
+ int index = header->index;
+ header->index = -1;
+ if (index != messages_used_) { // if we're not freeing the one on the end
+ // Put the last one where the one we're freeing was.
+ header = pool_[index] = pool_[messages_used_];
+ // Put the one we're freeing at the end.
+ pool_[messages_used_] = MessageHeader::Get(msg);
+ // Update the former last one's index.
+ header->index = index;
+ }
+}
+
+bool RawQueue::WriteMessage(void *msg, int options) {
+ if (kWriteDebug) {
+ printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
+ }
+ if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
+ msg > static_cast<void *>((
+ reinterpret_cast<char *>(global_core->mem_struct) +
+ global_core->size))) {
+ fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+ msg, this);
+ printf("see stderr\n");
+ abort();
+ }
+ {
+ MutexLocker locker(&data_lock_);
+ bool writable_waited = false;
+
+ int new_end;
+ while (true) {
+ new_end = (data_end_ + 1) % data_length_;
+ // If there is room in the queue right now.
+ if (new_end != data_start_) break;
+ if (options & kNonBlock) {
+ if (kWriteDebug) {
+ printf("queue: not blocking on %p. returning false\n", this);
+ }
+ return false;
+ } else if (options & kOverride) {
+ if (kWriteDebug) {
+ printf("queue: overriding on %p\n", this);
+ }
+ // Avoid leaking the message that we're going to overwrite.
+ DecrementMessageReferenceCount(data_[data_start_]);
+ data_start_ = (data_start_ + 1) % data_length_;
+ } else { // kBlock
+ if (kWriteDebug) {
+ printf("queue: going to wait for writable_ of %p\n", this);
+ }
+ writable_.Wait();
+ writable_waited = true;
+ }
+ }
+ data_[data_end_] = msg;
+ ++messages_;
+ data_end_ = new_end;
+
+ if (kWriteDebug) {
+ printf("queue: broadcasting to readable_ of %p\n", this);
+ }
+ readable_.Broadcast();
+
+ // If we got a signal on writable_ here and it's still writable, then we
+ // need to signal the next person in line (if any).
+ if (writable_waited && is_writable()) {
+ if (kWriteDebug) {
+ printf("queue: resignalling writable_ of %p\n", this);
+ }
+ writable_.Signal();
+ }
+ }
+ if (kWriteDebug) {
+ printf("queue: write returning true on queue %p\n", this);
+ }
+ return true;
+}
+
+void RawQueue::ReadCommonEnd(ReadData *read_data) {
+ if (is_writable()) {
+ if (kReadDebug) {
+ printf("queue: %ssignalling writable_ of %p\n",
+ read_data->writable_start ? "not " : "", this);
+ }
+ if (!read_data->writable_start) writable_.Signal();
+ }
+}
+bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
+ read_data->writable_start = is_writable();
+ while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
+ if (options & kNonBlock) {
+ if (kReadDebug) {
+ printf("queue: not going to block waiting on %p\n", this);
+ }
+ return false;
+ } else { // kBlock
+ if (kReadDebug) {
+ printf("queue: going to wait for readable_ of %p\n", this);
+ }
+ // Wait for a message to become readable.
+ readable_.Wait();
+ if (kReadDebug) {
+ printf("queue: done waiting for readable_ of %p\n", this);
+ }
+ }
+ }
+ if (kReadDebug) {
+ printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
+ }
+ return true;
+}
+void *RawQueue::ReadPeek(int options, int start) {
+ void *ret;
+ if (options & kFromEnd) {
+ int pos = data_end_ - 1;
+ if (pos < 0) { // if it needs to wrap
+ pos = data_length_ - 1;
+ }
+ if (kReadDebug) {
+ printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
+ }
+ ret = data_[pos];
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
+ }
+ ret = data_[start];
+ }
+ MessageHeader *const header = MessageHeader::Get(ret);
+ ++header->ref_count;
+ if (kRefDebug) {
+ printf("ref inc count: %p\n", ret);
+ }
+ return ret;
+}
+const void *RawQueue::ReadMessage(int options) {
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessage(%x)\n", this, options);
+ }
+ void *msg = NULL;
+
+ MutexLocker locker(&data_lock_);
+
+ ReadData read_data;
+ if (!ReadCommonStart(options, NULL, &read_data)) {
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ if (options & kPeek) {
+ msg = ReadPeek(options, data_start_);
+ } else {
+ if (options & kFromEnd) {
+ while (true) {
+ if (kReadDebug) {
+ printf("queue: %p start of c2\n", this);
+ }
+ // This loop pulls each message out of the buffer.
+ const int pos = data_start_;
+ data_start_ = (data_start_ + 1) % data_length_;
+ // If this is the last one.
+ if (data_start_ == data_end_) {
+ if (kReadDebug) {
+ printf("queue: %p reading from c2: %d\n", this, pos);
+ }
+ msg = data_[pos];
+ break;
+ }
+ // This message is not going to be in the queue any more.
+ DecrementMessageReferenceCount(data_[pos]);
+ }
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d2: %d\n", this, data_start_);
+ }
+ msg = data_[data_start_];
+ // TODO(brians): Doesn't this need to increment the ref count?
+ data_start_ = (data_start_ + 1) % data_length_;
+ }
+ }
+ ReadCommonEnd(&read_data);
+ if (kReadDebug) {
+ printf("queue: %p read returning %p\n", this, msg);
+ }
+ return msg;
+}
+const void *RawQueue::ReadMessageIndex(int options, int *index) {
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
+ this, options, index, *index);
+ }
+ void *msg = NULL;
+
+ MutexLocker locker(&data_lock_);
+
+ ReadData read_data;
+ if (!ReadCommonStart(options, index, &read_data)) {
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ // TODO(parker): Handle integer wrap on the index.
+
+ // How many unread messages we have.
+ const int offset = messages_ - *index;
+ // Where we're going to start reading.
+ int my_start = data_end_ - offset;
+ if (my_start < 0) { // If we want to read off the end of the buffer.
+ // Unwrap it.
+ my_start += data_length_;
+ }
+ if (offset >= data_length_) { // If we're behind the available messages.
+ // Catch index up to the last available message.
+ *index += data_start_ - my_start;
+ // And that's the one we're going to read.
+ my_start = data_start_;
+ }
+ if (options & kPeek) {
+ msg = ReadPeek(options, my_start);
+ } else {
+ if (options & kFromEnd) {
+ if (kReadDebug) {
+ printf("queue: %p start of c1\n", this);
+ }
+ int pos = data_end_ - 1;
+ if (pos < 0) { // If it wrapped.
+ pos = data_length_ - 1; // Unwrap it.
+ }
+ if (kReadDebug) {
+ printf("queue: %p reading from c1: %d\n", this, pos);
+ }
+ msg = data_[pos];
+ *index = messages_;
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d1: %d\n", this, my_start);
+ }
+ msg = data_[my_start];
+ ++(*index);
+ }
+ MessageHeader *const header = MessageHeader::Get(msg);
+ ++header->ref_count;
+ if (kRefDebug) {
+ printf("ref_inc_count: %p\n", msg);
+ }
+ }
+ ReadCommonEnd(&read_data);
+ return msg;
+}
+
+void *RawQueue::GetMessage() {
+ MutexLocker locker(&pool_lock_);
+ MessageHeader *header;
+ if (pool_length_ - messages_used_ > 0) {
+ header = pool_[messages_used_];
+ } else {
+ if (pool_length_ >= mem_length_) {
+ LOG(FATAL, "overused pool of queue %p\n", this);
+ }
+ header = pool_[pool_length_] =
+ static_cast<MessageHeader *>(shm_malloc(msg_length_));
+ ++pool_length_;
+ }
+ void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
+ header->ref_count = 1;
+ if (kRefDebug) {
+ printf("%p ref alloc: %p\n", this, msg);
+ }
+ header->index = messages_used_;
+ ++messages_used_;
+ return msg;
+}
+
+} // namespace aos
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
index 4c279e1..5158558 100644
--- a/aos/atom_code/ipc_lib/queue.h
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -1,134 +1,171 @@
-#ifndef AOS_IPC_LIB_QUEUE_H_
-#define AOS_IPC_LIB_QUEUE_H_
+#ifndef AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
+#define AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
-#include "shared_mem.h"
-#include "aos_sync.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+#include "aos/common/mutex.h"
+#include "aos/common/condition.h"
// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
// code to make checking for leaks work better
// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
// describes how
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-// Queues are the primary way to use shared memory. Basic use consists of
-// initializing an aos_type_sig and then calling aos_fetch_queue on it.
-// This aos_queue* can then be used to get a message and write it or to read a
-// message.
-// Queues (as the name suggests) are a FIFO stack of messages. Each combination
-// of name and aos_type_sig will result in a different queue, which means that
-// if you only recompile some code that uses differently sized messages, it will
-// simply use a different queue than the old code.
-//
// Any pointers returned from these functions can be safely passed to other
// processes because they are all shared memory pointers.
// IMPORTANT: Any message pointer must be passed back in some way
-// (aos_queue_free_msg and aos_queue_write_msg are common ones) or the
+// (FreeMessage and WriteMessage are common ones) or the
// application will leak shared memory.
-// NOTE: Taking a message from read_msg and then passing it to write_msg might
-// work, but it is not guaranteed to.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
-typedef struct aos_type_sig_t {
- size_t length; // sizeof(message)
- int hash; // can differentiate multiple otherwise identical queues
- int queue_length; // how many messages the queue can hold
-} aos_type_sig;
+namespace aos {
-// Structures that are opaque to users (defined in queue_internal.h).
-typedef struct aos_queue_list_t aos_queue_list;
-typedef struct aos_queue_t aos_queue;
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling RawQueue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are FIFO lists of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class RawQueue {
+ public:
+ // Retrieves (and creates if necessary) a queue. Each combination of name and
+ // signature refers to a completely independent queue.
+ // length is how large each message will be
+ // hash can differentiate multiple otherwise identical queues
+ // queue_length is how many messages the queue will be able to hold
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length);
+ // Same as above, except sets up the returned queue so that it will put
+ // messages on *recycle when they are freed (after they have been released by
+ // all other readers/writers and are not in the queue).
+ // recycle_queue_length determines how many freed messages will be kept.
+ // Other code can retrieve the 2 queues separately (the recycle queue will
+ // have the same name and length as the main one). However, any frees made
+ // using a queue with only (name,length,hash,queue_length) before the
+ // recycle queue has been associated with it will not go on to the recycle
+ // queue.
+ // NOTE: calling this function with the same (name,length,hash,queue_length)
+ // but multiple recycle_queue_lengths will result in each freed message being
+ // put onto an undefined one of the recycle queues.
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_queue_length,
+ RawQueue **recycle);
-// Retrieves (and creates if necessary) a queue. Each combination of name and
-// signature refers to a completely independent queue.
-aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig);
-// Same as above, except sets up the returned queue so that it will put messages
-// on *recycle (retrieved with recycle_sig) when they are freed (after they have
-// been released by all other readers/writers and are not in the queue).
-// The length of recycle_sig determines how many freed messages will be kept.
-// Other code can retrieve recycle_sig and sig separately. However, any frees
-// made using aos_fetch_queue with only sig before the recycle queue has been
-// associated with it will not go on to the recyce queue.
-// Will return NULL for both queues if sig->length != recycle_sig->length or
-// sig->hash == recycle_sig->hash (just to be safe).
-// NOTE: calling this function with the same sig but multiple recycle_sig s
-// will result in each freed message being put onto an undefined recycle_sig.
-aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
- const aos_type_sig *recycle_sig, aos_queue **recycle);
+ // Constants for passing to options arguments.
+ // The non-conflicting ones can be combined with bitwise-or.
-// Constants for passing to opts arguments.
-// #defines so that c code can use queues
-// The non-conflicting ones can be combined with bitwise-or.
-// TODO(brians) prefix these?
-//
-// Causes the returned message to be left in the queue.
-// For reading only.
-#define PEEK 0x0001
-// Reads the last message in the queue instead of just the next one.
-// NOTE: This removes all of the messages until the last one from the queue
-// (which means that nobody else will read them). However, PEEK means to not
-// remove any from the queue, including the ones that are skipped.
-// For reading only.
-#define FROM_END 0x0002
-// Causes reads to return NULL and writes to fail instead of waiting.
-// For reading and writing.
-#define NON_BLOCK 0x0004
-// Causes things to block.
-// IMPORTANT: #defined to 0 so that it is the default. This has to stay.
-// For reading and writing.
-#define BLOCK 0x0000
-// Causes writes to overwrite the oldest message in the queue instead of
-// blocking.
-// For writing only.
-#define OVERRIDE 0x0008
+ // Causes the returned message to be left in the queue.
+ // For reading only.
+ static const int kPeek = 0x0001;
+ // Reads the last message in the queue instead of just the next one.
+ // NOTE: This removes all of the messages until the last one from the queue
+ // (which means that nobody else will read them). However, kPeek means to not
+ // remove any from the queue, including the ones that are skipped.
+ // For reading only.
+ static const int kFromEnd = 0x0002;
+ // Causes reads to return NULL and writes to fail instead of waiting.
+ // For reading and writing.
+ static const int kNonBlock = 0x0004;
+ // Causes things to block.
+ // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+ // For reading and writing.
+ static const int kBlock = 0x0000;
+ // Causes writes to overwrite the oldest message in the queue instead of
+ // blocking.
+ // For writing only.
+ static const int kOverride = 0x0008;
-// Frees a message. Does nothing if msg is NULL.
-int aos_queue_free_msg(aos_queue *queue, const void *msg);
+ // Writes a message into the queue.
+ // This function takes ownership of msg.
+ // NOTE: msg must point to a valid message from this queue
+ // Returns true on success.
+ bool WriteMessage(void *msg, int options);
-// Writes a message into the queue.
-// NOTE: msg must point to at least the length of this queue's worth of valid
-// data to write
-// IMPORTANT: if this returns -1, then the caller must do something with msg
-// (like free it)
-int aos_queue_write_msg(aos_queue *queue, void *msg, int opts);
-// Exactly the same as aos_queue_write_msg, except it automatically frees the
-// message if writing fails.
-static inline int aos_queue_write_msg_free(aos_queue *queue, void *msg, int opts) {
- const int ret = aos_queue_write_msg(queue, msg, opts);
- if (ret != 0) {
- aos_queue_free_msg(queue, msg);
+ // Reads a message out of the queue.
+ // The return value will have at least the length of this queue's worth of
+ // valid data where it's pointing to.
+ // The return value is const because other people might be viewing the same
+ // message. Do not cast the const away!
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ const void *ReadMessage(int options);
+ // Exactly the same as ReadMessage, except it will never return the
+ // same message twice with the same index argument. However, it may not
+ // return some messages that pass through the queue.
+ // *index should start as 0. index does not have to be in shared memory, but
+ // it can be
+ const void *ReadMessageIndex(int options, int *index);
+
+ // Retrieves ("allocates") a message that can then be written to the queue.
+ // NOTE: the return value will be completely uninitialized
+ // The return value will have at least the length of this queue's worth of
+ // valid memory where it's pointing to.
+ // Returns NULL for error.
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ void *GetMessage();
+
+ // It is ok to call this method with a NULL msg.
+ void FreeMessage(const void *msg) {
+ if (msg != NULL) DecrementMessageReferenceCount(msg);
}
- return ret;
-}
-// Reads a message out of the queue.
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// The return value is const because other people might be viewing the same
-// messsage. Do not cast the const away!
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-const void *aos_queue_read_msg(aos_queue *buf, int opts);
-// Exactly the same as aos_queue_read_msg, except it will never return the same
-// message twice with the same index argument. However, it may not return some
-// messages that pass through the queue.
-// *index should start as 0. index does not have to be in shared memory, but it
-// can be
-const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index);
+ private:
+ struct MessageHeader;
+ struct ReadData;
-// Retrieves ("allocates") a message that can then be written to the queue.
-// NOTE: the return value will be completely uninitialized
-// The return value will have at least the length of this queue's worth of valid
-// data where it's pointing to.
-// Returns NULL for error.
-// IMPORTANT: The return value (if not NULL) must eventually be passed to
-// aos_queue_free_msg.
-void *aos_queue_get_msg(aos_queue *queue);
+ bool is_readable() { return data_end_ != data_start_; }
+ bool is_writable() { return ((data_end_ + 1) % data_length_) != data_start_; }
-#ifdef __cplusplus
-}
-#endif
+ // These next 4 allow finding the right one.
+ const char *name_;
+ size_t length_;
+ int hash_;
+ int queue_length_;
+ // The next one in the linked list of queues.
+ RawQueue *next_;
-#endif
+ RawQueue *recycle_;
+ Mutex data_lock_; // protects operations on data_ etc
+ // Always gets broadcasted to because different readers might have different
+ // ideas of what "readable" means (ie ones using separated indices).
+ Condition readable_;
+ Condition writable_;
+ int data_length_; // max length into data + 1
+ int data_start_; // is an index into data
+ int data_end_; // is an index into data
+ int messages_; // that have passed through
+ void **data_; // array of messages (with headers)
+
+ Mutex pool_lock_;
+ size_t msg_length_; // sizeof(each message) including the header
+ int mem_length_; // the max number of messages that will ever be allocated
+ int messages_used_;
+ int pool_length_; // the number of allocated messages
+ MessageHeader **pool_; // array of pointers to messages
+
+ // Actually frees the given message.
+ void DoFreeMessage(const void *msg);
+ // Calls DoFreeMessage if appropriate.
+ void DecrementMessageReferenceCount(const void *msg);
+
+ // Should be called with data_lock_ locked.
+ // *read_data will be initialized.
+ // Returns with a readable message in data_ or false.
+ bool ReadCommonStart(int options, int *index, ReadData *read_data);
+ // Signals writable_ if the queue became writable during this read.
+ // Called with data_lock_ still locked by ReadMessage/ReadMessageIndex.
+ // read_data should be the same thing that was passed in to ReadCommonStart.
+ void ReadCommonEnd(ReadData *read_data);
+ // Handles reading with kPeek.
+ void *ReadPeek(int options, int start);
+
+ // Gets called by Fetch when necessary (with placement new).
+ RawQueue(const char *name, size_t length, int hash, int queue_length);
+};
+
+} // namespace aos
+
+#endif // AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
diff --git a/aos/atom_code/ipc_lib/queue_internal.h b/aos/atom_code/ipc_lib/queue_internal.h
deleted file mode 100644
index e6b23ef..0000000
--- a/aos/atom_code/ipc_lib/queue_internal.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#ifndef AOS_IPC_LIB_QUEUE_INTERNAL_H_
-#define AOS_IPC_LIB_QUEUE_INTERNAL_H_
-
-#include "shared_mem.h"
-#include "aos_sync.h"
-
-// Should only be used by queue.c. Contains definitions of the structures
-// it uses.
-
-// The number of extra messages the pool associated with each queue will be able
-// to hold (for readers who are slow about freeing them).
-#define EXTRA_MESSAGES 20
-
-typedef struct aos_msg_header_t {
- int ref_count;
- int index; // in the pool
-} aos_msg_header;
-static inline void header_swap(aos_msg_header *l, aos_msg_header *r) {
- aos_msg_header tmp;
- tmp.ref_count = l->ref_count;
- tmp.index = l->index;
- l->ref_count = r->ref_count;
- l->index = r->index;
- r->ref_count = tmp.ref_count;
- r->index = tmp.index;
-}
-
-struct aos_queue_list_t {
- char *name;
- aos_type_sig sig;
- aos_queue *queue;
- aos_queue_list *next;
-};
-
-typedef struct aos_ring_buf_t {
- mutex buff_lock; // the main lock protecting operations on this buffer
- // conditions
- mutex writable;
- mutex readable;
- int length; // max index into data + 1
- int start; // is an index into data
- int end; // is an index into data
- int msgs; // that have passed through
- void **data; // array of messages (w/ headers)
-} aos_ring_buf;
-
-typedef struct aos_msg_pool_t {
- mutex pool_lock;
- size_t msg_length;
- int mem_length; // the number of messages
- int used; // number of messages
- int length; // number of allocated messages
- void **pool; // array of messages
-} aos_msg_pool;
-
-struct aos_queue_t {
- aos_msg_pool pool;
- aos_ring_buf buf;
- aos_queue *recycle;
-};
-
-#endif
diff --git a/aos/atom_code/ipc_lib/queue_test.cpp b/aos/atom_code/ipc_lib/queue_test.cc
similarity index 61%
rename from aos/atom_code/ipc_lib/queue_test.cpp
rename to aos/atom_code/ipc_lib/queue_test.cc
index e9ac8d5..9e5f3ae 100644
--- a/aos/atom_code/ipc_lib/queue_test.cpp
+++ b/aos/atom_code/ipc_lib/queue_test.cc
@@ -1,3 +1,5 @@
+#include "aos/common/queue.h"
+
#include <unistd.h>
#include <sys/mman.h>
#include <inttypes.h>
@@ -8,25 +10,25 @@
#include "gtest/gtest.h"
-#include "aos/atom_code/ipc_lib/sharedmem_test_setup.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
#include "aos/common/type_traits.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/time.h"
+#include "aos/common/logging/logging.h"
-using testing::AssertionResult;
-using testing::AssertionSuccess;
-using testing::AssertionFailure;
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+using ::aos::common::testing::GlobalCoreInstance;
-// IMPORTANT: Some of the functions that do test predicate functions allocate
-// shared memory (and don't free it).
-class QueueTest : public SharedMemTestSetup {
+namespace aos {
+namespace testing {
+
+class QueueTest : public ::testing::Test {
protected:
static const size_t kFailureSize = 400;
static char *fatal_failure;
private:
- // This gets registered right after the fork, so it will get run before any
- // exit handlers that had already been registered.
- static void ExitExitHandler() {
- _exit(EXIT_SUCCESS);
- }
enum class ResultType : uint8_t {
NotCalled,
Called,
@@ -44,32 +46,37 @@
return std::string("unknown(" + static_cast<uint8_t>(result)) + ")";
}
}
- static_assert(aos::shm_ok<ResultType>::value, "this will get put in shared memory");
- // Gets allocated in shared memory so it has to be volatile.
- template<typename T> struct FunctionToCall {
- ResultType result;
+ static_assert(aos::shm_ok<ResultType>::value,
+ "this will get put in shared memory");
+ template<typename T>
+ struct FunctionToCall {
+ FunctionToCall() : result(ResultType::NotCalled) {
+ started.Lock();
+ }
+
+ volatile ResultType result;
bool expected;
void (*function)(T*, char*);
T *arg;
volatile char failure[kFailureSize];
+ Mutex started;
};
- template<typename T> static void Hangs_(volatile FunctionToCall<T> *const to_call) {
+ template<typename T>
+ static void Hangs_(FunctionToCall<T> *const to_call) {
+ to_call->started.Unlock();
to_call->result = ResultType::Called;
to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
to_call->result = ResultType::Returned;
}
- static const long kMsToNs = 1000000;
- // The number of ms after which a function is considered to have hung.
- // Must be < 1000.
- static const long kHangTime = 10;
- static const unsigned int kForkSleep = 0; // how many seconds to sleep after forking
+ // How long until a function is considered to have hung.
+ static constexpr time::Time kHangTime = time::Time::InSeconds(0.035);
+ // How long to sleep after forking (for debugging).
+ static constexpr time::Time kForkSleep = time::Time::InSeconds(0);
// Represents a process that has been forked off. The destructor kills the
// process and wait(2)s for it.
class ForkedProcess {
- const pid_t pid_;
- mutex *const lock_;
public:
ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
~ForkedProcess() {
@@ -84,24 +91,25 @@
}
const pid_t ret = wait(NULL);
if (ret == -1) {
- fprintf(stderr, "wait(NULL) failed."
- " child %jd might still be alive\n",
- static_cast<intmax_t>(pid_));
+ LOG(WARNING, "wait(NULL) failed."
+ " child %jd might still be alive\n",
+ static_cast<intmax_t>(pid_));
} else if (ret == 0) {
- fprintf(stderr, "child %jd wasn't waitable. it might still be alive\n",
- static_cast<intmax_t>(pid_));
+ LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+ static_cast<intmax_t>(pid_));
} else if (ret != pid_) {
- fprintf(stderr, "child %d is dead, but child %jd might still be alive\n",
- ret, static_cast<intmax_t>(pid_));
+ LOG(WARNING, "child %d is now confirmed dead"
+ ", but child %jd might still be alive\n",
+ ret, static_cast<intmax_t>(pid_));
}
}
enum class JoinResult {
Finished, Hung, Error
};
- JoinResult Join(long timeout = kHangTime) {
- timespec ts{kForkSleep, timeout * kMsToNs};
- switch (mutex_lock_timeout(lock_, &ts)) {
+ JoinResult Join(time::Time timeout = kHangTime) {
+ timespec lock_timeout = (kForkSleep + timeout).ToTimespec();
+ switch (mutex_lock_timeout(lock_, &lock_timeout)) {
case 2:
return JoinResult::Hung;
case 0:
@@ -110,9 +118,13 @@
return JoinResult::Error;
}
}
+
+ private:
+ const pid_t pid_;
+ mutex *const lock_;
} __attribute__((unused));
- // Member variables for HangsFork and HangsCheck.
+ // State for HangsFork and HangsCheck.
typedef uint8_t ChildID;
static void ReapExitHandler() {
for (auto it = children_.begin(); it != children_.end(); ++it) {
@@ -120,11 +132,11 @@
}
}
static std::map<ChildID, ForkedProcess *> children_;
- std::map<ChildID, volatile FunctionToCall<void> *> to_calls_;
+ std::map<ChildID, FunctionToCall<void> *> to_calls_;
- void SetUp() {
- SharedMemTestSetup::SetUp();
- fatal_failure = reinterpret_cast<char *>(shm_malloc(sizeof(fatal_failure)));
+ void SetUp() override {
+ ::testing::Test::SetUp();
+ fatal_failure = static_cast<char *>(shm_malloc(kFailureSize));
static bool registered = false;
if (!registered) {
atexit(ReapExitHandler);
@@ -133,30 +145,30 @@
}
protected:
- // Function gets called with arg in a forked process.
+ // function gets called with arg in a forked process.
// Leaks shared memory.
- // the attribute is in the middle to make gcc happy
template<typename T> __attribute__((warn_unused_result))
- std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
- mutex *lock = reinterpret_cast<mutex *>(shm_malloc_aligned(
+ std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+ mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
sizeof(*lock), sizeof(int)));
- *lock = 1;
+ if (mutex_lock(lock) != 0) abort();  // must run under NDEBUG too
const pid_t pid = fork();
switch (pid) {
- case 0: // child
- if (kForkSleep != 0) {
- printf("pid %jd sleeping for %u\n", static_cast<intmax_t>(getpid()),
- kForkSleep);
- sleep(kForkSleep);
+ case 0: // child
+ if (kForkSleep != time::Time(0, 0)) {
+ LOG(INFO, "pid %jd sleeping for %ds%dns\n",
+ static_cast<intmax_t>(getpid()),
+ kForkSleep.sec(), kForkSleep.nsec());
+ time::SleepFor(kForkSleep);
}
- atexit(ExitExitHandler);
+ ::aos::common::testing::PreventExit();
function(arg);
mutex_unlock(lock);
exit(EXIT_SUCCESS);
- case -1: // parent failure
- printf("fork() failed with %d: %s\n", errno, strerror(errno));
+ case -1: // parent failure
+ LOG(ERROR, "fork() failed with %d: %s\n", errno, strerror(errno));
return std::unique_ptr<ForkedProcess>();
- default: // parent
+ default: // parent
return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
}
}
@@ -166,8 +178,8 @@
// NOTE: There are other reasons for it to return a failure than the function
// doing the wrong thing.
// Leaks shared memory.
- template<typename T> AssertionResult Hangs(void (*function)(T*, char*), T *arg,
- bool expected) {
+ template<typename T>
+ AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
if (!fork_result) {
return fork_result;
@@ -178,21 +190,24 @@
// Use HangsCheck to get the result.
// Returns whether the fork succeeded or not, NOT whether or not the hang
// check succeeded.
- template<typename T> AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
- bool expected, ChildID id) {
+ template<typename T>
+ AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+ bool expected, ChildID id) {
static_assert(aos::shm_ok<FunctionToCall<T>>::value,
"this is going into shared memory");
- volatile FunctionToCall<T> *const to_call = reinterpret_cast<FunctionToCall<T> *>(
- shm_malloc_aligned(sizeof(*to_call), sizeof(int)));
- to_call->result = ResultType::NotCalled;
+ FunctionToCall<T> *const to_call =
+ static_cast<FunctionToCall<T> *>(
+ shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
+ new (to_call) FunctionToCall<T>();
to_call->function = function;
to_call->arg = arg;
to_call->expected = expected;
to_call->failure[0] = '\0';
- static_cast<volatile char *>(fatal_failure)[0] = '\0';
+ static_cast<char *>(fatal_failure)[0] = '\0';
children_[id] = ForkExecute(Hangs_, to_call).release();
if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
- to_calls_[id] = reinterpret_cast<volatile FunctionToCall<void> *>(to_call);
+ to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+ to_call->started.Lock();
return AssertionSuccess();
}
// Checks whether or not a function hung like it was supposed to.
@@ -214,8 +229,12 @@
<< ResultTypeString(to_calls_[id]->result));
} else {
if (to_calls_[id]->result == ResultType::Called) {
- return to_calls_[id]->expected ? AssertionSuccess() : AssertionFailure();
+ return to_calls_[id]->expected ? AssertionSuccess() :
+ AssertionFailure();
+ } else if (result == ForkedProcess::JoinResult::Error) {
+ return AssertionFailure() << "error joining child";
} else {
+ abort();
return AssertionFailure() << "something weird happened";
}
}
@@ -234,28 +253,30 @@
} while (false)
struct TestMessage {
- int16_t data; // don't really want to test empty messages
+ // Some contents because we don't really want to test empty messages.
+ int16_t data;
};
struct MessageArgs {
- aos_queue *const queue;
+ RawQueue *const queue;
int flags;
- int16_t data; // -1 means NULL expected
+ int16_t data; // -1 means NULL expected
};
static void WriteTestMessage(MessageArgs *args, char *failure) {
- TestMessage *msg = reinterpret_cast<TestMessage *>(aos_queue_get_msg(args->queue));
+ TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
if (msg == NULL) {
- snprintf(fatal_failure, kFailureSize, "couldn't get_msg from %p", args->queue);
+ snprintf(fatal_failure, kFailureSize,
+ "couldn't get_msg from %p", args->queue);
return;
}
msg->data = args->data;
- if (aos_queue_write_msg_free(args->queue, msg, args->flags) == -1) {
+ if (!args->queue->WriteMessage(msg, args->flags)) {
snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
args->queue, msg, args->flags);
}
}
static void ReadTestMessage(MessageArgs *args, char *failure) {
- const TestMessage *msg = reinterpret_cast<const TestMessage *>(
- aos_queue_read_msg(args->queue, args->flags));
+ const TestMessage *msg = static_cast<const TestMessage *>(
+ args->queue->ReadMessage(args->flags));
if (msg == NULL) {
if (args->data != -1) {
snprintf(failure, kFailureSize,
@@ -268,89 +289,88 @@
"expected data of %" PRId16 " but got %" PRId16 " instead",
args->data, msg->data);
}
- aos_queue_free_msg(args->queue, msg);
+ args->queue->FreeMessage(msg);
}
}
+
+ private:
+ GlobalCoreInstance my_core;
};
char *QueueTest::fatal_failure;
std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
+constexpr time::Time QueueTest::kHangTime;
+constexpr time::Time QueueTest::kForkSleep;
TEST_F(QueueTest, Reading) {
- static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
- aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, -1};
- EXPECT_EQ(BLOCK, 0);
- EXPECT_EQ(BLOCK | FROM_END, FROM_END);
-
- args.flags = NON_BLOCK;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = NON_BLOCK | PEEK;
+ args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = PEEK;
+ args.flags = RawQueue::kPeek;
EXPECT_HANGS(ReadTestMessage, &args);
args.data = 254;
- args.flags = BLOCK;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = PEEK;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = PEEK;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = PEEK | NON_BLOCK;
+ args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
args.data = -1;
EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
args.data = 971;
EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
}
TEST_F(QueueTest, Writing) {
- static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
- aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, 973};
- args.flags = BLOCK;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = BLOCK;
+ args.flags = RawQueue::kBlock;
EXPECT_HANGS(WriteTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = PEEK;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
args.data = 971;
- args.flags = OVERRIDE;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = OVERRIDE;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = NON_BLOCK;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = OVERRIDE;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
}
TEST_F(QueueTest, MultiRead) {
- static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
- aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, 1323};
- args.flags = BLOCK;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = BLOCK;
+ args.flags = RawQueue::kBlock;
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
@@ -360,45 +380,45 @@
TEST_F(QueueTest, Recycle) {
// TODO(brians) basic test of recycle queue
// include all of the ways a message can get into the recycle queue
- static const aos_type_sig signature{sizeof(TestMessage), 1, 2},
- recycle_signature{sizeof(TestMessage), 2, 2};
- aos_queue *recycle_queue = reinterpret_cast<aos_queue *>(23);
- aos_queue *const queue = aos_fetch_queue_recycle("Queue", &signature,
- &recycle_signature, &recycle_queue);
- ASSERT_NE(reinterpret_cast<aos_queue *>(23), recycle_queue);
+ RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+ 1, 2, 2, 2, &recycle_queue);
+ ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
- args.flags = BLOCK;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 254;
EXPECT_RETURNS(WriteTestMessage, &args);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 971;
- args.flags = OVERRIDE;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
- recycle.flags = BLOCK;
+ recycle.flags = RawQueue::kBlock;
EXPECT_RETURNS(ReadTestMessage, &recycle);
EXPECT_HANGS(ReadTestMessage, &recycle);
- TestMessage *msg = static_cast<TestMessage *>(aos_queue_get_msg(queue));
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
ASSERT_TRUE(msg != NULL);
msg->data = 341;
- aos_queue_free_msg(queue, msg);
+ queue->FreeMessage(msg);
recycle.data = 341;
EXPECT_RETURNS(ReadTestMessage, &recycle);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 254;
- args.flags = PEEK;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- recycle.flags = BLOCK;
+ recycle.flags = RawQueue::kBlock;
EXPECT_HANGS(ReadTestMessage, &recycle);
- args.flags = BLOCK;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
recycle.data = 254;
EXPECT_RETURNS(ReadTestMessage, &recycle);
}
+} // namespace testing
+} // namespace aos
diff --git a/aos/atom_code/ipc_lib/shared_mem.c b/aos/atom_code/ipc_lib/shared_mem.c
index f631b78..e2c2c9e 100644
--- a/aos/atom_code/ipc_lib/shared_mem.c
+++ b/aos/atom_code/ipc_lib/shared_mem.c
@@ -1,4 +1,4 @@
-#include "shared_mem.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
#include <stdio.h>
#include <string.h>
@@ -8,22 +8,31 @@
#include <sys/types.h>
#include <errno.h>
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
// the path for the shared memory segment. see shm_open(3) for restrictions
#define AOS_SHM_NAME "/aos_shared_mem"
// Size of the shared mem segment.
// set to the maximum number that worked
#define SIZEOFSHMSEG (4096 * 27813)
+void init_shared_mem_core(aos_shm_core *shm_core) {
+ clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
+ shm_core->msg_alloc_lock = 0;
+ shm_core->queues.queue_list = NULL;
+ shm_core->queues.alloc_lock = 0;
+}
+
ptrdiff_t aos_core_get_mem_usage(void) {
return global_core->size -
((ptrdiff_t)global_core->mem_struct->msg_alloc -
(ptrdiff_t)global_core->mem_struct);
}
-struct aos_core global_core_data;
struct aos_core *global_core = NULL;
int aos_core_create_shared_mem(enum aos_core_create to_create) {
+ static struct aos_core global_core_data;
global_core = &global_core_data;
int shm;
before:
@@ -46,8 +55,8 @@
}
if (shm == -1) {
fprintf(stderr, "shared_mem:"
- " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
- " failed with %d: %s\n", errno, strerror(errno));
+ " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
+ " failed with %d: %s\n", errno, strerror(errno));
return -1;
}
if (global_core->owner) {
@@ -62,8 +71,8 @@
MAP_SHARED | MAP_FIXED | MAP_LOCKED | MAP_POPULATE, shm, 0);
if (shm_address == MAP_FAILED) {
fprintf(stderr, "shared_mem: mmap(%p, 0x%zx, stuff, stuff, %d, 0) failed"
- " with %d: %s\n",
- (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
+ " with %d: %s\n",
+ (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
return -1;
}
printf("shared_mem: shm at: %p\n", shm_address);
@@ -88,9 +97,9 @@
init_shared_mem_core(global_core->mem_struct);
}
if (global_core->owner) {
- condition_set(&global_core->mem_struct->creation_condition);
+ futex_set(&global_core->mem_struct->creation_condition);
} else {
- if (condition_wait(&global_core->mem_struct->creation_condition) != 0) {
+ if (futex_wait(&global_core->mem_struct->creation_condition) != 0) {
fprintf(stderr, "waiting on creation_condition failed\n");
return -1;
}
@@ -116,4 +125,3 @@
}
return 0;
}
-
diff --git a/aos/atom_code/ipc_lib/shared_mem.h b/aos/atom_code/ipc_lib/shared_mem.h
index b1a2608..c0d21ac 100644
--- a/aos/atom_code/ipc_lib/shared_mem.h
+++ b/aos/atom_code/ipc_lib/shared_mem.h
@@ -1,19 +1,40 @@
#ifndef _SHARED_MEM_H_
#define _SHARED_MEM_H_
+#include <stddef.h>
+#include <unistd.h>
+#include <time.h>
+
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
#ifdef __cplusplus
extern "C" {
#endif
-#include "core_lib.h"
-#include <stddef.h>
-#include <unistd.h>
+extern struct aos_core *global_core;
// Where the shared memory segment starts in each process's address space.
// Has to be the same in all of them so that stuff in shared memory
// can have regular pointers to other stuff in shared memory.
#define SHM_START 0x20000000
+typedef struct aos_queue_global_t {
+ mutex alloc_lock;
+ void *queue_list; // an aos::Queue* declared in C code
+} aos_queue_global;
+
+typedef struct aos_shm_core_t {
+ // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
+ // this shared memory area
+ struct timespec identifier;
+ // gets 0-initialized at the start (as part of shared memory) and
+ // the owner sets as soon as it finishes setting stuff up
+ mutex creation_condition;
+ mutex msg_alloc_lock;
+ void *msg_alloc;
+ aos_queue_global queues;
+} aos_shm_core;
+
enum aos_core_create {
create,
reference
@@ -26,6 +47,8 @@
aos_shm_core *mem_struct;
};
+void init_shared_mem_core(aos_shm_core *shm_core);
+
ptrdiff_t aos_core_get_mem_usage(void);
// Takes the specified memory address and uses it as the shared memory.
diff --git a/aos/atom_code/ipc_lib/sharedmem_test_setup.h b/aos/atom_code/ipc_lib/sharedmem_test_setup.h
deleted file mode 100644
index e461c43..0000000
--- a/aos/atom_code/ipc_lib/sharedmem_test_setup.h
+++ /dev/null
@@ -1,147 +0,0 @@
-// defines a fixture (SharedMemTestSetup) that sets up shared memory
-
-extern "C" {
-#include "shared_mem.h"
- extern struct aos_core *global_core;
-}
-
-#include <signal.h>
-
-#include <gtest/gtest.h>
-#include <sys/types.h>
-
-// TODO(brians) read logs from here
-class SharedMemTestSetup : public testing::Test{
- protected:
- pid_t core;
- int start[2];
- int memcheck[2];
- static void signal_handler(int){
- if(aos_core_free_shared_mem()){
- exit(- 1);
- }
- exit(0);
- }
- static int get_mem_usage(){
- return global_core->size - ((uint8_t *)global_core->mem_struct->msg_alloc - (uint8_t *)SHM_START);
- }
- bool checking_mem;
-
- virtual void BeforeLocalShmSetup() {}
- virtual void SetUp(){
- ASSERT_EQ(0, pipe(start)) << "couldn't create start pipes";
- ASSERT_EQ(0, pipe(memcheck)) << "couldn't create memcheck pipes";
- checking_mem = false;
- if((core = fork()) == 0){
- close(start[0]);
- close(memcheck[1]);
- struct sigaction act;
- act.sa_handler = signal_handler;
- sigaction(SIGINT, &act, NULL);
- if(aos_core_create_shared_mem(create)){
- exit(-1);
- }
- write_pipe(start[1], "a", 1);
- int usage = 0;
- while(1){
- char buf1;
- read_pipe(memcheck[0], &buf1, 1);
- if(usage == 0)
- usage = get_mem_usage();
- if(usage == get_mem_usage())
- buf1 = 1;
- else
- buf1 = 0;
- write_pipe(start[1], &buf1, 1);
- }
- }
- close(start[1]);
- close(memcheck[0]);
- ASSERT_NE(-1, core) << "fork failed";
- char buf;
- read_pipe(start[0], &buf, 1);
-
- BeforeLocalShmSetup();
-
- ASSERT_EQ(0, aos_core_create_shared_mem(reference)) << "couldn't create shared mem reference";
- }
- virtual void TearDown(){
- if(checking_mem){
- write_pipe(memcheck[1], "a", 1);
- char buf;
- read_pipe(start[0], &buf, 1);
- EXPECT_EQ(1, buf) << "memory got leaked";
- }
- EXPECT_EQ(0, aos_core_free_shared_mem()) << "issues freeing shared mem";
- if(core > 0){
- kill(core, SIGINT);
- siginfo_t status;
- ASSERT_EQ(0, waitid(P_PID, core, &status, WEXITED)) << "waiting for the core to finish failed";
- EXPECT_EQ(CLD_EXITED, status.si_code) << "core died";
- EXPECT_EQ(0, status.si_status) << "core exited with an error";
- }
- }
- // if any more shared memory gets allocated after calling this and not freed by the end, it's an error
- void AllSharedMemAllocated(){
- checking_mem = true;
- write_pipe(memcheck[1], "a", 1);
- char buf;
- read_pipe(start[0], &buf, 1);
- }
- private:
- // Wrapper functions for pipes because they should never have errors.
- void read_pipe(int fd, void *buf, size_t count) {
- if (read(fd, buf, count) < 0) abort();
- }
- void write_pipe(int fd, const void *buf, size_t count) {
- if (write(fd, buf, count) < 0) abort();
- }
-};
-class ExecVeTestSetup : public SharedMemTestSetup {
- protected:
- std::vector<std::string> files;
- std::vector<pid_t> pids;
- virtual void BeforeLocalShmSetup(){
- std::vector<std::string>::iterator it;
- pid_t child;
- for(it = files.begin(); it < files.end(); ++it){
- if((child = fork()) == 0){
- char *null = NULL;
- execve(it->c_str(), &null, &null);
- ADD_FAILURE() << "execve failed";
- perror("execve");
- exit(0);
- }
- if(child > 0)
- pids.push_back(child);
- else
- ADD_FAILURE() << "fork failed return=" << child;
- }
- usleep(150000);
- }
- virtual void TearDown(){
- std::vector<pid_t>::iterator it;
- siginfo_t status;
- for(it = pids.begin(); it < pids.end(); ++it){
- printf("attempting to SIGINT(%d) %d\n", SIGINT, *it);
- if(*it > 0){
- kill(*it, SIGINT);
- ASSERT_EQ(0, waitid(P_PID, *it, &status, WEXITED)) << "waiting for the AsyncAction(pid=" << *it << ") to finish failed";
- EXPECT_EQ(CLD_EXITED, status.si_code) << "child died (killed by signal is " << (int)CLD_KILLED << ")";
- EXPECT_EQ(0, status.si_status) << "child exited with an error";
- }else{
- FAIL();
- }
- }
-
- SharedMemTestSetup::TearDown();
- }
- // call this _before_ ExecVeTestSetup::SetUp()
- void AddProcess(const std::string file){
- files.push_back(file);
- }
- void PercolatePause(){
- usleep(50000);
- }
-};
-
diff --git a/aos/atom_code/logging/atom_logging.cc b/aos/atom_code/logging/atom_logging.cc
index 08a65cb..854da17 100644
--- a/aos/atom_code/logging/atom_logging.cc
+++ b/aos/atom_code/logging/atom_logging.cc
@@ -53,8 +53,7 @@
return process_name + '.' + thread_name;
}
-static const aos_type_sig message_sig = {sizeof(LogMessage), 1323, 1500};
-static aos_queue *queue;
+RawQueue *queue;
} // namespace
namespace internal {
@@ -83,7 +82,7 @@
class AtomQueueLogImplementation : public LogImplementation {
virtual void DoLog(log_level level, const char *format, va_list ap) {
- LogMessage *message = static_cast<LogMessage *>(aos_queue_get_msg(queue));
+ LogMessage *message = static_cast<LogMessage *>(queue->GetMessage());
if (message == NULL) {
LOG(FATAL, "queue get message failed\n");
}
@@ -99,7 +98,7 @@
void Register() {
Init();
- queue = aos_fetch_queue("LoggingQueue", &message_sig);
+ queue = RawQueue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 1500);
if (queue == NULL) {
Die("logging: couldn't fetch queue\n");
}
@@ -108,33 +107,32 @@
}
const LogMessage *ReadNext(int flags, int *index) {
- return static_cast<const LogMessage *>(
- aos_queue_read_msg_index(queue, flags, index));
+ return static_cast<const LogMessage *>(queue->ReadMessageIndex(flags, index));
}
const LogMessage *ReadNext() {
- return ReadNext(BLOCK);
+ return ReadNext(RawQueue::kBlock);
}
const LogMessage *ReadNext(int flags) {
const LogMessage *r = NULL;
do {
- r = static_cast<const LogMessage *>(aos_queue_read_msg(queue, flags));
+ r = static_cast<const LogMessage *>(queue->ReadMessage(flags));
// not blocking means return a NULL if that's what it gets
- } while ((flags & BLOCK) && r == NULL);
+ } while ((flags & RawQueue::kBlock) && r == NULL);
return r;
}
LogMessage *Get() {
- return static_cast<LogMessage *>(aos_queue_get_msg(queue));
+ return static_cast<LogMessage *>(queue->GetMessage());
}
void Free(const LogMessage *msg) {
- aos_queue_free_msg(queue, msg);
+ queue->FreeMessage(msg);
}
void Write(LogMessage *msg) {
- if (aos_queue_write_msg_free(queue, msg, OVERRIDE) < 0) {
+ if (!queue->WriteMessage(msg, RawQueue::kOverride)) {
LOG(FATAL, "writing failed");
}
}
diff --git a/aos/atom_code/queue-tmpl.h b/aos/atom_code/queue-tmpl.h
index bb043e1..0cc9392 100644
--- a/aos/atom_code/queue-tmpl.h
+++ b/aos/atom_code/queue-tmpl.h
@@ -5,7 +5,7 @@
assert(msg_ != NULL);
msg_->SetTimeToNow();
assert(queue_ != NULL);
- bool return_value = aos_queue_write_msg_free(queue_, msg_, OVERRIDE) == 0;
+ bool return_value = queue_->WriteMessage(msg_, RawQueue::kOverride);
msg_ = NULL;
return return_value;
}
@@ -15,7 +15,7 @@
assert(msg_ != NULL);
msg_->SetTimeToNow();
assert(queue_ != NULL);
- bool return_value = aos_queue_write_msg_free(queue_, msg_, BLOCK) == 0;
+ bool return_value = queue_->WriteMessage(msg_, RawQueue::kBlock);
msg_ = NULL;
return return_value;
}
@@ -23,7 +23,7 @@
template <class T>
void ScopedMessagePtr<T>::reset(T *msg) {
if (queue_ != NULL && msg_ != NULL) {
- aos_queue_free_msg(queue_, msg_);
+ queue_->FreeMessage(msg_);  // release the old message, not the new one
}
msg_ = msg;
}
@@ -81,13 +81,12 @@
assert(msg_ != NULL);
assert(queue_ != NULL);
msg_->SetTimeToNow();
- T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
+ T *shm_msg = static_cast<T *>(queue_->GetMessage());
if (shm_msg == NULL) {
return false;
}
*shm_msg = *msg_;
- bool return_value =
- aos_queue_write_msg_free(queue_, shm_msg, OVERRIDE) == 0;
+ bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kOverride);
reset();
return return_value;
}
@@ -100,12 +99,12 @@
assert(msg_ != NULL);
assert(queue_ != NULL);
msg_->SetTimeToNow();
- T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
+ T *shm_msg = static_cast<T *>(queue_->GetMessage());
if (shm_msg == NULL) {
return false;
}
*shm_msg = *msg_;
- bool return_value = aos_queue_write_msg_free(queue_, shm_msg, BLOCK) == 0;
+ bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kBlock);
reset();
return return_value;
}
@@ -145,7 +144,7 @@
friend class aos::SafeMessageBuilder<T>;
// Only Queue should be able to build a message pointer.
- SafeScopedMessagePtr(aos_queue *queue)
+ SafeScopedMessagePtr(RawQueue *queue)
: queue_(queue), msg_(new T()) {}
// Sets the pointer to msg, freeing the old value if it was there.
@@ -159,10 +158,10 @@
}
// Sets the queue that owns this message.
- void set_queue(aos_queue *queue) { queue_ = queue; }
+ void set_queue(RawQueue *queue) { queue_ = queue; }
// The queue that the message is a part of.
- aos_queue *queue_;
+ RawQueue *queue_;
// The message or NULL.
T *msg_;
};
@@ -170,11 +169,9 @@
template <class T>
void Queue<T>::Init() {
if (queue_ == NULL) {
- // Signature of the message.
- aos_type_sig kQueueSignature{sizeof(T), static_cast<int>(T::kHash),
- T::kQueueLength};
-
- queue_ = aos_fetch_queue(queue_name_, &kQueueSignature);
+ queue_ = RawQueue::Fetch(queue_name_, sizeof(T),
+ static_cast<int>(T::kHash),
+ T::kQueueLength);
queue_msg_.set_queue(queue_);
}
}
@@ -191,11 +188,11 @@
template <class T>
bool Queue<T>::FetchNext() {
Init();
- // TODO(aschuh): Use aos_queue_read_msg_index so that multiple readers
+ // TODO(aschuh): Use RawQueue::ReadMessageIndex so that multiple readers
// reading don't randomly get only part of the messages.
// Document here the tradoffs that are part of each method.
- const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
- NON_BLOCK));
+ const T *msg = static_cast<const T *>(
+ queue_->ReadMessage(RawQueue::kNonBlock));
// Only update the internal pointer if we got a new message.
if (msg != NULL) {
queue_msg_.reset(msg);
@@ -206,7 +203,7 @@
template <class T>
bool Queue<T>::FetchNextBlocking() {
Init();
- const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_, BLOCK));
+ const T *msg = static_cast<const T *>(queue_->ReadMessage(RawQueue::kBlock));
queue_msg_.reset(msg);
assert (msg != NULL);
return true;
@@ -215,16 +212,16 @@
template <class T>
bool Queue<T>::FetchLatest() {
Init();
- const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
- FROM_END | NON_BLOCK | PEEK));
+ const T *msg = static_cast<const T *>(queue_->ReadMessage(
+ RawQueue::kFromEnd | RawQueue::kNonBlock | RawQueue::kPeek));
// Only update the internal pointer if we got a new message.
if (msg != NULL && msg != queue_msg_.get()) {
queue_msg_.reset(msg);
return true;
}
- // The message has to get freed if we didn't use it (and aos_queue_free_msg is
- // ok to call on NULL).
- aos_queue_free_msg(queue_, msg);
+ // The message has to get freed if we didn't use it (and RawQueue::FreeMessage
+ // is ok to call on NULL).
+ queue_->FreeMessage(msg);
return false;
}
@@ -244,7 +241,7 @@
template <class T>
T *Queue<T>::MakeRawMessage() {
- T *ret = static_cast<T *>(aos_queue_get_msg(queue_));
+ T *ret = static_cast<T *>(queue_->GetMessage());
assert(ret != NULL);
return ret;
}
diff --git a/aos/build/aos.gyp b/aos/build/aos.gyp
index 874b6a7..c91ab43 100644
--- a/aos/build/aos.gyp
+++ b/aos/build/aos.gyp
@@ -17,10 +17,10 @@
'<(AOS)/atom_code/logging/atom_logging.cc',
],
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
'export_dependent_settings': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
]
}],
],
diff --git a/aos/build/aos_all.gyp b/aos/build/aos_all.gyp
index 4bbd4fa..ba30a35 100644
--- a/aos/build/aos_all.gyp
+++ b/aos/build/aos_all.gyp
@@ -13,7 +13,8 @@
'../atom_code/camera/camera.gyp:CameraHTTPStreamer',
'../atom_code/camera/camera.gyp:CameraReader',
'../atom_code/core/core.gyp:*',
- '../atom_code/ipc_lib/ipc_lib.gyp:*',
+ '../atom_code/ipc_lib/ipc_lib.gyp:raw_queue_test',
+ '../atom_code/ipc_lib/ipc_lib.gyp:ipc_stress_test',
'../atom_code/starter/starter.gyp:starter_exe',
'../atom_code/starter/starter.gyp:netconsole',
'../common/common.gyp:queue_test',
@@ -43,6 +44,7 @@
'<(AOS)/common/common.gyp:type_traits_test',
'<(AOS)/common/common.gyp:time_test',
'<(AOS)/common/common.gyp:mutex_test',
+ '<(AOS)/common/common.gyp:condition_test',
'<(AOS)/common/common.gyp:once_test',
'<(AOS)/common/logging/logging.gyp:logging_impl_test',
],
diff --git a/aos/build/queues/output/message_dec.rb b/aos/build/queues/output/message_dec.rb
index a643392..fcc3c60 100644
--- a/aos/build/queues/output/message_dec.rb
+++ b/aos/build/queues/output/message_dec.rb
@@ -161,13 +161,13 @@
cons_ifdef_statement = CPP::PreprocessorIf.new(cons, unsafe_cons)
cons_ifdef_statement.name = "!defined(__VXWORKS__) && !defined(__TEST_VXWORKS__)"
template.add_member(:private,cons_ifdef_statement)
- cons.args << "aos_queue *queue"
+ cons.args << "RawQueue *queue"
cons.args << "#{t} *msg"
unsafe_cons.args << "#{t} *msg"
cons.add_cons("msg_ptr_","queue","msg")
unsafe_cons.add_cons("msg_ptr_","msg")
cons = safetemplate.add_member(:private,CPP::Constructor.new(safetemplate))
- cons.args << "aos_queue *queue"
+ cons.args << "RawQueue *queue"
cons.add_cons("msg_ptr_","queue")
safetemplate.public
template.public
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index 442e4ac..f3b4e69 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -21,11 +21,14 @@
'queue_testutils.cc',
],
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
'<(AOS)/build/aos.gyp:logging',
'once',
'<(EXTERNALS):gtest',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:shared_mem',
],
+ 'export_dependent_settings': [
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:shared_mem',
+ ],
},
{
'target_name': 'time',
@@ -54,10 +57,10 @@
},
{
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
'export_dependent_settings': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
}]
],
@@ -216,6 +219,24 @@
],
},
{
+ 'target_name': 'condition',
+ 'type': 'static_library',
+ 'sources': [
+ '<(AOS)/atom_code/ipc_lib/condition.cc',
+ ],
+ 'dependencies': [
+ 'mutex',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
+ # TODO(aschuh): Fix this dependency loop by
+ # providing a logging interface.
+ # '<(AOS)/build/aos.gyp:logging',
+ ],
+ 'export_dependent_settings': [
+ 'mutex',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
+ ],
+ },
+ {
'target_name': 'mutex',
'type': 'static_library',
'conditions': [
@@ -228,10 +249,10 @@
'<(AOS)/atom_code/ipc_lib/mutex.cpp',
],
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
],
'export_dependent_settings': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:aos_sync',
],
}],
],
@@ -254,6 +275,23 @@
],
},
{
+ 'target_name': 'condition_test',
+ 'type': 'executable',
+ 'sources': [
+ 'condition_test.cc',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ 'condition',
+ '<(AOS)/common/util/util.gyp:thread',
+ 'time',
+ 'mutex',
+ '<(AOS)/build/aos.gyp:logging',
+ 'queue_testutils',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:core_lib',
+ ],
+ },
+ {
'target_name': 'die_test',
'type': 'executable',
'sources': [
diff --git a/aos/common/condition.h b/aos/common/condition.h
new file mode 100644
index 0000000..c407070
--- /dev/null
+++ b/aos/common/condition.h
@@ -0,0 +1,79 @@
+#ifndef AOS_COMMON_CONDITION_H_
+#define AOS_COMMON_CONDITION_H_
+
+#include "aos/common/mutex.h"
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
+namespace aos {
+
+// A condition variable (IPC mechanism where 1 process/task can notify all
+// others that are waiting for something to happen) without the race condition
+// where a notification is sent after some process has checked if the thing has
+// happened but before it has started listening for notifications.
+//
+// This implementation will print debugging information and abort the process
+// if anything weird happens.
+//
+// A simple example of the use of a condition variable (adapted from
+// pthread_cond(3)):
+//
+// int x, y;
+// Mutex mutex;
+// Condition condition(&mutex);
+//
+// // Waiting until x is greater than y:
+// {
+// MutexLocker locker(&mutex);
+// while (!(x > y)) condition.Wait();
+// // do whatever
+// }
+//
+// // Modifying x and/or y:
+// {
+// MutexLocker locker(&mutex);
+// // modify x and y
+// if (x > y) condition.Broadcast();
+// }
+//
+// Notice the loop around the Wait(). This is very important because some other
+// process can lock the mutex and modify the shared state (possibly undoing
+// whatever the Wait()er was waiting for) in between the Broadcast()er unlocking
+// the mutex and the Wait()er(s) relocking it.
+//
+// Multiple condition variables may be associated with the same mutex but
+// exactly 1 mutex must be associated with each condition variable.
+class Condition {
+ public:
+ // m is the mutex that will be associated with this condition variable. This
+ // object keeps a pointer to it but does not take ownership, so the mutex
+ // must outlive this object.
+ explicit Condition(Mutex *m);
+
+ // Waits for the condition variable to be signalled, atomically unlocking the
+ // mutex associated with this condition variable at the same time. The mutex
+ // associated with this condition variable must be locked when this is called
+ // and will be locked when this method returns.
+ // NOTE: The relocking of the mutex is not performed atomically with waking
+ // up.
+ void Wait();
+
+ // Signals at most 1 other process currently Wait()ing on this condition
+ // variable. Calling this does not require the mutex associated with this
+ // condition variable to be locked.
+ // One of the processes with the highest priority level will be woken.
+ void Signal();
+ // Wakes all processes that are currently Wait()ing on this condition
+ // variable. Calling this does not require the mutex associated with this
+ // condition variable to be locked.
+ void Broadcast();
+
+ // Retrieves the mutex associated with this condition variable.
+ Mutex *m() { return m_; }
+
+ private:
+ // Raw synchronization primitive backing this condition variable.
+ // NOTE(review): presumably the aos_sync.h type that Wait()/Signal()/
+ // Broadcast() operate on -- confirm against condition.cc.
+ mutex impl_;
+ // The associated mutex passed to the constructor; not owned.
+ Mutex *m_;
+};
+
+} // namespace aos
+
+#endif // AOS_COMMON_CONDITION_H_
diff --git a/aos/common/condition_test.cc b/aos/common/condition_test.cc
new file mode 100644
index 0000000..fca2820
--- /dev/null
+++ b/aos/common/condition_test.cc
@@ -0,0 +1,308 @@
+#include "aos/common/condition.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "gtest/gtest.h"
+
+#include "aos/common/time.h"
+#include "aos/common/mutex.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/logging/logging.h"
+#include "aos/common/macros.h"
+
+using ::aos::time::Time;
+using ::aos::common::testing::GlobalCoreInstance;
+
+namespace aos {
+namespace testing {
+
+// Test fixture which provides a Mutex and a Condition associated with it,
+// both living in memory obtained from shm_malloc.
+class ConditionTest : public ::testing::Test {
+ public:
+ // The objects under test, grouped so they can be allocated together.
+ struct Shared {
+ Shared() : condition(&mutex) {}
+
+ Mutex mutex;
+ Condition condition;
+ };
+ static_assert(shm_ok<Shared>::value,
+ "it's going to get shared between forked processes");
+
+ // shm_malloc only hands back raw memory, so construct the Shared in place.
+ ConditionTest() : shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+ new (shared_) Shared();
+ }
+
+ // Declared before shared_ so that it is constructed first (members are
+ // initialized in declaration order).
+ // NOTE(review): presumably shm_malloc needs this to exist already -- confirm.
+ GlobalCoreInstance my_core;
+
+ Shared *const shared_;
+
+ // Sleeps for 0.008 seconds. The tests call this between steps to give the
+ // child processes time to react.
+ void Settle() {
+ time::SleepFor(::Time::InSeconds(0.008));
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ConditionTest);
+};
+
+class ConditionTestProcess {
+ public:
+ enum class Action {
+ kWaitLockStart, // lock, delay, wait, unlock
+ kWait, // delay, lock, wait, unlock
+ kWaitNoUnlock, // delay, lock, wait
+ };
+
+ // This amount gets added to any passed in delay to make the test repeatable.
+ static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.015);
+ static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.09);
+
+ // delay is how long to wait before doing action to condition.
+ // timeout is how long to wait after delay before deciding that it's hung.
+ ConditionTestProcess(const ::Time &delay, Action action, Condition *condition,
+ const ::Time &timeout = kDefaultTimeout)
+ : delay_(kMinimumDelay + delay), action_(action), condition_(condition),
+ timeout_(delay_ + timeout), child_(-1),
+ shared_(static_cast<Shared *>(shm_malloc(sizeof(Shared)))) {
+ new (shared_) Shared();
+ }
+ ~ConditionTestProcess() {
+ assert(child_ == -1);
+ }
+
+ // Forks the child process which executes Run(). In the parent, this does
+ // not return until the child has unlocked shared_->ready, which Run() does
+ // immediately before it locks the condition's mutex.
+ void Start() {
+ ASSERT_FALSE(shared_->started);
+
+ child_ = fork();
+ if (child_ == 0) { // in child
+ // Registers an exit handler that calls _exit(2) so that exit handlers
+ // inherited from the parent do not run in the child.
+ ::aos::common::testing::PreventExit();
+ Run();
+ exit(EXIT_SUCCESS);
+ } else { // in parent
+ assert(child_ != -1);
+
+ // ready starts out locked (see Shared's constructor), so this blocks
+ // until the child unlocks it.
+ shared_->ready.Lock();
+
+ shared_->started = true;
+ }
+ }
+
+ bool IsFinished() {
+ return shared_->finished;
+ }
+
+ // Checks whether the child is stuck, i.e. has not finished by the deadline
+ // of start_time + timeout_. Once the child has been started this always
+ // reaps it: Join() if it finished, Kill() if it did not.
+ // Returns success iff the child was still unfinished at its deadline, and
+ // failure if it finished or was never started.
+ ::testing::AssertionResult Hung() {
+ if (!shared_->started) {
+ ADD_FAILURE();
+ return ::testing::AssertionFailure() << "not started yet";
+ }
+ if (shared_->finished) {
+ Join();
+ return ::testing::AssertionFailure() << "already returned";
+ }
+ if (shared_->delayed) {
+ // The deadline is start_time + timeout_ (the same one SleepUntil() is
+ // given below), so it has passed once the current time is beyond it.
+ // Comparing start_time against Now() + timeout_ instead could never be
+ // true for a start_time in the past.
+ if (::Time::Now() > shared_->start_time + timeout_) {
+ Kill();
+ return ::testing::AssertionSuccess() << "already been too long";
+ }
+ } else {
+ // Blocks until the child finishes its delay: Run() unlocks done_delaying
+ // right after it has set start_time.
+ shared_->done_delaying.Lock();
+ }
+ // Give the child a moment first, then (if it still has not finished) wait
+ // out the remainder of its timeout.
+ time::SleepFor(::Time::InSeconds(0.01));
+ if (!shared_->finished) time::SleepUntil(shared_->start_time + timeout_);
+ if (shared_->finished) {
+ Join();
+ return ::testing::AssertionFailure() << "completed within timeout";
+ } else {
+ Kill();
+ return ::testing::AssertionSuccess() << "took too long";
+ }
+ }
+ ::testing::AssertionResult Test() {
+ Start();
+ return Hung();
+ }
+
+ private:
+ // State used to coordinate between the parent and the forked child.
+ struct Shared {
+ // Both mutexes start out locked; the child unlocks them in Run() to
+ // report its progress to the parent.
+ Shared()
+ : started(false), delayed(false), start_time(0, 0), finished(false) {
+ done_delaying.Lock();
+ ready.Lock();
+ }
+
+ // Set by the parent at the end of Start().
+ volatile bool started;
+ // Set by the child once its initial delay is over.
+ volatile bool delayed;
+ // Unlocked by the child once its initial delay is over; Hung() blocks on
+ // this when delayed is not set yet.
+ Mutex done_delaying;
+ // When the child's initial delay ended; Hung() measures timeout_ from here.
+ ::Time start_time;
+ // Set by the child after Wait() returns.
+ volatile bool finished;
+ // Unlocked by the child right before it locks the condition's mutex;
+ // Start() blocks on this in the parent.
+ Mutex ready;
+ };
+ static_assert(shm_ok<Shared>::value,
+ "it's going to get shared between forked processes");
+
+ // Body of the forked child: carries out action_ on condition_ and records
+ // its progress in shared_ for the parent to observe.
+ void Run() {
+ if (action_ == Action::kWaitLockStart) {
+ // kWaitLockStart locks the condition's mutex before the delay. Unlocking
+ // ready first lets Start() return in the parent.
+ shared_->ready.Unlock();
+ condition_->m()->Lock();
+ }
+ time::SleepFor(delay_);
+ // Record when the delay ended; Hung() measures its timeout from this time.
+ shared_->start_time = ::Time::Now();
+ shared_->delayed = true;
+ // Releases a parent that is blocked in Hung() waiting for the delay to end.
+ shared_->done_delaying.Unlock();
+ if (action_ != Action::kWaitLockStart) {
+ // All other actions lock the condition's mutex only after the delay.
+ shared_->ready.Unlock();
+ condition_->m()->Lock();
+ }
+ condition_->Wait();
+ shared_->finished = true;
+ // kWaitNoUnlock deliberately leaves the mutex locked when the child exits.
+ if (action_ != Action::kWaitNoUnlock) {
+ condition_->m()->Unlock();
+ }
+ }
+
+ // Waits for the child to exit or be killed by a signal, then marks it as
+ // reaped by resetting child_ to -1.
+ void Join() {
+ assert(child_ != -1);
+ int status;
+ do {
+ // Not wrapped in assert(): the waitpid(2) call must happen even when
+ // NDEBUG compiles assertions out, otherwise status is never written. On
+ // failure this records a fatal test failure and returns with child_
+ // still set, which the destructor's assert then catches.
+ ASSERT_EQ(child_, waitpid(child_, &status, 0));
+ } while (!(WIFEXITED(status) || WIFSIGNALED(status)));
+ child_ = -1;
+ }
+ // Sends SIGTERM to the child and then reaps it with Join().
+ void Kill() {
+ assert(child_ != -1);
+ // Not wrapped in assert(): the signal must be sent even when NDEBUG
+ // compiles assertions out, otherwise Join() would wait forever on a child
+ // that is still blocked. On failure this records a fatal test failure and
+ // returns without calling Join().
+ ASSERT_EQ(0, kill(child_, SIGTERM));
+ Join();
+ }
+
+ const ::Time delay_;
+ const Action action_;
+ Condition *const condition_;
+ const ::Time timeout_;
+
+ pid_t child_;
+
+ Shared *const shared_;
+
+ DISALLOW_COPY_AND_ASSIGN(ConditionTestProcess);
+};
+constexpr ::Time ConditionTestProcess::kMinimumDelay;
+constexpr ::Time ConditionTestProcess::kDefaultTimeout;
+
+// Makes sure that the testing framework and everything work for a really simple
+// Wait() and then Signal().
+TEST_F(ConditionTest, Basic) {
+ ConditionTestProcess child(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ child.Start();
+ Settle();
+ // Nothing has been signalled yet, so the child should not have returned
+ // from Wait().
+ EXPECT_FALSE(child.IsFinished());
+ shared_->condition.Signal();
+ // Hung() reports failure when the child finished, which is what we want.
+ EXPECT_FALSE(child.Hung());
+}
+
+// Makes sure that the worker child locks before it tries to Wait() etc.
+TEST_F(ConditionTest, Locking) {
+ ConditionTestProcess child(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ // Hold the mutex so that the child cannot get past locking it.
+ shared_->mutex.Lock();
+ child.Start();
+ Settle();
+ // This Signal() shouldn't do anything because the child should still be
+ // waiting to lock the mutex.
+ shared_->condition.Signal();
+ Settle();
+ shared_->mutex.Unlock();
+ // The child can now lock the mutex and Wait(), but nothing signals it again,
+ // so it is expected to stay blocked until Hung() kills it.
+ EXPECT_TRUE(child.Hung());
+}
+
+// Tests that the worker child only catches a Signal() after the mutex gets
+// unlocked.
+TEST_F(ConditionTest, LockFirst) {
+ ConditionTestProcess child(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ // Hold the mutex so that the child cannot get past locking it.
+ shared_->mutex.Lock();
+ child.Start();
+ Settle();
+ // The child should still be waiting for the mutex, so this Signal() should
+ // not wake it.
+ shared_->condition.Signal();
+ Settle();
+ EXPECT_FALSE(child.IsFinished());
+ shared_->mutex.Unlock();
+ Settle();
+ // The child should now be in Wait(); the earlier Signal() must not count.
+ EXPECT_FALSE(child.IsFinished());
+ shared_->condition.Signal();
+ // Hung() reports failure when the child finished, which is what we want.
+ EXPECT_FALSE(child.Hung());
+}
+
+// Tests that the mutex gets relocked after Wait() returns.
+TEST_F(ConditionTest, Relocking) {
+ ConditionTestProcess child(::Time(0, 0),
+ ConditionTestProcess::Action::kWaitNoUnlock,
+ &shared_->condition);
+ child.Start();
+ Settle();
+ shared_->condition.Signal();
+ EXPECT_FALSE(child.Hung());
+ // With kWaitNoUnlock the child never unlocks the mutex after Wait() returns,
+ // so the mutex should still be locked and TryLock() should fail.
+ EXPECT_FALSE(shared_->mutex.TryLock());
+}
+
+// Tests that Signal() stops exactly 1 Wait()er.
+TEST_F(ConditionTest, SignalOne) {
+ ConditionTestProcess child1(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ ConditionTestProcess child2(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ ConditionTestProcess child3(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ // Counts how many of the 3 children have returned from Wait().
+ auto number_finished = [&]() { return (child1.IsFinished() ? 1 : 0) +
+ (child2.IsFinished() ? 1 : 0) + (child3.IsFinished() ? 1 : 0); };
+ child1.Start();
+ child2.Start();
+ child3.Start();
+ Settle();
+ EXPECT_EQ(0, number_finished());
+ // Each Signal() should let exactly 1 more child finish.
+ shared_->condition.Signal();
+ Settle();
+ EXPECT_EQ(1, number_finished());
+ shared_->condition.Signal();
+ Settle();
+ EXPECT_EQ(2, number_finished());
+ shared_->condition.Signal();
+ Settle();
+ EXPECT_EQ(3, number_finished());
+ // Hung() also reaps each child.
+ EXPECT_FALSE(child1.Hung());
+ EXPECT_FALSE(child2.Hung());
+ EXPECT_FALSE(child3.Hung());
+}
+
+// Tests that Broadcast() wakes multiple Wait()ers.
+TEST_F(ConditionTest, Broadcast) {
+ ConditionTestProcess child1(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ ConditionTestProcess child2(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ ConditionTestProcess child3(::Time(0, 0),
+ ConditionTestProcess::Action::kWait,
+ &shared_->condition);
+ child1.Start();
+ child2.Start();
+ child3.Start();
+ Settle();
+ // A single Broadcast() should be enough for all 3 children to finish.
+ shared_->condition.Broadcast();
+ EXPECT_FALSE(child1.Hung());
+ EXPECT_FALSE(child2.Hung());
+ EXPECT_FALSE(child3.Hung());
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/common/macros.h b/aos/common/macros.h
index 88fc52e..2018b36 100644
--- a/aos/common/macros.h
+++ b/aos/common/macros.h
@@ -6,8 +6,8 @@
// A macro to disallow the copy constructor and operator= functions
// This should be used in the private: declarations for a class
#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
- TypeName(const TypeName&); \
- void operator=(const TypeName&)
+ TypeName(const TypeName&) = delete; \
+ void operator=(const TypeName&) = delete
// A macro to wrap arguments to macros that contain commas.
// Useful for DISALLOW_COPY_AND_ASSIGNing templated types with multiple template
// arguments.
diff --git a/aos/common/messages/QueueHolder.h b/aos/common/messages/QueueHolder.h
index 23ddc95..8d8ba51 100644
--- a/aos/common/messages/QueueHolder.h
+++ b/aos/common/messages/QueueHolder.h
@@ -71,7 +71,7 @@
#define aos_check_rv __attribute__((warn_unused_result))
template<typename T> class QueueHolderNoBuilder {
#ifndef __VXWORKS__
- aos_queue *const queue_;
+ Queue *const queue_;
static_assert(shm_ok<T>::value, "T must be able to"
" go through shared memory and memcpy");
T t_;
@@ -80,7 +80,7 @@
#endif
public:
#ifndef __VXWORKS__
- explicit QueueHolderNoBuilder(aos_queue *queue) : queue_(queue) {}
+ explicit QueueHolderNoBuilder(Queue *queue) : queue_(queue) {}
#else
QueueHolderNoBuilder() {}
#endif
@@ -158,7 +158,7 @@
QueueBuilder<T> builder_;
public:
#ifndef __VXWORKS__
- explicit QueueHolder(aos_queue *queue) : QueueHolderNoBuilder<T>(queue),
+ explicit QueueHolder(Queue *queue) : QueueHolderNoBuilder<T>(queue),
builder_(*this) {}
#else
QueueHolder() : builder_(*this) {}
@@ -171,7 +171,6 @@
}
};
-} // namespace aos
+} // namespace aos
#endif
-
diff --git a/aos/common/messages/messages.gyp b/aos/common/messages/messages.gyp
index 3b8d389..b76dbba 100644
--- a/aos/common/messages/messages.gyp
+++ b/aos/common/messages/messages.gyp
@@ -19,10 +19,10 @@
'conditions': [
['OS!="crio"', {
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
'export_dependent_settings': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
}],
],
diff --git a/aos/common/mutex.h b/aos/common/mutex.h
index 035889b..b6b277c 100644
--- a/aos/common/mutex.h
+++ b/aos/common/mutex.h
@@ -11,6 +11,8 @@
namespace aos {
+class Condition;
+
// An abstraction of a mutex that has implementations both for the
// atom and for the cRIO.
// If there are multiple tasks or processes contending for the mutex,
@@ -42,6 +44,9 @@
typedef mutex ImplementationType;
#endif
ImplementationType impl_;
+
+ friend class Condition; // for access to impl_
+
#ifdef __VXWORKS__
DISALLOW_COPY_AND_ASSIGN(Mutex);
#endif
@@ -64,6 +69,20 @@
Mutex *mutex_;
DISALLOW_COPY_AND_ASSIGN(MutexLocker);
};
+// The inverse of MutexLocker: unlocks the mutex when constructed and locks it
+// again when destroyed. The mutex must be locked when this is constructed.
+class MutexUnlocker {
+ public:
+ // Does not take ownership of mutex.
+ explicit MutexUnlocker(Mutex *mutex) : mutex_(mutex) {
+ mutex_->Unlock();
+ }
+ ~MutexUnlocker() {
+ mutex_->Lock();
+ }
+
+ private:
+ Mutex *mutex_;
+ DISALLOW_COPY_AND_ASSIGN(MutexUnlocker);
+};
} // namespace aos
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
index a5d5e86..652cd9e 100644
--- a/aos/common/mutex_test.cpp
+++ b/aos/common/mutex_test.cpp
@@ -1,7 +1,16 @@
#include "aos/common/mutex.h"
+#include <sched.h>
+#include <math.h>
+#include <pthread.h>
+#ifdef __VXWORKS__
+#include <taskLib.h>
+#endif
+
#include "gtest/gtest.h"
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
namespace aos {
namespace testing {
@@ -51,5 +60,17 @@
EXPECT_TRUE(test_mutex.TryLock());
}
+// Checks that MutexUnlocker unlocks the mutex for the duration of its scope and
+// locks it again when the scope ends.
+TEST_F(MutexTest, MutexUnlocker) {
+ test_mutex.Lock();
+ {
+ aos::MutexUnlocker unlocker(&test_mutex);
+ // If this fails, then something weird is going on and the next line might
+ // hang, so fail immediately.
+ ASSERT_TRUE(test_mutex.TryLock());
+ // Undo the TryLock() so that unlocker's destructor can lock the mutex.
+ test_mutex.Unlock();
+ }
+ // unlocker's destructor locked the mutex again, so this must fail.
+ EXPECT_FALSE(test_mutex.TryLock());
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/common/queue.h b/aos/common/queue.h
index 612429a..68cb338 100644
--- a/aos/common/queue.h
+++ b/aos/common/queue.h
@@ -138,7 +138,7 @@
#ifndef USE_UNSAFE
// Only Queue should be able to build a queue.
- ScopedMessagePtr(aos_queue *queue, T *msg)
+ ScopedMessagePtr(RawQueue *queue, T *msg)
: queue_(queue), msg_(msg) {}
#else
ScopedMessagePtr(T *msg)
@@ -152,10 +152,10 @@
#ifndef USE_UNSAFE
// Sets the queue that owns this message.
- void set_queue(aos_queue *queue) { queue_ = queue; }
+ void set_queue(RawQueue *queue) { queue_ = queue; }
// The queue that the message is a part of.
- aos_queue *queue_;
+ RawQueue *queue_;
#endif // USE_UNSAFE
// The message or NULL.
T *msg_;
@@ -281,7 +281,7 @@
#else
T *MakeRawMessage();
// Pointer to the queue that this object fetches from.
- aos_queue *queue_;
+ RawQueue *queue_;
#endif
// Scoped pointer holding the latest message or NULL.
ScopedMessagePtr<const T> queue_msg_;
diff --git a/aos/common/queue_test.cc b/aos/common/queue_test.cc
index 65a1c25..32d1d23 100644
--- a/aos/common/queue_test.cc
+++ b/aos/common/queue_test.cc
@@ -48,7 +48,7 @@
usleep(50000);
my_test_queue.MakeWithBuilder().test_bool(true).test_int(0x971).Send();
t.Join();
- EXPECT_EQ(true, t.threaded_test_queue.IsNewerThanMS(20));
+ EXPECT_LE(t.threaded_test_queue.Age(), time::Time::InMS(55));
}
// Tests that we can send a message with the message pointer and get it back.
diff --git a/aos/common/queue_testutils.cc b/aos/common/queue_testutils.cc
index 1d47e62..8d195c5 100644
--- a/aos/common/queue_testutils.cc
+++ b/aos/common/queue_testutils.cc
@@ -1,6 +1,10 @@
#include "aos/common/queue_testutils.h"
#include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <assert.h>
#include "gtest/gtest.h"
@@ -110,15 +114,23 @@
Once<void> enable_test_logging_once(DoEnableTestLogging);
+const size_t kCoreSize = 0x100000;
+
+// Exit handler which ends the process immediately with _exit(2), reporting
+// success. Once this runs, no exit handler registered before it gets called.
+void TerminateExitHandler() {
+ _exit(EXIT_SUCCESS);
+}
+
} // namespace
GlobalCoreInstance::GlobalCoreInstance() {
- const size_t kCoreSize = 0x100000;
global_core = &global_core_data_;
- global_core->owner = 1;
- void *memory = malloc(kCoreSize);
- assert(memory != NULL);
- memset(memory, 0, kCoreSize);
+ global_core->owner = true;
+ // Use mmap(2) manually instead of through malloc(3) so that we can pass
+ // MAP_SHARED which allows forked processes to communicate using the
+ // "shared" memory.
+ void *memory = mmap(NULL, kCoreSize, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+ assert(memory != MAP_FAILED);
assert(aos_core_use_address_as_shared_mem(memory, kCoreSize) == 0);
@@ -126,7 +138,7 @@
}
GlobalCoreInstance::~GlobalCoreInstance() {
- free(global_core->mem_struct);
+ assert(munmap(global_core->mem_struct, kCoreSize) == 0);
global_core = NULL;
}
@@ -134,6 +146,10 @@
enable_test_logging_once.Get();
}
+// Registers TerminateExitHandler with atexit(3) and aborts the process if the
+// registration fails.
+void PreventExit() {
+ // Not wrapped in assert(): the handler has to be registered even when NDEBUG
+ // compiles assertions out.
+ if (atexit(TerminateExitHandler) != 0) abort();
+}
+
} // namespace testing
} // namespace common
} // namespace aos
diff --git a/aos/common/queue_testutils.h b/aos/common/queue_testutils.h
index aabdd2d..2d26262 100644
--- a/aos/common/queue_testutils.h
+++ b/aos/common/queue_testutils.h
@@ -1,11 +1,20 @@
-#include "aos/common/queue.h"
+#ifndef AOS_COMMON_QUEUE_TESTUTILS_H_
+#define AOS_COMMON_QUEUE_TESTUTILS_H_
+
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+
+// This file has some general helper functions for dealing with testing things
+// that use shared memory etc.
namespace aos {
namespace common {
namespace testing {
+// Manages creating and cleaning up "shared memory" which works within this
+// process and any that it fork(2)s.
class GlobalCoreInstance {
public:
+ // Calls EnableTestLogging().
GlobalCoreInstance();
~GlobalCoreInstance();
@@ -20,6 +29,14 @@
// initialized), however it can be called more than that.
void EnableTestLogging();
+// Registers an exit handler (using atexit(3)) which will call _exit(2).
+// Intended to be called in a freshly fork(2)ed process where it will run before
+// any other exit handlers that were already registered and prevent them from
+// being run.
+void PreventExit();
+
} // namespace testing
} // namespace common
} // namespace aos
+
+#endif // AOS_COMMON_QUEUE_TESTUTILS_H_