switched from fitpc/atom to prime/linux
Also removed a few old things that had nothing reasonable to be changed
to.
diff --git a/aos/linux_code/ipc_lib/aos_sync.c b/aos/linux_code/ipc_lib/aos_sync.c
new file mode 100644
index 0000000..52ebed1
--- /dev/null
+++ b/aos/linux_code/ipc_lib/aos_sync.c
@@ -0,0 +1,203 @@
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+
+#include <stdio.h>
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <errno.h>
+#include <stdint.h>
+#include <limits.h>
+#include <string.h>
+#include <inttypes.h>
+
+// TODO(brians): Inline these in the new PI version.
+#define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
+static inline uint32_t xchg(mutex *pointer, uint32_t value) {
+ uint32_t result;
+ __atomic_exchange(pointer, &value, &result, __ATOMIC_SEQ_CST);
+ return result;
+}
+
+// this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
+// should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
+// (sys_set_robust_list appears to be the function name)
+// <http://locklessinc.com/articles/futex_cheat_sheet/> and
+// <http://locklessinc.com/articles/mutex_cv_futex/> are useful
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
+// can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
+//
+// Remember that EAGAIN and EWOUDBLOCK are the same! (ie if you get EAGAIN from
+// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+//
+// Values for a mutex:
+// 0 = unlocked
+// 1 = locked, not contended
+// 2 = locked, probably contended
+// Values for a "futex":
+// 0 = unset
+// 1 = set
+
+static inline int sys_futex(mutex *addr1, int op, int val1,
+ const struct timespec *timeout, void *addr2, int val3) {
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+static inline int sys_futex_requeue(mutex *addr1, int op, int num_wake,
+ int num_requeue, mutex *m) {
+ return syscall(SYS_futex, addr1, op, num_wake, num_requeue, m);
+}
+static inline int sys_futex_op(mutex *addr1, int op, int num_waiters1,
+ int num_waiters2, mutex *addr2, int op_args_etc) {
+ return syscall(SYS_futex, addr1, op, num_waiters1,
+ num_waiters2, addr2, op_args_etc);
+}
+
+static inline int mutex_get(mutex *m, uint8_t signals_fail, const
+ struct timespec *timeout) {
+ int c;
+ c = cmpxchg(m, 0, 1);
+ if (!c) return 0;
+ /* The lock is now contended */
+ if (c == 1) c = xchg(m, 2);
+ while (c) {
+ /* Wait in the kernel */
+ //printf("sync here %d\n", __LINE__);
+ if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
+ if (signals_fail && errno == EINTR) {
+ return 1;
+ }
+ if (timeout != NULL && errno == ETIMEDOUT) {
+ return 2;
+ }
+ }
+ //printf("sync here %d\n", __LINE__);
+ c = xchg(m, 2);
+ }
+ return 0;
+}
+int mutex_lock(mutex *m) {
+ return mutex_get(m, 1, NULL);
+}
+int mutex_lock_timeout(mutex *m, const struct timespec *timeout) {
+ return mutex_get(m, 1, timeout);
+}
+int mutex_grab(mutex *m) {
+ return mutex_get(m, 0, NULL);
+}
+
+void mutex_unlock(mutex *m) {
+ /* Unlock, and if not contended then exit. */
+ //printf("mutex_unlock(%p) => %d \n",m,*m);
+ switch (xchg(m, 0)) {
+ case 0:
+ fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
+ printf("see stderr\n");
+ abort();
+ case 1:
+ //printf("mutex_unlock return(%p) => %d \n",m,*m);
+ break;
+ case 2:
+ if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+ fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
+ m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ } else {
+ break;
+ }
+ default:
+ fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
+ m);
+ printf("see stderr\n");
+ abort();
+ }
+}
+int mutex_trylock(mutex *m) {
+ /* Try to take the lock, if is currently unlocked */
+ unsigned c = cmpxchg(m, 0, 1);
+ if (!c) return 0;
+ return 1;
+}
+
+int futex_wait(mutex *m) {
+ if (*m) {
+ return 0;
+ }
+ if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) == -1) {
+ if (errno == EINTR) {
+ return 1;
+ } else if (errno != EWOULDBLOCK) {
+ return -1;
+ }
+ }
+ return 0;
+}
+int futex_set_value(mutex *m, mutex value) {
+ xchg(m, value);
+ return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+}
+int futex_set(mutex *m) {
+ return futex_set_value(m, 1);
+}
+int futex_unset(mutex *m) {
+ return !xchg(m, 0);
+}
+
+void condition_wait(mutex *c, mutex *m) {
+ const mutex wait_start = *c;
+
+ mutex_unlock(m);
+
+ while (1) {
+ if (sys_futex(c, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
+ // If it failed for some reason other than somebody else doing a wake
+ // before we actually made it to sleep.
+ if (__builtin_expect(*c == wait_start, 0)) {
+ // Try again if it was because of a signal.
+ if (errno == EINTR) continue;
+ fprintf(stderr, "FUTEX_WAIT(%p, %"PRIu32", NULL, NULL, 0) failed"
+ " with %d: %s\n",
+ c, wait_start, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+ }
+ // Simplified mutex_lock that always leaves it
+ // contended in case anybody else got requeued.
+ while (xchg(m, 2) != 0) {
+ if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1) {
+ // Try again if it was because of a signal or somebody else unlocked it
+ // before we went to sleep.
+ if (errno == EINTR || errno == EWOULDBLOCK) continue;
+ fprintf(stderr, "sync: FUTEX_WAIT(%p, 2, NULL, NULL, 0)"
+ " failed with %d: %s\n",
+ m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+ }
+ return;
+ }
+}
+
+void condition_signal(mutex *c) {
+ __sync_fetch_and_add(c, 1);
+ if (sys_futex(c, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+ fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
+ " failed with %d: %s\n",
+ c, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+}
+
+void condition_broadcast(mutex *c, mutex *m) {
+ __sync_fetch_and_add(c, 1);
+ // Wake 1 waiter and requeue the rest.
+ if (sys_futex_requeue(c, FUTEX_REQUEUE, 1, INT_MAX, m) == -1) {
+ fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
+ " failed with %d: %s\n",
+ c, m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+}
diff --git a/aos/linux_code/ipc_lib/aos_sync.h b/aos/linux_code/ipc_lib/aos_sync.h
new file mode 100644
index 0000000..7a81ca3
--- /dev/null
+++ b/aos/linux_code/ipc_lib/aos_sync.h
@@ -0,0 +1,96 @@
+#ifndef AOS_LINUX_CODE_IPC_LIB_SYNC_H_
+#define AOS_LINUX_CODE_IPC_LIB_SYNC_H_
+
+#include <stdlib.h>
+#include <signal.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+// TODO(brians) add client requests to make helgrind useful with this code
+// <http://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.client-requests>
+// and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
+// list the interesting ones
+
+// Have to align structs containing it to sizeof(int).
+// Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
+// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
+// No initialization is necessary for use as c with the condition_ functions.
+// The value should not be changed after multiple processes have started
+// accessing an instance except through the functions declared in this file.
+typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
+
+// All return -1 for other error (which will be in errno from futex(2)).
+//
+// There is no priority inversion protection.
+// TODO(brians) look at using
+// <http://www.kernel.org/doc/Documentation/pi-futex.txt>
+
+// Returns 1 if interrupted by a signal.
+//
+// One of the highest priority processes blocked on a given mutex will be the
+// one to lock it when it is unlocked.
+int mutex_lock(mutex *m) __attribute__((warn_unused_result));
+// Returns 2 if it timed out or 1 if interrupted by a signal.
+int mutex_lock_timeout(mutex *m, const struct timespec *timeout)
+ __attribute__((warn_unused_result));
+// Ignores signals. Can not fail.
+int mutex_grab(mutex *m);
+// abort(2)s for multiple unlocking.
+void mutex_unlock(mutex *m);
+// Returns 0 when successful in locking the mutex and 1 if somebody else has it
+// locked.
+int mutex_trylock(mutex *m) __attribute__((warn_unused_result));
+
+// The futex_ functions are similar to the mutex_ ones but different.
+// They are designed for signalling when something happens (possibly to
+// multiple listeners). A mutex manipulated with them can only be set or unset.
+//
+// They are different from the condition_ functions in that they do NOT work
+// correctly as standard condition variables. While it is possible to keep
+// track of the "condition" using the value part of the futex_* functions, the
+// obvious implementation has basically the same race condition that condition
+// variables are designed to prevent between somebody else grabbing the mutex
+// and changing whether it's set or not and the futex_ function changing the
+// futex's value.
+
+// Wait for the futex to be set. Will return immediately if it's already set.
+// Returns 0 if successful or it was already set, 1 if interrupted by a signal,
+// or -1.
+int futex_wait(mutex *m) __attribute__((warn_unused_result));
+// Set the futex and wake up anybody waiting on it.
+// Returns the number that were woken or -1.
+//
+// This will always wake up all waiters at the same time and set the value to 1.
+int futex_set(mutex *m);
+// Same as above except lets something other than 1 be used as the final value.
+int futex_set_value(mutex *m, mutex value);
+// Unsets the futex (sets the value to 0).
+// Returns 0 if it was set before and 1 if it wasn't.
+// Can not fail.
+int futex_unset(mutex *m);
+
+// The condition_ functions implement condition variable support. The API is
+// similar to the pthreads api and works the same way. The same m argument must
+// be passed in for all calls to all of the condition_ functions with a given c.
+
+// Wait for the condition variable to be signalled. m will be unlocked
+// atomically with actually starting to wait. m is guaranteed to be locked when
+// this function returns.
+// NOTE: The relocking of m is not atomic with stopping the actual wait and
+// other process(es) may lock (+unlock) the mutex first.
+void condition_wait(mutex *c, mutex *m);
+// If any other processes are condition_waiting on c, wake 1 of them. Does not
+// require m to be locked.
+void condition_signal(mutex *c);
+// Wakes all processes that are condition_waiting on c. Does not require m to be
+// locked.
+void condition_broadcast(mutex *c, mutex *m);
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif // AOS_LINUX_CODE_IPC_LIB_SYNC_H_
diff --git a/aos/linux_code/ipc_lib/condition.cc b/aos/linux_code/ipc_lib/condition.cc
new file mode 100644
index 0000000..b764026
--- /dev/null
+++ b/aos/linux_code/ipc_lib/condition.cc
@@ -0,0 +1,26 @@
+#include "aos/common/condition.h"
+
+#include <inttypes.h>
+
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+static_assert(shm_ok<Condition>::value, "Condition should work"
+ " in shared memory");
+
+Condition::Condition(Mutex *m) : impl_(), m_(m) {}
+
+void Condition::Wait() {
+ condition_wait(&impl_, &m_->impl_);
+}
+
+void Condition::Signal() {
+ condition_signal(&impl_);
+}
+
+void Condition::Broadcast() {
+ condition_broadcast(&impl_, &m_->impl_);
+}
+
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/core_lib.c b/aos/linux_code/ipc_lib/core_lib.c
new file mode 100644
index 0000000..cc1ccbb
--- /dev/null
+++ b/aos/linux_code/ipc_lib/core_lib.c
@@ -0,0 +1,44 @@
+#include "aos/linux_code/ipc_lib/core_lib.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+
+static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
+ return (l > r) ? l : r;
+}
+void *shm_malloc_aligned(size_t length, uint8_t alignment) {
+ // minimum alignments from
+ // <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+ if (length <= 1) {
+ alignment = aos_8max(alignment, 1);
+ } else if (length <= 2) {
+ alignment = aos_8max(alignment, 2);
+ } else if (length <= 4) {
+ alignment = aos_8max(alignment, 4);
+ } else if (length <= 8) {
+ alignment = aos_8max(alignment, 8);
+ } else if (length <= 16) {
+ alignment = aos_8max(alignment, 16);
+ } else {
+ alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
+ }
+
+ void *msg = NULL;
+ aos_shm_core *shm_core = global_core->mem_struct;
+ mutex_grab(&shm_core->msg_alloc_lock);
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+ const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+ msg = shm_core->msg_alloc;
+ if (msg <= global_core->shared_mem) {
+ fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+ printf("if you didn't see the stderr output just then, you should have\n");
+ abort();
+ }
+ //printf("alloc %p\n", msg);
+ mutex_unlock(&shm_core->msg_alloc_lock);
+ return msg;
+}
+
diff --git a/aos/linux_code/ipc_lib/core_lib.h b/aos/linux_code/ipc_lib/core_lib.h
new file mode 100644
index 0000000..1b0d754
--- /dev/null
+++ b/aos/linux_code/ipc_lib/core_lib.h
@@ -0,0 +1,23 @@
+#ifndef _AOS_CORE_LIB_H_
+#define _AOS_CORE_LIB_H_
+
+#include <stdint.h>
+
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+void *shm_malloc_aligned(size_t length, uint8_t alignment)
+ __attribute__((alloc_size(1)));
+static void *shm_malloc(size_t length) __attribute__((alloc_size(1)));
+static inline void *shm_malloc(size_t length) {
+ return shm_malloc_aligned(length, 0);
+}
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif
diff --git a/aos/linux_code/ipc_lib/ipc_lib.gyp b/aos/linux_code/ipc_lib/ipc_lib.gyp
new file mode 100644
index 0000000..f947d5e
--- /dev/null
+++ b/aos/linux_code/ipc_lib/ipc_lib.gyp
@@ -0,0 +1,82 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'aos_sync',
+ 'type': 'static_library',
+ 'sources': [
+ 'aos_sync.c',
+ ],
+ },
+ {
+ 'target_name': 'core_lib',
+ 'type': 'static_library',
+ 'sources': [
+ 'core_lib.c',
+ ],
+ 'dependencies': [
+ 'aos_sync',
+ 'shared_mem',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'shared_mem',
+ 'type': 'static_library',
+ 'sources': [
+ 'shared_mem.c',
+ ],
+ 'dependencies': [
+ 'aos_sync',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'queue',
+ 'type': 'static_library',
+ 'sources': [
+ 'queue.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/common/common.gyp:condition',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
+ # TODO(brians): fix this once there's a nice logging interface to use
+ # '<(AOS)/build/aos.gyp:logging',
+ ],
+ },
+ {
+ 'target_name': 'raw_queue_test',
+ 'type': 'executable',
+ 'sources': [
+ 'queue_test.cc',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ 'queue',
+ '<(AOS)/build/aos.gyp:logging',
+ 'core_lib',
+ '<(AOS)/common/common.gyp:queue_testutils',
+ '<(AOS)/common/common.gyp:time',
+ ],
+ },
+ {
+ 'target_name': 'ipc_stress_test',
+ 'type': 'executable',
+ 'sources': [
+ 'ipc_stress_test.cc',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ '<(AOS)/common/common.gyp:time',
+ '<(AOS)/common/common.gyp:queue_testutils',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
+ '<(AOS)/common/common.gyp:die',
+ ],
+ },
+ ],
+}
diff --git a/aos/linux_code/ipc_lib/ipc_stress_test.cc b/aos/linux_code/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..1b55e82
--- /dev/null
+++ b/aos/linux_code/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,248 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <libgen.h>
+#include <assert.h>
+
+#include <vector>
+#include <string>
+
+#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/mutex.h"
+#include "aos/linux_code/ipc_lib/core_lib.h"
+#include "aos/common/die.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// It's written in C++ for performance. We need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound, which is a bad way to test CPU contention.
+
+namespace aos {
+
+// Each test is represented by the name of the test binary and then any
+// arguments to pass to it.
+// Using --gtest_filter is a bad idea because it seems to result in a lot of
+// swapping which causes everything to be disk-bound (at least for me).
+static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+ {"queue_test"},
+ {"condition_test"},
+ {"mutex_test"},
+ {"raw_queue_test"},
+};
+// These arguments get inserted before any per-test arguments.
+static const ::std::vector< ::std::string> kDefaultArgs = {
+ "--gtest_repeat=30",
+ "--gtest_shuffle",
+};
+
+// How many test processes to run at a time.
+static const int kTesters = 100;
+// How long to test for.
+static constexpr time::Time kTestTime = time::Time::InSeconds(30);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+ Shared(const time::Time &stop_time)
+ : stop_time(stop_time), total_iterations(0) {}
+
+ // Synchronizes access to stdout/stderr to avoid interleaving failure
+ // messages.
+ Mutex output_mutex;
+
+ // When to stop.
+ time::Time stop_time;
+
+ // The total number of iterations. Updated by each child as it finishes.
+ int total_iterations;
+ // Sychronizes writes to total_iterations
+ Mutex total_iterations_mutex;
+
+ const char *path;
+};
+static_assert(shm_ok<Shared>::value,
+ "it's going to get shared between forked processes");
+
+// Gets called after each child forks to run a test.
+void __attribute__((noreturn)) DoRunTest(
+ Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+ if (close(pipes[0]) == -1) {
+ Die("close(%d) of read end of pipe failed with %d: %s\n",
+ pipes[0], errno, strerror(errno));
+ }
+ if (close(STDIN_FILENO) == -1) {
+ Die("close(STDIN_FILENO(=%d)) failed with %d: %s\n",
+ STDIN_FILENO, errno, strerror(errno));
+ }
+ if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+ Die("dup2(%d, STDOUT_FILENO(=%d)) failed with %d: %s\n",
+ pipes[1], STDOUT_FILENO, errno, strerror(errno));
+ }
+ if (dup2(pipes[1], STDERR_FILENO) == -1) {
+ Die("dup2(%d, STDERR_FILENO(=%d)) failed with %d: %s\n",
+ pipes[1], STDERR_FILENO, errno, strerror(errno));
+ }
+
+ size_t size = test.size();
+ size_t default_size = kDefaultArgs.size();
+ const char **args = new const char *[size + default_size + 1];
+ // The actual executable to run.
+ ::std::string executable;
+ int i = 0;
+ for (const ::std::string &c : test) {
+ if (i == 0) {
+ executable = ::std::string(shared->path) + "/" + c;
+ args[0] = executable.c_str();
+ for (const ::std::string &ci : kDefaultArgs) {
+ args[++i] = ci.c_str();
+ }
+ } else {
+ args[i] = c.c_str();
+ }
+ ++i;
+ }
+ args[size] = NULL;
+ execv(executable.c_str(), const_cast<char *const *>(args));
+ Die("execv(%s, %p) failed with %d: %s\n",
+ executable.c_str(), args, errno, strerror(errno));
+}
+
+void DoRun(Shared *shared) {
+ int iterations = 0;
+ // An iterator pointing to a random one of the tests.
+ auto test = kTests.begin() + (getpid() % kTests.size());
+ int pipes[2];
+ while (time::Time::Now() < shared->stop_time) {
+ if (pipe(pipes) == -1) {
+ Die("pipe(%p) failed with %d: %s\n", &pipes, errno, strerror(errno));
+ }
+ switch (fork()) {
+ case 0: // in runner
+ DoRunTest(shared, *test, pipes);
+ case -1:
+ Die("fork() failed with %d: %s\n", errno, strerror(errno));
+ }
+
+ if (close(pipes[1]) == -1) {
+ Die("close(%d) of write end of pipe failed with %d: %s\n",
+ pipes[1], errno, strerror(errno));
+ }
+
+ ::std::string output;
+ char buffer[2048];
+ while (true) {
+ ssize_t ret = read(pipes[0], &buffer, sizeof(buffer));
+ if (ret == 0) { // EOF
+ if (close(pipes[0]) == -1) {
+ Die("close(%d) of pipe at EOF failed with %d: %s\n",
+ pipes[0], errno, strerror(errno));
+ }
+ break;
+ } else if (ret == -1) {
+ Die("read(%d, %p, %zd) failed with %d: %s\n",
+ pipes[0], &buffer, sizeof(buffer), errno, strerror(errno));
+ }
+ output += ::std::string(buffer, ret);
+ }
+
+ int status;
+ while (true) {
+ if (wait(&status) == -1) {
+ if (errno == EINTR) continue;
+ Die("wait(%p) in child failed with %d: %s\n",
+ &status, errno, strerror(errno));
+ } else {
+ break;
+ }
+ }
+ if (WIFEXITED(status)) {
+ if (WEXITSTATUS(status) != 0) {
+ MutexLocker sync(&shared->output_mutex);
+ fprintf(stderr, "Test %s exited with status %d. output:\n",
+ test->at(0).c_str(), WEXITSTATUS(status));
+ fputs(output.c_str(), stderr);
+ }
+ } else if (WIFSIGNALED(status)) {
+ MutexLocker sync(&shared->output_mutex);
+ fprintf(stderr, "Test %s terminated by signal %d: %s.\n",
+ test->at(0).c_str(),
+ WTERMSIG(status), strsignal(WTERMSIG(status)));
+ fputs(output.c_str(), stderr);
+ } else {
+ assert(WIFSTOPPED(status));
+ Die("Test %s was stopped.\n", test->at(0).c_str());
+ }
+
+ ++test;
+ if (test == kTests.end()) test = kTests.begin();
+ ++iterations;
+ }
+ {
+ MutexLocker sync(&shared->total_iterations_mutex);
+ shared->total_iterations += iterations;
+ }
+}
+
+void Run(Shared *shared) {
+ switch (fork()) {
+ case 0: // in child
+ DoRun(shared);
+ _exit(EXIT_SUCCESS);
+ case -1:
+ Die("fork() of child failed with %d: %s\n", errno, strerror(errno));
+ }
+}
+
+int Main(int argc, char **argv) {
+ assert(argc >= 1);
+
+ ::aos::common::testing::GlobalCoreInstance global_core;
+
+ Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+ new (shared) Shared(time::Time::Now() + kTestTime);
+
+ char *temp = strdup(argv[0]);
+ shared->path = strdup(dirname(temp));
+ free(temp);
+
+ for (int i = 0; i < kTesters; ++i) {
+ Run(shared);
+ }
+
+ bool error = false;
+ for (int i = 0; i < kTesters; ++i) {
+ int status;
+ if (wait(&status) == -1) {
+ if (errno == EINTR) {
+ --i;
+ } else {
+ Die("wait(%p) failed with %d: %s\n", &status, errno, strerror(errno));
+ }
+ }
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+ error = true;
+ }
+ }
+
+ printf("Ran a total of %d tests.\n", shared->total_iterations);
+ if (error) {
+ printf("A child had a problem during the test.\n");
+ }
+ return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+} // namespace aos
+
+int main(int argc, char **argv) {
+ return ::aos::Main(argc, argv);
+}
diff --git a/aos/linux_code/ipc_lib/mutex.cpp b/aos/linux_code/ipc_lib/mutex.cpp
new file mode 100644
index 0000000..47fc92a
--- /dev/null
+++ b/aos/linux_code/ipc_lib/mutex.cpp
@@ -0,0 +1,36 @@
+#include "aos/common/mutex.h"
+
+#include <inttypes.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "aos/common/type_traits.h"
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+
+Mutex::Mutex() : impl_(0) {
+ static_assert(shm_ok<Mutex>::value,
+ "Mutex is not safe for use in shared memory.");
+}
+
+// Lock and Unlock use the return values of mutex_lock/mutex_unlock
+// to determine whether the lock/unlock succeeded.
+
+void Mutex::Lock() {
+ if (mutex_grab(&impl_) != 0) {
+ LOG(FATAL, "mutex_grab(%p(=%" PRIu32 ")) failed because of %d: %s\n",
+ &impl_, impl_, errno, strerror(errno));
+ }
+}
+
+void Mutex::Unlock() {
+ mutex_unlock(&impl_);
+}
+
+bool Mutex::TryLock() {
+ return mutex_trylock(&impl_) == 0;
+}
+
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
new file mode 100644
index 0000000..e67f22c
--- /dev/null
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -0,0 +1,491 @@
+#include "aos/linux_code/ipc_lib/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/type_traits.h"
+#include "aos/linux_code/ipc_lib/core_lib.h"
+
+namespace aos {
+namespace {
+
+static_assert(shm_ok<RawQueue>::value,
+ "RawQueue instances go into shared memory");
+
+const bool kReadDebug = false;
+const bool kWriteDebug = false;
+const bool kRefDebug = false;
+const bool kFetchDebug = false;
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them or who leak one when
+// they get killed).
+const int kExtraMessages = 20;
+
+} // namespace
+
+const int RawQueue::kPeek;
+const int RawQueue::kFromEnd;
+const int RawQueue::kNonBlock;
+const int RawQueue::kBlock;
+const int RawQueue::kOverride;
+
+struct RawQueue::MessageHeader {
+ int ref_count;
+ int index; // in pool_
+ static MessageHeader *Get(const void *msg) {
+ return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
+ static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
+ alignof(MessageHeader)));
+ }
+ void Swap(MessageHeader *other) {
+ MessageHeader temp;
+ memcpy(&temp, other, sizeof(temp));
+ memcpy(other, this, sizeof(*other));
+ memcpy(this, &temp, sizeof(*this));
+ }
+};
+static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
+ " is to stick it in shared memory");
+
+struct RawQueue::ReadData {
+ bool writable_start;
+};
+
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't
+// have to lock/unlock pool_lock_
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
+ MutexLocker locker(&pool_lock_);
+ MessageHeader *header = MessageHeader::Get(msg);
+ --header->ref_count;
+ assert(header->ref_count >= 0);
+ if (kRefDebug) {
+ printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+ }
+ if (header->ref_count == 0) {
+ DoFreeMessage(msg);
+ }
+}
+
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+ : readable_(&data_lock_), writable_(&data_lock_) {
+ const size_t name_size = strlen(name) + 1;
+ char *temp = static_cast<char *>(shm_malloc(name_size));
+ memcpy(temp, name, name_size);
+ name_ = temp;
+ length_ = length;
+ hash_ = hash;
+ queue_length_ = queue_length;
+
+ next_ = NULL;
+ recycle_ = NULL;
+
+ if (kFetchDebug) {
+ printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n",
+ name, length, hash, queue_length);
+ }
+
+ data_length_ = queue_length + 1;
+ if (data_length_ < 2) { // TODO(brians) when could this happen?
+ data_length_ = 2;
+ }
+ data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+ data_start_ = 0;
+ data_end_ = 0;
+ messages_ = 0;
+
+ mem_length_ = queue_length + kExtraMessages;
+ pool_length_ = 0;
+ messages_used_ = 0;
+ msg_length_ = length + sizeof(MessageHeader);
+ pool_ = static_cast<MessageHeader **>(
+ shm_malloc(sizeof(MessageHeader *) * mem_length_));
+
+ if (kFetchDebug) {
+ printf("made queue %s\n", name);
+ }
+}
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+ int queue_length) {
+ if (kFetchDebug) {
+ printf("fetching queue %s\n", name);
+ }
+ if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
+ return NULL;
+ }
+ RawQueue *current = static_cast<RawQueue *>(
+ global_core->mem_struct->queues.queue_list);
+ if (current != NULL) {
+ while (true) {
+ // If we found a matching queue.
+ if (strcmp(current->name_, name) == 0 && current->length_ == length &&
+ current->hash_ == hash && current->queue_length_ == queue_length) {
+ mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ return current;
+ } else {
+ if (kFetchDebug) {
+ printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
+ strcmp(current->name_, name), name);
+ }
+ }
+ // If this is the last one.
+ if (current->next_ == NULL) break;
+ current = current->next_;
+ }
+ }
+
+ RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
+ RawQueue(name, length, hash, queue_length);
+ if (current == NULL) { // if we don't already have one
+ global_core->mem_struct->queues.queue_list = r;
+ } else {
+ current->next_ = r;
+ }
+
+ mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ return r;
+}
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_length, RawQueue **recycle) {
+ RawQueue *r = Fetch(name, length, hash, queue_length);
+ r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
+ if (r == r->recycle_) {
+ fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
+ printf("see stderr\n");
+ r->recycle_ = NULL;
+ abort();
+ }
+ *recycle = r->recycle_;
+ return r;
+}
+
+void RawQueue::DoFreeMessage(const void *msg) {
+ MessageHeader *header = MessageHeader::Get(msg);
+ if (pool_[header->index] != header) { // if something's messed up
+ fprintf(stderr, "queue: something is very very wrong with queue %p."
+ " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
+ this, pool_, header->index, header);
+ printf("queue: see stderr\n");
+ abort();
+ }
+ if (kRefDebug) {
+ printf("ref free: %p\n", msg);
+ }
+ --messages_used_;
+
+ if (recycle_ != NULL) {
+ void *const new_msg = recycle_->GetMessage();
+ if (new_msg == NULL) {
+ fprintf(stderr, "queue: couldn't get a message"
+ " for recycle queue %p\n", recycle_);
+ } else {
+ // Take a message from recycle_ and switch its
+ // header with the one being freed, which effectively
+ // switches which queue each message belongs to.
+ MessageHeader *const new_header = MessageHeader::Get(new_msg);
+ // Also switch the messages between the pools.
+ pool_[header->index] = new_header;
+ {
+ MutexLocker locker(&recycle_->pool_lock_);
+ recycle_->pool_[new_header->index] = header;
+ // Swap the information in both headers.
+ header->Swap(new_header);
+ // Don't unlock the other pool until all of its messages are valid.
+ }
+ // use the header for new_msg which is now for this pool
+ header = new_header;
+ if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
+ fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n", recycle_, msg);
+ printf("see stderr\n");
+ abort();
+ }
+ msg = new_msg;
+ }
+ }
+
+ // Where the one we're freeing was.
+ int index = header->index;
+ header->index = -1;
+ if (index != messages_used_) { // if we're not freeing the one on the end
+ // Put the last one where the one we're freeing was.
+ header = pool_[index] = pool_[messages_used_];
+ // Put the one we're freeing at the end.
+ pool_[messages_used_] = MessageHeader::Get(msg);
+ // Update the former last one's index.
+ header->index = index;
+ }
+}
+
+bool RawQueue::WriteMessage(void *msg, int options) {
+ if (kWriteDebug) {
+ printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
+ }
+ if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
+ msg > static_cast<void *>((
+ reinterpret_cast<char *>(global_core->mem_struct) +
+ global_core->size))) {
+ fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+ msg, this);
+ printf("see stderr\n");
+ abort();
+ }
+ {
+ MutexLocker locker(&data_lock_);
+ bool writable_waited = false;
+
+ int new_end;
+ while (true) {
+ new_end = (data_end_ + 1) % data_length_;
+ // If there is room in the queue right now.
+ if (new_end != data_start_) break;
+ if (options & kNonBlock) {
+ if (kWriteDebug) {
+ printf("queue: not blocking on %p. returning false\n", this);
+ }
+ return false;
+ } else if (options & kOverride) {
+ if (kWriteDebug) {
+ printf("queue: overriding on %p\n", this);
+ }
+ // Avoid leaking the message that we're going to overwrite.
+ DecrementMessageReferenceCount(data_[data_start_]);
+ data_start_ = (data_start_ + 1) % data_length_;
+ } else { // kBlock
+ if (kWriteDebug) {
+ printf("queue: going to wait for writable_ of %p\n", this);
+ }
+ writable_.Wait();
+ writable_waited = true;
+ }
+ }
+ data_[data_end_] = msg;
+ ++messages_;
+ data_end_ = new_end;
+
+ if (kWriteDebug) {
+ printf("queue: broadcasting to readable_ of %p\n", this);
+ }
+ readable_.Broadcast();
+
+ // If we got a signal on writable_ here and it's still writable, then we
+ // need to signal the next person in line (if any).
+ if (writable_waited && is_writable()) {
+ if (kWriteDebug) {
+ printf("queue: resignalling writable_ of %p\n", this);
+ }
+ writable_.Signal();
+ }
+ }
+ if (kWriteDebug) {
+ printf("queue: write returning true on queue %p\n", this);
+ }
+ return true;
+}
+
+void RawQueue::ReadCommonEnd(ReadData *read_data) {
+ if (is_writable()) {
+ if (kReadDebug) {
+ printf("queue: %ssignalling writable_ of %p\n",
+ read_data->writable_start ? "not " : "", this);
+ }
+ if (!read_data->writable_start) writable_.Signal();
+ }
+}
+bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
+ read_data->writable_start = is_writable();
+ while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
+ if (options & kNonBlock) {
+ if (kReadDebug) {
+ printf("queue: not going to block waiting on %p\n", this);
+ }
+ return false;
+ } else { // kBlock
+ if (kReadDebug) {
+ printf("queue: going to wait for readable_ of %p\n", this);
+ }
+ // Wait for a message to become readable.
+ readable_.Wait();
+ if (kReadDebug) {
+ printf("queue: done waiting for readable_ of %p\n", this);
+ }
+ }
+ }
+ if (kReadDebug) {
+ printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
+ }
+ return true;
+}
+void *RawQueue::ReadPeek(int options, int start) {
+ void *ret;
+ if (options & kFromEnd) {
+ int pos = data_end_ - 1;
+ if (pos < 0) { // if it needs to wrap
+ pos = data_length_ - 1;
+ }
+ if (kReadDebug) {
+ printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
+ }
+ ret = data_[pos];
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
+ }
+ ret = data_[start];
+ }
+ MessageHeader *const header = MessageHeader::Get(ret);
+ ++header->ref_count;
+ if (kRefDebug) {
+ printf("ref inc count: %p\n", ret);
+ }
+ return ret;
+}
+const void *RawQueue::ReadMessage(int options) {
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessage(%x)\n", this, options);
+ }
+ void *msg = NULL;
+
+ MutexLocker locker(&data_lock_);
+
+ ReadData read_data;
+ if (!ReadCommonStart(options, NULL, &read_data)) {
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ if (options & kPeek) {
+ msg = ReadPeek(options, data_start_);
+ } else {
+ if (options & kFromEnd) {
+ while (true) {
+ if (kReadDebug) {
+ printf("queue: %p start of c2\n", this);
+ }
+ // This loop pulls each message out of the buffer.
+ const int pos = data_start_;
+ data_start_ = (data_start_ + 1) % data_length_;
+ // If this is the last one.
+ if (data_start_ == data_end_) {
+ if (kReadDebug) {
+ printf("queue: %p reading from c2: %d\n", this, pos);
+ }
+ msg = data_[pos];
+ break;
+ }
+ // This message is not going to be in the queue any more.
+ DecrementMessageReferenceCount(data_[pos]);
+ }
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d2: %d\n", this, data_start_);
+ }
+ msg = data_[data_start_];
+ // TODO(brians): Doesn't this need to increment the ref count?
+ data_start_ = (data_start_ + 1) % data_length_;
+ }
+ }
+ ReadCommonEnd(&read_data);
+ if (kReadDebug) {
+ printf("queue: %p read returning %p\n", this, msg);
+ }
+ return msg;
+}
+const void *RawQueue::ReadMessageIndex(int options, int *index) {
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
+ this, options, index, *index);
+ }
+ void *msg = NULL;
+
+ MutexLocker locker(&data_lock_);
+
+ ReadData read_data;
+ if (!ReadCommonStart(options, index, &read_data)) {
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ // TODO(parker): Handle integer wrap on the index.
+
+ // How many unread messages we have.
+ const int offset = messages_ - *index;
+ // Where we're going to start reading.
+ int my_start = data_end_ - offset;
+ if (my_start < 0) { // If we want to read off the end of the buffer.
+ // Unwrap it.
+ my_start += data_length_;
+ }
+ if (offset >= data_length_) { // If we're behind the available messages.
+ // Catch index up to the last available message.
+ *index += data_start_ - my_start;
+ // And that's the one we're going to read.
+ my_start = data_start_;
+ }
+ if (options & kPeek) {
+ msg = ReadPeek(options, my_start);
+ } else {
+ if (options & kFromEnd) {
+ if (kReadDebug) {
+ printf("queue: %p start of c1\n", this);
+ }
+ int pos = data_end_ - 1;
+ if (pos < 0) { // If it wrapped.
+ pos = data_length_ - 1; // Unwrap it.
+ }
+ if (kReadDebug) {
+ printf("queue: %p reading from c1: %d\n", this, pos);
+ }
+ msg = data_[pos];
+ *index = messages_;
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d1: %d\n", this, my_start);
+ }
+ msg = data_[my_start];
+ ++(*index);
+ }
+ MessageHeader *const header = MessageHeader::Get(msg);
+ ++header->ref_count;
+ if (kRefDebug) {
+ printf("ref_inc_count: %p\n", msg);
+ }
+ }
+ ReadCommonEnd(&read_data);
+ return msg;
+}
+
+void *RawQueue::GetMessage() {
+ MutexLocker locker(&pool_lock_);
+ MessageHeader *header;
+ if (pool_length_ - messages_used_ > 0) {
+ header = pool_[messages_used_];
+ } else {
+ if (pool_length_ >= mem_length_) {
+ LOG(FATAL, "overused pool of queue %p\n", this);
+ }
+ header = pool_[pool_length_] =
+ static_cast<MessageHeader *>(shm_malloc(msg_length_));
+ ++pool_length_;
+ }
+ void *msg = reinterpret_cast<uint8_t *>(header) + sizeof(MessageHeader);
+ header->ref_count = 1;
+ if (kRefDebug) {
+ printf("%p ref alloc: %p\n", this, msg);
+ }
+ header->index = messages_used_;
+ ++messages_used_;
+ return msg;
+}
+
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/queue.h b/aos/linux_code/ipc_lib/queue.h
new file mode 100644
index 0000000..a58b65e
--- /dev/null
+++ b/aos/linux_code/ipc_lib/queue.h
@@ -0,0 +1,171 @@
+#ifndef AOS_LINUX_CODE_IPC_LIB_QUEUE_H_
+#define AOS_LINUX_CODE_IPC_LIB_QUEUE_H_
+
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+#include "aos/common/mutex.h"
+#include "aos/common/condition.h"
+
+// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
+// code to make checking for leaks work better
+// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
+// describes how
+
+// Any pointers returned from these functions can be safely passed to other
+// processes because they are all shared memory pointers.
+// IMPORTANT: Any message pointer must be passed back in some way
+// (FreeMessage and WriteMessage are common ones) or the
+// application will leak shared memory.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
+
+namespace aos {
+
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling Queue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are a FIFO stack of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class RawQueue {
+ public:
+ // Retrieves (and creates if necessary) a queue. Each combination of name and
+ // signature refers to a completely independent queue.
+ // length is how large each message will be
+ // hash can differentiate multiple otherwise identical queues
+ // queue_length is how many messages the queue will be able to hold
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length);
+ // Same as above, except sets up the returned queue so that it will put
+ // messages on *recycle when they are freed (after they have been released by
+ // all other readers/writers and are not in the queue).
+ // recycle_queue_length determines how many freed messages will be kept.
+ // Other code can retrieve the 2 queues separately (the recycle queue will
+ // have the same length and hash as the main one). However, any frees made
+ // using a queue with only (name,length,hash,queue_length) before the
+ // recycle queue has been associated with it will not go on to the recycle
+ // queue.
+ // NOTE: calling this function with the same (name,length,hash,queue_length)
+ // but multiple recycle_queue_lengths will result in each freed message being
+ // put onto an undefined one of the recycle queues.
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_queue_length,
+ RawQueue **recycle);
+
+ // Constants for passing to options arguments.
+ // The non-conflicting ones can be combined with bitwise-or.
+
+ // Causes the returned message to be left in the queue.
+ // For reading only.
+ static const int kPeek = 0x0001;
+ // Reads the last message in the queue instead of just the next one.
+ // NOTE: This removes all of the messages until the last one from the queue
+ // (which means that nobody else will read them). However, PEEK means to not
+ // remove any from the queue, including the ones that are skipped.
+ // For reading only.
+ static const int kFromEnd = 0x0002;
+ // Causes reads to return NULL and writes to fail instead of waiting.
+ // For reading and writing.
+ static const int kNonBlock = 0x0004;
+ // Causes things to block.
+ // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+ // For reading and writing.
+ static const int kBlock = 0x0000;
+ // Causes writes to overwrite the oldest message in the queue instead of
+ // blocking.
+ // For writing only.
+ static const int kOverride = 0x0008;
+
+ // Writes a message into the queue.
+ // This function takes ownership of msg.
+ // NOTE: msg must point to a valid message from this queue
+ // Returns truen on success.
+ bool WriteMessage(void *msg, int options);
+
+ // Reads a message out of the queue.
+ // The return value will have at least the length of this queue's worth of
+ // valid data where it's pointing to.
+ // The return value is const because other people might be viewing the same
+ // messsage. Do not cast the const away!
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ const void *ReadMessage(int options);
+ // Exactly the same as aos_queue_read_msg, except it will never return the
+ // same message twice with the same index argument. However, it may not
+ // return some messages that pass through the queue.
+ // *index should start as 0. index does not have to be in shared memory, but
+ // it can be
+ const void *ReadMessageIndex(int options, int *index);
+
+ // Retrieves ("allocates") a message that can then be written to the queue.
+ // NOTE: the return value will be completely uninitialized
+ // The return value will have at least the length of this queue's worth of
+ // valid memory where it's pointing to.
+ // Returns NULL for error.
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ void *GetMessage();
+
+ // It is ok to call this method with a NULL msg.
+ void FreeMessage(const void *msg) {
+ if (msg != NULL) DecrementMessageReferenceCount(msg);
+ }
+
+ private:
+ struct MessageHeader;
+ struct ReadData;
+
+ bool is_readable() { return data_end_ != data_start_; }
+ bool is_writable() { return ((data_end_ + 1) % data_length_) != data_start_; }
+
+ // These next 4 allow finding the right one.
+ const char *name_;
+ size_t length_;
+ int hash_;
+ int queue_length_;
+ // The next one in the linked list of queues.
+ RawQueue *next_;
+
+ RawQueue *recycle_;
+
+ Mutex data_lock_; // protects operations on data_ etc
+ // Always gets broadcasted to because different readers might have different
+ // ideas of what "readable" means (ie ones using separated indices).
+ Condition readable_;
+ Condition writable_;
+ int data_length_; // max length into data + 1
+ int data_start_; // is an index into data
+ int data_end_; // is an index into data
+ int messages_; // that have passed through
+ void **data_; // array of messages (with headers)
+
+ Mutex pool_lock_;
+ size_t msg_length_; // sizeof(each message) including the header
+ int mem_length_; // the max number of messages that will ever be allocated
+ int messages_used_;
+ int pool_length_; // the number of allocated messages
+ MessageHeader **pool_; // array of pointers to messages
+
+ // Actually frees the given message.
+ void DoFreeMessage(const void *msg);
+ // Calls DoFreeMessage if appropriate.
+ void DecrementMessageReferenceCount(const void *msg);
+
+ // Should be called with data_lock_ locked.
+ // *read_data will be initialized.
+ // Returns with a readable message in data_ or false.
+ bool ReadCommonStart(int options, int *index, ReadData *read_data);
+ // Deals with setting/unsetting readable_ and writable_.
+ // Should be called after data_lock_ has been unlocked.
+ // read_data should be the same thing that was passed in to ReadCommonStart.
+ void ReadCommonEnd(ReadData *read_data);
+ // Handles reading with kPeek.
+ void *ReadPeek(int options, int start);
+
+ // Gets called by Fetch when necessary (with placement new).
+ RawQueue(const char *name, size_t length, int hash, int queue_length);
+};
+
+} // namespace aos
+
+#endif // AOS_LINUX_CODE_IPC_LIB_QUEUE_H_
diff --git a/aos/linux_code/ipc_lib/queue_test.cc b/aos/linux_code/ipc_lib/queue_test.cc
new file mode 100644
index 0000000..c87bd7b
--- /dev/null
+++ b/aos/linux_code/ipc_lib/queue_test.cc
@@ -0,0 +1,424 @@
+#include "aos/common/queue.h"
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <inttypes.h>
+
+#include <ostream>
+#include <memory>
+#include <map>
+
+#include "gtest/gtest.h"
+
+#include "aos/linux_code/ipc_lib/core_lib.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/time.h"
+#include "aos/common/logging/logging.h"
+
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+using ::aos::common::testing::GlobalCoreInstance;
+
+namespace aos {
+namespace testing {
+
+class QueueTest : public ::testing::Test {
+ protected:
+ static const size_t kFailureSize = 400;
+ static char *fatal_failure;
+ private:
+ enum class ResultType : uint8_t {
+ NotCalled,
+ Called,
+ Returned,
+ };
+ const std::string ResultTypeString(volatile const ResultType &result) {
+ switch (result) {
+ case ResultType::Returned:
+ return "Returned";
+ case ResultType::Called:
+ return "Called";
+ case ResultType::NotCalled:
+ return "NotCalled";
+ default:
+ return std::string("unknown(" + static_cast<uint8_t>(result)) + ")";
+ }
+ }
+ static_assert(aos::shm_ok<ResultType>::value,
+ "this will get put in shared memory");
+ template<typename T>
+ struct FunctionToCall {
+ FunctionToCall() : result(ResultType::NotCalled) {
+ started.Lock();
+ }
+
+ volatile ResultType result;
+ bool expected;
+ void (*function)(T*, char*);
+ T *arg;
+ volatile char failure[kFailureSize];
+ Mutex started;
+ };
+ template<typename T>
+ static void Hangs_(FunctionToCall<T> *const to_call) {
+ to_call->started.Unlock();
+ to_call->result = ResultType::Called;
+ to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
+ to_call->result = ResultType::Returned;
+ }
+
+ // How long until a function is considered to have hung.
+ static constexpr time::Time kHangTime = time::Time::InSeconds(0.035);
+ // How long to sleep after forking (for debugging).
+ static constexpr time::Time kForkSleep = time::Time::InSeconds(0);
+
+ // Represents a process that has been forked off. The destructor kills the
+ // process and wait(2)s for it.
+ class ForkedProcess {
+ public:
+ ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
+ ~ForkedProcess() {
+ if (kill(pid_, SIGINT) == -1) {
+ if (errno == ESRCH) {
+ printf("process %jd was already dead\n", static_cast<intmax_t>(pid_));
+ } else {
+ fprintf(stderr, "kill(SIGKILL, %jd) failed with %d: %s\n",
+ static_cast<intmax_t>(pid_), errno, strerror(errno));
+ }
+ return;
+ }
+ const pid_t ret = wait(NULL);
+ if (ret == -1) {
+ LOG(WARNING, "wait(NULL) failed."
+ " child %jd might still be alive\n",
+ static_cast<intmax_t>(pid_));
+ } else if (ret == 0) {
+ LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+ static_cast<intmax_t>(pid_));
+ } else if (ret != pid_) {
+ LOG(WARNING, "child %d is now confirmed dead"
+ ", but child %jd might still be alive\n",
+ ret, static_cast<intmax_t>(pid_));
+ }
+ }
+
+ enum class JoinResult {
+ Finished, Hung, Error
+ };
+ JoinResult Join(time::Time timeout = kHangTime) {
+ timespec lock_timeout = (kForkSleep + timeout).ToTimespec();
+ switch (mutex_lock_timeout(lock_, &lock_timeout)) {
+ case 2:
+ return JoinResult::Hung;
+ case 0:
+ return JoinResult::Finished;
+ default:
+ return JoinResult::Error;
+ }
+ }
+
+ private:
+ const pid_t pid_;
+ mutex *const lock_;
+ } __attribute__((unused));
+
+ // State for HangsFork and HangsCheck.
+ typedef uint8_t ChildID;
+ static void ReapExitHandler() {
+ for (auto it = children_.begin(); it != children_.end(); ++it) {
+ delete it->second;
+ }
+ }
+ static std::map<ChildID, ForkedProcess *> children_;
+ std::map<ChildID, FunctionToCall<void> *> to_calls_;
+
+ void SetUp() override {
+ ::testing::Test::SetUp();
+ fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
+ static bool registered = false;
+ if (!registered) {
+ atexit(ReapExitHandler);
+ registered = true;
+ }
+ }
+
+ protected:
+ // function gets called with arg in a forked process.
+ // Leaks shared memory.
+ template<typename T> __attribute__((warn_unused_result))
+ std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+ mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
+ sizeof(*lock), sizeof(int)));
+ assert(mutex_lock(lock) == 0);
+ const pid_t pid = fork();
+ switch (pid) {
+ case 0: // child
+ if (kForkSleep != time::Time(0, 0)) {
+ LOG(INFO, "pid %jd sleeping for %ds%dns\n",
+ static_cast<intmax_t>(getpid()),
+ kForkSleep.sec(), kForkSleep.nsec());
+ time::SleepFor(kForkSleep);
+ }
+ ::aos::common::testing::PreventExit();
+ function(arg);
+ mutex_unlock(lock);
+ exit(EXIT_SUCCESS);
+ case -1: // parent failure
+ LOG(ERROR, "fork() failed with %d: %s\n", errno, strerror(errno));
+ return std::unique_ptr<ForkedProcess>();
+ default: // parent
+ return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
+ }
+ }
+
+ // Checks whether or not the given function hangs.
+ // expected is whether to return success or failure if the function hangs
+ // NOTE: There are other reasons for it to return a failure than the function
+ // doing the wrong thing.
+ // Leaks shared memory.
+ template<typename T>
+ AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
+ AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
+ if (!fork_result) {
+ return fork_result;
+ }
+ return HangsCheck(0);
+ }
+ // Starts the first part of Hangs.
+ // Use HangsCheck to get the result.
+ // Returns whether the fork succeeded or not, NOT whether or not the hang
+ // check succeeded.
+ template<typename T>
+ AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+ bool expected, ChildID id) {
+ static_assert(aos::shm_ok<FunctionToCall<T>>::value,
+ "this is going into shared memory");
+ FunctionToCall<T> *const to_call =
+ static_cast<FunctionToCall<T> *>(
+ shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
+ new (to_call) FunctionToCall<T>();
+ to_call->function = function;
+ to_call->arg = arg;
+ to_call->expected = expected;
+ to_call->failure[0] = '\0';
+ static_cast<char *>(fatal_failure)[0] = '\0';
+ children_[id] = ForkExecute(Hangs_, to_call).release();
+ if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
+ to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+ to_call->started.Lock();
+ return AssertionSuccess();
+ }
+ // Checks whether or not a function hung like it was supposed to.
+ // Use HangsFork first.
+ // NOTE: calls to HangsFork and HangsCheck with the same id argument will
+ // correspond, but they do not nest. Also, id 0 is used by Hangs.
+ // Return value is the same as Hangs.
+ AssertionResult HangsCheck(ChildID id) {
+ std::unique_ptr<ForkedProcess> child(children_[id]);
+ children_.erase(id);
+ const ForkedProcess::JoinResult result = child->Join();
+ if (to_calls_[id]->failure[0] != '\0') {
+ return AssertionFailure() << "function says: "
+ << const_cast<char *>(to_calls_[id]->failure);
+ }
+ if (result == ForkedProcess::JoinResult::Finished) {
+ return !to_calls_[id]->expected ? AssertionSuccess() : (AssertionFailure()
+ << "something happened and the the test only got to "
+ << ResultTypeString(to_calls_[id]->result));
+ } else {
+ if (to_calls_[id]->result == ResultType::Called) {
+ return to_calls_[id]->expected ? AssertionSuccess() :
+ AssertionFailure();
+ } else if (result == ForkedProcess::JoinResult::Error) {
+ return AssertionFailure() << "error joining child";
+ } else {
+ abort();
+ return AssertionFailure() << "something weird happened";
+ }
+ }
+ }
+#define EXPECT_HANGS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
+#define EXPECT_RETURNS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
+#define EXPECT_RETURNS_FAILS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
+#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
+ cond(Hangs(function, arg, hangs)); \
+ if (fatal_failure[0] != '\0') { \
+ FAIL() << fatal_failure; \
+ } \
+} while (false)
+
+ struct TestMessage {
+ // Some contents because we don't really want to test empty messages.
+ int16_t data;
+ };
+ struct MessageArgs {
+ RawQueue *const queue;
+ int flags;
+ int16_t data; // -1 means NULL expected
+ };
+ static void WriteTestMessage(MessageArgs *args, char *failure) {
+ TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
+ if (msg == NULL) {
+ snprintf(fatal_failure, kFailureSize,
+ "couldn't get_msg from %p", args->queue);
+ return;
+ }
+ msg->data = args->data;
+ if (!args->queue->WriteMessage(msg, args->flags)) {
+ snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
+ args->queue, msg, args->flags);
+ }
+ }
+ static void ReadTestMessage(MessageArgs *args, char *failure) {
+ const TestMessage *msg = static_cast<const TestMessage *>(
+ args->queue->ReadMessage(args->flags));
+ if (msg == NULL) {
+ if (args->data != -1) {
+ snprintf(failure, kFailureSize,
+ "expected data of %" PRId16 " but got NULL message",
+ args->data);
+ }
+ } else {
+ if (args->data != msg->data) {
+ snprintf(failure, kFailureSize,
+ "expected data of %" PRId16 " but got %" PRId16 " instead",
+ args->data, msg->data);
+ }
+ args->queue->FreeMessage(msg);
+ }
+ }
+
+ private:
+ GlobalCoreInstance my_core;
+};
+char *QueueTest::fatal_failure;
+std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
+constexpr time::Time QueueTest::kHangTime;
+constexpr time::Time QueueTest::kForkSleep;
+
+TEST_F(QueueTest, Reading) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, -1};
+
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.data = 254;
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = -1;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = 971;
+ EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
+}
+TEST_F(QueueTest, Writing) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_HANGS(WriteTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+}
+
+TEST_F(QueueTest, MultiRead) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, 1323};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
+ EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
+ // TODO(brians) finish this
+}
+
+TEST_F(QueueTest, Recycle) {
+ // TODO(brians) basic test of recycle queue
+ // include all of the ways a message can get into the recycle queue
+ RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+ 1, 2, 2, 2, &recycle_queue);
+ ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
+ MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 254;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_TRUE(msg != NULL);
+ msg->data = 341;
+ queue->FreeMessage(msg);
+ recycle.data = 341;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ args.data = 254;
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.data = 254;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/shared_mem.c b/aos/linux_code/ipc_lib/shared_mem.c
new file mode 100644
index 0000000..4126d65
--- /dev/null
+++ b/aos/linux_code/ipc_lib/shared_mem.c
@@ -0,0 +1,129 @@
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include "aos/linux_code/ipc_lib/core_lib.h"
+
+// the path for the shared memory segment. see shm_open(3) for restrictions
+#define AOS_SHM_NAME "/aos_shared_mem"
+// Size of the shared mem segment.
+// Set to the maximum number that worked. Any bigger than this and the kernel
+// thinks you should be able to access all of it but it doesn't work with the
+// ARM kernel Brian was using on 2013-12-20.
+#define SIZEOFSHMSEG (4096 * 25074)
+
+void init_shared_mem_core(aos_shm_core *shm_core) {
+ clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
+ shm_core->msg_alloc_lock = 0;
+ shm_core->queues.queue_list = NULL;
+ shm_core->queues.alloc_lock = 0;
+}
+
+ptrdiff_t aos_core_get_mem_usage(void) {
+ return global_core->size -
+ ((ptrdiff_t)global_core->mem_struct->msg_alloc -
+ (ptrdiff_t)global_core->mem_struct);
+}
+
+struct aos_core *global_core = NULL;
+
+int aos_core_create_shared_mem(enum aos_core_create to_create) {
+ static struct aos_core global_core_data;
+ global_core = &global_core_data;
+ int shm;
+before:
+ if (to_create == create) {
+ printf("shared_mem: creating\n");
+ shm = shm_open(AOS_SHM_NAME, O_RDWR | O_CREAT | O_EXCL, 0666);
+ global_core->owner = 1;
+ if (shm == -1 && errno == EEXIST) {
+ printf("shared_mem: going to shm_unlink(" AOS_SHM_NAME ")\n");
+ if (shm_unlink(AOS_SHM_NAME) == -1) {
+ fprintf(stderr, "shared_mem: shm_unlink(" AOS_SHM_NAME ") failed with of %d: %s\n", errno, strerror(errno));
+ } else {
+ goto before;
+ }
+ }
+ } else {
+ printf("shared_mem: not creating\n");
+ shm = shm_open(AOS_SHM_NAME, O_RDWR, 0);
+ global_core->owner = 0;
+ }
+ if (shm == -1) {
+ fprintf(stderr, "shared_mem:"
+ " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
+ " failed with %d: %s\n", errno, strerror(errno));
+ return -1;
+ }
+ if (global_core->owner) {
+ if (ftruncate(shm, SIZEOFSHMSEG) == -1) {
+ fprintf(stderr, "shared_mem: fruncate(%d, 0x%zx) failed with %d: %s\n",
+ shm, (size_t)SIZEOFSHMSEG, errno, strerror(errno));
+ return -1;
+ }
+ }
+ void *shm_address = mmap(
+ (void *)SHM_START, SIZEOFSHMSEG, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_FIXED | MAP_LOCKED | MAP_POPULATE, shm, 0);
+ if (shm_address == MAP_FAILED) {
+ fprintf(stderr, "shared_mem: mmap(%p, 0x%zx, stuff, stuff, %d, 0) failed"
+ " with %d: %s\n",
+ (void *)SHM_START, SIZEOFSHMSEG, shm, errno, strerror(errno));
+ return -1;
+ }
+ printf("shared_mem: shm at: %p\n", shm_address);
+ if (close(shm) == -1) {
+ printf("shared_mem: close(%d(=shm) failed with %d: %s\n",
+ shm, errno, strerror(errno));
+ }
+ if (shm_address != (void *)SHM_START) {
+ fprintf(stderr, "shared_mem: shm isn't at hard-coded %p. at %p instead\n",
+ (void *)SHM_START, shm_address);
+ return -1;
+ }
+ return aos_core_use_address_as_shared_mem(shm_address, SIZEOFSHMSEG);
+}
+
+int aos_core_use_address_as_shared_mem(void *address, size_t size) {
+ global_core->mem_struct = address;
+ global_core->size = size;
+ global_core->shared_mem = (uint8_t *)address + sizeof(*global_core->mem_struct);
+ if (global_core->owner) {
+ global_core->mem_struct->msg_alloc = (uint8_t *)address + global_core->size;
+ init_shared_mem_core(global_core->mem_struct);
+ }
+ if (global_core->owner) {
+ futex_set(&global_core->mem_struct->creation_condition);
+ } else {
+ if (futex_wait(&global_core->mem_struct->creation_condition) != 0) {
+ fprintf(stderr, "waiting on creation_condition failed\n");
+ return -1;
+ }
+ }
+ fprintf(stderr, "shared_mem: end of create_shared_mem owner=%d\n",
+ global_core->owner);
+ return 0;
+}
+
+int aos_core_free_shared_mem(){
+ void *shm_address = global_core->shared_mem;
+ if (munmap((void *)SHM_START, SIZEOFSHMSEG) == -1) {
+ fprintf(stderr, "shared_mem: munmap(%p, 0x%zx) failed with %d: %s\n",
+ shm_address, SIZEOFSHMSEG, errno, strerror(errno));
+ return -1;
+ }
+ if (global_core->owner) {
+ if (shm_unlink(AOS_SHM_NAME)) {
+ fprintf(stderr, "shared_mem: shm_unlink(" AOS_SHM_NAME ") failed with %d: %s\n",
+ errno, strerror(errno));
+ return -1;
+ }
+ }
+ return 0;
+}
diff --git a/aos/linux_code/ipc_lib/shared_mem.h b/aos/linux_code/ipc_lib/shared_mem.h
new file mode 100644
index 0000000..e5059c4
--- /dev/null
+++ b/aos/linux_code/ipc_lib/shared_mem.h
@@ -0,0 +1,69 @@
+#ifndef _SHARED_MEM_H_
+#define _SHARED_MEM_H_
+
+#include <stddef.h>
+#include <unistd.h>
+#include <time.h>
+
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern struct aos_core *global_core;
+
+// Where the shared memory segment starts in each process's address space.
+// Has to be the same in all of them so that stuff in shared memory
+// can have regular pointers to other stuff in shared memory.
+#define SHM_START 0x20000000
+
+typedef struct aos_queue_global_t {
+ mutex alloc_lock;
+ void *queue_list; // an aos::Queue* declared in C code
+} aos_queue_global;
+
+typedef struct aos_shm_core_t {
+ // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
+ // this shared memory area
+ struct timespec identifier;
+ // gets 0-initialized at the start (as part of shared memory) and
+ // the owner sets as soon as it finishes setting stuff up
+ mutex creation_condition;
+ mutex msg_alloc_lock;
+ void *msg_alloc;
+ aos_queue_global queues;
+} aos_shm_core;
+
+enum aos_core_create {
+ create,
+ reference
+};
+struct aos_core {
+ int owner;
+ void *shared_mem;
+ // How large the chunk of shared memory is.
+ ptrdiff_t size;
+ aos_shm_core *mem_struct;
+};
+
+void init_shared_mem_core(aos_shm_core *shm_core);
+
+ptrdiff_t aos_core_get_mem_usage(void);
+
+// Takes the specified memory address and uses it as the shared memory.
+// address is the memory address, and size is the size of the memory.
+// global_core needs to point to an instance of struct aos_core, and owner
+// should be set correctly there.
+// The owner should verify that the first sizeof(mutex) of data is set to 0
+// before passing the memory to this function.
+int aos_core_use_address_as_shared_mem(void *address, size_t size);
+
+int aos_core_create_shared_mem(enum aos_core_create to_create);
+int aos_core_free_shared_mem(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif