diff options
Diffstat (limited to 'src/s3ql/backends/swift.py')
-rw-r--r-- | src/s3ql/backends/swift.py | 198 |
1 files changed, 105 insertions, 93 deletions
diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py index c065734..ae4b9ab 100644 --- a/src/s3ql/backends/swift.py +++ b/src/s3ql/backends/swift.py @@ -8,7 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3. from __future__ import division, print_function, absolute_import from ..common import QuietError, BUFSIZE -from .common import AbstractBucket, NoSuchObject, retry, AuthorizationError, http_connection +from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError, http_connection, + DanglingStorageURL) from .s3c import HTTPError, BadDigest from urlparse import urlsplit import json @@ -20,14 +21,13 @@ import re import tempfile import time import urllib -from s3ql.backends.common import NoSuchBucket log = logging.getLogger("backend.swift") -class Bucket(AbstractBucket): - """A bucket stored in OpenStack Swift +class Backend(AbstractBackend): + """A backend to store data in OpenStack Swift - The bucket guarantees get after create consistency, i.e. a newly created + The backend guarantees get after create consistency, i.e. a newly created object will be immediately retrievable. """ @@ -35,30 +35,31 @@ class Bucket(AbstractBucket): # Unused argument #pylint: disable=W0613 - super(Bucket, self).__init__() + super(Backend, self).__init__() - (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url) + (host, port, container_name, prefix) = self._parse_storage_url(storage_url) self.hostname = host self.port = port - self.bucket_name = bucket_name + self.container_name = container_name self.prefix = prefix self.password = password self.login = login self.auth_token = None self.auth_prefix = None - self.conn = self._get_conn() + self.conn = None - self._bucket_exists() + self._container_exists() - def _bucket_exists(self): - '''Make sure that the bucket exists''' + @retry + def _container_exists(self): + '''Make sure that the container exists''' try: resp = self._do_request('GET', '/', query_string={'limit': 1 }) except HTTPError as exc: if exc.status == 404: - raise NoSuchBucket(self.bucket_name) + raise DanglingStorageURL(self.container_name) raise resp.read() @@ -66,7 +67,7 @@ class Bucket(AbstractBucket): def _parse_storage_url(storage_url): '''Extract information from storage URL - Return a tuple *(host, port, bucket_name, prefix)* . + Return a tuple *(host, port, container_name, prefix)* . ''' hit = re.match(r'^[a-zA-Z0-9]+://' # Backend @@ -80,23 +81,21 @@ class Bucket(AbstractBucket): hostname = hit.group(1) port = int(hit.group(2) or '443') - bucketname = hit.group(3) + containername = hit.group(3) prefix = hit.group(4) or '' - return (hostname, port, bucketname, prefix) + return (hostname, port, containername, prefix) def is_temp_failure(self, exc): #IGNORE:W0613 '''Return true if exc indicates a temporary error - Return true if the given exception is used by this bucket's backend - to indicate a temporary problem. Most instance methods automatically - retry the request in this case, so the caller does not need to - worry about temporary failures. + Return true if the given exception indicates a temporary problem. Most instance methods + automatically retry the request in this case, so the caller does not need to worry about + temporary failures. - However, in same cases (e.g. when reading or writing an object), the - request cannot automatically be retried. In these case this method can - be used to check for temporary problems and so that the request can - be manually restarted if applicable. + However, in same cases (e.g. when reading or writing an object), the request cannot + automatically be retried. In these case this method can be used to check for temporary + problems and so that the request can be manually restarted if applicable. ''' if isinstance(exc, (httplib.IncompleteRead,)): @@ -111,10 +110,15 @@ class Bucket(AbstractBucket): exc.errno in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT, errno.EINTR)): return True + + elif isinstance(exc, HTTPError) and exc.status >= 500 and exc.status <= 599: + return True + elif isinstance(exc, AuthenticationExpired): + return True + return False - @retry def _get_conn(self): '''Obtain connection to server and authentication token''' @@ -134,14 +138,11 @@ class Bucket(AbstractBucket): resp.read() continue - if resp.status == 401: + elif resp.status == 401: raise AuthorizationError(resp.read()) elif resp.status > 299 or resp.status < 200: - log.error('_refresh_auth(): unexpected response: %d %s\n%s', - resp.status, resp.msg, resp.read()) - raise RuntimeError('Unexpected response: %d %s' % (resp.status, - resp.msg)) + raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read()) # Pylint can't infer SplitResult Types #pylint: disable=E1103 @@ -167,12 +168,15 @@ class Bucket(AbstractBucket): if headers is None: headers = dict() - if not body: headers['content-length'] = '0' - + + if self.conn is None: + log.info('_do_request(): no active connection, calling _get_conn()') + self.conn = self._get_conn() + # Construct full path - path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.bucket_name, path)) + path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.container_name, path)) if query_string: s = urllib.urlencode(query_string, doseq=True) if subres: @@ -183,43 +187,40 @@ class Bucket(AbstractBucket): path += '?%s' % subres headers['connection'] = 'keep-alive' - - while True: - headers['X-Auth-Token'] = self.auth_token + headers['X-Auth-Token'] = self.auth_token - try: - log.debug('_do_request(): %s %s', method, path) - self.conn.request(method, path, body, headers) + try: + log.debug('_do_request(): %s %s', method, path) + self.conn.request(method, path, body, headers) + + log.debug('_do_request(): Reading response..') + resp = self.conn.getresponse() + except: + # We probably can't use the connection anymore + self.conn.close() + raise - log.debug('_do_request(): Reading response..') - resp = self.conn.getresponse() - except: - # We probably can't use the connection anymore - self.conn.close() - raise + # We need to call read() at least once for httplib to consider this + # request finished, even if there is no response body. + if resp.length == 0: + resp.read() + + # Success + if resp.status >= 200 and resp.status <= 299: + return resp + + # Expired auth token + if resp.status == 401: + log.info('OpenStack auth token seems to have expired, requesting new one.') + self.conn = None + raise AuthenticationExpired(resp.reason) - # We need to call read() at least once for httplib to consider this - # request finished, even if there is no response body. - if resp.length == 0: - resp.read() - - # Success - if resp.status >= 200 and resp.status <= 299: - return resp - - # Expired auth token - if resp.status == 401: - log.info('OpenStack auth token seems to have expired, requesting new one.') - resp.read() - self.conn = self._get_conn() - continue - - # If method == HEAD, server must not return response body - # even in case of errors - if method.upper() == 'HEAD': - raise HTTPError(resp.status, resp.reason) - else: - raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read()) + # If method == HEAD, server must not return response body + # even in case of errors + if method.upper() == 'HEAD': + raise HTTPError(resp.status, resp.reason) + else: + raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read()) @retry def lookup(self, key): @@ -264,7 +265,7 @@ class Bucket(AbstractBucket): def open_read(self, key): ''''Open object for reading - Return a tuple of a file-like object. Bucket contents can be read from + Return a tuple of a file-like object. Backend contents can be read from the file-like object, metadata is stored in its *metadata* attribute and can be modified by the caller at will. The object must be closed explicitly. ''' @@ -281,15 +282,13 @@ class Bucket(AbstractBucket): def open_write(self, key, metadata=None, is_compressed=False): """Open object for writing - `metadata` can be a dict of additional attributes to store with the - object. Returns a file-like object. The object must be closed - explicitly. After closing, the *get_obj_size* may be used to retrieve - the size of the stored object (which may differ from the size of the + `metadata` can be a dict of additional attributes to store with the object. Returns a file- + like object. The object must be closed explicitly. After closing, the *get_obj_size* may be + used to retrieve the size of the stored object (which may differ from the size of the written data). - The *is_compressed* parameter indicates that the caller is going - to write compressed data, and may be used to avoid recompression - by the bucket. + The *is_compressed* parameter indicates that the caller is going to write compressed data, + and may be used to avoid recompression by the backend. """ log.debug('open_write(%s): start', key) @@ -302,7 +301,7 @@ class Bucket(AbstractBucket): return ObjectW(key, self, headers) def clear(self): - """Delete all objects in bucket""" + """Delete all objects in backend""" # We have to cache keys, because otherwise we can't use the # http connection to delete keys. @@ -312,14 +311,14 @@ class Bucket(AbstractBucket): log.debug('clear(): deleting key %s', s3key) - # Ignore missing objects when clearing bucket + # Ignore missing objects when clearing backend self.delete(s3key, True) @retry def delete(self, key, force=False): """Delete object stored under `key` - ``bucket.delete(key)`` can also be written as ``del bucket[key]``. + ``backend.delete(key)`` can also be written as ``del backend[key]``. If `force` is true, do not return an error if the key does not exist. """ @@ -345,7 +344,7 @@ class Bucket(AbstractBucket): try: resp = self._do_request('PUT', '/%s%s' % (self.prefix, dest), - headers={ 'X-Copy-From': '/%s/%s%s' % (self.bucket_name, + headers={ 'X-Copy-From': '/%s/%s%s' % (self.container_name, self.prefix, src)}) # Discard response body resp.read() @@ -355,9 +354,9 @@ class Bucket(AbstractBucket): raise def list(self, prefix=''): - '''List keys in bucket + '''List keys in backend - Returns an iterator over all keys in the bucket. This method + Returns an iterator over all keys in the backend. This method handles temporary errors. ''' @@ -381,7 +380,7 @@ class Bucket(AbstractBucket): type(exc).__name__) raise - log.info('Encountered %s exception (%s), retrying call to swift.Bucket.list()', + log.info('Encountered %s exception (%s), retrying call to swift.Backend.list()', type(exc).__name__, exc) if hasattr(exc, 'retry_after') and exc.retry_after: @@ -396,9 +395,9 @@ class Bucket(AbstractBucket): yield marker def _list(self, prefix='', start='', batch_size=5000): - '''List keys in bucket, starting with *start* + '''List keys in backend, starting with *start* - Returns an iterator over all keys in the bucket. This method + Returns an iterator over all keys in the backend. This method does not retry on errors. ''' @@ -416,7 +415,7 @@ class Bucket(AbstractBucket): 'limit': batch_size }) except HTTPError as exc: if exc.status == 404: - raise NoSuchBucket(self.bucket_name) + raise DanglingStorageURL(self.container_name) raise if resp.status == 204: @@ -450,9 +449,9 @@ class ObjectW(object): the close() method is called. ''' - def __init__(self, key, bucket, headers): + def __init__(self, key, backend, headers): self.key = key - self.bucket = bucket + self.backend = backend self.headers = headers self.closed = False self.obj_size = 0 @@ -470,7 +469,7 @@ class ObjectW(object): self.obj_size += len(buf) def is_temp_failure(self, exc): - return self.bucket.is_temp_failure(exc) + return self.backend.is_temp_failure(exc) @retry def close(self): @@ -486,7 +485,7 @@ class ObjectW(object): self.headers['Content-Type'] = 'application/octet-stream' self.fh.seek(0) - resp = self.bucket._do_request('PUT', '/%s%s' % (self.bucket.prefix, self.key), + resp = self.backend._do_request('PUT', '/%s%s' % (self.backend.prefix, self.key), headers=self.headers, body=self.fh) etag = resp.getheader('ETag').strip('"') resp.read() @@ -495,7 +494,7 @@ class ObjectW(object): log.warn('ObjectW(%s).close(): MD5 mismatch (%s vs %s)', self.key, etag, self.md5.hexdigest) try: - self.bucket.delete(self.key) + self.backend.delete(self.key) except: log.exception('Objectw(%s).close(): unable to delete corrupted object!', self.key) @@ -516,11 +515,11 @@ class ObjectW(object): class ObjectR(object): '''A SWIFT object opened for reading''' - def __init__(self, key, resp, bucket, metadata=None): + def __init__(self, key, resp, backend, metadata=None): self.key = key self.resp = resp self.md5_checked = False - self.bucket = bucket + self.backend = backend self.metadata = metadata # False positive, hashlib *does* have md5 member @@ -573,4 +572,17 @@ def extractmeta(resp): continue meta[hit.group(1)] = val - return meta
\ No newline at end of file + return meta + + +class AuthenticationExpired(Exception): + '''Raised if the provided Authentication Token has expired''' + + def __init__(self, msg): + super(AuthenticationExpired, self).__init__() + self.msg = msg + + def __str__(self): + return 'Auth token expired. Server said: %s' % self.msg + +
\ No newline at end of file |