Convert aos over to flatbuffers
Everything builds, and all the tests pass. I suspect that some entries
are missing from the config files, but those will be found pretty
quickly on startup.
There is no logging or live introspection of queue messages.
Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index cd0e2ae..c2b0254 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -9,9 +9,8 @@
#include <iostream>
#include <sstream>
-#include "aos/init.h"
#include "aos/ipc_lib/lockless_queue_memory.h"
-#include "aos/logging/logging.h"
+#include "aos/realtime.h"
#include "aos/util/compiler_memory_barrier.h"
#include "glog/logging.h"
@@ -20,8 +19,6 @@
namespace {
-constexpr bool kDebug = false;
-
void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
const int result = mutex_grab(&(memory->queue_setup_lock));
CHECK(result == 0 || result == 1);
@@ -59,9 +56,7 @@
const uint32_t tid =
__atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
if (tid & FUTEX_OWNER_DIED) {
- if (kDebug) {
- printf("Found an easy death for sender %zu\n", i);
- }
+ VLOG(3) << "Found an easy death for sender " << i;
const Index to_replace = sender->to_replace.RelaxedLoad();
const Index scratch_index = sender->scratch_index.Load();
@@ -119,9 +114,7 @@
return;
}
- if (kDebug) {
- printf("Starting hard cleanup\n");
- }
+ VLOG(3) << "Starting hard cleanup";
size_t num_accounted_for = 0;
size_t num_missing = 0;
@@ -166,9 +159,8 @@
// Candidate.
CHECK_LE(to_replace.message_index(), accounted_for.size());
if (accounted_for[to_replace.message_index()]) {
- if (kDebug) {
- printf("Sender %zu died, to_replace is already accounted for\n", i);
- }
+ VLOG(3) << "Sender " << i
+ << " died, to_replace is already accounted for";
// If both are accounted for, we are corrupt...
CHECK(!accounted_for[scratch_index.message_index()]);
@@ -185,9 +177,8 @@
--num_missing;
++num_accounted_for;
} else if (accounted_for[scratch_index.message_index()]) {
- if (kDebug) {
- printf("Sender %zu died, scratch_index is already accounted for\n", i);
- }
+ VLOG(3) << "Sender " << i
+ << " died, scratch_index is already accounted for";
// scratch_index is accounted for. That means we did the insert,
// but didn't record it.
CHECK(to_replace.valid());
@@ -204,9 +195,7 @@
--num_missing;
++num_accounted_for;
} else {
- if (kDebug) {
- printf("Sender %zu died, neither is accounted for\n", i);
- }
+ VLOG(3) << "Sender " << i << " died, neither is accounted for";
// Ambiguous. There will be an unambiguous one somewhere that we
// can do first.
}
@@ -406,7 +395,8 @@
// creating a pidfd is likely not RT.
for (size_t i = 0; i < num_watchers; ++i) {
Watcher *w = memory_->GetWatcher(i);
- // Start by reading the tid. This needs to be atomic to force it to come first.
+ // Start by reading the tid. This needs to be atomic to force it to come
+ // first.
watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
watcher_copy_[i].pid = w->pid;
watcher_copy_[i].priority = w->priority;
@@ -521,18 +511,32 @@
return index;
}
+size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }  // Forwards memory_->message_data_size(); both Send() overloads CHECK their length against this value.
+
+void *LocklessQueue::Sender::Data() {  // Returns a pointer to the first data byte of this sender's current scratch message.
+::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);  // Look up this sender's slot by sender_index_.
+Index scratch_index = sender->scratch_index.RelaxedLoad();  // Relaxed load of the sender's scratch_index.
+Message *message = memory_->GetMessage(scratch_index);  // Message that scratch_index refers to.
+message->header.queue_index.Invalidate();  // Invalidate the header's queue_index before handing the buffer out. NOTE(review): presumably so readers do not treat the message as valid while it is being filled — confirm.
+
+return &message->data[0];  // Caller writes the payload here and then calls Send(length).
+}
+
void LocklessQueue::Sender::Send(const char *data, size_t length) {
+CHECK_LE(length, size());  // glog CHECK: fatal if the payload is larger than size().
+memcpy(Data(), data, length);  // Copy `length` bytes from `data` into the buffer returned by Data().
+Send(length);  // Delegate to the Send(size_t) overload with the same length.
+}
+
+void LocklessQueue::Sender::Send(size_t length) {
const size_t queue_size = memory_->queue_size();
- CHECK_LE(length, memory_->message_data_size());
+ CHECK_LE(length, size());
::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
Index scratch_index = sender->scratch_index.RelaxedLoad();
Message *message = memory_->GetMessage(scratch_index);
- message->header.queue_index.Invalidate();
-
message->header.length = length;
- memcpy(&message->data[0], data, length);
while (true) {
const QueueIndex actual_next_queue_index =
@@ -564,10 +568,8 @@
memory_->next_queue_index.CompareAndExchangeStrong(
actual_next_queue_index, incremented_queue_index);
- if (kDebug) {
- printf("We were beat. Try again. Was %x, is %x\n", to_replace.get(),
- decremented_queue_index.index());
- }
+ VLOG(3) << "We were beat. Try again. Was " << std::hex
+ << to_replace.get() << ", is " << decremented_queue_index.index();
continue;
}
@@ -581,12 +583,10 @@
->header.queue_index.RelaxedLoad(queue_size);
if (previous_index != decremented_queue_index && previous_index.valid()) {
// Retry.
- if (kDebug) {
- printf(
- "Something fishy happened, queue index doesn't match. Retrying. "
- " Previous index was %x, should be %x\n",
- previous_index.index(), decremented_queue_index.index());
- }
+ VLOG(3) << "Something fishy happened, queue index doesn't match. "
+ "Retrying. Previous index was "
+ << std::hex << previous_index.index() << ", should be "
+ << decremented_queue_index.index();
continue;
}
}
@@ -597,8 +597,7 @@
// Before we are fully done filling out the message, update the Sender state
// with the new index to write. This re-uses the barrier for the
// queue_index store.
- const Index index_to_write(next_queue_index,
- scratch_index.message_index());
+ const Index index_to_write(next_queue_index, scratch_index.message_index());
sender->scratch_index.RelaxedStore(index_to_write);
@@ -616,9 +615,7 @@
->CompareAndExchangeStrong(to_replace, index_to_write)) {
// Aw, didn't succeed. Retry.
sender->to_replace.RelaxedInvalidate();
- if (kDebug) {
- printf("Failed to wrap into queue\n");
- }
+ VLOG(3) << "Failed to wrap into queue";
continue;
}
@@ -661,10 +658,8 @@
if (starting_queue_index != queue_index) {
// If we found a message that is exactly 1 loop old, we just wrapped.
if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
- if (kDebug) {
- printf("Matches: %x, %x\n", starting_queue_index.index(),
- queue_index.DecrementBy(queue_size).index());
- }
+ VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
+ << ", " << queue_index.DecrementBy(queue_size).index();
return ReadResult::NOTHING_NEW;
} else {
// Someone has re-used this message between when we pulled it out of the
@@ -673,9 +668,7 @@
Message *new_m = memory_->GetMessage(queue_index);
if (m != new_m) {
m = new_m;
- if (kDebug) {
- printf("Retrying, m doesn't match\n");
- }
+ VLOG(3) << "Retrying, m doesn't match";
continue;
}
@@ -686,17 +679,14 @@
// Either we got too far behind (signaled by this being a valid
// message), or this is one of the initial messages which are invalid.
if (starting_queue_index.valid()) {
- if (kDebug) {
- printf("Too old. Tried for %x, got %x, behind by %d\n",
- queue_index.index(), starting_queue_index.index(),
- starting_queue_index.index() - queue_index.index());
- }
+ VLOG(3) << "Too old. Tried for " << std::hex << queue_index.index()
+ << ", got " << starting_queue_index.index() << ", behind by "
+ << std::dec
+ << (starting_queue_index.index() - queue_index.index());
return ReadResult::TOO_OLD;
}
- if (kDebug) {
- printf("Initial\n");
- }
+ VLOG(3) << "Initial";
// There isn't a valid message at this location.
//
@@ -705,28 +695,24 @@
// asking for something crazy, like something before the beginning of
// the queue. Tell them that they are behind.
if (uint32_queue_index < memory_->queue_size()) {
- if (kDebug) {
- printf("Near zero, %x\n", uint32_queue_index);
- }
+ VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
return ReadResult::NOTHING_NEW;
} else {
- if (kDebug) {
- printf("not near zero, %x\n", uint32_queue_index);
- }
+ VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
return ReadResult::TOO_OLD;
}
}
}
- if (kDebug) {
- printf("Eq: %x, %x\n", starting_queue_index.index(), queue_index.index());
- }
+ VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
+ << queue_index.index();
break;
}
- // Then read the data out.
+ // Then read the data out. Copy it all out to be deterministic and so we can
+ // make length be from either end.
*monotonic_sent_time = m->header.monotonic_sent_time;
*realtime_sent_time = m->header.realtime_sent_time;
- memcpy(data, &m->data[0], m->header.length);
+ memcpy(data, &m->data[0], message_data_size());
*length = m->header.length;
// And finally, confirm that the message *still* points to the queue index we
@@ -735,20 +721,22 @@
// it's lifetime.
const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
if (final_queue_index != queue_index) {
- if (kDebug) {
- printf(
- "Changed out from under us. Reading %x, finished with %x, delta: "
- "%d\n",
- queue_index.index(), final_queue_index.index(),
- final_queue_index.index() - queue_index.index());
- }
- return ReadResult::TOO_OLD;
+ VLOG(3) << "Changed out from under us. Reading " << std::hex
+ << queue_index.index() << ", finished with "
+ << final_queue_index.index() << ", delta: " << std::dec
+ << (final_queue_index.index() - queue_index.index());
+ return ReadResult::OVERWROTE;
}
return ReadResult::GOOD;
}
-uint32_t LocklessQueue::LatestQueueIndex() {
+size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }  // Forwards the queue_size() reported by memory_.
+size_t LocklessQueue::message_data_size() const {  // Forwards the message_data_size() reported by memory_.
+return memory_->message_data_size();
+}
+
+QueueIndex LocklessQueue::LatestQueueIndex() {
const size_t queue_size = memory_->queue_size();
// There is only one interesting case. We need to know if the queue is empty.
@@ -757,7 +745,7 @@
memory_->next_queue_index.Load(queue_size);
if (next_queue_index.valid()) {
const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
- return current_queue_index.index();
+ return current_queue_index;
} else {
return empty_queue_index();
}
@@ -845,7 +833,8 @@
}
::std::cout << " }" << ::std::endl;
- ::std::cout << " Sender senders[" << memory->num_senders() << "] {" << ::std::endl;
+ ::std::cout << " Sender senders[" << memory->num_senders() << "] {"
+ << ::std::endl;
for (size_t i = 0; i < memory->num_senders(); ++i) {
Sender *s = memory->GetSender(i);
::std::cout << " [" << i << "] -> Sender {" << ::std::endl;