diff options
author | Jelmer Vernooij <jelmer@jelmer.uk> | 2016-07-03 20:40:01 +0000 |
---|---|---|
committer | Jelmer Vernooij <jelmer@jelmer.uk> | 2016-07-03 20:40:01 +0000 |
commit | 2e526af1098f8f188ef780296dc0a504c539fafe (patch) | |
tree | b824e921bbd66c14825ae32d3413c8335032147c | |
parent | df6caa92770eb49429bf6ee23f6cebebfe44b9e5 (diff) | |
parent | d29eabf0d798aadfcf7cb881d3f9ad403c99542b (diff) |
Imported Upstream version 0.4.3
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | .travis.yml | 11 | ||||
-rw-r--r-- | AUTHORS | 37 | ||||
-rw-r--r-- | NEWS.txt | 15 | ||||
-rw-r--r-- | README.rst | 5 | ||||
-rw-r--r-- | buildout.cfg | 20 | ||||
-rw-r--r-- | docs-source/conf.py | 4 | ||||
-rw-r--r-- | setup.py | 11 | ||||
-rw-r--r-- | src/etcd/__init__.py | 22 | ||||
-rw-r--r-- | src/etcd/auth.py | 255 | ||||
-rw-r--r-- | src/etcd/client.py | 258 | ||||
-rw-r--r-- | src/etcd/lock.py | 2 | ||||
-rw-r--r-- | src/etcd/tests/integration/helpers.py | 12 | ||||
-rw-r--r-- | src/etcd/tests/integration/test_simple.py | 3 | ||||
-rw-r--r-- | src/etcd/tests/test_auth.py | 161 | ||||
-rw-r--r-- | src/etcd/tests/unit/__init__.py | 3 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_client.py | 62 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_lock.py | 38 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_request.py | 81 |
19 files changed, 870 insertions, 131 deletions
@@ -12,5 +12,4 @@ tmp build
dist
docs
-etcd
.coverage
diff --git a/.travis.yml b/.travis.yml index 46a2576..2c3ba50 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,20 +1,23 @@ language: python python: - "2.7" - - "3.3" + - "3.5" before_install: - - ./build_etcd.sh v2.0.10 + - ./build_etcd.sh v2.2.0 - pip install --upgrade setuptools # command to install dependencies -install: +install: + - pip install coveralls + - pip install coverage - python bootstrap.py - bin/buildout # command to run tests script: - PATH=$PATH:./etcd/bin bin/test + PATH=$PATH:./etcd/bin coverage run --source=src/etcd --omit="src/etcd/tests/*" bin/test +after_success: coveralls # Add env var to detect it during build env: TRAVIS=True @@ -0,0 +1,37 @@ +Maintainers: +----------- +Jose Plana (jplana) +Giuseppe Lavagetto (lavagetto) + +Contributors: +------------ +Aleksandar Veselinovic +Alex Chan +Alex Ianchici +Bartlomiej Biernacki +Bradley Cicenas +Christoph Heer +Hogenmiller +Jimmy Zelinskie +Jim Rollenhagen +John Kristensen +Joshua Conner +Matthias Urlichs +Michal Witkowski +Nick Bartos +Peter Wagner +Roberto Aguilar +Roy Smith +Ryan Fowler +Samuel Marks +Sergio Castaño Arteaga +Shaun Crampton +Sigmund Augdal +Simeon Visser +Simon Gomizelj +SkyLothar +Spike Curtis +Tomas Kral +Tom Denham +WillPlatnick +WooParadog @@ -1,9 +1,22 @@ News ==== +0.4.3 +----- +*Release date: 14-Dec-2015* + +* Fix check for parameters in case of connection error +* Python 3.5 compatibility and general python3 cleanups +* Added authentication and module for managing ACLs +* Added srv record-based DNS discovery +* Fixed (again) logging of cluster id changes +* Fixed leader lookup +* Properly retry request on exception +* Client: clean up open connections when deleting + 0.4.2 ----- -*Release data: 8-Oct-2015* +*Release date: 8-Oct-2015* * Fixed lock documentation * Fixed lock sequences due to etcd 2.2 change @@ -8,6 +8,9 @@ Official documentation: http://python-etcd.readthedocs.org/ .. image:: https://travis-ci.org/jplana/python-etcd.png?branch=master :target: https://travis-ci.org/jplana/python-etcd +.. image:: https://coveralls.io/repos/jplana/python-etcd/badge.svg?branch=master&service=github + :target: https://coveralls.io/github/jplana/python-etcd?branch=master + Installation ------------ @@ -41,6 +44,8 @@ Create a client object client = etcd.Client(port=4002) client = etcd.Client(host='127.0.0.1', port=4003) client = etcd.Client(host='127.0.0.1', port=4003, allow_redirect=False) # wont let you run sensitive commands on non-leader machines, default is true + # If you have defined a SRV record for _etcd._tcp.example.com pointing to the clients + client = etcd.Client(srv_domain='example.com', protocol="https") # create a client against https://api.example.com:443/etcd client = etcd.Client(host='api.example.com', protocol='https', port=443, version_prefix='/etcd') Write a key diff --git a/buildout.cfg b/buildout.cfg index 9aaf66e..4de9036 100644 --- a/buildout.cfg +++ b/buildout.cfg @@ -2,10 +2,12 @@ parts = python sphinxbuilder test + coverage develop = . eggs = - urllib3==1.7 + urllib3==1.7.1 pyOpenSSL==0.13.1 + ${deps:extraeggs} [python] recipe = zc.recipe.egg @@ -17,7 +19,23 @@ recipe = pbp.recipe.noserunner eggs = ${python:eggs} mock +[coverage] +recipe = pbp.recipe.noserunner +eggs = ${test:eggs} + coverage +defaults = --with-coverage + --cover-package=etcd + [sphinxbuilder] recipe = collective.recipe.sphinxbuilder source = ${buildout:directory}/docs-source build = ${buildout:directory}/docs + + +[deps:python2] +extraeggs = + dnspython==1.12.0 + +[deps:python3] +extraeggs = + dnspython3==1.12.0 diff --git a/docs-source/conf.py b/docs-source/conf.py index 996cb61..5148c23 100644 --- a/docs-source/conf.py +++ b/docs-source/conf.py @@ -52,7 +52,7 @@ master_doc = 'index' # General information about the project. project = u'python-etcd' -copyright = u'2013, Jose Plana' +copyright = u'2013-2015 Jose Plana, Giuseppe Lavagetto' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -61,7 +61,7 @@ copyright = u'2013, Jose Plana' # The short X.Y version. version = '0.4' # The full version, including alpha/beta/rc tags. -release = '0.4.2' +release = '0.4.3' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -6,10 +6,17 @@ README = open(os.path.join(here, 'README.rst')).read() NEWS = open(os.path.join(here, 'NEWS.txt')).read() -version = '0.4.2' +version = '0.4.3' + +# Dnspython is two different packages depending on python version +if sys.version_info.major == 2: + dns = 'dnspython' +else: + dns = 'dnspython3' install_requires = [ - 'urllib3>=1.7' + 'urllib3>=1.7.1', + dns ] test_requires = [ diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py index b532be6..f52852c 100644 --- a/src/etcd/__init__.py +++ b/src/etcd/__init__.py @@ -200,6 +200,20 @@ class EtcdConnectionFailed(EtcdException): self.cause = cause +class EtcdInsufficientPermissions(EtcdException): + """ + Request failed because of insufficient permissions. + """ + pass + + +class EtcdWatchTimedOut(EtcdConnectionFailed): + """ + A watch timed out without returning a result. + """ + pass + + class EtcdWatcherCleared(EtcdException): """ Watcher is cleared due to etcd recovery. @@ -246,6 +260,7 @@ class EtcdError(object): 107: EtcdRootReadOnly, 108: EtcdDirNotEmpty, # 109: Non-public: existing peer addr. + 110: EtcdInsufficientPermissions, 200: EtcdValueError, 201: EtcdValueError, @@ -277,6 +292,13 @@ class EtcdError(object): message = payload.get("message") cause = payload.get("cause") msg = '{} : {}'.format(message, cause) + status = payload.get("status") + # Some general status handling, as + # not all endpoints return coherent error messages + if status == 404: + error_code = 100 + elif status == 401: + error_code = 110 exc = cls.error_exceptions.get(error_code, EtcdException) if issubclass(exc, EtcdException): raise exc(msg, payload) diff --git a/src/etcd/auth.py b/src/etcd/auth.py new file mode 100644 index 0000000..796772d --- /dev/null +++ b/src/etcd/auth.py @@ -0,0 +1,255 @@ +import json + +import logging +import etcd + +_log = logging.getLogger(__name__) + + +class EtcdAuthBase(object): + entity = 'example' + + def __init__(self, client, name): + self.client = client + self.name = name + self.uri = "{}/auth/{}s/{}".format(self.client.version_prefix, + self.entity, self.name) + + @property + def names(self): + key = "{}s".format(self.entity) + uri = "{}/auth/{}".format(self.client.version_prefix, key) + response = self.client.api_execute(uri, self.client._MGET) + return json.loads(response.data.decode('utf-8'))[key] + + def read(self): + try: + response = self.client.api_execute(self.uri, self.client._MGET) + except etcd.EtcdInsufficientPermissions as e: + _log.error("Any action on the authorization requires the root role") + raise + except etcd.EtcdKeyNotFound: + _log.info("%s '%s' not found", self.entity, self.name) + raise + except Exception as e: + _log.error("Failed to fetch %s in %s%s: %r", + self.entity, self.client._base_uri, + self.client.version_prefix, e) + raise etcd.EtcdException( + "Could not fetch {} '{}'".format(self.entity, self.name)) + + self._from_net(response.data) + + def write(self): + try: + r = self.__class__(self.client, self.name) + r.read() + except etcd.EtcdKeyNotFound: + r = None + try: + for payload in self._to_net(r): + response = self.client.api_execute_json(self.uri, + self.client._MPUT, + params=payload) + # This will fail if the response is an error + self._from_net(response.data) + except etcd.EtcdInsufficientPermissions as e: + _log.error("Any action on the authorization requires the root role") + raise + except Exception as e: + _log.error("Failed to write %s '%s'", self.entity, self.name) + # TODO: fine-grained exception handling + raise etcd.EtcdException( + "Could not write {} '{}': {}".format(self.entity, + self.name, e)) + + def delete(self): + try: + _ = self.client.api_execute(self.uri, self.client._MDELETE) + except etcd.EtcdInsufficientPermissions as e: + _log.error("Any action on the authorization requires the root role") + raise + except etcd.EtcdKeyNotFound: + _log.info("%s '%s' not found", self.entity, self.name) + raise + except Exception as e: + _log.error("Failed to delete %s in %s%s: %r", + self.entity, self._base_uri, self.version_prefix, e) + raise etcd.EtcdException( + "Could not delete {} '{}'".format(self.entity, self.name)) + + def _from_net(self, data): + raise NotImplementedError() + + def _to_net(self, old=None): + raise NotImplementedError() + + @classmethod + def new(cls, client, data): + c = cls(client, data[cls.entity]) + c._from_net(data) + return c + + +class EtcdUser(EtcdAuthBase): + """Class to manage in a orm-like way etcd users""" + entity = 'user' + + def __init__(self, client, name): + super(EtcdUser, self).__init__(client, name) + self._roles = set() + self._password = None + + def _from_net(self, data): + d = json.loads(data.decode('utf-8')) + self.roles = d.get('roles', []) + self.name = d.get('user') + + def _to_net(self, prevobj=None): + if prevobj is None: + retval = [{"user": self.name, "password": self._password, + "roles": list(self.roles)}] + else: + retval = [] + if self._password: + retval.append({"user": self.name, "password": self._password}) + to_grant = list(self.roles - prevobj.roles) + to_revoke = list(prevobj.roles - self.roles) + if to_grant: + retval.append({"user": self.name, "grant": to_grant}) + if to_revoke: + retval.append({"user": self.name, "revoke": to_revoke}) + # Let's blank the password now + # Even if the user can't be written we don't want it to leak anymore. + self._password = None + return retval + + @property + def roles(self): + return self._roles + + @roles.setter + def roles(self, val): + self._roles = set(val) + + @property + def password(self): + """Empty property for password.""" + return None + + @password.setter + def password(self, new_password): + """Change user's password.""" + self._password = new_password + + def __str__(self): + return json.dumps(self._to_net()[0]) + + + +class EtcdRole(EtcdAuthBase): + entity = 'role' + + def __init__(self, client, name): + super(EtcdRole, self).__init__(client, name) + self._read_paths = set() + self._write_paths = set() + + def _from_net(self, data): + d = json.loads(data.decode('utf-8')) + self.name = d.get('role') + + try: + kv = d["permissions"]["kv"] + except: + self._read_paths = set() + self._write_paths = set() + return + + self._read_paths = set(kv.get('read', [])) + self._write_paths = set(kv.get('write', [])) + + def _to_net(self, prevobj=None): + retval = [] + if prevobj is None: + retval.append({ + "role": self.name, + "permissions": + { + "kv": + { + "read": list(self._read_paths), + "write": list(self._write_paths) + } + } + }) + else: + to_grant = { + 'read': list(self._read_paths - prevobj._read_paths), + 'write': list(self._write_paths - prevobj._write_paths) + } + to_revoke = { + 'read': list(prevobj._read_paths - self._read_paths), + 'write': list(prevobj._write_paths - self._write_paths) + } + if [path for sublist in to_revoke.values() for path in sublist]: + retval.append({'role': self.name, 'revoke': {'kv': to_revoke}}) + if [path for sublist in to_grant.values() for path in sublist]: + retval.append({'role': self.name, 'grant': {'kv': to_grant}}) + return retval + + def grant(self, path, permission): + if permission.upper().find('R') >= 0: + self._read_paths.add(path) + if permission.upper().find('W') >= 0: + self._write_paths.add(path) + + def revoke(self, path, permission): + if permission.upper().find('R') >= 0 and \ + path in self._read_paths: + self._read_paths.remove(path) + if permission.upper().find('W') >= 0 and \ + path in self._write_paths: + self._write_paths.remove(path) + + @property + def acls(self): + perms = {} + try: + for path in self._read_paths: + perms[path] = 'R' + for path in self._write_paths: + if path in perms: + perms[path] += 'W' + else: + perms[path] = 'W' + except: + pass + return perms + + @acls.setter + def acls(self, acls): + self._read_paths = set() + self._write_paths = set() + for path, permission in acls.items(): + self.grant(path, permission) + + def __str__(self): + return json.dumps({"role": self.name, 'acls': self.acls}) + + +class Auth(object): + def __init__(self, client): + self.client = client + self.uri = "{}/auth/enable".format(self.client.version_prefix) + + @property + def active(self): + resp = self.client.api_execute(self.uri, self.client._MGET) + return json.loads(resp.data.decode('utf-8'))['enabled'] + + @active.setter + def active(self, value): + if value != self.active: + method = value and self.client._MPUT or self.client._MDELETE + self.client.api_execute(self.uri, method) diff --git a/src/etcd/client.py b/src/etcd/client.py index 03b0451..afeabe8 100644 --- a/src/etcd/client.py +++ b/src/etcd/client.py @@ -18,6 +18,8 @@ import urllib3 import urllib3.util import json import ssl +import dns.resolver +from functools import wraps import etcd try: @@ -42,16 +44,22 @@ class Client(object): _comparison_conditions = set(('prevValue', 'prevIndex', 'prevExist')) _read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'quorum')) _del_conditions = set(('prevValue', 'prevIndex')) + + http = None + def __init__( self, host='127.0.0.1', port=4001, + srv_domain=None, version_prefix='/v2', read_timeout=60, allow_redirect=True, protocol='http', cert=None, ca_cert=None, + username=None, + password=None, allow_reconnect=False, use_proxies=False, expected_cluster_id=None, @@ -67,6 +75,8 @@ class Client(object): port (int): Port used to connect to etcd. + srv_domain (str): Domain to search the SRV record for cluster autodiscovery. + version_prefix (str): Url or version prefix in etcd url (default=/v2). read_timeout (int): max seconds to wait for a read. @@ -81,6 +91,10 @@ class Client(object): ca_cert (str): The ca certificate. If pressent it will enable validation. + username (str): username for etcd authentication. + + password (str): password for etcd authentication. + allow_reconnect (bool): allow the client to reconnect to another etcd server in the cluster in the case the default one does not respond. @@ -98,8 +112,15 @@ class Client(object): by host. By default this will use up to 10 connections. """ - _log.debug("New etcd client created for %s:%s%s", - host, port, version_prefix) + + # If a DNS record is provided, use it to get the hosts list + if srv_domain is not None: + try: + host = self._discover(srv_domain) + except Exception as e: + _log.error("Could not discover the etcd hosts from %s: %s", + srv_domain, e) + self._protocol = protocol def uri(protocol, host, port): @@ -151,8 +172,20 @@ class Client(object): kw['ca_certs'] = ca_cert kw['cert_reqs'] = ssl.CERT_REQUIRED + self.username = None + self.password = None + if username and password: + self.username = username + self.password = password + elif username: + _log.warning('Username provided without password, both are required for authentication') + elif password: + _log.warning('Password provided without username, both are required for authentication') + self.http = urllib3.PoolManager(num_pools=10, **kw) + _log.debug("New etcd client created for %s", self.base_uri) + if self._allow_reconnect: # we need the set of servers in the cluster in order to try # reconnecting upon error. The cluster members will be @@ -174,6 +207,27 @@ class Client(object): _log.debug("Machines cache initialised to %s", self._machines_cache) + def _discover(self, domain): + srv_name = "_etcd._tcp.{}".format(domain) + answers = dns.resolver.query(srv_name, 'SRV') + hosts = [] + for answer in answers: + hosts.append( + (answer.target.to_text(omit_final_dot=True), answer.port)) + _log.debug("Found %s", hosts) + if not len(hosts): + raise ValueError("The SRV record is present but no host were found") + return tuple(hosts) + + def __del__(self): + """Clean up open connections""" + if self.http is not None: + try: + self.http.clear() + except ReferenceError: + # this may hit an already-cleared weakref + pass + @property def base_uri(self): """URI used by the client to connect to etcd.""" @@ -221,6 +275,7 @@ class Client(object): response = self.http.request( self._MGET, uri, + headers=self._get_headers(), timeout=self.read_timeout, redirect=self.allow_redirect) @@ -278,9 +333,9 @@ class Client(object): try: leader = json.loads( - self.api_execute(self.version_prefix + '/stats/leader', + self.api_execute(self.version_prefix + '/stats/self', self._MGET).data.decode('utf-8')) - return self.members[leader['leader']] + return self.members[leader['leaderInfo']['leader']] except Exception as e: raise etcd.EtcdException("Cannot get leader data: %s" % e) @@ -718,85 +773,123 @@ class Client(object): _log.info("Selected new etcd server %s", mach) return mach + def _wrap_request(payload): + @wraps(payload) + def wrapper(self, path, method, params=None, timeout=None): + some_request_failed = False + response = False + + if timeout is None: + timeout = self.read_timeout + + if timeout == 0: + timeout = None + + if not path.startswith('/'): + raise ValueError('Path does not start with /') + + while not response: + try: + response = payload(self, path, method, + params=params, timeout=timeout) + # Check the cluster ID hasn't changed under us. We use + # preload_content=False above so we can read the headers + # before we wait for the content of a watch. + self._check_cluster_id(response) + # Now force the data to be preloaded in order to trigger any + # IO-related errors in this method rather than when we try to + # access it later. + _ = response.data + # urllib3 doesn't wrap all httplib exceptions and earlier versions + # don't wrap socket errors either. + except (urllib3.exceptions.HTTPError, + HTTPException, socket.error) as e: + if (isinstance(params, dict) and + params.get("wait") == "true" and + isinstance(e, + urllib3.exceptions.ReadTimeoutError)): + _log.debug("Watch timed out.") + raise etcd.EtcdWatchTimedOut( + "Watch timed out: %r" % e, + cause=e + ) + _log.error("Request to server %s failed: %r", + self._base_uri, e) + if self._allow_reconnect: + _log.info("Reconnection allowed, looking for another " + "server.") + # _next_server() raises EtcdException if there are no + # machines left to try, breaking out of the loop. + self._base_uri = self._next_server(cause=e) + some_request_failed = True + + # if exception is raised on _ = response.data + # the condition for while loop will be False + # but we should retry + response = False + else: + _log.debug("Reconnection disabled, giving up.") + raise etcd.EtcdConnectionFailed( + "Connection to etcd failed due to %r" % e, + cause=e + ) + except etcd.EtcdClusterIdChanged as e: + _log.warning(e) + raise + except: + _log.exception("Unexpected request failure, re-raising.") + raise + + if some_request_failed: + if not self._use_proxies: + # The cluster may have changed since last invocation + self._machines_cache = self.machines + self._machines_cache.remove(self._base_uri) + return self._handle_server_response(response) + return wrapper + + @_wrap_request def api_execute(self, path, method, params=None, timeout=None): """ Executes the query. """ - - some_request_failed = False - response = False - - if timeout is None: - timeout = self.read_timeout - - if timeout == 0: - timeout = None - - if not path.startswith('/'): - raise ValueError('Path does not start with /') - - while not response: - try: - url = self._base_uri + path - - if (method == self._MGET) or (method == self._MDELETE): - response = self.http.request( - method, - url, - timeout=timeout, - fields=params, - redirect=self.allow_redirect, - preload_content=False) - - elif (method == self._MPUT) or (method == self._MPOST): - response = self.http.request_encode_body( - method, - url, - fields=params, - timeout=timeout, - encode_multipart=False, - redirect=self.allow_redirect, - preload_content=False) - else: + url = self._base_uri + path + + if (method == self._MGET) or (method == self._MDELETE): + return self.http.request( + method, + url, + timeout=timeout, + fields=params, + redirect=self.allow_redirect, + headers=self._get_headers(), + preload_content=False) + + elif (method == self._MPUT) or (method == self._MPOST): + return self.http.request_encode_body( + method, + url, + fields=params, + timeout=timeout, + encode_multipart=False, + redirect=self.allow_redirect, + headers=self._get_headers(), + preload_content=False) + else: raise etcd.EtcdException( 'HTTP method {} not supported'.format(method)) - - # Check the cluster ID hasn't changed under us. We use - # preload_content=False above so we can read the headers - # before we wait for the content of a watch. - self._check_cluster_id(response) - # Now force the data to be preloaded in order to trigger any - # IO-related errors in this method rather than when we try to - # access it later. - _ = response.data - # urllib3 doesn't wrap all httplib exceptions and earlier versions - # don't wrap socket errors either. - except (urllib3.exceptions.HTTPError, - HTTPException, - socket.error) as e: - _log.error("Request to server %s failed: %r", - self._base_uri, e) - if self._allow_reconnect: - _log.info("Reconnection allowed, looking for another " - "server.") - # _next_server() raises EtcdException if there are no - # machines left to try, breaking out of the loop. - self._base_uri = self._next_server(cause=e) - some_request_failed = True - else: - _log.debug("Reconnection disabled, giving up.") - raise etcd.EtcdConnectionFailed( - "Connection to etcd failed due to %r" % e, - cause=e - ) - except: - _log.exception("Unexpected request failure, re-raising.") - raise - - if some_request_failed: - if not self._use_proxies: - # The cluster may have changed since last invocation - self._machines_cache = self.machines - self._machines_cache.remove(self._base_uri) - return self._handle_server_response(response) + + @_wrap_request + def api_execute_json(self, path, method, params=None, timeout=None): + url = self._base_uri + path + json_payload = json.dumps(params) + headers = self._get_headers() + headers['Content-Type'] = 'application/json' + return self.http.urlopen(method, + url, + body=json_payload, + timeout=timeout, + redirect=self.allow_redirect, + headers=headers, + preload_content=False) def _check_cluster_id(self, response): cluster_id = response.getheader("x-etcd-cluster-id") @@ -827,8 +920,15 @@ class Client(object): # throw the appropriate exception try: r = json.loads(resp) + r['status'] = response.status except (TypeError, ValueError): # Bad JSON, make a response locally. r = {"message": "Bad response", "cause": str(resp)} etcd.EtcdError.handle(r) + + def _get_headers(self): + if self.username and self.password: + credentials = ':'.join((self.username, self.password)) + return urllib3.make_headers(basic_auth=credentials) + return {} diff --git a/src/etcd/lock.py b/src/etcd/lock.py index 582e670..687a548 100644 --- a/src/etcd/lock.py +++ b/src/etcd/lock.py @@ -99,7 +99,7 @@ class Lock(object): def __exit__(self, type, value, traceback): self.release() - def _acquired(self, blocking=True, timeout=None): + def _acquired(self, blocking=True, timeout=0): locker, nearest = self._get_locker() self.is_taken = False if self.lock_key == locker: diff --git a/src/etcd/tests/integration/helpers.py b/src/etcd/tests/integration/helpers.py index 6c7e21c..1f1d22b 100644 --- a/src/etcd/tests/integration/helpers.py +++ b/src/etcd/tests/integration/helpers.py @@ -38,6 +38,12 @@ class EtcdProcessHelper(object): '-initial-cluster', initial_cluster, '-initial-cluster-state', 'new' ]) + else: + proc_args.extend([ + '-initial-cluster', 'test-node-0=http://127.0.0.1:{}'.format(self.internal_port_range_start), + '-initial-cluster-state', 'new' + ]) + for i in range(0, number): self.add_one(i, proc_args) @@ -77,12 +83,12 @@ class EtcdProcessHelper(object): def kill_one(self, slot): log = logging.getLogger() - dir, process = self.processes.pop(slot) + data_dir, process = self.processes.pop(slot) process.kill() time.sleep(2) log.debug('Killed etcd pid:%d', process.pid) - shutil.rmtree(dir) - log.debug('Removed directory %s' % dir) + shutil.rmtree(data_dir) + log.debug('Removed directory %s' % data_dir) class TestingCA(object): diff --git a/src/etcd/tests/integration/test_simple.py b/src/etcd/tests/integration/test_simple.py index da0954d..660caa8 100644 --- a/src/etcd/tests/integration/test_simple.py +++ b/src/etcd/tests/integration/test_simple.py @@ -18,6 +18,7 @@ log = logging.getLogger() class EtcdIntegrationTest(unittest.TestCase): + cl_size = 3 @classmethod def setUpClass(cls): @@ -28,7 +29,7 @@ class EtcdIntegrationTest(unittest.TestCase): proc_name=program, port_range_start=6001, internal_port_range_start=8001) - cls.processHelper.run(number=3) + cls.processHelper.run(number=cls.cl_size) cls.client = etcd.Client(port=6001) @classmethod diff --git a/src/etcd/tests/test_auth.py b/src/etcd/tests/test_auth.py new file mode 100644 index 0000000..fc6ce70 --- /dev/null +++ b/src/etcd/tests/test_auth.py @@ -0,0 +1,161 @@ +from etcd.tests.integration.test_simple import EtcdIntegrationTest +from etcd import auth +import etcd + + +class TestEtcdAuthBase(EtcdIntegrationTest): + cl_size = 1 + + def setUp(self): + # Sets up the root user, toggles auth + u = auth.EtcdUser(self.client, 'root') + u.password = 'testpass' + u.write() + self.client = etcd.Client(port=6001, username='root', + password='testpass') + self.unauth_client = etcd.Client(port=6001) + a = auth.Auth(self.client) + a.active = True + + def tearDown(self): + u = auth.EtcdUser(self.client, 'test_user') + r = auth.EtcdRole(self.client, 'test_role') + try: + u.delete() + except: + pass + try: + r.delete() + except: + pass + a = auth.Auth(self.client) + a.active = False + + +class EtcdUserTest(TestEtcdAuthBase): + def test_names(self): + u = auth.EtcdUser(self.client, 'test_user') + self.assertEquals(u.names, ['root']) + + def test_read(self): + u = auth.EtcdUser(self.client, 'root') + # Reading an existing user succeeds + try: + u.read() + except Exception: + self.fail("reading the root user raised an exception") + + # roles for said user are fetched + self.assertEquals(u.roles, set(['root'])) + + # The user is correctly rendered out + self.assertEquals(u._to_net(), [{'user': 'root', 'password': None, + 'roles': ['root']}]) + + # An inexistent user raises the appropriate exception + u = auth.EtcdUser(self.client, 'user.does.not.exist') + self.assertRaises(etcd.EtcdKeyNotFound, u.read) + + # Reading with an unauthenticated client raises an exception + u = auth.EtcdUser(self.unauth_client, 'root') + self.assertRaises(etcd.EtcdInsufficientPermissions, u.read) + + # Generic errors are caught + c = etcd.Client(port=9999) + u = auth.EtcdUser(c, 'root') + self.assertRaises(etcd.EtcdException, u.read) + + def test_write_and_delete(self): + # Create an user + u = auth.EtcdUser(self.client, 'test_user') + u.roles.add('guest') + u.roles.add('root') + # directly from my suitcase + u.password = '123456' + try: + u.write() + except: + self.fail("creating a user doesn't work") + # Password gets wiped + self.assertEquals(u.password, None) + u.read() + # Verify we can log in as this user and access the auth (it has the + # root role) + cl = etcd.Client(port=6001, username='test_user', + password='123456') + ul = auth.EtcdUser(cl, 'root') + try: + ul.read() + except etcd.EtcdInsufficientPermissions: + self.fail("Reading auth with the new user is not possible") + + self.assertEquals(u.name, "test_user") + self.assertEquals(u.roles, set(['guest', 'root'])) + # set roles as a list, it works! + u.roles = ['guest', 'test_group'] + try: + u.write() + except: + self.fail("updating a user you previously created fails") + u.read() + self.assertIn('test_group', u.roles) + + # Unauthorized access is properly handled + ua = auth.EtcdUser(self.unauth_client, 'test_user') + self.assertRaises(etcd.EtcdInsufficientPermissions, ua.write) + + # now let's test deletion + du = auth.EtcdUser(self.client, 'user.does.not.exist') + self.assertRaises(etcd.EtcdKeyNotFound, du.delete) + + # Delete test_user + u.delete() + self.assertRaises(etcd.EtcdKeyNotFound, u.read) + # Permissions are properly handled + self.assertRaises(etcd.EtcdInsufficientPermissions, ua.delete) + + +class EtcdRoleTest(TestEtcdAuthBase): + def test_names(self): + r = auth.EtcdRole(self.client, 'guest') + self.assertListEqual(r.names, [u'guest', u'root']) + + def test_read(self): + r = auth.EtcdRole(self.client, 'guest') + try: + r.read() + except: + self.fail('Reading an existing role failed') + + self.assertEquals(r.acls, {'*': 'RW'}) + # We can actually skip most other read tests as they are common + # with EtcdUser + + def test_write_and_delete(self): + r = auth.EtcdRole(self.client, 'test_role') + r.acls = {'*': 'R', '/test/*': 'RW'} + try: + r.write() + except: + self.fail("Writing a simple groups should not fail") + + r1 = auth.EtcdRole(self.client, 'test_role') + r1.read() + self.assertEquals(r1.acls, r.acls) + r.revoke('/test/*', 'W') + r.write() + r1.read() + self.assertEquals(r1.acls, {'*': 'R', '/test/*': 'R'}) + r.grant('/pub/*', 'RW') + r.write() + r1.read() + self.assertEquals(r1.acls['/pub/*'], 'RW') + # All other exceptions are tested by the user tests + r1.name = None + self.assertRaises(etcd.EtcdException, r1.write) + # ditto for delete + try: + r.delete() + except: + self.fail("A normal delete should not fail") + self.assertRaises(etcd.EtcdKeyNotFound, r.read) diff --git a/src/etcd/tests/unit/__init__.py b/src/etcd/tests/unit/__init__.py index 1dd44ab..9360b6b 100644 --- a/src/etcd/tests/unit/__init__.py +++ b/src/etcd/tests/unit/__init__.py @@ -27,8 +27,7 @@ class TestClientApiBase(unittest.TestCase): def _mock_api(self, status, d, cluster_id=None): resp = self._prepare_response(status, d, cluster_id=cluster_id) - self.client.api_execute = mock.create_autospec( - self.client.api_execute, return_value=resp) + self.client.api_execute = mock.MagicMock(return_value=resp) def _mock_exception(self, exc, msg): self.client.api_execute = mock.Mock(side_effect=exc(msg)) diff --git a/src/etcd/tests/unit/test_client.py b/src/etcd/tests/unit/test_client.py index 2e09d7c..bb05a66 100644 --- a/src/etcd/tests/unit/test_client.py +++ b/src/etcd/tests/unit/test_client.py @@ -1,5 +1,12 @@ import unittest import etcd +import dns.name +import dns.rdtypes.IN.SRV +import dns.resolver +try: + import mock +except ImportError: + from unittest import mock class TestClient(unittest.TestCase): @@ -38,6 +45,16 @@ class TestClient(unittest.TestCase): client = etcd.Client() assert client.allow_redirect + def test_default_username(self): + """ default username is None""" + client = etcd.Client() + assert client.username is None + + def test_default_password(self): + """ default username is None""" + client = etcd.Client() + assert client.password is None + def test_set_host(self): """ can change host """ client = etcd.Client(host='192.168.1.1') @@ -85,6 +102,29 @@ class TestClient(unittest.TestCase): client = etcd.Client(use_proxies = True) assert client._use_proxies + def test_set_username_only(self): + client = etcd.Client(username='username') + assert client.username is None + + def test_set_password_only(self): + client = etcd.Client(password='password') + assert client.password is None + + def test_set_username_password(self): + client = etcd.Client(username='username', password='password') + assert client.username == 'username' + assert client.password == 'password' + + def test_get_headers_with_auth(self): + client = etcd.Client(username='username', password='password') + assert client._get_headers() == { + 'authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ=' + } + + def test_get_headers_without_auth(self): + client = etcd.Client() + assert client._get_headers() == {} + def test_allow_reconnect(self): """ Fails if allow_reconnect is false and a list of hosts is given""" with self.assertRaises(etcd.EtcdException): @@ -97,3 +137,25 @@ class TestClient(unittest.TestCase): allow_reconnect=True, use_proxies=True, ) + + def test_discover(self): + """Tests discovery.""" + answers = [] + for i in range(1,3): + r = mock.create_autospec(dns.rdtypes.IN.SRV.SRV) + r.port = 2379 + try: + method = dns.name.from_unicode + except AttributeError: + method = dns.name.from_text + r.target = method(u'etcd{}.example.com'.format(i)) + answers.append(r) + dns.resolver.query = mock.create_autospec(dns.resolver.query, return_value=answers) + self.machines = etcd.Client.machines + etcd.Client.machines = mock.create_autospec(etcd.Client.machines, return_value=[u'https://etcd2.example.com:2379']) + c = etcd.Client(srv_domain="example.com", allow_reconnect=True, protocol="https") + etcd.Client.machines = self.machines + self.assertEquals(c.host, u'etcd1.example.com') + self.assertEquals(c.port, 2379) + self.assertEquals(c._machines_cache, + [u'https://etcd2.example.com:2379']) diff --git a/src/etcd/tests/unit/test_lock.py b/src/etcd/tests/unit/test_lock.py index 75ae676..107169f 100644 --- a/src/etcd/tests/unit/test_lock.py +++ b/src/etcd/tests/unit/test_lock.py @@ -1,5 +1,8 @@ import etcd -import mock +try: + import mock +except ImportError: + from unittest import mock from etcd.tests.unit import TestClientApiBase @@ -37,8 +40,8 @@ class TestClientLock(TestClientApiBase): Acquiring a precedingly inexistent lock works. """ l = etcd.Lock(self.client, 'test_lock') - l._find_lock = mock.create_autospec(l._find_lock, return_value=False) - l._acquired = mock.create_autospec(l._acquired, return_value=True) + l._find_lock = mock.MagicMock(spec=l._find_lock, return_value=False) + l._acquired = mock.MagicMock(spec=l._acquired, return_value=True) # Mock the write d = { u'action': u'set', @@ -87,13 +90,12 @@ class TestClientLock(TestClientApiBase): """ self.locker._sequence = '4' retval = ('/_locks/test_lock/4', None) - self.locker._get_locker = mock.create_autospec( - self.locker._get_locker, return_value=retval) + self.locker._get_locker = mock.MagicMock( + spec=self.locker._get_locker, return_value=retval) self.assertTrue(self.locker._acquired()) self.assertTrue(self.locker.is_taken) retval = ('/_locks/test_lock/1', '/_locks/test_lock/4') - self.locker._get_locker = mock.create_autospec( - self.locker._get_locker, return_value=retval) + self.locker._get_locker = mock.MagicMock(return_value=retval) self.assertFalse(self.locker._acquired(blocking=False)) self.assertFalse(self.locker.is_taken) d = { @@ -110,6 +112,27 @@ class TestClientLock(TestClientApiBase): def side_effect(): return returns.pop() + self.locker._get_locker = mock.MagicMock( + spec=self.locker._get_locker, side_effect=side_effect) + self.assertTrue(self.locker._acquired()) + + def test_acquired_no_timeout(self): + self.locker._sequence = 4 + returns = [('/_locks/test_lock/4', None), ('/_locks/test_lock/1', '/_locks/test_lock/4')] + + def side_effect(): + return returns.pop() + + d = { + u'action': u'get', + u'node': { + u'modifiedIndex': 190, + u'key': u'/_locks/test_lock/4', + u'value': self.locker.uuid + } + } + self._mock_api(200, d) + self.locker._get_locker = mock.create_autospec( self.locker._get_locker, side_effect=side_effect) self.assertTrue(self.locker._acquired()) @@ -147,7 +170,6 @@ class TestClientLock(TestClientApiBase): self.assertTrue(self.locker._find_lock()) self.assertEquals(self.locker._sequence, '34') - def test_get_locker(self): self.recursive_read() self.assertEquals((u'/_locks/test_lock/1', u'/_locks/test_lock/1'), diff --git a/src/etcd/tests/unit/test_request.py b/src/etcd/tests/unit/test_request.py index 9f54000..0942523 100644 --- a/src/etcd/tests/unit/test_request.py +++ b/src/etcd/tests/unit/test_request.py @@ -1,3 +1,6 @@ +import socket +import urllib3 + import etcd from etcd.tests.unit import TestClientApiBase @@ -66,7 +69,8 @@ class TestClientApiInternals(TestClientApiBase): self._mock_api(200, d) self.client.write('/newdir', None, dir=True) self.assertEquals(self.client.api_execute.call_args, - (('/v2/keys/newdir', 'PUT'), dict(params={'dir': 'true'}))) + (('/v2/keys/newdir', 'PUT'), + dict(params={'dir': 'true'}))) class TestClientApiInterface(TestClientApiBase): @@ -87,7 +91,9 @@ class TestClientApiInterface(TestClientApiBase): @mock.patch('etcd.Client.machines', new_callable=mock.PropertyMock) def test_use_proxies(self, mocker): """Do not overwrite the machines cache when using proxies""" - mocker.return_value = ['https://10.0.0.2:4001', 'https://10.0.0.3:4001', 'https://10.0.0.4:4001'] + mocker.return_value = ['https://10.0.0.2:4001', + 'https://10.0.0.3:4001', + 'https://10.0.0.4:4001'] c = etcd.Client( host=(('localhost', 4001), ('localproxy', 4001)), protocol='https', @@ -97,17 +103,16 @@ class TestClientApiInterface(TestClientApiBase): self.assertEquals(c._machines_cache, ['https://localproxy:4001']) self.assertEquals(c._base_uri, 'https://localhost:4001') - self.assertNotIn(c.base_uri,c._machines_cache) + self.assertNotIn(c.base_uri, c._machines_cache) c = etcd.Client( - host=(('localhost', 4001), ('10.0.0.2',4001)), + host=(('localhost', 4001), ('10.0.0.2', 4001)), protocol='https', allow_reconnect=True, use_proxies=False ) self.assertIn('https://10.0.0.3:4001', c._machines_cache) - self.assertNotIn(c.base_uri,c._machines_cache) - + self.assertNotIn(c.base_uri, c._machines_cache) def test_members(self): """ Can request machines """ @@ -157,7 +162,7 @@ class TestClientApiInterface(TestClientApiBase): """ Can request the leader """ members = {"ce2a822cea30bfca": {"id": "ce2a822cea30bfca", "name": "default"}} mocker.return_value = members - self._mock_api(200, {"leader": "ce2a822cea30bfca", "followers": {}}) + self._mock_api(200, {"leaderInfo":{"leader": "ce2a822cea30bfca", "followers": {}}}) self.assertEquals(self.client.leader, members["ce2a822cea30bfca"]) def test_set_plain(self): @@ -395,12 +400,9 @@ class TestClientRequest(TestClientApiInterface): def _mock_api(self, status, d, cluster_id=None): resp = self._prepare_response(status, d) resp.getheader.return_value = cluster_id or "abcdef1234" - self.client.http.request_encode_body = mock.create_autospec( - self.client.http.request_encode_body, return_value=resp - ) - self.client.http.request = mock.create_autospec( - self.client.http.request, return_value=resp - ) + self.client.http.request_encode_body = mock.MagicMock( + return_value=resp) + self.client.http.request = mock.MagicMock(return_value=resp) def _mock_error(self, error_code, msg, cause, method='PUT', fields=None, cluster_id=None): @@ -428,6 +430,20 @@ class TestClientRequest(TestClientApiInterface): prevValue='oldbog' ) + def test_watch_timeout(self): + """ Exception will be raised if prevValue != value in test_set """ + self.client.http.request = mock.create_autospec( + self.client.http.request, + side_effect=urllib3.exceptions.ReadTimeoutError(self.client.http, + "foo", + "Read timed out") + ) + self.assertRaises( + etcd.EtcdWatchTimedOut, + self.client.watch, + '/testKey', + ) + def test_path_without_trailing_slash(self): """ Exception will be raised if a path without a trailing slash is used """ self.assertRaises(ValueError, self.client.api_execute, @@ -440,20 +456,32 @@ class TestClientRequest(TestClientApiInterface): def test_read_cluster_id_changed(self): """ Read timeout set to the default """ - d = {u'action': u'set', - u'node': { + d = { + u'action': u'set', + u'node': { u'expiration': u'2013-09-14T00:56:59.316195568+02:00', u'modifiedIndex': 6, u'key': u'/testkey', u'ttl': 19, - u'value': u'test' - } - } + u'value': u'test', + } + } self._mock_api(200, d, cluster_id="notabcd1234") self.assertRaises(etcd.EtcdClusterIdChanged, self.client.read, '/testkey') self.client.read("/testkey") + def test_read_connection_error(self): + self.client.http.request = mock.create_autospec( + self.client.http.request, + side_effect=socket.error() + ) + self.assertRaises(etcd.EtcdConnectionFailed, + self.client.read, '/something') + # Direct GET request + self.assertRaises(etcd.EtcdConnectionFailed, + self.client.api_execute, '/a', 'GET') + def test_not_in(self): pass @@ -462,22 +490,23 @@ class TestClientRequest(TestClientApiInterface): def test_update_fails(self): """ Non-atomic updates fail """ - d = {u'action': u'set', - u'node': { + d = { + u'action': u'set', + u'node': { u'expiration': u'2013-09-14T00:56:59.316195568+02:00', u'modifiedIndex': 6, u'key': u'/testkey', u'ttl': 19, u'value': u'test' - } - } + } + } res = etcd.EtcdResult(**d) error = { - "errorCode":101, - "message":"Compare failed", - "cause":"[ != bar] [7 != 6]", - "index":6} + "errorCode": 101, + "message": "Compare failed", + "cause": "[ != bar] [7 != 6]", + "index": 6} self._mock_api(412, error) res.value = 'bar' self.assertRaises(ValueError, self.client.update, res) |