path: root/src/s3ql/backends/swift.py
Diffstat (limited to 'src/s3ql/backends/swift.py')
-rw-r--r--  src/s3ql/backends/swift.py  198
1 file changed, 105 insertions, 93 deletions
diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py
index c065734..ae4b9ab 100644
--- a/src/s3ql/backends/swift.py
+++ b/src/s3ql/backends/swift.py
@@ -8,7 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from ..common import QuietError, BUFSIZE
-from .common import AbstractBucket, NoSuchObject, retry, AuthorizationError, http_connection
+from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError, http_connection,
+ DanglingStorageURL)
from .s3c import HTTPError, BadDigest
from urlparse import urlsplit
import json
@@ -20,14 +21,13 @@ import re
import tempfile
import time
import urllib
-from s3ql.backends.common import NoSuchBucket
log = logging.getLogger("backend.swift")
-class Bucket(AbstractBucket):
- """A bucket stored in OpenStack Swift
+class Backend(AbstractBackend):
+ """A backend to store data in OpenStack Swift
- The bucket guarantees get after create consistency, i.e. a newly created
+ The backend guarantees get after create consistency, i.e. a newly created
object will be immediately retrievable.
"""
@@ -35,30 +35,31 @@ class Bucket(AbstractBucket):
# Unused argument
#pylint: disable=W0613
- super(Bucket, self).__init__()
+ super(Backend, self).__init__()
- (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url)
+ (host, port, container_name, prefix) = self._parse_storage_url(storage_url)
self.hostname = host
self.port = port
- self.bucket_name = bucket_name
+ self.container_name = container_name
self.prefix = prefix
self.password = password
self.login = login
self.auth_token = None
self.auth_prefix = None
- self.conn = self._get_conn()
+ self.conn = None
- self._bucket_exists()
+ self._container_exists()
- def _bucket_exists(self):
- '''Make sure that the bucket exists'''
+ @retry
+ def _container_exists(self):
+ '''Make sure that the container exists'''
try:
resp = self._do_request('GET', '/', query_string={'limit': 1 })
except HTTPError as exc:
if exc.status == 404:
- raise NoSuchBucket(self.bucket_name)
+ raise DanglingStorageURL(self.container_name)
raise
resp.read()
@@ -66,7 +67,7 @@ class Bucket(AbstractBucket):
def _parse_storage_url(storage_url):
'''Extract information from storage URL
- Return a tuple *(host, port, bucket_name, prefix)* .
+ Return a tuple *(host, port, container_name, prefix)* .
'''
hit = re.match(r'^[a-zA-Z0-9]+://' # Backend
@@ -80,23 +81,21 @@ class Bucket(AbstractBucket):
hostname = hit.group(1)
port = int(hit.group(2) or '443')
- bucketname = hit.group(3)
+ containername = hit.group(3)
prefix = hit.group(4) or ''
- return (hostname, port, bucketname, prefix)
+ return (hostname, port, containername, prefix)
def is_temp_failure(self, exc): #IGNORE:W0613
'''Return true if exc indicates a temporary error
- Return true if the given exception is used by this bucket's backend
- to indicate a temporary problem. Most instance methods automatically
- retry the request in this case, so the caller does not need to
- worry about temporary failures.
+ Return true if the given exception indicates a temporary problem. Most instance methods
+ automatically retry the request in this case, so the caller does not need to worry about
+ temporary failures.
- However, in same cases (e.g. when reading or writing an object), the
- request cannot automatically be retried. In these case this method can
- be used to check for temporary problems and so that the request can
- be manually restarted if applicable.
+ However, in some cases (e.g. when reading or writing an object), the request cannot
+ automatically be retried. In these cases this method can be used to check for temporary
+ problems so that the request can be manually restarted if applicable.
'''
if isinstance(exc, (httplib.IncompleteRead,)):
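
For illustration (not part of the patch): with the rename, _parse_storage_url() still maps a
storage URL to a (host, port, container_name, prefix) tuple. The hostnames below are made up,
and the example assumes the (truncated) regex accepts a dotted hostname; 443 is the default
port per the code above.

    Backend._parse_storage_url('swift://swift.example.com/my-container')
    #   -> ('swift.example.com', 443, 'my-container', '')
    Backend._parse_storage_url('swift://swift.example.com:8080/my-container')
    #   -> ('swift.example.com', 8080, 'my-container', '')
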
@@ -111,10 +110,15 @@ class Bucket(AbstractBucket):
exc.errno in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT,
errno.EINTR)):
return True
+
+ elif isinstance(exc, HTTPError) and exc.status >= 500 and exc.status <= 599:
+ return True
+ elif isinstance(exc, AuthenticationExpired):
+ return True
+
return False
- @retry
def _get_conn(self):
'''Obtain connection to server and authentication token'''
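
Illustrative only (not part of the patch): with the two added branches, HTTP 5xx responses and
expired auth tokens are now classified as temporary, so @retry-decorated methods retry them.
`backend` stands for any Backend instance; the calls below only inspect the exception object.

    backend.is_temp_failure(HTTPError(503, 'Service Unavailable'))    # True  (5xx server error)
    backend.is_temp_failure(AuthenticationExpired('token expired'))   # True  (re-auth and retry)
    backend.is_temp_failure(HTTPError(404, 'Not Found'))              # False (not temporary)
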
@@ -134,14 +138,11 @@ class Bucket(AbstractBucket):
resp.read()
continue
- if resp.status == 401:
+ elif resp.status == 401:
raise AuthorizationError(resp.read())
elif resp.status > 299 or resp.status < 200:
- log.error('_refresh_auth(): unexpected response: %d %s\n%s',
- resp.status, resp.msg, resp.read())
- raise RuntimeError('Unexpected response: %d %s' % (resp.status,
- resp.msg))
+ raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
# Pylint can't infer SplitResult Types
#pylint: disable=E1103
@@ -167,12 +168,15 @@ class Bucket(AbstractBucket):
if headers is None:
headers = dict()
-
if not body:
headers['content-length'] = '0'
-
+
+ if self.conn is None:
+ log.info('_do_request(): no active connection, calling _get_conn()')
+ self.conn = self._get_conn()
+
# Construct full path
- path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.bucket_name, path))
+ path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.container_name, path))
if query_string:
s = urllib.urlencode(query_string, doseq=True)
if subres:
@@ -183,43 +187,40 @@ class Bucket(AbstractBucket):
path += '?%s' % subres
headers['connection'] = 'keep-alive'
-
- while True:
- headers['X-Auth-Token'] = self.auth_token
+ headers['X-Auth-Token'] = self.auth_token
- try:
- log.debug('_do_request(): %s %s', method, path)
- self.conn.request(method, path, body, headers)
+ try:
+ log.debug('_do_request(): %s %s', method, path)
+ self.conn.request(method, path, body, headers)
+
+ log.debug('_do_request(): Reading response..')
+ resp = self.conn.getresponse()
+ except:
+ # We probably can't use the connection anymore
+ self.conn.close()
+ raise
- log.debug('_do_request(): Reading response..')
- resp = self.conn.getresponse()
- except:
- # We probably can't use the connection anymore
- self.conn.close()
- raise
+ # We need to call read() at least once for httplib to consider this
+ # request finished, even if there is no response body.
+ if resp.length == 0:
+ resp.read()
+
+ # Success
+ if resp.status >= 200 and resp.status <= 299:
+ return resp
+
+ # Expired auth token
+ if resp.status == 401:
+ log.info('OpenStack auth token seems to have expired, requesting new one.')
+ self.conn = None
+ raise AuthenticationExpired(resp.reason)
- # We need to call read() at least once for httplib to consider this
- # request finished, even if there is no response body.
- if resp.length == 0:
- resp.read()
-
- # Success
- if resp.status >= 200 and resp.status <= 299:
- return resp
-
- # Expired auth token
- if resp.status == 401:
- log.info('OpenStack auth token seems to have expired, requesting new one.')
- resp.read()
- self.conn = self._get_conn()
- continue
-
- # If method == HEAD, server must not return response body
- # even in case of errors
- if method.upper() == 'HEAD':
- raise HTTPError(resp.status, resp.reason)
- else:
- raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
+ # If method == HEAD, server must not return response body
+ # even in case of errors
+ if method.upper() == 'HEAD':
+ raise HTTPError(resp.status, resp.reason)
+ else:
+ raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
@retry
def lookup(self, key):
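
Sketch of the new control flow (not part of the patch): a 401 no longer triggers an in-place
re-authentication loop inside _do_request(). Instead the connection is dropped and
AuthenticationExpired propagates to the @retry decorator (imported from .common) on the calling
method, which treats it as a temporary failure via is_temp_failure() and invokes the method
again; the next _do_request() then reconnects through _get_conn(). Roughly:

    import time

    def retry(method):                      # simplified stand-in for the real decorator
        def wrapped(self, *a, **kw):
            for attempt in range(10):
                try:
                    return method(self, *a, **kw)
                except Exception as exc:
                    if not self.is_temp_failure(exc):
                        raise
                    # e.g. AuthenticationExpired: _do_request() already set self.conn = None,
                    # so the next attempt re-authenticates via _get_conn()
                    time.sleep(2 ** attempt)
            raise RuntimeError('too many temporary failures')
        return wrapped
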
@@ -264,7 +265,7 @@ class Bucket(AbstractBucket):
def open_read(self, key):
''''Open object for reading
- Return a tuple of a file-like object. Bucket contents can be read from
+ Return a tuple of a file-like object. Backend contents can be read from
the file-like object, metadata is stored in its *metadata* attribute and
can be modified by the caller at will. The object must be closed explicitly.
'''
@@ -281,15 +282,13 @@ class Bucket(AbstractBucket):
def open_write(self, key, metadata=None, is_compressed=False):
"""Open object for writing
- `metadata` can be a dict of additional attributes to store with the
- object. Returns a file-like object. The object must be closed
- explicitly. After closing, the *get_obj_size* may be used to retrieve
- the size of the stored object (which may differ from the size of the
+ `metadata` can be a dict of additional attributes to store with the object. Returns a
+ file-like object. The object must be closed explicitly. After closing, *get_obj_size* may be
+ used to retrieve the size of the stored object (which may differ from the size of the
written data).
- The *is_compressed* parameter indicates that the caller is going
- to write compressed data, and may be used to avoid recompression
- by the bucket.
+ The *is_compressed* parameter indicates that the caller is going to write compressed data,
+ and may be used to avoid recompression by the backend.
"""
log.debug('open_write(%s): start', key)
@@ -302,7 +301,7 @@ class Bucket(AbstractBucket):
return ObjectW(key, self, headers)
def clear(self):
- """Delete all objects in bucket"""
+ """Delete all objects in backend"""
# We have to cache keys, because otherwise we can't use the
# http connection to delete keys.
@@ -312,14 +311,14 @@ class Bucket(AbstractBucket):
log.debug('clear(): deleting key %s', s3key)
- # Ignore missing objects when clearing bucket
+ # Ignore missing objects when clearing backend
self.delete(s3key, True)
@retry
def delete(self, key, force=False):
"""Delete object stored under `key`
- ``bucket.delete(key)`` can also be written as ``del bucket[key]``.
+ ``backend.delete(key)`` can also be written as ``del backend[key]``.
If `force` is true, do not return an error if the key does not exist.
"""
@@ -345,7 +344,7 @@ class Bucket(AbstractBucket):
try:
resp = self._do_request('PUT', '/%s%s' % (self.prefix, dest),
- headers={ 'X-Copy-From': '/%s/%s%s' % (self.bucket_name,
+ headers={ 'X-Copy-From': '/%s/%s%s' % (self.container_name,
self.prefix, src)})
# Discard response body
resp.read()
@@ -355,9 +354,9 @@ class Bucket(AbstractBucket):
raise
def list(self, prefix=''):
- '''List keys in bucket
+ '''List keys in backend
- Returns an iterator over all keys in the bucket. This method
+ Returns an iterator over all keys in the backend. This method
handles temporary errors.
'''
@@ -381,7 +380,7 @@ class Bucket(AbstractBucket):
type(exc).__name__)
raise
- log.info('Encountered %s exception (%s), retrying call to swift.Bucket.list()',
+ log.info('Encountered %s exception (%s), retrying call to swift.Backend.list()',
type(exc).__name__, exc)
if hasattr(exc, 'retry_after') and exc.retry_after:
@@ -396,9 +395,9 @@ class Bucket(AbstractBucket):
yield marker
def _list(self, prefix='', start='', batch_size=5000):
- '''List keys in bucket, starting with *start*
+ '''List keys in backend, starting with *start*
- Returns an iterator over all keys in the bucket. This method
+ Returns an iterator over all keys in the backend. This method
does not retry on errors.
'''
@@ -416,7 +415,7 @@ class Bucket(AbstractBucket):
'limit': batch_size })
except HTTPError as exc:
if exc.status == 404:
- raise NoSuchBucket(self.bucket_name)
+ raise DanglingStorageURL(self.container_name)
raise
if resp.status == 204:
@@ -450,9 +449,9 @@ class ObjectW(object):
the close() method is called.
'''
- def __init__(self, key, bucket, headers):
+ def __init__(self, key, backend, headers):
self.key = key
- self.bucket = bucket
+ self.backend = backend
self.headers = headers
self.closed = False
self.obj_size = 0
@@ -470,7 +469,7 @@ class ObjectW(object):
self.obj_size += len(buf)
def is_temp_failure(self, exc):
- return self.bucket.is_temp_failure(exc)
+ return self.backend.is_temp_failure(exc)
@retry
def close(self):
@@ -486,7 +485,7 @@ class ObjectW(object):
self.headers['Content-Type'] = 'application/octet-stream'
self.fh.seek(0)
- resp = self.bucket._do_request('PUT', '/%s%s' % (self.bucket.prefix, self.key),
+ resp = self.backend._do_request('PUT', '/%s%s' % (self.backend.prefix, self.key),
headers=self.headers, body=self.fh)
etag = resp.getheader('ETag').strip('"')
resp.read()
@@ -495,7 +494,7 @@ class ObjectW(object):
log.warn('ObjectW(%s).close(): MD5 mismatch (%s vs %s)', self.key, etag,
self.md5.hexdigest)
try:
- self.bucket.delete(self.key)
+ self.backend.delete(self.key)
except:
log.exception('Objectw(%s).close(): unable to delete corrupted object!',
self.key)
@@ -516,11 +515,11 @@ class ObjectW(object):
class ObjectR(object):
'''A SWIFT object opened for reading'''
- def __init__(self, key, resp, bucket, metadata=None):
+ def __init__(self, key, resp, backend, metadata=None):
self.key = key
self.resp = resp
self.md5_checked = False
- self.bucket = bucket
+ self.backend = backend
self.metadata = metadata
# False positive, hashlib *does* have md5 member
@@ -573,4 +572,17 @@ def extractmeta(resp):
continue
meta[hit.group(1)] = val
- return meta
\ No newline at end of file
+ return meta
+
+
+class AuthenticationExpired(Exception):
+ '''Raised if the provided Authentication Token has expired'''
+
+ def __init__(self, msg):
+ super(AuthenticationExpired, self).__init__()
+ self.msg = msg
+
+ def __str__(self):
+ return 'Auth token expired. Server said: %s' % self.msg
+
+
\ No newline at end of file
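
A usage sketch (outside this patch) of the manual-restart pattern that is_temp_failure()
supports for reads, which cannot be retried automatically; the backend instance, key and retry
count are hypothetical.

    import time

    def read_object(backend, key, max_tries=5):
        '''Read *key*, restarting the whole request on temporary failures.'''
        for attempt in range(max_tries):
            try:
                fh = backend.open_read(key)
                try:
                    return fh.read()
                finally:
                    fh.close()
            except Exception as exc:
                if not backend.is_temp_failure(exc):
                    raise
                time.sleep(2 ** attempt)    # back off, then issue the request again
        raise RuntimeError('too many temporary failures reading %s' % key)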