[OE-core] [PATCH 1/6] oeqa: Add selftest parallelisation support
Robert Yang
liezhi.yang at windriver.com
Thu Jul 26 03:03:36 UTC 2018
Hi RP,
On 07/17/2018 12:33 AM, Richard Purdie wrote:
> This allows oe-selftest to take a -j option which specifies how much test
> parallelisation to use. Currently this is "module" based with each module
> being split and run in a separate build directory. Further splitting could
> be done but this seems a good compromise between test setup and parallelism.
>
> You need python-testtools and python-subunit installed to use this but only
> when the -j option is specified.
Should we add python-testtools-native and python-subunit-native, please ?
And add them to TESTIMAGEDEPENDS ?
// Robert
>
> See notes posted to the openedmbedded-architecture list for more details
> about the design choices here.
>
> Some of this functionality may make more sense in the oeqa core ultimately.
>
> Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
> ---
> meta/lib/oeqa/core/context.py | 10 +-
> meta/lib/oeqa/core/runner.py | 24 +-
> meta/lib/oeqa/core/utils/concurrencytest.py | 254 ++++++++++++++++++++
> meta/lib/oeqa/selftest/context.py | 8 +-
> 4 files changed, 288 insertions(+), 8 deletions(-)
> create mode 100644 meta/lib/oeqa/core/utils/concurrencytest.py
>
> diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py
> index 10481b44b61..8cdfbf834f3 100644
> --- a/meta/lib/oeqa/core/context.py
> +++ b/meta/lib/oeqa/core/context.py
> @@ -58,14 +58,20 @@ class OETestContext(object):
> modules_required, filters)
> self.suites = self.loader.discover()
>
> - def runTests(self, skips=[]):
> + def runTests(self, processes=None, skips=[]):
> self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True)
>
> # Dinamically skip those tests specified though arguments
> self.skipTests(skips)
>
> self._run_start_time = time.time()
> - result = self.runner.run(self.suites)
> + if processes:
> + from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
> +
> + concurrent_suite = ConcurrentTestSuite(self.suites, processes)
> + result = self.runner.run(concurrent_suite)
> + else:
> + result = self.runner.run(self.suites)
> self._run_end_time = time.time()
>
> return result
> diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
> index 219102c6b0f..6adbe3827b4 100644
> --- a/meta/lib/oeqa/core/runner.py
> +++ b/meta/lib/oeqa/core/runner.py
> @@ -43,11 +43,17 @@ class OETestResult(_TestResult):
> super(OETestResult, self).__init__(*args, **kwargs)
>
> self.successes = []
> + self.starttime = {}
> + self.endtime = {}
> + self.progressinfo = {}
>
> self.tc = tc
> self._tc_map_results()
>
> def startTest(self, test):
> + # May have been set by concurrencytest
> + if test.id() not in self.starttime:
> + self.starttime[test.id()] = time.time()
> super(OETestResult, self).startTest(test)
>
> def _tc_map_results(self):
> @@ -57,6 +63,12 @@ class OETestResult(_TestResult):
> self.tc._results['expectedFailures'] = self.expectedFailures
> self.tc._results['successes'] = self.successes
>
> + def stopTest(self, test):
> + self.endtime[test.id()] = time.time()
> + super(OETestResult, self).stopTest(test)
> + if test.id() in self.progressinfo:
> + print(self.progressinfo[test.id()])
> +
> def logSummary(self, component, context_msg=''):
> elapsed_time = self.tc._run_end_time - self.tc._run_start_time
> self.tc.logger.info("SUMMARY:")
> @@ -141,12 +153,16 @@ class OETestResult(_TestResult):
> if hasattr(d, 'oeid'):
> oeid = d.oeid
>
> + t = ""
> + if case.id() in self.starttime and case.id() in self.endtime:
> + t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
> +
> if fail:
> - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
> - oeid, desc))
> + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
> + oeid, desc, t))
> else:
> - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
> - oeid, 'UNKNOWN'))
> + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
> + oeid, 'UNKNOWN', t))
>
> class OEListTestsResult(object):
> def wasSuccessful(self):
> diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
> new file mode 100644
> index 00000000000..850586516a4
> --- /dev/null
> +++ b/meta/lib/oeqa/core/utils/concurrencytest.py
> @@ -0,0 +1,254 @@
> +#!/usr/bin/env python3
> +#
> +# Modified for use in OE by Richard Purdie, 2018
> +#
> +# Modified by: Corey Goldberg, 2013
> +# License: GPLv2+
> +#
> +# Original code from:
> +# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
> +# Copyright (C) 2005-2011 Canonical Ltd
> +# License: GPLv2+
> +
> +import os
> +import sys
> +import traceback
> +import unittest
> +import subprocess
> +import testtools
> +import threading
> +import time
> +import io
> +
> +from queue import Queue
> +from itertools import cycle
> +from subunit import ProtocolTestCase, TestProtocolClient
> +from subunit.test_results import AutoTimingTestResultDecorator
> +from testtools import ThreadsafeForwardingResult, iterate_tests
> +
> +import bb.utils
> +import oe.path
> +
> +_all__ = [
> + 'ConcurrentTestSuite',
> + 'fork_for_tests',
> + 'partition_tests',
> +]
> +
> +#
> +# Patch the version from testtools to allow access to _test_start and allow
> +# computation of timing information and threading progress
> +#
> +class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
> +
> + def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
> + super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
> + self.threadnum = threadnum
> + self.totalinprocess = totalinprocess
> + self.totaltests = totaltests
> +
> + def _add_result_with_semaphore(self, method, test, *args, **kwargs):
> + self.semaphore.acquire()
> + try:
> + self.result.starttime[test.id()] = self._test_start.timestamp()
> + self.result.threadprogress[self.threadnum].append(test.id())
> + totalprogress = sum(len(x) for x in self.result.threadprogress.values())
> + self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % (
> + self.threadnum,
> + len(self.result.threadprogress[self.threadnum]),
> + self.totalinprocess,
> + totalprogress,
> + self.totaltests,
> + "{0:.2f}".format(time.time()-self._test_start.timestamp()),
> + test.id())
> + finally:
> + self.semaphore.release()
> + super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
> +
> +#
> +# A dummy structure to add to io.StringIO so that the .buffer object
> +# is available and accepts writes. This allows unittest with buffer=True
> +# to interact ok with subunit which wants to access sys.stdout.buffer.
> +#
> +class dummybuf(object):
> + def __init__(self, parent):
> + self.p = parent
> + def write(self, data):
> + self.p.write(data.decode("utf-8"))
> +
> +#
> +# Taken from testtools.ConncurrencyTestSuite but modified for OE use
> +#
> +class ConcurrentTestSuite(unittest.TestSuite):
> +
> + def __init__(self, suite, processes):
> + super(ConcurrentTestSuite, self).__init__([suite])
> + self.processes = processes
> +
> + def run(self, result):
> + tests, totaltests = fork_for_tests(self.processes, self)
> + try:
> + threads = {}
> + queue = Queue()
> + semaphore = threading.Semaphore(1)
> + result.threadprogress = {}
> + for i, (test, testnum) in enumerate(tests):
> + result.threadprogress[i] = []
> + process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests)
> + # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
> + # as per default in parent code
> + process_result.buffer = True
> + # We have to add a buffer object to stdout to keep subunit happy
> + process_result._stderr_buffer = io.StringIO()
> + process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
> + process_result._stdout_buffer = io.StringIO()
> + process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
> + reader_thread = threading.Thread(
> + target=self._run_test, args=(test, process_result, queue))
> + threads[test] = reader_thread, process_result
> + reader_thread.start()
> + while threads:
> + finished_test = queue.get()
> + threads[finished_test][0].join()
> + del threads[finished_test]
> + except:
> + for thread, process_result in threads.values():
> + process_result.stop()
> + raise
> +
> + def _run_test(self, test, process_result, queue):
> + try:
> + try:
> + test.run(process_result)
> + except Exception:
> + # The run logic itself failed
> + case = testtools.ErrorHolder(
> + "broken-runner",
> + error=sys.exc_info())
> + case.run(process_result)
> + finally:
> + queue.put(test)
> +
> +def removebuilddir(d):
> + delay = 5
> + while delay and os.path.exists(d + "/bitbake.lock"):
> + time.sleep(1)
> + delay = delay - 1
> + bb.utils.prunedir(d)
> +
> +def fork_for_tests(concurrency_num, suite):
> + result = []
> + test_blocks = partition_tests(suite, concurrency_num)
> + # Clear the tests from the original suite so it doesn't keep them alive
> + suite._tests[:] = []
> + totaltests = sum(len(x) for x in test_blocks)
> + for process_tests in test_blocks:
> + numtests = len(process_tests)
> + process_suite = unittest.TestSuite(process_tests)
> + # Also clear each split list so new suite has only reference
> + process_tests[:] = []
> + c2pread, c2pwrite = os.pipe()
> + # Clear buffers before fork to avoid duplicate output
> + sys.stdout.flush()
> + sys.stderr.flush()
> + pid = os.fork()
> + if pid == 0:
> + ourpid = os.getpid()
> + try:
> + newbuilddir = None
> + stream = os.fdopen(c2pwrite, 'wb', 1)
> + os.close(c2pread)
> +
> + # Create a new separate BUILDDIR for each group of tests
> + if 'BUILDDIR' in os.environ:
> + builddir = os.environ['BUILDDIR']
> + newbuilddir = builddir + "-st-" + str(ourpid)
> + selftestdir = os.path.abspath(builddir + "/../meta-selftest")
> + newselftestdir = newbuilddir + "/meta-selftest"
> +
> + bb.utils.mkdirhier(newbuilddir)
> + oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
> + oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
> + oe.path.copytree(selftestdir, newselftestdir)
> +
> + for e in os.environ:
> + if builddir in os.environ[e]:
> + os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
> +
> + subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
> +
> + # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
> + subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)
> +
> + os.chdir(newbuilddir)
> +
> + for t in process_suite:
> + if not hasattr(t, "tc"):
> + continue
> + cp = t.tc.config_paths
> + for p in cp:
> + if selftestdir in cp[p] and newselftestdir not in cp[p]:
> + cp[p] = cp[p].replace(selftestdir, newselftestdir)
> + if builddir in cp[p] and newbuilddir not in cp[p]:
> + cp[p] = cp[p].replace(builddir, newbuilddir)
> +
> + # Leave stderr and stdout open so we can see test noise
> + # Close stdin so that the child goes away if it decides to
> + # read from stdin (otherwise its a roulette to see what
> + # child actually gets keystrokes for pdb etc).
> + newsi = os.open(os.devnull, os.O_RDWR)
> + os.dup2(newsi, sys.stdin.fileno())
> +
> + subunit_client = TestProtocolClient(stream)
> + # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
> + # as per default in parent code
> + subunit_client.buffer = True
> + subunit_result = AutoTimingTestResultDecorator(subunit_client)
> + process_suite.run(subunit_result)
> + if ourpid != os.getpid():
> + os._exit(0)
> + if newbuilddir:
> + removebuilddir(newbuilddir)
> + except:
> + # Don't do anything with process children
> + if ourpid != os.getpid():
> + os._exit(1)
> + # Try and report traceback on stream, but exit with error
> + # even if stream couldn't be created or something else
> + # goes wrong. The traceback is formatted to a string and
> + # written in one go to avoid interleaving lines from
> + # multiple failing children.
> + try:
> + stream.write(traceback.format_exc().encode('utf-8'))
> + except:
> + sys.stderr.write(traceback.format_exc())
> + finally:
> + if newbuilddir:
> + removebuilddir(newbuilddir)
> + os._exit(1)
> + os._exit(0)
> + else:
> + os.close(c2pwrite)
> + stream = os.fdopen(c2pread, 'rb', 1)
> + test = ProtocolTestCase(stream)
> + result.append((test, numtests))
> + return result, totaltests
> +
> +def partition_tests(suite, count):
> + # Keep tests from the same class together but allow tests from modules
> + # to go to different processes to aid parallelisation.
> + modules = {}
> + for test in iterate_tests(suite):
> + m = test.__module__ + "." + test.__class__.__name__
> + if m not in modules:
> + modules[m] = []
> + modules[m].append(test)
> +
> + # Simply divide the test blocks between the available processes
> + partitions = [list() for _ in range(count)]
> + for partition, m in zip(cycle(partitions), modules):
> + partition.extend(modules[m])
> +
> + # No point in empty threads so drop them
> + return [p for p in partitions if p]
> +
> diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py
> index 9e90d3c2565..c937b8171c9 100644
> --- a/meta/lib/oeqa/selftest/context.py
> +++ b/meta/lib/oeqa/selftest/context.py
> @@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
> self.custommachine = None
> self.config_paths = config_paths
>
> - def runTests(self, machine=None, skips=[]):
> + def runTests(self, processes=None, machine=None, skips=[]):
> if machine:
> self.custommachine = machine
> if machine == 'random':
> self.custommachine = choice(self.machines)
> self.logger.info('Run tests with custom MACHINE set to: %s' % \
> self.custommachine)
> - return super(OESelftestTestContext, self).runTests(skips)
> + return super(OESelftestTestContext, self).runTests(processes, skips)
>
> def listTests(self, display_type, machine=None):
> return super(OESelftestTestContext, self).listTests(display_type)
> @@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
> action="store_true", default=False,
> help='List all available tests.')
>
> + parser.add_argument('-j', '--num-processes', dest='processes', action='store',
> + type=int, help="number of processes to execute in parallel with")
> +
> parser.add_argument('--machine', required=False, choices=['random', 'all'],
> help='Run tests on different machines (random/all).')
>
> @@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
> self.tc_kwargs['init']['config_paths']['bblayers_backup'])
>
> self.tc_kwargs['run']['skips'] = args.skips
> + self.tc_kwargs['run']['processes'] = args.processes
>
> def _pre_run(self):
> def _check_required_env_variables(vars):
>
More information about the Openembedded-core
mailing list