summaryrefslogtreecommitdiff
path: root/src/s3ql/upload_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/s3ql/upload_manager.py')
-rw-r--r--src/s3ql/upload_manager.py387
1 files changed, 387 insertions, 0 deletions
diff --git a/src/s3ql/upload_manager.py b/src/s3ql/upload_manager.py
new file mode 100644
index 0000000..1cf374d
--- /dev/null
+++ b/src/s3ql/upload_manager.py
@@ -0,0 +1,387 @@
+'''
+upload_manager.py - this file is part of S3QL (http://s3ql.googlecode.com)
+
+Copyright (C) 2008-2009 Nikolaus Rath <Nikolaus@rath.org>
+
+This program can be distributed under the terms of the GNU LGPL.
+'''
+
+from __future__ import division, print_function, absolute_import
+
+from .backends.common import NoSuchObject
+from .common import sha256_fh, TimeoutError
+from .thread_group import ThreadGroup, Thread
+from .database import NoSuchRowError
+import logging
+import threading
+import os
+import errno
+from llfuse import lock
+import time
+from s3ql.common import EmbeddedException
+
+__all__ = [ "UploadManager", 'retry_exc', 'RemoveThread' ]
+
+# standard logger for this module
+log = logging.getLogger("UploadManager")
+
+
+MAX_UPLOAD_THREADS = 10
+MAX_COMPRESS_THREADS = 1
+MIN_TRANSIT_SIZE = 1024 * 1024
class UploadManager(object):
    '''
    Schedules and executes object uploads to make optimum use of
    network bandwidth and CPU time.

    Methods which release the global lock are marked as such in their
    docstring.

    Attributes:
    -----------

    :encountered_errors: This attribute is set if some non-fatal errors
            were encountered during asynchronous operations (for
            example, an object that was supposed to be deleted did
            not exist).
    '''

    def __init__(self, bucket, db, removal_queue):
        # Separate pools: compression is CPU bound, uploading is network
        # bound, so they are throttled independently.
        self.upload_threads = ThreadGroup(MAX_UPLOAD_THREADS)
        self.compress_threads = ThreadGroup(MAX_COMPRESS_THREADS)
        self.removal_queue = removal_queue
        self.bucket = bucket
        self.db = db
        # Number of compressed bytes currently being uploaded
        self.transit_size = 0
        self.transit_size_lock = threading.Lock()
        # Set of (inode, blockno) pairs that are currently being
        # compressed or uploaded
        self.in_transit = set()
        self.encountered_errors = False

    def add(self, el):
        '''Upload cache entry `el` asynchronously

        Return (uncompressed) size of cache entry.

        Raises `ValueError` if the block is already in transit.

        This method releases the global lock.
        '''

        log.debug('UploadManager.add(%s): start', el)

        if (el.inode, el.blockno) in self.in_transit:
            raise ValueError('Block already in transit')

        old_obj_id = el.obj_id
        size = os.fstat(el.fileno()).st_size
        el.seek(0)
        # Hash the block contents for deduplication. The hashing is timed
        # only when debug logging is active, to avoid the overhead of two
        # time.time() calls otherwise.
        if log.isEnabledFor(logging.DEBUG):
            time_ = time.time()
            hash_ = sha256_fh(el)
            time_ = time.time() - time_
            if time_ != 0:
                rate = size / (1024**2 * time_)
            else:
                rate = 0
            log.debug('UploadManager(inode=%d, blockno=%d): '
                      'hashed %d bytes in %.3f seconds, %.2f MB/s',
                      el.inode, el.blockno, size, time_, rate)
        else:
            hash_ = sha256_fh(el)

        try:
            el.obj_id = self.db.get_val('SELECT id FROM objects WHERE hash=?', (hash_,))

        except NoSuchRowError:
            # No object with these contents exists yet: create a row for
            # it and upload the data.
            need_upload = True
            el.obj_id = self.db.rowid('INSERT INTO objects (refcount, hash, size) VALUES(?, ?, ?)',
                                      (1, hash_, size))
            log.debug('add(inode=%d, blockno=%d): created new object %d',
                      el.inode, el.blockno, el.obj_id)

        else:
            # An object with identical contents exists already, so the
            # data does not need to be uploaded again.
            need_upload = False
            if old_obj_id == el.obj_id:
                # Block still refers to the same object: nothing to do
                # except marking the cache copy as clean again.
                log.debug('add(inode=%d, blockno=%d): unchanged, obj_id=%d',
                          el.inode, el.blockno, el.obj_id)
                el.dirty = False
                el.modified_after_upload = False
                # Renaming away the '.d' suffix marks the on-disk cache
                # copy as clean (presumably — '.d' appears to denote the
                # dirty copy throughout this module).
                os.rename(el.name + '.d', el.name)
                return size

            log.debug('add(inode=%d, blockno=%d): (re)linking to %d',
                      el.inode, el.blockno, el.obj_id)
            self.db.execute('UPDATE objects SET refcount=refcount+1 WHERE id=?',
                            (el.obj_id,))

        # Update the block -> object mapping, dropping the reference to
        # the previously linked object (if any).
        to_delete = False
        if old_obj_id is None:
            log.debug('add(inode=%d, blockno=%d): no previous object',
                      el.inode, el.blockno)
            self.db.execute('INSERT INTO blocks (obj_id, inode, blockno) VALUES(?,?,?)',
                            (el.obj_id, el.inode, el.blockno))
        else:
            self.db.execute('UPDATE blocks SET obj_id=? WHERE inode=? AND blockno=?',
                            (el.obj_id, el.inode, el.blockno))
            refcount = self.db.get_val('SELECT refcount FROM objects WHERE id=?',
                                       (old_obj_id,))
            if refcount > 1:
                log.debug('add(inode=%d, blockno=%d): '
                          'decreased refcount for prev. obj: %d',
                          el.inode, el.blockno, old_obj_id)
                self.db.execute('UPDATE objects SET refcount=refcount-1 WHERE id=?',
                                (old_obj_id,))
            else:
                # Last reference gone: delete the row now and remove the
                # backend object below (after the upload was started).
                log.debug('add(inode=%d, blockno=%d): '
                          'prev. obj %d marked for removal',
                          el.inode, el.blockno, old_obj_id)
                self.db.execute('DELETE FROM objects WHERE id=?', (old_obj_id,))
                to_delete = True

        if need_upload:
            log.debug('add(inode=%d, blockno=%d): starting compression thread',
                      el.inode, el.blockno)
            el.modified_after_upload = False
            self.in_transit.add((el.inode, el.blockno))

            # Create a new fd so that we don't get confused if another
            # thread repositions the cursor (and do so before unlocking)
            fh = open(el.name + '.d', 'rb')
            self.compress_threads.add_thread(CompressThread(el, fh, self, size)) # Releases global lock

        else:
            el.dirty = False
            el.modified_after_upload = False
            os.rename(el.name + '.d', el.name)

        if to_delete:
            log.debug('add(inode=%d, blockno=%d): removing object %d',
                      el.inode, el.blockno, old_obj_id)

            try:
                # Note: Old object can not be in transit
                # Releases global lock
                self.removal_queue.add_thread(RemoveThread(old_obj_id, self.bucket))
            except EmbeddedException as exc:
                # A previously queued removal failed; a missing object is
                # non-fatal and only recorded, anything else propagates.
                exc = exc.exc
                if isinstance(exc, NoSuchObject):
                    log.warn('Backend seems to have lost object %s', exc.key)
                    self.encountered_errors = True
                else:
                    raise

        log.debug('add(inode=%d, blockno=%d): end', el.inode, el.blockno)
        return size

    def join_all(self):
        '''Wait until all blocks in transit have been uploaded

        This method releases the global lock.
        '''

        self.compress_threads.join_all()
        self.upload_threads.join_all()

    def join_one(self):
        '''Wait until one block has been uploaded

        If there are no blocks in transit, return immediately.
        This method releases the global lock.
        '''

        # Prefer waiting for an upload; wait for a compression thread
        # only when no upload is running at all.
        if len(self.upload_threads) == 0:
            self.compress_threads.join_one()

        self.upload_threads.join_one()

    def upload_in_progress(self):
        '''Return True if there are any blocks in transit'''

        return len(self.compress_threads) + len(self.upload_threads) > 0
