Convert aos over to flatbuffers
Everything builds, and all the tests pass. I suspect that some entries
are missing from the config files, but those will be found pretty
quickly on startup.
Logging and live introspection of queue messages are not available after this conversion.
Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 684620a..908cb80 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -1,5 +1,3 @@
-package(default_visibility = ["//visibility:public"])
-
cc_library(
name = "aos_sync",
srcs = [
@@ -11,11 +9,12 @@
linkopts = [
"-lpthread",
],
+ visibility = ["//visibility:public"],
deps = [
"//aos:macros",
- "@com_google_absl//absl/base",
- "//aos/logging",
"//aos/util:compiler_memory_barrier",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/base",
],
)
@@ -27,6 +26,7 @@
hdrs = [
"core_lib.h",
],
+ visibility = ["//visibility:public"],
deps = [
":aos_sync",
":shared_mem_types",
@@ -36,7 +36,7 @@
cc_library(
name = "shared_mem",
srcs = [
- "shared_mem.c",
+ "shared_mem.cc",
],
hdrs = [
"shared_mem.h",
@@ -44,11 +44,12 @@
linkopts = [
"-lrt",
],
+ visibility = ["//visibility:public"],
deps = [
":aos_sync",
":core_lib",
":shared_mem_types",
- "//aos/logging",
+ "@com_github_google_glog//:glog",
],
)
@@ -75,11 +76,11 @@
linkopts = [
"-lrt",
],
+ visibility = ["//visibility:public"],
deps = [
":core_lib",
":shared_mem",
"//aos:condition",
- "//aos/logging",
"//aos/mutex",
"//aos/util:options",
],
@@ -157,8 +158,9 @@
hdrs = [
"signalfd.h",
],
+ visibility = ["//visibility:public"],
deps = [
- "//aos/logging",
+ "@com_github_google_glog//:glog",
],
)
@@ -166,6 +168,7 @@
name = "index",
srcs = ["index.cc"],
hdrs = ["index.h"],
+ visibility = ["//visibility:public"],
)
cc_test(
@@ -174,7 +177,7 @@
deps = [
":index",
"//aos/testing:googletest",
- "//aos/testing:test_logging",
+ "@com_github_google_glog//:glog",
],
)
@@ -185,11 +188,11 @@
"lockless_queue_memory.h",
],
hdrs = ["lockless_queue.h"],
+ visibility = ["//visibility:public"],
deps = [
":aos_sync",
":index",
- "//aos:init",
- "//aos/logging",
+ "//aos:realtime",
"//aos/time",
"//aos/util:compiler_memory_barrier",
"@com_github_google_glog//:glog",
@@ -208,7 +211,7 @@
deps = [
":lockless_queue",
"//aos:event",
- "//third_party/googletest:gtest",
+ "//aos/testing:googletest",
],
)
@@ -225,7 +228,6 @@
"//aos/libc:aos_strsignal",
"//aos/testing:googletest",
"//aos/testing:prevent_exit",
- "//aos/testing:test_logging",
],
)
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index 3bdba21..efb1fb1 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -26,10 +26,10 @@
#include <algorithm>
#include <type_traits>
-#include "aos/logging/logging.h"
+#include "absl/base/call_once.h"
#include "aos/macros.h"
#include "aos/util/compiler_memory_barrier.h"
-#include "absl/base/call_once.h"
+#include "glog/logging.h"
using ::aos::linux_code::ipc_lib::FutexAccessorObserver;
@@ -354,16 +354,16 @@
return r;
}
-// This gets called by functions before AOS_LOG(FATAL)ing with error messages
+// This gets called by functions before LOG(FATAL)ing with error messages
// that would be incorrect if the error was caused by a process forking without
// initialize_in_new_thread getting called in the fork.
void check_cached_tid(pid_t tid) {
pid_t actual = do_get_tid();
if (tid != actual) {
- AOS_LOG(FATAL,
- "task %jd forked into %jd without letting aos_sync know"
- " so we're not really sure what's going on\n",
- static_cast<intmax_t>(tid), static_cast<intmax_t>(actual));
+ LOG(FATAL) << "task " << static_cast<intmax_t>(tid) << " forked into "
+ << static_cast<intmax_t>(actual)
+ << " without letting aos_sync know so we're not really sure "
+ "what's going on";
}
}
@@ -378,9 +378,9 @@
}
void InstallAtforkHook() {
- if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
- AOS_PLOG(FATAL, "pthread_atfork(NULL, NULL, %p) failed", atfork_child);
- }
+ PCHECK(pthread_atfork(NULL, NULL, &atfork_child) == 0)
+ << ": pthread_atfork(NULL, NULL, "
+ << reinterpret_cast<void *>(&atfork_child) << ") failed";
}
// This gets called to set everything up in a new thread by get_tid().
@@ -472,11 +472,10 @@
robust_head.futex_offset = static_cast<ssize_t>(offsetof(aos_mutex, futex)) -
static_cast<ssize_t>(offsetof(aos_mutex, next));
robust_head.pending_next = 0;
- if (syscall(SYS_set_robust_list, robust_head_next_value(), sizeof(robust_head)) !=
- 0) {
- AOS_PLOG(FATAL, "set_robust_list(%p, %zd) failed",
- reinterpret_cast<void *>(robust_head.next), sizeof(robust_head));
- }
+ PCHECK(syscall(SYS_set_robust_list, robust_head_next_value(),
+ sizeof(robust_head)) == 0)
+ << ": set_robust_list(" << reinterpret_cast<void *>(robust_head.next)
+ << ", " << sizeof(robust_head) << ") failed";
if (kRobustListDebug) {
printf("%" PRId32 ": init done\n", get_tid());
}
@@ -675,12 +674,12 @@
}
}
my_robust_list::robust_head.pending_next = 0;
- if (ret == -EDEADLK) {
- AOS_LOG(FATAL, "multiple lock of %p by %" PRId32 "\n", m, tid);
- }
- AOS_PELOG(FATAL, -ret, "FUTEX_LOCK_PI(%p(=%" PRIu32 "), 1, %p) failed",
- &m->futex, __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST),
- timeout);
+ CHECK_NE(ret, -EDEADLK) << ": multiple lock of " << m << " by " << tid;
+
+ errno = -ret;
+ PLOG(FATAL) << "FUTEX_LOCK_PI(" << &m->futex
+ << "(=" << __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST)
+ << "), 1, " << timeout << ") failed";
} else {
if (kLockDebug) {
printf("%" PRId32 ": %p kernel lock done\n", tid, m);
@@ -746,8 +745,9 @@
continue;
}
my_robust_list::robust_head.pending_next = 0;
- AOS_PELOG(FATAL, -ret, "FUTEX_CMP_REQUEUE_PI(%p, 1, %d, %p, *%p) failed",
- c, number_requeue, &m->futex, c);
+ errno = -ret;
+ PLOG(FATAL) << "FUTEX_CMP_REQUEUE_PI(" << c << ", 1, " << number_requeue
+ << ", " << &m->futex << ", *" << c << ") failed";
} else {
return;
}
@@ -778,11 +778,10 @@
my_robust_list::robust_head.pending_next = 0;
check_cached_tid(tid);
if ((value & FUTEX_TID_MASK) == 0) {
- AOS_LOG(FATAL, "multiple unlock of aos_mutex %p by %" PRId32 "\n", m,
- tid);
+ LOG(FATAL) << "multiple unlock of aos_mutex " << m << " by " << tid;
} else {
- AOS_LOG(FATAL, "aos_mutex %p is locked by %" PRId32 ", not %" PRId32 "\n",
- m, value & FUTEX_TID_MASK, tid);
+ LOG(FATAL) << "aos_mutex " << m << " is locked by "
+ << (value & FUTEX_TID_MASK) << ", not " << tid;
}
}
@@ -795,7 +794,8 @@
const int ret = sys_futex_unlock_pi(&m->futex);
if (ret != 0) {
my_robust_list::robust_head.pending_next = 0;
- AOS_PELOG(FATAL, -ret, "FUTEX_UNLOCK_PI(%p) failed", &m->futex);
+ errno = -ret;
+ PLOG(FATAL) << "FUTEX_UNLOCK_PI(" << (&m->futex) << ") failed";
}
} else {
// There aren't any waiters, so no need to call into the kernel.
@@ -832,8 +832,9 @@
return 4;
}
my_robust_list::robust_head.pending_next = 0;
- AOS_PELOG(FATAL, -ret, "FUTEX_TRYLOCK_PI(%p, 0, NULL) failed",
- &m->futex);
+ errno = -ret;
+ PLOG(FATAL) << "FUTEX_TRYLOCK_PI(" << (&m->futex)
+ << ", 0, NULL) failed";
}
}
}
@@ -898,9 +899,9 @@
continue;
}
my_robust_list::robust_head.pending_next = 0;
- AOS_PELOG(FATAL, -ret,
- "FUTEX_WAIT_REQUEUE_PI(%p, %" PRIu32 ", %p) failed", c,
- wait_start, &m->futex);
+ errno = -ret;
+ PLOG(FATAL) << "FUTEX_WAIT_REQUEUE_PI(" << c << ", " << wait_start << ", "
+ << (&m->futex) << ") failed";
} else {
// Record that the kernel relocked it for us.
lock_pthread_mutex(m);
diff --git a/aos/ipc_lib/index_test.cc b/aos/ipc_lib/index_test.cc
index 689ed24..2e9a37b 100644
--- a/aos/ipc_lib/index_test.cc
+++ b/aos/ipc_lib/index_test.cc
@@ -1,7 +1,7 @@
#include "aos/ipc_lib/index.h"
-#include "aos/testing/test_logging.h"
#include "gtest/gtest.h"
+#include "glog/logging.h"
namespace aos {
namespace ipc_lib {
@@ -10,7 +10,8 @@
class QueueIndexTest : public ::testing::Test {
protected:
uint32_t GetIndex(const QueueIndex &index) {
- printf("Index, count: %x, %x\n", index.index_, index.count_);
+ LOG(INFO) << "Index, count: " << std::hex << index.index_ << ", "
+ << index.count_;
return index.index();
}
diff --git a/aos/ipc_lib/ipc_comparison.cc b/aos/ipc_lib/ipc_comparison.cc
index 5ea1c8b..1553159 100644
--- a/aos/ipc_lib/ipc_comparison.cc
+++ b/aos/ipc_lib/ipc_comparison.cc
@@ -28,6 +28,7 @@
#include "aos/logging/implementations.h"
#include "aos/logging/logging.h"
#include "aos/mutex/mutex.h"
+#include "aos/realtime.h"
#include "aos/time/time.h"
DEFINE_string(method, "", "Which IPC method to use");
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index cd0e2ae..c2b0254 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -9,9 +9,8 @@
#include <iostream>
#include <sstream>
-#include "aos/init.h"
#include "aos/ipc_lib/lockless_queue_memory.h"
-#include "aos/logging/logging.h"
+#include "aos/realtime.h"
#include "aos/util/compiler_memory_barrier.h"
#include "glog/logging.h"
@@ -20,8 +19,6 @@
namespace {
-constexpr bool kDebug = false;
-
void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
const int result = mutex_grab(&(memory->queue_setup_lock));
CHECK(result == 0 || result == 1);
@@ -59,9 +56,7 @@
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
if (tid & FUTEX_OWNER_DIED) {
- if (kDebug) {
- printf("Found an easy death for sender %zu\n", i);
- }
+ VLOG(3) << "Found an easy death for sender " << i;
const Index to_replace = sender->to_replace.RelaxedLoad();
const Index scratch_index = sender->scratch_index.Load();
@@ -119,9 +114,7 @@
return;
}
- if (kDebug) {
- printf("Starting hard cleanup\n");
- }
+ VLOG(3) << "Starting hard cleanup";
size_t num_accounted_for = 0;
size_t num_missing = 0;
@@ -166,9 +159,8 @@
// Candidate.
CHECK_LE(to_replace.message_index(), accounted_for.size());
if (accounted_for[to_replace.message_index()]) {
- if (kDebug) {
- printf("Sender %zu died, to_replace is already accounted for\n", i);
- }
+ VLOG(3) << "Sender " << i
+ << " died, to_replace is already accounted for";
// If both are accounted for, we are corrupt...
CHECK(!accounted_for[scratch_index.message_index()]);
@@ -185,9 +177,8 @@
--num_missing;
++num_accounted_for;
} else if (accounted_for[scratch_index.message_index()]) {
- if (kDebug) {
- printf("Sender %zu died, scratch_index is already accounted for\n", i);
- }
+ VLOG(3) << "Sender " << i
+ << " died, scratch_index is already accounted for";
// scratch_index is accounted for. That means we did the insert,
// but didn't record it.
CHECK(to_replace.valid());
@@ -204,9 +195,7 @@
--num_missing;
++num_accounted_for;
} else {
- if (kDebug) {
- printf("Sender %zu died, neither is accounted for\n", i);
- }
+ VLOG(3) << "Sender " << i << " died, neither is accounted for";
// Ambiguous. There will be an unambiguous one somewhere that we
// can do first.
}
@@ -406,7 +395,8 @@
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
Watcher *w = memory_->GetWatcher(i);
- // Start by reading the tid. This needs to be atomic to force it to come first.
+ // Start by reading the tid. This needs to be atomic to force it to come
+ // first.
watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
watcher_copy_[i].pid = w->pid;
watcher_copy_[i].priority = w->priority;
@@ -521,18 +511,32 @@
return index;
}
+size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
+
+void *LocklessQueue::Sender::Data() {
+ ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
+ Index scratch_index = sender->scratch_index.RelaxedLoad();
+ Message *message = memory_->GetMessage(scratch_index);
+ message->header.queue_index.Invalidate();
+
+ return &message->data[0];
+}
+
void LocklessQueue::Sender::Send(const char *data, size_t length) {
+ CHECK_LE(length, size());
+ memcpy(Data(), data, length);
+ Send(length);
+}
+
+void LocklessQueue::Sender::Send(size_t length) {
const size_t queue_size = memory_->queue_size();
- CHECK_LE(length, memory_->message_data_size());
+ CHECK_LE(length, size());
::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *message = memory_->GetMessage(scratch_index);
- message->header.queue_index.Invalidate();
-
message->header.length = length;
- memcpy(&message->data[0], data, length);
while (true) {
const QueueIndex actual_next_queue_index =
@@ -564,10 +568,8 @@
memory_->next_queue_index.CompareAndExchangeStrong(
actual_next_queue_index, incremented_queue_index);
- if (kDebug) {
- printf("We were beat. Try again. Was %x, is %x\n", to_replace.get(),
- decremented_queue_index.index());
- }
+ VLOG(3) << "We were beat. Try again. Was " << std::hex
+ << to_replace.get() << ", is " << decremented_queue_index.index();
continue;
}
@@ -581,12 +583,10 @@
->header.queue_index.RelaxedLoad(queue_size);
if (previous_index != decremented_queue_index && previous_index.valid()) {
// Retry.
- if (kDebug) {
- printf(
- "Something fishy happened, queue index doesn't match. Retrying. "
- " Previous index was %x, should be %x\n",
- previous_index.index(), decremented_queue_index.index());
- }
+ VLOG(3) << "Something fishy happened, queue index doesn't match. "
+ "Retrying. Previous index was "
+ << std::hex << previous_index.index() << ", should be "
+ << decremented_queue_index.index();
continue;
}
}
@@ -597,8 +597,7 @@
// Before we are fully done filling out the message, update the Sender state
// with the new index to write. This re-uses the barrier for the
// queue_index store.
- const Index index_to_write(next_queue_index,
- scratch_index.message_index());
+ const Index index_to_write(next_queue_index, scratch_index.message_index());
sender->scratch_index.RelaxedStore(index_to_write);
@@ -616,9 +615,7 @@
->CompareAndExchangeStrong(to_replace, index_to_write)) {
// Aw, didn't succeed. Retry.
sender->to_replace.RelaxedInvalidate();
- if (kDebug) {
- printf("Failed to wrap into queue\n");
- }
+ VLOG(3) << "Failed to wrap into queue";
continue;
}
@@ -661,10 +658,8 @@
if (starting_queue_index != queue_index) {
// If we found a message that is exactly 1 loop old, we just wrapped.
if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
- if (kDebug) {
- printf("Matches: %x, %x\n", starting_queue_index.index(),
- queue_index.DecrementBy(queue_size).index());
- }
+ VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
+ << ", " << queue_index.DecrementBy(queue_size).index();
return ReadResult::NOTHING_NEW;
} else {
// Someone has re-used this message between when we pulled it out of the
@@ -673,9 +668,7 @@
Message *new_m = memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
- if (kDebug) {
- printf("Retrying, m doesn't match\n");
- }
+ VLOG(3) << "Retrying, m doesn't match";
continue;
}
@@ -686,17 +679,14 @@
// Either we got too far behind (signaled by this being a valid
// message), or this is one of the initial messages which are invalid.
if (starting_queue_index.valid()) {
- if (kDebug) {
- printf("Too old. Tried for %x, got %x, behind by %d\n",
- queue_index.index(), starting_queue_index.index(),
- starting_queue_index.index() - queue_index.index());
- }
+ VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
+ << ", got " << starting_queue_index.index() << ", behind by "
+ << std::dec
+ << (starting_queue_index.index() - queue_index.index());
return ReadResult::TOO_OLD;
}
- if (kDebug) {
- printf("Initial\n");
- }
+ VLOG(3) << "Initial";
// There isn't a valid message at this location.
//
@@ -705,28 +695,24 @@
// asking for something crazy, like something before the beginning of
// the queue. Tell them that they are behind.
if (uint32_queue_index < memory_->queue_size()) {
- if (kDebug) {
- printf("Near zero, %x\n", uint32_queue_index);
- }
+ VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
return ReadResult::NOTHING_NEW;
} else {
- if (kDebug) {
- printf("not near zero, %x\n", uint32_queue_index);
- }
+ VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
return ReadResult::TOO_OLD;
}
}
}
- if (kDebug) {
- printf("Eq: %x, %x\n", starting_queue_index.index(), queue_index.index());
- }
+ VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
+ << queue_index.index();
break;
}
- // Then read the data out.
+ // Then read the data out. Copy it all out to be deterministic and so we can
+ // make length be from either end.
*monotonic_sent_time = m->header.monotonic_sent_time;
*realtime_sent_time = m->header.realtime_sent_time;
- memcpy(data, &m->data[0], m->header.length);
+ memcpy(data, &m->data[0], message_data_size());
*length = m->header.length;
// And finally, confirm that the message *still* points to the queue index we
@@ -735,20 +721,22 @@
// it's lifetime.
const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
if (final_queue_index != queue_index) {
- if (kDebug) {
- printf(
- "Changed out from under us. Reading %x, finished with %x, delta: "
- "%d\n",
- queue_index.index(), final_queue_index.index(),
- final_queue_index.index() - queue_index.index());
- }
- return ReadResult::TOO_OLD;
+ VLOG(3) << "Changed out from under us. Reading " << std::hex
+ << queue_index.index() << ", finished with "
+ << final_queue_index.index() << ", delta: " << std::dec
+ << (final_queue_index.index() - queue_index.index());
+ return ReadResult::OVERWROTE;
}
return ReadResult::GOOD;
}
-uint32_t LocklessQueue::LatestQueueIndex() {
+size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
+size_t LocklessQueue::message_data_size() const {
+ return memory_->message_data_size();
+}
+
+QueueIndex LocklessQueue::LatestQueueIndex() {
const size_t queue_size = memory_->queue_size();
// There is only one interesting case. We need to know if the queue is empty.
@@ -757,7 +745,7 @@
memory_->next_queue_index.Load(queue_size);
if (next_queue_index.valid()) {
const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
- return current_queue_index.index();
+ return current_queue_index;
} else {
return empty_queue_index();
}
@@ -845,7 +833,8 @@
}
::std::cout << " }" << ::std::endl;
- ::std::cout << " Sender senders[" << memory->num_senders() << "] {" << ::std::endl;
+ ::std::cout << " Sender senders[" << memory->num_senders() << "] {"
+ << ::std::endl;
for (size_t i = 0; i < memory->num_senders(); ++i) {
Sender *s = memory->GetSender(i);
::std::cout << " [" << i << "] -> Sender {" << ::std::endl;
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index dafc157..fcc5d79 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -106,7 +106,7 @@
// Prints to stdout the data inside the queue for debugging.
void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
-const static int kWakeupSignal = SIGRTMIN + 2;
+const static unsigned int kWakeupSignal = SIGRTMIN + 2;
// Class to manage sending and receiving data in the lockless queue. This is
// separate from the actual memory backing the queue so that memory can be
@@ -122,6 +122,8 @@
// Returns the number of messages in the queue.
size_t QueueSize() const;
+ size_t message_data_size() const;
+
// Registers this thread to receive the kWakeupSignal signal when Wakeup is
// called. Returns false if there was an error in registration.
bool RegisterWakeup(int priority);
@@ -137,8 +139,9 @@
// If you ask for a queue index 2 past the newest, you will still get
// NOTHING_NEW until that gets overwritten with new data. If you ask for an
// element newer than QueueSize() from the current message, we consider it
- // behind by a large amount and return TOO_OLD.
- enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW };
+ // behind by a large amount and return TOO_OLD. If the message is modified
+ // out from underneath us as we read it, return OVERWROTE.
+ enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
ReadResult Read(uint32_t queue_index,
::aos::monotonic_clock::time_point *monotonic_sent_time,
::aos::realtime_clock::time_point *realtime_sent_time,
@@ -147,8 +150,12 @@
// Returns the index to the latest queue message. Returns empty_queue_index()
// if there are no messages in the queue. Do note that this index wraps if
// more than 2^32 messages are sent.
- uint32_t LatestQueueIndex();
- static constexpr uint32_t empty_queue_index() { return 0xffffffff; }
+ QueueIndex LatestQueueIndex();
+ static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
+
+ // Returns the size of the queue. This is mostly useful for manipulating
+ // QueueIndex.
+ size_t queue_size() const;
// TODO(austin): Return the oldest queue index. This lets us catch up nicely
// if we got behind.
@@ -181,6 +188,14 @@
~Sender();
+ // Sends a message without copying the data.
+ // Copy at most size() bytes of data into the memory pointed to by Data(),
+ // and then call Send().
+ // Note: calls to Data() are expensive enough that you should cache it.
+ size_t size();
+ void *Data();
+ void Send(size_t length);
+
// Sends up to length data. Does not wakeup the target.
void Send(const char *data, size_t length);
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 7a68273..4f1d7e4 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -11,10 +11,10 @@
#include <memory>
#include <thread>
-#include "aos/init.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/lockless_queue_memory.h"
#include "aos/libc/aos_strsignal.h"
+#include "aos/realtime.h"
#include "aos/testing/prevent_exit.h"
#include "aos/testing/test_logging.h"
#include "gflags/gflags.h"
@@ -507,14 +507,14 @@
TestShmRobustness(
config,
- [this, config, tid](void *memory) {
+ [config, tid](void *memory) {
// Initialize the queue and grab the tid.
LocklessQueue queue(
reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
config);
*tid = gettid();
},
- [this, config](void *memory) {
+ [config](void *memory) {
// Now try to write 2 messages. We will get killed a bunch as this
// tries to happen.
LocklessQueue queue(
@@ -527,7 +527,7 @@
sender.Send(data, s + 1);
}
},
- [this, config, tid](void *raw_memory) {
+ [config, tid](void *raw_memory) {
// Confirm that we can create 2 senders (the number in the queue), and
// send a message. And that all the messages in the queue are valid.
::aos::ipc_lib::LocklessQueueMemory *memory =
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index f3b49b6..b9cb54d 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -11,11 +11,10 @@
#include "aos/event.h"
#include "aos/events/epoll.h"
-#include "aos/init.h"
#include "aos/ipc_lib/aos_sync.h"
#include "aos/ipc_lib/queue_racer.h"
#include "aos/ipc_lib/signalfd.h"
-#include "aos/testing/test_logging.h"
+#include "aos/realtime.h"
#include "gflags/gflags.h"
#include "gtest/gtest.h"
@@ -43,7 +42,6 @@
class LocklessQueueTest : public ::testing::Test {
public:
LocklessQueueTest() {
- ::aos::testing::EnableTestLogging();
config_.num_watchers = 10;
config_.num_senders = 100;
config_.queue_size = 10000;
@@ -247,8 +245,8 @@
// Send enough messages to wrap.
for (int i = 0; i < 20000; ++i) {
// Confirm that the queue index makes sense given the number of sends.
- EXPECT_EQ(queue.LatestQueueIndex(),
- i == 0 ? LocklessQueue::empty_queue_index() : i - 1);
+ EXPECT_EQ(queue.LatestQueueIndex().index(),
+ i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
// Send a trivial piece of data.
char data[100];
@@ -257,7 +255,7 @@
// Confirm that the queue index still makes sense. This is easier since the
// empty case has been handled.
- EXPECT_EQ(queue.LatestQueueIndex(), i);
+ EXPECT_EQ(queue.LatestQueueIndex().index(), i);
// Read a result from 5 in the past.
::aos::monotonic_clock::time_point monotonic_sent_time;
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
index 5b6a0fa..3b6e5a1 100644
--- a/aos/ipc_lib/queue.cc
+++ b/aos/ipc_lib/queue.cc
@@ -172,7 +172,7 @@
MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
do {
if (__builtin_expect(header == nullptr, 0)) {
- AOS_LOG(FATAL, "overused pool of queue %p (%s)\n", this, name_);
+ LOG(FATAL) << "overused pool of queue " << this << " (" << name_ << ")";
}
} while (__builtin_expect(
!__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
@@ -195,10 +195,8 @@
static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
"need to revalidate size/alignent assumptions");
- if (queue_length < 1) {
- AOS_LOG(FATAL, "queue length %d of %s needs to be at least 1\n",
- queue_length, name);
- }
+ CHECK_GE(queue_length, 1) << ": queue length " << queue_length << " of "
+ << name << " needs to be at least 1";
const size_t name_size = strlen(name) + 1;
char *temp = static_cast<char *>(shm_malloc(name_size));
@@ -249,8 +247,8 @@
printf("fetching queue %s\n", name);
}
if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
- AOS_LOG(FATAL, "mutex_lock(%p) failed\n",
- &global_core->mem_struct->queues.lock);
+ LOG(FATAL) << "mutex_lock(" << &global_core->mem_struct->queues.lock
+ << ") failed";
}
RawQueue *current =
static_cast<RawQueue *>(global_core->mem_struct->queues.pointer);
@@ -302,14 +300,14 @@
bool RawQueue::DoWriteMessage(void *msg, Options<RawQueue> options) {
if (kWriteDebug) {
- printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options.printable());
+ printf("queue: %p->WriteMessage(%p, %x), len :%zu\n", this, msg, options.printable(), msg_length_);
}
bool signal_readable;
{
IPCMutexLocker locker(&data_lock_);
- AOS_CHECK(!locker.owner_died());
+ CHECK(!locker.owner_died());
int new_end;
while (true) {
@@ -334,7 +332,7 @@
if (kWriteDebug) {
printf("queue: going to wait for writable_ of %p\n", this);
}
- AOS_CHECK(!writable_.Wait());
+ CHECK(!writable_.Wait());
}
}
data_[data_end_] = msg;
@@ -390,7 +388,7 @@
if (wait_result == Condition::WaitResult::kOk) {
break;
}
- AOS_CHECK(wait_result != Condition::WaitResult::kOwnerDied);
+ CHECK(wait_result != Condition::WaitResult::kOwnerDied);
if (wait_result == Condition::WaitResult::kTimeout) {
return false;
}
@@ -427,7 +425,7 @@
void *msg = NULL;
IPCMutexLocker locker(&data_lock_);
- AOS_CHECK(!locker.owner_died());
+ CHECK(!locker.owner_died());
if (!ReadCommonStart(options, nullptr, chrono::nanoseconds(0))) {
if (kReadDebug) {
@@ -490,7 +488,7 @@
void *msg = NULL;
IPCMutexLocker locker(&data_lock_);
- AOS_CHECK(!locker.owner_died());
+ CHECK(!locker.owner_died());
if (!ReadCommonStart(options, index, timeout)) {
if (kReadDebug) {
diff --git a/aos/ipc_lib/queue.h b/aos/ipc_lib/queue.h
index 5b68f2e..6e07993 100644
--- a/aos/ipc_lib/queue.h
+++ b/aos/ipc_lib/queue.h
@@ -5,7 +5,7 @@
#include "aos/mutex/mutex.h"
#include "aos/condition.h"
#include "aos/util/options.h"
-#include "aos/logging/logging.h"
+#include "glog/logging.h"
// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
// code to make checking for leaks work better
@@ -90,10 +90,11 @@
static constexpr Options<RawQueue> kWriteFailureOptions =
kNonBlock | kBlock | kOverride;
if (!options.NoOthersSet(kWriteFailureOptions)) {
- AOS_LOG(FATAL, "illegal write options in %x\n", options.printable());
+ LOG(FATAL) << "illegal write options in " << std::hex
+ << options.printable();
}
if (!options.ExactlyOneSet(kWriteFailureOptions)) {
- AOS_LOG(FATAL, "invalid write options %x\n", options.printable());
+ LOG(FATAL) << "invalid write options " << std::hex << options.printable();
}
return DoWriteMessage(msg, options);
}
@@ -123,7 +124,7 @@
CheckReadOptions(options);
static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
if (options.AllSet(kFromEndAndPeek)) {
- AOS_LOG(FATAL, "ReadMessageIndex(kFromEnd | kPeek) is not allowed\n");
+ LOG(FATAL) << "ReadMessageIndex(kFromEnd | kPeek) is not allowed";
}
return DoReadMessageIndex(options, index, timeout);
}
@@ -161,11 +162,12 @@
static constexpr Options<RawQueue> kValidOptions =
kPeek | kFromEnd | kNonBlock | kBlock;
if (!options.NoOthersSet(kValidOptions)) {
- AOS_LOG(FATAL, "illegal read options in %x\n", options.printable());
+ LOG(FATAL) << "illegal read options in " << std::hex
+ << options.printable();
}
static constexpr Options<RawQueue> kBlockChoices = kNonBlock | kBlock;
if (!options.ExactlyOneSet(kBlockChoices)) {
- AOS_LOG(FATAL, "invalid read options %x\n", options.printable());
+ LOG(FATAL) << "invalid read options " << std::hex << options.printable();
}
}
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 53490a0..bb754d8 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -82,12 +82,15 @@
//
// So, grab them in order.
const uint64_t finished_writes = finished_writes_.load();
- const uint32_t latest_queue_index_uint32_t = queue.LatestQueueIndex();
+ const QueueIndex latest_queue_index_queue_index =
+ queue.LatestQueueIndex();
const uint64_t started_writes = started_writes_.load();
+ const uint32_t latest_queue_index_uint32_t =
+ queue.LatestQueueIndex().index();
uint64_t latest_queue_index = latest_queue_index_uint32_t;
- if (latest_queue_index_uint32_t != LocklessQueue::empty_queue_index()) {
+ if (latest_queue_index_queue_index != LocklessQueue::empty_queue_index()) {
// If we got smaller, we wrapped.
if (latest_queue_index_uint32_t < last_queue_index) {
++wrap_count;
@@ -104,19 +107,19 @@
// If we are at the beginning, the queue needs to always return empty.
if (started_writes == 0) {
- EXPECT_EQ(latest_queue_index_uint32_t,
+ EXPECT_EQ(latest_queue_index_queue_index,
LocklessQueue::empty_queue_index());
EXPECT_EQ(finished_writes, 0);
} else {
if (finished_writes == 0) {
// Plausible to be at the beginning.
- if (latest_queue_index_uint32_t !=
+ if (latest_queue_index_queue_index !=
LocklessQueue::empty_queue_index()) {
// Otherwise, we have started. The queue is always allowed to
EXPECT_GE(started_writes, latest_queue_index + 1);
}
} else {
- EXPECT_NE(latest_queue_index_uint32_t,
+ EXPECT_NE(latest_queue_index_queue_index,
LocklessQueue::empty_queue_index());
// latest_queue_index is an index, not a count. So it always reads 1
// low.
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index 59435b1..eaeedd4 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -36,7 +36,7 @@
size_t CurrentIndex() {
LocklessQueue queue(memory_, config_);
- return queue.LatestQueueIndex();
+ return queue.LatestQueueIndex().index();
}
private:
diff --git a/aos/ipc_lib/raw_queue_test.cc b/aos/ipc_lib/raw_queue_test.cc
index aa3b335..3048e1b 100644
--- a/aos/ipc_lib/raw_queue_test.cc
+++ b/aos/ipc_lib/raw_queue_test.cc
@@ -191,7 +191,8 @@
case 0: // child
if (kForkSleep != chrono::milliseconds(0)) {
AOS_LOG(INFO, "pid %jd sleeping for %" PRId64 "ns\n",
- static_cast<intmax_t>(getpid()), kForkSleep.count());
+ static_cast<intmax_t>(getpid()),
+ static_cast<int64_t>(kForkSleep.count()));
this_thread::sleep_for(kForkSleep);
}
::aos::testing::PreventExit();
diff --git a/aos/ipc_lib/shared_mem.c b/aos/ipc_lib/shared_mem.cc
similarity index 71%
rename from aos/ipc_lib/shared_mem.c
rename to aos/ipc_lib/shared_mem.cc
index da55ebc..93ced7d 100644
--- a/aos/ipc_lib/shared_mem.c
+++ b/aos/ipc_lib/shared_mem.cc
@@ -10,9 +10,9 @@
#include <stdlib.h>
#include <assert.h>
-#include "aos/ipc_lib/core_lib.h"
-#include "aos/logging/logging.h"
#include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/core_lib.h"
+#include "glog/logging.h"
// the path for the shared memory segment. see shm_open(3) for restrictions
#define AOS_SHM_NAME "/aos_shared_mem"
@@ -61,7 +61,7 @@
if (shm == -1 && errno == EEXIST) {
printf("shared_mem: going to shm_unlink(%s)\n", global_core->shm_name);
if (shm_unlink(global_core->shm_name) == -1) {
- AOS_PLOG(WARNING, "shm_unlink(%s) failed", global_core->shm_name);
+ PLOG(WARNING) << "shm_unlink(" << global_core->shm_name << ") failed";
break;
}
} else {
@@ -73,22 +73,22 @@
global_core->owner = 0;
}
if (shm == -1) {
- AOS_PLOG(FATAL, "shm_open(%s, O_RDWR [| O_CREAT | O_EXCL, 0|0666) failed",
- global_core->shm_name);
+ PLOG(FATAL) << "shm_open(" << global_core->shm_name
+ << ", O_RDWR [| O_CREAT | O_EXCL, 0|0666) failed";
}
if (global_core->owner) {
- if (ftruncate(shm, SIZEOFSHMSEG) == -1) {
- AOS_PLOG(FATAL, "fruncate(%d, 0x%zx) failed", shm, (size_t)SIZEOFSHMSEG);
- }
+ PCHECK(ftruncate(shm, SIZEOFSHMSEG) == 0)
+ << ": ftruncate(" << shm << ", 0x" << std::hex << (size_t)SIZEOFSHMSEG
+ << std::dec << ") failed";
}
int flags = MAP_SHARED | MAP_FIXED;
if (lock) flags |= MAP_LOCKED | MAP_POPULATE;
void *shm_address = mmap((void *)SHM_START, SIZEOFSHMSEG,
PROT_READ | PROT_WRITE, flags, shm, 0);
- if (shm_address == MAP_FAILED) {
- AOS_PLOG(FATAL, "shared_mem: mmap(%p, 0x%zx, stuff, %x, %d, 0) failed",
- (void *)SHM_START, (size_t)SIZEOFSHMSEG, flags, shm);
- }
+ PCHECK(shm_address != MAP_FAILED)
+ << "shared_mem: mmap(" << (void *)SHM_START << ", 0x" << std::hex
+ << (size_t)SIZEOFSHMSEG << ", stuff, " << flags << ", " << std::dec << shm
+ << ", 0) failed";
if (create) {
printf("shared_mem: creating %s, shm at: %p\n", global_core->shm_name,
shm_address);
@@ -96,19 +96,18 @@
printf("shared_mem: not creating, shm at: %p\n", shm_address);
}
if (close(shm) == -1) {
- AOS_PLOG(WARNING, "close(%d(=shm) failed", shm);
+ PLOG(WARNING) << "close(" << shm << "(=shm) failed";
}
- if (shm_address != (void *)SHM_START) {
- AOS_LOG(FATAL, "shm isn't at hard-coded %p. at %p instead\n",
- (void *)SHM_START, shm_address);
- }
+ CHECK(shm_address == (void *)SHM_START)
+ << "shm isn't at hard-coded " << (void *)SHM_START << ". at "
+ << shm_address << " instead";
aos_core_use_address_as_shared_mem(shm_address, SIZEOFSHMSEG);
- AOS_LOG(INFO, "shared_mem: end of create_shared_mem owner=%d\n",
- global_core->owner);
+ LOG(INFO) << "shared_mem: end of create_shared_mem owner="
+ << global_core->owner;
}
void aos_core_use_address_as_shared_mem(void *address, size_t size) {
- global_core->mem_struct = address;
+ global_core->mem_struct = static_cast<aos_shm_core_t *>(address);
global_core->size = size;
global_core->shared_mem =
(uint8_t *)address + sizeof(*global_core->mem_struct);
@@ -118,22 +117,19 @@
futex_set(&global_core->mem_struct->creation_condition);
} else {
if (futex_wait(&global_core->mem_struct->creation_condition) != 0) {
- AOS_LOG(FATAL, "waiting on creation_condition failed\n");
+ LOG(FATAL) << "waiting on creation_condition failed";
}
}
}
void aos_core_free_shared_mem() {
void *shm_address = global_core->shared_mem;
- if (munmap((void *)SHM_START, SIZEOFSHMSEG) == -1) {
- AOS_PLOG(FATAL, "munmap(%p, 0x%zx) failed", shm_address,
- (size_t)SIZEOFSHMSEG);
- }
+ PCHECK(munmap((void *)SHM_START, SIZEOFSHMSEG) != -1)
+ << ": munmap(" << shm_address << ", 0x" << std::hex
+ << (size_t)SIZEOFSHMSEG << std::dec << ") failed";
if (global_core->owner) {
- if (shm_unlink(global_core->shm_name)) {
- AOS_PLOG(FATAL, "shared_mem: shm_unlink(%s) failed",
- global_core->shm_name);
- }
+ PCHECK(shm_unlink(global_core->shm_name) == 0)
+ << ": shared_mem: shm_unlink(" << global_core->shm_name << ") failed";
}
}
diff --git a/aos/ipc_lib/signalfd.cc b/aos/ipc_lib/signalfd.cc
index 045444b..af95598 100644
--- a/aos/ipc_lib/signalfd.cc
+++ b/aos/ipc_lib/signalfd.cc
@@ -5,12 +5,12 @@
#include <unistd.h>
#include <initializer_list>
-#include "aos/logging/logging.h"
+#include "glog/logging.h"
namespace aos {
namespace ipc_lib {
-SignalFd::SignalFd(::std::initializer_list<int> signals) {
+SignalFd::SignalFd(::std::initializer_list<unsigned int> signals) {
// Build up the mask with the provided signals.
sigemptyset(&mask_);
for (int signal : signals) {
@@ -18,7 +18,7 @@
}
// Then build a signalfd. Make it nonblocking so it works well with an epoll
// loop, and have it close on exec.
- AOS_PCHECK(fd_ = signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC));
+ PCHECK((fd_ = signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC)) != -1);
// Now that we have a consumer of the signal, block the signals so the
// signalfd gets them.
pthread_sigmask(SIG_BLOCK, &mask_, nullptr);
@@ -27,7 +27,7 @@
SignalFd::~SignalFd() {
// Unwind the constructor. Unblock the signals and close the fd.
pthread_sigmask(SIG_UNBLOCK, &mask_, nullptr);
- AOS_PCHECK(close(fd_));
+ PCHECK(close(fd_) == 0);
}
signalfd_siginfo SignalFd::Read() {
diff --git a/aos/ipc_lib/signalfd.h b/aos/ipc_lib/signalfd.h
index a545a80..7d2021a 100644
--- a/aos/ipc_lib/signalfd.h
+++ b/aos/ipc_lib/signalfd.h
@@ -13,7 +13,7 @@
public:
// Constructs a SignalFd for the provided list of signals.
// Blocks the signals at the same time in this thread.
- SignalFd(::std::initializer_list<int> signals);
+ SignalFd(::std::initializer_list<unsigned int> signals);
SignalFd(const SignalFd &) = delete;
SignalFd &operator=(const SignalFd &) = delete;