[oe-commits] [bitbake] 22/31: runqueue: Merge the queues and execute setscene and normal tasks in parallel

git at git.openembedded.org git at git.openembedded.org
Mon Jul 15 08:33:33 UTC 2019


This is an automated email from the git hooks/post-receive script.

rpurdie pushed a commit to branch master-next
in repository bitbake.

commit 58b3f0847cc2d47e76f74d59dcbbf78fe41b118b
Author: Richard Purdie <richard.purdie at linuxfoundation.org>
AuthorDate: Thu Jul 4 00:14:02 2019 +0100

    runqueue: Merge the queues and execute setscene and normal tasks in parallel
    
    This is the serious functionality change in this runqueue patch series of
    changes.
    
    Rather than two phases of execution, the scenequeue setscene phase, followed
    by normal task execution, this change allows them to execute in parallel
    together.
    
    To do this we need to handle marking of tasks as covered/uncovered in a piecemeal
    fashion on a task by task basis rather than in a single function.
    
    The code will block normal task execution until any setscene task which could
    cover that task is executed and its status is known. There is a slight
    optimisation which could be possible here at the risk of races but that
    doesn't seem worthwhile.
    
    The state engine isn't entirely cleaned up in this commit (see FIXME) and
    the setscenewhitelist functionality is broken by it (see following patches)
    however it's good enough to test with normal workflows.
    
    Signed-off-by: Richard Purdie <richard.purdie at linuxfoundation.org>
---
 lib/bb/runqueue.py | 169 ++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 115 insertions(+), 54 deletions(-)

diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index c1c4fd1..aafb6ff 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -142,7 +142,7 @@ class RunQueueScheduler(object):
         Return the id of the first task we find that is buildable
         """
         self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
-        buildable = self.buildable
+        buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
         if not buildable:
             return None
 
@@ -1454,25 +1454,18 @@ class RunQueue:
 
             # If we don't have any setscene functions, skip execution
             if len(self.rqdata.runq_setscene_tids) == 0:
-                self.rqdata.init_progress_reporter.finish()
-                self.state = runQueueRunInit
-            else:
-                logger.info('Executing SetScene Tasks')
-                self.state = runQueueSceneRun
-
-        if self.state is runQueueSceneRun:
-            retval = self.rqexe.sq_execute()
-
-        if self.state is runQueueRunInit:
-            if self.cooker.configuration.setsceneonly:
-                self.state = runQueueComplete
-
-        if self.state is runQueueRunInit:
-            logger.info("Executing RunQueue Tasks")
-            start_runqueue_tasks(self.rqexe)
+                logger.info('No setscene tasks')
+                for tid in self.rqdata.runtaskentries:
+                    if len(self.rqdata.runtaskentries[tid].depends) == 0:
+                        self.rqexe.setbuildable(tid)
+                    self.rqexe.tasks_notcovered.add(tid)
+                self.rqexe.sqdone = True
+            logger.info('Executing Tasks')
             self.state = runQueueRunning
 
         if self.state is runQueueRunning:
+            retval = self.rqexe.sq_execute()
+            # FIXME revtal
             retval = self.rqexe.execute()
 
         if self.state is runQueueCleanUp:
@@ -1757,6 +1750,8 @@ class RunQueueExecute:
 
         self.stampcache = {}
 
+        self.sqdone = False
+
         self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
         self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids))
 
@@ -1772,12 +1767,12 @@ class RunQueueExecute:
         self.scenequeue_covered = set()
         # List of tasks which are covered (including setscene ones)
         self.tasks_covered = set()
+        self.tasks_scenequeue_done = set()
         self.scenequeue_notcovered = set()
+        self.tasks_notcovered = set()
         self.scenequeue_notneeded = set()
 
-        if len(self.rqdata.runq_setscene_tids) > 0:
-            self.sqdata = SQData()
-            build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
+        self.coveredtopocess = set()
 
         schedulers = self.get_schedulers()
         for scheduler in schedulers:
@@ -1789,6 +1784,10 @@ class RunQueueExecute:
             bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
                      (self.scheduler, ", ".join(obj.name for obj in schedulers)))
 
+        if len(self.rqdata.runq_setscene_tids) > 0:
+            self.sqdata = SQData()
+            build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
+
     def runqueue_process_waitpid(self, task, status):
 
         # self.build_stamps[pid] may not exist when use shared work directory.
@@ -1951,6 +1950,9 @@ class RunQueueExecute:
             if process_setscenewhitelist(self.rq, self.rqdata, self.stampcache, self.sched, self):
                 return True
 
+        if self.cooker.configuration.setsceneonly:
+            return True
+
         self.rq.read_workers()
 
         if self.stats.total == 0:
@@ -2014,7 +2016,7 @@ class RunQueueExecute:
             if self.can_start_task():
                 return True
 
-        if self.stats.active > 0:
+        if self.stats.active > 0 or self.sq_stats.active > 0:
             self.rq.read_workers()
             return self.rq.active_fds()
 
@@ -2026,9 +2028,9 @@ class RunQueueExecute:
         for task in self.rqdata.runtaskentries:
             if task not in self.runq_buildable:
                 logger.error("Task %s never buildable!", task)
-            if task not in self.runq_running:
+            elif task not in self.runq_running:
                 logger.error("Task %s never ran!", task)
-            if task not in self.runq_complete:
+            elif task not in self.runq_complete:
                 logger.error("Task %s never completed!", task)
         self.rq.state = runQueueComplete
 
@@ -2070,7 +2072,42 @@ class RunQueueExecute:
         #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
         return taskdepdata
 
-    def scenequeue_updatecounters(self, task, fail = False):
+    def scenequeue_process_notcovered(self, task):
+        logger.debug(1, 'Not skipping setscene task %s', task)
+        if len(self.rqdata.runtaskentries[task].depends) == 0:
+            self.setbuildable(task)
+        notcovered = set([task])
+        while notcovered:
+            new = set()
+            for t in notcovered:
+                for deptask in self.rqdata.runtaskentries[t].depends:
+                    if deptask in notcovered or deptask in new or deptask in self.rqdata.runq_setscene_tids or deptask in self.tasks_notcovered:
+                        continue
+                    logger.debug(1, 'Task %s depends on non-setscene task %s so not skipping' % (t, deptask))
+                    new.add(deptask)
+                    self.tasks_notcovered.add(deptask)
+                    if len(self.rqdata.runtaskentries[deptask].depends) == 0:
+                        self.setbuildable(deptask)
+            notcovered = new
+
+    def scenequeue_process_unskippable(self, task):
+        # Look up the dependency chain for non-setscene things which depend on this task
+        # and mark as 'done'/notcovered
+        ready = set([task])
+        while ready:
+            new = set()
+            for t in ready:
+                for deptask in self.rqdata.runtaskentries[t].revdeps:
+                    if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids:
+                        continue
+                    if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done):
+                        new.add(deptask)
+                        self.tasks_scenequeue_done.add(deptask)
+                        self.tasks_notcovered.add(deptask)
+                        #logger.warning("Up: " + str(deptask))
+            ready = new
+
+    def scenequeue_updatecounters(self, task, fail=False):
         for dep in self.sqdata.sq_deps[task]:
             if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
                 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
@@ -2083,6 +2120,43 @@ class RunQueueExecute:
             if len(self.sqdata.sq_revdeps2[dep]) == 0:
                 self.sq_buildable.add(dep)
 
+        next = set([task])
+        while next:
+            new = set()
+            for t in next:
+                self.tasks_scenequeue_done.add(t)
+                # Look down the dependency chain for non-setscene things which this task depends on
+                # and mark as 'done'
+                for dep in self.rqdata.runtaskentries[t].depends:
+                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
+                        continue
+                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
+                        new.add(dep)
+                        #logger.warning(" Down: " + dep)
+            next = new
+
+        if task in self.sqdata.unskippable:
+            self.scenequeue_process_unskippable(task)
+
+        if task in self.scenequeue_notcovered:
+            self.scenequeue_process_notcovered(task)
+        elif task in self.scenequeue_covered:
+            logger.debug(1, 'Queued setscene task %s', task)
+            self.coveredtopocess.add(task)
+
+        for task in self.coveredtopocess.copy():
+            if self.sqdata.sq_covered_tasks[task].issubset(self.tasks_scenequeue_done):
+                logger.debug(1, 'Processing setscene task %s', task)
+                covered = self.sqdata.sq_covered_tasks[task]
+                covered.add(task)
+                # Remove notcovered tasks
+                covered.difference_update(self.tasks_notcovered)
+                self.tasks_covered.update(covered)
+                self.coveredtopocess.remove(task)
+                for tid in covered:
+                    if len(self.rqdata.runtaskentries[tid].depends) == 0:
+                        self.setbuildable(tid)
+
     def sq_task_completeoutright(self, task):
         """
         Mark a task as completed
