got an IPC stress tester that actually works

I had to write it in C++ to get it to actually exercise processing power
instead of disk IO. Also, I had to tweak some timeouts etc in some of
the tests to avoid random false failures.
diff --git a/aos/atom_code/ipc_lib/ipc_lib.gyp b/aos/atom_code/ipc_lib/ipc_lib.gyp
index 7d42b04..c57dae7 100644
--- a/aos/atom_code/ipc_lib/ipc_lib.gyp
+++ b/aos/atom_code/ipc_lib/ipc_lib.gyp
@@ -61,5 +61,26 @@
         'core_lib',
       ],
     },
+    {
+      'target_name': 'ipc_stress_test',
+      'type': 'executable',
+      'sources': [
+        'ipc_stress_test.cc',
+      ],
+      'dependencies': [
+        '<(EXTERNALS):gtest',
+        '<(AOS)/common/common.gyp:time',
+        '<(AOS)/common/common.gyp:queue_testutils',
+        '<(AOS)/common/common.gyp:mutex',
+        'core_lib',
+        '<(AOS)/common/common.gyp:die',
+
+        # These are the binaries that it runs.
+        'ipc_queue_test',
+        '<(AOS)/common/common.gyp:queue_test',
+        '<(AOS)/common/common.gyp:condition_test',
+        '<(AOS)/common/common.gyp:mutex_test',
+      ],
+    },
   ],
 }
