summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--LICENSE.txt1
-rw-r--r--src/etcd/__init__.py9
-rw-r--r--src/etcd/client.py35
-rw-r--r--src/etcd/tests/unit/test_old_request.py10
-rw-r--r--src/etcd/tests/unit/test_request.py23
5 files changed, 64 insertions, 14 deletions
diff --git a/LICENSE.txt b/LICENSE.txt
index 09012da..1d1c746 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1,6 +1,7 @@
The MIT License (MIT)
Copyright (c) 2013 Jose Plana Mario
+Modifications, Copyright (c) 2015 Metaswitch Networks Limited
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/src/etcd/__init__.py b/src/etcd/__init__.py
index 9367ce9..a3f6d6c 100644
--- a/src/etcd/__init__.py
+++ b/src/etcd/__init__.py
@@ -110,6 +110,15 @@ class EtcdException(Exception):
self.payload=payload
+class EtcdClusterIdChanged(EtcdException):
+ """
+ The etcd cluster ID changed. This may indicate the cluster was replaced
+ with a backup. Raised to prevent waiting on an etcd_index that was only
+ valid on the old cluster.
+ """
+ pass
+
+
class EtcdKeyError(EtcdException):
"""
Etcd Generic KeyError Exception
diff --git a/src/etcd/client.py b/src/etcd/client.py
index e5acafc..809b4a9 100644
--- a/src/etcd/client.py
+++ b/src/etcd/client.py
@@ -7,6 +7,7 @@
"""
import urllib3
+import urllib3.util
import json
import ssl
import etcd
@@ -41,6 +42,7 @@ class Client(object):
ca_cert=None,
allow_reconnect=False,
use_proxies=False,
+ expected_cluster_id=None,
):
"""
Initialize the client.
@@ -72,6 +74,13 @@ class Client(object):
use_proxies (bool): we are using a list of proxies to which we connect,
and don't want to connect to the original etcd cluster.
+
+ expected_cluster_id (str): If a string, recorded as the expected
+ UUID of the cluster (rather than
+ learning it from the first request),
+ reads will raise EtcdClusterIdChanged
+ if they receive a response with a
+ different cluster ID.
"""
self._protocol = protocol
@@ -87,6 +96,7 @@ class Client(object):
self._machines_cache = [uri(self._protocol, *conn) for conn in host]
self._base_uri = self._machines_cache.pop(0)
+ self.expected_cluster_id = expected_cluster_id
self.version_prefix = version_prefix
self._read_timeout = read_timeout
@@ -328,8 +338,6 @@ class Client(object):
return self.write(obj.key, obj.value, **kwdargs)
-
-
def read(self, key, **kwdargs):
"""
Returns the value of the key 'key'.
@@ -375,7 +383,8 @@ class Client(object):
timeout = kwdargs.get('timeout', None)
response = self.api_execute(
- self.key_endpoint + key, self._MGET, params=params, timeout=timeout)
+ self.key_endpoint + key, self._MGET, params=params,
+ timeout=timeout)
return self._result_from_response(response)
def delete(self, key, recursive=None, dir=None, **kwdargs):
@@ -594,7 +603,8 @@ class Client(object):
url,
timeout=timeout,
fields=params,
- redirect=self.allow_redirect)
+ redirect=self.allow_redirect,
+ preload_content=False)
elif (method == self._MPUT) or (method == self._MPOST):
response = self.http.request_encode_body(
@@ -603,7 +613,8 @@ class Client(object):
fields=params,
timeout=timeout,
encode_multipart=False,
- redirect=self.allow_redirect)
+ redirect=self.allow_redirect,
+ preload_content=False)
else:
raise etcd.EtcdException(
'HTTP method {} not supported'.format(method))
@@ -612,6 +623,20 @@ class Client(object):
self._base_uri = self._next_server()
some_request_failed = True
+ else:
+ # Check the cluster ID hasn't changed under us.
+ # We need preload_content == False above to ensure we can read
+ # the headers here before waiting for the content of a watch
+ # below.
+ cluster_id = response.getheader("x-etcd-cluster-id")
+ if self.expected_cluster_id:
+ if self.expected_cluster_id != cluster_id:
+ raise etcd.EtcdClusterIdChanged(
+ 'The UUID of the cluster changed from {} to '
+ '{}.'.format(self.expected_cluster_id, cluster_id))
+ else:
+ self.expected_cluster_id = cluster_id
+
if some_request_failed:
if not self._use_proxies:
# The cluster may have changed since last invokation
diff --git a/src/etcd/tests/unit/test_old_request.py b/src/etcd/tests/unit/test_old_request.py
index b11c711..bd70d82 100644
--- a/src/etcd/tests/unit/test_old_request.py
+++ b/src/etcd/tests/unit/test_old_request.py
@@ -10,12 +10,18 @@ from etcd import EtcdException
class FakeHTTPResponse(object):
- def __init__(self, status, data=''):
+ def __init__(self, status, data='', headers=None):
self.status = status
self.data = data.encode('utf-8')
+ self.headers = headers or {
+ "x-etcd-cluster-id": "abdef12345",
+ }
def getheaders(self):
- return {}
+ return self.headers
+
+ def getheader(self, header):
+ return self.headers[header]
class TestClientRequest(unittest.TestCase):
diff --git a/src/etcd/tests/unit/test_request.py b/src/etcd/tests/unit/test_request.py
index 1fe3d0e..23a5690 100644
--- a/src/etcd/tests/unit/test_request.py
+++ b/src/etcd/tests/unit/test_request.py
@@ -16,7 +16,7 @@ class TestClientApiBase(unittest.TestCase):
def setUp(self):
self.client = etcd.Client()
- def _prepare_response(self, s, d):
+ def _prepare_response(self, s, d, cluster_id=None):
if isinstance(d, dict):
data = json.dumps(d).encode('utf-8')
else:
@@ -25,10 +25,11 @@ class TestClientApiBase(unittest.TestCase):
r = mock.create_autospec(urllib3.response.HTTPResponse)()
r.status = s
r.data = data
+ r.getheader.return_value = cluster_id or "abcd1234"
return r
- def _mock_api(self, status, d):
- resp = self._prepare_response(status, d)
+ def _mock_api(self, status, d, cluster_id=None):
+ resp = self._prepare_response(status, d, cluster_id=cluster_id)
self.client.api_execute = mock.create_autospec(
self.client.api_execute, return_value=resp)
@@ -98,7 +99,6 @@ class TestClientApiInternals(TestClientApiBase):
(('/v2/keys/newdir', 'PUT'), dict(params={'dir': 'true'})))
-
class TestClientApiInterface(TestClientApiBase):
"""
All tests defined in this class are executed also in TestClientRequest.
@@ -463,10 +463,11 @@ class EtcdLockTestCase(TestClientApiBase):
class TestClientRequest(TestClientApiInterface):
def setUp(self):
- self.client = etcd.Client()
+ self.client = etcd.Client(expected_cluster_id="abcdef1234")
- def _mock_api(self, status, d):
+ def _mock_api(self, status, d, cluster_id=None):
resp = self._prepare_response(status, d)
+ resp.getheader.return_value = cluster_id or "abcdef1234"
self.client.http.request_encode_body = mock.create_autospec(
self.client.http.request_encode_body, return_value=resp
)
@@ -474,11 +475,13 @@ class TestClientRequest(TestClientApiInterface):
self.client.http.request, return_value=resp
)
- def _mock_error(self, error_code, msg, cause, method='PUT', fields=None):
+ def _mock_error(self, error_code, msg, cause, method='PUT', fields=None,
+ cluster_id=None):
resp = self._prepare_response(
500,
{'errorCode': error_code, 'message': msg, 'cause': cause}
)
+ resp.getheader.return_value = cluster_id or "abcdef1234"
self.client.http.request_encode_body = mock.create_autospec(
self.client.http.request_encode_body, return_value=resp
)
@@ -508,6 +511,12 @@ class TestClientRequest(TestClientApiInterface):
self.assertRaises(etcd.EtcdException,
self.client.api_execute, '/testpath/bar', 'TRACE')
+ def test_read_cluster_id_changed(self):
+ """ Read timeout set to the default """
+ self._mock_api(200, {}, cluster_id="notabcd1234")
+ self.assertRaises(etcd.EtcdClusterIdChanged,
+ self.client.read, '/testkey')
+
def test_not_in(self):
pass