summary | refs | log | tree | commit | diff
path: root/src/s3ql/fs.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/s3ql/fs.py')
-rw-r--r--  src/s3ql/fs.py  292
1 files changed, 127 insertions, 165 deletions
diff --git a/src/s3ql/fs.py b/src/s3ql/fs.py
index 3cf55f9..78664cf 100644
--- a/src/s3ql/fs.py
+++ b/src/s3ql/fs.py
@@ -3,32 +3,26 @@ fs.py - this file is part of S3QL (http://s3ql.googlecode.com)
Copyright (C) 2008-2009 Nikolaus Rath <Nikolaus@rath.org>
-This program can be distributed under the terms of the GNU LGPL.
+This program can be distributed under the terms of the GNU GPLv3.
'''
from __future__ import division, print_function, absolute_import
-
-import os
-import errno
-import stat
-import llfuse
-import collections
-import logging
+from .backends.common import NoSuchObject, ChecksumError
+from .common import (get_path, CTRL_NAME, CTRL_INODE, LoggerFilter)
+from .database import NoSuchRowError
from .inode_cache import InodeCache, OutOfInodesError
-from .common import (get_path, CTRL_NAME, CTRL_INODE, LoggerFilter,
- EmbeddedException, ExceptionStoringThread)
-import time
-from .block_cache import BlockCache
from cStringIO import StringIO
-from .database import NoSuchRowError
-from .backends.common import NoSuchObject, ChecksumError
-import struct
+from llfuse import FUSEError
import cPickle as pickle
+import collections
+import errno
+import llfuse
+import logging
import math
-import threading
-from llfuse import FUSEError, lock, lock_released
-
-__all__ = [ "Server" ]
+import os
+import stat
+import struct
+import time
# standard logger for this module
log = logging.getLogger("fs")
@@ -52,7 +46,6 @@ class Operations(llfuse.Operations):
-----------
:cache: Holds information about cached blocks
- :encountered_errors: Is set to true if a request handler raised an exception
:inode_cache: A cache for the attributes of the currently opened inodes.
:open_inodes: dict of currently opened inodes. This is used to not remove
the blocks of unlinked inodes that are still open.
@@ -74,52 +67,19 @@ class Operations(llfuse.Operations):
users relying on unlink()/rmdir() to fail for a directory/file. For that, it
explicitly checks the st_mode attribute.
"""
-
- def handle_exc(self, fn, exc):
- '''Handle exceptions that occurred during request processing.
-
- This method marks the file system as needing fsck and logs the
- error.
- '''
- # Unused arguments
- #pylint: disable=W0613
- log.error("Unexpected internal filesystem error.\n"
- "Filesystem may be corrupted, run fsck.s3ql as soon as possible!\n"
- "Please report this bug on http://code.google.com/p/s3ql/.")
- self.encountered_errors = True
-
-
- def __init__(self, bucket, db, cachedir, blocksize, cache_size,
- cache_entries=768, upload_event=None):
+ def __init__(self, block_cache, db, blocksize, upload_event=None):
super(Operations, self).__init__()
- self.encountered_errors = False
self.inodes = InodeCache(db)
self.db = db
self.upload_event = upload_event
- self.inode_flush_thread = None
self.open_inodes = collections.defaultdict(lambda: 0)
self.blocksize = blocksize
- self.cache = BlockCache(bucket, db, cachedir, cache_size, cache_entries)
-
- def init(self):
- self.cache.init()
- self.inode_flush_thread = InodeFlushThread(self.inodes)
- self.inode_flush_thread.start()
+ self.cache = block_cache
def destroy(self):
- try:
- self.inode_flush_thread.stop()
- except EmbeddedException:
- log.error('FlushThread terminated with exception.')
- self.encountered_errors = True
-
self.inodes.destroy()
- self.cache.destroy()
-
- if self.cache.encountered_errors:
- self.encountered_errors = True
def lookup(self, id_p, name):
if name == CTRL_NAME:
@@ -141,7 +101,7 @@ class Operations(llfuse.Operations):
return self.inodes[id_]
try:
- id_ = self.db.get_val("SELECT inode FROM contents WHERE name=? AND parent_inode=?",
+ id_ = self.db.get_val("SELECT inode FROM contents_v WHERE name=? AND parent_inode=?",
(name, id_p))
except NoSuchRowError:
raise(llfuse.FUSEError(errno.ENOENT))
@@ -168,7 +128,11 @@ class Operations(llfuse.Operations):
inode = self.inodes[id_]
if inode.atime < inode.ctime or inode.atime < inode.mtime:
inode.atime = timestamp
- return inode.target
+ try:
+ return self.db.get_val("SELECT target FROM symlink_targets WHERE inode=?", (id_,))
+ except NoSuchRowError:
+ log.warn('Inode does not have symlink target: %d', id_)
+ raise FUSEError(errno.EINVAL)
def opendir(self, id_):
return id_
@@ -190,20 +154,15 @@ class Operations(llfuse.Operations):
# The ResultSet is automatically deleted
# when yield raises GeneratorExit.
- res = self.db.query("SELECT rowid, name, inode FROM contents WHERE parent_inode=? "
- 'AND rowid > ? ORDER BY rowid', (id_, off))
+ res = self.db.query("SELECT rowid, name, inode FROM contents_v "
+ 'WHERE parent_inode=? AND contents_v.rowid > ? ORDER BY rowid', (id_, off))
for (next_, name, cid_) in res:
yield (name, self.inodes[cid_], next_)
def getxattr(self, id_, name):
# Handle S3QL commands
if id_ == CTRL_INODE:
- if name == b's3ql_errors?':
- if self.encountered_errors:
- return b'errors encountered'
- else:
- return b'no errors'
- elif name == b's3ql_pid?':
+ if name == b's3ql_pid?':
return bytes(os.getpid())
elif name == b's3qlstat':
@@ -231,7 +190,6 @@ class Operations(llfuse.Operations):
if id_ == CTRL_INODE:
if name == b's3ql_flushcache!':
self.cache.clear()
- self.cache.upload_manager.join_all()
elif name == 'copy':
self.copy_tree(*struct.unpack('II', value))
elif name == 'upload-meta':
@@ -317,17 +275,17 @@ class Operations(llfuse.Operations):
while True:
found_subdirs = False
id_p = queue.pop()
- for (name, id_) in self.db.query('SELECT name, inode FROM contents WHERE '
- 'parent_inode=?', (id_p,)):
+ for (name_id, id_) in self.db.query('SELECT name_id, inode FROM contents WHERE '
+ 'parent_inode=?', (id_p,)):
- if self.db.has_val('SELECT 1 FROM contents WHERE parent_inode=?',
- (id_,)):
+ if self.db.has_val('SELECT 1 FROM contents WHERE parent_inode=?', (id_,)):
if not found_subdirs:
found_subdirs = True
queue.append(id_p)
queue.append(id_)
else:
+ name = self.db.get_val("SELECT name FROM names WHERE id=?", (name_id,))
llfuse.invalidate_entry(id_p, name)
self._remove(id_p, name, id_, force=True)
@@ -374,7 +332,8 @@ class Operations(llfuse.Operations):
for attr in ('atime', 'ctime', 'mtime', 'mode', 'uid', 'gid'):
setattr(target_inode, attr, getattr(src_inode, attr))
- # We first replicate into a dummy inode
+ # We first replicate into a dummy inode, so that we
+ # need to invalidate only once.
timestamp = time.time()
tmp = make_inode(mtime=timestamp, ctime=timestamp, atime=timestamp,
uid=0, gid=0, mode=0, refcount=0)
@@ -384,14 +343,13 @@ class Operations(llfuse.Operations):
processed = 0 # Number of steps since last GIL release
stamp = time.time() # Time of last GIL release
gil_step = 100 # Approx. number of steps between GIL releases
- in_transit = set()
while queue:
(src_id, target_id, rowid) = queue.pop()
log.debug('copy_tree(%d, %d): Processing directory (%d, %d, %d)',
src_inode.id, target_inode.id, src_id, target_id, rowid)
- for (name, id_, rowid) in db.query('SELECT name, inode, rowid FROM contents '
- 'WHERE parent_inode=? AND rowid > ? '
- 'ORDER BY rowid', (src_id, rowid)):
+ for (name_id, id_, rowid) in db.query('SELECT name_id, inode, rowid FROM contents '
+ 'WHERE parent_inode=? AND rowid > ? '
+ 'ORDER BY rowid', (src_id, rowid)):
if id_ not in id_cache:
inode = self.inodes[id_]
@@ -400,8 +358,7 @@ class Operations(llfuse.Operations):
inode_new = make_inode(refcount=1, mode=inode.mode, size=inode.size,
uid=inode.uid, gid=inode.gid,
mtime=inode.mtime, atime=inode.atime,
- ctime=inode.ctime, target=inode.target,
- rdev=inode.rdev)
+ ctime=inode.ctime, rdev=inode.rdev)
except OutOfInodesError:
log.warn('Could not find a free inode')
raise FUSEError(errno.ENOSPC)
@@ -411,24 +368,37 @@ class Operations(llfuse.Operations):
if inode.refcount != 1:
id_cache[id_] = id_new
- for (obj_id, blockno) in db.query('SELECT obj_id, blockno FROM blocks '
- 'WHERE inode=?', (id_,)):
- processed += 1
- db.execute('INSERT INTO blocks (inode, blockno, obj_id) VALUES(?, ?, ?)',
- (id_new, blockno, obj_id))
- db.execute('UPDATE objects SET refcount=refcount+1 WHERE id=?', (obj_id,))
+ db.execute('INSERT INTO symlink_targets (inode, target) '
+ 'SELECT ?, target FROM symlink_targets WHERE inode=?',
+ (id_new, id_))
- if (id_, blockno) in self.cache.upload_manager.in_transit:
- in_transit.add((id_, blockno))
-
+ db.execute('INSERT INTO inode_blocks (inode, blockno, block_id) '
+ 'SELECT ?, blockno, block_id FROM inode_blocks '
+ 'WHERE inode=?', (id_new, id_))
+
+ db.execute('UPDATE inodes SET block_id='
+ '(SELECT block_id FROM inodes WHERE id=?) '
+ 'WHERE id=?', (id_, id_new))
+
+ processed += db.execute('REPLACE INTO blocks (id, hash, refcount, size, obj_id) '
+ 'SELECT blocks.id, hash, blocks.refcount+1, blocks.size, obj_id '
+ 'FROM inodes JOIN blocks ON block_id = blocks.id '
+ 'WHERE inodes.id = ? AND block_id IS NOT NULL', (id_new,))
+
+ processed += db.execute('REPLACE INTO blocks (id, hash, refcount, size, obj_id) '
+ 'SELECT id, hash, refcount+1, size, obj_id '
+ 'FROM inode_blocks JOIN blocks ON block_id = id '
+ 'WHERE inode = ?', (id_new,))
+
if db.has_val('SELECT 1 FROM contents WHERE parent_inode=?', (id_,)):
queue.append((id_, id_new, 0))
else:
id_new = id_cache[id_]
self.inodes[id_new].refcount += 1
- db.execute('INSERT INTO contents (name, inode, parent_inode) VALUES(?, ?, ?)',
- (name, id_new, target_id))
+ db.execute('INSERT INTO contents (name_id, inode, parent_inode) VALUES(?, ?, ?)',
+ (name_id, id_new, target_id))
+ db.execute('UPDATE names SET refcount=refcount+1 WHERE id=?', (name_id,))
processed += 1
@@ -446,29 +416,15 @@ class Operations(llfuse.Operations):
processed = 0
llfuse.lock.yield_()
stamp = time.time()
-
- # If we replicated blocks whose associated objects where still in
- # transit, we have to wait for the transit to complete before we make
- # the replicated tree visible to the user. Otherwise access to the newly
- # created blocks will raise a NoSuchObject exception.
- while in_transit:
- log.debug('copy_tree(%d, %d): in_transit: %s',
- src_inode.id, target_inode.id, in_transit)
- in_transit = [ x for x in in_transit
- if x in self.cache.upload_manager.in_transit ]
- if in_transit:
- self.cache.upload_manager.join_one()
-
# Make replication visible
self.db.execute('UPDATE contents SET parent_inode=? WHERE parent_inode=?',
- (target_inode.id, tmp.id))
+ (target_inode.id, tmp.id))
del self.inodes[tmp.id]
llfuse.invalidate_inode(target_inode.id)
log.debug('copy_tree(%d, %d): end', src_inode.id, target_inode.id)
-
def unlink(self, id_p, name):
inode = self.lookup(id_p, name)
@@ -509,8 +465,10 @@ class Operations(llfuse.Operations):
if self.inodes[id_p].locked and not force:
raise FUSEError(errno.EPERM)
- self.db.execute("DELETE FROM contents WHERE name=? AND parent_inode=?",
- (name, id_p))
+ name_id = self._del_name(name)
+ self.db.execute("DELETE FROM contents WHERE name_id=? AND parent_inode=?",
+ (name_id, id_p))
+
inode = self.inodes[id_]
inode.refcount -= 1
inode.ctime = timestamp
@@ -524,6 +482,7 @@ class Operations(llfuse.Operations):
# Since the inode is not open, it's not possible that new blocks
# get created at this point and we can safely delete the inode
self.db.execute('DELETE FROM ext_attributes WHERE inode=?', (id_,))
+ self.db.execute('DELETE FROM symlink_targets WHERE inode=?', (id_,))
del self.inodes[id_]
def symlink(self, id_p, name, target, ctx):
@@ -536,7 +495,11 @@ class Operations(llfuse.Operations):
# with this size. If the kernel ever learns to open and read
# symlinks directly, it will read the corresponding number of \0
# bytes.
- return self._create(id_p, name, mode, ctx, target=target, size=len(target))
+ inode = self._create(id_p, name, mode, ctx, size=len(target))
+ self.db.execute('INSERT INTO symlink_targets (inode, target) VALUES(?,?)',
+ (inode.id, target))
+
+ return inode
def rename(self, id_p_old, name_old, id_p_new, name_new):
if name_new == CTRL_NAME or name_old == CTRL_NAME:
@@ -569,12 +532,46 @@ class Operations(llfuse.Operations):
self._rename(id_p_old, name_old, id_p_new, name_new)
+ def _add_name(self, name):
+ '''Get id for *name* and increase refcount
+
+ Name is inserted in table if it does not yet exist.
+ '''
+
+ try:
+ name_id = self.db.get_val('SELECT id FROM names WHERE name=?', (name,))
+ except NoSuchRowError:
+ name_id = self.db.rowid('INSERT INTO names (name, refcount) VALUES(?,?)',
+ (name, 1))
+ else:
+ self.db.execute('UPDATE names SET refcount=refcount+1 WHERE id=?', (name_id,))
+ return name_id
+
+ def _del_name(self, name):
+ '''Decrease refcount for *name*
+
+ Name is removed from table if refcount drops to zero. Returns the
+ (possibly former) id of the name.
+ '''
+
+ (name_id, refcount) = self.db.get_row('SELECT id, refcount FROM names WHERE name=?', (name,))
+
+ if refcount > 1:
+ self.db.execute('UPDATE names SET refcount=refcount-1 WHERE id=?', (name_id,))
+ else:
+ self.db.execute('DELETE FROM names WHERE id=?', (name_id,))
+
+ return name_id
+
def _rename(self, id_p_old, name_old, id_p_new, name_new):
timestamp = time.time()
- self.db.execute("UPDATE contents SET name=?, parent_inode=? WHERE name=? "
- "AND parent_inode=?", (name_new, id_p_new,
- name_old, id_p_old))
+ name_id_new = self._add_name(name_new)
+ name_id_old = self._del_name(name_old)
+
+ self.db.execute("UPDATE contents SET name_id=?, parent_inode=? WHERE name_id=? "
+ "AND parent_inode=?", (name_id_new, id_p_new,
+ name_id_old, id_p_old))
inode_p_old = self.inodes[id_p_old]
inode_p_new = self.inodes[id_p_new]
@@ -594,13 +591,15 @@ class Operations(llfuse.Operations):
raise llfuse.FUSEError(errno.EINVAL)
# Replace target
- self.db.execute("UPDATE contents SET inode=? WHERE name=? AND parent_inode=?",
- (id_old, name_new, id_p_new))
+ name_id_new = self.db.get_val('SELECT id FROM names WHERE name=?', (name_new,))
+ self.db.execute("UPDATE contents SET inode=? WHERE name_id=? AND parent_inode=?",
+ (id_old, name_id_new, id_p_new))
# Delete old name
- self.db.execute('DELETE FROM contents WHERE name=? AND parent_inode=?',
- (name_old, id_p_old))
+ name_id_old = self._del_name(name_old)
+ self.db.execute('DELETE FROM contents WHERE name_id=? AND parent_inode=?',
+ (name_id_old, id_p_old))
inode_new = self.inodes[id_new]
inode_new.refcount -= 1
@@ -620,6 +619,7 @@ class Operations(llfuse.Operations):
# Since the inode is not open, it's not possible that new blocks
# get created at this point and we can safely delete the inode
self.db.execute('DELETE FROM ext_attributes WHERE inode=?', (id_new,))
+ self.db.execute('DELETE FROM symlink_targets WHERE inode=?', (id_new,))
del self.inodes[id_new]
@@ -643,8 +643,8 @@ class Operations(llfuse.Operations):
inode_p.ctime = timestamp
inode_p.mtime = timestamp
- self.db.execute("INSERT INTO contents (name, inode, parent_inode) VALUES(?,?,?)",
- (new_name, id_, new_id_p))
+ self.db.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)",
+ (self._add_name(new_name), id_, new_id_p))
inode = self.inodes[id_]
inode.refcount += 1
inode.ctime = timestamp
@@ -684,7 +684,6 @@ class Operations(llfuse.Operations):
except NoSuchObject as exc:
log.warn('Backend lost block %d of inode %d (id %s)!',
last_block, id_, exc.key)
- self.encountered_errors = True
raise FUSEError(errno.EIO)
except ChecksumError as exc:
@@ -727,11 +726,14 @@ class Operations(llfuse.Operations):
def extstat(self):
'''Return extended file system statistics'''
+ # Flush inode cache to get better estimate of total fs size
+ self.inodes.flush()
+
entries = self.db.get_val("SELECT COUNT(rowid) FROM contents")
blocks = self.db.get_val("SELECT COUNT(id) FROM objects")
inodes = self.db.get_val("SELECT COUNT(id) FROM inodes")
fs_size = self.db.get_val('SELECT SUM(size) FROM inodes') or 0
- dedup_size = self.db.get_val('SELECT SUM(size) FROM objects') or 0
+ dedup_size = self.db.get_val('SELECT SUM(size) FROM blocks') or 0
compr_size = self.db.get_val('SELECT SUM(compr_size) FROM objects') or 0
return struct.pack('QQQQQQQ', entries, blocks, inodes, fs_size, dedup_size,
@@ -744,7 +746,7 @@ class Operations(llfuse.Operations):
# Get number of blocks & inodes
blocks = self.db.get_val("SELECT COUNT(id) FROM objects")
inodes = self.db.get_val("SELECT COUNT(id) FROM inodes")
- size = self.db.get_val('SELECT SUM(size) FROM objects')
+ size = self.db.get_val('SELECT SUM(size) FROM blocks')
if size is None:
size = 0
@@ -759,10 +761,7 @@ class Operations(llfuse.Operations):
# size of fs in f_frsize units
# (since backend is supposed to be unlimited, always return a half-full filesystem,
# but at least 50 GB)
- if stat_.f_bsize != 0:
- total_blocks = int(max(2 * blocks, 50 * 1024 ** 3 // stat_.f_bsize))
- else:
- total_blocks = 2 * blocks
+ total_blocks = int(max(2 * blocks, 50 * 1024 ** 3 // stat_.f_frsize))
stat_.f_blocks = total_blocks
stat_.f_bfree = total_blocks - blocks
@@ -800,7 +799,7 @@ class Operations(llfuse.Operations):
self.open_inodes[inode.id] += 1
return (inode.id, inode)
- def _create(self, id_p, name, mode, ctx, rdev=0, target=None, size=0):
+ def _create(self, id_p, name, mode, ctx, rdev=0, size=0):
if name == CTRL_NAME:
log.warn('Attempted to create s3ql control file at %s',
get_path(id_p, self.db, name))
@@ -822,13 +821,13 @@ class Operations(llfuse.Operations):
try:
inode = self.inodes.create_inode(mtime=timestamp, ctime=timestamp, atime=timestamp,
uid=ctx.uid, gid=ctx.gid, mode=mode, refcount=1,
- rdev=rdev, target=target, size=size)
+ rdev=rdev, size=size)
except OutOfInodesError:
log.warn('Could not find a free inode')
raise FUSEError(errno.ENOSPC)
- self.db.execute("INSERT INTO contents(name, inode, parent_inode) VALUES(?,?,?)",
- (name, inode.id, id_p))
+ self.db.execute("INSERT INTO contents(name_id, inode, parent_inode) VALUES(?,?,?)",
+ (self._add_name(name), inode.id, id_p))
return inode
@@ -889,7 +888,6 @@ class Operations(llfuse.Operations):
except NoSuchObject as exc:
log.warn('Backend lost block %d of inode %d (id %s)!',
blockno, id_, exc.key)
- self.encountered_errors = True
raise FUSEError(errno.EIO)
except ChecksumError as exc:
@@ -957,7 +955,6 @@ class Operations(llfuse.Operations):
except NoSuchObject as exc:
log.warn('Backend lost block %d of inode %d (id %s)!',
blockno, id_, exc.key)
- self.encountered_errors = True
raise FUSEError(errno.EIO)
except ChecksumError as exc:
@@ -987,7 +984,7 @@ class Operations(llfuse.Operations):
inode = self.inodes[fh]
if inode.refcount == 0:
self.cache.remove(inode.id, 0,
- int(math.ceil(inode.size // self.blocksize)))
+ int(math.ceil(inode.size / self.blocksize)))
# Since the inode is not open, it's not possible that new blocks
# get created at this point and we can safely delete the in
del self.inodes[fh]
@@ -1016,39 +1013,4 @@ def update_logging(level, modules):
else:
logging.disable(logging.DEBUG)
root_logger.setLevel(level)
-
-
-class InodeFlushThread(ExceptionStoringThread):
- '''
- Periodically commit dirty inodes.
-
- This class uses the llfuse global lock. When calling objects
- passed in the constructor, the global lock is acquired first.
- '''
-
- def __init__(self, cache):
- super(InodeFlushThread, self).__init__()
- self.cache = cache
- self.stop_event = threading.Event()
- self.name = 'Inode Flush Thread'
- self.daemon = True
-
- def run_protected(self):
- log.debug('FlushThread: start')
-
- while not self.stop_event.is_set():
- with lock:
- self.cache.flush()
- self.stop_event.wait(5)
- log.debug('FlushThread: end')
-
- def stop(self):
- '''Wait for thread to finish, raise any occurred exceptions.
-
- This method releases the global lock.
- '''
-
- self.stop_event.set()
- with lock_released:
- self.join_and_raise()
-
+ \ No newline at end of file