Wrote the code for putting MessageTypes in shared memory (untested)
diff --git a/aos/build/aos.gyp b/aos/build/aos.gyp
index a6e4f33..b7775a1 100644
--- a/aos/build/aos.gyp
+++ b/aos/build/aos.gyp
@@ -25,7 +25,6 @@
],
'dependencies': [
'<(AOS)/common/common.gyp:die',
- '<(AOS)/common/common.gyp:queue_types',
],
},
{
diff --git a/aos/common/common.gyp b/aos/common/common.gyp
index 00697b4..d7ec2d2 100644
--- a/aos/common/common.gyp
+++ b/aos/common/common.gyp
@@ -47,6 +47,11 @@
'sources': [
'queue_types.cc',
],
+ 'dependencies': [
+ '<(AOS)/build/aos.gyp:logging_interface',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:shared_mem',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:core_lib',
+ ],
},
{
'target_name': 'queue_types_test',
diff --git a/aos/common/logging/logging.gyp b/aos/common/logging/logging.gyp
index b6b339a..ad20166 100644
--- a/aos/common/logging/logging.gyp
+++ b/aos/common/logging/logging.gyp
@@ -11,5 +11,15 @@
'<(AOS)/build/aos.gyp:logging',
],
},
+ {
+ 'target_name': 'queue_logging',
+ 'type': 'static_library',
+ 'sources': [
+ 'queue_logging.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/common/common.gyp:queue_types',
+ ],
+ },
],
}
diff --git a/aos/common/queue_types.cc b/aos/common/queue_types.cc
index 03d048a..a1925e9 100644
--- a/aos/common/queue_types.cc
+++ b/aos/common/queue_types.cc
@@ -1,10 +1,14 @@
#include "aos/common/queue_types.h"
-#include <errno.h>
+#include <inttypes.h>
#include <memory>
+#include <unordered_map>
#include "aos/common/byteorder.h"
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+#include "aos/common/logging/logging.h"
+#include "aos/linux_code/ipc_lib/core_lib.h"
namespace aos {
@@ -20,7 +24,6 @@
}
if (max_bytes < sizeof(id) + sizeof(name_length) + sizeof(number_fields) +
name_length + fields_size) {
- errno = EOVERFLOW;
return -1;
}
to_network(&id, buffer);
@@ -49,7 +52,6 @@
decltype(MessageType::id) id;
decltype(MessageType::number_fields) number_fields;
if (*bytes < sizeof(id) + sizeof(name_length) + sizeof(number_fields)) {
- errno = EOVERFLOW;
return nullptr;
}
*bytes -= sizeof(id) + sizeof(name_length) + sizeof(number_fields);
@@ -62,7 +64,6 @@
buffer += sizeof(number_fields);
if (*bytes < name_length) {
- errno = EOVERFLOW;
return nullptr;
}
*bytes -= name_length;
@@ -79,7 +80,6 @@
for (int i = 0; i < number_fields; ++i) {
size_t field_name_length;
if (*bytes < sizeof(fields[i]->type) + sizeof(field_name_length)) {
- errno = EOVERFLOW;
return nullptr;
}
*bytes -= sizeof(fields[i]->type) + sizeof(field_name_length);
@@ -90,7 +90,6 @@
buffer += sizeof(field_name_length);
if (*bytes < field_name_length) {
- errno = EOVERFLOW;
return nullptr;
}
*bytes -= field_name_length;
@@ -104,4 +103,95 @@
return r.release();
}
+namespace type_cache {
+namespace {
+
+// One entry in the in-memory cache of known message types.
+struct CacheEntry {
+  // The cached type. Only a reference is stored, so the MessageType has to
+  // outlive this entry.
+  const MessageType &type;
+  // Whether this type is already known to be in the shared memory list.
+  bool in_shm;
+
+  // Starts out assuming that the type has not been put into shared memory.
+  CacheEntry(const MessageType &message_type)
+      : type(message_type), in_shm(false) {}
+};
+
+// A node in the linked list of serialized types that lives in shared memory.
+struct ShmType {
+ // The id of the MessageType serialized in this node.
+ uint32_t id;
+ // The next node in the list, or nullptr for the last one.
+ volatile ShmType *next;
+
+ // The number of bytes stored in serialized.
+ size_t serialized_size;
+ // The output of MessageType::Serialize. This is a flexible array member, so
+ // it has to stay last and nodes get allocated with extra space for it.
+ char serialized[];
+};
+
+// Every type that has been passed to Add or looked up by Get, keyed by its id.
+::std::unordered_map<uint32_t, CacheEntry> cache;
+
+} // namespace
+
+// Registers type in the cache under type.id. A type whose id is already
+// cached is left alone. Only a reference to type is kept, so it has to stay
+// valid for as long as the cache is used.
+void Add(const MessageType &type) {
+  if (cache.find(type.id) != cache.end()) return;
+  cache.emplace(type.id, type);
+}
+
+// Returns the MessageType with the given id.
+// Looks in the cache first and then falls back to walking the list of
+// serialized types in shared memory. A type found in shared memory gets
+// deserialized and added to the cache so later lookups are fast.
+// Dies with LOG(FATAL) if the id is not found anywhere or if the shared memory
+// copy can not be deserialized.
+const MessageType &Get(uint32_t type_id) {
+  const auto cached = cache.find(type_id);
+  if (cached != cache.end()) {
+    return cached->second.type;
+  }
+
+  // NOTE: this walks the list without taking queue_types.lock.
+  const volatile ShmType *c = static_cast<volatile ShmType *>(
+      global_core->mem_struct->queue_types.pointer);
+  while (c != nullptr) {
+    if (c->id == type_id) {
+      size_t bytes = c->serialized_size;
+      MessageType *type = MessageType::Deserialize(
+          const_cast<const char *>(c->serialized), &bytes);
+      // Deserialize returns nullptr on error, and dereferencing that below
+      // would be undefined behavior. This relies on LOG(FATAL) not returning,
+      // the same way the end of this function does.
+      if (type == nullptr) {
+        LOG(FATAL, "deserializing MessageType for id 0x%" PRIx32 " failed\n",
+            type_id);
+      }
+      // The new object is intentionally never deleted: the cache entry only
+      // stores a reference to it.
+      cache.emplace(type_id, *type);
+      return *type;
+    }
+    c = c->next;
+  }
+  LOG(FATAL, "MessageType for id 0x%" PRIx32 " not found\n", type_id);
+}
+
+// Makes sure that the type with the given id is in the shared memory list.
+// type_id has to be in the cache already (cache.at throws otherwise).
+// Does nothing if this type is already known to be in shared memory.
+// Dies with LOG(FATAL) if the type does not fit in the serialization buffer
+// or if taking the lock fails.
+void AddShm(uint32_t type_id) {
+  CacheEntry &cached = cache.at(type_id);
+  if (cached.in_shm) return;
+
+  // Serialize before taking the lock so that the fatal error path below never
+  // runs with the shared lock held.
+  char buffer[512];
+  const ssize_t size = cached.type.Serialize(buffer, sizeof(buffer));
+  if (size == -1) {
+    // sizeof gives a size_t, which needs %zu (not %zd).
+    LOG(FATAL, "type %s is too big to fit into %zu bytes\n",
+        cached.type.name, sizeof(buffer));
+  }
+
+  if (mutex_lock(&global_core->mem_struct->queue_types.lock) != 0) {
+    LOG(FATAL, "locking queue_types lock failed\n");
+  }
+  // Walk to the end of the list, stopping early if the type is there already.
+  // Afterwards, current is either nullptr (empty list) or the last node.
+  volatile ShmType *current = static_cast<volatile ShmType *>(
+      global_core->mem_struct->queue_types.pointer);
+  if (current != nullptr) {
+    while (true) {
+      if (current->id == type_id) {
+        cached.in_shm = true;
+        mutex_unlock(&global_core->mem_struct->queue_types.lock);
+        return;
+      }
+      if (current->next == nullptr) break;
+      current = current->next;
+    }
+  }
+
+  // Fill the new node in completely before linking it in, because Get walks
+  // the list without taking the lock.
+  volatile ShmType *shm =
+      static_cast<volatile ShmType *>(shm_malloc(sizeof(ShmType) + size));
+  shm->id = type_id;
+  shm->next = nullptr;
+  shm->serialized_size = size;
+  memcpy(const_cast<char *>(shm->serialized), buffer, size);
+
+  if (current == nullptr) {
+    global_core->mem_struct->queue_types.pointer = const_cast<ShmType *>(shm);
+  } else {
+    current->next = shm;
+  }
+  // Remember that it is there so the next call can return immediately instead
+  // of locking and walking the whole list again.
+  cached.in_shm = true;
+  mutex_unlock(&global_core->mem_struct->queue_types.lock);
+}
+
+} // namespace type_cache
} // namespace aos
diff --git a/aos/common/queue_types.h b/aos/common/queue_types.h
index ddc6aea..ba1d7fb 100644
--- a/aos/common/queue_types.h
+++ b/aos/common/queue_types.h
@@ -39,7 +39,7 @@
// Constructs a MessageType that doesn't own the storage for any of its
// names.
MessageType(uint32_t id, const char *name,
- ::std::initializer_list<Field *> fields_initializer)
+ ::std::initializer_list<const Field *> fields_initializer)
: id(id), name(name), owns_names(false) {
number_fields = fields_initializer.size();
fields = new const Field *[number_fields];
@@ -61,11 +61,11 @@
}
}
- // Returns -1 for error (in errno).
+ // Returns -1 if max_bytes is too small.
ssize_t Serialize(char *buffer, size_t max_bytes) const;
// bytes should start out as the number of bytes available in buffer and gets
// reduced by the number actually read before returning.
- // Returns a new instance allocated with new or NULL for error (in errno).
+ // Returns a new instance allocated with new or nullptr for error.
static MessageType *Deserialize(const char *buffer, size_t *bytes);
static bool IsPrimitive(uint32_t type_id) {
diff --git a/aos/common/queue_types_test.cc b/aos/common/queue_types_test.cc
index 38334df..bd95e3a 100644
--- a/aos/common/queue_types_test.cc
+++ b/aos/common/queue_types_test.cc
@@ -13,15 +13,59 @@
new Field{0, "field2"},
new Field{0, "field3"}});
-TEST(QueueTypesTest, Serialization) {
+class QueueTypesTest : public ::testing::Test {
+ public:
+  // Compares l and r member by member (id, name, number_fields, and the type
+  // and name of each field).
+  // Returns success if everything matches or a failure describing the first
+  // difference found.
+  ::testing::AssertionResult Equal(const MessageType &l, const MessageType &r) {
+    using ::testing::AssertionFailure;
+    if (l.id != r.id) {
+      return AssertionFailure() << "id " << l.id << " != " << r.id;
+    }
+    if (strcmp(l.name, r.name) != 0) {
+      return AssertionFailure() << "name '" << l.name << "' != '" << r.name
+                                << "'";
+    }
+    if (l.number_fields != r.number_fields) {
+      return AssertionFailure() << "number_fields " << l.number_fields
+                                << " != " << r.number_fields;
+    }
+    for (int i = 0; i < l.number_fields; ++i) {
+      // The field index goes into the message itself. A SCOPED_TRACE here
+      // would already be out of scope by the time the caller reports the
+      // returned failure, so it would never show up.
+      if (l.fields[i]->type != r.fields[i]->type) {
+        return AssertionFailure() << "field " << i << " type "
+                                  << l.fields[i]->type
+                                  << " != " << r.fields[i]->type;
+      }
+      if (strcmp(l.fields[i]->name, r.fields[i]->name) != 0) {
+        return AssertionFailure() << "field " << i << " name '"
+                                  << l.fields[i]->name << "' != '"
+                                  << r.fields[i]->name << "'";
+      }
+    }
+    return ::testing::AssertionSuccess();
+  }
+};
+
+// Round-trips kTestType1 through Serialize and Deserialize, then checks that
+// Deserialize fails when given 1 byte too few and leaves 1 byte unread when
+// given 1 byte too many.
+TEST_F(QueueTypesTest, Serialization) {
   char buffer[512];
   ssize_t size;
+  size_t out_size;
+  ::std::unique_ptr<MessageType> deserialized;
   size = kTestType1.Serialize(buffer, sizeof(buffer));
   EXPECT_GT(size, 1);
-  size_t out_size = size;
-  ::std::unique_ptr<MessageType> deserialized(MessageType::Deserialize(buffer, &out_size));
+
+  out_size = size;
+  deserialized.reset(MessageType::Deserialize(buffer, &out_size));
+  // Stop here on failure instead of dereferencing a null pointer below.
+  ASSERT_TRUE(deserialized.get() != nullptr);
   EXPECT_EQ(0u, out_size);
+  EXPECT_TRUE(Equal(kTestType1, *deserialized));
+
+  out_size = size - 1;
+  deserialized.reset(MessageType::Deserialize(buffer, &out_size));
+  EXPECT_EQ(nullptr, deserialized.get());
+
+  out_size = size + 1;
+  ASSERT_LE(out_size, sizeof(buffer));
+  deserialized.reset(MessageType::Deserialize(buffer, &out_size));
+  // Same as above: a failed Deserialize must fail the test, not crash it.
+  ASSERT_TRUE(deserialized.get() != nullptr);
+  EXPECT_EQ(1u, out_size);
+  EXPECT_TRUE(Equal(kTestType1, *deserialized));
 }
} // namespace aos
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index 8a2f15b..8a86b7f 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -116,17 +116,17 @@
if (kFetchDebug) {
printf("fetching queue %s\n", name);
}
- if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
+ if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
return NULL;
}
RawQueue *current = static_cast<RawQueue *>(
- global_core->mem_struct->queues.queue_list);
+ global_core->mem_struct->queues.pointer);
if (current != NULL) {
while (true) {
// If we found a matching queue.
if (strcmp(current->name_, name) == 0 && current->length_ == length &&
current->hash_ == hash && current->queue_length_ == queue_length) {
- mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ mutex_unlock(&global_core->mem_struct->queues.lock);
return current;
} else {
if (kFetchDebug) {
@@ -143,12 +143,12 @@
RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
RawQueue(name, length, hash, queue_length);
if (current == NULL) { // if we don't already have one
- global_core->mem_struct->queues.queue_list = r;
+ global_core->mem_struct->queues.pointer = r;
} else {
current->next_ = r;
}
- mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+ mutex_unlock(&global_core->mem_struct->queues.lock);
return r;
}
RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
diff --git a/aos/linux_code/ipc_lib/shared_mem.c b/aos/linux_code/ipc_lib/shared_mem.c
index 4e38117..da91b68 100644
--- a/aos/linux_code/ipc_lib/shared_mem.c
+++ b/aos/linux_code/ipc_lib/shared_mem.c
@@ -21,8 +21,10 @@
+// Initializes the given shm_core: stores the current CLOCK_REALTIME time in
+// identifier, zeroes msg_alloc_lock and the queues/queue_types locks, and
+// sets both of those list heads to NULL.
void init_shared_mem_core(aos_shm_core *shm_core) {
clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
shm_core->msg_alloc_lock = 0;
- shm_core->queues.queue_list = NULL;
- shm_core->queues.alloc_lock = 0;
+ shm_core->queues.pointer = NULL;
+ shm_core->queues.lock = 0;
+ shm_core->queue_types.pointer = NULL;
+ shm_core->queue_types.lock = 0;
}
ptrdiff_t aos_core_get_mem_usage(void) {
diff --git a/aos/linux_code/ipc_lib/shared_mem.h b/aos/linux_code/ipc_lib/shared_mem.h
index e5059c4..c1b1f9c 100644
--- a/aos/linux_code/ipc_lib/shared_mem.h
+++ b/aos/linux_code/ipc_lib/shared_mem.h
@@ -18,10 +18,12 @@
// can have regular pointers to other stuff in shared memory.
#define SHM_START 0x20000000
-typedef struct aos_queue_global_t {
- mutex alloc_lock;
- void *queue_list; // an aos::Queue* declared in C code
-} aos_queue_global;
+// A structure that represents some kind of global pointer that everything
+// shares.
+// Used as the head of a linked list that lives in shared memory.
+typedef struct aos_global_pointer_t {
+ // Meant to be held while modifying pointer or the list it is the head of.
+ mutex lock;
+ // The first element of the list, or NULL if nothing has been added yet.
+ void *pointer;
+} aos_global_pointer;
typedef struct aos_shm_core_t {
// clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
@@ -32,7 +34,12 @@
mutex creation_condition;
mutex msg_alloc_lock;
void *msg_alloc;
- aos_queue_global queues;
+ // A pointer to the head of the linked list of queues.
+ // pointer points to a ::aos::Queue.
+ aos_global_pointer queues;
+ // A pointer to the head of the linked list of queue message types.
+ // pointer points to a ::aos::type_cache::ShmType.
+ aos_global_pointer queue_types;
} aos_shm_core;
enum aos_core_create {