+
+
class CompressThread(Thread):
    '''
    Compress a block and then pass it on for uploading.

    This class uses the llfuse global lock. When calling objects
    passed in the constructor, the global lock is acquired first.

    The `size` attribute will be updated to the compressed size.
    '''

    def __init__(self, el, fh, um, size):
        super(CompressThread, self).__init__()
        self.el = el      # cache entry that is being uploaded
        self.fh = fh      # private file handle for the entry's data
        self.um = um      # UploadManager that scheduled this thread
        self.size = size  # uncompressed size; replaced by compressed size

    def run_protected(self):
        '''Compress block

        After compression:
         - the file handle is closed
         - the compressed block size is updated in the database
         - an UploadThread instance started for uploading the data.

        In case of an exception, the block is removed from the in_transit
        set.
        '''

        # Becomes True once self.size has been added to um.transit_size.
        # The error path must only subtract the size if it was actually
        # added, otherwise the transit size accounting gets corrupted
        # when compression itself fails.
        added_to_transit = False
        try:
            try:
                if log.isEnabledFor(logging.DEBUG):
                    oldsize = self.size
                    time_ = time.time()
                    (self.size, fn) = self.um.bucket.prep_store_fh('s3ql_data_%d' % self.el.obj_id,
                                                                   self.fh)
                    time_ = time.time() - time_
                    if time_ != 0:
                        rate = oldsize / (1024**2 * time_)
                    else:
                        rate = 0
                    log.debug('CompressionThread(inode=%d, blockno=%d): '
                              'compressed %d bytes in %.3f seconds, %.2f MB/s',
                              self.el.inode, self.el.blockno, oldsize,
                              time_, rate)
                else:
                    (self.size, fn) = self.um.bucket.prep_store_fh('s3ql_data_%d' % self.el.obj_id,
                                                                   self.fh)
            finally:
                # Close the fd even when compression fails, so that it
                # does not leak (previously only closed on success).
                self.fh.close()

            with lock:
                # If we already have the minimum transit size, do not start more
                # than two threads
                log.debug('CompressThread(%s): starting upload thread', self.el)

                if self.um.transit_size > MIN_TRANSIT_SIZE:
                    max_threads = 2
                else:
                    max_threads = None

                self.um.transit_size += self.size
                added_to_transit = True
                self.um.db.execute('UPDATE objects SET compr_size=? WHERE id=?',
                                   (self.size, self.el.obj_id))
                # Once the upload thread runs, it takes over removing the
                # block from in_transit and updating transit_size.
                self.um.upload_threads.add_thread(UploadThread(fn, self.el, self.size, self.um),
                                                  max_threads)

        except EmbeddedException:
            # NOTE(review): re-raised untouched, as in the original flow;
            # in_transit cleanup for this case presumably happens in the
            # caller — verify.
            raise
        except:
            with lock:
                self.um.in_transit.remove((self.el.inode, self.el.blockno))
                if added_to_transit:
                    self.um.transit_size -= self.size
            raise
