summaryrefslogtreecommitdiff
path: root/compose/parallel.py
diff options
context:
space:
mode:
authorMatthieu Nottale <matthieu.nottale@docker.com>2018-03-05 14:28:46 +0100
committerMatthieu Nottale <matthieu.nottale@docker.com>2018-03-07 15:22:38 +0100
commit31dcfcff2ad9124d028e642e5dd61530714b15c7 (patch)
treee25b71de64fd488332a2dd85fbcc5bb245c4201e /compose/parallel.py
parenteee55231b841a62e9c031fede784f6a1c86f57d4 (diff)
Revamp ParallelStreamWriter to fix display issues.
Signed-off-by: Matthieu Nottale <matthieu.nottale@docker.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r--compose/parallel.py106
1 files changed, 57 insertions, 49 deletions
diff --git a/compose/parallel.py b/compose/parallel.py
index dd83c70c..5d4791f9 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -43,55 +43,60 @@ class GlobalLimit(object):
cls.global_limiter = Semaphore(value)
-def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None):
- """Runs func on objects in parallel while ensuring that func is
- ran on object only after it is ran on all its dependencies.
-
- get_deps called on object must return a collection with its dependencies.
- get_name called on object must return its name.
+def parallel_execute_watch(events, writer, errors, results, msg, get_name):
+ """ Watch events from a parallel execution, update status and fill errors and results.
+ Returns exception to re-raise.
"""
- objects = list(objects)
- stream = get_output_stream(sys.stderr)
-
- writer = ParallelStreamWriter(stream, msg)
-
- display_objects = list(parent_objects) if parent_objects else objects
-
- for obj in display_objects:
- writer.add_object(get_name(obj))
-
- # write data in a second loop to consider all objects for width alignment
- # and avoid duplicates when parent_objects exists
- for obj in objects:
- writer.write_initial(get_name(obj))
-
- events = parallel_execute_iter(objects, func, get_deps, limit)
-
- errors = {}
- results = []
error_to_reraise = None
-
for obj, result, exception in events:
if exception is None:
- writer.write(get_name(obj), 'done', green)
+ writer.write(msg, get_name(obj), 'done', green)
results.append(result)
elif isinstance(exception, ImageNotFound):
# This is to bubble up ImageNotFound exceptions to the client so we
# can prompt the user if they want to rebuild.
errors[get_name(obj)] = exception.explanation
- writer.write(get_name(obj), 'error', red)
+ writer.write(msg, get_name(obj), 'error', red)
error_to_reraise = exception
elif isinstance(exception, APIError):
errors[get_name(obj)] = exception.explanation
- writer.write(get_name(obj), 'error', red)
+ writer.write(msg, get_name(obj), 'error', red)
elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)):
errors[get_name(obj)] = exception.msg
- writer.write(get_name(obj), 'error', red)
+ writer.write(msg, get_name(obj), 'error', red)
elif isinstance(exception, UpstreamError):
- writer.write(get_name(obj), 'error', red)
+ writer.write(msg, get_name(obj), 'error', red)
else:
errors[get_name(obj)] = exception
error_to_reraise = exception
+ return error_to_reraise
+
+
+def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
+ """Runs func on objects in parallel while ensuring that func is
+ ran on object only after it is ran on all its dependencies.
+
+ get_deps called on object must return a collection with its dependencies.
+ get_name called on object must return its name.
+ """
+ objects = list(objects)
+ stream = get_output_stream(sys.stderr)
+
+ if ParallelStreamWriter.instance:
+ writer = ParallelStreamWriter.instance
+ else:
+ writer = ParallelStreamWriter(stream)
+
+ for obj in objects:
+ writer.add_object(msg, get_name(obj))
+ for obj in objects:
+ writer.write_initial(msg, get_name(obj))
+
+ events = parallel_execute_iter(objects, func, get_deps, limit)
+
+ errors = {}
+ results = []
+ error_to_reraise = parallel_execute_watch(events, writer, errors, results, msg, get_name)
for obj_name, error in errors.items():
stream.write("\nERROR: for {} {}\n".format(obj_name, error))
@@ -253,55 +258,58 @@ class ParallelStreamWriter(object):
noansi = False
lock = Lock()
+ instance = None
@classmethod
def set_noansi(cls, value=True):
cls.noansi = value
- def __init__(self, stream, msg):
+ def __init__(self, stream):
self.stream = stream
- self.msg = msg
self.lines = []
self.width = 0
+ ParallelStreamWriter.instance = self
- def add_object(self, obj_index):
- self.lines.append(obj_index)
- self.width = max(self.width, len(obj_index))
+ def add_object(self, msg, obj_index):
+ if msg is None:
+ return
+ self.lines.append(msg + obj_index)
+ self.width = max(self.width, len(msg + ' ' + obj_index))
- def write_initial(self, obj_index):
- if self.msg is None:
+ def write_initial(self, msg, obj_index):
+ if msg is None:
return
- self.stream.write("{} {:<{width}} ... \r\n".format(
- self.msg, self.lines[self.lines.index(obj_index)], width=self.width))
+ self.stream.write("{:<{width}} ... \r\n".format(
+ msg + ' ' + obj_index, width=self.width))
self.stream.flush()
- def _write_ansi(self, obj_index, status):
+ def _write_ansi(self, msg, obj_index, status):
self.lock.acquire()
- position = self.lines.index(obj_index)
+ position = self.lines.index(msg + obj_index)
diff = len(self.lines) - position
# move up
self.stream.write("%c[%dA" % (27, diff))
# erase
self.stream.write("%c[2K\r" % 27)
- self.stream.write("{} {:<{width}} ... {}\r".format(self.msg, obj_index,
+ self.stream.write("{:<{width}} ... {}\r".format(msg + ' ' + obj_index,
status, width=self.width))
# move back down
self.stream.write("%c[%dB" % (27, diff))
self.stream.flush()
self.lock.release()
- def _write_noansi(self, obj_index, status):
- self.stream.write("{} {:<{width}} ... {}\r\n".format(self.msg, obj_index,
+ def _write_noansi(self, msg, obj_index, status):
+ self.stream.write("{:<{width}} ... {}\r\n".format(msg + ' ' + obj_index,
status, width=self.width))
self.stream.flush()
- def write(self, obj_index, status, color_func):
- if self.msg is None:
+ def write(self, msg, obj_index, status, color_func):
+ if msg is None:
return
if self.noansi:
- self._write_noansi(obj_index, status)
+ self._write_noansi(msg, obj_index, status)
else:
- self._write_ansi(obj_index, color_func(status))
+ self._write_ansi(msg, obj_index, color_func(status))
def parallel_operation(containers, operation, options, message):