diff options
-rw-r--r-- | LICENSE.txt | 1 | ||||
-rw-r--r-- | src/etcd/__init__.py | 9 | ||||
-rw-r--r-- | src/etcd/client.py | 35 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_old_request.py | 10 | ||||
-rw-r--r-- | src/etcd/tests/unit/test_request.py | 23 |
5 files changed, 64 insertions, 14 deletions
diff --git a/LICENSE.txt b/LICENSE.txt index 09012da..1d1c746 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,6 +1,7 @@ The MIT License (MIT) Copyright (c) 2013 Jose Plana Mario +Modifications, Copyright (c) 2015 Metaswitch Networks Limited Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py index 9367ce9..a3f6d6c 100644 --- a/src/etcd/__init__.py +++ b/src/etcd/__init__.py @@ -110,6 +110,15 @@ class EtcdException(Exception): self.payload=payload +class EtcdClusterIdChanged(EtcdException): + """ + The etcd cluster ID changed. This may indicate the cluster was replaced + with a backup. Raised to prevent waiting on an etcd_index that was only + valid on the old cluster. + """ + pass + + class EtcdKeyError(EtcdException): """ Etcd Generic KeyError Exception diff --git a/src/etcd/client.py b/src/etcd/client.py index e5acafc..809b4a9 100644 --- a/src/etcd/client.py +++ b/src/etcd/client.py @@ -7,6 +7,7 @@ """ import urllib3 +import urllib3.util import json import ssl import etcd @@ -41,6 +42,7 @@ class Client(object): ca_cert=None, allow_reconnect=False, use_proxies=False, + expected_cluster_id=None, ): """ Initialize the client. @@ -72,6 +74,13 @@ class Client(object): use_proxies (bool): we are using a list of proxies to which we connect, and don't want to connect to the original etcd cluster. + + expected_cluster_id (str): If a string, recorded as the expected + UUID of the cluster (rather than + learning it from the first request), + reads will raise EtcdClusterIdChanged + if they receive a response with a + different cluster ID. """ self._protocol = protocol @@ -87,6 +96,7 @@ class Client(object): self._machines_cache = [uri(self._protocol, *conn) for conn in host] self._base_uri = self._machines_cache.pop(0) + self.expected_cluster_id = expected_cluster_id self.version_prefix = version_prefix self._read_timeout = read_timeout @@ -328,8 +338,6 @@ class Client(object): return self.write(obj.key, obj.value, **kwdargs) - - def read(self, key, **kwdargs): """ Returns the value of the key 'key'. @@ -375,7 +383,8 @@ class Client(object): timeout = kwdargs.get('timeout', None) response = self.api_execute( - self.key_endpoint + key, self._MGET, params=params, timeout=timeout) + self.key_endpoint + key, self._MGET, params=params, + timeout=timeout) return self._result_from_response(response) def delete(self, key, recursive=None, dir=None, **kwdargs): @@ -594,7 +603,8 @@ class Client(object): url, timeout=timeout, fields=params, - redirect=self.allow_redirect) + redirect=self.allow_redirect, + preload_content=False) elif (method == self._MPUT) or (method == self._MPOST): response = self.http.request_encode_body( @@ -603,7 +613,8 @@ class Client(object): fields=params, timeout=timeout, encode_multipart=False, - redirect=self.allow_redirect) + redirect=self.allow_redirect, + preload_content=False) else: raise etcd.EtcdException( 'HTTP method {} not supported'.format(method)) @@ -612,6 +623,20 @@ class Client(object): self._base_uri = self._next_server() some_request_failed = True + else: + # Check the cluster ID hasn't changed under us. + # We need preload_content == False above to ensure we can read + # the headers here before waiting for the content of a watch + # below. + cluster_id = response.getheader("x-etcd-cluster-id") + if self.expected_cluster_id: + if self.expected_cluster_id != cluster_id: + raise etcd.EtcdClusterIdChanged( + 'The UUID of the cluster changed from {} to ' + '{}.'.format(self.expected_cluster_id, cluster_id)) + else: + self.expected_cluster_id = cluster_id + if some_request_failed: if not self._use_proxies: # The cluster may have changed since last invokation diff --git a/src/etcd/tests/unit/test_old_request.py b/src/etcd/tests/unit/test_old_request.py index b11c711..bd70d82 100644 --- a/src/etcd/tests/unit/test_old_request.py +++ b/src/etcd/tests/unit/test_old_request.py @@ -10,12 +10,18 @@ from etcd import EtcdException class FakeHTTPResponse(object): - def __init__(self, status, data=''): + def __init__(self, status, data='', headers=None): self.status = status self.data = data.encode('utf-8') + self.headers = headers or { + "x-etcd-cluster-id": "abdef12345", + } def getheaders(self): - return {} + return self.headers + + def getheader(self, header): + return self.headers[header] class TestClientRequest(unittest.TestCase): diff --git a/src/etcd/tests/unit/test_request.py b/src/etcd/tests/unit/test_request.py index 1fe3d0e..23a5690 100644 --- a/src/etcd/tests/unit/test_request.py +++ b/src/etcd/tests/unit/test_request.py @@ -16,7 +16,7 @@ class TestClientApiBase(unittest.TestCase): def setUp(self): self.client = etcd.Client() - def _prepare_response(self, s, d): + def _prepare_response(self, s, d, cluster_id=None): if isinstance(d, dict): data = json.dumps(d).encode('utf-8') else: @@ -25,10 +25,11 @@ class TestClientApiBase(unittest.TestCase): r = mock.create_autospec(urllib3.response.HTTPResponse)() r.status = s r.data = data + r.getheader.return_value = cluster_id or "abcd1234" return r - def _mock_api(self, status, d): - resp = self._prepare_response(status, d) + def _mock_api(self, status, d, cluster_id=None): + resp = self._prepare_response(status, d, cluster_id=cluster_id) self.client.api_execute = mock.create_autospec( self.client.api_execute, return_value=resp) @@ -98,7 +99,6 @@ class TestClientApiInternals(TestClientApiBase): (('/v2/keys/newdir', 'PUT'), dict(params={'dir': 'true'}))) - class TestClientApiInterface(TestClientApiBase): """ All tests defined in this class are executed also in TestClientRequest. @@ -463,10 +463,11 @@ class EtcdLockTestCase(TestClientApiBase): class TestClientRequest(TestClientApiInterface): def setUp(self): - self.client = etcd.Client() + self.client = etcd.Client(expected_cluster_id="abcdef1234") - def _mock_api(self, status, d): + def _mock_api(self, status, d, cluster_id=None): resp = self._prepare_response(status, d) + resp.getheader.return_value = cluster_id or "abcdef1234" self.client.http.request_encode_body = mock.create_autospec( self.client.http.request_encode_body, return_value=resp ) @@ -474,11 +475,13 @@ class TestClientRequest(TestClientApiInterface): self.client.http.request, return_value=resp ) - def _mock_error(self, error_code, msg, cause, method='PUT', fields=None): + def _mock_error(self, error_code, msg, cause, method='PUT', fields=None, + cluster_id=None): resp = self._prepare_response( 500, {'errorCode': error_code, 'message': msg, 'cause': cause} ) + resp.getheader.return_value = cluster_id or "abcdef1234" self.client.http.request_encode_body = mock.create_autospec( self.client.http.request_encode_body, return_value=resp ) @@ -508,6 +511,12 @@ class TestClientRequest(TestClientApiInterface): self.assertRaises(etcd.EtcdException, self.client.api_execute, '/testpath/bar', 'TRACE') + def test_read_cluster_id_changed(self): + """ Read timeout set to the default """ + self._mock_api(200, {}, cluster_id="notabcd1234") + self.assertRaises(etcd.EtcdClusterIdChanged, + self.client.read, '/testkey') + def test_not_in(self): pass |