author     Nikolaus Rath <Nikolaus@rath.org>    2016-03-09 10:08:43 -0800
committer  Nikolaus Rath <Nikolaus@rath.org>    2016-03-09 10:08:43 -0800
commit     daf84979549ef394924dd5dacae76bd3fc79ddb2 (patch)
tree       d62bf4e62e2ad2ffa3f49c147d1341eb64b5896c /src
parent     60e9b1ff268a057f3cce12586e991095e7b0dda1 (diff)
Import s3ql_1.11.orig.tar.bz2
Diffstat (limited to 'src')
-rw-r--r--  src/s3ql.egg-info/PKG-INFO    |   4
-rw-r--r--  src/s3ql.egg-info/SOURCES.txt |   1
-rw-r--r--  src/s3ql/__init__.py          |   4
-rw-r--r--  src/s3ql/adm.py               | 235
-rw-r--r--  src/s3ql/backends/common.py   | 246
-rw-r--r--  src/s3ql/backends/gs.py       |  10
-rw-r--r--  src/s3ql/backends/local.py    |  40
-rw-r--r--  src/s3ql/backends/s3.py       |   8
-rw-r--r--  src/s3ql/backends/s3c.py      |  80
-rw-r--r--  src/s3ql/backends/swift.py    | 198
-rw-r--r--  src/s3ql/block_cache.py       |  16
-rw-r--r--  src/s3ql/common.py            |  14
-rw-r--r--  src/s3ql/fsck.py              |  46
-rw-r--r--  src/s3ql/inode_cache.py       |   1
-rw-r--r--  src/s3ql/metadata.py          |   6
-rw-r--r--  src/s3ql/mkfs.py              |  32
-rw-r--r--  src/s3ql/mount.py             |  57
-rw-r--r--  src/s3ql/umount.py            |   2
18 files changed, 443 insertions, 557 deletions
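
One substantive change buried in the bucket-to-backend rename below: BetterBackend (formerly BetterBucket) now stores the encoded object metadata split across several numbered 'meta-NN' fields, and reassembles it by concatenating the fields in sorted key order, because some storage services cap an individual metadata entry at roughly 256 bytes. A minimal sketch of that split/join scheme follows; the helper names and the zero-padded key format are illustrative assumptions, not the s3ql API:

    # Split/reassemble sketch (illustrative, not the s3ql API). Zero-padded
    # keys are used here so that lexicographic sorting matches chunk order;
    # the actual diff writes the keys with 'meta-%2d'.
    import math

    CHUNKSIZE = 255  # some services cap a single metadata field at 256 bytes

    def wrap_meta(meta_buf):
        '''Split an encoded metadata string into numbered fields.'''
        raw = {}
        for i in range(int(math.ceil(len(meta_buf) / float(CHUNKSIZE)))):
            raw['meta-%02d' % i] = meta_buf[i * CHUNKSIZE:(i + 1) * CHUNKSIZE]
        return raw

    def unwrap_meta(raw):
        '''Join the fields back together in sorted key order.'''
        return ''.join(raw[k] for k in sorted(raw) if k.startswith('meta-'))

    buf = 'x' * 1000
    assert unwrap_meta(wrap_meta(buf)) == buf
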
diff --git a/src/s3ql.egg-info/PKG-INFO b/src/s3ql.egg-info/PKG-INFO index ee0e64c..e45ca54 100644 --- a/src/s3ql.egg-info/PKG-INFO +++ b/src/s3ql.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: s3ql -Version: 1.10 +Version: 1.11 Summary: a full-featured file system for online data storage Home-page: http://code.google.com/p/s3ql/ Author: Nikolaus Rath @@ -19,7 +19,7 @@ Description: .. -*- mode: rst -*- <http://aws.amazon.com/s3 Amazon S3>`_ or `OpenStack <http://openstack.org/projects/storage/>`_. S3QL effectively provides a hard disk of dynamic, infinite capacity that can be accessed from - any computer with internet access. + any computer with internet access running Linux, FreeBSD or OS-X. S3QL is a standard conforming, full featured UNIX file system that is conceptually indistinguishable from any local file system. diff --git a/src/s3ql.egg-info/SOURCES.txt b/src/s3ql.egg-info/SOURCES.txt index b8e165e..b7d4375 100644 --- a/src/s3ql.egg-info/SOURCES.txt +++ b/src/s3ql.egg-info/SOURCES.txt @@ -220,7 +220,6 @@ tests/t4_fuse.py tests/t5_cp.py tests/t5_ctrl.py tests/t5_fsck.py -tests/t5_fsck.py.orig tests/t5_full.py tests/t5_lock_rm.py util/cmdline_lexer.py diff --git a/src/s3ql/__init__.py b/src/s3ql/__init__.py index 558b31c..ca51a3a 100644 --- a/src/s3ql/__init__.py +++ b/src/s3ql/__init__.py @@ -14,8 +14,8 @@ __all__ = [ 'adm', 'backends', 'block_cache', 'cleanup_manager', 'common', 'parse_args', 'remove', 'statfs', 'umount', 'VERSION', 'CURRENT_FS_REV', 'REV_VER_MAP' ] -VERSION = '1.10' -CURRENT_FS_REV = 15 +VERSION = '1.11' +CURRENT_FS_REV = 16 # Maps file system revisions to the last S3QL version that # supported this revision. diff --git a/src/s3ql/adm.py b/src/s3ql/adm.py index deb7e7f..fc7a06c 100644 --- a/src/s3ql/adm.py +++ b/src/s3ql/adm.py @@ -8,18 +8,16 @@ This program can be distributed under the terms of the GNU GPLv3. from __future__ import division, print_function, absolute_import from . 
import CURRENT_FS_REV, REV_VER_MAP -from .backends.common import BetterBucket, get_bucket, NoSuchBucket -from .common import (QuietError, BUFSIZE, setup_logging, get_bucket_cachedir, get_seq_no, +from .backends.common import BetterBackend, get_backend, DanglingStorageURL +from .common import (QuietError, setup_logging, get_backend_cachedir, get_seq_no, stream_write_bz2, stream_read_bz2, CTRL_INODE) -from .database import Connection, NoSuchRowError -from .metadata import restore_metadata, cycle_metadata, dump_metadata, create_tables +from .database import Connection +from .metadata import restore_metadata, cycle_metadata, dump_metadata from .parse_args import ArgumentParser from datetime import datetime as Datetime from getpass import getpass -from llfuse import ROOT_INODE import cPickle as pickle import logging -import lzma import os import shutil import stat @@ -34,7 +32,7 @@ def parse_args(args): '''Parse command line''' parser = ArgumentParser( - description="Manage S3QL Buckets.", + description="Manage S3QL File Systems.", epilog=textwrap.dedent('''\ Hint: run `%(prog)s <action> --help` to get help on the additional arguments that the different actions take.''')) @@ -46,11 +44,11 @@ def parse_args(args): subparsers = parser.add_subparsers(metavar='<action>', dest='action', help='may be either of') - subparsers.add_parser("passphrase", help="change bucket passphrase", + subparsers.add_parser("passphrase", help="change file system passphrase", parents=[pparser]) subparsers.add_parser("upgrade", help="upgrade file system to newest revision", parents=[pparser]) - subparsers.add_parser("clear", help="delete all S3QL data from the bucket", + subparsers.add_parser("clear", help="delete file system and all data", parents=[pparser]) subparsers.add_parser("download-metadata", help="Interactively download metadata backups. 
" @@ -88,32 +86,32 @@ def main(args=None): if options.action == 'clear': try: - bucket = get_bucket(options, plain=True) - except NoSuchBucket as exc: + backend = get_backend(options, plain=True) + except DanglingStorageURL as exc: raise QuietError(str(exc)) - return clear(bucket, - get_bucket_cachedir(options.storage_url, options.cachedir)) + return clear(backend, + get_backend_cachedir(options.storage_url, options.cachedir)) try: - bucket = get_bucket(options) - except NoSuchBucket as exc: + backend = get_backend(options) + except DanglingStorageURL as exc: raise QuietError(str(exc)) if options.action == 'upgrade': - return upgrade(bucket, get_bucket_cachedir(options.storage_url, + return upgrade(backend, get_backend_cachedir(options.storage_url, options.cachedir)) if options.action == 'passphrase': - return change_passphrase(bucket) + return change_passphrase(backend) if options.action == 'download-metadata': - return download_metadata(bucket, options.storage_url) + return download_metadata(backend, options.storage_url) -def download_metadata(bucket, storage_url): +def download_metadata(backend, storage_url): '''Download old metadata backups''' - backups = sorted(bucket.list('s3ql_metadata_bak_')) + backups = sorted(backend.list('s3ql_metadata_bak_')) if not backups: raise QuietError('No metadata backups found.') @@ -121,7 +119,7 @@ def download_metadata(bucket, storage_url): log.info('The following backups are available:') log.info('%3s %-23s %-15s', 'No', 'Name', 'Date') for (i, name) in enumerate(backups): - params = bucket.lookup(name) + params = backend.lookup(name) if 'last-modified' in params: date = Datetime.fromtimestamp(params['last-modified']).strftime('%Y-%m-%d %H:%M:%S') else: @@ -138,19 +136,19 @@ def download_metadata(bucket, storage_url): except: log.warn('Invalid input') - cachepath = get_bucket_cachedir(storage_url, '.') + cachepath = get_backend_cachedir(storage_url, '.') for i in ('.db', '.params'): if os.path.exists(cachepath + i): raise QuietError('%s already exists, aborting.' 
% cachepath + i) - param = bucket.lookup(name) + param = backend.lookup(name) try: log.info('Downloading and decompressing %s...', name) def do_read(fh): tmpfh = tempfile.TemporaryFile() stream_read_bz2(fh, tmpfh) return tmpfh - tmpfh = bucket.perform_read(do_read, name) + tmpfh = backend.perform_read(do_read, name) os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC, stat.S_IRUSR | stat.S_IWUSR)) db = Connection(cachepath + '.db.tmp', fast_mode=True) @@ -167,17 +165,17 @@ def download_metadata(bucket, storage_url): # Raise sequence number so that fsck.s3ql actually uses the # downloaded backup - seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in bucket.list('s3ql_seq_no_') ] + seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in backend.list('s3ql_seq_no_') ] param['seq_no'] = max(seq_nos) + 1 pickle.dump(param, open(cachepath + '.params', 'wb'), 2) -def change_passphrase(bucket): - '''Change bucket passphrase''' +def change_passphrase(backend): + '''Change file system passphrase''' - if not isinstance(bucket, BetterBucket) and bucket.passphrase: - raise QuietError('Bucket is not encrypted.') + if not isinstance(backend, BetterBackend) and backend.passphrase: + raise QuietError('File system is not encrypted.') - data_pw = bucket.passphrase + data_pw = backend.passphrase if sys.stdin.isatty(): wrap_pw = getpass("Enter new encryption password: ") @@ -186,12 +184,12 @@ def change_passphrase(bucket): else: wrap_pw = sys.stdin.readline().rstrip() - bucket.passphrase = wrap_pw - bucket['s3ql_passphrase'] = data_pw - bucket.passphrase = data_pw + backend.passphrase = wrap_pw + backend['s3ql_passphrase'] = data_pw + backend.passphrase = data_pw -def clear(bucket, cachepath): - print('I am about to delete the S3QL file system in %s.' % bucket, +def clear(backend, cachepath): + print('I am about to delete the S3QL file system in %s.' 
% backend, 'Please enter "yes" to continue.', '> ', sep='\n', end='') if sys.stdin.readline().strip().lower() != 'yes': @@ -208,7 +206,7 @@ def clear(bucket, cachepath): if os.path.exists(name): shutil.rmtree(name) - bucket.clear() + backend.clear() log.info('File system deleted.') log.info('Note: it may take a while for the removals to propagate through the backend.') @@ -220,16 +218,17 @@ def get_old_rev_msg(rev, prog): $ wget http://s3ql.googlecode.com/files/s3ql-%(version)s.tar.bz2 $ tar xjf s3ql-%(version)s.tar.bz2 + $ (cd s3ql-%(version)s; ./setup.py build_ext) $ s3ql-%(version)s/bin/%(prog)s <options> ''' % { 'version': REV_VER_MAP[rev], 'prog': prog }) -def upgrade(bucket, cachepath): +def upgrade(backend, cachepath): '''Upgrade file system to newest revision''' log.info('Getting file system parameters..') - seq_nos = list(bucket.list('s3ql_seq_no_')) + seq_nos = list(backend.list('s3ql_seq_no_')) if (seq_nos[0].endswith('.meta') or seq_nos[0].endswith('.dat')): print(textwrap.dedent(''' @@ -241,7 +240,7 @@ def upgrade(bucket, cachepath): ''')) print(get_old_rev_msg(11 + 1, 's3qladm')) raise QuietError() - seq_no = get_seq_no(bucket) + seq_no = get_seq_no(backend) # Check for cached metadata db = None @@ -249,12 +248,12 @@ def upgrade(bucket, cachepath): param = pickle.load(open(cachepath + '.params', 'rb')) if param['seq_no'] < seq_no: log.info('Ignoring locally cached metadata (outdated).') - param = bucket.lookup('s3ql_metadata') + param = backend.lookup('s3ql_metadata') else: log.info('Using cached metadata.') db = Connection(cachepath + '.db') else: - param = bucket.lookup('s3ql_metadata') + param = backend.lookup('s3ql_metadata') # Check for unclean shutdown if param['seq_no'] < seq_no: @@ -309,33 +308,30 @@ def upgrade(bucket, cachepath): if not db: log.info("Downloading & uncompressing metadata...") def do_read(fh): - os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC, - stat.S_IRUSR | stat.S_IWUSR)) - db = Connection(cachepath + '.db.tmp', fast_mode=True) - try: - restore_legacy_metadata(fh, db) - finally: - # If metata reading has to be retried, we don't want to hold - # a lock on the database. - db.close() - bucket.perform_read(do_read, "s3ql_metadata") + tmpfh = tempfile.TemporaryFile() + stream_read_bz2(fh, tmpfh) + return tmpfh + tmpfh = backend.perform_read(do_read, "s3ql_metadata") + os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC, + stat.S_IRUSR | stat.S_IWUSR)) + db = Connection(cachepath + '.db.tmp', fast_mode=True) + log.info("Reading metadata...") + tmpfh.seek(0) + restore_metadata(tmpfh, db) + db.close() os.rename(cachepath + '.db.tmp', cachepath + '.db') db = Connection(cachepath + '.db') - log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV) - if 'max_obj_size' not in param: - param['max_obj_size'] = param['blocksize'] - if 'blocksize' in param: - del param['blocksize'] + log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV) - db.execute('UPDATE inodes SET mtime=mtime+?, ctime=ctime+?, atime=atime+?', - (time.timezone, time.timezone, time.timezone)) + db.execute('UPDATE inodes SET mode=? 
WHERE id=?', + (stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR, CTRL_INODE)) param['revision'] = CURRENT_FS_REV param['last-modified'] = time.time() - cycle_metadata(bucket) + cycle_metadata(backend) log.info('Dumping metadata...') fh = tempfile.TemporaryFile() dump_metadata(db, fh) @@ -345,8 +341,8 @@ def upgrade(bucket, cachepath): return obj_fh log.info("Compressing and uploading metadata...") - bucket.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty') - obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param, + backend.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty') + obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param, is_compressed=True) log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2) pickle.dump(param, open(cachepath + '.params', 'wb'), 2) @@ -354,123 +350,6 @@ def upgrade(bucket, cachepath): db.execute('ANALYZE') db.execute('VACUUM') -def _add_name(db, name): - '''Get id for *name* and increase refcount - - Name is inserted in table if it does not yet exist. - ''' - - try: - name_id = db.get_val('SELECT id FROM names WHERE name=?', (name,)) - except NoSuchRowError: - name_id = db.rowid('INSERT INTO names (name, refcount) VALUES(?,?)', - (name, 1)) - else: - db.execute('UPDATE names SET refcount=refcount+1 WHERE id=?', (name_id,)) - - return name_id - -def renumber_inodes(db): - '''Renumber inodes''' - - log.info('Renumbering inodes...') - for table in ('inodes', 'inode_blocks', 'symlink_targets', - 'contents', 'names', 'blocks', 'objects', 'ext_attributes'): - db.execute('ALTER TABLE %s RENAME TO %s_old' % (table, table)) - - for table in ('contents_v', 'ext_attributes_v'): - db.execute('DROP VIEW %s' % table) - - create_tables(db) - for table in ('names', 'blocks', 'objects'): - db.execute('DROP TABLE %s' % table) - db.execute('ALTER TABLE %s_old RENAME TO %s' % (table, table)) - - log.info('..mapping..') - db.execute('CREATE TEMPORARY TABLE inode_map (rowid INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER UNIQUE)') - db.execute('INSERT INTO inode_map (rowid, id) VALUES(?,?)', (ROOT_INODE, ROOT_INODE)) - db.execute('INSERT INTO inode_map (rowid, id) VALUES(?,?)', (CTRL_INODE, CTRL_INODE)) - db.execute('INSERT INTO inode_map (id) SELECT id FROM inodes_old WHERE id > ? 
ORDER BY ctime ASC', - (CTRL_INODE,)) - - log.info('..inodes..') - db.execute('INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev) ' - 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = inodes_old.id), ' - ' mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev FROM inodes_old') - - log.info('..inode_blocks..') - db.execute('INSERT INTO inode_blocks (inode, blockno, block_id) ' - 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = inode_blocks_old.inode), ' - ' blockno, block_id FROM inode_blocks_old') - - log.info('..contents..') - db.execute('INSERT INTO contents (inode, parent_inode, name_id) ' - 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = contents_old.inode), ' - ' (SELECT rowid FROM inode_map WHERE inode_map.id = contents_old.parent_inode), ' - ' name_id FROM contents_old') - - log.info('..symlink_targets..') - db.execute('INSERT INTO symlink_targets (inode, target) ' - 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = symlink_targets_old.inode), ' - ' target FROM symlink_targets_old') - - log.info('..ext_attributes..') - db.execute('INSERT INTO ext_attributes (inode, name_id, value) ' - 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = ext_attributes_old.inode), ' - ' name_id, value FROM ext_attributes_old') - - for table in ('inodes', 'inode_blocks', 'symlink_targets', - 'contents', 'ext_attributes'): - db.execute('DROP TABLE %s_old' % table) - - db.execute('DROP TABLE inode_map') - -def restore_legacy_metadata(ifh, conn): - - # Note: unpickling is terribly slow if fh is not a real file object, so - # uncompressing to a temporary file also gives a performance boost - log.info('Downloading and decompressing metadata...') - tmp = tempfile.TemporaryFile() - decompressor = lzma.LZMADecompressor() - while True: - buf = ifh.read(BUFSIZE) - if not buf: - break - buf = decompressor.decompress(buf) - if buf: - tmp.write(buf) - del decompressor - tmp.seek(0) - - log.info("Reading metadata...") - unpickler = pickle.Unpickler(tmp) - (to_dump, columns) = unpickler.load() - create_tables(conn) - conn.execute(""" - DROP VIEW ext_attributes_v; - DROP TABLE ext_attributes; - CREATE TABLE ext_attributes ( - inode INTEGER NOT NULL REFERENCES inodes(id), - name BLUB NOT NULL, - value BLOB NOT NULL, - - PRIMARY KEY (inode, name) - )""") - - for (table, _) in to_dump: - log.info('..%s..', table) - col_str = ', '.join(columns[table]) - val_str = ', '.join('?' for _ in columns[table]) - sql_str = 'INSERT INTO %s (%s) VALUES(%s)' % (table, col_str, val_str) - while True: - buf = unpickler.load() - if not buf: - break - for row in buf: - conn.execute(sql_str, row) - - tmp.close() - conn.execute('ANALYZE') if __name__ == '__main__': main(sys.argv[1:]) diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py index 8d12d44..ca28206 100644 --- a/src/s3ql/backends/common.py +++ b/src/s3ql/backends/common.py @@ -29,6 +29,7 @@ import re import stat import struct import sys +import math import threading import time import zlib @@ -130,8 +131,8 @@ def http_connection(hostname, port, ssl=False): def sha256(s): return hashlib.sha256(s).digest() -class BucketPool(object): - '''A pool of buckets +class BackendPool(object): + '''A pool of backends This class is threadsafe. 
All methods (except for internal methods starting with underscore) may be called concurrently by different @@ -175,13 +176,13 @@ class BucketPool(object): self.push_conn(conn) -class AbstractBucket(object): +class AbstractBackend(object): '''Functionality shared between all backends. Instances behave similarly to dicts. They can be iterated over and indexed into, but raise a separate set of exceptions. - The bucket guarantees get after create consistency, i.e. a newly created + The backend guarantees get after create consistency, i.e. a newly created object will be immediately retrievable. Additional consistency guarantees may or may not be available and can be queried for with instance methods. ''' @@ -190,7 +191,7 @@ class AbstractBucket(object): needs_login = True def __init__(self): - super(AbstractBucket, self).__init__() + super(AbstractBackend, self).__init__() def __getitem__(self, key): return self.fetch(key)[0] @@ -213,22 +214,22 @@ class AbstractBucket(object): @retry def perform_read(self, fn, key): - '''Read bucket data using *fn*, retry on temporary failure + '''Read object data using *fn*, retry on temporary failure - Open bucket for reading, call `fn(fh)` and close bucket. If a temporary - error (as defined by `is_temp_failure`) occurs during opening, closing - or execution of *fn*, the operation is retried. + Open object for reading, call `fn(fh)` and close object. If a temporary error (as defined by + `is_temp_failure`) occurs during opening, closing or execution of *fn*, the operation is + retried. ''' with self.open_read(key) as fh: return fn(fh) @retry def perform_write(self, fn, key, metadata=None, is_compressed=False): - '''Read bucket data using *fn*, retry on temporary failure + '''Read object data using *fn*, retry on temporary failure - Open bucket for writing, call `fn(fh)` and close bucket. If a temporary - error (as defined by `is_temp_failure`) occurs during opening, closing - or execution of *fn*, the operation is retried. + Open object for writing, call `fn(fh)` and close object. If a temporary error (as defined by + `is_temp_failure`) occurs during opening, closing or execution of *fn*, the operation is + retried. ''' with self.open_write(key, metadata, is_compressed) as fh: @@ -238,8 +239,8 @@ class AbstractBucket(object): """Return data stored under `key`. Returns a tuple with the data and metadata. If only the data itself is - required, ``bucket[key]`` is a more concise notation for - ``bucket.fetch(key)[0]``. + required, ``backend[key]`` is a more concise notation for + ``backend.fetch(key)[0]``. """ def do_read(fh): @@ -255,8 +256,8 @@ class AbstractBucket(object): object. If no metadata is required, one can simply assign to the subscripted - bucket instead of using this function: ``bucket[key] = val`` is - equivalent to ``bucket.store(key, val)``. + backend instead of using this function: ``backend[key] = val`` is + equivalent to ``backend.store(key, val)``. """ self.perform_write(lambda fh: fh.write(val), key, metadata) @@ -265,15 +266,13 @@ class AbstractBucket(object): def is_temp_failure(self, exc): '''Return true if exc indicates a temporary error - Return true if the given exception is used by this bucket's backend - to indicate a temporary problem. Most instance methods automatically - retry the request in this case, so the caller does not need to - worry about temporary failures. + Return true if the given exception indicates a temporary problem. 
Most instance methods + automatically retry the request in this case, so the caller does not need to worry about + temporary failures. - However, in same cases (e.g. when reading or writing an object), the - request cannot automatically be retried. In these case this method can - be used to check for temporary problems and so that the request can - be manually restarted if applicable. + However, in same cases (e.g. when reading or writing an object), the request cannot + automatically be retried. In these case this method can be used to check for temporary + problems and so that the request can be manually restarted if applicable. ''' pass @@ -296,10 +295,9 @@ class AbstractBucket(object): def open_read(self, key): """Open object for reading - Return a tuple of a file-like object. Bucket contents can be read from - the file-like object, metadata is stored in its *metadata* attribute and - can be modified by the caller at will. The object must be closed - explicitly. + Return a file-like object. Data can be read using the `read` method. metadata is stored in + its *metadata* attribute and can be modified by the caller at will. The object must be + closed explicitly. """ pass @@ -308,26 +306,24 @@ class AbstractBucket(object): def open_write(self, key, metadata=None, is_compressed=False): """Open object for writing - `metadata` can be a dict of additional attributes to store with the - object. Returns a file-like object. The object must be closed - explicitly. After closing, the *get_obj_size* may be used to retrieve - the size of the stored object (which may differ from the size of the + `metadata` can be a dict of additional attributes to store with the object. Returns a file- + like object. The object must be closed explicitly. After closing, the *get_obj_size* may be + used to retrieve the size of the stored object (which may differ from the size of the written data). - The *is_compressed* parameter indicates that the caller is going - to write compressed data, and may be used to avoid recompression - by the bucket. + The *is_compressed* parameter indicates that the caller is going to write compressed data, + and may be used to avoid recompression by the backend. """ pass @abstractmethod def clear(self): - """Delete all objects in bucket""" + """Delete all objects in backend""" pass def contains(self, key): - '''Check if `key` is in bucket''' + '''Check if `key` is in backend''' try: self.lookup(key) @@ -340,16 +336,16 @@ class AbstractBucket(object): def delete(self, key, force=False): """Delete object stored under `key` - ``bucket.delete(key)`` can also be written as ``del bucket[key]``. + ``backend.delete(key)`` can also be written as ``del backend[key]``. If `force` is true, do not return an error if the key does not exist. """ pass @abstractmethod def list(self, prefix=''): - '''List keys in bucket + '''List keys in backend - Returns an iterator over all keys in the bucket. + Returns an iterator over all keys in the backend. ''' pass @@ -373,18 +369,18 @@ class AbstractBucket(object): self.copy(src, dest) self.delete(src) -class BetterBucket(AbstractBucket): +class BetterBackend(AbstractBackend): ''' This class adds encryption, compression and integrity protection to a plain - bucket. + backend. 
''' - def __init__(self, passphrase, compression, bucket): - super(BetterBucket, self).__init__() + def __init__(self, passphrase, compression, backend): + super(BetterBackend, self).__init__() self.passphrase = passphrase self.compression = compression - self.bucket = bucket + self.backend = backend if compression not in ('bzip2', 'lzma', 'zlib', None): raise ValueError('Unsupported compression: %s' % compression) @@ -395,7 +391,7 @@ class BetterBucket(AbstractBucket): If the key does not exist, `NoSuchObject` is raised. """ - metadata = self.bucket.lookup(key) + metadata = self.backend.lookup(key) convert_legacy_metadata(metadata) return self._unwrap_meta(metadata) @@ -406,28 +402,26 @@ class BetterBucket(AbstractBucket): that's actually occupied by the object. ''' - return self.bucket.get_size(key) + return self.backend.get_size(key) def is_temp_failure(self, exc): '''Return true if exc indicates a temporary error - Return true if the given exception is used by this bucket's backend - to indicate a temporary problem. Most instance methods automatically - retry the request in this case, so the caller does not need to - worry about temporary failures. + Return true if the given exception indicates a temporary problem. Most instance methods + automatically retry the request in this case, so the caller does not need to worry about + temporary failures. - However, in same cases (e.g. when reading or writing an object), the - request cannot automatically be retried. In these case this method can - be used to check for temporary problems and so that the request can - be manually restarted if applicable. + However, in same cases (e.g. when reading or writing an object), the request cannot + automatically be retried. In these case this method can be used to check for temporary + problems and so that the request can be manually restarted if applicable. ''' - return self.bucket.is_temp_failure(exc) + return self.backend.is_temp_failure(exc) def _unwrap_meta(self, metadata): '''Unwrap metadata - If the bucket has a password set but the object is not encrypted, + If the backend has a password set but the object is not encrypted, `ObjectNotEncrypted` is raised. ''' @@ -440,7 +434,9 @@ class BetterBucket(AbstractBucket): elif not encrypted and self.passphrase: raise ObjectNotEncrypted() - buf = b64decode(metadata['meta']) + buf = b64decode(''.join(metadata[k] + for k in sorted(metadata.keys()) + if k.startswith('meta'))) if encrypted: buf = decrypt(buf, self.passphrase) @@ -460,15 +456,15 @@ class BetterBucket(AbstractBucket): def open_read(self, key): """Open object for reading - Return a tuple of a file-like object. Bucket contents can be read from - the file-like object, metadata is stored in its *metadata* attribute and - can be modified by the caller at will. The object must be closed explicitly. + Return a file-like object. Data can be read using the `read` method. metadata is stored in + its *metadata* attribute and can be modified by the caller at will. The object must be + closed explicitly. - If the bucket has a password set but the object is not encrypted, - `ObjectNotEncrypted` is raised. + If the backend has a password set but the object is not encrypted, `ObjectNotEncrypted` is + raised. 
""" - fh = self.bucket.open_read(key) + fh = self.backend.open_read(key) convert_legacy_metadata(fh.metadata) compr_alg = fh.metadata['compression'] @@ -513,7 +509,7 @@ class BetterBucket(AbstractBucket): The *is_compressed* parameter indicates that the caller is going to write compressed data, and may be used to avoid recompression - by the bucket. + by the backend. """ # We always store metadata (even if it's just None), so that we can @@ -525,12 +521,18 @@ class BetterBucket(AbstractBucket): if self.passphrase: meta_raw['encryption'] = 'AES_v2' nonce = struct.pack(b'<f', time.time()) + bytes(key) - meta_raw['meta'] = b64encode(encrypt(meta_buf, self.passphrase, nonce)) + meta_buf = b64encode(encrypt(meta_buf, self.passphrase, nonce)) else: meta_raw['encryption'] = 'None' - meta_raw['meta'] = b64encode(meta_buf) + meta_buf = b64encode(meta_buf) nonce = None + # Some backends restrict individual metadata fields to 256 bytes, + # so we split the data into several fields if necessary + chunksize = 255 + for i in range(int(math.ceil(len(meta_buf) / chunksize))): + meta_raw['meta-%2d' % i] = meta_buf[i*chunksize:(i+1)*chunksize] + if is_compressed or not self.compression: compr = None meta_raw['compression'] = 'None' @@ -544,7 +546,7 @@ class BetterBucket(AbstractBucket): compr = lzma.LZMACompressor(options={ 'level': 7 }) meta_raw['compression'] = 'LZMA' - fh = self.bucket.open_write(key, meta_raw) + fh = self.backend.open_write(key, meta_raw) if nonce: fh = EncryptFilter(fh, self.passphrase, nonce) @@ -554,27 +556,27 @@ class BetterBucket(AbstractBucket): return fh def clear(self): - """Delete all objects in bucket""" - return self.bucket.clear() + """Delete all objects in backend""" + return self.backend.clear() def contains(self, key): - '''Check if `key` is in bucket''' - return self.bucket.contains(key) + '''Check if `key` is in backend''' + return self.backend.contains(key) def delete(self, key, force=False): """Delete object stored under `key` - ``bucket.delete(key)`` can also be written as ``del bucket[key]``. + ``backend.delete(key)`` can also be written as ``del backend[key]``. If `force` is true, do not return an error if the key does not exist. """ - return self.bucket.delete(key, force) + return self.backend.delete(key, force) def list(self, prefix=''): - '''List keys in bucket + '''List keys in backend - Returns an iterator over all keys in the bucket. + Returns an iterator over all keys in the backend. ''' - return self.bucket.list(prefix) + return self.backend.list(prefix) def copy(self, src, dest): """Copy data stored under key `src` to key `dest` @@ -582,7 +584,7 @@ class BetterBucket(AbstractBucket): If `dest` already exists, it will be overwritten. The copying is done on the remote side. """ - return self.bucket.copy(src, dest) + return self.backend.copy(src, dest) def rename(self, src, dest): """Rename key `src` to `dest` @@ -590,7 +592,7 @@ class BetterBucket(AbstractBucket): If `dest` already exists, it will be overwritten. The rename is done on the remote side. """ - return self.bucket.rename(src, dest) + return self.backend.rename(src, dest) class AbstractInputFilter(object): @@ -1004,7 +1006,7 @@ def decrypt(buf, passphrase): class ObjectNotEncrypted(Exception): ''' Raised by the backend if an object was requested from an encrypted - bucket, but the object was stored without encryption. + backend, but the object was stored without encryption. 
We do not want to simply return the uncrypted object, because the caller may rely on the objects integrity being cryptographically @@ -1014,28 +1016,28 @@ class ObjectNotEncrypted(Exception): pass class NoSuchObject(Exception): - '''Raised if the requested object does not exist in the bucket''' + '''Raised if the requested object does not exist in the backend''' def __init__(self, key): super(NoSuchObject, self).__init__() self.key = key def __str__(self): - return 'Bucket does not have anything stored under key %r' % self.key + return 'Backend does not have anything stored under key %r' % self.key -class NoSuchBucket(Exception): - '''Raised if the requested bucket does not exist''' +class DanglingStorageURL(Exception): + '''Raised if the backend can't store data at the given location''' - def __init__(self, name): - super(NoSuchBucket, self).__init__() - self.name = name + def __init__(self, loc): + super(DanglingStorageURL, self).__init__() + self.loc = loc def __str__(self): - return 'Bucket %r does not exist' % self.name + return '%r does not exist' % self.loc class AuthorizationError(Exception): - '''Raised if the credentials don't give access to the requested bucket''' + '''Raised if the credentials don't give access to the requested backend''' def __init__(self, msg): super(AuthorizationError, self).__init__() @@ -1091,20 +1093,20 @@ def convert_legacy_metadata(meta): meta['compression'] = 'None' -def get_bucket(options, plain=False): - '''Return bucket for given storage-url +def get_backend(options, plain=False): + '''Return backend for given storage-url If *plain* is true, don't attempt to unlock and don't wrap into - BetterBucket. + BetterBackend. ''' - return get_bucket_factory(options, plain)() + return get_backend_factory(options, plain)() -def get_bucket_factory(options, plain=False, ssl=False): - '''Return factory producing bucket objects for given storage-url +def get_backend_factory(options, plain=False): + '''Return factory producing backend objects for given storage-url If *plain* is true, don't attempt to unlock and don't wrap into - BetterBucket. + BetterBackend. ''' hit = re.match(r'^([a-zA-Z0-9]+)://', options.storage_url) @@ -1117,7 +1119,7 @@ def get_bucket_factory(options, plain=False, ssl=False): except ImportError: raise QuietError('No such backend: %s' % hit.group(1)) - bucket_class = getattr(sys.modules[backend_name], 'Bucket') + backend_class = getattr(sys.modules[backend_name], 'Backend') # Read authfile config = ConfigParser.SafeConfigParser() @@ -1129,7 +1131,7 @@ def get_bucket_factory(options, plain=False, ssl=False): backend_login = None backend_pw = None - bucket_passphrase = None + backend_passphrase = None for section in config.sections(): def getopt(name): try: @@ -1144,34 +1146,38 @@ def get_bucket_factory(options, plain=False, ssl=False): backend_login = backend_login or getopt('backend-login') backend_pw = backend_pw or getopt('backend-password') - bucket_passphrase = bucket_passphrase or getopt('bucket-passphrase') + backend_passphrase = backend_passphrase or getopt('fs-passphrase') + if backend_passphrase is None and getopt('bucket-passphrase') is not None: + backend_passphrase = getopt('bucket-passphrase') + log.warn("Warning: the 'bucket-passphrase' configuration option has been " + "renamed to 'fs-passphrase'! 
Please update your authinfo file.") - if not backend_login and bucket_class.needs_login: + if not backend_login and backend_class.needs_login: if sys.stdin.isatty(): backend_login = getpass("Enter backend login: ") else: backend_login = sys.stdin.readline().rstrip() - if not backend_pw and bucket_class.needs_login: + if not backend_pw and backend_class.needs_login: if sys.stdin.isatty(): backend_pw = getpass("Enter backend password: ") else: backend_pw = sys.stdin.readline().rstrip() try: - bucket = bucket_class(options.storage_url, backend_login, backend_pw, + backend = backend_class(options.storage_url, backend_login, backend_pw, options.ssl) - # Do not use bucket.lookup(), this would use a HEAD request and + # Do not use backend.lookup(), this would use a HEAD request and # not provide any useful error messages if something goes wrong # (e.g. wrong credentials) - _ = bucket['s3ql_passphrase'] + _ = backend['s3ql_passphrase'] - except NoSuchBucket: - raise QuietError('Bucket does not exist') + except DanglingStorageURL as exc: + raise QuietError(str(exc)) except AuthorizationError: - raise QuietError('No permission to access bucket.') + raise QuietError('No permission to access backend.') except AuthenticationError: raise QuietError('Invalid credentials or skewed system clock.') @@ -1183,16 +1189,16 @@ def get_bucket_factory(options, plain=False, ssl=False): encrypted = True if plain: - return lambda: bucket_class(options.storage_url, backend_login, backend_pw, + return lambda: backend_class(options.storage_url, backend_login, backend_pw, options.ssl) - if encrypted and not bucket_passphrase: + if encrypted and not backend_passphrase: if sys.stdin.isatty(): - bucket_passphrase = getpass("Enter bucket encryption passphrase: ") + backend_passphrase = getpass("Enter file system encryption passphrase: ") else: - bucket_passphrase = sys.stdin.readline().rstrip() + backend_passphrase = sys.stdin.readline().rstrip() elif not encrypted: - bucket_passphrase = None + backend_passphrase = None if hasattr(options, 'compress'): compress = options.compress @@ -1200,17 +1206,17 @@ def get_bucket_factory(options, plain=False, ssl=False): compress = None if not encrypted: - return lambda: BetterBucket(None, compress, - bucket_class(options.storage_url, backend_login, backend_pw, + return lambda: BetterBackend(None, compress, + backend_class(options.storage_url, backend_login, backend_pw, options.ssl)) - tmp_bucket = BetterBucket(bucket_passphrase, compress, bucket) + tmp_backend = BetterBackend(backend_passphrase, compress, backend) try: - data_pw = tmp_bucket['s3ql_passphrase'] + data_pw = tmp_backend['s3ql_passphrase'] except ChecksumError: - raise QuietError('Wrong bucket passphrase') + raise QuietError('Wrong backend passphrase') - return lambda: BetterBucket(data_pw, compress, - bucket_class(options.storage_url, backend_login, backend_pw, + return lambda: BetterBackend(data_pw, compress, + backend_class(options.storage_url, backend_login, backend_pw, options.ssl)) diff --git a/src/s3ql/backends/gs.py b/src/s3ql/backends/gs.py index f746a21..ae58d5e 100644 --- a/src/s3ql/backends/gs.py +++ b/src/s3ql/backends/gs.py @@ -8,28 +8,26 @@ This program can be distributed under the terms of the GNU GPLv3. from __future__ import division, print_function, absolute_import from . 
import s3c -from .s3c import retry from s3ql.common import QuietError import logging import re -import xml.etree.cElementTree as ElementTree # Pylint goes berserk with false positives #pylint: disable=E1002,E1101,W0201 log = logging.getLogger("backends.gs") -class Bucket(s3c.Bucket): - """A bucket stored in Google Storage +class Backend(s3c.Backend): + """A backend to store data in Google Storage This class uses standard HTTP connections to connect to GS. - The bucket guarantees immediate get consistency and eventual list + The backend guarantees immediate get consistency and eventual list consistency. """ def __init__(self, storage_url, gs_key, gs_secret, use_ssl): - super(Bucket, self).__init__(storage_url, gs_key, gs_secret, use_ssl) + super(Backend, self).__init__(storage_url, gs_key, gs_secret, use_ssl) self.namespace = 'http://doc.s3.amazonaws.com/2006-03-01' diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py index 423cfec..6d4e817 100644 --- a/src/s3ql/backends/local.py +++ b/src/s3ql/backends/local.py @@ -8,7 +8,7 @@ This program can be distributed under the terms of the GNU GPLv3. from __future__ import division, print_function, absolute_import -from .common import AbstractBucket, NoSuchBucket, NoSuchObject, ChecksumError +from .common import AbstractBackend, DanglingStorageURL, NoSuchObject, ChecksumError from ..common import BUFSIZE import shutil import logging @@ -19,27 +19,27 @@ import thread log = logging.getLogger("backend.local") -class Bucket(AbstractBucket): +class Backend(AbstractBackend): ''' - A bucket that is stored on the local hard disk + A backend that stores data on the local hard disk ''' needs_login = False def __init__(self, storage_url, backend_login, backend_pw, use_ssl=False): - '''Initialize local bucket + '''Initialize local backend Login and password are ignored. ''' # Unused argument #pylint: disable=W0613 - super(Bucket, self).__init__() + super(Backend, self).__init__() name = storage_url[len('local://'):] self.name = name if not os.path.exists(name): - raise NoSuchBucket(name) + raise DanglingStorageURL(name) def __str__(self): return 'local://%s' % self.name @@ -78,9 +78,9 @@ class Bucket(AbstractBucket): def open_read(self, key): """Open object for reading - Return a tuple of a file-like object. Bucket contents can be read from - the file-like object, metadata is stored in its *metadata* attribute and - can be modified by the caller at will. + Return a file-like object. Data can be read using the `read` method. metadata is stored in + its *metadata* attribute and can be modified by the caller at will. The object must be + closed explicitly. """ path = self._key_to_path(key) @@ -104,15 +104,13 @@ class Bucket(AbstractBucket): def open_write(self, key, metadata=None, is_compressed=False): """Open object for writing - `metadata` can be a dict of additional attributes to store with the - object. Returns a file-like object. The object must be closed - explicitly. After closing, the *get_obj_size* may be used to retrieve - the size of the stored object (which may differ from the size of the + `metadata` can be a dict of additional attributes to store with the object. Returns a file- + like object. The object must be closed explicitly. After closing, the *get_obj_size* may be + used to retrieve the size of the stored object (which may differ from the size of the written data). - The *is_compressed* parameter indicates that the caller is going - to write compressed data, and may be used to avoid recompression - by the bucket. 
+ The *is_compressed* parameter indicates that the caller is going to write compressed data, + and may be used to avoid recompression by the backend. """ if metadata is None: @@ -145,7 +143,7 @@ class Bucket(AbstractBucket): return dest def clear(self): - """Delete all objects in bucket""" + """Delete all objects in backend""" for name in os.listdir(self.name): path = os.path.join(self.name, name) @@ -155,7 +153,7 @@ class Bucket(AbstractBucket): os.unlink(path) def contains(self, key): - '''Check if `key` is in bucket''' + '''Check if `key` is in backend''' path = self._key_to_path(key) try: @@ -169,7 +167,7 @@ class Bucket(AbstractBucket): def delete(self, key, force=False): """Delete object stored under `key` - ``bucket.delete(key)`` can also be written as ``del bucket[key]``. + ``backend.delete(key)`` can also be written as ``del backend[key]``. If `force` is true, do not return an error if the key does not exist. """ path = self._key_to_path(key) @@ -185,9 +183,9 @@ class Bucket(AbstractBucket): raise def list(self, prefix=''): - '''List keys in bucket + '''List keys in backend - Returns an iterator over all keys in the bucket. + Returns an iterator over all keys in the backend. ''' if prefix: base = os.path.dirname(self._key_to_path(prefix)) diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py index f485d6e..a41f512 100644 --- a/src/s3ql/backends/s3.py +++ b/src/s3ql/backends/s3.py @@ -17,18 +17,18 @@ log = logging.getLogger("backend.s3") # Pylint goes berserk with false positives #pylint: disable=E1002,E1101 -class Bucket(s3c.Bucket): - """A bucket stored in Amazon S3 +class Backend(s3c.Backend): + """A backend to store data in Amazon S3 This class uses standard HTTP connections to connect to S3. - The bucket guarantees get after create consistency, i.e. a newly created + The backend guarantees get after create consistency, i.e. a newly created object will be immediately retrievable. Additional consistency guarantees may or may not be available and can be queried for with instance methods. """ def __init__(self, storage_url, login, password, use_ssl): - super(Bucket, self).__init__(storage_url, login, password, use_ssl) + super(Backend, self).__init__(storage_url, login, password, use_ssl) @staticmethod diff --git a/src/s3ql/backends/s3c.py b/src/s3ql/backends/s3c.py index 53077ae..c5b84d7 100644 --- a/src/s3ql/backends/s3c.py +++ b/src/s3ql/backends/s3c.py @@ -8,9 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3. from __future__ import division, print_function, absolute_import from ..common import BUFSIZE, QuietError -from .common import AbstractBucket, NoSuchObject, retry, AuthorizationError, http_connection, \ +from .common import AbstractBackend, NoSuchObject, retry, AuthorizationError, http_connection, \ AuthenticationError -from .common import NoSuchBucket as NoSuchBucket_common +from .common import DanglingStorageURL as DanglingStorageURL_common from base64 import b64encode from email.utils import parsedate_tz, mktime_tz from urlparse import urlsplit @@ -33,16 +33,16 @@ XML_CONTENT_RE = re.compile('^application/xml(?:;\s+|$)', re.IGNORECASE) log = logging.getLogger("backends.s3c") -class Bucket(AbstractBucket): - """A bucket stored in some S3 compatible storage service. +class Backend(AbstractBackend): + """A backend to stored data in some S3 compatible storage service. This class uses standard HTTP connections to connect to GS. - The bucket guarantees only immediate get after create consistency. 
+ The backend guarantees only immediate get after create consistency. """ def __init__(self, storage_url, login, password, use_ssl): - super(Bucket, self).__init__() + super(Backend, self).__init__() (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url, use_ssl) @@ -93,15 +93,13 @@ class Bucket(AbstractBucket): def is_temp_failure(self, exc): #IGNORE:W0613 '''Return true if exc indicates a temporary error - Return true if the given exception is used by this bucket's backend - to indicate a temporary problem. Most instance methods automatically - retry the request in this case, so the caller does not need to - worry about temporary failures. + Return true if the given exception indicates a temporary problem. Most instance methods + automatically retry the request in this case, so the caller does not need to worry about + temporary failures. - However, in same cases (e.g. when reading or writing an object), the - request cannot automatically be retried. In these case this method can - be used to check for temporary problems and so that the request can - be manually restarted if applicable. + However, in same cases (e.g. when reading or writing an object), the request cannot + automatically be retried. In these case this method can be used to check for temporary + problems and so that the request can be manually restarted if applicable. ''' if isinstance(exc, (InternalError, BadDigest, IncompleteBody, RequestTimeout, @@ -139,9 +137,9 @@ class Bucket(AbstractBucket): raise NoSuchObject(key) def list(self, prefix=''): - '''List keys in bucket + '''List keys in backend - Returns an iterator over all keys in the bucket. This method + Returns an iterator over all keys in the backend. This method handles temporary errors. ''' @@ -165,7 +163,7 @@ class Bucket(AbstractBucket): type(exc).__name__) raise - log.info('Encountered %s exception (%s), retrying call to swift.Bucket.list()', + log.info('Encountered %s exception (%s), retrying call to s3c.Backend.list()', type(exc).__name__, exc) if hasattr(exc, 'retry_after') and exc.retry_after: @@ -180,9 +178,9 @@ class Bucket(AbstractBucket): yield marker def _list(self, prefix='', start=''): - '''List keys in bucket, starting with *start* + '''List keys in backend, starting with *start* - Returns an iterator over all keys in the bucket. This method + Returns an iterator over all keys in the backend. This method does not retry on errors. ''' @@ -272,12 +270,12 @@ class Bucket(AbstractBucket): @retry def open_read(self, key): - ''''Open object for reading + """Open object for reading - Return a tuple of a file-like object. Bucket contents can be read from - the file-like object, metadata is stored in its *metadata* attribute and - can be modified by the caller at will. The object must be closed explicitly. - ''' + Return a file-like object. Data can be read using the `read` method. metadata is stored in + its *metadata* attribute and can be modified by the caller at will. The object must be + closed explicitly. + """ try: resp = self._do_request('GET', '/%s%s' % (self.prefix, key)) @@ -289,16 +287,14 @@ class Bucket(AbstractBucket): def open_write(self, key, metadata=None, is_compressed=False): """Open object for writing - `metadata` can be a dict of additional attributes to store with the - object. Returns a file-like object. The object must be closed - explicitly. 
After closing, the *get_obj_size* may be used to retrieve - the size of the stored object (which may differ from the size of the + `metadata` can be a dict of additional attributes to store with the object. Returns a file- + like object. The object must be closed explicitly. After closing, the *get_obj_size* may be + used to retrieve the size of the stored object (which may differ from the size of the written data). - - The *is_compressed* parameter indicates that the caller is going - to write compressed data, and may be used to avoid recompression - by the bucket. - + + The *is_compressed* parameter indicates that the caller is going to write compressed data, + and may be used to avoid recompression by the backend. + Since Amazon S3 does not support chunked uploads, the entire data will be buffered in memory before upload. """ @@ -411,7 +407,7 @@ class Bucket(AbstractBucket): def clear(self): - """Delete all objects in bucket + """Delete all objects in backend Note that this method may not be able to see (and therefore also not delete) recently uploaded objects. @@ -508,11 +504,11 @@ class Bucket(AbstractBucket): class ObjectR(object): '''An S3 object open for reading''' - def __init__(self, key, resp, bucket, metadata=None): + def __init__(self, key, resp, backend, metadata=None): self.key = key self.resp = resp self.md5_checked = False - self.bucket = bucket + self.backend = backend self.metadata = metadata # False positive, hashlib *does* have md5 member @@ -561,9 +557,9 @@ class ObjectW(object): the close() method is called. ''' - def __init__(self, key, bucket, headers): + def __init__(self, key, backend, headers): self.key = key - self.bucket = bucket + self.backend = backend self.headers = headers self.closed = False self.obj_size = 0 @@ -581,7 +577,7 @@ class ObjectW(object): self.obj_size += len(buf) def is_temp_failure(self, exc): - return self.bucket.is_temp_failure(exc) + return self.backend.is_temp_failure(exc) @retry def close(self): @@ -596,7 +592,7 @@ class ObjectW(object): self.headers['Content-Length'] = self.obj_size self.fh.seek(0) - resp = self.bucket._do_request('PUT', '/%s%s' % (self.bucket.prefix, self.key), + resp = self.backend._do_request('PUT', '/%s%s' % (self.backend.prefix, self.key), headers=self.headers, body=self.fh) etag = resp.getheader('ETag').strip('"') assert resp.length == 0 @@ -605,7 +601,7 @@ class ObjectW(object): log.warn('ObjectW(%s).close(): MD5 mismatch (%s vs %s)', self.key, etag, self.md5.hexdigest) try: - self.bucket.delete(self.key) + self.backend.delete(self.key) except: log.exception('Objectw(%s).close(): unable to delete corrupted object!', self.key) @@ -708,4 +704,4 @@ class OperationAborted(S3Error): pass class RequestTimeout(S3Error): pass class SlowDown(S3Error): pass class RequestTimeTooSkewed(S3Error): pass -class NoSuchBucket(S3Error, NoSuchBucket_common): pass +class DanglingStorageURL(S3Error, DanglingStorageURL_common): pass diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py index c065734..ae4b9ab 100644 --- a/src/s3ql/backends/swift.py +++ b/src/s3ql/backends/swift.py @@ -8,7 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3. 
from __future__ import division, print_function, absolute_import from ..common import QuietError, BUFSIZE -from .common import AbstractBucket, NoSuchObject, retry, AuthorizationError, http_connection +from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError, http_connection, + DanglingStorageURL) from .s3c import HTTPError, BadDigest from urlparse import urlsplit import json @@ -20,14 +21,13 @@ import re import tempfile import time import urllib -from s3ql.backends.common import NoSuchBucket log = logging.getLogger("backend.swift") -class Bucket(AbstractBucket): - """A bucket stored in OpenStack Swift +class Backend(AbstractBackend): + """A backend to store data in OpenStack Swift - The bucket guarantees get after create consistency, i.e. a newly created + The backend guarantees get after create consistency, i.e. a newly created object will be immediately retrievable. """ @@ -35,30 +35,31 @@ class Bucket(AbstractBucket): # Unused argument #pylint: disable=W0613 - super(Bucket, self).__init__() + super(Backend, self).__init__() - (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url) + (host, port, container_name, prefix) = self._parse_storage_url(storage_url) self.hostname = host self.port = port - self.bucket_name = bucket_name + self.container_name = container_name self.prefix = prefix self.password = password self.login = login self.auth_token = None self.auth_prefix = None - self.conn = self._get_conn() + self.conn = None - self._bucket_exists() + self._container_exists() - def _bucket_exists(self): - '''Make sure that the bucket exists''' + @retry + def _container_exists(self): + '''Make sure that the container exists''' try: resp = self._do_request('GET', '/', query_string={'limit': 1 }) except HTTPError as exc: if exc.status == 404: - raise NoSuchBucket(self.bucket_name) + raise DanglingStorageURL(self.container_name) raise resp.read() @@ -66,7 +67,7 @@ class Bucket(AbstractBucket): def _parse_storage_url(storage_url): '''Extract information from storage URL - Return a tuple *(host, port, bucket_name, prefix)* . + Return a tuple *(host, port, container_name, prefix)* . ''' hit = re.match(r'^[a-zA-Z0-9]+://' # Backend @@ -80,23 +81,21 @@ class Bucket(AbstractBucket): hostname = hit.group(1) port = int(hit.group(2) or '443') - bucketname = hit.group(3) + containername = hit.group(3) prefix = hit.group(4) or '' - return (hostname, port, bucketname, prefix) + return (hostname, port, containername, prefix) def is_temp_failure(self, exc): #IGNORE:W0613 '''Return true if exc indicates a temporary error - Return true if the given exception is used by this bucket's backend - to indicate a temporary problem. Most instance methods automatically - retry the request in this case, so the caller does not need to - worry about temporary failures. + Return true if the given exception indicates a temporary problem. Most instance methods + automatically retry the request in this case, so the caller does not need to worry about + temporary failures. - However, in same cases (e.g. when reading or writing an object), the - request cannot automatically be retried. In these case this method can - be used to check for temporary problems and so that the request can - be manually restarted if applicable. + However, in same cases (e.g. when reading or writing an object), the request cannot + automatically be retried. In these case this method can be used to check for temporary + problems and so that the request can be manually restarted if applicable. 
''' if isinstance(exc, (httplib.IncompleteRead,)): @@ -111,10 +110,15 @@ class Bucket(AbstractBucket): exc.errno in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT, errno.EINTR)): return True + + elif isinstance(exc, HTTPError) and exc.status >= 500 and exc.status <= 599: + return True + elif isinstance(exc, AuthenticationExpired): + return True + return False - @retry def _get_conn(self): '''Obtain connection to server and authentication token''' @@ -134,14 +138,11 @@ class Bucket(AbstractBucket): resp.read() continue - if resp.status == 401: + elif resp.status == 401: raise AuthorizationError(resp.read()) elif resp.status > 299 or resp.status < 200: - log.error('_refresh_auth(): unexpected response: %d %s\n%s', - resp.status, resp.msg, resp.read()) - raise RuntimeError('Unexpected response: %d %s' % (resp.status, - resp.msg)) + raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read()) # Pylint can't infer SplitResult Types #pylint: disable=E1103 @@ -167,12 +168,15 @@ class Bucket(AbstractBucket): if headers is None: headers = dict() - if not body: headers['content-length'] = '0' - + + if self.conn is None: + log.info('_do_request(): no active connection, calling _get_conn()') + self.conn = self._get_conn() + # Construct full path - path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.bucket_name, path)) + path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.container_name, path)) if query_string: s = urllib.urlencode(query_string, doseq=True) if subres: @@ -183,43 +187,40 @@ class Bucket(AbstractBucket): path += '?%s' % subres headers['connection'] = 'keep-alive' - - while True: - headers['X-Auth-Token'] = self.auth_token + headers['X-Auth-Token'] = self.auth_token - try: - log.debug('_do_request(): %s %s', method, path) - self.conn.request(method, path, body, headers) + try: + log.debug('_do_request(): %s %s', method, path) + self.conn.request(method, path, body, headers) + + log.debug('_do_request(): Reading response..') + resp = self.conn.getresponse() + except: + # We probably can't use the connection anymore + self.conn.close() + raise - log.debug('_do_request(): Reading response..') - resp = self.conn.getresponse() - except: - # We probably can't use the connection anymore - self.conn.close() - raise + # We need to call read() at least once for httplib to consider this + # request finished, even if there is no response body. + if resp.length == 0: + resp.read() + + # Success + if resp.status >= 200 and resp.status <= 299: + return resp + + # Expired auth token + if resp.status == 401: + log.info('OpenStack auth token seems to have expired, requesting new one.') + self.conn = None + raise AuthenticationExpired(resp.reason) - # We need to call read() at least once for httplib to consider this - # request finished, even if there is no response body. 
@@ -134,14 +138,11 @@ class Bucket(AbstractBucket):
                 resp.read()
                 continue
 
-            if resp.status == 401:
+            elif resp.status == 401:
                 raise AuthorizationError(resp.read())
 
             elif resp.status > 299 or resp.status < 200:
-                log.error('_refresh_auth(): unexpected response: %d %s\n%s',
-                          resp.status, resp.msg, resp.read())
-                raise RuntimeError('Unexpected response: %d %s' % (resp.status,
-                                                                   resp.msg))
+                raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
 
             # Pylint can't infer SplitResult Types
             #pylint: disable=E1103
@@ -167,12 +168,15 @@ class Bucket(AbstractBucket):
         if headers is None:
             headers = dict()
-
         if not body:
             headers['content-length'] = '0'
-
+
+        if self.conn is None:
+            log.info('_do_request(): no active connection, calling _get_conn()')
+            self.conn = self._get_conn()
+
         # Construct full path
-        path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.bucket_name, path))
+        path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.container_name, path))
         if query_string:
             s = urllib.urlencode(query_string, doseq=True)
             if subres:
@@ -183,43 +187,40 @@ class Bucket(AbstractBucket):
                 path += '?%s' % subres
 
         headers['connection'] = 'keep-alive'
-
-        while True:
-            headers['X-Auth-Token'] = self.auth_token
+        headers['X-Auth-Token'] = self.auth_token
 
-            try:
-                log.debug('_do_request(): %s %s', method, path)
-                self.conn.request(method, path, body, headers)
+        try:
+            log.debug('_do_request(): %s %s', method, path)
+            self.conn.request(method, path, body, headers)
+
+            log.debug('_do_request(): Reading response..')
+            resp = self.conn.getresponse()
+        except:
+            # We probably can't use the connection anymore
+            self.conn.close()
+            raise
 
-            log.debug('_do_request(): Reading response..')
-            resp = self.conn.getresponse()
-            except:
-                # We probably can't use the connection anymore
-                self.conn.close()
-                raise
+        # We need to call read() at least once for httplib to consider this
+        # request finished, even if there is no response body.
+        if resp.length == 0:
+            resp.read()
+
+        # Success
+        if resp.status >= 200 and resp.status <= 299:
+            return resp
+
+        # Expired auth token
+        if resp.status == 401:
+            log.info('OpenStack auth token seems to have expired, requesting new one.')
+            self.conn = None
+            raise AuthenticationExpired(resp.reason)
 
-            # We need to call read() at least once for httplib to consider this
-            # request finished, even if there is no response body.
-            if resp.length == 0:
-                resp.read()
-
-            # Success
-            if resp.status >= 200 and resp.status <= 299:
-                return resp
-
-            # Expired auth token
-            if resp.status == 401:
-                log.info('OpenStack auth token seems to have expired, requesting new one.')
-                resp.read()
-                self.conn = self._get_conn()
-                continue
-
-            # If method == HEAD, server must not return response body
-            # even in case of errors
-            if method.upper() == 'HEAD':
-                raise HTTPError(resp.status, resp.reason)
-            else:
-                raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
+        # If method == HEAD, server must not return response body
+        # even in case of errors
+        if method.upper() == 'HEAD':
+            raise HTTPError(resp.status, resp.reason)
+        else:
+            raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
 
     @retry
     def lookup(self, key):
@@ -264,7 +265,7 @@ class Bucket(AbstractBucket):
     def open_read(self, key):
         ''''Open object for reading
 
-        Return a tuple of a file-like object. Bucket contents can be read from
+        Return a file-like object. Backend contents can be read from
         the file-like object, metadata is stored in its *metadata* attribute
         and can be modified by the caller at will. The object must be closed explicitly.
         '''
@@ -281,15 +282,13 @@ class Bucket(AbstractBucket):
     def open_write(self, key, metadata=None, is_compressed=False):
         """Open object for writing
 
-        `metadata` can be a dict of additional attributes to store with the
-        object. Returns a file-like object. The object must be closed
-        explicitly. After closing, the *get_obj_size* may be used to retrieve
-        the size of the stored object (which may differ from the size of the
+        `metadata` can be a dict of additional attributes to store with the object. Returns a file-
+        like object. The object must be closed explicitly. After closing, *get_obj_size* may be
+        used to retrieve the size of the stored object (which may differ from the size of the
         written data).
 
-        The *is_compressed* parameter indicates that the caller is going
-        to write compressed data, and may be used to avoid recompression
-        by the bucket.
+        The *is_compressed* parameter indicates that the caller is going to write compressed data,
+        and may be used to avoid recompression by the backend.
         """
 
         log.debug('open_write(%s): start', key)
@@ -302,7 +301,7 @@ class Bucket(AbstractBucket):
         return ObjectW(key, self, headers)
 
     def clear(self):
-        """Delete all objects in bucket"""
+        """Delete all objects in backend"""
 
         # We have to cache keys, because otherwise we can't use the
        # http connection to delete keys.
@@ -312,14 +311,14 @@ class Bucket(AbstractBucket):
 
             log.debug('clear(): deleting key %s', s3key)
 
-            # Ignore missing objects when clearing bucket
+            # Ignore missing objects when clearing backend
             self.delete(s3key, True)
 
     @retry
     def delete(self, key, force=False):
         """Delete object stored under `key`
 
-        ``bucket.delete(key)`` can also be written as ``del bucket[key]``.
+        ``backend.delete(key)`` can also be written as ``del backend[key]``.
         If `force` is true, do not return an error if the key does not exist.
         """
 
@@ -345,7 +344,7 @@ class Bucket(AbstractBucket):
         try:
             resp = self._do_request('PUT', '/%s%s' % (self.prefix, dest),
-                                    headers={ 'X-Copy-From': '/%s/%s%s' % (self.bucket_name,
+                                    headers={ 'X-Copy-From': '/%s/%s%s' % (self.container_name,
                                                                            self.prefix, src)})
             # Discard response body
             resp.read()
@@ -355,9 +354,9 @@ class Bucket(AbstractBucket):
             raise
 
     def list(self, prefix=''):
-        '''List keys in bucket
+        '''List keys in backend
 
-        Returns an iterator over all keys in the bucket. This method
+        Returns an iterator over all keys in the backend. This method
         handles temporary errors.
         '''
@@ -381,7 +380,7 @@ class Bucket(AbstractBucket):
                               type(exc).__name__)
                     raise
 
-                log.info('Encountered %s exception (%s), retrying call to swift.Bucket.list()',
+                log.info('Encountered %s exception (%s), retrying call to swift.Backend.list()',
                          type(exc).__name__, exc)
 
                 if hasattr(exc, 'retry_after') and exc.retry_after:
@@ -396,9 +395,9 @@ class Bucket(AbstractBucket):
             yield marker
 
     def _list(self, prefix='', start='', batch_size=5000):
-        '''List keys in bucket, starting with *start*
+        '''List keys in backend, starting with *start*
 
-        Returns an iterator over all keys in the bucket. This method
+        Returns an iterator over all keys in the backend. This method
         does not retry on errors.
         '''
 
@@ -416,7 +415,7 @@ class Bucket(AbstractBucket):
                                                   'limit': batch_size })
         except HTTPError as exc:
             if exc.status == 404:
-                raise NoSuchBucket(self.bucket_name)
+                raise DanglingStorageURL(self.container_name)
             raise
 
         if resp.status == 204:
@@ -450,9 +449,9 @@ class ObjectW(object):
     the close() method is called.
     '''
 
-    def __init__(self, key, bucket, headers):
+    def __init__(self, key, backend, headers):
         self.key = key
-        self.bucket = bucket
+        self.backend = backend
         self.headers = headers
         self.closed = False
         self.obj_size = 0
@@ -470,7 +469,7 @@ class ObjectW(object):
         self.obj_size += len(buf)
 
     def is_temp_failure(self, exc):
-        return self.bucket.is_temp_failure(exc)
+        return self.backend.is_temp_failure(exc)
 
     @retry
     def close(self):
@@ -486,7 +485,7 @@ class ObjectW(object):
         self.headers['Content-Type'] = 'application/octet-stream'
 
         self.fh.seek(0)
-        resp = self.bucket._do_request('PUT', '/%s%s' % (self.bucket.prefix, self.key),
+        resp = self.backend._do_request('PUT', '/%s%s' % (self.backend.prefix, self.key),
                                        headers=self.headers, body=self.fh)
         etag = resp.getheader('ETag').strip('"')
         resp.read()
@@ -495,7 +494,7 @@ class ObjectW(object):
             log.warn('ObjectW(%s).close(): MD5 mismatch (%s vs %s)', self.key, etag,
                      self.md5.hexdigest)
             try:
-                self.bucket.delete(self.key)
+                self.backend.delete(self.key)
             except:
                 log.exception('Objectw(%s).close(): unable to delete corrupted object!',
                               self.key)
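For context on the two hunks above: `ObjectW` spools all writes to a temporary file while updating an MD5 digest, and `close()` uploads the spool in one PUT, comparing the server's ETag against the local digest and deleting the object on mismatch so a corrupt upload never goes unnoticed. (Note in passing that the unchanged `log.warn()` line passes `self.md5.hexdigest` without parentheses, so it logs the bound method rather than the digest; the comparison itself uses the correct value.) The skeleton of the spool-then-verify pattern, where `upload_fn` is a stand-in and not an S3QL interface:

import hashlib
import tempfile

class SpoolingWriter(object):
    '''Sketch of the ObjectW pattern: buffer locally, verify after upload.'''

    def __init__(self, upload_fn):
        self.upload_fn = upload_fn            # callable(fh) -> ETag reported by server
        self.fh = tempfile.TemporaryFile()
        self.md5 = hashlib.md5()

    def write(self, buf):
        self.fh.write(buf)
        self.md5.update(buf)                  # digest stays in step with the spool

    def close(self):
        self.fh.seek(0)
        etag = self.upload_fn(self.fh)
        if etag != self.md5.hexdigest():      # server stored something different
            raise IOError('MD5 mismatch: %s vs %s' % (etag, self.md5.hexdigest()))
        self.fh.close()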
@@ -516,11 +515,11 @@ class ObjectW(object):
 class ObjectR(object):
     '''A SWIFT object opened for reading'''
 
-    def __init__(self, key, resp, bucket, metadata=None):
+    def __init__(self, key, resp, backend, metadata=None):
         self.key = key
         self.resp = resp
         self.md5_checked = False
-        self.bucket = bucket
+        self.backend = backend
         self.metadata = metadata
 
         # False positive, hashlib *does* have md5 member
@@ -573,4 +572,17 @@ def extractmeta(resp):
             continue
 
         meta[hit.group(1)] = val
 
-    return meta
\ No newline at end of file
+    return meta
+
+
+class AuthenticationExpired(Exception):
+    '''Raised if the provided Authentication Token has expired'''
+
+    def __init__(self, msg):
+        super(AuthenticationExpired, self).__init__()
+        self.msg = msg
+
+    def __str__(self):
+        return 'Auth token expired. Server said: %s' % self.msg
+
\ No newline at end of file
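`extractmeta()` above is the read half of the metadata round-trip: user metadata rides in `X-Object-Meta-*` HTTP headers. Both directions in miniature; the header casing and the value encoding here are assumptions for illustration, not lifted from S3QL:

import re

def pack_meta(meta):
    '''Dict -> Swift-style metadata headers (sketch).'''
    return dict(('X-Object-Meta-%s' % key, str(val)) for (key, val) in meta.items())

def extract_meta(headers):
    '''Inverse direction, mirroring extractmeta(); headers is a (name, value) list.'''
    meta = dict()
    for (name, val) in headers:
        hit = re.match(r'^X-Object-Meta-(.+)$', name, re.IGNORECASE)
        if hit:
            meta[hit.group(1)] = val
    return meta

assert extract_meta(pack_meta({'seq_no': 42}).items())['seq_no'] == '42'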
diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py
index 19c6b8d..2e775de 100644
--- a/src/s3ql/block_cache.py
+++ b/src/s3ql/block_cache.py
@@ -210,12 +210,12 @@ class BlockCache(object):
        uploaded)
     """
 
-    def __init__(self, bucket_pool, db, cachedir, max_size, max_entries=768):
+    def __init__(self, backend_pool, db, cachedir, max_size, max_entries=768):
         log.debug('Initializing')
 
         self.path = cachedir
         self.db = db
-        self.bucket_pool = bucket_pool
+        self.backend_pool = backend_pool
         self.entries = OrderedDict()
         self.max_entries = max_entries
         self.size = 0
@@ -301,8 +301,8 @@ class BlockCache(object):
         if log.isEnabledFor(logging.DEBUG):
             time_ = time.time()
 
-        with self.bucket_pool() as bucket:
-            obj_size = bucket.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
+        with self.backend_pool() as backend:
+            obj_size = backend.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
 
         if log.isEnabledFor(logging.DEBUG):
             time_ = time.time() - time_
@@ -468,8 +468,8 @@ class BlockCache(object):
     def _do_removal(self, obj_id):
         '''Remove object'''
 
-        with self.bucket_pool() as bucket:
-            bucket.delete('s3ql_data_%d' % obj_id)
+        with self.backend_pool() as backend:
+            backend.delete('s3ql_data_%d' % obj_id)
 
     @contextmanager
     def get(self, inode, blockno):
@@ -534,8 +534,8 @@ class BlockCache(object):
                 return el
             try:
                 with lock_released:
-                    with self.bucket_pool() as bucket:
-                        el = bucket.perform_read(do_read, 's3ql_data_%d' % obj_id)
+                    with self.backend_pool() as backend:
+                        el = backend.perform_read(do_read, 's3ql_data_%d' % obj_id)
 
                     # Note: We need to do this *before* releasing the global
                     # lock to notify other threads
diff --git a/src/s3ql/common.py b/src/s3ql/common.py
index 1aa8162..61f11d5 100644
--- a/src/s3ql/common.py
+++ b/src/s3ql/common.py
@@ -92,11 +92,11 @@ def add_stdout_logging(quiet=False):
     root_logger.addHandler(handler)
     return handler
 
-def get_seq_no(bucket):
+def get_seq_no(backend):
     '''Get current metadata sequence number'''
     from .backends.common import NoSuchObject
 
-    seq_nos = list(bucket.list('s3ql_seq_no_'))
+    seq_nos = list(backend.list('s3ql_seq_no_'))
     if not seq_nos:
         # Maybe list result is outdated
         seq_nos = [ 's3ql_seq_no_1' ]
@@ -109,18 +109,18 @@ def get_seq_no(bucket):
         seq_no = max(seq_nos)
 
     # Make sure that object really exists
-    while ('s3ql_seq_no_%d' % seq_no) not in bucket:
+    while ('s3ql_seq_no_%d' % seq_no) not in backend:
         seq_no -= 1
         if seq_no == 0:
-            raise QuietError('No S3QL file system found in bucket.')
-    while ('s3ql_seq_no_%d' % seq_no) in bucket:
+            raise QuietError('No S3QL file system found at given storage URL.')
+    while ('s3ql_seq_no_%d' % seq_no) in backend:
         seq_no += 1
     seq_no -= 1
 
     # Delete old seq nos
     for i in [ x for x in seq_nos if x < seq_no - 10 ]:
         try:
-            del bucket['s3ql_seq_no_%d' % i]
+            del backend['s3ql_seq_no_%d' % i]
         except NoSuchObject:
             pass # Key list may not be up to date
@@ -338,7 +338,7 @@ def _escape(s):
 
     return s
 
-def get_bucket_cachedir(storage_url, cachedir):
+def get_backend_cachedir(storage_url, cachedir):
     if not os.path.exists(cachedir):
         os.mkdir(cachedir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
     return os.path.join(cachedir, _escape(storage_url))
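The probing in `get_seq_no()` exists because `backend.list()` may return stale results on an eventually consistent store: the code walks down from the largest listed number until an object really exists, then walks up past anything the listing missed. The same logic against a plain dict, as a self-contained illustration:

def probe_seq_no(backend, listed_max):
    '''Sketch of the get_seq_no() scan; backend only needs __contains__.'''
    seq_no = listed_max
    while ('s3ql_seq_no_%d' % seq_no) not in backend:   # listing may overshoot
        seq_no -= 1
        if seq_no == 0:
            raise RuntimeError('No S3QL file system found')
    while ('s3ql_seq_no_%d' % seq_no) in backend:       # listing may undershoot
        seq_no += 1
    return seq_no - 1

# Objects 1..7 exist, but the (stale) listing only reported up to 5:
store = dict(('s3ql_seq_no_%d' % i, 'Empty') for i in range(1, 8))
assert probe_seq_no(store, 5) == 7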
diff --git a/src/s3ql/fsck.py b/src/s3ql/fsck.py
index 0f890e2..b5ba9fe 100644
--- a/src/s3ql/fsck.py
+++ b/src/s3ql/fsck.py
@@ -8,8 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.
 from __future__ import division, print_function, absolute_import
 
 from . import CURRENT_FS_REV
-from .backends.common import NoSuchObject, get_bucket, NoSuchBucket
-from .common import (ROOT_INODE, inode_for_path, sha256_fh, get_path, BUFSIZE, get_bucket_cachedir,
+from .backends.common import NoSuchObject, get_backend, DanglingStorageURL
+from .common import (ROOT_INODE, inode_for_path, sha256_fh, get_path, BUFSIZE, get_backend_cachedir,
                      setup_logging, QuietError, get_seq_no, stream_write_bz2, stream_read_bz2, CTRL_INODE)
 from .database import NoSuchRowError, Connection
 from .metadata import restore_metadata, cycle_metadata, dump_metadata, create_tables
@@ -35,10 +35,10 @@ S_IFMT = (stat.S_IFDIR | stat.S_IFREG | stat.S_IFSOCK | stat.S_IFBLK |
 
 class Fsck(object):
 
-    def __init__(self, cachedir_, bucket_, param, conn):
+    def __init__(self, cachedir_, backend_, param, conn):
 
         self.cachedir = cachedir_
-        self.bucket = bucket_
+        self.backend = backend_
         self.expect_errors = False
         self.found_errors = False
         self.uncorrectable_errors = False
@@ -182,7 +182,7 @@ class Fsck(object):
                 shutil.copyfileobj(fh, obj_fh, BUFSIZE)
             return obj_fh
 
-        obj_size = self.bucket.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
+        obj_size = self.backend.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
 
         self.conn.execute('UPDATE objects SET size=? WHERE id=?', (obj_size, obj_id))
@@ -799,7 +799,7 @@ class Fsck(object):
 
         # Delete objects which (correctly had) refcount=0
         for obj_id in self.conn.query('SELECT id FROM objects WHERE refcount = 0'):
-            del self.bucket['s3ql_data_%d' % obj_id]
+            del self.backend['s3ql_data_%d' % obj_id]
         self.conn.execute("DELETE FROM objects WHERE refcount = 0")
@@ -814,7 +814,7 @@ class Fsck(object):
         # We use this table to keep track of the objects that we have seen
         self.conn.execute("CREATE TEMP TABLE obj_ids (id INTEGER PRIMARY KEY)")
         try:
-            for (i, obj_name) in enumerate(self.bucket.list('s3ql_data_')):
+            for (i, obj_name) in enumerate(self.backend.list('s3ql_data_')):
 
                 if i != 0 and i % 5000 == 0:
                     log.info('..processed %d objects so far..', i)
@@ -832,10 +832,10 @@ class Fsck(object):
                                              'EXCEPT SELECT id FROM objects'):
                 try:
                     if obj_id in self.unlinked_objects:
-                        del self.bucket['s3ql_data_%d' % obj_id]
+                        del self.backend['s3ql_data_%d' % obj_id]
                     else:
                         # TODO: Save the data in lost+found instead
-                        del self.bucket['s3ql_data_%d' % obj_id]
+                        del self.backend['s3ql_data_%d' % obj_id]
                         self.found_errors = True
                         self.log_error("Deleted spurious object %d", obj_id)
                 except NoSuchObject:
@@ -844,12 +844,12 @@ class Fsck(object):
         self.conn.execute('CREATE TEMPORARY TABLE missing AS '
                           'SELECT id FROM objects EXCEPT SELECT id FROM obj_ids')
         for (obj_id,) in self.conn.query('SELECT * FROM missing'):
-            if ('s3ql_data_%d' % obj_id) in self.bucket:
+            if ('s3ql_data_%d' % obj_id) in self.backend:
                 # Object was just not in list yet
                 continue
 
             self.found_errors = True
-            self.log_error("object %s only exists in table but not in bucket, deleting", obj_id)
+            self.log_error("object %s only exists in table but not in backend, deleting", obj_id)
 
             for (id_,) in self.conn.query('SELECT inode FROM inode_blocks JOIN blocks ON block_id = id '
                                           'WHERE obj_id=? ', (obj_id,)):
@@ -893,7 +893,7 @@ class Fsck(object):
             self.log_error("Object %d has no size information, retrieving from backend...", obj_id)
 
             self.conn.execute('UPDATE objects SET size=? WHERE id=?',
-                              (self.bucket.get_size('s3ql_data_%d' % obj_id), obj_id))
+                              (self.backend.get_size('s3ql_data_%d' % obj_id), obj_id))
 
     def resolve_free(self, path, name):
@@ -1059,13 +1059,13 @@ def main(args=None):
         raise QuietError('Can not check mounted file system.')
 
     try:
-        bucket = get_bucket(options)
-    except NoSuchBucket as exc:
+        backend = get_backend(options)
+    except DanglingStorageURL as exc:
         raise QuietError(str(exc))
 
-    cachepath = get_bucket_cachedir(options.storage_url, options.cachedir)
-    seq_no = get_seq_no(bucket)
-    param_remote = bucket.lookup('s3ql_metadata')
+    cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
+    seq_no = get_seq_no(backend)
+    param_remote = backend.lookup('s3ql_metadata')
     db = None
 
     if os.path.exists(cachepath + '.params'):
@@ -1073,7 +1073,7 @@ def main(args=None):
         param = pickle.load(open(cachepath + '.params', 'rb'))
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
-            param = bucket.lookup('s3ql_metadata')
+            param = backend.lookup('s3ql_metadata')
         else:
             log.info('Using cached metadata.')
             db = Connection(cachepath + '.db')
@@ -1145,7 +1145,7 @@ def main(args=None):
             stream_read_bz2(fh, tmpfh)
             return tmpfh
         log.info('Downloading and decompressing metadata...')
-        tmpfh = bucket.perform_read(do_read, "s3ql_metadata")
+        tmpfh = backend.perform_read(do_read, "s3ql_metadata")
         os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC,
                          stat.S_IRUSR | stat.S_IWUSR))
         db = Connection(cachepath + '.db.tmp', fast_mode=True)
@@ -1159,10 +1159,10 @@ def main(args=None):
     # Increase metadata sequence no
     param['seq_no'] += 1
     param['needs_fsck'] = True
-    bucket['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
+    backend['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
     pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
 
-    fsck = Fsck(cachepath + '-cache', bucket, param, db)
+    fsck = Fsck(cachepath + '-cache', backend, param, db)
     fsck.check()
 
     param['max_inode'] = db.get_val('SELECT MAX(id) FROM inodes')
@@ -1181,7 +1181,7 @@ def main(args=None):
         log.warn('File system was marked as clean, yet fsck found problems.')
         log.warn('Please report this to the S3QL mailing list, http://groups.google.com/group/s3ql')
 
-    cycle_metadata(bucket)
+    cycle_metadata(backend)
     param['needs_fsck'] = False
     param['last_fsck'] = time.time()
     param['last-modified'] = time.time()
@@ -1195,7 +1195,7 @@ def main(args=None):
         return obj_fh
 
     log.info("Compressing and uploading metadata...")
-    obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param,
+    obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
                                   is_compressed=True)
     log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
     pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
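fsck.main() above, and mount.get_metadata() further down, share one download idiom: a `do_read` callback is handed to `backend.perform_read()`, which can restart the whole transfer on a temporary failure, while the callback spools the decompressed stream into a temporary file. Roughly, with a simplified single-stream `stream_read_bz2` (the real one lives in `s3ql.common` and is not shown in this diff):

import bz2
import tempfile

def stream_read_bz2(ifh, ofh, bufsize=64 * 1024):
    '''Simplified sketch: decompress a bz2 stream from ifh into ofh.'''
    decompressor = bz2.BZ2Decompressor()
    while True:
        buf = ifh.read(bufsize)
        if not buf:
            break
        ofh.write(decompressor.decompress(buf))

def download_metadata(backend):
    def do_read(fh):
        tmpfh = tempfile.TemporaryFile()
        stream_read_bz2(fh, tmpfh)
        return tmpfh                 # perform_read() passes this back to the caller
    return backend.perform_read(do_read, 's3ql_metadata')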
diff --git a/src/s3ql/inode_cache.py b/src/s3ql/inode_cache.py
index e5491e3..a56ec65 100644
--- a/src/s3ql/inode_cache.py
+++ b/src/s3ql/inode_cache.py
@@ -8,7 +8,6 @@ This program can be distributed under the terms of the GNU GPLv3.
 '''
 
 from __future__ import division, print_function, absolute_import
 
-import time
 import logging
 from .database import NoSuchRowError
diff --git a/src/s3ql/metadata.py b/src/s3ql/metadata.py
index 2007318..cc0cd49 100644
--- a/src/s3ql/metadata.py
+++ b/src/s3ql/metadata.py
@@ -74,17 +74,17 @@ def restore_metadata(fh, db):
         load_table(table, columns, db=db, fh=fh)
     db.execute('ANALYZE')
 
-def cycle_metadata(bucket):
+def cycle_metadata(backend):
     from .backends.common import NoSuchObject
 
     log.info('Backing up old metadata...')
     for i in reversed(range(10)):
         try:
-            bucket.copy("s3ql_metadata_bak_%d" % i, "s3ql_metadata_bak_%d" % (i + 1))
+            backend.copy("s3ql_metadata_bak_%d" % i, "s3ql_metadata_bak_%d" % (i + 1))
         except NoSuchObject:
             pass
 
-    bucket.copy("s3ql_metadata", "s3ql_metadata_bak_0")
+    backend.copy("s3ql_metadata", "s3ql_metadata_bak_0")
 
 def dump_metadata(db, fh):
     '''Dump metadata into fh
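`cycle_metadata()` walks the backup slots oldest-first so that nothing is overwritten before it has been copied onward: bak_9 moves to bak_10, then bak_8 to bak_9, and so on, until the live `s3ql_metadata` lands in bak_0. The same rotation on a plain dict, with a small worked example:

def cycle(objects, slots=10):
    '''Sketch of the cycle_metadata() rotation over a dict.'''
    for i in reversed(range(slots)):                  # 9, 8, ..., 0
        src = 's3ql_metadata_bak_%d' % i
        if src in objects:                            # missing slots are skipped,
            objects['s3ql_metadata_bak_%d' % (i + 1)] = objects[src]   # like NoSuchObject
    objects['s3ql_metadata_bak_0'] = objects['s3ql_metadata']

objs = {'s3ql_metadata': 'v3', 's3ql_metadata_bak_0': 'v2', 's3ql_metadata_bak_1': 'v1'}
cycle(objs)
assert objs['s3ql_metadata_bak_0'] == 'v3'
assert objs['s3ql_metadata_bak_1'] == 'v2' and objs['s3ql_metadata_bak_2'] == 'v1'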
diff --git a/src/s3ql/mkfs.py b/src/s3ql/mkfs.py
index 15f329f..9f03619 100644
--- a/src/s3ql/mkfs.py
+++ b/src/s3ql/mkfs.py
@@ -8,8 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.
 from __future__ import division, print_function, absolute_import
 
 from . import CURRENT_FS_REV
-from .backends.common import get_bucket, BetterBucket, NoSuchBucket
-from .common import get_bucket_cachedir, setup_logging, QuietError, CTRL_INODE, stream_write_bz2
+from .backends.common import get_backend, BetterBackend, DanglingStorageURL
+from .common import get_backend_cachedir, setup_logging, QuietError, CTRL_INODE, stream_write_bz2
 from .database import Connection
 from .metadata import dump_metadata, create_tables
 from .parse_args import ArgumentParser
@@ -66,7 +66,7 @@ def init_tables(conn):
     # Insert control inode, the actual values don't matter that much
     conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount) "
                  "VALUES (?,?,?,?,?,?,?,?)",
-                 (CTRL_INODE, stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR,
+                 (CTRL_INODE, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR,
                   0, 0, timestamp, timestamp, timestamp, 42))
 
     # Insert lost+found directory
@@ -92,19 +92,19 @@ def main(args=None):
                  'performance.')
 
     try:
-        plain_bucket = get_bucket(options, plain=True)
-    except NoSuchBucket as exc:
+        plain_backend = get_backend(options, plain=True)
+    except DanglingStorageURL as exc:
         raise QuietError(str(exc))
 
     log.info("Before using S3QL, make sure to read the user's guide, especially\n"
              "the 'Important Rules to Avoid Loosing Data' section.")
 
-    if 's3ql_metadata' in plain_bucket:
+    if 's3ql_metadata' in plain_backend:
         if not options.force:
             raise QuietError("Found existing file system! Use --force to overwrite")
 
         log.info('Purging existing file system data..')
-        plain_bucket.clear()
+        plain_backend.clear()
         log.info('Please note that the new file system may appear inconsistent\n'
                  'for a while until the removals have propagated through the backend.')
 
@@ -122,17 +122,17 @@ def main(args=None):
             data_pw = fh.read(32)
             fh.close()
 
-        bucket = BetterBucket(wrap_pw, 'bzip2', plain_bucket)
-        bucket['s3ql_passphrase'] = data_pw
+        backend = BetterBackend(wrap_pw, 'bzip2', plain_backend)
+        backend['s3ql_passphrase'] = data_pw
     else:
         data_pw = None
 
-    bucket = BetterBucket(data_pw, 'bzip2', plain_bucket)
+    backend = BetterBackend(data_pw, 'bzip2', plain_backend)
 
     # Setup database
-    cachepath = get_bucket_cachedir(options.storage_url, options.cachedir)
+    cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
 
-    # There can't be a corresponding bucket, so we can safely delete
+    # There can't be a corresponding backend, so we can safely delete
     # these files.
     if os.path.exists(cachepath + '.db'):
         os.unlink(cachepath + '.db')
@@ -156,8 +156,8 @@ def main(args=None):
     param['last-modified'] = time.time()
 
     # This indicates that the convert_legacy_metadata() stuff
-    # in BetterBucket is not required for this file system.
-    param['bucket_revision'] = 1
+    # in BetterBackend is not required for this file system.
+    param['backend_revision'] = 1
 
     log.info('Dumping metadata...')
     fh = tempfile.TemporaryFile()
@@ -168,8 +168,8 @@ def main(args=None):
         return obj_fh
 
     log.info("Compressing and uploading metadata...")
-    bucket.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty')
-    obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param,
+    backend.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty')
+    obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
                                   is_compressed=True)
     log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
     pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
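The passphrase branch in `main()` above implements a two-level key scheme: a random 32-byte master key (`data_pw`) protects the data, while the user's passphrase (`wrap_pw`) only protects the master key, stored as the `s3ql_passphrase` object; the `passphrase` action can therefore re-wrap one small object instead of re-encrypting everything. Schematically, with a deliberately toy XOR "cipher" standing in for S3QL's real encryption:

import os

def toy_cipher(key, data):
    '''Toy XOR stand-in for real encryption; do not use for anything real.'''
    key, data = bytearray(key), bytearray(data)
    return bytes(bytearray(data[i] ^ key[i % len(key)] for i in range(len(data))))

wrap_pw = b'passphrase entered by the user'
data_pw = os.urandom(32)                     # random master key, as in mkfs

# Stored once in the backend: the wrapped master key ('s3ql_passphrase')
wrapped = toy_cipher(wrap_pw, data_pw)

# Changing the passphrase re-wraps only this object, never the data:
new_pw = b'another passphrase'
wrapped = toy_cipher(new_pw, toy_cipher(wrap_pw, wrapped))
assert toy_cipher(new_pw, wrapped) == data_pw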
diff --git a/src/s3ql/mount.py b/src/s3ql/mount.py
index 4744487..a6fdcaa 100644
--- a/src/s3ql/mount.py
+++ b/src/s3ql/mount.py
@@ -8,9 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
 from __future__ import division, print_function, absolute_import
 
 from . import fs, CURRENT_FS_REV
-from .backends.common import get_bucket_factory, BucketPool, NoSuchBucket
+from .backends.common import get_backend_factory, BackendPool, DanglingStorageURL
 from .block_cache import BlockCache
-from .common import (setup_logging, get_bucket_cachedir, get_seq_no, QuietError, stream_write_bz2,
+from .common import (setup_logging, get_backend_cachedir, get_seq_no, QuietError, stream_write_bz2,
                      stream_read_bz2)
 from .daemonize import daemonize
 from .database import Connection
@@ -29,7 +29,6 @@ import tempfile
 import thread
 import threading
 import time
-
 log = logging.getLogger("mount")
@@ -81,17 +80,17 @@ def main(args=None):
         import pstats
         prof = cProfile.Profile()
 
-    bucket_factory = get_bucket_factory(options)
-    bucket_pool = BucketPool(bucket_factory)
+    backend_factory = get_backend_factory(options)
+    backend_pool = BackendPool(backend_factory)
 
     # Get paths
-    cachepath = get_bucket_cachedir(options.storage_url, options.cachedir)
+    cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
 
     # Retrieve metadata
     try:
-        with bucket_pool() as bucket:
-            (param, db) = get_metadata(bucket, cachepath)
-    except NoSuchBucket as exc:
+        with backend_pool() as backend:
+            (param, db) = get_metadata(backend, cachepath)
+    except DanglingStorageURL as exc:
         raise QuietError(str(exc))
 
     if param['max_obj_size'] < options.min_obj_size:
@@ -105,15 +104,15 @@ def main(args=None):
     else:
         db.execute('DROP INDEX IF EXISTS ix_contents_inode')
 
-    metadata_upload_thread = MetadataUploadThread(bucket_pool, param, db,
+    metadata_upload_thread = MetadataUploadThread(backend_pool, param, db,
                                                   options.metadata_upload_interval)
-    block_cache = BlockCache(bucket_pool, db, cachepath + '-cache',
+    block_cache = BlockCache(backend_pool, db, cachepath + '-cache',
                              options.cachesize * 1024, options.max_cache_entries)
     commit_thread = CommitThread(block_cache)
     operations = fs.Operations(block_cache, db, max_obj_size=param['max_obj_size'],
                                inode_cache=InodeCache(db, param['inode_gen']),
                                upload_event=metadata_upload_thread.event)
-    
+
     log.info('Mounting filesystem...')
     llfuse.init(operations, options.mountpoint, get_fuse_opts(options))
@@ -206,15 +205,15 @@ def main(args=None):
     # Do not update .params yet, dump_metadata() may fail if the database is
     # corrupted, in which case we want to force an fsck.
     param['max_inode'] = db.get_val('SELECT MAX(id) FROM inodes')
-    with bucket_pool() as bucket:
-        seq_no = get_seq_no(bucket)
+    with backend_pool() as backend:
+        seq_no = get_seq_no(backend)
         if metadata_upload_thread.db_mtime == os.stat(cachepath + '.db').st_mtime:
             log.info('File system unchanged, not uploading metadata.')
-            del bucket['s3ql_seq_no_%d' % param['seq_no']]
+            del backend['s3ql_seq_no_%d' % param['seq_no']]
             param['seq_no'] -= 1
             pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
         elif seq_no == param['seq_no']:
-            cycle_metadata(bucket)
+            cycle_metadata(backend)
             param['last-modified'] = time.time()
 
             log.info('Dumping metadata...')
@@ -226,7 +225,7 @@ def main(args=None):
                 return obj_fh
 
             log.info("Compressing and uploading metadata...")
-            obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param,
+            obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
                                           is_compressed=True)
             log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
             pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
@@ -290,10 +289,10 @@ def determine_threads(options):
     log.info("Using %d upload threads.", threads)
     return threads
 
-def get_metadata(bucket, cachepath):
+def get_metadata(backend, cachepath):
     '''Retrieve metadata'''
 
-    seq_no = get_seq_no(bucket)
+    seq_no = get_seq_no(backend)
 
     # Check for cached metadata
     db = None
@@ -301,12 +300,12 @@ def get_metadata(bucket, cachepath):
         param = pickle.load(open(cachepath + '.params', 'rb'))
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
-            param = bucket.lookup('s3ql_metadata')
+            param = backend.lookup('s3ql_metadata')
         else:
             log.info('Using cached metadata.')
             db = Connection(cachepath + '.db')
     else:
-        param = bucket.lookup('s3ql_metadata')
+        param = backend.lookup('s3ql_metadata')
 
     # Check for unclean shutdown
     if param['seq_no'] < seq_no:
@@ -338,7 +337,7 @@ def get_metadata(bucket, cachepath):
         stream_read_bz2(fh, tmpfh)
         return tmpfh
     log.info('Downloading and decompressing metadata...')
-    tmpfh = bucket.perform_read(do_read, "s3ql_metadata")
+    tmpfh = backend.perform_read(do_read, "s3ql_metadata")
     os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC,
                      stat.S_IRUSR | stat.S_IWUSR))
     db = Connection(cachepath + '.db.tmp', fast_mode=True)
@@ -352,7 +351,7 @@ def get_metadata(bucket, cachepath):
     # Increase metadata sequence no
     param['seq_no'] += 1
     param['needs_fsck'] = True
-    bucket['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
+    backend['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
     pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
     param['needs_fsck'] = False
@@ -497,9 +496,9 @@ class MetadataUploadThread(Thread):
     passed in the constructor, the global lock is acquired first.
     '''
 
-    def __init__(self, bucket_pool, param, db, interval):
+    def __init__(self, backend_pool, param, db, interval):
         super(MetadataUploadThread, self).__init__()
-        self.bucket_pool = bucket_pool
+        self.backend_pool = backend_pool
         self.param = param
         self.db = db
         self.interval = interval
@@ -532,15 +531,15 @@ class MetadataUploadThread(Thread):
 
             dump_metadata(self.db, fh)
 
-            with self.bucket_pool() as bucket:
-                seq_no = get_seq_no(bucket)
+            with self.backend_pool() as backend:
+                seq_no = get_seq_no(backend)
                 if seq_no != self.param['seq_no']:
                     log.error('Remote metadata is newer than local (%d vs %d), '
                               'refusing to overwrite!', seq_no, self.param['seq_no'])
                     fh.close()
                     continue
 
-                cycle_metadata(bucket)
+                cycle_metadata(backend)
                 fh.seek(0)
                 self.param['last-modified'] = time.time()
@@ -551,7 +550,7 @@ class MetadataUploadThread(Thread):
                         stream_write_bz2(fh, obj_fh)
                     return obj_fh
                 log.info("Compressing and uploading metadata...")
-                obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=self.param,
+                obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=self.param,
                                               is_compressed=True)
                 log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
                 self.param['seq_no'] += 1
diff --git a/src/s3ql/umount.py b/src/s3ql/umount.py
index cf67d0a..b2fdab8 100644
--- a/src/s3ql/umount.py
+++ b/src/s3ql/umount.py
@@ -7,7 +7,7 @@ This program can be distributed under the terms of the GNU GPLv3.
 '''
 
 from __future__ import division, print_function, absolute_import
-from .common import CTRL_NAME, QuietError, setup_logging
+from .common import CTRL_NAME, setup_logging
 from .parse_args import ArgumentParser
 import llfuse
 import logging