diff --git a/aos/atom_code/ipc_lib/ipc_stress_test.cc b/aos/atom_code/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..dc8c738
--- /dev/null
+++ b/aos/atom_code/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,248 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <libgen.h>
+
+#include <vector>
+#include <string>
+
+#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/mutex.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/die.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// It's written in C++ for performance. You need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound which isn't a very good way to test CPU contention.
+
+namespace aos {
+
+// Each test is represented by the name of the test binary and then any
+// arguments to pass to it.
+// Using --gtest_filter is a bad idea because it seems to result in a lot of
+// swapping which causes everything to be disk-bound (at least on my computer).
+static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+  {"queue_test"},
+  {"condition_test"},
+  {"mutex_test"},
+  // TODO(brians): Enable this one once it supports running in parallel.
+  //{"ipc_queue_test"},
+};
+// These arguments get inserted before any per-test arguments.
+static const ::std::vector< ::std::string> kDefaultArgs = {
+  "--gtest_repeat=35",
+  "--gtest_shuffle",
+};
+
+// How many test processes to run at a time.
+static const int kTesters = 12;
+// How long to test for.
+static constexpr time::Time kTestTime = time::Time::InSeconds(20);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+  explicit Shared(const time::Time &stop_time)
+    : stop_time(stop_time), total_iterations(0), path(NULL) {}
+
+  // Synchronizes access to stdout/stderr to avoid interleaving failure
+  // messages.
+  Mutex output_mutex;
+
+  // When to stop.
+  time::Time stop_time;
+
+  // The total number of iterations. Updated by each child as it finishes.
+  int total_iterations;
+  // Synchronizes writes to total_iterations.
+  Mutex total_iterations_mutex;
+
+  const char *path;  // Directory of the test binaries; NULL until Main sets it.
+};
+static_assert(shm_ok<Shared>::value,
+              "it's going to get shared between forked processes");
+
+// Runs in a freshly forked runner process. Closes stdin and the read end of
+// pipes, points stdout and stderr at the write end, and then execs test[0]
+// (found in shared->path) with kDefaultArgs followed by the rest of test as
+// its arguments. Does not return: any failure, including of execv, calls Die.
+void __attribute__((noreturn)) DoRunTest(
+    Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+  if (close(pipes[0]) == -1) {
+    Die("close(%d) of read end of pipe failed with %d: %s\n",
+        pipes[0], errno, strerror(errno));
+  }
+  if (close(STDIN_FILENO) == -1) {
+    Die("close(STDIN_FILENO(=%d)) failed with %d: %s\n",
+        STDIN_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+    Die("dup2(%d, STDOUT_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDOUT_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDERR_FILENO) == -1) {
+    Die("dup2(%d, STDERR_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDERR_FILENO, errno, strerror(errno));
+  }
+
+  // Every test must at least name the binary to run.
+  if (test.empty()) Die("no test binary was given to run\n");
+  // The actual executable to run.
+  const ::std::string executable = ::std::string(shared->path) + "/" + test[0];
+  // argv is the executable, then the default arguments, then the per-test
+  // ones. All of the strings outlive the execv call below.
+  ::std::vector<const char *> args;
+  args.reserve(test.size() + kDefaultArgs.size() + 1);
+  args.push_back(executable.c_str());
+  for (const ::std::string &default_arg : kDefaultArgs) {
+    args.push_back(default_arg.c_str());
+  }
+  for (size_t i = 1; i < test.size(); ++i) {
+    args.push_back(test[i].c_str());
+  }
+  args.push_back(NULL);  // execv needs a NULL-terminated list.
+  execv(executable.c_str(), const_cast<char *const *>(args.data()));
+  Die("execv(%s, %p) failed with %d: %s\n",
+      executable.c_str(), args.data(), errno, strerror(errno));
+}
+
+// Body of one tester process: until shared->stop_time, forks a runner for each
+// test in turn, captures its stdout/stderr via a pipe, and prints that output
+// only on failure. Finally adds its run count to shared->total_iterations.
+void DoRun(Shared *shared) {
+  int iterations = 0;
+  // An iterator pointing to a pseudo-random (pid-derived) one of the tests.
+  auto test = kTests.begin() + (getpid() % kTests.size());
+  int pipes[2];
+  while (time::Time::Now() < shared->stop_time) {
+    if (pipe(pipes) == -1) {
+      Die("pipe(%p) failed with %d: %s\n", &pipes, errno, strerror(errno));
+    }
+    switch (fork()) {
+      case 0:  // in runner
+        DoRunTest(shared, *test, pipes);  // never returns
+      case -1:
+        Die("fork() failed with %d: %s\n", errno, strerror(errno));
+    }
+
+    if (close(pipes[1]) == -1) {
+      Die("close(%d) of write end of pipe failed with %d: %s\n",
+          pipes[1], errno, strerror(errno));
+    }
+
+    ::std::string output;
+    char buffer[2048];
+    while (true) {
+      ssize_t ret = read(pipes[0], &buffer, sizeof(buffer));
+      if (ret == 0) {  // EOF
+        if (close(pipes[0]) == -1) {
+          Die("close(%d) of pipe at EOF failed with %d: %s\n",
+              pipes[0], errno, strerror(errno));
+        }
+        break;
+      } else if (ret == -1) {
+        if (errno == EINTR) continue;  // interrupted before any data; retry
+        Die("read(%d, %p, %zu) failed with %d: %s\n",
+            pipes[0], &buffer, sizeof(buffer), errno, strerror(errno));
+      }
+      output += ::std::string(buffer, ret);
+    }
+
+    int status;
+    while (wait(&status) == -1) {
+      if (errno == EINTR) continue;
+      Die("wait(%p) in child failed with %d: %s\n",
+          &status, errno, strerror(errno));
+    }
+    if (WIFEXITED(status)) {
+      if (WEXITSTATUS(status) != 0) {
+        MutexLocker sync(&shared->output_mutex);
+        fprintf(stderr, "Test %s exited with status %d. output:\n",
+                test->at(0).c_str(), WEXITSTATUS(status));
+        fputs(output.c_str(), stderr);
+      }
+    } else if (WIFSIGNALED(status)) {
+      MutexLocker sync(&shared->output_mutex);
+      fprintf(stderr, "Test %s terminated by signal %d: %s.\n",
+              test->at(0).c_str(),
+              WTERMSIG(status), strsignal(WTERMSIG(status)));
+      fputs(output.c_str(), stderr);
+    } else {
+      assert(WIFSTOPPED(status));
+      Die("Test %s was stopped.\n", test->at(0).c_str());
+    }
+
+    ++test;
+    if (test == kTests.end()) test = kTests.begin();
+    ++iterations;
+  }
+  {
+    MutexLocker sync(&shared->total_iterations_mutex);
+    shared->total_iterations += iterations;
+  }
+}
+
+void Run(Shared *shared) {
+  const pid_t child = fork();  // 0 in the new tester, its pid in the parent
+  if (child == -1) {
+    Die("fork() of child failed with %d: %s\n", errno, strerror(errno));
+  } else if (child == 0) {
+    DoRun(shared);  // keeps running tests until shared->stop_time
+    _exit(EXIT_SUCCESS);
+  }  // the parent simply returns without waiting for the tester
+}
+
+// Creates the Shared state in shared memory, starts kTesters tester processes,
+// waits for all of them, and prints how many test runs completed. Returns
+// EXIT_FAILURE if any tester did not exit cleanly and EXIT_SUCCESS otherwise.
+int Main(int argc, char **argv) {
+  assert(argc >= 1);
+
+  ::aos::common::testing::GlobalCoreInstance global_core;
+
+  Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+  new (shared) Shared(time::Time::Now() + kTestTime);
+
+  // dirname may modify its argument, so give it a scratch copy of argv[0].
+  char *temp = strdup(argv[0]);
+  shared->path = strdup(dirname(temp));
+  free(temp);
+
+  for (int i = 0; i < kTesters; ++i) {
+    Run(shared);
+  }
+
+  bool error = false;
+  for (int i = 0; i < kTesters; ++i) {
+    int status;
+    while (wait(&status) == -1) {
+      // Retry if interrupted; status is not valid until wait succeeds.
+      if (errno == EINTR) continue;
+      Die("wait(%p) failed with %d: %s\n", &status, errno, strerror(errno));
+    }
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) error = true;
+  }
+
+  printf("Ran a total of %d tests.\n", shared->total_iterations);
+  if (error) {
+    printf("A child had a problem during the test.\n");
+  }
+  return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+}  // namespace aos
+
+int main(int argc, char **argv) {
+  return ::aos::Main(argc, argv);
+}
diff --git a/aos/atom_code/ipc_lib/ipc_stress_test.py b/aos/atom_code/ipc_lib/ipc_stress_test.py
deleted file mode 100755
index 8e402ac..0000000
--- a/aos/atom_code/ipc_lib/ipc_stress_test.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/usr/bin/python
-
-# This is a script that runs all of the tests that deal with the IPC stuff in
-# parallel for a while and makes sure that they don't ever fail.
-
-import subprocess
-import random
-import multiprocessing
-import time
-import os
-import Queue
-import sys
-
-TESTS = [
-  ['queue_test'],
-  ['condition_test'],
-  # The fairness test doesn't work under load.
-  ['mutex_test', '--gtest_filter=-MutexTest.Fairness'],
-  #['ipc_queue_test'],
-]
-TESTS_PATH = '../../../out_atom/Default/tests'
-# The tests spend a lot of their time waiting (ie for things to time out), so I
-# had to use this many to get the highest CPU utilization.
-TESTERS = 35
-TEST_TIME = 10
-
-def run(iterations_queue, output_lock, stop_time):
-  iterations = 0
-  while time.time() < stop_time:
-    test = random.choice(TESTS)
-    try:
-      output = subprocess.check_output(
-          ["%s/%s/%s" %(
-            os.path.dirname(os.path.abspath(__file__)), TESTS_PATH, test[0])] +
-            test[1:],
-          stderr=subprocess.STDOUT,
-          bufsize=-1)
-    except subprocess.CalledProcessError as error:
-      with output_lock:
-        sys.stderr.write("------Test %s failed with exit %d output:------\n%s" %
-            (test, error.returncode, error.output))
-    iterations += 1
-  iterations_queue.put(iterations)
-
-def main():
-  processes = []
-  output_lock = multiprocessing.Lock()
-  iterations_queue = multiprocessing.Queue()
-  stop_time = time.time() + TEST_TIME
-  stop_event = multiprocessing.Event()
-  for _ in xrange(TESTERS):
-    process = multiprocessing.Process(target=run,
-      args=(iterations_queue, output_lock, stop_time))
-    processes.append(process)
-    process.start()
-  for process in processes:
-    process.join()
-  total_iterations = 0
-  try:
-    while True:
-      total_iterations += iterations_queue.get_nowait()
-  except Queue.Empty:
-    pass
-  print("Iterated a total of %d times." % total_iterations)
-
-if __name__ == '__main__':
-  main()
diff --git a/aos/build/aos_all.gyp b/aos/build/aos_all.gyp
index adfaf65..de15572 100644
--- a/aos/build/aos_all.gyp
+++ b/aos/build/aos_all.gyp
@@ -13,7 +13,8 @@
         '../atom_code/camera/camera.gyp:CameraHTTPStreamer',
         '../atom_code/camera/camera.gyp:CameraReader',
         '../atom_code/core/core.gyp:*',
-        '../atom_code/ipc_lib/ipc_lib.gyp:*',
+        '../atom_code/ipc_lib/ipc_lib.gyp:ipc_queue_test',
+        '../atom_code/ipc_lib/ipc_lib.gyp:ipc_stress_test',
         '../atom_code/starter/starter.gyp:starter_exe',
         '../atom_code/starter/starter.gyp:netconsole',
         '../common/common.gyp:queue_test',
diff --git a/aos/common/condition_test.cc b/aos/common/condition_test.cc
index ddc12d5..49112bc 100644
--- a/aos/common/condition_test.cc
+++ b/aos/common/condition_test.cc
@@ -40,7 +40,7 @@
   Shared *const shared_;
 
   void Settle() {
-    time::SleepFor(::Time::InSeconds(0.009));
+    time::SleepFor(::Time::InSeconds(0.008));
   }
 };
 
@@ -56,7 +56,7 @@
 
   // This amount gets added to any passed in delay to make the test repeatable.
   static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.015);
