summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJelmer Vernooij <jelmer@jelmer.uk>2016-07-03 20:40:03 +0000
committerJelmer Vernooij <jelmer@jelmer.uk>2016-07-03 20:40:03 +0000
commitcb36aba9c5aca471774f196c0ed37fef7d9d3acb (patch)
tree46fa36621661c9362d0ea203dc44f12a78cfd5ea
parent6949ec9f5ba017563f8757d3e14f155f1ba88a37 (diff)
parent2e526af1098f8f188ef780296dc0a504c539fafe (diff)
Merge tag 'upstream/0.4.3' into unstable
Upstream version 0.4.3
-rw-r--r--.gitignore1
-rw-r--r--.travis.yml11
-rw-r--r--AUTHORS37
-rw-r--r--NEWS.txt15
-rw-r--r--README.rst5
-rw-r--r--buildout.cfg20
-rw-r--r--docs-source/conf.py4
-rw-r--r--setup.py11
-rw-r--r--src/etcd/__init__.py22
-rw-r--r--src/etcd/auth.py255
-rw-r--r--src/etcd/client.py258
-rw-r--r--src/etcd/lock.py2
-rw-r--r--src/etcd/tests/integration/helpers.py12
-rw-r--r--src/etcd/tests/integration/test_simple.py3
-rw-r--r--src/etcd/tests/test_auth.py161
-rw-r--r--src/etcd/tests/unit/__init__.py3
-rw-r--r--src/etcd/tests/unit/test_client.py62
-rw-r--r--src/etcd/tests/unit/test_lock.py38
-rw-r--r--src/etcd/tests/unit/test_request.py81
19 files changed, 870 insertions, 131 deletions
diff --git a/.gitignore b/.gitignore
index bef230c..2cf08af 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,7 +12,6 @@ tmp
build
dist
docs
-etcd
.coverage
*~
diff --git a/.travis.yml b/.travis.yml
index 46a2576..2c3ba50 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,20 +1,23 @@
language: python
python:
- "2.7"
- - "3.3"
+ - "3.5"
before_install:
- - ./build_etcd.sh v2.0.10
+ - ./build_etcd.sh v2.2.0
- pip install --upgrade setuptools
# command to install dependencies
-install:
+install:
+ - pip install coveralls
+ - pip install coverage
- python bootstrap.py
- bin/buildout
# command to run tests
script:
- PATH=$PATH:./etcd/bin bin/test
+ PATH=$PATH:./etcd/bin coverage run --source=src/etcd --omit="src/etcd/tests/*" bin/test
+after_success: coveralls
# Add env var to detect it during build
env: TRAVIS=True
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..0f07ed1
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,37 @@
+Maintainers:
+-----------
+Jose Plana (jplana)
+Giuseppe Lavagetto (lavagetto)
+
+Contributors:
+------------
+Aleksandar Veselinovic
+Alex Chan
+Alex Ianchici
+Bartlomiej Biernacki
+Bradley Cicenas
+Christoph Heer
+Hogenmiller
+Jimmy Zelinskie
+Jim Rollenhagen
+John Kristensen
+Joshua Conner
+Matthias Urlichs
+Michal Witkowski
+Nick Bartos
+Peter Wagner
+Roberto Aguilar
+Roy Smith
+Ryan Fowler
+Samuel Marks
+Sergio Castaño Arteaga
+Shaun Crampton
+Sigmund Augdal
+Simeon Visser
+Simon Gomizelj
+SkyLothar
+Spike Curtis
+Tomas Kral
+Tom Denham
+WillPlatnick
+WooParadog
diff --git a/NEWS.txt b/NEWS.txt
index 2158c21..52e7c4b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -1,9 +1,22 @@
News
====
+0.4.3
+-----
+*Release date: 14-Dec-2015*
+
+* Fix check for parameters in case of connection error
+* Python 3.5 compatibility and general python3 cleanups
+* Added authentication and module for managing ACLs
+* Added srv record-based DNS discovery
+* Fixed (again) logging of cluster id changes
+* Fixed leader lookup
+* Properly retry request on exception
+* Client: clean up open connections when deleting
+
0.4.2
-----
-*Release data: 8-Oct-2015*
+*Release date: 8-Oct-2015*
* Fixed lock documentation
* Fixed lock sequences due to etcd 2.2 change
diff --git a/README.rst b/README.rst
index 163d257..c9d6dab 100644
--- a/README.rst
+++ b/README.rst
@@ -8,6 +8,9 @@ Official documentation: http://python-etcd.readthedocs.org/
.. image:: https://travis-ci.org/jplana/python-etcd.png?branch=master
:target: https://travis-ci.org/jplana/python-etcd
+.. image:: https://coveralls.io/repos/jplana/python-etcd/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/jplana/python-etcd?branch=master
+
Installation
------------
@@ -41,6 +44,8 @@ Create a client object
client = etcd.Client(port=4002)
client = etcd.Client(host='127.0.0.1', port=4003)
client = etcd.Client(host='127.0.0.1', port=4003, allow_redirect=False) # wont let you run sensitive commands on non-leader machines, default is true
+ # If you have defined a SRV record for _etcd._tcp.example.com pointing to the clients
+ client = etcd.Client(srv_domain='example.com', protocol="https")
# create a client against https://api.example.com:443/etcd
client = etcd.Client(host='api.example.com', protocol='https', port=443, version_prefix='/etcd')
Write a key
diff --git a/buildout.cfg b/buildout.cfg
index 9aaf66e..4de9036 100644
--- a/buildout.cfg
+++ b/buildout.cfg
@@ -2,10 +2,12 @@
parts = python
sphinxbuilder
test
+ coverage
develop = .
eggs =
- urllib3==1.7
+ urllib3==1.7.1
pyOpenSSL==0.13.1
+ ${deps:extraeggs}
[python]
recipe = zc.recipe.egg
@@ -17,7 +19,23 @@ recipe = pbp.recipe.noserunner
eggs = ${python:eggs}
mock
+[coverage]
+recipe = pbp.recipe.noserunner
+eggs = ${test:eggs}
+ coverage
+defaults = --with-coverage
+ --cover-package=etcd
+
[sphinxbuilder]
recipe = collective.recipe.sphinxbuilder
source = ${buildout:directory}/docs-source
build = ${buildout:directory}/docs
+
+
+[deps:python2]
+extraeggs =
+ dnspython==1.12.0
+
+[deps:python3]
+extraeggs =
+ dnspython3==1.12.0
diff --git a/docs-source/conf.py b/docs-source/conf.py
index 996cb61..5148c23 100644
--- a/docs-source/conf.py
+++ b/docs-source/conf.py
@@ -52,7 +52,7 @@ master_doc = 'index'
# General information about the project.
project = u'python-etcd'
-copyright = u'2013, Jose Plana'
+copyright = u'2013-2015 Jose Plana, Giuseppe Lavagetto'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
@@ -61,7 +61,7 @@ copyright = u'2013, Jose Plana'
# The short X.Y version.
version = '0.4'
# The full version, including alpha/beta/rc tags.
-release = '0.4.2'
+release = '0.4.3'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/setup.py b/setup.py
index 011f798..5387ef4 100644
--- a/setup.py
+++ b/setup.py
@@ -6,10 +6,17 @@ README = open(os.path.join(here, 'README.rst')).read()
NEWS = open(os.path.join(here, 'NEWS.txt')).read()
-version = '0.4.2'
+version = '0.4.3'
+
+# Dnspython is two different packages depending on python version
+if sys.version_info.major == 2:
+ dns = 'dnspython'
+else:
+ dns = 'dnspython3'
install_requires = [
- 'urllib3>=1.7'
+ 'urllib3>=1.7.1',
+ dns
]
test_requires = [
diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py
index b532be6..f52852c 100644
--- a/src/etcd/__init__.py
+++ b/src/etcd/__init__.py
@@ -200,6 +200,20 @@ class EtcdConnectionFailed(EtcdException):
self.cause = cause
+class EtcdInsufficientPermissions(EtcdException):
+ """
+ Request failed because of insufficient permissions.
+ """
+ pass
+
+
+class EtcdWatchTimedOut(EtcdConnectionFailed):
+ """
+ A watch timed out without returning a result.
+ """
+ pass
+
+
class EtcdWatcherCleared(EtcdException):
"""
Watcher is cleared due to etcd recovery.
@@ -246,6 +260,7 @@ class EtcdError(object):
107: EtcdRootReadOnly,
108: EtcdDirNotEmpty,
# 109: Non-public: existing peer addr.
+ 110: EtcdInsufficientPermissions,
200: EtcdValueError,
201: EtcdValueError,
@@ -277,6 +292,13 @@ class EtcdError(object):
message = payload.get("message")
cause = payload.get("cause")
msg = '{} : {}'.format(message, cause)
+ status = payload.get("status")
+ # Some general status handling, as
+ # not all endpoints return coherent error messages
+ if status == 404:
+ error_code = 100
+ elif status == 401:
+ error_code = 110
exc = cls.error_exceptions.get(error_code, EtcdException)
if issubclass(exc, EtcdException):
raise exc(msg, payload)
diff --git a/src/etcd/auth.py b/src/etcd/auth.py
new file mode 100644
index 0000000..796772d
--- /dev/null
+++ b/src/etcd/auth.py
@@ -0,0 +1,255 @@
+import json
+
+import logging
+import etcd
+
+_log = logging.getLogger(__name__)
+
+
+class EtcdAuthBase(object):
+ entity = 'example'
+
+ def __init__(self, client, name):
+ self.client = client
+ self.name = name
+ self.uri = "{}/auth/{}s/{}".format(self.client.version_prefix,
+ self.entity, self.name)
+
+ @property
+ def names(self):
+ key = "{}s".format(self.entity)
+ uri = "{}/auth/{}".format(self.client.version_prefix, key)
+ response = self.client.api_execute(uri, self.client._MGET)
+ return json.loads(response.data.decode('utf-8'))[key]
+
+ def read(self):
+ try:
+ response = self.client.api_execute(self.uri, self.client._MGET)
+ except etcd.EtcdInsufficientPermissions as e:
+ _log.error("Any action on the authorization requires the root role")
+ raise
+ except etcd.EtcdKeyNotFound:
+ _log.info("%s '%s' not found", self.entity, self.name)
+ raise
+ except Exception as e:
+ _log.error("Failed to fetch %s in %s%s: %r",
+ self.entity, self.client._base_uri,
+ self.client.version_prefix, e)
+ raise etcd.EtcdException(
+ "Could not fetch {} '{}'".format(self.entity, self.name))
+
+ self._from_net(response.data)
+
+ def write(self):
+ try:
+ r = self.__class__(self.client, self.name)
+ r.read()
+ except etcd.EtcdKeyNotFound:
+ r = None
+ try:
+ for payload in self._to_net(r):
+ response = self.client.api_execute_json(self.uri,
+ self.client._MPUT,
+ params=payload)
+ # This will fail if the response is an error
+ self._from_net(response.data)
+ except etcd.EtcdInsufficientPermissions as e:
+ _log.error("Any action on the authorization requires the root role")
+ raise
+ except Exception as e:
+ _log.error("Failed to write %s '%s'", self.entity, self.name)
+ # TODO: fine-grained exception handling
+ raise etcd.EtcdException(
+ "Could not write {} '{}': {}".format(self.entity,
+ self.name, e))
+
+ def delete(self):
+ try:
+ _ = self.client.api_execute(self.uri, self.client._MDELETE)
+ except etcd.EtcdInsufficientPermissions as e:
+ _log.error("Any action on the authorization requires the root role")
+ raise
+ except etcd.EtcdKeyNotFound:
+ _log.info("%s '%s' not found", self.entity, self.name)
+ raise
+ except Exception as e:
+ _log.error("Failed to delete %s in %s%s: %r",
+ self.entity, self._base_uri, self.version_prefix, e)
+ raise etcd.EtcdException(
+ "Could not delete {} '{}'".format(self.entity, self.name))
+
+ def _from_net(self, data):
+ raise NotImplementedError()
+
+ def _to_net(self, old=None):
+ raise NotImplementedError()
+
+ @classmethod
+ def new(cls, client, data):
+ c = cls(client, data[cls.entity])
+ c._from_net(data)
+ return c
+
+
+class EtcdUser(EtcdAuthBase):
+ """Class to manage in a orm-like way etcd users"""
+ entity = 'user'
+
+ def __init__(self, client, name):
+ super(EtcdUser, self).__init__(client, name)
+ self._roles = set()
+ self._password = None
+
+ def _from_net(self, data):
+ d = json.loads(data.decode('utf-8'))
+ self.roles = d.get('roles', [])
+ self.name = d.get('user')
+
+ def _to_net(self, prevobj=None):
+ if prevobj is None:
+ retval = [{"user": self.name, "password": self._password,
+ "roles": list(self.roles)}]
+ else:
+ retval = []
+ if self._password:
+ retval.append({"user": self.name, "password": self._password})
+ to_grant = list(self.roles - prevobj.roles)
+ to_revoke = list(prevobj.roles - self.roles)
+ if to_grant:
+ retval.append({"user": self.name, "grant": to_grant})
+ if to_revoke:
+ retval.append({"user": self.name, "revoke": to_revoke})
+ # Let's blank the password now
+ # Even if the user can't be written we don't want it to leak anymore.
+ self._password = None
+ return retval
+
+ @property
+ def roles(self):
+ return self._roles
+
+ @roles.setter
+ def roles(self, val):
+ self._roles = set(val)
+
+ @property
+ def password(self):
+ """Empty property for password."""
+ return None
+
+ @password.setter
+ def password(self, new_password):
+ """Change user's password."""
+ self._password = new_password
+
+ def __str__(self):
+ return json.dumps(self._to_net()[0])
+
+
+
+class EtcdRole(EtcdAuthBase):
+ entity = 'role'
+
+ def __init__(self, client, name):
+ super(EtcdRole, self).__init__(client, name)
+ self._read_paths = set()
+ self._write_paths = set()
+
+ def _from_net(self, data):
+ d = json.loads(data.decode('utf-8'))
+ self.name = d.get('role')
+
+ try:
+ kv = d["permissions"]["kv"]
+ except:
+ self._read_paths = set()
+ self._write_paths = set()
+ return
+
+ self._read_paths = set(kv.get('read', []))
+ self._write_paths = set(kv.get('write', []))
+
+ def _to_net(self, prevobj=None):
+ retval = []
+ if prevobj is None:
+ retval.append({
+ "role": self.name,
+ "permissions":
+ {
+ "kv":
+ {
+ "read": list(self._read_paths),
+ "write": list(self._write_paths)
+ }
+ }
+ })
+ else:
+ to_grant = {
+ 'read': list(self._read_paths - prevobj._read_paths),
+ 'write': list(self._write_paths - prevobj._write_paths)
+ }
+ to_revoke = {
+ 'read': list(prevobj._read_paths - self._read_paths),
+ 'write': list(prevobj._write_paths - self._write_paths)
+ }
+ if [path for sublist in to_revoke.values() for path in sublist]:
+ retval.append({'role': self.name, 'revoke': {'kv': to_revoke}})
+ if [path for sublist in to_grant.values() for path in sublist]:
+ retval.append({'role': self.name, 'grant': {'kv': to_grant}})
+ return retval
+
+ def grant(self, path, permission):
+ if permission.upper().find('R') >= 0:
+ self._read_paths.add(path)
+ if permission.upper().find('W') >= 0:
+ self._write_paths.add(path)
+
+ def revoke(self, path, permission):
+ if permission.upper().find('R') >= 0 and \
+ path in self._read_paths:
+ self._read_paths.remove(path)
+ if permission.upper().find('W') >= 0 and \
+ path in self._write_paths:
+ self._write_paths.remove(path)
+
+ @property
+ def acls(self):
+ perms = {}
+ try:
+ for path in self._read_paths:
+ perms[path] = 'R'
+ for path in self._write_paths:
+ if path in perms:
+ perms[path] += 'W'
+ else:
+ perms[path] = 'W'
+ except:
+ pass
+ return perms
+
+ @acls.setter
+ def acls(self, acls):
+ self._read_paths = set()
+ self._write_paths = set()
+ for path, permission in acls.items():
+ self.grant(path, permission)
+
+ def __str__(self):
+ return json.dumps({"role": self.name, 'acls': self.acls})
+
+
+class Auth(object):
+ def __init__(self, client):
+ self.client = client
+ self.uri = "{}/auth/enable".format(self.client.version_prefix)
+
+ @property
+ def active(self):
+ resp = self.client.api_execute(self.uri, self.client._MGET)
+ return json.loads(resp.data.decode('utf-8'))['enabled']
+
+ @active.setter
+ def active(self, value):
+ if value != self.active:
+ method = value and self.client._MPUT or self.client._MDELETE
+ self.client.api_execute(self.uri, method)
diff --git a/src/etcd/client.py b/src/etcd/client.py
index 03b0451..afeabe8 100644
--- a/src/etcd/client.py
+++ b/src/etcd/client.py
@@ -18,6 +18,8 @@ import urllib3
import urllib3.util
import json
import ssl
+import dns.resolver
+from functools import wraps
import etcd
try:
@@ -42,16 +44,22 @@ class Client(object):
_comparison_conditions = set(('prevValue', 'prevIndex', 'prevExist'))
_read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'quorum'))
_del_conditions = set(('prevValue', 'prevIndex'))
+
+ http = None
+
def __init__(
self,
host='127.0.0.1',
port=4001,
+ srv_domain=None,
version_prefix='/v2',
read_timeout=60,
allow_redirect=True,
protocol='http',
cert=None,
ca_cert=None,
+ username=None,
+ password=None,
allow_reconnect=False,
use_proxies=False,
expected_cluster_id=None,
@@ -67,6 +75,8 @@ class Client(object):
port (int): Port used to connect to etcd.
+ srv_domain (str): Domain to search the SRV record for cluster autodiscovery.
+
version_prefix (str): Url or version prefix in etcd url (default=/v2).
read_timeout (int): max seconds to wait for a read.
@@ -81,6 +91,10 @@ class Client(object):
ca_cert (str): The ca certificate. If pressent it will enable
validation.
+ username (str): username for etcd authentication.
+
+ password (str): password for etcd authentication.
+
allow_reconnect (bool): allow the client to reconnect to another
etcd server in the cluster in the case the
default one does not respond.
@@ -98,8 +112,15 @@ class Client(object):
by host. By default this will use up to 10
connections.
"""
- _log.debug("New etcd client created for %s:%s%s",
- host, port, version_prefix)
+
+ # If a DNS record is provided, use it to get the hosts list
+ if srv_domain is not None:
+ try:
+ host = self._discover(srv_domain)
+ except Exception as e:
+ _log.error("Could not discover the etcd hosts from %s: %s",
+ srv_domain, e)
+
self._protocol = protocol
def uri(protocol, host, port):
@@ -151,8 +172,20 @@ class Client(object):
kw['ca_certs'] = ca_cert
kw['cert_reqs'] = ssl.CERT_REQUIRED
+ self.username = None
+ self.password = None
+ if username and password:
+ self.username = username
+ self.password = password
+ elif username:
+ _log.warning('Username provided without password, both are required for authentication')
+ elif password:
+ _log.warning('Password provided without username, both are required for authentication')
+
self.http = urllib3.PoolManager(num_pools=10, **kw)
+ _log.debug("New etcd client created for %s", self.base_uri)
+
if self._allow_reconnect:
# we need the set of servers in the cluster in order to try
# reconnecting upon error. The cluster members will be
@@ -174,6 +207,27 @@ class Client(object):
_log.debug("Machines cache initialised to %s",
self._machines_cache)
+ def _discover(self, domain):
+ srv_name = "_etcd._tcp.{}".format(domain)
+ answers = dns.resolver.query(srv_name, 'SRV')
+ hosts = []
+ for answer in answers:
+ hosts.append(
+ (answer.target.to_text(omit_final_dot=True), answer.port))
+ _log.debug("Found %s", hosts)
+ if not len(hosts):
+ raise ValueError("The SRV record is present but no host were found")
+ return tuple(hosts)
+
+ def __del__(self):
+ """Clean up open connections"""
+ if self.http is not None:
+ try:
+ self.http.clear()
+ except ReferenceError:
+ # this may hit an already-cleared weakref
+ pass
+
@property
def base_uri(self):
"""URI used by the client to connect to etcd."""
@@ -221,6 +275,7 @@ class Client(object):
response = self.http.request(
self._MGET,
uri,
+ headers=self._get_headers(),
timeout=self.read_timeout,
redirect=self.allow_redirect)
@@ -278,9 +333,9 @@ class Client(object):
try:
leader = json.loads(
- self.api_execute(self.version_prefix + '/stats/leader',
+ self.api_execute(self.version_prefix + '/stats/self',
self._MGET).data.decode('utf-8'))
- return self.members[leader['leader']]
+ return self.members[leader['leaderInfo']['leader']]
except Exception as e:
raise etcd.EtcdException("Cannot get leader data: %s" % e)
@@ -718,85 +773,123 @@ class Client(object):
_log.info("Selected new etcd server %s", mach)
return mach
+ def _wrap_request(payload):
+ @wraps(payload)
+ def wrapper(self, path, method, params=None, timeout=None):
+ some_request_failed = False
+ response = False
+
+ if timeout is None:
+ timeout = self.read_timeout
+
+ if timeout == 0:
+ timeout = None
+
+ if not path.startswith('/'):
+ raise ValueError('Path does not start with /')
+
+ while not response:
+ try:
+ response = payload(self, path, method,
+ params=params, timeout=timeout)
+ # Check the cluster ID hasn't changed under us. We use
+ # preload_content=False above so we can read the headers
+ # before we wait for the content of a watch.
+ self._check_cluster_id(response)
+ # Now force the data to be preloaded in order to trigger any
+ # IO-related errors in this method rather than when we try to
+ # access it later.
+ _ = response.data
+ # urllib3 doesn't wrap all httplib exceptions and earlier versions
+ # don't wrap socket errors either.
+ except (urllib3.exceptions.HTTPError,
+ HTTPException, socket.error) as e:
+ if (isinstance(params, dict) and
+ params.get("wait") == "true" and
+ isinstance(e,
+ urllib3.exceptions.ReadTimeoutError)):
+ _log.debug("Watch timed out.")
+ raise etcd.EtcdWatchTimedOut(
+ "Watch timed out: %r" % e,
+ cause=e
+ )
+ _log.error("Request to server %s failed: %r",
+ self._base_uri, e)
+ if self._allow_reconnect:
+ _log.info("Reconnection allowed, looking for another "
+ "server.")
+ # _next_server() raises EtcdException if there are no
+ # machines left to try, breaking out of the loop.
+ self._base_uri = self._next_server(cause=e)
+ some_request_failed = True
+
+ # if exception is raised on _ = response.data
+ # the condition for while loop will be False
+ # but we should retry
+ response = False
+ else:
+ _log.debug("Reconnection disabled, giving up.")
+ raise etcd.EtcdConnectionFailed(
+ "Connection to etcd failed due to %r" % e,
+ cause=e
+ )
+ except etcd.EtcdClusterIdChanged as e:
+ _log.warning(e)
+ raise
+ except:
+ _log.exception("Unexpected request failure, re-raising.")
+ raise
+
+ if some_request_failed:
+ if not self._use_proxies:
+ # The cluster may have changed since last invocation
+ self._machines_cache = self.machines
+ self._machines_cache.remove(self._base_uri)
+ return self._handle_server_response(response)
+ return wrapper
+
+ @_wrap_request
def api_execute(self, path, method, params=None, timeout=None):
""" Executes the query. """
-
- some_request_failed = False
- response = False
-
- if timeout is None:
- timeout = self.read_timeout
-
- if timeout == 0:
- timeout = None
-
- if not path.startswith('/'):
- raise ValueError('Path does not start with /')
-
- while not response:
- try:
- url = self._base_uri + path
-
- if (method == self._MGET) or (method == self._MDELETE):
- response = self.http.request(
- method,
- url,
- timeout=timeout,
- fields=params,
- redirect=self.allow_redirect,
- preload_content=False)
-
- elif (method == self._MPUT) or (method == self._MPOST):
- response = self.http.request_encode_body(
- method,
- url,
- fields=params,
- timeout=timeout,
- encode_multipart=False,
- redirect=self.allow_redirect,
- preload_content=False)
- else:
+ url = self._base_uri + path
+
+ if (method == self._MGET) or (method == self._MDELETE):
+ return self.http.request(
+ method,
+ url,
+ timeout=timeout,
+ fields=params,
+ redirect=self.allow_redirect,
+ headers=self._get_headers(),
+ preload_content=False)
+
+ elif (method == self._MPUT) or (method == self._MPOST):
+ return self.http.request_encode_body(
+ method,
+ url,
+ fields=params,
+ timeout=timeout,
+ encode_multipart=False,
+ redirect=self.allow_redirect,
+ headers=self._get_headers(),
+ preload_content=False)
+ else:
raise etcd.EtcdException(
'HTTP method {} not supported'.format(method))
-
- # Check the cluster ID hasn't changed under us. We use
- # preload_content=False above so we can read the headers
- # before we wait for the content of a watch.
- self._check_cluster_id(response)
- # Now force the data to be preloaded in order to trigger any
- # IO-related errors in this method rather than when we try to
- # access it later.
- _ = response.data
- # urllib3 doesn't wrap all httplib exceptions and earlier versions
- # don't wrap socket errors either.
- except (urllib3.exceptions.HTTPError,
- HTTPException,
- socket.error) as e:
- _log.error("Request to server %s failed: %r",
- self._base_uri, e)
- if self._allow_reconnect:
- _log.info("Reconnection allowed, looking for another "
- "server.")
- # _next_server() raises EtcdException if there are no
- # machines left to try, breaking out of the loop.
- self._base_uri = self._next_server(cause=e)
- some_request_failed = True
- else:
- _log.debug("Reconnection disabled, giving up.")
- raise etcd.EtcdConnectionFailed(
- "Connection to etcd failed due to %r" % e,
- cause=e
- )
- except:
- _log.exception("Unexpected request failure, re-raising.")
- raise
-
- if some_request_failed:
- if not self._use_proxies:
- # The cluster may have changed since last invocation
- self._machines_cache = self.machines
- self._machines_cache.remove(self._base_uri)
- return self._handle_server_response(response)
+
+ @_wrap_request
+ def api_execute_json(self, path, method, params=None, timeout=None):
+ url = self._base_uri + path
+ json_payload = json.dumps(params)
+ headers = self._get_headers()
+ headers['Content-Type'] = 'application/json'
+ return self.http.urlopen(method,
+ url,
+ body=json_payload,
+ timeout=timeout,
+ redirect=self.allow_redirect,
+ headers=headers,
+ preload_content=False)
def _check_cluster_id(self, response):
cluster_id = response.getheader("x-etcd-cluster-id")
@@ -827,8 +920,15 @@ class Client(object):
# throw the appropriate exception
try:
r = json.loads(resp)
+ r['status'] = response.status
except (TypeError, ValueError):
# Bad JSON, make a response locally.
r = {"message": "Bad response",
"cause": str(resp)}
etcd.EtcdError.handle(r)
+
+ def _get_headers(self):
+ if self.username and self.password:
+ credentials = ':'.join((self.username, self.password))
+ return urllib3.make_headers(basic_auth=credentials)
+ return {}
diff --git a/src/etcd/lock.py b/src/etcd/lock.py
index 582e670..687a548 100644
--- a/src/etcd/lock.py
+++ b/src/etcd/lock.py
@@ -99,7 +99,7 @@ class Lock(object):
def __exit__(self, type, value, traceback):
self.release()
- def _acquired(self, blocking=True, timeout=None):
+ def _acquired(self, blocking=True, timeout=0):
locker, nearest = self._get_locker()
self.is_taken = False
if self.lock_key == locker:
diff --git a/src/etcd/tests/integration/helpers.py b/src/etcd/tests/integration/helpers.py
index 6c7e21c..1f1d22b 100644
--- a/src/etcd/tests/integration/helpers.py
+++ b/src/etcd/tests/integration/helpers.py
@@ -38,6 +38,12 @@ class EtcdProcessHelper(object):
'-initial-cluster', initial_cluster,
'-initial-cluster-state', 'new'
])
+ else:
+ proc_args.extend([
+ '-initial-cluster', 'test-node-0=http://127.0.0.1:{}'.format(self.internal_port_range_start),
+ '-initial-cluster-state', 'new'
+ ])
+
for i in range(0, number):
self.add_one(i, proc_args)
@@ -77,12 +83,12 @@ class EtcdProcessHelper(object):
def kill_one(self, slot):
log = logging.getLogger()
- dir, process = self.processes.pop(slot)
+ data_dir, process = self.processes.pop(slot)
process.kill()
time.sleep(2)
log.debug('Killed etcd pid:%d', process.pid)
- shutil.rmtree(dir)
- log.debug('Removed directory %s' % dir)
+ shutil.rmtree(data_dir)
+ log.debug('Removed directory %s' % data_dir)
class TestingCA(object):
diff --git a/src/etcd/tests/integration/test_simple.py b/src/etcd/tests/integration/test_simple.py
index da0954d..660caa8 100644
--- a/src/etcd/tests/integration/test_simple.py
+++ b/src/etcd/tests/integration/test_simple.py
@@ -18,6 +18,7 @@ log = logging.getLogger()
class EtcdIntegrationTest(unittest.TestCase):
+ cl_size = 3
@classmethod
def setUpClass(cls):
@@ -28,7 +29,7 @@ class EtcdIntegrationTest(unittest.TestCase):
proc_name=program,
port_range_start=6001,
internal_port_range_start=8001)
- cls.processHelper.run(number=3)
+ cls.processHelper.run(number=cls.cl_size)
cls.client = etcd.Client(port=6001)
@classmethod
diff --git a/src/etcd/tests/test_auth.py b/src/etcd/tests/test_auth.py
new file mode 100644
index 0000000..fc6ce70
--- /dev/null
+++ b/src/etcd/tests/test_auth.py
@@ -0,0 +1,161 @@
+from etcd.tests.integration.test_simple import EtcdIntegrationTest
+from etcd import auth
+import etcd
+
+
+class TestEtcdAuthBase(EtcdIntegrationTest):
+ cl_size = 1
+
+ def setUp(self):
+ # Sets up the root user, toggles auth
+ u = auth.EtcdUser(self.client, 'root')
+ u.password = 'testpass'
+ u.write()
+ self.client = etcd.Client(port=6001, username='root',
+ password='testpass')
+ self.unauth_client = etcd.Client(port=6001)
+ a = auth.Auth(self.client)
+ a.active = True
+
+ def tearDown(self):
+ u = auth.EtcdUser(self.client, 'test_user')
+ r = auth.EtcdRole(self.client, 'test_role')
+ try:
+ u.delete()
+ except:
+ pass
+ try:
+ r.delete()
+ except:
+ pass
+ a = auth.Auth(self.client)
+ a.active = False
+
+
+class EtcdUserTest(TestEtcdAuthBase):
+ def test_names(self):
+ u = auth.EtcdUser(self.client, 'test_user')
+ self.assertEquals(u.names, ['root'])
+
+ def test_read(self):
+ u = auth.EtcdUser(self.client, 'root')
+ # Reading an existing user succeeds
+ try:
+ u.read()
+ except Exception:
+ self.fail("reading the root user raised an exception")
+
+ # roles for said user are fetched
+ self.assertEquals(u.roles, set(['root']))
+
+ # The user is correctly rendered out
+ self.assertEquals(u._to_net(), [{'user': 'root', 'password': None,
+ 'roles': ['root']}])
+
+ # An inexistent user raises the appropriate exception
+ u = auth.EtcdUser(self.client, 'user.does.not.exist')
+ self.assertRaises(etcd.EtcdKeyNotFound, u.read)
+
+ # Reading with an unauthenticated client raises an exception
+ u = auth.EtcdUser(self.unauth_client, 'root')
+ self.assertRaises(etcd.EtcdInsufficientPermissions, u.read)
+
+ # Generic errors are caught
+ c = etcd.Client(port=9999)
+ u = auth.EtcdUser(c, 'root')
+ self.assertRaises(etcd.EtcdException, u.read)
+
+ def test_write_and_delete(self):
+ # Create an user
+ u = auth.EtcdUser(self.client, 'test_user')
+ u.roles.add('guest')
+ u.roles.add('root')
+ # directly from my suitcase
+ u.password = '123456'
+ try:
+ u.write()
+ except:
+ self.fail("creating a user doesn't work")
+ # Password gets wiped
+ self.assertEquals(u.password, None)
+ u.read()
+ # Verify we can log in as this user and access the auth (it has the
+ # root role)
+ cl = etcd.Client(port=6001, username='test_user',
+ password='123456')
+ ul = auth.EtcdUser(cl, 'root')
+ try:
+ ul.read()
+ except etcd.EtcdInsufficientPermissions:
+ self.fail("Reading auth with the new user is not possible")
+
+ self.assertEquals(u.name, "test_user")
+ self.assertEquals(u.roles, set(['guest', 'root']))
+ # set roles as a list, it works!
+ u.roles = ['guest', 'test_group']
+ try:
+ u.write()
+ except:
+ self.fail("updating a user you previously created fails")
+ u.read()
+ self.assertIn('test_group', u.roles)
+
+ # Unauthorized access is properly handled
+ ua = auth.EtcdUser(self.unauth_client, 'test_user')
+ self.assertRaises(etcd.EtcdInsufficientPermissions, ua.write)
+
+ # now let's test deletion
+ du = auth.EtcdUser(self.client, 'user.does.not.exist')
+ self.assertRaises(etcd.EtcdKeyNotFound, du.delete)
+
+ # Delete test_user
+ u.delete()
+ self.assertRaises(etcd.EtcdKeyNotFound, u.read)
+ # Permissions are properly handled
+ self.assertRaises(etcd.EtcdInsufficientPermissions, ua.delete)
+
+
+class EtcdRoleTest(TestEtcdAuthBase):
+ def test_names(self):
+ r = auth.EtcdRole(self.client, 'guest')
+ self.assertListEqual(r.names, [u'guest', u'root'])
+
+ def test_read(self):
+ r = auth.EtcdRole(self.client, 'guest')
+ try:
+ r.read()
+ except:
+ self.fail('Reading an existing role failed')
+
+ self.assertEquals(r.acls, {'*': 'RW'})
+ # We can actually skip most other read tests as they are common
+ # with EtcdUser
+
+ def test_write_and_delete(self):
+ r = auth.EtcdRole(self.client, 'test_role')
+ r.acls = {'*': 'R', '/test/*': 'RW'}
+ try:
+ r.write()
+ except:
+ self.fail("Writing a simple groups should not fail")
+
+ r1 = auth.EtcdRole(self.client, 'test_role')
+ r1.read()
+ self.assertEquals(r1.acls, r.acls)
+ r.revoke('/test/*', 'W')
+ r.write()
+ r1.read()
+ self.assertEquals(r1.acls, {'*': 'R', '/test/*': 'R'})
+ r.grant('/pub/*', 'RW')
+ r.write()
+ r1.read()
+ self.assertEquals(r1.acls['/pub/*'], 'RW')
+ # All other exceptions are tested by the user tests
+ r1.name = None
+ self.assertRaises(etcd.EtcdException, r1.write)
+ # ditto for delete
+ try:
+ r.delete()
+ except:
+ self.fail("A normal delete should not fail")
+ self.assertRaises(etcd.EtcdKeyNotFound, r.read)
diff --git a/src/etcd/tests/unit/__init__.py b/src/etcd/tests/unit/__init__.py
index 1dd44ab..9360b6b 100644
--- a/src/etcd/tests/unit/__init__.py
+++ b/src/etcd/tests/unit/__init__.py
@@ -27,8 +27,7 @@ class TestClientApiBase(unittest.TestCase):
def _mock_api(self, status, d, cluster_id=None):
resp = self._prepare_response(status, d, cluster_id=cluster_id)
- self.client.api_execute = mock.create_autospec(
- self.client.api_execute, return_value=resp)
+ self.client.api_execute = mock.MagicMock(return_value=resp)
def _mock_exception(self, exc, msg):
self.client.api_execute = mock.Mock(side_effect=exc(msg))
diff --git a/src/etcd/tests/unit/test_client.py b/src/etcd/tests/unit/test_client.py
index 2e09d7c..bb05a66 100644
--- a/src/etcd/tests/unit/test_client.py
+++ b/src/etcd/tests/unit/test_client.py
@@ -1,5 +1,12 @@
import unittest
import etcd
+import dns.name
+import dns.rdtypes.IN.SRV
+import dns.resolver
+try:
+ import mock
+except ImportError:
+ from unittest import mock
class TestClient(unittest.TestCase):
@@ -38,6 +45,16 @@ class TestClient(unittest.TestCase):
client = etcd.Client()
assert client.allow_redirect
+ def test_default_username(self):
+ """ default username is None"""
+ client = etcd.Client()
+ assert client.username is None
+
+ def test_default_password(self):
+ """ default username is None"""
+ client = etcd.Client()
+ assert client.password is None
+
def test_set_host(self):
""" can change host """
client = etcd.Client(host='192.168.1.1')
@@ -85,6 +102,29 @@ class TestClient(unittest.TestCase):
client = etcd.Client(use_proxies = True)
assert client._use_proxies
+ def test_set_username_only(self):
+ client = etcd.Client(username='username')
+ assert client.username is None
+
+ def test_set_password_only(self):
+ client = etcd.Client(password='password')
+ assert client.password is None
+
+ def test_set_username_password(self):
+ client = etcd.Client(username='username', password='password')
+ assert client.username == 'username'
+ assert client.password == 'password'
+
+ def test_get_headers_with_auth(self):
+ client = etcd.Client(username='username', password='password')
+ assert client._get_headers() == {
+ 'authorization': 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='
+ }
+
+ def test_get_headers_without_auth(self):
+ client = etcd.Client()
+ assert client._get_headers() == {}
+
def test_allow_reconnect(self):
""" Fails if allow_reconnect is false and a list of hosts is given"""
with self.assertRaises(etcd.EtcdException):
@@ -97,3 +137,25 @@ class TestClient(unittest.TestCase):
allow_reconnect=True,
use_proxies=True,
)
+
+ def test_discover(self):
+ """Tests discovery."""
+ answers = []
+ for i in range(1,3):
+ r = mock.create_autospec(dns.rdtypes.IN.SRV.SRV)
+ r.port = 2379
+ try:
+ method = dns.name.from_unicode
+ except AttributeError:
+ method = dns.name.from_text
+ r.target = method(u'etcd{}.example.com'.format(i))
+ answers.append(r)
+ dns.resolver.query = mock.create_autospec(dns.resolver.query, return_value=answers)
+ self.machines = etcd.Client.machines
+ etcd.Client.machines = mock.create_autospec(etcd.Client.machines, return_value=[u'https://etcd2.example.com:2379'])
+ c = etcd.Client(srv_domain="example.com", allow_reconnect=True, protocol="https")
+ etcd.Client.machines = self.machines
+ self.assertEquals(c.host, u'etcd1.example.com')
+ self.assertEquals(c.port, 2379)
+ self.assertEquals(c._machines_cache,
+ [u'https://etcd2.example.com:2379'])
diff --git a/src/etcd/tests/unit/test_lock.py b/src/etcd/tests/unit/test_lock.py
index 75ae676..107169f 100644
--- a/src/etcd/tests/unit/test_lock.py
+++ b/src/etcd/tests/unit/test_lock.py
@@ -1,5 +1,8 @@
import etcd
-import mock
+try:
+ import mock
+except ImportError:
+ from unittest import mock
from etcd.tests.unit import TestClientApiBase
@@ -37,8 +40,8 @@ class TestClientLock(TestClientApiBase):
Acquiring a precedingly inexistent lock works.
"""
l = etcd.Lock(self.client, 'test_lock')
- l._find_lock = mock.create_autospec(l._find_lock, return_value=False)
- l._acquired = mock.create_autospec(l._acquired, return_value=True)
+ l._find_lock = mock.MagicMock(spec=l._find_lock, return_value=False)
+ l._acquired = mock.MagicMock(spec=l._acquired, return_value=True)
# Mock the write
d = {
u'action': u'set',
@@ -87,13 +90,12 @@ class TestClientLock(TestClientApiBase):
"""
self.locker._sequence = '4'
retval = ('/_locks/test_lock/4', None)
- self.locker._get_locker = mock.create_autospec(
- self.locker._get_locker, return_value=retval)
+ self.locker._get_locker = mock.MagicMock(
+ spec=self.locker._get_locker, return_value=retval)
self.assertTrue(self.locker._acquired())
self.assertTrue(self.locker.is_taken)
retval = ('/_locks/test_lock/1', '/_locks/test_lock/4')
- self.locker._get_locker = mock.create_autospec(
- self.locker._get_locker, return_value=retval)
+ self.locker._get_locker = mock.MagicMock(return_value=retval)
self.assertFalse(self.locker._acquired(blocking=False))
self.assertFalse(self.locker.is_taken)
d = {
@@ -110,6 +112,27 @@ class TestClientLock(TestClientApiBase):
def side_effect():
return returns.pop()
+ self.locker._get_locker = mock.MagicMock(
+ spec=self.locker._get_locker, side_effect=side_effect)
+ self.assertTrue(self.locker._acquired())
+
+ def test_acquired_no_timeout(self):
+ self.locker._sequence = 4
+ returns = [('/_locks/test_lock/4', None), ('/_locks/test_lock/1', '/_locks/test_lock/4')]
+
+ def side_effect():
+ return returns.pop()
+
+ d = {
+ u'action': u'get',
+ u'node': {
+ u'modifiedIndex': 190,
+ u'key': u'/_locks/test_lock/4',
+ u'value': self.locker.uuid
+ }
+ }
+ self._mock_api(200, d)
+
self.locker._get_locker = mock.create_autospec(
self.locker._get_locker, side_effect=side_effect)
self.assertTrue(self.locker._acquired())
@@ -147,7 +170,6 @@ class TestClientLock(TestClientApiBase):
self.assertTrue(self.locker._find_lock())
self.assertEquals(self.locker._sequence, '34')
-
def test_get_locker(self):
self.recursive_read()
self.assertEquals((u'/_locks/test_lock/1', u'/_locks/test_lock/1'),
diff --git a/src/etcd/tests/unit/test_request.py b/src/etcd/tests/unit/test_request.py
index 9f54000..0942523 100644
--- a/src/etcd/tests/unit/test_request.py
+++ b/src/etcd/tests/unit/test_request.py
@@ -1,3 +1,6 @@
+import socket
+import urllib3
+
import etcd
from etcd.tests.unit import TestClientApiBase
@@ -66,7 +69,8 @@ class TestClientApiInternals(TestClientApiBase):
self._mock_api(200, d)
self.client.write('/newdir', None, dir=True)
self.assertEquals(self.client.api_execute.call_args,
- (('/v2/keys/newdir', 'PUT'), dict(params={'dir': 'true'})))
+ (('/v2/keys/newdir', 'PUT'),
+ dict(params={'dir': 'true'})))
class TestClientApiInterface(TestClientApiBase):
@@ -87,7 +91,9 @@ class TestClientApiInterface(TestClientApiBase):
@mock.patch('etcd.Client.machines', new_callable=mock.PropertyMock)
def test_use_proxies(self, mocker):
"""Do not overwrite the machines cache when using proxies"""
- mocker.return_value = ['https://10.0.0.2:4001', 'https://10.0.0.3:4001', 'https://10.0.0.4:4001']
+ mocker.return_value = ['https://10.0.0.2:4001',
+ 'https://10.0.0.3:4001',
+ 'https://10.0.0.4:4001']
c = etcd.Client(
host=(('localhost', 4001), ('localproxy', 4001)),
protocol='https',
@@ -97,17 +103,16 @@ class TestClientApiInterface(TestClientApiBase):
self.assertEquals(c._machines_cache, ['https://localproxy:4001'])
self.assertEquals(c._base_uri, 'https://localhost:4001')
- self.assertNotIn(c.base_uri,c._machines_cache)
+ self.assertNotIn(c.base_uri, c._machines_cache)
c = etcd.Client(
- host=(('localhost', 4001), ('10.0.0.2',4001)),
+ host=(('localhost', 4001), ('10.0.0.2', 4001)),
protocol='https',
allow_reconnect=True,
use_proxies=False
)
self.assertIn('https://10.0.0.3:4001', c._machines_cache)
- self.assertNotIn(c.base_uri,c._machines_cache)
-
+ self.assertNotIn(c.base_uri, c._machines_cache)
def test_members(self):
""" Can request machines """
@@ -157,7 +162,7 @@ class TestClientApiInterface(TestClientApiBase):
""" Can request the leader """
members = {"ce2a822cea30bfca": {"id": "ce2a822cea30bfca", "name": "default"}}
mocker.return_value = members
- self._mock_api(200, {"leader": "ce2a822cea30bfca", "followers": {}})
+ self._mock_api(200, {"leaderInfo":{"leader": "ce2a822cea30bfca", "followers": {}}})
self.assertEquals(self.client.leader, members["ce2a822cea30bfca"])
def test_set_plain(self):
@@ -395,12 +400,9 @@ class TestClientRequest(TestClientApiInterface):
def _mock_api(self, status, d, cluster_id=None):
resp = self._prepare_response(status, d)
resp.getheader.return_value = cluster_id or "abcdef1234"
- self.client.http.request_encode_body = mock.create_autospec(
- self.client.http.request_encode_body, return_value=resp
- )
- self.client.http.request = mock.create_autospec(
- self.client.http.request, return_value=resp
- )
+ self.client.http.request_encode_body = mock.MagicMock(
+ return_value=resp)
+ self.client.http.request = mock.MagicMock(return_value=resp)
def _mock_error(self, error_code, msg, cause, method='PUT', fields=None,
cluster_id=None):
@@ -428,6 +430,20 @@ class TestClientRequest(TestClientApiInterface):
prevValue='oldbog'
)
+ def test_watch_timeout(self):
+ """ Exception will be raised if prevValue != value in test_set """
+ self.client.http.request = mock.create_autospec(
+ self.client.http.request,
+ side_effect=urllib3.exceptions.ReadTimeoutError(self.client.http,
+ "foo",
+ "Read timed out")
+ )
+ self.assertRaises(
+ etcd.EtcdWatchTimedOut,
+ self.client.watch,
+ '/testKey',
+ )
+
def test_path_without_trailing_slash(self):
""" Exception will be raised if a path without a trailing slash is used """
self.assertRaises(ValueError, self.client.api_execute,
@@ -440,20 +456,32 @@ class TestClientRequest(TestClientApiInterface):
def test_read_cluster_id_changed(self):
""" Read timeout set to the default """
- d = {u'action': u'set',
- u'node': {
+ d = {
+ u'action': u'set',
+ u'node': {
u'expiration': u'2013-09-14T00:56:59.316195568+02:00',
u'modifiedIndex': 6,
u'key': u'/testkey',
u'ttl': 19,
- u'value': u'test'
- }
- }
+ u'value': u'test',
+ }
+ }
self._mock_api(200, d, cluster_id="notabcd1234")
self.assertRaises(etcd.EtcdClusterIdChanged,
self.client.read, '/testkey')
self.client.read("/testkey")
+ def test_read_connection_error(self):
+ self.client.http.request = mock.create_autospec(
+ self.client.http.request,
+ side_effect=socket.error()
+ )
+ self.assertRaises(etcd.EtcdConnectionFailed,
+ self.client.read, '/something')
+ # Direct GET request
+ self.assertRaises(etcd.EtcdConnectionFailed,
+ self.client.api_execute, '/a', 'GET')
+
def test_not_in(self):
pass
@@ -462,22 +490,23 @@ class TestClientRequest(TestClientApiInterface):
def test_update_fails(self):
""" Non-atomic updates fail """
- d = {u'action': u'set',
- u'node': {
+ d = {
+ u'action': u'set',
+ u'node': {
u'expiration': u'2013-09-14T00:56:59.316195568+02:00',
u'modifiedIndex': 6,
u'key': u'/testkey',
u'ttl': 19,
u'value': u'test'
- }
- }
+ }
+ }
res = etcd.EtcdResult(**d)
error = {
- "errorCode":101,
- "message":"Compare failed",
- "cause":"[ != bar] [7 != 6]",
- "index":6}
+ "errorCode": 101,
+ "message": "Compare failed",
+ "cause": "[ != bar] [7 != 6]",
+ "index": 6}
self._mock_api(412, error)
res.value = 'bar'
self.assertRaises(ValueError, self.client.update, res)