author    Nikolaus Rath <Nikolaus@rath.org>  2016-03-09 10:08:43 -0800
committer Nikolaus Rath <Nikolaus@rath.org>  2016-03-09 10:08:43 -0800
commit    daf84979549ef394924dd5dacae76bd3fc79ddb2 (patch)
tree      d62bf4e62e2ad2ffa3f49c147d1341eb64b5896c /src
parent    60e9b1ff268a057f3cce12586e991095e7b0dda1 (diff)
Import s3ql_1.11.orig.tar.bz2
Diffstat (limited to 'src')
-rw-r--r--  src/s3ql.egg-info/PKG-INFO     |   4
-rw-r--r--  src/s3ql.egg-info/SOURCES.txt  |   1
-rw-r--r--  src/s3ql/__init__.py           |   4
-rw-r--r--  src/s3ql/adm.py                | 235
-rw-r--r--  src/s3ql/backends/common.py    | 246
-rw-r--r--  src/s3ql/backends/gs.py        |  10
-rw-r--r--  src/s3ql/backends/local.py     |  40
-rw-r--r--  src/s3ql/backends/s3.py        |   8
-rw-r--r--  src/s3ql/backends/s3c.py       |  80
-rw-r--r--  src/s3ql/backends/swift.py     | 198
-rw-r--r--  src/s3ql/block_cache.py        |  16
-rw-r--r--  src/s3ql/common.py             |  14
-rw-r--r--  src/s3ql/fsck.py               |  46
-rw-r--r--  src/s3ql/inode_cache.py        |   1
-rw-r--r--  src/s3ql/metadata.py           |   6
-rw-r--r--  src/s3ql/mkfs.py               |  32
-rw-r--r--  src/s3ql/mount.py              |  57
-rw-r--r--  src/s3ql/umount.py             |   2
18 files changed, 443 insertions, 557 deletions
diff --git a/src/s3ql.egg-info/PKG-INFO b/src/s3ql.egg-info/PKG-INFO
index ee0e64c..e45ca54 100644
--- a/src/s3ql.egg-info/PKG-INFO
+++ b/src/s3ql.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: s3ql
-Version: 1.10
+Version: 1.11
Summary: a full-featured file system for online data storage
Home-page: http://code.google.com/p/s3ql/
Author: Nikolaus Rath
@@ -19,7 +19,7 @@ Description: .. -*- mode: rst -*-
<http://aws.amazon.com/s3 Amazon S3>`_ or `OpenStack
<http://openstack.org/projects/storage/>`_. S3QL effectively provides
a hard disk of dynamic, infinite capacity that can be accessed from
- any computer with internet access.
+ any computer with internet access running Linux, FreeBSD or OS-X.
S3QL is a standard conforming, full featured UNIX file system that is
conceptually indistinguishable from any local file system.
diff --git a/src/s3ql.egg-info/SOURCES.txt b/src/s3ql.egg-info/SOURCES.txt
index b8e165e..b7d4375 100644
--- a/src/s3ql.egg-info/SOURCES.txt
+++ b/src/s3ql.egg-info/SOURCES.txt
@@ -220,7 +220,6 @@ tests/t4_fuse.py
tests/t5_cp.py
tests/t5_ctrl.py
tests/t5_fsck.py
-tests/t5_fsck.py.orig
tests/t5_full.py
tests/t5_lock_rm.py
util/cmdline_lexer.py
diff --git a/src/s3ql/__init__.py b/src/s3ql/__init__.py
index 558b31c..ca51a3a 100644
--- a/src/s3ql/__init__.py
+++ b/src/s3ql/__init__.py
@@ -14,8 +14,8 @@ __all__ = [ 'adm', 'backends', 'block_cache', 'cleanup_manager', 'common',
'parse_args', 'remove', 'statfs', 'umount', 'VERSION',
'CURRENT_FS_REV', 'REV_VER_MAP' ]
-VERSION = '1.10'
-CURRENT_FS_REV = 15
+VERSION = '1.11'
+CURRENT_FS_REV = 16
# Maps file system revisions to the last S3QL version that
# supported this revision.
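The revision bump above is what drives the upgrade hints in adm.py further down: get_old_rev_msg() looks the old revision up in REV_VER_MAP to tell the user which release to download. A minimal sketch of that lookup; the dictionary contents are illustrative, except for 15 -> '1.10', which follows from the version strings in this diff.

    # Hypothetical sketch: map each on-disk revision to the last release able to read it.
    REV_VER_MAP = {15: '1.10'}   # only this entry is implied by the diff above

    def last_supporting_version(rev):
        # A file system at on-disk revision *rev* must first be handled by this release.
        return REV_VER_MAP.get(rev)

    print(last_supporting_version(15))   # -> '1.10'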
diff --git a/src/s3ql/adm.py b/src/s3ql/adm.py
index deb7e7f..fc7a06c 100644
--- a/src/s3ql/adm.py
+++ b/src/s3ql/adm.py
@@ -8,18 +8,16 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from . import CURRENT_FS_REV, REV_VER_MAP
-from .backends.common import BetterBucket, get_bucket, NoSuchBucket
-from .common import (QuietError, BUFSIZE, setup_logging, get_bucket_cachedir, get_seq_no,
+from .backends.common import BetterBackend, get_backend, DanglingStorageURL
+from .common import (QuietError, setup_logging, get_backend_cachedir, get_seq_no,
stream_write_bz2, stream_read_bz2, CTRL_INODE)
-from .database import Connection, NoSuchRowError
-from .metadata import restore_metadata, cycle_metadata, dump_metadata, create_tables
+from .database import Connection
+from .metadata import restore_metadata, cycle_metadata, dump_metadata
from .parse_args import ArgumentParser
from datetime import datetime as Datetime
from getpass import getpass
-from llfuse import ROOT_INODE
import cPickle as pickle
import logging
-import lzma
import os
import shutil
import stat
@@ -34,7 +32,7 @@ def parse_args(args):
'''Parse command line'''
parser = ArgumentParser(
- description="Manage S3QL Buckets.",
+ description="Manage S3QL File Systems.",
epilog=textwrap.dedent('''\
Hint: run `%(prog)s <action> --help` to get help on the additional
arguments that the different actions take.'''))
@@ -46,11 +44,11 @@ def parse_args(args):
subparsers = parser.add_subparsers(metavar='<action>', dest='action',
help='may be either of')
- subparsers.add_parser("passphrase", help="change bucket passphrase",
+ subparsers.add_parser("passphrase", help="change file system passphrase",
parents=[pparser])
subparsers.add_parser("upgrade", help="upgrade file system to newest revision",
parents=[pparser])
- subparsers.add_parser("clear", help="delete all S3QL data from the bucket",
+ subparsers.add_parser("clear", help="delete file system and all data",
parents=[pparser])
subparsers.add_parser("download-metadata",
help="Interactively download metadata backups. "
@@ -88,32 +86,32 @@ def main(args=None):
if options.action == 'clear':
try:
- bucket = get_bucket(options, plain=True)
- except NoSuchBucket as exc:
+ backend = get_backend(options, plain=True)
+ except DanglingStorageURL as exc:
raise QuietError(str(exc))
- return clear(bucket,
- get_bucket_cachedir(options.storage_url, options.cachedir))
+ return clear(backend,
+ get_backend_cachedir(options.storage_url, options.cachedir))
try:
- bucket = get_bucket(options)
- except NoSuchBucket as exc:
+ backend = get_backend(options)
+ except DanglingStorageURL as exc:
raise QuietError(str(exc))
if options.action == 'upgrade':
- return upgrade(bucket, get_bucket_cachedir(options.storage_url,
+ return upgrade(backend, get_backend_cachedir(options.storage_url,
options.cachedir))
if options.action == 'passphrase':
- return change_passphrase(bucket)
+ return change_passphrase(backend)
if options.action == 'download-metadata':
- return download_metadata(bucket, options.storage_url)
+ return download_metadata(backend, options.storage_url)
-def download_metadata(bucket, storage_url):
+def download_metadata(backend, storage_url):
'''Download old metadata backups'''
- backups = sorted(bucket.list('s3ql_metadata_bak_'))
+ backups = sorted(backend.list('s3ql_metadata_bak_'))
if not backups:
raise QuietError('No metadata backups found.')
@@ -121,7 +119,7 @@ def download_metadata(bucket, storage_url):
log.info('The following backups are available:')
log.info('%3s %-23s %-15s', 'No', 'Name', 'Date')
for (i, name) in enumerate(backups):
- params = bucket.lookup(name)
+ params = backend.lookup(name)
if 'last-modified' in params:
date = Datetime.fromtimestamp(params['last-modified']).strftime('%Y-%m-%d %H:%M:%S')
else:
@@ -138,19 +136,19 @@ def download_metadata(bucket, storage_url):
except:
log.warn('Invalid input')
- cachepath = get_bucket_cachedir(storage_url, '.')
+ cachepath = get_backend_cachedir(storage_url, '.')
for i in ('.db', '.params'):
if os.path.exists(cachepath + i):
raise QuietError('%s already exists, aborting.' % cachepath + i)
- param = bucket.lookup(name)
+ param = backend.lookup(name)
try:
log.info('Downloading and decompressing %s...', name)
def do_read(fh):
tmpfh = tempfile.TemporaryFile()
stream_read_bz2(fh, tmpfh)
return tmpfh
- tmpfh = bucket.perform_read(do_read, name)
+ tmpfh = backend.perform_read(do_read, name)
os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC,
stat.S_IRUSR | stat.S_IWUSR))
db = Connection(cachepath + '.db.tmp', fast_mode=True)
@@ -167,17 +165,17 @@ def download_metadata(bucket, storage_url):
# Raise sequence number so that fsck.s3ql actually uses the
# downloaded backup
- seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in bucket.list('s3ql_seq_no_') ]
+ seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in backend.list('s3ql_seq_no_') ]
param['seq_no'] = max(seq_nos) + 1
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
-def change_passphrase(bucket):
- '''Change bucket passphrase'''
+def change_passphrase(backend):
+ '''Change file system passphrase'''
- if not isinstance(bucket, BetterBucket) and bucket.passphrase:
- raise QuietError('Bucket is not encrypted.')
+ if not isinstance(backend, BetterBackend) and backend.passphrase:
+ raise QuietError('File system is not encrypted.')
- data_pw = bucket.passphrase
+ data_pw = backend.passphrase
if sys.stdin.isatty():
wrap_pw = getpass("Enter new encryption password: ")
@@ -186,12 +184,12 @@ def change_passphrase(bucket):
else:
wrap_pw = sys.stdin.readline().rstrip()
- bucket.passphrase = wrap_pw
- bucket['s3ql_passphrase'] = data_pw
- bucket.passphrase = data_pw
+ backend.passphrase = wrap_pw
+ backend['s3ql_passphrase'] = data_pw
+ backend.passphrase = data_pw
-def clear(bucket, cachepath):
- print('I am about to delete the S3QL file system in %s.' % bucket,
+def clear(backend, cachepath):
+ print('I am about to delete the S3QL file system in %s.' % backend,
'Please enter "yes" to continue.', '> ', sep='\n', end='')
if sys.stdin.readline().strip().lower() != 'yes':
@@ -208,7 +206,7 @@ def clear(bucket, cachepath):
if os.path.exists(name):
shutil.rmtree(name)
- bucket.clear()
+ backend.clear()
log.info('File system deleted.')
log.info('Note: it may take a while for the removals to propagate through the backend.')
@@ -220,16 +218,17 @@ def get_old_rev_msg(rev, prog):
$ wget http://s3ql.googlecode.com/files/s3ql-%(version)s.tar.bz2
$ tar xjf s3ql-%(version)s.tar.bz2
+ $ (cd s3ql-%(version)s; ./setup.py build_ext)
$ s3ql-%(version)s/bin/%(prog)s <options>
''' % { 'version': REV_VER_MAP[rev],
'prog': prog })
-def upgrade(bucket, cachepath):
+def upgrade(backend, cachepath):
'''Upgrade file system to newest revision'''
log.info('Getting file system parameters..')
- seq_nos = list(bucket.list('s3ql_seq_no_'))
+ seq_nos = list(backend.list('s3ql_seq_no_'))
if (seq_nos[0].endswith('.meta')
or seq_nos[0].endswith('.dat')):
print(textwrap.dedent('''
@@ -241,7 +240,7 @@ def upgrade(bucket, cachepath):
'''))
print(get_old_rev_msg(11 + 1, 's3qladm'))
raise QuietError()
- seq_no = get_seq_no(bucket)
+ seq_no = get_seq_no(backend)
# Check for cached metadata
db = None
@@ -249,12 +248,12 @@ def upgrade(bucket, cachepath):
param = pickle.load(open(cachepath + '.params', 'rb'))
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
- param = bucket.lookup('s3ql_metadata')
+ param = backend.lookup('s3ql_metadata')
else:
log.info('Using cached metadata.')
db = Connection(cachepath + '.db')
else:
- param = bucket.lookup('s3ql_metadata')
+ param = backend.lookup('s3ql_metadata')
# Check for unclean shutdown
if param['seq_no'] < seq_no:
@@ -309,33 +308,30 @@ def upgrade(bucket, cachepath):
if not db:
log.info("Downloading & uncompressing metadata...")
def do_read(fh):
- os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC,
- stat.S_IRUSR | stat.S_IWUSR))
- db = Connection(cachepath + '.db.tmp', fast_mode=True)
- try:
- restore_legacy_metadata(fh, db)
- finally:
- # If metata reading has to be retried, we don't want to hold
- # a lock on the database.
- db.close()
- bucket.perform_read(do_read, "s3ql_metadata")
+ tmpfh = tempfile.TemporaryFile()
+ stream_read_bz2(fh, tmpfh)
+ return tmpfh
+ tmpfh = backend.perform_read(do_read, "s3ql_metadata")
+ os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC,
+ stat.S_IRUSR | stat.S_IWUSR))
+ db = Connection(cachepath + '.db.tmp', fast_mode=True)
+ log.info("Reading metadata...")
+ tmpfh.seek(0)
+ restore_metadata(tmpfh, db)
+ db.close()
os.rename(cachepath + '.db.tmp', cachepath + '.db')
db = Connection(cachepath + '.db')
- log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV)
- if 'max_obj_size' not in param:
- param['max_obj_size'] = param['blocksize']
- if 'blocksize' in param:
- del param['blocksize']
+ log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV)
- db.execute('UPDATE inodes SET mtime=mtime+?, ctime=ctime+?, atime=atime+?',
- (time.timezone, time.timezone, time.timezone))
+ db.execute('UPDATE inodes SET mode=? WHERE id=?',
+ (stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR, CTRL_INODE))
param['revision'] = CURRENT_FS_REV
param['last-modified'] = time.time()
- cycle_metadata(bucket)
+ cycle_metadata(backend)
log.info('Dumping metadata...')
fh = tempfile.TemporaryFile()
dump_metadata(db, fh)
@@ -345,8 +341,8 @@ def upgrade(bucket, cachepath):
return obj_fh
log.info("Compressing and uploading metadata...")
- bucket.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty')
- obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param,
+ backend.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty')
+ obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
is_compressed=True)
log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
@@ -354,123 +350,6 @@ def upgrade(bucket, cachepath):
db.execute('ANALYZE')
db.execute('VACUUM')
-def _add_name(db, name):
- '''Get id for *name* and increase refcount
-
- Name is inserted in table if it does not yet exist.
- '''
-
- try:
- name_id = db.get_val('SELECT id FROM names WHERE name=?', (name,))
- except NoSuchRowError:
- name_id = db.rowid('INSERT INTO names (name, refcount) VALUES(?,?)',
- (name, 1))
- else:
- db.execute('UPDATE names SET refcount=refcount+1 WHERE id=?', (name_id,))
-
- return name_id
-
-def renumber_inodes(db):
- '''Renumber inodes'''
-
- log.info('Renumbering inodes...')
- for table in ('inodes', 'inode_blocks', 'symlink_targets',
- 'contents', 'names', 'blocks', 'objects', 'ext_attributes'):
- db.execute('ALTER TABLE %s RENAME TO %s_old' % (table, table))
-
- for table in ('contents_v', 'ext_attributes_v'):
- db.execute('DROP VIEW %s' % table)
-
- create_tables(db)
- for table in ('names', 'blocks', 'objects'):
- db.execute('DROP TABLE %s' % table)
- db.execute('ALTER TABLE %s_old RENAME TO %s' % (table, table))
-
- log.info('..mapping..')
- db.execute('CREATE TEMPORARY TABLE inode_map (rowid INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER UNIQUE)')
- db.execute('INSERT INTO inode_map (rowid, id) VALUES(?,?)', (ROOT_INODE, ROOT_INODE))
- db.execute('INSERT INTO inode_map (rowid, id) VALUES(?,?)', (CTRL_INODE, CTRL_INODE))
- db.execute('INSERT INTO inode_map (id) SELECT id FROM inodes_old WHERE id > ? ORDER BY ctime ASC',
- (CTRL_INODE,))
-
- log.info('..inodes..')
- db.execute('INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev) '
- 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = inodes_old.id), '
- ' mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev FROM inodes_old')
-
- log.info('..inode_blocks..')
- db.execute('INSERT INTO inode_blocks (inode, blockno, block_id) '
- 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = inode_blocks_old.inode), '
- ' blockno, block_id FROM inode_blocks_old')
-
- log.info('..contents..')
- db.execute('INSERT INTO contents (inode, parent_inode, name_id) '
- 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = contents_old.inode), '
- ' (SELECT rowid FROM inode_map WHERE inode_map.id = contents_old.parent_inode), '
- ' name_id FROM contents_old')
-
- log.info('..symlink_targets..')
- db.execute('INSERT INTO symlink_targets (inode, target) '
- 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = symlink_targets_old.inode), '
- ' target FROM symlink_targets_old')
-
- log.info('..ext_attributes..')
- db.execute('INSERT INTO ext_attributes (inode, name_id, value) '
- 'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = ext_attributes_old.inode), '
- ' name_id, value FROM ext_attributes_old')
-
- for table in ('inodes', 'inode_blocks', 'symlink_targets',
- 'contents', 'ext_attributes'):
- db.execute('DROP TABLE %s_old' % table)
-
- db.execute('DROP TABLE inode_map')
-
-def restore_legacy_metadata(ifh, conn):
-
- # Note: unpickling is terribly slow if fh is not a real file object, so
- # uncompressing to a temporary file also gives a performance boost
- log.info('Downloading and decompressing metadata...')
- tmp = tempfile.TemporaryFile()
- decompressor = lzma.LZMADecompressor()
- while True:
- buf = ifh.read(BUFSIZE)
- if not buf:
- break
- buf = decompressor.decompress(buf)
- if buf:
- tmp.write(buf)
- del decompressor
- tmp.seek(0)
-
- log.info("Reading metadata...")
- unpickler = pickle.Unpickler(tmp)
- (to_dump, columns) = unpickler.load()
- create_tables(conn)
- conn.execute("""
- DROP VIEW ext_attributes_v;
- DROP TABLE ext_attributes;
- CREATE TABLE ext_attributes (
- inode INTEGER NOT NULL REFERENCES inodes(id),
- name BLUB NOT NULL,
- value BLOB NOT NULL,
-
- PRIMARY KEY (inode, name)
- )""")
-
- for (table, _) in to_dump:
- log.info('..%s..', table)
- col_str = ', '.join(columns[table])
- val_str = ', '.join('?' for _ in columns[table])
- sql_str = 'INSERT INTO %s (%s) VALUES(%s)' % (table, col_str, val_str)
- while True:
- buf = unpickler.load()
- if not buf:
- break
- for row in buf:
- conn.execute(sql_str, row)
-
- tmp.close()
- conn.execute('ANALYZE')
if __name__ == '__main__':
main(sys.argv[1:])
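The rewritten upgrade() above now downloads metadata the same way download_metadata() does: decompress the backend object into a temporary file first, then parse it from there. A self-contained sketch of that pattern using only the standard library; the real code goes through s3ql's stream_read_bz2() and restore_metadata(), so the helper below is only a stand-in.

    import bz2
    import tempfile

    BUFSIZE = 64 * 1024   # illustrative chunk size; s3ql defines its own BUFSIZE

    def stream_read_bz2(ifh, ofh):
        # Incrementally decompress the bz2 stream *ifh* into *ofh*.
        decompressor = bz2.BZ2Decompressor()
        while True:
            buf = ifh.read(BUFSIZE)
            if not buf:
                break
            out = decompressor.decompress(buf)
            if out:
                ofh.write(out)

    def do_read(fh):
        # Passed to backend.perform_read(); called again from scratch if the
        # read hits a temporary failure and is retried.
        tmpfh = tempfile.TemporaryFile()
        stream_read_bz2(fh, tmpfh)
        return tmpfh

    # Usage (assuming an s3ql backend object):
    #   tmpfh = backend.perform_read(do_read, "s3ql_metadata")
    #   tmpfh.seek(0)
    #   restore_metadata(tmpfh, db)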
diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py
index 8d12d44..ca28206 100644
--- a/src/s3ql/backends/common.py
+++ b/src/s3ql/backends/common.py
@@ -29,6 +29,7 @@ import re
import stat
import struct
import sys
+import math
import threading
import time
import zlib
@@ -130,8 +131,8 @@ def http_connection(hostname, port, ssl=False):
def sha256(s):
return hashlib.sha256(s).digest()
-class BucketPool(object):
- '''A pool of buckets
+class BackendPool(object):
+ '''A pool of backends
This class is threadsafe. All methods (except for internal methods
starting with underscore) may be called concurrently by different
@@ -175,13 +176,13 @@ class BucketPool(object):
self.push_conn(conn)
-class AbstractBucket(object):
+class AbstractBackend(object):
'''Functionality shared between all backends.
Instances behave similarly to dicts. They can be iterated over and
indexed into, but raise a separate set of exceptions.
- The bucket guarantees get after create consistency, i.e. a newly created
+ The backend guarantees get after create consistency, i.e. a newly created
object will be immediately retrievable. Additional consistency guarantees
may or may not be available and can be queried for with instance methods.
'''
@@ -190,7 +191,7 @@ class AbstractBucket(object):
needs_login = True
def __init__(self):
- super(AbstractBucket, self).__init__()
+ super(AbstractBackend, self).__init__()
def __getitem__(self, key):
return self.fetch(key)[0]
@@ -213,22 +214,22 @@ class AbstractBucket(object):
@retry
def perform_read(self, fn, key):
- '''Read bucket data using *fn*, retry on temporary failure
+ '''Read object data using *fn*, retry on temporary failure
- Open bucket for reading, call `fn(fh)` and close bucket. If a temporary
- error (as defined by `is_temp_failure`) occurs during opening, closing
- or execution of *fn*, the operation is retried.
+ Open object for reading, call `fn(fh)` and close object. If a temporary error (as defined by
+ `is_temp_failure`) occurs during opening, closing or execution of *fn*, the operation is
+ retried.
'''
with self.open_read(key) as fh:
return fn(fh)
@retry
def perform_write(self, fn, key, metadata=None, is_compressed=False):
- '''Read bucket data using *fn*, retry on temporary failure
+ '''Read object data using *fn*, retry on temporary failure
- Open bucket for writing, call `fn(fh)` and close bucket. If a temporary
- error (as defined by `is_temp_failure`) occurs during opening, closing
- or execution of *fn*, the operation is retried.
+ Open object for writing, call `fn(fh)` and close object. If a temporary error (as defined by
+ `is_temp_failure`) occurs during opening, closing or execution of *fn*, the operation is
+ retried.
'''
with self.open_write(key, metadata, is_compressed) as fh:
@@ -238,8 +239,8 @@ class AbstractBucket(object):
"""Return data stored under `key`.
Returns a tuple with the data and metadata. If only the data itself is
- required, ``bucket[key]`` is a more concise notation for
- ``bucket.fetch(key)[0]``.
+ required, ``backend[key]`` is a more concise notation for
+ ``backend.fetch(key)[0]``.
"""
def do_read(fh):
@@ -255,8 +256,8 @@ class AbstractBucket(object):
object.
If no metadata is required, one can simply assign to the subscripted
- bucket instead of using this function: ``bucket[key] = val`` is
- equivalent to ``bucket.store(key, val)``.
+ backend instead of using this function: ``backend[key] = val`` is
+ equivalent to ``backend.store(key, val)``.
"""
self.perform_write(lambda fh: fh.write(val), key, metadata)
@@ -265,15 +266,13 @@ class AbstractBucket(object):
def is_temp_failure(self, exc):
'''Return true if exc indicates a temporary error
- Return true if the given exception is used by this bucket's backend
- to indicate a temporary problem. Most instance methods automatically
- retry the request in this case, so the caller does not need to
- worry about temporary failures.
+ Return true if the given exception indicates a temporary problem. Most instance methods
+ automatically retry the request in this case, so the caller does not need to worry about
+ temporary failures.
- However, in same cases (e.g. when reading or writing an object), the
- request cannot automatically be retried. In these case this method can
- be used to check for temporary problems and so that the request can
- be manually restarted if applicable.
+ However, in same cases (e.g. when reading or writing an object), the request cannot
+ automatically be retried. In these case this method can be used to check for temporary
+ problems and so that the request can be manually restarted if applicable.
'''
pass
@@ -296,10 +295,9 @@ class AbstractBucket(object):
def open_read(self, key):
"""Open object for reading
- Return a tuple of a file-like object. Bucket contents can be read from
- the file-like object, metadata is stored in its *metadata* attribute and
- can be modified by the caller at will. The object must be closed
- explicitly.
+ Return a file-like object. Data can be read using the `read` method. metadata is stored in
+ its *metadata* attribute and can be modified by the caller at will. The object must be
+ closed explicitly.
"""
pass
@@ -308,26 +306,24 @@ class AbstractBucket(object):
def open_write(self, key, metadata=None, is_compressed=False):
"""Open object for writing
- `metadata` can be a dict of additional attributes to store with the
- object. Returns a file-like object. The object must be closed
- explicitly. After closing, the *get_obj_size* may be used to retrieve
- the size of the stored object (which may differ from the size of the
+ `metadata` can be a dict of additional attributes to store with the object. Returns a file-
+ like object. The object must be closed explicitly. After closing, the *get_obj_size* may be
+ used to retrieve the size of the stored object (which may differ from the size of the
written data).
- The *is_compressed* parameter indicates that the caller is going
- to write compressed data, and may be used to avoid recompression
- by the bucket.
+ The *is_compressed* parameter indicates that the caller is going to write compressed data,
+ and may be used to avoid recompression by the backend.
"""
pass
@abstractmethod
def clear(self):
- """Delete all objects in bucket"""
+ """Delete all objects in backend"""
pass
def contains(self, key):
- '''Check if `key` is in bucket'''
+ '''Check if `key` is in backend'''
try:
self.lookup(key)
@@ -340,16 +336,16 @@ class AbstractBucket(object):
def delete(self, key, force=False):
"""Delete object stored under `key`
- ``bucket.delete(key)`` can also be written as ``del bucket[key]``.
+ ``backend.delete(key)`` can also be written as ``del backend[key]``.
If `force` is true, do not return an error if the key does not exist.
"""
pass
@abstractmethod
def list(self, prefix=''):
- '''List keys in bucket
+ '''List keys in backend
- Returns an iterator over all keys in the bucket.
+ Returns an iterator over all keys in the backend.
'''
pass
@@ -373,18 +369,18 @@ class AbstractBucket(object):
self.copy(src, dest)
self.delete(src)
-class BetterBucket(AbstractBucket):
+class BetterBackend(AbstractBackend):
'''
This class adds encryption, compression and integrity protection to a plain
- bucket.
+ backend.
'''
- def __init__(self, passphrase, compression, bucket):
- super(BetterBucket, self).__init__()
+ def __init__(self, passphrase, compression, backend):
+ super(BetterBackend, self).__init__()
self.passphrase = passphrase
self.compression = compression
- self.bucket = bucket
+ self.backend = backend
if compression not in ('bzip2', 'lzma', 'zlib', None):
raise ValueError('Unsupported compression: %s' % compression)
@@ -395,7 +391,7 @@ class BetterBucket(AbstractBucket):
If the key does not exist, `NoSuchObject` is raised.
"""
- metadata = self.bucket.lookup(key)
+ metadata = self.backend.lookup(key)
convert_legacy_metadata(metadata)
return self._unwrap_meta(metadata)
@@ -406,28 +402,26 @@ class BetterBucket(AbstractBucket):
that's actually occupied by the object.
'''
- return self.bucket.get_size(key)
+ return self.backend.get_size(key)
def is_temp_failure(self, exc):
'''Return true if exc indicates a temporary error
- Return true if the given exception is used by this bucket's backend
- to indicate a temporary problem. Most instance methods automatically
- retry the request in this case, so the caller does not need to
- worry about temporary failures.
+ Return true if the given exception indicates a temporary problem. Most instance methods
+ automatically retry the request in this case, so the caller does not need to worry about
+ temporary failures.
- However, in same cases (e.g. when reading or writing an object), the
- request cannot automatically be retried. In these case this method can
- be used to check for temporary problems and so that the request can
- be manually restarted if applicable.
+ However, in same cases (e.g. when reading or writing an object), the request cannot
+ automatically be retried. In these case this method can be used to check for temporary
+ problems and so that the request can be manually restarted if applicable.
'''
- return self.bucket.is_temp_failure(exc)
+ return self.backend.is_temp_failure(exc)
def _unwrap_meta(self, metadata):
'''Unwrap metadata
- If the bucket has a password set but the object is not encrypted,
+ If the backend has a password set but the object is not encrypted,
`ObjectNotEncrypted` is raised.
'''
@@ -440,7 +434,9 @@ class BetterBucket(AbstractBucket):
elif not encrypted and self.passphrase:
raise ObjectNotEncrypted()
- buf = b64decode(metadata['meta'])
+ buf = b64decode(''.join(metadata[k]
+ for k in sorted(metadata.keys())
+ if k.startswith('meta')))
if encrypted:
buf = decrypt(buf, self.passphrase)
@@ -460,15 +456,15 @@ class BetterBucket(AbstractBucket):
def open_read(self, key):
"""Open object for reading
- Return a tuple of a file-like object. Bucket contents can be read from
- the file-like object, metadata is stored in its *metadata* attribute and
- can be modified by the caller at will. The object must be closed explicitly.
+ Return a file-like object. Data can be read using the `read` method. metadata is stored in
+ its *metadata* attribute and can be modified by the caller at will. The object must be
+ closed explicitly.
- If the bucket has a password set but the object is not encrypted,
- `ObjectNotEncrypted` is raised.
+ If the backend has a password set but the object is not encrypted, `ObjectNotEncrypted` is
+ raised.
"""
- fh = self.bucket.open_read(key)
+ fh = self.backend.open_read(key)
convert_legacy_metadata(fh.metadata)
compr_alg = fh.metadata['compression']
@@ -513,7 +509,7 @@ class BetterBucket(AbstractBucket):
The *is_compressed* parameter indicates that the caller is going
to write compressed data, and may be used to avoid recompression
- by the bucket.
+ by the backend.
"""
# We always store metadata (even if it's just None), so that we can
@@ -525,12 +521,18 @@ class BetterBucket(AbstractBucket):
if self.passphrase:
meta_raw['encryption'] = 'AES_v2'
nonce = struct.pack(b'<f', time.time()) + bytes(key)
- meta_raw['meta'] = b64encode(encrypt(meta_buf, self.passphrase, nonce))
+ meta_buf = b64encode(encrypt(meta_buf, self.passphrase, nonce))
else:
meta_raw['encryption'] = 'None'
- meta_raw['meta'] = b64encode(meta_buf)
+ meta_buf = b64encode(meta_buf)
nonce = None
+ # Some backends restrict individual metadata fields to 256 bytes,
+ # so we split the data into several fields if necessary
+ chunksize = 255
+ for i in range(int(math.ceil(len(meta_buf) / chunksize))):
+ meta_raw['meta-%2d' % i] = meta_buf[i*chunksize:(i+1)*chunksize]
+
if is_compressed or not self.compression:
compr = None
meta_raw['compression'] = 'None'
@@ -544,7 +546,7 @@ class BetterBucket(AbstractBucket):
compr = lzma.LZMACompressor(options={ 'level': 7 })
meta_raw['compression'] = 'LZMA'
- fh = self.bucket.open_write(key, meta_raw)
+ fh = self.backend.open_write(key, meta_raw)
if nonce:
fh = EncryptFilter(fh, self.passphrase, nonce)
@@ -554,27 +556,27 @@ class BetterBucket(AbstractBucket):
return fh
def clear(self):
- """Delete all objects in bucket"""
- return self.bucket.clear()
+ """Delete all objects in backend"""
+ return self.backend.clear()
def contains(self, key):
- '''Check if `key` is in bucket'''
- return self.bucket.contains(key)
+ '''Check if `key` is in backend'''
+ return self.backend.contains(key)
def delete(self, key, force=False):
"""Delete object stored under `key`
- ``bucket.delete(key)`` can also be written as ``del bucket[key]``.
+ ``backend.delete(key)`` can also be written as ``del backend[key]``.
If `force` is true, do not return an error if the key does not exist.
"""
- return self.bucket.delete(key, force)
+ return self.backend.delete(key, force)
def list(self, prefix=''):
- '''List keys in bucket
+ '''List keys in backend
- Returns an iterator over all keys in the bucket.
+ Returns an iterator over all keys in the backend.
'''
- return self.bucket.list(prefix)
+ return self.backend.list(prefix)
def copy(self, src, dest):
"""Copy data stored under key `src` to key `dest`
@@ -582,7 +584,7 @@ class BetterBucket(AbstractBucket):
If `dest` already exists, it will be overwritten. The copying
is done on the remote side.
"""
- return self.bucket.copy(src, dest)
+ return self.backend.copy(src, dest)
def rename(self, src, dest):
"""Rename key `src` to `dest`
@@ -590,7 +592,7 @@ class BetterBucket(AbstractBucket):
If `dest` already exists, it will be overwritten. The rename
is done on the remote side.
"""
- return self.bucket.rename(src, dest)
+ return self.backend.rename(src, dest)
class AbstractInputFilter(object):
@@ -1004,7 +1006,7 @@ def decrypt(buf, passphrase):
class ObjectNotEncrypted(Exception):
'''
Raised by the backend if an object was requested from an encrypted
- bucket, but the object was stored without encryption.
+ backend, but the object was stored without encryption.
We do not want to simply return the uncrypted object, because the
caller may rely on the objects integrity being cryptographically
@@ -1014,28 +1016,28 @@ class ObjectNotEncrypted(Exception):
pass
class NoSuchObject(Exception):
- '''Raised if the requested object does not exist in the bucket'''
+ '''Raised if the requested object does not exist in the backend'''
def __init__(self, key):
super(NoSuchObject, self).__init__()
self.key = key
def __str__(self):
- return 'Bucket does not have anything stored under key %r' % self.key
+ return 'Backend does not have anything stored under key %r' % self.key
-class NoSuchBucket(Exception):
- '''Raised if the requested bucket does not exist'''
+class DanglingStorageURL(Exception):
+ '''Raised if the backend can't store data at the given location'''
- def __init__(self, name):
- super(NoSuchBucket, self).__init__()
- self.name = name
+ def __init__(self, loc):
+ super(DanglingStorageURL, self).__init__()
+ self.loc = loc
def __str__(self):
- return 'Bucket %r does not exist' % self.name
+ return '%r does not exist' % self.loc
class AuthorizationError(Exception):
- '''Raised if the credentials don't give access to the requested bucket'''
+ '''Raised if the credentials don't give access to the requested backend'''
def __init__(self, msg):
super(AuthorizationError, self).__init__()
@@ -1091,20 +1093,20 @@ def convert_legacy_metadata(meta):
meta['compression'] = 'None'
-def get_bucket(options, plain=False):
- '''Return bucket for given storage-url
+def get_backend(options, plain=False):
+ '''Return backend for given storage-url
If *plain* is true, don't attempt to unlock and don't wrap into
- BetterBucket.
+ BetterBackend.
'''
- return get_bucket_factory(options, plain)()
+ return get_backend_factory(options, plain)()
-def get_bucket_factory(options, plain=False, ssl=False):
- '''Return factory producing bucket objects for given storage-url
+def get_backend_factory(options, plain=False):
+ '''Return factory producing backend objects for given storage-url
If *plain* is true, don't attempt to unlock and don't wrap into
- BetterBucket.
+ BetterBackend.
'''
hit = re.match(r'^([a-zA-Z0-9]+)://', options.storage_url)
@@ -1117,7 +1119,7 @@ def get_bucket_factory(options, plain=False, ssl=False):
except ImportError:
raise QuietError('No such backend: %s' % hit.group(1))
- bucket_class = getattr(sys.modules[backend_name], 'Bucket')
+ backend_class = getattr(sys.modules[backend_name], 'Backend')
# Read authfile
config = ConfigParser.SafeConfigParser()
@@ -1129,7 +1131,7 @@ def get_bucket_factory(options, plain=False, ssl=False):
backend_login = None
backend_pw = None
- bucket_passphrase = None
+ backend_passphrase = None
for section in config.sections():
def getopt(name):
try:
@@ -1144,34 +1146,38 @@ def get_bucket_factory(options, plain=False, ssl=False):
backend_login = backend_login or getopt('backend-login')
backend_pw = backend_pw or getopt('backend-password')
- bucket_passphrase = bucket_passphrase or getopt('bucket-passphrase')
+ backend_passphrase = backend_passphrase or getopt('fs-passphrase')
+ if backend_passphrase is None and getopt('bucket-passphrase') is not None:
+ backend_passphrase = getopt('bucket-passphrase')
+ log.warn("Warning: the 'bucket-passphrase' configuration option has been "
+ "renamed to 'fs-passphrase'! Please update your authinfo file.")
- if not backend_login and bucket_class.needs_login:
+ if not backend_login and backend_class.needs_login:
if sys.stdin.isatty():
backend_login = getpass("Enter backend login: ")
else:
backend_login = sys.stdin.readline().rstrip()
- if not backend_pw and bucket_class.needs_login:
+ if not backend_pw and backend_class.needs_login:
if sys.stdin.isatty():
backend_pw = getpass("Enter backend password: ")
else:
backend_pw = sys.stdin.readline().rstrip()
try:
- bucket = bucket_class(options.storage_url, backend_login, backend_pw,
+ backend = backend_class(options.storage_url, backend_login, backend_pw,
options.ssl)
- # Do not use bucket.lookup(), this would use a HEAD request and
+ # Do not use backend.lookup(), this would use a HEAD request and
# not provide any useful error messages if something goes wrong
# (e.g. wrong credentials)
- _ = bucket['s3ql_passphrase']
+ _ = backend['s3ql_passphrase']
- except NoSuchBucket:
- raise QuietError('Bucket does not exist')
+ except DanglingStorageURL as exc:
+ raise QuietError(str(exc))
except AuthorizationError:
- raise QuietError('No permission to access bucket.')
+ raise QuietError('No permission to access backend.')
except AuthenticationError:
raise QuietError('Invalid credentials or skewed system clock.')
@@ -1183,16 +1189,16 @@ def get_bucket_factory(options, plain=False, ssl=False):
encrypted = True
if plain:
- return lambda: bucket_class(options.storage_url, backend_login, backend_pw,
+ return lambda: backend_class(options.storage_url, backend_login, backend_pw,
options.ssl)
- if encrypted and not bucket_passphrase:
+ if encrypted and not backend_passphrase:
if sys.stdin.isatty():
- bucket_passphrase = getpass("Enter bucket encryption passphrase: ")
+ backend_passphrase = getpass("Enter file system encryption passphrase: ")
else:
- bucket_passphrase = sys.stdin.readline().rstrip()
+ backend_passphrase = sys.stdin.readline().rstrip()
elif not encrypted:
- bucket_passphrase = None
+ backend_passphrase = None
if hasattr(options, 'compress'):
compress = options.compress
@@ -1200,17 +1206,17 @@ def get_bucket_factory(options, plain=False, ssl=False):
compress = None
if not encrypted:
- return lambda: BetterBucket(None, compress,
- bucket_class(options.storage_url, backend_login, backend_pw,
+ return lambda: BetterBackend(None, compress,
+ backend_class(options.storage_url, backend_login, backend_pw,
options.ssl))
- tmp_bucket = BetterBucket(bucket_passphrase, compress, bucket)
+ tmp_backend = BetterBackend(backend_passphrase, compress, backend)
try:
- data_pw = tmp_bucket['s3ql_passphrase']
+ data_pw = tmp_backend['s3ql_passphrase']
except ChecksumError:
- raise QuietError('Wrong bucket passphrase')
+ raise QuietError('Wrong backend passphrase')
- return lambda: BetterBucket(data_pw, compress,
- bucket_class(options.storage_url, backend_login, backend_pw,
+ return lambda: BetterBackend(data_pw, compress,
+ backend_class(options.storage_url, backend_login, backend_pw,
options.ssl))
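The most intricate change in this file is the metadata splitting: open_write() now spreads the (possibly encrypted) base64 blob over several meta-NN fields of at most 255 bytes, because some backends cap the size of individual metadata fields, and _unwrap_meta() reassembles it by joining all keys starting with 'meta' in sorted order. A stdlib-only round trip of that split/join logic; the function names below are illustrative, not s3ql API.

    import math
    from base64 import b64encode, b64decode

    def split_meta(meta_buf, chunksize=255):
        # Write side: store the base64 blob across several small fields.
        raw = {}
        for i in range(int(math.ceil(len(meta_buf) / float(chunksize)))):
            raw['meta-%2d' % i] = meta_buf[i * chunksize:(i + 1) * chunksize]
        return raw

    def join_meta(metadata):
        # Read side (cf. _unwrap_meta): concatenate all 'meta*' fields in key order.
        return b''.join(metadata[k] for k in sorted(metadata) if k.startswith('meta'))

    blob = b64encode(b'x' * 1000)
    assert b64decode(join_meta(split_meta(blob))) == b'x' * 1000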
diff --git a/src/s3ql/backends/gs.py b/src/s3ql/backends/gs.py
index f746a21..ae58d5e 100644
--- a/src/s3ql/backends/gs.py
+++ b/src/s3ql/backends/gs.py
@@ -8,28 +8,26 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from . import s3c
-from .s3c import retry
from s3ql.common import QuietError
import logging
import re
-import xml.etree.cElementTree as ElementTree
# Pylint goes berserk with false positives
#pylint: disable=E1002,E1101,W0201
log = logging.getLogger("backends.gs")
-class Bucket(s3c.Bucket):
- """A bucket stored in Google Storage
+class Backend(s3c.Backend):
+ """A backend to store data in Google Storage
This class uses standard HTTP connections to connect to GS.
- The bucket guarantees immediate get consistency and eventual list
+ The backend guarantees immediate get consistency and eventual list
consistency.
"""
def __init__(self, storage_url, gs_key, gs_secret, use_ssl):
- super(Bucket, self).__init__(storage_url, gs_key, gs_secret, use_ssl)
+ super(Backend, self).__init__(storage_url, gs_key, gs_secret, use_ssl)
self.namespace = 'http://doc.s3.amazonaws.com/2006-03-01'
diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py
index 423cfec..6d4e817 100644
--- a/src/s3ql/backends/local.py
+++ b/src/s3ql/backends/local.py
@@ -8,7 +8,7 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
-from .common import AbstractBucket, NoSuchBucket, NoSuchObject, ChecksumError
+from .common import AbstractBackend, DanglingStorageURL, NoSuchObject, ChecksumError
from ..common import BUFSIZE
import shutil
import logging
@@ -19,27 +19,27 @@ import thread
log = logging.getLogger("backend.local")
-class Bucket(AbstractBucket):
+class Backend(AbstractBackend):
'''
- A bucket that is stored on the local hard disk
+ A backend that stores data on the local hard disk
'''
needs_login = False
def __init__(self, storage_url, backend_login, backend_pw, use_ssl=False):
- '''Initialize local bucket
+ '''Initialize local backend
Login and password are ignored.
'''
# Unused argument
#pylint: disable=W0613
- super(Bucket, self).__init__()
+ super(Backend, self).__init__()
name = storage_url[len('local://'):]
self.name = name
if not os.path.exists(name):
- raise NoSuchBucket(name)
+ raise DanglingStorageURL(name)
def __str__(self):
return 'local://%s' % self.name
@@ -78,9 +78,9 @@ class Bucket(AbstractBucket):
def open_read(self, key):
"""Open object for reading
- Return a tuple of a file-like object. Bucket contents can be read from
- the file-like object, metadata is stored in its *metadata* attribute and
- can be modified by the caller at will.
+ Return a file-like object. Data can be read using the `read` method. metadata is stored in
+ its *metadata* attribute and can be modified by the caller at will. The object must be
+ closed explicitly.
"""
path = self._key_to_path(key)
@@ -104,15 +104,13 @@ class Bucket(AbstractBucket):
def open_write(self, key, metadata=None, is_compressed=False):
"""Open object for writing
- `metadata` can be a dict of additional attributes to store with the
- object. Returns a file-like object. The object must be closed
- explicitly. After closing, the *get_obj_size* may be used to retrieve
- the size of the stored object (which may differ from the size of the
+ `metadata` can be a dict of additional attributes to store with the object. Returns a file-
+ like object. The object must be closed explicitly. After closing, the *get_obj_size* may be
+ used to retrieve the size of the stored object (which may differ from the size of the
written data).
- The *is_compressed* parameter indicates that the caller is going
- to write compressed data, and may be used to avoid recompression
- by the bucket.
+ The *is_compressed* parameter indicates that the caller is going to write compressed data,
+ and may be used to avoid recompression by the backend.
"""
if metadata is None:
@@ -145,7 +143,7 @@ class Bucket(AbstractBucket):
return dest
def clear(self):
- """Delete all objects in bucket"""
+ """Delete all objects in backend"""
for name in os.listdir(self.name):
path = os.path.join(self.name, name)
@@ -155,7 +153,7 @@ class Bucket(AbstractBucket):
os.unlink(path)
def contains(self, key):
- '''Check if `key` is in bucket'''
+ '''Check if `key` is in backend'''
path = self._key_to_path(key)
try:
@@ -169,7 +167,7 @@ class Bucket(AbstractBucket):
def delete(self, key, force=False):
"""Delete object stored under `key`
- ``bucket.delete(key)`` can also be written as ``del bucket[key]``.
+ ``backend.delete(key)`` can also be written as ``del backend[key]``.
If `force` is true, do not return an error if the key does not exist.
"""
path = self._key_to_path(key)
@@ -185,9 +183,9 @@ class Bucket(AbstractBucket):
raise
def list(self, prefix=''):
- '''List keys in bucket
+ '''List keys in backend
- Returns an iterator over all keys in the bucket.
+ Returns an iterator over all keys in the backend.
'''
if prefix:
base = os.path.dirname(self._key_to_path(prefix))
diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py
index f485d6e..a41f512 100644
--- a/src/s3ql/backends/s3.py
+++ b/src/s3ql/backends/s3.py
@@ -17,18 +17,18 @@ log = logging.getLogger("backend.s3")
# Pylint goes berserk with false positives
#pylint: disable=E1002,E1101
-class Bucket(s3c.Bucket):
- """A bucket stored in Amazon S3
+class Backend(s3c.Backend):
+ """A backend to store data in Amazon S3
This class uses standard HTTP connections to connect to S3.
- The bucket guarantees get after create consistency, i.e. a newly created
+ The backend guarantees get after create consistency, i.e. a newly created
object will be immediately retrievable. Additional consistency guarantees
may or may not be available and can be queried for with instance methods.
"""
def __init__(self, storage_url, login, password, use_ssl):
- super(Bucket, self).__init__(storage_url, login, password, use_ssl)
+ super(Backend, self).__init__(storage_url, login, password, use_ssl)
@staticmethod
diff --git a/src/s3ql/backends/s3c.py b/src/s3ql/backends/s3c.py
index 53077ae..c5b84d7 100644
--- a/src/s3ql/backends/s3c.py
+++ b/src/s3ql/backends/s3c.py
@@ -8,9 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from ..common import BUFSIZE, QuietError
-from .common import AbstractBucket, NoSuchObject, retry, AuthorizationError, http_connection, \
+from .common import AbstractBackend, NoSuchObject, retry, AuthorizationError, http_connection, \
AuthenticationError
-from .common import NoSuchBucket as NoSuchBucket_common
+from .common import DanglingStorageURL as DanglingStorageURL_common
from base64 import b64encode
from email.utils import parsedate_tz, mktime_tz
from urlparse import urlsplit
@@ -33,16 +33,16 @@ XML_CONTENT_RE = re.compile('^application/xml(?:;\s+|$)', re.IGNORECASE)
log = logging.getLogger("backends.s3c")
-class Bucket(AbstractBucket):
- """A bucket stored in some S3 compatible storage service.
+class Backend(AbstractBackend):
+ """A backend to stored data in some S3 compatible storage service.
This class uses standard HTTP connections to connect to GS.
- The bucket guarantees only immediate get after create consistency.
+ The backend guarantees only immediate get after create consistency.
"""
def __init__(self, storage_url, login, password, use_ssl):
- super(Bucket, self).__init__()
+ super(Backend, self).__init__()
(host, port, bucket_name, prefix) = self._parse_storage_url(storage_url, use_ssl)
@@ -93,15 +93,13 @@ class Bucket(AbstractBucket):
def is_temp_failure(self, exc): #IGNORE:W0613
'''Return true if exc indicates a temporary error
- Return true if the given exception is used by this bucket's backend
- to indicate a temporary problem. Most instance methods automatically
- retry the request in this case, so the caller does not need to
- worry about temporary failures.
+ Return true if the given exception indicates a temporary problem. Most instance methods
+ automatically retry the request in this case, so the caller does not need to worry about
+ temporary failures.
- However, in same cases (e.g. when reading or writing an object), the
- request cannot automatically be retried. In these case this method can
- be used to check for temporary problems and so that the request can
- be manually restarted if applicable.
+ However, in same cases (e.g. when reading or writing an object), the request cannot
+ automatically be retried. In these case this method can be used to check for temporary
+ problems and so that the request can be manually restarted if applicable.
'''
if isinstance(exc, (InternalError, BadDigest, IncompleteBody, RequestTimeout,
@@ -139,9 +137,9 @@ class Bucket(AbstractBucket):
raise NoSuchObject(key)
def list(self, prefix=''):
- '''List keys in bucket
+ '''List keys in backend
- Returns an iterator over all keys in the bucket. This method
+ Returns an iterator over all keys in the backend. This method
handles temporary errors.
'''
@@ -165,7 +163,7 @@ class Bucket(AbstractBucket):
type(exc).__name__)
raise
- log.info('Encountered %s exception (%s), retrying call to swift.Bucket.list()',
+ log.info('Encountered %s exception (%s), retrying call to s3c.Backend.list()',
type(exc).__name__, exc)
if hasattr(exc, 'retry_after') and exc.retry_after:
@@ -180,9 +178,9 @@ class Bucket(AbstractBucket):
yield marker
def _list(self, prefix='', start=''):
- '''List keys in bucket, starting with *start*
+ '''List keys in backend, starting with *start*
- Returns an iterator over all keys in the bucket. This method
+ Returns an iterator over all keys in the backend. This method
does not retry on errors.
'''
@@ -272,12 +270,12 @@ class Bucket(AbstractBucket):
@retry
def open_read(self, key):
- ''''Open object for reading
+ """Open object for reading
- Return a tuple of a file-like object. Bucket contents can be read from
- the file-like object, metadata is stored in its *metadata* attribute and
- can be modified by the caller at will. The object must be closed explicitly.
- '''
+ Return a file-like object. Data can be read using the `read` method. metadata is stored in
+ its *metadata* attribute and can be modified by the caller at will. The object must be
+ closed explicitly.
+ """
try:
resp = self._do_request('GET', '/%s%s' % (self.prefix, key))
@@ -289,16 +287,14 @@ class Bucket(AbstractBucket):
def open_write(self, key, metadata=None, is_compressed=False):
"""Open object for writing
- `metadata` can be a dict of additional attributes to store with the
- object. Returns a file-like object. The object must be closed
- explicitly. After closing, the *get_obj_size* may be used to retrieve
- the size of the stored object (which may differ from the size of the
+ `metadata` can be a dict of additional attributes to store with the object. Returns a file-
+ like object. The object must be closed explicitly. After closing, the *get_obj_size* may be
+ used to retrieve the size of the stored object (which may differ from the size of the
written data).
-
- The *is_compressed* parameter indicates that the caller is going
- to write compressed data, and may be used to avoid recompression
- by the bucket.
-
+
+ The *is_compressed* parameter indicates that the caller is going to write compressed data,
+ and may be used to avoid recompression by the backend.
+
Since Amazon S3 does not support chunked uploads, the entire data will
be buffered in memory before upload.
"""
@@ -411,7 +407,7 @@ class Bucket(AbstractBucket):
def clear(self):
- """Delete all objects in bucket
+ """Delete all objects in backend
Note that this method may not be able to see (and therefore also not
delete) recently uploaded objects.
@@ -508,11 +504,11 @@ class Bucket(AbstractBucket):
class ObjectR(object):
'''An S3 object open for reading'''
- def __init__(self, key, resp, bucket, metadata=None):
+ def __init__(self, key, resp, backend, metadata=None):
self.key = key
self.resp = resp
self.md5_checked = False
- self.bucket = bucket
+ self.backend = backend
self.metadata = metadata
# False positive, hashlib *does* have md5 member
@@ -561,9 +557,9 @@ class ObjectW(object):
the close() method is called.
'''
- def __init__(self, key, bucket, headers):
+ def __init__(self, key, backend, headers):
self.key = key
- self.bucket = bucket
+ self.backend = backend
self.headers = headers
self.closed = False
self.obj_size = 0
@@ -581,7 +577,7 @@ class ObjectW(object):
self.obj_size += len(buf)
def is_temp_failure(self, exc):
- return self.bucket.is_temp_failure(exc)
+ return self.backend.is_temp_failure(exc)
@retry
def close(self):
@@ -596,7 +592,7 @@ class ObjectW(object):
self.headers['Content-Length'] = self.obj_size
self.fh.seek(0)
- resp = self.bucket._do_request('PUT', '/%s%s' % (self.bucket.prefix, self.key),
+ resp = self.backend._do_request('PUT', '/%s%s' % (self.backend.prefix, self.key),
headers=self.headers, body=self.fh)
etag = resp.getheader('ETag').strip('"')
assert resp.length == 0
@@ -605,7 +601,7 @@ class ObjectW(object):
log.warn('ObjectW(%s).close(): MD5 mismatch (%s vs %s)', self.key, etag,
self.md5.hexdigest)
try:
- self.bucket.delete(self.key)
+ self.backend.delete(self.key)
except:
log.exception('Objectw(%s).close(): unable to delete corrupted object!',
self.key)
@@ -708,4 +704,4 @@ class OperationAborted(S3Error): pass
class RequestTimeout(S3Error): pass
class SlowDown(S3Error): pass
class RequestTimeTooSkewed(S3Error): pass
-class NoSuchBucket(S3Error, NoSuchBucket_common): pass
+class DanglingStorageURL(S3Error, DanglingStorageURL_common): pass
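ObjectW.close() in this file keeps its upload-verification logic: the object is buffered in full, PUT with an explicit Content-Length, and the ETag returned by the server is compared against a locally computed MD5; on a mismatch the just-uploaded object is deleted. A stripped-down sketch of that verification step, with the HTTP plumbing omitted and names chosen for illustration only.

    import hashlib

    def verify_upload(md5, etag_header, delete_obj):
        # S3-compatible services return the MD5 of a single-part PUT as the ETag,
        # wrapped in double quotes.
        etag = etag_header.strip('"')
        if etag != md5.hexdigest():
            delete_obj()   # do not leave a corrupted object behind
            raise RuntimeError('MD5 mismatch between computed and server-reported hash')

    payload = b'some object data'
    md5 = hashlib.md5(payload)
    verify_upload(md5, '"%s"' % hashlib.md5(payload).hexdigest(), delete_obj=lambda: None)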
diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py
index c065734..ae4b9ab 100644
--- a/src/s3ql/backends/swift.py
+++ b/src/s3ql/backends/swift.py
@@ -8,7 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from ..common import QuietError, BUFSIZE
-from .common import AbstractBucket, NoSuchObject, retry, AuthorizationError, http_connection
+from .common import (AbstractBackend, NoSuchObject, retry, AuthorizationError, http_connection,
+ DanglingStorageURL)
from .s3c import HTTPError, BadDigest
from urlparse import urlsplit
import json
@@ -20,14 +21,13 @@ import re
import tempfile
import time
import urllib
-from s3ql.backends.common import NoSuchBucket
log = logging.getLogger("backend.swift")
-class Bucket(AbstractBucket):
- """A bucket stored in OpenStack Swift
+class Backend(AbstractBackend):
+ """A backend to store data in OpenStack Swift
- The bucket guarantees get after create consistency, i.e. a newly created
+ The backend guarantees get after create consistency, i.e. a newly created
object will be immediately retrievable.
"""
@@ -35,30 +35,31 @@ class Bucket(AbstractBucket):
# Unused argument
#pylint: disable=W0613
- super(Bucket, self).__init__()
+ super(Backend, self).__init__()
- (host, port, bucket_name, prefix) = self._parse_storage_url(storage_url)
+ (host, port, container_name, prefix) = self._parse_storage_url(storage_url)
self.hostname = host
self.port = port
- self.bucket_name = bucket_name
+ self.container_name = container_name
self.prefix = prefix
self.password = password
self.login = login
self.auth_token = None
self.auth_prefix = None
- self.conn = self._get_conn()
+ self.conn = None
- self._bucket_exists()
+ self._container_exists()
- def _bucket_exists(self):
- '''Make sure that the bucket exists'''
+ @retry
+ def _container_exists(self):
+ '''Make sure that the container exists'''
try:
resp = self._do_request('GET', '/', query_string={'limit': 1 })
except HTTPError as exc:
if exc.status == 404:
- raise NoSuchBucket(self.bucket_name)
+ raise DanglingStorageURL(self.container_name)
raise
resp.read()
@@ -66,7 +67,7 @@ class Bucket(AbstractBucket):
def _parse_storage_url(storage_url):
'''Extract information from storage URL
- Return a tuple *(host, port, bucket_name, prefix)* .
+ Return a tuple *(host, port, container_name, prefix)* .
'''
hit = re.match(r'^[a-zA-Z0-9]+://' # Backend
@@ -80,23 +81,21 @@ class Bucket(AbstractBucket):
hostname = hit.group(1)
port = int(hit.group(2) or '443')
- bucketname = hit.group(3)
+ containername = hit.group(3)
prefix = hit.group(4) or ''
- return (hostname, port, bucketname, prefix)
+ return (hostname, port, containername, prefix)
def is_temp_failure(self, exc): #IGNORE:W0613
'''Return true if exc indicates a temporary error
- Return true if the given exception is used by this bucket's backend
- to indicate a temporary problem. Most instance methods automatically
- retry the request in this case, so the caller does not need to
- worry about temporary failures.
+ Return true if the given exception indicates a temporary problem. Most instance methods
+ automatically retry the request in this case, so the caller does not need to worry about
+ temporary failures.
- However, in same cases (e.g. when reading or writing an object), the
- request cannot automatically be retried. In these case this method can
- be used to check for temporary problems and so that the request can
- be manually restarted if applicable.
+ However, in same cases (e.g. when reading or writing an object), the request cannot
+ automatically be retried. In these case this method can be used to check for temporary
+ problems and so that the request can be manually restarted if applicable.
'''
if isinstance(exc, (httplib.IncompleteRead,)):
@@ -111,10 +110,15 @@ class Bucket(AbstractBucket):
exc.errno in (errno.EPIPE, errno.ECONNRESET, errno.ETIMEDOUT,
errno.EINTR)):
return True
+
+ elif isinstance(exc, HTTPError) and exc.status >= 500 and exc.status <= 599:
+ return True
+ elif isinstance(exc, AuthenticationExpired):
+ return True
+
return False
- @retry
def _get_conn(self):
'''Obtain connection to server and authentication token'''
@@ -134,14 +138,11 @@ class Bucket(AbstractBucket):
resp.read()
continue
- if resp.status == 401:
+ elif resp.status == 401:
raise AuthorizationError(resp.read())
elif resp.status > 299 or resp.status < 200:
- log.error('_refresh_auth(): unexpected response: %d %s\n%s',
- resp.status, resp.msg, resp.read())
- raise RuntimeError('Unexpected response: %d %s' % (resp.status,
- resp.msg))
+ raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
# Pylint can't infer SplitResult Types
#pylint: disable=E1103
@@ -167,12 +168,15 @@ class Bucket(AbstractBucket):
if headers is None:
headers = dict()
-
if not body:
headers['content-length'] = '0'
-
+
+ if self.conn is None:
+ log.info('_do_request(): no active connection, calling _get_conn()')
+ self.conn = self._get_conn()
+
# Construct full path
- path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.bucket_name, path))
+ path = urllib.quote('%s/%s%s' % (self.auth_prefix, self.container_name, path))
if query_string:
s = urllib.urlencode(query_string, doseq=True)
if subres:
@@ -183,43 +187,40 @@ class Bucket(AbstractBucket):
path += '?%s' % subres
headers['connection'] = 'keep-alive'
-
- while True:
- headers['X-Auth-Token'] = self.auth_token
+ headers['X-Auth-Token'] = self.auth_token
- try:
- log.debug('_do_request(): %s %s', method, path)
- self.conn.request(method, path, body, headers)
+ try:
+ log.debug('_do_request(): %s %s', method, path)
+ self.conn.request(method, path, body, headers)
+
+ log.debug('_do_request(): Reading response..')
+ resp = self.conn.getresponse()
+ except:
+ # We probably can't use the connection anymore
+ self.conn.close()
+ raise
- log.debug('_do_request(): Reading response..')
- resp = self.conn.getresponse()
- except:
- # We probably can't use the connection anymore
- self.conn.close()
- raise
+ # We need to call read() at least once for httplib to consider this
+ # request finished, even if there is no response body.
+ if resp.length == 0:
+ resp.read()
+
+ # Success
+ if resp.status >= 200 and resp.status <= 299:
+ return resp
+
+ # Expired auth token
+ if resp.status == 401:
+ log.info('OpenStack auth token seems to have expired, requesting new one.')
+ self.conn = None
+ raise AuthenticationExpired(resp.reason)
- # We need to call read() at least once for httplib to consider this
- # request finished, even if there is no response body.
- if resp.length == 0:
- resp.read()
-
- # Success
- if resp.status >= 200 and resp.status <= 299:
- return resp
-
- # Expired auth token
- if resp.status == 401:
- log.info('OpenStack auth token seems to have expired, requesting new one.')
- resp.read()
- self.conn = self._get_conn()
- continue
-
- # If method == HEAD, server must not return response body
- # even in case of errors
- if method.upper() == 'HEAD':
- raise HTTPError(resp.status, resp.reason)
- else:
- raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
+ # If method == HEAD, server must not return response body
+ # even in case of errors
+ if method.upper() == 'HEAD':
+ raise HTTPError(resp.status, resp.reason)
+ else:
+ raise HTTPError(resp.status, resp.reason, resp.getheaders(), resp.read())
@retry
def lookup(self, key):
@@ -264,7 +265,7 @@ class Bucket(AbstractBucket):
def open_read(self, key):
''''Open object for reading
- Return a tuple of a file-like object. Bucket contents can be read from
+ Return a file-like object. Backend contents can be read from
the file-like object, metadata is stored in its *metadata* attribute and
can be modified by the caller at will. The object must be closed explicitly.
'''
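
For illustration, a short sketch of using open_read directly; the key name is just an example and error handling is omitted:

    fh = backend.open_read('s3ql_data_42')   # example key
    try:
        meta = fh.metadata                   # dict of stored attributes
        data = fh.read()                     # object contents
    finally:
        fh.close()                           # must be closed explicitly
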
@@ -281,15 +282,13 @@ class Bucket(AbstractBucket):
def open_write(self, key, metadata=None, is_compressed=False):
"""Open object for writing
- `metadata` can be a dict of additional attributes to store with the
- object. Returns a file-like object. The object must be closed
- explicitly. After closing, the *get_obj_size* may be used to retrieve
- the size of the stored object (which may differ from the size of the
+ `metadata` can be a dict of additional attributes to store with the object. Returns a
+ file-like object. The object must be closed explicitly. After closing, the *get_obj_size*
+ method may be used to retrieve the size of the stored object (which may differ from the size of the
written data).
- The *is_compressed* parameter indicates that the caller is going
- to write compressed data, and may be used to avoid recompression
- by the bucket.
+ The *is_compressed* parameter indicates that the caller is going to write compressed data,
+ and may be used to avoid recompression by the backend.
"""
log.debug('open_write(%s): start', key)
@@ -302,7 +301,7 @@ class Bucket(AbstractBucket):
return ObjectW(key, self, headers)
def clear(self):
- """Delete all objects in bucket"""
+ """Delete all objects in backend"""
# We have to cache keys, because otherwise we can't use the
# http connection to delete keys.
@@ -312,14 +311,14 @@ class Bucket(AbstractBucket):
log.debug('clear(): deleting key %s', s3key)
- # Ignore missing objects when clearing bucket
+ # Ignore missing objects when clearing backend
self.delete(s3key, True)
@retry
def delete(self, key, force=False):
"""Delete object stored under `key`
- ``bucket.delete(key)`` can also be written as ``del bucket[key]``.
+ ``backend.delete(key)`` can also be written as ``del backend[key]``.
If `force` is true, do not return an error if the key does not exist.
"""
@@ -345,7 +344,7 @@ class Bucket(AbstractBucket):
try:
resp = self._do_request('PUT', '/%s%s' % (self.prefix, dest),
- headers={ 'X-Copy-From': '/%s/%s%s' % (self.bucket_name,
+ headers={ 'X-Copy-From': '/%s/%s%s' % (self.container_name,
self.prefix, src)})
# Discard response body
resp.read()
@@ -355,9 +354,9 @@ class Bucket(AbstractBucket):
raise
def list(self, prefix=''):
- '''List keys in bucket
+ '''List keys in backend
- Returns an iterator over all keys in the bucket. This method
+ Returns an iterator over all keys in the backend. This method
handles temporary errors.
'''
@@ -381,7 +380,7 @@ class Bucket(AbstractBucket):
type(exc).__name__)
raise
- log.info('Encountered %s exception (%s), retrying call to swift.Bucket.list()',
+ log.info('Encountered %s exception (%s), retrying call to swift.Backend.list()',
type(exc).__name__, exc)
if hasattr(exc, 'retry_after') and exc.retry_after:
@@ -396,9 +395,9 @@ class Bucket(AbstractBucket):
yield marker
def _list(self, prefix='', start='', batch_size=5000):
- '''List keys in bucket, starting with *start*
+ '''List keys in backend, starting with *start*
- Returns an iterator over all keys in the bucket. This method
+ Returns an iterator over all keys in the backend. This method
does not retry on errors.
'''
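
list() resumes a failed listing from the last key it yielded, and _list() fetches results in batches of at most batch_size keys. A hedged sketch of the same marker-based pagination idea against a generic paged-listing callable (fetch_page is hypothetical):

    def list_keys(fetch_page, prefix='', batch_size=5000):
        # fetch_page(prefix, marker, limit) is assumed to return a list of
        # key names greater than 'marker', at most 'limit' of them.
        marker = ''
        while True:
            page = fetch_page(prefix, marker, batch_size)
            if not page:
                return
            for key in page:
                yield key
            marker = page[-1]
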
@@ -416,7 +415,7 @@ class Bucket(AbstractBucket):
'limit': batch_size })
except HTTPError as exc:
if exc.status == 404:
- raise NoSuchBucket(self.bucket_name)
+ raise DanglingStorageURL(self.container_name)
raise
if resp.status == 204:
@@ -450,9 +449,9 @@ class ObjectW(object):
the close() method is called.
'''
- def __init__(self, key, bucket, headers):
+ def __init__(self, key, backend, headers):
self.key = key
- self.bucket = bucket
+ self.backend = backend
self.headers = headers
self.closed = False
self.obj_size = 0
@@ -470,7 +469,7 @@ class ObjectW(object):
self.obj_size += len(buf)
def is_temp_failure(self, exc):
- return self.bucket.is_temp_failure(exc)
+ return self.backend.is_temp_failure(exc)
@retry
def close(self):
@@ -486,7 +485,7 @@ class ObjectW(object):
self.headers['Content-Type'] = 'application/octet-stream'
self.fh.seek(0)
- resp = self.bucket._do_request('PUT', '/%s%s' % (self.bucket.prefix, self.key),
+ resp = self.backend._do_request('PUT', '/%s%s' % (self.backend.prefix, self.key),
headers=self.headers, body=self.fh)
etag = resp.getheader('ETag').strip('"')
resp.read()
@@ -495,7 +494,7 @@ class ObjectW(object):
log.warn('ObjectW(%s).close(): MD5 mismatch (%s vs %s)', self.key, etag,
self.md5.hexdigest)
try:
- self.bucket.delete(self.key)
+ self.backend.delete(self.key)
except:
log.exception('Objectw(%s).close(): unable to delete corrupted object!',
self.key)
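
close() verifies the upload by comparing the MD5 digest computed while writing against the ETag returned by the server, and deletes the object if they disagree. A standalone sketch of that check (for plain, non-segmented uploads, where Swift and S3-style servers return the payload's MD5 as the ETag):

    import hashlib

    def upload_looks_intact(payload, etag_from_server):
        # Compare our own MD5 of the uploaded bytes with the server's ETag.
        return hashlib.md5(payload).hexdigest() == etag_from_server.strip('"')
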
@@ -516,11 +515,11 @@ class ObjectW(object):
class ObjectR(object):
'''A SWIFT object opened for reading'''
- def __init__(self, key, resp, bucket, metadata=None):
+ def __init__(self, key, resp, backend, metadata=None):
self.key = key
self.resp = resp
self.md5_checked = False
- self.bucket = bucket
+ self.backend = backend
self.metadata = metadata
# False positive, hashlib *does* have md5 member
@@ -573,4 +572,17 @@ def extractmeta(resp):
continue
meta[hit.group(1)] = val
- return meta
\ No newline at end of file
+ return meta
+
+
+class AuthenticationExpired(Exception):
+ '''Raised if the provided Authentication Token has expired'''
+
+ def __init__(self, msg):
+ super(AuthenticationExpired, self).__init__()
+ self.msg = msg
+
+ def __str__(self):
+ return 'Auth token expired. Server said: %s' % self.msg
+
+ \ No newline at end of file
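
AuthenticationExpired ties into the retry machinery introduced in this patch: on a 401, _do_request() drops the cached connection and raises it, is_temp_failure() reports it as temporary, and the @retry-decorated callers re-enter _do_request(), which then re-authenticates via _get_conn(). A simplified stand-in for the real retry decorator, only to show the shape of that loop (the actual implementation lives in backends/common.py and is not part of this hunk):

    import functools
    import time

    def retry(method):
        # Re-run a backend method with exponential backoff as long as the
        # instance's is_temp_failure() classifies the error as transient,
        # which after this change includes AuthenticationExpired.
        @functools.wraps(method)
        def wrapped(self, *args, **kwargs):
            for attempt in range(6):
                try:
                    return method(self, *args, **kwargs)
                except Exception as exc:
                    if attempt == 5 or not self.is_temp_failure(exc):
                        raise
                    time.sleep(2 ** attempt)
        return wrapped
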
diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py
index 19c6b8d..2e775de 100644
--- a/src/s3ql/block_cache.py
+++ b/src/s3ql/block_cache.py
@@ -210,12 +210,12 @@ class BlockCache(object):
uploaded)
"""
- def __init__(self, bucket_pool, db, cachedir, max_size, max_entries=768):
+ def __init__(self, backend_pool, db, cachedir, max_size, max_entries=768):
log.debug('Initializing')
self.path = cachedir
self.db = db
- self.bucket_pool = bucket_pool
+ self.backend_pool = backend_pool
self.entries = OrderedDict()
self.max_entries = max_entries
self.size = 0
@@ -301,8 +301,8 @@ class BlockCache(object):
if log.isEnabledFor(logging.DEBUG):
time_ = time.time()
- with self.bucket_pool() as bucket:
- obj_size = bucket.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
+ with self.backend_pool() as backend:
+ obj_size = backend.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
if log.isEnabledFor(logging.DEBUG):
time_ = time.time() - time_
@@ -468,8 +468,8 @@ class BlockCache(object):
def _do_removal(self, obj_id):
'''Remove object'''
- with self.bucket_pool() as bucket:
- bucket.delete('s3ql_data_%d' % obj_id)
+ with self.backend_pool() as backend:
+ backend.delete('s3ql_data_%d' % obj_id)
@contextmanager
def get(self, inode, blockno):
@@ -534,8 +534,8 @@ class BlockCache(object):
return el
try:
with lock_released:
- with self.bucket_pool() as bucket:
- el = bucket.perform_read(do_read, 's3ql_data_%d' % obj_id)
+ with self.backend_pool() as backend:
+ el = backend.perform_read(do_read, 's3ql_data_%d' % obj_id)
# Note: We need to do this *before* releasing the global
# lock to notify other threads
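
The backend_pool passed into BlockCache is used purely as a context manager that hands out one backend connection per upload, download or removal. The real BackendPool lives in backends/common.py and is not shown in this diff; a hedged sketch of such a pool could look like this:

    import contextlib
    import threading

    class SimpleBackendPool(object):
        '''Illustrative pool: reuse idle backend instances, create on demand.'''

        def __init__(self, factory):
            self.factory = factory      # callable returning a new backend
            self.lock = threading.Lock()
            self.idle = []

        @contextlib.contextmanager
        def __call__(self):
            with self.lock:
                conn = self.idle.pop() if self.idle else self.factory()
            try:
                yield conn
            finally:
                with self.lock:
                    self.idle.append(conn)

    # Usage mirrors the calls above: with pool() as backend: backend.delete(...)
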
diff --git a/src/s3ql/common.py b/src/s3ql/common.py
index 1aa8162..61f11d5 100644
--- a/src/s3ql/common.py
+++ b/src/s3ql/common.py
@@ -92,11 +92,11 @@ def add_stdout_logging(quiet=False):
root_logger.addHandler(handler)
return handler
-def get_seq_no(bucket):
+def get_seq_no(backend):
'''Get current metadata sequence number'''
from .backends.common import NoSuchObject
- seq_nos = list(bucket.list('s3ql_seq_no_'))
+ seq_nos = list(backend.list('s3ql_seq_no_'))
if not seq_nos:
# Maybe list result is outdated
seq_nos = [ 's3ql_seq_no_1' ]
@@ -109,18 +109,18 @@ def get_seq_no(bucket):
seq_no = max(seq_nos)
# Make sure that object really exists
- while ('s3ql_seq_no_%d' % seq_no) not in bucket:
+ while ('s3ql_seq_no_%d' % seq_no) not in backend:
seq_no -= 1
if seq_no == 0:
- raise QuietError('No S3QL file system found in bucket.')
- while ('s3ql_seq_no_%d' % seq_no) in bucket:
+ raise QuietError('No S3QL file system found at given storage URL.')
+ while ('s3ql_seq_no_%d' % seq_no) in backend:
seq_no += 1
seq_no -= 1
# Delete old seq nos
for i in [ x for x in seq_nos if x < seq_no - 10 ]:
try:
- del bucket['s3ql_seq_no_%d' % i]
+ del backend['s3ql_seq_no_%d' % i]
except NoSuchObject:
pass # Key list may not be up to date
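
get_seq_no() has to cope with eventually consistent listings: it takes the highest advertised sequence number, probes downwards until an object really exists, then probes upwards in case the listing missed newer objects, and finally prunes old markers. A simplified sketch of the probing, using a plain dict in place of the backend (pruning and error handling omitted):

    import re

    def probe_seq_no(objects):
        # 'objects' maps key names to values, standing in for the backend.
        seq_nos = [int(re.search(r'(\d+)$', key).group(1))
                   for key in objects if key.startswith('s3ql_seq_no_')]
        seq_no = max(seq_nos) if seq_nos else 1
        while seq_no > 0 and ('s3ql_seq_no_%d' % seq_no) not in objects:
            seq_no -= 1        # listing advertised an object that is gone
        while ('s3ql_seq_no_%d' % (seq_no + 1)) in objects:
            seq_no += 1        # listing missed a newer object
        return seq_no          # 0 means no file system was found

    # probe_seq_no({'s3ql_seq_no_2': '', 's3ql_seq_no_3': ''}) returns 3
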
@@ -338,7 +338,7 @@ def _escape(s):
return s
-def get_bucket_cachedir(storage_url, cachedir):
+def get_backend_cachedir(storage_url, cachedir):
if not os.path.exists(cachedir):
os.mkdir(cachedir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return os.path.join(cachedir, _escape(storage_url))
diff --git a/src/s3ql/fsck.py b/src/s3ql/fsck.py
index 0f890e2..b5ba9fe 100644
--- a/src/s3ql/fsck.py
+++ b/src/s3ql/fsck.py
@@ -8,8 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from . import CURRENT_FS_REV
-from .backends.common import NoSuchObject, get_bucket, NoSuchBucket
-from .common import (ROOT_INODE, inode_for_path, sha256_fh, get_path, BUFSIZE, get_bucket_cachedir,
+from .backends.common import NoSuchObject, get_backend, DanglingStorageURL
+from .common import (ROOT_INODE, inode_for_path, sha256_fh, get_path, BUFSIZE, get_backend_cachedir,
setup_logging, QuietError, get_seq_no, stream_write_bz2, stream_read_bz2, CTRL_INODE)
from .database import NoSuchRowError, Connection
from .metadata import restore_metadata, cycle_metadata, dump_metadata, create_tables
@@ -35,10 +35,10 @@ S_IFMT = (stat.S_IFDIR | stat.S_IFREG | stat.S_IFSOCK | stat.S_IFBLK |
class Fsck(object):
- def __init__(self, cachedir_, bucket_, param, conn):
+ def __init__(self, cachedir_, backend_, param, conn):
self.cachedir = cachedir_
- self.bucket = bucket_
+ self.backend = backend_
self.expect_errors = False
self.found_errors = False
self.uncorrectable_errors = False
@@ -182,7 +182,7 @@ class Fsck(object):
shutil.copyfileobj(fh, obj_fh, BUFSIZE)
return obj_fh
- obj_size = self.bucket.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
+ obj_size = self.backend.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
self.conn.execute('UPDATE objects SET size=? WHERE id=?', (obj_size, obj_id))
@@ -799,7 +799,7 @@ class Fsck(object):
# Delete objects which (correctly had) refcount=0
for obj_id in self.conn.query('SELECT id FROM objects WHERE refcount = 0'):
- del self.bucket['s3ql_data_%d' % obj_id]
+ del self.backend['s3ql_data_%d' % obj_id]
self.conn.execute("DELETE FROM objects WHERE refcount = 0")
@@ -814,7 +814,7 @@ class Fsck(object):
# We use this table to keep track of the objects that we have seen
self.conn.execute("CREATE TEMP TABLE obj_ids (id INTEGER PRIMARY KEY)")
try:
- for (i, obj_name) in enumerate(self.bucket.list('s3ql_data_')):
+ for (i, obj_name) in enumerate(self.backend.list('s3ql_data_')):
if i != 0 and i % 5000 == 0:
log.info('..processed %d objects so far..', i)
@@ -832,10 +832,10 @@ class Fsck(object):
'EXCEPT SELECT id FROM objects'):
try:
if obj_id in self.unlinked_objects:
- del self.bucket['s3ql_data_%d' % obj_id]
+ del self.backend['s3ql_data_%d' % obj_id]
else:
# TODO: Save the data in lost+found instead
- del self.bucket['s3ql_data_%d' % obj_id]
+ del self.backend['s3ql_data_%d' % obj_id]
self.found_errors = True
self.log_error("Deleted spurious object %d", obj_id)
except NoSuchObject:
@@ -844,12 +844,12 @@ class Fsck(object):
self.conn.execute('CREATE TEMPORARY TABLE missing AS '
'SELECT id FROM objects EXCEPT SELECT id FROM obj_ids')
for (obj_id,) in self.conn.query('SELECT * FROM missing'):
- if ('s3ql_data_%d' % obj_id) in self.bucket:
+ if ('s3ql_data_%d' % obj_id) in self.backend:
# Object was just not in list yet
continue
self.found_errors = True
- self.log_error("object %s only exists in table but not in bucket, deleting", obj_id)
+ self.log_error("object %s only exists in table but not in backend, deleting", obj_id)
for (id_,) in self.conn.query('SELECT inode FROM inode_blocks JOIN blocks ON block_id = id '
'WHERE obj_id=? ', (obj_id,)):
@@ -893,7 +893,7 @@ class Fsck(object):
self.log_error("Object %d has no size information, retrieving from backend...", obj_id)
self.conn.execute('UPDATE objects SET size=? WHERE id=?',
- (self.bucket.get_size('s3ql_data_%d' % obj_id), obj_id))
+ (self.backend.get_size('s3ql_data_%d' % obj_id), obj_id))
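
The object checks above all reduce to set differences between the objects table and the temporary obj_ids table built from the backend listing. A minimal sqlite3 sketch of that idea, with a drastically simplified schema:

    import sqlite3

    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE objects (id INTEGER PRIMARY KEY)')
    db.execute('CREATE TEMP TABLE obj_ids (id INTEGER PRIMARY KEY)')
    db.executemany('INSERT INTO objects VALUES (?)', [(1,), (2,), (3,)])
    db.executemany('INSERT INTO obj_ids VALUES (?)', [(2,), (3,), (4,)])

    # In the backend but not in the metadata: spurious objects
    spurious = db.execute('SELECT id FROM obj_ids EXCEPT SELECT id FROM objects').fetchall()
    # In the metadata but not in the backend: missing objects
    missing = db.execute('SELECT id FROM objects EXCEPT SELECT id FROM obj_ids').fetchall()

    assert spurious == [(4,)] and missing == [(1,)]
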
def resolve_free(self, path, name):
@@ -1059,13 +1059,13 @@ def main(args=None):
raise QuietError('Can not check mounted file system.')
try:
- bucket = get_bucket(options)
- except NoSuchBucket as exc:
+ backend = get_backend(options)
+ except DanglingStorageURL as exc:
raise QuietError(str(exc))
- cachepath = get_bucket_cachedir(options.storage_url, options.cachedir)
- seq_no = get_seq_no(bucket)
- param_remote = bucket.lookup('s3ql_metadata')
+ cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
+ seq_no = get_seq_no(backend)
+ param_remote = backend.lookup('s3ql_metadata')
db = None
if os.path.exists(cachepath + '.params'):
@@ -1073,7 +1073,7 @@ def main(args=None):
param = pickle.load(open(cachepath + '.params', 'rb'))
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
- param = bucket.lookup('s3ql_metadata')
+ param = backend.lookup('s3ql_metadata')
else:
log.info('Using cached metadata.')
db = Connection(cachepath + '.db')
@@ -1145,7 +1145,7 @@ def main(args=None):
stream_read_bz2(fh, tmpfh)
return tmpfh
log.info('Downloading and decompressing metadata...')
- tmpfh = bucket.perform_read(do_read, "s3ql_metadata")
+ tmpfh = backend.perform_read(do_read, "s3ql_metadata")
os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC,
stat.S_IRUSR | stat.S_IWUSR))
db = Connection(cachepath + '.db.tmp', fast_mode=True)
@@ -1159,10 +1159,10 @@ def main(args=None):
# Increase metadata sequence no
param['seq_no'] += 1
param['needs_fsck'] = True
- bucket['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
+ backend['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
- fsck = Fsck(cachepath + '-cache', bucket, param, db)
+ fsck = Fsck(cachepath + '-cache', backend, param, db)
fsck.check()
param['max_inode'] = db.get_val('SELECT MAX(id) FROM inodes')
@@ -1181,7 +1181,7 @@ def main(args=None):
log.warn('File system was marked as clean, yet fsck found problems.')
log.warn('Please report this to the S3QL mailing list, http://groups.google.com/group/s3ql')
- cycle_metadata(bucket)
+ cycle_metadata(backend)
param['needs_fsck'] = False
param['last_fsck'] = time.time()
param['last-modified'] = time.time()
@@ -1195,7 +1195,7 @@ def main(args=None):
return obj_fh
log.info("Compressing and uploading metadata...")
- obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param,
+ obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
is_compressed=True)
log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
diff --git a/src/s3ql/inode_cache.py b/src/s3ql/inode_cache.py
index e5491e3..a56ec65 100644
--- a/src/s3ql/inode_cache.py
+++ b/src/s3ql/inode_cache.py
@@ -8,7 +8,6 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
-import time
import logging
from .database import NoSuchRowError
diff --git a/src/s3ql/metadata.py b/src/s3ql/metadata.py
index 2007318..cc0cd49 100644
--- a/src/s3ql/metadata.py
+++ b/src/s3ql/metadata.py
@@ -74,17 +74,17 @@ def restore_metadata(fh, db):
load_table(table, columns, db=db, fh=fh)
db.execute('ANALYZE')
-def cycle_metadata(bucket):
+def cycle_metadata(backend):
from .backends.common import NoSuchObject
log.info('Backing up old metadata...')
for i in reversed(range(10)):
try:
- bucket.copy("s3ql_metadata_bak_%d" % i, "s3ql_metadata_bak_%d" % (i + 1))
+ backend.copy("s3ql_metadata_bak_%d" % i, "s3ql_metadata_bak_%d" % (i + 1))
except NoSuchObject:
pass
- bucket.copy("s3ql_metadata", "s3ql_metadata_bak_0")
+ backend.copy("s3ql_metadata", "s3ql_metadata_bak_0")
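
cycle_metadata() keeps a rolling set of metadata backups: every existing s3ql_metadata_bak_i is copied one slot up, oldest first, and the live s3ql_metadata then becomes s3ql_metadata_bak_0. A worked sketch of the same rotation over a dict standing in for the backend:

    def cycle(store, name='s3ql_metadata', generations=10):
        # Shift name_bak_9 -> name_bak_10, ..., name_bak_0 -> name_bak_1,
        # skipping generations that do not exist, then refresh name_bak_0.
        for i in reversed(range(generations)):
            src = '%s_bak_%d' % (name, i)
            if src in store:
                store['%s_bak_%d' % (name, i + 1)] = store[src]
        store['%s_bak_0' % name] = store[name]

    # After cycle(store) with store == {'s3ql_metadata': 'v2',
    # 's3ql_metadata_bak_0': 'v1'}, bak_1 holds 'v1' and bak_0 holds 'v2'.
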
def dump_metadata(db, fh):
'''Dump metadata into fh
diff --git a/src/s3ql/mkfs.py b/src/s3ql/mkfs.py
index 15f329f..9f03619 100644
--- a/src/s3ql/mkfs.py
+++ b/src/s3ql/mkfs.py
@@ -8,8 +8,8 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from . import CURRENT_FS_REV
-from .backends.common import get_bucket, BetterBucket, NoSuchBucket
-from .common import get_bucket_cachedir, setup_logging, QuietError, CTRL_INODE, stream_write_bz2
+from .backends.common import get_backend, BetterBackend, DanglingStorageURL
+from .common import get_backend_cachedir, setup_logging, QuietError, CTRL_INODE, stream_write_bz2
from .database import Connection
from .metadata import dump_metadata, create_tables
from .parse_args import ArgumentParser
@@ -66,7 +66,7 @@ def init_tables(conn):
# Insert control inode, the actual values don't matter that much
conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount) "
"VALUES (?,?,?,?,?,?,?,?)",
- (CTRL_INODE, stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR,
+ (CTRL_INODE, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR,
0, 0, timestamp, timestamp, timestamp, 42))
# Insert lost+found directory
@@ -92,19 +92,19 @@ def main(args=None):
'performance.')
try:
- plain_bucket = get_bucket(options, plain=True)
- except NoSuchBucket as exc:
+ plain_backend = get_backend(options, plain=True)
+ except DanglingStorageURL as exc:
raise QuietError(str(exc))
log.info("Before using S3QL, make sure to read the user's guide, especially\n"
"the 'Important Rules to Avoid Loosing Data' section.")
- if 's3ql_metadata' in plain_bucket:
+ if 's3ql_metadata' in plain_backend:
if not options.force:
raise QuietError("Found existing file system! Use --force to overwrite")
log.info('Purging existing file system data..')
- plain_bucket.clear()
+ plain_backend.clear()
log.info('Please note that the new file system may appear inconsistent\n'
'for a while until the removals have propagated through the backend.')
@@ -122,17 +122,17 @@ def main(args=None):
data_pw = fh.read(32)
fh.close()
- bucket = BetterBucket(wrap_pw, 'bzip2', plain_bucket)
- bucket['s3ql_passphrase'] = data_pw
+ backend = BetterBackend(wrap_pw, 'bzip2', plain_backend)
+ backend['s3ql_passphrase'] = data_pw
else:
data_pw = None
- bucket = BetterBucket(data_pw, 'bzip2', plain_bucket)
+ backend = BetterBackend(data_pw, 'bzip2', plain_backend)
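
The two BetterBackend instantiations set up a two-level key scheme: the user's wrap_pw only protects the randomly generated data_pw, which is stored as the s3ql_passphrase object, while data_pw protects all other objects; presumably this lets the user passphrase be changed later by re-wrapping data_pw instead of re-encrypting the whole backend. A toy sketch of that layering (the XOR wrapper below is not real encryption, it only illustrates which key protects what):

    import hashlib
    import os

    def toy_wrap(key, payload):
        # Placeholder for real encryption: payload is only recoverable
        # together with 'key' (XOR with a key-derived pad).
        pad = hashlib.sha256(key).digest()
        return bytes(bytearray(b ^ p for b, p in zip(bytearray(payload), bytearray(pad))))

    wrap_pw = b'user passphrase'          # what the user types
    data_pw = os.urandom(32)              # random master key

    stored = toy_wrap(wrap_pw, data_pw)   # what ends up in 's3ql_passphrase'
    # Everything else would be protected with data_pw, so changing the user
    # passphrase only means re-wrapping data_pw, not rewriting every object.
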
# Setup database
- cachepath = get_bucket_cachedir(options.storage_url, options.cachedir)
+ cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
- # There can't be a corresponding bucket, so we can safely delete
+ # There can't be a corresponding backend, so we can safely delete
# these files.
if os.path.exists(cachepath + '.db'):
os.unlink(cachepath + '.db')
@@ -156,8 +156,8 @@ def main(args=None):
param['last-modified'] = time.time()
# This indicates that the convert_legacy_metadata() stuff
- # in BetterBucket is not required for this file system.
- param['bucket_revision'] = 1
+ # in BetterBackend is not required for this file system.
+ param['backend_revision'] = 1
log.info('Dumping metadata...')
fh = tempfile.TemporaryFile()
@@ -168,8 +168,8 @@ def main(args=None):
return obj_fh
log.info("Compressing and uploading metadata...")
- bucket.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty')
- obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param,
+ backend.store('s3ql_seq_no_%d' % param['seq_no'], 'Empty')
+ obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
is_compressed=True)
log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
diff --git a/src/s3ql/mount.py b/src/s3ql/mount.py
index 4744487..a6fdcaa 100644
--- a/src/s3ql/mount.py
+++ b/src/s3ql/mount.py
@@ -8,9 +8,9 @@ This program can be distributed under the terms of the GNU GPLv3.
from __future__ import division, print_function, absolute_import
from . import fs, CURRENT_FS_REV
-from .backends.common import get_bucket_factory, BucketPool, NoSuchBucket
+from .backends.common import get_backend_factory, BackendPool, DanglingStorageURL
from .block_cache import BlockCache
-from .common import (setup_logging, get_bucket_cachedir, get_seq_no, QuietError, stream_write_bz2,
+from .common import (setup_logging, get_backend_cachedir, get_seq_no, QuietError, stream_write_bz2,
stream_read_bz2)
from .daemonize import daemonize
from .database import Connection
@@ -29,7 +29,6 @@ import tempfile
import thread
import threading
import time
-
log = logging.getLogger("mount")
@@ -81,17 +80,17 @@ def main(args=None):
import pstats
prof = cProfile.Profile()
- bucket_factory = get_bucket_factory(options)
- bucket_pool = BucketPool(bucket_factory)
+ backend_factory = get_backend_factory(options)
+ backend_pool = BackendPool(backend_factory)
# Get paths
- cachepath = get_bucket_cachedir(options.storage_url, options.cachedir)
+ cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
# Retrieve metadata
try:
- with bucket_pool() as bucket:
- (param, db) = get_metadata(bucket, cachepath)
- except NoSuchBucket as exc:
+ with backend_pool() as backend:
+ (param, db) = get_metadata(backend, cachepath)
+ except DanglingStorageURL as exc:
raise QuietError(str(exc))
if param['max_obj_size'] < options.min_obj_size:
@@ -105,15 +104,15 @@ def main(args=None):
else:
db.execute('DROP INDEX IF EXISTS ix_contents_inode')
- metadata_upload_thread = MetadataUploadThread(bucket_pool, param, db,
+ metadata_upload_thread = MetadataUploadThread(backend_pool, param, db,
options.metadata_upload_interval)
- block_cache = BlockCache(bucket_pool, db, cachepath + '-cache',
+ block_cache = BlockCache(backend_pool, db, cachepath + '-cache',
options.cachesize * 1024, options.max_cache_entries)
commit_thread = CommitThread(block_cache)
operations = fs.Operations(block_cache, db, max_obj_size=param['max_obj_size'],
inode_cache=InodeCache(db, param['inode_gen']),
upload_event=metadata_upload_thread.event)
-
+
log.info('Mounting filesystem...')
llfuse.init(operations, options.mountpoint, get_fuse_opts(options))
@@ -206,15 +205,15 @@ def main(args=None):
# Do not update .params yet, dump_metadata() may fail if the database is
# corrupted, in which case we want to force an fsck.
param['max_inode'] = db.get_val('SELECT MAX(id) FROM inodes')
- with bucket_pool() as bucket:
- seq_no = get_seq_no(bucket)
+ with backend_pool() as backend:
+ seq_no = get_seq_no(backend)
if metadata_upload_thread.db_mtime == os.stat(cachepath + '.db').st_mtime:
log.info('File system unchanged, not uploading metadata.')
- del bucket['s3ql_seq_no_%d' % param['seq_no']]
+ del backend['s3ql_seq_no_%d' % param['seq_no']]
param['seq_no'] -= 1
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
elif seq_no == param['seq_no']:
- cycle_metadata(bucket)
+ cycle_metadata(backend)
param['last-modified'] = time.time()
log.info('Dumping metadata...')
@@ -226,7 +225,7 @@ def main(args=None):
return obj_fh
log.info("Compressing and uploading metadata...")
- obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=param,
+ obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=param,
is_compressed=True)
log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
@@ -290,10 +289,10 @@ def determine_threads(options):
log.info("Using %d upload threads.", threads)
return threads
-def get_metadata(bucket, cachepath):
+def get_metadata(backend, cachepath):
'''Retrieve metadata'''
- seq_no = get_seq_no(bucket)
+ seq_no = get_seq_no(backend)
# Check for cached metadata
db = None
@@ -301,12 +300,12 @@ def get_metadata(bucket, cachepath):
param = pickle.load(open(cachepath + '.params', 'rb'))
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
- param = bucket.lookup('s3ql_metadata')
+ param = backend.lookup('s3ql_metadata')
else:
log.info('Using cached metadata.')
db = Connection(cachepath + '.db')
else:
- param = bucket.lookup('s3ql_metadata')
+ param = backend.lookup('s3ql_metadata')
# Check for unclean shutdown
if param['seq_no'] < seq_no:
@@ -338,7 +337,7 @@ def get_metadata(bucket, cachepath):
stream_read_bz2(fh, tmpfh)
return tmpfh
log.info('Downloading and decompressing metadata...')
- tmpfh = bucket.perform_read(do_read, "s3ql_metadata")
+ tmpfh = backend.perform_read(do_read, "s3ql_metadata")
os.close(os.open(cachepath + '.db.tmp', os.O_RDWR | os.O_CREAT | os.O_TRUNC,
stat.S_IRUSR | stat.S_IWUSR))
db = Connection(cachepath + '.db.tmp', fast_mode=True)
@@ -352,7 +351,7 @@ def get_metadata(bucket, cachepath):
# Increase metadata sequence no
param['seq_no'] += 1
param['needs_fsck'] = True
- bucket['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
+ backend['s3ql_seq_no_%d' % param['seq_no']] = 'Empty'
pickle.dump(param, open(cachepath + '.params', 'wb'), 2)
param['needs_fsck'] = False
@@ -497,9 +496,9 @@ class MetadataUploadThread(Thread):
passed in the constructor, the global lock is acquired first.
'''
- def __init__(self, bucket_pool, param, db, interval):
+ def __init__(self, backend_pool, param, db, interval):
super(MetadataUploadThread, self).__init__()
- self.bucket_pool = bucket_pool
+ self.backend_pool = backend_pool
self.param = param
self.db = db
self.interval = interval
@@ -532,15 +531,15 @@ class MetadataUploadThread(Thread):
dump_metadata(self.db, fh)
- with self.bucket_pool() as bucket:
- seq_no = get_seq_no(bucket)
+ with self.backend_pool() as backend:
+ seq_no = get_seq_no(backend)
if seq_no != self.param['seq_no']:
log.error('Remote metadata is newer than local (%d vs %d), '
'refusing to overwrite!', seq_no, self.param['seq_no'])
fh.close()
continue
- cycle_metadata(bucket)
+ cycle_metadata(backend)
fh.seek(0)
self.param['last-modified'] = time.time()
@@ -551,7 +550,7 @@ class MetadataUploadThread(Thread):
stream_write_bz2(fh, obj_fh)
return obj_fh
log.info("Compressing and uploading metadata...")
- obj_fh = bucket.perform_write(do_write, "s3ql_metadata", metadata=self.param,
+ obj_fh = backend.perform_write(do_write, "s3ql_metadata", metadata=self.param,
is_compressed=True)
log.info('Wrote %.2f MB of compressed metadata.', obj_fh.get_obj_size() / 1024 ** 2)
self.param['seq_no'] += 1
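
Both get_metadata() and MetadataUploadThread treat the sequence number as a generation counter: if the backend advertises a higher seq_no than the locally cached parameters, someone else has produced newer metadata and the local copy must not be uploaded over it. A minimal sketch of that guard (names are illustrative):

    def may_upload_metadata(local_seq_no, remote_seq_no):
        # Mirrors the check in MetadataUploadThread above: uploading is only
        # safe while nobody else has bumped the sequence number.
        return remote_seq_no == local_seq_no
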
diff --git a/src/s3ql/umount.py b/src/s3ql/umount.py
index cf67d0a..b2fdab8 100644
--- a/src/s3ql/umount.py
+++ b/src/s3ql/umount.py
@@ -7,7 +7,7 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from __future__ import division, print_function, absolute_import
-from .common import CTRL_NAME, QuietError, setup_logging
+from .common import CTRL_NAME, setup_logging
from .parse_args import ArgumentParser
import llfuse
import logging