path: root/compose/parallel.py
author     Aanand Prasad <aanand.prasad@gmail.com>   2016-04-08 17:46:13 +0100
committer  Aanand Prasad <aanand.prasad@gmail.com>   2016-04-08 17:51:09 +0100
commit     141b96bb312d85753de2189227941512bd42f33e (patch)
tree       a1c6c89f3d06df585b6900be3df44533b461e584 /compose/parallel.py
parent     bcdf541c8c6ccc0070ab011a909f244f501676d6 (diff)
Abort operations if their dependencies fail
Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r--  compose/parallel.py  86
1 file changed, 51 insertions(+), 35 deletions(-)
diff --git a/compose/parallel.py b/compose/parallel.py
index 79699236..745d4635 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -32,7 +32,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
     for obj in objects:
         writer.initialize(get_name(obj))
 
-    q = setup_queue(objects, func, get_deps, get_name)
+    q = setup_queue(objects, func, get_deps)
 
     done = 0
     errors = {}
@@ -54,6 +54,8 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
         elif isinstance(exception, APIError):
             errors[get_name(obj)] = exception.explanation
             writer.write(get_name(obj), 'error')
+        elif isinstance(exception, UpstreamError):
+            writer.write(get_name(obj), 'error')
         else:
             errors[get_name(obj)] = exception
             error_to_reraise = exception
@@ -72,58 +74,72 @@ def _no_deps(x):
     return []
 
 
-def setup_queue(objects, func, get_deps, get_name):
+def setup_queue(objects, func, get_deps):
     if get_deps is None:
         get_deps = _no_deps
 
     results = Queue()
     output = Queue()
 
-    def consumer():
-        started = set()  # objects being processed
-        finished = set()  # objects which have been processed
-
-        def ready(obj):
-            """
-            Returns true if obj is ready to be processed:
-            - all dependencies have been processed
-            - obj is not already being processed
-            """
-            return obj not in started and all(
-                dep not in objects or dep in finished
-                for dep in get_deps(obj)
-            )
+    t = Thread(target=queue_consumer, args=(objects, func, get_deps, results, output))
+    t.daemon = True
+    t.start()
+
+    return output
+
+
+def queue_producer(obj, func, results):
+    try:
+        result = func(obj)
+        results.put((obj, result, None))
+    except Exception as e:
+        results.put((obj, None, e))
+
+
+def queue_consumer(objects, func, get_deps, results, output):
+    started = set()  # objects being processed
+    finished = set()  # objects which have been processed
+    failed = set()  # objects which either failed or whose dependencies failed
+
+    while len(finished) + len(failed) < len(objects):
+        pending = set(objects) - started - finished - failed
+        log.debug('Pending: {}'.format(pending))
 
-        while len(finished) < len(objects):
-            for obj in filter(ready, objects):
+        for obj in pending:
+            deps = get_deps(obj)
+
+            if any(dep in failed for dep in deps):
+                log.debug('{} has upstream errors - not processing'.format(obj))
+                output.put((obj, None, UpstreamError()))
+                failed.add(obj)
+            elif all(
+                dep not in objects or dep in finished
+                for dep in deps
+            ):
                 log.debug('Starting producer thread for {}'.format(obj))
-                t = Thread(target=producer, args=(obj,))
+                t = Thread(target=queue_producer, args=(obj, func, results))
                 t.daemon = True
                 t.start()
                 started.add(obj)
 
-            try:
-                event = results.get(timeout=1)
-            except Empty:
-                continue
+        try:
+            event = results.get(timeout=1)
+        except Empty:
+            continue
 
-            obj = event[0]
+        obj, _, exception = event
+        if exception is None:
             log.debug('Finished processing: {}'.format(obj))
             finished.add(obj)
-            output.put(event)
+        else:
+            log.debug('Failed: {}'.format(obj))
+            failed.add(obj)
 
-    def producer(obj):
-        try:
-            result = func(obj)
-            results.put((obj, result, None))
-        except Exception as e:
-            results.put((obj, None, e))
+        output.put(event)
 
-    t = Thread(target=consumer)
-    t.daemon = True
-    t.start()
 
-    return output
+class UpstreamError(Exception):
+    pass
 
 
 class ParallelStreamWriter(object):
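
For context, here is a minimal usage sketch of the behaviour this commit
introduces. It is not part of the commit; the service names, the deps
mapping and start_service are hypothetical illustrations of a caller
passing a get_deps callback to parallel_execute.

    from compose.parallel import parallel_execute

    objects = ['db', 'web']
    deps = {'db': [], 'web': ['db']}

    def start_service(name):
        # Hypothetical work function. Raising here marks `name` as
        # failed; anything that depends on it is never started and is
        # reported with an UpstreamError by the consumer thread.
        if name == 'db':
            raise RuntimeError('db failed to start')

    parallel_execute(
        objects,
        start_service,
        get_name=lambda name: name,
        msg='Starting',
        get_deps=lambda name: deps[name],
    )
    # 'db' fails with RuntimeError (recorded in errors and re-raised
    # after the loop); 'web' is aborted with UpstreamError instead of
    # being started, so the operation terminates instead of waiting
    # forever on a dependency that will never finish.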