Diffstat (limited to 'src/s3ql/block_cache.py')
-rw-r--r--  src/s3ql/block_cache.py | 97
1 file changed, 36 insertions(+), 61 deletions(-)
diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py
index 0b694d5..6673f96 100644
--- a/src/s3ql/block_cache.py
+++ b/src/s3ql/block_cache.py
@@ -7,8 +7,7 @@ This program can be distributed under the terms of the GNU GPLv3.
'''
from __future__ import division, print_function, absolute_import
-from .backends.common import CompressFilter
-from .common import sha256_fh
+from .common import sha256_fh, BUFSIZE
from .database import NoSuchRowError
from .ordered_dict import OrderedDict
from Queue import Queue
@@ -23,9 +22,6 @@ import time
# standard logger for this module
log = logging.getLogger("BlockCache")
-# Buffer size when writing objects
-BUFSIZE = 256 * 1024
-
# Special queue entry that signals threads to terminate
QuitSentinel = object()
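
Note: the constant is not gone, it moves to s3ql.common so that the cache and the backends share one buffer size (the new import above confirms the location). A minimal sketch of the assumed definition there, with the value carried over unchanged from this module:

    # in src/s3ql/common.py (location confirmed by the import above;
    # value assumed carried over unchanged from this module)
    BUFSIZE = 256 * 1024  # buffer size when copying object data: 256 KiB
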
@@ -226,13 +222,12 @@ class BlockCache(object):
self.in_transit = set()
self.removed_in_transit = set()
self.to_upload = Distributor()
- self.to_remove = Queue()
+ self.to_remove = Queue(250)
self.upload_threads = []
self.removal_threads = []
self.transfer_completed = SimpleEvent()
- if not os.path.exists(self.path):
- os.mkdir(self.path)
+ os.mkdir(self.path)
def __len__(self):
'''Get number of objects in cache'''
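
Two behavioural changes here: bounding to_remove at 250 entries adds backpressure, so a put() blocks once 250 object ids are pending and a burst of removals can no longer grow the queue without limit while the removal threads lag behind; and dropping the os.path.exists() guard means a stale cache directory now raises OSError instead of being silently reused. A minimal sketch of the queue behaviour, assuming the Python 2 Queue module and a hypothetical object id:

    from Queue import Queue  # Python 2, as in the module above

    to_remove = Queue(250)    # maxsize=250: put() blocks while the queue is full
    to_remove.put(42)         # hypothetical object id; a storm of removals now
                              # stalls here instead of growing memory unbounded
    obj_id = to_remove.get()  # removal threads drain the other end
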
@@ -292,32 +287,30 @@ class BlockCache(object):
def _do_upload(self, el, obj_id):
'''Upload object'''
+ def do_write(fh):
+ el.seek(0)
+ while True:
+ buf = el.read(BUFSIZE)
+ if not buf:
+ break
+ fh.write(buf)
+ return fh
+
try:
if log.isEnabledFor(logging.DEBUG):
time_ = time.time()
- with self.bucket_pool() as bucket:
- with bucket.open_write('s3ql_data_%d' % obj_id) as fh:
- el.seek(0)
- while True:
- buf = el.read(BUFSIZE)
- if not buf:
- break
- fh.write(buf)
-
+ with self.bucket_pool() as bucket:
+ obj_size = bucket.perform_write(do_write, 's3ql_data_%d' % obj_id).get_obj_size()
+
if log.isEnabledFor(logging.DEBUG):
time_ = time.time() - time_
rate = el.size / (1024**2 * time_) if time_ != 0 else 0
log.debug('_do_upload(%s): uploaded %d bytes in %.3f seconds, %.2f MB/s',
obj_id, el.size, time_, rate)
-
- if isinstance(fh, CompressFilter):
- obj_size = fh.compr_size
- else:
- obj_size = el.size
-
+
with lock:
- self.db.execute('UPDATE objects SET compr_size=? WHERE id=?',
+ self.db.execute('UPDATE objects SET size=? WHERE id=?',
(obj_size, obj_id))
el.dirty = False
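
The upload path now hands a callback to the backend instead of writing into an open handle itself. Wrapping the whole write in a callback is what lets a backend retry a failed transfer by reinvoking it, and the returned write handle reports the stored (possibly compressed) size via get_obj_size(), replacing the earlier isinstance(fh, CompressFilter) check; accordingly the objects table column becomes size rather than compr_size. A hypothetical in-memory sketch of the contract this code assumes (the real implementation, with retry logic, lives in the backends):

    import io

    class _WriteHandle(io.BytesIO):
        '''Stand-in for the backend's write filter chain (hypothetical).'''
        def get_obj_size(self):
            # real filters report the bytes actually stored,
            # e.g. after compression
            return len(self.getvalue())

    class SketchBucket(object):
        '''Hypothetical in-memory backend showing the assumed contract.'''
        def __init__(self):
            self.objects = {}

        def perform_write(self, fn, key):
            # Run the caller's callback against a fresh handle, then return
            # the handle so the caller can query get_obj_size(). Because the
            # whole write is a callback, a real backend can retry a temporary
            # failure by reinvoking fn with a new handle.
            fh = _WriteHandle()
            fn(fh)
            self.objects[key] = fh.getvalue()
            return fh
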
@@ -364,14 +357,9 @@ class BlockCache(object):
hash_ = sha256_fh(el)
try:
- if el.blockno == 0:
- old_block_id = self.db.get_val('SELECT block_id FROM inodes '
- 'WHERE id=? AND block_id IS NOT NULL',
- (el.inode,))
- else:
- old_block_id = self.db.get_val('SELECT block_id FROM inode_blocks '
- 'WHERE inode=? AND blockno=?',
- (el.inode, el.blockno))
+ old_block_id = self.db.get_val('SELECT block_id FROM inode_blocks '
+ 'WHERE inode=? AND blockno=?',
+ (el.inode, el.blockno))
except NoSuchRowError:
old_block_id = None
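
This hunk, and the three matching ones below, drop a special case: judging by the removed queries, block 0 of every inode used to be stored inline in inodes.block_id, with inode_blocks holding only the remaining blocks. All mappings now live in inode_blocks alone, so lookup, update, and removal each shrink to a single query. A hypothetical standalone demo of the unified mapping, using sqlite3 and made-up values:

    import sqlite3

    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE inode_blocks '
               '(inode INT, blockno INT, block_id INT, '
               'PRIMARY KEY (inode, blockno))')
    # blockno 0 is no longer special: the same two statements cover every block
    db.execute('INSERT OR REPLACE INTO inode_blocks (block_id, inode, blockno) '
               'VALUES(?,?,?)', (7, 100, 0))
    block_id = db.execute('SELECT block_id FROM inode_blocks '
                          'WHERE inode=? AND blockno=?', (100, 0)).fetchone()[0]
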
@@ -411,12 +399,8 @@ class BlockCache(object):
self.in_transit.remove((el.inode, el.blockno))
raise
-
- if el.blockno == 0:
- self.db.execute('UPDATE inodes SET block_id=? WHERE id=?', (block_id, el.inode))
- else:
- self.db.execute('INSERT OR REPLACE INTO inode_blocks (block_id, inode, blockno) '
- 'VALUES(?,?,?)', (block_id, el.inode, el.blockno))
+ self.db.execute('INSERT OR REPLACE INTO inode_blocks (block_id, inode, blockno) '
+ 'VALUES(?,?,?)', (block_id, el.inode, el.blockno))
# Check if we have to remove an old block
if not old_block_id:
@@ -513,13 +497,9 @@ class BlockCache(object):
except KeyError:
filename = os.path.join(self.path, '%d-%d' % (inode, blockno))
try:
- if blockno == 0:
- block_id = self.db.get_val('SELECT block_id FROM inodes '
- 'WHERE id=? AND block_id IS NOT NULL', (inode,))
- else:
- block_id = self.db.get_val('SELECT block_id FROM inode_blocks '
- 'WHERE inode=? AND blockno=?', (inode, blockno))
-
+ block_id = self.db.get_val('SELECT block_id FROM inode_blocks '
+ 'WHERE inode=? AND blockno=?', (inode, blockno))
+
# No corresponding object
except NoSuchRowError:
#log.debug('get(inode=%d, block=%d): creating new block', inode, blockno)
@@ -541,13 +521,16 @@ class BlockCache(object):
self.in_transit.add(obj_id)
log.debug('get(inode=%d, block=%d): downloading object %d..',
inode, blockno, obj_id)
+ def do_read(fh):
+ el = CacheEntry(inode, blockno, filename)
+ shutil.copyfileobj(fh, el, BUFSIZE)
+ return el
try:
- el = CacheEntry(inode, blockno, filename)
+
with lock_released:
with self.bucket_pool() as bucket:
- with bucket.open_read('s3ql_data_%d' % obj_id) as fh:
- shutil.copyfileobj(fh, el)
-
+ el = bucket.perform_read(do_read, 's3ql_data_%d' % obj_id)
+
# Note: We need to do this *before* releasing the global
# lock to notify other threads
self.entries[(inode, blockno)] = el
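
Downloads get the mirror-image treatment: perform_read() receives a do_read callback that creates the CacheEntry and copies the object into it, so the backend can restart the whole download on a temporary error. Note also that copyfileobj() is now given BUFSIZE explicitly rather than its 16 KiB default. A hypothetical free-function sketch of the assumed read-side contract:

    import io

    def perform_read(fn, stored_bytes):
        '''Hypothetical sketch of the read-side contract, not the real code.

        Open the stored object, invoke fn(fh), and pass fn's return value
        through -- in the diff above, the freshly populated CacheEntry. On
        a temporary error a real backend may discard the handle and call
        fn again from the start.
        '''
        fh = io.BytesIO(stored_bytes)
        try:
            return fn(fh)
        finally:
            fh.close()
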
@@ -686,23 +669,15 @@ class BlockCache(object):
el.unlink()
try:
- if blockno == 0:
- block_id = self.db.get_val('SELECT block_id FROM inodes '
- 'WHERE id=? AND block_id IS NOT NULL',
- (inode,))
- else:
- block_id = self.db.get_val('SELECT block_id FROM inode_blocks '
- 'WHERE inode=? AND blockno=?', (inode, blockno))
+ block_id = self.db.get_val('SELECT block_id FROM inode_blocks '
+ 'WHERE inode=? AND blockno=?', (inode, blockno))
except NoSuchRowError:
log.debug('remove(inode=%d, blockno=%d): block not in db', inode, blockno)
continue
# Detach inode from block
- if blockno == 0:
- self.db.execute('UPDATE inodes SET block_id=NULL WHERE id=?', (inode,))
- else:
- self.db.execute('DELETE FROM inode_blocks WHERE inode=? AND blockno=?',
- (inode, blockno))
+ self.db.execute('DELETE FROM inode_blocks WHERE inode=? AND blockno=?',
+ (inode, blockno))
# Decrease block refcount
refcount = self.db.get_val('SELECT refcount FROM blocks WHERE id=?', (block_id,))
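
The hunk ends just as the refcount is read; the branch that follows is outside the diff. For orientation, a hedged sketch of what reference counting like this typically implies (control flow assumed, not shown in the patch; names follow the queries above):

    # assumed continuation -- drop one reference, or retire the block
    if refcount > 1:
        db.execute('UPDATE blocks SET refcount=refcount-1 WHERE id=?',
                   (block_id,))
    else:
        # last reference: the block row can go, and its backing object
        # becomes a candidate for the to_remove queue
        pass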