summaryrefslogtreecommitdiff
path: root/waflib/Runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'waflib/Runner.py')
-rw-r--r--waflib/Runner.py350
1 files changed, 350 insertions, 0 deletions
diff --git a/waflib/Runner.py b/waflib/Runner.py
new file mode 100644
index 0000000..c6480a3
--- /dev/null
+++ b/waflib/Runner.py
@@ -0,0 +1,350 @@
+#! /usr/bin/env python
+# encoding: utf-8
+# WARNING! Do not edit! https://waf.io/book/index.html#_obtaining_the_waf_file
+
+import heapq,traceback
+try:
+ from queue import Queue,PriorityQueue
+except ImportError:
+ from Queue import Queue
+ try:
+ from Queue import PriorityQueue
+ except ImportError:
+ class PriorityQueue(Queue):
+ def _init(self,maxsize):
+ self.maxsize=maxsize
+ self.queue=[]
+ def _put(self,item):
+ heapq.heappush(self.queue,item)
+ def _get(self):
+ return heapq.heappop(self.queue)
+from waflib import Utils,Task,Errors,Logs
+GAP=5
+class PriorityTasks(object):
+ def __init__(self):
+ self.lst=[]
+ def __len__(self):
+ return len(self.lst)
+ def __iter__(self):
+ return iter(self.lst)
+ def clear(self):
+ self.lst=[]
+ def append(self,task):
+ heapq.heappush(self.lst,task)
+ def appendleft(self,task):
+ heapq.heappush(self.lst,task)
+ def pop(self):
+ return heapq.heappop(self.lst)
+ def extend(self,lst):
+ if self.lst:
+ for x in lst:
+ self.append(x)
+ else:
+ if isinstance(lst,list):
+ self.lst=lst
+ heapq.heapify(lst)
+ else:
+ self.lst=lst.lst
+class Consumer(Utils.threading.Thread):
+ def __init__(self,spawner,task):
+ Utils.threading.Thread.__init__(self)
+ self.task=task
+ self.spawner=spawner
+ self.setDaemon(1)
+ self.start()
+ def run(self):
+ try:
+ if not self.spawner.master.stop:
+ self.spawner.master.process_task(self.task)
+ finally:
+ self.spawner.sem.release()
+ self.spawner.master.out.put(self.task)
+ self.task=None
+ self.spawner=None
+class Spawner(Utils.threading.Thread):
+ def __init__(self,master):
+ Utils.threading.Thread.__init__(self)
+ self.master=master
+ self.sem=Utils.threading.Semaphore(master.numjobs)
+ self.setDaemon(1)
+ self.start()
+ def run(self):
+ try:
+ self.loop()
+ except Exception:
+ pass
+ def loop(self):
+ master=self.master
+ while 1:
+ task=master.ready.get()
+ self.sem.acquire()
+ if not master.stop:
+ task.log_display(task.generator.bld)
+ Consumer(self,task)
+class Parallel(object):
+ def __init__(self,bld,j=2):
+ self.numjobs=j
+ self.bld=bld
+ self.outstanding=PriorityTasks()
+ self.postponed=PriorityTasks()
+ self.incomplete=set()
+ self.ready=PriorityQueue(0)
+ self.out=Queue(0)
+ self.count=0
+ self.processed=0
+ self.stop=False
+ self.error=[]
+ self.biter=None
+ self.dirty=False
+ self.revdeps=Utils.defaultdict(set)
+ self.spawner=Spawner(self)
+ def get_next_task(self):
+ if not self.outstanding:
+ return None
+ return self.outstanding.pop()
+ def postpone(self,tsk):
+ self.postponed.append(tsk)
+ def refill_task_list(self):
+ while self.count>self.numjobs*GAP:
+ self.get_out()
+ while not self.outstanding:
+ if self.count:
+ self.get_out()
+ if self.outstanding:
+ break
+ elif self.postponed:
+ try:
+ cond=self.deadlock==self.processed
+ except AttributeError:
+ pass
+ else:
+ if cond:
+ lst=[]
+ for tsk in self.postponed:
+ deps=[id(x)for x in tsk.run_after if not x.hasrun]
+ lst.append('%s\t-> %r'%(repr(tsk),deps))
+ if not deps:
+ lst.append('\n task %r dependencies are done, check its *runnable_status*?'%id(tsk))
+ raise Errors.WafError('Deadlock detected: check the task build order%s'%''.join(lst))
+ self.deadlock=self.processed
+ if self.postponed:
+ self.outstanding.extend(self.postponed)
+ self.postponed.clear()
+ elif not self.count:
+ if self.incomplete:
+ for x in self.incomplete:
+ for k in x.run_after:
+ if not k.hasrun:
+ break
+ else:
+ self.incomplete.remove(x)
+ self.outstanding.append(x)
+ break
+ else:
+ raise Errors.WafError('Broken revdeps detected on %r'%self.incomplete)
+ else:
+ tasks=next(self.biter)
+ ready,waiting=self.prio_and_split(tasks)
+ self.outstanding.extend(ready)
+ self.incomplete.update(waiting)
+ self.total=self.bld.total()
+ break
+ def add_more_tasks(self,tsk):
+ if getattr(tsk,'more_tasks',None):
+ more=set(tsk.more_tasks)
+ groups_done=set()
+ def iteri(a,b):
+ for x in a:
+ yield x
+ for x in b:
+ yield x
+ for x in iteri(self.outstanding,self.incomplete):
+ for k in x.run_after:
+ if isinstance(k,Task.TaskGroup):
+ if k not in groups_done:
+ groups_done.add(k)
+ for j in k.prev&more:
+ self.revdeps[j].add(k)
+ elif k in more:
+ self.revdeps[k].add(x)
+ ready,waiting=self.prio_and_split(tsk.more_tasks)
+ self.outstanding.extend(ready)
+ self.incomplete.update(waiting)
+ self.total+=len(tsk.more_tasks)
+ def mark_finished(self,tsk):
+ def try_unfreeze(x):
+ if x in self.incomplete:
+ for k in x.run_after:
+ if not k.hasrun:
+ break
+ else:
+ self.incomplete.remove(x)
+ self.outstanding.append(x)
+ if tsk in self.revdeps:
+ for x in self.revdeps[tsk]:
+ if isinstance(x,Task.TaskGroup):
+ x.prev.remove(tsk)
+ if not x.prev:
+ for k in x.next:
+ k.run_after.remove(x)
+ try_unfreeze(k)
+ x.next=[]
+ else:
+ try_unfreeze(x)
+ del self.revdeps[tsk]
+ def get_out(self):
+ tsk=self.out.get()
+ if not self.stop:
+ self.add_more_tasks(tsk)
+ self.mark_finished(tsk)
+ self.count-=1
+ self.dirty=True
+ return tsk
+ def add_task(self,tsk):
+ self.ready.put(tsk)
+ def process_task(self,tsk):
+ tsk.process()
+ if tsk.hasrun!=Task.SUCCESS:
+ self.error_handler(tsk)
+ def skip(self,tsk):
+ tsk.hasrun=Task.SKIPPED
+ self.mark_finished(tsk)
+ def cancel(self,tsk):
+ tsk.hasrun=Task.CANCELED
+ self.mark_finished(tsk)
+ def error_handler(self,tsk):
+ if not self.bld.keep:
+ self.stop=True
+ self.error.append(tsk)
+ def task_status(self,tsk):
+ try:
+ return tsk.runnable_status()
+ except Exception:
+ self.processed+=1
+ tsk.err_msg=traceback.format_exc()
+ if not self.stop and self.bld.keep:
+ self.skip(tsk)
+ if self.bld.keep==1:
+ if Logs.verbose>1 or not self.error:
+ self.error.append(tsk)
+ self.stop=True
+ else:
+ if Logs.verbose>1:
+ self.error.append(tsk)
+ return Task.EXCEPTION
+ tsk.hasrun=Task.EXCEPTION
+ self.error_handler(tsk)
+ return Task.EXCEPTION
+ def start(self):
+ self.total=self.bld.total()
+ while not self.stop:
+ self.refill_task_list()
+ tsk=self.get_next_task()
+ if not tsk:
+ if self.count:
+ continue
+ else:
+ break
+ if tsk.hasrun:
+ self.processed+=1
+ continue
+ if self.stop:
+ break
+ st=self.task_status(tsk)
+ if st==Task.RUN_ME:
+ self.count+=1
+ self.processed+=1
+ if self.numjobs==1:
+ tsk.log_display(tsk.generator.bld)
+ try:
+ self.process_task(tsk)
+ finally:
+ self.out.put(tsk)
+ else:
+ self.add_task(tsk)
+ elif st==Task.ASK_LATER:
+ self.postpone(tsk)
+ elif st==Task.SKIP_ME:
+ self.processed+=1
+ self.skip(tsk)
+ self.add_more_tasks(tsk)
+ elif st==Task.CANCEL_ME:
+ if Logs.verbose>1:
+ self.error.append(tsk)
+ self.processed+=1
+ self.cancel(tsk)
+ while self.error and self.count:
+ self.get_out()
+ self.ready.put(None)
+ if not self.stop:
+ assert not self.count
+ assert not self.postponed
+ assert not self.incomplete
+ def prio_and_split(self,tasks):
+ for x in tasks:
+ x.visited=0
+ reverse=self.revdeps
+ groups_done=set()
+ for x in tasks:
+ for k in x.run_after:
+ if isinstance(k,Task.TaskGroup):
+ if k not in groups_done:
+ groups_done.add(k)
+ for j in k.prev:
+ reverse[j].add(k)
+ else:
+ reverse[k].add(x)
+ def visit(n):
+ if isinstance(n,Task.TaskGroup):
+ return sum(visit(k)for k in n.next)
+ if n.visited==0:
+ n.visited=1
+ if n in reverse:
+ rev=reverse[n]
+ n.prio_order=n.tree_weight+len(rev)+sum(visit(k)for k in rev)
+ else:
+ n.prio_order=n.tree_weight
+ n.visited=2
+ elif n.visited==1:
+ raise Errors.WafError('Dependency cycle found!')
+ return n.prio_order
+ for x in tasks:
+ if x.visited!=0:
+ continue
+ try:
+ visit(x)
+ except Errors.WafError:
+ self.debug_cycles(tasks,reverse)
+ ready=[]
+ waiting=[]
+ for x in tasks:
+ for k in x.run_after:
+ if not k.hasrun:
+ waiting.append(x)
+ break
+ else:
+ ready.append(x)
+ return(ready,waiting)
+ def debug_cycles(self,tasks,reverse):
+ tmp={}
+ for x in tasks:
+ tmp[x]=0
+ def visit(n,acc):
+ if isinstance(n,Task.TaskGroup):
+ for k in n.next:
+ visit(k,acc)
+ return
+ if tmp[n]==0:
+ tmp[n]=1
+ for k in reverse.get(n,[]):
+ visit(k,[n]+acc)
+ tmp[n]=2
+ elif tmp[n]==1:
+ lst=[]
+ for tsk in acc:
+ lst.append(repr(tsk))
+ if tsk is n:
+ break
+ raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s'%''.join(lst))
+ for x in tasks:
+ visit(x,[])