Merge in the rewritten queue code and the cleaned-up Linux IPC code

This change has been around for over a year now, and it's time to merge
it in, because it makes that code much easier to deal with.
diff --git a/aos/linux_code/camera/Buffers.cpp b/aos/linux_code/camera/Buffers.cpp
index e1d22b6..19c1d45 100644
--- a/aos/linux_code/camera/Buffers.cpp
+++ b/aos/linux_code/camera/Buffers.cpp
@@ -61,24 +61,21 @@
 }
 
 void Buffers::Release() {
-  if (message_ != NULL) {
-    queue_->FreeMessage(message_);
-    message_ = NULL;
-  }
+  message_.reset();
 }
-const void *Buffers::GetNext(bool block,
-                       uint32_t *bytesused, timeval *timestamp, uint32_t *sequence) {
+const void *Buffers::GetNext(bool block, uint32_t *bytesused,
+                             timeval *timestamp, uint32_t *sequence) {
   Release();
 
   // TODO(brians) make sure the camera reader process hasn't died
   do {
     if (block) {
-      message_ = static_cast<const Message *>(queue_->ReadMessage(
-              RawQueue::kPeek | RawQueue::kBlock));
+      message_.reset(static_cast<const Message *>(
+          queue_->ReadMessage(RawQueue::kPeek | RawQueue::kBlock)));
     } else {
       static int index = 0;
-      message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
-              RawQueue::kBlock, &index));
+      message_.reset(static_cast<const Message *>(
+          queue_->ReadMessageIndex(RawQueue::kBlock, &index)));
     }
   } while (block && message_ == NULL);
   if (message_ != NULL) {
@@ -132,9 +129,12 @@
   
   return myfds[0];
 }
-Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
+Buffers::Buffers()
+    : server_(CreateSocket(connect)),
+      fd_(FetchFD()),
+      queue_(RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1)),
+      message_(queue_) {
   MMap();
-  queue_ = RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1);
 }
 
 Buffers::~Buffers() {
diff --git a/aos/linux_code/camera/Buffers.h b/aos/linux_code/camera/Buffers.h
index b447468..aedd79f 100644
--- a/aos/linux_code/camera/Buffers.h
+++ b/aos/linux_code/camera/Buffers.h
@@ -8,6 +8,7 @@
 
 #include "aos/linux_code/ipc_lib/queue.h"
 #include "aos/common/type_traits.h"
+#include "aos/atom_code/ipc_lib/unique_message_ptr.h"
 
 namespace aos {
 namespace camera {
@@ -17,6 +18,7 @@
   // It has to do a lot of the same things as all the other ones, but it gets
   // the information from different places (some of it gets sent out by it).
   friend class Reader;
+
   // Not an abstract name so that an existing one can just be unlinked without
   // disturbing it if necessary (like with shm_link).
   static const std::string kFDServerName;
@@ -50,14 +52,17 @@
     uint32_t sequence;
   };
   static_assert(shm_ok<Message>::value, "it's going through queues");
-  // The current one. Sometimes NULL.
-  const Message *message_;
-  static const std::string kQueueName;
+
   // NULL for the Reader one.
-  RawQueue *queue_;
+  RawQueue *const queue_;
+  // The current one. Sometimes NULL.
+  unique_message_ptr<const Message> message_;
+
+  static const std::string kQueueName;
   // Make the actual mmap calls.
   // Called by Buffers() automatically.
   void MMap();
+
  public:
   Buffers();
   // Will clean everything up.
@@ -89,4 +94,3 @@
 } // namespace aos
 
 #endif
-
diff --git a/aos/linux_code/camera/camera.gyp b/aos/linux_code/camera/camera.gyp
index e4f1e04..fabd52f 100644
--- a/aos/linux_code/camera/camera.gyp
+++ b/aos/linux_code/camera/camera.gyp
@@ -47,9 +47,11 @@
       'dependencies': [
         '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
         '<(AOS)/build/aos.gyp:logging',
+        '<(AOS)/atom_code/ipc_lib/ipc_lib.gyp:scoped_message_ptr',
       ],
       'export_dependent_settings': [
         '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
+        '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:scoped_message_ptr',
       ],
     },
     {
diff --git a/aos/linux_code/ipc_lib/aos_sync.c b/aos/linux_code/ipc_lib/aos_sync.c
index 52ebed1..a607001 100644
--- a/aos/linux_code/ipc_lib/aos_sync.c
+++ b/aos/linux_code/ipc_lib/aos_sync.c
@@ -18,13 +18,19 @@
   return result;
 }
 
