diff options
author | Daniel Nephin <dnephin@docker.com> | 2016-03-01 16:39:43 -0500 |
---|---|---|
committer | Daniel Nephin <dnephin@docker.com> | 2016-03-01 16:39:43 -0500 |
commit | ed4473c849cc3e9029dc7894ade716d791c918c6 (patch) | |
tree | 9defbd6b093cedee36853a0a3c8adadf3159dada /compose/parallel.py | |
parent | fc99c7ee1981a4dc2b125bfd08d4c31cbdb5435a (diff) |
Fix signal handling with pyinstaller.
Raise a ShutdownException instead of a KeyboardInterupt when a thread.error
is caught. This thread.error is only raised when run from a pyinstaller
binary (for reasons unknown).
Signed-off-by: Daniel Nephin <dnephin@docker.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r-- | compose/parallel.py | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/compose/parallel.py b/compose/parallel.py index b8415e5e..4810a106 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -6,9 +6,11 @@ import sys from threading import Thread from docker.errors import APIError +from six.moves import _thread as thread from six.moves.queue import Empty from six.moves.queue import Queue +from compose.cli.signals import ShutdownException from compose.utils import get_output_stream @@ -26,19 +28,7 @@ def parallel_execute(objects, func, index_func, msg): objects = list(objects) stream = get_output_stream(sys.stderr) writer = ParallelStreamWriter(stream, msg) - - for obj in objects: - writer.initialize(index_func(obj)) - - q = Queue() - - # TODO: limit the number of threads #1828 - for obj in objects: - t = Thread( - target=perform_operation, - args=(func, obj, q.put, index_func(obj))) - t.daemon = True - t.start() + q = setup_queue(writer, objects, func, index_func) done = 0 errors = {} @@ -48,6 +38,9 @@ def parallel_execute(objects, func, index_func, msg): msg_index, result = q.get(timeout=1) except Empty: continue + # See https://github.com/docker/compose/issues/189 + except thread.error: + raise ShutdownException() if isinstance(result, APIError): errors[msg_index] = "error", result.explanation @@ -68,6 +61,23 @@ def parallel_execute(objects, func, index_func, msg): raise error +def setup_queue(writer, objects, func, index_func): + for obj in objects: + writer.initialize(index_func(obj)) + + q = Queue() + + # TODO: limit the number of threads #1828 + for obj in objects: + t = Thread( + target=perform_operation, + args=(func, obj, q.put, index_func(obj))) + t.daemon = True + t.start() + + return q + + class ParallelStreamWriter(object): """Write out messages for operations happening in parallel. |