diff options
author | Aanand Prasad <aanand.prasad@gmail.com> | 2016-04-08 17:46:13 +0100 |
---|---|---|
committer | Aanand Prasad <aanand.prasad@gmail.com> | 2016-04-08 17:51:09 +0100 |
commit | 141b96bb312d85753de2189227941512bd42f33e (patch) | |
tree | a1c6c89f3d06df585b6900be3df44533b461e584 /compose/parallel.py | |
parent | bcdf541c8c6ccc0070ab011a909f244f501676d6 (diff) |
Abort operations if their dependencies fail
Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r-- | compose/parallel.py | 86 |
1 files changed, 51 insertions, 35 deletions
diff --git a/compose/parallel.py b/compose/parallel.py index 79699236..745d4635 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -32,7 +32,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): for obj in objects: writer.initialize(get_name(obj)) - q = setup_queue(objects, func, get_deps, get_name) + q = setup_queue(objects, func, get_deps) done = 0 errors = {} @@ -54,6 +54,8 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): elif isinstance(exception, APIError): errors[get_name(obj)] = exception.explanation writer.write(get_name(obj), 'error') + elif isinstance(exception, UpstreamError): + writer.write(get_name(obj), 'error') else: errors[get_name(obj)] = exception error_to_reraise = exception @@ -72,58 +74,72 @@ def _no_deps(x): return [] -def setup_queue(objects, func, get_deps, get_name): +def setup_queue(objects, func, get_deps): if get_deps is None: get_deps = _no_deps results = Queue() output = Queue() - def consumer(): - started = set() # objects being processed - finished = set() # objects which have been processed - - def ready(obj): - """ - Returns true if obj is ready to be processed: - - all dependencies have been processed - - obj is not already being processed - """ - return obj not in started and all( - dep not in objects or dep in finished - for dep in get_deps(obj) - ) + t = Thread(target=queue_consumer, args=(objects, func, get_deps, results, output)) + t.daemon = True + t.start() + + return output + + +def queue_producer(obj, func, results): + try: + result = func(obj) + results.put((obj, result, None)) + except Exception as e: + results.put((obj, None, e)) + + +def queue_consumer(objects, func, get_deps, results, output): + started = set() # objects being processed + finished = set() # objects which have been processed + failed = set() # objects which either failed or whose dependencies failed + + while len(finished) + len(failed) < len(objects): + pending = set(objects) - started - finished - failed + log.debug('Pending: {}'.format(pending)) - while len(finished) < len(objects): - for obj in filter(ready, objects): + for obj in pending: + deps = get_deps(obj) + + if any(dep in failed for dep in deps): + log.debug('{} has upstream errors - not processing'.format(obj)) + output.put((obj, None, UpstreamError())) + failed.add(obj) + elif all( + dep not in objects or dep in finished + for dep in deps + ): log.debug('Starting producer thread for {}'.format(obj)) - t = Thread(target=producer, args=(obj,)) + t = Thread(target=queue_producer, args=(obj, func, results)) t.daemon = True t.start() started.add(obj) - try: - event = results.get(timeout=1) - except Empty: - continue + try: + event = results.get(timeout=1) + except Empty: + continue - obj = event[0] + obj, _, exception = event + if exception is None: log.debug('Finished processing: {}'.format(obj)) finished.add(obj) - output.put(event) + else: + log.debug('Failed: {}'.format(obj)) + failed.add(obj) - def producer(obj): - try: - result = func(obj) - results.put((obj, result, None)) - except Exception as e: - results.put((obj, None, e)) + output.put(event) - t = Thread(target=consumer) - t.daemon = True - t.start() - return output +class UpstreamError(Exception): + pass class ParallelStreamWriter(object): |