diff options
author | Nikolaus Rath <Nikolaus@rath.org> | 2016-03-09 10:09:53 -0800 |
---|---|---|
committer | Nikolaus Rath <Nikolaus@rath.org> | 2016-03-09 10:09:53 -0800 |
commit | 218f6abbf0063f9f6630a0d36a8d869699bb5cce (patch) | |
tree | c355476ced0dc9a3360e936c4b1786ad2985211a /src/s3ql/backends | |
parent | 181ca7febce49f2c2ee17e4615cc528ad03431b6 (diff) |
Import s3ql_2.11.1+dfsg.orig.tar.gz
Diffstat (limited to 'src/s3ql/backends')
-rw-r--r-- | src/s3ql/backends/common.py | 143 | ||||
-rw-r--r-- | src/s3ql/backends/comprenc.py | 65 | ||||
-rw-r--r-- | src/s3ql/backends/gs.py | 57 | ||||
-rw-r--r-- | src/s3ql/backends/local.py | 28 | ||||
-rw-r--r-- | src/s3ql/backends/rackspace.py | 9 | ||||
-rw-r--r-- | src/s3ql/backends/s3.py | 27 | ||||
-rw-r--r-- | src/s3ql/backends/s3c.py | 56 | ||||
-rw-r--r-- | src/s3ql/backends/swift.py | 150 | ||||
-rw-r--r-- | src/s3ql/backends/swiftks.py | 42 |
9 files changed, 386 insertions, 191 deletions
diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py index 6c64f47..b1083e6 100644 --- a/src/s3ql/backends/common.py +++ b/src/s3ql/backends/common.py @@ -6,12 +6,19 @@ Copyright © 2008 Nikolaus Rath <Nikolaus@rath.org> This program can be distributed under the terms of the GNU GPLv3. ''' -from ..logging import logging # Ensure use of custom logger class +from ..logging import logging, QuietError, LOG_ONCE # Ensure use of custom logger class from abc import abstractmethod, ABCMeta from functools import wraps import time +import codecs +import io import textwrap import inspect +import ssl +import os +import re +import pickletools +import pickle log = logging.getLogger(__name__) @@ -456,12 +463,16 @@ class NoSuchObject(Exception): class DanglingStorageURLError(Exception): '''Raised if the backend can't store data at the given location''' - def __init__(self, loc): + def __init__(self, loc, msg=None): super().__init__() self.loc = loc + self.msg = msg def __str__(self): - return '%r does not exist' % self.loc + if self.msg is None: + return '%r does not exist' % self.loc + else: + return self.msg class AuthorizationError(Exception): '''Raised if the credentials don't give access to the requested backend''' @@ -483,9 +494,12 @@ class AuthenticationError(Exception): def __str__(self): return 'Access denied. Server said: %s' % self.msg -class ChecksumError(Exception): +class CorruptedObjectError(Exception): """ - Raised if there is a checksum error in the data that we received. + Raised if a storage object is corrupted. + + Note that this is different from BadDigest error, which is raised + if a transmission error has been detected. """ def __init__(self, str_): @@ -494,3 +508,122 @@ class ChecksumError(Exception): def __str__(self): return self.str + +def get_ssl_context(path): + '''Construct SSLContext object''' + + # Best practice according to http://docs.python.org/3/library/ssl.html#protocol-versions + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.options |= ssl.OP_NO_SSLv2 + context.verify_mode = ssl.CERT_REQUIRED + + if path is None: + log.debug('Reading default CA certificates.') + context.set_default_verify_paths() + elif os.path.isfile(path): + log.debug('Reading CA certificates from file %s', path) + context.load_verify_locations(cafile=path) + else: + log.debug('Reading CA certificates from directory %s', path) + context.load_verify_locations(capath=path) + + return context + +def get_proxy(ssl): + '''Read system proxy settings + + Returns either `None`, or a tuple ``(host, port)``. + + This function may raise `QuietError`. + ''' + + if ssl: + proxy_env = 'https_proxy' + else: + proxy_env = 'http_proxy' + + if proxy_env in os.environ: + proxy = os.environ[proxy_env] + hit = re.match(r'^(https?://)?([a-zA-Z0-9.-]+)(:[0-9]+)?/?$', proxy) + if not hit: + raise QuietError('Unable to parse proxy setting %s=%r' % + (proxy_env, proxy), exitcode=13) + + if hit.group(1) == 'https://': + log.warning('HTTPS connection to proxy is probably pointless and not supported, ' + 'will use standard HTTP', extra=LOG_ONCE) + + if hit.group(3): + proxy_port = int(hit.group(3)[1:]) + else: + proxy_port = 80 + + proxy_host = hit.group(2) + log.info('Using CONNECT proxy %s:%d', proxy_host, proxy_port, + extra=LOG_ONCE) + proxy = (proxy_host, proxy_port) + else: + proxy = None + + return proxy + + +SAFE_UNPICKLE_OPCODES = {'BININT', 'BININT1', 'BININT2', 'LONG1', 'LONG4', + 'BINSTRING', 'SHORT_BINSTRING', 'GLOBAL', + 'NONE', 'NEWTRUE', 'NEWFALSE', 'BINUNICODE', + 'BINFLOAT', 'EMPTY_LIST', 'APPEND', 'APPENDS', + 'LIST', 'EMPTY_TUPLE', 'TUPLE', 'TUPLE1', 'TUPLE2', + 'TUPLE3', 'EMPTY_DICT', 'DICT', 'SETITEM', + 'SETITEMS', 'POP', 'DUP', 'MARK', 'POP_MARK', + 'BINGET', 'LONG_BINGET', 'BINPUT', 'LONG_BINPUT', + 'PROTO', 'STOP', 'REDUCE'} + +SAFE_UNPICKLE_GLOBAL_NAMES = { ('__builtin__', 'bytearray'), + ('__builtin__', 'set'), + ('__builtin__', 'frozenset'), + ('_codecs', 'encode') } +SAFE_UNPICKLE_GLOBAL_OBJS = { bytearray, set, frozenset, codecs.encode } + +class SafeUnpickler(pickle.Unpickler): + def find_class(self, module, name): + if (module, name) not in SAFE_UNPICKLE_GLOBAL_NAMES: + raise pickle.UnpicklingError("global '%s.%s' is unsafe" % + (module, name)) + ret = super().find_class(module, name) + if ret not in SAFE_UNPICKLE_GLOBAL_OBJS: + raise pickle.UnpicklingError("global '%s.%s' is unsafe" % + (module, name)) + return ret + + +def safe_unpickle_fh(fh, fix_imports=True, encoding="ASCII", + errors="strict"): + '''Safely unpickle untrusted data from *fh* + + *fh* must be seekable. + ''' + + if not fh.seekable(): + raise TypeError('*fh* must be seekable') + pos = fh.tell() + + # First make sure that we know all used opcodes + for (opcode, arg, _) in pickletools.genops(fh): + if opcode.proto > 2 or opcode.name not in SAFE_UNPICKLE_OPCODES: + raise pickle.UnpicklingError('opcode %s is unsafe' % opcode.name) + + fh.seek(pos) + + # Then use a custom Unpickler to ensure that we only give access to + # specific, whitelisted globals. Note that with the above opcodes, there is + # no way to trigger attribute access, so "brachiating" from a white listed + # object to __builtins__ is not possible. + return SafeUnpickler(fh, fix_imports=fix_imports, + encoding=encoding, errors=errors).load() + +def safe_unpickle(buf, fix_imports=True, encoding="ASCII", + errors="strict"): + '''Safely unpickle untrusted data in *buf*''' + + return safe_unpickle_fh(io.BytesIO(buf), fix_imports=fix_imports, + encoding=encoding, errors=errors) diff --git a/src/s3ql/backends/comprenc.py b/src/s3ql/backends/comprenc.py index 7be7fd4..c7689a2 100644 --- a/src/s3ql/backends/comprenc.py +++ b/src/s3ql/backends/comprenc.py @@ -8,7 +8,7 @@ This program can be distributed under the terms of the GNU GPLv3. from ..logging import logging # Ensure use of custom logger class from .. import BUFSIZE, PICKLE_PROTOCOL -from .common import AbstractBackend, ChecksumError +from .common import AbstractBackend, CorruptedObjectError, safe_unpickle from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring, ABCDocstMeta) from Crypto.Cipher import AES @@ -94,16 +94,21 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): ''' if not isinstance(metadata, dict): - raise MalformedObjectError() + raise CorruptedObjectError('metadata should be dict, not %s' % type(metadata)) + + format_version = metadata.get('format_version', 0) + if not 0 < format_version <= 2: + raise CorruptedObjectError('format_version %s unsupported' % format_version) + for mkey in ('encryption', 'compression', 'data'): if mkey not in metadata: - raise MalformedObjectError() + raise CorruptedObjectError('meta key %s is missing' % mkey) encr_alg = metadata['encryption'] encrypted = (encr_alg != 'None') if encrypted and self.passphrase is None: - raise ChecksumError('Encrypted object and no passphrase supplied') + raise CorruptedObjectError('Encrypted object and no passphrase supplied') elif not encrypted and self.passphrase is not None: raise ObjectNotEncrypted() @@ -112,30 +117,30 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): if not encrypted: try: - return (None, pickle.loads(buf, encoding='latin1')) + return (None, safe_unpickle(buf, encoding='latin1')) except pickle.UnpicklingError: - raise ChecksumError('Invalid metadata') + raise CorruptedObjectError('Invalid metadata') # Encrypted for mkey in ('nonce', 'signature', 'object_id'): if mkey not in metadata: - raise MalformedObjectError() + raise CorruptedObjectError('meta key %s is missing' % mkey) nonce = b64decode(metadata['nonce']) meta_key = sha256(self.passphrase + nonce + b'meta') meta_sig = compute_metadata_signature(meta_key, metadata) if not hmac.compare_digest(b64decode(metadata['signature']), meta_sig): - raise ChecksumError('HMAC mismatch') + raise CorruptedObjectError('HMAC mismatch') stored_key = b64decode(metadata['object_id']).decode('utf-8') if stored_key != key: - raise ChecksumError('Object content does not match its key (%s vs %s)' - % (stored_key, key)) + raise CorruptedObjectError('Object content does not match its key (%s vs %s)' + % (stored_key, key)) buf = b64decode(metadata['data']) - return (nonce, pickle.loads(aes_cipher(meta_key).decrypt(buf), - encoding='latin1')) + return (nonce, safe_unpickle(aes_cipher(meta_key).decrypt(buf), + encoding='latin1')) @prepend_ancestor_docstring def open_read(self, key): @@ -375,7 +380,7 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): try: meta_new['data'] = meta['data'] except KeyError: - raise MalformedObjectError() + raise CorruptedObjectError('meta key data is missing') if not self.passphrase: return meta_new @@ -400,7 +405,7 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta): hash_ = cipher.decrypt(hash_) if not hmac.compare_digest(hash_, hmac_.digest()): - raise ChecksumError('HMAC mismatch') + raise CorruptedObjectError('HMAC mismatch') obj_id = nonce[TIME_BYTES:].decode('utf-8') meta_key = sha256(self.passphrase + nonce + b'meta') @@ -549,15 +554,15 @@ class DecompressFilter(InputFilter): buf = self.fh.read(size) if not buf: if not self.decomp.eof: - raise ChecksumError('Premature end of stream.') + raise CorruptedObjectError('Premature end of stream.') if self.decomp.unused_data: - raise ChecksumError('Data after end of compressed stream') + raise CorruptedObjectError('Data after end of compressed stream') return b'' try: buf = decompress(self.decomp, buf) - except ChecksumError: + except CorruptedObjectError: # Read rest of stream, so that we raise HMAC or MD5 error instead # if problem is on lower layer self.discard_input() @@ -674,7 +679,7 @@ class DecryptFilter(InputFilter): buf = self.fh.read(size) if not buf: - raise ChecksumError('Premature end of stream.') + raise CorruptedObjectError('Premature end of stream.') # Work around https://bugs.launchpad.net/pycrypto/+bug/1256172 # cipher.decrypt refuses to work with anything but bytes @@ -747,10 +752,10 @@ class DecryptFilter(InputFilter): # Read rest of stream, so that we raise MD5 error instead # if problem is on lower layer self.discard_input() - raise ChecksumError('Extraneous data at end of object') + raise CorruptedObjectError('Extraneous data at end of object') if not hmac.compare_digest(inbuf, self.hmac.digest()): - raise ChecksumError('HMAC mismatch') + raise CorruptedObjectError('HMAC mismatch') self.hmac_checked = True break @@ -825,9 +830,9 @@ class LegacyDecryptDecompressFilter(io.RawIOBase): if not buf and not self.hmac_checked: if not hmac.compare_digest(self._decrypt(self.hash), self.hmac.digest()): - raise ChecksumError('HMAC mismatch') + raise CorruptedObjectError('HMAC mismatch') elif self.decomp and self.decomp.unused_data: - raise ChecksumError('Data after end of compressed stream') + raise CorruptedObjectError('Data after end of compressed stream') else: self.hmac_checked = True return b'' @@ -865,16 +870,16 @@ def decompress(decomp, buf): return decomp.decompress(buf) except IOError as exc: if exc.args[0].lower().startswith('invalid data stream'): - raise ChecksumError('Invalid compressed stream') + raise CorruptedObjectError('Invalid compressed stream') raise except lzma.LZMAError as exc: if (exc.args[0].lower().startswith('corrupt input data') or exc.args[0].startswith('Input format not supported')): - raise ChecksumError('Invalid compressed stream') + raise CorruptedObjectError('Invalid compressed stream') raise except zlib.error as exc: if exc.args[0].lower().startswith('error -3 while decompressing'): - raise ChecksumError('Invalid compressed stream') + raise CorruptedObjectError('Invalid compressed stream') raise class ObjectNotEncrypted(Exception): @@ -888,13 +893,3 @@ class ObjectNotEncrypted(Exception): ''' pass - - -class MalformedObjectError(Exception): - ''' - Raised by ComprencBackend when trying to access an object that - wasn't stored by ComprencBackend, i.e. has no information about - encryption or compression. - ''' - - pass diff --git a/src/s3ql/backends/gs.py b/src/s3ql/backends/gs.py index 11f35f5..a7944cf 100644 --- a/src/s3ql/backends/gs.py +++ b/src/s3ql/backends/gs.py @@ -16,6 +16,7 @@ from dugong import CaseInsensitiveDict, HTTPConnection from urllib.parse import urlencode import re import json +import threading import time # Pylint goes berserk with false positives @@ -40,10 +41,10 @@ class Backend(s3c.Backend): # This class variable holds the mapping from refresh tokens to # access tokens. access_token = dict() + _refresh_lock = threading.Lock() - def __init__(self, storage_url, gs_key, gs_secret, ssl_context=None, proxy=None): - super().__init__(storage_url, gs_key, gs_secret, ssl_context=ssl_context, - proxy=proxy) + def __init__(self, storage_url, gs_key, gs_secret, options): + super().__init__(storage_url, gs_key, gs_secret, options) self.use_oauth2 = (gs_key == 'oauth2') @@ -103,6 +104,17 @@ class Backend(s3c.Backend): now.tm_year, now.tm_hour, now.tm_min, now.tm_sec)) + # This method performs a different kind of HTTP request than the methods + # decorated with `retry` that it is called by, so in theory it should do its + # own retry handling (perhaps with a new `retry_on` decorator that allows to + # specify a custom `is_temp_failure` function instead of calling the + # instance method). However, in practice there is currently no difference in + # the set of exceptions that are considered temporary when retrieving an + # access token, and the set of exceptions checked for in the + # `_is_temp_failure` method. Therefore, for now we avoid the additional + # complexity of custom retry handling and rely on the @retry decorator of + # the caller to handle temporary errors. This should be kept in mind + # when modifying either method. def _get_access_token(self): log.info('Requesting new access token') @@ -122,22 +134,34 @@ class Backend(s3c.Backend): body=body.encode('utf-8')) resp = conn.read_response() - resp_json = None - if 'Content-Type' in resp.headers: + if resp.status > 299 or resp.status < 200: + raise HTTPError(resp.status, resp.reason, resp.headers) + + content_type = resp.headers.get('Content-Type', None) + if content_type: hit = re.match(r'application/json(?:; charset="(.+)")?$', resp.headers['Content-Type'], re.IGNORECASE) - if hit: - charset = hit.group(1) or 'utf-8' - body = conn.readall().decode(charset) - resp_json = json.loads(body) + else: + hit = None + + if not hit: + log.error('Unexpected server reply when refreshing access token:\n%s', + self._dump_response(resp)) + raise RuntimeError('Unable to parse server response') + + charset = hit.group(1) or 'utf-8' + body = conn.readall().decode(charset) + resp_json = json.loads(body) + + if not isinstance(resp_json, dict): + log.error('Invalid json server response. Expected dict, got:\n%s', body) + raise RuntimeError('Unable to parse server response') if 'error' in resp_json: raise AuthenticationError(resp_json['error']) - if resp.status > 299 or resp.status < 200: - raise HTTPError(resp.status, resp.reason, resp.headers) - if 'access_token' not in resp_json: + log.error('Unable to find access token in server response:\n%s', body) raise RuntimeError('Unable to parse server response') self.access_token[self.password] = resp_json['access_token'] @@ -160,6 +184,10 @@ class Backend(s3c.Backend): except S3Error as exc: if exc.code != 'AuthenticationRequired': raise + + if body and not isinstance(body, (bytes, bytearray, memoryview)): + body.seek(0) + try: del self.access_token[self.password] except KeyError: # Mind multithreading.. @@ -168,7 +196,10 @@ class Backend(s3c.Backend): # If we use OAuth2 and don't have an access token, retrieve # one if self.use_oauth2 and self.password not in self.access_token: - self._get_access_token() + # Grab lock to prevent multiple threads from refreshing the token + with self._refresh_lock: + if self.password not in self.access_token: + self._get_access_token() # Try request. If we are using OAuth2 and this fails, propagate # the error (because we have just refreshed the access token) diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py index a11013b..d23106e 100644 --- a/src/s3ql/backends/local.py +++ b/src/s3ql/backends/local.py @@ -9,7 +9,8 @@ This program can be distributed under the terms of the GNU GPLv3. from ..logging import logging # Ensure use of custom logger class from .. import BUFSIZE, PICKLE_PROTOCOL from ..inherit_docstrings import (copy_ancestor_docstring, ABCDocstMeta) -from .common import AbstractBackend, DanglingStorageURLError, NoSuchObject, ChecksumError +from .common import (AbstractBackend, DanglingStorageURLError, NoSuchObject, + CorruptedObjectError, safe_unpickle_fh) import _thread import io import os @@ -24,9 +25,10 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): ''' needs_login = False + known_options = set() - def __init__(self, storage_url, backend_login, backend_pw, - ssl_context=None, proxy=None): + def __init__(self, storage_url, backend_login=None, backend_pw=None, + options=None): '''Initialize local backend Login and password are ignored. @@ -57,14 +59,11 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): path = self._key_to_path(key) try: with open(path, 'rb') as src: - return pickle.load(src, encoding='latin1') + return safe_unpickle_fh(src, encoding='latin1') except FileNotFoundError: raise NoSuchObject(key) except pickle.UnpicklingError as exc: - if (isinstance(exc.args[0], str) - and exc.args[0].startswith('invalid load key')): - raise ChecksumError('Invalid metadata') - raise + raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc) @copy_ancestor_docstring def get_size(self, key): @@ -79,12 +78,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): raise NoSuchObject(key) try: - fh.metadata = pickle.load(fh, encoding='latin1') + fh.metadata = safe_unpickle_fh(fh, encoding='latin1') except pickle.UnpicklingError as exc: - if (isinstance(exc.args[0], str) - and exc.args[0].startswith('invalid load key')): - raise ChecksumError('Invalid metadata') - raise + raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc) return fh @copy_ancestor_docstring @@ -191,9 +187,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): if metadata is not None: try: - pickle.load(src, encoding='latin1') - except pickle.UnpicklingError: - raise ChecksumError('Invalid metadata') + safe_unpickle_fh(src, encoding='latin1') + except pickle.UnpicklingError as exc: + raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc) pickle.dump(metadata, dest, PICKLE_PROTOCOL) shutil.copyfileobj(src, dest, BUFSIZE) except: diff --git a/src/s3ql/backends/rackspace.py b/src/s3ql/backends/rackspace.py index 5e37c2c..9efcdc3 100644 --- a/src/s3ql/backends/rackspace.py +++ b/src/s3ql/backends/rackspace.py @@ -16,9 +16,8 @@ log = logging.getLogger(__name__) class Backend(swiftks.Backend): """A backend to store data in Rackspace CloudFiles""" - @staticmethod @copy_ancestor_docstring - def _parse_storage_url(storage_url, ssl_context): + def _parse_storage_url(self, storage_url, ssl_context): hit = re.match(r'^rackspace://' # Backend r'([^/:]+)' # Region @@ -37,4 +36,8 @@ class Backend(swiftks.Backend): else: port = 80 - return ('auth.api.rackspacecloud.com', port, region, containername, prefix) + self.hostname = 'auth.api.rackspacecloud.com' + self.port = port + self.container_name = containername + self.prefix = prefix + self.region = region diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py index 743653d..573f1b7 100644 --- a/src/s3ql/backends/s3.py +++ b/src/s3ql/backends/s3.py @@ -32,10 +32,10 @@ class Backend(s3c.Backend): may or may not be available and can be queried for with instance methods. """ - def __init__(self, storage_url, login, password, ssl_context=None, - proxy=None): - super().__init__(storage_url, login, password, proxy=proxy, - ssl_context=ssl_context) + known_options = s3c.Backend.known_options | { 'sse', 'rrs' } + + def __init__(self, storage_url, login, password, options): + super().__init__(storage_url, login, password, options) @staticmethod def _parse_storage_url(storage_url, ssl_context): @@ -82,6 +82,25 @@ class Backend(s3c.Backend): finally: keys[:MAX_KEYS] = tmp + @copy_ancestor_docstring + def copy(self, src, dest, metadata=None): + extra_headers = {} + if 'sse' in self.options: + extra_headers['x-amz-server-side-encryption'] = 'AES256' + if 'rrs' in self.options: + extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY' + return super().copy(src, dest, metadata=metadata, + extra_headers=extra_headers) + + @copy_ancestor_docstring + def open_write(self, key, metadata=None, is_compressed=False): + extra_headers = {} + if 'sse' in self.options: + extra_headers['x-amz-server-side-encryption'] = 'AES256' + if 'rrs' in self.options: + extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY' + return super().open_write(key, metadata=metadata, is_compressed=is_compressed, + extra_headers=extra_headers) @retry def _delete_multi(self, keys, force=False): diff --git a/src/s3ql/backends/s3c.py b/src/s3ql/backends/s3c.py index aac7404..ca0c85d 100644 --- a/src/s3ql/backends/s3c.py +++ b/src/s3ql/backends/s3c.py @@ -9,7 +9,8 @@ This program can be distributed under the terms of the GNU GPLv3. from ..logging import logging, QuietError # Ensure use of custom logger class from .. import PICKLE_PROTOCOL, BUFSIZE from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError, - AuthenticationError, DanglingStorageURLError, retry_generator) + AuthenticationError, DanglingStorageURLError, retry_generator, + get_proxy, get_ssl_context, CorruptedObjectError, safe_unpickle) from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring, ABCDocstMeta) from io import BytesIO @@ -50,9 +51,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): use_expect_100c = True xml_ns_prefix = '{http://s3.amazonaws.com/doc/2006-03-01/}' hdr_prefix = 'x-amz-' + known_options = {'no-ssl', 'ssl-ca-path', 'tcp-timeout'} - def __init__(self, storage_url, login, password, ssl_context=None, - proxy=None): + def __init__(self, storage_url, login, password, options): '''Initialize backend object *ssl_context* may be a `ssl.SSLContext` instance or *None*. @@ -60,16 +61,21 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): super().__init__() - (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url, ssl_context) + if 'no-ssl' in options: + self.ssl_context = None + else: + self.ssl_context = get_ssl_context(options.get('ssl-ca-path', None)) + + (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url, + self.ssl_context) + self.options = options self.bucket_name = bucket_name self.prefix = prefix self.hostname = host self.port = port - self.ssl_context = ssl_context - self.proxy = proxy + self.proxy = get_proxy(self.ssl_context is not None) self.conn = self._get_conn() - self.password = password self.login = login @@ -117,9 +123,13 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): conn = HTTPConnection(self.hostname, self.port, proxy=self.proxy, ssl_context=self.ssl_context) - conn.timeout = 30 + conn.timeout = self.options.get('tcp-timeout', 10) return conn + # This method is also used implicitly for the retry handling of + # `gs.Backend._get_access_token`. When modifying this method, do not forget + # to check if this makes it unsuitable for use by `_get_access_token` (in + # that case we will have to implement a custom retry logic there). @copy_ancestor_docstring def is_temp_failure(self, exc): #IGNORE:W0613 if isinstance(exc, (InternalError, BadDigestError, IncompleteBodyError, @@ -129,11 +139,15 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): elif is_temp_network_error(exc): return True - # In doubt, we retry on 5xx. However, there are some codes - # where retry is definitely not desired. - elif (isinstance(exc, HTTPError) - and exc.status >= 500 and exc.status <= 599 - and exc.status not in (501,505,508,510,511,523)): + # In doubt, we retry on 5xx (Server error). However, there are some + # codes where retry is definitely not desired. For 4xx (client error) we + # do not retry in general, but for 408 (Request Timeout) RFC 2616 + # specifies that the client may repeat the request without + # modifications. + elif (isinstance(exc, HTTPError) and + ((500 <= exc.status <= 599 + and exc.status not in (501,505,508,510,511,523)) + or exc.status == 408)): return True return False @@ -293,7 +307,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): return ObjectR(key, resp, self, meta) @prepend_ancestor_docstring - def open_write(self, key, metadata=None, is_compressed=False): + def open_write(self, key, metadata=None, is_compressed=False, extra_headers=None): """ The returned object will buffer all data and only start the upload when its `close` method is called. @@ -307,6 +321,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): raise TypeError('*metadata*: expected dict or None, got %s' % type(metadata)) headers = CaseInsensitiveDict() + if extra_headers is not None: + headers.update(extra_headers) self._add_meta_headers(headers, metadata) return ObjectW(key, self, headers) @@ -330,13 +346,15 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): @retry @copy_ancestor_docstring - def copy(self, src, dest, metadata=None): + def copy(self, src, dest, metadata=None, extra_headers=None): log.debug('copy(%s, %s): start', src, dest) if not (metadata is None or isinstance(metadata, dict)): raise TypeError('*metadata*: expected dict or None, got %s' % type(metadata)) headers = CaseInsensitiveDict() + if extra_headers is not None: + headers.update(extra_headers) headers[self.hdr_prefix + 'copy-source'] = \ '/%s/%s%s' % (self.bucket_name, self.prefix, src) @@ -376,8 +394,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): headers=None, body=None): '''Send request, read and return response object''' - log.debug('_do_request(): start with parameters (%r, %r, %r, %r, %r, %r)', - method, path, subres, query_string, headers, body) + log.debug('preparing %s %s?%s, qs=%s', method, path, subres, query_string) if headers is None: headers = CaseInsensitiveDict() @@ -689,7 +706,10 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): log.warning('MD5 mismatch in metadata for %s', key) raise BadDigestError('BadDigest', 'Meta MD5 for %s does not match' % key) - return pickle.loads(b64decode(buf), encoding='latin1') + try: + return safe_unpickle(b64decode(buf), encoding='latin1') + except pickle.UnpicklingError as exc: + raise CorruptedObjectError('Corrupted metadata, pickle says: %s' % exc) elif format_ == 'raw': # No MD5 available return meta else: diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py index a3fe613..5d1c84c 100644 --- a/src/s3ql/backends/swift.py +++ b/src/s3ql/backends/swift.py @@ -9,7 +9,8 @@ This program can be distributed under the terms of the GNU GPLv3. from ..logging import logging, QuietError # Ensure use of custom logger class from .. import BUFSIZE from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError, - DanglingStorageURLError, retry_generator) + DanglingStorageURLError, retry_generator, get_proxy, + get_ssl_context) from .s3c import HTTPError, ObjectR, ObjectW, md5sum_b64, BadDigestError from . import s3c from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring, @@ -40,30 +41,31 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): use_expect_100c = True hdr_prefix = 'X-Object-' + known_options = {'no-ssl', 'ssl-ca-path', 'tcp-timeout'} _add_meta_headers = s3c.Backend._add_meta_headers _extractmeta = s3c.Backend._extractmeta - def __init__(self, storage_url, login, password, ssl_context=None, proxy=None): - # Unused argument - #pylint: disable=W0613 - + def __init__(self, storage_url, login, password, options): super().__init__() - - (host, port, container_name, prefix) = self._parse_storage_url(storage_url, ssl_context) - - self.hostname = host - self.port = port - self.container_name = container_name - self.prefix = prefix - self.password = password - self.login = login + self.options = options + self.hostname = None + self.port = None + self.container_name = None + self.prefix = None self.auth_token = None self.auth_prefix = None self.conn = None - self.proxy = proxy - self.ssl_context = ssl_context + self.password = password + self.login = login + + if 'no-ssl' in options: + self.ssl_context = None + else: + self.ssl_context = get_ssl_context(options.get('ssl-ca-path', None)) + self._parse_storage_url(storage_url, self.ssl_context) + self.proxy = get_proxy(self.ssl_context is not None) self._container_exists() def __str__(self): @@ -86,12 +88,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): raise DanglingStorageURLError(self.container_name) raise - @staticmethod - def _parse_storage_url(storage_url, ssl_context): - '''Extract information from storage URL - - Return a tuple *(host, port, container_name, prefix)* . - ''' + def _parse_storage_url(self, storage_url, ssl_context): + '''Init instance variables from storage url''' hit = re.match(r'^[a-zA-Z0-9]+://' # Backend r'([^/:]+)' # Hostname @@ -112,14 +110,25 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): containername = hit.group(3) prefix = hit.group(4) or '' - return (hostname, port, containername, prefix) + self.hostname = hostname + self.port = port + self.container_name = containername + self.prefix = prefix @copy_ancestor_docstring def is_temp_failure(self, exc): #IGNORE:W0613 if isinstance(exc, AuthenticationExpired): return True - elif isinstance(exc, HTTPError) and exc.status >= 500 and exc.status <= 599: + # In doubt, we retry on 5xx (Server error). However, there are some + # codes where retry is definitely not desired. For 4xx (client error) we + # do not retry in general, but for 408 (Request Timeout) RFC 2616 + # specifies that the client may repeat the request without + # modifications. + elif (isinstance(exc, HTTPError) and + ((500 <= exc.status <= 599 + and exc.status not in (501,505,508,510,511,523)) + or exc.status == 408)): return True elif is_temp_network_error(exc): @@ -135,6 +144,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): conn = HTTPConnection(self.hostname, self.port, proxy=self.proxy, ssl_context=self.ssl_context) + conn.timeout = self.options.get('tcp-timeout', 10) headers = CaseInsensitiveDict() headers['X-Auth-User'] = self.login @@ -165,8 +175,10 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): self.auth_prefix = urllib.parse.unquote(o.path) conn.disconnect() - return HTTPConnection(o.hostname, o.port, proxy=self.proxy, + conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy, ssl_context=self.ssl_context) + conn.timeout = self.options.get('tcp-timeout', 10) + return conn raise RuntimeError('No valid authentication path found') @@ -201,53 +213,9 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): elif subres: path += '?%s' % subres - # We can probably remove the assertions at some point and - # call self.conn.read_response() directly - def read_response(): - resp = self.conn.read_response() - assert resp.method == method - assert resp.path == path - return resp - headers['X-Auth-Token'] = self.auth_token try: - resp = None - log.debug('_send_request(): %s %s', method, path) - if body is None or isinstance(body, (bytes, bytearray, memoryview)): - self.conn.send_request(method, path, body=body, headers=headers) - else: - body_len = os.fstat(body.fileno()).st_size - self.conn.send_request(method, path, expect100=self.use_expect_100c, - headers=headers, body=BodyFollowing(body_len)) - - if self.use_expect_100c: - resp = read_response() - if resp.status == 100: - resp = None - # Otherwise fall through below - - try: - shutil.copyfileobj(body, self.conn, BUFSIZE) - except ConnectionClosed: - # Server closed connection while we were writing body data - - # but we may still be able to read an error response - try: - resp = read_response() - except ConnectionClosed: # No server response available - pass - else: - if resp.status < 400: - log.warning('Server broke connection during upload, but signaled ' - '%d %s', resp.status, resp.reason) - resp = None - - # Re-raise original error - if resp is None: - raise - - if resp is None: - resp = read_response() - + resp = self._do_request_inner(method, path, body=body, headers=headers) except Exception as exc: if is_temp_network_error(exc): # We probably can't use the connection anymore @@ -275,6 +243,48 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta): else: raise HTTPError(resp.status, resp.reason, resp.headers) + # Including this code directly in _do_request would be very messy since + # we can't `return` the response early, thus the separate method + def _do_request_inner(self, method, path, body, headers): + '''The guts of the _do_request method''' + + log.debug('_do_request_inner(): %s %s', method, path) + + if body is None or isinstance(body, (bytes, bytearray, memoryview)): + self.conn.send_request(method, path, body=body, headers=headers) + else: + body_len = os.fstat(body.fileno()).st_size + self.conn.send_request(method, path, expect100=self.use_expect_100c, + headers=headers, body=BodyFollowing(body_len)) + + if self.use_expect_100c: + log.debug('waiting for 100-continue') + resp = self.conn.read_response() + if resp.status != 100: + return resp + + log.debug('writing body data') + try: + shutil.copyfileobj(body, self.conn, BUFSIZE) + except ConnectionClosed: + log.debug('interrupted write, server closed connection') + # Server closed connection while we were writing body data - + # but we may still be able to read an error response + try: + resp = self.conn.read_response() + except ConnectionClosed: # No server response available + log.debug('no response available in buffer') + pass + else: + if resp.status >= 400: # error response + return resp + log.warning('Server broke connection during upload, but signaled ' + '%d %s', resp.status, resp.reason) + + # Re-raise original error + raise + + return self.conn.read_response() def _assert_empty_response(self, resp): '''Assert that current response body is empty''' diff --git a/src/s3ql/backends/swiftks.py b/src/s3ql/backends/swiftks.py index 959499b..19a644b 100644 --- a/src/s3ql/backends/swiftks.py +++ b/src/s3ql/backends/swiftks.py @@ -21,31 +21,12 @@ log = logging.getLogger(__name__) class Backend(swift.Backend): - def __init__(self, storage_url, login, password, ssl_context=None, proxy=None): - # Unused argument - #pylint: disable=W0613 + def __init__(self, storage_url, login, password, options): + self.region = None + super().__init__(storage_url, login, password, options) - (host, port, region, - container_name, prefix) = self._parse_storage_url(storage_url, ssl_context) - - self.hostname = host - self.port = port - self.region = region - self.container_name = container_name - self.prefix = prefix - self.proxy = proxy - self.password = password - self.login = login - self.auth_token = None - self.auth_prefix = None - self.conn = None - self.ssl_context = ssl_context - - self._container_exists() - - @staticmethod @copy_ancestor_docstring - def _parse_storage_url(storage_url, ssl_context): + def _parse_storage_url(self, storage_url, ssl_context): hit = re.match(r'^[a-zA-Z0-9]+://' # Backend r'([^/:]+)' # Hostname @@ -68,7 +49,11 @@ class Backend(swift.Backend): containername = hit.group(4) prefix = hit.group(5) or '' - return (hostname, port, region, containername, prefix) + self.hostname = hostname + self.port = port + self.container_name = containername + self.prefix = prefix + self.region = region @retry def _get_conn(self): @@ -78,6 +63,7 @@ class Backend(swift.Backend): conn = HTTPConnection(self.hostname, port=self.port, proxy=self.proxy, ssl_context=self.ssl_context) + conn.timeout = self.options.get('tcp-timeout', 10) headers = CaseInsensitiveDict() headers['Content-Type'] = 'application/json' @@ -125,14 +111,16 @@ class Backend(swift.Backend): self.auth_prefix = urllib.parse.unquote(o.path) conn.disconnect() - return HTTPConnection(o.hostname, o.port, proxy=self.proxy, + conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy, ssl_context=self.ssl_context) + conn.timeout = self.options.get('tcp-timeout', 10) + return conn if len(avail_regions) < 10: - raise DanglingStorageURLError( + raise DanglingStorageURLError(self.container_name, 'No accessible object storage service found in region %s' ' (available regions: %s)' % (self.region, ', '.join(avail_regions))) else: - raise DanglingStorageURLError( + raise DanglingStorageURLError(self.container_name, 'No accessible object storage service found in region %s' % self.region) |