got an IPC stress tester that actually works
I had to write it in C++ to get it to actually exercise processing power
instead of disk IO. Also, I had to tweak some timeouts etc in some of
the tests to avoid random false failures.
diff --git a/aos/atom_code/ipc_lib/ipc_lib.gyp b/aos/atom_code/ipc_lib/ipc_lib.gyp
index 7d42b04..c57dae7 100644
--- a/aos/atom_code/ipc_lib/ipc_lib.gyp
+++ b/aos/atom_code/ipc_lib/ipc_lib.gyp
@@ -61,5 +61,26 @@
'core_lib',
],
},
+ {
+ 'target_name': 'ipc_stress_test',
+ 'type': 'executable',
+ 'sources': [
+ 'ipc_stress_test.cc',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ '<(AOS)/common/common.gyp:time',
+ '<(AOS)/common/common.gyp:queue_testutils',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
+ '<(AOS)/common/common.gyp:die',
+
+ # These are the binaries that it runs.
+ 'ipc_queue_test',
+ '<(AOS)/common/common.gyp:queue_test',
+ '<(AOS)/common/common.gyp:condition_test',
+ '<(AOS)/common/common.gyp:mutex_test',
+ ],
+ },
],
}
diff --git a/aos/atom_code/ipc_lib/ipc_stress_test.cc b/aos/atom_code/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..dc8c738
--- /dev/null
+++ b/aos/atom_code/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,248 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <libgen.h>
+
+#include <vector>
+#include <string>
+
+#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/mutex.h"
+#include "aos/atom_code/ipc_lib/core_lib.h"
+#include "aos/common/die.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// It's written in C++ for performance. You need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound which isn't a very good way to test CPU contention.
+
+namespace aos {
+
+// Each test is represented by the name of the test binary and then any
+// arguments to pass to it.
+// Using --gtest_filter is a bad idea because it seems to result in a lot of
+// swapping which causes everything to be disk-bound (at least on my computer).
+static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+ {"queue_test"},
+ {"condition_test"},
+ {"mutex_test"},
+ // TODO(brians): Enable this one once it supports running in parallel.
+ //{"ipc_queue_test"},
+};
+// These arguments get inserted before any per-test arguments.
+static const ::std::vector< ::std::string> kDefaultArgs = {
+ "--gtest_repeat=35",
+ "--gtest_shuffle",
+};
+
+// How many test processes to run at a time.
+static const int kTesters = 12;
+// How long to test for.
+static constexpr time::Time kTestTime = time::Time::InSeconds(20);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+ Shared(const time::Time &stop_time)
+ : stop_time(stop_time), total_iterations(0) {}
+
+ // Synchronizes access to stdout/stderr to avoid interleaving failure
+ // messages.
+ Mutex output_mutex;
+
+ // When to stop.
+ time::Time stop_time;
+
+ // The total number of iterations. Updated by each child as it finishes.
+ int total_iterations;
+ // Sychronizes writes to total_iterations
+ Mutex total_iterations_mutex;
+
+ const char *path;
+};
+static_assert(shm_ok<Shared>::value,
+ "it's going to get shared between forked processes");
+
+// Gets called after each child forks to run a test.
+void __attribute__((noreturn)) DoRunTest(
+ Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+ if (close(pipes[0]) == -1) {
+ Die("close(%d) of read end of pipe failed with %d: %s\n",
+ pipes[0], errno, strerror(errno));
+ }
+ if (close(STDIN_FILENO) == -1) {
+ Die("close(STDIN_FILENO(=%d)) failed with %d: %s\n",
+ STDIN_FILENO, errno, strerror(errno));
+ }
+ if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+ Die("dup2(%d, STDOUT_FILENO(=%d)) failed with %d: %s\n",
+ pipes[1], STDOUT_FILENO, errno, strerror(errno));
+ }
+ if (dup2(pipes[1], STDERR_FILENO) == -1) {
+ Die("dup2(%d, STDERR_FILENO(=%d)) failed with %d: %s\n",
+ pipes[1], STDERR_FILENO, errno, strerror(errno));
+ }
+
+ size_t size = test.size();
+ size_t default_size = kDefaultArgs.size();
+ const char **args = new const char *[size + default_size + 1];
+ // The actual executable to run.
+ ::std::string executable;
+ int i = 0;
+ for (const ::std::string &c : test) {
+ if (i == 0) {
+ executable = ::std::string(shared->path) + "/" + c;
+ args[0] = executable.c_str();
+ for (const ::std::string &ci : kDefaultArgs) {
+ args[i++] = ci.c_str();
+ }
+ } else {
+ args[i] = c.c_str();
+ }
+ ++i;
+ }
+ args[size] = NULL;
+ execv(executable.c_str(), const_cast<char *const *>(args));
+ Die("execv(%s, %p) failed with %d: %s\n",
+ executable.c_str(), args, errno, strerror(errno));
+}
+
+void DoRun(Shared *shared) {
+ int iterations = 0;
+ // An iterator pointing to a random one of the tests.
+ auto test = kTests.begin() + (getpid() % kTests.size());
+ int pipes[2];
+ while (time::Time::Now() < shared->stop_time) {
+ if (pipe(pipes) == -1) {
+ Die("pipe(%p) failed with %d: %s\n", &pipes, errno, strerror(errno));
+ }
+ switch (fork()) {
+ case 0: // in runner
+ DoRunTest(shared, *test, pipes);
+ case -1:
+ Die("fork() failed with %d: %s\n", errno, strerror(errno));
+ }
+
+ if (close(pipes[1]) == -1) {
+ Die("close(%d) of write end of pipe failed with %d: %s\n",
+ pipes[1], errno, strerror(errno));
+ }
+
+ ::std::string output;
+ char buffer[2048];
+ while (true) {
+ ssize_t ret = read(pipes[0], &buffer, sizeof(buffer));
+ if (ret == 0) { // EOF
+ if (close(pipes[0]) == -1) {
+ Die("close(%d) of pipe at EOF failed with %d: %s\n",
+ pipes[0], errno, strerror(errno));
+ }
+ break;
+ } else if (ret == -1) {
+ Die("read(%d, %p, %zd) failed with %d: %s\n",
+ pipes[0], &buffer, sizeof(buffer), errno, strerror(errno));
+ }
+ output += ::std::string(buffer, ret);
+ }
+
+ int status;
+ while (true) {
+ if (wait(&status) == -1) {
+ if (errno == EINTR) continue;
+ Die("wait(%p) in child failed with %d: %s\n",
+ &status, errno, strerror(errno));
+ } else {
+ break;
+ }
+ }
+ if (WIFEXITED(status)) {
+ if (WEXITSTATUS(status) != 0) {
+ MutexLocker sync(&shared->output_mutex);
+ fprintf(stderr, "Test %s exited with status %d. output:\n",
+ test->at(0).c_str(), WEXITSTATUS(status));
+ fputs(output.c_str(), stderr);
+ }
+ } else if (WIFSIGNALED(status)) {
+ MutexLocker sync(&shared->output_mutex);
+ fprintf(stderr, "Test %s terminated by signal %d: %s.\n",
+ test->at(0).c_str(),
+ WTERMSIG(status), strsignal(WTERMSIG(status)));
+ fputs(output.c_str(), stderr);
+ } else {
+ assert(WIFSTOPPED(status));
+ Die("Test %s was stopped.\n", test->at(0).c_str());
+ }
+
+ ++test;
+ if (test == kTests.end()) test = kTests.begin();
+ ++iterations;
+ }
+ {
+ MutexLocker sync(&shared->total_iterations_mutex);
+ shared->total_iterations += iterations;
+ }
+}
+
+void Run(Shared *shared) {
+ switch (fork()) {
+ case 0: // in child
+ DoRun(shared);
+ _exit(EXIT_SUCCESS);
+ case -1:
+ Die("fork() of child failed with %d: %s\n", errno, strerror(errno));
+ }
+}
+
+int Main(int argc, char **argv) {
+ assert(argc >= 1);
+
+ ::aos::common::testing::GlobalCoreInstance global_core;
+
+ Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+ new (shared) Shared(time::Time::Now() + kTestTime);
+
+ char *temp = strdup(argv[0]);
+ shared->path = strdup(dirname(temp));
+ free(temp);
+
+ for (int i = 0; i < kTesters; ++i) {
+ Run(shared);
+ }
+
+ bool error = false;
+ for (int i = 0; i < kTesters; ++i) {
+ int status;
+ if (wait(&status) == -1) {
+ if (errno == EINTR) {
+ --i;
+ } else {
+ Die("wait(%p) failed with %d: %s\n", &status, errno, strerror(errno));
+ }
+ }
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+ error = true;
+ }
+ }
+
+ printf("Ran a total of %d tests.\n", shared->total_iterations);
+ if (error) {
+ printf("A child had a problem during the test.\n");
+ }
+ return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+} // namespace aos
+
+int main(int argc, char **argv) {
+ return ::aos::Main(argc, argv);
+}
diff --git a/aos/atom_code/ipc_lib/ipc_stress_test.py b/aos/atom_code/ipc_lib/ipc_stress_test.py
deleted file mode 100755
index 8e402ac..0000000
--- a/aos/atom_code/ipc_lib/ipc_stress_test.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/usr/bin/python
-
-# This is a script that runs all of the tests that deal with the IPC stuff in
-# parallel for a while and makes sure that they don't ever fail.
-
-import subprocess
-import random
-import multiprocessing
-import time
-import os
-import Queue
-import sys
-
-TESTS = [
- ['queue_test'],
- ['condition_test'],
- # The fairness test doesn't work under load.
- ['mutex_test', '--gtest_filter=-MutexTest.Fairness'],
- #['ipc_queue_test'],
-]
-TESTS_PATH = '../../../out_atom/Default/tests'
-# The tests spend a lot of their time waiting (ie for things to time out), so I
-# had to use this many to get the highest CPU utilization.
-TESTERS = 35
-TEST_TIME = 10
-
-def run(iterations_queue, output_lock, stop_time):
- iterations = 0
- while time.time() < stop_time:
- test = random.choice(TESTS)
- try:
- output = subprocess.check_output(
- ["%s/%s/%s" %(
- os.path.dirname(os.path.abspath(__file__)), TESTS_PATH, test[0])] +
- test[1:],
- stderr=subprocess.STDOUT,
- bufsize=-1)
- except subprocess.CalledProcessError as error:
- with output_lock:
- sys.stderr.write("------Test %s failed with exit %d output:------\n%s" %
- (test, error.returncode, error.output))
- iterations += 1
- iterations_queue.put(iterations)
-
-def main():
- processes = []
- output_lock = multiprocessing.Lock()
- iterations_queue = multiprocessing.Queue()
- stop_time = time.time() + TEST_TIME
- stop_event = multiprocessing.Event()
- for _ in xrange(TESTERS):
- process = multiprocessing.Process(target=run,
- args=(iterations_queue, output_lock, stop_time))
- processes.append(process)
- process.start()
- for process in processes:
- process.join()
- total_iterations = 0
- try:
- while True:
- total_iterations += iterations_queue.get_nowait()
- except Queue.Empty:
- pass
- print("Iterated a total of %d times." % total_iterations)
-
-if __name__ == '__main__':
- main()
diff --git a/aos/build/aos_all.gyp b/aos/build/aos_all.gyp
index adfaf65..de15572 100644
--- a/aos/build/aos_all.gyp
+++ b/aos/build/aos_all.gyp
@@ -13,7 +13,8 @@
'../atom_code/camera/camera.gyp:CameraHTTPStreamer',
'../atom_code/camera/camera.gyp:CameraReader',
'../atom_code/core/core.gyp:*',
- '../atom_code/ipc_lib/ipc_lib.gyp:*',
+ '../atom_code/ipc_lib/ipc_lib.gyp:ipc_queue_test',
+ '../atom_code/ipc_lib/ipc_lib.gyp:ipc_stress_test',
'../atom_code/starter/starter.gyp:starter_exe',
'../atom_code/starter/starter.gyp:netconsole',
'../common/common.gyp:queue_test',
diff --git a/aos/common/condition_test.cc b/aos/common/condition_test.cc
index ddc12d5..49112bc 100644
--- a/aos/common/condition_test.cc
+++ b/aos/common/condition_test.cc
@@ -40,7 +40,7 @@
Shared *const shared_;
void Settle() {
- time::SleepFor(::Time::InSeconds(0.009));
+ time::SleepFor(::Time::InSeconds(0.008));
}
};
@@ -56,7 +56,7 @@
// This amount gets added to any passed in delay to make the test repeatable.
static constexpr ::Time kMinimumDelay = ::Time::InSeconds(0.015);
- static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.06);
+ static constexpr ::Time kDefaultTimeout = ::Time::InSeconds(0.09);
// delay is how long to wait before doing action to condition.
// timeout is how long to wait after delay before deciding that it's hung.
diff --git a/aos/common/mutex_test.cpp b/aos/common/mutex_test.cpp
index d9327a1..6ab7ed6 100644
--- a/aos/common/mutex_test.cpp
+++ b/aos/common/mutex_test.cpp
@@ -110,7 +110,7 @@
#ifdef __VXWORKS__
// Without this, all of the "task ... deleted ..." messages come out at
// once, and it looks weird and triggers an socat bug (at least for
- // Squeeze's version 1.7.1.3-1).
+ // Squeeze's version 1.7.1.3-1) which locks it up and is very annoying.
taskDelay(index_);
#endif
}
@@ -138,7 +138,7 @@
#ifdef __VXWORKS__
static const int kWarmupCycles = 1000, kRunCycles = 60000, kMaxDeviation = 20;
#else
- static const int kWarmupCycles = 30000, kRunCycles = 3000000, kMaxDeviation = 10000;
+ static const int kWarmupCycles = 30000, kRunCycles = 3000000, kMaxDeviation = 20000;
#endif
int cycles[kThreads];
@@ -166,8 +166,8 @@
for (int i = 0; i < kThreads; ++i) {
variance += (cycles[i] - expected) * (cycles[i] - expected);
}
+ ASSERT_GT(variance, 0);
double deviation = sqrt(variance / kThreads);
- printf("deviation=%f\n", deviation);
ASSERT_GT(deviation, 0);
EXPECT_LT(deviation, kMaxDeviation);
}
diff --git a/aos/common/queue_test.cc b/aos/common/queue_test.cc
index 65a1c25..32d1d23 100644
--- a/aos/common/queue_test.cc
+++ b/aos/common/queue_test.cc
@@ -48,7 +48,7 @@
usleep(50000);
my_test_queue.MakeWithBuilder().test_bool(true).test_int(0x971).Send();
t.Join();
- EXPECT_EQ(true, t.threaded_test_queue.IsNewerThanMS(20));
+ EXPECT_LE(t.threaded_test_queue.Age(), time::Time::InMS(55));
}
// Tests that we can send a message with the message pointer and get it back.