Convert aos over to flatbuffers

Everything builds, and all the tests pass.  I suspect that some entries
are missing from the config files, but those will be found pretty
quickly on startup.

Known limitation: there is no logging or live introspection of queue messages.

Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index cd0e2ae..c2b0254 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -9,9 +9,8 @@
 #include <iostream>
 #include <sstream>
 
-#include "aos/init.h"
 #include "aos/ipc_lib/lockless_queue_memory.h"
-#include "aos/logging/logging.h"
+#include "aos/realtime.h"
 #include "aos/util/compiler_memory_barrier.h"
 #include "glog/logging.h"
 
@@ -20,8 +19,6 @@
 
 namespace {
 
-constexpr bool kDebug = false;
-
 void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
   const int result = mutex_grab(&(memory->queue_setup_lock));
   CHECK(result == 0 || result == 1);
@@ -59,9 +56,7 @@
     const uint32_t tid =
         __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
     if (tid & FUTEX_OWNER_DIED) {
-      if (kDebug) {
-        printf("Found an easy death for sender %zu\n", i);
-      }
+      VLOG(3) << "Found an easy death for sender " << i;
       const Index to_replace = sender->to_replace.RelaxedLoad();
       const Index scratch_index = sender->scratch_index.Load();
 
@@ -119,9 +114,7 @@
     return;
   }
 
-  if (kDebug) {
-    printf("Starting hard cleanup\n");
-  }
+  VLOG(3) << "Starting hard cleanup";
 
   size_t num_accounted_for = 0;
   size_t num_missing = 0;
@@ -166,9 +159,8 @@
         // Candidate.
         CHECK_LE(to_replace.message_index(), accounted_for.size());
         if (accounted_for[to_replace.message_index()]) {
-          if (kDebug) {
-            printf("Sender %zu died, to_replace is already accounted for\n", i);
-          }
+          VLOG(3) << "Sender " << i
+                  << " died, to_replace is already accounted for";
           // If both are accounted for, we are corrupt...
           CHECK(!accounted_for[scratch_index.message_index()]);
 
@@ -185,9 +177,8 @@
           --num_missing;
           ++num_accounted_for;
         } else if (accounted_for[scratch_index.message_index()]) {
-          if (kDebug) {
-            printf("Sender %zu died, scratch_index is already accounted for\n", i);
-          }
+          VLOG(3) << "Sender " << i
+                  << " died, scratch_index is already accounted for";
           // scratch_index is accounted for.  That means we did the insert,
           // but didn't record it.
           CHECK(to_replace.valid());
@@ -204,9 +195,7 @@
           --num_missing;
           ++num_accounted_for;
         } else {
-          if (kDebug) {
-            printf("Sender %zu died, neither is accounted for\n", i);
-          }
+          VLOG(3) << "Sender " << i << " died, neither is accounted for";
           // Ambiguous.  There will be an unambiguous one somewhere that we
           // can do first.
         }
@@ -406,7 +395,8 @@
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
     Watcher *w = memory_->GetWatcher(i);
-    // Start by reading the tid.  This needs to be atomic to force it to come first.
+    // Start by reading the tid.  This needs to be atomic to force it to come
+    // first.
     watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
     watcher_copy_[i].pid = w->pid;
     watcher_copy_[i].priority = w->priority;
@@ -521,18 +511,32 @@
   return index;
 }
 
+size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
+
+void *LocklessQueue::Sender::Data() {
+  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
+  Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *message = memory_->GetMessage(scratch_index);
+  message->header.queue_index.Invalidate();
+
+  return &message->data[0];
+}
+
 void LocklessQueue::Sender::Send(const char *data, size_t length) {
+  CHECK_LE(length, size());
+  memcpy(Data(), data, length);
+  Send(length);
+}
+
+void LocklessQueue::Sender::Send(size_t length) {
   const size_t queue_size = memory_->queue_size();
-  CHECK_LE(length, memory_->message_data_size());
+  CHECK_LE(length, size());
 
   ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
   Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *message = memory_->GetMessage(scratch_index);
 
-  message->header.queue_index.Invalidate();
-
   message->header.length = length;
-  memcpy(&message->data[0], data, length);
 
   while (true) {
     const QueueIndex actual_next_queue_index =
@@ -564,10 +568,8 @@
       memory_->next_queue_index.CompareAndExchangeStrong(
           actual_next_queue_index, incremented_queue_index);
 
-      if (kDebug) {
-        printf("We were beat.  Try again.  Was %x, is %x\n", to_replace.get(),
-               decremented_queue_index.index());
-      }
+      VLOG(3) << "We were beat.  Try again.  Was " << std::hex
+              << to_replace.get() << ", is " << decremented_queue_index.index();
       continue;
     }
 
@@ -581,12 +583,10 @@
               ->header.queue_index.RelaxedLoad(queue_size);
       if (previous_index != decremented_queue_index && previous_index.valid()) {
         // Retry.
-        if (kDebug) {
-          printf(
-              "Something fishy happened, queue index doesn't match.  Retrying. "
-              " Previous index was %x, should be %x\n",
-              previous_index.index(), decremented_queue_index.index());
-        }
+        VLOG(3) << "Something fishy happened, queue index doesn't match.  "
+                   "Retrying.  Previous index was "
+                << std::hex << previous_index.index() << ", should be "
+                << decremented_queue_index.index();
         continue;
       }
     }
