[oe-commits] [openembedded-core] 01/02: oeqa: Add selftest parallelisation support

git at git.openembedded.org
Tue Jul 10 17:58:34 UTC 2018


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master-next
in repository openembedded-core.

commit af5207525965d93d9a66e07225c3cfb4b2214585
Author: Richard Purdie <richard.purdie at linuxfoundation.org>
AuthorDate: Mon Jul 9 15:20:34 2018 +0000

    oeqa: Add selftest parallelisation support
    
    This allows oe-selftest to take a -j option which specifies how much test
    parallelisation to use. Currently this is "module" based, with each module
    being split out and run in a separate build directory. Further splitting
    could be done, but this seems a good compromise between test setup overhead
    and parallelism.
    
    You need python-testtools and python-subunit installed to use this.
    
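    For example, using the existing -r/--run-tests option (module names here
    are purely illustrative), something like:

        oe-selftest -r wic devtool -j 2

    runs the wic and devtool test modules in parallel, each in its own copy
    of the build directory.
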
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 meta/lib/oeqa/core/context.py               |  14 ++-
 meta/lib/oeqa/core/utils/concurrencytest.py | 175 ++++++++++++++++++++++++++++
 meta/lib/oeqa/selftest/context.py           |   7 +-
 3 files changed, 192 insertions(+), 4 deletions(-)

diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py
index acd5474..b8e4131 100644
--- a/meta/lib/oeqa/core/context.py
+++ b/meta/lib/oeqa/core/context.py
@@ -58,14 +58,20 @@ class OETestContext(object):
                 modules_required, filters)
         self.suites = self.loader.discover()
 
-    def runTests(self, skips=[]):
+    def runTests(self, processes=None, skips=[]):
         self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
 
         # Dynamically skip those tests specified through arguments
         self.skipTests(skips)
 
         self._run_start_time = time.time()
-        result = self.runner.run(self.suites)
+        if processes:
+            from oeqa.core.utils.concurrencytest import fork_for_tests, ConcurrentTestSuite
+
+            concurrent_suite = ConcurrentTestSuite(self.suites, fork_for_tests(processes))
+            result = self.runner.run(concurrent_suite)
+        else:
+            result = self.runner.run(self.suites)
         self._run_end_time = time.time()
 
         return result
@@ -87,6 +93,8 @@ class OETestContextExecutor(object):
     default_test_data = os.path.join(default_cases[0], 'data.json')
     default_tests = None
 
+    processes = None
+
     def register_commands(self, logger, subparsers):
         self.parser = subparsers.add_parser(self.name, help=self.help,
                 description=self.description, group='components')
@@ -150,12 +158,14 @@ class OETestContextExecutor(object):
             self.tc_kwargs['init']['td'] = {}
 
         if args.run_tests:
+            print(str(args.run_tests))
             self.tc_kwargs['load']['modules'] = args.run_tests
             self.tc_kwargs['load']['modules_required'] = args.run_tests
         else:
             self.tc_kwargs['load']['modules'] = []
 
         self.tc_kwargs['run']['skips'] = []
+        self.tc_kwargs['run']['processes'] = args.processes
 
         self.module_paths = args.CASES_PATHS
 
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
new file mode 100644
index 0000000..ce627ec
--- /dev/null
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python3
+#
+# Modified for use in OE by Richard Purdie, 2018
+#
+# Modified by: Corey Goldberg, 2013
+#   License: GPLv2+
+#
+# Original code from:
+#   Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
+#   Copyright (C) 2005-2011 Canonical Ltd
+#   License: GPLv2+
+
+import os
+import sys
+import traceback
+import unittest
+import subprocess
+from itertools import cycle
+
+from subunit import ProtocolTestCase, TestProtocolClient
+from subunit.test_results import AutoTimingTestResultDecorator
+
+from testtools import ConcurrentTestSuite, iterate_tests
+
+import bb.utils
+import oe.path
+
+__all__ = [
+    'ConcurrentTestSuite',
+    'fork_for_tests',
+    'partition_tests',
+]
+
+def fork_for_tests(concurrency_num):
+    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.
+    :param concurrency_num: number of processes to use.
+    """
+    def do_fork(suite):
+        """Take suite and start up multiple runners by forking (Unix only).
+        :param suite: TestSuite object.
+        :return: An iterable of TestCase-like objects which can each have
+        run(result) called on them to feed tests to result.
+        """
+        result = []
+        test_blocks = partition_tests(suite, concurrency_num)
+        # Clear the tests from the original suite so it doesn't keep them alive
+        suite._tests[:] = []
+        for process_tests in test_blocks:
+            process_suite = unittest.TestSuite(process_tests)
+            # Also clear each split list so the new suite holds the only reference
+            process_tests[:] = []
+            c2pread, c2pwrite = os.pipe()
+            pid = os.fork()
+            if pid == 0:
+                ourpid = os.getpid()
+                try:
+                    # Create a new separate BUILDDIR for each group of tests
+                    builddir = os.environ['BUILDDIR']
+                    newbuilddir = builddir + "-st-" + str(ourpid)
+                    selftestdir = os.path.abspath(builddir + "/../meta-selftest")
+                    newselftestdir = newbuilddir + "/meta-selftest"
+
+                    bb.utils.mkdirhier(newbuilddir)
+                    oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
+                    oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
+                    oe.path.copytree(selftestdir, newselftestdir)
+
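+                    # Rewrite any environment variables that reference the
+                    # original build directory so they point at the new copy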
+                    for e in os.environ:
+                        if builddir in os.environ[e]:
+                            os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
+
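+                    # Put the copied meta-selftest layer under its own git control
+                    # and register it with the new build in place of the original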
+                    subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
+                    subprocess.check_output("bitbake-layers remove-layer %s" % selftestdir, cwd=newbuilddir, shell=True)
+                    subprocess.check_output("bitbake-layers add-layer %s" % newselftestdir, cwd=newbuilddir, shell=True)
+                    os.chdir(newbuilddir)
+
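+                    # Update each test's recorded config paths to refer to the
+                    # copied layer and build directory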
+                    for t in process_suite:
+                        cp = t.tc.config_paths
+                        for p in cp:
+                            if selftestdir in cp[p] and newselftestdir not in cp[p]:
+                                cp[p] = cp[p].replace(selftestdir, newselftestdir)
+                            if builddir in cp[p] and newbuilddir not in cp[p]:
+                                cp[p] = cp[p].replace(builddir, newbuilddir)
+
+                    stream = os.fdopen(c2pwrite, 'wb', 1)
+                    os.close(c2pread)
+                    # Leave stderr and stdout open so we can see test noise
+                    # Close stdin so that the child goes away if it decides to
+                    # read from stdin (otherwise it's a roulette to see which
+                    # child actually gets keystrokes for pdb etc.).
+                    newsi = os.open(os.devnull, os.O_RDWR)
+                    os.dup2(newsi, sys.stdin.fileno())
+
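+                    # Report results back to the parent over the pipe using the
+                    # subunit protocol, with per-test timing added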
+                    subunit_result = AutoTimingTestResultDecorator(
+                        TestProtocolClient(stream)
+                    )
+                    process_suite.run(subunit_result)
+                    if ourpid != os.getpid():
+                        os._exit(0)
+                    bb.utils.prunedir(newbuilddir)
+                except:
+                    # Don't do anything with process children
+                    if ourpid != os.getpid():
+                        os._exit(1)
+                    # Try and report traceback on stream, but exit with error
+                    # even if stream couldn't be created or something else
+                    # goes wrong.  The traceback is formatted to a string and
+                    # written in one go to avoid interleaving lines from
+                    # multiple failing children.
+                    try:
+                        stream.write(traceback.format_exc())
+                    finally:
+                        bb.utils.prunedir(newbuilddir)
+                        os._exit(1)
+                os._exit(0)
+            else:
+                os.close(c2pwrite)
+                stream = os.fdopen(c2pread, 'rb', 1)
+                test = ProtocolTestCase(stream)
+                result.append(test)
+        return result
+    return do_fork
+
+
+def partition_tests(suite, count):
+    """Partition suite into count lists of tests."""
+    # Tests are grouped by top-level module and the module groups are assigned
+    # to the partitions in a round-robin fashion. This keeps the tests of a
+    # module together (they may share expensive setup), while spreading the
+    # modules across the partitions so the slowest partition shouldn't be much
+    # slower than the fastest.
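+    # e.g. with count=2 and modules A, B and C, the first partition gets the
+    # tests from A and C and the second gets the tests from B.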
+    modules = {}
+    for test in iterate_tests(suite):
+        m = test.__module__.split('.')[0]
+        if m not in modules:
+            modules[m] = []
+        modules[m].append(test)
+
+    partitions = [list() for _ in range(count)]
+    for partition, m in zip(cycle(partitions), modules):
+        partition.extend(modules[m])
+    return partitions
+
+
+if __name__ == '__main__':
+    import time
+
+    class SampleTestCase(unittest.TestCase):
+        """Dummy tests that sleep for demo."""
+
+        def test_me_1(self):
+            time.sleep(0.5)
+
+        def test_me_2(self):
+            time.sleep(0.5)
+
+        def test_me_3(self):
+            time.sleep(0.5)
+
+        def test_me_4(self):
+            time.sleep(0.5)
+
+    # Load tests from SampleTestCase defined above
+    suite = unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)
+    runner = unittest.TextTestRunner()
+
+    # Run tests sequentially
+    runner.run(suite)
+
+    # Run same tests across 4 processes
+    suite = unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)
+    concurrent_suite = ConcurrentTestSuite(suite, fork_for_tests(4))
+    runner.run(concurrent_suite)
diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py
index 9e90d3c..026ee28 100644
--- a/meta/lib/oeqa/selftest/context.py
+++ b/meta/lib/oeqa/selftest/context.py
@@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
         self.custommachine = None
         self.config_paths = config_paths
 
-    def runTests(self, machine=None, skips=[]):
+    def runTests(self, processes=None, machine=None, skips=[]):
         if machine:
             self.custommachine = machine
             if machine == 'random':
                 self.custommachine = choice(self.machines)
             self.logger.info('Run tests with custom MACHINE set to: %s' % \
                     self.custommachine)
-        return super(OESelftestTestContext, self).runTests(skips)
+        return super(OESelftestTestContext, self).runTests(processes, skips)
 
     def listTests(self, display_type, machine=None):
         return super(OESelftestTestContext, self).listTests(display_type)
@@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
                 action="store_true", default=False,
                 help='List all available tests.')
 
+        parser.add_argument('-j', '--num-processes', dest='processes', action='store',
+                type=int, help="number of processes to use when running tests in parallel")
+
         parser.add_argument('--machine', required=False, choices=['random', 'all'],
                             help='Run tests on different machines (random/all).')
         

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.

