diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/etcd/__init__.py | 143 | ||||
-rw-r--r-- | src/etcd/client.py | 275 | ||||
-rw-r--r-- | src/etcd/election.py | 79 | ||||
-rw-r--r-- | src/etcd/lock.py | 106 | ||||
-rw-r--r-- | src/etcd/tests/integration/helpers.py | 36 | ||||
-rw-r--r-- | src/etcd/tests/integration/test_election.py | 34 | ||||
-rw-r--r-- | src/etcd/tests/integration/test_lock.py | 92 | ||||
-rw-r--r-- | src/etcd/tests/integration/test_simple.py | 198 | ||||
-rw-r--r-- | src/etcd/tests/integration/test_ssl.py | 180 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_client.py | 18 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_leader.py | 48 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_old_request.py | 69 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_request.py | 227 |
13 files changed, 689 insertions, 816 deletions
diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py index 18ef036..370be42 100644 --- a/src/etcd/__init__.py +++ b/src/etcd/__init__.py @@ -1,7 +1,18 @@ -import collections +import logging from .client import Client -from .lock import Lock -from .election import LeaderElection + +_log = logging.getLogger(__name__) + +# Prevent "no handler" warnings to stderr in projects that do not configure +# logging. +try: + from logging import NullHandler +except ImportError: + # Python <2.7, just define it. + class NullHandler(logging.Handler): + def emit(self, record): + pass +_log.addHandler(NullHandler()) class EtcdResult(object): @@ -109,7 +120,30 @@ class EtcdException(Exception): """ def __init__(self, message=None, payload=None): super(Exception, self).__init__(message) - self.payload=payload + self.payload = payload + + +class EtcdValueError(EtcdException, ValueError): + """ + Base class for Etcd value-related errors. + """ + pass + + +class EtcdCompareFailed(EtcdValueError): + """ + Compare-and-swap failure + """ + pass + + +class EtcdClusterIdChanged(EtcdException): + """ + The etcd cluster ID changed. This may indicate the cluster was replaced + with a backup. Raised to prevent waiting on an etcd_index that was only + valid on the old cluster. + """ + pass class EtcdKeyError(EtcdException): @@ -118,72 +152,123 @@ class EtcdKeyError(EtcdException): """ pass + class EtcdKeyNotFound(EtcdKeyError): """ Etcd key not found exception (100) """ pass + class EtcdNotFile(EtcdKeyError): """ Etcd not a file exception (102) """ pass + class EtcdNotDir(EtcdKeyError): """ Etcd not a directory exception (104) """ pass + class EtcdAlreadyExist(EtcdKeyError): """ Etcd already exist exception (105) """ pass + class EtcdEventIndexCleared(EtcdException): """ Etcd event index is outdated and cleared exception (401) """ pass + +class EtcdConnectionFailed(EtcdException): + """ + Connection to etcd failed. + """ + pass + + +class EtcdWatcherCleared(EtcdException): + """ + Watcher is cleared due to etcd recovery. + """ + pass + + +class EtcdLeaderElectionInProgress(EtcdException): + """ + Request failed due to in-progress leader election. + """ + pass + + +class EtcdRootReadOnly(EtcdKeyError): + """ + Operation is not valid on the root, which is read only. + """ + pass + + +class EtcdDirNotEmpty(EtcdValueError): + """ + Directory not empty. + """ + pass + + class EtcdError(object): # See https://github.com/coreos/etcd/blob/master/Documentation/errorcode.md error_exceptions = { 100: EtcdKeyNotFound, - 101: ValueError, + 101: EtcdCompareFailed, 102: EtcdNotFile, - 103: Exception, + # 103: Non-public: no more peers. 104: EtcdNotDir, 105: EtcdAlreadyExist, - 106: KeyError, - 200: ValueError, - 201: ValueError, - 202: ValueError, - 203: ValueError, - 209: ValueError, - 300: Exception, - 301: Exception, - 400: Exception, + # 106: Non-public: key is preserved. + 107: EtcdRootReadOnly, + 108: EtcdDirNotEmpty, + # 109: Non-public: existing peer addr. + + 200: EtcdValueError, + 201: EtcdValueError, + 202: EtcdValueError, + 203: EtcdValueError, + 204: EtcdValueError, + 205: EtcdValueError, + 206: EtcdValueError, + 207: EtcdValueError, + 208: EtcdValueError, + 209: EtcdValueError, + 210: EtcdValueError, + + # 300: Non-public: Raft internal error. + 301: EtcdLeaderElectionInProgress, + + 400: EtcdWatcherCleared, 401: EtcdEventIndexCleared, - 500: EtcdException } @classmethod - def handle(cls, errorCode=None, message=None, cause=None, **kwdargs): - """ Decodes the error and throws the appropriate error message""" - try: - msg = '{} : {}'.format(message, cause) - payload={'errorCode': errorCode, 'message': message, 'cause': cause} - if len(kwdargs) > 0: - for key in kwdargs: - payload[key]=kwdargs[key] - exc = cls.error_exceptions[errorCode] - except: - msg = "Unable to decode server response" - exc = EtcdException - if exc in [EtcdException, EtcdKeyNotFound, EtcdNotFile, EtcdNotDir, EtcdAlreadyExist, EtcdEventIndexCleared]: + def handle(cls, payload): + """ + Decodes the error and throws the appropriate error message + + :param payload: The decoded JSON error payload as a dict. + """ + error_code = payload.get("errorCode") + message = payload.get("message") + cause = payload.get("cause") + msg = '{} : {}'.format(message, cause) + exc = cls.error_exceptions.get(error_code, EtcdException) + if issubclass(exc, EtcdException): raise exc(msg, payload) else: raise exc(msg) diff --git a/src/etcd/client.py b/src/etcd/client.py index 4bd746f..a578add 100644 --- a/src/etcd/client.py +++ b/src/etcd/client.py @@ -6,12 +6,28 @@ """ +import logging +try: + # Python 3 + from http.client import HTTPException +except ImportError: + # Python 2 + from httplib import HTTPException +import socket import urllib3 +import urllib3.util import json import ssl - import etcd +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + + +_log = logging.getLogger(__name__) + class Client(object): @@ -24,7 +40,7 @@ class Client(object): _MPOST = 'POST' _MDELETE = 'DELETE' _comparison_conditions = set(('prevValue', 'prevIndex', 'prevExist')) - _read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'consistent')) + _read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'quorum')) _del_conditions = set(('prevValue', 'prevIndex')) def __init__( self, @@ -37,6 +53,8 @@ class Client(object): cert=None, ca_cert=None, allow_reconnect=False, + use_proxies=False, + expected_cluster_id=None, ): """ Initialize the client. @@ -66,28 +84,39 @@ class Client(object): etcd server in the cluster in the case the default one does not respond. - """ - self._machines_cache = [] + use_proxies (bool): we are using a list of proxies to which we connect, + and don't want to connect to the original etcd cluster. + expected_cluster_id (str): If a string, recorded as the expected + UUID of the cluster (rather than + learning it from the first request), + reads will raise EtcdClusterIdChanged + if they receive a response with a + different cluster ID. + """ + _log.info("New etcd client created for %s:%s%s", + host, port, version_prefix) self._protocol = protocol def uri(protocol, host, port): return '%s://%s:%d' % (protocol, host, port) if not isinstance(host, tuple): - self._host = host - self._port = port + self._machines_cache = [] + self._base_uri = uri(self._protocol, host, port) else: - self._host, self._port = host[0] - self._machines_cache.extend( - [uri(self._protocol, *conn) for conn in host]) - - self._base_uri = uri(self._protocol, self._host, self._port) + if not allow_reconnect: + _log.error("List of hosts incompatible with allow_reconnect.") + raise etcd.EtcdException("A list of hosts to connect to was given, but reconnection not allowed?") + self._machines_cache = [uri(self._protocol, *conn) for conn in host] + self._base_uri = self._machines_cache.pop(0) + self.expected_cluster_id = expected_cluster_id self.version_prefix = version_prefix self._read_timeout = read_timeout self._allow_redirect = allow_redirect + self._use_proxies = use_proxies self._allow_reconnect = allow_reconnect # SSL Client certificate support @@ -100,6 +129,7 @@ class Client(object): if protocol == 'https': # If we don't allow TLSv1, clients using older version of OpenSSL # (<1.0) won't be able to connect. + _log.debug("HTTPS enabled.") kw['ssl_version'] = ssl.PROTOCOL_TLSv1 if cert: @@ -119,11 +149,24 @@ class Client(object): if self._allow_reconnect: # we need the set of servers in the cluster in order to try - # reconnecting upon error. - self._machines_cache = self.machines - self._machines_cache.remove(self._base_uri) - else: - self._machines_cache = [] + # reconnecting upon error. The cluster members will be + # added to the hosts list you provided. If you are using + # proxies, set all + # + # Beware though: if you input '127.0.0.1' as your host and + # etcd advertises 'localhost', both will be in the + # resulting list. + + # If we're connecting to the original cluster, we can + # extend the list given to the client with what we get + # from self.machines + if not self._use_proxies: + self._machines_cache = list(set(self._machines_cache) | + set(self.machines)) + if self._base_uri in self._machines_cache: + self._machines_cache.remove(self._base_uri) + _log.debug("Machines cache initialised to %s", + self._machines_cache) @property def base_uri(self): @@ -133,12 +176,12 @@ class Client(object): @property def host(self): """Node to connect etcd.""" - return self._host + return urlparse(self._base_uri).netloc.split(':')[0] @property def port(self): """Port to connect etcd.""" - return self._port + return int(urlparse(self._base_uri).netloc.split(':')[1]) @property def protocol(self): @@ -166,24 +209,107 @@ class Client(object): >>> print client.machines ['http://127.0.0.1:4001', 'http://127.0.0.1:4002'] """ - return [ - node.strip() for node in self.api_execute( - self.version_prefix + '/machines', - self._MGET).data.decode('utf-8').split(',') - ] + # We can't use api_execute here, or it causes a logical loop + try: + uri = self._base_uri + self.version_prefix + '/machines' + response = self.http.request( + self._MGET, + uri, + timeout=self.read_timeout, + redirect=self.allow_redirect) + + machines = [ + node.strip() for node in + self._handle_server_response(response).data.decode('utf-8').split(',') + ] + _log.debug("Retrieved list of machines: %s", machines) + return machines + except (urllib3.exceptions.HTTPError, + HTTPException, + socket.error) as e: + # We can't get the list of machines, if one server is in the + # machines cache, try on it + _log.error("Failed to get list of machines from %s%s: %r", + self._base_uri, self.version_prefix, e) + if self._machines_cache: + self._base_uri = self._machines_cache.pop(0) + _log.info("Retrying on %s", self._base_uri) + # Call myself + return self.machines + else: + raise etcd.EtcdException("Could not get the list of servers, " + "maybe you provided the wrong " + "host(s) to connect to?") + + @property + def members(self): + """ + A more structured view of peers in the cluster. + + Note that while we have an internal DS called _members, accessing the public property will call etcd. + """ + # Empty the members list + self._members = {} + try: + data = self.api_execute(self.version_prefix + '/members', + self._MGET).data.decode('utf-8') + res = json.loads(data) + for member in res['members']: + self._members[member['id']] = member + return self._members + except: + raise etcd.EtcdException("Could not get the members list, maybe the cluster has gone away?") @property def leader(self): """ Returns: - str. the leader of the cluster. + dict. the leader of the cluster. >>> print client.leader - 'http://127.0.0.1:4001' + {"id":"ce2a822cea30bfca","name":"default","peerURLs":["http://localhost:2380","http://localhost:7001"],"clientURLs":["http://127.0.0.1:4001"]} """ - return self.api_execute( - self.version_prefix + '/leader', - self._MGET).data.decode('ascii') + try: + + leader = json.loads( + self.api_execute(self.version_prefix + '/stats/leader', + self._MGET).data.decode('utf-8')) + return self.members[leader['leader']] + except Exception as e: + raise etcd.EtcdException("Cannot get leader data: %s" % e) + + @property + def stats(self): + """ + Returns: + dict. the stats of the local server + """ + return self._stats() + + @property + def leader_stats(self): + """ + Returns: + dict. the stats of the leader + """ + return self._stats('leader') + + @property + def store_stats(self): + """ + Returns: + dict. the stats of the kv store + """ + return self._stats('store') + + def _stats(self, what='self'): + """ Internal method to access the stats endpoints""" + data = self.api_execute(self.version_prefix + + '/stats/' + what, self._MGET).data.decode('utf-8') + try: + return json.loads(data) + except (TypeError,ValueError): + raise etcd.EtcdException("Cannot parse json data in the response") @property def key_endpoint(self): @@ -242,6 +368,8 @@ class Client(object): 'newValue' """ + _log.info("Writing %s to key %s ttl=%s dir=%s append=%s", + value, key, ttl, dir, append) key = self._sanitize_key(key) params = {} if value is not None: @@ -285,6 +413,7 @@ class Client(object): obj (etcd.EtcdResult): The object that needs updating. """ + _log.info("Updating %s to %s.", obj.key, obj.value) kwdargs = { 'dir': obj.dir, 'ttl': obj.ttl, @@ -294,11 +423,8 @@ class Client(object): if not obj.dir: # prevIndex on a dir causes a 'not a file' error. d'oh! kwdargs['prevIndex'] = obj.modifiedIndex - return self.write(obj.key, obj.value, **kwdargs) - - def read(self, key, **kwdargs): """ Returns the value of the key 'key'. @@ -331,6 +457,7 @@ class Client(object): 'value' """ + _log.info("Issuing read for key %s with args %s", key, kwdargs) key = self._sanitize_key(key) params = {} @@ -344,7 +471,8 @@ class Client(object): timeout = kwdargs.get('timeout', None) response = self.api_execute( - self.key_endpoint + key, self._MGET, params=params, timeout=timeout) + self.key_endpoint + key, self._MGET, params=params, + timeout=timeout) return self._result_from_response(response) def delete(self, key, recursive=None, dir=None, **kwdargs): @@ -376,6 +504,8 @@ class Client(object): '/key' """ + _log.info("Deleting %s recursive=%s dir=%s extra args=%s", + key, recursive, dir, kwdargs) key = self._sanitize_key(key) kwds = {} @@ -387,6 +517,7 @@ class Client(object): for k in self._del_conditions: if k in kwdargs: kwds[k] = kwdargs[k] + _log.debug("Calculated params = %s", kwds) response = self.api_execute( self.key_endpoint + key, self._MDELETE, params=kwds) @@ -509,6 +640,7 @@ class Client(object): 'value' """ + _log.debug("About to wait on key %s, index %s", key, index) if index: return self.read(key, wait=True, waitIndex=index, timeout=timeout, recursive=recursive) @@ -538,16 +670,15 @@ class Client(object): local_index = index while True: response = self.watch(key, index=local_index, timeout=0, recursive=recursive) - if local_index is not None: - local_index += 1 + local_index = response.modifiedIndex + 1 yield response def get_lock(self, *args, **kwargs): - return etcd.Lock(self, *args, **kwargs) + raise NotImplementedError('Lock primitives were removed from etcd 2.0') @property def election(self): - return etcd.LeaderElection(self) + raise NotImplementedError('Election primitives were removed from etcd 2.0') def _result_from_response(self, response): """ Creates an EtcdResult from json dictionary """ @@ -564,10 +695,16 @@ class Client(object): def _next_server(self): """ Selects the next server in the list, refreshes the server list. """ + _log.debug("Selection next machine in cache. Available machines: %s", + self._machines_cache) try: - return self._machines_cache.pop() + mach = self._machines_cache.pop() except IndexError: - raise etcd.EtcdException('No more machines in the cluster') + _log.error("Machines cache is empty, no machines to try.") + raise etcd.EtcdConnectionFailed('No more machines in the cluster') + else: + _log.info("Selected new etcd server %s", mach) + return mach def api_execute(self, path, method, params=None, timeout=None): """ Executes the query. """ @@ -594,7 +731,8 @@ class Client(object): url, timeout=timeout, fields=params, - redirect=self.allow_redirect) + redirect=self.allow_redirect, + preload_content=False) elif (method == self._MPUT) or (method == self._MPOST): response = self.http.request_encode_body( @@ -603,17 +741,55 @@ class Client(object): fields=params, timeout=timeout, encode_multipart=False, - redirect=self.allow_redirect) + redirect=self.allow_redirect, + preload_content=False) else: raise etcd.EtcdException( 'HTTP method {} not supported'.format(method)) - except urllib3.exceptions.MaxRetryError: - self._base_uri = self._next_server() - some_request_failed = True + # urllib3 doesn't wrap all httplib exceptions and earlier versions + # don't wrap socket errors either. + except (urllib3.exceptions.HTTPError, + HTTPException, + socket.error) as e: + _log.error("Request to server %s failed: %r", + self._base_uri, e) + if self._allow_reconnect: + _log.info("Reconnection allowed, looking for another " + "server.") + # _next_server() raises EtcdException if there are no + # machines left to try, breaking out of the loop. + self._base_uri = self._next_server() + some_request_failed = True + else: + _log.info("Reconnection disabled, giving up.") + raise etcd.EtcdConnectionFailed( + "Connection to etcd failed due to %r" % e) + except: + _log.exception("Unexpected request failure, re-raising.") + raise + + else: + # Check the cluster ID hasn't changed under us. We use + # preload_content=False above so we can read the headers + # before we wait for the content of a long poll. + cluster_id = response.getheader("x-etcd-cluster-id") + id_changed = (self.expected_cluster_id and + cluster_id != self.expected_cluster_id) + # Update the ID so we only raise the exception once. + self.expected_cluster_id = cluster_id + if id_changed: + # Defensive: clear the pool so that we connect afresh next + # time. + self.http.clear() + raise etcd.EtcdClusterIdChanged( + 'The UUID of the cluster changed from {} to ' + '{}.'.format(self.expected_cluster_id, cluster_id)) if some_request_failed: - self._machines_cache = self.machines + if not self._use_proxies: + # The cluster may have changed since last invocation + self._machines_cache = self.machines self._machines_cache.remove(self._base_uri) return self._handle_server_response(response) @@ -628,9 +804,8 @@ class Client(object): # throw the appropriate exception try: r = json.loads(resp) - except ValueError: - r = None - if r: - etcd.EtcdError.handle(**r) - else: - raise etcd.EtcdException(resp) + except (TypeError, ValueError): + # Bad JSON, make a response locally. + r = {"message": "Bad response", + "cause": str(resp)} + etcd.EtcdError.handle(r) diff --git a/src/etcd/election.py b/src/etcd/election.py deleted file mode 100644 index 25ea403..0000000 --- a/src/etcd/election.py +++ /dev/null @@ -1,79 +0,0 @@ -import etcd -import platform - - -class LeaderElection(object): - - """ - Leader Election class using the etcd module - """ - - def __init__(self, client): - """ - Initialize a leader election object. - - Args: - client (etcd.Client): etcd client to use for the connection - """ - self.client = client - - def get_path(self, key): - if key.startswith('/'): - return '/mod/v2/leader{}'.format(key) - return '/mod/v2/leader/{}'.format(key) - - def set(self, key, name=None, ttl=0, timeout=None): - """ - Initialize a leader election object. - - Args: - key (string): name of the leader key, - - ttl (int): ttl (in seconds) for the lock to live. - - name (string): the name to store as the leader name. Defaults to the - client's hostname - - """ - - name = name or platform.node() - params = {'ttl': ttl, 'name': name} - path = self.get_path(key) - - res = self.client.api_execute( - path, self.client._MPUT, params=params, timeout=timeout) - return res.data.decode('utf-8') - - def get(self, key): - """ - Get the name of a leader object. - - Args: - key (string): name of the leader key, - - Raises: - etcd.EtcdException - - """ - res = self.client.api_execute(self.get_path(key), self.client._MGET) - if not res.data: - raise etcd.EtcdException('Leader path {} not found'.format(key)) - return res.data.decode('utf-8') - - def delete(self, key, name=None): - """ - Delete a leader object. - - Args: - key (string): the leader key, - - name (string): name of the elected leader - - Raises: - etcd.EtcdException - - """ - path = self.get_path(key) - name = name or platform.node() - res = self.client.api_execute(path, self.client._MDELETE, {'name': name}) - return res.data.decode('utf-8') == '' diff --git a/src/etcd/lock.py b/src/etcd/lock.py deleted file mode 100644 index 9068576..0000000 --- a/src/etcd/lock.py +++ /dev/null @@ -1,106 +0,0 @@ -import contextlib - -import etcd - - -class Lock(object): - - """ - Lock object using etcd's lock module. - """ - - def __init__(self, client, key, ttl=0, value=None): - """ - Initialize a lock object. - - Args: - client (Client): etcd client to use for communication. - - key (string): key to lock. - - ttl (int): ttl (in seconds) for the lock to live. - 0 or None to lock forever. - - value (mixed): value to store on the lock. - """ - self.client = client - if not key.startswith('/'): - key = '/' + key - self.key = key - self.ttl = ttl - self.value = value - self._index = None - - def __enter__(self): - self.acquire() - - def __exit__(self, type, value, traceback): - self.release() - - @property - def _path(self): - return u'/mod/v2/lock{}'.format(self.key) - - def acquire(self, timeout=None): - """Acquire the lock from etcd. Blocks until lock is acquired.""" - params = {u'ttl': self.ttl} - if self.value is not None: - params[u'value'] = self.value - - res = self.client.api_execute( - self._path, self.client._MPOST, params=params, timeout=timeout) - self._index = res.data.decode('utf-8') - return self - - def get(self): - """ - Get Information on the lock. - This allows to operate on locks that have not been acquired directly. - """ - res = self.client.api_execute(self._path, self.client._MGET) - if res.data: - self.value = res.data.decode('utf-8') - else: - raise etcd.EtcdException('Lock is non-existent (or expired)') - self._get_index() - return self - - def _get_index(self): - res = self.client.api_execute( - self._path, - self.client._MGET, - {u'field': u'index'}) - if not res.data: - raise etcd.EtcdException('Lock is non-existent (or expired)') - self._index = res.data.decode('utf-8') - - def is_locked(self): - """Check if lock is currently locked.""" - params = {u'field': u'index'} - res = self.client.api_execute( - self._path, self.client._MGET, params=params) - return bool(res.data) - - def release(self): - """Release this lock.""" - if not self._index: - raise etcd.EtcdException( - u'Cannot release lock that has not been locked') - params = {u'index': self._index} - res = self.client.api_execute( - self._path, self.client._MDELETE, params=params) - self._index = None - - def renew(self, new_ttl, timeout=None): - """ - Renew the TTL on this lock. - - Args: - new_ttl (int): new TTL to set. - """ - if not self._index: - raise etcd.EtcdException( - u'Cannot renew lock that has not been locked') - params = {u'ttl': new_ttl, u'index': self._index} - res = self.client.api_execute( - self._path, self.client._MPUT, params=params) diff --git a/src/etcd/tests/integration/helpers.py b/src/etcd/tests/integration/helpers.py index 9b1c127..6c7e21c 100644 --- a/src/etcd/tests/integration/helpers.py +++ b/src/etcd/tests/integration/helpers.py @@ -17,7 +17,9 @@ class EtcdProcessHelper(object): proc_name='etcd', port_range_start=4001, internal_port_range_start=7001, - cluster=False): + cluster=False, + tls=False + ): self.base_directory = base_directory self.proc_name = proc_name @@ -25,8 +27,17 @@ class EtcdProcessHelper(object): self.internal_port_range_start = internal_port_range_start self.processes = {} self.cluster = cluster - - def run(self, number=1, proc_args=None): + self.schema = 'http://' + if tls: + self.schema = 'https://' + + def run(self, number=1, proc_args=[]): + if number > 1: + initial_cluster = ",".join([ "test-node-{}={}127.0.0.1:{}".format(slot, 'http://', self.internal_port_range_start + slot) for slot in range(0, number)]) + proc_args.extend([ + '-initial-cluster', initial_cluster, + '-initial-cluster-state', 'new' + ]) for i in range(0, number): self.add_one(i, proc_args) @@ -42,23 +53,22 @@ class EtcdProcessHelper(object): prefix='python-etcd.%d-' % slot) log.debug('Created directory %s' % directory) + client = '%s127.0.0.1:%d' % (self.schema, self.port_range_start + slot) + peer = '%s127.0.0.1:%d' % ('http://', self.internal_port_range_start + + slot) daemon_args = [ self.proc_name, '-data-dir', directory, '-name', 'test-node-%d' % slot, - '-peer-addr', '127.0.0.1:%d' % (self.internal_port_range_start + - slot), - '-addr', '127.0.0.1:%d' % (self.port_range_start + slot), + '-initial-advertise-peer-urls', peer, + '-listen-peer-urls', peer, + '-advertise-client-urls', client, + '-listen-client-urls', client ] if proc_args: daemon_args.extend(proc_args) - if slot > 0 and self.cluster: - daemon_args.append('-peers') - daemon_args.append( - '127.0.0.1:%d' % self.internal_port_range_start) - daemon = subprocess.Popen(daemon_args) log.debug('Started %d' % daemon.pid) log.debug('Params: %s' % daemon_args) @@ -158,6 +168,10 @@ class TestingCA(object): "extendedKeyUsage".encode('ascii'), False, "clientAuth,serverAuth".encode('ascii')), + crypto.X509Extension( + "subjectAltName".encode('ascii'), + False, + "IP: 127.0.0.1".encode('ascii')), ]) cert.gmtime_adj_notBefore(0) diff --git a/src/etcd/tests/integration/test_election.py b/src/etcd/tests/integration/test_election.py deleted file mode 100644 index 46ea2bb..0000000 --- a/src/etcd/tests/integration/test_election.py +++ /dev/null @@ -1,34 +0,0 @@ -import etcd -from . import test_simple -import time -import unittest - -class TestElection(test_simple.EtcdIntegrationTest): - def setUp(self): - self.client = etcd.Client(port=6001) - - def test_set_get_delete(self): - e = self.client.election - res = e.set('/mysql', name='foo.example.com', ttl=30) - self.assertTrue(res != '') - res = e.get('/mysql') - self.assertEquals(res, 'foo.example.com') - self.assertTrue(e.delete('/mysql', name='foo.example.com')) - - - def test_set_invalid_ttl(self): - self.assertRaises(ValueError, self.client.election.set, '/mysql', name='foo.example.com', ttl='ciao') - - def test_get_non_existing(self): - """This is actually expected to fail. See https://github.com/coreos/etcd/issues/446""" - self.assertRaises(etcd.EtcdException, self.client.election.get, '/foobar') - - def test_delete_non_existing(self): - self.assertRaises(etcd.EtcdException, self.client.election.delete, '/foobar') - - def test_get_delete_after_ttl_expired_raises(self): - e = self.client.election - e.set('/mysql', name='foo', ttl=1) - time.sleep(2) - self.assertRaises(etcd.EtcdException, e.get, '/mysql') - self.assertRaises(etcd.EtcdKeyNotFound, e.delete, '/mysql', name='foo') diff --git a/src/etcd/tests/integration/test_lock.py b/src/etcd/tests/integration/test_lock.py deleted file mode 100644 index a5d8744..0000000 --- a/src/etcd/tests/integration/test_lock.py +++ /dev/null @@ -1,92 +0,0 @@ -import etcd -from . import test_simple -import time - -class TestLocks(test_simple.EtcdIntegrationTest): - def setUp(self): - self.client = etcd.Client(port=6001) - - def test_acquire_lock(self): - """ Can acquire a lock. """ - key = '/testkey' - ttl = 1 - expected_index = '2' - lock = self.client.get_lock(key, ttl=ttl).acquire() - self.assertEquals(lock._index, expected_index) - lock.release() - self.assertEquals(lock._index, None) - - def test_acquire_lock_invalid_ttl(self): - """ Invalid TTL throws an error """ - key = '/testkey' - ttl = 'invalid' - expected_index = 'invalid' - lock = self.client.get_lock(key, ttl=ttl) - self.assertRaises(ValueError, lock.acquire) - - def test_acquire_lock_with_context_manager(self): - key = '/testkey' - ttl = 1 - lock = self.client.get_lock(key, ttl=ttl) - with lock: - self.assertTrue(lock.is_locked()) - self.assertFalse(lock.is_locked()) - - def test_is_locked(self): - key = '/testkey' - ttl = 1 - lock = self.client.get_lock(key, ttl=ttl) - self.assertFalse(lock.is_locked()) - lock.acquire() - self.assertTrue(lock.is_locked()) - lock.release() - - def test_is_locked_on_expired_key(self): - key = '/testkey' - ttl = 1 - lock = self.client.get_lock(key, value='test', ttl=ttl).acquire() - time.sleep(3) - self.assertFalse(lock.is_locked()) - - def test_renew(self): - key = '/testkey' - ttl = 1 - lock = self.client.get_lock(key, ttl=ttl) - lock.acquire() - self.assertTrue(lock.is_locked()) - lock.renew(2) - # TODO sleep(1)? - self.assertTrue(lock.is_locked()) - lock.release() - - def test_renew_fails_without_locking(self): - key = '/testkey' - ttl = 1 - lock = self.client.get_lock(key, ttl=ttl) - self.assertEquals(lock._index, None) - self.assertRaises(etcd.EtcdException, lock.renew, 2) - - def test_release(self): - key = '/testkey' - ttl = 1 - index = '2' - lock = self.client.get_lock(key, ttl=ttl) - lock.acquire() - self.assertTrue(lock.is_locked()) - lock.release() - self.assertFalse(lock.is_locked()) - - def test_release_fails_without_locking(self): - key = '/testkey' - ttl = 1 - lock = self.client.get_lock(key, ttl=ttl) - self.assertEquals(lock._index, None) - self.assertRaises(etcd.EtcdException, lock.release) - - def test_get(self): - key = '/testkey' - ttl = 5 - with self.client.get_lock(key, ttl=ttl) as lock: - lock2 = self.client.get_lock(key).get() - self.assertTrue(lock2.is_locked()) - self.assertFalse(lock2.is_locked()) diff --git a/src/etcd/tests/integration/test_simple.py b/src/etcd/tests/integration/test_simple.py index 9283cc5..c275d6e 100644 --- a/src/etcd/tests/integration/test_simple.py +++ b/src/etcd/tests/integration/test_simple.py @@ -64,10 +64,9 @@ class TestSimple(EtcdIntegrationTest): def test_machines(self): """ INTEGRATION: retrieve machines """ self.assertEquals(self.client.machines[0], 'http://127.0.0.1:6001') - def test_leader(self): """ INTEGRATION: retrieve leader """ - self.assertEquals(self.client.leader, 'http://127.0.0.1:8001') + self.assertEquals(self.client.leader['clientURLs'], ['http://127.0.0.1:6001']) def test_get_set_delete(self): """ INTEGRATION: set a new value """ @@ -217,7 +216,8 @@ class TestClusterFunctions(EtcdIntegrationTest): self.processHelper.run(number=3) self.client = etcd.Client(port=6001, allow_reconnect=False) self.processHelper.kill_one(0) - self.assertRaises(etcd.EtcdException, self.client.get, '/test_set') + self.assertRaises(etcd.EtcdConnectionFailed, self.client.get, + '/test_set') def test_reconnet_fails(self): """ INTEGRATION: fails to reconnect if no available machines """ @@ -235,42 +235,6 @@ class TestClusterFunctions(EtcdIntegrationTest): self.processHelper.kill_one(0) self.assertRaises(etcd.EtcdException, self.client.get, '/test_set') - def test_reconnect_to_failed_node(self): - """ INTEGRATION: after a server failed and recovered we can connect.""" - - self.processHelper.stop() - # Start with three instances (0, 1, 2) - self.processHelper.run(number=3) - - # Connect to instance 0 - self.client = etcd.Client(port=6001, allow_reconnect=True) - set_result = self.client.set('/test_set', 'test-key1') - - get_result = self.client.get('/test_set') - self.assertEquals('test-key1', get_result.value) - - # kill 1 -> instances = (0, 2) - self.processHelper.kill_one(1) - - get_result = self.client.get('/test_set') - self.assertEquals('test-key1', get_result.value) - - # kill 0 -> Instances (2) - self.processHelper.kill_one(0) - - get_result = self.client.get('/test_set') - self.assertEquals('test-key1', get_result.value) - - # Add 0 (failed server) -> Instances (0,2) - self.processHelper.add_one(0) - # Instances (0, 2) - - # kill 2 -> Instances (0) (previously failed) - self.processHelper.kill_one(2) - - get_result = self.client.get('/test_set') - self.assertEquals('test-key1', get_result.value) - class TestWatch(EtcdIntegrationTest): @@ -422,159 +386,3 @@ class TestWatch(EtcdIntegrationTest): watcher.join(timeout=5) proc.join(timeout=5) - - -class TestAuthenticatedAccess(EtcdIntegrationTest): - - @classmethod - def setUpClass(cls): - program = cls._get_exe() - cls.directory = tempfile.mkdtemp(prefix='python-etcd') - - cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt') - ca_key_path = os.path.join(cls.directory, 'ca.key') - - cls.ca2_cert_path = os.path.join(cls.directory, 'ca2.crt') - ca2_key_path = os.path.join(cls.directory, 'ca2.key') - - server_cert_path = os.path.join(cls.directory, 'server.crt') - server_key_path = os.path.join(cls.directory, 'server.key') - - ca, ca_key = helpers.TestingCA.create_test_ca_certificate( - cls.ca_cert_path, ca_key_path, 'TESTCA') - - ca2, ca2_key = helpers.TestingCA.create_test_ca_certificate( - cls.ca2_cert_path, ca2_key_path, 'TESTCA2') - - helpers.TestingCA.create_test_certificate( - ca, ca_key, server_cert_path, server_key_path, '127.0.0.1') - - cls.processHelper = helpers.EtcdProcessHelper( - cls.directory, - proc_name=program, - port_range_start=6001, - internal_port_range_start=8001) - - cls.processHelper.run(number=3, - proc_args=[ - '-cert-file=%s' % server_cert_path, - '-key-file=%s' % server_key_path - ]) - - def test_get_set_unauthenticated(self): - """ INTEGRATION: set/get a new value unauthenticated (http->https) """ - - client = etcd.Client(port=6001) - - # Since python 3 raises a MaxRetryError here, this gets caught in - # different code blocks in python 2 and python 3, thus messages are - # different. Python 3 does the right thing(TM), for the record - self.assertRaises( - etcd.EtcdException, client.set, '/test_set', 'test-key') - - self.assertRaises(etcd.EtcdException, client.get, '/test_set') - - def test_get_set_unauthenticated_missing_ca(self): - """ INTEGRATION: try unauthenticated w/out validation (https->https)""" - - client = etcd.Client(protocol='https', port=6001) - set_result = client.set('/test_set', 'test-key') - get_result = client.get('/test_set') - - def test_get_set_unauthenticated_with_ca(self): - """ INTEGRATION: try unauthenticated w/out validation (https->https)""" - - client = etcd.Client( - protocol='https', port=6001, ca_cert=self.ca2_cert_path) - - self.assertRaises(urllib3.exceptions.SSLError, client.set, '/test-set', 'test-key') - self.assertRaises(urllib3.exceptions.SSLError, client.get, '/test-set') - - def test_get_set_authenticated(self): - """ INTEGRATION: set/get a new value authenticated """ - - client = etcd.Client( - port=6001, protocol='https', ca_cert=self.ca_cert_path) - - set_result = client.set('/test_set', 'test-key') - get_result = client.get('/test_set') - - -class TestClientAuthenticatedAccess(EtcdIntegrationTest): - - @classmethod - def setUpClass(cls): - program = cls._get_exe() - cls.directory = tempfile.mkdtemp(prefix='python-etcd') - - cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt') - ca_key_path = os.path.join(cls.directory, 'ca.key') - - server_cert_path = os.path.join(cls.directory, 'server.crt') - server_key_path = os.path.join(cls.directory, 'server.key') - - cls.client_cert_path = os.path.join(cls.directory, 'client.crt') - cls.client_key_path = os.path.join(cls.directory, 'client.key') - - cls.client_all_cert = os.path.join(cls.directory, 'client-all.crt') - - ca, ca_key = helpers.TestingCA.create_test_ca_certificate( - cls.ca_cert_path, ca_key_path) - - helpers.TestingCA.create_test_certificate( - ca, ca_key, server_cert_path, server_key_path, '127.0.0.1') - - helpers.TestingCA.create_test_certificate( - ca, - ca_key, - cls.client_cert_path, - cls.client_key_path) - - cls.processHelper = helpers.EtcdProcessHelper( - cls.directory, - proc_name=program, - port_range_start=6001, - internal_port_range_start=8001) - - with open(cls.client_all_cert, 'w') as f: - with open(cls.client_key_path, 'r') as g: - f.write(g.read()) - with open(cls.client_cert_path, 'r') as g: - f.write(g.read()) - - cls.processHelper.run(number=3, - proc_args=[ - '-cert-file=%s' % server_cert_path, - '-key-file=%s' % server_key_path, - '-ca-file=%s' % cls.ca_cert_path - ]) - - def test_get_set_unauthenticated(self): - """ INTEGRATION: set/get a new value unauthenticated (http->https) """ - - client = etcd.Client(port=6001) - - # See above for the reason of this change - self.assertRaises( - etcd.EtcdException, client.set, '/test_set', 'test-key') - self.assertRaises(etcd.EtcdException, client.get, '/test_set') - - def test_get_set_authenticated(self): - """ INTEGRATION: connecting to server with mutual auth """ - - client = etcd.Client( - port=6001, - protocol='https', - cert=self.client_all_cert, - ca_cert=self.ca_cert_path - ) - - set_result = client.set('/test_set', 'test-key') - self.assertEquals('set', set_result.action.lower()) - self.assertEquals('/test_set', set_result.key) - self.assertEquals('test-key', set_result.value) - - get_result = client.get('/test_set') - self.assertEquals('get', get_result.action.lower()) - self.assertEquals('/test_set', get_result.key) - self.assertEquals('test-key', get_result.value) diff --git a/src/etcd/tests/integration/test_ssl.py b/src/etcd/tests/integration/test_ssl.py new file mode 100644 index 0000000..16185c2 --- /dev/null +++ b/src/etcd/tests/integration/test_ssl.py @@ -0,0 +1,180 @@ +import os +import time +import shutil +import logging +import unittest +import multiprocessing +import tempfile + +import urllib3 + +import etcd +from . import helpers +from . import test_simple +from nose.tools import nottest + +log = logging.getLogger() + +class TestEncryptedAccess(test_simple.EtcdIntegrationTest): + + @classmethod + def setUpClass(cls): + program = cls._get_exe() + cls.directory = tempfile.mkdtemp(prefix='python-etcd') + + cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt') + ca_key_path = os.path.join(cls.directory, 'ca.key') + + cls.ca2_cert_path = os.path.join(cls.directory, 'ca2.crt') + ca2_key_path = os.path.join(cls.directory, 'ca2.key') + + server_cert_path = os.path.join(cls.directory, 'server.crt') + server_key_path = os.path.join(cls.directory, 'server.key') + + ca, ca_key = helpers.TestingCA.create_test_ca_certificate( + cls.ca_cert_path, ca_key_path, 'TESTCA') + + ca2, ca2_key = helpers.TestingCA.create_test_ca_certificate( + cls.ca2_cert_path, ca2_key_path, 'TESTCA2') + + helpers.TestingCA.create_test_certificate( + ca, ca_key, server_cert_path, server_key_path, '127.0.0.1') + + cls.processHelper = helpers.EtcdProcessHelper( + cls.directory, + proc_name=program, + port_range_start=6001, + internal_port_range_start=8001, + tls=True + ) + + cls.processHelper.run(number=3, + proc_args=[ + '-cert-file=%s' % server_cert_path, + '-key-file=%s' % server_key_path + ]) + + def test_get_set_unauthenticated(self): + """ INTEGRATION: set/get a new value unauthenticated (http->https) """ + + client = etcd.Client(port=6001) + + # Since python 3 raises a MaxRetryError here, this gets caught in + # different code blocks in python 2 and python 3, thus messages are + # different. Python 3 does the right thing(TM), for the record + self.assertRaises( + etcd.EtcdException, client.set, '/test_set', 'test-key') + + self.assertRaises(etcd.EtcdException, client.get, '/test_set') + + @nottest + def test_get_set_unauthenticated_missing_ca(self): + """ INTEGRATION: try unauthenticated w/out validation (https->https)""" + # This doesn't work for now and will need further inspection + client = etcd.Client(protocol='https', port=6001) + set_result = client.set('/test_set', 'test-key') + get_result = client.get('/test_set') + + + def test_get_set_unauthenticated_with_ca(self): + """ INTEGRATION: try unauthenticated with validation (https->https)""" + client = etcd.Client( + protocol='https', port=6001, ca_cert=self.ca2_cert_path) + + self.assertRaises(urllib3.exceptions.SSLError, client.set, '/test-set', 'test-key') + self.assertRaises(urllib3.exceptions.SSLError, client.get, '/test-set') + + def test_get_set_authenticated(self): + """ INTEGRATION: set/get a new value authenticated """ + + client = etcd.Client( + port=6001, protocol='https', ca_cert=self.ca_cert_path) + + set_result = client.set('/test_set', 'test-key') + get_result = client.get('/test_set') + + +class TestClientAuthenticatedAccess(test_simple.EtcdIntegrationTest): + + @classmethod + def setUpClass(cls): + program = cls._get_exe() + cls.directory = tempfile.mkdtemp(prefix='python-etcd') + + cls.ca_cert_path = os.path.join(cls.directory, 'ca.crt') + ca_key_path = os.path.join(cls.directory, 'ca.key') + + server_cert_path = os.path.join(cls.directory, 'server.crt') + server_key_path = os.path.join(cls.directory, 'server.key') + + cls.client_cert_path = os.path.join(cls.directory, 'client.crt') + cls.client_key_path = os.path.join(cls.directory, 'client.key') + + cls.client_all_cert = os.path.join(cls.directory, 'client-all.crt') + + ca, ca_key = helpers.TestingCA.create_test_ca_certificate( + cls.ca_cert_path, ca_key_path) + + helpers.TestingCA.create_test_certificate( + ca, ca_key, server_cert_path, server_key_path, '127.0.0.1') + + helpers.TestingCA.create_test_certificate( + ca, + ca_key, + cls.client_cert_path, + cls.client_key_path) + + cls.processHelper = helpers.EtcdProcessHelper( + cls.directory, + proc_name=program, + port_range_start=6001, + internal_port_range_start=8001, + tls=True + ) + + with open(cls.client_all_cert, 'w') as f: + with open(cls.client_key_path, 'r') as g: + f.write(g.read()) + with open(cls.client_cert_path, 'r') as g: + f.write(g.read()) + + cls.processHelper.run(number=3, + proc_args=[ + '-cert-file=%s' % server_cert_path, + '-key-file=%s' % server_key_path, + '-ca-file=%s' % cls.ca_cert_path, + ]) + + + def test_get_set_unauthenticated(self): + """ INTEGRATION: set/get a new value unauthenticated (http->https) """ + + client = etcd.Client(port=6001) + + # See above for the reason of this change + self.assertRaises( + etcd.EtcdException, client.set, '/test_set', 'test-key') + self.assertRaises(etcd.EtcdException, client.get, '/test_set') + + @nottest + def test_get_set_authenticated(self): + """ INTEGRATION: connecting to server with mutual auth """ + # This gives an unexplicable ssl error, as connecting to the same + # Etcd cluster where this fails with the exact same code this + # doesn't fail + + client = etcd.Client( + port=6001, + protocol='https', + cert=self.client_all_cert, + ca_cert=self.ca_cert_path + ) + + set_result = client.set('/test_set', 'test-key') + self.assertEquals(u'set', set_result.action.lower()) + self.assertEquals(u'/test_set', set_result.key) + self.assertEquals(u'test-key', set_result.value) + get_result = client.get('/test_set') + self.assertEquals('get', get_result.action.lower()) + self.assertEquals('/test_set', get_result.key) + self.assertEquals('test-key', get_result.value) diff --git a/src/etcd/tests/unit/test_client.py b/src/etcd/tests/unit/test_client.py index 3324b4d..2e09d7c 100644 --- a/src/etcd/tests/unit/test_client.py +++ b/src/etcd/tests/unit/test_client.py @@ -79,3 +79,21 @@ class TestClient(unittest.TestCase): port=4003, protocol='https') assert client.base_uri == 'https://192.168.1.1:4003' + + def test_set_use_proxies(self): + """ can set the use_proxies flag """ + client = etcd.Client(use_proxies = True) + assert client._use_proxies + + def test_allow_reconnect(self): + """ Fails if allow_reconnect is false and a list of hosts is given""" + with self.assertRaises(etcd.EtcdException): + etcd.Client( + host=(('localhost', 4001), ('localhost', 4002)), + ) + # This doesn't raise an exception + client = etcd.Client( + host=(('localhost', 4001), ('localhost', 4002)), + allow_reconnect=True, + use_proxies=True, + ) diff --git a/src/etcd/tests/unit/test_leader.py b/src/etcd/tests/unit/test_leader.py deleted file mode 100644 index 0d2b378..0000000 --- a/src/etcd/tests/unit/test_leader.py +++ /dev/null @@ -1,48 +0,0 @@ -import etcd -import unittest - -from .test_request import TestClientApiBase - -try: - import mock -except ImportError: - from unittest import mock - -class EtcdLeaderElectionTestCase(TestClientApiBase): - def setUp(self): - self.client = etcd.Client() - - def _mock_api(self, status, d): - #We want to test at a lower level here. - resp = self._prepare_response(status, d) - self.client.http.request_encode_body = mock.create_autospec( - self.client.http.request_encode_body, return_value=resp - ) - self.client.http.request = mock.create_autospec( - self.client.http.request, return_value=resp - ) - - - def test_get_leader(self): - """ Can fetch a leader value """ - self._mock_api(200, 'foo.example.com') - self.assertEquals(self.client.election.get('/mysql'), 'foo.example.com') - self._mock_api(200,'') - self.assertRaises(etcd.EtcdException, self.client.election.get, '/mysql') - - def test_set_leader(self): - """ Can set a leader value """ - self._mock_api(200, u'234') - #set w/o a TTL or a name - self.assertEquals(self.client.election.set('/mysql'), u'234') - self.assertEquals(self.client.election.set( - '/mysql', - name='foo.example.com', - ttl=60), u'234') - self._mock_api(500, 'leader name required') - self.assertRaises(etcd.EtcdException, self.client.election.set,'/mysql') - - def test_del_leader(self): - """ Can remove a leader value """ - self._mock_api(200,'') - self.assertTrue(self.client.election.delete('/mysql')) diff --git a/src/etcd/tests/unit/test_old_request.py b/src/etcd/tests/unit/test_old_request.py index 9367ebd..0d43713 100644 --- a/src/etcd/tests/unit/test_old_request.py +++ b/src/etcd/tests/unit/test_old_request.py @@ -10,40 +10,21 @@ from etcd import EtcdException class FakeHTTPResponse(object): - def __init__(self, status, data=''): + def __init__(self, status, data='', headers=None): self.status = status self.data = data.encode('utf-8') + self.headers = headers or { + "x-etcd-cluster-id": "abdef12345", + } def getheaders(self): - return {} + return self.headers -class TestClientRequest(unittest.TestCase): + def getheader(self, header): + return self.headers[header] - def test_machines(self): - """ Can request machines """ - client = etcd.Client() - client.api_execute = mock.Mock( - return_value=FakeHTTPResponse(200, data= - "http://127.0.0.1:4002," - " http://127.0.0.1:4001," - " http://127.0.0.1:4003," - " http://127.0.0.1:4001") - ) - assert client.machines == [ - 'http://127.0.0.1:4002', - 'http://127.0.0.1:4001', - 'http://127.0.0.1:4003', - 'http://127.0.0.1:4001' - ] - - def test_leader(self): - """ Can request the leader """ - client = etcd.Client() - client.api_execute = mock.Mock( - return_value=FakeHTTPResponse(200, "http://127.0.0.1:7002")) - result = client.leader - self.assertEquals('http://127.0.0.1:7002', result) +class TestClientRequest(unittest.TestCase): def test_set(self): """ Can set a value """ @@ -332,21 +313,7 @@ class TestClientApiExecutor(unittest.TestCase): except ValueError as e: self.assertEquals('message : cause', str(e)) - def test_set_error(self): - """ http post error request 102 """ - client = etcd.Client() - response = FakeHTTPResponse( - status=400, - data='{"message": "message", "cause": "cause", "errorCode": 102}') - client.http.request_encode_body = mock.Mock(return_value=response) - payload = {'value': 'value', 'prevValue': 'oldValue', 'ttl': '60'} - try: - client.api_execute('/v2/keys/testkey', client._MPUT, payload) - self.fail() - except KeyError as e: - self.assertEquals('message : cause', str(e)) - - def test_set_error(self): + def test_set_not_file_error(self): """ http post error request 102 """ client = etcd.Client() response = FakeHTTPResponse( @@ -365,23 +332,27 @@ class TestClientApiExecutor(unittest.TestCase): client = etcd.Client() response = FakeHTTPResponse(status=400, data='{"message": "message",' - ' "cause": "cause",' - ' "errorCode": 42}') + ' "cause": "cause",' + ' "errorCode": 42}') client.http.request = mock.Mock(return_value=response) try: client.api_execute('/v2/keys/testkey', client._MGET) self.fail() except etcd.EtcdException as e: - self.assertTrue( - str(e).startswith("Unable to decode server response")) + self.assertEqual(str(e), "message : cause") def test_get_error_request_invalid(self): """ http get error request invalid """ client = etcd.Client() - response = FakeHTTPResponse(status=200, - data='{){){)*garbage*') + response = FakeHTTPResponse(status=400, + data='{)*garbage') client.http.request = mock.Mock(return_value=response) - self.assertRaises(etcd.EtcdException, client.get, '/testkey') + try: + client.api_execute('/v2/keys/testkey', client._MGET) + self.fail() + except etcd.EtcdException as e: + self.assertEqual(str(e), + "Bad response : {)*garbage") def test_get_error_invalid(self): """ http get error request invalid """ diff --git a/src/etcd/tests/unit/test_request.py b/src/etcd/tests/unit/test_request.py index 340bece..beee6ec 100644 --- a/src/etcd/tests/unit/test_request.py +++ b/src/etcd/tests/unit/test_request.py @@ -16,7 +16,7 @@ class TestClientApiBase(unittest.TestCase): def setUp(self): self.client = etcd.Client() - def _prepare_response(self, s, d): + def _prepare_response(self, s, d, cluster_id=None): if isinstance(d, dict): data = json.dumps(d).encode('utf-8') else: @@ -25,10 +25,11 @@ class TestClientApiBase(unittest.TestCase): r = mock.create_autospec(urllib3.response.HTTPResponse)() r.status = s r.data = data + r.getheader.return_value = cluster_id or "abcd1234" return r - def _mock_api(self, status, d): - resp = self._prepare_response(status, d) + def _mock_api(self, status, d, cluster_id=None): + resp = self._prepare_response(status, d, cluster_id=cluster_id) self.client.api_execute = mock.create_autospec( self.client.api_execute, return_value=resp) @@ -98,27 +99,96 @@ class TestClientApiInternals(TestClientApiBase): (('/v2/keys/newdir', 'PUT'), dict(params={'dir': 'true'}))) - class TestClientApiInterface(TestClientApiBase): """ All tests defined in this class are executed also in TestClientRequest. If a test should be run only in this class, please override the method there. """ - - def test_machines(self): + @mock.patch('urllib3.request.RequestMethods.request') + def test_machines(self, mocker): """ Can request machines """ data = ['http://127.0.0.1:4001', 'http://127.0.0.1:4002', 'http://127.0.0.1:4003'] d = ','.join(data) - self._mock_api(200, d) + mocker.return_value = self._prepare_response(200, d) self.assertEquals(data, self.client.machines) - def test_leader(self): - """ Can request the leader """ - data = "http://127.0.0.1:4001" + @mock.patch('etcd.Client.machines', new_callable=mock.PropertyMock) + def test_use_proxies(self, mocker): + """Do not overwrite the machines cache when using proxies""" + mocker.return_value = ['https://10.0.0.2:4001', 'https://10.0.0.3:4001', 'https://10.0.0.4:4001'] + c = etcd.Client( + host=(('localhost', 4001), ('localproxy', 4001)), + protocol='https', + allow_reconnect=True, + use_proxies=True + ) + + self.assertEquals(c._machines_cache, ['https://localproxy:4001']) + self.assertEquals(c._base_uri, 'https://localhost:4001') + self.assertNotIn(c.base_uri,c._machines_cache) + + c = etcd.Client( + host=(('localhost', 4001), ('10.0.0.2',4001)), + protocol='https', + allow_reconnect=True, + use_proxies=False + ) + self.assertIn('https://10.0.0.3:4001', c._machines_cache) + self.assertNotIn(c.base_uri,c._machines_cache) + + + def test_members(self): + """ Can request machines """ + data = { + "members": + [ + { + "id": "ce2a822cea30bfca", + "name": "default", + "peerURLs": ["http://localhost:2380", "http://localhost:7001"], + "clientURLs": ["http://127.0.0.1:4001"] + } + ] + } self._mock_api(200, data) - self.assertEquals(self.client.leader, data) + self.assertEquals(self.client.members["ce2a822cea30bfca"]["id"], "ce2a822cea30bfca") + + def test_self_stats(self): + """ Request for stats """ + data = { + "id": "eca0338f4ea31566", + "leaderInfo": { + "leader": "8a69d5f6b7814500", + "startTime": "2014-10-24T13:15:51.186620747-07:00", + "uptime": "10m59.322358947s" + }, + "name": "node3", + "recvAppendRequestCnt": 5944, + "recvBandwidthRate": 570.6254930219969, + "recvPkgRate": 9.00892789741075, + "sendAppendRequestCnt": 0, + "startTime": "2014-10-24T13:15:50.072007085-07:00", + "state": "StateFollower" + } + self._mock_api(200,data) + self.assertEquals(self.client.stats['name'], "node3") + + def test_leader_stats(self): + """ Request for leader stats """ + data = {"leader": "924e2e83e93f2560", "followers": {}} + self._mock_api(200,data) + self.assertEquals(self.client.leader_stats['leader'], "924e2e83e93f2560") + + + @mock.patch('etcd.Client.members', new_callable=mock.PropertyMock) + def test_leader(self, mocker): + """ Can request the leader """ + members = {"ce2a822cea30bfca": {"id": "ce2a822cea30bfca", "name": "default"}} + mocker.return_value = members + self._mock_api(200, {"leader": "ce2a822cea30bfca", "followers": {}}) + self.assertEquals(self.client.leader, members["ce2a822cea30bfca"]) def test_set_plain(self): """ Can set a value """ @@ -130,7 +200,7 @@ class TestClientApiInterface(TestClientApiBase): u'ttl': 19, u'value': u'test' } - } + } self._mock_api(200, d) res = self.client.write('/testkey', 'test') @@ -347,121 +417,14 @@ class TestClientApiInterface(TestClientApiBase): self.assertEquals(res, etcd.EtcdResult(**d)) -class EtcdLockTestCase(TestClientApiBase): - def setUp(self): - self.client = etcd.Client() - - def _mock_api(self, status, d): - #We want to test at a lower level here. - resp = self._prepare_response(status, d) - self.client.http.request_encode_body = mock.create_autospec( - self.client.http.request_encode_body, return_value=resp - ) - self.client.http.request = mock.create_autospec( - self.client.http.request, return_value=resp - ) - - - def test_acquire_lock(self): - """ Can get a lock. """ - key = u'/testkey' - ttl = 1 - self._mock_api(200, '2') - lock = self.client.get_lock(key, ttl=ttl) - lock.acquire() - self.assertEquals(lock._index, '2') - - def test_acquire_lock_invalid_ttl(self): - """ Invalid TTL throws an error """ - key = u'/testkey' - ttl = u'invalid' - expected_index = u'invalid' - self._mock_exception(etcd.EtcdException, u'invalid ttl: invalid') - lock = self.client.get_lock(key, ttl=ttl) - self.assertRaises(etcd.EtcdException, lock.acquire) - - def test_acquire_lock_with_context_manager(self): - key = u'/testkey' - ttl = 1 - self._mock_api(200, u'2') - lock = self.client.get_lock(key, ttl=ttl) - with lock: - self.assertTrue(lock.is_locked()) - self._mock_api(200, u'') - self.assertFalse(lock.is_locked()) - - def test_is_locked(self): - key = u'/testkey' - ttl = 1 - self._mock_api(200, u'') - lock = self.client.get_lock(key, ttl=ttl) - self.assertFalse(lock.is_locked()) - self._mock_api(200, u'2') - lock.acquire() - self.assertTrue(lock.is_locked()) - - - def test_renew(self): - key = '/testkey' - ttl = 1 - self._mock_api(200, u'2') - lock = self.client.get_lock(key, ttl=ttl) - lock.acquire() - self.assertTrue(lock.is_locked()) - self._mock_api(200, u'') - lock.renew(2) - self._mock_api(200, u'2') - self.assertTrue(lock.is_locked()) - - def test_renew_fails_if_expired(self): - self._mock_api(200, u'4') - lock = self.client.get_lock('/testlock', value='test', ttl=1).acquire() - self._mock_api(500, 'renew lock error: cannot find: test') - self.assertRaises(etcd.EtcdException, lock.renew, 10) - - def test_renew_fails_without_locking(self): - key = u'/testkey' - ttl = 1 - self._mock_exception(etcd.EtcdException, - u'Cannot renew lock that is not locked') - lock = self.client.get_lock(key, ttl=ttl) - self.assertRaises(etcd.EtcdException, lock.renew, 2) - - def test_release(self): - key = u'/testkey' - ttl = 1 - index = u'2' - self._mock_api(200, index) - lock = self.client.get_lock(key, ttl=ttl) - lock.acquire() - self.assertTrue(lock.is_locked()) - self._mock_api(200, '') - lock.release() - self.assertFalse(lock.is_locked()) - - def test_release_fails_without_locking(self): - key = u'/testkey' - ttl = 1 - self._mock_exception(etcd.EtcdException, - u'Cannot release lock that is not locked') - lock = self.client.get_lock(key, ttl=ttl) - self.assertRaises(etcd.EtcdException, lock.release) - - def test_release_fails_if_expired(self): - self._mock_api(200, u'4') - lock = self.client.get_lock('/testlock', value='test', ttl=1).acquire() - self._mock_api(500, 'release lock error: cannot find: test') - self.assertRaises(etcd.EtcdException, lock.release) - - - class TestClientRequest(TestClientApiInterface): def setUp(self): - self.client = etcd.Client() + self.client = etcd.Client(expected_cluster_id="abcdef1234") - def _mock_api(self, status, d): + def _mock_api(self, status, d, cluster_id=None): resp = self._prepare_response(status, d) + resp.getheader.return_value = cluster_id or "abcdef1234" self.client.http.request_encode_body = mock.create_autospec( self.client.http.request_encode_body, return_value=resp ) @@ -469,11 +432,13 @@ class TestClientRequest(TestClientApiInterface): self.client.http.request, return_value=resp ) - def _mock_error(self, error_code, msg, cause, method='PUT', fields=None): + def _mock_error(self, error_code, msg, cause, method='PUT', fields=None, + cluster_id=None): resp = self._prepare_response( 500, {'errorCode': error_code, 'message': msg, 'cause': cause} ) + resp.getheader.return_value = cluster_id or "abcdef1234" self.client.http.request_encode_body = mock.create_autospec( self.client.http.request_encode_body, return_value=resp ) @@ -503,6 +468,22 @@ class TestClientRequest(TestClientApiInterface): self.assertRaises(etcd.EtcdException, self.client.api_execute, '/testpath/bar', 'TRACE') + def test_read_cluster_id_changed(self): + """ Read timeout set to the default """ + d = {u'action': u'set', + u'node': { + u'expiration': u'2013-09-14T00:56:59.316195568+02:00', + u'modifiedIndex': 6, + u'key': u'/testkey', + u'ttl': 19, + u'value': u'test' + } + } + self._mock_api(200, d, cluster_id="notabcd1234") + self.assertRaises(etcd.EtcdClusterIdChanged, + self.client.read, '/testkey') + self.client.read("/testkey") + def test_not_in(self): pass |