got all of the code to actually compile again
I don't think it actually works though.
diff --git a/aos/atom_code/atom_code.gyp b/aos/atom_code/atom_code.gyp
index 37caccd..51dcaec 100644
--- a/aos/atom_code/atom_code.gyp
+++ b/aos/atom_code/atom_code.gyp
@@ -7,7 +7,7 @@
'<(AOS)/atom_code/init.cc',
],
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:shared_mem',
'<(AOS)/common/common.gyp:die',
'<(AOS)/build/aos.gyp:logging',
],
diff --git a/aos/atom_code/camera/Buffers.cpp b/aos/atom_code/camera/Buffers.cpp
index bf16915..9cd1a7d 100644
--- a/aos/atom_code/camera/Buffers.cpp
+++ b/aos/atom_code/camera/Buffers.cpp
@@ -77,11 +77,11 @@
do {
if (block) {
message_ = static_cast<const Message *>(queue_->ReadMessage(
- Queue::kPeek | Queue::kBlock));
+ RawQueue::kPeek | RawQueue::kBlock));
} else {
static int index = 0;
message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
- Queue::kBlock, &index));
+ RawQueue::kBlock, &index));
}
} while (block && message_ == NULL);
if (message_ != NULL) {
@@ -137,7 +137,7 @@
}
Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
MMap();
- queue_ = Queue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
+ queue_ = RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
}
Buffers::~Buffers() {
diff --git a/aos/atom_code/camera/Buffers.h b/aos/atom_code/camera/Buffers.h
index 07177bc..080e9eb 100644
--- a/aos/atom_code/camera/Buffers.h
+++ b/aos/atom_code/camera/Buffers.h
@@ -54,7 +54,7 @@
const Message *message_;
static const std::string kQueueName;
// NULL for the Reader one.
- Queue *queue_;
+ RawQueue *queue_;
// Make the actual mmap calls.
// Called by Buffers() automatically.
void MMap();
diff --git a/aos/atom_code/camera/Reader.cpp b/aos/atom_code/camera/Reader.cpp
index c87d173..95f6128 100644
--- a/aos/atom_code/camera/Reader.cpp
+++ b/aos/atom_code/camera/Reader.cpp
@@ -17,6 +17,7 @@
#include "aos/atom_code/camera/V4L2.h"
#include "aos/atom_code/camera/Buffers.h"
#include "aos/common/logging/logging.h"
+#include "aos/atom_code/ipc_lib/queue.h"
#define CLEAR(x) memset(&(x), 0, sizeof(x))
@@ -31,7 +32,7 @@
// the bound socket listening for fd requests
int server_fd_;
- Queue *queue_, *recycle_queue_;
+ RawQueue *queue_, *recycle_queue_;
// the number of buffers currently queued in v4l2
uint32_t queued_;
public:
@@ -51,11 +52,11 @@
dev_name, errno, strerror(errno));
}
- queue_ = Queue::Fetch(Buffers::kQueueName.c_str(),
+ queue_ = RawQueue::Fetch(Buffers::kQueueName.c_str(),
sizeof(Buffers::Message), 971, 1,
1, Buffers::kNumBuffers, &recycle_queue_);
// read off any existing recycled messages
- while (recycle_queue_->ReadMessage(Queue::kNonBlock) != NULL);
+ while (recycle_queue_->ReadMessage(RawQueue::kNonBlock) != NULL);
queued_ = 0;
InitServer();
@@ -141,7 +142,7 @@
// we block waiting for one if we can't dequeue one without leaving
// the driver <= 2 (to be safe)
recycle_queue_->ReadMessage((queued_ <= 2) ?
- Queue::kBlock : Queue::kNonBlock));
+ RawQueue::kBlock : RawQueue::kNonBlock));
if (read != NULL) {
buf.index = read->index;
recycle_queue_->FreeMessage(read);
@@ -176,7 +177,7 @@
msg->bytesused = buf.bytesused;
memcpy(&msg->timestamp, &buf.timestamp, sizeof(msg->timestamp));
msg->sequence = buf.sequence;
- if (!queue->WriteMessage(msg, Queue::kOverride)) {
+ if (!queue_->WriteMessage(msg, RawQueue::kOverride)) {
LOG(WARNING,
"sending message %p with buf #%" PRIu32 " to queue %p failed."
" re-queueing now\n", msg, buf.index, queue_);
diff --git a/aos/atom_code/camera/camera.gyp b/aos/atom_code/camera/camera.gyp
index 9931806..f1743be 100644
--- a/aos/atom_code/camera/camera.gyp
+++ b/aos/atom_code/camera/camera.gyp
@@ -45,11 +45,11 @@
'Buffers.cpp',
],
'dependencies': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
'<(AOS)/build/aos.gyp:logging',
],
'export_dependent_settings': [
- '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:ipc_lib',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
},
{
@@ -74,6 +74,7 @@
'buffers',
'<(AOS)/atom_code/atom_code.gyp:init',
'<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
},
],
diff --git a/aos/atom_code/core/LogStreamer.cpp b/aos/atom_code/core/LogStreamer.cpp
index 1294128..20c5987 100644
--- a/aos/atom_code/core/LogStreamer.cpp
+++ b/aos/atom_code/core/LogStreamer.cpp
@@ -31,7 +31,7 @@
int index = 0;
while (true) {
- const LogMessage *const msg = ReadNext(Queue::kBlock, &index);
+ const LogMessage *const msg = ReadNext(RawQueue::kBlock, &index);
if (msg == NULL) continue;
internal::PrintMessage(stdout, *msg);
diff --git a/aos/atom_code/core/core.gyp b/aos/atom_code/core/core.gyp
index 74b3232..1aaebfb 100644
--- a/aos/atom_code/core/core.gyp
+++ b/aos/atom_code/core/core.gyp
@@ -32,6 +32,7 @@
'<(AOS)/build/aos.gyp:logging',
'<(AOS)/atom_code/atom_code.gyp:init',
'<(AOS)/common/common.gyp:time',
+ '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:queue',
],
},
{
diff --git a/aos/atom_code/ipc_lib/aos_sync.c b/aos/atom_code/ipc_lib/aos_sync.c
index 29f3081..92b11c4 100644
--- a/aos/atom_code/ipc_lib/aos_sync.c
+++ b/aos/atom_code/ipc_lib/aos_sync.c
@@ -1,4 +1,4 @@
-#include "aos_sync.h"
+#include "aos/atom_code/ipc_lib/aos_sync.h"
#include <stdio.h>
#include <linux/futex.h>
diff --git a/aos/atom_code/ipc_lib/aos_sync.h b/aos/atom_code/ipc_lib/aos_sync.h
index 80364e9..0cecd68 100644
--- a/aos/atom_code/ipc_lib/aos_sync.h
+++ b/aos/atom_code/ipc_lib/aos_sync.h
@@ -1,5 +1,5 @@
-#ifndef AOS_IPC_LIB_SYNC_H_
-#define AOS_IPC_LIB_SYNC_H_
+#ifndef AOS_ATOM_CODE_IPC_LIB_SYNC_H_
+#define AOS_ATOM_CODE_IPC_LIB_SYNC_H_
#include <stdlib.h>
#include <signal.h>
@@ -90,4 +90,4 @@
}
#endif // __cplusplus
-#endif
+#endif // AOS_ATOM_CODE_IPC_LIB_SYNC_H_
diff --git a/aos/atom_code/ipc_lib/binheap.c b/aos/atom_code/ipc_lib/binheap.c
deleted file mode 100644
index 8eb024d..0000000
--- a/aos/atom_code/ipc_lib/binheap.c
+++ /dev/null
@@ -1,125 +0,0 @@
-#include "binheap.h"
-#include <stdlib.h>
-#include <stdio.h>
-
-#ifndef TESTING_ASSERT
-#define TESTING_ASSERT(...)
-#endif
-#define Error(x) TESTING_ASSERT(0, x)
-
-#define MinData (0)
-
-void Initialize( int Elements, PriorityQueue H )
-{
- H->Capacity = Elements - 1;
- H->Size = 0;
- H->Elements[ 0 ] = MinData;
-}
-
-int Insert( ElementType X, PriorityQueue H )
-{
- int i;
-
- if( IsFull( H ) )
- {
- return -1;
- }
-
- for( i = ++H->Size; H->Elements[ i / 2 ] > X; i /= 2 )
- H->Elements[ i ] = H->Elements[ i / 2 ];
- H->Elements[ i ] = X;
- return 0;
-}
-
-void Remove( ElementType X, PriorityQueue H )
-{
- int i, Child, removed = 0;
- ElementType LastElement;
-
- for ( i = 1; i <= H->Size; ++i )
- {
- if( H->Elements[ i ] == X )
- {
- removed = i;
- break;
- }
- }
- if( removed == 0 )
- {
- fprintf(stderr, "could not find element %d to remove. not removing any\n", X);
- return;
- }
-
- LastElement = H->Elements[ H->Size-- ];
-
- for( i = removed; i * 2 <= H->Size; i = Child )
- {
- /* Find smaller child */
- Child = i * 2;
- if( Child != H->Size && H->Elements[ Child + 1 ]
- < H->Elements[ Child ] )
- Child++;
-
- /* Percolate one level */
- if( LastElement > H->Elements[ Child ] )
- H->Elements[ i ] = H->Elements[ Child ];
- else
- break;
- }
- H->Elements[ i ] = LastElement;
-}
-
-ElementType DeleteMin( PriorityQueue H )
-{
- int i, Child;
- ElementType MinElement, LastElement;
-
- if( IsEmpty( H ) )
- {
- Error( "Priority queue is empty" );
- return H->Elements[ 0 ];
- }
- MinElement = H->Elements[ 1 ];
- LastElement = H->Elements[ H->Size-- ];
-
- for( i = 1; i * 2 <= H->Size; i = Child )
- {
- /* Find smaller child */
- Child = i * 2;
- if( Child != H->Size && H->Elements[ Child + 1 ]
- < H->Elements[ Child ] )
- Child++;
-
- /* Percolate one level */
- if( LastElement > H->Elements[ Child ] )
- H->Elements[ i ] = H->Elements[ Child ];
- else
- break;
- }
- H->Elements[ i ] = LastElement;
- return MinElement;
-}
-
-ElementType GetMin( PriorityQueue H )
-{
- if( !IsEmpty( H ) )
- return H->Elements[ 1 ];
- Error( "Priority Queue is Empty" );
- return H->Elements[ 0 ];
-}
-
-int IsEmpty( PriorityQueue H )
-{
- return H->Size == 0;
-}
-
-int IsFull( PriorityQueue H )
-{
- return H->Size == H->Capacity;
-}
-
-int GetSize( PriorityQueue H )
-{
- return H->Size;
-}
-
diff --git a/aos/atom_code/ipc_lib/binheap.h b/aos/atom_code/ipc_lib/binheap.h
deleted file mode 100644
index 8c26f9f..0000000
--- a/aos/atom_code/ipc_lib/binheap.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#ifndef _BinHeap_H
-#define _BinHeap_H
-
-#include <stdint.h>
-
-typedef uint8_t ElementType;
-struct HeapStruct;
-typedef struct HeapStruct *PriorityQueue;
-
-struct HeapStruct
-{
- int Capacity;
- int Size;
- ElementType *Elements;
-};
-
-// Elements is the number allocated at H->Elements
-void Initialize( int Elements, PriorityQueue H );
-// 0 if successful, -1 if full
-int Insert( ElementType X, PriorityQueue H );
-void Remove( ElementType X, PriorityQueue H );
-ElementType DeleteMin( PriorityQueue H );
-ElementType GetMin( PriorityQueue H );
-int IsEmpty( PriorityQueue H );
-int IsFull( PriorityQueue H );
-int GetSize( PriorityQueue H );
-
-#endif
-
diff --git a/aos/atom_code/ipc_lib/binheap_test.cpp b/aos/atom_code/ipc_lib/binheap_test.cpp
deleted file mode 100644
index 62eecd4..0000000
--- a/aos/atom_code/ipc_lib/binheap_test.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-extern "C" {
-#include "binheap.h"
-}
-
-#include <gtest/gtest.h>
-
-class BinHeapTest : public testing::Test{
- protected:
- static const int TEST_ELEMENTS = 57;
- PriorityQueue queue;
- virtual void SetUp(){
- queue = new HeapStruct();
- queue->Elements = new uint8_t[TEST_ELEMENTS];
- Initialize(TEST_ELEMENTS, queue);
- }
- virtual void TearDown(){
- delete[] queue->Elements;
- delete queue;
- }
-};
-
-std::ostream& operator<< (std::ostream& o, uint8_t c){
- return o<<(int)c;
-}
-
-testing::AssertionResult Contains(PriorityQueue queue, const uint8_t expected[], size_t length){
- for(size_t i = 0; i < length; ++i){
- //printf("expected[%d]=%d\n", i, expected[i]);
- if(DeleteMin(queue) != expected[i]){
- return testing::AssertionFailure() << "queue[" << i << "] != " << expected[i];
- }
- }
- if(!IsEmpty(queue))
- return testing::AssertionFailure() << "queue is longer than " << length;
- return ::testing::AssertionSuccess();
-}
-
-TEST_F(BinHeapTest, SingleElement){
- Insert(87, queue);
- EXPECT_EQ(87, DeleteMin(queue));
- EXPECT_TRUE(IsEmpty(queue));
-}
-TEST_F(BinHeapTest, MultipleElements){
- Insert(54, queue);
- Insert(1, queue);
- Insert(0, queue);
- Insert(255, queue);
- Insert(123, queue);
- uint8_t expected[] = {0, 1, 54, 123, 255};
- EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
-}
-TEST_F(BinHeapTest, Removals){
- Insert(54, queue);
- Insert(1, queue);
- Insert(0, queue);
- Insert(255, queue);
- Insert(123, queue);
- Remove(255, queue);
- Remove(0, queue);
- Insert(222, queue);
- Insert(67, queue);
- uint8_t expected[] = {1, 54, 67, 123, 222};
- EXPECT_TRUE(Contains(queue, expected, sizeof(expected)));
-}
-
diff --git a/aos/atom_code/ipc_lib/cmpxchg.h b/aos/atom_code/ipc_lib/cmpxchg.h
index acb4a3c..715c57d 100644
--- a/aos/atom_code/ipc_lib/cmpxchg.h
+++ b/aos/atom_code/ipc_lib/cmpxchg.h
@@ -9,10 +9,10 @@
#define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
/*#define xchg(ptr, n) ({typeof(*ptr) r; \
- do{ \
- r = *ptr; \
- }while(!__sync_bool_compare_and_swap(ptr, r, n)); \
- r; \
+ do{ \
+ r = *ptr; \
+ }while(!__sync_bool_compare_and_swap(ptr, r, n)); \
+ r; \
})*/
# define LOCK "lock;"
@@ -24,7 +24,7 @@
/*static inline void set_64bit(volatile unsigned long *ptr, unsigned long val)
{
- *ptr = val;
+ *ptr = val;
}
#define _set_64bit set_64bit*/
@@ -32,37 +32,37 @@
/*
* Note: no "lock" prefix even on SMP: xchg always implies lock anyway
* Note 2: xchg has side effect, so that attribute volatile is necessary,
- * but generally the primitive is invalid, *ptr is output argument. --ANK
+ * but generally the primitive is invalid, *ptr is output argument. --ANK
*/
static inline unsigned long __xchg(unsigned long x, volatile void * ptr, int size)
{
- switch (size) {
- case 1:
- __asm__ __volatile__("xchgb %b0,%1"
- :"=q" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- case 2:
- __asm__ __volatile__("xchgw %w0,%1"
- :"=r" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- case 4:
- __asm__ __volatile__("xchgl %k0,%1"
- :"=r" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- case 8:
- __asm__ __volatile__("xchg %0,%1"
- :"=r" (x)
- :"m" (*__xg(ptr)), "0" (x)
- :"memory");
- break;
- }
- return x;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__("xchgb %b0,%1"
+ :"=q" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 2:
+ __asm__ __volatile__("xchgw %w0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 4:
+ __asm__ __volatile__("xchgl %k0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ case 8:
+ __asm__ __volatile__("xchg %0,%1"
+ :"=r" (x)
+ :"m" (*__xg(ptr)), "0" (x)
+ :"memory");
+ break;
+ }
+ return x;
}
/*
@@ -76,78 +76,78 @@
#define __HAVE_ARCH_CMPXCHG 1
static inline unsigned long __cmpxchg(volatile void *ptr, unsigned long old,
- unsigned long new, int size)
+ unsigned long new, int size)
{
- int32_t prev;
- switch (size) {
- case 1:
- __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
- : "=a"(prev)
- : "q"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 2:
- __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 4:
- __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %k1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 8:
- __asm__ __volatile__("lock; cmpxchg %1,%2"
- : "=a"(prev)
- : "q"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- }
- return old;
+ int32_t prev;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgb %b1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 2:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgw %w1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 4:
+ __asm__ __volatile__(LOCK_PREFIX "cmpxchgl %k1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 8:
+ __asm__ __volatile__("lock; cmpxchg %1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ }
+ return old;
}
/*
static inline unsigned long __cmpxchg_local(volatile void *ptr,
- unsigned long old, unsigned long new, int size)
+ unsigned long old, unsigned long new, int size)
{
- unsigned long prev;
- switch (size) {
- case 1:
- __asm__ __volatile__("cmpxchgb %b1,%2"
- : "=a"(prev)
- : "q"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 2:
- __asm__ __volatile__("cmpxchgw %w1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 4:
- __asm__ __volatile__("cmpxchgl %k1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- case 8:
- __asm__ __volatile__("cmpxchgq %1,%2"
- : "=a"(prev)
- : "r"(new), "m"(*__xg(ptr)), "0"(old)
- : "memory");
- return prev;
- }
- return old;
+ unsigned long prev;
+ switch (size) {
+ case 1:
+ __asm__ __volatile__("cmpxchgb %b1,%2"
+ : "=a"(prev)
+ : "q"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 2:
+ __asm__ __volatile__("cmpxchgw %w1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 4:
+ __asm__ __volatile__("cmpxchgl %k1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ case 8:
+ __asm__ __volatile__("cmpxchgq %1,%2"
+ : "=a"(prev)
+ : "r"(new), "m"(*__xg(ptr)), "0"(old)
+ : "memory");
+ return prev;
+ }
+ return old;
}*/
#define cmpxchg(ptr,o,n)\
- ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
- (unsigned long)(n),sizeof(*(ptr))))
+ ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+ (unsigned long)(n),sizeof(*(ptr))))
/*#define cmpxchg_local(ptr,o,n)\
- ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
- (unsigned long)(n),sizeof(*(ptr))))*/
+ ((__typeof__(*(ptr)))__cmpxchg((ptr),(unsigned long)(o),\
+ (unsigned long)(n),sizeof(*(ptr))))*/
#endif
#endif
diff --git a/aos/atom_code/ipc_lib/condition.cc b/aos/atom_code/ipc_lib/condition.cc
index 1524773..f65c67d 100644
--- a/aos/atom_code/ipc_lib/condition.cc
+++ b/aos/atom_code/ipc_lib/condition.cc
@@ -6,49 +6,23 @@
namespace aos {
+static_assert(shm_ok<condition_variable>::value,
+ "all C structs really should work in shared memory");
static_assert(shm_ok<Condition>::value, "Condition should work"
" in shared memory");
-Condition::Condition() : impl_(0) {}
+Condition::Condition(Mutex *m) : impl_(), m_(m) {}
-bool Condition::Wait() {
- switch (condition_wait(&impl_)) {
- case 1:
- return false;
- case 0:
- return true;
- default:
- if (errno != EINTR) {
- LOG(FATAL, "condition_wait(%p(=%"PRIu32")) failed because of %d: %s\n",
- &impl_, impl_, errno, strerror(errno));
- }
- return false;
- }
-}
-bool Condition::WaitNext() {
- switch (condition_wait_force(&impl_)) {
- case 1:
- return false;
- case 0:
- return true;
- default:
- if (errno != EINTR) {
- LOG(FATAL, "condition_wait_force(%p(=%"PRIu32")) failed"
- " because of %d: %s\n", &impl_, impl_, errno, strerror(errno));
- }
- return false;
- }
+void Condition::Wait() {
+ condition_wait(&impl_, &m_->impl_);
}
-void Condition::Set() {
- if (condition_set(&impl_) == -1) {
- LOG(FATAL, "condition_set(%p(=%"PRIu32")) failed because of %d: %s\n",
- &impl_, impl_, errno, strerror(errno));
- }
+void Condition::Signal() {
+ condition_signal(&impl_);
}
-void Condition::Unset() {
- // can not fail
- condition_unset(&impl_);
+
+void Condition::Broadcast() {
+ condition_broadcast(&impl_);
}
} // namespace aos
diff --git a/aos/atom_code/ipc_lib/core_lib.c b/aos/atom_code/ipc_lib/core_lib.c
index 26581a1..bbd2f5b 100644
--- a/aos/atom_code/ipc_lib/core_lib.c
+++ b/aos/atom_code/ipc_lib/core_lib.c
@@ -1,48 +1,43 @@
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
#include <stdio.h>
#include <stdlib.h>
-#include "shared_mem.h"
-#include "core_lib.h"
-#include <time.h>
-void init_shared_mem_core(aos_shm_core *shm_core) {
- clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
- shm_core->msg_alloc_lock = 0;
- shm_core->queues.queue_list = NULL;
- shm_core->queues.alloc_lock = 0;
-}
+#include "aos/atom_code/ipc_lib/shared_mem.h"
+
static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
- return (l > r) ? l : r;
+ return (l > r) ? l : r;
}
void *shm_malloc_aligned(size_t length, uint8_t alignment) {
- // minimum alignments from <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
- if (length <= 1) {
- alignment = aos_8max(alignment, 1);
- } else if (length <= 2) {
- alignment = aos_8max(alignment, 2);
- } else if (length <= 4) {
- alignment = aos_8max(alignment, 4);
- } else if (length <= 8) {
- alignment = aos_8max(alignment, 8);
- } else if (length <= 16) {
- alignment = aos_8max(alignment, 16);
- } else {
- alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
- }
+ // minimum alignments from <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+ if (length <= 1) {
+ alignment = aos_8max(alignment, 1);
+ } else if (length <= 2) {
+ alignment = aos_8max(alignment, 2);
+ } else if (length <= 4) {
+ alignment = aos_8max(alignment, 4);
+ } else if (length <= 8) {
+ alignment = aos_8max(alignment, 8);
+ } else if (length <= 16) {
+ alignment = aos_8max(alignment, 16);
+ } else {
+ alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
+ }
- void *msg = NULL;
- aos_shm_core *shm_core = global_core->mem_struct;
- mutex_grab(&shm_core->msg_alloc_lock);
- shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
- const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
- shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
- msg = shm_core->msg_alloc;
- if (msg <= global_core->shared_mem) {
- fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
- printf("if you didn't see the stderr output just then, you should\n");
- abort();
- }
- //printf("alloc %p\n", msg);
- mutex_unlock(&shm_core->msg_alloc_lock);
- return msg;
+ void *msg = NULL;
+ aos_shm_core *shm_core = global_core->mem_struct;
+ mutex_grab(&shm_core->msg_alloc_lock);
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+ const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+ msg = shm_core->msg_alloc;
+ if (msg <= global_core->shared_mem) {
+ fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+ printf("if you didn't see the stderr output just then, you should have\n");
+ abort();
+ }
+ //printf("alloc %p\n", msg);
+ mutex_unlock(&shm_core->msg_alloc_lock);
+ return msg;
}
diff --git a/aos/atom_code/ipc_lib/core_lib.h b/aos/atom_code/ipc_lib/core_lib.h
index 233660b..5674220 100644
--- a/aos/atom_code/ipc_lib/core_lib.h
+++ b/aos/atom_code/ipc_lib/core_lib.h
@@ -1,42 +1,14 @@
#ifndef _AOS_CORE_LIB_H_
#define _AOS_CORE_LIB_H_
-// defined in shared_mem.c
-#ifdef __cplusplus
-extern "C" {
-#endif // __cplusplus
-extern struct aos_core *global_core;
-#ifdef __cplusplus
-}
-#endif // __cplusplus
-
-#include "aos_sync.h"
-#include "queue.h"
#include <stdint.h>
+#include "aos/atom_code/ipc_lib/aos_sync.h"
+
#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
-typedef struct aos_queue_global_t {
- mutex alloc_lock;
- void *queue_list; // an aos::Queue* declared in C code
-} aos_queue_global;
-
-typedef struct aos_shm_core_t {
- // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
- // this shared memory area
- struct timespec identifier;
- // gets 0-initialized at the start (as part of shared memory) and
- // the owner sets as soon as it finishes setting stuff up
- mutex creation_condition;
- mutex msg_alloc_lock;
- void *msg_alloc;
- aos_queue_global queues;
-} aos_shm_core;
-
-void init_shared_mem_core(aos_shm_core *shm_core);
-
void *shm_malloc_aligned(size_t length, uint8_t alignment)
__attribute__((alloc_size(1)));
static void *shm_malloc(size_t length) __attribute__((alloc_size(1)));
diff --git a/aos/atom_code/ipc_lib/ipc_lib.gyp b/aos/atom_code/ipc_lib/ipc_lib.gyp
index 4614680..7d42b04 100644
--- a/aos/atom_code/ipc_lib/ipc_lib.gyp
+++ b/aos/atom_code/ipc_lib/ipc_lib.gyp
@@ -1,41 +1,64 @@
{
'targets': [
{
- 'target_name': 'ipc_lib',
+ 'target_name': 'aos_sync',
'type': 'static_library',
'sources': [
'aos_sync.c',
- 'binheap.c',
- 'core_lib.c',
- 'queue.c',
- 'shared_mem.c',
- ],
- 'dependencies': [
- # TODO(brians): fix this once there's a nice logging interface to use
- # '<(AOS)/build/aos.gyp:logging',
],
},
{
- 'target_name': 'binheap_test',
- 'type': 'executable',
+ 'target_name': 'core_lib',
+ 'type': 'static_library',
'sources': [
- 'binheap_test.cpp',
+ 'core_lib.c',
],
'dependencies': [
- '<(EXTERNALS):gtest',
- 'ipc_lib',
+ 'aos_sync',
+ 'shared_mem',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'shared_mem',
+ 'type': 'static_library',
+ 'sources': [
+ 'shared_mem.c',
+ ],
+ 'dependencies': [
+ 'aos_sync',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'queue',
+ 'type': 'static_library',
+ 'sources': [
+ 'queue.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/common/common.gyp:condition',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
+ # TODO(brians): fix this once there's a nice logging interface to use
+ # '<(AOS)/build/aos.gyp:logging',
],
},
{
'target_name': 'ipc_queue_test',
'type': 'executable',
'sources': [
- 'queue_test.cpp',
+ 'queue_test.cc',
],
'dependencies': [
'<(EXTERNALS):gtest',
- 'ipc_lib',
+ 'queue',
'<(AOS)/build/aos.gyp:logging',
+ 'core_lib',
],
},
],
diff --git a/aos/atom_code/ipc_lib/queue.cc b/aos/atom_code/ipc_lib/queue.cc
index 57e2a5e..81d036c 100644
--- a/aos/atom_code/ipc_lib/queue.cc
+++ b/aos/atom_code/ipc_lib/queue.cc
@@ -1,4 +1,4 @@
-#include "aos/common/queue.h"
+#include "aos/atom_code/ipc_lib/queue.h"
#include <stdio.h>
#include <string.h>
@@ -9,12 +9,13 @@
#include "aos/common/logging/logging.h"
#include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
namespace aos {
-
namespace {
-static_assert(shm_ok<Queue>::value, "Queue instances go into shared memory");
+static_assert(shm_ok<RawQueue>::value,
+ "RawQueue instances go into shared memory");
const bool kReadDebug = false;
const bool kWriteDebug = false;
@@ -22,12 +23,19 @@
const bool kFetchDebug = false;
// The number of extra messages the pool associated with each queue will be able
-// to hold (for readers who are slow about freeing them).
+// to hold (for readers who are slow about freeing them or who leak one when
+// they get killed).
const int kExtraMessages = 20;
} // namespace
-struct Queue::MessageHeader {
+const int RawQueue::kPeek;
+const int RawQueue::kFromEnd;
+const int RawQueue::kNonBlock;
+const int RawQueue::kBlock;
+const int RawQueue::kOverride;
+
+struct RawQueue::MessageHeader {
int ref_count;
int index; // in pool_
static MessageHeader *Get(const void *msg) {
@@ -42,12 +50,12 @@
memcpy(this, &temp, sizeof(*this));
}
};
-static_assert(shm_ok<Queue::MessageHeader>::value, "the whole point"
+static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
" is to stick it in shared memory");
// TODO(brians) maybe do this with atomic integer instructions so it doesn't
// have to lock/unlock pool_lock_
-void Queue::DecrementMessageReferenceCount(const void *msg) {
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
MutexLocker locker(&pool_lock_);
MessageHeader *header = MessageHeader::Get(msg);
--header->ref_count;
@@ -60,7 +68,8 @@
}
}
-Queue::Queue(const char *name, size_t length, int hash, int queue_length) {
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+ : readable_(&data_lock_), writable_(&data_lock_) {
const size_t name_size = strlen(name) + 1;
char *temp = static_cast<char *>(shm_malloc(name_size));
memcpy(temp, name, name_size);
@@ -97,7 +106,7 @@
printf("made queue %s\n", name);
}
}
-Queue *Queue::Fetch(const char *name, size_t length, int hash,
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
int queue_length) {
if (kFetchDebug) {
printf("fetching queue %s\n", name);
@@ -105,9 +114,9 @@
if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
return NULL;
}
- Queue *current = static_cast<Queue *>(
+ RawQueue *current = static_cast<RawQueue *>(
global_core->mem_struct->queues.queue_list);
- Queue *last = NULL;
+ RawQueue *last = NULL;
while (current != NULL) {
// if we found a matching queue
if (strcmp(current->name_, name) == 0 && current->length_ == length &&
@@ -123,8 +132,8 @@
current = current->next_;
}
- void *temp = shm_malloc(sizeof(Queue));
- current = new (temp) Queue(name, length, hash, queue_length);
+ void *temp = shm_malloc(sizeof(RawQueue));
+ current = new (temp) RawQueue(name, length, hash, queue_length);
if (last == NULL) { // if we don't have one to tack the new one on to
global_core->mem_struct->queues.queue_list = current;
} else {
@@ -134,10 +143,10 @@
mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
return current;
}
-Queue *Queue::Fetch(const char *name, size_t length, int hash,
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
int queue_length,
- int recycle_hash, int recycle_length, Queue **recycle) {
- Queue *r = Fetch(name, length, hash, queue_length);
+ int recycle_hash, int recycle_length, RawQueue **recycle) {
+ RawQueue *r = Fetch(name, length, hash, queue_length);
r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
if (r == r->recycle_) {
fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
@@ -148,7 +157,7 @@
return r;
}
-void Queue::DoFreeMessage(const void *msg) {
+void RawQueue::DoFreeMessage(const void *msg) {
MessageHeader *header = MessageHeader::Get(msg);
if (pool_[header->index] != header) { // if something's messed up
fprintf(stderr, "queue: something is very very wrong with queue %p."
@@ -206,13 +215,13 @@
}
}
-bool Queue::WriteMessage(void *msg, int options) {
+bool RawQueue::WriteMessage(void *msg, int options) {
if (kWriteDebug) {
printf("queue: %p->WriteMessage(%p, %d)\n", this, msg, options);
}
if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||
msg > static_cast<void *>((
- reinterpret_cast<uintptr_t>(global_core->mem_struct) +
+ reinterpret_cast<char *>(global_core->mem_struct) +
global_core->size))) {
fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
msg, this);
@@ -239,7 +248,7 @@
if (kWriteDebug) {
printf("queue: going to wait for writable_ of %p\n", this);
}
- writable_.Wait(&data_lock_);
+ writable_.Wait();
}
new_end = (data_end_ + 1) % data_length_;
}
@@ -257,12 +266,12 @@
return true;
}
-void Queue::ReadCommonEnd(bool read) {
+void RawQueue::ReadCommonEnd(bool read) {
if (read) {
writable_.Signal();
}
}
-bool Queue::ReadCommonStart(int options, int *index) {
+bool RawQueue::ReadCommonStart(int options, int *index) {
while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
if (options & kNonBlock) {
if (kReadDebug) {
@@ -287,7 +296,7 @@
}
return true;
}
-void *Queue::ReadPeek(int options, int start) {
+void *RawQueue::ReadPeek(int options, int start) {
void *ret;
if (options & kFromEnd) {
int pos = data_end_ - 1;
@@ -311,7 +320,7 @@
}
return ret;
}
-const void *Queue::ReadMessage(int options) {
+const void *RawQueue::ReadMessage(int options) {
if (kReadDebug) {
printf("queue: %p->ReadMessage(%d)\n", this, options);
}
@@ -359,7 +368,7 @@
}
return msg;
}
-const void *Queue::ReadMessageIndex(int options, int *index) {
+const void *RawQueue::ReadMessageIndex(int options, int *index) {
if (kReadDebug) {
printf("queue: %p->ReadMessageIndex(%d, %p(*=%d))\n",
this, options, index, *index);
@@ -421,14 +430,14 @@
return msg;
}
-void *Queue::GetMessage() {
+void *RawQueue::GetMessage() {
MutexLocker locker(&pool_lock_);
MessageHeader *header;
if (pool_length_ - messages_used_ > 0) {
header = pool_[messages_used_];
} else {
if (pool_length_ >= mem_length_) {
- LOG(FATAL, "overused pool %p from queue %p\n", pool, queue);
+ LOG(FATAL, "overused pool of queue %p\n", this);
}
header = pool_[pool_length_] =
static_cast<MessageHeader *>(shm_malloc(msg_length_));
diff --git a/aos/atom_code/ipc_lib/queue.h b/aos/atom_code/ipc_lib/queue.h
index f7c116b..0ab4227 100644
--- a/aos/atom_code/ipc_lib/queue.h
+++ b/aos/atom_code/ipc_lib/queue.h
@@ -1,5 +1,5 @@
-#ifndef AOS_COMMON_QUEUE_H_
-#define AOS_COMMON_QUEUE_H_
+#ifndef AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
+#define AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
#include "aos/atom_code/ipc_lib/shared_mem.h"
#include "aos/common/mutex.h"
@@ -26,14 +26,14 @@
// of name and type signature will result in a different queue, which means
// that if you only recompile some code that uses differently sized messages,
// it will simply use a different queue than the old code.
-class Queue {
+class RawQueue {
public:
// Retrieves (and creates if necessary) a queue. Each combination of name and
// signature refers to a completely independent queue.
// length is how large each message will be
// hash can differentiate multiple otherwise identical queues
// queue_length is how many messages the queue will be able to hold
- static Queue *Fetch(const char *name, size_t length, int hash,
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
int queue_length);
// Same as above, except sets up the returned queue so that it will put
// messages on *recycle when they are freed (after they have been released by
@@ -47,10 +47,10 @@
// NOTE: calling this function with the same (name,length,hash,queue_length)
// but multiple recycle_queue_lengths will result in each freed message being
// put onto an undefined one of the recycle queues.
- static Queue *Fetch(const char *name, size_t length, int hash,
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
int queue_length,
int recycle_hash, int recycle_queue_length,
- Queue **recycle);
+ RawQueue **recycle);
// Constants for passing to options arguments.
// The non-conflicting ones can be combined with bitwise-or.
@@ -79,6 +79,7 @@
// Writes a message into the queue.
// This function takes ownership of msg.
// NOTE: msg must point to a valid message from this queue
+ // Returns truen on success.
bool WriteMessage(void *msg, int options);
// Reads a message out of the queue.
@@ -105,6 +106,7 @@
// FreeMessage.
void *GetMessage();
+ // It is ok to call this with msg == NULL.
void FreeMessage(const void *msg) { DecrementMessageReferenceCount(msg); }
private:
@@ -116,9 +118,9 @@
int hash_;
int queue_length_;
// The next one in the linked list of queues.
- Queue *next_;
+ RawQueue *next_;
- Queue *recycle_;
+ RawQueue *recycle_;
Mutex data_lock_; // protects operations on data_ etc
Condition readable_;
@@ -152,9 +154,9 @@
void *ReadPeek(int options, int start);
// Gets called by Fetch when necessary (with placement new).
- Queue(const char *name, size_t length, int hash, int queue_length);
+ RawQueue(const char *name, size_t length, int hash, int queue_length);
};
} // namespace aos
-#endif // AOS_COMMONG_QUEUE_H_
+#endif // AOS_ATOM_CODE_IPC_LIB_QUEUE_H_
diff --git a/aos/atom_code/ipc_lib/queue_test.cc b/aos/atom_code/ipc_lib/queue_test.cc
index b437c86..276e81a 100644
--- a/aos/atom_code/ipc_lib/queue_test.cc
+++ b/aos/atom_code/ipc_lib/queue_test.cc
@@ -11,6 +11,7 @@
#include "gtest/gtest.h"
#include "aos/atom_code/ipc_lib/sharedmem_test_setup.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
#include "aos/common/type_traits.h"
using ::testing::AssertionResult;
@@ -245,7 +246,7 @@
int16_t data; // don't really want to test empty messages
};
struct MessageArgs {
- Queue *const queue;
+ RawQueue *const queue;
int flags;
int16_t data; // -1 means NULL expected
};
@@ -266,13 +267,14 @@
args->queue->ReadMessage(args->flags));
if (msg == NULL) {
if (args->data != -1) {
- snprintf(failure, kFailureSize, "expected data of %"PRId16" but got NULL message",
+ snprintf(failure, kFailureSize,
+ "expected data of %" PRId16 " but got NULL message",
args->data);
}
} else {
if (args->data != msg->data) {
snprintf(failure, kFailureSize,
- "expected data of %"PRId16" but got %"PRId16" instead",
+ "expected data of %" PRId16 " but got %" PRId16 " instead",
args->data, msg->data);
}
args->queue->FreeMessage(msg);
@@ -283,75 +285,75 @@
std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
TEST_F(QueueTest, Reading) {
- Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, -1};
- args.flags = Queue::kNonBlock;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = Queue::kNonBlock | Queue::kPeek;
+ args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = Queue::kPeek;
+ args.flags = RawQueue::kPeek;
EXPECT_HANGS(ReadTestMessage, &args);
args.data = 254;
- args.flags = Queue::kBlock;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = Queue::kPeek;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = Queue::kPeek;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = Queue::kPeek | Queue::kNonBlock;
+ args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
args.data = -1;
EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = Queue::kNonBlock;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
args.flags = 0;
args.data = 971;
EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
}
TEST_F(QueueTest, Writing) {
- Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, 973};
- args.flags = Queue::kBlock;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = Queue::kBlock;
+ args.flags = RawQueue::kBlock;
EXPECT_HANGS(WriteTestMessage, &args);
- args.flags = Queue::kNonBlock;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = Queue::kNonBlock;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = Queue::kPeek;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
args.data = 971;
- args.flags = Queue::kOverride;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = Queue::kOverride;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = Queue::kNonBlock;
+ args.flags = RawQueue::kNonBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = Queue::kOverride;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
args.flags = 0;
EXPECT_RETURNS(ReadTestMessage, &args);
}
TEST_F(QueueTest, MultiRead) {
- Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
MessageArgs args{queue, 0, 1323};
- args.flags = Queue::kBlock;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = Queue::kBlock;
+ args.flags = RawQueue::kBlock;
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
@@ -361,22 +363,22 @@
TEST_F(QueueTest, Recycle) {
// TODO(brians) basic test of recycle queue
// include all of the ways a message can get into the recycle queue
- Queue *recycle_queue = reinterpret_cast<Queue *>(23);
- Queue *const queue = Queue::Fetch("Queue", sizeof(TestMessage), 1, 2, 2, 2,
+ RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2, 2, 2,
&recycle_queue);
- ASSERT_NE(reinterpret_cast<Queue *>(23), recycle_queue);
+ ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
- args.flags = Queue::kBlock;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(WriteTestMessage, &args);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 254;
EXPECT_RETURNS(WriteTestMessage, &args);
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 971;
- args.flags = Queue::kOverride;
+ args.flags = RawQueue::kOverride;
EXPECT_RETURNS(WriteTestMessage, &args);
- recycle.flags = Queue::kBlock;
+ recycle.flags = RawQueue::kBlock;
EXPECT_RETURNS(ReadTestMessage, &recycle);
EXPECT_HANGS(ReadTestMessage, &recycle);
@@ -391,11 +393,11 @@
EXPECT_HANGS(ReadTestMessage, &recycle);
args.data = 254;
- args.flags = Queue::kPeek;
+ args.flags = RawQueue::kPeek;
EXPECT_RETURNS(ReadTestMessage, &args);
- recycle.flags = Queue::kBlock;
+ recycle.flags = RawQueue::kBlock;
EXPECT_HANGS(ReadTestMessage, &recycle);
- args.flags = Queue::kBlock;
+ args.flags = RawQueue::kBlock;
EXPECT_RETURNS(ReadTestMessage, &args);
recycle.data = 254;
EXPECT_RETURNS(ReadTestMessage, &recycle);
diff --git a/aos/atom_code/ipc_lib/shared_mem.c b/aos/atom_code/ipc_lib/shared_mem.c
index cea9a67..e2c2c9e 100644
--- a/aos/atom_code/ipc_lib/shared_mem.c
+++ b/aos/atom_code/ipc_lib/shared_mem.c
@@ -1,4 +1,4 @@
-#include "shared_mem.h"
+#include "aos/atom_code/ipc_lib/shared_mem.h"
#include <stdio.h>
#include <string.h>
@@ -8,22 +8,31 @@
#include <sys/types.h>
#include <errno.h>
+#include "aos/atom_code/ipc_lib/core_lib.h"
+
// the path for the shared memory segment. see shm_open(3) for restrictions
#define AOS_SHM_NAME "/aos_shared_mem"
// Size of the shared mem segment.
// set to the maximum number that worked
#define SIZEOFSHMSEG (4096 * 27813)
+void init_shared_mem_core(aos_shm_core *shm_core) {
+ clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
+ shm_core->msg_alloc_lock = 0;
+ shm_core->queues.queue_list = NULL;
+ shm_core->queues.alloc_lock = 0;
+}
+
ptrdiff_t aos_core_get_mem_usage(void) {
return global_core->size -
((ptrdiff_t)global_core->mem_struct->msg_alloc -
(ptrdiff_t)global_core->mem_struct);
}
-struct aos_core global_core_data;
struct aos_core *global_core = NULL;
int aos_core_create_shared_mem(enum aos_core_create to_create) {
+ static struct aos_core global_core_data;
global_core = &global_core_data;
int shm;
before:
diff --git a/aos/atom_code/ipc_lib/shared_mem.h b/aos/atom_code/ipc_lib/shared_mem.h
index d8f2d18..c0d21ac 100644
--- a/aos/atom_code/ipc_lib/shared_mem.h
+++ b/aos/atom_code/ipc_lib/shared_mem.h
@@ -1,19 +1,40 @@
#ifndef _SHARED_MEM_H_
#define _SHARED_MEM_H_
-#include "core_lib.h"
#include <stddef.h>
#include <unistd.h>
+#include <time.h>
+
+#include "aos/atom_code/ipc_lib/aos_sync.h"
#ifdef __cplusplus
extern "C" {
#endif
+extern struct aos_core *global_core;
+
// Where the shared memory segment starts in each process's address space.
// Has to be the same in all of them so that stuff in shared memory
// can have regular pointers to other stuff in shared memory.
#define SHM_START 0x20000000
+typedef struct aos_queue_global_t {
+ mutex alloc_lock;
+ void *queue_list; // an aos::Queue* declared in C code
+} aos_queue_global;
+
+typedef struct aos_shm_core_t {
+ // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
+ // this shared memory area
+ struct timespec identifier;
+ // gets 0-initialized at the start (as part of shared memory) and
+ // the owner sets as soon as it finishes setting stuff up
+ mutex creation_condition;
+ mutex msg_alloc_lock;
+ void *msg_alloc;
+ aos_queue_global queues;
+} aos_shm_core;
+
enum aos_core_create {
create,
reference
@@ -26,6 +47,8 @@
aos_shm_core *mem_struct;
};
+void init_shared_mem_core(aos_shm_core *shm_core);
+
ptrdiff_t aos_core_get_mem_usage(void);
// Takes the specified memory address and uses it as the shared memory.
diff --git a/aos/atom_code/logging/atom_logging.cc b/aos/atom_code/logging/atom_logging.cc
index 6ea3b24..854da17 100644
--- a/aos/atom_code/logging/atom_logging.cc
+++ b/aos/atom_code/logging/atom_logging.cc
@@ -17,8 +17,6 @@
#include "aos/atom_code/thread_local.h"
#include "aos/atom_code/ipc_lib/queue.h"
-using ::aos::Queue;
-
namespace aos {
namespace logging {
namespace {
@@ -55,7 +53,7 @@
return process_name + '.' + thread_name;
}
-static Queue *queue;
+RawQueue *queue;
} // namespace
namespace internal {
@@ -100,7 +98,7 @@
void Register() {
Init();
- queue = Queue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 1500);
+ queue = RawQueue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 1500);
if (queue == NULL) {
Die("logging: couldn't fetch queue\n");
}
@@ -113,7 +111,7 @@
}
const LogMessage *ReadNext() {
- return ReadNext(Queue::kBlock);
+ return ReadNext(RawQueue::kBlock);
}
const LogMessage *ReadNext(int flags) {
@@ -121,7 +119,7 @@
do {
r = static_cast<const LogMessage *>(queue->ReadMessage(flags));
// not blocking means return a NULL if that's what it gets
- } while ((flags & Queue::kBlock) && r == NULL);
+ } while ((flags & RawQueue::kBlock) && r == NULL);
return r;
}
@@ -134,7 +132,7 @@
}
void Write(LogMessage *msg) {
- if (!queue->WriteMessage(msg, Queue::kOverride)) {
+ if (!queue->WriteMessage(msg, RawQueue::kOverride)) {
LOG(FATAL, "writing failed");
}
}
diff --git a/aos/atom_code/queue-tmpl.h b/aos/atom_code/queue-tmpl.h
index bb043e1..0cc9392 100644
--- a/aos/atom_code/queue-tmpl.h
+++ b/aos/atom_code/queue-tmpl.h
@@ -5,7 +5,7 @@
assert(msg_ != NULL);
msg_->SetTimeToNow();
assert(queue_ != NULL);
- bool return_value = aos_queue_write_msg_free(queue_, msg_, OVERRIDE) == 0;
+ bool return_value = queue_->WriteMessage(msg_, RawQueue::kOverride);
msg_ = NULL;
return return_value;
}
@@ -15,7 +15,7 @@
assert(msg_ != NULL);
msg_->SetTimeToNow();
assert(queue_ != NULL);
- bool return_value = aos_queue_write_msg_free(queue_, msg_, BLOCK) == 0;
+ bool return_value = queue_->WriteMessage(msg_, RawQueue::kBlock);
msg_ = NULL;
return return_value;
}
@@ -23,7 +23,7 @@
template <class T>
void ScopedMessagePtr<T>::reset(T *msg) {
if (queue_ != NULL && msg_ != NULL) {
- aos_queue_free_msg(queue_, msg_);
+ queue_->FreeMessage(msg);
}
msg_ = msg;
}
@@ -81,13 +81,12 @@
assert(msg_ != NULL);
assert(queue_ != NULL);
msg_->SetTimeToNow();
- T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
+ T *shm_msg = static_cast<T *>(queue_->GetMessage());
if (shm_msg == NULL) {
return false;
}
*shm_msg = *msg_;
- bool return_value =
- aos_queue_write_msg_free(queue_, shm_msg, OVERRIDE) == 0;
+ bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kOverride);
reset();
return return_value;
}
@@ -100,12 +99,12 @@
assert(msg_ != NULL);
assert(queue_ != NULL);
msg_->SetTimeToNow();
- T *shm_msg = static_cast<T *>(aos_queue_get_msg(queue_));
+ T *shm_msg = static_cast<T *>(queue_->GetMessage());
if (shm_msg == NULL) {
return false;
}
*shm_msg = *msg_;
- bool return_value = aos_queue_write_msg_free(queue_, shm_msg, BLOCK) == 0;
+ bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kBlock);
reset();
return return_value;
}
@@ -145,7 +144,7 @@
friend class aos::SafeMessageBuilder<T>;
// Only Queue should be able to build a message pointer.
- SafeScopedMessagePtr(aos_queue *queue)
+ SafeScopedMessagePtr(RawQueue *queue)
: queue_(queue), msg_(new T()) {}
// Sets the pointer to msg, freeing the old value if it was there.
@@ -159,10 +158,10 @@
}
// Sets the queue that owns this message.
- void set_queue(aos_queue *queue) { queue_ = queue; }
+ void set_queue(RawQueue *queue) { queue_ = queue; }
// The queue that the message is a part of.
- aos_queue *queue_;
+ RawQueue *queue_;
// The message or NULL.
T *msg_;
};
@@ -170,11 +169,9 @@
template <class T>
void Queue<T>::Init() {
if (queue_ == NULL) {
- // Signature of the message.
- aos_type_sig kQueueSignature{sizeof(T), static_cast<int>(T::kHash),
- T::kQueueLength};
-
- queue_ = aos_fetch_queue(queue_name_, &kQueueSignature);
+ queue_ = RawQueue::Fetch(queue_name_, sizeof(T),
+ static_cast<int>(T::kHash),
+ T::kQueueLength);
queue_msg_.set_queue(queue_);
}
}
@@ -191,11 +188,11 @@
template <class T>
bool Queue<T>::FetchNext() {
Init();
- // TODO(aschuh): Use aos_queue_read_msg_index so that multiple readers
+ // TODO(aschuh): Use RawQueue::ReadMessageIndex so that multiple readers
// reading don't randomly get only part of the messages.
// Document here the tradoffs that are part of each method.
- const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
- NON_BLOCK));
+ const T *msg = static_cast<const T *>(
+ queue_->ReadMessage(RawQueue::kNonBlock));
// Only update the internal pointer if we got a new message.
if (msg != NULL) {
queue_msg_.reset(msg);
@@ -206,7 +203,7 @@
template <class T>
bool Queue<T>::FetchNextBlocking() {
Init();
- const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_, BLOCK));
+ const T *msg = static_cast<const T *>(queue_->ReadMessage(RawQueue::kBlock));
queue_msg_.reset(msg);
assert (msg != NULL);
return true;
@@ -215,16 +212,16 @@
template <class T>
bool Queue<T>::FetchLatest() {
Init();
- const T *msg = static_cast<const T *>(aos_queue_read_msg(queue_,
- FROM_END | NON_BLOCK | PEEK));
+ const T *msg = static_cast<const T *>(queue_->ReadMessage(
+ RawQueue::kFromEnd | RawQueue::kNonBlock | RawQueue::kPeek));
// Only update the internal pointer if we got a new message.
if (msg != NULL && msg != queue_msg_.get()) {
queue_msg_.reset(msg);
return true;
}
- // The message has to get freed if we didn't use it (and aos_queue_free_msg is
- // ok to call on NULL).
- aos_queue_free_msg(queue_, msg);
+ // The message has to get freed if we didn't use it (and RawQueue::FreeMessage
+ // is ok to call on NULL).
+ queue_->FreeMessage(msg);
return false;
}
@@ -244,7 +241,7 @@
template <class T>
T *Queue<T>::MakeRawMessage() {
- T *ret = static_cast<T *>(aos_queue_get_msg(queue_));
+ T *ret = static_cast<T *>(queue_->GetMessage());
assert(ret != NULL);
return ret;
}