summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiuseppe Lavagetto <lavagetto@gmail.com>2015-11-27 18:41:25 +0100
committerGiuseppe Lavagetto <lavagetto@gmail.com>2015-11-28 16:50:10 +0100
commitdd38063e371eec384907c8220366fc836bde6a00 (patch)
tree031accac4734004213d1a24b0c99b5a4413bd1ae
parent4d8231afd9f1b7f11aa9863a6bef89f7ea311dde (diff)
Move the boilerplate retry logic to a wrapper, add api_execute_json
Since what we have in AuthClient right now is not DRY at all, and that needs to be fixed before it ships in any release.
-rw-r--r--src/etcd/client.py189
1 files changed, 104 insertions, 85 deletions
diff --git a/src/etcd/client.py b/src/etcd/client.py
index 5bb69f4..16eb4c8 100644
--- a/src/etcd/client.py
+++ b/src/etcd/client.py
@@ -19,6 +19,7 @@ import urllib3.util
import json
import ssl
import dns.resolver
+from functools import wraps
import etcd
try:
@@ -772,95 +773,114 @@ class Client(object):
_log.info("Selected new etcd server %s", mach)
return mach
+ def _wrap_request(payload):
+ @wraps(payload)
+ def wrapper(self, path, method, params=None, timeout=None):
+ some_request_failed = False
+ response = False
+
+ if timeout is None:
+ timeout = self.read_timeout
+
+ if timeout == 0:
+ timeout = None
+
+ if not path.startswith('/'):
+ raise ValueError('Path does not start with /')
+
+ while not response:
+ try:
+ response = payload(self, path, method,
+ params=params, timeout=timeout)
+ # Check the cluster ID hasn't changed under us. We use
+ # preload_content=False above so we can read the headers
+ # before we wait for the content of a watch.
+ self._check_cluster_id(response)
+ # Now force the data to be preloaded in order to trigger any
+ # IO-related errors in this method rather than when we try to
+ # access it later.
+ _ = response.data
+ # urllib3 doesn't wrap all httplib exceptions and earlier versions
+ # don't wrap socket errors either.
+ except (urllib3.exceptions.HTTPError,
+ HTTPException, socket.error) as e:
+ if (params.get("wait") == "true" and
+ isinstance(e,
+ urllib3.exceptions.ReadTimeoutError)):
+ _log.debug("Watch timed out.")
+ raise etcd.EtcdWatchTimedOut(
+ "Watch timed out: %r" % e,
+ cause=e
+ )
+ _log.error("Request to server %s failed: %r",
+ self._base_uri, e)
+ if self._allow_reconnect:
+ _log.info("Reconnection allowed, looking for another "
+ "server.")
+ # _next_server() raises EtcdException if there are no
+ # machines left to try, breaking out of the loop.
+ self._base_uri = self._next_server(cause=e)
+ some_request_failed = True
+ else:
+ _log.debug("Reconnection disabled, giving up.")
+ raise etcd.EtcdConnectionFailed(
+ "Connection to etcd failed due to %r" % e,
+ cause=e
+ )
+ except:
+ _log.exception("Unexpected request failure, re-raising.")
+ raise
+
+ if some_request_failed:
+ if not self._use_proxies:
+ # The cluster may have changed since last invocation
+ self._machines_cache = self.machines
+ self._machines_cache.remove(self._base_uri)
+ return self._handle_server_response(response)
+ return wrapper
+
+ @_wrap_request
def api_execute(self, path, method, params=None, timeout=None):
""" Executes the query. """
-
- some_request_failed = False
- response = False
-
- if timeout is None:
- timeout = self.read_timeout
-
- if timeout == 0:
- timeout = None
-
- if not path.startswith('/'):
- raise ValueError('Path does not start with /')
-
- while not response:
- try:
- url = self._base_uri + path
-
- if (method == self._MGET) or (method == self._MDELETE):
- response = self.http.request(
- method,
- url,
- timeout=timeout,
- fields=params,
- redirect=self.allow_redirect,
- headers=self._get_headers(),
- preload_content=False)
-
- elif (method == self._MPUT) or (method == self._MPOST):
- response = self.http.request_encode_body(
- method,
- url,
- fields=params,
- timeout=timeout,
- encode_multipart=False,
- redirect=self.allow_redirect,
- headers=self._get_headers(),
- preload_content=False)
- else:
+ url = self._base_uri + path
+
+ if (method == self._MGET) or (method == self._MDELETE):
+ return self.http.request(
+ method,
+ url,
+ timeout=timeout,
+ fields=params,
+ redirect=self.allow_redirect,
+ headers=self._get_headers(),
+ preload_content=False)
+
+ elif (method == self._MPUT) or (method == self._MPOST):
+ return self.http.request_encode_body(
+ method,
+ url,
+ fields=params,
+ timeout=timeout,
+ encode_multipart=False,
+ redirect=self.allow_redirect,
+ headers=self._get_headers(),
+ preload_content=False)
+ else:
raise etcd.EtcdException(
'HTTP method {} not supported'.format(method))
- # Check the cluster ID hasn't changed under us. We use
- # preload_content=False above so we can read the headers
- # before we wait for the content of a watch.
- self._check_cluster_id(response)
- # Now force the data to be preloaded in order to trigger any
- # IO-related errors in this method rather than when we try to
- # access it later.
- _ = response.data
- # urllib3 doesn't wrap all httplib exceptions and earlier versions
- # don't wrap socket errors either.
- except (urllib3.exceptions.HTTPError,
- HTTPException,
- socket.error) as e:
- if (params.get("wait") == "true" and
- isinstance(e, urllib3.exceptions.ReadTimeoutError)):
- _log.debug("Watch timed out.")
- raise etcd.EtcdWatchTimedOut(
- "Watch timed out: %r" % e,
- cause=e
- )
-
- _log.error("Request to server %s failed: %r",
- self._base_uri, e)
- if self._allow_reconnect:
- _log.info("Reconnection allowed, looking for another "
- "server.")
- # _next_server() raises EtcdException if there are no
- # machines left to try, breaking out of the loop.
- self._base_uri = self._next_server(cause=e)
- some_request_failed = True
- else:
- _log.debug("Reconnection disabled, giving up.")
- raise etcd.EtcdConnectionFailed(
- "Connection to etcd failed due to %r" % e,
- cause=e
- )
- except:
- _log.exception("Unexpected request failure, re-raising.")
- raise
-
- if some_request_failed:
- if not self._use_proxies:
- # The cluster may have changed since last invocation
- self._machines_cache = self.machines
- self._machines_cache.remove(self._base_uri)
- return self._handle_server_response(response)
+ @_wrap_request
+ def api_execute_json(self, path, method, params=None, timeout=None):
+ url = self._base_uri + path
+ json_payload = json.dumps(params)
+ headers = self._get_headers()
+ headers['Content-Type'] = 'application/json'
+ return self.http.urlopen(method,
+ url,
+ body=json_payload,
+ timeout=timeout,
+ redirect=self.allow_redirect,
+ headers=headers,
+ preload_content=False)
def _check_cluster_id(self, response):
cluster_id = response.getheader("x-etcd-cluster-id")
@@ -902,4 +922,3 @@ class Client(object):
credentials = ':'.join((self.username, self.password))
return urllib3.make_headers(basic_auth=credentials)
return {}
-