diff options
author | Aanand Prasad <aanand.prasad@gmail.com> | 2016-04-08 18:30:28 +0100 |
---|---|---|
committer | Aanand Prasad <aanand.prasad@gmail.com> | 2016-04-08 18:30:28 +0100 |
commit | af9526fb820f40a8b7eafb16d29f990b1696f4fe (patch) | |
tree | a1a33cc83b9921c38ef0785fc7d4661f6741fe6f /compose/parallel.py | |
parent | 141b96bb312d85753de2189227941512bd42f33e (diff) |
Move queue logic out of parallel_execute()
Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r-- | compose/parallel.py | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/compose/parallel.py b/compose/parallel.py index 745d4635..8172d8ea 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -32,22 +32,13 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): for obj in objects: writer.initialize(get_name(obj)) - q = setup_queue(objects, func, get_deps) + events = parallel_execute_stream(objects, func, get_deps) - done = 0 errors = {} results = [] error_to_reraise = None - while done < len(objects): - try: - obj, result, exception = q.get(timeout=1) - except Empty: - continue - # See https://github.com/docker/compose/issues/189 - except thread.error: - raise ShutdownException() - + for obj, result, exception in events: if exception is None: writer.write(get_name(obj), 'done') results.append(result) @@ -59,7 +50,6 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): else: errors[get_name(obj)] = exception error_to_reraise = exception - done += 1 for obj_name, error in errors.items(): stream.write("\nERROR: for {} {}\n".format(obj_name, error)) @@ -74,7 +64,7 @@ def _no_deps(x): return [] -def setup_queue(objects, func, get_deps): +def parallel_execute_stream(objects, func, get_deps): if get_deps is None: get_deps = _no_deps @@ -85,7 +75,17 @@ def setup_queue(objects, func, get_deps): t.daemon = True t.start() - return output + done = 0 + + while done < len(objects): + try: + yield output.get(timeout=1) + done += 1 + except Empty: + continue + # See https://github.com/docker/compose/issues/189 + except thread.error: + raise ShutdownException() def queue_producer(obj, func, results): |