Convert aos over to flatbuffers

Everything builds, and all the tests pass.  I suspect that some entries
are missing from the config files, but those will be found pretty
quickly on startup.

There is no logging or live introspection of queue messages.

Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 684620a..908cb80 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -1,5 +1,3 @@
-package(default_visibility = ["//visibility:public"])
-
 cc_library(
     name = "aos_sync",
     srcs = [
@@ -11,11 +9,12 @@
     linkopts = [
         "-lpthread",
     ],
+    visibility = ["//visibility:public"],
     deps = [
         "//aos:macros",
-        "@com_google_absl//absl/base",
-        "//aos/logging",
         "//aos/util:compiler_memory_barrier",
+        "@com_github_google_glog//:glog",
+        "@com_google_absl//absl/base",
     ],
 )
 
@@ -27,6 +26,7 @@
     hdrs = [
         "core_lib.h",
     ],
+    visibility = ["//visibility:public"],
     deps = [
         ":aos_sync",
         ":shared_mem_types",
@@ -36,7 +36,7 @@
 cc_library(
     name = "shared_mem",
     srcs = [
-        "shared_mem.c",
+        "shared_mem.cc",
     ],
     hdrs = [
         "shared_mem.h",
@@ -44,11 +44,12 @@
     linkopts = [
         "-lrt",
     ],
+    visibility = ["//visibility:public"],
     deps = [
         ":aos_sync",
         ":core_lib",
         ":shared_mem_types",
-        "//aos/logging",
+        "@com_github_google_glog//:glog",
     ],
 )
 
@@ -75,11 +76,11 @@
     linkopts = [
         "-lrt",
     ],
+    visibility = ["//visibility:public"],
     deps = [
         ":core_lib",
         ":shared_mem",
         "//aos:condition",
-        "//aos/logging",
         "//aos/mutex",
         "//aos/util:options",
     ],
@@ -157,8 +158,9 @@
     hdrs = [
         "signalfd.h",
     ],
+    visibility = ["//visibility:public"],
     deps = [
-        "//aos/logging",
+        "@com_github_google_glog//:glog",
     ],
 )
 
@@ -166,6 +168,7 @@
     name = "index",
     srcs = ["index.cc"],
     hdrs = ["index.h"],
+    visibility = ["//visibility:public"],
 )
 
 cc_test(
@@ -174,7 +177,7 @@
     deps = [
         ":index",
         "//aos/testing:googletest",
-        "//aos/testing:test_logging",
+        "@com_github_google_glog//:glog",
     ],
 )
 
@@ -185,11 +188,11 @@
         "lockless_queue_memory.h",
     ],
     hdrs = ["lockless_queue.h"],
+    visibility = ["//visibility:public"],
     deps = [
         ":aos_sync",
         ":index",
-        "//aos:init",
-        "//aos/logging",
+        "//aos:realtime",
         "//aos/time",
         "//aos/util:compiler_memory_barrier",
         "@com_github_google_glog//:glog",
@@ -208,7 +211,7 @@
     deps = [
         ":lockless_queue",
         "//aos:event",
-        "//third_party/googletest:gtest",
+        "//aos/testing:googletest",
     ],
 )
 
@@ -225,7 +228,6 @@
         "//aos/libc:aos_strsignal",
         "//aos/testing:googletest",
         "//aos/testing:prevent_exit",
-        "//aos/testing:test_logging",
     ],
 )
 
diff --git a/aos/ipc_lib/aos_sync.cc b/aos/ipc_lib/aos_sync.cc
index 3bdba21..efb1fb1 100644
--- a/aos/ipc_lib/aos_sync.cc
+++ b/aos/ipc_lib/aos_sync.cc
@@ -26,10 +26,10 @@
 #include <algorithm>
 #include <type_traits>
 
-#include "aos/logging/logging.h"
+#include "absl/base/call_once.h"
 #include "aos/macros.h"
 #include "aos/util/compiler_memory_barrier.h"
-#include "absl/base/call_once.h"
+#include "glog/logging.h"
 
 using ::aos::linux_code::ipc_lib::FutexAccessorObserver;
 
@@ -354,16 +354,16 @@
   return r;
 }
 
-// This gets called by functions before AOS_LOG(FATAL)ing with error messages
+// This gets called by functions before LOG(FATAL)ing with error messages
 // that would be incorrect if the error was caused by a process forking without
 // initialize_in_new_thread getting called in the fork.
 void check_cached_tid(pid_t tid) {
   pid_t actual = do_get_tid();
   if (tid != actual) {
-    AOS_LOG(FATAL,
-            "task %jd forked into %jd without letting aos_sync know"
-            " so we're not really sure what's going on\n",
-            static_cast<intmax_t>(tid), static_cast<intmax_t>(actual));
+    LOG(FATAL) << "task " << static_cast<intmax_t>(tid) << " forked into "
+               << static_cast<intmax_t>(actual)
+               << " without letting aos_sync know so we're not really sure "
+                  "what's going on";
   }
 }
 
@@ -378,9 +378,9 @@
 }
 
 void InstallAtforkHook() {
-  if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
-    AOS_PLOG(FATAL, "pthread_atfork(NULL, NULL, %p) failed", atfork_child);
-  }
+  PCHECK(pthread_atfork(NULL, NULL, &atfork_child) == 0)
+      << ": pthread_atfork(NULL, NULL, "
+      << reinterpret_cast<void *>(&atfork_child) << ") failed";
 }
 
 // This gets called to set everything up in a new thread by get_tid().
@@ -472,11 +472,10 @@
   robust_head.futex_offset = static_cast<ssize_t>(offsetof(aos_mutex, futex)) -
                              static_cast<ssize_t>(offsetof(aos_mutex, next));
   robust_head.pending_next = 0;
