[oe-commits] [openembedded-core] 01/07: oeqa: Add selftest parallelisation support

git at git.openembedded.org git at git.openembedded.org
Mon Jul 16 15:12:41 UTC 2018


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master-next
in repository openembedded-core.

commit bad7225743d4ee86d635308c537b1da06529131d
Author: Richard Purdie <richard.purdie at linuxfoundation.org>
AuthorDate: Mon Jul 9 15:20:34 2018 +0000

    oeqa: Add selftest parallelisation support
    
    This allows oe-selftest to take a -j option which specifies how much test
    parallelisation to use. Currently this is "module" based with each module
    being split and run in a separate build directory. Further splitting could
    be done but this seems a good compromise between test setup and parallelism.
    
    You need python-testtools and python-subunit installed to use this but only
    when the -j option is specified.
    
    See notes posted to the openedmbedded-architecture list for more details
    about the design choices here.
    
    Some of this functionality may make more sense in the oeqa core ultimately.
    
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 meta/lib/oeqa/core/context.py               |  11 +-
 meta/lib/oeqa/core/runner.py                |  24 ++-
 meta/lib/oeqa/core/utils/concurrencytest.py | 244 ++++++++++++++++++++++++++++
 meta/lib/oeqa/selftest/context.py           |   8 +-
 4 files changed, 279 insertions(+), 8 deletions(-)

diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py
index 10481b4..ce6256b 100644
--- a/meta/lib/oeqa/core/context.py
+++ b/meta/lib/oeqa/core/context.py
@@ -58,14 +58,20 @@ class OETestContext(object):
                 modules_required, filters)
         self.suites = self.loader.discover()
 
-    def runTests(self, skips=[]):
+    def runTests(self, processes=None, skips=[]):
         self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True)
 
         # Dinamically skip those tests specified though arguments
         self.skipTests(skips)
 
         self._run_start_time = time.time()
-        result = self.runner.run(self.suites)
+        if processes:
+            from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
+
+            concurrent_suite = ConcurrentTestSuite(self.suites, processes)
+            result = self.runner.run(concurrent_suite)
+        else:
+            result = self.runner.run(self.suites)
         self._run_end_time = time.time()
 
         return result
@@ -150,6 +156,7 @@ class OETestContextExecutor(object):
             self.tc_kwargs['init']['td'] = {}
 
         if args.run_tests:
+            print(str(args.run_tests))
             self.tc_kwargs['load']['modules'] = args.run_tests
             self.tc_kwargs['load']['modules_required'] = args.run_tests
         else:
diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
index 219102c..6adbe38 100644
--- a/meta/lib/oeqa/core/runner.py
+++ b/meta/lib/oeqa/core/runner.py
@@ -43,11 +43,17 @@ class OETestResult(_TestResult):
         super(OETestResult, self).__init__(*args, **kwargs)
 
         self.successes = []
+        self.starttime = {}
+        self.endtime = {}
+        self.progressinfo = {}
 
         self.tc = tc
         self._tc_map_results()
 
     def startTest(self, test):
+        # May have been set by concurrencytest
+        if test.id() not in self.starttime:
+            self.starttime[test.id()] = time.time()
         super(OETestResult, self).startTest(test)
 
     def _tc_map_results(self):
@@ -57,6 +63,12 @@ class OETestResult(_TestResult):
         self.tc._results['expectedFailures'] = self.expectedFailures
         self.tc._results['successes'] = self.successes
 
+    def stopTest(self, test):
+        self.endtime[test.id()] = time.time()
+        super(OETestResult, self).stopTest(test)
+        if test.id() in self.progressinfo:
+            print(self.progressinfo[test.id()])
+
     def logSummary(self, component, context_msg=''):
         elapsed_time = self.tc._run_end_time - self.tc._run_start_time
         self.tc.logger.info("SUMMARY:")
@@ -141,12 +153,16 @@ class OETestResult(_TestResult):
                     if hasattr(d, 'oeid'):
                         oeid = d.oeid
 
+            t = ""
+            if case.id() in self.starttime and case.id() in self.endtime:
+                t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
+
             if fail:
-                self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
-                    oeid, desc))
+                self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
+                    oeid, desc, t))
             else:
-                self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
-                    oeid, 'UNKNOWN'))
+                self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
+                    oeid, 'UNKNOWN', t))
 
 class OEListTestsResult(object):
     def wasSuccessful(self):
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
new file mode 100644
index 0000000..7dae3d7
--- /dev/null
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -0,0 +1,244 @@
+#!/usr/bin/env python3
+#
+# Modified for use in OE by Richard Purdie, 2018
+#
+# Modified by: Corey Goldberg, 2013
+#   License: GPLv2+
+#
+# Original code from:
+#   Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
+#   Copyright (C) 2005-2011 Canonical Ltd
+#   License: GPLv2+
+
+import os
+import sys
+import traceback
+import unittest
+import subprocess
+import testtools
+import threading
+import time
+import io
+
+from queue import Queue
+from itertools import cycle
+from subunit import ProtocolTestCase, TestProtocolClient
+from subunit.test_results import AutoTimingTestResultDecorator
+from testtools import ThreadsafeForwardingResult, iterate_tests
+
+import bb.utils
+import oe.path
+
+_all__ = [
+    'ConcurrentTestSuite',
+    'fork_for_tests',
+    'partition_tests',
+]
+
+#
+# Patch the version from testtools to allow access to _test_start and allow
+# computation of timing information and threading progress
+#
+class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
+
+    def __init__(self, target, semaphore, threadnum, totaltests):
+        super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
+        self.threadnum = threadnum
+        self.totaltests = totaltests
+
+    def _add_result_with_semaphore(self, method, test, *args, **kwargs):
+        self.semaphore.acquire()
+        try:
+            self.result.starttime[test.id()] = self._test_start.timestamp()
+            self.result.threadprogress[self.threadnum].append(test.id())
+            self.result.progressinfo[test.id()] = "%s:%s/%s (%ss) (%s)" % (self.threadnum, len(self.result.threadprogress[self.threadnum]), self.totaltests, "{0:.2f}".format(time.time()-self._test_start.timestamp()), test.id())
+        finally:
+            self.semaphore.release()
+        super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
+
+#
+# A dummy structure to add to io.StringIO so that the .buffer object
+# is available and accepts writes. This allows unittest with buffer=True
+# to interact ok with subunit which wants to access sys.stdout.buffer.
+#
+class dummybuf(object):
+   def __init__(self, parent):
+       self.p = parent
+   def write(self, data):
+       self.p.write(data.decode("utf-8"))
+
+#
+# Taken from testtools.ConncurrencyTestSuite but modified for OE use
+#
+class ConcurrentTestSuite(unittest.TestSuite):
+
+    def __init__(self, suite, processes):
+        super(ConcurrentTestSuite, self).__init__([suite])
+        self.processes = processes
+
+    def run(self, result):
+        tests = fork_for_tests(self.processes, self)
+        try:
+            threads = {}
+            queue = Queue()
+            semaphore = threading.Semaphore(1)
+            result.threadprogress = {}
+            for i, (test, testnum) in enumerate(tests):
+                result.threadprogress[i] = []
+                process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum)
+                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
+                # as per default in parent code
+                process_result.buffer = True
+                # We have to add a buffer object to stdout to keep subunit happy
+                process_result._stderr_buffer = io.StringIO()
+                process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
+                process_result._stdout_buffer = io.StringIO()
+                process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
+                reader_thread = threading.Thread(
+                    target=self._run_test, args=(test, process_result, queue))
+                threads[test] = reader_thread, process_result
+                reader_thread.start()
+            while threads:
+                finished_test = queue.get()
+                threads[finished_test][0].join()
+                del threads[finished_test]
+        except:
+            for thread, process_result in threads.values():
+                process_result.stop()
+            raise
+
+    def _run_test(self, test, process_result, queue):
+        try:
+            try:
+                test.run(process_result)
+            except Exception:
+                # The run logic itself failed
+                case = testtools.ErrorHolder(
+                    "broken-runner",
+                    error=sys.exc_info())
+                case.run(process_result)
+        finally:
+            queue.put(test)
+
+def removebuilddir(d):
+    delay = 5
+    while delay and os.path.exists(d + "/bitbake.lock"):
+        time.sleep(1)
+        delay = delay - 1
+    bb.utils.prunedir(d)
+
+def fork_for_tests(concurrency_num, suite):
+    result = []
+    test_blocks = partition_tests(suite, concurrency_num)
+    # Clear the tests from the original suite so it doesn't keep them alive
+    suite._tests[:] = []
+    for process_tests in test_blocks:
+        numtests = len(process_tests)
+        process_suite = unittest.TestSuite(process_tests)
+        # Also clear each split list so new suite has only reference
+        process_tests[:] = []
+        c2pread, c2pwrite = os.pipe()
+        # Clear buffers before fork to avoid duplicate output
+        sys.stdout.flush()
+        sys.stderr.flush()
+        pid = os.fork()
+        if pid == 0:
+            ourpid = os.getpid()
+            try:
+                newbuilddir = None
+                stream = os.fdopen(c2pwrite, 'wb', 1)
+                os.close(c2pread)
+
+                # Create a new separate BUILDDIR for each group of tests
+                if 'BUILDDIR' in os.environ:
+                    builddir = os.environ['BUILDDIR']
+                    newbuilddir = builddir + "-st-" + str(ourpid)
+                    selftestdir = os.path.abspath(builddir + "/../meta-selftest")
+                    newselftestdir = newbuilddir + "/meta-selftest"
+
+                    bb.utils.mkdirhier(newbuilddir)
+                    oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
+                    oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
+                    oe.path.copytree(selftestdir, newselftestdir)
+
+                    for e in os.environ:
+                        if builddir in os.environ[e]:
+                            os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
+
+                    subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
+
+                    # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
+                    subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)
+
+                    os.chdir(newbuilddir)
+
+                    for t in process_suite:
+                        if not hasattr(t, "tc"):
+                            continue
+                        cp = t.tc.config_paths
+                        for p in cp:
+                            if selftestdir in cp[p] and newselftestdir not in cp[p]:
+                                cp[p] = cp[p].replace(selftestdir, newselftestdir)
+                            if builddir in cp[p] and newbuilddir not in cp[p]:
+                                cp[p] = cp[p].replace(builddir, newbuilddir)
+
+                # Leave stderr and stdout open so we can see test noise
+                # Close stdin so that the child goes away if it decides to
+                # read from stdin (otherwise its a roulette to see what
+                # child actually gets keystrokes for pdb etc).
+                newsi = os.open(os.devnull, os.O_RDWR)
+                os.dup2(newsi, sys.stdin.fileno())
+
+                subunit_client = TestProtocolClient(stream)
+                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
+                # as per default in parent code
+                subunit_client.buffer = True
+                subunit_result = AutoTimingTestResultDecorator(subunit_client)
+                process_suite.run(subunit_result)
+                if ourpid != os.getpid():
+                    os._exit(0)
+                if newbuilddir:
+                    removebuilddir(newbuilddir)
+            except:
+                # Don't do anything with process children
+                if ourpid != os.getpid():
+                    os._exit(1)
+                # Try and report traceback on stream, but exit with error
+                # even if stream couldn't be created or something else
+                # goes wrong.  The traceback is formatted to a string and
+                # written in one go to avoid interleaving lines from
+                # multiple failing children.
+                try:
+                    stream.write(traceback.format_exc().encode('utf-8'))
+                except:
+                    sys.stderr.write(traceback.format_exc())
+                finally:
+                    if newbuilddir:
+                        removebuilddir(newbuilddir)
+                    os._exit(1)
+            os._exit(0)
+        else:
+            os.close(c2pwrite)
+            stream = os.fdopen(c2pread, 'rb', 1)
+            test = ProtocolTestCase(stream)
+            result.append((test, numtests))
+    return result
+
+def partition_tests(suite, count):
+    # Keep tests from the same class together but allow tests from modules
+    # to go to different processes to aid parallelisation.
+    modules = {}
+    for test in iterate_tests(suite):
+        m = test.__module__ + "." + test.__class__.__name__
+        if m not in modules:
+            modules[m] = []
+        modules[m].append(test)
+
+    # Simply divide the test blocks between the available processes
+    partitions = [list() for _ in range(count)]
+    for partition, m in zip(cycle(partitions), modules):
+        partition.extend(modules[m])
+
+    # No point in empty threads so drop them
+    return [p for p in partitions if p]
+
diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py
index 9e90d3c..c937b81 100644
--- a/meta/lib/oeqa/selftest/context.py
+++ b/meta/lib/oeqa/selftest/context.py
@@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
         self.custommachine = None
         self.config_paths = config_paths
 
-    def runTests(self, machine=None, skips=[]):
+    def runTests(self, processes=None, machine=None, skips=[]):
         if machine:
             self.custommachine = machine
             if machine == 'random':
                 self.custommachine = choice(self.machines)
             self.logger.info('Run tests with custom MACHINE set to: %s' % \
                     self.custommachine)
-        return super(OESelftestTestContext, self).runTests(skips)
+        return super(OESelftestTestContext, self).runTests(processes, skips)
 
     def listTests(self, display_type, machine=None):
         return super(OESelftestTestContext, self).listTests(display_type)
@@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
                 action="store_true", default=False,
                 help='List all available tests.')
 
+        parser.add_argument('-j', '--num-processes', dest='processes', action='store',
+                type=int, help="number of processes to execute in parallel with")
+
         parser.add_argument('--machine', required=False, choices=['random', 'all'],
                             help='Run tests on different machines (random/all).')
         
@@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
                 self.tc_kwargs['init']['config_paths']['bblayers_backup'])
 
         self.tc_kwargs['run']['skips'] = args.skips
+        self.tc_kwargs['run']['processes'] = args.processes
 
     def _pre_run(self):
         def _check_required_env_variables(vars):

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the Openembedded-commits mailing list