+
+
class UploadThread(Thread):
    '''
    Uploads a cache entry with the function passed in the constructor.

    This class uses the llfuse global lock. When calling objects
    passed in the constructor, the global lock is acquired first.
    '''

    def __init__(self, fn, el, size, um):
        super(UploadThread, self).__init__()
        self.fn = fn      # callable performing the actual upload
        self.el = el      # cache entry being uploaded
        self.size = size  # compressed size, for transit_size accounting
        self.um = um      # UploadManager that scheduled this thread

    def run_protected(self):
        '''Upload block by calling self.fn()

        The upload duration is timed. After the upload (or if an exception
        occurs), the block is removed from in_transit.
        '''
        try:
            # Time the upload only when debug logging is active.
            if log.isEnabledFor(logging.DEBUG):
                time_ = time.time()
                self.fn()
                time_ = time.time() - time_
                if time_ != 0:
                    rate = self.size / (1024**2 * time_)
                else:
                    rate = 0
                # Fixed copy&pasted message: this is the upload thread,
                # and the bytes were uploaded, not compressed.
                log.debug('UploadThread(inode=%d, blockno=%d): '
                          'uploaded %d bytes in %.3f seconds, %.2f MB/s',
                          self.el.inode, self.el.blockno, self.size,
                          time_, rate)
            else:
                self.fn()

        except:
            with lock:
                self.um.in_transit.remove((self.el.inode, self.el.blockno))
                self.um.transit_size -= self.size
            raise

        with lock:
            self.um.in_transit.remove((self.el.inode, self.el.blockno))
            self.um.transit_size -= self.size

            if not self.el.modified_after_upload:
                # Block was not written to while in transit, so the cache
                # copy can be marked clean now.
                self.el.dirty = False
                try:
                    os.rename(self.el.name + '.d', self.el.name)
                except OSError as exc:
                    # Entry may have been removed while being uploaded
                    if exc.errno != errno.ENOENT:
                        raise