-  if (syscall(SYS_set_robust_list, robust_head_next_value(), sizeof(robust_head)) !=
-      0) {
-    AOS_PLOG(FATAL, "set_robust_list(%p, %zd) failed",
-             reinterpret_cast<void *>(robust_head.next), sizeof(robust_head));
-  }
+  PCHECK(syscall(SYS_set_robust_list, robust_head_next_value(),
+                 sizeof(robust_head)) == 0)
+      << ": set_robust_list(" << reinterpret_cast<void *>(robust_head.next)
+      << ", " << sizeof(robust_head) << ") failed";
   if (kRobustListDebug) {
     printf("%" PRId32 ": init done\n", get_tid());
   }
@@ -675,12 +674,12 @@
           }
         }
         my_robust_list::robust_head.pending_next = 0;
-        if (ret == -EDEADLK) {
-          AOS_LOG(FATAL, "multiple lock of %p by %" PRId32 "\n", m, tid);
-        }
-        AOS_PELOG(FATAL, -ret, "FUTEX_LOCK_PI(%p(=%" PRIu32 "), 1, %p) failed",
-                  &m->futex, __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST),
-                  timeout);
+        CHECK_NE(ret, -EDEADLK) << ": multiple lock of " << m << " by " << tid;
+
+        errno = -ret;
+        PLOG(FATAL) << "FUTEX_LOCK_PI(" << &m->futex
+                    << "(=" << __atomic_load_n(&m->futex, __ATOMIC_SEQ_CST)
+                    << "), 1, " << timeout << ") failed";
       } else {
         if (kLockDebug) {
           printf("%" PRId32 ": %p kernel lock done\n", tid, m);
@@ -746,8 +745,9 @@
         continue;
       }
       my_robust_list::robust_head.pending_next = 0;
-      AOS_PELOG(FATAL, -ret, "FUTEX_CMP_REQUEUE_PI(%p, 1, %d, %p, *%p) failed",
-                c, number_requeue, &m->futex, c);
+      errno = -ret;
+      PLOG(FATAL) << "FUTEX_CMP_REQUEUE_PI(" << c << ", 1, " << number_requeue
+                  << ", " << &m->futex << ", *" << c << ") failed";
     } else {
       return;
     }
@@ -778,11 +778,10 @@
     my_robust_list::robust_head.pending_next = 0;
     check_cached_tid(tid);
     if ((value & FUTEX_TID_MASK) == 0) {
-      AOS_LOG(FATAL, "multiple unlock of aos_mutex %p by %" PRId32 "\n", m,
-              tid);
+      LOG(FATAL) << "multiple unlock of aos_mutex " << m << " by " << tid;
     } else {
-      AOS_LOG(FATAL, "aos_mutex %p is locked by %" PRId32 ", not %" PRId32 "\n",
-              m, value & FUTEX_TID_MASK, tid);
+      LOG(FATAL) << "aos_mutex " << m << " is locked by "
+                 << (value & FUTEX_TID_MASK) << ", not " << tid;
     }
   }
 
@@ -795,7 +794,8 @@
     const int ret = sys_futex_unlock_pi(&m->futex);
     if (ret != 0) {
       my_robust_list::robust_head.pending_next = 0;
-      AOS_PELOG(FATAL, -ret, "FUTEX_UNLOCK_PI(%p) failed", &m->futex);
+      errno = -ret;
+      PLOG(FATAL) << "FUTEX_UNLOCK_PI(" << (&m->futex) << ") failed";
     }
   } else {
     // There aren't any waiters, so no need to call into the kernel.
@@ -832,8 +832,9 @@
           return 4;
         }
         my_robust_list::robust_head.pending_next = 0;
-        AOS_PELOG(FATAL, -ret, "FUTEX_TRYLOCK_PI(%p, 0, NULL) failed",
-                  &m->futex);
+        errno = -ret;
+        PLOG(FATAL) << "FUTEX_TRYLOCK_PI(" << (&m->futex)
+                    << ", 0, NULL) failed";
       }
     }
   }
@@ -898,9 +899,9 @@
         continue;
       }
       my_robust_list::robust_head.pending_next = 0;
