[oe-commits] [openembedded-core] 18/18: Add test timing debugging

git at git.openembedded.org git at git.openembedded.org
Sat Jul 14 15:06:32 UTC 2018


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master-next
in repository openembedded-core.

commit 61be76839ae2f0c20cf0b19aeb13be8379d249b3
Author: Richard Purdie <richard.purdie at linuxfoundation.org>
AuthorDate: Fri Jul 13 15:22:26 2018 +0000

    Add test timing debugging
    
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 meta/lib/oeqa/core/runner.py                |  24 +++++--
 meta/lib/oeqa/core/utils/concurrencytest.py | 105 ++++++++++++++++++++++++++--
 2 files changed, 120 insertions(+), 9 deletions(-)

diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
index ddf02b9..a545f0b 100644
--- a/meta/lib/oeqa/core/runner.py
+++ b/meta/lib/oeqa/core/runner.py
@@ -35,12 +35,24 @@ class OETestResult(_TestResult):
         super(OETestResult, self).__init__(*args, **kwargs)
 
         self.successes = []
+        self.starttime = {}
+        self.endtime = {}
+        self.proginfo = {}
 
         self.tc = tc
 
     def startTest(self, test):
+        # May have been set by concurrencytest
+        if test.id() not in self.starttime:
+            self.starttime[test.id()] = time.time()
         super(OETestResult, self).startTest(test)
 
+    def stopTest(self, test):
+        self.endtime[test.id()] = time.time()
+        super(OETestResult, self).stopTest(test)
+        if test.id() in self.proginfo:
+            print(self.proginfo[test.id()])
+
     def logSummary(self, component, context_msg=''):
         elapsed_time = self.tc._run_end_time - self.tc._run_start_time
         self.tc.logger.info("SUMMARY:")
@@ -116,12 +128,16 @@ class OETestResult(_TestResult):
                     if hasattr(d, 'oeid'):
                         oeid = d.oeid
 
+            t = ""
+            if case.id() in self.starttime and case.id() in self.endtime:
+                t = "(" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
+
             if fail:
-                self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
-                    oeid, desc))
+                self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
+                    oeid, desc, t))
             else:
-                self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
-                    oeid, 'UNKNOWN'))
+                self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
+                    oeid, 'UNKNOWN', t))
 
 class OEListTestsResult(object):
     def wasSuccessful(self):
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
index 8f424dd..0f2bc3a 100644
--- a/meta/lib/oeqa/core/utils/concurrencytest.py
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -15,12 +15,16 @@ import sys
 import traceback
 import unittest
 import subprocess
+import testtools
 from itertools import cycle
+from queue import Queue
+import threading
+import time
 
 from subunit import ProtocolTestCase, TestProtocolClient
 from subunit.test_results import AutoTimingTestResultDecorator
 
-from testtools import ConcurrentTestSuite, iterate_tests
+from testtools import ThreadsafeForwardingResult, iterate_tests
 
 import bb.utils
 import oe.path
@@ -31,6 +35,92 @@ _all__ = [
     'partition_tests',
 ]
 
+class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
+
+    def __init__(self, target, semaphore, testnum, total):
+        super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
+        self.testnum = testnum
+        self.total = total
+
+    def _add_result_with_semaphore(self, method, test, *args, **kwargs):
+        self.semaphore.acquire()
+        try:
+            self.result.starttime[test.id()] = self._test_start.timestamp()
+            self.result.proginfo2[self.testnum].append(test.id())
+            self.result.proginfo[test.id()] = "%s:%s/%s (%ss) (%s)" % (self.testnum, len(self.result.proginfo2[self.testnum]), self.total, "{0:.2f}".format(time.time()-self._test_start.timestamp()), test.id())
+        finally:
+            self.semaphore.release()
+        super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
+
+
+class ConcurrentTestSuite(unittest.TestSuite):
+    """A TestSuite whose run() calls out to a concurrency strategy."""
+
+    def __init__(self, suite, make_tests):
+        """Create a ConcurrentTestSuite to execute suite.
+
+        :param suite: A suite to run concurrently.
+        :param make_tests: A helper function to split the tests in the
+            ConcurrentTestSuite into some number of concurrently executing
+            sub-suites. make_tests must take a suite, and return an iterable
+            of TestCase-like object, each of which must have a run(result)
+            method.
+        """
+        super(ConcurrentTestSuite, self).__init__([suite])
+        self.make_tests = make_tests
+
+    def run(self, result):
+        """Run the tests concurrently.
+
+        This calls out to the provided make_tests helper, and then serialises
+        the results so that result only sees activity from one TestCase at
+        a time.
+
+        ConcurrentTestSuite provides no special mechanism to stop the tests
+        returned by make_tests, it is up to the make_tests to honour the
+        shouldStop attribute on the result object they are run with, which will
+        be set if an exception is raised in the thread which
+        ConcurrentTestSuite.run is called in.
+        """
+        tests = self.make_tests(self)
+        try:
+            threads = {}
+            queue = Queue()
+            semaphore = threading.Semaphore(1)
+            result.proginfo2 = {}
+            for i, (test, testnum) in enumerate(tests):
+                result.proginfo2[i] = []
+                process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum)
+                # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
+                # as per default in parent code
+                process_result.buffer = True
+                reader_thread = threading.Thread(
+                    target=self._run_test, args=(test, process_result, queue))
+                threads[test] = reader_thread, process_result
+                reader_thread.start()
+            while threads:
+                finished_test = queue.get()
+                threads[finished_test][0].join()
+                del threads[finished_test]
+        except:
+            for thread, process_result in threads.values():
+                process_result.stop()
+            raise
+
+    def _run_test(self, test, process_result, queue):
+        try:
+            try:
+                test.run(process_result)
+            except Exception:
+                # The run logic itself failed.
+                case = testtools.ErrorHolder(
+                    "broken-runner",
+                    error=sys.exc_info())
+                case.run(process_result)
+        finally:
+            queue.put(test)
+
+
 def fork_for_tests(concurrency_num):
     """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.
     :param concurrency_num: number of processes to use.
@@ -52,7 +142,10 @@ def fork_for_tests(concurrency_num):
         test_blocks = partition_tests(suite, concurrency_num)
         # Clear the tests from the original suite so it doesn't keep them alive
         suite._tests[:] = []
+        print(str(test_blocks))
         for process_tests in test_blocks:
+            print(str(process_tests))
+            numtests = len(process_tests)
             process_suite = unittest.TestSuite(process_tests)
             # Also clear each split list so new suite has only reference
             process_tests[:] = []
@@ -107,9 +200,11 @@ def fork_for_tests(concurrency_num):
                     newsi = os.open(os.devnull, os.O_RDWR)
                     os.dup2(newsi, sys.stdin.fileno())
 
-                    subunit_result = AutoTimingTestResultDecorator(
-                        TestProtocolClient(stream)
-                    )
+                    subunit_client = TestProtocolClient(stream)
+                    # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
+                    # as per default in parent code
+                    subunit_client.buffer = True
+                    subunit_result = AutoTimingTestResultDecorator(subunit_client)
                     process_suite.run(subunit_result)
                     if ourpid != os.getpid():
                         os._exit(0)
@@ -137,7 +232,7 @@ def fork_for_tests(concurrency_num):
                 os.close(c2pwrite)
                 stream = os.fdopen(c2pread, 'rb', 1)
                 test = ProtocolTestCase(stream)
-                result.append(test)
+                result.append((test, numtests))
         return result
     return do_fork
 

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the Openembedded-commits mailing list