[oe-commits] [openembedded-core] 21/34: utils: Add multiprocess_launch API and testcase

git at git.openembedded.org git at git.openembedded.org
Tue Jul 24 10:54:13 UTC 2018


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master
in repository openembedded-core.

commit 88f0c214e593a45566df5131bda4c946f5ccc8c2
Author: Richard Purdie <richard.purdie at linuxfoundation.org>
AuthorDate: Thu Jul 19 20:31:35 2018 +0000

    utils: Add multiprocess_launch API and testcase
    
    The current methods of spawning processes for parallel execution have
    issues around collection of results or exceptions.
    
    Take the code from package_ipk/deb, make it generic, add a results
    collection mechanism, fix the exception handling and for it into a
    standard library function.
    
    Also add a test case which tests both the success and failure modes
    of operation to stop this functionality regressiing again.
    
    In particular, compared to multiprocess_exec, this fork off the parent
    approach means we can pass in the datastore and functions work in the
    same scope as the parent. This removes some of the complexities
    found trying to scale multiprocess_exec to wider use.
    
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 meta/lib/oe/utils.py                        | 70 +++++++++++++++++++++++++++++
 meta/lib/oeqa/selftest/cases/oelib/utils.py | 46 ++++++++++++++++++-
 2 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 6aed6dc..753b577 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -1,4 +1,6 @@
 import subprocess
+import multiprocessing
+import traceback
 
 def read_file(filename):
     try:
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function):
 
     return results
 
+# For each item in items, call the function 'target' with item as the first 
+# argument, extraargs as the other arguments and handle any exceptions in the
+# parent thread
+def multiprocess_launch(target, items, d, extraargs=None):
+
+    class ProcessLaunch(multiprocessing.Process):
+        def __init__(self, *args, **kwargs):
+            multiprocessing.Process.__init__(self, *args, **kwargs)
+            self._pconn, self._cconn = multiprocessing.Pipe()
+            self._exception = None
+            self._result = None
+
+        def run(self):
+            try:
+                ret = self._target(*self._args, **self._kwargs)
+                self._cconn.send((None, ret))
+            except Exception as e:
+                tb = traceback.format_exc()
+                self._cconn.send((e, tb))
+
+        def update(self):
+            if self._pconn.poll():
+                (e, tb) = self._pconn.recv()
+                if e is not None:
+                    self._exception = (e, tb)
+                else:
+                    self._result = tb
+
+        @property
+        def exception(self):
+            self.update()
+            return self._exception
+
+        @property
+        def result(self):
+            self.update()
+            return self._result
+
+    max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1)
+    launched = []
+    errors = []
+    results = []
+    items = list(items)
+    while (items and not errors) or launched:
+        if not errors and items and len(launched) < max_process:
+            args = (items.pop(),)
+            if extraargs is not None:
+                args = args + extraargs
+            p = ProcessLaunch(target=target, args=args)
+            p.start()
+            launched.append(p)
+        for q in launched:
+            # The finished processes are joined when calling is_alive()
+            if not q.is_alive():
+                if q.exception:
+                    errors.append(q.exception)
+                if q.result:
+                    results.append(q.result)
+                launched.remove(q)
+    # Paranoia doesn't hurt
+    for p in launched:
+        p.join()
+    if errors:
+        for (e, tb) in errors:
+            bb.error(str(tb))
+        bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above")
+    return results
+
 def squashspaces(string):
     import re
     return re.sub("\s+", " ", string).strip()
diff --git a/meta/lib/oeqa/selftest/cases/oelib/utils.py b/meta/lib/oeqa/selftest/cases/oelib/utils.py
index 9fb6c15..275aeda 100644
--- a/meta/lib/oeqa/selftest/cases/oelib/utils.py
+++ b/meta/lib/oeqa/selftest/cases/oelib/utils.py
@@ -1,5 +1,8 @@
+import sys
 from unittest.case import TestCase
-from oe.utils import packages_filter_out_system, trim_version
+from contextlib import contextmanager
+from io import StringIO
+from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch
 
 class TestPackagesFilterOutSystem(TestCase):
     def test_filter(self):
@@ -49,3 +52,44 @@ class TestTrimVersion(TestCase):
         self.assertEqual(trim_version("1.2.3", 2), "1.2")
         self.assertEqual(trim_version("1.2.3", 3), "1.2.3")
         self.assertEqual(trim_version("1.2.3", 4), "1.2.3")
+
+
+class TestMultiprocessLaunch(TestCase):
+
+    def test_multiprocesslaunch(self):
+        import bb
+
+        def testfunction(item, d):
+            if item == "2" or item == "1":
+                raise KeyError("Invalid number %s" % item)
+            return "Found %s" % item
+
+        def dummyerror(msg):
+            print("ERROR: %s" % msg)
+
+        @contextmanager
+        def captured_output():
+            new_out, new_err = StringIO(), StringIO()
+            old_out, old_err = sys.stdout, sys.stderr
+            try:
+                sys.stdout, sys.stderr = new_out, new_err
+                yield sys.stdout, sys.stderr
+            finally:
+                sys.stdout, sys.stderr = old_out, old_err
+
+        d = bb.data_smart.DataSmart()
+        bb.error = dummyerror
+
+        # Assert the function returns the right results
+        result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,))
+        self.assertIn("Found 3", result)
+        self.assertIn("Found 4", result)
+        self.assertIn("Found 5", result)
+        self.assertIn("Found 6", result)
+        self.assertEqual(len(result), 4)
+
+        # Assert the function prints exceptions
+        with captured_output() as (out, err):
+            self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,))
+        self.assertIn("KeyError: 'Invalid number 1'", out.getvalue())
+        self.assertIn("KeyError: 'Invalid number 2'", out.getvalue())

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the Openembedded-commits mailing list