summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/etcd/__init__.py143
-rw-r--r--src/etcd/client.py275
-rw-r--r--src/etcd/election.py79
-rw-r--r--src/etcd/lock.py106
-rw-r--r--src/etcd/tests/integration/helpers.py36
-rw-r--r--src/etcd/tests/integration/test_election.py34
-rw-r--r--src/etcd/tests/integration/test_lock.py92
-rw-r--r--src/etcd/tests/integration/test_simple.py198
-rw-r--r--src/etcd/tests/integration/test_ssl.py180
-rw-r--r--src/etcd/tests/unit/test_client.py18
-rw-r--r--src/etcd/tests/unit/test_leader.py48
-rw-r--r--src/etcd/tests/unit/test_old_request.py69
-rw-r--r--src/etcd/tests/unit/test_request.py227
13 files changed, 689 insertions, 816 deletions
diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py
index 18ef036..370be42 100644
--- a/src/etcd/__init__.py
+++ b/src/etcd/__init__.py
@@ -1,7 +1,18 @@
-import collections
+import logging
from .client import Client
-from .lock import Lock
-from .election import LeaderElection
+
+_log = logging.getLogger(__name__)
+
+# Prevent "no handler" warnings to stderr in projects that do not configure
+# logging.
+try:
+ from logging import NullHandler
+except ImportError:
+ # Python <2.7, just define it.
+ class NullHandler(logging.Handler):
+ def emit(self, record):
+ pass
+_log.addHandler(NullHandler())
class EtcdResult(object):
@@ -109,7 +120,30 @@ class EtcdException(Exception):
"""
def __init__(self, message=None, payload=None):
super(Exception, self).__init__(message)
- self.payload=payload
+ self.payload = payload
+
+
+class EtcdValueError(EtcdException, ValueError):
+ """
+ Base class for Etcd value-related errors.
+ """
+ pass
+
+
+class EtcdCompareFailed(EtcdValueError):
+ """
+ Compare-and-swap failure
+ """
+ pass
+
+
+class EtcdClusterIdChanged(EtcdException):
+ """
+ The etcd cluster ID changed. This may indicate the cluster was replaced
+ with a backup. Raised to prevent waiting on an etcd_index that was only
+ valid on the old cluster.
+ """
+ pass
class EtcdKeyError(EtcdException):
@@ -118,72 +152,123 @@ class EtcdKeyError(EtcdException):
"""
pass
+
class EtcdKeyNotFound(EtcdKeyError):
"""
Etcd key not found exception (100)
"""
pass
+
class EtcdNotFile(EtcdKeyError):
"""
Etcd not a file exception (102)
"""
pass
+
class EtcdNotDir(EtcdKeyError):
"""
Etcd not a directory exception (104)
"""
pass
+
class EtcdAlreadyExist(EtcdKeyError):
"""
Etcd already exist exception (105)
"""
pass
+
class EtcdEventIndexCleared(EtcdException):
"""
Etcd event index is outdated and cleared exception (401)
"""
pass
+
+class EtcdConnectionFailed(EtcdException):
+ """
+ Connection to etcd failed.
+ """
+ pass
+
+
+class EtcdWatcherCleared(EtcdException):
+ """
+ Watcher is cleared due to etcd recovery.
+ """
+ pass
+
+
+class EtcdLeaderElectionInProgress(EtcdException):
+ """
+ Request failed due to in-progress leader election.
+ """
+ pass
+
+
+class EtcdRootReadOnly(EtcdKeyError):
+ """
+ Operation is not valid on the root, which is read only.
+ """
+ pass
+
+
+class EtcdDirNotEmpty(EtcdValueError):
+ """
+ Directory not empty.
+ """
+ pass
+
+
class EtcdError(object):
# See https://github.com/coreos/etcd/blob/master/Documentation/errorcode.md
error_exceptions = {
100: EtcdKeyNotFound,
- 101: ValueError,
+ 101: EtcdCompareFailed,
102: EtcdNotFile,
- 103: Exception,
+ # 103: Non-public: no more peers.
104: EtcdNotDir,
105: EtcdAlreadyExist,
- 106: KeyError,
- 200: ValueError,
- 201: ValueError,
- 202: ValueError,
- 203: ValueError,
- 209: ValueError,
- 300: Exception,
- 301: Exception,
- 400: Exception,
+ # 106: Non-public: key is preserved.
+ 107: EtcdRootReadOnly,
+ 108: EtcdDirNotEmpty,
+ # 109: Non-public: existing peer addr.
+
+ 200: EtcdValueError,
+ 201: EtcdValueError,
+ 202: EtcdValueError,
+ 203: EtcdValueError,
+ 204: EtcdValueError,
+ 205: EtcdValueError,
+ 206: EtcdValueError,
+ 207: EtcdValueError,
+ 208: EtcdValueError,
+ 209: EtcdValueError,
+ 210: EtcdValueError,
+
+ # 300: Non-public: Raft internal error.
+ 301: EtcdLeaderElectionInProgress,
+
+ 400: EtcdWatcherCleared,
401: EtcdEventIndexCleared,
- 500: EtcdException
}
@classmethod
- def handle(cls, errorCode=None, message=None, cause=None, **kwdargs):
- """ Decodes the error and throws the appropriate error message"""
- try:
- msg = '{} : {}'.format(message, cause)
- payload={'errorCode': errorCode, 'message': message, 'cause': cause}
- if len(kwdargs) > 0:
- for key in kwdargs:
- payload[key]=kwdargs[key]
- exc = cls.error_exceptions[errorCode]
- except:
- msg = "Unable to decode server response"
- exc = EtcdException
- if exc in [EtcdException, EtcdKeyNotFound, EtcdNotFile, EtcdNotDir, EtcdAlreadyExist, EtcdEventIndexCleared]:
+ def handle(cls, payload):
+ """
+ Decodes the error and throws the appropriate error message
+
+ :param payload: The decoded JSON error payload as a dict.
+ """
+ error_code = payload.get("errorCode")
+ message = payload.get("message")
+ cause = payload.get("cause")
+ msg = '{} : {}'.format(message, cause)
+ exc = cls.error_exceptions.get(error_code, EtcdException)
+ if issubclass(exc, EtcdException):
raise exc(msg, payload)
else:
raise exc(msg)
diff --git a/src/etcd/client.py b/src/etcd/client.py
index 4bd746f..a578add 100644
--- a/src/etcd/client.py
+++ b/src/etcd/client.py
@@ -6,12 +6,28 @@
"""
+import logging
+try:
+ # Python 3
+ from http.client import HTTPException
+except ImportError:
+ # Python 2
+ from httplib import HTTPException
+import socket
import urllib3
+import urllib3.util
import json
import ssl
-
import etcd
+try:
+ from urlparse import urlparse
+except ImportError:
+ from urllib.parse import urlparse
+
+
+_log = logging.getLogger(__name__)
+
class Client(object):
@@ -24,7 +40,7 @@ class Client(object):
_MPOST = 'POST'
_MDELETE = 'DELETE'
_comparison_conditions = set(('prevValue', 'prevIndex', 'prevExist'))
- _read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'consistent'))
+ _read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'quorum'))
_del_conditions = set(('prevValue', 'prevIndex'))
def __init__(
self,
@@ -37,6 +53,8 @@ class Client(object):
cert=None,
ca_cert=None,
allow_reconnect=False,
+ use_proxies=False,
+ expected_cluster_id=None,
):
"""
Initialize the client.
@@ -66,28 +84,39 @@ class Client(object):
etcd server in the cluster in the case the
default one does not respond.
- """
- self._machines_cache = []
+ use_proxies (bool): we are using a list of proxies to which we connect,
+ and don't want to connect to the original etcd cluster.
+ expected_cluster_id (str): If a string, recorded as the expected
+ UUID of the cluster (rather than
+ learning it from the first request),
+ reads will raise EtcdClusterIdChanged
+ if they receive a response with a
+ different cluster ID.
+ """
+ _log.info("New etcd client created for %s:%s%s",
+ host, port, version_prefix)
self._protocol = protocol
def uri(protocol, host, port):
return '%s://%s:%d' % (protocol, host, port)
if not isinstance(host, tuple):
- self._host = host
- self._port = port
+ self._machines_cache = []
+ self._base_uri = uri(self._protocol, host, port)
else:
- self._host, self._port = host[0]
- self._machines_cache.extend(
- [uri(self._protocol, *conn) for conn in host])
-
- self._base_uri = uri(self._protocol, self._host, self._port)
+ if not allow_reconnect:
+ _log.error("List of hosts incompatible with allow_reconnect.")
+ raise etcd.EtcdException("A list of hosts to connect to was given, but reconnection not allowed?")
+ self._machines_cache = [uri(self._protocol, *conn) for conn in host]
+ self._base_uri = self._machines_cache.pop(0)
+ self.expected_cluster_id = expected_cluster_id
self.version_prefix = version_prefix
self._read_timeout = read_timeout
self._allow_redirect = allow_redirect
+ self._use_proxies = use_proxies
self._allow_reconnect = allow_reconnect
# SSL Client certificate support
@@ -100,6 +129,7 @@ class Client(object):
if protocol == 'https':
# If we don't allow TLSv1, clients using older version of OpenSSL
# (<1.0) won't be able to connect.
+ _log.debug("HTTPS enabled.")
kw['ssl_version'] = ssl.PROTOCOL_TLSv1
if cert:
@@ -119,11 +149,24 @@ class Client(object):
if self._allow_reconnect:
# we need the set of servers in the cluster in order to try
- # reconnecting upon error.
- self._machines_cache = self.machines
- self._machines_cache.remove(self._base_uri)
- else:
- self._machines_cache = []
+ # reconnecting upon error. The cluster members will be
+ # added to the hosts list you provided. If you are using
+ # proxies, set all
+ #
+ # Beware though: if you input '127.0.0.1' as your host and
+ # etcd advertises 'localhost', both will be in the
+ # resulting list.
+
+ # If we're connecting to the original cluster, we can
+ # extend the list given to the client with what we get
+ # from self.machines
+ if not self._use_proxies:
+ self._machines_cache = list(set(self._machines_cache) |
+ set(self.machines))
+ if self._base_uri in self._machines_cache:
+ self._machines_cache.remove(self._base_uri)
+ _log.debug("Machines cache initialised to %s",
+ self._machines_cache)
@property
def base_uri(self):
@@ -133,12 +176,12 @@ class Client(object):
@property
def host(self):
"""Node to connect etcd."""
- return self._host
+ return urlparse(self._base_uri).netloc.split(':')[0]
@property
def port(self):
"""Port to connect etcd."""
- return self._port
+ return int(urlparse(self._base_uri).netloc.split(':')[1])
@property
def protocol(self):
@@ -166,24 +209,107 @@ class Client(object):
>>> print client.machines
['http://127.0.0.1:4001', 'http://127.0.0.1:4002']
"""
- return [
- node.strip() for node in self.api_execute(
- self.version_prefix + '/machines',
- self._MGET).data.decode('utf-8').split(',')
- ]
+ # We can't use api_execute here, or it causes a logical loop
+ try:
+ uri = self._base_uri + self.version_prefix + '/machines'
+ response = self.http.request(
+ self._MGET,
+ uri,
+ timeout=self.read_timeout,
+ redirect=self.allow_redirect)
+
+ machines = [
+ node.strip() for node in
+ self._handle_server_response(response).data.decode('utf-8').split(',')
+ ]
+ _log.debug("Retrieved list of machines: %s", machines)
+ return machines
+ except (urllib3.exceptions.HTTPError,
+ HTTPException,
+ socket.error) as e:
+ # We can't get the list of machines, if one server is in the
+ # machines cache, try on it
+ _log.error("Failed to get list of machines from %s%s: %r",
+ self._base_uri, self.version_prefix, e)
+ if self._machines_cache:
+ self._base_uri = self._machines_cache.pop(0)
+ _log.info("Retrying on %s", self._base_uri)
+ # Call myself
+ return self.machines
+ else:
+ raise etcd.EtcdException("Could not get the list of servers, "
+ "maybe you provided the wrong "
+ "host(s) to connect to?")
+
+ @property
+ def members(self):
+ """
+ A more structured view of peers in the cluster.
+
+ Note that while we have an internal DS called _members, accessing the public property will call etcd.
+ """
+ # Empty the members list
+ self._members = {}
+ try:
+ data = self.api_execute(self.version_prefix + '/members',
+ self._MGET).data.decode('utf-8')
+ res = json.loads(data)
+ for member in res['members']:
+ self._members[member['id']] = member
+ return self._members
+ except:
+ raise etcd.EtcdException("Could not get the members list, maybe the cluster has gone away?")
@property
def leader(self):
"""
Returns:
- str. the leader of the cluster.
+ dict. the leader of the cluster.
>>> print client.leader
- 'http://127.0.0.1:4001'
+ {"id":"ce2a822cea30bfca","name":"default","peerURLs":["http://localhost:2380","http://localhost:7001"],"clientURLs":["http://127.0.0.1:4001"]}
"""
- return self.api_execute(
- self.version_prefix + '/leader',
- self._MGET).data.decode('ascii')
+ try:
+
+ leader = json.loads(
+ self.api_execute(self.version_prefix + '/stats/leader',
+ self._MGET).data.decode('utf-8'))
+ return self.members[leader['leader']]
+ except Exception as e:
+ raise etcd.EtcdException("Cannot get leader data: %s" % e)
+
+ @property
+ def stats(self):
+ """
+ Returns:
+ dict. the stats of the local server
+ """
+ return self._stats()
+
+ @property
+ def leader_stats(self):
+ """
+ Returns:
+ dict. the stats of the leader
+ """
+ return self._stats('leader')
+
+ @property
+ def store_stats(self):
+ """
+ Returns:
+ dict. the stats of the kv store
+ """
+ return self._stats('store')
+
+ def _stats(self, what='self'):
+ """ Internal method to access the stats endpoints"""
+ data = self.api_execute(self.version_prefix
+ + '/stats/' + what, self._MGET).data.decode('utf-8')
+ try:
+ return json.loads(data)
+ except (TypeError,ValueError):
+ raise etcd.EtcdException("Cannot parse json data in the response")
@property
def key_endpoint(self):
@@ -242,6 +368,8 @@ class Client(object):
'newValue'
"""
+ _log.info("Writing %s to key %s ttl=%s dir=%s append=%s",
+ value, key, ttl, dir, append)
key = self._sanitize_key(key)
params = {}
if value is not None:
@@ -285,6 +413,7 @@ class Client(object):
obj (etcd.EtcdResult): The object that needs updating.
"""
+ _log.info("Updating %s to %s.", obj.key, obj.value)
kwdargs = {
'dir': obj.dir,
'ttl': obj.ttl,
@@ -294,11 +423,8 @@ class Client(object):
if not obj.dir:
# prevIndex on a dir causes a 'not a file' error. d'oh!
kwdargs['prevIndex'] = obj.modifiedIndex
-
return self.write(obj.key, obj.value, **kwdargs)
-
-
def read(self, key, **kwdargs):
"""
Returns the value of the key 'key'.
@@ -331,6 +457,7 @@ class Client(object):
'value'
"""
+ _log.info("Issuing read for key %s with args %s", key, kwdargs)
key = self._sanitize_key(key)
params = {}
@@ -344,7 +471,8 @@ class Client(object):
timeout = kwdargs.get('timeout', None)
response = self.api_execute(
- self.key_endpoint + key, self._MGET, params=params, timeout=timeout)
+ self.key_endpoint + key, self._MGET, params=params,
+ timeout=timeout)
return self._result_from_response(response)
def delete(self, key, recursive=None, dir=None, **kwdargs):
@@ -376,6 +504,8 @@ class Client(object):
'/key'
"""
+ _log.info("Deleting %s recursive=%s dir=%s extra args=%s",
+ key, recursive, dir, kwdargs)
key = self._sanitize_key(key)
kwds = {}
@@ -387,6 +517,7 @@ class Client(object):
for k in self._del_conditions:
if k in kwdargs:
kwds[k] = kwdargs[k]
+ _log.debug("Calculated params = %s", kwds)
response = self.api_execute(
self.key_endpoint + key, self._MDELETE, params=kwds)
@@ -509,6 +640,7 @@ class Client(object):
'value'
"""
+ _log.debug("About to wait on key %s, index %s", key, index)
if index:
return self.read(key, wait=True, waitIndex=index, timeout=timeout,
recursive=recursive)
@@ -538,16 +670,15 @@ class Client(object):
local_index = index
while True:
response = self.watch(key, index=local_index, timeout=0, recursive=recursive)
- if local_index is not None:
- local_index += 1
+ local_index = response.modifiedIndex + 1
yield response
def get_lock(self, *args, **kwargs):
- return etcd.Lock(self, *args, **kwargs)
+ raise NotImplementedError('Lock primitives were removed from etcd 2.0')
@property
def election(self):
- return etcd.LeaderElection(self)
+ raise NotImplementedError('Election primitives were removed from etcd 2.0')
def _result_from_response(self, response):
""" Creates an EtcdResult from json dictionary """
@@ -564,10 +695,16 @@ class Client(object):
def _next_server(self):
""" Selects the next server in the list, refreshes the server list. """
+ _log.debug("Selection next machine in cache. Available machines: %s",
+ self._machines_cache)
try:
- return self._machines_cache.pop()
+ mach = self._machines_cache.pop()
except IndexError:
- raise etcd.EtcdException('No more machines in the cluster')
+ _log.error("Machines cache is empty, no machines to try.")
+ raise etcd.EtcdConnectionFailed('No more machines in the cluster')
+ else:
+ _log.info("Selected new etcd server %s", mach)
+ return mach
def api_execute(self, path, method, params=None, timeout=None):
""" Executes the query. """
@@ -594,7 +731,8 @@ class Client(object):
url,
timeout=timeout,
fields=params,
- redirect=self.allow_redirect)
+ redirect=self.allow_redirect,
+ preload_content=False)
elif (method == self._MPUT) or (method == self._MPOST):
response = self.http.request_encode_body(
@@ -603,17 +741,55 @@ class Client(object):
fields=params,
timeout=timeout,
encode_multipart=False,
- redirect=self.allow_redirect)
+ redirect=self.allow_redirect,
+ preload_content=False)
else:
raise etcd.EtcdException(
'HTTP method {} not supported'.format(method))
- except urllib3.exceptions.MaxRetryError:
- self._base_uri = self._next_server()
- some_request_failed = True
+ # urllib3 doesn't wrap all httplib exceptions and earlier versions
+ # don't wrap socket errors either.
+ except (urllib3.exceptions.HTTPError,
+ HTTPException,
+ socket.error) as e:
+ _log.error("Request to server %s failed: %r",
+ self._base_uri, e)
+ if self._allow_reconnect:
+ _log.info("Reconnection allowed, looking for another "
+ "server.")
+ # _next_server() raises EtcdException if there are no
+ # machines left to try, breaking out of the loop.
+ self._base_uri = self._next_server()
+ some_request_failed = True
+ else:
+ _log.info("Reconnection disabled, giving up.")
+ raise etcd.EtcdConnectionFailed(
+ "Connection to etcd failed due to %r" % e)
+ except:
+ _log.exception("Unexpected request failure, re-raising.")
+ raise
+
+ else:
+ # Check the cluster ID hasn't changed under us. We use
+ # preload_content=False above so we can read the headers
+ # before we wait for the content of a long poll.
+ cluster_id = response.getheader("x-etcd-cluster-id")
+ id_changed = (self.expected_cluster_id and
+ cluster_id != self.expected_cluster_id)
+ # Update the ID so we only raise the exception once.
+ self.expected_cluster_id = cluster_id
+ if id_changed:
+ # Defensive: clear the pool so that we connect afresh next
+ # time.
+ self.http.clear()
+ raise etcd.EtcdClusterIdChanged(
+ 'The UUID of the cluster changed from {} to '
+ '{}.'.format(self.expected_cluster_id, cluster_id))
if some_request_failed:
- self._machines_cache = self.machines
+ if not self._use_proxies:
+ # The cluster may have changed since last invocation
+ self._machines_cache = self.machines
self._machines_cache.remove(self._base_uri)
return self._handle_server_response(response)
@@ -628,9 +804,8 @@ class Client(object):
# throw the appropriate exception
try:
r = json.loads(resp)
- except ValueError:
- r = None
- if r:
- etcd.EtcdError.handle(**r)
- else:
- raise etcd.EtcdException(resp)
+ except (TypeError, ValueError):
+ # Bad JSON, make a response locally.
+ r = {"message": "Bad response",
+ "cause": str(resp)}
+ etcd.EtcdError.handle(r)
diff --git a/src/etcd/election.py b/src/etcd/election.py
deleted file mode 100644
index 25ea403..0000000
--- a/src/etcd/election.py
+++ /dev/null
@@ -1,79 +0,0 @@
-import etcd
-import platform
-
-
-class LeaderElection(object):
-
- """
- Leader Election class using the etcd module
- """
-
- def __init__(self, client):
- """
- Initialize a leader election object.
-
- Args:
- client (etcd.Client): etcd client to use for the connection
- """
- self.client = client
-
- def get_path(self, key):
- if key.startswith('/'):
- return '/mod/v2/leader{}'.format(key)
- return '/mod/v2/leader/{}'.format(key)
-
- def set(self, key, name=None, ttl=0, timeout=None):
- """
- Initialize a leader election object.
-
- Args:
- key (string): name of the leader key,
-
- ttl (int): ttl (in seconds) for the lock to live.
-
- name (string): the name to store as the leader name. Defaults to the
- client's hostname
-
- """
-
- name = name or platform.node()
- params = {'ttl': ttl, 'name': name}
- path = self.get_path(key)
-
- res = self.client.api_execute(
- path, self.client._MPUT, params=params, timeout=timeout)
- return res.data.decode('utf-8')
-
- def get(self, key):
- """
- Get the name of a leader object.
-
- Args:
- key (string): name of the leader key,
-
- Raises:
- etcd.EtcdException
-
- """
- res = self.client.api_execute(self.get_path(key), self.client._MGET)
- if not res.data:
- raise etcd.EtcdException('Leader path {} not found'.format(key))
- return res.data.decode('utf-8')
-
- def delete(self, key, name=None):
- """
- Delete a leader object.
-
- Args:
- key (string): the leader key,
-
- name (string): name of the elected leader
-
- Raises:
- etcd.EtcdException
-
- """
- path = self.get_path(key)
- name = name or platform.node()
- res = self.client.api_execute(path, self.client._MDELETE, {'name': name})
- return res.data.decode('utf-8') == ''
diff --git a/src/etcd/lock.py b/src/etcd/lock.py
deleted file mode 100644
index 9068576..0000000
--- a/src/etcd/lock.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import contextlib
-
-import etcd
-
-
-class Lock(object):
-
- """
- Lock object using etcd's lock module.
- """
-
- def __init__(self, client, key, ttl=0, value=None):
- """
- Initialize a lock object.
-
- Args:
- client (Client): etcd client to use for communication.
-
- key (string): key to lock.
-
- ttl (int): ttl (in seconds) for the lock to live.
- 0 or None to lock forever.
-
- value (mixed): value to store on the lock.
- """
- self.client = client
- if not key.startswith('/'):
- key = '/' + key
- self.key = key
- self.ttl = ttl
- self.value = value
- self._index = None
-
- def __enter__(self):
- self.acquire()
-
- def __exit__(self, type, value, traceback):
- self.release()
-
- @property
- def _path(self):
- return u'/mod/v2/lock{}'.format(self.key)
-
- def acquire(self, timeout=None):
- """Acquire the lock from etcd. Blocks until lock is acquired."""
- params = {u'ttl': self.ttl}
- if self.value is not None:
- params[u'value'] = self.value
-
- res = self.client.api_execute(
- self._path, self.client._MPOST, params=params, timeout=timeout)
- self._index = res.data.decode('utf-8')
- return self
-
- def get(self):
- """
- Get Information on the lock.
- This allows to operate on locks that have not been acquired directly.
- """
- res = self.client.api_execute(self._path, self.client._MGET)
- if res.data:
- self.value = res.data.decode('utf-8')
- else:
- raise etcd.EtcdException('Lock is non-existent (or expired)')
- self._get_index()
- return self
-
- def _get_index(self):
- res = self.client.api_execute(
- self._path,
- self.client._MGET,
- {u'field': u'index'})
- if not res.data:
- raise etcd.EtcdException('Lock is non-existent (or expired)')
- self._index = res.data.decode('utf-8')
-
- def is_locked(self):
- """Check if lock is currently locked."""
- params = {u'field': u'index'}
- res = self.client.api_execute(
- self._path, self.client._MGET, params=params)
- return bool(res.data)
-
- def release(self):
- """Release this lock."""
- if not self._index:
- raise etcd.EtcdException(
- u'Cannot release lock that has not been locked')
- params = {u'index': self._index}
- res = self.client.api_execute(
- self._path, self.client._MDELETE, params=params)
- self._index = None
-
- def renew(self, new_ttl, timeout=None):
- """
- Renew the TTL on this lock.
-
- Args:
- new_ttl (int): new TTL to set.
- """
- if not self._index:
- raise etcd.EtcdException(
- u'Cannot renew lock that has not been locked')
- params = {u'ttl': new_ttl, u'index': self._index}
- res = self.client.api_execute(
- self._path, self.client._MPUT, params=params)
diff --git a/src/etcd/tests/integration/helpers.py b/src/etcd/tests/integration/helpers.py
index 9b1c127..6c7e21c 100644
--- a/src/etcd/tests/integration/helpers.py
+++ b/src/etcd/tests/integration/helpers.py
@@ -17,7 +17,9 @@ class EtcdProcessHelper(object):
proc_name='etcd',
port_range_start=4001,
internal_port_range_start=7001,
- cluster=False):
+ cluster=False,
+ tls=False
+ ):
self.base_directory = base_directory
self.proc_name = proc_name
@@ -25,8 +27,17 @@ class EtcdProcessHelper(object):
self.internal_port_range_start = internal_port_range_start
self.processes = {}
self.cluster = cluster
-
- def run(self, number=1, proc_args=None):
+ self.schema = 'http://'
+ if tls:
+ self.schema = 'https://'
+
+ def run(self, number=1, proc_args=[]):
+ if number > 1:
+ initial_cluster = ",".join([ "test-node-{}={}127.0.0.1:{}".format(slot, 'http://', self.internal_port_range_start + slot) for slot in range(0, number)])
+ proc_args.extend([
+ '-initial-cluster', initial_cluster,
+ '-initial-cluster-state', 'new'
+ ])
for i in range(0, number):
self.add_one(i, proc_args)
@@ -42,23 +53,22 @@ class EtcdProcessHelper(object):
prefix='python-etcd.%d-' % slot)
log.debug('Created directory %s' % directory)
+ client = '%s127.0.0.1:%d' % (self.schema, self.port_range_start + slot)
+ peer = '%s127.0.0.1:%d' % ('http://', self.internal_port_range_start
+ + slot)
daemon_args = [
self.proc_name,
'-data-dir', directory,
'-name', 'test-node-%d' % slot,
- '-peer-addr', '127.0.0.1:%d' % (self.internal_port_range_start +
- slot),
- '-addr', '127.0.0.1:%d' % (self.port_range_start + slot),
+ '-initial-advertise-peer-urls', peer,
+ '-listen-peer-urls', peer,
+ '-advertise-client-urls', client,
+ '-listen-client-urls', client
]
if proc_args:
daemon_args.extend(proc_args)
- if slot > 0 and self.cluster:
- daemon_args.append('-peers')
- daemon_args.append(
- '127.0.0.1:%d' % self.internal_port_range_start)
-
daemon = subprocess.Popen(daemon_args)
log.debug('Started %d' % daemon.pid)
log.debug('Params: %s' % daemon_args)
@@ -158,6 +168,10 @@ class TestingCA(object):
"extendedKeyUsage".encode('ascii'),
False,
"clientAuth,serverAuth".encode('ascii')),
+ crypto.X509Extension(
+ "subjectAltName".encode('ascii'),
+ False,
+ "IP: 127.0.0.1".encode('ascii')),
])
cert.gmtime_adj_notBefore(0)
diff --git a/src/etcd/tests/integration/test_election.py b/src/etcd/tests/integration/test_election.py
deleted file mode 100644
index 46ea2bb..0000000
--- a/src/etcd/tests/integration/test_election.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import etcd
-from . import test_simple
-import time
-import unittest
-
-class TestElection(test_simple.EtcdIntegrationTest):
- def setUp(self):
- self.client = etcd.Client(port=6001)
-
- def test_set_get_delete(self):
- e = self.client.election
- res = e.set('/mysql', name='foo.example.com', ttl=30)
- self.assertTrue(res != '')
- res = e.get('/mysql')
- self.assertEquals(res, 'foo.example.com')
- self.assertTrue(e.delete('/mysql', name='foo.example.com'))
-
-
- def test_set_invalid_ttl(self):
- self.assertRaises(ValueError, self.client.election.set, '/mysql', name='foo.example.com', ttl='ciao')
-
- def test_get_non_existing(self):
- """This is actually expected to fail. See https://github.com/coreos/etcd/issues/446"""
- self.assertRaises(etcd.EtcdException, self.client.election.get, '/foobar')
-
- def test_delete_non_existing(self):
- self.assertRaises(etcd.EtcdException, self.client.election.delete, '/foobar')
-
- def test_get_delete_after_ttl_expired_raises(self):
- e = self.client.election
- e.set('/mysql', name='foo', ttl=1)
- time.sleep(2)
- self.assertRaises(etcd.EtcdException, e.get, '/mysql')
- self.assertRaises(etcd.EtcdKeyNotFound, e.delete, '/mysql', name='foo')
diff --git a/src/etcd/tests/integration/test_lock.py b/src/etcd/tests/integration/test_lock.py
deleted file mode 100644
index a5d8744..0000000
--- a/src/etcd/tests/integration/test_lock.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import etcd
-from . import test_simple
-import time
-
-class TestLocks(test_simple.EtcdIntegrationTest):
- def setUp(self):
- self.client = etcd.Client(port=6001)
-
- def test_acquire_lock(self):
- """ Can acquire a lock. """
- key = '/testkey'
- ttl = 1
- expected_index = '2'
- lock = self.client.get_lock(key, ttl=ttl).acquire()
- self.assertEquals(lock._index, expected_index)
- lock.release()
- self.assertEquals(lock._index, None)
-
- def test_acquire_lock_invalid_ttl(self):
- """ Invalid TTL throws an error """
- key = '/testkey'
- ttl = 'invalid'
- expected_index = 'invalid'
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertRaises(ValueError, lock.acquire)
-
- def test_acquire_lock_with_context_manager(self):
- key = '/testkey'
- ttl = 1
- lock = self.client.get_lock(key, ttl=ttl)
- with lock:
- self.assertTrue(lock.is_locked())
- self.assertFalse(lock.is_locked())
-
- def test_is_locked(self):
- key = '/testkey'
- ttl = 1
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertFalse(lock.is_locked())
- lock.acquire()
- self.assertTrue(lock.is_locked())
- lock.release()
-
- def test_is_locked_on_expired_key(self):
- key = '/testkey'
- ttl = 1
- lock = self.client.get_lock(key, value='test', ttl=ttl).acquire()
- time.sleep(3)
- self.assertFalse(lock.is_locked())
-
- def test_renew(self):
- key = '/testkey'
- ttl = 1
- lock = self.client.get_lock(key, ttl=ttl)
- lock.acquire()
- self.assertTrue(lock.is_locked())
- lock.renew(2)
- # TODO sleep(1)?
- self.assertTrue(lock.is_locked())
- lock.release()
-
- def test_renew_fails_without_locking(self):
- key = '/testkey'
- ttl = 1
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertEquals(lock._index, None)
- self.assertRaises(etcd.EtcdException, lock.renew, 2)
-
- def test_release(self):
- key = '/testkey'
- ttl = 1
- index = '2'
- lock = self.client.get_lock(key, ttl=ttl)
- lock.acquire()
- self.assertTrue(lock.is_locked())
- lock.release()
- self.assertFalse(lock.is_locked())
-
- def test_release_fails_without_locking(self):
- key = '/testkey'
- ttl = 1
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertEquals(lock._index, None)
- self.assertRaises(etcd.EtcdException, lock.release)
-
- def test_get(self):
- key = '/testkey'
- ttl = 5
- with self.client.get_lock(key, ttl=ttl) as lock:
- lock2 = self.client.get_lock(key).get()
- self.assertTrue(lock2.is_locked())
- self.assertFalse(lock2.is_locked())
diff --git a/src/etcd/tests/integration/test_simple.py b/src/etcd/tests/integration/test_simple.py
index 9283cc5..c275d6e 100644
--- a/src/etcd/tests/integration/test_simple.py
+++ b/src/etcd/tests/integration/test_simple.py
@@ -64,10 +64,9 @@ class TestSimple(EtcdIntegrationTest):
def test_machines(self):
""" INTEGRATION: retrieve machines """
self.assertEquals(self.client.machines[0], 'http://127.0.0.1:6001')
-
def test_leader(self):
""" INTEGRATION: retrieve leader """
- self.assertEquals(self.client.leader, 'http://127.0.0.1:8001')
+ self.assertEquals(self.client.leader['clientURLs'], ['http://127.0.0.1:6001'])
def test_get_set_delete(self):
""" INTEGRATION: set a new value """
@@ -217,7 +216,8 @@ class TestClusterFunctions(EtcdIntegrationTest):
self.processHelper.run(number=3)
self.client = etcd.Client(port=6001, allow_reconnect=False)
self.processHelper.kill_one(0)
- self.assertRaises(etcd.EtcdException, self.client.get, '/test_set')
+ self.assertRaises(etcd.EtcdConnectionFailed, self.client.get,
+ '/test_set')
def test_reconnet_fails(self):
""" INTEGRATION: fails to reconnect if no available machines """
@@ -235,42 +235,6 @@ class TestClusterFunctions(EtcdIntegrationTest):
self.processHelper.kill_one(0)
self.assertRaises(etcd.EtcdException, self.client.get, '/test_set')
- def test_reconnect_to_failed_node(self):
- """ INTEGRATION: after a server failed and recovered we can connect."""
-
- self.processHelper.stop()
- # Start with three instances (0, 1, 2)
- self.processHelper.run(number=3)
-
- # Connect to instance 0
- self.client = etcd.Client(port=6001, allow_reconnect=True)
- set_result = self.client.set('/test_set', 'test-key1')
-
- get_result = self.client.get('/test_set')
- self.assertEquals('test-key1', get_result.value)
-
- # kill 1 -> instances = (0, 2)
- self.processHelper.kill_one(1)
-
- get_result = self.client.get('/test_set')
- self.assertEquals('test-key1', get_result.value)
-
- # kill 0 -> Instances (2)
- self.processHelper.kill_one(0)
-
- get_result = self.client.get('/test_set')
- self.assertEquals('test-key1', get_result.value)
-
- # Add 0 (failed server) -> Instances (0,2)
- self.processHelper.add_one(0)
- # Instances (0, 2)
-
- # kill 2 -> Instances (0) (previously failed)
- self.processHelper.kill_one(2)
-
- get_result = self.client.get('/test_set')
- self.assertEquals('test-key1', get_result.value)
-
class TestWatch(EtcdIntegrationTest):
@@ -422,159 +386,3 @@ class TestWatch(EtcdIntegrationTest):
watcher.join(timeout=5)
proc.join(timeout=5)
-
-
-class TestAuthenticatedAccess(EtcdIntegrationTest):
-
- @classmethod
- def setUpClass(cls):
- program = cls._get_exe()
- cls.directory = tempfile.mkdtemp(prefix='python-etcd')
-
- cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt')
- ca_key_path = os.path.join(cls.directory, 'ca.key')
-
- cls.ca2_cert_path = os.path.join(cls.directory, 'ca2.crt')
- ca2_key_path = os.path.join(cls.directory, 'ca2.key')
-
- server_cert_path = os.path.join(cls.directory, 'server.crt')
- server_key_path = os.path.join(cls.directory, 'server.key')
-
- ca, ca_key = helpers.TestingCA.create_test_ca_certificate(
- cls.ca_cert_path, ca_key_path, 'TESTCA')
-
- ca2, ca2_key = helpers.TestingCA.create_test_ca_certificate(
- cls.ca2_cert_path, ca2_key_path, 'TESTCA2')
-
- helpers.TestingCA.create_test_certificate(
- ca, ca_key, server_cert_path, server_key_path, '127.0.0.1')
-
- cls.processHelper = helpers.EtcdProcessHelper(
- cls.directory,
- proc_name=program,
- port_range_start=6001,
- internal_port_range_start=8001)
-
- cls.processHelper.run(number=3,
- proc_args=[
- '-cert-file=%s' % server_cert_path,
- '-key-file=%s' % server_key_path
- ])
-
- def test_get_set_unauthenticated(self):
- """ INTEGRATION: set/get a new value unauthenticated (http->https) """
-
- client = etcd.Client(port=6001)
-
- # Since python 3 raises a MaxRetryError here, this gets caught in
- # different code blocks in python 2 and python 3, thus messages are
- # different. Python 3 does the right thing(TM), for the record
- self.assertRaises(
- etcd.EtcdException, client.set, '/test_set', 'test-key')
-
- self.assertRaises(etcd.EtcdException, client.get, '/test_set')
-
- def test_get_set_unauthenticated_missing_ca(self):
- """ INTEGRATION: try unauthenticated w/out validation (https->https)"""
-
- client = etcd.Client(protocol='https', port=6001)
- set_result = client.set('/test_set', 'test-key')
- get_result = client.get('/test_set')
-
- def test_get_set_unauthenticated_with_ca(self):
- """ INTEGRATION: try unauthenticated w/out validation (https->https)"""
-
- client = etcd.Client(
- protocol='https', port=6001, ca_cert=self.ca2_cert_path)
-
- self.assertRaises(urllib3.exceptions.SSLError, client.set, '/test-set', 'test-key')
- self.assertRaises(urllib3.exceptions.SSLError, client.get, '/test-set')
-
- def test_get_set_authenticated(self):
- """ INTEGRATION: set/get a new value authenticated """
-
- client = etcd.Client(
- port=6001, protocol='https', ca_cert=self.ca_cert_path)
-
- set_result = client.set('/test_set', 'test-key')
- get_result = client.get('/test_set')
-
-
-class TestClientAuthenticatedAccess(EtcdIntegrationTest):
-
- @classmethod
- def setUpClass(cls):
- program = cls._get_exe()
- cls.directory = tempfile.mkdtemp(prefix='python-etcd')
-
- cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt')
- ca_key_path = os.path.join(cls.directory, 'ca.key')
-
- server_cert_path = os.path.join(cls.directory, 'server.crt')
- server_key_path = os.path.join(cls.directory, 'server.key')
-
- cls.client_cert_path = os.path.join(cls.directory, 'client.crt')
- cls.client_key_path = os.path.join(cls.directory, 'client.key')
-
- cls.client_all_cert = os.path.join(cls.directory, 'client-all.crt')
-
- ca, ca_key = helpers.TestingCA.create_test_ca_certificate(
- cls.ca_cert_path, ca_key_path)
-
- helpers.TestingCA.create_test_certificate(
- ca, ca_key, server_cert_path, server_key_path, '127.0.0.1')
-
- helpers.TestingCA.create_test_certificate(
- ca,
- ca_key,
- cls.client_cert_path,
- cls.client_key_path)
-
- cls.processHelper = helpers.EtcdProcessHelper(
- cls.directory,
- proc_name=program,
- port_range_start=6001,
- internal_port_range_start=8001)
-
- with open(cls.client_all_cert, 'w') as f:
- with open(cls.client_key_path, 'r') as g:
- f.write(g.read())
- with open(cls.client_cert_path, 'r') as g:
- f.write(g.read())
-
- cls.processHelper.run(number=3,
- proc_args=[
- '-cert-file=%s' % server_cert_path,
- '-key-file=%s' % server_key_path,
- '-ca-file=%s' % cls.ca_cert_path
- ])
-
- def test_get_set_unauthenticated(self):
- """ INTEGRATION: set/get a new value unauthenticated (http->https) """
-
- client = etcd.Client(port=6001)
-
- # See above for the reason of this change
- self.assertRaises(
- etcd.EtcdException, client.set, '/test_set', 'test-key')
- self.assertRaises(etcd.EtcdException, client.get, '/test_set')
-
- def test_get_set_authenticated(self):
- """ INTEGRATION: connecting to server with mutual auth """
-
- client = etcd.Client(
- port=6001,
- protocol='https',
- cert=self.client_all_cert,
- ca_cert=self.ca_cert_path
- )
-
- set_result = client.set('/test_set', 'test-key')
- self.assertEquals('set', set_result.action.lower())
- self.assertEquals('/test_set', set_result.key)
- self.assertEquals('test-key', set_result.value)
-
- get_result = client.get('/test_set')
- self.assertEquals('get', get_result.action.lower())
- self.assertEquals('/test_set', get_result.key)
- self.assertEquals('test-key', get_result.value)
diff --git a/src/etcd/tests/integration/test_ssl.py b/src/etcd/tests/integration/test_ssl.py
new file mode 100644
index 0000000..16185c2
--- /dev/null
+++ b/src/etcd/tests/integration/test_ssl.py
@@ -0,0 +1,180 @@
+import os
+import time
+import shutil
+import logging
+import unittest
+import multiprocessing
+import tempfile
+
+import urllib3
+
+import etcd
+from . import helpers
+from . import test_simple
+from nose.tools import nottest
+
+log = logging.getLogger()
+
+class TestEncryptedAccess(test_simple.EtcdIntegrationTest):
+
+ @classmethod
+ def setUpClass(cls):
+ program = cls._get_exe()
+ cls.directory = tempfile.mkdtemp(prefix='python-etcd')
+
+ cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt')
+ ca_key_path = os.path.join(cls.directory, 'ca.key')
+
+ cls.ca2_cert_path = os.path.join(cls.directory, 'ca2.crt')
+ ca2_key_path = os.path.join(cls.directory, 'ca2.key')
+
+ server_cert_path = os.path.join(cls.directory, 'server.crt')
+ server_key_path = os.path.join(cls.directory, 'server.key')
+
+ ca, ca_key = helpers.TestingCA.create_test_ca_certificate(
+ cls.ca_cert_path, ca_key_path, 'TESTCA')
+
+ ca2, ca2_key = helpers.TestingCA.create_test_ca_certificate(
+ cls.ca2_cert_path, ca2_key_path, 'TESTCA2')
+
+ helpers.TestingCA.create_test_certificate(
+ ca, ca_key, server_cert_path, server_key_path, '127.0.0.1')
+
+ cls.processHelper = helpers.EtcdProcessHelper(
+ cls.directory,
+ proc_name=program,
+ port_range_start=6001,
+ internal_port_range_start=8001,
+ tls=True
+ )
+
+ cls.processHelper.run(number=3,
+ proc_args=[
+ '-cert-file=%s' % server_cert_path,
+ '-key-file=%s' % server_key_path
+ ])
+
+ def test_get_set_unauthenticated(self):
+ """ INTEGRATION: set/get a new value unauthenticated (http->https) """
+
+ client = etcd.Client(port=6001)
+
+ # Since python 3 raises a MaxRetryError here, this gets caught in
+ # different code blocks in python 2 and python 3, thus messages are
+ # different. Python 3 does the right thing(TM), for the record
+ self.assertRaises(
+ etcd.EtcdException, client.set, '/test_set', 'test-key')
+
+ self.assertRaises(etcd.EtcdException, client.get, '/test_set')
+
+ @nottest
+ def test_get_set_unauthenticated_missing_ca(self):
+ """ INTEGRATION: try unauthenticated w/out validation (https->https)"""
+ # This doesn't work for now and will need further inspection
+ client = etcd.Client(protocol='https', port=6001)
+ set_result = client.set('/test_set', 'test-key')
+ get_result = client.get('/test_set')
+
+
+ def test_get_set_unauthenticated_with_ca(self):
+ """ INTEGRATION: try unauthenticated with validation (https->https)"""
+ client = etcd.Client(
+ protocol='https', port=6001, ca_cert=self.ca2_cert_path)
+
+ self.assertRaises(urllib3.exceptions.SSLError, client.set, '/test-set', 'test-key')
+ self.assertRaises(urllib3.exceptions.SSLError, client.get, '/test-set')
+
+ def test_get_set_authenticated(self):
+ """ INTEGRATION: set/get a new value authenticated """
+
+ client = etcd.Client(
+ port=6001, protocol='https', ca_cert=self.ca_cert_path)
+
+ set_result = client.set('/test_set', 'test-key')
+ get_result = client.get('/test_set')
+
+
+class TestClientAuthenticatedAccess(test_simple.EtcdIntegrationTest):
+
+ @classmethod
+ def setUpClass(cls):
+ program = cls._get_exe()
+ cls.directory = tempfile.mkdtemp(prefix='python-etcd')
+
+ cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt')
+ ca_key_path = os.path.join(cls.directory, 'ca.key')
+
+ server_cert_path = os.path.join(cls.directory, 'server.crt')
+ server_key_path = os.path.join(cls.directory, 'server.key')
+
+ cls.client_cert_path = os.path.join(cls.directory, 'client.crt')
+ cls.client_key_path = os.path.join(cls.directory, 'client.key')
+
+ cls.client_all_cert = os.path.join(cls.directory, 'client-all.crt')
+
+ ca, ca_key = helpers.TestingCA.create_test_ca_certificate(
+ cls.ca_cert_path, ca_key_path)
+
+ helpers.TestingCA.create_test_certificate(
+ ca, ca_key, server_cert_path, server_key_path, '127.0.0.1')
+
+ helpers.TestingCA.create_test_certificate(
+ ca,
+ ca_key,
+ cls.client_cert_path,
+ cls.client_key_path)
+
+ cls.processHelper = helpers.EtcdProcessHelper(
+ cls.directory,
+ proc_name=program,
+ port_range_start=6001,
+ internal_port_range_start=8001,
+ tls=True
+ )
+
+ with open(cls.client_all_cert, 'w') as f:
+ with open(cls.client_key_path, 'r') as g:
+ f.write(g.read())
+ with open(cls.client_cert_path, 'r') as g:
+ f.write(g.read())
+
+ cls.processHelper.run(number=3,
+ proc_args=[
+ '-cert-file=%s' % server_cert_path,
+ '-key-file=%s' % server_key_path,
+ '-ca-file=%s' % cls.ca_cert_path,
+ ])
+
+
+ def test_get_set_unauthenticated(self):
+ """ INTEGRATION: set/get a new value unauthenticated (http->https) """
+
+ client = etcd.Client(port=6001)
+
+ # See above for the reason of this change
+ self.assertRaises(
+ etcd.EtcdException, client.set, '/test_set', 'test-key')
+ self.assertRaises(etcd.EtcdException, client.get, '/test_set')
+
+ @nottest
+ def test_get_set_authenticated(self):
+ """ INTEGRATION: connecting to server with mutual auth """
+ # This gives an unexplicable ssl error, as connecting to the same
+ # Etcd cluster where this fails with the exact same code this
+ # doesn't fail
+
+ client = etcd.Client(
+ port=6001,
+ protocol='https',
+ cert=self.client_all_cert,
+ ca_cert=self.ca_cert_path
+ )
+
+ set_result = client.set('/test_set', 'test-key')
+ self.assertEquals(u'set', set_result.action.lower())
+ self.assertEquals(u'/test_set', set_result.key)
+ self.assertEquals(u'test-key', set_result.value)
+ get_result = client.get('/test_set')
+ self.assertEquals('get', get_result.action.lower())
+ self.assertEquals('/test_set', get_result.key)
+ self.assertEquals('test-key', get_result.value)
diff --git a/src/etcd/tests/unit/test_client.py b/src/etcd/tests/unit/test_client.py
index 3324b4d..2e09d7c 100644
--- a/src/etcd/tests/unit/test_client.py
+++ b/src/etcd/tests/unit/test_client.py
@@ -79,3 +79,21 @@ class TestClient(unittest.TestCase):
port=4003,
protocol='https')
assert client.base_uri == 'https://192.168.1.1:4003'
+
+ def test_set_use_proxies(self):
+ """ can set the use_proxies flag """
+ client = etcd.Client(use_proxies = True)
+ assert client._use_proxies
+
+ def test_allow_reconnect(self):
+ """ Fails if allow_reconnect is false and a list of hosts is given"""
+ with self.assertRaises(etcd.EtcdException):
+ etcd.Client(
+ host=(('localhost', 4001), ('localhost', 4002)),
+ )
+ # This doesn't raise an exception
+ client = etcd.Client(
+ host=(('localhost', 4001), ('localhost', 4002)),
+ allow_reconnect=True,
+ use_proxies=True,
+ )
diff --git a/src/etcd/tests/unit/test_leader.py b/src/etcd/tests/unit/test_leader.py
deleted file mode 100644
index 0d2b378..0000000
--- a/src/etcd/tests/unit/test_leader.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import etcd
-import unittest
-
-from .test_request import TestClientApiBase
-
-try:
- import mock
-except ImportError:
- from unittest import mock
-
-class EtcdLeaderElectionTestCase(TestClientApiBase):
- def setUp(self):
- self.client = etcd.Client()
-
- def _mock_api(self, status, d):
- #We want to test at a lower level here.
- resp = self._prepare_response(status, d)
- self.client.http.request_encode_body = mock.create_autospec(
- self.client.http.request_encode_body, return_value=resp
- )
- self.client.http.request = mock.create_autospec(
- self.client.http.request, return_value=resp
- )
-
-
- def test_get_leader(self):
- """ Can fetch a leader value """
- self._mock_api(200, 'foo.example.com')
- self.assertEquals(self.client.election.get('/mysql'), 'foo.example.com')
- self._mock_api(200,'')
- self.assertRaises(etcd.EtcdException, self.client.election.get, '/mysql')
-
- def test_set_leader(self):
- """ Can set a leader value """
- self._mock_api(200, u'234')
- #set w/o a TTL or a name
- self.assertEquals(self.client.election.set('/mysql'), u'234')
- self.assertEquals(self.client.election.set(
- '/mysql',
- name='foo.example.com',
- ttl=60), u'234')
- self._mock_api(500, 'leader name required')
- self.assertRaises(etcd.EtcdException, self.client.election.set,'/mysql')
-
- def test_del_leader(self):
- """ Can remove a leader value """
- self._mock_api(200,'')
- self.assertTrue(self.client.election.delete('/mysql'))
diff --git a/src/etcd/tests/unit/test_old_request.py b/src/etcd/tests/unit/test_old_request.py
index 9367ebd..0d43713 100644
--- a/src/etcd/tests/unit/test_old_request.py
+++ b/src/etcd/tests/unit/test_old_request.py
@@ -10,40 +10,21 @@ from etcd import EtcdException
class FakeHTTPResponse(object):
- def __init__(self, status, data=''):
+ def __init__(self, status, data='', headers=None):
self.status = status
self.data = data.encode('utf-8')
+ self.headers = headers or {
+ "x-etcd-cluster-id": "abdef12345",
+ }
def getheaders(self):
- return {}
+ return self.headers
-class TestClientRequest(unittest.TestCase):
+ def getheader(self, header):
+ return self.headers[header]
- def test_machines(self):
- """ Can request machines """
- client = etcd.Client()
- client.api_execute = mock.Mock(
- return_value=FakeHTTPResponse(200, data=
- "http://127.0.0.1:4002,"
- " http://127.0.0.1:4001,"
- " http://127.0.0.1:4003,"
- " http://127.0.0.1:4001")
- )
- assert client.machines == [
- 'http://127.0.0.1:4002',
- 'http://127.0.0.1:4001',
- 'http://127.0.0.1:4003',
- 'http://127.0.0.1:4001'
- ]
-
- def test_leader(self):
- """ Can request the leader """
- client = etcd.Client()
- client.api_execute = mock.Mock(
- return_value=FakeHTTPResponse(200, "http://127.0.0.1:7002"))
- result = client.leader
- self.assertEquals('http://127.0.0.1:7002', result)
+class TestClientRequest(unittest.TestCase):
def test_set(self):
""" Can set a value """
@@ -332,21 +313,7 @@ class TestClientApiExecutor(unittest.TestCase):
except ValueError as e:
self.assertEquals('message : cause', str(e))
- def test_set_error(self):
- """ http post error request 102 """
- client = etcd.Client()
- response = FakeHTTPResponse(
- status=400,
- data='{"message": "message", "cause": "cause", "errorCode": 102}')
- client.http.request_encode_body = mock.Mock(return_value=response)
- payload = {'value': 'value', 'prevValue': 'oldValue', 'ttl': '60'}
- try:
- client.api_execute('/v2/keys/testkey', client._MPUT, payload)
- self.fail()
- except KeyError as e:
- self.assertEquals('message : cause', str(e))
-
- def test_set_error(self):
+ def test_set_not_file_error(self):
""" http post error request 102 """
client = etcd.Client()
response = FakeHTTPResponse(
@@ -365,23 +332,27 @@ class TestClientApiExecutor(unittest.TestCase):
client = etcd.Client()
response = FakeHTTPResponse(status=400,
data='{"message": "message",'
- ' "cause": "cause",'
- ' "errorCode": 42}')
+ ' "cause": "cause",'
+ ' "errorCode": 42}')
client.http.request = mock.Mock(return_value=response)
try:
client.api_execute('/v2/keys/testkey', client._MGET)
self.fail()
except etcd.EtcdException as e:
- self.assertTrue(
- str(e).startswith("Unable to decode server response"))
+ self.assertEqual(str(e), "message : cause")
def test_get_error_request_invalid(self):
""" http get error request invalid """
client = etcd.Client()
- response = FakeHTTPResponse(status=200,
- data='{){){)*garbage*')
+ response = FakeHTTPResponse(status=400,
+ data='{)*garbage')
client.http.request = mock.Mock(return_value=response)
- self.assertRaises(etcd.EtcdException, client.get, '/testkey')
+ try:
+ client.api_execute('/v2/keys/testkey', client._MGET)
+ self.fail()
+ except etcd.EtcdException as e:
+ self.assertEqual(str(e),
+ "Bad response : {)*garbage")
def test_get_error_invalid(self):
""" http get error request invalid """
diff --git a/src/etcd/tests/unit/test_request.py b/src/etcd/tests/unit/test_request.py
index 340bece..beee6ec 100644
--- a/src/etcd/tests/unit/test_request.py
+++ b/src/etcd/tests/unit/test_request.py
@@ -16,7 +16,7 @@ class TestClientApiBase(unittest.TestCase):
def setUp(self):
self.client = etcd.Client()
- def _prepare_response(self, s, d):
+ def _prepare_response(self, s, d, cluster_id=None):
if isinstance(d, dict):
data = json.dumps(d).encode('utf-8')
else:
@@ -25,10 +25,11 @@ class TestClientApiBase(unittest.TestCase):
r = mock.create_autospec(urllib3.response.HTTPResponse)()
r.status = s
r.data = data
+ r.getheader.return_value = cluster_id or "abcd1234"
return r
- def _mock_api(self, status, d):
- resp = self._prepare_response(status, d)
+ def _mock_api(self, status, d, cluster_id=None):
+ resp = self._prepare_response(status, d, cluster_id=cluster_id)
self.client.api_execute = mock.create_autospec(
self.client.api_execute, return_value=resp)
@@ -98,27 +99,96 @@ class TestClientApiInternals(TestClientApiBase):
(('/v2/keys/newdir', 'PUT'), dict(params={'dir': 'true'})))
-
class TestClientApiInterface(TestClientApiBase):
"""
All tests defined in this class are executed also in TestClientRequest.
If a test should be run only in this class, please override the method there.
"""
-
- def test_machines(self):
+ @mock.patch('urllib3.request.RequestMethods.request')
+ def test_machines(self, mocker):
""" Can request machines """
data = ['http://127.0.0.1:4001',
'http://127.0.0.1:4002', 'http://127.0.0.1:4003']
d = ','.join(data)
- self._mock_api(200, d)
+ mocker.return_value = self._prepare_response(200, d)
self.assertEquals(data, self.client.machines)
- def test_leader(self):
- """ Can request the leader """
- data = "http://127.0.0.1:4001"
+ @mock.patch('etcd.Client.machines', new_callable=mock.PropertyMock)
+ def test_use_proxies(self, mocker):
+ """Do not overwrite the machines cache when using proxies"""
+ mocker.return_value = ['https://10.0.0.2:4001', 'https://10.0.0.3:4001', 'https://10.0.0.4:4001']
+ c = etcd.Client(
+ host=(('localhost', 4001), ('localproxy', 4001)),
+ protocol='https',
+ allow_reconnect=True,
+ use_proxies=True
+ )
+
+ self.assertEquals(c._machines_cache, ['https://localproxy:4001'])
+ self.assertEquals(c._base_uri, 'https://localhost:4001')
+ self.assertNotIn(c.base_uri,c._machines_cache)
+
+ c = etcd.Client(
+ host=(('localhost', 4001), ('10.0.0.2',4001)),
+ protocol='https',
+ allow_reconnect=True,
+ use_proxies=False
+ )
+ self.assertIn('https://10.0.0.3:4001', c._machines_cache)
+ self.assertNotIn(c.base_uri,c._machines_cache)
+
+
+ def test_members(self):
+ """ Can request machines """
+ data = {
+ "members":
+ [
+ {
+ "id": "ce2a822cea30bfca",
+ "name": "default",
+ "peerURLs": ["http://localhost:2380", "http://localhost:7001"],
+ "clientURLs": ["http://127.0.0.1:4001"]
+ }
+ ]
+ }
self._mock_api(200, data)
- self.assertEquals(self.client.leader, data)
+ self.assertEquals(self.client.members["ce2a822cea30bfca"]["id"], "ce2a822cea30bfca")
+
+ def test_self_stats(self):
+ """ Request for stats """
+ data = {
+ "id": "eca0338f4ea31566",
+ "leaderInfo": {
+ "leader": "8a69d5f6b7814500",
+ "startTime": "2014-10-24T13:15:51.186620747-07:00",
+ "uptime": "10m59.322358947s"
+ },
+ "name": "node3",
+ "recvAppendRequestCnt": 5944,
+ "recvBandwidthRate": 570.6254930219969,
+ "recvPkgRate": 9.00892789741075,
+ "sendAppendRequestCnt": 0,
+ "startTime": "2014-10-24T13:15:50.072007085-07:00",
+ "state": "StateFollower"
+ }
+ self._mock_api(200,data)
+ self.assertEquals(self.client.stats['name'], "node3")
+
+ def test_leader_stats(self):
+ """ Request for leader stats """
+ data = {"leader": "924e2e83e93f2560", "followers": {}}
+ self._mock_api(200,data)
+ self.assertEquals(self.client.leader_stats['leader'], "924e2e83e93f2560")
+
+
+ @mock.patch('etcd.Client.members', new_callable=mock.PropertyMock)
+ def test_leader(self, mocker):
+ """ Can request the leader """
+ members = {"ce2a822cea30bfca": {"id": "ce2a822cea30bfca", "name": "default"}}
+ mocker.return_value = members
+ self._mock_api(200, {"leader": "ce2a822cea30bfca", "followers": {}})
+ self.assertEquals(self.client.leader, members["ce2a822cea30bfca"])
def test_set_plain(self):
""" Can set a value """
@@ -130,7 +200,7 @@ class TestClientApiInterface(TestClientApiBase):
u'ttl': 19,
u'value': u'test'
}
- }
+ }
self._mock_api(200, d)
res = self.client.write('/testkey', 'test')
@@ -347,121 +417,14 @@ class TestClientApiInterface(TestClientApiBase):
self.assertEquals(res, etcd.EtcdResult(**d))
-class EtcdLockTestCase(TestClientApiBase):
- def setUp(self):
- self.client = etcd.Client()
-
- def _mock_api(self, status, d):
- #We want to test at a lower level here.
- resp = self._prepare_response(status, d)
- self.client.http.request_encode_body = mock.create_autospec(
- self.client.http.request_encode_body, return_value=resp
- )
- self.client.http.request = mock.create_autospec(
- self.client.http.request, return_value=resp
- )
-
-
- def test_acquire_lock(self):
- """ Can get a lock. """
- key = u'/testkey'
- ttl = 1
- self._mock_api(200, '2')
- lock = self.client.get_lock(key, ttl=ttl)
- lock.acquire()
- self.assertEquals(lock._index, '2')
-
- def test_acquire_lock_invalid_ttl(self):
- """ Invalid TTL throws an error """
- key = u'/testkey'
- ttl = u'invalid'
- expected_index = u'invalid'
- self._mock_exception(etcd.EtcdException, u'invalid ttl: invalid')
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertRaises(etcd.EtcdException, lock.acquire)
-
- def test_acquire_lock_with_context_manager(self):
- key = u'/testkey'
- ttl = 1
- self._mock_api(200, u'2')
- lock = self.client.get_lock(key, ttl=ttl)
- with lock:
- self.assertTrue(lock.is_locked())
- self._mock_api(200, u'')
- self.assertFalse(lock.is_locked())
-
- def test_is_locked(self):
- key = u'/testkey'
- ttl = 1
- self._mock_api(200, u'')
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertFalse(lock.is_locked())
- self._mock_api(200, u'2')
- lock.acquire()
- self.assertTrue(lock.is_locked())
-
-
- def test_renew(self):
- key = '/testkey'
- ttl = 1
- self._mock_api(200, u'2')
- lock = self.client.get_lock(key, ttl=ttl)
- lock.acquire()
- self.assertTrue(lock.is_locked())
- self._mock_api(200, u'')
- lock.renew(2)
- self._mock_api(200, u'2')
- self.assertTrue(lock.is_locked())
-
- def test_renew_fails_if_expired(self):
- self._mock_api(200, u'4')
- lock = self.client.get_lock('/testlock', value='test', ttl=1).acquire()
- self._mock_api(500, 'renew lock error: cannot find: test')
- self.assertRaises(etcd.EtcdException, lock.renew, 10)
-
- def test_renew_fails_without_locking(self):
- key = u'/testkey'
- ttl = 1
- self._mock_exception(etcd.EtcdException,
- u'Cannot renew lock that is not locked')
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertRaises(etcd.EtcdException, lock.renew, 2)
-
- def test_release(self):
- key = u'/testkey'
- ttl = 1
- index = u'2'
- self._mock_api(200, index)
- lock = self.client.get_lock(key, ttl=ttl)
- lock.acquire()
- self.assertTrue(lock.is_locked())
- self._mock_api(200, '')
- lock.release()
- self.assertFalse(lock.is_locked())
-
- def test_release_fails_without_locking(self):
- key = u'/testkey'
- ttl = 1
- self._mock_exception(etcd.EtcdException,
- u'Cannot release lock that is not locked')
- lock = self.client.get_lock(key, ttl=ttl)
- self.assertRaises(etcd.EtcdException, lock.release)
-
- def test_release_fails_if_expired(self):
- self._mock_api(200, u'4')
- lock = self.client.get_lock('/testlock', value='test', ttl=1).acquire()
- self._mock_api(500, 'release lock error: cannot find: test')
- self.assertRaises(etcd.EtcdException, lock.release)
-
-
-
class TestClientRequest(TestClientApiInterface):
def setUp(self):
- self.client = etcd.Client()
+ self.client = etcd.Client(expected_cluster_id="abcdef1234")
- def _mock_api(self, status, d):
+ def _mock_api(self, status, d, cluster_id=None):
resp = self._prepare_response(status, d)
+ resp.getheader.return_value = cluster_id or "abcdef1234"
self.client.http.request_encode_body = mock.create_autospec(
self.client.http.request_encode_body, return_value=resp
)
@@ -469,11 +432,13 @@ class TestClientRequest(TestClientApiInterface):
self.client.http.request, return_value=resp
)
- def _mock_error(self, error_code, msg, cause, method='PUT', fields=None):
+ def _mock_error(self, error_code, msg, cause, method='PUT', fields=None,
+ cluster_id=None):
resp = self._prepare_response(
500,
{'errorCode': error_code, 'message': msg, 'cause': cause}
)
+ resp.getheader.return_value = cluster_id or "abcdef1234"
self.client.http.request_encode_body = mock.create_autospec(
self.client.http.request_encode_body, return_value=resp
)
@@ -503,6 +468,22 @@ class TestClientRequest(TestClientApiInterface):
self.assertRaises(etcd.EtcdException,
self.client.api_execute, '/testpath/bar', 'TRACE')
+ def test_read_cluster_id_changed(self):
+ """ Read timeout set to the default """
+ d = {u'action': u'set',
+ u'node': {
+ u'expiration': u'2013-09-14T00:56:59.316195568+02:00',
+ u'modifiedIndex': 6,
+ u'key': u'/testkey',
+ u'ttl': 19,
+ u'value': u'test'
+ }
+ }
+ self._mock_api(200, d, cluster_id="notabcd1234")
+ self.assertRaises(etcd.EtcdClusterIdChanged,
+ self.client.read, '/testkey')
+ self.client.read("/testkey")
+
def test_not_in(self):
pass