author    Nikolaus Rath <Nikolaus@rath.org>    2016-03-09 10:10:20 -0800
committer Nikolaus Rath <Nikolaus@rath.org>    2016-03-09 10:10:20 -0800
commit    57ba7d4c658aa7c5d2e0ca2ae71e3915e6052b17 (patch)
tree      192904d2eaa4f63ec239b644c75797c6024b2e2a /src
parent    061b768a9d87d125df6edb494df519447fab21c6 (diff)
Import s3ql_2.14+dfsg.orig.tar.gz
Diffstat (limited to 'src')
-rw-r--r--   src/s3ql.egg-info/PKG-INFO        2
-rw-r--r--   src/s3ql.egg-info/SOURCES.txt     2
-rw-r--r--   src/s3ql/__init__.py             11
-rw-r--r--   src/s3ql/adm.py                 258
-rw-r--r--   src/s3ql/backends/common.py     225
-rw-r--r--   src/s3ql/backends/comprenc.py   324
-rw-r--r--   src/s3ql/backends/local.py       26
-rw-r--r--   src/s3ql/backends/s3c.py         45
-rw-r--r--   src/s3ql/backends/swift.py       21
-rw-r--r--   src/s3ql/backends/swiftks.py      4
-rw-r--r--   src/s3ql/common.py              215
-rw-r--r--   src/s3ql/cp.py                    7
-rw-r--r--   src/s3ql/ctrl.py                 18
-rw-r--r--   src/s3ql/fs.py                   50
-rw-r--r--   src/s3ql/fsck.py                103
-rw-r--r--   src/s3ql/lock.py                  4
-rw-r--r--   src/s3ql/metadata.py             76
-rw-r--r--   src/s3ql/mkfs.py                 31
-rw-r--r--   src/s3ql/mount.py                65
-rw-r--r--   src/s3ql/remove.py                9
-rw-r--r--   src/s3ql/umount.py                5
-rw-r--r--   src/s3ql/verify.py               68
22 files changed, 534 insertions, 1035 deletions
diff --git a/src/s3ql.egg-info/PKG-INFO b/src/s3ql.egg-info/PKG-INFO
index 0a4bac9..24517cd 100644
--- a/src/s3ql.egg-info/PKG-INFO
+++ b/src/s3ql.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: s3ql
-Version: 2.13
+Version: 2.14
Summary: a full-featured file system for online data storage
Home-page: https://bitbucket.org/nikratio/s3ql/
Author: Nikolaus Rath
diff --git a/src/s3ql.egg-info/SOURCES.txt b/src/s3ql.egg-info/SOURCES.txt
index 059d67d..dc1fed6 100644
--- a/src/s3ql.egg-info/SOURCES.txt
+++ b/src/s3ql.egg-info/SOURCES.txt
@@ -25,7 +25,6 @@ contrib/pcp.1
contrib/pcp.py
contrib/remove_objects.py
contrib/s3ql_backup.sh
-contrib/s3ql_upstart.conf
contrib/scramble_db.py
doc/manual.pdf
doc/html/.buildinfo
@@ -233,7 +232,6 @@ tests/pytest.ini
tests/t1_backends.py
tests/t1_dump.py
tests/t1_retry.py
-tests/t1_safe_unpickle.py
tests/t1_serialization.py
tests/t2_block_cache.py
tests/t3_fs_api.py
diff --git a/src/s3ql/__init__.py b/src/s3ql/__init__.py
index cccf83f..2521b85 100644
--- a/src/s3ql/__init__.py
+++ b/src/s3ql/__init__.py
@@ -19,20 +19,19 @@ __all__ = [ 'adm', 'backends', 'block_cache', 'common', 'calc_mro',
'fsck', 'inherit_docstrings', 'inode_cache', 'lock',
'logging', 'metadata', 'mkfs', 'mount', 'parse_args',
'remove', 'statfs', 'umount', 'VERSION', 'CURRENT_FS_REV',
- 'REV_VER_MAP', 'RELEASE', 'BUFSIZE', 'PICKLE_PROTOCOL',
+ 'REV_VER_MAP', 'RELEASE', 'BUFSIZE',
'CTRL_NAME', 'CTRL_INODE' ]
-VERSION = '2.13'
+VERSION = '2.14'
RELEASE = '%s' % VERSION
+# TODO: On next upgrade, remove pickle support from
+# common.py:load_params().
CURRENT_FS_REV = 22
# Buffer size when writing objects
BUFSIZE = 64 * 1024
-# Pickle protocol version to use.
-PICKLE_PROTOCOL = 2
-
# Name and inode of the special s3ql control file
CTRL_NAME = '.__s3ql__ctrl__'
CTRL_INODE = 2
@@ -40,7 +39,7 @@ CTRL_INODE = 2
# Maps file system revisions to the last S3QL version that
# supported this revision.
REV_VER_MAP = {
- 21: '2.12',
+ 21: '2.13',
20: '2.9',
16: '1.15',
15: '1.10',
diff --git a/src/s3ql/adm.py b/src/s3ql/adm.py
index 27a5dd8..6901b20 100644
--- a/src/s3ql/adm.py
+++ b/src/s3ql/adm.py
@@ -7,24 +7,18 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from .logging import logging, QuietError, setup_logging
-from . import CURRENT_FS_REV, REV_VER_MAP, PICKLE_PROTOCOL, BUFSIZE
+from . import CURRENT_FS_REV, REV_VER_MAP
from .backends.comprenc import ComprencBackend
from .database import Connection
-from .common import (get_backend_cachedir, get_seq_no, stream_write_bz2,
- stream_read_bz2, is_mounted, AsyncFn, get_backend,
- get_backend_factory, pretty_print_size, split_by_n,
- handle_on_return)
-from .metadata import restore_metadata, cycle_metadata, dump_metadata
+from .common import (get_backend_cachedir, get_seq_no, is_mounted, get_backend,
+ freeze_basic_mapping, load_params)
+from .metadata import dump_and_upload_metadata, download_metadata
from .parse_args import ArgumentParser
from datetime import datetime as Datetime
from getpass import getpass
-from base64 import b64encode
-from queue import Queue, Full as QueueFull
import os
-import pickle
import shutil
import sys
-import tempfile
import textwrap
import time
@@ -87,20 +81,20 @@ def main(args=None):
raise QuietError('Can not work on mounted file system.')
if options.action == 'clear':
- with get_backend(options, plain=True) as backend:
+ with get_backend(options, raw=True) as backend:
return clear(backend, options)
- if options.action == 'upgrade':
- return upgrade(options)
-
with get_backend(options) as backend:
- if options.action == 'passphrase':
+ if options.action == 'upgrade':
+ return upgrade(backend, get_backend_cachedir(options.storage_url,
+ options.cachedir))
+ elif options.action == 'passphrase':
return change_passphrase(backend)
- if options.action == 'download-metadata':
- return download_metadata(backend, options.storage_url)
+ elif options.action == 'download-metadata':
+ return download_metadata_cmd(backend, options.storage_url)
-def download_metadata(backend, storage_url):
+def download_metadata_cmd(backend, storage_url):
'''Download old metadata backups'''
backups = sorted(backend.list('s3ql_metadata'))
@@ -139,25 +133,14 @@ def download_metadata(backend, storage_url):
raise QuietError('%s already exists, aborting.' % cachepath + i)
param = backend.lookup(name)
- with tempfile.TemporaryFile() as tmpfh:
- def do_read(fh):
- tmpfh.seek(0)
- tmpfh.truncate()
- stream_read_bz2(fh, tmpfh)
-
- log.info('Downloading and decompressing %s...', name)
- backend.perform_read(do_read, name)
-
- log.info("Reading metadata...")
- tmpfh.seek(0)
- restore_metadata(tmpfh, cachepath + '.db')
+ download_metadata(backend, cachepath + ".db", name)
# Raise sequence number so that fsck.s3ql actually uses the
# downloaded backup
seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in backend.list('s3ql_seq_no_') ]
param['seq_no'] = max(seq_nos) + 1
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
def change_passphrase(backend):
'''Change file system passphrase'''
@@ -221,23 +204,16 @@ def get_old_rev_msg(rev, prog):
''' % { 'version': REV_VER_MAP[rev],
'prog': prog })
-
-@handle_on_return
-def upgrade(options, on_return):
+def upgrade(backend, cachepath):
'''Upgrade file system to newest revision'''
log.info('Getting file system parameters..')
- cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
-
- backend_factory = get_backend_factory(options)
- backend = on_return.enter_context(backend_factory())
# Check for cached metadata
db = None
seq_no = get_seq_no(backend)
if os.path.exists(cachepath + '.params'):
- with open(cachepath + '.params', 'rb') as fh:
- param = pickle.load(fh)
+ param = load_params(cachepath + '.params')
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
param = backend.lookup('s3ql_metadata')
@@ -271,7 +247,10 @@ def upgrade(options, on_return):
raise QuietError()
# Check revision
- if param['revision'] < CURRENT_FS_REV-1:
+ # Upgrade from 21 to 22 is only possible with release 2.13,
+ # because we removed support for reading the old storage object
+ # format after 2.13.
+ if param['revision'] == 21 or param['revision'] < CURRENT_FS_REV-1:
print(textwrap.dedent('''
File system revision too old to upgrade!
@@ -305,18 +284,7 @@ def upgrade(options, on_return):
if not db:
# Need to download metadata
- with tempfile.TemporaryFile() as tmpfh:
- def do_read(fh):
- tmpfh.seek(0)
- tmpfh.truncate()
- stream_read_bz2(fh, tmpfh)
-
- log.info("Downloading & uncompressing metadata...")
- backend.perform_read(do_read, "s3ql_metadata")
-
- log.info("Reading metadata...")
- tmpfh.seek(0)
- db = restore_metadata(tmpfh, cachepath + '.db')
+ db = download_metadata(backend, cachepath + '.db')
log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV)
@@ -324,41 +292,14 @@ def upgrade(options, on_return):
param['last-modified'] = time.time()
param['seq_no'] += 1
- # Ensure that there are backups of the master key
- if backend.passphrase is not None:
- data_pw = backend.passphrase
- backend.passphrase = backend.fs_passphrase
- for i in range(1,4):
- obj_id = 's3ql_passphrase_bak%d' % i
- if obj_id not in backend:
- backend[obj_id] = data_pw
- backend.passphrase = data_pw
-
- # Upgrade all objects, so that we can remove legacy conversion
- # routines in the next release.
- update_obj_metadata(backend, backend_factory, db, options.threads)
-
- log.info('Dumping metadata...')
- with tempfile.TemporaryFile() as fh:
- dump_metadata(db, fh)
- def do_write(obj_fh):
- fh.seek(0)
- stream_write_bz2(fh, obj_fh)
- return obj_fh
-
- log.info("Compressing and uploading metadata...")
- backend.store('s3ql_seq_no_%d' % param['seq_no'], b'Empty')
- obj_fh = backend.perform_write(do_write, "s3ql_metadata_new", metadata=param,
- is_compressed=True)
-
- log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
- log.info('Cycling metadata backups...')
- cycle_metadata(backend)
+ # Upgrade code goes here
+
+ dump_and_upload_metadata(backend, db, param)
backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
log.info('Cleaning up local metadata...')
db.execute('ANALYZE')
@@ -366,154 +307,5 @@ def upgrade(options, on_return):
print('File system upgrade complete.')
- if backend.passphrase is not None:
- print('Please store the following master key in a safe location. It allows ',
- 'decryption of the S3QL file system in case the storage objects holding ',
- 'this information get corrupted:',
- '---BEGIN MASTER KEY---',
- ' '.join(split_by_n(b64encode(backend.passphrase).decode(), 4)),
- '---END MASTER KEY---',
- sep='\n')
-
-@handle_on_return
-def update_obj_metadata(backend, backend_factory, db,
- thread_count, on_return):
- '''Upgrade metadata of storage objects'''
-
- plain_backend = backend.backend
-
- # No need to update sequence number, since we are going to
- # write out a new one after the upgrade.
- if backend.passphrase is None:
- extra_objects = { 's3ql_metadata' }
- else:
- extra_objects = { 's3ql_metadata',
- 's3ql_passphrase', 's3ql_passphrase_bak1',
- 's3ql_passphrase_bak2', 's3ql_passphrase_bak3' }
-
- for i in range(30):
- obj_id = 's3ql_metadata_bak_%d' % i
- if obj_id in plain_backend:
- extra_objects.add(obj_id)
-
- def yield_objects():
- for (id_,) in db.query('SELECT id FROM objects'):
- yield 's3ql_data_%d' % id_
- for obj_id in extra_objects:
- yield obj_id
- total = db.get_val('SELECT COUNT(id) FROM objects') + len(extra_objects)
-
- queue = Queue(maxsize=thread_count)
- threads = []
- for _ in range(thread_count):
- t = AsyncFn(upgrade_loop, queue, on_return.push(backend_factory()))
- # Don't wait for worker threads, gives deadlock if main thread
- # terminates with exception
- t.daemon = True
- t.start()
- threads.append(t)
-
- # Updating this value is prone to race conditions. However,
- # we don't care because this is for an approximate progress
- # output only.
- queue.rewrote_size = 0
- stamp = 0
- for (i, obj_id) in enumerate(yield_objects()):
- stamp2 = time.time()
- if stamp2 - stamp > 1:
- sys.stdout.write('\r..processed %d/%d objects (%.1f%%, %s rewritten)..'
- % (i, total, i/total*100,
- pretty_print_size(queue.rewrote_size)))
- sys.stdout.flush()
- stamp = stamp2
-
- # Terminate early if any thread failed with an exception
- for t in threads:
- if not t.is_alive():
- t.join_and_raise()
-
- # Avoid blocking if all threads terminated
- while True:
- try:
- queue.put(obj_id, timeout=1)
- except QueueFull:
- pass
- else:
- break
- for t in threads:
- if not t.is_alive():
- t.join_and_raise()
-
- queue.maxsize += len(threads)
- for t in threads:
- queue.put(None)
-
- for t in threads:
- t.join_and_raise()
-
- sys.stdout.write('\n')
-
-def upgrade_loop(queue, backend):
- plain_backend = backend.backend
-
- while True:
- obj_id = queue.get()
- if obj_id is None:
- break
-
- meta = plain_backend.lookup(obj_id)
- if meta.get('format_version', 0) == 2:
- continue
-
- # For important objects, we make a copy first (just to be safe)
- if not obj_id.startswith('s3ql_data'):
- plain_backend.copy(obj_id, 's3ql_pre2.13' + obj_id[4:])
-
- # When reading passphrase objects, we have to use the
- # "outer" password
- if obj_id.startswith('s3ql_passphrase'):
- data_pw = backend.passphrase
- backend.passphrase = backend.fs_passphrase
-
- meta = backend._convert_legacy_metadata(meta)
- if meta['encryption'] == 'AES':
- # Two statements to reduce likelihood of update races
- size = rewrite_legacy_object(backend, obj_id)
- queue.rewrote_size += size
- else:
- plain_backend.update_meta(obj_id, meta)
-
- if obj_id.startswith('s3ql_passphrase'):
- backend.passphrase = data_pw
-
-def rewrite_legacy_object(backend, obj_id):
- with tempfile.TemporaryFile() as tmpfh:
-
- # Read object
- def do_read(fh):
- tmpfh.seek(0)
- tmpfh.truncate()
- while True:
- buf = fh.read(BUFSIZE)
- if not buf:
- break
- tmpfh.write(buf)
- return fh.metadata
-
- meta = backend.perform_read(do_read, obj_id)
-
- # Write object
- def do_write(fh):
- tmpfh.seek(0)
- while True:
- buf = tmpfh.read(BUFSIZE)
- if not buf:
- break
- fh.write(buf)
- return fh
- out_fh = backend.perform_write(do_write, obj_id, meta)
-
- return out_fh.get_obj_size()
-
if __name__ == '__main__':
main(sys.argv[1:])
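The metadata dump and upload logic that used to live inline in upgrade() is now delegated to metadata.dump_and_upload_metadata(), which is not shown in this diff. A rough sketch of that flow, reconstructed from the inline code removed above (the actual helper may differ in detail):

    import bz2
    import tempfile
    from s3ql.metadata import dump_metadata, cycle_metadata

    def dump_and_upload_metadata_sketch(backend, db, param):
        with tempfile.TemporaryFile() as fh:
            dump_metadata(db, fh)                    # serialize the SQLite metadata
            def do_write(obj_fh):
                fh.seek(0)
                compr = bz2.BZ2Compressor(9)         # bz2-compress while uploading
                while True:
                    buf = fh.read(64 * 1024)
                    if not buf:
                        break
                    obj_fh.write(compr.compress(buf))
                obj_fh.write(compr.flush())
                return obj_fh
            backend.perform_write(do_write, 's3ql_metadata_new',
                                  metadata=param, is_compressed=True)
        cycle_metadata(backend)                      # rotate s3ql_metadata_bak_* objects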
diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py
index 7d0b261..cfcc4f8 100644
--- a/src/s3ql/backends/common.py
+++ b/src/s3ql/backends/common.py
@@ -10,37 +10,90 @@ from ..logging import logging, QuietError, LOG_ONCE # Ensure use of custom logge
from abc import abstractmethod, ABCMeta
from functools import wraps
import time
-import codecs
-import io
import textwrap
import hashlib
import struct
import hmac
import inspect
-from base64 import b64decode, b64encode
-import binascii
-from ast import literal_eval
import ssl
import os
import re
-import pickletools
-import pickle
+import threading
log = logging.getLogger(__name__)
+class RateTracker:
+ '''
+ Maintain an average occurence rate for events over a configurable time
+ window. The rate is computed with one second resolution.
+ '''
+
+ def __init__(self, window_length):
+ if not isinstance(window_length, int):
+ raise ValueError('only integer window lengths are supported')
+
+ self.buckets = [0] * window_length
+ self.window_length = window_length
+ self.last_update = int(time.monotonic())
+ self.lock = threading.Lock()
+
+ def register(self, _not_really=False):
+ '''Register occurence of an event.
+
+ The keyword argument is for class-internal use only.
+ '''
+
+ buckets = self.buckets
+ bucket_count = len(self.buckets)
+ now = int(time.monotonic())
+
+ elapsed = min(now - self.last_update, bucket_count)
+ for i in range(elapsed):
+ buckets[(now - i) % bucket_count] = 0
+
+ if _not_really:
+ return
+
+ with self.lock:
+ buckets[now % bucket_count] += 1
+ self.last_update = now
+
+ def get_rate(self):
+ '''Return average rate of event occurance'''
+
+ self.register(_not_really=True)
+ return sum(self.buckets) / len(self.buckets)
+
+ def get_count(self):
+ '''Return total number of events in window'''
+
+ self.register(_not_really=True)
+ return sum(self.buckets)
+
+
+# We maintain a (global) running average of temporary errors, so
+# that we can log a warning if this number becomes large. We
+# use a relatively large window to prevent bogus spikes if
+# multiple threads all have to retry after a long period of
+# inactivity.
RETRY_TIMEOUT = 60 * 60 * 24
-def retry(method):
+def retry(method, _tracker=RateTracker(60)):
'''Wrap *method* for retrying on some exceptions
If *method* raises an exception for which the instance's
- `is_temp_failure(exc)` method is true, the *method* is called again
- at increasing intervals. If this persists for more than `RETRY_TIMEOUT`
- seconds, the most-recently caught exception is re-raised.
+ `is_temp_failure(exc)` method is true, the *method* is called again at
+ increasing intervals. If this persists for more than `RETRY_TIMEOUT`
+ seconds, the most-recently caught exception is re-raised. If the
+ method defines a keyword parameter *is_retry*, then this parameter
+ will be set to True whenever the function is retried.
'''
if inspect.isgeneratorfunction(method):
raise TypeError('Wrapping a generator function is pointless')
+ sig = inspect.signature(method)
+ has_is_retry = 'is_retry' in sig.parameters
+
@wraps(method)
def wrapped(*a, **kw):
self = a[0]
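A minimal usage sketch for the RateTracker class added above; the event counts are made up and serve only to show the one-second bucketing over a 60-second window:

    from s3ql.backends.common import RateTracker

    tracker = RateTracker(60)        # 60 one-second buckets
    for _ in range(3):
        tracker.register()           # record one event in the current second
    print(tracker.get_count())       # -> 3 events within the window
    print(tracker.get_rate())        # -> 3 / 60 = 0.05 events per second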
@@ -48,6 +101,8 @@ def retry(method):
waited = 0
retries = 0
while True:
+ if has_is_retry:
+ kw['is_retry'] = (retries > 0)
try:
return method(*a, **kw)
except Exception as exc:
@@ -55,6 +110,16 @@ def retry(method):
#pylint: disable=W0212
if not self.is_temp_failure(exc):
raise
+
+ _tracker.register()
+ rate = _tracker.get_rate()
+ if rate > 5:
+ log.warning('Had to retry %d times over the last %d seconds, '
+ 'server or network problem?',
+ rate * _tracker.window_length, _tracker.window_length)
+ else:
+ log.debug('Average retry rate: %.2f Hz', rate)
+
if waited > RETRY_TIMEOUT:
log.error('%s.%s(*): Timeout exceeded, re-raising %r exception',
self.__class__.__name__, method.__name__, exc)
@@ -68,8 +133,9 @@ def retry(method):
else:
log_fn = log.warning
- log_fn('Encountered %s exception (%s), retrying call to %s.%s for the %d-th time...',
- type(exc).__name__, exc, self.__class__.__name__, method.__name__, retries)
+ log_fn('Encountered %s (%s), retrying %s.%s (attempt %d)...',
+ type(exc).__name__, exc, self.__class__.__name__, method.__name__,
+ retries)
if hasattr(exc, 'retry_after') and exc.retry_after:
log.debug('retry_after is %.2f seconds', exc.retry_after)
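A sketch (not taken from the diff) of a backend method that opts in to the new *is_retry* keyword. The @retry wrapper sets it to True on every attempt after the first; the s3c and swift delete() methods further down use exactly this to treat a 404 on a retried DELETE as success. The helper _do_delete() here is hypothetical:

    from s3ql.backends.common import retry

    class ExampleBackend:
        def is_temp_failure(self, exc):
            return isinstance(exc, ConnectionError)

        @retry
        def delete_object(self, key, is_retry=False):
            try:
                self._do_delete(key)      # hypothetical; may raise ConnectionError or KeyError
            except KeyError:
                if not is_retry:
                    raise                 # object genuinely missing
                # On a retry, the first (lost) attempt probably already deleted it.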
@@ -436,7 +502,8 @@ class AbstractBackend(object, metaclass=ABCMeta):
def update_meta(self, key, metadata):
"""Replace metadata of *key* with *metadata*
- Metadata must be `dict` instance and pickle-able.
+ `metadata` must be a mapping with keys of type `str`, and values of an
+ elementary type (`str`, `bytes`, `int`, `float` or `bool`).
"""
pass
@@ -583,72 +650,6 @@ def get_proxy(ssl):
return proxy
-
-class ThawError(Exception):
- def __str__(self):
- return 'Malformed serialization data'
-
-def thaw_basic_mapping(buf):
- '''Reconstruct dict from serialized representation
-
- *buf* must be a bytes-like object as created by
- `freeze_basic_mapping`. Raises `ThawError` if *buf* is not a valid
- representation.
-
- This procedure is safe even if *buf* comes from an untrusted source.
- '''
-
- try:
- d = literal_eval(buf.decode('utf-8'))
- except (UnicodeDecodeError, SyntaxError, ValueError):
- raise ThawError()
-
- # Decode bytes values
- for (k,v) in d.items():
- if not isinstance(v, bytes):
- continue
- try:
- d[k] = b64decode(v)
- except binascii.Error:
- raise ThawError()
-
- return d
-
-def freeze_basic_mapping(d):
- '''Serialize mapping of elementary types
-
- Keys of *d* must be strings. Values of *d* must be of elementary type (i.e.,
- `str`, `bytes`, `int`, `float`, `complex`, `bool` or None).
-
- The output is a bytestream that can be used to reconstruct the mapping. The
- bytestream is not guaranteed to be deterministic. Look at
- `checksum_basic_mapping` if you need a deterministic bytestream.
- '''
-
- els = []
- for (k,v) in d.items():
- if not isinstance(k, str):
- raise ValueError('key %s must be str, not %s' % (k, type(k)))
-
- if (not isinstance(v, (str, bytes, bytearray, int, float, complex, bool))
- and v is not None):
- raise ValueError('value for key %s (%s) is not elementary' % (k, v))
-
- # To avoid wasting space, we b64encode non-ascii byte values.
- if isinstance(v, (bytes, bytearray)):
- v = b64encode(v)
-
- # This should be a pretty safe assumption for elementary types, but we
- # add an assert just to be safe (Python docs just say that repr makes
- # "best effort" to produce something parseable)
- (k_repr, v_repr) = (repr(k), repr(v))
- assert (literal_eval(k_repr), literal_eval(v_repr)) == (k, v)
-
- els.append(('%s: %s' % (k_repr, v_repr)))
-
- buf = '{ %s }' % ', '.join(els)
- return buf.encode('utf-8')
-
def checksum_basic_mapping(metadata, key=None):
'''Compute checksum for mapping of elementary types
@@ -695,67 +696,3 @@ def checksum_basic_mapping(metadata, key=None):
chk.update(val)
return chk.digest()
-
-
-SAFE_UNPICKLE_OPCODES = {'BININT', 'BININT1', 'BININT2', 'LONG1', 'LONG4',
- 'BINSTRING', 'SHORT_BINSTRING', 'GLOBAL',
- 'NONE', 'NEWTRUE', 'NEWFALSE', 'BINUNICODE',
- 'BINFLOAT', 'EMPTY_LIST', 'APPEND', 'APPENDS',
- 'LIST', 'EMPTY_TUPLE', 'TUPLE', 'TUPLE1', 'TUPLE2',
- 'TUPLE3', 'EMPTY_DICT', 'DICT', 'SETITEM',
- 'SETITEMS', 'POP', 'DUP', 'MARK', 'POP_MARK',
- 'BINGET', 'LONG_BINGET', 'BINPUT', 'LONG_BINPUT',
- 'PROTO', 'STOP', 'REDUCE'}
-
-SAFE_UNPICKLE_GLOBAL_NAMES = { ('__builtin__', 'bytearray'),
- ('__builtin__', 'set'),
- ('__builtin__', 'frozenset'),
- ('_codecs', 'encode') }
-SAFE_UNPICKLE_GLOBAL_OBJS = { bytearray, set, frozenset, codecs.encode }
-
-class SafeUnpickler(pickle.Unpickler):
- def find_class(self, module, name):
- if (module, name) not in SAFE_UNPICKLE_GLOBAL_NAMES:
- raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
- (module, name))
- ret = super().find_class(module, name)
- if ret not in SAFE_UNPICKLE_GLOBAL_OBJS:
- raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
- (module, name))
- return ret
-
-
-def safe_unpickle_fh(fh, fix_imports=True, encoding="ASCII",
- errors="strict"):
- '''Safely unpickle untrusted data from *fh*
-
- *fh* must be seekable.
- '''
-
- if not fh.seekable():
- raise TypeError('*fh* must be seekable')
- pos = fh.tell()
-
- # First make sure that we know all used opcodes
- try:
- for (opcode, arg, _) in pickletools.genops(fh):
- if opcode.proto > 2 or opcode.name not in SAFE_UNPICKLE_OPCODES:
- raise pickle.UnpicklingError('opcode %s is unsafe' % opcode.name)
- except (ValueError, EOFError):
- raise pickle.UnpicklingError('corrupted data')
-
- fh.seek(pos)
-
- # Then use a custom Unpickler to ensure that we only give access to
- # specific, whitelisted globals. Note that with the above opcodes, there is
- # no way to trigger attribute access, so "brachiating" from a white listed
- # object to __builtins__ is not possible.
- return SafeUnpickler(fh, fix_imports=fix_imports,
- encoding=encoding, errors=errors).load()
-
-def safe_unpickle(buf, fix_imports=True, encoding="ASCII",
- errors="strict"):
- '''Safely unpickle untrusted data in *buf*'''
-
- return safe_unpickle_fh(io.BytesIO(buf), fix_imports=fix_imports,
- encoding=encoding, errors=errors)
diff --git a/src/s3ql/backends/comprenc.py b/src/s3ql/backends/comprenc.py
index bade392..b3ffc1b 100644
--- a/src/s3ql/backends/comprenc.py
+++ b/src/s3ql/backends/comprenc.py
@@ -7,21 +7,17 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from ..logging import logging # Ensure use of custom logger class
-from .. import BUFSIZE, PICKLE_PROTOCOL
-from .common import (AbstractBackend, CorruptedObjectError, safe_unpickle,
- freeze_basic_mapping, thaw_basic_mapping, ThawError,
- checksum_basic_mapping)
+from .. import BUFSIZE
+from .common import AbstractBackend, CorruptedObjectError, checksum_basic_mapping
+from ..common import ThawError, freeze_basic_mapping, thaw_basic_mapping
from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring,
ABCDocstMeta)
from Crypto.Cipher import AES
from Crypto.Util import Counter
-from base64 import b64decode, b64encode
-import binascii
import bz2
import hashlib
import hmac
import lzma
-import pickle
import io
import struct
import time
@@ -71,8 +67,6 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
@copy_ancestor_docstring
def lookup(self, key):
meta_raw = self.backend.lookup(key)
- if meta_raw.get('format_version', 0) < 2:
- meta_raw = self._convert_legacy_metadata(meta_raw)
return self._verify_meta(key, meta_raw)[1]
@prepend_ancestor_docstring
@@ -153,32 +147,12 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
"""
fh = self.backend.open_read(key)
- checksum_warning = False
try:
- if fh.metadata.get('format_version', 0) < 2:
- meta_raw = self._convert_legacy_metadata(fh.metadata)
- else:
- meta_raw = fh.metadata
-
- # Also checks if this is a BetterBucket storage object
+ meta_raw = fh.metadata
(nonce, meta) = self._verify_meta(key, meta_raw)
if nonce:
data_key = sha256(self.passphrase + nonce)
- compr_alg = meta_raw['compression']
- encr_alg = meta_raw['encryption']
-
- if compr_alg == 'BZIP2':
- decompressor = bz2.BZ2Decompressor()
- elif compr_alg == 'LZMA':
- decompressor = lzma.LZMADecompressor()
- elif compr_alg == 'ZLIB':
- decompressor = zlib.decompressobj()
- elif compr_alg == 'None':
- decompressor = None
- else:
- raise RuntimeError('Unsupported compression: %s' % compr_alg)
-
# The `payload_offset` key only exists if the storage object was
# created with on old S3QL version. In order to avoid having to
# download and re-upload the entire object during the upgrade, the
@@ -189,24 +163,27 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
while to_skip:
to_skip -= len(fh.read(to_skip))
- # If we've come this far, we want to emit a warning if the object
- # has not been read completely on close().
- checksum_warning = True
-
- if encr_alg == 'AES':
- fh = LegacyDecryptDecompressFilter(fh, data_key, decompressor)
- decompressor = None
- elif encr_alg == 'AES_v2':
+ encr_alg = meta_raw['encryption']
+ if encr_alg == 'AES_v2':
fh = DecryptFilter(fh, data_key)
elif encr_alg != 'None':
raise RuntimeError('Unsupported encryption: %s' % encr_alg)
- if decompressor:
- fh = DecompressFilter(fh, decompressor)
+ compr_alg = meta_raw['compression']
+ if compr_alg == 'BZIP2':
+ fh = DecompressFilter(fh, bz2.BZ2Decompressor())
+ elif compr_alg == 'LZMA':
+ fh = DecompressFilter(fh, lzma.LZMADecompressor())
+ elif compr_alg == 'ZLIB':
+ fh = DecompressFilter(fh,zlib.decompressobj())
+ elif compr_alg != 'None':
+ raise RuntimeError('Unsupported compression: %s' % compr_alg)
fh.metadata = meta
except:
- fh.close(checksum_warning=checksum_warning)
+ # Don't emit checksum warning, caller hasn't even
+ # started reading anything.
+ fh.close(checksum_warning=False)
raise
return fh
@@ -298,8 +275,6 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
def _copy_or_rename(self, src, dest, rename, metadata=None):
meta_raw = self.backend.lookup(src)
- if meta_raw.get('format_version', 0) < 2:
- meta_raw = self._convert_legacy_metadata(meta_raw)
(nonce, meta_old) = self._verify_meta(src, meta_raw)
if nonce:
@@ -328,182 +303,6 @@ class ComprencBackend(AbstractBackend, metaclass=ABCDocstMeta):
def close(self):
self.backend.close()
- def _convert_legacy_metadata(self, meta):
- '''Convert metadata to newest format
-
- This method ensures that we can read objects written
- by older S3QL versions.
- '''
-
- format_version = meta.get('format_version', 0)
- assert format_version in (0,1)
- if format_version == 0:
- meta = self._convert_legacy_metadata0(meta)
- return self._convert_legacy_metadata1(meta)
-
- def _convert_legacy_metadata0(self, meta,
- LEN_BYTES = struct.calcsize(b'<B'),
- TIME_BYTES = struct.calcsize(b'<f')):
- meta_new = dict(format_version=1)
-
- if ('encryption' in meta and
- 'compression' in meta):
- meta_new['encryption'] = meta['encryption']
- meta_new['compression'] = meta['compression']
-
- elif 'encrypted' in meta:
- s = meta['encrypted']
- if s == 'True':
- meta_new['encryption'] = 'AES'
- meta_new['compression'] = 'BZIP2'
-
- elif s == 'False':
- meta_new['encryption'] = 'None'
- meta_new['compression'] = 'None'
-
- elif s.startswith('AES/'):
- meta_new['encryption'] = 'AES'
- meta_new['compression'] = s[4:]
-
- elif s.startswith('PLAIN/'):
- meta_new['encryption'] = 'None'
- meta_new['compression'] = s[6:]
- else:
- raise RuntimeError('Unsupported encryption')
-
- if meta_new['compression'] == 'BZ2':
- meta_new['compression'] = 'BZIP2'
-
- if meta_new['compression'] == 'NONE':
- meta_new['compression'] = 'None'
- else:
- meta_new['encryption'] = 'None'
- meta_new['compression'] = 'None'
-
- # Extract metadata (pre 2.x versions use multiple headers)
- if any(k.startswith('meta') for k in meta):
- parts = [ meta[k] for k in sorted(meta.keys())
- if k.startswith('meta') ]
- meta_new['data'] = ''.join(parts)
- else:
- try:
- meta_new['data'] = meta['data']
- except KeyError:
- raise CorruptedObjectError('meta key data is missing')
-
- if not self.passphrase:
- return meta_new
-
- meta_buf = b64decode(meta_new['data'])
- off = 0
- def read(len_):
- nonlocal off
- tmp = meta_buf[off:off+len_]
- off += len_
- return tmp
-
- len_ = struct.unpack(b'<B', read(LEN_BYTES))[0]
- nonce = read(len_)
- key = sha256(self.passphrase + nonce)
- cipher = aes_cipher(key)
- hmac_ = hmac.new(key, digestmod=hashlib.sha256)
- hash_ = read(HMAC_SIZE)
- meta_buf = meta_buf[off:]
- meta_buf_plain = cipher.decrypt(meta_buf)
- hmac_.update(meta_buf_plain)
- hash_ = cipher.decrypt(hash_)
-
- if not hmac.compare_digest(hash_, hmac_.digest()):
- raise CorruptedObjectError('HMAC mismatch')
-
- obj_id = nonce[TIME_BYTES:].decode('utf-8')
- meta_key = sha256(self.passphrase + nonce + b'meta')
- meta_new['nonce'] = b64encode(nonce)
- meta_new['payload_offset'] = LEN_BYTES + len(nonce)
- meta_new['data'] = b64encode(aes_cipher(meta_key).encrypt(meta_buf_plain))
- meta_new['object_id'] = b64encode(obj_id.encode('utf-8'))
- meta_new['signature'] = calc_legacy_meta_checksum(meta_new, meta_key)
-
- return meta_new
-
- def _convert_legacy_metadata1(self, metadata):
- if not isinstance(metadata, dict):
- raise CorruptedObjectError('metadata should be dict, not %s' % type(metadata))
-
- for mkey in ('encryption', 'compression', 'data'):
- if mkey not in metadata:
- raise CorruptedObjectError('meta key %s is missing' % mkey)
-
- encr_alg = metadata['encryption']
- encrypted = (encr_alg != 'None')
-
- if encrypted and self.passphrase is None:
- raise CorruptedObjectError('Encrypted object and no passphrase supplied')
-
- elif not encrypted and self.passphrase is not None:
- raise ObjectNotEncrypted()
-
- try:
- meta_buf = b64decode(metadata['data'])
- except binascii.Error:
- raise CorruptedObjectError('Invalid metadata, b64decode failed')
-
- if not encrypted:
- try:
- meta2 = safe_unpickle(meta_buf, encoding='latin1')
- except pickle.UnpicklingError as exc:
- raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
- if meta2 is None:
- meta2 = dict()
- metadata['data'] = freeze_basic_mapping(meta2)
- metadata['format_version'] = 2
- return metadata
-
- # Encrypted
- for mkey in ('nonce', 'signature', 'object_id'):
- if mkey not in metadata:
- raise CorruptedObjectError('meta key %s is missing' % mkey)
-
- nonce = b64decode(metadata['nonce'])
- meta_key = sha256(self.passphrase + nonce + b'meta')
- meta_sig = calc_legacy_meta_checksum(metadata, meta_key)
- if not hmac.compare_digest(metadata['signature'], meta_sig):
- raise CorruptedObjectError('HMAC mismatch')
-
- buf = aes_cipher(meta_key).decrypt(meta_buf)
- try:
- meta2 = safe_unpickle(buf, encoding='latin1')
- except pickle.UnpicklingError as exc:
- raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
- if meta2 is None:
- meta2 = dict()
-
- meta_buf = freeze_basic_mapping(meta2)
- metadata['nonce'] = nonce
- metadata['object_id'] = b64decode(metadata['object_id']).decode('utf-8')
- metadata['data'] = aes_cipher(meta_key).encrypt(meta_buf)
- metadata['format_version'] = 2
- metadata['signature'] = checksum_basic_mapping(metadata, meta_key)
-
- return metadata
-
-def calc_legacy_meta_checksum(metadata, key):
- # This works most of the time, so we still try to validate the
- # signature. But in general, the pickle output is not unique so this is
- # not a good way to compute a checksum.
- chk = hmac.new(key, digestmod=hashlib.sha256)
- for mkey in sorted(metadata.keys()):
- assert isinstance(mkey, str)
- if mkey == 'signature':
- continue
- val = metadata[mkey]
- if isinstance(val, str):
- val = val.encode('utf-8')
- elif not isinstance(val, (bytes, bytearray)):
- val = pickle.dumps(val, PICKLE_PROTOCOL)
- chk.update(mkey.encode('utf-8') + val)
- return b64encode(chk.digest())
-
class CompressFilter(object):
'''Compress data while writing'''
@@ -842,93 +641,6 @@ class DecryptFilter(InputFilter):
self.close()
return False
-class LegacyDecryptDecompressFilter(io.RawIOBase):
- '''Decrypt and Decompress data while reading
-
- Reader has to read the entire stream in order for HMAC
- checking to work.
- '''
-
- def __init__(self, fh, key, decomp):
- '''Initialize
-
- *fh* should be a file-like object and may be unbuffered.
- '''
- super().__init__()
-
- self.fh = fh
- self.decomp = decomp
- self.hmac_checked = False
- self.cipher = aes_cipher(key)
- self.hmac = hmac.new(key, digestmod=hashlib.sha256)
- self.hash = fh.read(HMAC_SIZE)
-
- def discard_input(self):
- while True:
- buf = self.fh.read(BUFSIZE)
- if not buf:
- break
-
- def _decrypt(self, buf):
- # Work around https://bugs.launchpad.net/pycrypto/+bug/1256172
- # cipher.decrypt refuses to work with anything but bytes
- if not isinstance(buf, bytes):
- buf = bytes(buf)
-
- len_ = len(buf)
- buf = self.cipher.decrypt(buf)
- assert len(buf) == len_
- return buf
-
- def read(self, size=-1):
- '''Read up to *size* bytes
-
- This method is currently buggy and may also return *more*
- than *size* bytes. Callers should be prepared to handle
- that. This is because some of the used (de)compression modules
- don't support output limiting.
- '''
-
- if size == -1:
- return self.readall()
- elif size == 0:
- return b''
-
- buf = None
- while not buf:
- buf = self.fh.read(size)
- if not buf and not self.hmac_checked:
- if not hmac.compare_digest(self._decrypt(self.hash),
- self.hmac.digest()):
- raise CorruptedObjectError('HMAC mismatch')
- elif self.decomp and self.decomp.unused_data:
- raise CorruptedObjectError('Data after end of compressed stream')
- else:
- self.hmac_checked = True
- return b''
- elif not buf:
- return b''
-
- buf = self._decrypt(buf)
- if not self.decomp:
- break
-
- buf = decompress(self.decomp, buf)
-
- self.hmac.update(buf)
- return buf
-
- def close(self, *a, **kw):
- self.fh.close(*a, **kw)
-
- def __enter__(self):
- return self
-
- def __exit__(self, *a):
- self.close()
- return False
-
-
def decompress(decomp, buf):
'''Decompress *buf* using *decomp*
diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py
index 99ba0ad..1c4719d 100644
--- a/src/s3ql/backends/local.py
+++ b/src/s3ql/backends/local.py
@@ -10,13 +10,12 @@ from ..logging import logging # Ensure use of custom logger class
from .. import BUFSIZE
from ..inherit_docstrings import (copy_ancestor_docstring, ABCDocstMeta)
from .common import (AbstractBackend, DanglingStorageURLError, NoSuchObject,
- CorruptedObjectError, safe_unpickle_fh, ThawError,
- freeze_basic_mapping, thaw_basic_mapping)
+ CorruptedObjectError)
+from ..common import ThawError, freeze_basic_mapping, thaw_basic_mapping
import _thread
import struct
import io
import os
-import pickle
import shutil
log = logging.getLogger(__name__)
@@ -241,19 +240,14 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
def _read_meta(fh):
buf = fh.read(9)
- if buf.startswith(b's3ql_1\n'):
- len_ = struct.unpack('<H', buf[-2:])[0]
- try:
- return thaw_basic_mapping(fh.read(len_))
- except ThawError:
- raise CorruptedObjectError('Invalid metadata')
- else:
- fh.seek(0)
- try:
- return safe_unpickle_fh(fh, encoding='latin1')
- except pickle.UnpicklingError as exc:
- raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
-
+ if not buf.startswith(b's3ql_1\n'):
+ raise CorruptedObjectError('Invalid object header: %r' % buf)
+
+ len_ = struct.unpack('<H', buf[-2:])[0]
+ try:
+ return thaw_basic_mapping(fh.read(len_))
+ except ThawError:
+ raise CorruptedObjectError('Invalid metadata')
def escape(s):
'''Escape '/', '=' and '.' in s'''
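The new on-disk layout implied by _read_meta() is a 7-byte magic, a 2-byte little-endian length, and the freeze_basic_mapping() output. The write side below is inferred from the read path shown above and is only a sketch:

    import struct
    from s3ql.common import freeze_basic_mapping, thaw_basic_mapping

    def write_meta_sketch(fh, metadata):
        buf = freeze_basic_mapping(metadata)
        fh.write(b's3ql_1\n' + struct.pack('<H', len(buf)) + buf)

    def read_meta_sketch(fh):
        head = fh.read(9)                       # b's3ql_1\n' plus 2 length bytes
        assert head.startswith(b's3ql_1\n')
        (len_,) = struct.unpack('<H', head[-2:])
        return thaw_basic_mapping(fh.read(len_))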
diff --git a/src/s3ql/backends/s3c.py b/src/s3ql/backends/s3c.py
index 4f888d8..910f574 100644
--- a/src/s3ql/backends/s3c.py
+++ b/src/s3ql/backends/s3c.py
@@ -10,7 +10,7 @@ from ..logging import logging, QuietError # Ensure use of custom logger class
from .. import BUFSIZE
from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError,
AuthenticationError, DanglingStorageURLError, retry_generator,
- get_proxy, get_ssl_context, CorruptedObjectError, safe_unpickle,
+ get_proxy, get_ssl_context, CorruptedObjectError,
checksum_basic_mapping)
from ..inherit_docstrings import (copy_ancestor_docstring, prepend_ancestor_docstring,
ABCDocstMeta)
@@ -32,7 +32,6 @@ import re
import tempfile
import time
import urllib.parse
-import pickle
C_DAY_NAMES = [ 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun' ]
C_MONTH_NAMES = [ 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec' ]
@@ -86,7 +85,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
# NOTE: ! This function is also used by the swift backend !
@copy_ancestor_docstring
def reset(self):
- if self.conn.response_pending() or self.conn._out_remaining:
+ if (self.conn is not None and
+ (self.conn.response_pending() or self.conn._out_remaining)):
log.debug('Resetting state of http connection %d', id(self.conn))
self.conn.disconnect()
@@ -123,7 +123,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
conn = HTTPConnection(self.hostname, self.port, proxy=self.proxy,
ssl_context=self.ssl_context)
- conn.timeout = int(self.options.get('tcp-timeout', 10))
+ conn.timeout = int(self.options.get('tcp-timeout', 20))
return conn
# This method is also used implicitly for the retry handling of
@@ -194,13 +194,15 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
@retry
@copy_ancestor_docstring
- def delete(self, key, force=False):
+ def delete(self, key, force=False, is_retry=False):
log.debug('started with %s', key)
try:
resp = self._do_request('DELETE', '/%s%s' % (self.prefix, key))
self._assert_empty_response(resp)
except NoSuchKeyError:
- if force:
+ # Server may have deleted the object even though we did not
+ # receive the response.
+ if force or is_retry:
pass
else:
raise NoSuchObject(key)
@@ -522,7 +524,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
raise HTTPError(resp.status, resp.reason, resp.headers)
# If not XML, do the best we can
- if not XML_CONTENT_RE.match(content_type):
+ if not XML_CONTENT_RE.match(content_type) or resp.length == 0:
self.conn.discard()
raise HTTPError(resp.status, resp.reason, resp.headers)
@@ -709,33 +711,8 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
'''Extract metadata from HTTP response object'''
format_ = resp.headers.get('%smeta-format' % self.hdr_prefix, 'raw')
- if format_ in ('raw', 'pickle'):
- meta = CaseInsensitiveDict()
- pattern = re.compile(r'^%smeta-(.+)$' % re.escape(self.hdr_prefix),
- re.IGNORECASE)
- for fname in resp.headers:
- hit = pattern.search(fname)
- if hit:
- meta[hit.group(1)] = resp.headers[fname]
-
- if format_ == 'raw':
- return meta
-
- # format_ == pickle
- buf = ''.join(meta[x] for x in sorted(meta) if x.lower().startswith('data-'))
- if 'md5' in meta and md5sum_b64(buf.encode('us-ascii')) != meta['md5']:
- log.warning('MD5 mismatch in metadata for %s', obj_key)
- raise BadDigestError('BadDigest', 'Meta MD5 for %s does not match' % obj_key)
- try:
- return safe_unpickle(b64decode(buf), encoding='latin1')
- except binascii.Error:
- raise CorruptedObjectError('Corrupted metadata, b64decode failed')
- except pickle.UnpicklingError as exc:
- raise CorruptedObjectError('Corrupted metadata, pickle says: %s' % exc)
-
- elif format_ != 'raw2': # Current
- raise RuntimeError('Unknown metadata format %s for key %s'
- % (format_, obj_key))
+ if format_ != 'raw2': # Current
+ raise CorruptedObjectError('Invalid metadata format: %s' % format_)
parts = []
for i in count():
diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py
index a02ca24..8b761b6 100644
--- a/src/s3ql/backends/swift.py
+++ b/src/s3ql/backends/swift.py
@@ -151,7 +151,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
with HTTPConnection(self.hostname, self.port, proxy=self.proxy,
ssl_context=ssl_context) as conn:
- conn.timeout = int(self.options.get('tcp-timeout', 10))
+ conn.timeout = int(self.options.get('tcp-timeout', 20))
for auth_path in ('/v1.0', '/auth/v1.0'):
log.debug('GET %s', auth_path)
@@ -184,7 +184,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy,
ssl_context=ssl_context)
- conn.timeout = int(self.options.get('tcp-timeout', 10))
+ conn.timeout = int(self.options.get('tcp-timeout', 20))
return conn
raise RuntimeError('No valid authentication path found')
@@ -379,7 +379,7 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
@retry
@copy_ancestor_docstring
- def delete(self, key, force=False):
+ def delete(self, key, force=False, is_retry=False):
if key.endswith(TEMP_SUFFIX):
raise ValueError('Keys must not end with %s' % TEMP_SUFFIX)
log.debug('started with %s', key)
@@ -387,13 +387,22 @@ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
resp = self._do_request('DELETE', '/%s%s' % (self.prefix, key))
self._assert_empty_response(resp)
except HTTPError as exc:
- if exc.status == 404 and not force:
+ # Server may have deleted the object even though we did not
+ # receive the response.
+ if exc.status == 404 and not (force or is_retry):
raise NoSuchObject(key)
elif exc.status != 404:
raise
- # Temporary hack. I promise to make this nicer by the next
- # release :-).
+ # We cannot wrap the entire copy() method into a retry() decorator, because
+ # copy() issues multiple requests. If the server happens to regularly close
+ # the connection after a request (even though it was processed correctly),
+ # we'd never make any progress if we always restart from the first request.
+ # We experimented with adding a retry(fn, args, kwargs) function to wrap
+ # individual calls, but this doesn't really improve the code because we
+ # typically also have to wrap e.g. a discard() or assert_empty() call, so we
+ # end up with having to create an additional function anyway. Having
+ # anonymous blocks in Python would be very nice here.
@retry
def _copy_helper(self, method, path, headers):
self._do_request(method, path, headers=headers)
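A sketch of how copy() might call the retried helper so that only the individual HTTP request is repeated rather than the whole multi-request copy() sequence. The X-Copy-From header is the standard Swift server-side copy mechanism, but the attribute names (self.container_name, self.prefix) and the request body are assumptions, not taken from the diff:

    def copy_sketch(self, src, dest):
        headers = {'X-Copy-From': '/%s/%s%s' % (self.container_name, self.prefix, src)}
        # _copy_helper is decorated with @retry, so a dropped connection only
        # repeats this single PUT instead of restarting copy() from scratch.
        self._copy_helper('PUT', '/%s%s' % (self.prefix, dest), headers)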
diff --git a/src/s3ql/backends/swiftks.py b/src/s3ql/backends/swiftks.py
index 2492c0a..1af7577 100644
--- a/src/s3ql/backends/swiftks.py
+++ b/src/s3ql/backends/swiftks.py
@@ -85,7 +85,7 @@ class Backend(swift.Backend):
with HTTPConnection(self.hostname, port=self.port, proxy=self.proxy,
ssl_context=ssl_context) as conn:
- conn.timeout = int(self.options.get('tcp-timeout', 10))
+ conn.timeout = int(self.options.get('tcp-timeout', 20))
conn.send_request('POST', '/v2.0/tokens', headers=headers,
body=json.dumps(auth_body).encode('utf-8'))
@@ -122,7 +122,7 @@ class Backend(swift.Backend):
conn = HTTPConnection(o.hostname, o.port, proxy=self.proxy,
ssl_context=ssl_context)
- conn.timeout = int(self.options.get('tcp-timeout', 10))
+ conn.timeout = int(self.options.get('tcp-timeout', 20))
return conn
if len(avail_regions) < 10:
diff --git a/src/s3ql/common.py b/src/s3ql/common.py
index 61af937..3514bba 100644
--- a/src/s3ql/common.py
+++ b/src/s3ql/common.py
@@ -8,12 +8,11 @@ This program can be distributed under the terms of the GNU GPLv3.
from .logging import logging, QuietError # Ensure use of custom logger class
from . import BUFSIZE, CTRL_NAME, ROOT_INODE
-from .backends import prefix_map
-from .backends.common import (CorruptedObjectError, NoSuchObject, AuthenticationError,
- DanglingStorageURLError, AuthorizationError)
-from .backends.comprenc import ComprencBackend
from dugong import HostnameNotResolvable
from getpass import getpass
+from ast import literal_eval
+from base64 import b64decode, b64encode
+import binascii
import configparser
import re
import stat
@@ -22,7 +21,7 @@ import traceback
import sys
import os
import subprocess
-import bz2
+import pickle
import errno
import hashlib
import llfuse
@@ -41,16 +40,13 @@ def bytes2path(s):
def get_seq_no(backend):
'''Get current metadata sequence number'''
+ from .backends.common import NoSuchObject
+
seq_nos = list(backend.list('s3ql_seq_no_'))
if not seq_nos:
# Maybe list result is outdated
seq_nos = [ 's3ql_seq_no_1' ]
- if (seq_nos[0].endswith('.meta')
- or seq_nos[0].endswith('.dat')):
- raise QuietError('Old file system revision, please run `s3qladm upgrade` first.',
- exitcode=32)
-
seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in seq_nos ]
seq_no = max(seq_nos)
@@ -73,36 +69,6 @@ def get_seq_no(backend):
return seq_no
-def stream_write_bz2(ifh, ofh):
- '''Compress *ifh* into *ofh* using bz2 compression'''
-
- compr = bz2.BZ2Compressor(9)
- while True:
- buf = ifh.read(BUFSIZE)
- if not buf:
- break
- buf = compr.compress(buf)
- if buf:
- ofh.write(buf)
- buf = compr.flush()
- if buf:
- ofh.write(buf)
-
-def stream_read_bz2(ifh, ofh):
- '''Uncompress bz2 compressed *ifh* into *ofh*'''
-
- decompressor = bz2.BZ2Decompressor()
- while True:
- buf = ifh.read(BUFSIZE)
- if not buf:
- break
- buf = decompressor.decompress(buf)
- if buf:
- ofh.write(buf)
-
- if decompressor.unused_data or ifh.read(1) != b'':
- raise CorruptedObjectError('Data after end of bz2 stream')
-
def is_mounted(storage_url):
'''Try to determine if *storage_url* is mounted
@@ -282,25 +248,33 @@ def assert_s3ql_mountpoint(mountpoint):
return ctrlfile
-def get_backend(options, plain=False):
+def get_backend(options, raw=False):
'''Return backend for given storage-url
- If *plain* is true, don't attempt to unlock and don't wrap into
+ If *raw* is true, don't attempt to unlock and don't wrap into
ComprencBackend.
'''
- return get_backend_factory(options, plain)()
+ return get_backend_factory(options.storage_url, options.backend_options,
+ options.authfile,
+ getattr(options, 'compress', ('lzma', 2)), raw)()
-def get_backend_factory(options, plain=False):
+def get_backend_factory(storage_url, backend_options, authfile,
+ compress=('lzma', 2), raw=False):
'''Return factory producing backend objects for given storage-url
- If *plain* is true, don't attempt to unlock and don't wrap into
+ If *raw* is true, don't attempt to unlock and don't wrap into
ComprencBackend.
'''
- hit = re.match(r'^([a-zA-Z0-9]+)://', options.storage_url)
+ from .backends import prefix_map
+ from .backends.common import (CorruptedObjectError, NoSuchObject, AuthenticationError,
+ DanglingStorageURLError, AuthorizationError)
+ from .backends.comprenc import ComprencBackend
+
+ hit = re.match(r'^([a-zA-Z0-9]+)://', storage_url)
if not hit:
- raise QuietError('Unable to parse storage url "%s"' % options.storage_url,
+ raise QuietError('Unable to parse storage url "%s"' % storage_url,
exitcode=2)
backend = hit.group(1)
@@ -310,19 +284,19 @@ def get_backend_factory(options, plain=False):
raise QuietError('No such backend: %s' % backend, exitcode=11)
# Validate backend options
- for opt in options.backend_options.keys():
+ for opt in backend_options.keys():
if opt not in backend_class.known_options:
raise QuietError('Unknown backend option: %s' % opt,
exitcode=3)
# Read authfile
config = configparser.ConfigParser()
- if os.path.isfile(options.authfile):
- mode = os.stat(options.authfile).st_mode
+ if os.path.isfile(authfile):
+ mode = os.stat(authfile).st_mode
if mode & (stat.S_IRGRP | stat.S_IROTH):
- raise QuietError("%s has insecure permissions, aborting." % options.authfile,
+ raise QuietError("%s has insecure permissions, aborting." % authfile,
exitcode=12)
- config.read(options.authfile)
+ config.read(authfile)
backend_login = None
backend_passphrase = None
@@ -336,16 +310,12 @@ def get_backend_factory(options, plain=False):
pattern = getopt('storage-url')
- if not pattern or not options.storage_url.startswith(pattern):
+ if not pattern or not storage_url.startswith(pattern):
continue
backend_login = getopt('backend-login') or backend_login
backend_passphrase = getopt('backend-password') or backend_passphrase
fs_passphrase = getopt('fs-passphrase') or fs_passphrase
- if getopt('fs-passphrase') is None and getopt('bucket-passphrase') is not None:
- fs_passphrase = getopt('bucket-passphrase')
- log.warning("Warning: the 'bucket-passphrase' configuration option has been "
- "renamed to 'fs-passphrase'! Please update your authinfo file.")
if not backend_login and backend_class.needs_login:
if sys.stdin.isatty():
@@ -361,8 +331,8 @@ def get_backend_factory(options, plain=False):
backend = None
try:
- backend = backend_class(options.storage_url, backend_login, backend_passphrase,
- options.backend_options)
+ backend = backend_class(storage_url, backend_login, backend_passphrase,
+ backend_options)
# Do not use backend.lookup(), this would use a HEAD request and
# not provide any useful error messages if something goes wrong
@@ -394,9 +364,9 @@ def get_backend_factory(options, plain=False):
if backend is not None:
backend.close()
- if plain:
- return lambda: backend_class(options.storage_url, backend_login, backend_passphrase,
- options.backend_options)
+ if raw:
+ return lambda: backend_class(storage_url, backend_login, backend_passphrase,
+ backend_options)
if encrypted and not fs_passphrase:
if sys.stdin.isatty():
@@ -409,15 +379,10 @@ def get_backend_factory(options, plain=False):
if fs_passphrase is not None:
fs_passphrase = fs_passphrase.encode('utf-8')
- if hasattr(options, 'compress'):
- compress = options.compress
- else:
- compress = ('lzma', 2)
-
if not encrypted:
return lambda: ComprencBackend(None, compress,
- backend_class(options.storage_url, backend_login,
- backend_passphrase, options.backend_options))
+ backend_class(storage_url, backend_login,
+ backend_passphrase, backend_options))
with ComprencBackend(fs_passphrase, compress, backend) as tmp_backend:
try:
@@ -429,8 +394,8 @@ def get_backend_factory(options, plain=False):
# passphrase in every backend object.
def factory():
b = ComprencBackend(data_pw, compress,
- backend_class(options.storage_url, backend_login,
- backend_passphrase, options.backend_options))
+ backend_class(storage_url, backend_login,
+ backend_passphrase, backend_options))
b.fs_passphrase = fs_passphrase
return b
@@ -538,7 +503,7 @@ def split_by_n(seq, n):
while seq:
yield seq[:n]
seq = seq[n:]
-
+
def handle_on_return(fn):
'''Provide fresh ExitStack instance in `on_return` argument'''
@functools.wraps(fn)
@@ -548,3 +513,109 @@ def handle_on_return(fn):
kw['on_return'] = on_return
return fn(*a, **kw)
return wrapper
+
+def parse_literal(buf, type_spec):
+ '''Try to parse *buf* as *type_spec*
+
+ Raise `ValueError` if *buf* does not contain a valid
+ Python literal, or if the literal does not correspond
+ to *type_spec*.
+
+ Example use::
+
+ buf = b"[1, 'a', 3]"
+ parse_literal(buf, [int, str, int])
+
+ '''
+
+ try:
+ obj = literal_eval(buf.decode())
+ except UnicodeDecodeError:
+ raise ValueError('unable to decode as utf-8')
+ except (ValueError, SyntaxError):
+ raise ValueError('unable to parse as python literal')
+
+ if (isinstance(type_spec, list) and type(obj) == list
+ and [ type(x) for x in obj ] == type_spec):
+ return obj
+ elif (isinstance(type_spec, tuple) and type(obj) == tuple
+ and [ type(x) for x in obj ] == list(type_spec)):
+ return obj
+ elif type(obj) == type_spec:
+ return obj
+
+ raise ValueError('literal has wrong type')
+
+class ThawError(Exception):
+ def __str__(self):
+ return 'Malformed serialization data'
+
+def thaw_basic_mapping(buf):
+ '''Reconstruct dict from serialized representation
+
+ *buf* must be a bytes-like object as created by
+ `freeze_basic_mapping`. Raises `ThawError` if *buf* is not a valid
+ representation.
+
+ This procedure is safe even if *buf* comes from an untrusted source.
+ '''
+
+ try:
+ d = literal_eval(buf.decode('utf-8'))
+ except (UnicodeDecodeError, SyntaxError, ValueError):
+ raise ThawError()
+
+ # Decode bytes values
+ for (k,v) in d.items():
+ if not isinstance(v, bytes):
+ continue
+ try:
+ d[k] = b64decode(v)
+ except binascii.Error:
+ raise ThawError()
+
+ return d
+
+def freeze_basic_mapping(d):
+ '''Serialize mapping of elementary types
+
+ Keys of *d* must be strings. Values of *d* must be of elementary type (i.e.,
+ `str`, `bytes`, `int`, `float`, `complex`, `bool` or None).
+
+ The output is a bytestream that can be used to reconstruct the mapping. The
+ bytestream is not guaranteed to be deterministic. Look at
+ `checksum_basic_mapping` if you need a deterministic bytestream.
+ '''
+
+ els = []
+ for (k,v) in d.items():
+ if not isinstance(k, str):
+ raise ValueError('key %s must be str, not %s' % (k, type(k)))
+
+ if (not isinstance(v, (str, bytes, bytearray, int, float, complex, bool))
+ and v is not None):
+ raise ValueError('value for key %s (%s) is not elementary' % (k, v))
+
+ # To avoid wasting space, we b64encode non-ascii byte values.
+ if isinstance(v, (bytes, bytearray)):
+ v = b64encode(v)
+
+ # This should be a pretty safe assumption for elementary types, but we
+ # add an assert just to be safe (Python docs just say that repr makes
+ # "best effort" to produce something parseable)
+ (k_repr, v_repr) = (repr(k), repr(v))
+ assert (literal_eval(k_repr), literal_eval(v_repr)) == (k, v)
+
+ els.append(('%s: %s' % (k_repr, v_repr)))
+
+ buf = '{ %s }' % ', '.join(els)
+ return buf.encode('utf-8')
+
+def load_params(fname):
+ with open(fname, 'rb') as fh:
+ proto = fh.read(2)
+ fh.seek(0)
+ if proto == b'\x80\x02':
+ return pickle.load(fh)
+ else:
+ return thaw_basic_mapping(fh.read())
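A round-trip sketch for the serialization helpers that replace pickle for the .params files and object metadata; the parameter values are just examples:

    import os
    import tempfile
    from s3ql.common import freeze_basic_mapping, thaw_basic_mapping, load_params

    param = {'seq_no': 42, 'revision': 22, 'needs_fsck': False, 'key': b'\x00\xff'}
    buf = freeze_basic_mapping(param)   # ASCII repr; bytes values are b64-encoded
    assert thaw_basic_mapping(buf) == param

    # load_params() still accepts legacy pickled .params files (pickle protocol 2
    # starts with b'\x80\x02') and otherwise falls back to thaw_basic_mapping().
    with tempfile.NamedTemporaryFile(delete=False) as fh:
        fh.write(buf)
    assert load_params(fh.name) == param
    os.unlink(fh.name)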
diff --git a/src/s3ql/cp.py b/src/s3ql/cp.py
index 7b1e4d5..38118cf 100644
--- a/src/s3ql/cp.py
+++ b/src/s3ql/cp.py
@@ -8,11 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
from .logging import logging, setup_logging, QuietError
from .common import assert_fs_owner
-from . import PICKLE_PROTOCOL
from .parse_args import ArgumentParser
import llfuse
import os
-import pickle
import stat
import sys
import textwrap
@@ -88,9 +86,8 @@ def main(args=None):
raise QuietError('No permission to create target directory')
fstat_t = os.stat(options.target)
- llfuse.setxattr(ctrlfile, 'copy', pickle.dumps((fstat_s.st_ino, fstat_t.st_ino),
- PICKLE_PROTOCOL))
-
+ llfuse.setxattr(ctrlfile, 'copy',
+ ('(%d, %d)' % (fstat_s.st_ino, fstat_t.st_ino)).encode())
if __name__ == '__main__':
main(sys.argv[1:])
diff --git a/src/s3ql/ctrl.py b/src/s3ql/ctrl.py
index ca667f1..dfc39af 100644
--- a/src/s3ql/ctrl.py
+++ b/src/s3ql/ctrl.py
@@ -8,10 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from .logging import logging, setup_logging
from .common import assert_fs_owner
-from . import PICKLE_PROTOCOL
from .parse_args import ArgumentParser
import llfuse
-import pickle
import sys
import textwrap
@@ -69,14 +67,8 @@ def parse_args(args):
if not options.modules:
options.modules = [ 'all' ]
- if options.level:
- # Protected member ok, hopefully this won't break
- #pylint: disable=W0212
- options.level = logging._levelNames[options.level.upper()]
-
return options
-
def main(args=None):
'''Control a mounted S3QL File System.'''
@@ -97,15 +89,11 @@ def main(args=None):
llfuse.setxattr(ctrlfile, 'upload-meta', b'dummy')
elif options.action == 'log':
- llfuse.setxattr(ctrlfile, 'logging',
- pickle.dumps((options.level, options.modules),
- PICKLE_PROTOCOL))
+ cmd = ('(%r, %r)' % (options.level, ','.join(options.modules))).encode()
+ llfuse.setxattr(ctrlfile, 'logging', cmd)
elif options.action == 'cachesize':
- llfuse.setxattr(ctrlfile, 'cachesize',
- pickle.dumps(options.cachesize * 1024, PICKLE_PROTOCOL))
-
-
+ llfuse.setxattr(ctrlfile, 'cachesize', ('%d' % (options.cachesize * 1024)).encode())
if __name__ == '__main__':
main(sys.argv[1:])
diff --git a/src/s3ql/fs.py b/src/s3ql/fs.py
index b54936b..70c5d43 100644
--- a/src/s3ql/fs.py
+++ b/src/s3ql/fs.py
@@ -7,9 +7,9 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from .logging import logging
-from . import deltadump, CTRL_NAME, CTRL_INODE, PICKLE_PROTOCOL
+from . import deltadump, CTRL_NAME, CTRL_INODE
from .backends.common import NoSuchObject, CorruptedObjectError
-from .common import get_path
+from .common import get_path, parse_literal
from .database import NoSuchRowError
from .inode_cache import OutOfInodesError
from io import BytesIO
@@ -19,7 +19,6 @@ import errno
import llfuse
import math
import os
-import pickle
import stat
import struct
import time
@@ -207,7 +206,7 @@ class Operations(llfuse.Operations):
# Handle S3QL commands
if id_ == CTRL_INODE:
if name == b's3ql_pid?':
- return pickle.dumps(os.getpid(), PICKLE_PROTOCOL)
+ return ('%d' % os.getpid()).encode()
elif name == b's3qlstat':
return self.extstat()
@@ -242,22 +241,55 @@ class Operations(llfuse.Operations):
if id_ == CTRL_INODE:
if name == b's3ql_flushcache!':
self.cache.clear()
+
elif name == b'copy':
- self.copy_tree(*pickle.loads(value))
+ try:
+ tup = parse_literal(value, (int, int))
+ except ValueError:
+ log.warning('Received malformed command via control inode')
+ raise llfuse.FUSEError(errno.EINVAL)
+ self.copy_tree(*tup)
+
elif name == b'upload-meta':
if self.upload_event is not None:
self.upload_event.set()
else:
raise llfuse.FUSEError(errno.ENOTTY)
+
elif name == b'lock':
- self.lock_tree(*pickle.loads(value))
+ try:
+ id_ = parse_literal(value, int)
+ except ValueError:
+ log.warning('Received malformed command via control inode')
+ raise llfuse.FUSEError(errno.EINVAL)
+ self.lock_tree(id_)
+
elif name == b'rmtree':
- self.remove_tree(*pickle.loads(value))
+ try:
+ tup = parse_literal(value, (int, bytes))
+ except ValueError:
+ log.warning('Received malformed command via control inode')
+ raise llfuse.FUSEError(errno.EINVAL)
+ self.remove_tree(*tup)
+
elif name == b'logging':
- update_logging(*pickle.loads(value))
+ try:
+ (lvl, modules) = parse_literal(value, (str, str))
+ lvl = logging._nameToLevel[lvl.upper()]
+ except (ValueError, KeyError):
+ log.warning('Received malformed command via control inode')
+ raise llfuse.FUSEError(errno.EINVAL)
+ update_logging(lvl, modules.split(',') if modules else None)
+
elif name == b'cachesize':
- self.cache.max_size = pickle.loads(value)
+ try:
+ self.cache.cache.max_size = parse_literal(value, int)
+ except ValueError:
+ log.warning('Received malformed command via control inode')
+ raise llfuse.FUSEError(errno.EINVAL)
+
else:
+ log.warning('Received unknown command via control inode')
raise llfuse.FUSEError(errno.EINVAL)
# http://code.google.com/p/s3ql/issues/detail?id=385
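
All control-inode handlers now validate their payload with parse_literal() before acting on it. parse_literal() itself is part of s3ql/common.py and not included in this diff; the sketch below shows one plausible shape of such a type-checked literal parser and how the b'copy' payload written by cp.py would pass through it. It is an assumption about the contract, not the upstream code:

    from ast import literal_eval

    def parse_literal_sketch(buf, types):
        '''Evaluate *buf* as a Python literal and check it against *types*.

        *types* is a single type or a tuple of types (for a tuple literal of
        the same length). Raises ValueError on any mismatch.
        '''
        try:
            obj = literal_eval(buf.decode('utf-8'))
        except (UnicodeDecodeError, SyntaxError, ValueError):
            raise ValueError('malformed literal: %r' % buf)
        if isinstance(types, tuple):
            if (not isinstance(obj, tuple) or len(obj) != len(types)
                    or any(not isinstance(o, t) for (o, t) in zip(obj, types))):
                raise ValueError('unexpected type or shape: %r' % (obj,))
        elif not isinstance(obj, types):
            raise ValueError('unexpected type: %r' % (obj,))
        return obj

    # What cp.py writes into the 'copy' xattr, and what fs.py expects back:
    value = ('(%d, %d)' % (4, 7)).encode()
    assert parse_literal_sketch(value, (int, int)) == (4, 7)
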
diff --git a/src/s3ql/fsck.py b/src/s3ql/fsck.py
index b7a1094..cbec005 100644
--- a/src/s3ql/fsck.py
+++ b/src/s3ql/fsck.py
@@ -7,27 +7,25 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from .logging import logging, setup_logging, QuietError
-from . import CURRENT_FS_REV, BUFSIZE, CTRL_INODE, PICKLE_PROTOCOL, ROOT_INODE
+from . import CURRENT_FS_REV, BUFSIZE, CTRL_INODE, ROOT_INODE
from .backends.common import NoSuchObject
from .backends.comprenc import ComprencBackend
from .backends.local import Backend as LocalBackend
from .common import (inode_for_path, sha256_fh, get_path, get_backend_cachedir,
- get_seq_no, stream_write_bz2, stream_read_bz2, is_mounted,
- get_backend, pretty_print_size)
+ get_seq_no, is_mounted, get_backend, load_params,
+ freeze_basic_mapping)
from .database import NoSuchRowError, Connection
-from .metadata import restore_metadata, cycle_metadata, dump_metadata, create_tables
+from .metadata import create_tables, dump_and_upload_metadata, download_metadata
from .parse_args import ArgumentParser
from os.path import basename
import apsw
import os
-import pickle
import hashlib
import re
import shutil
import itertools
import stat
import sys
-import tempfile
import textwrap
import time
import atexit
@@ -267,7 +265,7 @@ class Fsck(object):
(id_p_new, newname) = self.resolve_free(b"/lost+found", newname)
self.log_error('Content entry for inode %d refers to non-existing name with id %d, '
- 'moving to /lost+found/%s', inode, name_id, newname)
+ 'moving to /lost+found/%s', inode, name_id, to_str(newname))
self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? WHERE rowid=?',
(self._add_name(newname), id_p_new, rowid))
@@ -287,7 +285,8 @@ class Fsck(object):
(id_p_new, newname) = self.resolve_free(b"/lost+found",
('[%d]-%s' % (inode_p, name)).encode())
- self.log_error('Parent inode %d for "%s" vanished, moving to /lost+found', inode_p, name)
+ self.log_error('Parent inode %d for "%s" vanished, moving to /lost+found',
+ inode_p, to_str(name))
self._del_name(name_id)
self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? WHERE rowid=?',
(self._add_name(newname), id_p_new, rowid))
@@ -308,7 +307,7 @@ class Fsck(object):
except NoSuchRowError:
path = '[inode %d, parent %d]' % (inode, inode_p)
- self.log_error('Inode for %s vanished, deleting', path)
+ self.log_error('Inode for %s vanished, deleting', to_str(path))
self._del_name(name_id)
to_delete.append(rowid)
@@ -329,7 +328,7 @@ class Fsck(object):
'FROM contents_v WHERE inode=?', (inode,)):
path = get_path(id_p, self.conn, name)
self.log_error('Extended attribute %d of %s refers to non-existing name %d, renaming..',
- rowid, path, name_id)
+ rowid, to_str(path), name_id)
while True:
name_id = self._add_name('lost+found_%d' % rowid)
@@ -404,7 +403,7 @@ class Fsck(object):
(id_p, name) = self.resolve_free(b"/lost+found", name)
self.log_error("Found unreachable filesystem entries, re-anchoring %s [%d] "
- "in /lost+found", name, inode)
+ "in /lost+found", to_str(name), inode)
self.conn.execute('UPDATE contents SET parent_inode=?, name_id=? '
'WHERE inode=? AND name_id=?',
(id_p, self._add_name(name), inode, name_id))
@@ -441,7 +440,7 @@ class Fsck(object):
self.found_errors = True
self.log_error("Size of inode %d (%s) does not agree with number of blocks, "
"setting from %d to %d",
- id_, get_path(id_, self.conn), size_old, size)
+ id_, to_str(get_path(id_, self.conn)), size_old, size)
self.conn.execute("UPDATE inodes SET size=? WHERE id=?", (size, id_))
finally:
self.conn.execute('DROP TABLE min_sizes')
@@ -473,14 +472,15 @@ class Fsck(object):
self.found_errors = True
if cnt is None:
(id_p, name) = self.resolve_free(b"/lost+found", ("inode-%d" % id_).encode())
- self.log_error("Inode %d not referenced, adding as /lost+found/%s", id_, name)
+ self.log_error("Inode %d not referenced, adding as /lost+found/%s",
+ id_, to_str(name))
self.conn.execute("INSERT INTO contents (name_id, inode, parent_inode) "
"VALUES (?,?,?)", (self._add_name(basename(name)), id_, id_p))
self.conn.execute("UPDATE inodes SET refcount=? WHERE id=?", (1, id_))
else:
self.log_error("Inode %d (%s) has wrong reference count, setting from %d to %d",
- id_, get_path(id_, self.conn), cnt_old, cnt)
+ id_, to_str(get_path(id_, self.conn)), cnt_old, cnt)
self.conn.execute("UPDATE inodes SET refcount=? WHERE id=?", (cnt, id_))
finally:
self.conn.execute('DROP TABLE refcounts')
@@ -504,7 +504,7 @@ class Fsck(object):
for (name, name_id, id_p) in self.conn.query('SELECT name, name_id, parent_inode '
'FROM contents_v WHERE inode=?', (inode,)):
path = get_path(id_p, self.conn, name)
- self.log_error("File may lack data, moved to /lost+found: %s", path)
+ self.log_error("File may lack data, moved to /lost+found: %s", to_str(path))
(lof_id, newname) = self.resolve_free(b"/lost+found", escape(path))
self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? '
@@ -555,7 +555,7 @@ class Fsck(object):
for (name, name_id, id_p) in self.conn.query('SELECT name, name_id, parent_inode '
'FROM contents_v WHERE inode=?', (inode,)):
path = get_path(id_p, self.conn, name)
- self.log_error("File may lack data, moved to /lost+found: %s", path)
+ self.log_error("File may lack data, moved to /lost+found: %s", to_str(path))
(lof_id, newname) = self.resolve_free(b"/lost+found", escape(path))
self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? '
@@ -619,7 +619,8 @@ class Fsck(object):
elif cnt is None:
self.found_errors = True
(id_p, name) = self.resolve_free(b"/lost+found", ("block-%d" % id_).encode())
- self.log_error("Block %d not referenced, adding as /lost+found/%s", id_, name)
+ self.log_error("Block %d not referenced, adding as /lost+found/%s",
+ id_, to_str(name))
timestamp = time.time()
size = self.conn.get_val('SELECT size FROM blocks WHERE id=?', (id_,))
inode = self.create_inode(mode=stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
@@ -751,20 +752,20 @@ class Fsck(object):
self.found_errors = True
self.log_error('Inode %d (%s): directory entry has no type, changed '
- 'to %s.', inode, get_path(inode, self.conn), made_to)
+ 'to %s.', inode, to_str(get_path(inode, self.conn)), made_to)
self.conn.execute('UPDATE inodes SET mode=? WHERE id=?', (mode, inode))
if stat.S_ISLNK(mode) and target is None:
self.found_errors = True
self.log_error('Inode %d (%s): symlink does not have target. '
'This is probably going to confuse your system!',
- inode, get_path(inode, self.conn))
+ inode, to_str(get_path(inode, self.conn)))
if stat.S_ISLNK(mode) and target is not None and size != len(target):
self.found_errors = True
self.log_error('Inode %d (%s): symlink size (%d) does not agree with target '
'length (%d). This is probably going to confuse your system!',
- inode, get_path(inode, self.conn), size, len(target))
+ inode, to_str(get_path(inode, self.conn)), size, len(target))
if size != 0 and (not stat.S_ISREG(mode)
and not stat.S_ISLNK(mode)
@@ -772,33 +773,33 @@ class Fsck(object):
self.found_errors = True
self.log_error('Inode %d (%s) is not a regular file but has non-zero size. '
'This may confuse your system!',
- inode, get_path(inode, self.conn))
+ inode, to_str(get_path(inode, self.conn)))
if target is not None and not stat.S_ISLNK(mode):
self.found_errors = True
self.log_error('Inode %d (%s) is not symlink but has symlink target. '
'This is probably going to confuse your system!',
- inode, get_path(inode, self.conn))
+ inode, to_str(get_path(inode, self.conn)))
if rdev != 0 and not (stat.S_ISBLK(mode) or stat.S_ISCHR(mode)):
self.found_errors = True
self.log_error('Inode %d (%s) is not device but has device number. '
'This is probably going to confuse your system!',
- inode, get_path(inode, self.conn))
+ inode, to_str(get_path(inode, self.conn)))
if has_children and not stat.S_ISDIR(mode):
self.found_errors = True
self.log_error('Inode %d (%s) is not a directory but has child entries. '
'This is probably going to confuse your system!',
- inode, get_path(inode, self.conn))
+ inode, to_str(get_path(inode, self.conn)))
if (not stat.S_ISREG(mode) and
self.conn.has_val('SELECT 1 FROM inode_blocks WHERE inode=?', (inode,))):
self.found_errors = True
self.log_error('Inode %d (%s) is not a regular file but has data blocks. '
'This is probably going to confuse your system!',
- inode, get_path(inode, self.conn))
+ inode, to_str(get_path(inode, self.conn)))
@@ -808,7 +809,8 @@ class Fsck(object):
'WHERE LENGTH(name) > 255'):
path = get_path(id_p, self.conn, name)
self.log_error('Entry name %s... in %s has more than 255 characters, '
- 'this could cause problems', name[:40], path[:-len(name)])
+ 'this could cause problems', to_str(name[:40]),
+ to_str(path[:-len(name)]))
self.found_errors = True
def check_objects_refcount(self):
@@ -947,7 +949,7 @@ class Fsck(object):
for (name, name_id, id_p) in self.conn.query('SELECT name, name_id, parent_inode '
'FROM contents_v WHERE inode=?', (id_,)):
path = get_path(id_p, self.conn, name)
- self.log_error("File may lack data, moved to /lost+found: %s", path)
+ self.log_error("File may lack data, moved to /lost+found: %s", to_str(path))
(_, newname) = self.resolve_free(b"/lost+found", escape(path))
self.conn.execute('UPDATE contents SET name_id=?, parent_inode=? '
@@ -991,13 +993,15 @@ class Fsck(object):
if not isinstance(path, bytes):
raise TypeError('path must be of type bytes')
+ if not isinstance(name, bytes):
+ raise TypeError('name must be of type bytes')
inode_p = inode_for_path(path, self.conn)
# Debugging http://code.google.com/p/s3ql/issues/detail?id=217
# and http://code.google.com/p/s3ql/issues/detail?id=261
if len(name) > 255 - 4:
- name = '%s ... %s' % (name[0:120], name[-120:])
+ name = b''.join((name[0:120], b' ... ', name[-120:]))
i = 0
newname = name
@@ -1046,8 +1050,7 @@ class ROFsck(Fsck):
db = Connection(path + '.db')
db.execute('PRAGMA journal_mode = WAL')
- with open(path + '.params', 'rb') as fh:
- param = pickle.load(fh)
+ param = load_params(path + '.params')
super().__init__(None, None, param, db)
def check(self):
@@ -1161,8 +1164,7 @@ def main(args=None):
if os.path.exists(cachepath + '.params'):
assert os.path.exists(cachepath + '.db')
- with open(cachepath + '.params', 'rb') as fh:
- param = pickle.load(fh)
+ param = load_params(cachepath + '.params')
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
param = backend.lookup('s3ql_metadata')
@@ -1248,25 +1250,14 @@ def main(args=None):
+ cachepath + '.db (corrupted)\n'
+ cachepath + '.param (intact)', exitcode=43)
else:
- with tempfile.TemporaryFile() as tmpfh:
- def do_read(fh):
- tmpfh.seek(0)
- tmpfh.truncate()
- stream_read_bz2(fh, tmpfh)
-
- log.info('Downloading and decompressing metadata...')
- backend.perform_read(do_read, "s3ql_metadata")
-
- log.info("Reading metadata...")
- tmpfh.seek(0)
- db = restore_metadata(tmpfh, cachepath + '.db')
+ db = download_metadata(backend, cachepath + '.db')
# Increase metadata sequence no
param['seq_no'] += 1
param['needs_fsck'] = True
backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
fsck = Fsck(cachepath + '-cache', backend, param, db)
fsck.check()
@@ -1291,22 +1282,9 @@ def main(args=None):
param['last_fsck'] = time.time()
param['last-modified'] = time.time()
- log.info('Dumping metadata...')
- with tempfile.TemporaryFile() as fh:
- dump_metadata(db, fh)
- def do_write(obj_fh):
- fh.seek(0)
- stream_write_bz2(fh, obj_fh)
- return obj_fh
-
- log.info("Compressing and uploading metadata...")
- obj_fh = backend.perform_write(do_write, "s3ql_metadata_new", metadata=param,
- is_compressed=True)
- log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
- log.info('Cycling metadata backups...')
- cycle_metadata(backend)
+ dump_and_upload_metadata(backend, db, param)
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
log.info('Cleaning up local metadata...')
db.execute('ANALYZE')
@@ -1381,5 +1359,10 @@ def escape(path):
return path[1:].replace(b'_', b'__').replace(b'/', b'_')
+def to_str(name):
+ '''Decode path name for printing'''
+
+ return str(name, encoding='utf-8', errors='replace')
+
if __name__ == '__main__':
main(sys.argv[1:])
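
fsck.py now keeps paths as bytes internally and only decodes them for log output via the new to_str() helper, while escape() (unchanged, shown in context above) flattens a path into a single /lost+found entry name. A small standalone demonstration, with the two helpers copied here so the snippet runs on its own:

    def escape(path):
        return path[1:].replace(b'_', b'__').replace(b'/', b'_')

    def to_str(name):
        return str(name, encoding='utf-8', errors='replace')

    path = b'/backups/b\xc3\xa4ckup_2016/file_1'
    print(to_str(path))   # /backups/bäckup_2016/file_1 (errors='replace' guards against bad UTF-8)
    print(escape(path))   # b'backups_b\xc3\xa4ckup__2016_file__1'
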
diff --git a/src/s3ql/lock.py b/src/s3ql/lock.py
index 2364605..20b15c8 100644
--- a/src/s3ql/lock.py
+++ b/src/s3ql/lock.py
@@ -8,11 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
from .logging import logging, setup_logging, QuietError
from .common import assert_fs_owner
-from . import PICKLE_PROTOCOL
from .parse_args import ArgumentParser
import llfuse
import os
-import pickle
import sys
import textwrap
@@ -54,7 +52,7 @@ def main(args=None):
raise QuietError('%s is a mount point.' % name)
ctrlfile = assert_fs_owner(name)
fstat = os.stat(name)
- llfuse.setxattr(ctrlfile, 'lock', pickle.dumps((fstat.st_ino,), PICKLE_PROTOCOL))
+ llfuse.setxattr(ctrlfile, 'lock', ('%d' % fstat.st_ino).encode())
if __name__ == '__main__':
main(sys.argv[1:])
diff --git a/src/s3ql/metadata.py b/src/s3ql/metadata.py
index bb27dae..5ac2978 100644
--- a/src/s3ql/metadata.py
+++ b/src/s3ql/metadata.py
@@ -8,9 +8,13 @@ This program can be distributed under the terms of the GNU GPLv3.
from .logging import logging # Ensure use of custom logger class
from .database import Connection
+from . import BUFSIZE
+from .common import pretty_print_size
from .deltadump import INTEGER, BLOB, TIME, dump_table, load_table
-from .backends.common import NoSuchObject
+from .backends.common import NoSuchObject, CorruptedObjectError
import os
+import tempfile
+import bz2
import stat
log = logging.getLogger(__name__)
@@ -124,7 +128,11 @@ def cycle_metadata(backend, keep=10):
# However, the current metadata object should always be copied,
# so that even if there's a crash we don't end up without it
- backend.copy("s3ql_metadata", "s3ql_metadata_bak_0")
+ try:
+ backend.copy("s3ql_metadata", "s3ql_metadata_bak_0")
+ except NoSuchObject:
+ # In case of mkfs, there may be no metadata object yet
+ pass
cycle_fn("s3ql_metadata_new", "s3ql_metadata")
if cycle_fn is backend.copy:
@@ -252,3 +260,67 @@ def create_tables(conn):
CREATE VIEW ext_attributes_v AS
SELECT * FROM ext_attributes JOIN names ON names.id = name_id
""")
+
+def stream_write_bz2(ifh, ofh):
+ '''Compress *ifh* into *ofh* using bz2 compression'''
+
+ compr = bz2.BZ2Compressor(9)
+ while True:
+ buf = ifh.read(BUFSIZE)
+ if not buf:
+ break
+ buf = compr.compress(buf)
+ if buf:
+ ofh.write(buf)
+ buf = compr.flush()
+ if buf:
+ ofh.write(buf)
+
+def stream_read_bz2(ifh, ofh):
+ '''Uncompress bz2 compressed *ifh* into *ofh*'''
+
+ decompressor = bz2.BZ2Decompressor()
+ while True:
+ buf = ifh.read(BUFSIZE)
+ if not buf:
+ break
+ buf = decompressor.decompress(buf)
+ if buf:
+ ofh.write(buf)
+
+ if decompressor.unused_data or ifh.read(1) != b'':
+ raise CorruptedObjectError('Data after end of bz2 stream')
+
+def download_metadata(backend, db_file, name='s3ql_metadata'):
+ with tempfile.TemporaryFile() as tmpfh:
+ def do_read(fh):
+ tmpfh.seek(0)
+ tmpfh.truncate()
+ stream_read_bz2(fh, tmpfh)
+
+ log.info('Downloading and decompressing metadata...')
+ backend.perform_read(do_read, name)
+
+ log.info("Reading metadata...")
+ tmpfh.seek(0)
+ return restore_metadata(tmpfh, db_file)
+
+def dump_and_upload_metadata(backend, db, param):
+ with tempfile.TemporaryFile() as fh:
+ log.info('Dumping metadata...')
+ dump_metadata(db, fh)
+ upload_metadata(backend, fh, param)
+
+def upload_metadata(backend, fh, param):
+ log.info("Compressing and uploading metadata...")
+ def do_write(obj_fh):
+ fh.seek(0)
+ stream_write_bz2(fh, obj_fh)
+ return obj_fh
+ obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
+ metadata=param, is_compressed=True)
+ log.info('Wrote %s of compressed metadata.',
+ pretty_print_size(obj_fh.get_obj_size()))
+
+ log.info('Cycling metadata backups...')
+ cycle_metadata(backend)
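
stream_write_bz2() and stream_read_bz2() moved from common.py into metadata.py along with the new download/upload helpers. They operate on plain file-like objects, so a round trip can be checked with in-memory buffers (assumes an s3ql installation that already contains this change):

    import io
    from s3ql.metadata import stream_write_bz2, stream_read_bz2

    data = b'metadata dump ' * 4096
    compressed = io.BytesIO()
    stream_write_bz2(io.BytesIO(data), compressed)

    restored = io.BytesIO()
    compressed.seek(0)
    stream_read_bz2(compressed, restored)
    assert restored.getvalue() == data
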
diff --git a/src/s3ql/mkfs.py b/src/s3ql/mkfs.py
index 27f48fe..ba46e35 100644
--- a/src/s3ql/mkfs.py
+++ b/src/s3ql/mkfs.py
@@ -7,22 +7,20 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from .logging import logging, setup_logging, QuietError
-from . import CURRENT_FS_REV, CTRL_INODE, PICKLE_PROTOCOL, ROOT_INODE
+from . import CURRENT_FS_REV, CTRL_INODE, ROOT_INODE
from .backends.comprenc import ComprencBackend
from .backends import s3
-from .common import (get_backend_cachedir, stream_write_bz2, get_backend,
- pretty_print_size, split_by_n)
+from .common import (get_backend_cachedir, get_backend, split_by_n,
+ freeze_basic_mapping)
from .database import Connection
-from .metadata import dump_metadata, create_tables
+from .metadata import dump_and_upload_metadata, create_tables
from .parse_args import ArgumentParser
from getpass import getpass
from base64 import b64encode
import os
-import pickle
import shutil
import stat
import sys
-import tempfile
import time
import atexit
@@ -95,7 +93,7 @@ def main(args=None):
log.warning('Warning: maximum object sizes less than 1 MiB will seriously degrade '
'performance.', extra={ 'force_log': True })
- plain_backend = get_backend(options, plain=True)
+ plain_backend = get_backend(options, raw=True)
atexit.register(plain_backend.close)
log.info("Before using S3QL, make sure to read the user's guide, especially\n"
@@ -170,23 +168,10 @@ def main(args=None):
param['last-modified'] = time.time()
log.info('Dumping metadata...')
- with tempfile.TemporaryFile() as fh:
- dump_metadata(db, fh)
- def do_write(obj_fh):
- fh.seek(0)
- stream_write_bz2(fh, obj_fh)
- return obj_fh
-
- # Store metadata first, and seq_no second so that if mkfs
- # is interrupted, fsck won't see a file system at all.
- log.info("Compressing and uploading metadata...")
- obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
- is_compressed=True)
- backend.store('s3ql_seq_no_%d' % param['seq_no'], b'Empty')
-
- log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
+ dump_and_upload_metadata(backend, db, param)
+ backend.store('s3ql_seq_no_%d' % param['seq_no'], b'Empty')
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
if data_pw is not None:
print('Please store the following master key in a safe location. It allows ',
diff --git a/src/s3ql/mount.py b/src/s3ql/mount.py
index 3192a86..44b8fbc 100644
--- a/src/s3ql/mount.py
+++ b/src/s3ql/mount.py
@@ -7,15 +7,16 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from .logging import logging, setup_logging, QuietError
-from . import fs, CURRENT_FS_REV, PICKLE_PROTOCOL
+from . import fs, CURRENT_FS_REV
from .backends.pool import BackendPool
from .block_cache import BlockCache
-from .common import (get_backend_cachedir, get_seq_no, stream_write_bz2, stream_read_bz2,
- get_backend_factory, pretty_print_size)
+from .common import (get_backend_cachedir, get_seq_no, get_backend_factory,
+ load_params, freeze_basic_mapping)
from .daemonize import daemonize
from .database import Connection
from .inode_cache import InodeCache
-from .metadata import cycle_metadata, dump_metadata, restore_metadata
+from .metadata import (download_metadata, upload_metadata, dump_and_upload_metadata,
+ dump_metadata)
from .parse_args import ArgumentParser
from .exit_stack import ExitStack
from threading import Thread
@@ -27,7 +28,6 @@ import itertools
import os
import platform
import subprocess
-import pickle
import re
import signal
import resource
@@ -116,7 +116,8 @@ def main(args=None):
import pstats
prof = cProfile.Profile()
- backend_factory = get_backend_factory(options)
+ backend_factory = get_backend_factory(options.storage_url, options.backend_options,
+ options.authfile, options.compress)
backend_pool = BackendPool(backend_factory)
atexit.register(backend_pool.flush)
@@ -242,26 +243,12 @@ def main(args=None):
del backend['s3ql_seq_no_%d' % param['seq_no']]
param['seq_no'] -= 1
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
elif seq_no == param['seq_no']:
param['last-modified'] = time.time()
-
- log.info('Dumping metadata...')
- with tempfile.TemporaryFile() as fh:
- dump_metadata(db, fh)
- def do_write(obj_fh):
- fh.seek(0)
- stream_write_bz2(fh, obj_fh)
- return obj_fh
-
- log.info("Compressing and uploading metadata...")
- obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
- metadata=param, is_compressed=True)
- log.info('Wrote %s of compressed metadata.', pretty_print_size(obj_fh.get_obj_size()))
- log.info('Cycling metadata backups...')
- cycle_metadata(backend)
+ dump_and_upload_metadata(backend, db, param)
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
else:
log.error('Remote metadata is newer than local (%d vs %d), '
'refusing to overwrite!', seq_no, param['seq_no'])
@@ -376,8 +363,7 @@ def get_metadata(backend, cachepath):
# Check for cached metadata
db = None
if os.path.exists(cachepath + '.params'):
- with open(cachepath + '.params', 'rb') as fh:
- param = pickle.load(fh)
+ param = load_params(cachepath + '.params')
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
param = backend.lookup('s3ql_metadata')
@@ -430,21 +416,10 @@ def get_metadata(backend, cachepath):
# Download metadata
if not db:
- with tempfile.TemporaryFile() as tmpfh:
- def do_read(fh):
- tmpfh.seek(0)
- tmpfh.truncate()
- stream_read_bz2(fh, tmpfh)
-
- log.info('Downloading and decompressing metadata...')
- backend.perform_read(do_read, "s3ql_metadata")
-
- log.info("Reading metadata...")
- tmpfh.seek(0)
- db = restore_metadata(tmpfh, cachepath + '.db')
+ db = download_metadata(backend, cachepath + '.db')
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
return (param, db)
@@ -454,7 +429,7 @@ def mark_metadata_dirty(backend, cachepath, param):
param['seq_no'] += 1
param['needs_fsck'] = True
with open(cachepath + '.params', 'wb') as fh:
- pickle.dump(param, fh, PICKLE_PROTOCOL)
+ fh.write(freeze_basic_mapping(param))
# Fsync to make sure that the updated sequence number is committed to
# disk. Otherwise, a crash immediately after mount could result in both
@@ -679,17 +654,7 @@ class MetadataUploadThread(Thread):
# Temporarily decrease sequence no, this is not the final upload
self.param['seq_no'] -= 1
- def do_write(obj_fh):
- fh.seek(0)
- stream_write_bz2(fh, obj_fh)
- return obj_fh
- log.info("Compressing and uploading metadata...")
- obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
- metadata=self.param, is_compressed=True)
- log.info('Wrote %s of compressed metadata.',
- pretty_print_size(obj_fh.get_obj_size()))
- log.info('Cycling metadata backups...')
- cycle_metadata(backend)
+ upload_metadata(backend, fh, self.param)
self.param['seq_no'] += 1
fh.close()
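
mount.py now persists the file system parameters with freeze_basic_mapping() and reads them back through load_params(), which transparently falls back to pickle for .params files left behind by older releases. A small round-trip sketch of the new on-disk format (assumes an s3ql installation with these helpers in s3ql/common.py):

    import os, tempfile
    from s3ql.common import freeze_basic_mapping, load_params

    param = {'seq_no': 5, 'needs_fsck': False, 'last-modified': 1457540000.0}

    with tempfile.NamedTemporaryFile(delete=False) as fh:
        fh.write(freeze_basic_mapping(param))
        fname = fh.name
    try:
        # Plain-text literal on disk; no pickle involved for newly written files.
        assert load_params(fname) == param
    finally:
        os.unlink(fname)
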
diff --git a/src/s3ql/remove.py b/src/s3ql/remove.py
index ac7ac22..8e98e26 100644
--- a/src/s3ql/remove.py
+++ b/src/s3ql/remove.py
@@ -8,11 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
from .logging import logging, setup_logging, QuietError
from .common import assert_fs_owner, path2bytes
-from . import PICKLE_PROTOCOL
from .parse_args import ArgumentParser
import llfuse
import os
-import pickle
import sys
import textwrap
@@ -54,10 +52,9 @@ def main(args=None):
ctrlfile = assert_fs_owner(name)
fstat_p = os.stat(os.path.dirname(os.path.abspath(name)))
- llfuse.setxattr(ctrlfile, 'rmtree', pickle.dumps((fstat_p.st_ino,
- path2bytes(os.path.basename(name))),
- PICKLE_PROTOCOL))
-
+ cmd = ('(%d, %r)' % (fstat_p.st_ino,
+ path2bytes(os.path.basename(name)))).encode()
+ llfuse.setxattr(ctrlfile, 'rmtree', cmd)
if __name__ == '__main__':
main(sys.argv[1:])
diff --git a/src/s3ql/umount.py b/src/s3ql/umount.py
index 164535c..f8affd3 100644
--- a/src/s3ql/umount.py
+++ b/src/s3ql/umount.py
@@ -8,11 +8,10 @@ This program can be distributed under the terms of the GNU GPLv3.
from .logging import logging, setup_logging
from . import CTRL_NAME
-from .common import assert_s3ql_mountpoint
+from .common import assert_s3ql_mountpoint, parse_literal
from .parse_args import ArgumentParser
import llfuse
import os
-import pickle
import subprocess
import platform
import sys
@@ -120,7 +119,7 @@ def blocking_umount(mountpoint):
# Get pid
log.debug('Trying to get pid')
- pid = pickle.loads(llfuse.getxattr(ctrlfile, 's3ql_pid?'))
+ pid = parse_literal(llfuse.getxattr(ctrlfile, 's3ql_pid?'), int)
log.debug('PID is %d', pid)
# Get command line to make race conditions less-likely
diff --git a/src/s3ql/verify.py b/src/s3ql/verify.py
index c47a946..104998a 100644
--- a/src/s3ql/verify.py
+++ b/src/s3ql/verify.py
@@ -11,7 +11,6 @@ from .mount import get_metadata
from . import BUFSIZE
from .common import (get_backend_factory, get_backend_cachedir, pretty_print_size,
AsyncFn)
-from .backends.pool import BackendPool
from .backends.common import NoSuchObject, CorruptedObjectError
from .parse_args import ArgumentParser
from queue import Queue, Full as QueueFull
@@ -91,15 +90,14 @@ def main(args=None):
options = parse_args(args)
setup_logging(options)
- backend_factory = get_backend_factory(options)
- backend_pool = BackendPool(backend_factory)
- atexit.register(backend_pool.flush)
+ backend_factory = get_backend_factory(options.storage_url, options.backend_options,
+ options.authfile)
# Get paths
cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
# Retrieve metadata
- with backend_pool() as backend:
+ with backend_factory() as backend:
(param, db) = get_metadata(backend, cachepath)
retrieve_objects(db, backend_factory, options.corrupted_file,
@@ -121,16 +119,14 @@ def retrieve_objects(db, backend_factory, corrupted_fh, missing_fh,
queue = Queue(thread_count)
threads = []
- backends = []
for _ in range(thread_count):
- b = backend_factory()
- t = AsyncFn(_retrieve_loop, queue, b, corrupted_fh, missing_fh, full)
+ t = AsyncFn(_retrieve_loop, queue, backend_factory, corrupted_fh,
+ missing_fh, full)
# Don't wait for worker threads, gives deadlock if main thread
# terminates with exception
t.daemon = True
t.start()
threads.append(t)
- backends.append(b)
total_size = db.get_val('SELECT SUM(size) FROM objects')
total_count = db.get_val('SELECT COUNT(id) FROM objects')
@@ -182,12 +178,9 @@ def retrieve_objects(db, backend_factory, corrupted_fh, missing_fh,
for t in threads:
t.join_and_raise()
- for backend in backends:
- backend.close()
-
log.info('Verified all %d storage objects.', i)
-def _retrieve_loop(queue, backend, corrupted_fh, missing_fh, full=False):
+def _retrieve_loop(queue, backend_factory, corrupted_fh, missing_fh, full=False):
'''Retrieve object ids arriving in *queue* from a backend created by *backend_factory*
If *full* is False, lookup and read metadata. If *full* is True,
@@ -199,30 +192,31 @@ def _retrieve_loop(queue, backend, corrupted_fh, missing_fh, full=False):
Terminate when None is received.
'''
- while True:
- obj_id = queue.get()
- if obj_id is None:
- break
-
- log.debug('reading object %s', obj_id)
- def do_read(fh):
- while True:
- buf = fh.read(BUFSIZE)
- if not buf:
- break
-
- key = 's3ql_data_%d' % obj_id
- try:
- if full:
- backend.perform_read(do_read, key)
- else:
- backend.lookup(key)
- except NoSuchObject:
- log.warning('Backend seems to have lost object %d', obj_id)
- print(key, file=missing_fh)
- except CorruptedObjectError:
- log.warning('Object %d is corrupted', obj_id)
- print(key, file=corrupted_fh)
+ with backend_factory() as backend:
+ while True:
+ obj_id = queue.get()
+ if obj_id is None:
+ break
+
+ log.debug('reading object %s', obj_id)
+ def do_read(fh):
+ while True:
+ buf = fh.read(BUFSIZE)
+ if not buf:
+ break
+
+ key = 's3ql_data_%d' % obj_id
+ try:
+ if full:
+ backend.perform_read(do_read, key)
+ else:
+ backend.lookup(key)
+ except NoSuchObject:
+ log.warning('Backend seems to have lost object %d', obj_id)
+ print(key, file=missing_fh)
+ except CorruptedObjectError:
+ log.warning('Object %d is corrupted', obj_id)
+ print(key, file=corrupted_fh)
if __name__ == '__main__':
main(sys.argv[1:])
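
verify.py no longer shares pre-created backend objects between worker threads; each worker builds its own backend from backend_factory and lets the context manager close it on exit. The queue/sentinel structure can be shown in isolation. This is a generic sketch of the pattern with stand-in names, not the s3ql code itself:

    from contextlib import contextmanager
    from queue import Queue
    from threading import Thread

    @contextmanager
    def backend_factory():
        # Stand-in for a real backend; closed when the worker leaves the block.
        yield {'s3ql_data_1': b'x', 's3ql_data_2': b'y'}

    def worker(queue):
        with backend_factory() as backend:
            while True:
                obj_id = queue.get()
                if obj_id is None:          # sentinel: no more work
                    break
                key = 's3ql_data_%d' % obj_id
                if key not in backend:
                    print('missing:', key)

    queue = Queue(maxsize=4)
    threads = [Thread(target=worker, args=(queue,), daemon=True)
               for _ in range(2)]
    for t in threads:
        t.start()
    for obj_id in (1, 2, 3):
        queue.put(obj_id)
    for _ in threads:
        queue.put(None)                     # one sentinel per worker
    for t in threads:
        t.join()
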