[oe-commits] [openembedded-core] 01/02: oeqa: Add selftest parallelisation support
git at git.openembedded.org
git at git.openembedded.org
Tue Jul 10 17:58:34 UTC 2018
This is an automated email from the git hooks/post-receive script.
rpurdie pushed a commit to branch master-next
in repository openembedded-core.
commit af5207525965d93d9a66e07225c3cfb4b2214585
Author: Richard Purdie <richard.purdie at linuxfoundation.org>
AuthorDate: Mon Jul 9 15:20:34 2018 +0000
oeqa: Add selftest parallelisation support
This allows oe-selftest to take a -j option which specifies how much test
parallelisation to use. Currently this is "module" based with each module
being split and run in a separate build directory. Further splitting could
be done but this seems a good compromise between test setup and parallelism.
You need python-testtools and python-subunit installed to used this.
Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
meta/lib/oeqa/core/context.py | 14 ++-
meta/lib/oeqa/core/utils/concurrencytest.py | 175 ++++++++++++++++++++++++++++
meta/lib/oeqa/selftest/context.py | 7 +-
3 files changed, 192 insertions(+), 4 deletions(-)
diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py
index acd5474..b8e4131 100644
--- a/meta/lib/oeqa/core/context.py
+++ b/meta/lib/oeqa/core/context.py
@@ -58,14 +58,20 @@ class OETestContext(object):
modules_required, filters)
self.suites = self.loader.discover()
- def runTests(self, skips=[]):
+ def runTests(self, processes=None, skips=[]):
self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
# Dinamically skip those tests specified though arguments
self.skipTests(skips)
self._run_start_time = time.time()
- result = self.runner.run(self.suites)
+ if processes:
+ from oeqa.core.utils.concurrencytest import fork_for_tests, ConcurrentTestSuite
+
+ concurrent_suite = ConcurrentTestSuite(self.suites, fork_for_tests(processes))
+ result = self.runner.run(concurrent_suite)
+ else:
+ result = self.runner.run(self.suites)
self._run_end_time = time.time()
return result
@@ -87,6 +93,8 @@ class OETestContextExecutor(object):
default_test_data = os.path.join(default_cases[0], 'data.json')
default_tests = None
+ processes = None
+
def register_commands(self, logger, subparsers):
self.parser = subparsers.add_parser(self.name, help=self.help,
description=self.description, group='components')
@@ -150,12 +158,14 @@ class OETestContextExecutor(object):
self.tc_kwargs['init']['td'] = {}
if args.run_tests:
+ print(str(args.run_tests))
self.tc_kwargs['load']['modules'] = args.run_tests
self.tc_kwargs['load']['modules_required'] = args.run_tests
else:
self.tc_kwargs['load']['modules'] = []
self.tc_kwargs['run']['skips'] = []
+ self.tc_kwargs['run']['processes'] = args.processes
self.module_paths = args.CASES_PATHS
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
new file mode 100644
index 0000000..ce627ec
--- /dev/null
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python3
+#
+# Modified for use in OE by Richard Purdie, 2018
+#
+# Modified by: Corey Goldberg, 2013
+# License: GPLv2+
+#
+# Original code from:
+# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
+# Copyright (C) 2005-2011 Canonical Ltd
+# License: GPLv2+
+
+import os
+import sys
+import traceback
+import unittest
+import subprocess
+from itertools import cycle
+
+from subunit import ProtocolTestCase, TestProtocolClient
+from subunit.test_results import AutoTimingTestResultDecorator
+
+from testtools import ConcurrentTestSuite, iterate_tests
+
+import bb.utils
+import oe.path
+
+_all__ = [
+ 'ConcurrentTestSuite',
+ 'fork_for_tests',
+ 'partition_tests',
+]
+
+def fork_for_tests(concurrency_num):
+ """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.
+ :param concurrency_num: number of processes to use.
+ """
+ def do_fork(suite):
+ """Take suite and start up multiple runners by forking (Unix only).
+ :param suite: TestSuite object.
+ :return: An iterable of TestCase-like objects which can each have
+ run(result) called on them to feed tests to result.
+ """
+ result = []
+ test_blocks = partition_tests(suite, concurrency_num)
+ # Clear the tests from the original suite so it doesn't keep them alive
+ suite._tests[:] = []
+ for process_tests in test_blocks:
+ process_suite = unittest.TestSuite(process_tests)
+ # Also clear each split list so new suite has only reference
+ process_tests[:] = []
+ c2pread, c2pwrite = os.pipe()
+ pid = os.fork()
+ if pid == 0:
+ ourpid = os.getpid()
+ try:
+ # Create a new separate BUILDDIR for each group of tests
+ builddir = os.environ['BUILDDIR']
+ newbuilddir = builddir + "-st-" + str(ourpid)
+ selftestdir = os.path.abspath(builddir + "/../meta-selftest")
+ newselftestdir = newbuilddir + "/meta-selftest"
+
+ bb.utils.mkdirhier(newbuilddir)
+ oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
+ oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
+ oe.path.copytree(selftestdir, newselftestdir)
+
+ for e in os.environ:
+ if builddir in os.environ[e]:
+ os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
+
+ subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
+ subprocess.check_output("bitbake-layers remove-layer %s" % selftestdir, cwd=newbuilddir, shell=True)
+ subprocess.check_output("bitbake-layers add-layer %s" % newselftestdir, cwd=newbuilddir, shell=True)
+ os.chdir(newbuilddir)
+
+ for t in process_suite:
+ cp = t.tc.config_paths
+ for p in cp:
+ if selftestdir in cp[p] and newselftestdir not in cp[p]:
+ cp[p] = cp[p].replace(selftestdir, newselftestdir)
+ if builddir in cp[p] and newbuilddir not in cp[p]:
+ cp[p] = cp[p].replace(builddir, newbuilddir)
+
+
+
+ stream = os.fdopen(c2pwrite, 'wb', 1)
+ os.close(c2pread)
+ # Leave stderr and stdout open so we can see test noise
+ # Close stdin so that the child goes away if it decides to
+ # read from stdin (otherwise its a roulette to see what
+ # child actually gets keystrokes for pdb etc).
+ newsi = os.open(os.devnull, os.O_RDWR)
+ os.dup2(newsi, sys.stdin.fileno())
+
+ subunit_result = AutoTimingTestResultDecorator(
+ TestProtocolClient(stream)
+ )
+ process_suite.run(subunit_result)
+ if ourpid != os.getpid():
+ os._exit(0)
+ bb.utils.prunedir(newbuilddir)
+ except:
+ # Don't do anything with process children
+ if ourpid != os.getpid():
+ os._exit(1)
+ # Try and report traceback on stream, but exit with error
+ # even if stream couldn't be created or something else
+ # goes wrong. The traceback is formatted to a string and
+ # written in one go to avoid interleaving lines from
+ # multiple failing children.
+ try:
+ stream.write(traceback.format_exc())
+ finally:
+ bb.utils.prunedir(newbuilddir)
+ os._exit(1)
+ os._exit(0)
+ else:
+ os.close(c2pwrite)
+ stream = os.fdopen(c2pread, 'rb', 1)
+ test = ProtocolTestCase(stream)
+ result.append(test)
+ return result
+ return do_fork
+
+
+def partition_tests(suite, count):
+ """Partition suite into count lists of tests."""
+ # This just assigns tests in a round-robin fashion. On one hand this
+ # splits up blocks of related tests that might run faster if they shared
+ # resources, but on the other it avoids assigning blocks of slow tests to
+ # just one partition. So the slowest partition shouldn't be much slower
+ # than the fastest.
+ modules = {}
+ for test in iterate_tests(suite):
+ m = test.__module__.split('.')[0]
+ if m not in modules:
+ modules[m] = []
+ modules[m].append(test)
+
+ partitions = [list() for _ in range(count)]
+ for partition, m in zip(cycle(partitions), modules):
+ partition.extend(modules[m])
+ return partitions
+
+
+if __name__ == '__main__':
+ import time
+
+ class SampleTestCase(unittest.TestCase):
+ """Dummy tests that sleep for demo."""
+
+ def test_me_1(self):
+ time.sleep(0.5)
+
+ def test_me_2(self):
+ time.sleep(0.5)
+
+ def test_me_3(self):
+ time.sleep(0.5)
+
+ def test_me_4(self):
+ time.sleep(0.5)
+
+ # Load tests from SampleTestCase defined above
+ suite = unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)
+ runner = unittest.TextTestRunner()
+
+ # Run tests sequentially
+ runner.run(suite)
+
+ # Run same tests across 4 processes
+ suite = unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)
+ concurrent_suite = ConcurrentTestSuite(suite, fork_for_tests(4))
+ runner.run(concurrent_suite)
diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py
index 9e90d3c..026ee28 100644
--- a/meta/lib/oeqa/selftest/context.py
+++ b/meta/lib/oeqa/selftest/context.py
@@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
self.custommachine = None
self.config_paths = config_paths
- def runTests(self, machine=None, skips=[]):
+ def runTests(self, processes=None, machine=None, skips=[]):
if machine:
self.custommachine = machine
if machine == 'random':
self.custommachine = choice(self.machines)
self.logger.info('Run tests with custom MACHINE set to: %s' % \
self.custommachine)
- return super(OESelftestTestContext, self).runTests(skips)
+ return super(OESelftestTestContext, self).runTests(processes, skips)
def listTests(self, display_type, machine=None):
return super(OESelftestTestContext, self).listTests(display_type)
@@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
action="store_true", default=False,
help='List all available tests.')
+ parser.add_argument('-j', '--num-processes', dest='processes', action='store',
+ type=int, help="number of processes to execute in parallel with")
+
parser.add_argument('--machine', required=False, choices=['random', 'all'],
help='Run tests on different machines (random/all).')
--
To stop receiving notification emails like this one, please contact
the administrator of this repository.
More information about the Openembedded-commits
mailing list