@@ -2113,6 +2187,7 @@ class RunQueueExecute:
         self.sq_stats.taskFailed()
         bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData)
         self.scenequeue_notcovered.add(task)
+        self.tasks_notcovered.add(task)
         self.scenequeue_updatecounters(task, True)
         self.sq_check_taskfail(task)
 
@@ -2122,6 +2197,7 @@ class RunQueueExecute:
         self.sq_stats.taskSkipped()
         self.sq_stats.taskCompleted()
         self.scenequeue_notcovered.add(task)
+        self.tasks_notcovered.add(task)
         self.scenequeue_updatecounters(task, True)
 
     def sq_task_skip(self, task):
@@ -2136,6 +2212,9 @@ class RunQueueExecute:
         Run the tasks in a queue prepared by prepare_runqueue
         """
 
+        if self.sqdone:
+            return True
+
         self.rq.read_workers()
 
         task = None
@@ -2209,7 +2288,7 @@ class RunQueueExecute:
             if self.can_start_task():
                 return True
 
-        if self.sq_stats.active > 0:
+        if self.stats.active > 0 or self.sq_stats.active > 0:
             self.rq.read_workers()
             return self.rq.active_fds()
 
@@ -2221,11 +2300,14 @@ class RunQueueExecute:
 
         logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
 
-        self.rq.state = runQueueRunInit
-
         completeevent = sceneQueueComplete(self.sq_stats, self.rq)
         bb.event.fire(completeevent, self.cfgData)
 
+        if self.cooker.configuration.setsceneonly:
+            self.rq.state = runQueueComplete
+
+        self.sqdone = True
+
         return True
 
     def sq_build_taskdepdata(self, task):
@@ -2366,6 +2448,12 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
             if tid in rqdata.runq_setscene_tids:
                 continue
             sqdata.unskippable.remove(tid)
+            if len(rqdata.runtaskentries[tid].depends) == 0:
+                # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
+                sqrq.tasks_notcovered.add(tid)
+                sqrq.tasks_scenequeue_done.add(tid)
+                sqrq.setbuildable(tid)
+                sqrq.scenequeue_process_unskippable(tid)
             sqdata.unskippable |= rqdata.runtaskentries[tid].depends
             new = True
 
@@ -2499,33 +2587,6 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
                 logger.debug(2, 'No package found, so skipping setscene task %s', tid)
                 sqdata.outrightfail.append(tid)
 
-def start_runqueue_tasks(rqexec):
-        # Mark initial buildable tasks
-        for tid in rqexec.rqdata.runtaskentries:
-            if len(rqexec.rqdata.runtaskentries[tid].depends) == 0:
-                rqexec.setbuildable(tid)
-            if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered):
-                rqexec.tasks_covered.add(tid)
-
-        found = True
-        while found:
-            found = False
-            for tid in rqexec.rqdata.runtaskentries:
-                if tid in rqexec.tasks_covered:
-                    continue
-                logger.debug(1, 'Considering %s: %s' % (tid, str(rqexec.rqdata.runtaskentries[tid].revdeps)))
-
-                if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered):
-                    if tid in rqexec.scenequeue_notcovered:
-                        continue
-                    found = True
-                    rqexec.tasks_covered.add(tid)
-
-        logger.debug(1, 'Skip list %s', sorted(rqexec.tasks_covered))
-
-        for task in self.rq.scenequeue_notcovered:
-            logger.debug(1, 'Not skipping task %s', task)
-
 class TaskFailure(Exception):
     """
     Exception raised when a task in a runqueue fails

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the Openembedded-commits mailing list