@@ -597,8 +597,7 @@
     // Before we are fully done filling out the message, update the Sender state
     // with the new index to write.  This re-uses the barrier for the
     // queue_index store.
-    const Index index_to_write(next_queue_index,
-                               scratch_index.message_index());
+    const Index index_to_write(next_queue_index, scratch_index.message_index());
 
     sender->scratch_index.RelaxedStore(index_to_write);
 
@@ -616,9 +615,7 @@
              ->CompareAndExchangeStrong(to_replace, index_to_write)) {
       // Aw, didn't succeed.  Retry.
       sender->to_replace.RelaxedInvalidate();
-      if (kDebug) {
-        printf("Failed to wrap into queue\n");
-      }
+      VLOG(3) << "Failed to wrap into queue";
       continue;
     }
 
@@ -661,10 +658,8 @@
     if (starting_queue_index != queue_index) {
       // If we found a message that is exactly 1 loop old, we just wrapped.
       if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
-        if (kDebug) {
-          printf("Matches: %x, %x\n", starting_queue_index.index(),
-                 queue_index.DecrementBy(queue_size).index());
-        }
+        VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
+                << ", " << queue_index.DecrementBy(queue_size).index();
         return ReadResult::NOTHING_NEW;
       } else {
         // Someone has re-used this message between when we pulled it out of the
@@ -673,9 +668,7 @@
         Message *new_m = memory_->GetMessage(queue_index);
         if (m != new_m) {
           m = new_m;
-          if (kDebug) {
-            printf("Retrying, m doesn't match\n");
-          }
+          VLOG(3) << "Retrying, m doesn't match";
           continue;
         }
 
@@ -686,17 +679,14 @@
         // Either we got too far behind (signaled by this being a valid
         // message), or this is one of the initial messages which are invalid.
         if (starting_queue_index.valid()) {
-          if (kDebug) {
-            printf("Too old.  Tried for %x, got %x, behind by %d\n",
-                   queue_index.index(), starting_queue_index.index(),
-                   starting_queue_index.index() - queue_index.index());
-          }
+          VLOG(3) << "Too old.  Tried for " << std::hex << queue_index.index()
+                  << ", got " << starting_queue_index.index() << ", behind by "
+                  << std::dec
+                  << (starting_queue_index.index() - queue_index.index());
           return ReadResult::TOO_OLD;
         }
 
-        if (kDebug) {
-          printf("Initial\n");
-        }
+        VLOG(3) << "Initial";
 
         // There isn't a valid message at this location.
         //
@@ -705,28 +695,24 @@
         // asking for something crazy, like something before the beginning of
         // the queue.  Tell them that they are behind.
         if (uint32_queue_index < memory_->queue_size()) {
-          if (kDebug) {
-            printf("Near zero, %x\n", uint32_queue_index);
-          }
+          VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
           return ReadResult::NOTHING_NEW;
         } else {
-          if (kDebug) {
-            printf("not near zero, %x\n", uint32_queue_index);
-          }
+          VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
           return ReadResult::TOO_OLD;
         }
       }
     }
-    if (kDebug) {
-      printf("Eq: %x, %x\n", starting_queue_index.index(), queue_index.index());
-    }
+    VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
+            << queue_index.index();
     break;
   }
 
-  // Then read the data out.
+  // Then read the data out.  Copy the whole buffer, not just length bytes, so
+  // the copy is deterministic and the valid bytes may be at either end of it.
   *monotonic_sent_time = m->header.monotonic_sent_time;
   *realtime_sent_time = m->header.realtime_sent_time;
-  memcpy(data, &m->data[0], m->header.length);
+  memcpy(data, &m->data[0], message_data_size());
   *length = m->header.length;
 
   // And finally, confirm that the message *still* points to the queue index we
@@ -735,20 +721,22 @@
   // it's lifetime.
   const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
   if (final_queue_index != queue_index) {
-    if (kDebug) {
-      printf(
-          "Changed out from under us.  Reading %x, finished with %x, delta: "
-          "%d\n",
-          queue_index.index(), final_queue_index.index(),
-          final_queue_index.index() - queue_index.index());
-    }
-    return ReadResult::TOO_OLD;
+    VLOG(3) << "Changed out from under us.  Reading " << std::hex
+            << queue_index.index() << ", finished with "
+            << final_queue_index.index() << ", delta: " << std::dec
+            << (final_queue_index.index() - queue_index.index());
+    return ReadResult::OVERWROTE;
   }
 
   return ReadResult::GOOD;
 }
 
-uint32_t LocklessQueue::LatestQueueIndex() {
+size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
+size_t LocklessQueue::message_data_size() const {
+  return memory_->message_data_size();
+}
+
+QueueIndex LocklessQueue::LatestQueueIndex() {
   const size_t queue_size = memory_->queue_size();
 
   // There is only one interesting case.  We need to know if the queue is empty.
@@ -757,7 +745,7 @@
       memory_->next_queue_index.Load(queue_size);
   if (next_queue_index.valid()) {
     const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
-    return current_queue_index.index();
+    return current_queue_index;
   } else {
     return empty_queue_index();
   }
@@ -845,7 +833,8 @@
   }
   ::std::cout << "  }" << ::std::endl;
 
-  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {" << ::std::endl;
+  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {"
+              << ::std::endl;
   for (size_t i = 0; i < memory->num_senders(); ++i) {
     Sender *s = memory->GetSender(i);
     ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;