diff options
author | Giuseppe Lavagetto <lavagetto@gmail.com> | 2015-11-27 18:41:25 +0100 |
---|---|---|
committer | Giuseppe Lavagetto <lavagetto@gmail.com> | 2015-11-28 16:50:10 +0100 |
commit | dd38063e371eec384907c8220366fc836bde6a00 (patch) | |
tree | 031accac4734004213d1a24b0c99b5a4413bd1ae | |
parent | 4d8231afd9f1b7f11aa9863a6bef89f7ea311dde (diff) |
Move the boilerplate retry logic to a wrapper, add api_execute_json
Since what we have in AuthClient right now is not DRY at all, and that
needs to be fixed before it ships in any release.
-rw-r--r-- | src/etcd/client.py | 189 |
1 files changed, 104 insertions, 85 deletions
diff --git a/src/etcd/client.py b/src/etcd/client.py index 5bb69f4..16eb4c8 100644 --- a/src/etcd/client.py +++ b/src/etcd/client.py @@ -19,6 +19,7 @@ import urllib3.util import json import ssl import dns.resolver +from functools import wraps import etcd try: @@ -772,95 +773,114 @@ class Client(object): _log.info("Selected new etcd server %s", mach) return mach + def _wrap_request(payload): + @wraps(payload) + def wrapper(self, path, method, params=None, timeout=None): + some_request_failed = False + response = False + + if timeout is None: + timeout = self.read_timeout + + if timeout == 0: + timeout = None + + if not path.startswith('/'): + raise ValueError('Path does not start with /') + + while not response: + try: + response = payload(self, path, method, + params=params, timeout=timeout) + # Check the cluster ID hasn't changed under us. We use + # preload_content=False above so we can read the headers + # before we wait for the content of a watch. + self._check_cluster_id(response) + # Now force the data to be preloaded in order to trigger any + # IO-related errors in this method rather than when we try to + # access it later. + _ = response.data + # urllib3 doesn't wrap all httplib exceptions and earlier versions + # don't wrap socket errors either. + except (urllib3.exceptions.HTTPError, + HTTPException, socket.error) as e: + if (params.get("wait") == "true" and + isinstance(e, + urllib3.exceptions.ReadTimeoutError)): + _log.debug("Watch timed out.") + raise etcd.EtcdWatchTimedOut( + "Watch timed out: %r" % e, + cause=e + ) + _log.error("Request to server %s failed: %r", + self._base_uri, e) + if self._allow_reconnect: + _log.info("Reconnection allowed, looking for another " + "server.") + # _next_server() raises EtcdException if there are no + # machines left to try, breaking out of the loop. + self._base_uri = self._next_server(cause=e) + some_request_failed = True + else: + _log.debug("Reconnection disabled, giving up.") + raise etcd.EtcdConnectionFailed( + "Connection to etcd failed due to %r" % e, + cause=e + ) + except: + _log.exception("Unexpected request failure, re-raising.") + raise + + if some_request_failed: + if not self._use_proxies: + # The cluster may have changed since last invocation + self._machines_cache = self.machines + self._machines_cache.remove(self._base_uri) + return self._handle_server_response(response) + return wrapper + + @_wrap_request def api_execute(self, path, method, params=None, timeout=None): """ Executes the query. """ - - some_request_failed = False - response = False - - if timeout is None: - timeout = self.read_timeout - - if timeout == 0: - timeout = None - - if not path.startswith('/'): - raise ValueError('Path does not start with /') - - while not response: - try: - url = self._base_uri + path - - if (method == self._MGET) or (method == self._MDELETE): - response = self.http.request( - method, - url, - timeout=timeout, - fields=params, - redirect=self.allow_redirect, - headers=self._get_headers(), - preload_content=False) - - elif (method == self._MPUT) or (method == self._MPOST): - response = self.http.request_encode_body( - method, - url, - fields=params, - timeout=timeout, - encode_multipart=False, - redirect=self.allow_redirect, - headers=self._get_headers(), - preload_content=False) - else: + url = self._base_uri + path + + if (method == self._MGET) or (method == self._MDELETE): + return self.http.request( + method, + url, + timeout=timeout, + fields=params, + redirect=self.allow_redirect, + headers=self._get_headers(), + preload_content=False) + + elif (method == self._MPUT) or (method == self._MPOST): + return self.http.request_encode_body( + method, + url, + fields=params, + timeout=timeout, + encode_multipart=False, + redirect=self.allow_redirect, + headers=self._get_headers(), + preload_content=False) + else: raise etcd.EtcdException( 'HTTP method {} not supported'.format(method)) - # Check the cluster ID hasn't changed under us. We use - # preload_content=False above so we can read the headers - # before we wait for the content of a watch. - self._check_cluster_id(response) - # Now force the data to be preloaded in order to trigger any - # IO-related errors in this method rather than when we try to - # access it later. - _ = response.data - # urllib3 doesn't wrap all httplib exceptions and earlier versions - # don't wrap socket errors either. - except (urllib3.exceptions.HTTPError, - HTTPException, - socket.error) as e: - if (params.get("wait") == "true" and - isinstance(e, urllib3.exceptions.ReadTimeoutError)): - _log.debug("Watch timed out.") - raise etcd.EtcdWatchTimedOut( - "Watch timed out: %r" % e, - cause=e - ) - - _log.error("Request to server %s failed: %r", - self._base_uri, e) - if self._allow_reconnect: - _log.info("Reconnection allowed, looking for another " - "server.") - # _next_server() raises EtcdException if there are no - # machines left to try, breaking out of the loop. - self._base_uri = self._next_server(cause=e) - some_request_failed = True - else: - _log.debug("Reconnection disabled, giving up.") - raise etcd.EtcdConnectionFailed( - "Connection to etcd failed due to %r" % e, - cause=e - ) - except: - _log.exception("Unexpected request failure, re-raising.") - raise - - if some_request_failed: - if not self._use_proxies: - # The cluster may have changed since last invocation - self._machines_cache = self.machines - self._machines_cache.remove(self._base_uri) - return self._handle_server_response(response) + @_wrap_request + def api_execute_json(self, path, method, params=None, timeout=None): + url = self._base_uri + path + json_payload = json.dumps(params) + headers = self._get_headers() + headers['Content-Type'] = 'application/json' + return self.http.urlopen(method, + url, + body=json_payload, + timeout=timeout, + redirect=self.allow_redirect, + headers=headers, + preload_content=False) def _check_cluster_id(self, response): cluster_id = response.getheader("x-etcd-cluster-id") @@ -902,4 +922,3 @@ class Client(object): credentials = ':'.join((self.username, self.password)) return urllib3.make_headers(basic_auth=credentials) return {} - |