+
+
def retry_exc(timeout, exc_types, fn, *a, **kw):
    """Call fn(*a, **kw) until it succeeds or `timeout` is exceeded.

    Exceptions whose type is listed in `exc_types` trigger a retry after
    a short, exponentially growing delay; any other exception propagates
    immediately. Once the accumulated waiting time reaches `timeout`,
    `TimeoutError` is raised.
    """

    retryable = tuple(exc_types)
    delay = 0.2
    total_wait = 0
    while total_wait < timeout:
        try:
            return fn(*a, **kw)
        except BaseException as exc:
            if not isinstance(exc, retryable):
                raise exc
            log.warn('Encountered %s error when calling %s, retrying...',
                     exc.__class__.__name__, fn.__name__)

        time.sleep(delay)
        total_wait += delay
        # Double the delay until it reaches roughly timeout/30, so the
        # retry frequency adapts to the timeout window.
        if delay < timeout / 30:
            delay *= 2

    raise TimeoutError()
+
class RemoveThread(Thread):
    '''
    Remove an object from backend. If a transit key is specified, the
    thread first waits until the object is no longer in transit.

    This class uses the llfuse global lock. When calling objects
    passed in the constructor, the global lock is acquired first.
    '''

    def __init__(self, id_, bucket, transit_key=None, upload_manager=None):
        super(RemoveThread, self).__init__()
        self.id = id_                   # id of the object to remove
        self.bucket = bucket            # backend bucket to delete from
        self.transit_key = transit_key  # key to wait for, or None
        self.um = upload_manager        # only needed when transit_key is given

    def run_protected(self):
        # Wait until the object's block is no longer being uploaded.
        # join_one() expects to hold the global lock (and releases it
        # while waiting); the membership test itself runs unlocked.
        if self.transit_key:
            while self.transit_key in self.um.in_transit:
                with lock:
                    self.um.join_one()

        if self.bucket.read_after_create_consistent():
            self.bucket.delete('s3ql_data_%d' % self.id)
        else:
            # With only eventual consistency, a freshly created object
            # may not be visible yet, so retry NoSuchObject for a while.
            retry_exc(300, [ NoSuchObject ], self.bucket.delete,
                      's3ql_data_%d' % self.id)