run tests in parallel
diff --git a/aos/build/build.py b/aos/build/build.py
index 5d7011d..27e11d5 100755
--- a/aos/build/build.py
+++ b/aos/build/build.py
@@ -9,6 +9,53 @@
import string
import shutil
import errno
+import queue
+import threading
+
+class TestThread(threading.Thread):
+ def __init__(self, executable, env, done_queue, start_semaphore):
+ super(TestThread, self).__init__(
+ name=os.path.split(executable)[1])
+
+ self.executable = executable
+ self.env = env
+ self.done_queue = done_queue
+ self.start_semaphore = start_semaphore
+
+ self.process_lock = threading.Lock()
+ self.process = None
+ self.stopped = False
+
+ def run(self):
+ with self.start_semaphore:
+ if self.stopped: return
+ test_output('Starting test %s...' % self.name)
+ self.output, subprocess_output = os.pipe()
+ with self.process_lock:
+ self.process = subprocess.Popen((self.executable,
+ '--gtest_color=yes'),
+ env=self.env,
+ stderr=subprocess.STDOUT,
+ stdout=subprocess_output,
+ stdin=open(os.devnull, 'r'))
+ os.close(subprocess_output)
+ self.process.wait()
+ with self.process_lock:
+ self.returncode = self.process.returncode
+ self.process = None
+ if not self.stopped:
+ self.done_queue.put(self)
+
+ def terminate_process(self):
+ with self.process_lock:
+ self.stopped = True
+ if not self.process: return
+ self.process.terminate()
+ def kill_process(self):
+ with self.process_lock:
+ self.stopped = True
+ if not self.process: return
+ self.process.kill()
def aos_path():
return os.path.join(os.path.dirname(__file__), '..')
@@ -31,6 +78,11 @@
def user_output(message):
print('build.py: ' + message, file=sys.stderr)
+test_output_lock = threading.RLock()
+def test_output(message):
+ with test_output_lock:
+ print('build.py: ' + message, file=sys.stdout)
+
class Processor(object):
class UnknownPlatform(Exception):
def __init__(self, message):
@@ -217,11 +269,11 @@
SANITIZER_TEST_WARNINGS = {
'memory': (True,
"""We don't have all of the libraries instrumented which leads to lots of false
-errors with msan (especially stdlibc++).
-TODO(brians): Figure out a way to deal with it."""),
+ errors with msan (especially stdlibc++).
+ TODO(brians): Figure out a way to deal with it."""),
'undefined': (False,
"""There are several warnings in other people's code that ubsan catches.
-The following have been verified non-interesting:
+ The following have been verified non-interesting:
include/c++/4.8.2/array:*: runtime error: reference binding to null pointer of type 'int'
This happens with ::std::array<T, 0> and it doesn't seem to cause any issues.
output/downloaded/eigen-3.2.1/Eigen/src/Core/util/Memory.h:782:*: runtime error: load of misaligned address 0x* for type 'const int', which requires 4 byte alignment
@@ -256,8 +308,9 @@
if is_test:
self.default_platforms = self.select_platforms(architecture='amd64',
debug=True)
- for sanitizer in PrimeProcessor.SANITIZER_TEST_WARNINGS:
- self.default_platforms -= self.select_platforms(sanitizer=sanitizer)
+ for sanitizer, warning in PrimeProcessor.SANITIZER_TEST_WARNINGS.items():
+ if warning[0]:
+ self.default_platforms -= self.select_platforms(sanitizer=sanitizer)
elif is_deploy:
# TODO(brians): Switch to deploying the code built with clang.
self.default_platforms = self.select_platforms(architecture='arm',
@@ -342,6 +395,10 @@
'target',
help='target to build',
nargs='*')
+ parser.add_argument(
+ '--jobs', '-j',
+ help='number of things to do at once',
+ type=int)
def AddCommonArgs(parser):
parser.add_argument(
'platforms',
@@ -548,9 +605,11 @@
user_output("Not running gyp")
try:
- subprocess.check_call(
- (tools_config['NINJA'],
- '-C', platform.outdir()) + tuple(targets),
+ call = (tools_config['NINJA'],
+ '-C', platform.outdir()) + tuple(targets)
+ if args.jobs:
+ call += ('-j', str(args.jobs))
+ subprocess.check_call(call,
stdin=open(os.devnull, 'r'),
env=env(platform))
except subprocess.CalledProcessError as e:
@@ -562,12 +621,59 @@
platform.deploy(args.dry_run)
elif args.action_name == 'tests':
dirname = os.path.join(platform.outdir(), 'tests')
- for f in targets or os.listdir(dirname):
- user_output('Running test %s...' % f)
- subprocess.check_call(
- os.path.join(dirname, f),
- env=env(platform))
- user_output('Test %s succeeded' % f)
+ done_queue = queue.Queue()
+ running = []
+ if args.jobs:
+ number_jobs = args.jobs
+ else:
+ number_jobs = os.sysconf('SC_NPROCESSORS_ONLN') + 2
+ test_start_semaphore = threading.Semaphore(number_jobs)
+ if targets:
+ to_run = []
+ for target in targets:
+ if target.endswith('_test'):
+ to_run.append(target)
+ else:
+ to_run = os.listdir(dirname)
+ for f in to_run:
+ thread = TestThread(os.path.join(dirname, f), env(platform), done_queue,
+ test_start_semaphore)
+ running.append(thread)
+ thread.start()
+ try:
+ while running:
+ done = done_queue.get()
+ running.remove(done)
+ with test_output_lock:
+ test_output('Output from test %s:' % done.name)
+ for line in os.fdopen(done.output):
+ if not sys.stdout.isatty():
+ # Remove color escape codes.
+ line = re.sub(r'\x1B\[[0-9;]*[a-zA-Z]', '', line)
+ sys.stdout.write(line)
+ if done.returncode == 0:
+ test_output('Test %s succeeded' % done.name)
+ else:
+ test_output('Test %s failed' % done.name)
+ user_output('Aborting because of test failure.')
+ exit(1)
+ finally:
+ if running:
+ test_output('Killing other tests...')
+ for thread in running:
+ thread.terminate_process()
+ to_remove = []
+ for thread in running:
+ thread.join(5)
+ if not thread.is_alive():
+ to_remove.append(thread);
+ for thread in to_remove: running.remove(thread)
+ for thread in running:
+ test_output(
+ 'Test %s did not terminate. Killing it.' % thread.name)
+ thread.kill_process()
+ thread.join()
+ test_output('Done killing other tests')
user_output('Done building %s (%d/%d)' % (platform, num, len(platforms)))
num += 1