-  static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.06);
+  static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.09);
 
   // delay is how long to wait before doing action to condition.
   // timeout is how long to wait after delay before deciding that it's hung.
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
index d9327a1..6ab7ed6 100644
--- a/aos/common/mutex_test.cpp
+++ b/aos/common/mutex_test.cpp
@@ -110,7 +110,7 @@
 #ifdef __VXWORKS__
     // Without this, all of the "task ... deleted ..." messages come out at
     // once, and it looks weird and triggers an socat bug (at least for
-    // Squeeze's version 1.7.1.3-1).
+    // Squeeze's version 1.7.1.3-1) which locks it up and is very annoying.
     taskDelay(index_);
 #endif
   }
@@ -138,7 +138,7 @@
 #ifdef __VXWORKS__
   static const int kWarmupCycles = 1000, kRunCycles = 60000, kMaxDeviation = 20;
 #else
-  static const int kWarmupCycles = 30000, kRunCycles = 3000000, kMaxDeviation = 10000;
+  static const int kWarmupCycles = 30000, kRunCycles = 3000000, kMaxDeviation = 20000;
 #endif
 
   int cycles[kThreads];
@@ -166,8 +166,8 @@
   for (int i = 0; i < kThreads; ++i) {
     variance += (cycles[i] - expected) * (cycles[i] - expected);
   }
+  ASSERT_GT(variance, 0);
   double deviation = sqrt(variance / kThreads);
-  printf("deviation=%f\n", deviation);
   ASSERT_GT(deviation, 0);
   EXPECT_LT(deviation, kMaxDeviation);
 }
diff --git a/aos/common/queue_test.cc b/aos/common/queue_test.cc
index 65a1c25..32d1d23 100644
--- a/aos/common/queue_test.cc
+++ b/aos/common/queue_test.cc
@@ -48,7 +48,7 @@
   usleep(50000);
   my_test_queue.MakeWithBuilder().test_bool(true).test_int(0x971).Send();
   t.Join();
-  EXPECT_EQ(true, t.threaded_test_queue.IsNewerThanMS(20));
+  EXPECT_LE(t.threaded_test_queue.Age(), time::Time::InMS(55));
 }
 
 // Tests that we can send a message with the message pointer and get it back.