Remove old shared memory queue
We can't quite kill shared memory, but we can kill the queue. There are
no more users.
Change-Id: I8c3ab008ab31a51cbcccf12ce89a268aed304473
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 1989f06..75452bd 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -65,45 +65,6 @@
],
)
-cc_library(
- name = "queue",
- srcs = [
- "queue.cc",
- ],
- hdrs = [
- "queue.h",
- ],
- linkopts = [
- "-lrt",
- ],
- visibility = ["//visibility:public"],
- deps = [
- ":core_lib",
- ":shared_mem",
- "//aos:condition",
- "//aos/mutex",
- "//aos/util:options",
- ],
-)
-
-cc_test(
- name = "raw_queue_test",
- srcs = [
- "raw_queue_test.cc",
- ],
- deps = [
- ":core_lib",
- ":queue",
- "//aos:die",
- "//aos/logging",
- "//aos/testing:googletest",
- "//aos/testing:prevent_exit",
- "//aos/testing:test_shm",
- "//aos/time",
- "//aos/util:death_test_log_implementation",
- ],
-)
-
cc_test(
name = "ipc_stress_test",
srcs = [
@@ -125,20 +86,12 @@
],
)
-cc_library(
- name = "scoped_message_ptr",
- deps = [
- ":queue",
- ],
-)
-
cc_binary(
name = "ipc_comparison",
srcs = [
"ipc_comparison.cc",
],
deps = [
- ":queue",
"//aos:condition",
"//aos:event",
"//aos:init",
diff --git a/aos/ipc_lib/ipc_comparison.cc b/aos/ipc_lib/ipc_comparison.cc
index cf5581e..592327e 100644
--- a/aos/ipc_lib/ipc_comparison.cc
+++ b/aos/ipc_lib/ipc_comparison.cc
@@ -24,7 +24,6 @@
#include "aos/condition.h"
#include "aos/event.h"
#include "aos/init.h"
-#include "aos/ipc_lib/queue.h"
#include "aos/logging/implementations.h"
#include "aos/logging/logging.h"
#include "aos/mutex/mutex.h"
@@ -756,55 +755,6 @@
sem_t *ping_sem_, *pong_sem_;
};
-class AOSQueuePingPonger : public PingPongerInterface {
- public:
- AOSQueuePingPonger()
- : ping_queue_(RawQueue::Fetch("ping", sizeof(Data), 0, 1)),
- pong_queue_(RawQueue::Fetch("pong", sizeof(Data), 0, 1)) {}
-
- Data *PingData() override {
- AOS_CHECK_EQ(nullptr, ping_to_send_);
- ping_to_send_ = static_cast<Data *>(ping_queue_->GetMessage());
- return ping_to_send_;
- }
-
- const Data *Ping() override {
- AOS_CHECK_NE(nullptr, ping_to_send_);
- AOS_CHECK(ping_queue_->WriteMessage(ping_to_send_, RawQueue::kBlock));
- ping_to_send_ = nullptr;
- pong_queue_->FreeMessage(pong_received_);
- pong_received_ =
- static_cast<const Data *>(pong_queue_->ReadMessage(RawQueue::kBlock));
- return pong_received_;
- }
-
- const Data *Wait() override {
- ping_queue_->FreeMessage(ping_received_);
- ping_received_ =
- static_cast<const Data *>(ping_queue_->ReadMessage(RawQueue::kBlock));
- return ping_received_;
- }
-
- Data *PongData() override {
- AOS_CHECK_EQ(nullptr, pong_to_send_);
- pong_to_send_ = static_cast<Data *>(pong_queue_->GetMessage());
- return pong_to_send_;
- }
-
- void Pong() override {
- AOS_CHECK_NE(nullptr, pong_to_send_);
- AOS_CHECK(pong_queue_->WriteMessage(pong_to_send_, RawQueue::kBlock));
- pong_to_send_ = nullptr;
- }
-
- private:
- RawQueue *const ping_queue_;
- RawQueue *const pong_queue_;
-
- Data *ping_to_send_ = nullptr, *pong_to_send_ = nullptr;
- const Data *ping_received_ = nullptr, *pong_received_ = nullptr;
-};
-
int Main(int /*argc*/, char **argv) {
::std::unique_ptr<PingPongerInterface> ping_ponger;
if (FLAGS_method == "pipe") {
@@ -823,8 +773,6 @@
ping_ponger.reset(new PthreadMutexPingPonger(true, true));
} else if (FLAGS_method == "pthread_mutex_pi") {
ping_ponger.reset(new PthreadMutexPingPonger(false, true));
- } else if (FLAGS_method == "aos_queue") {
- ping_ponger.reset(new AOSQueuePingPonger());
} else if (FLAGS_method == "eventfd") {
ping_ponger.reset(new EventFDPingPonger());
} else if (FLAGS_method == "sysv_semaphore") {
@@ -938,7 +886,6 @@
"\tpthread_mutex_pshared\n"
"\tpthread_mutex_pshared_pi\n"
"\tpthread_mutex_pi\n"
- "\taos_queue\n"
"\teventfd\n"
"\tsysv_semaphore\n"
"\tsysv_queue\n"
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
deleted file mode 100644
index 3b6e5a1..0000000
--- a/aos/ipc_lib/queue.cc
+++ /dev/null
@@ -1,574 +0,0 @@
-#if !AOS_DEBUG
-#undef NDEBUG
-#define NDEBUG
-#endif
-
-#include "aos/ipc_lib/queue.h"
-
-#include <assert.h>
-#include <errno.h>
-#include <stdio.h>
-#include <string.h>
-
-#include <algorithm>
-#include <memory>
-
-#include "aos/ipc_lib/core_lib.h"
-#include "aos/type_traits/type_traits.h"
-
-namespace aos {
-namespace {
-
-namespace chrono = ::std::chrono;
-
-static_assert(shm_ok<RawQueue>::value,
- "RawQueue instances go into shared memory");
-
-const bool kReadDebug = false;
-const bool kWriteDebug = false;
-const bool kRefDebug = false;
-const bool kFetchDebug = false;
-const bool kReadIndexDebug = false;
-
-// The number of extra messages the pool associated with each queue will be able
-// to hold (for readers who are slow about freeing them or who leak one when
-// they get killed).
-const int kExtraMessages = 20;
-
-} // namespace
-
-constexpr Options<RawQueue>::Option RawQueue::kPeek;
-constexpr Options<RawQueue>::Option RawQueue::kFromEnd;
-constexpr Options<RawQueue>::Option RawQueue::kNonBlock;
-constexpr Options<RawQueue>::Option RawQueue::kBlock;
-constexpr Options<RawQueue>::Option RawQueue::kOverride;
-
-// This is what gets stuck in before each queue message in memory. It is always
-// allocated aligned to 8 bytes and its size has to maintain that alignment for
-// the message that follows immediately.
-struct RawQueue::MessageHeader {
- MessageHeader *next;
-
- // Gets the message header immediately preceding msg.
- static MessageHeader *Get(const void *msg) {
- return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
- static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
- alignof(MessageHeader)));
- }
-
- int32_t ref_count() const {
- return __atomic_load_n(&ref_count_, __ATOMIC_RELAXED);
- }
- void set_ref_count(int32_t val) {
- __atomic_store_n(&ref_count_, val, __ATOMIC_RELAXED);
- }
-
- void ref_count_sub() { __atomic_sub_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
- void ref_count_add() { __atomic_add_fetch(&ref_count_, 1, __ATOMIC_RELAXED); }
-
- private:
- // This gets accessed with atomic instructions without any
- // locks held by various member functions.
- int32_t ref_count_;
-
-// Padding to make the total size 8 bytes if we have 4-byte pointers or bump
-// it to 16 if a pointer is 8 bytes by itself.
-#if __SIZEOF_POINTER__ == 8
-#ifdef __clang__
- // Clang is smart enough to realize this is unused, but GCC doesn't like the
- // attribute here...
- __attribute__((unused))
-#endif
- char padding[4];
-#elif __SIZEOF_POINTER__ == 4
-// No padding needed to get 8 byte total size.
-#else
-#error Unknown pointer size.
-#endif
-};
-
-inline int RawQueue::index_add1(int index) {
- // Doing it this way instead of with % is more efficient on ARM.
- int r = index + 1;
- assert(index <= data_length_);
- if (r == data_length_) {
- return 0;
- } else {
- return r;
- }
-}
-
-void RawQueue::DecrementMessageReferenceCount(const void *msg) {
- MessageHeader *header = MessageHeader::Get(msg);
- header->ref_count_sub();
- if (kRefDebug) {
- printf("%p ref dec count: %p count=%d\n", this, msg, header->ref_count());
- }
-
- // The only way it should ever be 0 is if we were the last one to decrement,
- // in which case nobody else should have it around to re-increment it or
- // anything in the middle, so this is safe to do not atomically with the
- // decrement.
- if (header->ref_count() == 0) {
- DoFreeMessage(msg);
- } else {
- assert(header->ref_count() > 0);
- }
-}
-
-inline void RawQueue::IncrementMessageReferenceCount(const void *msg) const {
- MessageHeader *const header = MessageHeader::Get(msg);
- header->ref_count_add();
- if (kRefDebug) {
- printf("%p ref inc count: %p\n", this, msg);
- }
-}
-
-inline void RawQueue::DoFreeMessage(const void *msg) {
- MessageHeader *header = MessageHeader::Get(msg);
- if (kRefDebug) {
- printf("%p ref free to %p: %p\n", this, recycle_, msg);
- }
-
- if (__builtin_expect(recycle_ != nullptr, 0)) {
- void *const new_msg = recycle_->GetMessage();
- if (new_msg == nullptr) {
- fprintf(stderr,
- "queue: couldn't get a message"
- " for recycle queue %p\n",
- recycle_);
- } else {
- header->ref_count_add();
- if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {
- fprintf(stderr,
- "queue: %p->WriteMessage(%p, kOverride) failed."
- " aborting\n",
- recycle_, msg);
- printf("see stderr\n");
- abort();
- }
- msg = new_msg;
- header = MessageHeader::Get(new_msg);
- }
- }
-
- // This works around GCC bug 60272 (fixed in 4.8.3).
- // new_next should just get replaced with header->next (and the body of the
- // loop should become empty).
- // The bug is that the store to new_next after the compare/exchange is
- // unconditional but it should only be if it fails, which could mean
- // overwriting what somebody else who preempted us right then changed it to.
- // TODO(brians): Get rid of this workaround once we get a new enough GCC.
- MessageHeader *new_next = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
- do {
- header->next = new_next;
- } while (__builtin_expect(
- !__atomic_compare_exchange_n(&free_messages_, &new_next, header, true,
- __ATOMIC_RELEASE, __ATOMIC_RELAXED),
- 0));
-}
-
-void *RawQueue::GetMessage() {
- MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
- do {
- if (__builtin_expect(header == nullptr, 0)) {
- LOG(FATAL) << "overused pool of queue " << this << " (" << name_ << ")";
- }
- } while (__builtin_expect(
- !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
- __ATOMIC_ACQ_REL, __ATOMIC_RELAXED),
- 0));
- void *msg = reinterpret_cast<uint8_t *>(header + 1);
- // It might be uninitialized, 0 from a previous use, or 1 from previously
- // being recycled.
- header->set_ref_count(1);
- if (kRefDebug) {
- printf("%p ref alloc: %p\n", this, msg);
- }
- return msg;
-}
-
-RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
- : readable_(&data_lock_), writable_(&data_lock_) {
- static_assert(shm_ok<RawQueue::MessageHeader>::value,
- "the whole point is to stick it in shared memory");
- static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
- "need to revalidate size/alignent assumptions");
-
- CHECK_GE(queue_length, 1) << ": queue length " << queue_length << " of "
- << name << " needs to be at least 1";
-
- const size_t name_size = strlen(name) + 1;
- char *temp = static_cast<char *>(shm_malloc(name_size));
- memcpy(temp, name, name_size);
- name_ = temp;
- length_ = length;
- hash_ = hash;
- queue_length_ = queue_length;
-
- next_ = NULL;
- recycle_ = NULL;
-
- if (kFetchDebug) {
- printf("initializing name=%s, length=%zd, hash=%d, queue_length=%d\n", name,
- length, hash, queue_length);
- }
-
- data_length_ = queue_length + 1;
- data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
- data_start_ = 0;
- data_end_ = 0;
- messages_ = 0;
-
- msg_length_ = length + sizeof(MessageHeader);
-
- // Create all of the messages for the free list and stick them on.
- {
- MessageHeader *previous = nullptr;
- for (int i = 0; i < queue_length + kExtraMessages; ++i) {
- MessageHeader *const message =
- static_cast<MessageHeader *>(shm_malloc(msg_length_));
- free_messages_ = message;
- message->next = previous;
- previous = message;
- }
- }
-
- readable_waiting_ = false;
-
- if (kFetchDebug) {
- printf("made queue %s\n", name);
- }
-}
-
-RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
- int queue_length) {
- if (kFetchDebug) {
- printf("fetching queue %s\n", name);
- }
- if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
- LOG(FATAL) << "mutex_lock(" << &global_core->mem_struct->queues.lock
- << ") failed";
- }
- RawQueue *current =
- static_cast<RawQueue *>(global_core->mem_struct->queues.pointer);
- if (current != NULL) {
- while (true) {
- // If we found a matching queue.
- if (strcmp(current->name_, name) == 0 && current->length_ == length &&
- current->hash_ == hash && current->queue_length_ == queue_length) {
- mutex_unlock(&global_core->mem_struct->queues.lock);
- return current;
- } else {
- if (kFetchDebug) {
- printf("rejected queue %s strcmp=%d target=%s\n", current->name_,
- strcmp(current->name_, name), name);
- }
- }
- // If this is the last one.
- if (current->next_ == NULL) break;
- current = current->next_;
- }
- }
-
- RawQueue *r = new (shm_malloc(sizeof(RawQueue)))
- RawQueue(name, length, hash, queue_length);
- if (current == NULL) { // if we don't already have one
- global_core->mem_struct->queues.pointer = r;
- } else {
- current->next_ = r;
- }
-
- mutex_unlock(&global_core->mem_struct->queues.lock);
- return r;
-}
-
-RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
- int queue_length, int recycle_hash,
- int recycle_length, RawQueue **recycle) {
- RawQueue *r = Fetch(name, length, hash, queue_length);
- r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);
- if (r == r->recycle_) {
- fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
- printf("see stderr\n");
- r->recycle_ = NULL;
- abort();
- }
- *recycle = r->recycle_;
- return r;
-}
-
-bool RawQueue::DoWriteMessage(void *msg, Options<RawQueue> options) {
- if (kWriteDebug) {
- printf("queue: %p->WriteMessage(%p, %x), len :%zu\n", this, msg, options.printable(), msg_length_);
- }
-
- bool signal_readable;
-
- {
- IPCMutexLocker locker(&data_lock_);
- CHECK(!locker.owner_died());
-
- int new_end;
- while (true) {
- new_end = index_add1(data_end_);
- // If there is room in the queue right now.
- if (new_end != data_start_) break;
- if (options & kNonBlock) {
- if (kWriteDebug) {
- printf("queue: not blocking on %p. returning false\n", this);
- }
- DecrementMessageReferenceCount(msg);
- return false;
- } else if (options & kOverride) {
- if (kWriteDebug) {
- printf("queue: overriding on %p\n", this);
- }
- // Avoid leaking the message that we're going to overwrite.
- DecrementMessageReferenceCount(data_[data_start_]);
- data_start_ = index_add1(data_start_);
- } else { // kBlock
- assert(options & kBlock);
- if (kWriteDebug) {
- printf("queue: going to wait for writable_ of %p\n", this);
- }
- CHECK(!writable_.Wait());
- }
- }
- data_[data_end_] = msg;
- ++messages_;
- data_end_ = new_end;
-
- signal_readable = readable_waiting_;
- readable_waiting_ = false;
- }
-
- if (signal_readable) {
- if (kWriteDebug) {
- printf("queue: broadcasting to readable_ of %p\n", this);
- }
- readable_.Broadcast();
- } else if (kWriteDebug) {
- printf("queue: skipping broadcast to readable_ of %p\n", this);
- }
-
- if (kWriteDebug) {
- printf("queue: write returning true on queue %p\n", this);
- }
- return true;
-}
-
-inline void RawQueue::ReadCommonEnd() {
- if (is_writable()) {
- if (kReadDebug) {
- printf("queue: %ssignalling writable_ of %p\n",
- writable_start_ ? "not " : "", this);
- }
- if (!writable_start_) writable_.Broadcast();
- }
-}
-
-bool RawQueue::ReadCommonStart(Options<RawQueue> options, int *index,
- chrono::nanoseconds timeout) {
- while (data_start_ == data_end_ || ((index != NULL) && messages_ <= *index)) {
- if (options & kNonBlock) {
- if (kReadDebug) {
- printf("queue: not going to block waiting on %p\n", this);
- }
- return false;
- } else { // kBlock
- assert(options & kBlock);
- if (kReadDebug) {
- printf("queue: going to wait for readable_ of %p\n", this);
- }
- readable_waiting_ = true;
- // Wait for a message to become readable.
- while (true) {
- Condition::WaitResult wait_result = readable_.WaitTimed(timeout);
- if (wait_result == Condition::WaitResult::kOk) {
- break;
- }
- CHECK(wait_result != Condition::WaitResult::kOwnerDied);
- if (wait_result == Condition::WaitResult::kTimeout) {
- return false;
- }
- }
-
- if (kReadDebug) {
- printf("queue: done waiting for readable_ of %p\n", this);
- }
- }
- }
- // We have to check down here because we might have unlocked the mutex while
- // Wait()ing above so this value might have changed.
- writable_start_ = is_writable();
- if (kReadDebug) {
- printf("queue: %p->read(%p) start=%d end=%d writable_start=%d\n", this,
- index, data_start_, data_end_, writable_start_);
- }
- return true;
-}
-
-inline int RawQueue::LastMessageIndex() const {
- int pos = data_end_ - 1;
- if (pos < 0) { // If it wrapped around.
- pos = data_length_ - 1;
- }
- return pos;
-}
-
-const void *RawQueue::DoReadMessage(Options<RawQueue> options) {
- // TODO(brians): Test this function.
- if (kReadDebug) {
- printf("queue: %p->ReadMessage(%x)\n", this, options.printable());
- }
- void *msg = NULL;
-
- IPCMutexLocker locker(&data_lock_);
- CHECK(!locker.owner_died());
-
- if (!ReadCommonStart(options, nullptr, chrono::nanoseconds(0))) {
- if (kReadDebug) {
- printf("queue: %p common returned false\n", this);
- }
- return NULL;
- }
-
- if (options & kFromEnd) {
- if (options & kPeek) {
- if (kReadDebug) {
- printf("queue: %p shortcutting c2: %d\n", this, LastMessageIndex());
- }
- msg = data_[LastMessageIndex()];
- IncrementMessageReferenceCount(msg);
- } else {
- while (true) {
- if (kReadDebug) {
- printf("queue: %p start of c2\n", this);
- }
- // This loop pulls each message out of the buffer.
- const int pos = data_start_;
- data_start_ = index_add1(data_start_);
- // If this is the last one.
- if (data_start_ == data_end_) {
- if (kReadDebug) {
- printf("queue: %p reading from c2: %d\n", this, pos);
- }
- msg = data_[pos];
- break;
- }
- // This message is not going to be in the queue any more.
- DecrementMessageReferenceCount(data_[pos]);
- }
- }
- } else {
- if (kReadDebug) {
- printf("queue: %p reading from d2: %d\n", this, data_start_);
- }
- msg = data_[data_start_];
- if (options & kPeek) {
- IncrementMessageReferenceCount(msg);
- } else {
- data_start_ = index_add1(data_start_);
- }
- }
- ReadCommonEnd();
- if (kReadDebug) {
- printf("queue: %p read returning %p\n", this, msg);
- }
- return msg;
-}
-
-const void *RawQueue::DoReadMessageIndex(Options<RawQueue> options, int *index,
- chrono::nanoseconds timeout) {
- if (kReadDebug) {
- printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n", this,
- options.printable(), index, *index);
- }
- void *msg = NULL;
-
- IPCMutexLocker locker(&data_lock_);
- CHECK(!locker.owner_died());
-
- if (!ReadCommonStart(options, index, timeout)) {
- if (kReadDebug) {
- printf("queue: %p common returned false\n", this);
- }
- return NULL;
- }
-
- // TODO(parker): Handle integer wrap on the index.
-
- if (options & kFromEnd) {
- if (kReadDebug) {
- printf("queue: %p reading from c1: %d\n", this, LastMessageIndex());
- }
- msg = data_[LastMessageIndex()];
-
- // We'd skip this if we had kPeek, but kPeek | kFromEnd isn't valid for
- // reading with an index.
- *index = messages_;
- } else {
- // Where we're going to start reading.
- int my_start;
-
- const int unread_messages = messages_ - *index;
- assert(unread_messages > 0);
- int current_messages = data_end_ - data_start_;
- if (current_messages < 0) current_messages += data_length_;
- if (kReadIndexDebug) {
- printf("queue: %p start=%d end=%d current=%d\n", this, data_start_,
- data_end_, current_messages);
- }
- assert(current_messages > 0);
- // If we're behind the available messages.
- if (unread_messages > current_messages) {
- // Catch index up to the last available message.
- *index = messages_ - current_messages;
- // And that's the one we're going to read.
- my_start = data_start_;
- if (kReadIndexDebug) {
- printf("queue: %p jumping ahead to message %d (have %d) (at %d)\n",
- this, *index, messages_, data_start_);
- }
- } else {
- // Just start reading at the first available message that we haven't yet
- // read.
- my_start = data_end_ - unread_messages;
- if (kReadIndexDebug) {
- printf("queue: %p original read from %d\n", this, my_start);
- }
- if (data_start_ < data_end_) {
- assert(my_start >= 0);
- }
- if (my_start < 0) my_start += data_length_;
- }
-
- if (kReadDebug) {
- printf("queue: %p reading from d1: %d\n", this, my_start);
- }
- // We have to be either after the start or before the end, even if the queue
- // is wrapped around (should be both if it's not).
- assert((my_start >= data_start_) || (my_start < data_end_));
- // More sanity checking.
- assert((my_start >= 0) && (my_start < data_length_));
- msg = data_[my_start];
- if (!(options & kPeek)) ++(*index);
- }
- IncrementMessageReferenceCount(msg);
-
- ReadCommonEnd();
- return msg;
-}
-
-int RawQueue::FreeMessages() const {
- int r = 0;
- MessageHeader *header = free_messages_;
- while (header != nullptr) {
- ++r;
- header = header->next;
- }
- return r;
-}
-
-} // namespace aos
diff --git a/aos/ipc_lib/queue.h b/aos/ipc_lib/queue.h
deleted file mode 100644
index 6e07993..0000000
--- a/aos/ipc_lib/queue.h
+++ /dev/null
@@ -1,240 +0,0 @@
-#ifndef AOS_IPC_LIB_QUEUE_H_
-#define AOS_IPC_LIB_QUEUE_H_
-
-#include "aos/ipc_lib/shared_mem.h"
-#include "aos/mutex/mutex.h"
-#include "aos/condition.h"
-#include "aos/util/options.h"
-#include "glog/logging.h"
-
-// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
-// code to make checking for leaks work better
-// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
-// describes how
-
-// Any pointers returned from these functions can be safely passed to other
-// processes because they are all shared memory pointers.
-// IMPORTANT: Any message pointer must be passed back in some way
-// (FreeMessage and WriteMessage are common ones) or the
-// application will leak shared memory.
-// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
-// might work, but it is not guaranteed to.
-
-namespace aos {
-
-// Queues are the primary way to use shared memory. Basic use consists of
-// calling Queue::Fetch and then reading and/or writing messages.
-// Queues (as the name suggests) are a FIFO stack of messages. Each combination
-// of name and type signature will result in a different queue, which means
-// that if you only recompile some code that uses differently sized messages,
-// it will simply use a different queue than the old code.
-class RawQueue {
- public:
- // Retrieves (and creates if necessary) a queue. Each combination of name and
- // signature refers to a completely independent queue.
- // length is how large each message will be
- // hash can differentiate multiple otherwise identical queues
- // queue_length is how many messages the queue will be able to hold
- // Will never return NULL.
- static RawQueue *Fetch(const char *name, size_t length, int hash,
- int queue_length);
- // Same as above, except sets up the returned queue so that it will put
- // messages on *recycle when they are freed (after they have been released by
- // all other readers/writers and are not in the queue).
- // recycle_queue_length determines how many freed messages will be kept.
- // Other code can retrieve the 2 queues separately (the recycle queue will
- // have the same length and hash as the main one). However, any frees made
- // using a queue with only (name,length,hash,queue_length) before the
- // recycle queue has been associated with it will not go on to the recycle
- // queue.
- // NOTE: calling this function with the same (name,length,hash,queue_length)
- // but multiple recycle_queue_lengths will result in each freed message being
- // put onto an undefined one of the recycle queues.
- // Will never return NULL.
- static RawQueue *Fetch(const char *name, size_t length, int hash,
- int queue_length, int recycle_hash,
- int recycle_queue_length, RawQueue **recycle);
-
- // Doesn't update the currently read index (the read messages in the queue or
- // the index). This means the returned message (and any others skipped with
- // kFromEnd) will be left in the queue.
- // For reading only.
- // Not valid for ReadMessageIndex combined with kFromEnd.
- static constexpr Options<RawQueue>::Option kPeek{0x0001};
- // Reads the last message in the queue instead of just the next one.
- // NOTE: This removes all of the messages until the last one from the queue
- // (which means that nobody else will read them).
- // For reading only.
- // Not valid for ReadMessageIndex combined with kPeek.
- static constexpr Options<RawQueue>::Option kFromEnd{0x0002};
- // Causes reads to return NULL and writes to fail instead of waiting.
- // For reading and writing.
- static constexpr Options<RawQueue>::Option kNonBlock{0x0004};
- // Causes things to block.
- // For reading and writing.
- static constexpr Options<RawQueue>::Option kBlock{0x0008};
- // Causes writes to overwrite the oldest message in the queue instead of
- // blocking.
- // For writing only.
- static constexpr Options<RawQueue>::Option kOverride{0x0010};
-
- RawQueue(const RawQueue &) = default;
- RawQueue &operator=(const RawQueue &) = default;
-
- // Writes a message into the queue.
- // This function takes ownership of msg.
- // NOTE: msg must point to a valid message from this queue
- // Returns true on success. A return value of false means msg has already been
- // freed.
- bool WriteMessage(void *msg, Options<RawQueue> options) {
- static constexpr Options<RawQueue> kWriteFailureOptions =
- kNonBlock | kBlock | kOverride;
- if (!options.NoOthersSet(kWriteFailureOptions)) {
- LOG(FATAL) << "illegal write options in " << std::hex
- << options.printable();
- }
- if (!options.ExactlyOneSet(kWriteFailureOptions)) {
- LOG(FATAL) << "invalid write options " << std::hex << options.printable();
- }
- return DoWriteMessage(msg, options);
- }
-
- // Reads a message out of the queue.
- // The return value will have at least the length of this queue's worth of
- // valid data where it's pointing to.
- // The return value is const because other people might be viewing the same
- // messsage. Do not cast the const away!
- // IMPORTANT: The return value (if not NULL) must eventually be passed to
- // FreeMessage.
- const void *ReadMessage(Options<RawQueue> options) {
- CheckReadOptions(options);
- return DoReadMessage(options);
- }
- // The same as ReadMessage, except it will never return the
- // same message twice (when used with the same index argument). However,
- // may not return some messages that pass through the queue.
- // *index should start as 0. index does not have to be in shared memory, but
- // it can be.
- // Calling with both kPeek and kFromEnd in options isn't valid because that
- // would mean ignoring index, which would make this function the same as
- // ReadMessage (which should be used instead).
- const void *ReadMessageIndex(
- Options<RawQueue> options, int *index,
- ::std::chrono::nanoseconds timeout = ::std::chrono::nanoseconds(0)) {
- CheckReadOptions(options);
- static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
- if (options.AllSet(kFromEndAndPeek)) {
- LOG(FATAL) << "ReadMessageIndex(kFromEnd | kPeek) is not allowed";
- }
- return DoReadMessageIndex(options, index, timeout);
- }
-
- // Retrieves ("allocates") a message that can then be written to the queue.
- // NOTE: the return value will be completely uninitialized
- // The return value will have at least the length of this queue's worth of
- // valid memory where it's pointing to.
- // Returns NULL for error.
- // IMPORTANT: The return value (if not NULL) must eventually be passed to
- // FreeMessage or WriteMessage.
- void *GetMessage();
-
- // It is ok to call this method with a NULL msg.
- void FreeMessage(const void *msg) {
- if (msg != NULL) DecrementMessageReferenceCount(msg);
- }
-
- // UNSAFE! Returns the number of free messages we have. Only safe to use when
- // only 1 task is using this object (ie in tests).
- int FreeMessages() const;
-
- // Returns the name of the queue.
- const char *name() const { return name_; }
-
- private:
- struct MessageHeader;
-
- // The public wrappers around these are inlined and do argument checking.
- bool DoWriteMessage(void *msg, Options<RawQueue> options);
- const void *DoReadMessage(Options<RawQueue> options);
- const void *DoReadMessageIndex(Options<RawQueue> options, int *index,
- ::std::chrono::nanoseconds timeout);
- void CheckReadOptions(Options<RawQueue> options) {
- static constexpr Options<RawQueue> kValidOptions =
- kPeek | kFromEnd | kNonBlock | kBlock;
- if (!options.NoOthersSet(kValidOptions)) {
- LOG(FATAL) << "illegal read options in " << std::hex
- << options.printable();
- }
- static constexpr Options<RawQueue> kBlockChoices = kNonBlock | kBlock;
- if (!options.ExactlyOneSet(kBlockChoices)) {
- LOG(FATAL) << "invalid read options " << std::hex << options.printable();
- }
- }
-
- // Adds 1 to the given index and handles wrapping correctly.
- int index_add1(int index);
-
- bool is_readable() { return data_end_ != data_start_; }
- bool is_writable() { return index_add1(data_end_) != data_start_; }
-
- // These next 4 allow finding the right one.
- const char *name_;
- size_t length_;
- int hash_;
- int queue_length_;
- // The next one in the linked list of queues.
- RawQueue *next_;
-
- RawQueue *recycle_;
-
- Mutex data_lock_; // protects operations on data_ etc
- // Always gets broadcasted to because different readers might have different
- // ideas of what "readable" means (ie ones using separated indices).
- Condition readable_;
- Condition writable_;
- int data_length_; // max length into data + 1
- int data_start_; // is an index into data
- int data_end_; // is an index into data
- int messages_; // that have passed through
- void **data_; // array of messages (with headers)
-
- size_t msg_length_; // sizeof(each message) including the header
- // A pointer to the first in the linked list of free messages.
- MessageHeader *free_messages_;
-
- // Keeps track of if the queue was writable before a read so we can Signal() a
- // reader if we transition it.
- bool writable_start_;
-
- // True iff somebody is currently Wait()ing on readable_.
- // Set to true by each reader before calling Wait() and set back to false
- // before the Broadcast().
- bool readable_waiting_;
-
- // Actually frees the given message.
- void DoFreeMessage(const void *msg);
- // Calls DoFreeMessage if appropriate.
- void DecrementMessageReferenceCount(const void *msg);
- // Only does the actual incrementing of the reference count.
- void IncrementMessageReferenceCount(const void *msg) const;
-
- // Must be called with data_lock_ locked.
- // *read_data will be initialized.
- // Returns with a readable message in data_ or false.
- bool ReadCommonStart(Options<RawQueue> options, int *index,
- ::std::chrono::nanoseconds timeout);
- // Deals with setting/unsetting readable_ and writable_.
- // Must be called after data_lock_ has been unlocked.
- // read_data should be the same thing that was passed in to ReadCommonStart.
- void ReadCommonEnd();
- // Returns the index of the last message.
- // Useful for reading with kPeek.
- int LastMessageIndex() const;
-
- // Gets called by Fetch when necessary (with placement new).
- RawQueue(const char *name, size_t length, int hash, int queue_length);
-};
-
-} // namespace aos
-
-#endif // AOS_IPC_LIB_QUEUE_H_
diff --git a/aos/ipc_lib/raw_queue_test.cc b/aos/ipc_lib/raw_queue_test.cc
deleted file mode 100644
index 3684029..0000000
--- a/aos/ipc_lib/raw_queue_test.cc
+++ /dev/null
@@ -1,1047 +0,0 @@
-#include "aos/ipc_lib/queue.h"
-
-#include <unistd.h>
-#include <sys/mman.h>
-#include <inttypes.h>
-
-#include <chrono>
-#include <ostream>
-#include <memory>
-#include <map>
-#include <thread>
-
-#include "gtest/gtest.h"
-
-#include "aos/ipc_lib/core_lib.h"
-#include "aos/type_traits/type_traits.h"
-#include "aos/testing/test_shm.h"
-#include "aos/time/time.h"
-#include "aos/logging/logging.h"
-#include "aos/die.h"
-#include "aos/util/options.h"
-#include "aos/util/death_test_log_implementation.h"
-#include "aos/testing/prevent_exit.h"
-
-using ::testing::AssertionResult;
-using ::testing::AssertionSuccess;
-using ::testing::AssertionFailure;
-
-namespace aos {
-namespace testing {
-
-namespace chrono = ::std::chrono;
-namespace this_thread = ::std::this_thread;
-
-// The same constant from queue.cc. This will have to be updated if that one is.
-const int kExtraMessages = 20;
-
-class RawQueueTest : public ::testing::Test {
- protected:
- static const size_t kFailureSize = 400;
- static char *fatal_failure;
- private:
- enum class ResultType : uint8_t {
- NotCalled,
- Called,
- Returned,
- };
- const std::string ResultTypeString(volatile const ResultType &result) {
- switch (result) {
- case ResultType::Returned:
- return "Returned";
- case ResultType::Called:
- return "Called";
- case ResultType::NotCalled:
- return "NotCalled";
- default:
- return std::string("unknown(") +
- ::std::to_string(static_cast<uint8_t>(result)) + ")";
- }
- }
- static_assert(aos::shm_ok<ResultType>::value,
- "this will get put in shared memory");
- template<typename T>
- struct FunctionToCall {
- FunctionToCall() : result(ResultType::NotCalled), started() {
- }
-
- volatile ResultType result;
- bool expected;
- void (*function)(T*, char*);
- T *arg;
- volatile char failure[kFailureSize];
- aos_futex started;
- };
- template<typename T>
- static void Hangs_(FunctionToCall<T> *const to_call) {
- this_thread::sleep_for(chrono::milliseconds(10));
- ASSERT_EQ(1, futex_set(&to_call->started));
- to_call->result = ResultType::Called;
- to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
- to_call->result = ResultType::Returned;
- }
-
- // How long until a function is considered to have hung.
- static constexpr chrono::nanoseconds kHangTime = chrono::milliseconds(90);
- // How long to sleep after forking (for debugging).
- static constexpr chrono::nanoseconds kForkSleep = chrono::milliseconds(0);
-
- // Represents a process that has been forked off. The destructor kills the
- // process and wait(2)s for it.
- class ForkedProcess {
- public:
- ForkedProcess(pid_t pid, aos_futex *done)
- : pid_(pid), done_(done), exiting_(false) {};
- ~ForkedProcess() {
- if (!exiting_) {
- if (kill(pid_, SIGTERM) == -1) {
- if (errno == ESRCH) {
- printf("process %jd was already dead\n",
- static_cast<intmax_t>(pid_));
- } else {
- AOS_PLOG(FATAL, "kill(SIGKILL, %jd) failed",
- static_cast<intmax_t>(pid_));
- }
- }
- }
- const pid_t ret = wait(NULL);
- if (ret == -1) {
- AOS_LOG(WARNING,
- "wait(NULL) failed."
- " child %jd might still be alive\n",
- static_cast<intmax_t>(pid_));
- } else if (ret == 0) {
- AOS_LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
- static_cast<intmax_t>(pid_));
- } else if (ret != pid_) {
- AOS_LOG(WARNING,
- "child %d is now confirmed dead"
- ", but child %jd might still be alive\n",
- ret, static_cast<intmax_t>(pid_));
- }
- }
-
- enum class JoinResult {
- Finished, Hung, Error
- };
- JoinResult Join(chrono::nanoseconds timeout = kHangTime) {
- struct timespec done_timeout;
- {
- auto full_timeout = kForkSleep + timeout;
- ::std::chrono::seconds sec =
- ::std::chrono::duration_cast<::std::chrono::seconds>(full_timeout);
- ::std::chrono::nanoseconds nsec =
- ::std::chrono::duration_cast<::std::chrono::nanoseconds>(
- full_timeout - sec);
- done_timeout.tv_sec = sec.count();
- done_timeout.tv_nsec = nsec.count();
- }
-
- switch (futex_wait_timeout(done_, &done_timeout)) {
- case 2:
- return JoinResult::Hung;
- case 0:
- exiting_ = true;
- return JoinResult::Finished;
- default:
- return JoinResult::Error;
- }
- }
-
- private:
- const pid_t pid_;
- aos_futex *const done_;
- // True iff we know that the process is already exiting.
- bool exiting_;
- } __attribute__((unused));
-
- // State for HangsFork and HangsCheck.
- typedef uint8_t ChildID;
- static void ReapExitHandler() {
- for (auto it = children_.begin(); it != children_.end(); ++it) {
- delete it->second;
- }
- }
- static std::map<ChildID, ForkedProcess *> children_;
- std::map<ChildID, FunctionToCall<void> *> to_calls_;
-
- void SetUp() override {
- ::testing::Test::SetUp();
-
- SetDieTestMode(true);
-
- fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
- static bool registered = false;
- if (!registered) {
- atexit(ReapExitHandler);
- registered = true;
- }
- }
-
- protected:
- // function gets called with arg in a forked process.
- // Leaks shared memory.
- template<typename T> __attribute__((warn_unused_result))
- std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
- aos_futex *done = static_cast<aos_futex *>(shm_malloc_aligned(
- sizeof(*done), alignof(aos_futex)));
- *done = 0;
- const pid_t pid = fork();
- switch (pid) {
- case 0: // child
- if (kForkSleep != chrono::milliseconds(0)) {
- AOS_LOG(INFO, "pid %jd sleeping for %" PRId64 "ns\n",
- static_cast<intmax_t>(getpid()),
- static_cast<int64_t>(kForkSleep.count()));
- this_thread::sleep_for(kForkSleep);
- }
- ::aos::testing::PreventExit();
- function(arg);
- AOS_CHECK_NE(-1, futex_set(done));
- exit(EXIT_SUCCESS);
- case -1: // parent failure
- AOS_PLOG(ERROR, "fork() failed");
- return std::unique_ptr<ForkedProcess>();
- default: // parent
- return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, done));
- }
- }
-
- // Checks whether or not the given function hangs.
- // expected is whether to return success or failure if the function hangs
- // NOTE: There are other reasons for it to return a failure than the function
- // doing the wrong thing.
- // Leaks shared memory.
- template<typename T>
- AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
- AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
- if (!fork_result) {
- return fork_result;
- }
- return HangsCheck(0);
- }
- // Starts the first part of Hangs.
- // Use HangsCheck to get the result.
- // Returns whether the fork succeeded or not, NOT whether or not the hang
- // check succeeded.
- template<typename T>
- AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
- bool expected, ChildID id) {
- static_assert(aos::shm_ok<FunctionToCall<T>>::value,
- "this is going into shared memory");
- FunctionToCall<T> *const to_call =
- static_cast<FunctionToCall<T> *>(
- shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
- new (to_call) FunctionToCall<T>();
- to_call->function = function;
- to_call->arg = arg;
- to_call->expected = expected;
- to_call->failure[0] = '\0';
- static_cast<char *>(fatal_failure)[0] = '\0';
- children_[id] = ForkExecute(Hangs_, to_call).release();
- if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
- AOS_CHECK_EQ(0, futex_wait(&to_call->started));
- to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
- return AssertionSuccess();
- }
- // Checks whether or not a function hung like it was supposed to.
- // Use HangsFork first.
- // NOTE: calls to HangsFork and HangsCheck with the same id argument will
- // correspond, but they do not nest. Also, id 0 is used by Hangs.
- // Return value is the same as Hangs.
- AssertionResult HangsCheck(ChildID id) {
- std::unique_ptr<ForkedProcess> child(children_[id]);
- children_.erase(id);
- const ForkedProcess::JoinResult result = child->Join();
- if (to_calls_[id]->failure[0] != '\0') {
- return AssertionFailure() << "function says: "
- << const_cast<char *>(to_calls_[id]->failure);
- }
- if (result == ForkedProcess::JoinResult::Finished) {
- return !to_calls_[id]->expected ? AssertionSuccess() : (AssertionFailure()
- << "something happened and the the test only got to "
- << ResultTypeString(to_calls_[id]->result));
- } else {
- if (to_calls_[id]->result == ResultType::Called) {
- return to_calls_[id]->expected ? AssertionSuccess() :
- AssertionFailure();
- } else if (result == ForkedProcess::JoinResult::Error) {
- return AssertionFailure() << "error joining child";
- } else {
- if (to_calls_[id]->result == ResultType::NotCalled) {
- return AssertionFailure() << "stuff took too long getting started";
- }
- return AssertionFailure() << "something weird happened";
- }
- }
- }
-#define EXPECT_HANGS(function, arg) \
- EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
-#define EXPECT_RETURNS(function, arg) \
- EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
-#define EXPECT_RETURNS_FAILS(function, arg) \
- EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
-#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
- cond(Hangs(function, arg, hangs)); \
- if (fatal_failure[0] != '\0') { \
- FAIL() << fatal_failure; \
- } \
-} while (false)
-
- struct TestMessage {
- // Some contents because we don't really want to test empty messages.
- int16_t data;
- };
- struct MessageArgs {
- RawQueue *const queue;
- Options<RawQueue> flags;
- int16_t data; // -1 means NULL expected
- };
- static void WriteTestMessage(MessageArgs *args, char *failure) {
- TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
- if (msg == NULL) {
- snprintf(fatal_failure, kFailureSize,
- "couldn't get_msg from %p", args->queue);
- return;
- }
- msg->data = args->data;
- if (!args->queue->WriteMessage(msg, args->flags)) {
- snprintf(failure, kFailureSize, "%p->WriteMessage(%p, %x) failed",
- args->queue, msg, args->flags.printable());
- }
- }
- static void ReadTestMessage(MessageArgs *args, char *failure) {
- const TestMessage *msg = static_cast<const TestMessage *>(
- args->queue->ReadMessage(args->flags));
- if (msg == NULL) {
- if (args->data != -1) {
- snprintf(failure, kFailureSize,
- "expected data of %" PRId16 " but got NULL message",
- args->data);
- }
- } else {
- if (args->data != msg->data) {
- snprintf(failure, kFailureSize,
- "expected data of %" PRId16 " but got %" PRId16 " instead",
- args->data, msg->data);
- }
- args->queue->FreeMessage(msg);
- }
- }
-
- void PushMessage(RawQueue *queue, uint16_t data) {
- TestMessage *message = static_cast<TestMessage *>(queue->GetMessage());
- message->data = data;
- ASSERT_TRUE(queue->WriteMessage(message, RawQueue::kOverride));
- }
-
- private:
- ::aos::testing::TestSharedMemory my_shm_;
-};
-
-char *RawQueueTest::fatal_failure;
-std::map<RawQueueTest::ChildID, RawQueueTest::ForkedProcess *>
- RawQueueTest::children_;
-constexpr chrono::nanoseconds RawQueueTest::kHangTime;
-constexpr chrono::nanoseconds RawQueueTest::kForkSleep;
-
-typedef RawQueueTest RawQueueDeathTest;
-
-TEST_F(RawQueueTest, Reading) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- MessageArgs args{queue, RawQueue::kBlock, -1};
-
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kBlock;
- EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = RawQueue::kPeek | RawQueue::kBlock;
- EXPECT_HANGS(ReadTestMessage, &args);
- args.data = 254;
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kPeek | RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kPeek | RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kBlock;
- args.data = -1;
- EXPECT_HANGS(ReadTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kBlock;
- args.data = 971;
- EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
-}
-TEST_F(RawQueueTest, Writing) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- MessageArgs args{queue, RawQueue::kBlock, 973};
-
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kBlock;
- EXPECT_HANGS(WriteTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
- args.flags = RawQueue::kPeek | RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.data = 971;
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kNonBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
-}
-
-TEST_F(RawQueueTest, MultiRead) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- MessageArgs args{queue, RawQueue::kBlock, 1323};
-
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- args.flags = RawQueue::kBlock;
- ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
- ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
- AssertionResult one = HangsCheck(1);
- AssertionResult two = HangsCheck(2);
- EXPECT_TRUE(one != two) << "'" <<
- one.failure_message() << "' vs '" << two.failure_message() << "'";
- // TODO(brians) finish this
-}
-
-// There used to be a bug where reading first without an index and then with an
-// index would crash. This test makes sure that's fixed.
-TEST_F(RawQueueTest, ReadIndexAndNot) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
-
- // Write a message, read it (with ReadMessage), and then write another
- // message (before freeing the read one so the queue allocates a distinct
- // message to use for it).
- TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
- ASSERT_NE(nullptr, msg);
- ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
- const void *read_msg = queue->ReadMessage(RawQueue::kBlock);
- EXPECT_NE(nullptr, read_msg);
- msg = static_cast<TestMessage *>(queue->GetMessage());
- queue->FreeMessage(read_msg);
- ASSERT_NE(nullptr, msg);
- ASSERT_TRUE(queue->WriteMessage(msg, RawQueue::kBlock));
-
- int index = 0;
- const void *second_read_msg =
- queue->ReadMessageIndex(RawQueue::kBlock, &index);
- EXPECT_NE(nullptr, second_read_msg);
- EXPECT_NE(read_msg, second_read_msg)
- << "We already took that message out of the queue.";
-}
-
-TEST_F(RawQueueTest, Recycle) {
- // TODO(brians) basic test of recycle queue
- // include all of the ways a message can get into the recycle queue
- RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
- 1, 2, 2, 2, &recycle_queue);
- ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
- MessageArgs args{queue, RawQueue::kBlock, 973},
- recycle{recycle_queue, RawQueue::kBlock, 973};
-
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(WriteTestMessage, &args);
- EXPECT_HANGS(ReadTestMessage, &recycle);
- args.data = 254;
- EXPECT_RETURNS(WriteTestMessage, &args);
- EXPECT_HANGS(ReadTestMessage, &recycle);
- args.data = 971;
- args.flags = RawQueue::kOverride;
- EXPECT_RETURNS(WriteTestMessage, &args);
- recycle.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &recycle);
-
- EXPECT_HANGS(ReadTestMessage, &recycle);
-
- TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
- ASSERT_TRUE(msg != NULL);
- msg->data = 341;
- queue->FreeMessage(msg);
- recycle.data = 341;
- EXPECT_RETURNS(ReadTestMessage, &recycle);
-
- EXPECT_HANGS(ReadTestMessage, &recycle);
-
- args.data = 254;
- args.flags = RawQueue::kPeek | RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- recycle.flags = RawQueue::kBlock;
- EXPECT_HANGS(ReadTestMessage, &recycle);
- args.flags = RawQueue::kBlock;
- EXPECT_RETURNS(ReadTestMessage, &args);
- recycle.data = 254;
- EXPECT_RETURNS(ReadTestMessage, &recycle);
-}
-
-// Makes sure that when a message doesn't get written with kNonBlock it does get
-// freed.
-TEST_F(RawQueueTest, NonBlockFailFree) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
-
- void *message1 = queue->GetMessage();
- void *message2 = queue->GetMessage();
- ASSERT_TRUE(queue->WriteMessage(message1, RawQueue::kNonBlock));
- ASSERT_FALSE(queue->WriteMessage(message2, RawQueue::kNonBlock));
- EXPECT_EQ(message2, queue->GetMessage());
-}
-
-// All of the tests from here down are designed to test every branch to
-// make sure it does what it's supposed to. They are generally pretty repetitive
-// and boring, and some of them may duplicate other tests above, but these ones
-// make it a lot easier to figure out what's wrong with bugs not related to race
-// conditions.
-
-TEST_F(RawQueueTest, ReadIndexNotFull) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
- PushMessage(queue, 971);
- EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
-
- int index = 0;
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(971, message->data);
- EXPECT_EQ(1, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- PushMessage(queue, 1768);
- EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1768, message->data);
- EXPECT_EQ(2, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- PushMessage(queue, 254);
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(254, message->data);
- EXPECT_EQ(3, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(254, message->data);
- EXPECT_EQ(3, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
-}
-
-TEST_F(RawQueueTest, ReadIndexNotBehind) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
- PushMessage(queue, 971);
- EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
- PushMessage(queue, 1768);
- EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
-
- int index = 0;
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(971, message->data);
- EXPECT_EQ(1, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1768, message->data);
- EXPECT_EQ(2, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, ReadIndexLittleBehindNotFull) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
-
- int index = 0;
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1768, message->data);
- EXPECT_EQ(2, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1768, message->data);
- EXPECT_EQ(2, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, ReadIndexMoreBehind) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 254);
-
- int index = 0;
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1768, message->data);
- EXPECT_EQ(2, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(254, message->data);
- EXPECT_EQ(3, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(254, message->data);
- EXPECT_EQ(3, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, ReadIndexMoreBehindNotFull) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 254);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
-
- int index = 0;
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(254, message->data);
- EXPECT_EQ(3, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(254, message->data);
- EXPECT_EQ(3, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, ReadIndexLotBehind) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- {
- const void *message1, *message2;
- message1 = queue->ReadMessage(RawQueue::kNonBlock);
- ASSERT_NE(nullptr, message1);
- PushMessage(queue, 254);
- message2 = queue->ReadMessage(RawQueue::kNonBlock);
- ASSERT_NE(nullptr, message2);
- PushMessage(queue, 973);
- EXPECT_EQ(4, kExtraMessages + 2 - queue->FreeMessages());
- queue->FreeMessage(message1);
- EXPECT_EQ(3, kExtraMessages + 2 - queue->FreeMessages());
- queue->FreeMessage(message2);
- EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
- }
-
- int index = 0;
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(254, message->data);
- EXPECT_EQ(3, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(973, message->data);
- EXPECT_EQ(4, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(973, message->data);
- EXPECT_EQ(4, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, ReadIndexLotBehindNotFull) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 254);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 973);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
-
- int index = 0;
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(973, message->data);
- EXPECT_EQ(4, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(973, message->data);
- EXPECT_EQ(4, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, ReadIndexEvenMoreBehind) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 254);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 973);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 1114);
-
- int index = 0;
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(973, message->data);
- EXPECT_EQ(4, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1114, message->data);
- EXPECT_EQ(5, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1114, message->data);
- EXPECT_EQ(5, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, ReadIndexEvenMoreBehindNotFull) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const TestMessage *message, *peek_message;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 254);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 973);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
- PushMessage(queue, 1114);
- ASSERT_NE(nullptr, queue->ReadMessage(RawQueue::kNonBlock));
-
- int index = 0;
-
- peek_message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock | RawQueue::kPeek, &index));
- message = static_cast<const TestMessage *>(
- queue->ReadMessageIndex(RawQueue::kNonBlock, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1114, message->data);
- EXPECT_EQ(5, index);
- queue->FreeMessage(message);
- queue->FreeMessage(peek_message);
-
- index = 0;
- peek_message = static_cast<const TestMessage *>(queue->ReadMessage(
- RawQueue::kNonBlock | RawQueue::kPeek | RawQueue::kFromEnd));
- message = static_cast<const TestMessage *>(queue->ReadMessageIndex(
- RawQueue::kNonBlock | RawQueue::kFromEnd, &index));
- ASSERT_NE(nullptr, message);
- EXPECT_EQ(message, peek_message);
- EXPECT_EQ(1114, message->data);
- EXPECT_EQ(5, index);
- queue->FreeMessage(message);
-}
-
-TEST_F(RawQueueTest, MessageReferenceCounts) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- const void *message1, *message2;
-
- EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
- message1 = queue->GetMessage();
- EXPECT_NE(nullptr, message1);
- EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
- message2 = queue->GetMessage();
- EXPECT_NE(nullptr, message2);
- EXPECT_EQ(2, kExtraMessages + 2 - queue->FreeMessages());
- queue->FreeMessage(message1);
- EXPECT_EQ(1, kExtraMessages + 2 - queue->FreeMessages());
- queue->FreeMessage(message2);
- EXPECT_EQ(0, kExtraMessages + 2 - queue->FreeMessages());
-}
-
-// Tests that writing with kNonBlock fails and frees the message.
-TEST_F(RawQueueTest, WriteDontBlock) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- void *message;
-
- PushMessage(queue, 971);
- int free_before = queue->FreeMessages();
- message = queue->GetMessage();
- ASSERT_NE(nullptr, message);
- EXPECT_NE(free_before, queue->FreeMessages());
- EXPECT_FALSE(queue->WriteMessage(message, RawQueue::kNonBlock));
- EXPECT_EQ(free_before, queue->FreeMessages());
-}
-
-// Tests that writing with kOverride pushes the last message out of the queue.
-TEST_F(RawQueueTest, WriteOverride) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 2);
- TestMessage *message1;
-
- PushMessage(queue, 971);
- PushMessage(queue, 1768);
- int free_before = queue->FreeMessages();
- message1 = static_cast<TestMessage *>(queue->GetMessage());
- ASSERT_NE(nullptr, message1);
- EXPECT_NE(free_before, queue->FreeMessages());
- message1->data = 254;
- EXPECT_TRUE(queue->WriteMessage(message1, RawQueue::kOverride));
- EXPECT_EQ(free_before, queue->FreeMessages());
-
- const TestMessage *message2;
- message2 =
- static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
- EXPECT_EQ(1768, message2->data);
- queue->FreeMessage(message2);
- EXPECT_EQ(free_before + 1, queue->FreeMessages());
- message2 =
- static_cast<const TestMessage *>(queue->ReadMessage(RawQueue::kNonBlock));
- EXPECT_EQ(254, message2->data);
- queue->FreeMessage(message2);
- EXPECT_EQ(free_before + 2, queue->FreeMessages());
-}
-
-// Makes sure that ThreadSanitizer doesn't catch any issues freeing from
-// multiple threads at once.
-TEST_F(RawQueueTest, MultiThreadedFree) {
- RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
- PushMessage(queue, 971);
- int free_before = queue->FreeMessages();
-
- const void *const message1 =
- queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);
- const void *const message2 =
- queue->ReadMessage(RawQueue::kPeek | RawQueue::kNonBlock);
- ASSERT_NE(nullptr, message1);
- ASSERT_NE(nullptr, message2);
- EXPECT_EQ(free_before, queue->FreeMessages());
- std::thread t1([message1, queue] {
- queue->FreeMessage(message1);
- });
- std::thread t2([message2, queue] {
- queue->FreeMessage(message2);
- });
- t1.join();
- t2.join();
- EXPECT_EQ(free_before, queue->FreeMessages());
-}
-
-TEST_F(RawQueueDeathTest, OptionsValidation) {
- RawQueue *const queue = RawQueue::Fetch("Queue", 1, 1, 1);
-
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->WriteMessage(nullptr, RawQueue::kPeek);
- },
- ".*illegal write option.*");
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->WriteMessage(nullptr, RawQueue::kFromEnd);
- },
- ".*illegal write option.*");
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->WriteMessage(nullptr, RawQueue::kPeek | RawQueue::kFromEnd);
- },
- ".*illegal write option.*");
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->WriteMessage(nullptr, RawQueue::kNonBlock | RawQueue::kBlock);
- },
- ".*invalid write option.*");
-
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->ReadMessageIndex(
- RawQueue::kBlock | RawQueue::kFromEnd | RawQueue::kPeek, nullptr);
- },
- ".*ReadMessageIndex.*is not allowed.*");
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->ReadMessageIndex(RawQueue::kOverride, nullptr);
- },
- ".*illegal read option.*");
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->ReadMessageIndex(RawQueue::kOverride | RawQueue::kBlock,
- nullptr);
- },
- ".*illegal read option.*");
- EXPECT_DEATH(
- {
- logging::SetImplementation(new util::DeathTestLogImplementation());
- queue->ReadMessage(RawQueue::kNonBlock | RawQueue::kBlock);
- },
- ".*invalid read option.*");
-}
-
-} // namespace testing
-} // namespace aos
diff --git a/aos/ipc_lib/unique_message_ptr.h b/aos/ipc_lib/unique_message_ptr.h
deleted file mode 100644
index e30a4c0..0000000
--- a/aos/ipc_lib/unique_message_ptr.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#include <memory>
-
-#include "aos/atom_code/ipc_lib/queue.h"
-
-namespace aos {
-namespace internal {
-
-template<typename T>
-class queue_free {
- public:
- queue_free(RawQueue *queue) : queue_(queue) {}
-
- void operator()(const T *message) {
- queue_->FreeMessage(static_cast<const void *>(message));
- }
-
- private:
- RawQueue *const queue_;
-};
-
-} // namespace internal
-
-template<typename T>
-class unique_message_ptr : public ::std::unique_ptr<T, ::aos::internal::queue_free<T>> {
- public:
- unique_message_ptr(RawQueue *queue, T *message = NULL)
- : ::std::unique_ptr<T, ::aos::internal::queue_free<T>>(message, ::aos::internal::queue_free<T>(queue)) {}
-
- // Perfectly forward this so that the move functionality of ::std::unique_ptr
- // works.
- template <typename... Args>
- unique_message_ptr<T> &operator=(Args &&... args) {
- ::std::unique_ptr<T, ::aos::internal::queue_free<T>>::operator=(
- ::std::forward<Args>(args)...);
- return *this;
- }
-};
-
-} // namespace aos
diff --git a/aos/logging/BUILD b/aos/logging/BUILD
index 0ba9931..a499d68 100644
--- a/aos/logging/BUILD
+++ b/aos/logging/BUILD
@@ -82,7 +82,6 @@
":sizes",
"//aos:die",
"//aos:macros",
- "//aos/ipc_lib:queue",
"//aos/mutex",
"//aos/time",
"//aos/type_traits",
diff --git a/aos/logging/implementations.cc b/aos/logging/implementations.cc
index 04831a2..094ecbb 100644
--- a/aos/logging/implementations.cc
+++ b/aos/logging/implementations.cc
@@ -8,7 +8,6 @@
#include "absl/base/call_once.h"
#include "aos/die.h"
-#include "aos/ipc_lib/queue.h"
#include "aos/logging/printf_formats.h"
#include "aos/time/time.h"
@@ -161,78 +160,6 @@
namespace {
-RawQueue *queue = NULL;
-
-int dropped_messages = 0;
-monotonic_clock::time_point dropped_start, backoff_start;
-// Wait this long after dropping a message before even trying to write any more.
-constexpr chrono::milliseconds kDropBackoff = chrono::milliseconds(100);
-
-LogMessage *GetMessageOrDie() {
- LogMessage *message = static_cast<LogMessage *>(queue->GetMessage());
- if (message == NULL) {
- AOS_LOG(FATAL, "%p->GetMessage() failed\n", queue);
- } else {
- return message;
- }
-}
-
-void Write(LogMessage *msg) {
- if (__builtin_expect(dropped_messages > 0, false)) {
- monotonic_clock::time_point message_time(
- chrono::seconds(msg->seconds) + chrono::nanoseconds(msg->nseconds));
- if (message_time - backoff_start < kDropBackoff) {
- ++dropped_messages;
- queue->FreeMessage(msg);
- return;
- }
-
- LogMessage *dropped_message = GetMessageOrDie();
- chrono::seconds dropped_start_sec = chrono::duration_cast<chrono::seconds>(
- dropped_start.time_since_epoch());
- chrono::nanoseconds dropped_start_nsec =
- chrono::duration_cast<chrono::nanoseconds>(
- dropped_start.time_since_epoch() - dropped_start_sec);
- internal::FillInMessageVarargs(
- ERROR, message_time, dropped_message,
- "%d logs starting at %" PRId32 ".%" PRId32 " dropped\n",
- dropped_messages, static_cast<int32_t>(dropped_start_sec.count()),
- static_cast<int32_t>(dropped_start_nsec.count()));
- if (queue->WriteMessage(dropped_message, RawQueue::kNonBlock)) {
- dropped_messages = 0;
- } else {
- // Don't even bother trying to write this message because it's not likely
- // to work and it would be confusing to have one log in the middle of a
- // string of failures get through.
- ++dropped_messages;
- backoff_start = message_time;
- queue->FreeMessage(msg);
- return;
- }
- }
- if (!queue->WriteMessage(msg, RawQueue::kNonBlock)) {
- if (dropped_messages == 0) {
- monotonic_clock::time_point message_time(
- chrono::seconds(msg->seconds) + chrono::nanoseconds(msg->nseconds));
- dropped_start = backoff_start = message_time;
- }
- ++dropped_messages;
- }
-}
-
-class LinuxQueueLogImplementation : public LogImplementation {
- virtual ::aos::monotonic_clock::time_point monotonic_now() const {
- return ::aos::monotonic_clock::now();
- }
-
- __attribute__((format(GOOD_PRINTF_FORMAT_TYPE, 3, 0))) void DoLog(
- log_level level, const char *format, va_list ap) override {
- LogMessage *message = GetMessageOrDie();
- internal::FillInMessage(level, monotonic_now(), format, ap, message);
- Write(message);
- }
-};
-
class CallbackLogImplementation : public HandleMessageLogImplementation {
public:
CallbackLogImplementation(
@@ -247,21 +174,6 @@
} // namespace
-RawQueue *GetLoggingQueue() {
- return RawQueue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 10000);
-}
-
-void RegisterQueueImplementation() {
- Init();
-
- queue = GetLoggingQueue();
- if (queue == NULL) {
- Die("logging: couldn't fetch queue\n");
- }
-
- SetImplementation(new LinuxQueueLogImplementation());
-}
-
void RegisterCallbackImplementation(
const ::std::function<void(const LogMessage &)> &callback,
bool update_global) {
diff --git a/aos/logging/implementations.h b/aos/logging/implementations.h
index 6cf8376..0e17165 100644
--- a/aos/logging/implementations.h
+++ b/aos/logging/implementations.h
@@ -23,13 +23,6 @@
#include "aos/time/time.h"
#include "aos/type_traits/type_traits.h"
-namespace aos {
-
-struct MessageType;
-class RawQueue;
-
-} // namespace aos
-
// This file has various concrete LogImplementations.
namespace aos {
@@ -156,16 +149,6 @@
// lazily created when needed. It is actually the opposite of Load().
void Cleanup();
-// Returns a queue which deals with LogMessage-sized messages.
-// The caller takes ownership.
-RawQueue *GetLoggingQueue();
-
-// Calls SetImplementation to register the standard linux logging implementation
-// which sends the messages through a queue. This implementation relies on
-// another process(es) to read the log messages that it puts into the queue.
-// This function is usually called by aos::Init*.
-void RegisterQueueImplementation();
-
void RegisterCallbackImplementation(
const ::std::function<void(const LogMessage &)> &callback,
bool update_global = true);
diff --git a/aos/testing/BUILD b/aos/testing/BUILD
index 8693f27..052cabe 100644
--- a/aos/testing/BUILD
+++ b/aos/testing/BUILD
@@ -69,7 +69,6 @@
visibility = ["//visibility:public"],
deps = [
":test_logging",
- "//aos/ipc_lib:queue",
"//aos/ipc_lib:shared_mem",
"//aos/logging",
],
diff --git a/aos/testing/test_shm.cc b/aos/testing/test_shm.cc
index b1c3848..0787e5a 100644
--- a/aos/testing/test_shm.cc
+++ b/aos/testing/test_shm.cc
@@ -2,7 +2,6 @@
#include <sys/mman.h>
-#include "aos/ipc_lib/queue.h"
#include "aos/logging/logging.h"
#include "aos/testing/test_logging.h"
diff --git a/frc971/control_loops/drivetrain/BUILD b/frc971/control_loops/drivetrain/BUILD
index ffa2c11..2e357c7 100644
--- a/frc971/control_loops/drivetrain/BUILD
+++ b/frc971/control_loops/drivetrain/BUILD
@@ -223,6 +223,7 @@
":drivetrain_status_fbs",
":spline",
":trajectory",
+ "//aos:condition",
"//aos:init",
"//aos/util:math",
"//frc971/control_loops:control_loops_fbs",