-      AOS_PELOG(FATAL, -ret,
-                "FUTEX_WAIT_REQUEUE_PI(%p, %" PRIu32 ", %p) failed", c,
-                wait_start, &m->futex);
+      errno = -ret;
+      PLOG(FATAL) << "FUTEX_WAIT_REQUEUE_PI(" << c << ", " << wait_start << ", "
+                  << (&m->futex) << ") failed";
     } else {
       // Record that the kernel relocked it for us.
       lock_pthread_mutex(m);
diff --git a/aos/ipc_lib/index_test.cc b/aos/ipc_lib/index_test.cc
index 689ed24..2e9a37b 100644
--- a/aos/ipc_lib/index_test.cc
+++ b/aos/ipc_lib/index_test.cc
@@ -1,7 +1,7 @@
 #include "aos/ipc_lib/index.h"
 
-#include "aos/testing/test_logging.h"
 #include "gtest/gtest.h"
+#include "glog/logging.h"
 
 namespace aos {
 namespace ipc_lib {
@@ -10,7 +10,8 @@
 class QueueIndexTest : public ::testing::Test {
  protected:
   uint32_t GetIndex(const QueueIndex &index) {
-    printf("Index, count: %x, %x\n", index.index_, index.count_);
+    LOG(INFO) << "Index, count: " << std::hex << index.index_ << ", "
+              << index.count_;
     return index.index();
   }
 
diff --git a/aos/ipc_lib/ipc_comparison.cc b/aos/ipc_lib/ipc_comparison.cc
index 5ea1c8b..1553159 100644
--- a/aos/ipc_lib/ipc_comparison.cc
+++ b/aos/ipc_lib/ipc_comparison.cc
@@ -28,6 +28,7 @@
 #include "aos/logging/implementations.h"
 #include "aos/logging/logging.h"
 #include "aos/mutex/mutex.h"
+#include "aos/realtime.h"
 #include "aos/time/time.h"
 
 DEFINE_string(method, "", "Which IPC method to use");
diff --git a/aos/ipc_lib/lockless_queue.cc b/aos/ipc_lib/lockless_queue.cc
index cd0e2ae..c2b0254 100644
--- a/aos/ipc_lib/lockless_queue.cc
+++ b/aos/ipc_lib/lockless_queue.cc
@@ -9,9 +9,8 @@
 #include <iostream>
 #include <sstream>
 
-#include "aos/init.h"
 #include "aos/ipc_lib/lockless_queue_memory.h"
-#include "aos/logging/logging.h"
+#include "aos/realtime.h"
 #include "aos/util/compiler_memory_barrier.h"
 #include "glog/logging.h"
 
@@ -20,8 +19,6 @@
 
 namespace {
 
-constexpr bool kDebug = false;
-
 void GrabQueueSetupLockOrDie(LocklessQueueMemory *memory) {
   const int result = mutex_grab(&(memory->queue_setup_lock));
   CHECK(result == 0 || result == 1);
@@ -59,9 +56,7 @@
     const uint32_t tid =
         __atomic_load_n(&(sender->tid.futex), __ATOMIC_RELAXED);
     if (tid & FUTEX_OWNER_DIED) {
-      if (kDebug) {
-        printf("Found an easy death for sender %zu\n", i);
-      }
+      VLOG(3) << "Found an easy death for sender " << i;
       const Index to_replace = sender->to_replace.RelaxedLoad();
       const Index scratch_index = sender->scratch_index.Load();
 
@@ -119,9 +114,7 @@
     return;
   }
 
-  if (kDebug) {
-    printf("Starting hard cleanup\n");
-  }
+  VLOG(3) << "Starting hard cleanup";
 
   size_t num_accounted_for = 0;
   size_t num_missing = 0;
@@ -166,9 +159,8 @@
         // Candidate.
         CHECK_LE(to_replace.message_index(), accounted_for.size());
         if (accounted_for[to_replace.message_index()]) {
-          if (kDebug) {
-            printf("Sender %zu died, to_replace is already accounted for\n", i);
-          }
+          VLOG(3) << "Sender " << i
+                  << " died, to_replace is already accounted for";
           // If both are accounted for, we are corrupt...
           CHECK(!accounted_for[scratch_index.message_index()]);
 
@@ -185,9 +177,8 @@
           --num_missing;
           ++num_accounted_for;
         } else if (accounted_for[scratch_index.message_index()]) {
-          if (kDebug) {
-            printf("Sender %zu died, scratch_index is already accounted for\n", i);
-          }
+          VLOG(3) << "Sender " << i
+                  << " died, scratch_index is already accounted for";
           // scratch_index is accounted for.  That means we did the insert,
           // but didn't record it.
           CHECK(to_replace.valid());
@@ -204,9 +195,7 @@
           --num_missing;
           ++num_accounted_for;
         } else {
-          if (kDebug) {
-            printf("Sender %zu died, neither is accounted for\n", i);
-          }
+          VLOG(3) << "Sender " << i << " died, neither is accounted for";
           // Ambiguous.  There will be an unambiguous one somewhere that we
           // can do first.
         }
@@ -406,7 +395,8 @@
   // creating a pidfd is likely not RT.
   for (size_t i = 0; i < num_watchers; ++i) {
     Watcher *w = memory_->GetWatcher(i);
-    // Start by reading the tid.  This needs to be atomic to force it to come first.
+    // Start by reading the tid.  This needs to be atomic to force it to come
+    // first.
     watcher_copy_[i].tid = __atomic_load_n(&(w->tid.futex), __ATOMIC_SEQ_CST);
     watcher_copy_[i].pid = w->pid;
     watcher_copy_[i].priority = w->priority;
@@ -521,18 +511,32 @@
   return index;
 }
 
+size_t LocklessQueue::Sender::size() { return memory_->message_data_size(); }
+
+void *LocklessQueue::Sender::Data() {
+  ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
+  Index scratch_index = sender->scratch_index.RelaxedLoad();
+  Message *message = memory_->GetMessage(scratch_index);
+  message->header.queue_index.Invalidate();
+
+  return &message->data[0];
+}
+
 void LocklessQueue::Sender::Send(const char *data, size_t length) {
+  CHECK_LE(length, size());
+  memcpy(Data(), data, length);
+  Send(length);
+}
+
+void LocklessQueue::Sender::Send(size_t length) {
   const size_t queue_size = memory_->queue_size();
-  CHECK_LE(length, memory_->message_data_size());
+  CHECK_LE(length, size());
 
   ::aos::ipc_lib::Sender *sender = memory_->GetSender(sender_index_);
   Index scratch_index = sender->scratch_index.RelaxedLoad();
   Message *message = memory_->GetMessage(scratch_index);
 
-  message->header.queue_index.Invalidate();
-
   message->header.length = length;
-  memcpy(&message->data[0], data, length);
 
   while (true) {
     const QueueIndex actual_next_queue_index =
@@ -564,10 +568,8 @@
       memory_->next_queue_index.CompareAndExchangeStrong(
           actual_next_queue_index, incremented_queue_index);
 
-      if (kDebug) {
-        printf("We were beat.  Try again.  Was %x, is %x\n", to_replace.get(),
-               decremented_queue_index.index());
-      }
+      VLOG(3) << "We were beat.  Try again.  Was " << std::hex
+              << to_replace.get() << ", is " << decremented_queue_index.index();
       continue;
     }
 
@@ -581,12 +583,10 @@
               ->header.queue_index.RelaxedLoad(queue_size);
       if (previous_index != decremented_queue_index && previous_index.valid()) {
         // Retry.
-        if (kDebug) {
-          printf(
-              "Something fishy happened, queue index doesn't match.  Retrying. "
-              " Previous index was %x, should be %x\n",
-              previous_index.index(), decremented_queue_index.index());
-        }
+        VLOG(3) << "Something fishy happened, queue index doesn't match.  "
+                   "Retrying.  Previous index was "
+                << std::hex << previous_index.index() << ", should be "
+                << decremented_queue_index.index();
         continue;
       }
     }
@@ -597,8 +597,7 @@
     // Before we are fully done filling out the message, update the Sender state
     // with the new index to write.  This re-uses the barrier for the
     // queue_index store.
-    const Index index_to_write(next_queue_index,
-                               scratch_index.message_index());
+    const Index index_to_write(next_queue_index, scratch_index.message_index());
 
     sender->scratch_index.RelaxedStore(index_to_write);
 
@@ -616,9 +615,7 @@
              ->CompareAndExchangeStrong(to_replace, index_to_write)) {
       // Aw, didn't succeed.  Retry.
       sender->to_replace.RelaxedInvalidate();
-      if (kDebug) {
-        printf("Failed to wrap into queue\n");
-      }
+      VLOG(3) << "Failed to wrap into queue";
       continue;
     }
 
@@ -661,10 +658,8 @@
     if (starting_queue_index != queue_index) {
       // If we found a message that is exactly 1 loop old, we just wrapped.
       if (starting_queue_index == queue_index.DecrementBy(queue_size)) {
-        if (kDebug) {
-          printf("Matches: %x, %x\n", starting_queue_index.index(),
-                 queue_index.DecrementBy(queue_size).index());
-        }
+        VLOG(3) << "Matches: " << std::hex << starting_queue_index.index()
+                << ", " << queue_index.DecrementBy(queue_size).index();
         return ReadResult::NOTHING_NEW;
       } else {
         // Someone has re-used this message between when we pulled it out of the
@@ -673,9 +668,7 @@
         Message *new_m = memory_->GetMessage(queue_index);
         if (m != new_m) {
           m = new_m;
-          if (kDebug) {
-            printf("Retrying, m doesn't match\n");
-          }
+          VLOG(3) << "Retrying, m doesn't match";
           continue;
         }
 
@@ -686,17 +679,14 @@
         // Either we got too far behind (signaled by this being a valid
         // message), or this is one of the initial messages which are invalid.
         if (starting_queue_index.valid()) {
-          if (kDebug) {
-            printf("Too old.  Tried for %x, got %x, behind by %d\n",
-                   queue_index.index(), starting_queue_index.index(),
-                   starting_queue_index.index() - queue_index.index());
-          }
+          VLOG(3) << "Too old.  Tried for " << std::hex << queue_index.index()
+                  << ", got " << starting_queue_index.index() << ", behind by "
+                  << std::dec
+                  << (starting_queue_index.index() - queue_index.index());
           return ReadResult::TOO_OLD;
         }
 
-        if (kDebug) {
-          printf("Initial\n");
-        }
+        VLOG(3) << "Initial";
 
         // There isn't a valid message at this location.
         //
@@ -705,28 +695,24 @@
         // asking for something crazy, like something before the beginning of
         // the queue.  Tell them that they are behind.
         if (uint32_queue_index < memory_->queue_size()) {
-          if (kDebug) {
-            printf("Near zero, %x\n", uint32_queue_index);
-          }
+          VLOG(3) << "Near zero, " << std::hex << uint32_queue_index;
           return ReadResult::NOTHING_NEW;
         } else {
-          if (kDebug) {
-            printf("not near zero, %x\n", uint32_queue_index);
-          }
+          VLOG(3) << "not near zero, " << std::hex << uint32_queue_index;
           return ReadResult::TOO_OLD;
         }
       }
     }
-    if (kDebug) {
-      printf("Eq: %x, %x\n", starting_queue_index.index(), queue_index.index());
-    }
+    VLOG(3) << "Eq: " << std::hex << starting_queue_index.index() << ", "
+            << queue_index.index();
     break;
   }
 
-  // Then read the data out.
+  // Then read the data out.  Copy the full message buffer so the copy is
+  // deterministic and the length can be measured from either end.
   *monotonic_sent_time = m->header.monotonic_sent_time;
   *realtime_sent_time = m->header.realtime_sent_time;
-  memcpy(data, &m->data[0], m->header.length);
+  memcpy(data, &m->data[0], message_data_size());
   *length = m->header.length;
 
   // And finally, confirm that the message *still* points to the queue index we
@@ -735,20 +721,22 @@
   // it's lifetime.
   const QueueIndex final_queue_index = m->header.queue_index.Load(queue_size);
   if (final_queue_index != queue_index) {
-    if (kDebug) {
-      printf(
-          "Changed out from under us.  Reading %x, finished with %x, delta: "
-          "%d\n",
-          queue_index.index(), final_queue_index.index(),
-          final_queue_index.index() - queue_index.index());
-    }
-    return ReadResult::TOO_OLD;
+    VLOG(3) << "Changed out from under us.  Reading " << std::hex
+            << queue_index.index() << ", finished with "
+            << final_queue_index.index() << ", delta: " << std::dec
+            << (final_queue_index.index() - queue_index.index());
+    return ReadResult::OVERWROTE;
   }
 
   return ReadResult::GOOD;
 }
 
-uint32_t LocklessQueue::LatestQueueIndex() {
+size_t LocklessQueue::queue_size() const { return memory_->queue_size(); }
+size_t LocklessQueue::message_data_size() const {
+  return memory_->message_data_size();
+}
+
+QueueIndex LocklessQueue::LatestQueueIndex() {
   const size_t queue_size = memory_->queue_size();
 
   // There is only one interesting case.  We need to know if the queue is empty.
@@ -757,7 +745,7 @@
       memory_->next_queue_index.Load(queue_size);
   if (next_queue_index.valid()) {
     const QueueIndex current_queue_index = next_queue_index.DecrementBy(1u);
-    return current_queue_index.index();
+    return current_queue_index;
   } else {
     return empty_queue_index();
   }
@@ -845,7 +833,8 @@
   }
   ::std::cout << "  }" << ::std::endl;
 
-  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {" << ::std::endl;
+  ::std::cout << "  Sender senders[" << memory->num_senders() << "] {"
+              << ::std::endl;
   for (size_t i = 0; i < memory->num_senders(); ++i) {
     Sender *s = memory->GetSender(i);
     ::std::cout << "    [" << i << "] -> Sender {" << ::std::endl;
diff --git a/aos/ipc_lib/lockless_queue.h b/aos/ipc_lib/lockless_queue.h
index dafc157..fcc5d79 100644
--- a/aos/ipc_lib/lockless_queue.h
+++ b/aos/ipc_lib/lockless_queue.h
@@ -106,7 +106,7 @@
 // Prints to stdout the data inside the queue for debugging.
 void PrintLocklessQueueMemory(LocklessQueueMemory *memory);
 
-const static int kWakeupSignal = SIGRTMIN + 2;
+const static unsigned int kWakeupSignal = SIGRTMIN + 2;
 
 // Class to manage sending and receiving data in the lockless queue.  This is
 // separate from the actual memory backing the queue so that memory can be
@@ -122,6 +122,8 @@
   // Returns the number of messages in the queue.
   size_t QueueSize() const;
 
+  size_t message_data_size() const;
+
   // Registers this thread to receive the kWakeupSignal signal when Wakeup is
   // called. Returns false if there was an error in registration.
   bool RegisterWakeup(int priority);
@@ -137,8 +139,9 @@
   // If you ask for a queue index 2 past the newest, you will still get
   // NOTHING_NEW until that gets overwritten with new data.  If you ask for an
   // element newer than QueueSize() from the current message, we consider it
-  // behind by a large amount and return TOO_OLD.
-  enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW };
+  // behind by a large amount and return TOO_OLD.  If the message is modified
+  // out from underneath us as we read it, return OVERWROTE.
+  enum class ReadResult { TOO_OLD, GOOD, NOTHING_NEW, OVERWROTE };
   ReadResult Read(uint32_t queue_index,
                   ::aos::monotonic_clock::time_point *monotonic_sent_time,
                   ::aos::realtime_clock::time_point *realtime_sent_time,
@@ -147,8 +150,12 @@
   // Returns the index to the latest queue message.  Returns empty_queue_index()
   // if there are no messages in the queue.  Do note that this index wraps if
   // more than 2^32 messages are sent.
-  uint32_t LatestQueueIndex();
-  static constexpr uint32_t empty_queue_index() { return 0xffffffff; }
+  QueueIndex LatestQueueIndex();
+  static QueueIndex empty_queue_index() { return QueueIndex::Invalid(); }
+
+  // Returns the size of the queue.  This is mostly useful for manipulating
+  // QueueIndex.
+  size_t queue_size() const;
 
   // TODO(austin): Return the oldest queue index.  This lets us catch up nicely
   // if we got behind.
@@ -181,6 +188,14 @@
 
     ~Sender();
 
+    // Sends a message without an extra copy of the data.
+    // Write at most size() bytes directly into the memory returned by Data(),
+    // and then call Send(length) with the number of bytes written.
+    // Note: Data() is expensive enough that you should cache its result.
+    size_t size();
+    void *Data();
+    void Send(size_t length);
+
     // Sends up to length data.  Does not wakeup the target.
     void Send(const char *data, size_t length);
 
diff --git a/aos/ipc_lib/lockless_queue_death_test.cc b/aos/ipc_lib/lockless_queue_death_test.cc
index 7a68273..4f1d7e4 100644
--- a/aos/ipc_lib/lockless_queue_death_test.cc
+++ b/aos/ipc_lib/lockless_queue_death_test.cc
@@ -11,10 +11,10 @@
 #include <memory>
 #include <thread>
 
-#include "aos/init.h"
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/lockless_queue_memory.h"
 #include "aos/libc/aos_strsignal.h"
+#include "aos/realtime.h"
 #include "aos/testing/prevent_exit.h"
 #include "aos/testing/test_logging.h"
 #include "gflags/gflags.h"
@@ -507,14 +507,14 @@
 
   TestShmRobustness(
       config,
-      [this, config, tid](void *memory) {
+      [config, tid](void *memory) {
         // Initialize the queue and grab the tid.
         LocklessQueue queue(
             reinterpret_cast<aos::ipc_lib::LocklessQueueMemory *>(memory),
             config);
         *tid = gettid();
       },
-      [this, config](void *memory) {
+      [config](void *memory) {
         // Now try to write 2 messages.  We will get killed a bunch as this
         // tries to happen.
         LocklessQueue queue(
@@ -527,7 +527,7 @@
           sender.Send(data, s + 1);
         }
       },
-      [this, config, tid](void *raw_memory) {
+      [config, tid](void *raw_memory) {
         // Confirm that we can create 2 senders (the number in the queue), and
         // send a message.  And that all the messages in the queue are valid.
         ::aos::ipc_lib::LocklessQueueMemory *memory =
diff --git a/aos/ipc_lib/lockless_queue_test.cc b/aos/ipc_lib/lockless_queue_test.cc
index f3b49b6..b9cb54d 100644
--- a/aos/ipc_lib/lockless_queue_test.cc
+++ b/aos/ipc_lib/lockless_queue_test.cc
@@ -11,11 +11,10 @@
 
 #include "aos/event.h"
 #include "aos/events/epoll.h"
-#include "aos/init.h"
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/ipc_lib/queue_racer.h"
 #include "aos/ipc_lib/signalfd.h"
-#include "aos/testing/test_logging.h"
+#include "aos/realtime.h"
 #include "gflags/gflags.h"
 #include "gtest/gtest.h"
 
@@ -43,7 +42,6 @@
 class LocklessQueueTest : public ::testing::Test {
  public:
   LocklessQueueTest() {
-    ::aos::testing::EnableTestLogging();
     config_.num_watchers = 10;
     config_.num_senders = 100;
     config_.queue_size = 10000;
@@ -247,8 +245,8 @@
   // Send enough messages to wrap.
   for (int i = 0; i < 20000; ++i) {
     // Confirm that the queue index makes sense given the number of sends.
-    EXPECT_EQ(queue.LatestQueueIndex(),
-              i == 0 ? LocklessQueue::empty_queue_index() : i - 1);
+    EXPECT_EQ(queue.LatestQueueIndex().index(),
+              i == 0 ? LocklessQueue::empty_queue_index().index() : i - 1);
 
     // Send a trivial piece of data.
     char data[100];
@@ -257,7 +255,7 @@
 
     // Confirm that the queue index still makes sense.  This is easier since the
     // empty case has been handled.
-    EXPECT_EQ(queue.LatestQueueIndex(), i);
+    EXPECT_EQ(queue.LatestQueueIndex().index(), i);
 
     // Read a result from 5 in the past.
     ::aos::monotonic_clock::time_point monotonic_sent_time;
diff --git a/aos/ipc_lib/queue.cc b/aos/ipc_lib/queue.cc
index 5b6a0fa..3b6e5a1 100644
--- a/aos/ipc_lib/queue.cc
+++ b/aos/ipc_lib/queue.cc
@@ -172,7 +172,7 @@
   MessageHeader *header = __atomic_load_n(&free_messages_, __ATOMIC_RELAXED);
   do {
     if (__builtin_expect(header == nullptr, 0)) {
-      AOS_LOG(FATAL, "overused pool of queue %p (%s)\n", this, name_);
+      LOG(FATAL) << "overused pool of queue " << this << " (" << name_ << ")";
     }
   } while (__builtin_expect(
       !__atomic_compare_exchange_n(&free_messages_, &header, header->next, true,
@@ -195,10 +195,8 @@
   static_assert((sizeof(RawQueue::MessageHeader) % 8) == 0,
                 "need to revalidate size/alignent assumptions");
 
-  if (queue_length < 1) {
-    AOS_LOG(FATAL, "queue length %d of %s needs to be at least 1\n",
-            queue_length, name);
-  }
+  CHECK_GE(queue_length, 1) << ": queue length " << queue_length << " of "
+                            << name << " needs to be at least 1";
 
   const size_t name_size = strlen(name) + 1;
   char *temp = static_cast<char *>(shm_malloc(name_size));
@@ -249,8 +247,8 @@
     printf("fetching queue %s\n", name);
   }
   if (mutex_lock(&global_core->mem_struct->queues.lock) != 0) {
-    AOS_LOG(FATAL, "mutex_lock(%p) failed\n",
-            &global_core->mem_struct->queues.lock);
+    LOG(FATAL) << "mutex_lock(" << &global_core->mem_struct->queues.lock
+               << ") failed";
   }
   RawQueue *current =
       static_cast<RawQueue *>(global_core->mem_struct->queues.pointer);
@@ -302,14 +300,14 @@
 
 bool RawQueue::DoWriteMessage(void *msg, Options<RawQueue> options) {
   if (kWriteDebug) {
-    printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options.printable());
+    printf("queue: %p->WriteMessage(%p, %x), len :%zu\n", this, msg, options.printable(), msg_length_);
   }
 
   bool signal_readable;
 
   {
     IPCMutexLocker locker(&data_lock_);
-    AOS_CHECK(!locker.owner_died());
+    CHECK(!locker.owner_died());
 
     int new_end;
     while (true) {
@@ -334,7 +332,7 @@
         if (kWriteDebug) {
           printf("queue: going to wait for writable_ of %p\n", this);
         }
-        AOS_CHECK(!writable_.Wait());
+        CHECK(!writable_.Wait());
       }
     }
     data_[data_end_] = msg;
@@ -390,7 +388,7 @@
         if (wait_result == Condition::WaitResult::kOk) {
           break;
         }
-        AOS_CHECK(wait_result != Condition::WaitResult::kOwnerDied);
+        CHECK(wait_result != Condition::WaitResult::kOwnerDied);
         if (wait_result == Condition::WaitResult::kTimeout) {
           return false;
         }
@@ -427,7 +425,7 @@
   void *msg = NULL;
 
   IPCMutexLocker locker(&data_lock_);
-  AOS_CHECK(!locker.owner_died());
+  CHECK(!locker.owner_died());
 
   if (!ReadCommonStart(options, nullptr, chrono::nanoseconds(0))) {
     if (kReadDebug) {
@@ -490,7 +488,7 @@
   void *msg = NULL;
 
   IPCMutexLocker locker(&data_lock_);
-  AOS_CHECK(!locker.owner_died());
+  CHECK(!locker.owner_died());
 
   if (!ReadCommonStart(options, index, timeout)) {
     if (kReadDebug) {
diff --git a/aos/ipc_lib/queue.h b/aos/ipc_lib/queue.h
index 5b68f2e..6e07993 100644
--- a/aos/ipc_lib/queue.h
+++ b/aos/ipc_lib/queue.h
@@ -5,7 +5,7 @@
 #include "aos/mutex/mutex.h"
 #include "aos/condition.h"
 #include "aos/util/options.h"
-#include "aos/logging/logging.h"
+#include "glog/logging.h"
 
 // TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
 // code to make checking for leaks work better
@@ -90,10 +90,11 @@
     static constexpr Options<RawQueue> kWriteFailureOptions =
         kNonBlock | kBlock | kOverride;
     if (!options.NoOthersSet(kWriteFailureOptions)) {
-      AOS_LOG(FATAL, "illegal write options in %x\n", options.printable());
+      LOG(FATAL) << "illegal write options in " << std::hex
+                 << options.printable();
     }
     if (!options.ExactlyOneSet(kWriteFailureOptions)) {
-      AOS_LOG(FATAL, "invalid write options %x\n", options.printable());
+      LOG(FATAL) << "invalid write options " << std::hex << options.printable();
     }
     return DoWriteMessage(msg, options);
   }
@@ -123,7 +124,7 @@
     CheckReadOptions(options);
     static constexpr Options<RawQueue> kFromEndAndPeek = kFromEnd | kPeek;
     if (options.AllSet(kFromEndAndPeek)) {
-      AOS_LOG(FATAL, "ReadMessageIndex(kFromEnd | kPeek) is not allowed\n");
+      LOG(FATAL) << "ReadMessageIndex(kFromEnd | kPeek) is not allowed";
     }
     return DoReadMessageIndex(options, index, timeout);
   }
@@ -161,11 +162,12 @@
     static constexpr Options<RawQueue> kValidOptions =
         kPeek | kFromEnd | kNonBlock | kBlock;
     if (!options.NoOthersSet(kValidOptions)) {
-      AOS_LOG(FATAL, "illegal read options in %x\n", options.printable());
+      LOG(FATAL) << "illegal read options in " << std::hex
+                 << options.printable();
     }
     static constexpr Options<RawQueue> kBlockChoices = kNonBlock | kBlock;
     if (!options.ExactlyOneSet(kBlockChoices)) {
-      AOS_LOG(FATAL, "invalid read options %x\n", options.printable());
+      LOG(FATAL) << "invalid read options " << std::hex << options.printable();
     }
   }
 
diff --git a/aos/ipc_lib/queue_racer.cc b/aos/ipc_lib/queue_racer.cc
index 53490a0..bb754d8 100644
--- a/aos/ipc_lib/queue_racer.cc
+++ b/aos/ipc_lib/queue_racer.cc
@@ -82,12 +82,15 @@
       //
       // So, grab them in order.
       const uint64_t finished_writes = finished_writes_.load();
-      const uint32_t latest_queue_index_uint32_t = queue.LatestQueueIndex();
+      const QueueIndex latest_queue_index_queue_index =
+          queue.LatestQueueIndex();
       const uint64_t started_writes = started_writes_.load();
 
+      const uint32_t latest_queue_index_uint32_t =
+          latest_queue_index_queue_index.index();
       uint64_t latest_queue_index = latest_queue_index_uint32_t;
 
-      if (latest_queue_index_uint32_t != LocklessQueue::empty_queue_index()) {
+      if (latest_queue_index_queue_index != LocklessQueue::empty_queue_index()) {
         // If we got smaller, we wrapped.
         if (latest_queue_index_uint32_t < last_queue_index) {
           ++wrap_count;
@@ -104,19 +107,19 @@
 
       // If we are at the beginning, the queue needs to always return empty.
       if (started_writes == 0) {
-        EXPECT_EQ(latest_queue_index_uint32_t,
+        EXPECT_EQ(latest_queue_index_queue_index,
                   LocklessQueue::empty_queue_index());
         EXPECT_EQ(finished_writes, 0);
       } else {
         if (finished_writes == 0) {
           // Plausible to be at the beginning.
-          if (latest_queue_index_uint32_t !=
+          if (latest_queue_index_queue_index !=
               LocklessQueue::empty_queue_index()) {
             // Otherwise, we have started.  The queue is always allowed to
             EXPECT_GE(started_writes, latest_queue_index + 1);
           }
         } else {
-          EXPECT_NE(latest_queue_index_uint32_t,
+          EXPECT_NE(latest_queue_index_queue_index,
                     LocklessQueue::empty_queue_index());
           // latest_queue_index is an index, not a count.  So it always reads 1
           // low.
diff --git a/aos/ipc_lib/queue_racer.h b/aos/ipc_lib/queue_racer.h
index 59435b1..eaeedd4 100644
--- a/aos/ipc_lib/queue_racer.h
+++ b/aos/ipc_lib/queue_racer.h
@@ -36,7 +36,7 @@
 
   size_t CurrentIndex() {
     LocklessQueue queue(memory_, config_);
-    return queue.LatestQueueIndex();
+    return queue.LatestQueueIndex().index();
   }
 
  private:
diff --git a/aos/ipc_lib/raw_queue_test.cc b/aos/ipc_lib/raw_queue_test.cc
index aa3b335..3048e1b 100644
--- a/aos/ipc_lib/raw_queue_test.cc
+++ b/aos/ipc_lib/raw_queue_test.cc
@@ -191,7 +191,8 @@
       case 0:  // child
         if (kForkSleep != chrono::milliseconds(0)) {
           AOS_LOG(INFO, "pid %jd sleeping for %" PRId64 "ns\n",
-                  static_cast<intmax_t>(getpid()), kForkSleep.count());
+                  static_cast<intmax_t>(getpid()),
+                  static_cast<int64_t>(kForkSleep.count()));
           this_thread::sleep_for(kForkSleep);
         }
         ::aos::testing::PreventExit();
diff --git a/aos/ipc_lib/shared_mem.c b/aos/ipc_lib/shared_mem.cc
similarity index 71%
rename from aos/ipc_lib/shared_mem.c
rename to aos/ipc_lib/shared_mem.cc
index da55ebc..93ced7d 100644
--- a/aos/ipc_lib/shared_mem.c
+++ b/aos/ipc_lib/shared_mem.cc
@@ -10,9 +10,9 @@
 #include <stdlib.h>
 #include <assert.h>
 
-#include "aos/ipc_lib/core_lib.h"
-#include "aos/logging/logging.h"
 #include "aos/ipc_lib/aos_sync.h"
+#include "aos/ipc_lib/core_lib.h"
+#include "glog/logging.h"
 
 // the path for the shared memory segment. see shm_open(3) for restrictions
 #define AOS_SHM_NAME "/aos_shared_mem"
@@ -61,7 +61,7 @@
       if (shm == -1 && errno == EEXIST) {
         printf("shared_mem: going to shm_unlink(%s)\n", global_core->shm_name);
         if (shm_unlink(global_core->shm_name) == -1) {
-          AOS_PLOG(WARNING, "shm_unlink(%s) failed", global_core->shm_name);
+          PLOG(WARNING) << "shm_unlink(" << global_core->shm_name << ") failed";
           break;
         }
       } else {
@@ -73,22 +73,22 @@
     global_core->owner = 0;
   }
   if (shm == -1) {
-    AOS_PLOG(FATAL, "shm_open(%s, O_RDWR [| O_CREAT | O_EXCL, 0|0666) failed",
-             global_core->shm_name);
+    PLOG(FATAL) << "shm_open(" << global_core->shm_name
+                << ", O_RDWR [| O_CREAT | O_EXCL, 0|0666) failed";
   }
   if (global_core->owner) {
-    if (ftruncate(shm, SIZEOFSHMSEG) == -1) {
-      AOS_PLOG(FATAL, "fruncate(%d, 0x%zx) failed", shm, (size_t)SIZEOFSHMSEG);
-    }
+    PCHECK(ftruncate(shm, SIZEOFSHMSEG) == 0)
+        << ": ftruncate(" << shm << ", 0x" << std::hex << (size_t)SIZEOFSHMSEG
+        << ") failed";
   }
   int flags = MAP_SHARED | MAP_FIXED;
   if (lock) flags |= MAP_LOCKED | MAP_POPULATE;
   void *shm_address = mmap((void *)SHM_START, SIZEOFSHMSEG,
                            PROT_READ | PROT_WRITE, flags, shm, 0);
-  if (shm_address == MAP_FAILED) {
-    AOS_PLOG(FATAL, "shared_mem: mmap(%p, 0x%zx, stuff, %x, %d, 0) failed",
-             (void *)SHM_START, (size_t)SIZEOFSHMSEG, flags, shm);
-  }
+  PCHECK(shm_address != MAP_FAILED)
+      << "shared_mem: mmap(" << (void *)SHM_START << ", 0x" << std::hex
+      << (size_t)SIZEOFSHMSEG << ", stuff, " << flags << ", " << std::dec
+      << shm << ", 0) failed";
   if (create) {
     printf("shared_mem: creating %s, shm at: %p\n", global_core->shm_name,
            shm_address);
@@ -96,19 +96,18 @@
     printf("shared_mem: not creating, shm at: %p\n", shm_address);
   }
   if (close(shm) == -1) {
-    AOS_PLOG(WARNING, "close(%d(=shm) failed", shm);
+    PLOG(WARNING) << "close(" << shm << "(=shm) failed";
   }
-  if (shm_address != (void *)SHM_START) {
-    AOS_LOG(FATAL, "shm isn't at hard-coded %p. at %p instead\n",
-            (void *)SHM_START, shm_address);
-  }
+  CHECK(shm_address == (void *)SHM_START)
+      << "shm isn't at hard-coded " << (void *)SHM_START << ". at "
+      << shm_address << " instead";
   aos_core_use_address_as_shared_mem(shm_address, SIZEOFSHMSEG);
-  AOS_LOG(INFO, "shared_mem: end of create_shared_mem owner=%d\n",
-          global_core->owner);
+  LOG(INFO) << "shared_mem: end of create_shared_mem owner="
+            << global_core->owner;
 }
 
 void aos_core_use_address_as_shared_mem(void *address, size_t size) {
-  global_core->mem_struct = address;
+  global_core->mem_struct = reinterpret_cast<aos_shm_core_t *>(address);
   global_core->size = size;
   global_core->shared_mem =
       (uint8_t *)address + sizeof(*global_core->mem_struct);
@@ -118,22 +117,19 @@
     futex_set(&global_core->mem_struct->creation_condition);
   } else {
     if (futex_wait(&global_core->mem_struct->creation_condition) != 0) {
-      AOS_LOG(FATAL, "waiting on creation_condition failed\n");
+      LOG(FATAL) << "waiting on creation_condition failed";
     }
   }
 }
 
 void aos_core_free_shared_mem() {
   void *shm_address = global_core->shared_mem;
-  if (munmap((void *)SHM_START, SIZEOFSHMSEG) == -1) {
-    AOS_PLOG(FATAL, "munmap(%p, 0x%zx) failed", shm_address,
-             (size_t)SIZEOFSHMSEG);
-  }
+  PCHECK(munmap((void *)SHM_START, SIZEOFSHMSEG) != -1)
+      << ": munmap(" << shm_address << ", 0x" << std::hex
+      << (size_t)SIZEOFSHMSEG << ") failed";
   if (global_core->owner) {
-    if (shm_unlink(global_core->shm_name)) {
-      AOS_PLOG(FATAL, "shared_mem: shm_unlink(%s) failed",
-               global_core->shm_name);
-    }
+    PCHECK(shm_unlink(global_core->shm_name) == 0)
+        << ": shared_mem: shm_unlink(" << global_core->shm_name << ") failed";
   }
 }
 
diff --git a/aos/ipc_lib/signalfd.cc b/aos/ipc_lib/signalfd.cc
index 045444b..af95598 100644
--- a/aos/ipc_lib/signalfd.cc
+++ b/aos/ipc_lib/signalfd.cc
@@ -5,12 +5,12 @@
 #include <unistd.h>
 #include <initializer_list>
 
-#include "aos/logging/logging.h"
+#include "glog/logging.h"
 
 namespace aos {
 namespace ipc_lib {
 
-SignalFd::SignalFd(::std::initializer_list<int> signals) {
+SignalFd::SignalFd(::std::initializer_list<unsigned int> signals) {
   // Build up the mask with the provided signals.
   sigemptyset(&mask_);
   for (int signal : signals) {
@@ -18,7 +18,7 @@
   }
   // Then build a signalfd.  Make it nonblocking so it works well with an epoll
   // loop, and have it close on exec.
-  AOS_PCHECK(fd_ = signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC));
+  PCHECK((fd_ = signalfd(-1, &mask_, SFD_NONBLOCK | SFD_CLOEXEC)) != -1);
   // Now that we have a consumer of the signal, block the signals so the
   // signalfd gets them.
   pthread_sigmask(SIG_BLOCK, &mask_, nullptr);
@@ -27,7 +27,7 @@
 SignalFd::~SignalFd() {
   // Unwind the constructor.  Unblock the signals and close the fd.
   pthread_sigmask(SIG_UNBLOCK, &mask_, nullptr);
-  AOS_PCHECK(close(fd_));
+  PCHECK(close(fd_) == 0);
 }
 
 signalfd_siginfo SignalFd::Read() {
diff --git a/aos/ipc_lib/signalfd.h b/aos/ipc_lib/signalfd.h
index a545a80..7d2021a 100644
--- a/aos/ipc_lib/signalfd.h
+++ b/aos/ipc_lib/signalfd.h
@@ -13,7 +13,7 @@
  public:
   // Constructs a SignalFd for the provided list of signals.
   // Blocks the signals at the same time in this thread.
-  SignalFd(::std::initializer_list<int> signals);
+  SignalFd(::std::initializer_list<unsigned int> signals);
 
   SignalFd(const SignalFd &) = delete;
   SignalFd &operator=(const SignalFd &) = delete;