[OE-core] [PATCH 07/30] oeqa/core/threaded: Add support to run into a thread at end of execution

Leonardo Sandoval leonardo.sandoval.gonzalez at linux.intel.com
Wed Jul 12 14:22:26 UTC 2017


On Tue, 2017-07-11 at 15:23 -0500, Aníbal Limón wrote:
> Some test cases aren't allowed to run into a multi-thread environment so
> add the posibility to run those tests at end of execution.
> 

Hi Anibal,

which is the reason these need to run at the end?



> Signed-off-by: Aníbal Limón <anibal.limon at linux.intel.com>
> ---
>  meta/lib/oeqa/core/threaded.py | 102 +++++++++++++++++++++++++++++++----------
>  1 file changed, 78 insertions(+), 24 deletions(-)
> 
> diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
> index 34217f1a8b8..a7dc0aed401 100644
> --- a/meta/lib/oeqa/core/threaded.py
> +++ b/meta/lib/oeqa/core/threaded.py
> @@ -30,18 +30,35 @@ class OETestLoaderThreaded(OETestLoader):
>          suites = {}
>          suites['main'] = self.suiteClass()
>          suites['pool'] = []
> +        suites['end'] = self.suiteClass()
>          for _ in range(self.process_num - 1):
>              suites['pool'].append(self.suiteClass())
>  
> +        def _add_by_module_or_dep(suite, case, depends):
> +            """
> +                A test case that needs to run into the same thread
> +                because is on the same module or for dependency
> +                reasons.
> +            """
> +
> +            for c in suite._tests:
> +                if case.__module__ == c.__module__:
> +                    suite.addTest(case)
> +                    return True
> +
> +            if case.id() in depends:
> +                case_depends = depends[case.id()]
> +                for c in suite._tests:
> +                    if c.id() in case_depends:
> +                        suite.addTest(case)
> +                        return True
> +
> +            return False
> +
>          def _add_to_main_thread(main_suite, case, depends):
>              """
>                  Some test cases needs to be run into the main
> -                thread for several resons.
> -
> -                A test case that needs to run in the main thread
> -                can be for specific set via test class _main_thread
> -                attr or because is on the same module or for a dependency
> -                reason.
> +                thread by request.
>              """
>  
>              if hasattr(case.__class__, '_main_thread') and \
> @@ -50,19 +67,20 @@ class OETestLoaderThreaded(OETestLoader):
>                  main_suite.addTest(case)
>                  return True
>  
> -            for c in main_suite._tests:
> -                if case.__module__ == c.__module__:
> -                    main_suite.addTest(case)
> -                    return True
> +            return _add_by_module_or_dep(main_suite, case, depends)
>  
> -            if case.id() in depends:
> -                case_depends = depends[case.id()]
> -                for c in main_suite._tests:
> -                    if c.id() in case_depends:
> -                        main_suite.addTest(case)
> -                        return True
> +        def _add_to_end_thread(end_suite, case, depends):
> +            """
> +                Some test cases needs to be run into at end of
> +                execution into the main by request.
> +            """
> +            if hasattr(case.__class__, '_end_thread') and \
> +                    case.__class__._end_thread or \
> +                    self.process_num == 1:
> +                end_suite.addTest(case)
> +                return True
>  
> -            return False
> +            return _add_by_module_or_dep(end_suite, case, depends)
>  
>          def _search_for_module_idx(suites, case):
>              """
> @@ -112,6 +130,9 @@ class OETestLoaderThreaded(OETestLoader):
>                      if 'depends' in self.tc._registry:
>                          depends = self.tc._registry['depends']
>  
> +                    if _add_to_end_thread(suites['end'], case, depends):
> +                        continue
> +
>                      if _add_to_main_thread(suites['main'], case, depends):
>                          continue
>  
> @@ -135,7 +156,7 @@ class OETestLoaderThreaded(OETestLoader):
>  
>          # if the main suite doesn't have test cases
>          # use the first element of the suites pool
> -        if not len(suites['main']._tests):
> +        if not len(suites['main']._tests) and len(suites['pool']):
>              suites['main'] = suites['pool'].pop(0)
>  
>          return suites
> @@ -268,6 +289,12 @@ class _ThreadedPool:
>          self.tasks = queue.Queue(num_tasks)
>          self.workers = []
>  
> +        self.stream = stream
> +        self.result = result
> +
> +        self.end_task = None
> +        self.end_worker = None
> +
>          for _ in range(num_workers):
>              worker = _Worker(self.tasks, result, stream)
>              self.workers.append(worker)
> @@ -280,12 +307,25 @@ class _ThreadedPool:
>          """Add a task to the queue"""
>          self.tasks.put((func, args, kargs))
>  
> +    def add_end_task(self, func, *args, **kwargs):
> +        """Add a task to be executed at end"""
> +
> +        self.end_task = queue.Queue(1)
> +        self.end_task.put((func, args, kwargs))
> +        self.end_worker = _Worker(self.end_task, self.result,
> +                self.stream)
> +
>      def wait_completion(self):
>          """Wait for completion of all the tasks in the queue"""
>          self.tasks.join()
>          for worker in self.workers:
>              worker.join()
>  
> +        if self.end_task:
> +            self.end_worker.start()
> +            self.end_task.join()
> +            self.end_worker.join()
> +
>  class OETestRunnerThreaded(OETestRunner):
>      streamLoggerClass = OEStreamLoggerThreaded
>  
> @@ -293,32 +333,46 @@ class OETestRunnerThreaded(OETestRunner):
>          super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
>          self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
>  
> +    def _run_main_thread(self, suite, result):
> +        if len(suite._tests):
> +            run_start_time = time.time()
> +            rc = super(OETestRunnerThreaded, self).run(suite)
> +            run_end_time = time.time()
> +            result.addResult(rc, run_start_time, run_end_time)
> +            self.stream.finish()
> +
>      def run(self, suites):
>          result = OETestResultThreaded(self.tc)
>  
>          pool = None
> +
>          if suites['pool']:
>              thread_no = len(suites['pool'])
>              pool = _ThreadedPool(thread_no, thread_no, stream=self.stream,
>                      result=result)
>              for s in suites['pool']:
>                  pool.add_task(super(OETestRunnerThreaded, self).run, s)
> -            pool.start()
>  
> -        run_start_time = time.time()
> -        rc = super(OETestRunnerThreaded, self).run(suites['main'])
> -        run_end_time = time.time()
> -        result.addResult(rc, run_start_time, run_end_time)
> -        self.stream.finish()
> +        if len(suites['end']._tests):
> +            if not pool:
> +                pool = _ThreadedPool(0, 0, stream=self.stream,
> +                            result=result)
> +            pool.add_end_task(super(OETestRunnerThreaded, self).run,
> +                    suites['end'])
>  
>          if pool:
> +            pool.start()
> +        self._run_main_thread(suites['main'], result)
> +        if pool:
>              pool.wait_completion()
> +
>          result._fill_tc_results()
>  
>          return result
>  
>      def list_tests(self, suite, display_type):
>          suite['pool'].insert(0, suite['main'])
> +        suite['pool'].append(suite['end'])
>  
>          return super(OETestRunnerThreaded, self).list_tests(
>                  suite['pool'], display_type)





More information about the Openembedded-core mailing list