author    Nikolaus Rath <Nikolaus@rath.org>   2016-03-09 10:10:20 -0800
committer Nikolaus Rath <Nikolaus@rath.org>   2016-03-09 10:10:20 -0800
commit    57ba7d4c658aa7c5d2e0ca2ae71e3915e6052b17 (patch)
tree      192904d2eaa4f63ec239b644c75797c6024b2e2a /src
parent    061b768a9d87d125df6edb494df519447fab21c6 (diff)
Import s3ql_2.14+dfsg.orig.tar.gz
Diffstat (limited to 'src')

-rw-r--r--  src/s3ql.egg-info/PKG-INFO     |    2
-rw-r--r--  src/s3ql.egg-info/SOURCES.txt  |    2
-rw-r--r--  src/s3ql/__init__.py           |   11
-rw-r--r--  src/s3ql/adm.py                |  258
-rw-r--r--  src/s3ql/backends/common.py    |  225
-rw-r--r--  src/s3ql/backends/comprenc.py  |  324
-rw-r--r--  src/s3ql/backends/local.py     |   26
-rw-r--r--  src/s3ql/backends/s3c.py       |   45
-rw-r--r--  src/s3ql/backends/swift.py     |   21
-rw-r--r--  src/s3ql/backends/swiftks.py   |    4
-rw-r--r--  src/s3ql/common.py             |  215
-rw-r--r--  src/s3ql/cp.py                 |    7
-rw-r--r--  src/s3ql/ctrl.py               |   18
-rw-r--r--  src/s3ql/fs.py                 |   50
-rw-r--r--  src/s3ql/fsck.py               |  103
-rw-r--r--  src/s3ql/lock.py               |    4
-rw-r--r--  src/s3ql/metadata.py           |   76
-rw-r--r--  src/s3ql/mkfs.py               |   31
-rw-r--r--  src/s3ql/mount.py              |   65
-rw-r--r--  src/s3ql/remove.py             |    9
-rw-r--r--  src/s3ql/umount.py             |    5
-rw-r--r--  src/s3ql/verify.py             |   68

22 files changed, 534 insertions, 1035 deletions
diff --git a/src/s3ql.egg-info/PKG-INFO b/src/s3ql.egg-info/PKG-INFO
index 0a4bac9..24517cd 100644
--- a/src/s3ql.egg-info/PKG-INFO
+++ b/src/s3ql.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: s3ql
-Version: 2.13
+Version: 2.14
 Summary: a full-featured file system for online data storage
 Home-page: https://bitbucket.org/nikratio/s3ql/
 Author: Nikolaus Rath
diff --git a/src/s3ql.egg-info/SOURCES.txt b/src/s3ql.egg-info/SOURCES.txt
index 059d67d..dc1fed6 100644
--- a/src/s3ql.egg-info/SOURCES.txt
+++ b/src/s3ql.egg-info/SOURCES.txt
@@ -25,7 +25,6 @@ contrib/pcp.1
 contrib/pcp.py
 contrib/remove_objects.py
 contrib/s3ql_backup.sh
-contrib/s3ql_upstart.conf
 contrib/scramble_db.py
 doc/manual.pdf
 doc/html/.buildinfo
@@ -233,7 +232,6 @@ tests/pytest.ini
 tests/t1_backends.py
 tests/t1_dump.py
 tests/t1_retry.py
-tests/t1_safe_unpickle.py
 tests/t1_serialization.py
 tests/t2_block_cache.py
 tests/t3_fs_api.py
diff --git a/src/s3ql/__init__.py b/src/s3ql/__init__.py
index cccf83f..2521b85 100644
--- a/src/s3ql/__init__.py
+++ b/src/s3ql/__init__.py
@@ -19,20 +19,19 @@ __all__ = [ 'adm', 'backends', 'block_cache', 'common', 'calc_mro',
            'fsck', 'inherit_docstrings', 'inode_cache', 'lock', 'logging',
            'metadata', 'mkfs', 'mount', 'parse_args', 'remove', 'statfs',
            'umount', 'VERSION', 'CURRENT_FS_REV',
-           'REV_VER_MAP', 'RELEASE', 'BUFSIZE', 'PICKLE_PROTOCOL',
+           'REV_VER_MAP', 'RELEASE', 'BUFSIZE',
            'CTRL_NAME', 'CTRL_INODE' ]

-VERSION = '2.13'
+VERSION = '2.14'
 RELEASE = '%s' % VERSION

+# TODO: On next upgrade, remove pickle support from
+# common.py:load_params().
 CURRENT_FS_REV = 22

 # Buffer size when writing objects
 BUFSIZE = 64 * 1024

-# Pickle protocol version to use.
-PICKLE_PROTOCOL = 2
-
 # Name and inode of the special s3ql control file
 CTRL_NAME = '.__s3ql__ctrl__'
 CTRL_INODE = 2
@@ -40,7 +39,7 @@ CTRL_INODE = 2
 # Maps file system revisions to the last S3QL version that
 # supported this revision.
 REV_VER_MAP = {
-    21: '2.12',
+    21: '2.13',
     20: '2.9',
     16: '1.15',
     15: '1.10',
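The REV_VER_MAP change above records that revision 21 was last supported by release 2.13; s3qladm consults this table when a file system is too old to upgrade directly. A standalone sketch of that lookup (illustration only, values taken from the mapping above):

from s3ql import CURRENT_FS_REV, REV_VER_MAP

# A revision-21 file system must first be upgraded with the last
# release that still understood it:
rev = 21
if rev < CURRENT_FS_REV:
    print('Last S3QL version supporting revision %d: %s'
          % (rev, REV_VER_MAP[rev]))   # -> 2.13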
diff --git a/src/s3ql/adm.py b/src/s3ql/adm.py
index 27a5dd8..6901b20 100644
--- a/src/s3ql/adm.py
+++ b/src/s3ql/adm.py
@@ -7,24 +7,18 @@ This program can be distributed under the terms of the GNU GPLv3.
 '''

 from .logging import logging, QuietError, setup_logging
-from . import CURRENT_FS_REV, REV_VER_MAP, PICKLE_PROTOCOL, BUFSIZE
+from . import CURRENT_FS_REV, REV_VER_MAP
 from .backends.comprenc import ComprencBackend
 from .database import Connection
-from .common import (get_backend_cachedir, get_seq_no, stream_write_bz2,
-                     stream_read_bz2, is_mounted, AsyncFn, get_backend,
-                     get_backend_factory, pretty_print_size, split_by_n,
-                     handle_on_return)
-from .metadata import restore_metadata, cycle_metadata, dump_metadata
+from .common import (get_backend_cachedir, get_seq_no, is_mounted, get_backend,
+                     freeze_basic_mapping, load_params)
+from .metadata import dump_and_upload_metadata, download_metadata
 from .parse_args import ArgumentParser
 from datetime import datetime as Datetime
 from getpass import getpass
-from base64 import b64encode
-from queue import Queue, Full as QueueFull
 import os
-import pickle
 import shutil
 import sys
-import tempfile
 import textwrap
 import time

@@ -87,20 +81,20 @@ def main(args=None):
         raise QuietError('Can not work on mounted file system.')

     if options.action == 'clear':
-        with get_backend(options, plain=True) as backend:
+        with get_backend(options, raw=True) as backend:
             return clear(backend, options)

-    if options.action == 'upgrade':
-        return upgrade(options)
-
     with get_backend(options) as backend:
-        if options.action == 'passphrase':
+        if options.action == 'upgrade':
+            return upgrade(backend, get_backend_cachedir(options.storage_url,
+                                                         options.cachedir))
+        elif options.action == 'passphrase':
             return change_passphrase(backend)

-        if options.action == 'download-metadata':
-            return download_metadata(backend, options.storage_url)
+        elif options.action == 'download-metadata':
+            return download_metadata_cmd(backend, options.storage_url)

-def download_metadata(backend, storage_url):
+def download_metadata_cmd(backend, storage_url):
     '''Download old metadata backups'''

     backups = sorted(backend.list('s3ql_metadata'))
@@ -139,25 +133,14 @@ def download_metadata(backend, storage_url):
         raise QuietError('%s already exists, aborting.' % cachepath + i)

     param = backend.lookup(name)
-    with tempfile.TemporaryFile() as tmpfh:
-        def do_read(fh):
-            tmpfh.seek(0)
-            tmpfh.truncate()
-            stream_read_bz2(fh, tmpfh)
-
-        log.info('Downloading and decompressing %s...', name)
-        backend.perform_read(do_read, name)
-
-        log.info("Reading metadata...")
-        tmpfh.seek(0)
-        restore_metadata(tmpfh, cachepath + '.db')
+    download_metadata(backend, cachepath + ".db", name)

     # Raise sequence number so that fsck.s3ql actually uses the
     # downloaded backup
     seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in backend.list('s3ql_seq_no_') ]
     param['seq_no'] = max(seq_nos) + 1
     with open(cachepath + '.params', 'wb') as fh:
-        pickle.dump(param, fh, PICKLE_PROTOCOL)
+        fh.write(freeze_basic_mapping(param))

 def change_passphrase(backend):
     '''Change file system passphrase'''

@@ -221,23 +204,16 @@ def get_old_rev_msg(rev, prog):
     ''' % { 'version': REV_VER_MAP[rev],
             'prog': prog })

-
-@handle_on_return
-def upgrade(options, on_return):
+def upgrade(backend, cachepath):
     '''Upgrade file system to newest revision'''

     log.info('Getting file system parameters..')

-    cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
-
-    backend_factory = get_backend_factory(options)
-    backend = on_return.enter_context(backend_factory())
-
     # Check for cached metadata
     db = None
     seq_no = get_seq_no(backend)
     if os.path.exists(cachepath + '.params'):
-        with open(cachepath + '.params', 'rb') as fh:
-            param = pickle.load(fh)
+        param = load_params(cachepath + '.params')
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
             param = backend.lookup('s3ql_metadata')
@@ -271,7 +247,10 @@ def upgrade(options, on_return):
             raise QuietError()

     # Check revision
-    if param['revision'] < CURRENT_FS_REV-1:
+    # Upgrade from 21 to 22 is only possible with release 2.13,
+    # because we removed support for reading the old storage object
+    # format after 2.13.
+    if param['revision'] == 21 or param['revision'] < CURRENT_FS_REV-1:
         print(textwrap.dedent('''
             File system revision too old to upgrade!

@@ -305,18 +284,7 @@ def upgrade(options, on_return):

     if not db:
         # Need to download metadata
-        with tempfile.TemporaryFile() as tmpfh:
-            def do_read(fh):
-                tmpfh.seek(0)
-                tmpfh.truncate()
-                stream_read_bz2(fh, tmpfh)
-
-            log.info("Downloading & uncompressing metadata...")
-            backend.perform_read(do_read, "s3ql_metadata")
-
-            log.info("Reading metadata...")
-            tmpfh.seek(0)
-            db = restore_metadata(tmpfh, cachepath + '.db')
+        db = download_metadata(backend, cachepath + '.db')

     log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV)

@@ -324,41 +292,14 @@ def upgrade(options, on_return):
     param['last-modified'] = time.time()
     param['seq_no'] += 1

-    # Ensure that there are backups of the master key
-    if backend.passphrase is not None:
-        data_pw = backend.passphrase
-        backend.passphrase = backend.fs_passphrase
-        for i in range(1,4):
-            obj_id = 's3ql_passphrase_bak%d' % i
-            if obj_id not in backend:
-                backend[obj_id] = data_pw
-        backend.passphrase = data_pw
-
-    # Upgrade all objects, so that we can remove legacy conversion
-    # routines in the next release.
-    update_obj_metadata(backend, backend_factory, db, options.threads)
-
-    log.info('Dumping metadata...')
-    with tempfile.TemporaryFile() as fh:
-        dump_metadata(db, fh)
-        def do_write(obj_fh):
-            fh.seek(0)
-            stream_write_bz2(fh, obj_fh)
-            return obj_fh
-
-        log.info("Compressing and uploading metadata...")
-        backend.store('s3ql_seq_no_%d' % param['seq_no'], b'Empty')
-        obj_fh = backend.perform_write(do_write, "s3ql_metadata_new", metadata=param,
-                                       is_compressed=True)
-
-    log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
-    log.info('Cycling metadata backups...')
-    cycle_metadata(backend)
+    # Upgrade code goes here
+
+    dump_and_upload_metadata(backend, db, param)

     backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
     with open(cachepath + '.params', 'wb') as fh:
-        pickle.dump(param, fh, PICKLE_PROTOCOL)
+        fh.write(freeze_basic_mapping(param))

     log.info('Cleaning up local metadata...')
     db.execute('ANALYZE')
@@ -366,154 +307,5 @@ def upgrade(options, on_return):

     print('File system upgrade complete.')

-    if backend.passphrase is not None:
-        print('Please store the following master key in a safe location. It allows ',
-              'decryption of the S3QL file system in case the storage objects holding ',
-              'this information get corrupted:',
-              '---BEGIN MASTER KEY---',
-              ' '.join(split_by_n(b64encode(backend.passphrase).decode(), 4)),
-              '---END MASTER KEY---',
-              sep='\n')
-
-@handle_on_return
-def update_obj_metadata(backend, backend_factory, db,
-                        thread_count, on_return):
-    '''Upgrade metadata of storage objects'''
-
-    plain_backend = backend.backend
-
-    # No need to update sequence number, since we are going to
-    # write out a new one after the upgrade.
-    if backend.passphrase is None:
-        extra_objects = { 's3ql_metadata' }
-    else:
-        extra_objects = { 's3ql_metadata',
-                          's3ql_passphrase', 's3ql_passphrase_bak1',
-                          's3ql_passphrase_bak2', 's3ql_passphrase_bak3' }
-
-    for i in range(30):
-        obj_id = 's3ql_metadata_bak_%d' % i
-        if obj_id in plain_backend:
-            extra_objects.add(obj_id)
-
-    def yield_objects():
-        for (id_,) in db.query('SELECT id FROM objects'):
-            yield 's3ql_data_%d' % id_
-        for obj_id in extra_objects:
-            yield obj_id
-    total = db.get_val('SELECT COUNT(id) FROM objects') + len(extra_objects)
-
-    queue = Queue(maxsize=thread_count)
-    threads = []
-    for _ in range(thread_count):
-        t = AsyncFn(upgrade_loop, queue, on_return.push(backend_factory()))
-        # Don't wait for worker threads, gives deadlock if main thread
-        # terminates with exception
-        t.daemon = True
-        t.start()
-        threads.append(t)
-
-    # Updating this value is prone to race conditions. However,
-    # we don't care because this is for an approximate progress
-    # output only.
-    queue.rewrote_size = 0
-    stamp = 0
-    for (i, obj_id) in enumerate(yield_objects()):
-        stamp2 = time.time()
-        if stamp2 - stamp > 1:
-            sys.stdout.write('\r..processed %d/%d objects (%.1f%%, %s rewritten)..'
-                             % (i, total, i/total*100,
-                                pretty_print_size(queue.rewrote_size)))
-            sys.stdout.flush()
-            stamp = stamp2
-
-        # Terminate early if any thread failed with an exception
-        for t in threads:
-            if not t.is_alive():
-                t.join_and_raise()
-
-        # Avoid blocking if all threads terminated
-        while True:
-            try:
-                queue.put(obj_id, timeout=1)
-            except QueueFull:
-                pass
-            else:
-                break
-            for t in threads:
-                if not t.is_alive():
-                    t.join_and_raise()
-
-    queue.maxsize += len(threads)
-    for t in threads:
-        queue.put(None)
-
-    for t in threads:
-        t.join_and_raise()
-
-    sys.stdout.write('\n')
-
-def upgrade_loop(queue, backend):
-    plain_backend = backend.backend
-
-    while True:
-        obj_id = queue.get()
-        if obj_id is None:
-            break
-
-        meta = plain_backend.lookup(obj_id)
-        if meta.get('format_version', 0) == 2:
-            continue
-
-        # For important objects, we make a copy first (just to be safe)
-        if not obj_id.startswith('s3ql_data'):
-            plain_backend.copy(obj_id, 's3ql_pre2.13' + obj_id[4:])
-
-        # When reading passphrase objects, we have to use the
-        # "outer" password
-        if obj_id.startswith('s3ql_passphrase'):
-            data_pw = backend.passphrase
-            backend.passphrase = backend.fs_passphrase
-
-        meta = backend._convert_legacy_metadata(meta)
-        if meta['encryption'] == 'AES':
-            # Two statements to reduce likelihood of update races
-            size = rewrite_legacy_object(backend, obj_id)
-            queue.rewrote_size += size
-        else:
-            plain_backend.update_meta(obj_id, meta)
-
-        if obj_id.startswith('s3ql_passphrase'):
-            backend.passphrase = data_pw
-
-def rewrite_legacy_object(backend, obj_id):
-    with tempfile.TemporaryFile() as tmpfh:
-
-        # Read object
-        def do_read(fh):
-            tmpfh.seek(0)
-            tmpfh.truncate()
-            while True:
-                buf = fh.read(BUFSIZE)
-                if not buf:
-                    break
-                tmpfh.write(buf)
-            return fh.metadata
-
-        meta = backend.perform_read(do_read, obj_id)
-
-        # Write object
-        def do_write(fh):
-            tmpfh.seek(0)
-            while True:
-                buf = tmpfh.read(BUFSIZE)
-                if not buf:
-                    break
-                fh.write(buf)
-            return fh
-        out_fh = backend.perform_write(do_write, obj_id, meta)
-
-    return out_fh.get_obj_size()
-
 if __name__ == '__main__':
     main(sys.argv[1:])
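With pickle.dump() gone, the cached .params file is now written via freeze_basic_mapping() and read back with load_params() (both defined in s3ql/common.py further down in this patch). A minimal round-trip sketch; the file name and parameter values are made up:

from s3ql.common import freeze_basic_mapping, load_params

param = {'seq_no': 42, 'revision': 22, 'last-modified': 1457540000.0}

with open('test.params', 'wb') as fh:
    # Writes a plain dict literal, e.g. b"{ 'seq_no': 42, ... }"
    fh.write(freeze_basic_mapping(param))

# load_params() still falls back to pickle for files written by <= 2.13
assert load_params('test.params') == param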
diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py
index 7d0b261..cfcc4f8 100644
--- a/src/s3ql/backends/common.py
+++ b/src/s3ql/backends/common.py
@@ -10,37 +10,90 @@ from ..logging import logging, QuietError, LOG_ONCE # Ensure use of custom logger
 from abc import abstractmethod, ABCMeta
 from functools import wraps
 import time
-import codecs
-import io
 import textwrap
 import hashlib
 import struct
 import hmac
 import inspect
-from base64 import b64decode, b64encode
-import binascii
-from ast import literal_eval
 import ssl
 import os
 import re
-import pickletools
-import pickle
+import threading

 log = logging.getLogger(__name__)

+class RateTracker:
+    '''
+    Maintain an average occurence rate for events over a configurable time
+    window. The rate is computed with one second resolution.
+    '''
+
+    def __init__(self, window_length):
+        if not isinstance(window_length, int):
+            raise ValueError('only integer window lengths are supported')
+
+        self.buckets = [0] * window_length
+        self.window_length = window_length
+        self.last_update = int(time.monotonic())
+        self.lock = threading.Lock()
+
+    def register(self, _not_really=False):
+        '''Register occurence of an event.
+
+        The keyword argument is for class-internal use only.
+        '''
+
+        buckets = self.buckets
+        bucket_count = len(self.buckets)
+        now = int(time.monotonic())
+
+        elapsed = min(now - self.last_update, bucket_count)
+        for i in range(elapsed):
+            buckets[(now - i) % bucket_count] = 0
+
+        if _not_really:
+            return
+
+        with self.lock:
+            buckets[now % bucket_count] += 1
+            self.last_update = now
+
+    def get_rate(self):
+        '''Return average rate of event occurance'''
+
+        self.register(_not_really=True)
+        return sum(self.buckets) / len(self.buckets)
+
+    def get_count(self):
+        '''Return total number of events in window'''
+
+        self.register(_not_really=True)
+        return sum(self.buckets)
+
+
+# We maintain a (global) running average of temporary errors, so
+# that we can log a warning if this number becomes large. We
+# use a relatively large window to prevent bogus spikes if
+# multiple threads all have to retry after a long period of
+# inactivity.
 RETRY_TIMEOUT = 60 * 60 * 24
-def retry(method):
+def retry(method, _tracker=RateTracker(60)):
     '''Wrap *method* for retrying on some exceptions

     If *method* raises an exception for which the instance's
-    `is_temp_failure(exc)` method is true, the *method* is called again
-    at increasing intervals. If this persists for more than `RETRY_TIMEOUT`
-    seconds, the most-recently caught exception is re-raised.
+    `is_temp_failure(exc)` method is true, the *method* is called again at
+    increasing intervals. If this persists for more than `RETRY_TIMEOUT`
+    seconds, the most-recently caught exception is re-raised. If the
+    method defines a keyword parameter *is_retry*, then this parameter
+    will be set to True whenever the function is retried.
     '''

     if inspect.isgeneratorfunction(method):
         raise TypeError('Wrapping a generator function is pointless')

+    sig = inspect.signature(method)
+    has_is_retry = 'is_retry' in sig.parameters
+
     @wraps(method)
     def wrapped(*a, **kw):
         self = a[0]
@@ -48,6 +101,8 @@ def retry(method):
         waited = 0
         retries = 0
         while True:
+            if has_is_retry:
+                kw['is_retry'] = (retries > 0)
             try:
                 return method(*a, **kw)
             except Exception as exc:
@@ -55,6 +110,16 @@ def retry(method):
                 #pylint: disable=W0212
                 if not self.is_temp_failure(exc):
                     raise
+
+                _tracker.register()
+                rate = _tracker.get_rate()
+                if rate > 5:
+                    log.warning('Had to retry %d times over the last %d seconds, '
+                                'server or network problem?',
+                                rate * _tracker.window_length, _tracker.window_length)
+                else:
+                    log.debug('Average retry rate: %.2f Hz', rate)
+
                 if waited > RETRY_TIMEOUT:
                     log.error('%s.%s(*): Timeout exceeded, re-raising %r exception',
                               self.__class__.__name__, method.__name__, exc)
@@ -68,8 +133,9 @@ def retry(method):
                 else:
                     log_fn = log.warning

-                log_fn('Encountered %s exception (%s), retrying call to %s.%s for the %d-th time...',
-                       type(exc).__name__, exc, self.__class__.__name__, method.__name__, retries)
+                log_fn('Encountered %s (%s), retrying %s.%s (attempt %d)...',
+                       type(exc).__name__, exc, self.__class__.__name__, method.__name__,
+                       retries)

                 if hasattr(exc, 'retry_after') and exc.retry_after:
                     log.debug('retry_after is %.2f seconds', exc.retry_after)
@@ -436,7 +502,8 @@ class AbstractBackend(object, metaclass=ABCMeta):
     def update_meta(self, key, metadata):
         """Replace metadata of *key* with *metadata*

-        Metadata must be `dict` instance and pickle-able.
+        `metadata` must be a mapping with keys of type `str`, and values of an
+        elementary type (`str`, `bytes`, `int`, `float` or `bool`).
         """

         pass
@@ -583,72 +650,6 @@ def get_proxy(ssl):

     return proxy

-
-class ThawError(Exception):
-    def __str__(self):
-        return 'Malformed serialization data'
-
-def thaw_basic_mapping(buf):
-    '''Reconstruct dict from serialized representation
-
-    *buf* must be a bytes-like object as created by
-    `freeze_basic_mapping`. Raises `ThawError` if *buf* is not a valid
-    representation.
-
-    This procedure is safe even if *buf* comes from an untrusted source.
-    '''
-
-    try:
-        d = literal_eval(buf.decode('utf-8'))
-    except (UnicodeDecodeError, SyntaxError, ValueError):
-        raise ThawError()
-
-    # Decode bytes values
-    for (k,v) in d.items():
-        if not isinstance(v, bytes):
-            continue
-        try:
-            d[k] = b64decode(v)
-        except binascii.Error:
-            raise ThawError()
-
-    return d
-
-def freeze_basic_mapping(d):
-    '''Serialize mapping of elementary types
-
-    Keys of *d* must be strings. Values of *d* must be of elementary type (i.e.,
-    `str`, `bytes`, `int`, `float`, `complex`, `bool` or None).
-
-    The output is a bytestream that can be used to reconstruct the mapping. The
-    bytestream is not guaranteed to be deterministic. Look at
-    `checksum_basic_mapping` if you need a deterministic bytestream.
-    '''
-
-    els = []
-    for (k,v) in d.items():
-        if not isinstance(k, str):
-            raise ValueError('key %s must be str, not %s' % (k, type(k)))
-
-        if (not isinstance(v, (str, bytes, bytearray, int, float, complex, bool))
-            and v is not None):
-            raise ValueError('value for key %s (%s) is not elementary' % (k, v))
-
-        # To avoid wasting space, we b64encode non-ascii byte values.
-        if isinstance(v, (bytes, bytearray)):
-            v = b64encode(v)
-
-        # This should be a pretty safe assumption for elementary types, but we
-        # add an assert just to be safe (Python docs just say that repr makes
-        # "best effort" to produce something parseable)
-        (k_repr, v_repr) = (repr(k), repr(v))
-        assert (literal_eval(k_repr), literal_eval(v_repr)) == (k, v)
-
-        els.append(('%s: %s' % (k_repr, v_repr)))
-
-    buf = '{ %s }' % ', '.join(els)
-    return buf.encode('utf-8')
-
 def checksum_basic_mapping(metadata, key=None):
     '''Compute checksum for mapping of elementary types

@@ -695,67 +696,3 @@ def checksum_basic_mapping(metadata, key=None):
         chk.update(val)

     return chk.digest()
-
-
-SAFE_UNPICKLE_OPCODES = {'BININT', 'BININT1', 'BININT2', 'LONG1', 'LONG4',
-                         'BINSTRING', 'SHORT_BINSTRING', 'GLOBAL',
-                         'NONE', 'NEWTRUE', 'NEWFALSE', 'BINUNICODE',
-                         'BINFLOAT', 'EMPTY_LIST', 'APPEND', 'APPENDS',
-                         'LIST', 'EMPTY_TUPLE', 'TUPLE', 'TUPLE1', 'TUPLE2',
-                         'TUPLE3', 'EMPTY_DICT', 'DICT', 'SETITEM',
-                         'SETITEMS', 'POP', 'DUP', 'MARK', 'POP_MARK',
-                         'BINGET', 'LONG_BINGET', 'BINPUT', 'LONG_BINPUT',
-                         'PROTO', 'STOP', 'REDUCE'}
-
-SAFE_UNPICKLE_GLOBAL_NAMES = { ('__builtin__', 'bytearray'),
-                               ('__builtin__', 'set'),
-                               ('__builtin__', 'frozenset'),
-                               ('_codecs', 'encode') }
-SAFE_UNPICKLE_GLOBAL_OBJS = { bytearray, set, frozenset, codecs.encode }
-
-class SafeUnpickler(pickle.Unpickler):
-    def find_class(self, module, name):
-        if (module, name) not in SAFE_UNPICKLE_GLOBAL_NAMES:
-            raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
-                                         (module, name))
-        ret = super().find_class(module, name)
-        if ret not in SAFE_UNPICKLE_GLOBAL_OBJS:
-            raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
-                                         (module, name))
-        return ret
-
-
-def safe_unpickle_fh(fh, fix_imports=True, encoding="ASCII",
-                     errors="strict"):
-    '''Safely unpickle untrusted data from *fh*
-
-    *fh* must be seekable.
-    '''
-
-    if not fh.seekable():
-        raise TypeError('*fh* must be seekable')
-    pos = fh.tell()
-
-    # First make sure that we know all used opcodes
-    try:
-        for (opcode, arg, _) in pickletools.genops(fh):
-            if opcode.proto > 2 or opcode.name not in SAFE_UNPICKLE_OPCODES:
-                raise pickle.UnpicklingError('opcode %s is unsafe' % opcode.name)
-    except (ValueError, EOFError):
-        raise pickle.UnpicklingError('corrupted data')
-
-    fh.seek(pos)
-
-    # Then use a custom Unpickler to ensure that we only give access to
-    # specific, whitelisted globals. Note that with the above opcodes, there is
-    # no way to trigger attribute access, so "brachiating" from a white listed
-    # object to __builtins__ is not possible.
-    return SafeUnpickler(fh, fix_imports=fix_imports,
-                         encoding=encoding, errors=errors).load()
-
-def safe_unpickle(buf, fix_imports=True, encoding="ASCII",
-                  errors="strict"):
-    '''Safely unpickle untrusted data in *buf*'''
-
-    return safe_unpickle_fh(io.BytesIO(buf), fix_imports=fix_imports,
-                            encoding=encoding, errors=errors)
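The new RateTracker keeps one counter per second in a fixed-size ring of buckets; retry() feeds a single shared instance (via the _tracker default argument) so that temporary failures seen by all backend threads contribute to one moving average. A standalone usage sketch:

from s3ql.backends.common import RateTracker

tracker = RateTracker(60)          # 60 one-second buckets

for _ in range(10):
    tracker.register()             # record one temporary failure

print(tracker.get_count())         # events still inside the window -> 10
print(tracker.get_rate())          # events per second over 60 s -> ~0.17

# Buckets older than the window are zeroed lazily on the next
# register()/get_rate() call, so stale events drop out automatically.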
diff --git a/src/s3ql/backends/comprenc.py b/src/s3ql/backends/comprenc.py
index bade392..b3ffc1b 100644
--- a/src/s3ql/backends/comprenc.py
+++ b/src/s3ql/backends/comprenc.py
@@ -7,21 +7,17 @@ This program can be distributed under the terms of the GNU GPLv3.
 '''

 from ..logging import logging # Ensure use of custom logger class
-from .. import BUFSIZE, PICKLE_PROTOCOL
-from .common import (AbstractBackend, CorruptedObjectError, safe_unpickle,
-                     freeze_basic_mapping, thaw_basic_mapping, ThawError,
-                     checksum_basic_mapping)
+from .. import BUFSIZE
+from .common import AbstractBackend, CorruptedObjectError, checksum_basic_mapping
+from ..common import ThawError, freeze_basic_mapping, thaw_basic_mapping
 from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring,
                                   ABCDocstMeta)
 from Crypto.Cipher import AES
 from Crypto.Util import Counter
-from base64 import b64decode, b64encode
-import binascii
 import bz2
 import hashlib
 import hmac
 import lzma
-import pickle
 import io
 import struct
 import time
@@ -71,8 +67,6 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
     @copy_ancestor_docstring
     def lookup(self, key):
         meta_raw = self.backend.lookup(key)
-        if meta_raw.get('format_version', 0) < 2:
-            meta_raw = self._convert_legacy_metadata(meta_raw)
         return self._verify_meta(key, meta_raw)[1]

     @prepend_ancestor_docstring
@@ -153,32 +147,12 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
         """

         fh = self.backend.open_read(key)
-        checksum_warning = False
         try:
-            if fh.metadata.get('format_version', 0) < 2:
-                meta_raw = self._convert_legacy_metadata(fh.metadata)
-            else:
-                meta_raw = fh.metadata
-
-            # Also checks if this is a BetterBucket storage object
+            meta_raw = fh.metadata
             (nonce, meta) = self._verify_meta(key, meta_raw)
             if nonce:
                 data_key = sha256(self.passphrase + nonce)

-            compr_alg = meta_raw['compression']
-            encr_alg = meta_raw['encryption']
-
-            if compr_alg == 'BZIP2':
-                decompressor = bz2.BZ2Decompressor()
-            elif compr_alg == 'LZMA':
-                decompressor = lzma.LZMADecompressor()
-            elif compr_alg == 'ZLIB':
-                decompressor = zlib.decompressobj()
-            elif compr_alg == 'None':
-                decompressor = None
-            else:
-                raise RuntimeError('Unsupported compression: %s' % compr_alg)
-
             # The `payload_offset` key only exists if the storage object was
             # created with on old S3QL version. In order to avoid having to
             # download and re-upload the entire object during the upgrade, the
@@ -189,24 +163,27 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
                 while to_skip:
                     to_skip -= len(fh.read(to_skip))

-            # If we've come this far, we want to emit a warning if the object
-            # has not been read completely on close().
-            checksum_warning = True
-
-            if encr_alg == 'AES':
-                fh = LegacyDecryptDecompressFilter(fh, data_key, decompressor)
-                decompressor = None
-            elif encr_alg == 'AES_v2':
+            encr_alg = meta_raw['encryption']
+            if encr_alg == 'AES_v2':
                 fh = DecryptFilter(fh, data_key)
             elif encr_alg != 'None':
                 raise RuntimeError('Unsupported encryption: %s' % encr_alg)

-            if decompressor:
-                fh = DecompressFilter(fh, decompressor)
+            compr_alg = meta_raw['compression']
+            if compr_alg == 'BZIP2':
+                fh = DecompressFilter(fh, bz2.BZ2Decompressor())
+            elif compr_alg == 'LZMA':
+                fh = DecompressFilter(fh, lzma.LZMADecompressor())
+            elif compr_alg == 'ZLIB':
+                fh = DecompressFilter(fh,zlib.decompressobj())
+            elif compr_alg != 'None':
+                raise RuntimeError('Unsupported compression: %s' % compr_alg)

             fh.metadata = meta
         except:
-            fh.close(checksum_warning=checksum_warning)
+            # Don't emit checksum warning, caller hasn't even
+            # started reading anything.
+            fh.close(checksum_warning=False)
             raise

         return fh
@@ -298,8 +275,6 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):

     def _copy_or_rename(self, src, dest, rename, metadata=None):
         meta_raw = self.backend.lookup(src)
-        if meta_raw.get('format_version', 0) < 2:
-            meta_raw = self._convert_legacy_metadata(meta_raw)
         (nonce, meta_old) = self._verify_meta(src, meta_raw)

         if nonce:
@@ -328,182 +303,6 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
     def close(self):
         self.backend.close()

-    def _convert_legacy_metadata(self, meta):
-        '''Convert metadata to newest format
-
-        This method ensures that we can read objects written
-        by older S3QL versions.
-        '''
-
-        format_version = meta.get('format_version', 0)
-        assert format_version in (0,1)
-        if format_version == 0:
-            meta = self._convert_legacy_metadata0(meta)
-        return self._convert_legacy_metadata1(meta)
-
-    def _convert_legacy_metadata0(self, meta,
-                                  LEN_BYTES = struct.calcsize(b'<B'),
-                                  TIME_BYTES = struct.calcsize(b'<f')):
-        meta_new = dict(format_version=1)
-
-        if ('encryption' in meta and
-            'compression' in meta):
-            meta_new['encryption'] = meta['encryption']
-            meta_new['compression'] = meta['compression']
-
-        elif 'encrypted' in meta:
-            s = meta['encrypted']
-            if s == 'True':
-                meta_new['encryption'] = 'AES'
-                meta_new['compression'] = 'BZIP2'
-
-            elif s == 'False':
-                meta_new['encryption'] = 'None'
-                meta_new['compression'] = 'None'
-
-            elif s.startswith('AES/'):
-                meta_new['encryption'] = 'AES'
-                meta_new['compression'] = s[4:]
-
-            elif s.startswith('PLAIN/'):
-                meta_new['encryption'] = 'None'
-                meta_new['compression'] = s[6:]
-            else:
-                raise RuntimeError('Unsupported encryption')
-
-            if meta_new['compression'] == 'BZ2':
-                meta_new['compression'] = 'BZIP2'
-
-            if meta_new['compression'] == 'NONE':
-                meta_new['compression'] = 'None'
-        else:
-            meta_new['encryption'] = 'None'
-            meta_new['compression'] = 'None'
-
-        # Extract metadata (pre 2.x versions use multiple headers)
-        if any(k.startswith('meta') for k in meta):
-            parts = [ meta[k] for k in sorted(meta.keys())
-                      if k.startswith('meta') ]
-            meta_new['data'] = ''.join(parts)
-        else:
-            try:
-                meta_new['data'] = meta['data']
-            except KeyError:
-                raise CorruptedObjectError('meta key data is missing')
-
-        if not self.passphrase:
-            return meta_new
-
-        meta_buf = b64decode(meta_new['data'])
-        off = 0
-        def read(len_):
-            nonlocal off
-            tmp = meta_buf[off:off+len_]
-            off += len_
-            return tmp
-
-        len_ = struct.unpack(b'<B', read(LEN_BYTES))[0]
-        nonce = read(len_)
-        key = sha256(self.passphrase + nonce)
-        cipher = aes_cipher(key)
-        hmac_ = hmac.new(key, digestmod=hashlib.sha256)
-        hash_ = read(HMAC_SIZE)
-        meta_buf = meta_buf[off:]
-        meta_buf_plain = cipher.decrypt(meta_buf)
-        hmac_.update(meta_buf_plain)
-        hash_ = cipher.decrypt(hash_)
-
-        if not hmac.compare_digest(hash_, hmac_.digest()):
-            raise CorruptedObjectError('HMAC mismatch')
-
-        obj_id = nonce[TIME_BYTES:].decode('utf-8')
-        meta_key = sha256(self.passphrase + nonce + b'meta')
-        meta_new['nonce'] = b64encode(nonce)
-        meta_new['payload_offset'] = LEN_BYTES + len(nonce)
-        meta_new['data'] = b64encode(aes_cipher(meta_key).encrypt(meta_buf_plain))
-        meta_new['object_id'] = b64encode(obj_id.encode('utf-8'))
-        meta_new['signature'] = calc_legacy_meta_checksum(meta_new, meta_key)
-
-        return meta_new
-
-    def _convert_legacy_metadata1(self, metadata):
-        if not isinstance(metadata, dict):
-            raise CorruptedObjectError('metadata should be dict, not %s' % type(metadata))
-
-        for mkey in ('encryption', 'compression', 'data'):
-            if mkey not in metadata:
-                raise CorruptedObjectError('meta key %s is missing' % mkey)
-
-        encr_alg = metadata['encryption']
-        encrypted = (encr_alg != 'None')
-
-        if encrypted and self.passphrase is None:
-            raise CorruptedObjectError('Encrypted object and no passphrase supplied')
-
-        elif not encrypted and self.passphrase is not None:
-            raise ObjectNotEncrypted()
-
-        try:
-            meta_buf = b64decode(metadata['data'])
-        except binascii.Error:
-            raise CorruptedObjectError('Invalid metadata, b64decode failed')
-
-        if not encrypted:
-            try:
-                meta2 = safe_unpickle(meta_buf, encoding='latin1')
-            except pickle.UnpicklingError as exc:
-                raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
-            if meta2 is None:
-                meta2 = dict()
-            metadata['data'] = freeze_basic_mapping(meta2)
-            metadata['format_version'] = 2
-            return metadata
-
-        # Encrypted
-        for mkey in ('nonce', 'signature', 'object_id'):
-            if mkey not in metadata:
-                raise CorruptedObjectError('meta key %s is missing' % mkey)
-
-        nonce = b64decode(metadata['nonce'])
-        meta_key = sha256(self.passphrase + nonce + b'meta')
-        meta_sig = calc_legacy_meta_checksum(metadata, meta_key)
-        if not hmac.compare_digest(metadata['signature'], meta_sig):
-            raise CorruptedObjectError('HMAC mismatch')
-
-        buf = aes_cipher(meta_key).decrypt(meta_buf)
-        try:
-            meta2 = safe_unpickle(buf, encoding='latin1')
-        except pickle.UnpicklingError as exc:
-            raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
-        if meta2 is None:
-            meta2 = dict()
-
-        meta_buf = freeze_basic_mapping(meta2)
-        metadata['nonce'] = nonce
-        metadata['object_id'] = b64decode(metadata['object_id']).decode('utf-8')
-        metadata['data'] = aes_cipher(meta_key).encrypt(meta_buf)
-        metadata['format_version'] = 2
-        metadata['signature'] = checksum_basic_mapping(metadata, meta_key)
-
-        return metadata
-
-def calc_legacy_meta_checksum(metadata, key):
-    # This works most of the time, so we still try to validate the
-    # signature. But in general, the pickle output is not unique so this is
-    # not a good way to compute a checksum.
-    chk = hmac.new(key, digestmod=hashlib.sha256)
-    for mkey in sorted(metadata.keys()):
-        assert isinstance(mkey, str)
-        if mkey == 'signature':
-            continue
-        val = metadata[mkey]
-        if isinstance(val, str):
-            val = val.encode('utf-8')
-        elif not isinstance(val, (bytes, bytearray)):
-            val = pickle.dumps(val, PICKLE_PROTOCOL)
-        chk.update(mkey.encode('utf-8') + val)
-    return b64encode(chk.digest())
-
 class CompressFilter(object):
     '''Compress data while writing'''

@@ -842,93 +641,6 @@ class DecryptFilter(InputFilter):
             self.close()
         return False

-class LegacyDecryptDecompressFilter(io.RawIOBase):
-    '''Decrypt and Decompress data while reading
-
-    Reader has to read the entire stream in order for HMAC
-    checking to work.
-    '''
-
-    def __init__(self, fh, key, decomp):
-        '''Initialize
-
-        *fh* should be a file-like object and may be unbuffered.
-        '''
-        super().__init__()
-
-        self.fh = fh
-        self.decomp = decomp
-        self.hmac_checked = False
-        self.cipher = aes_cipher(key)
-        self.hmac = hmac.new(key, digestmod=hashlib.sha256)
-        self.hash = fh.read(HMAC_SIZE)
-
-    def discard_input(self):
-        while True:
-            buf = self.fh.read(BUFSIZE)
-            if not buf:
-                break
-
-    def _decrypt(self, buf):
-        # Work around https://bugs.launchpad.net/pycrypto/+bug/1256172
-        # cipher.decrypt refuses to work with anything but bytes
-        if not isinstance(buf, bytes):
-            buf = bytes(buf)
-
-        len_ = len(buf)
-        buf = self.cipher.decrypt(buf)
-        assert len(buf) == len_
-        return buf
-
-    def read(self, size=-1):
-        '''Read up to *size* bytes
-
-        This method is currently buggy and may also return *more*
-        than *size* bytes. Callers should be prepared to handle
-        that. This is because some of the used (de)compression modules
-        don't support output limiting.
-        '''
-
-        if size == -1:
-            return self.readall()
-        elif size == 0:
-            return b''
-
-        buf = None
-        while not buf:
-            buf = self.fh.read(size)
-            if not buf and not self.hmac_checked:
-                if not hmac.compare_digest(self._decrypt(self.hash),
-                                           self.hmac.digest()):
-                    raise CorruptedObjectError('HMAC mismatch')
-                elif self.decomp and self.decomp.unused_data:
-                    raise CorruptedObjectError('Data after end of compressed stream')
-                else:
-                    self.hmac_checked = True
-                    return b''
-            elif not buf:
-                return b''
-
-            buf = self._decrypt(buf)
-            if not self.decomp:
-                break
-
-            buf = decompress(self.decomp, buf)
-
-        self.hmac.update(buf)
-        return buf
-
-    def close(self, *a, **kw):
-        self.fh.close(*a, **kw)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *a):
-        self.close()
-        return False
-
-
 def decompress(decomp, buf):
     '''Decompress *buf* using *decomp*
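open_read() now always stacks the filters in the same order: DecryptFilter (for 'AES_v2' objects) first, then a DecompressFilter wrapping one of three stdlib decompressors. A small standalone sketch of the decompressor dispatch it uses:

import bz2, lzma, zlib

def make_decompressor(compr_alg):
    # Mirrors the dispatch in ComprencBackend.open_read()
    if compr_alg == 'BZIP2':
        return bz2.BZ2Decompressor()
    elif compr_alg == 'LZMA':
        return lzma.LZMADecompressor()
    elif compr_alg == 'ZLIB':
        return zlib.decompressobj()
    elif compr_alg == 'None':
        return None
    raise RuntimeError('Unsupported compression: %s' % compr_alg)

data = bz2.compress(b'hello world')
assert make_decompressor('BZIP2').decompress(data) == b'hello world'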
diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py
index 99ba0ad..1c4719d 100644
--- a/src/s3ql/backends/local.py
+++ b/src/s3ql/backends/local.py
@@ -10,13 +10,12 @@ from ..logging import logging # Ensure use of custom logger class
 from .. import BUFSIZE
 from ..inherit_docstrings import (copy_ancestor_docstring, ABCDocstMeta)
 from .common import (AbstractBackend, DanglingStorageURLError, NoSuchObject,
-                     CorruptedObjectError, safe_unpickle_fh, ThawError,
-                     freeze_basic_mapping, thaw_basic_mapping)
+                     CorruptedObjectError)
+from ..common import ThawError, freeze_basic_mapping, thaw_basic_mapping
 import _thread
 import struct
 import io
 import os
-import pickle
 import shutil

 log = logging.getLogger(__name__)
@@ -241,19 +240,14 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):

 def _read_meta(fh):
     buf = fh.read(9)
-    if buf.startswith(b's3ql_1\n'):
-        len_ = struct.unpack('<H', buf[-2:])[0]
-        try:
-            return thaw_basic_mapping(fh.read(len_))
-        except ThawError:
-            raise CorruptedObjectError('Invalid metadata')
-    else:
-        fh.seek(0)
-        try:
-            return safe_unpickle_fh(fh, encoding='latin1')
-        except pickle.UnpicklingError as exc:
-            raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
-
+    if not buf.startswith(b's3ql_1\n'):
+        raise CorruptedObjectError('Invalid object header: %r' % buf)
+
+    len_ = struct.unpack('<H', buf[-2:])[0]
+    try:
+        return thaw_basic_mapping(fh.read(len_))
+    except ThawError:
+        raise CorruptedObjectError('Invalid metadata')

 def escape(s):
     '''Escape '/', '=' and '.' in s'''
diff --git a/src/s3ql/backends/s3c.py b/src/s3ql/backends/s3c.py
index 4f888d8..910f574 100644
--- a/src/s3ql/backends/s3c.py
+++ b/src/s3ql/backends/s3c.py
@@ -10,7 +10,7 @@ from ..logging import logging, QuietError # Ensure use of custom logger class
 from .. import BUFSIZE
 from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError,
                      AuthenticationError, DanglingStorageURLError, retry_generator,
-                     get_proxy, get_ssl_context, CorruptedObjectError, safe_unpickle,
+                     get_proxy, get_ssl_context, CorruptedObjectError,
                      checksum_basic_mapping)
 from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring,
                                   ABCDocstMeta)
@@ -32,7 +32,6 @@ import re
 import tempfile
 import time
 import urllib.parse
-import pickle

 C_DAY_NAMES = [ 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun' ]
 C_MONTH_NAMES = [ 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec' ]
@@ -86,7 +85,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
     # NOTE: ! This function is also used by the swift backend !
     @copy_ancestor_docstring
     def reset(self):
-        if self.conn.response_pending() or self.conn._out_remaining:
+        if (self.conn is not None and
+            (self.conn.response_pending() or self.conn._out_remaining)):
             log.debug('Resetting state of http connection %d', id(self.conn))
             self.conn.disconnect()

@@ -123,7 +123,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):

         conn = HTTPConnection(self.hostname, self.port, proxy=self.proxy,
                               ssl_context=self.ssl_context)
-        conn.timeout = int(self.options.get('tcp-timeout', 10))
+        conn.timeout = int(self.options.get('tcp-timeout', 20))
         return conn

     # This method is also used implicitly for the retry handling of
@@ -194,13 +194,15 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):

     @retry
     @copy_ancestor_docstring
-    def delete(self, key, force=False):
+    def delete(self, key, force=False, is_retry=False):
         log.debug('started with %s', key)
         try:
             resp = self._do_request('DELETE', '/%s%s' % (self.prefix, key))
             self._assert_empty_response(resp)
         except NoSuchKeyError:
-            if force:
+            # Server may have deleted the object even though we did not
+            # receive the response.
+            if force or is_retry:
                 pass
             else:
                 raise NoSuchObject(key)
@@ -522,7 +524,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
             raise HTTPError(resp.status, resp.reason, resp.headers)

         # If not XML, do the best we can
-        if not XML_CONTENT_RE.match(content_type):
+        if not XML_CONTENT_RE.match(content_type) or resp.length == 0:
             self.conn.discard()
             raise HTTPError(resp.status, resp.reason, resp.headers)

@@ -709,33 +711,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
         '''Extract metadata from HTTP response object'''

         format_ = resp.headers.get('%smeta-format' % self.hdr_prefix, 'raw')
-        if format_ in ('raw', 'pickle'):
-            meta = CaseInsensitiveDict()
-            pattern = re.compile(r'^%smeta-(.+)$' % re.escape(self.hdr_prefix),
-                                 re.IGNORECASE)
-            for fname in resp.headers:
-                hit = pattern.search(fname)
-                if hit:
-                    meta[hit.group(1)] = resp.headers[fname]
-
-            if format_ == 'raw':
-                return meta
-
-            # format_ == pickle
-            buf = ''.join(meta[x] for x in sorted(meta) if x.lower().startswith('data-'))
-            if 'md5' in meta and md5sum_b64(buf.encode('us-ascii')) != meta['md5']:
-                log.warning('MD5 mismatch in metadata for %s', obj_key)
-                raise BadDigestError('BadDigest', 'Meta MD5 for %s does not match' % obj_key)
-            try:
-                return safe_unpickle(b64decode(buf), encoding='latin1')
-            except binascii.Error:
-                raise CorruptedObjectError('Corrupted metadata, b64decode failed')
-            except pickle.UnpicklingError as exc:
-                raise CorruptedObjectError('Corrupted metadata, pickle says: %s' % exc)
-
-        elif format_ != 'raw2': # Current
-            raise RuntimeError('Unknown metadata format %s for key %s'
-                               % (format_, obj_key))
+        if format_ != 'raw2': # Current
+            raise CorruptedObjectError('Invalid metadata format: %s' % format_)

         parts = []
         for i in count():
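_read_meta() now rejects anything that does not start with the s3ql_1 header, so a local-backend object file is simply: 7-byte marker, 2-byte little-endian metadata length, frozen mapping, payload. A round-trip sketch inferred from the reader above (the file name is made up):

import struct
from s3ql.common import freeze_basic_mapping, thaw_basic_mapping

meta = {'encryption': 'None', 'compression': 'None'}
buf = freeze_basic_mapping(meta)

with open('obj.dat', 'wb') as fh:
    fh.write(b's3ql_1\n')                  # 7-byte format marker
    fh.write(struct.pack('<H', len(buf)))  # metadata length
    fh.write(buf)                          # metadata as safe literal
    fh.write(b'payload follows here')

with open('obj.dat', 'rb') as fh:
    head = fh.read(9)
    assert head.startswith(b's3ql_1\n')
    (len_,) = struct.unpack('<H', head[-2:])
    assert thaw_basic_mapping(fh.read(len_)) == meta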
diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py
index a02ca24..8b761b6 100644
--- a/src/s3ql/backends/swift.py
+++ b/src/s3ql/backends/swift.py
@@ -151,7 +151,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):

         with HTTPConnection(self.hostname, self.port, proxy=self.proxy,
                             ssl_context=ssl_context) as conn:
-            conn.timeout = int(self.options.get('tcp-timeout', 10))
+            conn.timeout = int(self.options.get('tcp-timeout', 20))

             for auth_path in ('/v1.0', '/auth/v1.0'):
                 log.debug('GET %s', auth_path)
@@ -184,7 +184,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):

                     conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy,
                                           ssl_context=ssl_context)
-                    conn.timeout = int(self.options.get('tcp-timeout', 10))
+                    conn.timeout = int(self.options.get('tcp-timeout', 20))
                     return conn

         raise RuntimeError('No valid authentication path found')
@@ -379,7 +379,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):

     @retry
     @copy_ancestor_docstring
-    def delete(self, key, force=False):
+    def delete(self, key, force=False, is_retry=False):
         if key.endswith(TEMP_SUFFIX):
             raise ValueError('Keys must not end with %s' % TEMP_SUFFIX)
         log.debug('started with %s', key)
@@ -387,13 +387,22 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
             resp = self._do_request('DELETE', '/%s%s' % (self.prefix, key))
             self._assert_empty_response(resp)
         except HTTPError as exc:
-            if exc.status == 404 and not force:
+            # Server may have deleted the object even though we did not
+            # receive the response.
+            if exc.status == 404 and not (force or is_retry):
                 raise NoSuchObject(key)
             elif exc.status != 404:
                 raise

-    # Temporary hack. I promise to make this nicer by the next
-    # release :-).
+    # We cannot wrap the entire copy() method into a retry() decorator, because
+    # copy() issues multiple requests. If the server happens to regularly close
+    # the connection after a request (even though it was processed correctly),
+    # we'd never make any progress if we always restart from the first request.
+    # We experimented with adding a retry(fn, args, kwargs) function to wrap
+    # individual calls, but this doesn't really improve the code because we
+    # typically also have to wrap e.g. a discard() or assert_empty() call, so we
+    # end up with having to create an additional function anyway. Having
+    # anonymous blocks in Python would be very nice here.
     @retry
     def _copy_helper(self, method, path, headers):
         self._do_request(method, path, headers=headers)
diff --git a/src/s3ql/backends/swiftks.py b/src/s3ql/backends/swiftks.py
index 2492c0a..1af7577 100644
--- a/src/s3ql/backends/swiftks.py
+++ b/src/s3ql/backends/swiftks.py
@@ -85,7 +85,7 @@ class Backend(swift.Backend):

         with HTTPConnection(self.hostname, port=self.port, proxy=self.proxy,
                             ssl_context=ssl_context) as conn:
-            conn.timeout = int(self.options.get('tcp-timeout', 10))
+            conn.timeout = int(self.options.get('tcp-timeout', 20))

             conn.send_request('POST', '/v2.0/tokens', headers=headers,
                               body=json.dumps(auth_body).encode('utf-8'))
@@ -122,7 +122,7 @@ class Backend(swift.Backend):

                 conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy,
                                       ssl_context=ssl_context)
-                conn.timeout = int(self.options.get('tcp-timeout', 10))
+                conn.timeout = int(self.options.get('tcp-timeout', 20))
                 return conn

         if len(avail_regions) < 10:
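Both delete() changes above rely on the new is_retry contract: retry() inspects the wrapped method's signature and passes is_retry=True on every attempt after the first, so a 404 on a retried DELETE is treated as success (the first attempt may already have gone through). A toy version of the mechanism, much simplified from the real decorator:

import inspect
from functools import wraps

def retry(method, max_attempts=3):
    # Toy version: fixed attempt count, no backoff or is_temp_failure()
    has_is_retry = 'is_retry' in inspect.signature(method).parameters

    @wraps(method)
    def wrapped(*a, **kw):
        for attempt in range(max_attempts):
            if has_is_retry:
                kw['is_retry'] = (attempt > 0)
            try:
                return method(*a, **kw)
            except ConnectionError:
                if attempt == max_attempts - 1:
                    raise
    return wrapped

@retry
def delete(key, force=False, is_retry=False):
    # A 404 here would be ignored when force or is_retry is set
    print('DELETE %s (is_retry=%s)' % (key, is_retry))

delete('s3ql_data_1')   # prints: DELETE s3ql_data_1 (is_retry=False)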
diff --git a/src/s3ql/common.py b/src/s3ql/common.py
index 61af937..3514bba 100644
--- a/src/s3ql/common.py
+++ b/src/s3ql/common.py
@@ -8,12 +8,11 @@ This program can be distributed under the terms of the GNU GPLv3.

 from .logging import logging, QuietError # Ensure use of custom logger class
 from . import BUFSIZE, CTRL_NAME, ROOT_INODE
-from .backends import prefix_map
-from .backends.common import (CorruptedObjectError, NoSuchObject, AuthenticationError,
-                              DanglingStorageURLError, AuthorizationError)
-from .backends.comprenc import ComprencBackend
 from dugong import HostnameNotResolvable
 from getpass import getpass
+from ast import literal_eval
+from base64 import b64decode, b64encode
+import binascii
 import configparser
 import re
 import stat
@@ -22,7 +21,7 @@ import traceback
 import sys
 import os
 import subprocess
-import bz2
+import pickle
 import errno
 import hashlib
 import llfuse
@@ -41,16 +40,13 @@ def bytes2path(s):
 def get_seq_no(backend):
     '''Get current metadata sequence number'''

+    from .backends.common import NoSuchObject
+
     seq_nos = list(backend.list('s3ql_seq_no_'))
     if not seq_nos:
         # Maybe list result is outdated
         seq_nos = [ 's3ql_seq_no_1' ]

-    if (seq_nos[0].endswith('.meta')
-        or seq_nos[0].endswith('.dat')):
-        raise QuietError('Old file system revision, please run `s3qladm upgrade` first.',
-                         exitcode=32)
-
     seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in seq_nos ]
     seq_no = max(seq_nos)

@@ -73,36 +69,6 @@ def get_seq_no(backend):

     return seq_no

-def stream_write_bz2(ifh, ofh):
-    '''Compress *ifh* into *ofh* using bz2 compression'''
-
-    compr = bz2.BZ2Compressor(9)
-    while True:
-        buf = ifh.read(BUFSIZE)
-        if not buf:
-            break
-        buf = compr.compress(buf)
-        if buf:
-            ofh.write(buf)
-    buf = compr.flush()
-    if buf:
-        ofh.write(buf)
-
-def stream_read_bz2(ifh, ofh):
-    '''Uncompress bz2 compressed *ifh* into *ofh*'''
-
-    decompressor = bz2.BZ2Decompressor()
-    while True:
-        buf = ifh.read(BUFSIZE)
-        if not buf:
-            break
-        buf = decompressor.decompress(buf)
-        if buf:
-            ofh.write(buf)
-
-    if decompressor.unused_data or ifh.read(1) != b'':
-        raise CorruptedObjectError('Data after end of bz2 stream')
-
 def is_mounted(storage_url):
     '''Try to determine if *storage_url* is mounted

@@ -282,25 +248,33 @@ def assert_s3ql_mountpoint(mountpoint):

     return ctrlfile

-def get_backend(options, plain=False):
+def get_backend(options, raw=False):
     '''Return backend for given storage-url

-    If *plain* is true, don't attempt to unlock and don't wrap into
+    If *raw* is true, don't attempt to unlock and don't wrap into
     ComprencBackend.
     '''

-    return get_backend_factory(options, plain)()
+    return get_backend_factory(options.storage_url, options.backend_options,
+                               options.authfile,
+                               getattr(options, 'compress', ('lzma', 2)), raw)()

-def get_backend_factory(options, plain=False):
+def get_backend_factory(storage_url, backend_options, authfile,
+                        compress=('lzma', 2), raw=False):
     '''Return factory producing backend objects for given storage-url

-    If *plain* is true, don't attempt to unlock and don't wrap into
+    If *raw* is true, don't attempt to unlock and don't wrap into
     ComprencBackend.
     '''

-    hit = re.match(r'^([a-zA-Z0-9]+)://', options.storage_url)
+    from .backends import prefix_map
+    from .backends.common import (CorruptedObjectError, NoSuchObject, AuthenticationError,
+                                  DanglingStorageURLError, AuthorizationError)
+    from .backends.comprenc import ComprencBackend
+
+    hit = re.match(r'^([a-zA-Z0-9]+)://', storage_url)
     if not hit:
-        raise QuietError('Unable to parse storage url "%s"' % options.storage_url,
+        raise QuietError('Unable to parse storage url "%s"' % storage_url,
                          exitcode=2)

     backend = hit.group(1)
@@ -310,19 +284,19 @@ def get_backend_factory(options, plain=False):
         raise QuietError('No such backend: %s' % backend, exitcode=11)

     # Validate backend options
-    for opt in options.backend_options.keys():
+    for opt in backend_options.keys():
         if opt not in backend_class.known_options:
             raise QuietError('Unknown backend option: %s' % opt,
                              exitcode=3)

     # Read authfile
     config = configparser.ConfigParser()
-    if os.path.isfile(options.authfile):
-        mode = os.stat(options.authfile).st_mode
+    if os.path.isfile(authfile):
+        mode = os.stat(authfile).st_mode
         if mode & (stat.S_IRGRP | stat.S_IROTH):
-            raise QuietError("%s has insecure permissions, aborting." % options.authfile,
+            raise QuietError("%s has insecure permissions, aborting." % authfile,
                              exitcode=12)
-        config.read(options.authfile)
+        config.read(authfile)

     backend_login = None
     backend_passphrase = None
@@ -336,16 +310,12 @@ def get_backend_factory(options, plain=False):

         pattern = getopt('storage-url')

-        if not pattern or not options.storage_url.startswith(pattern):
+        if not pattern or not storage_url.startswith(pattern):
             continue

         backend_login = getopt('backend-login') or backend_login
         backend_passphrase = getopt('backend-password') or backend_passphrase
         fs_passphrase = getopt('fs-passphrase') or fs_passphrase
-        if getopt('fs-passphrase') is None and getopt('bucket-passphrase') is not None:
-            fs_passphrase = getopt('bucket-passphrase')
-            log.warning("Warning: the 'bucket-passphrase' configuration option has been "
-                        "renamed to 'fs-passphrase'! Please update your authinfo file.")

     if not backend_login and backend_class.needs_login:
         if sys.stdin.isatty():
@@ -361,8 +331,8 @@ def get_backend_factory(options, plain=False):

     backend = None
     try:
-        backend = backend_class(options.storage_url, backend_login, backend_passphrase,
-                                options.backend_options)
+        backend = backend_class(storage_url, backend_login, backend_passphrase,
+                                backend_options)

         # Do not use backend.lookup(), this would use a HEAD request and
         # not provide any useful error messages if something goes wrong
@@ -394,9 +364,9 @@ def get_backend_factory(options, plain=False):
         if backend is not None:
             backend.close()

-    if plain:
-        return lambda: backend_class(options.storage_url, backend_login, backend_passphrase,
-                                     options.backend_options)
+    if raw:
+        return lambda: backend_class(storage_url, backend_login, backend_passphrase,
+                                     backend_options)

     if encrypted and not fs_passphrase:
         if sys.stdin.isatty():
@@ -409,15 +379,10 @@ def get_backend_factory(options, plain=False):
     if fs_passphrase is not None:
         fs_passphrase = fs_passphrase.encode('utf-8')

-    if hasattr(options, 'compress'):
-        compress = options.compress
-    else:
-        compress = ('lzma', 2)
-
     if not encrypted:
         return lambda: ComprencBackend(None, compress,
-                                       backend_class(options.storage_url, backend_login,
-                                                     backend_passphrase, options.backend_options))
+                                       backend_class(storage_url, backend_login,
+                                                     backend_passphrase, backend_options))

     with ComprencBackend(fs_passphrase, compress, backend) as tmp_backend:
         try:
@@ -429,8 +394,8 @@ def get_backend_factory(options, plain=False):
     # passphrase in every backend object.
     def factory():
         b = ComprencBackend(data_pw, compress,
-                            backend_class(options.storage_url, backend_login,
-                                          backend_passphrase, options.backend_options))
+                            backend_class(storage_url, backend_login,
+                                          backend_passphrase, backend_options))
         b.fs_passphrase = fs_passphrase
         return b

@@ -538,7 +503,7 @@ def split_by_n(seq, n):
     while seq:
         yield seq[:n]
         seq = seq[n:]
-
+
 def handle_on_return(fn):
     '''Provide fresh ExitStack instance in `on_return` argument'''
     @functools.wraps(fn)
@@ -548,3 +513,109 @@ def handle_on_return(fn):
     def wrapper(*a, **kw):
         with ExitStack() as on_return:
             kw['on_return'] = on_return
             return fn(*a, **kw)
     return wrapper
+
+def parse_literal(buf, type_spec):
+    '''Try to parse *buf* as *type_spec*
+
+    Raise `ValueError` if *buf* does not contain a valid
+    Python literal, or if the literal does not correspond
+    to *type_spec*.
+
+    Example use::
+
+        buf = b"[1, 'a', 3]"
+        parse_literal(buf, [int, str, int])
+
+    '''
+
+    try:
+        obj = literal_eval(buf.decode())
+    except UnicodeDecodeError:
+        raise ValueError('unable to decode as utf-8')
+    except (ValueError, SyntaxError):
+        raise ValueError('unable to parse as python literal')
+
+    if (isinstance(type_spec, list) and type(obj) == list
+        and [ type(x) for x in obj ] == type_spec):
+        return obj
+    elif (isinstance(type_spec, tuple) and type(obj) == tuple
+          and [ type(x) for x in obj ] == list(type_spec)):
+        return obj
+    elif type(obj) == type_spec:
+        return obj
+
+    raise ValueError('literal has wrong type')
+
+class ThawError(Exception):
+    def __str__(self):
+        return 'Malformed serialization data'
+
+def thaw_basic_mapping(buf):
+    '''Reconstruct dict from serialized representation
+
+    *buf* must be a bytes-like object as created by
+    `freeze_basic_mapping`. Raises `ThawError` if *buf* is not a valid
+    representation.
+
+    This procedure is safe even if *buf* comes from an untrusted source.
+    '''
+
+    try:
+        d = literal_eval(buf.decode('utf-8'))
+    except (UnicodeDecodeError, SyntaxError, ValueError):
+        raise ThawError()
+
+    # Decode bytes values
+    for (k,v) in d.items():
+        if not isinstance(v, bytes):
+            continue
+        try:
+            d[k] = b64decode(v)
+        except binascii.Error:
+            raise ThawError()
+
+    return d
+
+def freeze_basic_mapping(d):
+    '''Serialize mapping of elementary types
+
+    Keys of *d* must be strings. Values of *d* must be of elementary type (i.e.,
+    `str`, `bytes`, `int`, `float`, `complex`, `bool` or None).
+
+    The output is a bytestream that can be used to reconstruct the mapping. The
+    bytestream is not guaranteed to be deterministic. Look at
+    `checksum_basic_mapping` if you need a deterministic bytestream.
+    '''
+
+    els = []
+    for (k,v) in d.items():
+        if not isinstance(k, str):
+            raise ValueError('key %s must be str, not %s' % (k, type(k)))
+
+        if (not isinstance(v, (str, bytes, bytearray, int, float, complex, bool))
+            and v is not None):
+            raise ValueError('value for key %s (%s) is not elementary' % (k, v))
+
+        # To avoid wasting space, we b64encode non-ascii byte values.
+        if isinstance(v, (bytes, bytearray)):
+            v = b64encode(v)
+
+        # This should be a pretty safe assumption for elementary types, but we
+        # add an assert just to be safe (Python docs just say that repr makes
+        # "best effort" to produce something parseable)
+        (k_repr, v_repr) = (repr(k), repr(v))
+        assert (literal_eval(k_repr), literal_eval(v_repr)) == (k, v)
+
+        els.append(('%s: %s' % (k_repr, v_repr)))
+
+    buf = '{ %s }' % ', '.join(els)
+    return buf.encode('utf-8')
+
+def load_params(fname):
+    with open(fname, 'rb') as fh:
+        proto = fh.read(2)
+        fh.seek(0)
+        if proto == b'\x80\x02':
+            return pickle.load(fh)
+        else:
+            return thaw_basic_mapping(fh.read())
diff --git a/src/s3ql/cp.py b/src/s3ql/cp.py
index 7b1e4d5..38118cf 100644
--- a/src/s3ql/cp.py
+++ b/src/s3ql/cp.py
@@ -8,11 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.

 from .logging import logging, setup_logging, QuietError
 from .common import assert_fs_owner
-from . import PICKLE_PROTOCOL
 from .parse_args import ArgumentParser
 import llfuse
 import os
-import pickle
 import stat
 import sys
 import textwrap
@@ -88,9 +86,8 @@ def main(args=None):
         raise QuietError('No permission to create target directory')

     fstat_t = os.stat(options.target)
-    llfuse.setxattr(ctrlfile, 'copy', pickle.dumps((fstat_s.st_ino, fstat_t.st_ino),
-                                                   PICKLE_PROTOCOL))
-
+    llfuse.setxattr(ctrlfile, 'copy',
+                    ('(%d, %d)' % (fstat_s.st_ino, fstat_t.st_ino)).encode())

 if __name__ == '__main__':
     main(sys.argv[1:])
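The copy command is now sent as a plain ASCII tuple literal and validated on the receiving side (fs.py, below) with the new parse_literal() helper, so malformed input raises ValueError instead of executing pickle opcodes. A round-trip sketch with invented inode numbers:

from s3ql.common import parse_literal

# What cp.py writes to the control inode's 'copy' xattr:
value = ('(%d, %d)' % (257, 258)).encode()

# What fs.py does with it in setxattr():
(src_ino, dst_ino) = parse_literal(value, (int, int))
assert (src_ino, dst_ino) == (257, 258)

# Wrong types are rejected rather than silently accepted:
try:
    parse_literal(b"('evil', 1)", (int, int))
except ValueError:
    pass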
diff --git a/src/s3ql/ctrl.py b/src/s3ql/ctrl.py
index ca667f1..dfc39af 100644
--- a/src/s3ql/ctrl.py
+++ b/src/s3ql/ctrl.py
@@ -8,10 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.

 from .logging import logging, setup_logging
 from .common import assert_fs_owner
-from . import PICKLE_PROTOCOL
 from .parse_args import ArgumentParser
 import llfuse
-import pickle
 import sys
 import textwrap

@@ -69,14 +67,8 @@ def parse_args(args):
     if not options.modules:
         options.modules = [ 'all' ]

-    if options.level:
-        # Protected member ok, hopefully this won't break
-        #pylint: disable=W0212
-        options.level = logging._levelNames[options.level.upper()]
-
     return options

-
 def main(args=None):
     '''Control a mounted S3QL File System.'''

@@ -97,15 +89,11 @@ def main(args=None):
         llfuse.setxattr(ctrlfile, 'upload-meta', b'dummy')

     elif options.action == 'log':
-        llfuse.setxattr(ctrlfile, 'logging',
-                        pickle.dumps((options.level, options.modules),
-                                     PICKLE_PROTOCOL))
+        cmd = ('(%r, %r)' % (options.level, ','.join(options.modules))).encode()
+        llfuse.setxattr(ctrlfile, 'logging', cmd)

     elif options.action == 'cachesize':
-        llfuse.setxattr(ctrlfile, 'cachesize',
-                        pickle.dumps(options.cachesize * 1024, PICKLE_PROTOCOL))
-
-
+        llfuse.setxattr(ctrlfile, 'cachesize', ('%d' % options.cachesize * 1024).encode())

 if __name__ == '__main__':
     main(sys.argv[1:])
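The log command follows the same pattern: s3qlctrl serializes a (level, modules) pair of strings, and fs.py (below) parses it with parse_literal() and splits the module list on commas. A sketch of both ends with invented values:

from s3ql.common import parse_literal

# ctrl.py side -- the level stays a string; fs.py resolves it:
cmd = ('(%r, %r)' % ('debug', ','.join(['fs', 'block_cache']))).encode()

# fs.py side:
(lvl, modules) = parse_literal(cmd, (str, str))
assert lvl == 'debug'
assert modules.split(',') == ['fs', 'block_cache']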
diff --git a/src/s3ql/fs.py b/src/s3ql/fs.py
index b54936b..70c5d43 100644
--- a/src/s3ql/fs.py
+++ b/src/s3ql/fs.py
@@ -7,9 +7,9 @@
 This program can be distributed under the terms of the GNU GPLv3.
 '''
 
 from .logging import logging
-from . import deltadump, CTRL_NAME, CTRL_INODE, PICKLE_PROTOCOL
+from . import deltadump, CTRL_NAME, CTRL_INODE
 from .backends.common import NoSuchObject, CorruptedObjectError
-from .common import get_path
+from .common import get_path, parse_literal
 from .database import NoSuchRowError
 from .inode_cache import OutOfInodesError
 from io import BytesIO
@@ -19,7 +19,6 @@
 import errno
 import llfuse
 import math
 import os
-import pickle
 import stat
 import struct
 import time
@@ -207,7 +206,7 @@ class Operations(llfuse.Operations):
         # Handle S3QL commands
         if id_ == CTRL_INODE:
             if name == b's3ql_pid?':
-                return pickle.dumps(os.getpid(), PICKLE_PROTOCOL)
+                return ('%d' % os.getpid()).encode()
 
             elif name == b's3qlstat':
                 return self.extstat()
@@ -242,22 +241,55 @@ class Operations(llfuse.Operations):
         if id_ == CTRL_INODE:
             if name == b's3ql_flushcache!':
                 self.cache.clear()
+
             elif name == b'copy':
-                self.copy_tree(*pickle.loads(value))
+                try:
+                    tup = parse_literal(value, (int, int))
+                except ValueError:
+                    log.warning('Received malformed command via control inode')
+                    raise llfuse.FUSEError(errno.EINVAL)
+                self.copy_tree(*tup)
+
             elif name == b'upload-meta':
                 if self.upload_event is not None:
                     self.upload_event.set()
                 else:
                     raise llfuse.FUSEError(errno.ENOTTY)
+
             elif name == b'lock':
-                self.lock_tree(*pickle.loads(value))
+                try:
+                    id_ = parse_literal(value, int)
+                except ValueError:
+                    log.warning('Received malformed command via control inode')
+                    raise llfuse.FUSEError(errno.EINVAL)
+                self.lock_tree(id_)
+
             elif name == b'rmtree':
-                self.remove_tree(*pickle.loads(value))
+                try:
+                    tup = parse_literal(value, (int, bytes))
+                except ValueError:
+                    log.warning('Received malformed command via control inode')
+                    raise llfuse.FUSEError(errno.EINVAL)
+                self.remove_tree(*tup)
+
             elif name == b'logging':
-                update_logging(*pickle.loads(value))
+                try:
+                    (lvl, modules) = parse_literal(value, (str, str))
+                    lvl = logging._levelNames[lvl.upper()]
+                except (ValueError, KeyError):
+                    log.warning('Received malformed command via control inode')
+                    raise llfuse.FUSEError(errno.EINVAL)
+                update_logging(lvl, modules.split(',') if modules else None)
+
             elif name == b'cachesize':
-                self.cache.max_size = pickle.loads(value)
+                try:
+                    self.cache.cache.max_size = parse_literal(value, int)
+                except ValueError:
+                    log.warning('Received malformed command via control inode')
+                    raise llfuse.FUSEError(errno.EINVAL)
+
             else:
+                log.warning('Received unknown command via control inode')
                 raise llfuse.FUSEError(errno.EINVAL)
 
         # http://code.google.com/p/s3ql/issues/detail?id=385
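
parse_literal() itself is added to common.py and is not part of this excerpt; conceptually it is a type-checked wrapper around ast.literal_eval(). A minimal stand-in that handles the calls visible above (the name is real, but the body below is an assumption, not the actual implementation):

    from ast import literal_eval

    def parse_literal(buf, types):
        # Parse *buf* as a Python literal and verify that the result
        # matches *types* (a single type, or a tuple of per-element types).
        obj = literal_eval(buf.decode('utf-8'))
        if isinstance(types, tuple):
            if (not isinstance(obj, tuple) or len(obj) != len(types)
                    or not all(isinstance(o, t) for (o, t) in zip(obj, types))):
                raise ValueError('%s does not match %s' % (buf, types))
        elif not isinstance(obj, types):
            raise ValueError('%s does not match %s' % (buf, types))
        return obj

    assert parse_literal(b'(3, 7)', (int, int)) == (3, 7)   # 'copy' payload
    assert parse_literal(b'262144', int) == 262144          # 'cachesize' payload
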
diff --git a/src/s3ql/fsck.py b/src/s3ql/fsck.py
index b7a1094..cbec005 100644
--- a/src/s3ql/fsck.py
+++ b/src/s3ql/fsck.py
@@ -7,27 +7,25 @@
 from .logging import logging, setup_logging, QuietError
-from . import CURRENT_FS_REV, BUFSIZE, CTRL_INODE, PICKLE_PROTOCOL, ROOT_INODE
+from . import CURRENT_FS_REV, BUFSIZE, CTRL_INODE, ROOT_INODE
 from .backends.common import NoSuchObject
 from .backends.comprenc import ComprencBackend
 from .backends.local import Backend as LocalBackend
-from .common import (inode_for_path, sha256_fh, get_path, get_backend_cachedir,
-                     get_seq_no, stream_write_bz2, stream_read_bz2, is_mounted,
-                     get_backend, pretty_print_size)
+from .common import (inode_for_path, sha256_fh, get_path, get_backend_cachedir,
+                     get_seq_no, is_mounted, get_backend, load_params,
+                     freeze_basic_mapping)
 from .database import NoSuchRowError, Connection
-from .metadata import restore_metadata, cycle_metadata, dump_metadata, create_tables
+from .metadata import create_tables, dump_and_upload_metadata, download_metadata
 from .parse_args import ArgumentParser
 from os.path import basename
 import apsw
 import os
-import pickle
 import hashlib
 import re
 import shutil
 import itertools
 import stat
 import sys
-import tempfile
 import textwrap
 import time
 import atexit
@@ -267,7 +265,7 @@
             (id_p_new, newname) = self.resolve_free(b"/lost+found", newname)
 
             self.log_error('Content entry for inode %d refers to non-existing name with id %d, '
-                           'moving to /lost+found/%s', inode, name_id, newname)
+                           'moving to /lost+found/%s', inode, name_id, to_str(newname))
 
             self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? WHERE rowid=?',
                               (self._add_name(newname), id_p_new, rowid))
@@ -287,7 +285,8 @@
             (id_p_new, newname) = self.resolve_free(b"/lost+found",
                                                     ('[%d]-%s' % (inode_p, name)).encode())
-            self.log_error('Parent inode %d for "%s" vanished, moving to /lost+found', inode_p, name)
+            self.log_error('Parent inode %d for "%s" vanished, moving to /lost+found',
+                           inode_p, to_str(name))
             self._del_name(name_id)
             self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? WHERE rowid=?',
                               (self._add_name(newname), id_p_new, rowid))
@@ -308,7 +307,7 @@
             except NoSuchRowError:
                 path = '[inode %d, parent %d]' % (inode, inode_p)
 
-            self.log_error('Inode for %s vanished, deleting', path)
+            self.log_error('Inode for %s vanished, deleting', to_str(path))
             self._del_name(name_id)
             to_delete.append(rowid)
@@ -329,7 +328,7 @@
                                                      'FROM contents_v WHERE inode=?', (inode,)):
                 path = get_path(id_p, self.conn, name)
                 self.log_error('Extended attribute %d of %s refers to non-existing name %d, renaming..',
-                               rowid, path, name_id)
+                               rowid, to_str(path), name_id)
 
             while True:
                 name_id = self._add_name('lost+found_%d' % rowid)
@@ -404,7 +403,7 @@
             (id_p, name) = self.resolve_free(b"/lost+found", name)
 
             self.log_error("Found unreachable filesystem entries, re-anchoring %s [%d] "
-                           "in /lost+found", name, inode)
+                           "in /lost+found", to_str(name), inode)
             self.conn.execute('UPDATE contents SET parent_inode=?, name_id=? '
                               'WHERE inode=? AND name_id=?',
                               (id_p, self._add_name(name), inode, name_id))
@@ -441,7 +440,7 @@
                 self.found_errors = True
                 self.log_error("Size of inode %d (%s) does not agree with number of blocks, "
                                "setting from %d to %d",
-                               id_, get_path(id_, self.conn), size_old, size)
+                               id_, to_str(get_path(id_, self.conn)), size_old, size)
                 self.conn.execute("UPDATE inodes SET size=? WHERE id=?", (size, id_))
         finally:
             self.conn.execute('DROP TABLE min_sizes')
@@ -473,14 +472,15 @@
                 self.found_errors = True
                 if cnt is None:
                     (id_p, name) = self.resolve_free(b"/lost+found", ("inode-%d" % id_).encode())
-                    self.log_error("Inode %d not referenced, adding as /lost+found/%s", id_, name)
+                    self.log_error("Inode %d not referenced, adding as /lost+found/%s",
+                                   id_, to_str(name))
                     self.conn.execute("INSERT INTO contents (name_id, inode, parent_inode) "
                                       "VALUES (?,?,?)", (self._add_name(basename(name)), id_, id_p))
                     self.conn.execute("UPDATE inodes SET refcount=? WHERE id=?", (1, id_))
 
                 else:
                     self.log_error("Inode %d (%s) has wrong reference count, setting from %d to %d",
-                                   id_, get_path(id_, self.conn), cnt_old, cnt)
+                                   id_, to_str(get_path(id_, self.conn)), cnt_old, cnt)
                     self.conn.execute("UPDATE inodes SET refcount=? WHERE id=?", (cnt, id_))
         finally:
             self.conn.execute('DROP TABLE refcounts')
@@ -504,7 +504,7 @@
             for (name, name_id, id_p) in self.conn.query('SELECT name, name_id, parent_inode '
                                                          'FROM contents_v WHERE inode=?', (inode,)):
                 path = get_path(id_p, self.conn, name)
-                self.log_error("File may lack data, moved to /lost+found: %s", path)
+                self.log_error("File may lack data, moved to /lost+found: %s", to_str(path))
                 (lof_id, newname) = self.resolve_free(b"/lost+found", escape(path))
 
                 self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? '
@@ -555,7 +555,7 @@
             for (name, name_id, id_p) in self.conn.query('SELECT name, name_id, parent_inode '
                                                          'FROM contents_v WHERE inode=?', (inode,)):
                 path = get_path(id_p, self.conn, name)
-                self.log_error("File may lack data, moved to /lost+found: %s", path)
+                self.log_error("File may lack data, moved to /lost+found: %s", to_str(path))
                 (lof_id, newname) = self.resolve_free(b"/lost+found", escape(path))
 
                 self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? '
@@ -619,7 +619,8 @@
         elif cnt is None:
             self.found_errors = True
             (id_p, name) = self.resolve_free(b"/lost+found", ("block-%d" % id_).encode())
-            self.log_error("Block %d not referenced, adding as /lost+found/%s", id_, name)
+            self.log_error("Block %d not referenced, adding as /lost+found/%s",
+                           id_, to_str(name))
             timestamp = time.time()
             size = self.conn.get_val('SELECT size FROM blocks WHERE id=?', (id_,))
             inode = self.create_inode(mode=stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
@@ -751,20 +752,20 @@
             self.found_errors = True
             self.log_error('Inode %d (%s): directory entry has no type, changed '
-                           'to %s.', inode, get_path(inode, self.conn), made_to)
+                           'to %s.', inode, to_str(get_path(inode, self.conn)), made_to)
 
             self.conn.execute('UPDATE inodes SET mode=? WHERE id=?', (mode, inode))
 
         if stat.S_ISLNK(mode) and target is None:
             self.found_errors = True
             self.log_error('Inode %d (%s): symlink does not have target. '
                            'This is probably going to confuse your system!',
-                           inode, get_path(inode, self.conn))
+                           inode, to_str(get_path(inode, self.conn)))
 
         if stat.S_ISLNK(mode) and target is not None and size != len(target):
             self.found_errors = True
             self.log_error('Inode %d (%s): symlink size (%d) does not agree with target '
                            'length (%d). This is probably going to confuse your system!',
-                           inode, get_path(inode, self.conn), size, len(target))
+                           inode, to_str(get_path(inode, self.conn)), size, len(target))
 
         if size != 0 and (not stat.S_ISREG(mode)
                           and not stat.S_ISLNK(mode)
@@ -772,33 +773,33 @@
             self.found_errors = True
             self.log_error('Inode %d (%s) is not regular file but has non-zero size. '
                            'This is may confuse your system!',
-                           inode, get_path(inode, self.conn))
+                           inode, to_str(get_path(inode, self.conn)))
 
         if target is not None and not stat.S_ISLNK(mode):
             self.found_errors = True
             self.log_error('Inode %d (%s) is not symlink but has symlink target. '
                            'This is probably going to confuse your system!',
-                           inode, get_path(inode, self.conn))
+                           inode, to_str(get_path(inode, self.conn)))
 
         if rdev != 0 and not (stat.S_ISBLK(mode) or stat.S_ISCHR(mode)):
             self.found_errors = True
             self.log_error('Inode %d (%s) is not device but has device number. '
                            'This is probably going to confuse your system!',
-                           inode, get_path(inode, self.conn))
+                           inode, to_str(get_path(inode, self.conn)))
 
         if has_children and not stat.S_ISDIR(mode):
             self.found_errors = True
             self.log_error('Inode %d (%s) is not a directory but has child entries. '
                            'This is probably going to confuse your system!',
-                           inode, get_path(inode, self.conn))
+                           inode, to_str(get_path(inode, self.conn)))
 
         if (not stat.S_ISREG(mode) and
             self.conn.has_val('SELECT 1 FROM inode_blocks WHERE inode=?', (inode,))):
             self.found_errors = True
             self.log_error('Inode %d (%s) is not a regular file but has data blocks. '
                            'This is probably going to confuse your system!',
-                           inode, get_path(inode, self.conn))
+                           inode, to_str(get_path(inode, self.conn)))
@@ -808,7 +809,8 @@
                                              'WHERE LENGTH(name) > 255'):
             path = get_path(id_p, self.conn, name)
             self.log_error('Entry name %s... in %s has more than 255 characters, '
-                           'this could cause problems', name[:40], path[:-len(name)])
+                           'this could cause problems', to_str(name[:40]),
+                           to_str(path[:-len(name)]))
             self.found_errors = True
 
     def check_objects_refcount(self):
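
All of the log_error() changes in this file route raw byte paths through to_str() (added at the bottom of this diff), which decodes with errors='replace' so that undecodable names render as replacement characters instead of aborting fsck with UnicodeDecodeError:

    assert str(b'caf\xc3\xa9', encoding='utf-8', errors='replace') == 'café'
    assert str(b'tr\xffsh', encoding='utf-8', errors='replace') == 'tr\ufffdsh'
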
@@ -947,7 +949,7 @@
             for (name, name_id, id_p) in self.conn.query('SELECT name, name_id, parent_inode '
                                                          'FROM contents_v WHERE inode=?', (id_,)):
                 path = get_path(id_p, self.conn, name)
-                self.log_error("File may lack data, moved to /lost+found: %s", path)
+                self.log_error("File may lack data, moved to /lost+found: %s", to_str(path))
                 (_, newname) = self.resolve_free(b"/lost+found", escape(path))
 
                 self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? '
@@ -991,13 +993,15 @@
 
         if not isinstance(path, bytes):
             raise TypeError('path must be of type bytes')
+        if not isinstance(name, bytes):
+            raise TypeError('name must be of type bytes')
 
         inode_p = inode_for_path(path, self.conn)
 
         # Debugging http://code.google.com/p/s3ql/issues/detail?id=217
         # and http://code.google.com/p/s3ql/issues/detail?id=261
         if len(name) > 255 - 4:
-            name = '%s ... %s' % (name[0:120], name[-120:])
+            name = b''.join((name[0:120], b' ... ', name[-120:]))
 
         i = 0
         newname = name
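
The resolve_free() change above fixes a Python 3 type bug: formatting bytes with '%s ... %s' silently produces a str containing repr() output, while the new bytes join keeps the truncated name as bytes and within the 255-byte limit:

    name = b'x' * 300
    old = '%s ... %s' % (name[0:120], name[-120:])
    new = b''.join((name[0:120], b' ... ', name[-120:]))
    assert old.startswith("b'")                   # wrong type, wrong content
    assert isinstance(new, bytes) and len(new) == 245
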
@@ -1046,8 +1050,7 @@ class ROFsck(Fsck):
         db = Connection(path + '.db')
         db.execute('PRAGMA journal_mode = WAL')
 
-        with open(path + '.params', 'rb') as fh:
-            param = pickle.load(fh)
+        param = load_params(path + '.params')
 
         super().__init__(None, None, param, db)
 
     def check(self):
@@ -1161,8 +1164,7 @@ def main(args=None):
     if os.path.exists(cachepath + '.params'):
         assert os.path.exists(cachepath + '.db')
-        with open(cachepath + '.params', 'rb') as fh:
-            param = pickle.load(fh)
+        param = load_params(cachepath + '.params')
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
             param = backend.lookup('s3ql_metadata')
@@ -1248,25 +1250,14 @@
                              + cachepath + '.db (corrupted)\n'
                              + cachepath + '.param (intact)', exitcode=43)
     else:
-        with tempfile.TemporaryFile() as tmpfh:
-            def do_read(fh):
-                tmpfh.seek(0)
-                tmpfh.truncate()
-                stream_read_bz2(fh, tmpfh)
-
-            log.info('Downloading and decompressing metadata...')
-            backend.perform_read(do_read, "s3ql_metadata")
-
-            log.info("Reading metadata...")
-            tmpfh.seek(0)
-            db = restore_metadata(tmpfh, cachepath + '.db')
+        db = download_metadata(backend, cachepath + '.db')
 
     # Increase metadata sequence no
     param['seq_no'] += 1
     param['needs_fsck'] = True
     backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
     with open(cachepath + '.params', 'wb') as fh:
-        pickle.dump(param, fh, PICKLE_PROTOCOL)
+        fh.write(freeze_basic_mapping(param))
 
     fsck = Fsck(cachepath + '-cache', backend, param, db)
     fsck.check()
@@ -1291,22 +1282,9 @@
     param['last_fsck'] = time.time()
     param['last-modified'] = time.time()
 
-    log.info('Dumping metadata...')
-    with tempfile.TemporaryFile() as fh:
-        dump_metadata(db, fh)
-        def do_write(obj_fh):
-            fh.seek(0)
-            stream_write_bz2(fh, obj_fh)
-            return obj_fh
-
-        log.info("Compressing and uploading metadata...")
-        obj_fh = backend.perform_write(do_write, "s3ql_metadata_new", metadata=param,
-                                       is_compressed=True)
-    log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
-    log.info('Cycling metadata backups...')
-    cycle_metadata(backend)
+    dump_and_upload_metadata(backend, db, param)
 
     with open(cachepath + '.params', 'wb') as fh:
-        pickle.dump(param, fh, PICKLE_PROTOCOL)
+        fh.write(freeze_basic_mapping(param))
 
     log.info('Cleaning up local metadata...')
     db.execute('ANALYZE')
@@ -1381,5 +1359,10 @@ def escape(path):
     return path[1:].replace(b'_', b'__').replace(b'/', b'_')
 
+def to_str(name):
+    '''Decode path name for printing'''
+
+    return str(name, encoding='utf-8', errors='replace')
+
 if __name__ == '__main__':
     main(sys.argv[1:])
diff --git a/src/s3ql/lock.py b/src/s3ql/lock.py
index 2364605..20b15c8 100644
--- a/src/s3ql/lock.py
+++ b/src/s3ql/lock.py
@@ -8,11 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
 
 from .logging import logging, setup_logging, QuietError
 from .common import assert_fs_owner
-from . import PICKLE_PROTOCOL
 from .parse_args import ArgumentParser
 import llfuse
 import os
-import pickle
 import sys
 import textwrap
@@ -54,7 +52,7 @@ def main(args=None):
             raise QuietError('%s is a mount point.' % name)
 
         ctrlfile = assert_fs_owner(name)
         fstat = os.stat(name)
-        llfuse.setxattr(ctrlfile, 'lock', pickle.dumps((fstat.st_ino,), PICKLE_PROTOCOL))
+        llfuse.setxattr(ctrlfile, 'lock', ('%d' % fstat.st_ino).encode())
 
 if __name__ == '__main__':
     main(sys.argv[1:])
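
lock.py shows the simplest case of the conversion: the payload shrinks from a pickled one-element tuple to a bare decimal that fs.py's 'lock' branch parses back to a single int (inode number invented for illustration):

    from ast import literal_eval
    buf = ('%d' % 12345).encode()
    assert buf == b'12345' and literal_eval(buf.decode()) == 12345
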
diff --git a/src/s3ql/metadata.py b/src/s3ql/metadata.py
index bb27dae..5ac2978 100644
--- a/src/s3ql/metadata.py
+++ b/src/s3ql/metadata.py
@@ -8,9 +8,13 @@
 from .logging import logging # Ensure use of custom logger class
 from .database import Connection
+from . import BUFSIZE
+from .common import pretty_print_size
 from .deltadump import INTEGER, BLOB, TIME, dump_table, load_table
-from .backends.common import NoSuchObject
+from .backends.common import NoSuchObject, CorruptedObjectError
 import os
+import tempfile
+import bz2
 import stat
 
 log = logging.getLogger(__name__)
@@ -124,7 +128,11 @@ def cycle_metadata(backend, keep=10):
 
     # However, the current metadata object should always be copied,
     # so that even if there's a crash we don't end up without it
-    backend.copy("s3ql_metadata", "s3ql_metadata_bak_0")
+    try:
+        backend.copy("s3ql_metadata", "s3ql_metadata_bak_0")
+    except NoSuchObject:
+        # In case of mkfs, there may be no metadata object yet
+        pass
     cycle_fn("s3ql_metadata_new", "s3ql_metadata")
 
     if cycle_fn is backend.copy:
@@ -252,3 +260,67 @@ def create_tables(conn):
     CREATE VIEW ext_attributes_v AS
     SELECT * FROM ext_attributes JOIN names ON names.id = name_id
     """)
+
+def stream_write_bz2(ifh, ofh):
+    '''Compress *ifh* into *ofh* using bz2 compression'''
+
+    compr = bz2.BZ2Compressor(9)
+    while True:
+        buf = ifh.read(BUFSIZE)
+        if not buf:
+            break
+        buf = compr.compress(buf)
+        if buf:
+            ofh.write(buf)
+    buf = compr.flush()
+    if buf:
+        ofh.write(buf)
+
+def stream_read_bz2(ifh, ofh):
+    '''Uncompress bz2 compressed *ifh* into *ofh*'''
+
+    decompressor = bz2.BZ2Decompressor()
+    while True:
+        buf = ifh.read(BUFSIZE)
+        if not buf:
+            break
+        buf = decompressor.decompress(buf)
+        if buf:
+            ofh.write(buf)
+
+    if decompressor.unused_data or ifh.read(1) != b'':
+        raise CorruptedObjectError('Data after end of bz2 stream')
+
+def download_metadata(backend, db_file, name='s3ql_metadata'):
+    with tempfile.TemporaryFile() as tmpfh:
+        def do_read(fh):
+            tmpfh.seek(0)
+            tmpfh.truncate()
+            stream_read_bz2(fh, tmpfh)
+
+        log.info('Downloading and decompressing metadata...')
+        backend.perform_read(do_read, name)
+
+        log.info("Reading metadata...")
+        tmpfh.seek(0)
+        return restore_metadata(tmpfh, db_file)
+
+def dump_and_upload_metadata(backend, db, param):
+    with tempfile.TemporaryFile() as fh:
+        log.info('Dumping metadata...')
+        dump_metadata(db, fh)
+        upload_metadata(backend, fh, param)
+
+def upload_metadata(backend, fh, param):
+    log.info("Compressing and uploading metadata...")
+    def do_write(obj_fh):
+        fh.seek(0)
+        stream_write_bz2(fh, obj_fh)
+        return obj_fh
+    obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
+                                   metadata=param, is_compressed=True)
+    log.info('Wrote %s of compressed metadata.',
+             pretty_print_size(obj_fh.get_obj_size()))
+
+    log.info('Cycling metadata backups...')
+    cycle_metadata(backend)
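
stream_write_bz2() and stream_read_bz2(), now housed in metadata.py, work on arbitrary file-like objects in BUFSIZE chunks, and the reader rejects trailing garbage with CorruptedObjectError. A round trip through in-memory streams (the import path assumes the module layout above):

    from io import BytesIO
    from s3ql.metadata import stream_write_bz2, stream_read_bz2

    src = BytesIO(b'metadata row\n' * 10000)
    packed = BytesIO()
    stream_write_bz2(src, packed)

    packed.seek(0)
    out = BytesIO()
    stream_read_bz2(packed, out)
    assert out.getvalue() == src.getvalue()
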
diff --git a/src/s3ql/mkfs.py b/src/s3ql/mkfs.py
index 27f48fe..ba46e35 100644
--- a/src/s3ql/mkfs.py
+++ b/src/s3ql/mkfs.py
@@ -7,22 +7,20 @@
 This program can be distributed under the terms of the GNU GPLv3.
 '''
 
 from .logging import logging, setup_logging, QuietError
-from . import CURRENT_FS_REV, CTRL_INODE, PICKLE_PROTOCOL, ROOT_INODE
+from . import CURRENT_FS_REV, CTRL_INODE, ROOT_INODE
 from .backends.comprenc import ComprencBackend
 from .backends import s3
-from .common import (get_backend_cachedir, stream_write_bz2, get_backend,
-                     pretty_print_size, split_by_n)
+from .common import (get_backend_cachedir, get_backend, split_by_n,
+                     freeze_basic_mapping)
 from .database import Connection
-from .metadata import dump_metadata, create_tables
+from .metadata import dump_and_upload_metadata, create_tables
 from .parse_args import ArgumentParser
 from getpass import getpass
 from base64 import b64encode
 import os
-import pickle
 import shutil
 import stat
 import sys
-import tempfile
 import time
 import atexit
@@ -95,7 +93,7 @@ def main(args=None):
         log.warning('Warning: maximum object sizes less than 1 MiB will seriously degrade '
                     'performance.', extra={ 'force_log': True })
 
-    plain_backend = get_backend(options, plain=True)
+    plain_backend = get_backend(options, raw=True)
     atexit.register(plain_backend.close)
 
     log.info("Before using S3QL, make sure to read the user's guide, especially\n"
@@ -170,23 +168,10 @@ def main(args=None):
     param['last-modified'] = time.time()
 
     log.info('Dumping metadata...')
-    with tempfile.TemporaryFile() as fh:
-        dump_metadata(db, fh)
-        def do_write(obj_fh):
-            fh.seek(0)
-            stream_write_bz2(fh, obj_fh)
-            return obj_fh
-
-        # Store metadata first, and seq_no second so that if mkfs
-        # is interrupted, fsck won't see a file system at all.
-        log.info("Compressing and uploading metadata...")
-        obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
-                                       is_compressed=True)
-        backend.store('s3ql_seq_no_%d' % param['seq_no'], b'Empty')
-
-    log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
+    dump_and_upload_metadata(backend, db, param)
+    backend.store('s3ql_seq_no_%d' % param['seq_no'], b'Empty')
     with open(cachepath + '.params', 'wb') as fh:
-        pickle.dump(param, fh, PICKLE_PROTOCOL)
+        fh.write(freeze_basic_mapping(param))
 
     if data_pw is not None:
         print('Please store the following master key in a safe location. It allows ',
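
A side effect of mkfs.py writing freeze_basic_mapping(param): the cached .params file is now plain text that can be inspected with ordinary tools. Roughly (the path and the keys shown are illustrative only, not the complete dictionary):

    with open('/home/user/.s3ql/prefix.params', 'rb') as fh:
        print(fh.read())
    # b"{ 'seq_no': 1, 'needs_fsck': False, 'revision': 22, ... }"
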
diff --git a/src/s3ql/mount.py b/src/s3ql/mount.py
index 3192a86..44b8fbc 100644
--- a/src/s3ql/mount.py
+++ b/src/s3ql/mount.py
@@ -7,15 +7,16 @@
 This program can be distributed under the terms of the GNU GPLv3.
 '''
 
 from .logging import logging, setup_logging, QuietError
-from . import fs, CURRENT_FS_REV, PICKLE_PROTOCOL
+from . import fs, CURRENT_FS_REV
 from .backends.pool import BackendPool
 from .block_cache import BlockCache
-from .common import (get_backend_cachedir, get_seq_no, stream_write_bz2, stream_read_bz2,
-                     get_backend_factory, pretty_print_size)
+from .common import (get_backend_cachedir, get_seq_no, get_backend_factory,
+                     load_params, freeze_basic_mapping)
 from .daemonize import daemonize
 from .database import Connection
 from .inode_cache import InodeCache
-from .metadata import cycle_metadata, dump_metadata, restore_metadata
+from .metadata import (download_metadata, upload_metadata, dump_and_upload_metadata,
+                       dump_metadata)
 from .parse_args import ArgumentParser
 from .exit_stack import ExitStack
 from threading import Thread
@@ -27,7 +28,6 @@
 import itertools
 import os
 import platform
 import subprocess
-import pickle
 import re
 import signal
 import resource
@@ -116,7 +116,8 @@
         import pstats
         prof = cProfile.Profile()
 
-    backend_factory = get_backend_factory(options)
+    backend_factory = get_backend_factory(options.storage_url, options.backend_options,
+                                          options.authfile, options.compress)
     backend_pool = BackendPool(backend_factory)
     atexit.register(backend_pool.flush)
 
@@ -242,26 +243,12 @@
                 del backend['s3ql_seq_no_%d' % param['seq_no']]
                 param['seq_no'] -= 1
                 with open(cachepath + '.params', 'wb') as fh:
-                    pickle.dump(param, fh, PICKLE_PROTOCOL)
+                    fh.write(freeze_basic_mapping(param))
             elif seq_no == param['seq_no']:
                 param['last-modified'] = time.time()
-
-                log.info('Dumping metadata...')
-                with tempfile.TemporaryFile() as fh:
-                    dump_metadata(db, fh)
-                    def do_write(obj_fh):
-                        fh.seek(0)
-                        stream_write_bz2(fh, obj_fh)
-                        return obj_fh
-
-                    log.info("Compressing and uploading metadata...")
-                    obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
-                                                   metadata=param, is_compressed=True)
-                log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
-                log.info('Cycling metadata backups...')
-                cycle_metadata(backend)
+                dump_and_upload_metadata(backend, db, param)
                 with open(cachepath + '.params', 'wb') as fh:
-                    pickle.dump(param, fh, PICKLE_PROTOCOL)
+                    fh.write(freeze_basic_mapping(param))
             else:
                 log.error('Remote metadata is newer than local (%d vs %d), '
                           'refusing to overwrite!', seq_no, param['seq_no'])
@@ -376,8 +363,7 @@ def get_metadata(backend, cachepath):
     # Check for cached metadata
     db = None
     if os.path.exists(cachepath + '.params'):
-        with open(cachepath + '.params', 'rb') as fh:
-            param = pickle.load(fh)
+        param = load_params(cachepath + '.params')
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
             param = backend.lookup('s3ql_metadata')
@@ -430,21 +416,10 @@ def get_metadata(backend, cachepath):
 
     # Download metadata
     if not db:
-        with tempfile.TemporaryFile() as tmpfh:
-            def do_read(fh):
-                tmpfh.seek(0)
-                tmpfh.truncate()
-                stream_read_bz2(fh, tmpfh)
-
-            log.info('Downloading and decompressing metadata...')
-            backend.perform_read(do_read, "s3ql_metadata")
-
-            log.info("Reading metadata...")
-            tmpfh.seek(0)
-            db = restore_metadata(tmpfh, cachepath + '.db')
+        db = download_metadata(backend, cachepath + '.db')
 
         with open(cachepath + '.params', 'wb') as fh:
-            pickle.dump(param, fh, PICKLE_PROTOCOL)
+            fh.write(freeze_basic_mapping(param))
 
     return (param, db)
 
@@ -454,7 +429,7 @@ def mark_metadata_dirty(backend, cachepath, param):
     param['seq_no'] += 1
     param['needs_fsck'] = True
     with open(cachepath + '.params', 'wb') as fh:
-        pickle.dump(param, fh, PICKLE_PROTOCOL)
+        fh.write(freeze_basic_mapping(param))
 
     # Fsync to make sure that the updated sequence number is committed to
     # disk. Otherwise, a crash immediately after mount could result in both
@@ -679,17 +654,7 @@
             # Temporarily decrease sequence no, this is not the final upload
             self.param['seq_no'] -= 1
 
-            def do_write(obj_fh):
-                fh.seek(0)
-                stream_write_bz2(fh, obj_fh)
-                return obj_fh
-            log.info("Compressing and uploading metadata...")
-            obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
-                                           metadata=self.param, is_compressed=True)
-            log.info('Wrote %s of compressed metadata.',
-                     pretty_print_size(obj_fh.get_obj_size()))
-            log.info('Cycling metadata backups...')
-            cycle_metadata(backend)
+            upload_metadata(backend, fh, self.param)
 
             self.param['seq_no'] += 1
 
             fh.close()
diff --git a/src/s3ql/remove.py b/src/s3ql/remove.py
index ac7ac22..8e98e26 100644
--- a/src/s3ql/remove.py
+++ b/src/s3ql/remove.py
@@ -8,11 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
 
 from .logging import logging, setup_logging, QuietError
 from .common import assert_fs_owner, path2bytes
-from . import PICKLE_PROTOCOL
 from .parse_args import ArgumentParser
 import llfuse
 import os
-import pickle
 import sys
 import textwrap
@@ -54,10 +52,9 @@ def main(args=None):
         ctrlfile = assert_fs_owner(name)
 
         fstat_p = os.stat(os.path.dirname(os.path.abspath(name)))
-        llfuse.setxattr(ctrlfile, 'rmtree', pickle.dumps((fstat_p.st_ino,
-                                                          path2bytes(os.path.basename(name))),
-                                                         PICKLE_PROTOCOL))
-
+        cmd = ('(%d, %r)' % (fstat_p.st_ino,
+                             path2bytes(os.path.basename(name)))).encode()
+        llfuse.setxattr(ctrlfile, 'rmtree', cmd)
 
 if __name__ == '__main__':
     main(sys.argv[1:])
diff --git a/src/s3ql/umount.py b/src/s3ql/umount.py
index 164535c..f8affd3 100644
--- a/src/s3ql/umount.py
+++ b/src/s3ql/umount.py
@@ -8,11 +8,10 @@ This program can be distributed under the terms of the GNU GPLv3.
 
 from .logging import logging, setup_logging
 from . import CTRL_NAME
-from .common import assert_s3ql_mountpoint
+from .common import assert_s3ql_mountpoint, parse_literal
 from .parse_args import ArgumentParser
 import llfuse
 import os
-import pickle
 import subprocess
 import platform
 import sys
@@ -120,7 +119,7 @@ def blocking_umount(mountpoint):
 
     # Get pid
     log.debug('Trying to get pid')
-    pid = pickle.loads(llfuse.getxattr(ctrlfile, 's3ql_pid?'))
+    pid = parse_literal(llfuse.getxattr(ctrlfile, 's3ql_pid?'), int)
     log.debug('PID is %d', pid)
 
     # Get command line to make race conditions less-likely
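
remove.py relies on the fact that %r of a bytes object yields a b'...' literal, which is exactly what fs.py's 'rmtree' branch accepts via parse_literal(value, (int, bytes)); umount.py's pid query is the same idea with a bare int. For example:

    from ast import literal_eval
    cmd = ('(%d, %r)' % (42, b'stale-dir')).encode()
    assert cmd == b"(42, b'stale-dir')"
    assert literal_eval(cmd.decode()) == (42, b'stale-dir')
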
diff --git a/src/s3ql/verify.py b/src/s3ql/verify.py
index c47a946..104998a 100644
--- a/src/s3ql/verify.py
+++ b/src/s3ql/verify.py
@@ -11,7 +11,6 @@
 from .mount import get_metadata
 from . import BUFSIZE
 from .common import (get_backend_factory, get_backend_cachedir, pretty_print_size,
                      AsyncFn)
-from .backends.pool import BackendPool
 from .backends.common import NoSuchObject, CorruptedObjectError
 from .parse_args import ArgumentParser
 from queue import Queue, Full as QueueFull
@@ -91,15 +90,14 @@ def main(args=None):
     options = parse_args(args)
     setup_logging(options)
 
-    backend_factory = get_backend_factory(options)
-    backend_pool = BackendPool(backend_factory)
-    atexit.register(backend_pool.flush)
+    backend_factory = get_backend_factory(options.storage_url, options.backend_options,
+                                          options.authfile)
 
     # Get paths
     cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
 
     # Retrieve metadata
-    with backend_pool() as backend:
+    with backend_factory() as backend:
         (param, db) = get_metadata(backend, cachepath)
 
     retrieve_objects(db, backend_factory, options.corrupted_file,
@@ -121,16 +119,14 @@ def retrieve_objects(db, backend_factory, corrupted_fh, missing_fh,
     queue = Queue(thread_count)
     threads = []
-    backends = []
     for _ in range(thread_count):
-        b = backend_factory()
-        t = AsyncFn(_retrieve_loop, queue, b, corrupted_fh, missing_fh, full)
+        t = AsyncFn(_retrieve_loop, queue, backend_factory, corrupted_fh,
+                    missing_fh, full)
         # Don't wait for worker threads, gives deadlock if main thread
         # terminates with exception
         t.daemon = True
         t.start()
         threads.append(t)
-        backends.append(b)
 
     total_size = db.get_val('SELECT SUM(size) FROM objects')
     total_count = db.get_val('SELECT COUNT(id) FROM objects')
@@ -182,12 +178,9 @@ def retrieve_objects(db, backend_factory, corrupted_fh, missing_fh,
     for t in threads:
         t.join_and_raise()
 
-    for backend in backends:
-        backend.close()
-
     log.info('Verified all %d storage objects.', i)
 
-def _retrieve_loop(queue, backend, corrupted_fh, missing_fh, full=False):
+def _retrieve_loop(queue, backend_factory, corrupted_fh, missing_fh, full=False):
     '''Retrieve object ids arriving in *queue* from *backend*
 
     If *full* is False, lookup and read metadata. If *full* is True,
@@ -199,30 +192,31 @@ def _retrieve_loop(queue, backend, corrupted_fh, missing_fh, full=False):
     Terminate when None is received.
     '''
 
-    while True:
-        obj_id = queue.get()
-        if obj_id is None:
-            break
-
-        log.debug('reading object %s', obj_id)
-        def do_read(fh):
-            while True:
-                buf = fh.read(BUFSIZE)
-                if not buf:
-                    break
-
-        key = 's3ql_data_%d' % obj_id
-        try:
-            if full:
-                backend.perform_read(do_read, key)
-            else:
-                backend.lookup(key)
-        except NoSuchObject:
-            log.warning('Backend seems to have lost object %d', obj_id)
-            print(key, file=missing_fh)
-        except CorruptedObjectError:
-            log.warning('Object %d is corrupted', obj_id)
-            print(key, file=corrupted_fh)
+    with backend_factory() as backend:
+        while True:
+            obj_id = queue.get()
+            if obj_id is None:
+                break
+
+            log.debug('reading object %s', obj_id)
+            def do_read(fh):
+                while True:
+                    buf = fh.read(BUFSIZE)
+                    if not buf:
+                        break
+
+            key = 's3ql_data_%d' % obj_id
+            try:
+                if full:
+                    backend.perform_read(do_read, key)
+                else:
+                    backend.lookup(key)
+            except NoSuchObject:
+                log.warning('Backend seems to have lost object %d', obj_id)
+                print(key, file=missing_fh)
+            except CorruptedObjectError:
+                log.warning('Object %d is corrupted', obj_id)
+                print(key, file=corrupted_fh)
 
 if __name__ == '__main__':
     main(sys.argv[1:])
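
The verify.py rework above moves backend ownership into the worker threads: each _retrieve_loop() opens its own connection through backend_factory(), and the with-block closes it even on error, replacing the shared backends list that the main thread previously had to clean up. The shape of the pattern, reduced to a runnable stdlib sketch with a dummy backend (the dummy's interface is an assumption, not the real backend API):

    import threading
    from queue import Queue

    class DummyBackend:
        '''Stand-in for a real S3QL backend.'''
        def __enter__(self):
            return self
        def __exit__(self, *exc):
            self.close()
        def lookup(self, key):
            return {}
        def close(self):
            pass

    def backend_factory():
        return DummyBackend()

    def worker(queue):
        # Mirrors _retrieve_loop(): the thread owns its backend, and the
        # context manager guarantees it is closed when the loop ends.
        with backend_factory() as backend:
            while True:
                obj_id = queue.get()
                if obj_id is None:
                    break
                backend.lookup('s3ql_data_%d' % obj_id)

    q = Queue()
    t = threading.Thread(target=worker, args=(q,), daemon=True)
    t.start()
    for obj_id in (1, 2, 3):
        q.put(obj_id)
    q.put(None)
    t.join()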