-// this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
-// should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
+// this code is based on something that appears to be based on
+//   <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful
+//   information
+// should probably use
+// <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it
+// becomes available
 //   (sys_set_robust_list appears to be the function name)
 // <http://locklessinc.com/articles/futex_cheat_sheet/> and
 //   <http://locklessinc.com/articles/mutex_cv_futex/> are useful
-// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
-// can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009
+//   (fairly recent compared to everything else...)
+// can't use PRIVATE futex operations because they use the pid (or something) as
+//   part of the hash
 //
 // Remember that EAGAIN and EWOUDBLOCK are the same! (ie if you get EAGAIN from
 // FUTEX_WAIT, the docs call it EWOULDBLOCK...)
@@ -60,7 +66,6 @@
   if (c == 1) c = xchg(m, 2);
   while (c) {
     /* Wait in the kernel */
-    //printf("sync here %d\n", __LINE__);
     if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
       if (signals_fail && errno == EINTR) {
         return 1;
@@ -69,7 +74,6 @@
         return 2;
       }
     }
-    //printf("sync here %d\n", __LINE__);
     c = xchg(m, 2);
   }
   return 0;
@@ -148,6 +152,8 @@
   mutex_unlock(m);
 
   while (1) {
+    // Wait in the kernel iff the value of it hasn't changed (ie nobody else has
+    // done a wake) since before we unlocked the mutex.
     if (sys_futex(c, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
       // If it failed for some reason other than somebody else doing a wake
       // before we actually made it to sleep.
@@ -161,8 +167,12 @@
         abort();
       }
     }
+    // Relock the mutex now that we're done waiting.
     // Simplified mutex_lock that always leaves it
     // contended in case anybody else got requeued.
+    // If we got requeued above, this will just succeed the first time because
+    // the person waking us from the above wait (changed to be on the mutex
+    // instead of the condition) will have just set it to 0.
     while (xchg(m, 2) != 0) {
       if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1) {
         // Try again if it was because of a signal or somebody else unlocked it
@@ -180,7 +190,11 @@
 }
 
 void condition_signal(mutex *c) {
+  // This will cause anybody else who is in between unlocking the mutex and
+  // going to sleep in the kernel to not go to sleep and return immediately
+  // instead.
   __sync_fetch_and_add(c, 1);
+  // Wake at most 1 person who is waiting in the kernel.
   if (sys_futex(c, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
     fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
         " failed with %d: %s\n",
@@ -192,7 +206,9 @@
 
 void condition_broadcast(mutex *c, mutex *m) {
   __sync_fetch_and_add(c, 1);
-  // Wake 1 waiter and requeue the rest.
+  // Wake at most 1 waiter and requeue the rest.
+  // Everybody else is going to have to wait for the 1st person to take the
+  // mutex anyways.
   if (sys_futex_requeue(c, FUTEX_REQUEUE, 1, INT_MAX, m) == -1) {
     fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
         " failed with %d: %s\n",
diff --git a/aos/linux_code/ipc_lib/aos_sync.h b/aos/linux_code/ipc_lib/aos_sync.h
index 7a81ca3..0d55246 100644
--- a/aos/linux_code/ipc_lib/aos_sync.h
+++ b/aos/linux_code/ipc_lib/aos_sync.h
@@ -48,6 +48,8 @@
 // They are designed for signalling when something happens (possibly to
 // multiple listeners). A mutex manipulated with them can only be set or unset.
 //
+// Another name for this kind of synchronization mechanism is a "notification".
+//
 // They are different from the condition_ functions in that they do NOT work
 // correctly as standard condition variables. While it is possible to keep
 // track of the "condition" using the value part of the futex_* functions, the
diff --git a/aos/linux_code/ipc_lib/condition.cc b/aos/linux_code/ipc_lib/condition.cc
index b764026..0ba4145 100644
--- a/aos/linux_code/ipc_lib/condition.cc
+++ b/aos/linux_code/ipc_lib/condition.cc
@@ -6,8 +6,8 @@
 
 namespace aos {
 
-static_assert(shm_ok<Condition>::value, "Condition should work"
-              " in shared memory");
+static_assert(shm_ok<Condition>::value,
+              "Condition should work in shared memory");
 
 Condition::Condition(Mutex *m) : impl_(), m_(m) {}
 
diff --git a/aos/linux_code/ipc_lib/core_lib.c b/aos/linux_code/ipc_lib/core_lib.c
index cc1ccbb..2bd6c25 100644
--- a/aos/linux_code/ipc_lib/core_lib.c
+++ b/aos/linux_code/ipc_lib/core_lib.c
@@ -5,7 +5,7 @@
 
 #include "aos/linux_code/ipc_lib/shared_mem.h"
 
-static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
+static uint8_t aos_8max(uint8_t l, uint8_t r) {
   return (l > r) ? l : r;
 }
 void *shm_malloc_aligned(size_t length, uint8_t alignment) {
diff --git a/aos/linux_code/ipc_lib/ipc_lib.gyp b/aos/linux_code/ipc_lib/ipc_lib.gyp
index df7b658..08e8df8 100644
--- a/aos/linux_code/ipc_lib/ipc_lib.gyp
+++ b/aos/linux_code/ipc_lib/ipc_lib.gyp
@@ -60,6 +60,7 @@
         'core_lib',
         '<(AOS)/common/common.gyp:queue_testutils',
         '<(AOS)/common/common.gyp:time',
+        '<(AOS)/common/common.gyp:die',
       ],
     },
     {
@@ -76,6 +77,22 @@
         'core_lib',
         '<(AOS)/common/common.gyp:die',
       ],
+      'variables': {
+        'is_special_test': 1,
+      },
+    },
+    {
+      'target_name': 'scoped_message_ptr',
+      'type': 'static_library',
+      'sources': [
+        #'scoped_message_ptr.h',
+      ],
+      'dependencies': [
+        'queue',
+      ],
+      'export_dependent_settings': [
+        'queue',
+      ],
     },
   ],
 }
diff --git a/aos/linux_code/ipc_lib/ipc_stress_test.cc b/aos/linux_code/ipc_lib/ipc_stress_test.cc
index 1b55e82..4a9d93f 100644
--- a/aos/linux_code/ipc_lib/ipc_stress_test.cc
+++ b/aos/linux_code/ipc_lib/ipc_stress_test.cc
@@ -7,7 +7,6 @@
 #include <libgen.h>
 #include <assert.h>
 
-#include <vector>
 #include <string>
 
 #include "aos/common/time.h"
@@ -22,6 +21,10 @@
 // stderr output from each test run and only prints it out (not interleaved with
 // the output from any other run) if the test fails.
 //
+// They have to be run in separate processes because (in addition to various
+// parts of our code not being thread-safe...) gtest does not like multiple
+// threads.
+//
 // It's written in C++ for performance. We need actual OS-level parallelism for
 // this to work, which means that Ruby's out because it doesn't have good
 // support for doing that. My Python implementation ended up pretty heavily disk
@@ -33,14 +36,14 @@
 // arguments to pass to it.
 // Using --gtest_filter is a bad idea because it seems to result in a lot of
 // swapping which causes everything to be disk-bound (at least for me).
-static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+static const char * kTests[][] = {
   {"queue_test"},
   {"condition_test"},
   {"mutex_test"},
   {"raw_queue_test"},
 };
 // These arguments get inserted before any per-test arguments.
-static const ::std::vector< ::std::string> kDefaultArgs = {
+static const char *kDefaultArgs[] = {
   "--gtest_repeat=30",
   "--gtest_shuffle",
 };
@@ -75,7 +78,7 @@
 
 // Gets called after each child forks to run a test.
 void __attribute__((noreturn)) DoRunTest(
-    Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+    Shared *shared, const ::std::array<const char *> &test, int pipes[2]) {
   if (close(pipes[0]) == -1) {
     Die("close(%d) of read end of pipe failed with %d: %s\n",
         pipes[0], errno, strerror(errno));
@@ -95,6 +98,7 @@
 
   size_t size = test.size();
   size_t default_size = kDefaultArgs.size();
+  // There's no chance to free this because we either exec or Die.
   const char **args = new const char *[size + default_size + 1];
   // The actual executable to run.
   ::std::string executable;
@@ -120,7 +124,9 @@
 void DoRun(Shared *shared) {
   int iterations = 0;
   // An iterator pointing to a random one of the tests.
-  auto test = kTests.begin() + (getpid() % kTests.size());
+  // We randomize based on PID because otherwise they all end up running the
+  // same test at the same time for the whole test.
+  const char *(*test)[] = &kTests[getpid() % kTestsLength];
   int pipes[2];
   while (time::Time::Now() < shared->stop_time) {
     if (pipe(pipes) == -1) {
@@ -212,7 +218,10 @@
   new (shared) Shared(time::Time::Now() + kTestTime);
 
   char *temp = strdup(argv[0]);
-  shared->path = strdup(dirname(temp));
+  if (asprintf(const_cast<char **>(&shared->path),
+               "%s/../tests", dirname(temp)) == -1) {
+    Die("asprintf failed with %d: %s\n", errno, strerror(errno));
+  }
   free(temp);
 
   for (int i = 0; i < kTesters; ++i) {
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
index e67f22c..8a2f15b 100644
--- a/aos/linux_code/ipc_lib/queue.cc
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -38,6 +38,7 @@
 struct RawQueue::MessageHeader {
   int ref_count;
   int index;  // in pool_
+  // Gets the message header immediately preceding msg.
   static MessageHeader *Get(const void *msg) {
     return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
         static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
@@ -50,8 +51,8 @@
     memcpy(this, &temp, sizeof(*this));
   }
 };
-static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
-              " is to stick it in shared memory");
+static_assert(shm_ok<RawQueue::MessageHeader>::value,
+              "the whole point is to stick it in shared memory");
 
 struct RawQueue::ReadData {
   bool writable_start;
@@ -73,7 +74,7 @@
 }
 
 RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
-  : readable_(&data_lock_), writable_(&data_lock_) {
+    : readable_(&data_lock_), writable_(&data_lock_) {
   const size_t name_size = strlen(name) + 1;
   char *temp = static_cast<char *>(shm_malloc(name_size));
   memcpy(temp, name, name_size);
diff --git a/aos/linux_code/ipc_lib/queue_test.cc b/aos/linux_code/ipc_lib/queue_test.cc
index c87bd7b..0a982cc 100644
--- a/aos/linux_code/ipc_lib/queue_test.cc
+++ b/aos/linux_code/ipc_lib/queue_test.cc
@@ -15,6 +15,7 @@
 #include "aos/common/queue_testutils.h"
 #include "aos/common/time.h"
 #include "aos/common/logging/logging.h"
+#include "aos/common/die.h"
 
 using ::testing::AssertionResult;
 using ::testing::AssertionSuccess;
@@ -136,6 +137,9 @@
 
   void SetUp() override {
     ::testing::Test::SetUp();
+
+    SetDieTestMode(true);
+
     fatal_failure = static_cast<char *>(shm_malloc(sizeof(fatal_failure)));
     static bool registered = false;
     if (!registered) {