diff options
Diffstat (limited to 'src/s3ql/block_cache.py')
-rw-r--r-- | src/s3ql/block_cache.py | 97 |
1 files changed, 36 insertions, 61 deletions
diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py index 0b694d5..6673f96 100644 --- a/src/s3ql/block_cache.py +++ b/src/s3ql/block_cache.py @@ -7,8 +7,7 @@ This program can be distributed under the terms of the GNU GPLv3. ''' from __future__ import division, print_function, absolute_import -from .backends.common import CompressFilter -from .common import sha256_fh +from .common import sha256_fh, BUFSIZE from .database import NoSuchRowError from .ordered_dict import OrderedDict from Queue import Queue @@ -23,9 +22,6 @@ import time # standard logger for this module log = logging.getLogger("BlockCache") -# Buffer size when writing objects -BUFSIZE = 256 * 1024 - # Special queue entry that signals threads to terminate QuitSentinel = object() @@ -226,13 +222,12 @@ class BlockCache(object): self.in_transit = set() self.removed_in_transit = set() self.to_upload = Distributor() - self.to_remove = Queue() + self.to_remove = Queue(250) self.upload_threads = [] self.removal_threads = [] self.transfer_completed = SimpleEvent() - if not os.path.exists(self.path): - os.mkdir(self.path) + os.mkdir(self.path) def __len__(self): '''Get number of objects in cache''' @@ -292,32 +287,30 @@ class BlockCache(object): def _do_upload(self, el, obj_id): '''Upload object''' + def do_write(fh): + el.seek(0) + while True: + buf = el.read(BUFSIZE) + if not buf: + break + fh.write(buf) + return fh + try: if log.isEnabledFor(logging.DEBUG): time_ = time.time() - with self.bucket_pool() as bucket: - with bucket.open_write('s3ql_data_%d' % obj_id) as fh: - el.seek(0) - while True: - buf = el.read(BUFSIZE) - if not buf: - break - fh.write(buf) - + with self.bucket_pool() as bucket: + obj_size = bucket.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size() + if log.isEnabledFor(logging.DEBUG): time_ = time.time() - time_ rate = el.size / (1024**2 * time_) if time_ != 0 else 0 log.debug('_do_upload(%s): uploaded %d bytes in %.3f seconds, %.2f MB/s', obj_id, el.size, time_, rate) - - if isinstance(fh, CompressFilter): - obj_size = fh.compr_size - else: - obj_size = el.size - + with lock: - self.db.execute('UPDATE objects SET compr_size=? WHERE id=?', + self.db.execute('UPDATE objects SET size=? WHERE id=?', (obj_size, obj_id)) el.dirty = False @@ -364,14 +357,9 @@ class BlockCache(object): hash_ = sha256_fh(el) try: - if el.blockno == 0: - old_block_id = self.db.get_val('SELECT block_id FROM inodes ' - 'WHERE id=? AND block_id IS NOT NULL', - (el.inode,)) - else: - old_block_id = self.db.get_val('SELECT block_id FROM inode_blocks ' - 'WHERE inode=? AND blockno=?', - (el.inode, el.blockno)) + old_block_id = self.db.get_val('SELECT block_id FROM inode_blocks ' + 'WHERE inode=? AND blockno=?', + (el.inode, el.blockno)) except NoSuchRowError: old_block_id = None @@ -411,12 +399,8 @@ class BlockCache(object): self.in_transit.remove((el.inode, el.blockno)) raise - - if el.blockno == 0: - self.db.execute('UPDATE inodes SET block_id=? WHERE id=?', (block_id, el.inode)) - else: - self.db.execute('INSERT OR REPLACE INTO inode_blocks (block_id, inode, blockno) ' - 'VALUES(?,?,?)', (block_id, el.inode, el.blockno)) + self.db.execute('INSERT OR REPLACE INTO inode_blocks (block_id, inode, blockno) ' + 'VALUES(?,?,?)', (block_id, el.inode, el.blockno)) # Check if we have to remove an old block if not old_block_id: @@ -513,13 +497,9 @@ class BlockCache(object): except KeyError: filename = os.path.join(self.path, '%d-%d' % (inode, blockno)) try: - if blockno == 0: - block_id = self.db.get_val('SELECT block_id FROM inodes ' - 'WHERE id=? AND block_id IS NOT NULL', (inode,)) - else: - block_id = self.db.get_val('SELECT block_id FROM inode_blocks ' - 'WHERE inode=? AND blockno=?', (inode, blockno)) - + block_id = self.db.get_val('SELECT block_id FROM inode_blocks ' + 'WHERE inode=? AND blockno=?', (inode, blockno)) + # No corresponding object except NoSuchRowError: #log.debug('get(inode=%d, block=%d): creating new block', inode, blockno) @@ -541,13 +521,16 @@ class BlockCache(object): self.in_transit.add(obj_id) log.debug('get(inode=%d, block=%d): downloading object %d..', inode, blockno, obj_id) + def do_read(fh): + el = CacheEntry(inode, blockno, filename) + shutil.copyfileobj(fh, el, BUFSIZE) + return el try: - el = CacheEntry(inode, blockno, filename) + with lock_released: with self.bucket_pool() as bucket: - with bucket.open_read('s3ql_data_%d' % obj_id) as fh: - shutil.copyfileobj(fh, el) - + el = bucket.perform_read(do_read, 's3ql_data_%d' % obj_id) + # Note: We need to do this *before* releasing the global # lock to notify other threads self.entries[(inode, blockno)] = el @@ -686,23 +669,15 @@ class BlockCache(object): el.unlink() try: - if blockno == 0: - block_id = self.db.get_val('SELECT block_id FROM inodes ' - 'WHERE id=? AND block_id IS NOT NULL', - (inode,)) - else: - block_id = self.db.get_val('SELECT block_id FROM inode_blocks ' - 'WHERE inode=? AND blockno=?', (inode, blockno)) + block_id = self.db.get_val('SELECT block_id FROM inode_blocks ' + 'WHERE inode=? AND blockno=?', (inode, blockno)) except NoSuchRowError: log.debug('remove(inode=%d, blockno=%d): block not in db', inode, blockno) continue # Detach inode from block - if blockno == 0: - self.db.execute('UPDATE inodes SET block_id=NULL WHERE id=?', (inode,)) - else: - self.db.execute('DELETE FROM inode_blocks WHERE inode=? AND blockno=?', - (inode, blockno)) + self.db.execute('DELETE FROM inode_blocks WHERE inode=? AND blockno=?', + (inode, blockno)) # Decrease block refcount refcount = self.db.get_val('SELECT refcount FROM blocks WHERE id=?', (block_id,)) |