copied everything over from 2012 and removed all of the actual robot code except the drivetrain stuff
git-svn-id: https://robotics.mvla.net/svn/frc971/2013/trunk/src@4078 f308d9b7-e957-4cde-b6ac-9a88185e7312
diff --git a/aos/atom_code/ipc_lib/aos_sync.c b/aos/atom_code/ipc_lib/aos_sync.c
new file mode 100644
index 0000000..a9a4779
--- /dev/null
+++ b/aos/atom_code/ipc_lib/aos_sync.c
@@ -0,0 +1,135 @@
+#include <stdio.h>
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <errno.h>
+#include "aos_sync.h"
+#include "cmpxchg.h"
+#include <stdint.h>
+#include <limits.h>
+#include <string.h>
+
+// this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
+// should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
+// <http://locklessinc.com/articles/futex_cheat_sheet/> and <http://locklessinc.com/articles/mutex_cv_futex/> are useful
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
+// can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
+//
+// Values for a mutex:
+// 0 = unlocked
+// 1 = locked, not contended
+// 2 = locked, probably contended
+// Values for a condition:
+// 0 = unset
+// 1 = set
+
+static inline int sys_futex(mutex *addr1, int op, int val1, const struct timespec *timeout,
+ void *addr2, int val3) {
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+
+static inline int mutex_get(mutex *m, uint8_t signals_fail, const
+ struct timespec *timeout) {
+ int c;
+ c = cmpxchg(m, 0, 1);
+ if (!c) return 0;
+ /* The lock is now contended */
+ if (c == 1) c = xchg(m, 2);
+ while (c) {
+ /* Wait in the kernel */
+ //printf("sync here %d\n", __LINE__);
+ if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
+ if (signals_fail && errno == EINTR) {
+ return 1;
+ }
+ if (timeout != NULL && errno == ETIMEDOUT) {
+ return 2;
+ }
+ }
+ //printf("sync here %d\n", __LINE__);
+ c = xchg(m, 2);
+ }
+ return 0;
+}
+int mutex_lock(mutex *m) {
+ return mutex_get(m, 1, NULL);
+}
+int mutex_lock_timeout(mutex *m, const struct timespec *timeout) {
+ return mutex_get(m, 1, timeout);
+}
+int mutex_grab(mutex *m) {
+ return mutex_get(m, 0, NULL);
+}
+
+int mutex_unlock(mutex *m) {
+ /* Unlock, and if not contended then exit. */
+ //printf("mutex_unlock(%p) => %d \n",m,*m);
+ switch (xchg(m, 0)) {
+ case 0:
+ fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
+ printf("see stderr\n");
+ abort();
+ case 1:
+ //printf("mutex_unlock return(%p) => %d \n",m,*m);
+ return 0;
+ case 2:
+ if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+ fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
+ m, errno, strerror(errno));
+ return -1;
+ } else {
+ return 0;
+ }
+ default:
+ fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
+ m);
+ printf("see stderr\n");
+ abort();
+ }
+}
+int mutex_trylock(mutex *m) {
+ /* Try to take the lock, if is currently unlocked */
+ unsigned c = cmpxchg(m, 0, 1);
+ if (!c) return 0;
+ return 1;
+}
+
+int condition_wait(mutex *m) {
+ if (*m) {
+ return 0;
+ }
+ if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) == -1) {
+ if (errno == EINTR) {
+ return 1;
+ } else if (errno != EWOULDBLOCK) {
+ return -1;
+ }
+ }
+ return 0;
+}
+int condition_wait_force(mutex *m) {
+ while (1) {
+ if (sys_futex(m, FUTEX_WAIT, *m, NULL, NULL, 0) == -1) {
+ if (errno != EWOULDBLOCK) { // if it was an actual problem
+ if (errno == EINTR) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+ } else {
+ return 0;
+ }
+ }
+}
+inline int condition_set_value(mutex *m, mutex value) {
+ xchg(m, value);
+ return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+}
+int condition_set(mutex *m) {
+ return condition_set_value(m, 1);
+}
+int condition_unset(mutex *m) {
+ return !xchg(m, 0);
+}
+
diff --git a/aos/atom_code/ipc_lib/aos_sync.h b/aos/atom_code/ipc_lib/aos_sync.h
new file mode 100644
index 0000000..3c66264
--- /dev/null
+++ b/aos/atom_code/ipc_lib/aos_sync.h
@@ -0,0 +1,63 @@
+#ifndef AOS_IPC_LIB_SYNC_H_
+#define AOS_IPC_LIB_SYNC_H_
+
+#include <stdlib.h>
+#include <signal.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+// TODO(brians) add client requests to make helgrind useful with this code
+// <http://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.client-requests>
+// and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
+// list the interesting ones
+
+// Have to align structs containing it to to sizeof(int).
+// Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
+// Valid initial values for use with condition_ functions are 0 (unset) and 1 (set).
+// The value should not be changed after multiple processes have started
+// accessing an instance except through the functions declared in this file.
+typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
+
+// All return -1 for other error (which will be in errno from futex(2)).
+
+// Returns 1 if interrupted by a signal.
+int mutex_lock(mutex *m) __attribute__((warn_unused_result));
+// Returns 2 if it timed out or 1 if interrupted by a signal.
+int mutex_lock_timeout(mutex *m, const struct timespec *timeout)
+ __attribute__((warn_unused_result));
+// Ignores signals. Can not fail.
+int mutex_grab(mutex *m);
+// Returns 1 for multiple unlocking and -1 if something bad happened and
+// whoever's waiting didn't get woken up.
+int mutex_unlock(mutex *m);
+// Returns 0 when successful in locking the mutex and 1 if somebody else has it
+// locked.
+int mutex_trylock(mutex *m) __attribute__((warn_unused_result));
+
+// The condition_ functions are similar to the mutex_ ones but different.
+// They are designed for signalling when something happens (possibly to
+// multiple listeners). A mutex manipulated with them can only be set or unset.
+
+// Wait for the condition to be set. Will return immediately if it's already set.
+// Returns 0 if successful or it was already set, 1 if interrupted by a signal, or -1.
+int condition_wait(mutex *m) __attribute__((warn_unused_result));
+// Will wait for the next condition_set, even if the condition is already set.
+// Returns 0 if successful, 1 if interrupted by a signal, or -1.
+int condition_wait_force(mutex *m) __attribute__((warn_unused_result));
+// Set the condition and wake up anybody waiting on it.
+// Returns the number that were woken or -1.
+int condition_set(mutex *m);
+// Same as above except lets something other than 1 be used as the final value.
+int condition_set_value(mutex *m, mutex value);
+// Unsets the condition.
+// Returns 0 if it was set before and 1 if it wasn't.
+int condition_unset(mutex *m);
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif
diff --git a/aos/atom_code/ipc_lib/binheap.c b/aos/atom_code/ipc_lib/binheap.c
new file mode 100644
index 0000000..8eb024d
--- /dev/null
+++ b/aos/atom_code/ipc_lib/binheap.c
@@ -0,0 +1,125 @@
+#include "binheap.h"
+#include <stdlib.h>
+#include <stdio.h>
+
+#ifndef TESTING_ASSERT
+#define TESTING_ASSERT(...)
+#endif
+#define Error(x) TESTING_ASSERT(0, x)
+
+#define MinData (0)
+
+void Initialize( int Elements, PriorityQueue H )
+{
+ H->Capacity = Elements - 1;
+ H->Size = 0;
+ H->Elements[ 0 ] = MinData;
+}
+
+int Insert( ElementType X, PriorityQueue H )
+{
+ int i;
+
+ if( IsFull( H ) )
+ {
+ return -1;
+ }
+
+ for( i = ++H->Size; H->Elements[ i / 2 ] > X; i /= 2 )
+ H->Elements[ i ] = H->Elements[ i / 2 ];
+ H->Elements[ i ] = X;
+ return 0;
+}
+
+void Remove( ElementType X, PriorityQueue H )
+{
+ int i, Child, removed = 0;
+ ElementType LastElement;
+
+ for ( i = 1; i <= H->Size; ++i )
+ {
+ if( H->Elements[ i ] == X )
+ {
+ removed = i;
+ break;
+ }
+ }
+ if( removed == 0 )
+ {
+ fprintf(stderr, "could not find element %d to remove. not removing any\n", X);
+ return;
+ }
+
+ LastElement = H->Elements[ H->Size-- ];
+
+ for( i = removed; i * 2 <= H->Size; i = Child )
+ {
+ /* Find smaller child */
+ Child = i * 2;
+ if( Child != H->Size && H->Elements[ Child + 1 ]
+ < H->Elements[ Child ] )
+ Child++;
+
+ /* Percolate one level */
+ if( LastElement > H->Elements[ Child ] )
+ H->Elements[ i ] = H->Elements[ Child ];
+ else
+ break;
+ }
+ H->Elements[ i ] = LastElement;
+}
+
+ElementType DeleteMin( PriorityQueue H )
+{
+ int i, Child;
+ ElementType MinElement, LastElement;
+
+ if( IsEmpty( H ) )
+ {
+ Error( "Priority queue is empty" );
+ return H->Elements[ 0 ];
+ }
+ MinElement = H->Elements[ 1 ];
+ LastElement = H->Elements[ H->Size-- ];
+
+ for( i = 1; i * 2 <= H->Size; i = Child )
+ {
+ /* Find smaller child */
+ Child = i * 2;
+ if( Child != H->Size && H->Elements[ Child + 1 ]
+ < H->Elements[ Child ] )
+ Child++;
+
+ /* Percolate one level */
+ if( LastElement > H->Elements[ Child ] )
+ H->Elements[ i ] = H->Elements[ Child ];
+ else
+ break;
+ }
+ H->Elements[ i ] = LastElement;
+ return MinElement;
+}
+
+ElementType GetMin( PriorityQueue H )
+{
+ if( !IsEmpty( H ) )
+ return H->Elements[ 1 ];
+ Error( "Priority Queue is Empty" );
+ return H->Elements[ 0 ];
+}
+
+int IsEmpty( PriorityQueue H )
+{
+ return H->Size == 0;
+}
+
+int IsFull( PriorityQueue H )
+{
+ return H->Size == H->Capacity;
+}
+
+int GetSize( PriorityQueue H )
+{
+ return H->Size;
+}
+
diff --git a/aos/atom_code/ipc_lib/binheap.h b/aos/atom_code/ipc_lib/binheap.h
new file mode 100644
index 0000000..8c26f9f
--- /dev/null
+++ b/aos/atom_code/ipc_lib/binheap.h
@@ -0,0 +1,29 @@
+#ifndef _BinHeap_H
+#define _BinHeap_H
+
+#include <stdint.h>
+
+typedef uint8_t ElementType;
+struct HeapStruct;
+typedef struct HeapStruct *PriorityQueue;
+
+struct HeapStruct
+{
+ int Capacity;
+ int Size;
+ ElementType *Elements;
+};
+
+// Elements is the number allocated at H->Elements
+void Initialize( int Elements, PriorityQueue H );
+// 0 if successful, -1 if full
+int Insert( ElementType X, PriorityQueue H );
+void Remove( ElementType X, PriorityQueue H );
+ElementType DeleteMin( PriorityQueue H );
+ElementType GetMin( PriorityQueue H );
+int IsEmpty( PriorityQueue H );
+int IsFull( PriorityQueue H );
+int GetSize( PriorityQueue H );
+
+#endif
+
diff --git a/aos/atom_code/ipc_lib/binheap_test.cpp b/aos/atom_code/ipc_lib/binheap_test.cpp
new file mode 100644
index 0000000..62eecd4
--- /dev/null
+++ b/aos/atom_code/ipc_lib/binheap_test.cpp
@@ -0,0 +1,65 @@
+extern "C" {
+#include "binheap.h"
+}
+
+#include <gtest/gtest.h>
+
+class BinHeapTest : public testing::Test{
+ protected:
+ static const int TEST_ELEMENTS = 57;
+ PriorityQueue queue;
+ virtual void SetUp(){
+ queue = new HeapStruct();
+ queue->Elements = new uint8_t[TEST_ELEMENTS];
+ Initialize(TEST_ELEMENTS, queue);
+ }
+ virtual void TearDown(){
+ delete[] queue->Elements;
+ delete queue;
+ }
+};
+
+std::ostream& operator<< (std::ostream& o, uint8_t c){
+ return o<<(int)c;
+}
+
+testing::AssertionResult Contains(PriorityQueue queue, const uint8_t expected[], size_t length){
+ for(size_t i = 0; i < length; ++i){
+ //printf("expected[%d]=%d\n", i, expected[i]);
+ if(DeleteMin(queue) != expected[i]){
+ return testing::AssertionFailure() << "queue[" << i << "] != " << expected[i];
+ }
+ }
+ if(!IsEmpty(queue))
+ return testing::AssertionFailure() << "queue is longer than " << length;
+ return ::testing::AssertionSuccess();
+}
+
+TEST_F(BinHeapTest, SingleElement){
+ Insert(87, queue);
+ EXPECT_EQ(87, DeleteMin(queue));
+ EXPECT_TRUE(IsEmpty(queue));
+}
+TEST_F(BinHeapTest, MultipleElements){
+ Insert(54, queue);
+ Insert(1, queue);
+ Insert(0, queue);
+ Insert(255, queue);
+ Insert(123, queue);
+ uint8_t expected[] = {0, 1, 54, 123, 255};
+ EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
+}
+TEST_F(BinHeapTest, Removals){
+ Insert(54, queue);
+ Insert(1, queue);
+ Insert(0, queue);
+ Insert(255, queue);
+ Insert(123, queue);
+ Remove(255, queue);
+ Remove(0, queue);
+ Insert(222, queue);
+ Insert(67, queue);
+ uint8_t expected[] = {1, 54, 67, 123, 222};
+ EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
+}
+
diff --git a/aos/atom_code/ipc_lib/cmpxchg.h b/aos/atom_code/ipc_lib/cmpxchg.h
new file mode 100644
index 0000000..acb4a3c
--- /dev/null
+++ b/aos/atom_code/ipc_lib/cmpxchg.h
@@ -0,0 +1,153 @@
+#ifndef __ASM_CMPXCHG_H
+#define __ASM_CMPXCHG_H
+
+#include <stdint.h>
+
+//TODO implement xchg using gcc's atomic builtins (http://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html)
+//or maybe http://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
+//__atomic_fetch_sub looks promising
+
+#define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
+/*#define xchg(ptr, n) ({typeof(*ptr) r; \
+ do{ \
+ r = *ptr; \
+ }while(!__sync_bool_compare_and_swap(ptr, r, n)); \
+ r; \
+})*/
+
+# define LOCK "lock;"
+# define LOCK_PREFIX "lock;"
+
+#define xchg(ptr,v) ((__typeof__(*(ptr)))__xchg((unsigned long)(v),(ptr),sizeof(*(ptr))))
+
+#define __xg(x) ((volatile long long *)(x))
+
+/*static inline void set_64bit(volatile unsigned long *ptr, unsigned long val)
+{
+ *ptr = val;
+}
+
+#define _set_64bit set_64bit*/
+
+/*
+ * Note: no "lock" prefix even on SMP: xchg always implies lock anyway
+ * Note 2: xchg has side effect, so that attribute volatile is necessary,
+ * but generally the primitive is invalid, *ptr is output argument. --ANK
+ */
+static inline unsigned long __xchg(unsigned long x, volatile void * ptr, int size)
+{
+ switch (size) {
+ case 1:
+ __asm__ __volatile__("xchgb %b0,%1"
+ :"=q" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 2:
+ __asm__ __volatile__("xchgw %w0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 4:
+ __asm__ __volatile__("xchgl %k0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 8:
+ __asm__ __volatile__("xchg %0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ }
+ return x;
+}
+
+/*
+ * Atomic compare and exchange. Compare OLD with MEM, if identical,
+ * store NEW in MEM. Return the initial value in MEM. Success is
+ * indicated by comparing RETURN with OLD.
+ */
+
+#if 0
+
+#define __HAVE_ARCH_CMPXCHG 1
+
+static inline unsigned long __cmpxchg(volatile void *ptr, unsigned long old,
+ unsigned long new, int size)
+{
+ int32_t prev;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 2:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 4:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %k1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 8:
+ __asm__ __volatile__("lock; cmpxchg %1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ }
+ return old;
+}
+
+/*
+static inline unsigned long __cmpxchg_local(volatile void *ptr,
+ unsigned long old, unsigned long new, int size)
+{
+ unsigned long prev;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__("cmpxchgb %b1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 2:
+ __asm__ __volatile__("cmpxchgw %w1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 4:
+ __asm__ __volatile__("cmpxchgl %k1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 8:
+ __asm__ __volatile__("cmpxchgq %1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ }
+ return old;
+}*/
+
+#define cmpxchg(ptr,o,n)\
+ ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+ (unsigned long)(n),sizeof(*(ptr))))
+/*#define cmpxchg_local(ptr,o,n)\
+ ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+ (unsigned long)(n),sizeof(*(ptr))))*/
+#endif
+
+#endif
diff --git a/aos/atom_code/ipc_lib/core_lib.c b/aos/atom_code/ipc_lib/core_lib.c
new file mode 100644
index 0000000..d4988cc
--- /dev/null
+++ b/aos/atom_code/ipc_lib/core_lib.c
@@ -0,0 +1,53 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include "shared_mem.h"
+#include "core_lib.h"
+#include <time.h>
+
+void init_shared_mem_core(aos_shm_core *shm_core) {
+ clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
+ shm_core->queues.alloc_flag = 0;
+ shm_core->msg_alloc_lock = 0;
+ shm_core->queues.queue_list = NULL;
+ shm_core->queues.alloc_lock = 0;
+ aos_resource_entity_root_create();
+ for(int i = 0; i < AOS_RESOURCE_NUM; ++i){
+ aos_resource_init(i);
+ }
+}
+static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
+ return (l > r) ? l : r;
+}
+void *shm_malloc_aligned(size_t length, uint8_t alignment) {
+ // minimum alignments from <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+ if (length <= 1) {
+ alignment = aos_8max(alignment, 1);
+ } else if (length <= 2) {
+ alignment = aos_8max(alignment, 2);
+ } else if (length <= 4) {
+ alignment = aos_8max(alignment, 4);
+ } else if (length <= 8) {
+ alignment = aos_8max(alignment, 8);
+ } else if (length <= 16) {
+ alignment = aos_8max(alignment, 16);
+ } else {
+ alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
+ }
+
+ void *msg = NULL;
+ aos_shm_core *shm_core = global_core->mem_struct;
+ mutex_grab(&shm_core->msg_alloc_lock);
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+ const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+ msg = shm_core->msg_alloc;
+ if (msg <= global_core->shared_mem) {
+ fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+ printf("if you didn't see the stderr output just then, you should\n");
+ abort();
+ }
+ //printf("alloc %p\n", msg);
+ mutex_unlock(&shm_core->msg_alloc_lock);
+ return msg;
+}
+
diff --git a/aos/atom_code/ipc_lib/core_lib.h b/aos/atom_code/ipc_lib/core_lib.h
new file mode 100644
index 0000000..1983f7a
--- /dev/null
+++ b/aos/atom_code/ipc_lib/core_lib.h
@@ -0,0 +1,55 @@
+#ifndef _AOS_CORE_LIB_H_
+#define _AOS_CORE_LIB_H_
+
+// required by resource.h
+// defined in shared_mem.c
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+extern struct aos_core *global_core;
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#include "aos_sync.h"
+#include "queue.h"
+#include <stdint.h>
+#include "resource_core.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+struct aos_queue_list_t;
+typedef struct aos_queue_hash_t {
+ int alloc_flag;
+ mutex alloc_lock;
+ struct aos_queue_list_t *queue_list;
+} aos_queue_hash;
+
+typedef struct aos_shm_core_t {
+ // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
+ // this shared memory area
+ struct timespec identifier;
+ // gets 0-initialized at the start (as part of shared memory) and
+ // the owner sets as soon as it finishes setting stuff up
+ mutex creation_condition;
+ mutex msg_alloc_lock;
+ void *msg_alloc;
+ aos_queue_hash queues;
+ aos_resource_list resources;
+} aos_shm_core;
+
+void init_shared_mem_core(aos_shm_core *shm_core);
+
+void *shm_malloc_aligned(size_t length, uint8_t alignment);
+static void *shm_malloc(size_t length);
+static inline void *shm_malloc(size_t length) {
+ return shm_malloc_aligned(length, 0);
+}
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif
diff --git a/aos/atom_code/ipc_lib/ipc_lib.gyp b/aos/atom_code/ipc_lib/ipc_lib.gyp
new file mode 100644
index 0000000..4dd0fa9
--- /dev/null
+++ b/aos/atom_code/ipc_lib/ipc_lib.gyp
@@ -0,0 +1,55 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'ipc_lib',
+ 'type': 'static_library',
+ 'sources': [
+ 'aos_sync.c',
+ 'binheap.c',
+ 'core_lib.c',
+ 'queue.c',
+ 'resource.c',
+ 'shared_mem.c',
+ ],
+ 'dependencies': [
+ '<(AOS)/build/aos.gyp:aos/ResourceList.h',
+ ],
+ 'export_dependent_settings': [
+ '<(AOS)/build/aos.gyp:aos/ResourceList.h',
+ ],
+ },
+ {
+ 'target_name': 'binheap_test',
+ 'type': 'executable',
+ 'sources': [
+ 'binheap_test.cpp',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ '<(AOS)/build/aos.gyp:libaos',
+ ],
+ },
+ {
+ 'target_name': 'resource_test',
+ 'type': 'executable',
+ 'sources': [
+ 'resource_test.cpp',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ '<(AOS)/build/aos.gyp:libaos',
+ ],
+ },
+ {
+ 'target_name': 'ipc_queue_test',
+ 'type': 'executable',
+ 'sources': [
+ 'queue_test.cpp',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ '<(AOS)/build/aos.gyp:libaos',
+ ],
+ },
+ ],
+}
diff --git a/aos/atom_code/ipc_lib/mutex.cpp b/aos/atom_code/ipc_lib/mutex.cpp
new file mode 100644
index 0000000..d1f0ef2
--- /dev/null
+++ b/aos/atom_code/ipc_lib/mutex.cpp
@@ -0,0 +1,37 @@
+#include "aos/common/mutex.h"
+
+#include <inttypes.h>
+#include <errno.h>
+
+#include "aos/aos_core.h"
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+Mutex::Mutex() : impl_(0) {
+ static_assert(shm_ok<Mutex>::value,
+ "Mutex is not safe for use in shared memory.");
+}
+
+// Lock and Unlock use the return values of mutex_lock/mutex_unlock
+// to determine whether the lock/unlock succeeded.
+
+void Mutex::Lock() {
+ if (mutex_grab(&impl_) != 0) {
+ LOG(FATAL, "mutex_grab(%p(=%"PRIu32")) failed because of %d: %s\n",
+ &impl_, impl_, errno, strerror(errno));
+ }
+}
+
+void Mutex::Unlock() {
+ if (mutex_unlock(&impl_) != 0) {
+ LOG(FATAL, "mutex_unlock(%p(=%"PRIu32")) failed because of %d: %s\n",
+ &impl_, impl_, errno, strerror(errno));
+ }
+}
+
+bool Mutex::TryLock() {
+ return mutex_trylock(&impl_) == 0;
+}
+
+} // namespace aos
diff --git a/aos/atom_code/ipc_lib/queue.c b/aos/atom_code/ipc_lib/queue.c
new file mode 100644
index 0000000..5cfd2ac
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue.c
@@ -0,0 +1,510 @@
+#include "aos/atom_code/ipc_lib/queue.h"
+#include "aos/atom_code/ipc_lib/queue_internal.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#define READ_DEBUG 0
+#define WRITE_DEBUG 0
+#define REF_DEBUG 0
+
+static inline aos_msg_header *get_header(void *msg) {
+ return (aos_msg_header *)((uint8_t *)msg - sizeof(aos_msg_header));
+}
+static inline aos_queue *aos_core_alloc_queue() {
+ return shm_malloc_aligned(sizeof(aos_queue), sizeof(int));
+}
+static inline void *aos_alloc_msg(aos_msg_pool *pool) {
+ return shm_malloc(pool->msg_length);
+}
+
+// actually free the given message
+static inline int aos_free_msg(aos_msg_pool *pool, void *msg, aos_queue *queue) {
+#if REF_DEBUG
+ if (pool->pool_lock == 0) {
+ //LOG(WARNING, "unprotected\n");
+ }
+#endif
+ aos_msg_header *header = get_header(msg);
+ if (pool->pool[header->index] != header) { // if something's messed up
+ fprintf(stderr, "queue: something is very very wrong with queue %p."
+ " pool->pool(=%p)[header->index(=%d)] != header(=%p)\n",
+ queue, pool->pool, header->index, header);
+ printf("queue: see stderr\n");
+ abort();
+ }
+#if REF_DEBUG
+ printf("ref_free_count: %p\n", msg);
+#endif
+ --pool->used;
+
+ if (queue->recycle != NULL) {
+ void *const new_msg = aos_queue_get_msg(queue->recycle);
+ if (new_msg == NULL) {
+ fprintf(stderr, "queue: couldn't get a message"
+ " for recycle queue %p\n", queue->recycle);
+ } else {
+ // Take a message from recycle_queue and switch its
+ // header with the one being freed, which effectively
+ // switches which queue each message belongs to.
+ aos_msg_header *const new_header = get_header(new_msg);
+ // also switch the messages between the pools
+ pool->pool[header->index] = new_header;
+ if (mutex_lock(&queue->recycle->pool.pool_lock)) {
+ return -1;
+ }
+ queue->recycle->pool.pool[new_header->index] = header;
+ // swap the information in both headers
+ header_swap(header, new_header);
+ // don't unlock the other pool until all of its messages are valid
+ mutex_unlock(&queue->recycle->pool.pool_lock);
+ // use the header for new_msg which is now for this pool
+ header = new_header;
+ if (aos_queue_write_msg_free(queue->recycle,
+ (void *)msg, OVERRIDE) != 0) {
+ printf("queue: warning aos_queue_write_msg("
+ "%p(=queue(=%p)->recycle), %p, OVERRIDE)"
+ " failed\n",
+ queue->recycle, queue, msg);
+ }
+ msg = new_msg;
+ }
+ }
+
+ // where the one we're freeing was
+ int index = header->index;
+ header->index = -1;
+ if (index != pool->used) { // if we're not freeing the one on the end
+ // put the last one where the one we're freeing was
+ header = pool->pool[index] = pool->pool[pool->used];
+ // put the one we're freeing at the end
+ pool->pool[pool->used] = get_header(msg);
+ // update the former last one's index
+ header->index = index;
+ }
+ return 0;
+}
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't have to lock/unlock pool_lock
+static inline int msg_ref_dec(void *msg, aos_msg_pool *pool, aos_queue *queue) {
+ if (msg == NULL) {
+ return 0;
+ }
+
+ int rv = 0;
+ if (mutex_lock(&pool->pool_lock)) {
+ return -1;
+ }
+ aos_msg_header *const header = get_header(msg);
+ header->ref_count --;
+ assert(header->ref_count >= 0);
+#if REF_DEBUG
+ printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+#endif
+ if (header->ref_count == 0) {
+ rv = aos_free_msg(pool, msg, queue);
+ }
+ mutex_unlock(&pool->pool_lock);
+ return rv;
+}
+
+static inline int sigcmp(const aos_type_sig *sig1, const aos_type_sig *sig2) {
+ if (sig1->length != sig2->length) {
+ //LOG(DEBUG, "length mismatch 1=%d 2=%d\n", sig1->length, sig2->length);
+ return 0;
+ }
+ if (sig1->queue_length != sig2->queue_length) {
+ //LOG(DEBUG, "queue_length mismatch 1=%d 2=%d\n", sig1->queue_length, sig2->queue_length);
+ return 0;
+ }
+ if (sig1->hash != sig2->hash) {
+ //LOG(DEBUG, "hash mismatch 1=%d 2=%d\n", sig1->hash, sig2->hash);
+ return 0;
+ }
+ //LOG(DEBUG, "signature match\n");
+ return 1;
+}
+static inline aos_queue *aos_create_queue(const aos_type_sig *sig) {
+ aos_queue *const queue = aos_core_alloc_queue();
+ aos_msg_pool *const pool = &queue->pool;
+ pool->mem_length = sig->queue_length + EXTRA_MESSAGES;
+ pool->length = 0;
+ pool->used = 0;
+ pool->msg_length = sig->length + sizeof(aos_msg_header);
+ pool->pool = shm_malloc(sizeof(void *) * pool->mem_length);
+ aos_ring_buf *const buf = &queue->buf;
+ buf->length = sig->queue_length + 1;
+ if (buf->length < 2) { // TODO(brians) when could this happen?
+ buf->length = 2;
+ }
+ buf->data = shm_malloc(buf->length * sizeof(void *));
+ buf->start = 0;
+ buf->end = 0;
+ buf->msgs = 0;
+ buf->writable = 1;
+ buf->readable = 0;
+ buf->buff_lock = 0;
+ pool->pool_lock = 0;
+ queue->recycle = NULL;
+ return queue;
+}
+aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig) {
+ //LOG(DEBUG, "Fetching the stupid queue: %s\n", name);
+ mutex_grab(&global_core->mem_struct->queues.alloc_lock);
+ aos_queue_list *list = global_core->mem_struct->queues.queue_list;
+ aos_queue_list *last = NULL;
+ while (list != NULL) {
+ // if we found a matching queue
+ if (strcmp(list->name, name) == 0 && sigcmp(&list->sig, sig)) {
+ mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ return list->queue;
+ } else {
+ //LOG(DEBUG, "rejected queue %s strcmp=%d target=%s\n", (*list)->name, strcmp((*list)->name, name), name);
+ }
+ last = list;
+ list = list->next;
+ }
+ list = shm_malloc(sizeof(aos_queue_list));
+ if (last == NULL) {
+ global_core->mem_struct->queues.queue_list = list;
+ } else {
+ last->next = list;
+ }
+ list->sig = *sig;
+ const size_t name_size = strlen(name) + 1;
+ list->name = shm_malloc(name_size);
+ memcpy(list->name, name, name_size);
+ //LOG(INFO, "creating queue{name=%s, sig.length=%zd, sig.hash=%d, sig.queue_length=%d}\n", name, sig->length, sig->hash, sig->queue_length);
+ list->queue = aos_create_queue(sig);
+ //LOG(DEBUG, "Made the stupid queue: %s happy?\n", name);
+ list->next = NULL;
+ mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ return list->queue;
+}
+aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
+ const aos_type_sig *recycle_sig, aos_queue **recycle) {
+ if (sig->length != recycle_sig->length || sig->hash == recycle_sig->hash) {
+ *recycle = NULL;
+ return NULL;
+ }
+ aos_queue *const r = aos_fetch_queue(name, sig);
+ r->recycle = aos_fetch_queue(name, recycle_sig);
+ if (r == r->recycle) {
+ fprintf(stderr, "queue: r->recycle(=%p) == r(=%p)\n", r->recycle, r);
+ printf("see stderr\n");
+ abort();
+ }
+ *recycle = r->recycle;
+ return r;
+}
+
+int aos_queue_write_msg(aos_queue *queue, void *msg, int opts) {
+#if WRITE_DEBUG
+ printf("queue: write_msg(%p, %p, %d)\n", queue, msg, opts);
+#endif
+ int rv = 0;
+ if (msg == NULL || msg < (void *)global_core->mem_struct ||
+ msg > (void *)((intptr_t)global_core->mem_struct + global_core->size)) {
+ fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+ msg, queue);
+ printf("see stderr\n");
+ abort();
+ }
+ aos_ring_buf *const buf = &queue->buf;
+ if (mutex_lock(&buf->buff_lock)) {
+#if WRITE_DEBUG
+ printf("queue: locking buff_lock of %p failed\n", buf);
+#endif
+ return -1;
+ }
+ int new_end = (buf->end + 1) % buf->length;
+ while (new_end == buf->start) {
+ if (opts & NON_BLOCK) {
+#if WRITE_DEBUG
+ printf("queue: not blocking on %p. returning -1\n", queue);
+#endif
+ mutex_unlock(&buf->buff_lock);
+ return -1;
+ } else if (opts & OVERRIDE) {
+#if WRITE_DEBUG
+ printf("queue: overriding on %p\n", queue);
+#endif
+ // avoid leaking the message that we're going to overwrite
+ msg_ref_dec(buf->data[buf->start], &queue->pool, queue);
+ buf->start = (buf->start + 1) % buf->length;
+ } else { // BLOCK
+ mutex_unlock(&buf->buff_lock);
+#if WRITE_DEBUG
+ printf("queue: going to wait for writable(=%p) of %p\n",
+ &buf->writable, queue);
+#endif
+ if (condition_wait(&buf->writable)) {
+#if WRITE_DEBUG
+ printf("queue: waiting for writable(=%p) of %p failed\n",
+ &buf->writable, queue);
+#endif
+ return -1;
+ }
+#if WRITE_DEBUG
+ printf("queue: going to re-lock buff_lock of %p to write\n", queue);
+#endif
+ if (mutex_lock(&buf->buff_lock)) {
+#if WRITE_DEBUG
+ printf("queue: error locking buff_lock of %p\n", queue);
+#endif
+ return -1;
+ }
+ }
+ new_end = (buf->end + 1) % buf->length;
+ }
+ buf->data[buf->end] = msg;
+ ++buf->msgs;
+ buf->end = new_end;
+ mutex_unlock(&buf->buff_lock);
+#if WRITE_DEBUG
+ printf("queue: setting readable(=%p) of %p\n", &buf->readable, queue);
+#endif
+ condition_set(&buf->readable);
+ if (((buf->end + 1) % buf->length) == buf->start) { // if it's now full
+ condition_unset(&buf->writable);
+ }
+#if WRITE_DEBUG
+ printf("queue: write returning %d on queue %p\n", rv, queue);
+#endif
+ return rv;
+}
+
+int aos_queue_free_msg(aos_queue *queue, const void *msg) {
+ // TODO(brians) get rid of this
+ void *msg_temp;
+ memcpy(&msg_temp, &msg, sizeof(msg_temp));
+ return msg_ref_dec(msg_temp, &queue->pool, queue);
+}
+// Deals with setting/unsetting readable and writable.
+// Should be called after buff_lock has been unlocked.
+// read is whether or not this read call read one off the queue
+static inline void aos_read_msg_common_end(aos_ring_buf *const buf, int read) {
+ if (read) {
+ condition_set(&buf->writable);
+ if (buf->start == buf->end) {
+ condition_unset(&buf->readable);
+ }
+ }
+}
+// Returns with buff_lock locked and a readable message in buf.
+// Returns -1 for error (if it returns -1, buff_lock will be unlocked).
+static inline int aos_queue_read_msg_common(int opts, aos_ring_buf *const buf,
+ aos_queue *const queue, int *index) {
+#if !READ_DEBUG
+ (void)queue;
+#endif
+ if (mutex_lock(&buf->buff_lock)) {
+#if READ_DEBUG
+ printf("queue: couldn't lock buff_lock of %p\n", queue);
+#endif
+ return -1;
+ }
+ while (buf->start == buf->end || ((index != NULL) && buf->msgs <= *index)) {
+ mutex_unlock(&buf->buff_lock);
+ if (opts & NON_BLOCK) {
+#if READ_DEBUG
+ printf("queue: not going to block waiting on %p\n", queue);
+#endif
+ return -1;
+ } else { // BLOCK
+#if READ_DEBUG
+ printf("queue: going to wait for readable(=%p) of %p\n",
+ &buf->readable, queue);
+#endif
+ // wait for a message to become readable
+ if ((index == NULL) ? condition_wait(&buf->readable) :
+ condition_wait_force(&buf->readable)) {
+#if READ_DEBUG
+ printf("queue: waiting for readable(=%p) of %p failed\n",
+ &buf->readable, queue);
+#endif
+ return -1;
+ }
+ }
+#if READ_DEBUG
+ printf("queue: going to re-lock buff_lock of %p to read\n", queue);
+#endif
+ if (mutex_lock(&buf->buff_lock)) {
+#if READ_DEBUG
+ printf("couldn't re-lock buff_lock of %p\n", queue);
+#endif
+ return -1;
+ }
+ }
+#if READ_DEBUG
+ printf("queue: read start=%d end=%d from %p\n", buf->start, buf->end, queue);
+#endif
+ return 0;
+}
+// handles reading with PEEK
+static inline void *read_msg_peek(aos_ring_buf *const buf, int opts, int start) {
+ void *ret;
+ if (opts & FROM_END) {
+ int pos = buf->end - 1;
+ if (pos < 0) { // if it needs to wrap
+ pos = buf->length - 1;
+ }
+#if READ_DEBUG
+ printf("queue: reading from line %d: %d\n", __LINE__, pos);
+#endif
+ ret = buf->data[pos];
+ } else {
+#if READ_DEBUG
+ printf("queue: reading from line %d: %d\n", __LINE__, start);
+#endif
+ ret = buf->data[start];
+ }
+ aos_msg_header *const header = get_header(ret);
+ header->ref_count ++;
+#if REF_DEBUG
+ printf("ref inc count: %p\n", ret);
+#endif
+ return ret;
+}
+const void *aos_queue_read_msg(aos_queue *queue, int opts) {
+#if READ_DEBUG
+ printf("queue: read_msg(%p, %d)\n", queue, opts);
+#endif
+ void *msg = NULL;
+ aos_ring_buf *const buf = &queue->buf;
+ if (aos_queue_read_msg_common(opts, buf, queue, NULL) == -1) {
+#if READ_DEBUG
+ printf("queue: common returned -1 for %p\n", queue);
+#endif
+ return NULL;
+ }
+ if (opts & PEEK) {
+ msg = read_msg_peek(buf, opts, buf->start);
+ } else {
+ if (opts & FROM_END) {
+ while (1) {
+#if READ_DEBUG
+ printf("queue: start of c2 of %p\n", queue);
+#endif
+ // This loop pulls each message out of the buffer.
+ const int pos = buf->start;
+ buf->start = (buf->start + 1) % buf->length;
+ // if this is the last one
+ if (buf->start == buf->end) {
+#if READ_DEBUG
+ printf("queue: reading from c2: %d\n", pos);
+#endif
+ msg = buf->data[pos];
+ break;
+ }
+ // it's not going to be in the queue any more
+ msg_ref_dec(buf->data[pos], &queue->pool, queue);
+ }
+ } else {
+#if READ_DEBUG
+ printf("queue: reading from d2: %d\n", buf->start);
+#endif
+ msg = buf->data[buf->start];
+ buf->start = (buf->start + 1) % buf->length;
+ }
+ }
+ mutex_unlock(&buf->buff_lock);
+ aos_read_msg_common_end(buf, !(opts & PEEK));
+#if READ_DEBUG
+ printf("queue: read returning %p\n", msg);
+#endif
+ return msg;
+}
+const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index) {
+#if READ_DEBUG
+ printf("queue: read_msg_index(%p, %d, %p(*=%d))\n", queue, opts, index, *index);
+#endif
+ void *msg = NULL;
+ aos_ring_buf *const buf = &queue->buf;
+ if (aos_queue_read_msg_common(opts, buf, queue, index) == -1) {
+#if READ_DEBUG
+ printf("queue: common returned -1\n");
+#endif
+ return NULL;
+ }
+ // TODO(parker): Handle integer wrap on the index.
+ const int offset = buf->msgs - *index;
+ int my_start = buf->end - offset;
+ if (offset >= buf->length) { // if we're behind the available messages
+ // catch index up to the last available message
+ *index += buf->start - my_start;
+ // and that's the one we're going to read
+ my_start = buf->start;
+ }
+ if (my_start < 0) { // if we want to read off the end of the buffer
+ // unwrap where we're going to read from
+ my_start += buf->length;
+ }
+ if (opts & PEEK) {
+ msg = read_msg_peek(buf, opts, my_start);
+ } else {
+ if (opts & FROM_END) {
+#if READ_DEBUG
+ printf("queue: start of c1 of %p\n", queue);
+#endif
+ int pos = buf->end - 1;
+ if (pos < 0) { // if it wrapped
+ pos = buf->length - 1; // unwrap it
+ }
+#if READ_DEBUG
+ printf("queue: reading from c1: %d\n", pos);
+#endif
+ msg = buf->data[pos];
+ *index = buf->msgs;
+ } else {
+#if READ_DEBUG
+ printf("queue: reading from d1: %d\n", my_start);
+#endif
+ msg = buf->data[my_start];
+ ++(*index);
+ }
+ aos_msg_header *const header = get_header(msg);
+ ++header->ref_count;
+#if REF_DEBUG
+ printf("ref_inc_count: %p\n", msg);
+#endif
+ }
+ mutex_unlock(&buf->buff_lock);
+ // this function never consumes one off the queue
+ aos_read_msg_common_end(buf, 0);
+ return msg;
+}
+static inline void *aos_pool_get_msg(aos_msg_pool *pool) {
+ if (mutex_lock(&pool->pool_lock)) {
+ return NULL;
+ }
+ void *msg;
+ if (pool->length - pool->used > 0) {
+ msg = pool->pool[pool->used];
+ } else {
+ if (pool->length >= pool->mem_length) {
+ //TODO(brians) log this if it isn't the log queue
+ fprintf(stderr, "queue: overused_pool\n");
+ msg = NULL;
+ goto exit;
+ }
+ msg = pool->pool[pool->length] = aos_alloc_msg(pool);
+ ++pool->length;
+ }
+ aos_msg_header *const header = msg;
+ msg = (uint8_t *)msg + sizeof(aos_msg_header);
+ header->ref_count = 1;
+#if REF_DEBUG
+ printf("ref alloc: %p\n", msg);
+#endif
+ header->index = pool->used;
+ ++pool->used;
+exit:
+ mutex_unlock(&pool->pool_lock);
+ return msg;
+}
+void *aos_queue_get_msg(aos_queue *queue) {
+ return aos_pool_get_msg(&queue->pool);
+}
+
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
new file mode 100644
index 0000000..4c279e1
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -0,0 +1,134 @@
+#ifndef AOS_IPC_LIB_QUEUE_H_
+#define AOS_IPC_LIB_QUEUE_H_
+
+#include "shared_mem.h"
+#include "aos_sync.h"
+
+// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
+// code to make checking for leaks work better
+// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
+// describes how
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Queues are the primary way to use shared memory. Basic use consists of
+// initializing an aos_type_sig and then calling aos_fetch_queue on it.
+// This aos_queue* can then be used to get a message and write it or to read a
+// message.
+// Queues (as the name suggests) are a FIFO stack of messages. Each combination
+// of name and aos_type_sig will result in a different queue, which means that
+// if you only recompile some code that uses differently sized messages, it will
+// simply use a different queue than the old code.
+//
+// Any pointers returned from these functions can be safely passed to other
+// processes because they are all shared memory pointers.
+// IMPORTANT: Any message pointer must be passed back in some way
+// (aos_queue_free_msg and aos_queue_write_msg are common ones) or the
+// application will leak shared memory.
+// NOTE: Taking a message from read_msg and then passing it to write_msg might
+// work, but it is not guaranteed to.
+
+typedef struct aos_type_sig_t {
+ size_t length; // sizeof(message)
+ int hash; // can differentiate multiple otherwise identical queues
+ int queue_length; // how many messages the queue can hold
+} aos_type_sig;
+
+// Structures that are opaque to users (defined in queue_internal.h).
+typedef struct aos_queue_list_t aos_queue_list;
+typedef struct aos_queue_t aos_queue;
+
+// Retrieves (and creates if necessary) a queue. Each combination of name and
+// signature refers to a completely independent queue.
+aos_queue *aos_fetch_queue(const char *name, const aos_type_sig *sig);
+// Same as above, except sets up the returned queue so that it will put messages
+// on *recycle (retrieved with recycle_sig) when they are freed (after they have
+// been released by all other readers/writers and are not in the queue).
+// The length of recycle_sig determines how many freed messages will be kept.
+// Other code can retrieve recycle_sig and sig separately. However, any frees
+// made using aos_fetch_queue with only sig before the recycle queue has been
+// associated with it will not go on to the recyce queue.
+// Will return NULL for both queues if sig->length != recycle_sig->length or
+// sig->hash == recycle_sig->hash (just to be safe).
+// NOTE: calling this function with the same sig but multiple recycle_sig s
+// will result in each freed message being put onto an undefined recycle_sig.
+aos_queue *aos_fetch_queue_recycle(const char *name, const aos_type_sig *sig,
+ const aos_type_sig *recycle_sig, aos_queue **recycle);
+
+// Constants for passing to opts arguments.
+// #defines so that c code can use queues
+// The non-conflicting ones can be combined with bitwise-or.
+// TODO(brians) prefix these?
+//
+// Causes the returned message to be left in the queue.
+// For reading only.
+#define PEEK 0x0001
+// Reads the last message in the queue instead of just the next one.
+// NOTE: This removes all of the messages until the last one from the queue
+// (which means that nobody else will read them). However, PEEK means to not
+// remove any from the queue, including the ones that are skipped.
+// For reading only.
+#define FROM_END 0x0002
+// Causes reads to return NULL and writes to fail instead of waiting.
+// For reading and writing.
+#define NON_BLOCK 0x0004
+// Causes things to block.
+// IMPORTANT: #defined to 0 so that it is the default. This has to stay.
+// For reading and writing.
+#define BLOCK 0x0000
+// Causes writes to overwrite the oldest message in the queue instead of
+// blocking.
+// For writing only.
+#define OVERRIDE 0x0008
+
+// Frees a message. Does nothing if msg is NULL.
+int aos_queue_free_msg(aos_queue *queue, const void *msg);
+
+// Writes a message into the queue.
+// NOTE: msg must point to at least the length of this queue's worth of valid
+// data to write
+// IMPORTANT: if this returns -1, then the caller must do something with msg
+// (like free it)
+int aos_queue_write_msg(aos_queue *queue, void *msg, int opts);
+// Exactly the same as aos_queue_write_msg, except it automatically frees the
+// message if writing fails.
+static inline int aos_queue_write_msg_free(aos_queue *queue, void *msg, int opts) {
+ const int ret = aos_queue_write_msg(queue, msg, opts);
+ if (ret != 0) {
+ aos_queue_free_msg(queue, msg);
+ }
+ return ret;
+}
+
+// Reads a message out of the queue.
+// The return value will have at least the length of this queue's worth of valid
+// data where it's pointing to.
+// The return value is const because other people might be viewing the same
+// messsage. Do not cast the const away!
+// IMPORTANT: The return value (if not NULL) must eventually be passed to
+// aos_queue_free_msg.
+const void *aos_queue_read_msg(aos_queue *buf, int opts);
+// Exactly the same as aos_queue_read_msg, except it will never return the same
+// message twice with the same index argument. However, it may not return some
+// messages that pass through the queue.
+// *index should start as 0. index does not have to be in shared memory, but it
+// can be
+const void *aos_queue_read_msg_index(aos_queue *queue, int opts, int *index);
+
+// Retrieves ("allocates") a message that can then be written to the queue.
+// NOTE: the return value will be completely uninitialized
+// The return value will have at least the length of this queue's worth of valid
+// data where it's pointing to.
+// Returns NULL for error.
+// IMPORTANT: The return value (if not NULL) must eventually be passed to
+// aos_queue_free_msg.
+void *aos_queue_get_msg(aos_queue *queue);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff --git a/aos/atom_code/ipc_lib/queue_internal.h b/aos/atom_code/ipc_lib/queue_internal.h
new file mode 100644
index 0000000..e6b23ef
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue_internal.h
@@ -0,0 +1,62 @@
+#ifndef AOS_IPC_LIB_QUEUE_INTERNAL_H_
+#define AOS_IPC_LIB_QUEUE_INTERNAL_H_
+
+#include "shared_mem.h"
+#include "aos_sync.h"
+
+// Should only be used by queue.c. Contains definitions of the structures
+// it uses.
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them).
+#define EXTRA_MESSAGES 20
+
+typedef struct aos_msg_header_t {
+ int ref_count;
+ int index; // in the pool
+} aos_msg_header;
+static inline void header_swap(aos_msg_header *l, aos_msg_header *r) {
+ aos_msg_header tmp;
+ tmp.ref_count = l->ref_count;
+ tmp.index = l->index;
+ l->ref_count = r->ref_count;
+ l->index = r->index;
+ r->ref_count = tmp.ref_count;
+ r->index = tmp.index;
+}
+
+struct aos_queue_list_t {
+ char *name;
+ aos_type_sig sig;
+ aos_queue *queue;
+ aos_queue_list *next;
+};
+
+typedef struct aos_ring_buf_t {
+ mutex buff_lock; // the main lock protecting operations on this buffer
+ // conditions
+ mutex writable;
+ mutex readable;
+ int length; // max index into data + 1
+ int start; // is an index into data
+ int end; // is an index into data
+ int msgs; // that have passed through
+ void **data; // array of messages (w/ headers)
+} aos_ring_buf;
+
+typedef struct aos_msg_pool_t {
+ mutex pool_lock;
+ size_t msg_length;
+ int mem_length; // the number of messages
+ int used; // number of messages
+ int length; // number of allocated messages
+ void **pool; // array of messages
+} aos_msg_pool;
+
+struct aos_queue_t {
+ aos_msg_pool pool;
+ aos_ring_buf buf;
+ aos_queue *recycle;
+};
+
+#endif
diff --git a/aos/atom_code/ipc_lib/queue_test.cpp b/aos/atom_code/ipc_lib/queue_test.cpp
new file mode 100644
index 0000000..d4eb700
--- /dev/null
+++ b/aos/atom_code/ipc_lib/queue_test.cpp
@@ -0,0 +1,404 @@
+#include <unistd.h>
+#include <sys/mman.h>
+#include <inttypes.h>
+
+#include <ostream>
+#include <memory>
+#include <map>
+
+#include "gtest/gtest.h"
+
+#include "aos/aos_core.h"
+#include "aos/atom_code/ipc_lib/sharedmem_test_setup.h"
+#include "aos/common/type_traits.h"
+
+using testing::AssertionResult;
+using testing::AssertionSuccess;
+using testing::AssertionFailure;
+
+// IMPORTANT: Some of the functions that do test predicate functions allocate
+// shared memory (and don't free it).
+class QueueTest : public SharedMemTestSetup {
+ protected:
+ static const size_t kFailureSize = 400;
+ static char *fatal_failure;
+ private:
+ // This gets registered right after the fork, so it will get run before any
+ // exit handlers that had already been registered.
+ static void ExitExitHandler() {
+ _exit(EXIT_SUCCESS);
+ }
+ enum class ResultType : uint8_t {
+ NotCalled,
+ Called,
+ Returned,
+ };
+ const std::string ResultTypeString(volatile const ResultType &result) {
+ switch (result) {
+ case ResultType::Returned:
+ return "Returned";
+ case ResultType::Called:
+ return "Called";
+ case ResultType::NotCalled:
+ return "NotCalled";
+ default:
+ return std::string("unknown(" + static_cast<uint8_t>(result)) + ")";
+ }
+ }
+ static_assert(aos::shm_ok<ResultType>::value, "this will get put in shared memory");
+ // Gets allocated in shared memory so it has to be volatile.
+ template<typename T> struct FunctionToCall {
+ ResultType result;
+ bool expected;
+ void (*function)(T*, char*);
+ T *arg;
+ volatile char failure[kFailureSize];
+ };
+ template<typename T> static void Hangs_(volatile FunctionToCall<T> *const to_call) {
+ to_call->result = ResultType::Called;
+ to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
+ to_call->result = ResultType::Returned;
+ }
+
+ static const long kMsToNs = 1000000;
+ // The number of ms after which a function is considered to have hung.
+ // Must be < 1000.
+ static const long kHangTime = 10;
+ static const unsigned int kForkSleep = 0; // how many seconds to sleep after forking
+
+ // Represents a process that has been forked off. The destructor kills the
+ // process and wait(2)s for it.
+ class ForkedProcess {
+ const pid_t pid_;
+ mutex *const lock_;
+ public:
+ ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
+ ~ForkedProcess() {
+ if (kill(pid_, SIGINT) == -1) {
+ if (errno == ESRCH) {
+ printf("process %jd was already dead\n", static_cast<intmax_t>(pid_));
+ } else {
+ fprintf(stderr, "kill(SIGKILL, %jd) failed with %d: %s\n",
+ static_cast<intmax_t>(pid_), errno, strerror(errno));
+ }
+ return;
+ }
+ const pid_t ret = wait(NULL);
+ if (ret == -1) {
+ fprintf(stderr, "wait(NULL) failed."
+ " child %jd might still be alive\n",
+ static_cast<intmax_t>(pid_));
+ } else if (ret == 0) {
+ fprintf(stderr, "child %jd wasn't waitable. it might still be alive\n",
+ static_cast<intmax_t>(pid_));
+ } else if (ret != pid_) {
+ fprintf(stderr, "child %d is dead, but child %jd might still be alive\n",
+ ret, static_cast<intmax_t>(pid_));
+ }
+ }
+
+ enum class JoinResult {
+ Finished, Hung, Error
+ };
+ JoinResult Join(long timeout = kHangTime) {
+ timespec ts{kForkSleep, timeout * kMsToNs};
+ switch (mutex_lock_timeout(lock_, &ts)) {
+ case 2:
+ return JoinResult::Hung;
+ case 0:
+ return JoinResult::Finished;
+ default:
+ return JoinResult::Error;
+ }
+ }
+ } __attribute__((unused));
+
+ // Member variables for HangsFork and HangsCheck.
+ typedef uint8_t ChildID;
+ static void ReapExitHandler() {
+ for (auto it = children_.begin(); it != children_.end(); ++it) {
+ delete it->second;
+ }
+ }
+ static std::map<ChildID, ForkedProcess *> children_;
+ std::map<ChildID, volatile FunctionToCall<void> *> to_calls_;
+
+ void SetUp() {
+ SharedMemTestSetup::SetUp();
+ fatal_failure = reinterpret_cast<char *>(shm_malloc(sizeof(fatal_failure)));
+ static bool registered = false;
+ if (!registered) {
+ atexit(ReapExitHandler);
+ registered = true;
+ }
+ }
+
+ protected:
+ // Function gets called with arg in a forked process.
+ // Leaks shared memory.
+ // the attribute is in the middle to make gcc happy
+ template<typename T> __attribute__((warn_unused_result))
+ std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+ mutex *lock = reinterpret_cast<mutex *>(shm_malloc_aligned(
+ sizeof(*lock), sizeof(int)));
+ *lock = 1;
+ const pid_t pid = fork();
+ switch (pid) {
+ case 0: // child
+ if (kForkSleep != 0) {
+ printf("pid %jd sleeping for %u\n", static_cast<intmax_t>(getpid()),
+ kForkSleep);
+ sleep(kForkSleep);
+ }
+ atexit(ExitExitHandler);
+ function(arg);
+ mutex_unlock(lock);
+ exit(EXIT_SUCCESS);
+ case -1: // parent failure
+ printf("fork() failed with %d: %s\n", errno, strerror(errno));
+ return std::unique_ptr<ForkedProcess>();
+ default: // parent
+ return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
+ }
+ }
+
+ // Checks whether or not the given function hangs.
+ // expected is whether to return success or failure if the function hangs
+ // NOTE: There are other reasons for it to return a failure than the function
+ // doing the wrong thing.
+ // Leaks shared memory.
+ template<typename T> AssertionResult Hangs(void (*function)(T*, char*), T *arg,
+ bool expected) {
+ AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
+ if (!fork_result) {
+ return fork_result;
+ }
+ return HangsCheck(0);
+ }
+ // Starts the first part of Hangs.
+ // Use HangsCheck to get the result.
+ // Returns whether the fork succeeded or not, NOT whether or not the hang
+ // check succeeded.
+ template<typename T> AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+ bool expected, ChildID id) {
+ static_assert(aos::shm_ok<FunctionToCall<T>>::value,
+ "this is going into shared memory");
+ volatile FunctionToCall<T> *const to_call = reinterpret_cast<FunctionToCall<T> *>(
+ shm_malloc_aligned(sizeof(*to_call), sizeof(int)));
+ to_call->result = ResultType::NotCalled;
+ to_call->function = function;
+ to_call->arg = arg;
+ to_call->expected = expected;
+ to_call->failure[0] = '\0';
+ static_cast<volatile char *>(fatal_failure)[0] = '\0';
+ children_[id] = ForkExecute(Hangs_, to_call).release();
+ if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
+ to_calls_[id] = reinterpret_cast<volatile FunctionToCall<void> *>(to_call);
+ return AssertionSuccess();
+ }
+ // Checks whether or not a function hung like it was supposed to.
+ // Use HangsFork first.
+ // NOTE: calls to HangsFork and HangsCheck with the same id argument will
+ // correspond, but they do not nest. Also, id 0 is used by Hangs.
+ // Return value is the same as Hangs.
+ AssertionResult HangsCheck(ChildID id) {
+ std::unique_ptr<ForkedProcess> child(children_[id]);
+ children_.erase(id);
+ const ForkedProcess::JoinResult result = child->Join();
+ if (to_calls_[id]->failure[0] != '\0') {
+ return AssertionFailure() << "function says: "
+ << const_cast<char *>(to_calls_[id]->failure);
+ }
+ if (result == ForkedProcess::JoinResult::Finished) {
+ return !to_calls_[id]->expected ? AssertionSuccess() : (AssertionFailure()
+ << "something happened and the the test only got to "
+ << ResultTypeString(to_calls_[id]->result));
+ } else {
+ if (to_calls_[id]->result == ResultType::Called) {
+ return to_calls_[id]->expected ? AssertionSuccess() : AssertionFailure();
+ } else {
+ return AssertionFailure() << "something weird happened";
+ }
+ }
+ }
+#define EXPECT_HANGS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
+#define EXPECT_RETURNS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
+#define EXPECT_RETURNS_FAILS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
+#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
+ cond(Hangs(function, arg, hangs)); \
+ if (fatal_failure[0] != '\0') { \
+ FAIL() << fatal_failure; \
+ } \
+} while (false)
+
+ struct TestMessage {
+ int16_t data; // don't really want to test empty messages
+ };
+ struct MessageArgs {
+ aos_queue *const queue;
+ int flags;
+ int16_t data; // -1 means NULL expected
+ };
+ static void WriteTestMessage(MessageArgs *args, char *failure) {
+ TestMessage *msg = reinterpret_cast<TestMessage *>(aos_queue_get_msg(args->queue));
+ if (msg == NULL) {
+ snprintf(fatal_failure, kFailureSize, "couldn't get_msg from %p", args->queue);
+ return;
+ }
+ msg->data = args->data;
+ if (aos_queue_write_msg_free(args->queue, msg, args->flags) == -1) {
+ snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
+ args->queue, msg, args->flags);
+ }
+ }
+ static void ReadTestMessage(MessageArgs *args, char *failure) {
+ const TestMessage *msg = reinterpret_cast<const TestMessage *>(
+ aos_queue_read_msg(args->queue, args->flags));
+ if (msg == NULL) {
+ if (args->data != -1) {
+ snprintf(failure, kFailureSize, "expected data of %"PRId16" but got NULL message",
+ args->data);
+ }
+ } else {
+ if (args->data != msg->data) {
+ snprintf(failure, kFailureSize,
+ "expected data of %"PRId16" but got %"PRId16" instead",
+ args->data, msg->data);
+ }
+ aos_queue_free_msg(args->queue, msg);
+ }
+ }
+};
+char *QueueTest::fatal_failure;
+std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
+
+TEST_F(QueueTest, Reading) {
+ static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
+ aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ MessageArgs args{queue, 0, -1};
+
+ EXPECT_EQ(BLOCK, 0);
+ EXPECT_EQ(BLOCK | FROM_END, FROM_END);
+
+ args.flags = NON_BLOCK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = NON_BLOCK | PEEK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = PEEK;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.data = 254;
+ args.flags = BLOCK;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = PEEK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = PEEK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = PEEK | NON_BLOCK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = -1;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = NON_BLOCK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = 971;
+ EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
+}
+TEST_F(QueueTest, Writing) {
+ static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
+ aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ MessageArgs args{queue, 0, 973};
+
+ args.flags = BLOCK;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = BLOCK;
+ EXPECT_HANGS(WriteTestMessage, &args);
+ args.flags = NON_BLOCK;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = NON_BLOCK;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = PEEK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.data = 971;
+ args.flags = OVERRIDE;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = OVERRIDE;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = NON_BLOCK;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = OVERRIDE;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+}
+
+TEST_F(QueueTest, MultiRead) {
+ static const aos_type_sig signature{sizeof(TestMessage), 1, 1};
+ aos_queue *const queue = aos_fetch_queue("Queue", &signature);
+ MessageArgs args{queue, 0, 1323};
+
+ args.flags = BLOCK;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = BLOCK;
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
+ EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
+ // TODO(brians) finish this
+}
+
+TEST_F(QueueTest, Recycle) {
+ // TODO(brians) basic test of recycle queue
+ // include all of the ways a message can get into the recycle queue
+ static const aos_type_sig signature{sizeof(TestMessage), 1, 2},
+ recycle_signature{sizeof(TestMessage), 2, 2};
+ aos_queue *recycle_queue = reinterpret_cast<aos_queue *>(23);
+ aos_queue *const queue = aos_fetch_queue_recycle("Queue", &signature,
+ &recycle_signature, &recycle_queue);
+ ASSERT_NE(reinterpret_cast<aos_queue *>(23), recycle_queue);
+ MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
+
+ args.flags = BLOCK;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 254;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 971;
+ args.flags = OVERRIDE;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ recycle.flags = BLOCK;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ TestMessage *msg = static_cast<TestMessage *>(aos_queue_get_msg(queue));
+ ASSERT_TRUE(msg != NULL);
+ msg->data = 341;
+ aos_queue_free_msg(queue, msg);
+ recycle.data = 341;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ args.data = 254;
+ args.flags = PEEK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.flags = BLOCK;
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.flags = BLOCK;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.data = 254;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+}
+
diff --git a/aos/atom_code/ipc_lib/resource.c b/aos/atom_code/ipc_lib/resource.c
new file mode 100644
index 0000000..472191f
--- /dev/null
+++ b/aos/atom_code/ipc_lib/resource.c
@@ -0,0 +1,199 @@
+//#define TESTING_ASSERT(...)
+#define TESTING_ASSERT(cond, desc, args...) if(!(cond)){fprintf(stderr, "error: " desc " at " __FILE__ ": %d\n", ##args, __LINE__);}
+#define TESTING_ASSERT_RETURN(cond, desc, args...) TESTING_ASSERT(cond, desc, ##args); if(!(cond)){return 1;}
+// leave TESTING_ASSERT_RETURN (segfaults result otherwise)
+
+#include "resource_internal.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+#include <errno.h>
+
+int RESOURCE_KILL_SIGNAL;
+
+__attribute__((constructor)) void __constructor(){
+ RESOURCE_KILL_SIGNAL = SIGRTMIN + 5;
+}
+
+void aos_resource_init(uint16_t num){
+ aos_resource *resource = aos_resource_get(num);
+ resource->num = num;
+ resource->resource_mutex = 0;
+ resource->modify = 0;
+ resource->request = 0;
+ resource->kill = 0;
+ resource->owner = 0;
+ resource->priorities = shm_malloc(sizeof(struct HeapStruct) + AOS_RESOURCE_PRIORITY_STACK_LENGTH * sizeof(uint8_t));
+ resource->priorities->Elements = (uint8_t *)((uintptr_t)resource->priorities + sizeof(struct HeapStruct)); // only do 1 shm_malloc (directly above)
+ Initialize(AOS_RESOURCE_PRIORITY_STACK_LENGTH, resource->priorities);
+ AOS_RESOURCE_STATE_SET_ON(AOS_RESOURCE_STATE_WANTS_IT, num, aos_resource_entity_root_get());
+}
+void aos_resource_entity_root_create(){
+ global_core->mem_struct->resources.root = aos_resource_entity_create(0);
+}
+inline aos_resource_entity *aos_resource_entity_root_get(){
+ return global_core->mem_struct->resources.root;
+}
+inline aos_resource *aos_resource_get(uint16_t num){
+ return &global_core->mem_struct->resources.resources[num];
+}
+
+aos_resource_entity *aos_resource_entity_create(uint8_t base_priority){
+ aos_resource_entity *local = shm_malloc(sizeof(aos_resource_entity));
+ memset(local->state, 0x00, sizeof(local->state));
+ local->base_priority = base_priority;
+ local->parent = NULL; // for the root entity
+ return local;
+}
+int aos_resource_entity_set_parent(aos_resource_entity *local, aos_resource_entity *parent){
+ TESTING_ASSERT_RETURN(local != NULL, "do not have a local entity");
+ TESTING_ASSERT_RETURN(parent != local, "can't set parent to self");
+ TESTING_ASSERT_RETURN(parent != NULL, "have to have a parent to set to");
+ local->parent = parent;
+ if(parent->parent == NULL){
+ local->root_action = getpid();
+ }else{
+ local->root_action = parent->root_action;
+ }
+ if(parent->priority > local->base_priority){
+ local->priority = parent->priority;
+ }else{
+ local->priority = local->base_priority;
+ }
+ if(local->state[0] != 0 || memcmp(local->state, local->state + 1, sizeof(local->state) - 1)){ // if it's not all 0s
+ TESTING_ASSERT(0, "local->state isn't all 0s when changing parents (fixing it)");
+ memset(local->state, 0x00, sizeof(local->state));
+ }
+ return 0;
+}
+
+void aos_resource_kill(pid_t action){
+ union sigval sival;
+ sival.sival_int = 0;
+ if(sigqueue(action, RESOURCE_KILL_SIGNAL, sival) < 0){ // if sending the signal failed
+ fprintf(stderr, "sigqueue RESOURCE_KILL_SIGNAL (which is %d) with pid %d failed with errno %d ", RESOURCE_KILL_SIGNAL, action, errno);
+ perror(NULL);
+ }
+}
+int aos_resource_request(aos_resource_entity *local, aos_resource *resource){
+ TESTING_ASSERT_RETURN(local != NULL, "do not have a local entity");
+
+ if(mutex_lock(&resource->request))
+ return 1;
+ if(mutex_lock(&resource->kill)){
+ mutex_unlock(&resource->request);
+ return 1;
+ }
+ if(mutex_lock(&resource->modify)){
+ mutex_unlock(&resource->kill);
+ mutex_unlock(&resource->request);
+ return 1;
+ }
+
+ aos_resource_entity *c = local;
+ while(c->parent != NULL && !AOS_RESOURCE_STATE_GET_HAS_IT(resource->num, c)){
+ c = c->parent;
+ }
+ TESTING_ASSERT((c->parent == NULL) == (c == aos_resource_entity_root_get()), "found a resource with no parent that isn't the root")
+ if(c->parent == NULL && !AOS_RESOURCE_STATE_GET_WANTS_IT(resource->num, c)){ // if c is the root entity and doesn't want it
+ TESTING_ASSERT(0, "root entity does not want resource %d (will fix it)", resource->num);
+ *((int *)NULL) = 0;
+ AOS_RESOURCE_STATE_SET_ON(AOS_RESOURCE_STATE_WANTS_IT, resource->num, c);
+ }
+ uint8_t locked_resource_mutex = 0;
+ if(AOS_RESOURCE_STATE_GET_HAS_PASSED_DOWN(resource->num, c)){
+ if(c->parent == NULL){
+ if(GetMin(resource->priorities) >= local->priority){
+ mutex_unlock(&resource->modify);
+ aos_resource_kill(resource->owner);
+ if(mutex_lock(&resource->kill)){ // got released by one that got killed (after unlocking resource_mutex)
+ mutex_unlock(&resource->resource_mutex);
+ mutex_unlock(&resource->request);
+ return 1;
+ }
+ if(mutex_lock(&resource->resource_mutex)){ // wait for the other process to release it
+ mutex_unlock(&resource->request);
+ return 1;
+ }
+ locked_resource_mutex = 1;
+ if(mutex_lock(&resource->modify)){
+ mutex_unlock(&resource->request);
+ return 1;
+ }
+ }else{
+ mutex_unlock(&resource->modify);
+ mutex_unlock(&resource->request);
+ return -1;
+ }
+ }else{
+ fprintf(stderr, "PROGRAMMER ERROR!!!!! 2 sub-actions both requested resource %d!!! stopping the root action\n", resource->num);
+ mutex_unlock(&resource->modify);
+ mutex_unlock(&resource->request);
+ return -1;
+ }
+ }
+ AOS_RESOURCE_STATE_SET_ON(AOS_RESOURCE_STATE_WANTS_IT, resource->num, local);
+ aos_resource_entity *c2 = local;
+ do{
+ c2 = c2->parent;
+ TESTING_ASSERT_RETURN(c2 != NULL, "couldn't find the parent that has resource %d", resource->num)
+ AOS_RESOURCE_STATE_SET_ON(AOS_RESOURCE_STATE_HAS_PASSED_DOWN, resource->num, c2);
+ } while(c2 != c);
+ TESTING_ASSERT(c2 == c, "c2 != c");
+ if(c->parent == NULL){ // if you found the root entity
+ resource->owner = local->root_action;
+ if(!locked_resource_mutex){
+ int rv = mutex_trylock(&resource->resource_mutex); // don't deadlock if somebody died or whatever
+ TESTING_ASSERT(rv == 0, "the resource_mutex was already locked when getting %d from the root", resource->num);
+ }
+ }else{
+ TESTING_ASSERT(resource->owner == local->root_action, "my action chain has resource %d, but my chain's root(%d) isn't the owner(%d)", resource->num, local->root_action, resource->owner);
+ TESTING_ASSERT(mutex_trylock(&resource->resource_mutex) != 0, "my action has the resource_mutex for %d, but the resource_mutex wasn't already locked", resource->num);
+ }
+ if(Insert(local->priority, resource->priorities) < 0){
+ fprintf(stderr, "BAD NEWS: ran out of space on the priority heap for resource %d. Increase the size of AOS_RESOURCE_PRIORITY_STACK_LENGTH in resource.h\n", resource->num);
+ mutex_unlock(&resource->modify);
+ mutex_unlock(&resource->request);
+ return -1;
+ }
+ mutex_unlock(&resource->modify);
+ mutex_unlock(&resource->kill);
+ mutex_unlock(&resource->request);
+ return 0;
+}
+
+int aos_resource_release(aos_resource_entity *local, aos_resource *resource){
+ TESTING_ASSERT_RETURN(local != NULL, "do not have a local entity");
+
+ if(mutex_lock(&resource->modify)){
+ return 1;
+ }
+
+ AOS_RESOURCE_STATE_SET_OFF(AOS_RESOURCE_STATE_WANTS_IT, resource->num, local);
+ if(!AOS_RESOURCE_STATE_GET_HAS_PASSED_DOWN(resource->num, local)){ // if we're actually supposed to go release it
+ aos_resource_entity *c = local;
+ while(c->parent != NULL && !AOS_RESOURCE_STATE_GET_WANTS_IT(resource->num, c)){
+ AOS_RESOURCE_STATE_SET_OFF(AOS_RESOURCE_STATE_HAS_PASSED_DOWN, resource->num, c);
+ c = c->parent;
+ }
+ if(c->parent == NULL && !AOS_RESOURCE_STATE_GET_WANTS_IT(resource->num, c)){ // if c is the root entity and doesn't want it
+ TESTING_ASSERT(0, "root entity does not want resource %d (will fix it)", resource->num);
+ AOS_RESOURCE_STATE_SET_ON(AOS_RESOURCE_STATE_WANTS_IT, resource->num, c);
+ }
+ AOS_RESOURCE_STATE_SET_OFF(AOS_RESOURCE_STATE_HAS_PASSED_DOWN, resource->num, c);
+ Remove(local->priority, resource->priorities);
+ TESTING_ASSERT(local->root_action == resource->owner, "freeing a resource (%d) whose owner(%d) isn't my chain's root(%d)", resource->num, resource->owner, local->root_action);
+ if(c->parent == NULL){ // if you gave it back to the root entity (c)
+ TESTING_ASSERT(IsEmpty(resource->priorities), "priority stack isn't empty (size=%d)", GetSize(resource->priorities));
+ resource->owner = 0;
+ mutex_unlock(&resource->resource_mutex);
+ mutex_unlock(&resource->kill);
+ }
+ } // else has passed it down
+ mutex_unlock(&resource->modify);
+ return 0;
+}
+
diff --git a/aos/atom_code/ipc_lib/resource.h b/aos/atom_code/ipc_lib/resource.h
new file mode 100644
index 0000000..de1de1a
--- /dev/null
+++ b/aos/atom_code/ipc_lib/resource.h
@@ -0,0 +1,28 @@
+#ifndef __AOS_RESOURCE_H_
+#define __AOS_RESOURCE_H_
+
+// notes at <https://docs.google.com/document/d/1gzRrVcqL2X9VgNQUI5DrvLVVVziIH7c5ZerATVbiS7U/edit?hl=en_US>
+
+#include <sys/types.h>
+#include "shared_mem.h"
+#include "binheap.h"
+#include "aos_sync.h"
+#include "core_lib.h"
+
+#define AOS_RESOURCE_PRIORITY_STACK_LENGTH 50
+
+extern int RESOURCE_KILL_SIGNAL;
+
+/* Return Values
+ 0 = success
+ -1 = you should stop (like if you got killed) (only for request)
+ 1 = error
+*/
+int aos_resource_request(aos_resource_entity *local, aos_resource *resource);
+int aos_resource_release(aos_resource_entity *local, aos_resource *resource);
+int aos_resource_entity_set_parent(aos_resource_entity *local, aos_resource_entity *parent);
+aos_resource_entity *aos_resource_entity_create(uint8_t base_priority);
+aos_resource *aos_resource_get(uint16_t num);
+aos_resource_entity *aos_resource_entity_root_get(void);
+
+#endif
diff --git a/aos/atom_code/ipc_lib/resource_core.h b/aos/atom_code/ipc_lib/resource_core.h
new file mode 100644
index 0000000..27d1af8
--- /dev/null
+++ b/aos/atom_code/ipc_lib/resource_core.h
@@ -0,0 +1,28 @@
+#ifndef AOS_ATOM_CODE_IPC_LIB_RESOURCE_CORE_H_
+#define AOS_ATOM_CODE_IPC_LIB_RESOURCE_CORE_H_
+// this file has to be separate due to #include order dependencies
+
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+#include "aos/atom_code/ipc_lib/binheap.h"
+#include "aos/ResourceList.h"
+
+typedef struct aos_resource_t{
+ mutex resource_mutex; // gets locked whenever resource is taken from root entity and unlocked when its given back
+ mutex modify; // needs to be locked while somebody is requesting or releasing this resource
+ mutex request; // needs to be locked while somebody is requesting this resource (including possibly waiting for somebody else to release it after being killed)
+ mutex kill; // gets locked while somebody is dying for this resource (gets unlocked whenever this resource gets given back to the root entity)
+ pid_t owner;
+ PriorityQueue priorities; // lower number = higher priority
+
+ uint16_t num;
+} aos_resource;
+typedef struct aos_resource_entity_t aos_resource_entity;
+typedef struct aos_resource_list_t {
+ aos_resource resources[AOS_RESOURCE_NUM];
+ aos_resource_entity *root;
+} aos_resource_list;
+void aos_resource_init(uint16_t num);
+void aos_resource_entity_root_create(void);
+
+#endif
+
diff --git a/aos/atom_code/ipc_lib/resource_internal.h b/aos/atom_code/ipc_lib/resource_internal.h
new file mode 100644
index 0000000..eebecf9
--- /dev/null
+++ b/aos/atom_code/ipc_lib/resource_internal.h
@@ -0,0 +1,27 @@
+#ifndef __AOS_RESOURCE_INTERNAL_H_
+#define __AOS_RESOURCE_INTERNAL_H_
+
+#include "resource.h"
+
+#define AOS_RESOURCE_STATES_PER_BYTE 4
+#define AOS_RESOURCE_STATE_SHIFTER(num) ((num % AOS_RESOURCE_STATES_PER_BYTE) * (8 / AOS_RESOURCE_STATES_PER_BYTE))
+#define AOS_RESOURCE_STATE_GET(num, entity) (entity->state[num / AOS_RESOURCE_STATES_PER_BYTE] >> AOS_RESOURCE_STATE_SHIFTER(num))
+#define AOS_RESOURCE_STATE_SET_ON(mask, num, entity) (entity->state[num / AOS_RESOURCE_STATES_PER_BYTE] |= (mask << AOS_RESOURCE_STATE_SHIFTER(num)))
+#define AOS_RESOURCE_STATE_SET_OFF(mask, num, entity) (entity->state[num / AOS_RESOURCE_STATES_PER_BYTE] &= ~(mask << AOS_RESOURCE_STATE_SHIFTER(num)))
+#define AOS_RESOURCE_STATE_GET_HAS_IT(num, entity) (AOS_RESOURCE_STATE_GET_WANTS_IT(num, entity) || AOS_RESOURCE_STATE_GET_HAS_PASSED_DOWN(num, entity))
+#define AOS_RESOURCE_STATE_WANTS_IT 0x01
+#define AOS_RESOURCE_STATE_HAS_PASSED_DOWN 0x02
+#define AOS_RESOURCE_STATE_GET_WANTS_IT(num, entity) (AOS_RESOURCE_STATE_GET(num, entity) & AOS_RESOURCE_STATE_WANTS_IT)
+#define AOS_RESOURCE_STATE_GET_HAS_PASSED_DOWN(num, entity) (AOS_RESOURCE_STATE_GET(num, entity) & AOS_RESOURCE_STATE_HAS_PASSED_DOWN)
+
+struct aos_resource_entity_t{
+ aos_resource_entity *parent;
+ uint8_t state[(AOS_RESOURCE_NUM + (AOS_RESOURCE_STATES_PER_BYTE - 1)) / AOS_RESOURCE_STATES_PER_BYTE];
+ pid_t root_action;
+ uint8_t base_priority, priority;
+};
+
+inline void aos_resource_init(uint16_t num);
+
+#endif
+
diff --git a/aos/atom_code/ipc_lib/resource_list.txt b/aos/atom_code/ipc_lib/resource_list.txt
new file mode 100644
index 0000000..1aabdca
--- /dev/null
+++ b/aos/atom_code/ipc_lib/resource_list.txt
@@ -0,0 +1,4 @@
+test_resource1
+test_resource2
+drivetrain_left
+drivetrain_right
diff --git a/aos/atom_code/ipc_lib/resource_test.cpp b/aos/atom_code/ipc_lib/resource_test.cpp
new file mode 100644
index 0000000..9ce6e06
--- /dev/null
+++ b/aos/atom_code/ipc_lib/resource_test.cpp
@@ -0,0 +1,28 @@
+#include "aos/atom_code/ipc_lib/sharedmem_test_setup.h"
+#include "aos/aos_core.h"
+
+#include <gtest/gtest.h>
+#include <gtest/gtest-spi.h>
+
+class ResourceTest : public SharedMemTestSetup{
+};
+
+TEST_F(ResourceTest, GetResource){
+ aos_resource *first = aos_resource_get(1);
+ AllSharedMemAllocated();
+ aos_resource *second = aos_resource_get(1);
+ EXPECT_EQ(first, second);
+}
+TEST_F(ResourceTest, CheckLocal){
+ EXPECT_EQ(1, aos_resource_request(NULL, aos_resource_get(2)));
+}
+
+TEST_F(ResourceTest, LocalCreate){
+ EXPECT_TRUE(aos_resource_entity_create(56) != NULL);
+}
+TEST_F(ResourceTest, LocalSetParentSelf){
+ aos_resource_entity *local = aos_resource_entity_create(76);
+ ASSERT_TRUE(local != NULL);
+ EXPECT_EQ(1, aos_resource_entity_set_parent(local, local));
+}
+
diff --git a/aos/atom_code/ipc_lib/shared_mem.c b/aos/atom_code/ipc_lib/shared_mem.c
new file mode 100644
index 0000000..f631b78
--- /dev/null
+++ b/aos/atom_code/ipc_lib/shared_mem.c
@@ -0,0 +1,119 @@
+#include "shared_mem.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <errno.h>
+
+// the path for the shared memory segment. see shm_open(3) for restrictions
+#define AOS_SHM_NAME "/aos_shared_mem"
+// Size of the shared mem segment.
+// set to the maximum number that worked
+#define SIZEOFSHMSEG (4096 * 27813)
+
+ptrdiff_t aos_core_get_mem_usage(void) {
+ return global_core->size -
+ ((ptrdiff_t)global_core->mem_struct->msg_alloc -
+ (ptrdiff_t)global_core->mem_struct);
+}
+
+struct aos_core global_core_data;
+struct aos_core *global_core = NULL;
+
+int aos_core_create_shared_mem(enum aos_core_create to_create) {
+ global_core = &global_core_data;
+ int shm;
+before:
+ if (to_create == create) {
+ printf("shared_mem: creating\n");
+ shm = shm_open(AOS_SHM_NAME, O_RDWR | O_CREAT | O_EXCL, 0666);
+ global_core->owner = 1;
+ if (shm == -1 && errno == EEXIST) {
+ printf("shared_mem: going to shm_unlink(" AOS_SHM_NAME ")\n");
+ if (shm_unlink(AOS_SHM_NAME) == -1) {
+ fprintf(stderr, "shared_mem: shm_unlink(" AOS_SHM_NAME ") failed with of %d: %s\n", errno, strerror(errno));
+ } else {
+ goto before;
+ }
+ }
+ } else {
+ printf("shared_mem: not creating\n");
+ shm = shm_open(AOS_SHM_NAME, O_RDWR, 0);
+ global_core->owner = 0;
+ }
+ if (shm == -1) {
+ fprintf(stderr, "shared_mem:"
+ " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
+ " failed with %d: %s\n", errno, strerror(errno));
+ return -1;
+ }
+ if (global_core->owner) {
+ if (ftruncate(shm, SIZEOFSHMSEG) == -1) {
+ fprintf(stderr, "shared_mem: fruncate(%d, 0x%zx) failed with %d: %s\n",
+ shm, (size_t)SIZEOFSHMSEG, errno, strerror(errno));
+ return -1;
+ }
+ }
+ void *shm_address = mmap(
+ (void *)SHM_START, SIZEOFSHMSEG, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_FIXED | MAP_LOCKED | MAP_POPULATE, shm, 0);
+ if (shm_address == MAP_FAILED) {
+ fprintf(stderr, "shared_mem: mmap(%p, 0x%zx, stuff, stuff, %d, 0) failed"
+ " with %d: %s\n",
+ (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
+ return -1;
+ }
+ printf("shared_mem: shm at: %p\n", shm_address);
+ if (close(shm) == -1) {
+ printf("shared_mem: close(%d(=shm) failed with %d: %s\n",
+ shm, errno, strerror(errno));
+ }
+ if (shm_address != (void *)SHM_START) {
+ fprintf(stderr, "shared_mem: shm isn't at hard-coded %p. at %p instead\n",
+ (void *)SHM_START, shm_address);
+ return -1;
+ }
+ return aos_core_use_address_as_shared_mem(shm_address, SIZEOFSHMSEG);
+}
+
+int aos_core_use_address_as_shared_mem(void *address, size_t size) {
+ global_core->mem_struct = address;
+ global_core->size = size;
+ global_core->shared_mem = (uint8_t *)address + sizeof(*global_core->mem_struct);
+ if (global_core->owner) {
+ global_core->mem_struct->msg_alloc = (uint8_t *)address + global_core->size;
+ init_shared_mem_core(global_core->mem_struct);
+ }
+ if (global_core->owner) {
+ condition_set(&global_core->mem_struct->creation_condition);
+ } else {
+ if (condition_wait(&global_core->mem_struct->creation_condition) != 0) {
+ fprintf(stderr, "waiting on creation_condition failed\n");
+ return -1;
+ }
+ }
+ fprintf(stderr, "shared_mem: end of create_shared_mem owner=%d\n",
+ global_core->owner);
+ return 0;
+}
+
+int aos_core_free_shared_mem(){
+ void *shm_address = global_core->shared_mem;
+ if (munmap((void *)SHM_START, SIZEOFSHMSEG) == -1) {
+ fprintf(stderr, "shared_mem: munmap(%p, 0x%zx) failed with %d: %s\n",
+ shm_address, SIZEOFSHMSEG, errno, strerror(errno));
+ return -1;
+ }
+ if (global_core->owner) {
+ if (shm_unlink(AOS_SHM_NAME)) {
+ fprintf(stderr, "shared_mem: shm_unlink(" AOS_SHM_NAME ") failed with %d: %s\n",
+ errno, strerror(errno));
+ return -1;
+ }
+ }
+ return 0;
+}
+
diff --git a/aos/atom_code/ipc_lib/shared_mem.h b/aos/atom_code/ipc_lib/shared_mem.h
new file mode 100644
index 0000000..b1a2608
--- /dev/null
+++ b/aos/atom_code/ipc_lib/shared_mem.h
@@ -0,0 +1,46 @@
+#ifndef _SHARED_MEM_H_
+#define _SHARED_MEM_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "core_lib.h"
+#include <stddef.h>
+#include <unistd.h>
+
+// Where the shared memory segment starts in each process's address space.
+// Has to be the same in all of them so that stuff in shared memory
+// can have regular pointers to other stuff in shared memory.
+#define SHM_START 0x20000000
+
+enum aos_core_create {
+ create,
+ reference
+};
+struct aos_core {
+ int owner;
+ void *shared_mem;
+ // How large the chunk of shared memory is.
+ ptrdiff_t size;
+ aos_shm_core *mem_struct;
+};
+
+ptrdiff_t aos_core_get_mem_usage(void);
+
+// Takes the specified memory address and uses it as the shared memory.
+// address is the memory address, and size is the size of the memory.
+// global_core needs to point to an instance of struct aos_core, and owner
+// should be set correctly there.
+// The owner should verify that the first sizeof(mutex) of data is set to 0
+// before passing the memory to this function.
+int aos_core_use_address_as_shared_mem(void *address, size_t size);
+
+int aos_core_create_shared_mem(enum aos_core_create to_create);
+int aos_core_free_shared_mem(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/aos/atom_code/ipc_lib/sharedmem_test_setup.h b/aos/atom_code/ipc_lib/sharedmem_test_setup.h
new file mode 100644
index 0000000..e461c43
--- /dev/null
+++ b/aos/atom_code/ipc_lib/sharedmem_test_setup.h
@@ -0,0 +1,147 @@
+// defines a fixture (SharedMemTestSetup) that sets up shared memory
+
+extern "C" {
+#include "shared_mem.h"
+ extern struct aos_core *global_core;
+}
+
+#include <signal.h>
+
+#include <gtest/gtest.h>
+#include <sys/types.h>
+
+// TODO(brians) read logs from here
+class SharedMemTestSetup : public testing::Test{
+ protected:
+ pid_t core;
+ int start[2];
+ int memcheck[2];
+ static void signal_handler(int){
+ if(aos_core_free_shared_mem()){
+ exit(- 1);
+ }
+ exit(0);
+ }
+ static int get_mem_usage(){
+ return global_core->size - ((uint8_t *)global_core->mem_struct->msg_alloc - (uint8_t *)SHM_START);
+ }
+ bool checking_mem;
+
+ virtual void BeforeLocalShmSetup() {}
+ virtual void SetUp(){
+ ASSERT_EQ(0, pipe(start)) << "couldn't create start pipes";
+ ASSERT_EQ(0, pipe(memcheck)) << "couldn't create memcheck pipes";
+ checking_mem = false;
+ if((core = fork()) == 0){
+ close(start[0]);
+ close(memcheck[1]);
+ struct sigaction act;
+ act.sa_handler = signal_handler;
+ sigaction(SIGINT, &act, NULL);
+ if(aos_core_create_shared_mem(create)){
+ exit(-1);
+ }
+ write_pipe(start[1], "a", 1);
+ int usage = 0;
+ while(1){
+ char buf1;
+ read_pipe(memcheck[0], &buf1, 1);
+ if(usage == 0)
+ usage = get_mem_usage();
+ if(usage == get_mem_usage())
+ buf1 = 1;
+ else
+ buf1 = 0;
+ write_pipe(start[1], &buf1, 1);
+ }
+ }
+ close(start[1]);
+ close(memcheck[0]);
+ ASSERT_NE(-1, core) << "fork failed";
+ char buf;
+ read_pipe(start[0], &buf, 1);
+
+ BeforeLocalShmSetup();
+
+ ASSERT_EQ(0, aos_core_create_shared_mem(reference)) << "couldn't create shared mem reference";
+ }
+ virtual void TearDown(){
+ if(checking_mem){
+ write_pipe(memcheck[1], "a", 1);
+ char buf;
+ read_pipe(start[0], &buf, 1);
+ EXPECT_EQ(1, buf) << "memory got leaked";
+ }
+ EXPECT_EQ(0, aos_core_free_shared_mem()) << "issues freeing shared mem";
+ if(core > 0){
+ kill(core, SIGINT);
+ siginfo_t status;
+ ASSERT_EQ(0, waitid(P_PID, core, &status, WEXITED)) << "waiting for the core to finish failed";
+ EXPECT_EQ(CLD_EXITED, status.si_code) << "core died";
+ EXPECT_EQ(0, status.si_status) << "core exited with an error";
+ }
+ }
+ // if any more shared memory gets allocated after calling this and not freed by the end, it's an error
+ void AllSharedMemAllocated(){
+ checking_mem = true;
+ write_pipe(memcheck[1], "a", 1);
+ char buf;
+ read_pipe(start[0], &buf, 1);
+ }
+ private:
+ // Wrapper functions for pipes because they should never have errors.
+ void read_pipe(int fd, void *buf, size_t count) {
+ if (read(fd, buf, count) < 0) abort();
+ }
+ void write_pipe(int fd, const void *buf, size_t count) {
+ if (write(fd, buf, count) < 0) abort();
+ }
+};
+class ExecVeTestSetup : public SharedMemTestSetup {
+ protected:
+ std::vector<std::string> files;
+ std::vector<pid_t> pids;
+ virtual void BeforeLocalShmSetup(){
+ std::vector<std::string>::iterator it;
+ pid_t child;
+ for(it = files.begin(); it < files.end(); ++it){
+ if((child = fork()) == 0){
+ char *null = NULL;
+ execve(it->c_str(), &null, &null);
+ ADD_FAILURE() << "execve failed";
+ perror("execve");
+ exit(0);
+ }
+ if(child > 0)
+ pids.push_back(child);
+ else
+ ADD_FAILURE() << "fork failed return=" << child;
+ }
+ usleep(150000);
+ }
+ virtual void TearDown(){
+ std::vector<pid_t>::iterator it;
+ siginfo_t status;
+ for(it = pids.begin(); it < pids.end(); ++it){
+ printf("attempting to SIGINT(%d) %d\n", SIGINT, *it);
+ if(*it > 0){
+ kill(*it, SIGINT);
+ ASSERT_EQ(0, waitid(P_PID, *it, &status, WEXITED)) << "waiting for the AsyncAction(pid=" << *it << ") to finish failed";
+ EXPECT_EQ(CLD_EXITED, status.si_code) << "child died (killed by signal is " << (int)CLD_KILLED << ")";
+ EXPECT_EQ(0, status.si_status) << "child exited with an error";
+ }else{
+ FAIL();
+ }
+ }
+
+ SharedMemTestSetup::TearDown();
+ }
+ // call this _before_ ExecVeTestSetup::SetUp()
+ void AddProcess(const std::string file){
+ files.push_back(file);
+ }
+ void PercolatePause(){
+ usleep(50000);
+ }
+};
+