summaryrefslogtreecommitdiff
path: root/compose/parallel.py
diff options
context:
space:
mode:
authorAanand Prasad <aanand.prasad@gmail.com>2016-04-08 18:30:28 +0100
committerAanand Prasad <aanand.prasad@gmail.com>2016-04-08 18:30:28 +0100
commitaf9526fb820f40a8b7eafb16d29f990b1696f4fe (patch)
treea1a33cc83b9921c38ef0785fc7d4661f6741fe6f /compose/parallel.py
parent141b96bb312d85753de2189227941512bd42f33e (diff)
Move queue logic out of parallel_execute()
Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r--compose/parallel.py28
1 files changed, 14 insertions, 14 deletions
diff --git a/compose/parallel.py b/compose/parallel.py
index 745d4635..8172d8ea 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -32,22 +32,13 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
for obj in objects:
writer.initialize(get_name(obj))
- q = setup_queue(objects, func, get_deps)
+ events = parallel_execute_stream(objects, func, get_deps)
- done = 0
errors = {}
results = []
error_to_reraise = None
- while done < len(objects):
- try:
- obj, result, exception = q.get(timeout=1)
- except Empty:
- continue
- # See https://github.com/docker/compose/issues/189
- except thread.error:
- raise ShutdownException()
-
+ for obj, result, exception in events:
if exception is None:
writer.write(get_name(obj), 'done')
results.append(result)
@@ -59,7 +50,6 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
else:
errors[get_name(obj)] = exception
error_to_reraise = exception
- done += 1
for obj_name, error in errors.items():
stream.write("\nERROR: for {} {}\n".format(obj_name, error))
@@ -74,7 +64,7 @@ def _no_deps(x):
return []
-def setup_queue(objects, func, get_deps):
+def parallel_execute_stream(objects, func, get_deps):
if get_deps is None:
get_deps = _no_deps
@@ -85,7 +75,17 @@ def setup_queue(objects, func, get_deps):
t.daemon = True
t.start()
- return output
+ done = 0
+
+ while done < len(objects):
+ try:
+ yield output.get(timeout=1)
+ done += 1
+ except Empty:
+ continue
+ # See https://github.com/docker/compose/issues/189
+ except thread.error:
+ raise ShutdownException()
def queue_producer(obj, func, results):