summary | refs | log | tree | commit | diff
path: root/src/s3ql/backends
diff options
context:
space:
mode:
author Nikolaus Rath <Nikolaus@rath.org> 2016-03-09 10:09:53 -0800
committer Nikolaus Rath <Nikolaus@rath.org> 2016-03-09 10:09:53 -0800
commit 218f6abbf0063f9f6630a0d36a8d869699bb5cce (patch)
tree c355476ced0dc9a3360e936c4b1786ad2985211a /src/s3ql/backends
parent 181ca7febce49f2c2ee17e4615cc528ad03431b6 (diff)
Import s3ql_2.11.1+dfsg.orig.tar.gz
Diffstat (limited to 'src/s3ql/backends')
-rw-r--r-- src/s3ql/backends/common.py 143
-rw-r--r-- src/s3ql/backends/comprenc.py 65
-rw-r--r-- src/s3ql/backends/gs.py 57
-rw-r--r-- src/s3ql/backends/local.py 28
-rw-r--r-- src/s3ql/backends/rackspace.py 9
-rw-r--r-- src/s3ql/backends/s3.py 27
-rw-r--r-- src/s3ql/backends/s3c.py 56
-rw-r--r-- src/s3ql/backends/swift.py 150
-rw-r--r-- src/s3ql/backends/swiftks.py 42
9 files changed, 386 insertions, 191 deletions
diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py
index 6c64f47..b1083e6 100644
--- a/src/s3ql/backends/common.py
+++ b/src/s3ql/backends/common.py
@@ -6,12 +6,19 @@ Copyright © 2008 Nikolaus Rath <Nikolaus@rath.org>
This program can be distributed under the terms of the GNU GPLv3.
'''
-from ..logging import logging # Ensure use of custom logger class
+from ..logging import logging, QuietError, LOG_ONCE # Ensure use of custom logger class
from abc import abstractmethod, ABCMeta
from functools import wraps
import time
+import codecs
+import io
import textwrap
import inspect
+import ssl
+import os
+import re
+import pickletools
+import pickle
log = logging.getLogger(__name__)
@@ -456,12 +463,16 @@ class NoSuchObject(Exception):
class DanglingStorageURLError(Exception):
'''Raised if the backend can't store data at the given location'''
- def __init__(self, loc):
+ def __init__(self, loc, msg=None):
super().__init__()
self.loc = loc
+ self.msg = msg
def __str__(self):
- return '%r does not exist' % self.loc
+ if self.msg is None:
+ return '%r does not exist' % self.loc
+ else:
+ return self.msg
class AuthorizationError(Exception):
'''Raised if the credentials don't give access to the requested backend'''
@@ -483,9 +494,12 @@ class AuthenticationError(Exception):
def __str__(self):
return 'Access denied. Server said: %s' % self.msg
-class ChecksumError(Exception):
+class CorruptedObjectError(Exception):
"""
- Raised if there is a checksum error in the data that we received.
+ Raised if a storage object is corrupted.
+
+ Note that this is different from BadDigest error, which is raised
+ if a transmission error has been detected.
"""
def __init__(self, str_):
@@ -494,3 +508,122 @@ class ChecksumError(Exception):
def __str__(self):
return self.str
+
+def get_ssl_context(path):
+ '''Construct SSLContext object'''
+
+ # Best practice according to http://docs.python.org/3/library/ssl.html#protocol-versions
+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context.options |= ssl.OP_NO_SSLv2
+ context.verify_mode = ssl.CERT_REQUIRED
+
+ if path is None:
+ log.debug('Reading default CA certificates.')
+ context.set_default_verify_paths()
+ elif os.path.isfile(path):
+ log.debug('Reading CA certificates from file %s', path)
+ context.load_verify_locations(cafile=path)
+ else:
+ log.debug('Reading CA certificates from directory %s', path)
+ context.load_verify_locations(capath=path)
+
+ return context
+
+def get_proxy(ssl):
+ '''Read system proxy settings
+
+ Returns either `None`, or a tuple ``(host, port)``.
+
+ This function may raise `QuietError`.
+ '''
+
+ if ssl:
+ proxy_env = 'https_proxy'
+ else:
+ proxy_env = 'http_proxy'
+
+ if proxy_env in os.environ:
+ proxy = os.environ[proxy_env]
+ hit = re.match(r'^(https?://)?([a-zA-Z0-9.-]+)(:[0-9]+)?/?$', proxy)
+ if not hit:
+ raise QuietError('Unable to parse proxy setting %s=%r' %
+ (proxy_env, proxy), exitcode=13)
+
+ if hit.group(1) == 'https://':
+ log.warning('HTTPS connection to proxy is probably pointless and not supported, '
+ 'will use standard HTTP', extra=LOG_ONCE)
+
+ if hit.group(3):
+ proxy_port = int(hit.group(3)[1:])
+ else:
+ proxy_port = 80
+
+ proxy_host = hit.group(2)
+ log.info('Using CONNECT proxy %s:%d', proxy_host, proxy_port,
+ extra=LOG_ONCE)
+ proxy = (proxy_host, proxy_port)
+ else:
+ proxy = None
+
+ return proxy
+
+
+SAFE_UNPICKLE_OPCODES = {'BININT', 'BININT1', 'BININT2', 'LONG1', 'LONG4',
+ 'BINSTRING', 'SHORT_BINSTRING', 'GLOBAL',
+ 'NONE', 'NEWTRUE', 'NEWFALSE', 'BINUNICODE',
+ 'BINFLOAT', 'EMPTY_LIST', 'APPEND', 'APPENDS',
+ 'LIST', 'EMPTY_TUPLE', 'TUPLE', 'TUPLE1', 'TUPLE2',
+ 'TUPLE3', 'EMPTY_DICT', 'DICT', 'SETITEM',
+ 'SETITEMS', 'POP', 'DUP', 'MARK', 'POP_MARK',
+ 'BINGET', 'LONG_BINGET', 'BINPUT', 'LONG_BINPUT',
+ 'PROTO', 'STOP', 'REDUCE'}
+
+SAFE_UNPICKLE_GLOBAL_NAMES = { ('__builtin__', 'bytearray'),
+ ('__builtin__', 'set'),
+ ('__builtin__', 'frozenset'),
+ ('_codecs', 'encode') }
+SAFE_UNPICKLE_GLOBAL_OBJS = { bytearray, set, frozenset, codecs.encode }
+
+class SafeUnpickler(pickle.Unpickler):
+ def find_class(self, module, name):
+ if (module, name) not in SAFE_UNPICKLE_GLOBAL_NAMES:
+ raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
+ (module, name))
+ ret = super().find_class(module, name)
+ if ret not in SAFE_UNPICKLE_GLOBAL_OBJS:
+ raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
+ (module, name))
+ return ret
+
+
+def safe_unpickle_fh(fh, fix_imports=True, encoding="ASCII",
+ errors="strict"):
+ '''Safely unpickle untrusted data from *fh*
+
+ *fh* must be seekable.
+ '''
+
+ if not fh.seekable():
+ raise TypeError('*fh* must be seekable')
+ pos = fh.tell()
+
+ # First make sure that we know all used opcodes
+ for (opcode, arg, _) in pickletools.genops(fh):
+ if opcode.proto > 2 or opcode.name not in SAFE_UNPICKLE_OPCODES:
+ raise pickle.UnpicklingError('opcode %s is unsafe' % opcode.name)
+
+ fh.seek(pos)
+
+ # Then use a custom Unpickler to ensure that we only give access to
+ # specific, whitelisted globals. Note that with the above opcodes, there is
+ # no way to trigger attribute access, so "brachiating" from a white listed
+ # object to __builtins__ is not possible.
+ return SafeUnpickler(fh, fix_imports=fix_imports,
+ encoding=encoding, errors=errors).load()
+
+def safe_unpickle(buf, fix_imports=True, encoding="ASCII",
+ errors="strict"):
+ '''Safely unpickle untrusted data in *buf*'''
+
+ return safe_unpickle_fh(io.BytesIO(buf), fix_imports=fix_imports,
+ encoding=encoding, errors=errors)
diff --git a/src/s3ql/backends/comprenc.py b/src/s3ql/backends/comprenc.py
index 7be7fd4..c7689a2 100644
--- a/src/s3ql/backends/comprenc.py
+++ b/src/s3ql/backends/comprenc.py
@@ -8,7 +8,7 @@ This program can be distributed under the terms of the GNU GPLv3.
from ..logging import logging # Ensure use of custom logger class
from .. import BUFSIZE, PICKLE_PROTOCOL
-from .common import AbstractBackend, ChecksumError
+from .common import AbstractBackend, CorruptedObjectError, safe_unpickle
from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring,
ABCDocstMeta)
from Crypto.Cipher import AES
@@ -94,16 +94,21 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
'''
if not isinstance(metadata, dict):
- raise MalformedObjectError()
+ raise CorruptedObjectError('metadata should be dict, not %s' % type(metadata))
+
+ format_version = metadata.get('format_version', 0)
+ if not 0 < format_version <= 2:
+ raise CorruptedObjectError('format_version %s unsupported' % format_version)
+
for mkey in ('encryption', 'compression', 'data'):
if mkey not in metadata:
- raise MalformedObjectError()
+ raise CorruptedObjectError('meta key %s is missing' % mkey)
encr_alg = metadata['encryption']
encrypted = (encr_alg != 'None')
if encrypted and self.passphrase is None:
- raise ChecksumError('Encrypted object and no passphrase supplied')
+ raise CorruptedObjectError('Encrypted object and no passphrase supplied')
elif not encrypted and self.passphrase is not None:
raise ObjectNotEncrypted()
@@ -112,30 +117,30 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
if not encrypted:
try:
- return (None, pickle.loads(buf, encoding='latin1'))
+ return (None, safe_unpickle(buf, encoding='latin1'))
except pickle.UnpicklingError:
- raise ChecksumError('Invalid metadata')
+ raise CorruptedObjectError('Invalid metadata')
# Encrypted
for mkey in ('nonce', 'signature', 'object_id'):
if mkey not in metadata:
- raise MalformedObjectError()
+ raise CorruptedObjectError('meta key %s is missing' % mkey)
nonce = b64decode(metadata['nonce'])
meta_key = sha256(self.passphrase + nonce + b'meta')
meta_sig = compute_metadata_signature(meta_key, metadata)
if not hmac.compare_digest(b64decode(metadata['signature']),
meta_sig):
- raise ChecksumError('HMAC mismatch')
+ raise CorruptedObjectError('HMAC mismatch')
stored_key = b64decode(metadata['object_id']).decode('utf-8')
if stored_key != key:
- raise ChecksumError('Object content does not match its key (%s vs %s)'
- % (stored_key, key))
+ raise CorruptedObjectError('Object content does not match its key (%s vs %s)'
+ % (stored_key, key))
buf = b64decode(metadata['data'])
- return (nonce, pickle.loads(aes_cipher(meta_key).decrypt(buf),
- encoding='latin1'))
+ return (nonce, safe_unpickle(aes_cipher(meta_key).decrypt(buf),
+ encoding='latin1'))
@prepend_ancestor_docstring
def open_read(self, key):
@@ -375,7 +380,7 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
try:
meta_new['data'] = meta['data']
except KeyError:
- raise MalformedObjectError()
+ raise CorruptedObjectError('meta key data is missing')
if not self.passphrase:
return meta_new
@@ -400,7 +405,7 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
hash_ = cipher.decrypt(hash_)
if not hmac.compare_digest(hash_, hmac_.digest()):
- raise ChecksumError('HMAC mismatch')
+ raise CorruptedObjectError('HMAC mismatch')
obj_id = nonce[TIME_BYTES:].decode('utf-8')
meta_key = sha256(self.passphrase + nonce + b'meta')
@@ -549,15 +554,15 @@ class DecompressFilter(InputFilter):
buf = self.fh.read(size)
if not buf:
if not self.decomp.eof:
- raise ChecksumError('Premature end of stream.')
+ raise CorruptedObjectError('Premature end of stream.')
if self.decomp.unused_data:
- raise ChecksumError('Data after end of compressed stream')
+ raise CorruptedObjectError('Data after end of compressed stream')
return b''
try:
buf = decompress(self.decomp, buf)
- except ChecksumError:
+ except CorruptedObjectError:
# Read rest of stream, so that we raise HMAC or MD5 error instead
# if problem is on lower layer
self.discard_input()
@@ -674,7 +679,7 @@ class DecryptFilter(InputFilter):
buf = self.fh.read(size)
if not buf:
- raise ChecksumError('Premature end of stream.')
+ raise CorruptedObjectError('Premature end of stream.')
# Work around https://bugs.launchpad.net/pycrypto/+bug/1256172
# cipher.decrypt refuses to work with anything but bytes
@@ -747,10 +752,10 @@ class DecryptFilter(InputFilter):
# Read rest of stream, so that we raise MD5 error instead
# if problem is on lower layer
self.discard_input()
- raise ChecksumError('Extraneous data at end of object')
+ raise CorruptedObjectError('Extraneous data at end of object')
if not hmac.compare_digest(inbuf, self.hmac.digest()):
- raise ChecksumError('HMAC mismatch')
+ raise CorruptedObjectError('HMAC mismatch')
self.hmac_checked = True
break
@@ -825,9 +830,9 @@ class LegacyDecryptDecompressFilter(io.RawIOBase):
if not buf and not self.hmac_checked:
if not hmac.compare_digest(self._decrypt(self.hash),
self.hmac.digest()):
- raise ChecksumError('HMAC mismatch')
+ raise CorruptedObjectError('HMAC mismatch')
elif self.decomp and self.decomp.unused_data:
- raise ChecksumError('Data after end of compressed stream')
+ raise CorruptedObjectError('Data after end of compressed stream')
else:
self.hmac_checked = True
return b''
@@ -865,16 +870,16 @@ def decompress(decomp, buf):
return decomp.decompress(buf)
except IOError as exc:
if exc.args[0].lower().startswith('invalid data stream'):
- raise ChecksumError('Invalid compressed stream')
+ raise CorruptedObjectError('Invalid compressed stream')
raise
except lzma.LZMAError as exc:
if (exc.args[0].lower().startswith('corrupt input data')
or exc.args[0].startswith('Input format not supported')):
- raise ChecksumError('Invalid compressed stream')
+ raise CorruptedObjectError('Invalid compressed stream')
raise
except zlib.error as exc:
if exc.args[0].lower().startswith('error -3 while decompressing'):
- raise ChecksumError('Invalid compressed stream')
+ raise CorruptedObjectError('Invalid compressed stream')
raise
class ObjectNotEncrypted(Exception):
@@ -888,13 +893,3 @@ class ObjectNotEncrypted(Exception):
'''
pass
-
-
-class MalformedObjectError(Exception):
- '''
- Raised by ComprencBackend when trying to access an object that
- wasn't stored by ComprencBackend, i.e. has no information about
- encryption or compression.
- '''
-
- pass
diff --git a/src/s3ql/backends/gs.py b/src/s3ql/backends/gs.py
index 11f35f5..a7944cf 100644
--- a/src/s3ql/backends/gs.py
+++ b/src/s3ql/backends/gs.py
@@ -16,6 +16,7 @@ from dugong import CaseInsensitiveDict, HTTPConnection
from urllib.parse import urlencode
import re
import json
+import threading
import time
# Pylint goes berserk with false positives
@@ -40,10 +41,10 @@ class Backend(s3c.Backend):
# This class variable holds the mapping from refresh tokens to
# access tokens.
access_token = dict()
+ _refresh_lock = threading.Lock()
- def __init__(self, storage_url, gs_key, gs_secret, ssl_context=None, proxy=None):
- super().__init__(storage_url, gs_key, gs_secret, ssl_context=ssl_context,
- proxy=proxy)
+ def __init__(self, storage_url, gs_key, gs_secret, options):
+ super().__init__(storage_url, gs_key, gs_secret, options)
self.use_oauth2 = (gs_key == 'oauth2')
@@ -103,6 +104,17 @@ class Backend(s3c.Backend):
now.tm_year, now.tm_hour,
now.tm_min, now.tm_sec))
+ # This method performs a different kind of HTTP request than the methods
+ # decorated with `retry` that it is called by, so in theory it should do its
+ # own retry handling (perhaps with a new `retry_on` decorator that allows to
+ # specify a custom `is_temp_failure` function instead of calling the
+ # instance method). However, in practice there is currently no difference in
+ # the set of exceptions that are considered temporary when retrieving an
+ # access token, and the set of exceptions checked for in the
+ # `_is_temp_failure` method. Therefore, for now we avoid the additional
+ # complexity of custom retry handling and rely on the @retry decorator of
+ # the caller to handle temporary errors. This should be kept in mind
+ # when modifying either method.
def _get_access_token(self):
log.info('Requesting new access token')
@@ -122,22 +134,34 @@ class Backend(s3c.Backend):
body=body.encode('utf-8'))
resp = conn.read_response()
- resp_json = None
- if 'Content-Type' in resp.headers:
+ if resp.status > 299 or resp.status < 200:
+ raise HTTPError(resp.status, resp.reason, resp.headers)
+
+ content_type = resp.headers.get('Content-Type', None)
+ if content_type:
hit = re.match(r'application/json(?:; charset="(.+)")?$',
resp.headers['Content-Type'], re.IGNORECASE)
- if hit:
- charset = hit.group(1) or 'utf-8'
- body = conn.readall().decode(charset)
- resp_json = json.loads(body)
+ else:
+ hit = None
+
+ if not hit:
+ log.error('Unexpected server reply when refreshing access token:\n%s',
+ self._dump_response(resp))
+ raise RuntimeError('Unable to parse server response')
+
+ charset = hit.group(1) or 'utf-8'
+ body = conn.readall().decode(charset)
+ resp_json = json.loads(body)
+
+ if not isinstance(resp_json, dict):
+ log.error('Invalid json server response. Expected dict, got:\n%s', body)
+ raise RuntimeError('Unable to parse server response')
if 'error' in resp_json:
raise AuthenticationError(resp_json['error'])
- if resp.status > 299 or resp.status < 200:
- raise HTTPError(resp.status, resp.reason, resp.headers)
-
if 'access_token' not in resp_json:
+ log.error('Unable to find access token in server response:\n%s', body)
raise RuntimeError('Unable to parse server response')
self.access_token[self.password] = resp_json['access_token']
@@ -160,6 +184,10 @@ class Backend(s3c.Backend):
except S3Error as exc:
if exc.code != 'AuthenticationRequired':
raise
+
+ if body and not isinstance(body, (bytes, bytearray, memoryview)):
+ body.seek(0)
+
try:
del self.access_token[self.password]
except KeyError: # Mind multithreading..
@@ -168,7 +196,10 @@ class Backend(s3c.Backend):
# If we use OAuth2 and don't have an access token, retrieve
# one
if self.use_oauth2 and self.password not in self.access_token:
- self._get_access_token()
+ # Grab lock to prevent multiple threads from refreshing the token
+ with self._refresh_lock:
+ if self.password not in self.access_token:
+ self._get_access_token()
# Try request. If we are using OAuth2 and this fails, propagate
# the error (because we have just refreshed the access token)
diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py
index a11013b..d23106e 100644
--- a/src/s3ql/backends/local.py
+++ b/src/s3ql/backends/local.py
@@ -9,7 +9,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from ..logging import logging # Ensure use of custom logger class
from .. import BUFSIZE, PICKLE_PROTOCOL
from ..inherit_docstrings import (copy_ancestor_docstring, ABCDocstMeta)
-from .common import AbstractBackend, DanglingStorageURLError, NoSuchObject, ChecksumError
+from .common import (AbstractBackend, DanglingStorageURLError, NoSuchObject,
+ CorruptedObjectError, safe_unpickle_fh)
import _thread
import io
import os
@@ -24,9 +25,10 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
'''
needs_login = False
+ known_options = set()
- def __init__(self, storage_url, backend_login, backend_pw,
- ssl_context=None, proxy=None):
+ def __init__(self, storage_url, backend_login=None, backend_pw=None,
+ options=None):
'''Initialize local backend
Login and password are ignored.
@@ -57,14 +59,11 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
path = self._key_to_path(key)
try:
with open(path, 'rb') as src:
- return pickle.load(src, encoding='latin1')
+ return safe_unpickle_fh(src, encoding='latin1')
except FileNotFoundError:
raise NoSuchObject(key)
except pickle.UnpicklingError as exc:
- if (isinstance(exc.args[0], str)
- and exc.args[0].startswith('invalid load key')):
- raise ChecksumError('Invalid metadata')
- raise
+ raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
@copy_ancestor_docstring
def get_size(self, key):
@@ -79,12 +78,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
raise NoSuchObject(key)
try:
- fh.metadata = pickle.load(fh, encoding='latin1')
+ fh.metadata = safe_unpickle_fh(fh, encoding='latin1')
except pickle.UnpicklingError as exc:
- if (isinstance(exc.args[0], str)
- and exc.args[0].startswith('invalid load key')):
- raise ChecksumError('Invalid metadata')
- raise
+ raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
return fh
@copy_ancestor_docstring
@@ -191,9 +187,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
if metadata is not None:
try:
- pickle.load(src, encoding='latin1')
- except pickle.UnpicklingError:
- raise ChecksumError('Invalid metadata')
+ safe_unpickle_fh(src, encoding='latin1')
+ except pickle.UnpicklingError as exc:
+ raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
pickle.dump(metadata, dest, PICKLE_PROTOCOL)
shutil.copyfileobj(src, dest, BUFSIZE)
except:
diff --git a/src/s3ql/backends/rackspace.py b/src/s3ql/backends/rackspace.py
index 5e37c2c..9efcdc3 100644
--- a/src/s3ql/backends/rackspace.py
+++ b/src/s3ql/backends/rackspace.py
@@ -16,9 +16,8 @@ log = logging.getLogger(__name__)
class Backend(swiftks.Backend):
"""A backend to store data in Rackspace CloudFiles"""
- @staticmethod
@copy_ancestor_docstring
- def _parse_storage_url(storage_url, ssl_context):
+ def _parse_storage_url(self, storage_url, ssl_context):
hit = re.match(r'^rackspace://' # Backend
r'([^/:]+)' # Region
@@ -37,4 +36,8 @@ class Backend(swiftks.Backend):
else:
port = 80
- return ('auth.api.rackspacecloud.com', port, region, containername, prefix)
+ self.hostname = 'auth.api.rackspacecloud.com'
+ self.port = port
+ self.container_name = containername
+ self.prefix = prefix
+ self.region = region
diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py
index 743653d..573f1b7 100644
--- a/src/s3ql/backends/s3.py
+++ b/src/s3ql/backends/s3.py
@@ -32,10 +32,10 @@ class Backend(s3c.Backend):
may or may not be available and can be queried for with instance methods.
"""
- def __init__(self, storage_url, login, password, ssl_context=None,
- proxy=None):
- super().__init__(storage_url, login, password, proxy=proxy,
- ssl_context=ssl_context)
+ known_options = s3c.Backend.known_options | { 'sse', 'rrs' }
+
+ def __init__(self, storage_url, login, password, options):
+ super().__init__(storage_url, login, password, options)
@staticmethod
def _parse_storage_url(storage_url, ssl_context):
@@ -82,6 +82,25 @@ class Backend(s3c.Backend):
finally:
keys[:MAX_KEYS] = tmp
+ @copy_ancestor_docstring
+ def copy(self, src, dest, metadata=None):
+ extra_headers = {}
+ if 'sse' in self.options:
+ extra_headers['x-amz-server-side-encryption'] = 'AES256'
+ if 'rrs' in self.options:
+ extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY'
+ return super().copy(src, dest, metadata=metadata,
+ extra_headers=extra_headers)
+
+ @copy_ancestor_docstring
+ def open_write(self, key, metadata=None, is_compressed=False):
+ extra_headers = {}
+ if 'sse' in self.options:
+ extra_headers['x-amz-server-side-encryption'] = 'AES256'
+ if 'rrs' in self.options:
+ extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY'
+ return super().open_write(key, metadata=metadata, is_compressed=is_compressed,
+ extra_headers=extra_headers)
@retry
def _delete_multi(self, keys, force=False):
diff --git a/src/s3ql/backends/s3c.py b/src/s3ql/backends/s3c.py
index aac7404..ca0c85d 100644
--- a/src/s3ql/backends/s3c.py
+++ b/src/s3ql/backends/s3c.py
@@ -9,7 +9,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from ..logging import logging, QuietError # Ensure use of custom logger class
from .. import PICKLE_PROTOCOL, BUFSIZE
from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError,
- AuthenticationError, DanglingStorageURLError, retry_generator)
+ AuthenticationError, DanglingStorageURLError, retry_generator,
+ get_proxy, get_ssl_context, CorruptedObjectError, safe_unpickle)
from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring,
ABCDocstMeta)
from io import BytesIO
@@ -50,9 +51,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
use_expect_100c = True
xml_ns_prefix = '{http://s3.amazonaws.com/doc/2006-03-01/}'
hdr_prefix = 'x-amz-'
+ known_options = {'no-ssl', 'ssl-ca-path', 'tcp-timeout'}
- def __init__(self, storage_url, login, password, ssl_context=None,
- proxy=None):
+ def __init__(self, storage_url, login, password, options):
'''Initialize backend object
*ssl_context* may be a `ssl.SSLContext` instance or *None*.
@@ -60,16 +61,21 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
super().__init__()
- (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url, ssl_context)
+ if 'no-ssl' in options:
+ self.ssl_context = None
+ else:
+ self.ssl_context = get_ssl_context(options.get('ssl-ca-path', None))
+
+ (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url,
+ self.ssl_context)
+ self.options = options
self.bucket_name = bucket_name
self.prefix = prefix
self.hostname = host
self.port = port
- self.ssl_context = ssl_context
- self.proxy = proxy
+ self.proxy = get_proxy(self.ssl_context is not None)
self.conn = self._get_conn()
-
self.password = password
self.login = login
@@ -117,9 +123,13 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
conn = HTTPConnection(self.hostname, self.port, proxy=self.proxy,
ssl_context=self.ssl_context)
- conn.timeout = 30
+ conn.timeout = self.options.get('tcp-timeout', 10)
return conn
+ # This method is also used implicitly for the retry handling of
+ # `gs.Backend._get_access_token`. When modifying this method, do not forget
+ # to check if this makes it unsuitable for use by `_get_access_token` (in
+ # that case we will have to implement a custom retry logic there).
@copy_ancestor_docstring
def is_temp_failure(self, exc): #IGNORE:W0613
if isinstance(exc, (InternalError, BadDigestError, IncompleteBodyError,
@@ -129,11 +139,15 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
elif is_temp_network_error(exc):
return True
- # In doubt, we retry on 5xx. However, there are some codes
- # where retry is definitely not desired.
- elif (isinstance(exc, HTTPError)
- and exc.status >= 500 and exc.status <= 599
- and exc.status not in (501,505,508,510,511,523)):
+ # In doubt, we retry on 5xx (Server error). However, there are some
+ # codes where retry is definitely not desired. For 4xx (client error) we
+ # do not retry in general, but for 408 (Request Timeout) RFC 2616
+ # specifies that the client may repeat the request without
+ # modifications.
+ elif (isinstance(exc, HTTPError) and
+ ((500 <= exc.status <= 599
+ and exc.status not in (501,505,508,510,511,523))
+ or exc.status == 408)):
return True
return False
@@ -293,7 +307,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
return ObjectR(key, resp, self, meta)
@prepend_ancestor_docstring
- def open_write(self, key, metadata=None, is_compressed=False):
+ def open_write(self, key, metadata=None, is_compressed=False, extra_headers=None):
"""
The returned object will buffer all data and only start the upload
when its `close` method is called.
@@ -307,6 +321,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
raise TypeError('*metadata*: expected dict or None, got %s' % type(metadata))
headers = CaseInsensitiveDict()
+ if extra_headers is not None:
+ headers.update(extra_headers)
self._add_meta_headers(headers, metadata)
return ObjectW(key, self, headers)
@@ -330,13 +346,15 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
@retry
@copy_ancestor_docstring
- def copy(self, src, dest, metadata=None):
+ def copy(self, src, dest, metadata=None, extra_headers=None):
log.debug('copy(%s, %s): start', src, dest)
if not (metadata is None or isinstance(metadata, dict)):
raise TypeError('*metadata*: expected dict or None, got %s' % type(metadata))
headers = CaseInsensitiveDict()
+ if extra_headers is not None:
+ headers.update(extra_headers)
headers[self.hdr_prefix + 'copy-source'] = \
'/%s/%s%s' % (self.bucket_name, self.prefix, src)
@@ -376,8 +394,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
headers=None, body=None):
'''Send request, read and return response object'''
- log.debug('_do_request(): start with parameters (%r, %r, %r, %r, %r, %r)',
- method, path, subres, query_string, headers, body)
+ log.debug('preparing %s %s?%s, qs=%s', method, path, subres, query_string)
if headers is None:
headers = CaseInsensitiveDict()
@@ -689,7 +706,10 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
log.warning('MD5 mismatch in metadata for %s', key)
raise BadDigestError('BadDigest',
'Meta MD5 for %s does not match' % key)
- return pickle.loads(b64decode(buf), encoding='latin1')
+ try:
+ return safe_unpickle(b64decode(buf), encoding='latin1')
+ except pickle.UnpicklingError as exc:
+ raise CorruptedObjectError('Corrupted metadata, pickle says: %s' % exc)
elif format_ == 'raw': # No MD5 available
return meta
else:
diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py
index a3fe613..5d1c84c 100644
--- a/src/s3ql/backends/swift.py
+++ b/src/s3ql/backends/swift.py
@@ -9,7 +9,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from ..logging import logging, QuietError # Ensure use of custom logger class
from .. import BUFSIZE
from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError,
- DanglingStorageURLError, retry_generator)
+ DanglingStorageURLError, retry_generator, get_proxy,
+ get_ssl_context)
from .s3c import HTTPError, ObjectR, ObjectW, md5sum_b64, BadDigestError
from . import s3c
from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring,
@@ -40,30 +41,31 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
use_expect_100c = True
hdr_prefix = 'X-Object-'
+ known_options = {'no-ssl', 'ssl-ca-path', 'tcp-timeout'}
_add_meta_headers = s3c.Backend._add_meta_headers
_extractmeta = s3c.Backend._extractmeta
- def __init__(self, storage_url, login, password, ssl_context=None, proxy=None):
- # Unused argument
- #pylint: disable=W0613
-
+ def __init__(self, storage_url, login, password, options):
super().__init__()
-
- (host, port, container_name, prefix) = self._parse_storage_url(storage_url, ssl_context)
-
- self.hostname = host
- self.port = port
- self.container_name = container_name
- self.prefix = prefix
- self.password = password
- self.login = login
+ self.options = options
+ self.hostname = None
+ self.port = None
+ self.container_name = None
+ self.prefix = None
self.auth_token = None
self.auth_prefix = None
self.conn = None
- self.proxy = proxy
- self.ssl_context = ssl_context
+ self.password = password
+ self.login = login
+
+ if 'no-ssl' in options:
+ self.ssl_context = None
+ else:
+ self.ssl_context = get_ssl_context(options.get('ssl-ca-path', None))
+ self._parse_storage_url(storage_url, self.ssl_context)
+ self.proxy = get_proxy(self.ssl_context is not None)
self._container_exists()
def __str__(self):
@@ -86,12 +88,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
raise DanglingStorageURLError(self.container_name)
raise
- @staticmethod
- def _parse_storage_url(storage_url, ssl_context):
- '''Extract information from storage URL
-
- Return a tuple *(host, port, container_name, prefix)* .
- '''
+ def _parse_storage_url(self, storage_url, ssl_context):
+ '''Init instance variables from storage url'''
hit = re.match(r'^[a-zA-Z0-9]+://' # Backend
r'([^/:]+)' # Hostname
@@ -112,14 +110,25 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
containername = hit.group(3)
prefix = hit.group(4) or ''
- return (hostname, port, containername, prefix)
+ self.hostname = hostname
+ self.port = port
+ self.container_name = containername
+ self.prefix = prefix
@copy_ancestor_docstring
def is_temp_failure(self, exc): #IGNORE:W0613
if isinstance(exc, AuthenticationExpired):
return True
- elif isinstance(exc, HTTPError) and exc.status >= 500 and exc.status <= 599:
+ # In doubt, we retry on 5xx (Server error). However, there are some
+ # codes where retry is definitely not desired. For 4xx (client error) we
+ # do not retry in general, but for 408 (Request Timeout) RFC 2616
+ # specifies that the client may repeat the request without
+ # modifications.
+ elif (isinstance(exc, HTTPError) and
+ ((500 <= exc.status <= 599
+ and exc.status not in (501,505,508,510,511,523))
+ or exc.status == 408)):
return True
elif is_temp_network_error(exc):
@@ -135,6 +144,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
conn = HTTPConnection(self.hostname, self.port, proxy=self.proxy,
ssl_context=self.ssl_context)
+ conn.timeout = self.options.get('tcp-timeout', 10)
headers = CaseInsensitiveDict()
headers['X-Auth-User'] = self.login
@@ -165,8 +175,10 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
self.auth_prefix = urllib.parse.unquote(o.path)
conn.disconnect()
- return HTTPConnection(o.hostname, o.port, proxy=self.proxy,
+ conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy,
ssl_context=self.ssl_context)
+ conn.timeout = self.options.get('tcp-timeout', 10)
+ return conn
raise RuntimeError('No valid authentication path found')
@@ -201,53 +213,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
elif subres:
path += '?%s' % subres
- # We can probably remove the assertions at some point and
- # call self.conn.read_response() directly
- def read_response():
- resp = self.conn.read_response()
- assert resp.method == method
- assert resp.path == path
- return resp
-
headers['X-Auth-Token'] = self.auth_token
try:
- resp = None
- log.debug('_send_request(): %s %s', method, path)
- if body is None or isinstance(body, (bytes, bytearray, memoryview)):
- self.conn.send_request(method, path, body=body, headers=headers)
- else:
- body_len = os.fstat(body.fileno()).st_size
- self.conn.send_request(method, path, expect100=self.use_expect_100c,
- headers=headers, body=BodyFollowing(body_len))
-
- if self.use_expect_100c:
- resp = read_response()
- if resp.status == 100:
- resp = None
- # Otherwise fall through below
-
- try:
- shutil.copyfileobj(body, self.conn, BUFSIZE)
- except ConnectionClosed:
- # Server closed connection while we were writing body data -
- # but we may still be able to read an error response
- try:
- resp = read_response()
- except ConnectionClosed: # No server response available
- pass
- else:
- if resp.status < 400:
- log.warning('Server broke connection during upload, but signaled '
- '%d %s', resp.status, resp.reason)
- resp = None
-
- # Re-raise original error
- if resp is None:
- raise
-
- if resp is None:
- resp = read_response()
-
+ resp = self._do_request_inner(method, path, body=body, headers=headers)
except Exception as exc:
if is_temp_network_error(exc):
# We probably can't use the connection anymore
@@ -275,6 +243,48 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
else:
raise HTTPError(resp.status, resp.reason, resp.headers)
+ # Including this code directly in _do_request would be very messy since
+ # we can't `return` the response early, thus the separate method
+ def _do_request_inner(self, method, path, body, headers):
+ '''The guts of the _do_request method'''
+
+ log.debug('_do_request_inner(): %s %s', method, path)
+
+ if body is None or isinstance(body, (bytes, bytearray, memoryview)):
+ self.conn.send_request(method, path, body=body, headers=headers)
+ else:
+ body_len = os.fstat(body.fileno()).st_size
+ self.conn.send_request(method, path, expect100=self.use_expect_100c,
+ headers=headers, body=BodyFollowing(body_len))
+
+ if self.use_expect_100c:
+ log.debug('waiting for 100-continue')
+ resp = self.conn.read_response()
+ if resp.status != 100:
+ return resp
+
+ log.debug('writing body data')
+ try:
+ shutil.copyfileobj(body, self.conn, BUFSIZE)
+ except ConnectionClosed:
+ log.debug('interrupted write, server closed connection')
+ # Server closed connection while we were writing body data -
+ # but we may still be able to read an error response
+ try:
+ resp = self.conn.read_response()
+ except ConnectionClosed: # No server response available
+ log.debug('no response available in buffer')
+ pass
+ else:
+ if resp.status >= 400: # error response
+ return resp
+ log.warning('Server broke connection during upload, but signaled '
+ '%d %s', resp.status, resp.reason)
+
+ # Re-raise original error
+ raise
+
+ return self.conn.read_response()
def _assert_empty_response(self, resp):
'''Assert that current response body is empty'''
diff --git a/src/s3ql/backends/swiftks.py b/src/s3ql/backends/swiftks.py
index 959499b..19a644b 100644
--- a/src/s3ql/backends/swiftks.py
+++ b/src/s3ql/backends/swiftks.py
@@ -21,31 +21,12 @@ log = logging.getLogger(__name__)
class Backend(swift.Backend):
- def __init__(self, storage_url, login, password, ssl_context=None, proxy=None):
- # Unused argument
- #pylint: disable=W0613
+ def __init__(self, storage_url, login, password, options):
+ self.region = None
+ super().__init__(storage_url, login, password, options)
- (host, port, region,
- container_name, prefix) = self._parse_storage_url(storage_url, ssl_context)
-
- self.hostname = host
- self.port = port
- self.region = region
- self.container_name = container_name
- self.prefix = prefix
- self.proxy = proxy
- self.password = password
- self.login = login
- self.auth_token = None
- self.auth_prefix = None
- self.conn = None
- self.ssl_context = ssl_context
-
- self._container_exists()
-
- @staticmethod
@copy_ancestor_docstring
- def _parse_storage_url(storage_url, ssl_context):
+ def _parse_storage_url(self, storage_url, ssl_context):
hit = re.match(r'^[a-zA-Z0-9]+://' # Backend
r'([^/:]+)' # Hostname
@@ -68,7 +49,11 @@ class Backend(swift.Backend):
containername = hit.group(4)
prefix = hit.group(5) or ''
- return (hostname, port, region, containername, prefix)
+ self.hostname = hostname
+ self.port = port
+ self.container_name = containername
+ self.prefix = prefix
+ self.region = region
@retry
def _get_conn(self):
@@ -78,6 +63,7 @@ class Backend(swift.Backend):
conn = HTTPConnection(self.hostname, port=self.port, proxy=self.proxy,
ssl_context=self.ssl_context)
+ conn.timeout = self.options.get('tcp-timeout', 10)
headers = CaseInsensitiveDict()
headers['Content-Type'] = 'application/json'
@@ -125,14 +111,16 @@ class Backend(swift.Backend):
self.auth_prefix = urllib.parse.unquote(o.path)
conn.disconnect()
- return HTTPConnection(o.hostname, o.port, proxy=self.proxy,
+ conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy,
ssl_context=self.ssl_context)
+ conn.timeout = self.options.get('tcp-timeout', 10)
+ return conn
if len(avail_regions) < 10:
- raise DanglingStorageURLError(
+ raise DanglingStorageURLError(self.container_name,
'No accessible object storage service found in region %s'
' (available regions: %s)' % (self.region, ', '.join(avail_regions)))
else:
- raise DanglingStorageURLError(
+ raise DanglingStorageURLError(self.container_name,
'No accessible object storage service found in region %s'
% self.region)