diff options
Diffstat (limited to 'src/s3ql/fs.py')
-rw-r--r-- | src/s3ql/fs.py | 292 |
1 files changed, 127 insertions, 165 deletions
diff --git a/src/s3ql/fs.py b/src/s3ql/fs.py index 3cf55f9..78664cf 100644 --- a/src/s3ql/fs.py +++ b/src/s3ql/fs.py @@ -3,32 +3,26 @@ fs.py - this file is part of S3QL (http://s3ql.googlecode.com) Copyright (C) 2008-2009 Nikolaus Rath <Nikolaus@rath.org> -This program can be distributed under the terms of the GNU LGPL. +This program can be distributed under the terms of the GNU GPLv3. ''' from __future__ import division, print_function, absolute_import - -import os -import errno -import stat -import llfuse -import collections -import logging +from .backends.common import NoSuchObject, ChecksumError +from .common import (get_path, CTRL_NAME, CTRL_INODE, LoggerFilter) +from .database import NoSuchRowError from .inode_cache import InodeCache, OutOfInodesError -from .common import (get_path, CTRL_NAME, CTRL_INODE, LoggerFilter, - EmbeddedException, ExceptionStoringThread) -import time -from .block_cache import BlockCache from cStringIO import StringIO -from .database import NoSuchRowError -from .backends.common import NoSuchObject, ChecksumError -import struct +from llfuse import FUSEError import cPickle as pickle +import collections +import errno +import llfuse +import logging import math -import threading -from llfuse import FUSEError, lock, lock_released - -__all__ = [ "Server" ] +import os +import stat +import struct +import time # standard logger for this module log = logging.getLogger("fs") @@ -52,7 +46,6 @@ class Operations(llfuse.Operations): ----------- :cache: Holds information about cached blocks - :encountered_errors: Is set to true if a request handler raised an exception :inode_cache: A cache for the attributes of the currently opened inodes. :open_inodes: dict of currently opened inodes. This is used to not remove the blocks of unlinked inodes that are still open. @@ -74,52 +67,19 @@ class Operations(llfuse.Operations): users relying on unlink()/rmdir() to fail for a directory/file. For that, it explicitly checks the st_mode attribute. """ - - def handle_exc(self, fn, exc): - '''Handle exceptions that occurred during request processing. - - This method marks the file system as needing fsck and logs the - error. - ''' - # Unused arguments - #pylint: disable=W0613 - log.error("Unexpected internal filesystem error.\n" - "Filesystem may be corrupted, run fsck.s3ql as soon as possible!\n" - "Please report this bug on http://code.google.com/p/s3ql/.") - self.encountered_errors = True - - - def __init__(self, bucket, db, cachedir, blocksize, cache_size, - cache_entries=768, upload_event=None): + def __init__(self, block_cache, db, blocksize, upload_event=None): super(Operations, self).__init__() - self.encountered_errors = False self.inodes = InodeCache(db) self.db = db self.upload_event = upload_event - self.inode_flush_thread = None self.open_inodes = collections.defaultdict(lambda: 0) self.blocksize = blocksize - self.cache = BlockCache(bucket, db, cachedir, cache_size, cache_entries) - - def init(self): - self.cache.init() - self.inode_flush_thread = InodeFlushThread(self.inodes) - self.inode_flush_thread.start() + self.cache = block_cache def destroy(self): - try: - self.inode_flush_thread.stop() - except EmbeddedException: - log.error('FlushThread terminated with exception.') - self.encountered_errors = True - self.inodes.destroy() - self.cache.destroy() - - if self.cache.encountered_errors: - self.encountered_errors = True def lookup(self, id_p, name): if name == CTRL_NAME: @@ -141,7 +101,7 @@ class Operations(llfuse.Operations): return self.inodes[id_] try: - id_ = self.db.get_val("SELECT inode FROM contents WHERE name=? AND parent_inode=?", + id_ = self.db.get_val("SELECT inode FROM contents_v WHERE name=? AND parent_inode=?", (name, id_p)) except NoSuchRowError: raise(llfuse.FUSEError(errno.ENOENT)) @@ -168,7 +128,11 @@ class Operations(llfuse.Operations): inode = self.inodes[id_] if inode.atime < inode.ctime or inode.atime < inode.mtime: inode.atime = timestamp - return inode.target + try: + return self.db.get_val("SELECT target FROM symlink_targets WHERE inode=?", (id_,)) + except NoSuchRowError: + log.warn('Inode does not have symlink target: %d', id_) + raise FUSEError(errno.EINVAL) def opendir(self, id_): return id_ @@ -190,20 +154,15 @@ class Operations(llfuse.Operations): # The ResultSet is automatically deleted # when yield raises GeneratorExit. - res = self.db.query("SELECT rowid, name, inode FROM contents WHERE parent_inode=? " - 'AND rowid > ? ORDER BY rowid', (id_, off)) + res = self.db.query("SELECT rowid, name, inode FROM contents_v " + 'WHERE parent_inode=? AND contents_v.rowid > ? ORDER BY rowid', (id_, off)) for (next_, name, cid_) in res: yield (name, self.inodes[cid_], next_) def getxattr(self, id_, name): # Handle S3QL commands if id_ == CTRL_INODE: - if name == b's3ql_errors?': - if self.encountered_errors: - return b'errors encountered' - else: - return b'no errors' - elif name == b's3ql_pid?': + if name == b's3ql_pid?': return bytes(os.getpid()) elif name == b's3qlstat': @@ -231,7 +190,6 @@ class Operations(llfuse.Operations): if id_ == CTRL_INODE: if name == b's3ql_flushcache!': self.cache.clear() - self.cache.upload_manager.join_all() elif name == 'copy': self.copy_tree(*struct.unpack('II', value)) elif name == 'upload-meta': @@ -317,17 +275,17 @@ class Operations(llfuse.Operations): while True: found_subdirs = False id_p = queue.pop() - for (name, id_) in self.db.query('SELECT name, inode FROM contents WHERE ' - 'parent_inode=?', (id_p,)): + for (name_id, id_) in self.db.query('SELECT name_id, inode FROM contents WHERE ' + 'parent_inode=?', (id_p,)): - if self.db.has_val('SELECT 1 FROM contents WHERE parent_inode=?', - (id_,)): + if self.db.has_val('SELECT 1 FROM contents WHERE parent_inode=?', (id_,)): if not found_subdirs: found_subdirs = True queue.append(id_p) queue.append(id_) else: + name = self.db.get_val("SELECT name FROM names WHERE id=?", (name_id,)) llfuse.invalidate_entry(id_p, name) self._remove(id_p, name, id_, force=True) @@ -374,7 +332,8 @@ class Operations(llfuse.Operations): for attr in ('atime', 'ctime', 'mtime', 'mode', 'uid', 'gid'): setattr(target_inode, attr, getattr(src_inode, attr)) - # We first replicate into a dummy inode + # We first replicate into a dummy inode, so that we + # need to invalidate only once. timestamp = time.time() tmp = make_inode(mtime=timestamp, ctime=timestamp, atime=timestamp, uid=0, gid=0, mode=0, refcount=0) @@ -384,14 +343,13 @@ class Operations(llfuse.Operations): processed = 0 # Number of steps since last GIL release stamp = time.time() # Time of last GIL release gil_step = 100 # Approx. number of steps between GIL releases - in_transit = set() while queue: (src_id, target_id, rowid) = queue.pop() log.debug('copy_tree(%d, %d): Processing directory (%d, %d, %d)', src_inode.id, target_inode.id, src_id, target_id, rowid) - for (name, id_, rowid) in db.query('SELECT name, inode, rowid FROM contents ' - 'WHERE parent_inode=? AND rowid > ? ' - 'ORDER BY rowid', (src_id, rowid)): + for (name_id, id_, rowid) in db.query('SELECT name_id, inode, rowid FROM contents ' + 'WHERE parent_inode=? AND rowid > ? ' + 'ORDER BY rowid', (src_id, rowid)): if id_ not in id_cache: inode = self.inodes[id_] @@ -400,8 +358,7 @@ class Operations(llfuse.Operations): inode_new = make_inode(refcount=1, mode=inode.mode, size=inode.size, uid=inode.uid, gid=inode.gid, mtime=inode.mtime, atime=inode.atime, - ctime=inode.ctime, target=inode.target, - rdev=inode.rdev) + ctime=inode.ctime, rdev=inode.rdev) except OutOfInodesError: log.warn('Could not find a free inode') raise FUSEError(errno.ENOSPC) @@ -411,24 +368,37 @@ class Operations(llfuse.Operations): if inode.refcount != 1: id_cache[id_] = id_new - for (obj_id, blockno) in db.query('SELECT obj_id, blockno FROM blocks ' - 'WHERE inode=?', (id_,)): - processed += 1 - db.execute('INSERT INTO blocks (inode, blockno, obj_id) VALUES(?, ?, ?)', - (id_new, blockno, obj_id)) - db.execute('UPDATE objects SET refcount=refcount+1 WHERE id=?', (obj_id,)) + db.execute('INSERT INTO symlink_targets (inode, target) ' + 'SELECT ?, target FROM symlink_targets WHERE inode=?', + (id_new, id_)) - if (id_, blockno) in self.cache.upload_manager.in_transit: - in_transit.add((id_, blockno)) - + db.execute('INSERT INTO inode_blocks (inode, blockno, block_id) ' + 'SELECT ?, blockno, block_id FROM inode_blocks ' + 'WHERE inode=?', (id_new, id_)) + + db.execute('UPDATE inodes SET block_id=' + '(SELECT block_id FROM inodes WHERE id=?) ' + 'WHERE id=?', (id_, id_new)) + + processed += db.execute('REPLACE INTO blocks (id, hash, refcount, size, obj_id) ' + 'SELECT blocks.id, hash, blocks.refcount+1, blocks.size, obj_id ' + 'FROM inodes JOIN blocks ON block_id = blocks.id ' + 'WHERE inodes.id = ? AND block_id IS NOT NULL', (id_new,)) + + processed += db.execute('REPLACE INTO blocks (id, hash, refcount, size, obj_id) ' + 'SELECT id, hash, refcount+1, size, obj_id ' + 'FROM inode_blocks JOIN blocks ON block_id = id ' + 'WHERE inode = ?', (id_new,)) + if db.has_val('SELECT 1 FROM contents WHERE parent_inode=?', (id_,)): queue.append((id_, id_new, 0)) else: id_new = id_cache[id_] self.inodes[id_new].refcount += 1 - db.execute('INSERT INTO contents (name, inode, parent_inode) VALUES(?, ?, ?)', - (name, id_new, target_id)) + db.execute('INSERT INTO contents (name_id, inode, parent_inode) VALUES(?, ?, ?)', + (name_id, id_new, target_id)) + db.execute('UPDATE names SET refcount=refcount+1 WHERE id=?', (name_id,)) processed += 1 @@ -446,29 +416,15 @@ class Operations(llfuse.Operations): processed = 0 llfuse.lock.yield_() stamp = time.time() - - # If we replicated blocks whose associated objects where still in - # transit, we have to wait for the transit to complete before we make - # the replicated tree visible to the user. Otherwise access to the newly - # created blocks will raise a NoSuchObject exception. - while in_transit: - log.debug('copy_tree(%d, %d): in_transit: %s', - src_inode.id, target_inode.id, in_transit) - in_transit = [ x for x in in_transit - if x in self.cache.upload_manager.in_transit ] - if in_transit: - self.cache.upload_manager.join_one() - # Make replication visible self.db.execute('UPDATE contents SET parent_inode=? WHERE parent_inode=?', - (target_inode.id, tmp.id)) + (target_inode.id, tmp.id)) del self.inodes[tmp.id] llfuse.invalidate_inode(target_inode.id) log.debug('copy_tree(%d, %d): end', src_inode.id, target_inode.id) - def unlink(self, id_p, name): inode = self.lookup(id_p, name) @@ -509,8 +465,10 @@ class Operations(llfuse.Operations): if self.inodes[id_p].locked and not force: raise FUSEError(errno.EPERM) - self.db.execute("DELETE FROM contents WHERE name=? AND parent_inode=?", - (name, id_p)) + name_id = self._del_name(name) + self.db.execute("DELETE FROM contents WHERE name_id=? AND parent_inode=?", + (name_id, id_p)) + inode = self.inodes[id_] inode.refcount -= 1 inode.ctime = timestamp @@ -524,6 +482,7 @@ class Operations(llfuse.Operations): # Since the inode is not open, it's not possible that new blocks # get created at this point and we can safely delete the inode self.db.execute('DELETE FROM ext_attributes WHERE inode=?', (id_,)) + self.db.execute('DELETE FROM symlink_targets WHERE inode=?', (id_,)) del self.inodes[id_] def symlink(self, id_p, name, target, ctx): @@ -536,7 +495,11 @@ class Operations(llfuse.Operations): # with this size. If the kernel ever learns to open and read # symlinks directly, it will read the corresponding number of \0 # bytes. - return self._create(id_p, name, mode, ctx, target=target, size=len(target)) + inode = self._create(id_p, name, mode, ctx, size=len(target)) + self.db.execute('INSERT INTO symlink_targets (inode, target) VALUES(?,?)', + (inode.id, target)) + + return inode def rename(self, id_p_old, name_old, id_p_new, name_new): if name_new == CTRL_NAME or name_old == CTRL_NAME: @@ -569,12 +532,46 @@ class Operations(llfuse.Operations): self._rename(id_p_old, name_old, id_p_new, name_new) + def _add_name(self, name): + '''Get id for *name* and increase refcount + + Name is inserted in table if it does not yet exist. + ''' + + try: + name_id = self.db.get_val('SELECT id FROM names WHERE name=?', (name,)) + except NoSuchRowError: + name_id = self.db.rowid('INSERT INTO names (name, refcount) VALUES(?,?)', + (name, 1)) + else: + self.db.execute('UPDATE names SET refcount=refcount+1 WHERE id=?', (name_id,)) + return name_id + + def _del_name(self, name): + '''Decrease refcount for *name* + + Name is removed from table if refcount drops to zero. Returns the + (possibly former) id of the name. + ''' + + (name_id, refcount) = self.db.get_row('SELECT id, refcount FROM names WHERE name=?', (name,)) + + if refcount > 1: + self.db.execute('UPDATE names SET refcount=refcount-1 WHERE id=?', (name_id,)) + else: + self.db.execute('DELETE FROM names WHERE id=?', (name_id,)) + + return name_id + def _rename(self, id_p_old, name_old, id_p_new, name_new): timestamp = time.time() - self.db.execute("UPDATE contents SET name=?, parent_inode=? WHERE name=? " - "AND parent_inode=?", (name_new, id_p_new, - name_old, id_p_old)) + name_id_new = self._add_name(name_new) + name_id_old = self._del_name(name_old) + + self.db.execute("UPDATE contents SET name_id=?, parent_inode=? WHERE name_id=? " + "AND parent_inode=?", (name_id_new, id_p_new, + name_id_old, id_p_old)) inode_p_old = self.inodes[id_p_old] inode_p_new = self.inodes[id_p_new] @@ -594,13 +591,15 @@ class Operations(llfuse.Operations): raise llfuse.FUSEError(errno.EINVAL) # Replace target - self.db.execute("UPDATE contents SET inode=? WHERE name=? AND parent_inode=?", - (id_old, name_new, id_p_new)) + name_id_new = self.db.get_val('SELECT id FROM names WHERE name=?', (name_new,)) + self.db.execute("UPDATE contents SET inode=? WHERE name_id=? AND parent_inode=?", + (id_old, name_id_new, id_p_new)) # Delete old name - self.db.execute('DELETE FROM contents WHERE name=? AND parent_inode=?', - (name_old, id_p_old)) + name_id_old = self._del_name(name_old) + self.db.execute('DELETE FROM contents WHERE name_id=? AND parent_inode=?', + (name_id_old, id_p_old)) inode_new = self.inodes[id_new] inode_new.refcount -= 1 @@ -620,6 +619,7 @@ class Operations(llfuse.Operations): # Since the inode is not open, it's not possible that new blocks # get created at this point and we can safely delete the inode self.db.execute('DELETE FROM ext_attributes WHERE inode=?', (id_new,)) + self.db.execute('DELETE FROM symlink_targets WHERE inode=?', (id_new,)) del self.inodes[id_new] @@ -643,8 +643,8 @@ class Operations(llfuse.Operations): inode_p.ctime = timestamp inode_p.mtime = timestamp - self.db.execute("INSERT INTO contents (name, inode, parent_inode) VALUES(?,?,?)", - (new_name, id_, new_id_p)) + self.db.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)", + (self._add_name(new_name), id_, new_id_p)) inode = self.inodes[id_] inode.refcount += 1 inode.ctime = timestamp @@ -684,7 +684,6 @@ class Operations(llfuse.Operations): except NoSuchObject as exc: log.warn('Backend lost block %d of inode %d (id %s)!', last_block, id_, exc.key) - self.encountered_errors = True raise FUSEError(errno.EIO) except ChecksumError as exc: @@ -727,11 +726,14 @@ class Operations(llfuse.Operations): def extstat(self): '''Return extended file system statistics''' + # Flush inode cache to get better estimate of total fs size + self.inodes.flush() + entries = self.db.get_val("SELECT COUNT(rowid) FROM contents") blocks = self.db.get_val("SELECT COUNT(id) FROM objects") inodes = self.db.get_val("SELECT COUNT(id) FROM inodes") fs_size = self.db.get_val('SELECT SUM(size) FROM inodes') or 0 - dedup_size = self.db.get_val('SELECT SUM(size) FROM objects') or 0 + dedup_size = self.db.get_val('SELECT SUM(size) FROM blocks') or 0 compr_size = self.db.get_val('SELECT SUM(compr_size) FROM objects') or 0 return struct.pack('QQQQQQQ', entries, blocks, inodes, fs_size, dedup_size, @@ -744,7 +746,7 @@ class Operations(llfuse.Operations): # Get number of blocks & inodes blocks = self.db.get_val("SELECT COUNT(id) FROM objects") inodes = self.db.get_val("SELECT COUNT(id) FROM inodes") - size = self.db.get_val('SELECT SUM(size) FROM objects') + size = self.db.get_val('SELECT SUM(size) FROM blocks') if size is None: size = 0 @@ -759,10 +761,7 @@ class Operations(llfuse.Operations): # size of fs in f_frsize units # (since backend is supposed to be unlimited, always return a half-full filesystem, # but at least 50 GB) - if stat_.f_bsize != 0: - total_blocks = int(max(2 * blocks, 50 * 1024 ** 3 // stat_.f_bsize)) - else: - total_blocks = 2 * blocks + total_blocks = int(max(2 * blocks, 50 * 1024 ** 3 // stat_.f_frsize)) stat_.f_blocks = total_blocks stat_.f_bfree = total_blocks - blocks @@ -800,7 +799,7 @@ class Operations(llfuse.Operations): self.open_inodes[inode.id] += 1 return (inode.id, inode) - def _create(self, id_p, name, mode, ctx, rdev=0, target=None, size=0): + def _create(self, id_p, name, mode, ctx, rdev=0, size=0): if name == CTRL_NAME: log.warn('Attempted to create s3ql control file at %s', get_path(id_p, self.db, name)) @@ -822,13 +821,13 @@ class Operations(llfuse.Operations): try: inode = self.inodes.create_inode(mtime=timestamp, ctime=timestamp, atime=timestamp, uid=ctx.uid, gid=ctx.gid, mode=mode, refcount=1, - rdev=rdev, target=target, size=size) + rdev=rdev, size=size) except OutOfInodesError: log.warn('Could not find a free inode') raise FUSEError(errno.ENOSPC) - self.db.execute("INSERT INTO contents(name, inode, parent_inode) VALUES(?,?,?)", - (name, inode.id, id_p)) + self.db.execute("INSERT INTO contents(name_id, inode, parent_inode) VALUES(?,?,?)", + (self._add_name(name), inode.id, id_p)) return inode @@ -889,7 +888,6 @@ class Operations(llfuse.Operations): except NoSuchObject as exc: log.warn('Backend lost block %d of inode %d (id %s)!', blockno, id_, exc.key) - self.encountered_errors = True raise FUSEError(errno.EIO) except ChecksumError as exc: @@ -957,7 +955,6 @@ class Operations(llfuse.Operations): except NoSuchObject as exc: log.warn('Backend lost block %d of inode %d (id %s)!', blockno, id_, exc.key) - self.encountered_errors = True raise FUSEError(errno.EIO) except ChecksumError as exc: @@ -987,7 +984,7 @@ class Operations(llfuse.Operations): inode = self.inodes[fh] if inode.refcount == 0: self.cache.remove(inode.id, 0, - int(math.ceil(inode.size // self.blocksize))) + int(math.ceil(inode.size / self.blocksize))) # Since the inode is not open, it's not possible that new blocks # get created at this point and we can safely delete the in del self.inodes[fh] @@ -1016,39 +1013,4 @@ def update_logging(level, modules): else: logging.disable(logging.DEBUG) root_logger.setLevel(level) - - -class InodeFlushThread(ExceptionStoringThread): - ''' - Periodically commit dirty inodes. - - This class uses the llfuse global lock. When calling objects - passed in the constructor, the global lock is acquired first. - ''' - - def __init__(self, cache): - super(InodeFlushThread, self).__init__() - self.cache = cache - self.stop_event = threading.Event() - self.name = 'Inode Flush Thread' - self.daemon = True - - def run_protected(self): - log.debug('FlushThread: start') - - while not self.stop_event.is_set(): - with lock: - self.cache.flush() - self.stop_event.wait(5) - log.debug('FlushThread: end') - - def stop(self): - '''Wait for thread to finish, raise any occurred exceptions. - - This method releases the global lock. - ''' - - self.stop_event.set() - with lock_released: - self.join_and_raise() - +
\ No newline at end of file |