diff options
author | Ilya Skriblovsky <IlyaSkriblovsky@gmail.com> | 2016-03-10 23:32:15 +0300 |
---|---|---|
committer | Ilya Skriblovsky <IlyaSkriblovsky@gmail.com> | 2016-03-14 22:55:42 +0300 |
commit | f933381a1253f5195406f80be746812a5bfa45a7 (patch) | |
tree | 6a65aecdb05c2db3ab7aa967b46f70f3d6cb9d76 /compose/parallel.py | |
parent | e5cd869c6119f75a0c33e5ed90d121ee9aa72a84 (diff) |
Dependency-ordered start/stop/up
Signed-off-by: Ilya Skriblovsky <IlyaSkriblovsky@gmail.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r-- | compose/parallel.py | 106 |
1 files changed, 70 insertions, 36 deletions
diff --git a/compose/parallel.py b/compose/parallel.py index 4810a106..439f0f44 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -14,68 +14,98 @@ from compose.cli.signals import ShutdownException from compose.utils import get_output_stream -def perform_operation(func, arg, callback, index): - try: - callback((index, func(arg))) - except Exception as e: - callback((index, e)) +def parallel_execute(objects, func, get_name, msg, get_deps=None): + """Runs func on objects in parallel while ensuring that func is + ran on object only after it is ran on all its dependencies. - -def parallel_execute(objects, func, index_func, msg): - """For a given list of objects, call the callable passing in the first - object we give it. + get_deps called on object must return a collection with its dependencies. + get_name called on object must return its name. """ objects = list(objects) stream = get_output_stream(sys.stderr) + writer = ParallelStreamWriter(stream, msg) - q = setup_queue(writer, objects, func, index_func) + for obj in objects: + writer.initialize(get_name(obj)) + + q = setup_queue(objects, func, get_deps, get_name) done = 0 errors = {} + error_to_reraise = None + returned = [None] * len(objects) while done < len(objects): try: - msg_index, result = q.get(timeout=1) + obj, result, exception = q.get(timeout=1) except Empty: continue # See https://github.com/docker/compose/issues/189 except thread.error: raise ShutdownException() - if isinstance(result, APIError): - errors[msg_index] = "error", result.explanation - writer.write(msg_index, 'error') - elif isinstance(result, Exception): - errors[msg_index] = "unexpected_exception", result + if exception is None: + writer.write(get_name(obj), 'done') + returned[objects.index(obj)] = result + elif isinstance(exception, APIError): + errors[get_name(obj)] = exception.explanation + writer.write(get_name(obj), 'error') else: - writer.write(msg_index, 'done') + errors[get_name(obj)] = exception + error_to_reraise = exception + done += 1 - if not errors: - return + for obj_name, error in errors.items(): + stream.write("\nERROR: for {} {}\n".format(obj_name, error)) - stream.write("\n") - for msg_index, (result, error) in errors.items(): - stream.write("ERROR: for {} {} \n".format(msg_index, error)) - if result == 'unexpected_exception': - raise error + if error_to_reraise: + raise error_to_reraise + return returned -def setup_queue(writer, objects, func, index_func): - for obj in objects: - writer.initialize(index_func(obj)) - q = Queue() +def _no_deps(x): + return [] - # TODO: limit the number of threads #1828 - for obj in objects: - t = Thread( - target=perform_operation, - args=(func, obj, q.put, index_func(obj))) - t.daemon = True - t.start() - return q +def setup_queue(objects, func, get_deps, get_name): + if get_deps is None: + get_deps = _no_deps + + results = Queue() + + started = set() # objects, threads were started for + finished = set() # already finished objects + + def do_op(obj): + try: + result = func(obj) + results.put((obj, result, None)) + except Exception as e: + results.put((obj, None, e)) + + finished.add(obj) + feed() + + def ready(obj): + # Is object ready for performing operation + return obj not in started and all( + dep not in objects or dep in finished + for dep in get_deps(obj) + ) + + def feed(): + ready_objects = [o for o in objects if ready(o)] + for obj in ready_objects: + started.add(obj) + t = Thread(target=do_op, + args=(obj,)) + t.daemon = True + t.start() + + feed() + return results class ParallelStreamWriter(object): @@ -91,11 +121,15 @@ class ParallelStreamWriter(object): self.lines = [] def initialize(self, obj_index): + if self.msg is None: + return self.lines.append(obj_index) self.stream.write("{} {} ... \r\n".format(self.msg, obj_index)) self.stream.flush() def write(self, obj_index, status): + if self.msg is None: + return position = self.lines.index(obj_index) diff = len(self.lines) - position # move up |