diff options
author | Matthieu Nottale <matthieu.nottale@docker.com> | 2018-03-05 14:28:46 +0100 |
---|---|---|
committer | Matthieu Nottale <matthieu.nottale@docker.com> | 2018-03-07 15:22:38 +0100 |
commit | 31dcfcff2ad9124d028e642e5dd61530714b15c7 (patch) | |
tree | e25b71de64fd488332a2dd85fbcc5bb245c4201e /compose/parallel.py | |
parent | eee55231b841a62e9c031fede784f6a1c86f57d4 (diff) |
Revamp ParallelStreamWriter to fix display issues.
Signed-off-by: Matthieu Nottale <matthieu.nottale@docker.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r-- | compose/parallel.py | 106 |
1 files changed, 57 insertions, 49 deletions
diff --git a/compose/parallel.py b/compose/parallel.py index dd83c70c..5d4791f9 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -43,55 +43,60 @@ class GlobalLimit(object): cls.global_limiter = Semaphore(value) -def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None): - """Runs func on objects in parallel while ensuring that func is - ran on object only after it is ran on all its dependencies. - - get_deps called on object must return a collection with its dependencies. - get_name called on object must return its name. +def parallel_execute_watch(events, writer, errors, results, msg, get_name): + """ Watch events from a parallel execution, update status and fill errors and results. + Returns exception to re-raise. """ - objects = list(objects) - stream = get_output_stream(sys.stderr) - - writer = ParallelStreamWriter(stream, msg) - - display_objects = list(parent_objects) if parent_objects else objects - - for obj in display_objects: - writer.add_object(get_name(obj)) - - # write data in a second loop to consider all objects for width alignment - # and avoid duplicates when parent_objects exists - for obj in objects: - writer.write_initial(get_name(obj)) - - events = parallel_execute_iter(objects, func, get_deps, limit) - - errors = {} - results = [] error_to_reraise = None - for obj, result, exception in events: if exception is None: - writer.write(get_name(obj), 'done', green) + writer.write(msg, get_name(obj), 'done', green) results.append(result) elif isinstance(exception, ImageNotFound): # This is to bubble up ImageNotFound exceptions to the client so we # can prompt the user if they want to rebuild. errors[get_name(obj)] = exception.explanation - writer.write(get_name(obj), 'error', red) + writer.write(msg, get_name(obj), 'error', red) error_to_reraise = exception elif isinstance(exception, APIError): errors[get_name(obj)] = exception.explanation - writer.write(get_name(obj), 'error', red) + writer.write(msg, get_name(obj), 'error', red) elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)): errors[get_name(obj)] = exception.msg - writer.write(get_name(obj), 'error', red) + writer.write(msg, get_name(obj), 'error', red) elif isinstance(exception, UpstreamError): - writer.write(get_name(obj), 'error', red) + writer.write(msg, get_name(obj), 'error', red) else: errors[get_name(obj)] = exception error_to_reraise = exception + return error_to_reraise + + +def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None): + """Runs func on objects in parallel while ensuring that func is + ran on object only after it is ran on all its dependencies. + + get_deps called on object must return a collection with its dependencies. + get_name called on object must return its name. + """ + objects = list(objects) + stream = get_output_stream(sys.stderr) + + if ParallelStreamWriter.instance: + writer = ParallelStreamWriter.instance + else: + writer = ParallelStreamWriter(stream) + + for obj in objects: + writer.add_object(msg, get_name(obj)) + for obj in objects: + writer.write_initial(msg, get_name(obj)) + + events = parallel_execute_iter(objects, func, get_deps, limit) + + errors = {} + results = [] + error_to_reraise = parallel_execute_watch(events, writer, errors, results, msg, get_name) for obj_name, error in errors.items(): stream.write("\nERROR: for {} {}\n".format(obj_name, error)) @@ -253,55 +258,58 @@ class ParallelStreamWriter(object): noansi = False lock = Lock() + instance = None @classmethod def set_noansi(cls, value=True): cls.noansi = value - def __init__(self, stream, msg): + def __init__(self, stream): self.stream = stream - self.msg = msg self.lines = [] self.width = 0 + ParallelStreamWriter.instance = self - def add_object(self, obj_index): - self.lines.append(obj_index) - self.width = max(self.width, len(obj_index)) + def add_object(self, msg, obj_index): + if msg is None: + return + self.lines.append(msg + obj_index) + self.width = max(self.width, len(msg + ' ' + obj_index)) - def write_initial(self, obj_index): - if self.msg is None: + def write_initial(self, msg, obj_index): + if msg is None: return - self.stream.write("{} {:<{width}} ... \r\n".format( - self.msg, self.lines[self.lines.index(obj_index)], width=self.width)) + self.stream.write("{:<{width}} ... \r\n".format( + msg + ' ' + obj_index, width=self.width)) self.stream.flush() - def _write_ansi(self, obj_index, status): + def _write_ansi(self, msg, obj_index, status): self.lock.acquire() - position = self.lines.index(obj_index) + position = self.lines.index(msg + obj_index) diff = len(self.lines) - position # move up self.stream.write("%c[%dA" % (27, diff)) # erase self.stream.write("%c[2K\r" % 27) - self.stream.write("{} {:<{width}} ... {}\r".format(self.msg, obj_index, + self.stream.write("{:<{width}} ... {}\r".format(msg + ' ' + obj_index, status, width=self.width)) # move back down self.stream.write("%c[%dB" % (27, diff)) self.stream.flush() self.lock.release() - def _write_noansi(self, obj_index, status): - self.stream.write("{} {:<{width}} ... {}\r\n".format(self.msg, obj_index, + def _write_noansi(self, msg, obj_index, status): + self.stream.write("{:<{width}} ... {}\r\n".format(msg + ' ' + obj_index, status, width=self.width)) self.stream.flush() - def write(self, obj_index, status, color_func): - if self.msg is None: + def write(self, msg, obj_index, status, color_func): + if msg is None: return if self.noansi: - self._write_noansi(obj_index, status) + self._write_noansi(msg, obj_index, status) else: - self._write_ansi(obj_index, color_func(status)) + self._write_ansi(msg, obj_index, color_func(status)) def parallel_operation(containers, operation, options, message): |