[OE-core] [PATCH 01/10] utils: Add multiprocess_launch API and testcase

Richard Purdie richard.purdie at linuxfoundation.org
Fri Jul 20 10:39:39 UTC 2018


The current methods of spawning processes for parallel execution have
issues around collection of results or exceptions.

Take the code from package_ipk/deb, make it generic, add a results
collection mechanism, fix the exception handling and for it into a
standard library function.

Also add a test case which tests both the success and failure modes
of operation to stop this functionality regressiing again.

Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 meta/lib/oe/utils.py                        | 70 +++++++++++++++++++++
 meta/lib/oeqa/selftest/cases/oelib/utils.py | 46 +++++++++++++-
 2 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 6aed6dc993b..77c1939b8ab 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -1,4 +1,6 @@
 import subprocess
+import multiprocessing
+import traceback
 
 def read_file(filename):
     try:
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function):
 
     return results
 
+# For each item in items, call the function 'target' with item as the first 
+# argument, extraargs as the other arguments and handle any exceptions in the
+# parent thread
+def multiprocess_launch(target, items, d, extraargs=None):
+
+    class ProcessLauch(multiprocessing.Process):
+        def __init__(self, *args, **kwargs):
+            multiprocessing.Process.__init__(self, *args, **kwargs)
+            self._pconn, self._cconn = multiprocessing.Pipe()
+            self._exception = None
+            self._result = None
+
+        def run(self):
+            try:
+                ret = self._target(*self._args, **self._kwargs)
+                self._cconn.send((None, ret))
+            except Exception as e:
+                tb = traceback.format_exc()
+                self._cconn.send((e, tb))
+
+        def update(self):
+            if self._pconn.poll():
+                (e, tb) = self._pconn.recv()
+                if e is not None:
+                    self._exception = (e, tb)
+                else:
+                    self._result = tb
+
+        @property
+        def exception(self):
+            self.update()
+            return self._exception
+
+        @property
+        def result(self):
+            self.update()
+            return self._result
+
+    max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1)
+    launched = []
+    errors = []
+    results = []
+    items = list(items)
+    while (items and not errors) or launched:
+        if not errors and items and len(launched) < max_process:
+            args = (items.pop(),)
+            if extraargs is not None:
+                args = args + extraargs
+            p = ProcessLauch(target=target, args=args)
+            p.start()
+            launched.append(p)
+        for q in launched:
+            # The finished processes are joined when calling is_alive()
+            if not q.is_alive():
+                if q.exception:
+                    errors.append(q.exception)
+                if q.result:
+                    results.append(q.result)
+                launched.remove(q)
+    # Paranoia doesn't hurt
+    for p in launched:
+        p.join()
+    if errors:
+        for (e, tb) in errors:
+            bb.error(str(tb))
+        bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above")
+    return results
+
 def squashspaces(string):
     import re
     return re.sub("\s+", " ", string).strip()
diff --git a/meta/lib/oeqa/selftest/cases/oelib/utils.py b/meta/lib/oeqa/selftest/cases/oelib/utils.py
index 9fb6c1576e0..275aeda74eb 100644
--- a/meta/lib/oeqa/selftest/cases/oelib/utils.py
+++ b/meta/lib/oeqa/selftest/cases/oelib/utils.py
@@ -1,5 +1,8 @@
+import sys
 from unittest.case import TestCase
-from oe.utils import packages_filter_out_system, trim_version
+from contextlib import contextmanager
+from io import StringIO
+from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch
 
 class TestPackagesFilterOutSystem(TestCase):
     def test_filter(self):
@@ -49,3 +52,44 @@ class TestTrimVersion(TestCase):
         self.assertEqual(trim_version("1.2.3", 2), "1.2")
         self.assertEqual(trim_version("1.2.3", 3), "1.2.3")
         self.assertEqual(trim_version("1.2.3", 4), "1.2.3")
+
+
+class TestMultiprocessLaunch(TestCase):
+
+    def test_multiprocesslaunch(self):
+        import bb
+
+        def testfunction(item, d):
+            if item == "2" or item == "1":
+                raise KeyError("Invalid number %s" % item)
+            return "Found %s" % item
+
+        def dummyerror(msg):
+            print("ERROR: %s" % msg)
+
+        @contextmanager
+        def captured_output():
+            new_out, new_err = StringIO(), StringIO()
+            old_out, old_err = sys.stdout, sys.stderr
+            try:
+                sys.stdout, sys.stderr = new_out, new_err
+                yield sys.stdout, sys.stderr
+            finally:
+                sys.stdout, sys.stderr = old_out, old_err
+
+        d = bb.data_smart.DataSmart()
+        bb.error = dummyerror
+
+        # Assert the function returns the right results
+        result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,))
+        self.assertIn("Found 3", result)
+        self.assertIn("Found 4", result)
+        self.assertIn("Found 5", result)
+        self.assertIn("Found 6", result)
+        self.assertEqual(len(result), 4)
+
+        # Assert the function prints exceptions
+        with captured_output() as (out, err):
+            self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,))
+        self.assertIn("KeyError: 'Invalid number 1'", out.getvalue())
+        self.assertIn("KeyError: 'Invalid number 2'", out.getvalue())
-- 
2.17.1




More information about the Openembedded-core mailing list