summaryrefslogtreecommitdiff
path: root/compose/parallel.py
diff options
context:
space:
mode:
authorDaniel Nephin <dnephin@docker.com>2016-03-01 16:39:43 -0500
committerDaniel Nephin <dnephin@docker.com>2016-03-01 16:39:43 -0500
commited4473c849cc3e9029dc7894ade716d791c918c6 (patch)
tree9defbd6b093cedee36853a0a3c8adadf3159dada /compose/parallel.py
parentfc99c7ee1981a4dc2b125bfd08d4c31cbdb5435a (diff)
Fix signal handling with pyinstaller.
Raise a ShutdownException instead of a KeyboardInterupt when a thread.error is caught. This thread.error is only raised when run from a pyinstaller binary (for reasons unknown). Signed-off-by: Daniel Nephin <dnephin@docker.com>
Diffstat (limited to 'compose/parallel.py')
-rw-r--r--compose/parallel.py36
1 files changed, 23 insertions, 13 deletions
diff --git a/compose/parallel.py b/compose/parallel.py
index b8415e5e..4810a106 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -6,9 +6,11 @@ import sys
from threading import Thread
from docker.errors import APIError
+from six.moves import _thread as thread
from six.moves.queue import Empty
from six.moves.queue import Queue
+from compose.cli.signals import ShutdownException
from compose.utils import get_output_stream
@@ -26,19 +28,7 @@ def parallel_execute(objects, func, index_func, msg):
objects = list(objects)
stream = get_output_stream(sys.stderr)
writer = ParallelStreamWriter(stream, msg)
-
- for obj in objects:
- writer.initialize(index_func(obj))
-
- q = Queue()
-
- # TODO: limit the number of threads #1828
- for obj in objects:
- t = Thread(
- target=perform_operation,
- args=(func, obj, q.put, index_func(obj)))
- t.daemon = True
- t.start()
+ q = setup_queue(writer, objects, func, index_func)
done = 0
errors = {}
@@ -48,6 +38,9 @@ def parallel_execute(objects, func, index_func, msg):
msg_index, result = q.get(timeout=1)
except Empty:
continue
+ # See https://github.com/docker/compose/issues/189
+ except thread.error:
+ raise ShutdownException()
if isinstance(result, APIError):
errors[msg_index] = "error", result.explanation
@@ -68,6 +61,23 @@ def parallel_execute(objects, func, index_func, msg):
raise error
+def setup_queue(writer, objects, func, index_func):
+ for obj in objects:
+ writer.initialize(index_func(obj))
+
+ q = Queue()
+
+ # TODO: limit the number of threads #1828
+ for obj in objects:
+ t = Thread(
+ target=perform_operation,
+ args=(func, obj, q.put, index_func(obj)))
+ t.daemon = True
+ t.start()
+
+ return q
+
+
class ParallelStreamWriter(object):
"""Write out messages for operations happening in parallel.