summaryrefslogtreecommitdiff
path: root/docker/credentials/store.py
blob: 001788897888bbf485be90d9bb4cc75fe1f9cf54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import errno
import json
import subprocess

import six

from . import constants
from . import errors
from .utils import create_environment_dict
from .utils import find_executable


class Store(object):
    """ Interface to an external credentials helper executable.

        Every operation runs ``<helper> <subcommand>`` as a subprocess,
        writes the request to the helper's stdin and reads the reply
        from its stdout.
    """

    def __init__(self, program, environment=None):
        """ Create a store object that acts as an interface to
            perform the basic operations for storing, retrieving
            and erasing credentials using `program`.

            :param program: helper name; it is appended to
                ``constants.PROGRAM_PREFIX`` to build the executable name.
            :param environment: value handed to ``create_environment_dict``
                on each invocation to build the helper's environment.
            :raises errors.InitializationError: if ``find_executable``
                cannot locate the helper.
        """
        self.program = constants.PROGRAM_PREFIX + program
        self.exe = find_executable(self.program)
        self.environment = environment
        if self.exe is None:
            raise errors.InitializationError(
                '{} not installed or not available in PATH'.format(
                    self.program
                )
            )

    def get(self, server):
        """ Retrieve credentials for `server`.

            :param server: server identifier; text is UTF-8 encoded before
                being sent to the helper, bytes are sent unchanged.
            :returns: the JSON object printed by the helper, decoded into
                a dict.
            :raises errors.CredentialsNotFound: if the helper replies with
                empty (or absent) ``Username`` and ``Secret`` fields.
            :raises errors.StoreError: if the helper cannot be run. A
                non-zero exit is reported with the error built by
                ``errors.process_store_error``.
        """
        if not isinstance(server, six.binary_type):
            server = server.encode('utf-8')
        data = self._execute('get', server)
        result = json.loads(data.decode('utf-8'))

        # docker-credential-pass will return an object for inexistent servers
        # whereas other helpers will exit with returncode != 0. For
        # consistency, if no significant data is returned,
        # raise CredentialsNotFound.
        # A key that is missing altogether is treated like an empty one so
        # that such a reply surfaces as CredentialsNotFound, not KeyError.
        username = result.get('Username', '')
        secret = result.get('Secret', '')
        if username == '' and secret == '':
            raise errors.CredentialsNotFound(
                'No matching credentials in {}'.format(self.program)
            )

        return result

    def store(self, server, username, secret):
        """ Store credentials for `server`. Raises a `StoreError` if an error
            occurs.

            The three values are serialized as a JSON object with the keys
            ``ServerURL``, ``Username`` and ``Secret`` and sent to the
            helper's ``store`` subcommand.

            :returns: the raw bytes the helper wrote to stdout.
        """
        data_input = json.dumps({
            'ServerURL': server,
            'Username': username,
            'Secret': secret
        }).encode('utf-8')
        return self._execute('store', data_input)

    def erase(self, server):
        """ Erase credentials for `server`. Raises a `StoreError` if an error
            occurs.

            :param server: server identifier; text is UTF-8 encoded before
                being sent to the helper, bytes are sent unchanged.
        """
        if not isinstance(server, six.binary_type):
            server = server.encode('utf-8')
        self._execute('erase', server)

    def list(self):
        """ List stored credentials. Requires v0.4.0+ of the helper.

            :returns: the helper's JSON reply, decoded.
        """
        data = self._execute('list', None)
        return json.loads(data.decode('utf-8'))

    def _execute(self, subcmd, data_input):
        """ Run the helper with `subcmd`, feeding `data_input` on stdin.

            :param subcmd: helper subcommand (``get``, ``store``, ``erase``
                or ``list``).
            :param data_input: bytes written to the helper's stdin, or
                ``None`` when the subcommand takes no input.
            :returns: the bytes the helper wrote to stdout.
            :raises errors.StoreError: if the executable cannot be started.
                A non-zero exit status is converted by
                ``errors.process_store_error``.
        """
        output = None
        env = create_environment_dict(self.environment)
        try:
            if six.PY3:
                output = subprocess.check_output(
                    [self.exe, subcmd], input=data_input, env=env,
                )
            else:
                # Python 2's check_output() has no `input` argument, so the
                # same behaviour is reproduced with Popen/communicate and a
                # manually raised CalledProcessError.
                process = subprocess.Popen(
                    [self.exe, subcmd], stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, env=env,
                )
                output, _ = process.communicate(data_input)
                if process.returncode != 0:
                    raise subprocess.CalledProcessError(
                        returncode=process.returncode, cmd='', output=output
                    )
        except subprocess.CalledProcessError as e:
            raise errors.process_store_error(e, self.program)
        except OSError as e:
            # ENOENT means the executable vanished (or was never there);
            # report it with the same wording used at construction time.
            if e.errno == errno.ENOENT:
                raise errors.StoreError(
                    '{} not installed or not available in PATH'.format(
                        self.program
                    )
                )
            else:
                raise errors.StoreError(
                    'Unexpected OS error "{}", errno={}'.format(
                        e.strerror, e.errno
                    )
                )
        return output