diff options
author | Shaun Crampton <shaun@cantab.net> | 2015-05-17 18:56:03 -0700 |
---|---|---|
committer | Shaun Crampton <shaun@cantab.net> | 2015-05-17 19:11:41 -0700 |
commit | fa361efa492afab0f023084ddcfca38500fda8df (patch) | |
tree | 590fdf02022bf833a5169a367106e0bbcd0b7cd9 /src/etcd/client.py | |
parent | 72c8f0e8e3e9d339c740691b6786858ef21d155b (diff) |
Check cluster ID on each request.
* Load cluster ID on first request.
* If cluster ID changes, raise new EtcdClusterIdChanged exception.
Diffstat (limited to 'src/etcd/client.py')
-rw-r--r-- | src/etcd/client.py | 35 |
1 files changed, 30 insertions, 5 deletions
diff --git a/src/etcd/client.py b/src/etcd/client.py index e5acafc..809b4a9 100644 --- a/src/etcd/client.py +++ b/src/etcd/client.py @@ -7,6 +7,7 @@ """ import urllib3 +import urllib3.util import json import ssl import etcd @@ -41,6 +42,7 @@ class Client(object): ca_cert=None, allow_reconnect=False, use_proxies=False, + expected_cluster_id=None, ): """ Initialize the client. @@ -72,6 +74,13 @@ class Client(object): use_proxies (bool): we are using a list of proxies to which we connect, and don't want to connect to the original etcd cluster. + + expected_cluster_id (str): If a string, recorded as the expected + UUID of the cluster (rather than + learning it from the first request), + reads will raise EtcdClusterIdChanged + if they receive a response with a + different cluster ID. """ self._protocol = protocol @@ -87,6 +96,7 @@ class Client(object): self._machines_cache = [uri(self._protocol, *conn) for conn in host] self._base_uri = self._machines_cache.pop(0) + self.expected_cluster_id = expected_cluster_id self.version_prefix = version_prefix self._read_timeout = read_timeout @@ -328,8 +338,6 @@ class Client(object): return self.write(obj.key, obj.value, **kwdargs) - - def read(self, key, **kwdargs): """ Returns the value of the key 'key'. @@ -375,7 +383,8 @@ class Client(object): timeout = kwdargs.get('timeout', None) response = self.api_execute( - self.key_endpoint + key, self._MGET, params=params, timeout=timeout) + self.key_endpoint + key, self._MGET, params=params, + timeout=timeout) return self._result_from_response(response) def delete(self, key, recursive=None, dir=None, **kwdargs): @@ -594,7 +603,8 @@ class Client(object): url, timeout=timeout, fields=params, - redirect=self.allow_redirect) + redirect=self.allow_redirect, + preload_content=False) elif (method == self._MPUT) or (method == self._MPOST): response = self.http.request_encode_body( @@ -603,7 +613,8 @@ class Client(object): fields=params, timeout=timeout, encode_multipart=False, - redirect=self.allow_redirect) + redirect=self.allow_redirect, + preload_content=False) else: raise etcd.EtcdException( 'HTTP method {} not supported'.format(method)) @@ -612,6 +623,20 @@ class Client(object): self._base_uri = self._next_server() some_request_failed = True + else: + # Check the cluster ID hasn't changed under us. + # We need preload_content == False above to ensure we can read + # the headers here before waiting for the content of a watch + # below. + cluster_id = response.getheader("x-etcd-cluster-id") + if self.expected_cluster_id: + if self.expected_cluster_id != cluster_id: + raise etcd.EtcdClusterIdChanged( + 'The UUID of the cluster changed from {} to ' + '{}.'.format(self.expected_cluster_id, cluster_id)) + else: + self.expected_cluster_id = cluster_id + if some_request_failed: if not self._use_proxies: # The cluster may have changed since last invokation |