summaryrefslogtreecommitdiff
path: root/compose/parallel.py
blob: 4810a10644010e05949e2489d022553d97b93d9b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from __future__ import absolute_import
from __future__ import unicode_literals

import operator
import sys
from threading import Thread

from docker.errors import APIError
from six.moves import _thread as thread
from six.moves.queue import Empty
from six.moves.queue import Queue

from compose.cli.signals import ShutdownException
from compose.utils import get_output_stream


def perform_operation(func, arg, callback, index):
    try:
        callback((index, func(arg)))
    except Exception as e:
        callback((index, e))


def parallel_execute(objects, func, index_func, msg):
    """For a given list of objects, call the callable passing in the first
    object we give it.
    """
    objects = list(objects)
    stream = get_output_stream(sys.stderr)
    writer = ParallelStreamWriter(stream, msg)
    q = setup_queue(writer, objects, func, index_func)

    done = 0
    errors = {}

    while done < len(objects):
        try:
            msg_index, result = q.get(timeout=1)
        except Empty:
            continue
        # See https://github.com/docker/compose/issues/189
        except thread.error:
            raise ShutdownException()

        if isinstance(result, APIError):
            errors[msg_index] = "error", result.explanation
            writer.write(msg_index, 'error')
        elif isinstance(result, Exception):
            errors[msg_index] = "unexpected_exception", result
        else:
            writer.write(msg_index, 'done')
        done += 1

    if not errors:
        return

    stream.write("\n")
    for msg_index, (result, error) in errors.items():
        stream.write("ERROR: for {}  {} \n".format(msg_index, error))
        if result == 'unexpected_exception':
            raise error


def setup_queue(writer, objects, func, index_func):
    for obj in objects:
        writer.initialize(index_func(obj))

    q = Queue()

    # TODO: limit the number of threads #1828
    for obj in objects:
        t = Thread(
            target=perform_operation,
            args=(func, obj, q.put, index_func(obj)))
        t.daemon = True
        t.start()

    return q


class ParallelStreamWriter(object):
    """Write out messages for operations happening in parallel.

    Each operation has it's own line, and ANSI code characters are used
    to jump to the correct line, and write over the line.
    """

    def __init__(self, stream, msg):
        self.stream = stream
        self.msg = msg
        self.lines = []

    def initialize(self, obj_index):
        self.lines.append(obj_index)
        self.stream.write("{} {} ... \r\n".format(self.msg, obj_index))
        self.stream.flush()

    def write(self, obj_index, status):
        position = self.lines.index(obj_index)
        diff = len(self.lines) - position
        # move up
        self.stream.write("%c[%dA" % (27, diff))
        # erase
        self.stream.write("%c[2K\r" % 27)
        self.stream.write("{} {} ... {}\r".format(self.msg, obj_index, status))
        # move back down
        self.stream.write("%c[%dB" % (27, diff))
        self.stream.flush()


def parallel_operation(containers, operation, options, message):
    parallel_execute(
        containers,
        operator.methodcaller(operation, **options),
        operator.attrgetter('name'),
        message)


def parallel_remove(containers, options):
    stopped_containers = [c for c in containers if not c.is_running]
    parallel_operation(stopped_containers, 'remove', options, 'Removing')


def parallel_stop(containers, options):
    parallel_operation(containers, 'stop', options, 'Stopping')


def parallel_start(containers, options):
    parallel_operation(containers, 'start', options, 'Starting')


def parallel_pause(containers, options):
    parallel_operation(containers, 'pause', options, 'Pausing')


def parallel_unpause(containers, options):
    parallel_operation(containers, 'unpause', options, 'Unpausing')


def parallel_kill(containers, options):
    parallel_operation(containers, 'kill', options, 'Killing')


def parallel_restart(containers, options):
    parallel_operation(containers, 'restart', options, 'Restarting')