author     Nikolaus Rath <Nikolaus@rath.org>        2016-03-09 10:36:48 -0800
committer  Nikolaus Rath <Nikolaus@rath.org>        2016-03-09 10:36:48 -0800
commit     3134cd3962e1a0d52a74a5ba8be8b8f74b4898e4 (patch)
tree       3eee070d1db8d3311dbcb221e9fdc03cade34425 /src/s3ql
parent     ee66f49d060aae35cd189b540bd556876fe746c6 (diff)
Import s3ql_2.17+dfsg.orig.tar.gz
Diffstat (limited to 'src/s3ql')
-rw-r--r--  src/s3ql/__init__.py     |   9
-rw-r--r--  src/s3ql/adm.py          |  80
-rw-r--r--  src/s3ql/backends/s3.py  |  24
-rw-r--r--  src/s3ql/block_cache.py  |  39
-rw-r--r--  src/s3ql/common.py       |  26
-rw-r--r--  src/s3ql/ctrl.py         |   5
-rw-r--r--  src/s3ql/endian_indep.h  |   5
-rw-r--r--  src/s3ql/fs.py           | 111
-rw-r--r--  src/s3ql/fsck.py         |  33
-rw-r--r--  src/s3ql/inode_cache.py  |  41
-rw-r--r--  src/s3ql/metadata.py     |  16
-rw-r--r--  src/s3ql/mkfs.py         |  16
-rw-r--r--  src/s3ql/mount.py        |  28
-rw-r--r--  src/s3ql/statfs.py       |  26
14 files changed, 284 insertions(+), 175 deletions(-)
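The bulk of this patch converts inode timestamps from REAL columns holding fractional seconds (atime, mtime, ctime) to INT columns holding integer nanoseconds (atime_ns, mtime_ns, ctime_ns), and bumps the file system revision from 22 to 23. The sketch below is illustrative only and not part of the patch: time_ns() matches the helper the patch adds to src/s3ql/common.py, while seconds_to_ns() is a hypothetical helper mirroring the SQL migration in src/s3ql/adm.py ('UPDATE inodes SET atime_ns = atime * 1e9' and friends).

    # Sketch only, assuming just what the diff below shows.
    import time

    def time_ns():
        # Integer nanoseconds since the epoch; SQLite INT columns hold
        # 64-bit values, so nanosecond timestamps fit comfortably.
        return int(time.time() * 1e9)

    def seconds_to_ns(seconds):
        # What the migration applies to each stored REAL timestamp.
        return int(seconds * 1e9)

    assert seconds_to_ns(1457548608.0) == 1457548608 * 10**9

Storing integers also removes the rounding step the old code needed when filling in st_*_ns attributes (int(self.atime*1e9+0.5) in inode_cache.py).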
diff --git a/src/s3ql/__init__.py b/src/s3ql/__init__.py
index fabc193..5e17dab 100644
--- a/src/s3ql/__init__.py
+++ b/src/s3ql/__init__.py
@@ -22,12 +22,13 @@ __all__ = [ 'adm', 'backends', 'block_cache', 'common',
             'calc_mro', 'REV_VER_MAP', 'RELEASE', 'BUFSIZE',
             'CTRL_NAME', 'CTRL_INODE' ]
 
-VERSION = '2.16'
+VERSION = '2.17'
 RELEASE = '%s' % VERSION
 
-# TODO: On next upgrade, remove pickle support from
-# common.py:load_params().
-CURRENT_FS_REV = 22
+# TODO: On next revision bump, consider removing support for TIME
+# values from deltadump.pyx. It is only present to allow upgrades
+# from revisions <= 22.
+CURRENT_FS_REV = 23
 
 # Buffer size when writing objects
 BUFSIZE = 64 * 1024
diff --git a/src/s3ql/adm.py b/src/s3ql/adm.py
index 2025e5e..b0dd334 100644
--- a/src/s3ql/adm.py
+++ b/src/s3ql/adm.py
@@ -10,14 +10,18 @@ from .logging import logging, QuietError, setup_logging
 from . import CURRENT_FS_REV, REV_VER_MAP
 from .backends.comprenc import ComprencBackend
 from .database import Connection
+from .deltadump import TIME, INTEGER
 from .common import (get_backend_cachedir, get_seq_no, is_mounted, get_backend,
-                     freeze_basic_mapping, load_params)
+                     load_params, save_params)
 from .metadata import dump_and_upload_metadata, download_metadata
+from . import metadata
 from .parse_args import ArgumentParser
 from datetime import datetime as Datetime
 from getpass import getpass
+from contextlib import contextmanager
 import os
 import shutil
+import functools
 import sys
 import textwrap
 import time
@@ -139,8 +143,7 @@ def download_metadata_cmd(backend, storage_url):
     # downloaded backup
     seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in backend.list('s3ql_seq_no_') ]
     param['seq_no'] = max(seq_nos) + 1
-    with open(cachepath + '.params', 'wb') as fh:
-        fh.write(freeze_basic_mapping(param))
+    save_params(cachepath, param)
 
 def change_passphrase(backend):
     '''Change file system passphrase'''
@@ -199,7 +202,7 @@ def get_old_rev_msg(rev, prog):
           $ wget http://s3ql.googlecode.com/files/s3ql-%(version)s.tar.bz2 \
             || wget https://bitbucket.org/nikratio/s3ql/downloads/s3ql-%(version)s.tar.bz2
           $ tar xjf s3ql-%(version)s.tar.bz2
-          $ (cd s3ql-%(version)s; ./setup.py build_ext)
+          $ (cd s3ql-%(version)s; ./setup.py build_ext --inplace)
          $ s3ql-%(version)s/bin/%(prog)s <options>
         ''' % { 'version': REV_VER_MAP[rev],
                 'prog': prog })
@@ -213,7 +216,7 @@ def upgrade(backend, cachepath):
     db = None
     seq_no = get_seq_no(backend)
     if os.path.exists(cachepath + '.params'):
-        param = load_params(cachepath + '.params')
+        param = load_params(cachepath)
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
             param = backend.lookup('s3ql_metadata')
@@ -247,10 +250,7 @@ def upgrade(backend, cachepath):
         raise QuietError()
 
     # Check revision
-    # Upgrade from 21 to 22 is only possible with release 2.13,
-    # because we removed support for reading the old storage object
-    # format after 2.13.
-    if param['revision'] == 21 or param['revision'] < CURRENT_FS_REV-1:
+    if param['revision'] < CURRENT_FS_REV-1:
         print(textwrap.dedent('''
             File system revision too old to upgrade!
@@ -283,8 +283,8 @@ def upgrade(backend, cachepath):
         raise QuietError()
 
     if not db:
-        # Need to download metadata
-        db = download_metadata(backend, cachepath + '.db')
+        with monkeypatch_metadata_retrieval():
+            db = download_metadata(backend, cachepath + '.db')
 
     log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV)
 
@@ -292,14 +292,17 @@ def upgrade(backend, cachepath):
     param['last-modified'] = time.time()
     param['seq_no'] += 1
 
-    # Upgrade code goes here
+    # Upgrade
+    db.execute('ALTER TABLE inodes ADD COLUMN atime_ns INT NOT NULL DEFAULT 0')
+    db.execute('ALTER TABLE inodes ADD COLUMN mtime_ns INT NOT NULL DEFAULT 0')
+    db.execute('ALTER TABLE inodes ADD COLUMN ctime_ns INT NOT NULL DEFAULT 0')
+    db.execute('UPDATE inodes SET atime_ns = atime * 1e9')
+    db.execute('UPDATE inodes SET mtime_ns = mtime * 1e9')
+    db.execute('UPDATE inodes SET ctime_ns = ctime * 1e9')
 
     dump_and_upload_metadata(backend, db, param)
 
-    backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
-
-    with open(cachepath + '.params', 'wb') as fh:
-        fh.write(freeze_basic_mapping(param))
+    save_params(cachepath, param)
 
     log.info('Cleaning up local metadata...')
     db.execute('ANALYZE')
@@ -307,5 +310,50 @@ def upgrade(backend, cachepath):
 
     print('File system upgrade complete.')
 
+
+@contextmanager
+def monkeypatch_metadata_retrieval():
+    DUMP_SPEC_bak = metadata.DUMP_SPEC
+    create_tables_bak = metadata.create_tables
+
+    @functools.wraps(metadata.create_tables)
+    def create_tables(conn):
+        create_tables_bak(conn)
+        conn.execute('DROP TABLE inodes')
+        conn.execute("""
+        CREATE TABLE inodes (
+            id        INTEGER PRIMARY KEY AUTOINCREMENT,
+            uid       INT NOT NULL,
+            gid       INT NOT NULL,
+            mode      INT NOT NULL,
+            mtime     REAL NOT NULL,
+            atime     REAL NOT NULL,
+            ctime     REAL NOT NULL,
+            refcount  INT NOT NULL,
+            size      INT NOT NULL DEFAULT 0,
+            rdev      INT NOT NULL DEFAULT 0,
+            locked    BOOLEAN NOT NULL DEFAULT 0
+        )""")
+    metadata.create_tables = create_tables
+
+    assert metadata.DUMP_SPEC[2][0] == 'inodes'
+    metadata.DUMP_SPEC[2] = ('inodes', 'id', (('id', INTEGER, 1),
+                                              ('uid', INTEGER),
+                                              ('gid', INTEGER),
+                                              ('mode', INTEGER),
+                                              ('mtime', TIME),
+                                              ('atime', TIME),
+                                              ('ctime', TIME),
+                                              ('size', INTEGER),
+                                              ('rdev', INTEGER),
+                                              ('locked', INTEGER),
+                                              ('refcount', INTEGER)))
+
+    try:
+        yield
+    finally:
+        metadata.DUMP_SPEC = DUMP_SPEC_bak
+        metadata.create_tables = create_tables_bak
+
 if __name__ == '__main__':
     main(sys.argv[1:])
diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py
index 1a20228..a1e6d20 100644
--- a/src/s3ql/backends/s3.py
+++ b/src/s3ql/backends/s3.py
@@ -32,7 +32,7 @@ class Backend(s3c.Backend):
     may or may not be available and can be queried for with instance methods.
""" - known_options = ((s3c.Backend.known_options | { 'sse', 'rrs' }) + known_options = ((s3c.Backend.known_options | { 'sse', 'rrs', 'ia' }) - {'dumb-copy', 'disable-expect100'}) def __init__(self, storage_url, login, password, options): @@ -83,23 +83,29 @@ class Backend(s3c.Backend): finally: keys[:MAX_KEYS] = tmp + def _set_storage_options(self, headers): + if 'sse' in self.options: + headers['x-amz-server-side-encryption'] = 'AES256' + + if 'ia' in self.options: + sc = 'STANDARD_IA' + elif 'rrs' in self.options: + sc = 'REDUCED_REDUNDANCY' + else: + sc = 'STANDARD' + headers['x-amz-storage-class'] = sc + @copy_ancestor_docstring def copy(self, src, dest, metadata=None): extra_headers = {} - if 'sse' in self.options: - extra_headers['x-amz-server-side-encryption'] = 'AES256' - if 'rrs' in self.options: - extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY' + self._set_storage_options(extra_headers) return super().copy(src, dest, metadata=metadata, extra_headers=extra_headers) @copy_ancestor_docstring def open_write(self, key, metadata=None, is_compressed=False): extra_headers = {} - if 'sse' in self.options: - extra_headers['x-amz-server-side-encryption'] = 'AES256' - if 'rrs' in self.options: - extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY' + self._set_storage_options(extra_headers) return super().open_write(key, metadata=metadata, is_compressed=is_compressed, extra_headers=extra_headers) diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py index f258f3c..c5dc566 100644 --- a/src/s3ql/block_cache.py +++ b/src/s3ql/block_cache.py @@ -19,6 +19,7 @@ import hashlib import shutil import threading import time +import sys # standard logger for this module log = logging.getLogger(__name__) @@ -930,19 +931,45 @@ class BlockCache(object): log.debug('finished') def get_usage(self): - '''Return cache size and dirty cache size + '''Get cache usage information. + + Return a tuple of + + * cache entries + * cache size + * dirty cache entries + * dirty cache size + * pending removals This method is O(n) in the number of cache entries. 
         '''
 
         used = self.cache.size
-        dirty = 0
+        dirty_size = 0
+        dirty_cnt = 0
         for el in self.cache.values():
             if el.dirty:
-                dirty += el.size
+                dirty_size += el.size
+                dirty_cnt += 1
+
+        if self.to_remove is None:
+            remove_cnt = 0
+        else:
+            # This is an estimate which may be negative
+            remove_cnt = max(0, self.to_remove.qsize())
 
-        return (used, dirty)
+        return (len(self.cache), used, dirty_cnt, dirty_size, remove_cnt)
 
     def __del__(self):
-        if len(self.cache) > 0:
-            raise RuntimeError("BlockManager instance was destroyed without calling destroy()!")
+        if len(self.cache) == 0:
+            return
+
+        # Force execution of sys.excepthook (exceptions raised
+        # by __del__ are ignored)
+        try:
+            raise RuntimeError("BlockManager instance was destroyed without "
+                               "calling destroy()!")
+        except RuntimeError:
+            exc_info = sys.exc_info()
+
+        sys.excepthook(*exc_info)
diff --git a/src/s3ql/common.py b/src/s3ql/common.py
index c8a3fc1..09216cb 100644
--- a/src/s3ql/common.py
+++ b/src/s3ql/common.py
@@ -20,8 +20,8 @@ import threading
 import traceback
 import sys
 import os
+import time
 import subprocess
-import pickle
 import errno
 import hashlib
 import llfuse
@@ -611,11 +611,19 @@ def freeze_basic_mapping(d):
     buf = '{ %s }' % ', '.join(els)
     return buf.encode('utf-8')
 
-def load_params(fname):
-    with open(fname, 'rb') as fh:
-        proto = fh.read(2)
-        fh.seek(0)
-        if proto == b'\x80\x02':
-            return pickle.load(fh)
-        else:
-            return thaw_basic_mapping(fh.read())
+def load_params(cachepath):
+    with open(cachepath + '.params', 'rb') as fh:
+        return thaw_basic_mapping(fh.read())
+
+def save_params(cachepath, param):
+    with open(cachepath + '.params', 'wb') as fh:
+        fh.write(freeze_basic_mapping(param))
+
+        # Fsync to make sure that the updated sequence number is committed to
+        # disk. Otherwise, a crash immediately after mount could result in both
+        # the local and remote metadata appearing to be out of date.
+        fh.flush()
+        os.fsync(fh.fileno())
+
+def time_ns():
+    return int(time.time() * 1e9)
diff --git a/src/s3ql/ctrl.py b/src/s3ql/ctrl.py
index 38d4f0d..ad5f4ae 100644
--- a/src/s3ql/ctrl.py
+++ b/src/s3ql/ctrl.py
@@ -89,11 +89,12 @@ def main(args=None):
         llfuse.setxattr(ctrlfile, 'upload-meta', b'dummy')
 
     elif options.action == 'log':
-        cmd = ('(%r, %r)' % (options.level, ','.join(options.modules))).encode()
+        level = getattr(logging, options.level.upper())
+        cmd = ('(%r, %r)' % (level, ','.join(options.modules))).encode()
         llfuse.setxattr(ctrlfile, 'logging', cmd)
 
     elif options.action == 'cachesize':
-        llfuse.setxattr(ctrlfile, 'cachesize', ('%d' % options.cachesize * 1024).encode())
+        llfuse.setxattr(ctrlfile, 'cachesize', ('%d' % (options.cachesize * 1024,)).encode())
 
 if __name__ == '__main__':
     main(sys.argv[1:])
diff --git a/src/s3ql/endian_indep.h b/src/s3ql/endian_indep.h
index ee8b917..91283fc 100644
--- a/src/s3ql/endian_indep.h
+++ b/src/s3ql/endian_indep.h
@@ -27,7 +27,10 @@ This program can be distributed under the terms of the GNU GPLv3.
 #define be64toh(x) OSSwapBigToHostInt64(x)
 #define le64toh(x) OSSwapLittleToHostInt64(x)
 
-#else /* __APPLE__ */
+#elif __FreeBSD__
+#include <sys/endian.h>
+
+#else
 #include <endian.h>
 #endif
diff --git a/src/s3ql/fs.py b/src/s3ql/fs.py
index f00f35d..916a2a8 100644
--- a/src/s3ql/fs.py
+++ b/src/s3ql/fs.py
@@ -9,7 +9,7 @@ This work can be distributed under the terms of the GNU GPLv3.
 from .logging import logging
 from . import deltadump, CTRL_NAME, CTRL_INODE
 from .backends.common import NoSuchObject, CorruptedObjectError
-from .common import get_path, parse_literal
+from .common import get_path, parse_literal, time_ns
 from .database import NoSuchRowError
 from .inode_cache import OutOfInodesError
 from io import BytesIO
@@ -56,7 +56,6 @@ if not hasattr(errno, 'EOPNOTSUPP'):
 else:
     ACL_ERRNO = errno.EOPNOTSUPP
 
-
 class Operations(llfuse.Operations):
     """A full-featured file system for online data storage
@@ -168,10 +167,10 @@ class Operations(llfuse.Operations):
 
     def readlink(self, id_, ctx):
         log.debug('started with %d', id_)
-        timestamp = time.time()
+        now_ns = time_ns()
         inode = self.inodes[id_]
-        if inode.atime < inode.ctime or inode.atime < inode.mtime:
-            inode.atime = timestamp
+        if inode.atime_ns < inode.ctime_ns or inode.atime_ns < inode.mtime_ns:
+            inode.atime_ns = now_ns
         try:
             return self.db.get_val("SELECT target FROM symlink_targets WHERE inode=?", (id_,))
         except NoSuchRowError:
@@ -195,8 +194,8 @@ class Operations(llfuse.Operations):
             off = -1
 
         inode = self.inodes[id_]
-        if inode.atime < inode.ctime or inode.atime < inode.mtime:
-            inode.atime = time.time()
+        if inode.atime_ns < inode.ctime_ns or inode.atime_ns < inode.mtime_ns:
+            inode.atime_ns = time_ns()
 
         with self.db.query("SELECT name_id, name, inode FROM contents_v "
                            'WHERE parent_inode=? AND name_id > ? ORDER BY name_id',
@@ -277,8 +276,7 @@ class Operations(llfuse.Operations):
 
         elif name == b'logging':
             try:
-                (lvl, modules)= parse_literal(value, (str, str))
-                lvl = logging._levelNames[lvl.upper()]
+                (lvl, modules)= parse_literal(value, (int, str))
             except (ValueError, KeyError):
                 log.warning('Received malformed command via control inode')
                 raise FUSEError(errno.EINVAL)
@@ -290,6 +288,7 @@ class Operations(llfuse.Operations):
             except ValueError:
                 log.warning('Received malformed command via control inode')
                 raise FUSEError(errno.EINVAL)
+            log.debug('updated cache size to %d bytes', self.cache.cache.max_size)
 
         else:
             log.warning('Received unknown command via control inode')
@@ -309,7 +308,7 @@ class Operations(llfuse.Operations):
 
         self.db.execute('INSERT OR REPLACE INTO ext_attributes (inode, name_id, value) '
                         'VALUES(?, ?, ?)', (id_, self._add_name(name), value))
-        self.inodes[id_].ctime = time.time()
+        self.inodes[id_].ctime_ns = time_ns()
 
     def removexattr(self, id_, name, ctx):
         log.debug('started with %d, %r', id_, name)
@@ -327,7 +326,7 @@ class Operations(llfuse.Operations):
 
         if changes == 0:
             raise llfuse.FUSEError(llfuse.ENOATTR)
-        self.inodes[id_].ctime = time.time()
+        self.inodes[id_].ctime_ns = time_ns()
 
     def lock_tree(self, id0):
         '''Lock directory tree'''
@@ -339,7 +338,7 @@ class Operations(llfuse.Operations):
         queue = [ id0 ]
         self.inodes[id0].locked = True
         processed = 0 # Number of steps since last GIL release
-        stamp = time.time() # Time of last GIL release
+        stamp = time_ns() # Time of last GIL release
         gil_step = 250 # Approx. number of steps between GIL releases
         while True:
             id_p = queue.pop()
@@ -457,13 +456,13 @@ class Operations(llfuse.Operations):
             target_inode = self.inodes[target_id]
         except KeyError:
             raise FUSEError(errno.ENOENT)
-        for attr in ('atime', 'ctime', 'mtime', 'mode', 'uid', 'gid'):
+        for attr in ('atime_ns', 'ctime_ns', 'mtime_ns', 'mode', 'uid', 'gid'):
             setattr(target_inode, attr, getattr(src_inode, attr))
 
         # We first replicate into a dummy inode, so that we
         # need to invalidate only once.
-        timestamp = time.time()
-        tmp = make_inode(mtime=timestamp, ctime=timestamp, atime=timestamp,
+        now_ns = time_ns()
+        tmp = make_inode(mtime_ns=now_ns, ctime_ns=now_ns, atime_ns=now_ns,
                          uid=0, gid=0, mode=0, refcount=0)
 
         queue = [ (src_id, tmp.id, 0) ]
@@ -484,8 +483,8 @@ class Operations(llfuse.Operations):
                 try:
                     inode_new = make_inode(refcount=1, mode=inode.mode, size=inode.size,
                                            uid=inode.uid, gid=inode.gid,
-                                           mtime=inode.mtime, atime=inode.atime,
-                                           ctime=inode.ctime, rdev=inode.rdev)
+                                           mtime_ns=inode.mtime_ns, atime_ns=inode.atime_ns,
+                                           ctime_ns=inode.ctime_ns, rdev=inode.rdev)
                 except OutOfInodesError:
                     log.warning('Could not find a free inode')
                     raise FUSEError(errno.ENOSPC)
@@ -591,7 +590,7 @@ class Operations(llfuse.Operations):
 
         log.debug('started with %d, %r', id_p, name)
 
-        timestamp = time.time()
+        now_ns = time_ns()
 
         # Check that there are no child entries
         if self.db.has_val("SELECT 1 FROM contents WHERE parent_inode=?", (id_,)):
@@ -608,11 +607,11 @@ class Operations(llfuse.Operations):
 
         inode = self.inodes[id_]
         inode.refcount -= 1
-        inode.ctime = timestamp
+        inode.ctime_ns = now_ns
 
         inode_p = self.inodes[id_p]
-        inode_p.mtime = timestamp
-        inode_p.ctime = timestamp
+        inode_p.mtime_ns = now_ns
+        inode_p.ctime_ns = now_ns
 
         if inode.refcount == 0 and id_ not in self.open_inodes:
             log.debug('removing from cache')
@@ -717,7 +716,7 @@ class Operations(llfuse.Operations):
         return name_id
 
     def _rename(self, id_p_old, name_old, id_p_new, name_new):
-        timestamp = time.time()
+        now_ns = time_ns()
 
         name_id_new = self._add_name(name_new)
         name_id_old = self._del_name(name_old)
@@ -728,15 +727,15 @@ class Operations(llfuse.Operations):
 
         inode_p_old = self.inodes[id_p_old]
         inode_p_new = self.inodes[id_p_new]
-        inode_p_old.mtime = timestamp
-        inode_p_new.mtime = timestamp
-        inode_p_old.ctime = timestamp
-        inode_p_new.ctime = timestamp
+        inode_p_old.mtime_ns = now_ns
+        inode_p_new.mtime_ns = now_ns
+        inode_p_old.ctime_ns = now_ns
+        inode_p_new.ctime_ns = now_ns
 
     def _replace(self, id_p_old, name_old, id_p_new, name_new,
                  id_old, id_new):
 
-        timestamp = time.time()
+        now_ns = time_ns()
 
         if self.db.has_val("SELECT 1 FROM contents WHERE parent_inode=?", (id_new,)):
             log.info("Attempted to overwrite entry with children: %s",
@@ -755,15 +754,15 @@ class Operations(llfuse.Operations):
 
         inode_new = self.inodes[id_new]
         inode_new.refcount -= 1
-        inode_new.ctime = timestamp
+        inode_new.ctime_ns = now_ns
 
         inode_p_old = self.inodes[id_p_old]
-        inode_p_old.ctime = timestamp
-        inode_p_old.mtime = timestamp
+        inode_p_old.ctime_ns = now_ns
+        inode_p_old.mtime_ns = now_ns
 
         inode_p_new = self.inodes[id_p_new]
-        inode_p_new.ctime = timestamp
-        inode_p_new.mtime = timestamp
+        inode_p_new.ctime_ns = now_ns
+        inode_p_new.mtime_ns = now_ns
 
         if inode_new.refcount == 0 and id_new not in self.open_inodes:
             self.cache.remove(id_new, 0,
@@ -787,7 +786,7 @@ class Operations(llfuse.Operations):
                       get_path(new_id_p, self.db, new_name))
             raise llfuse.FUSEError(errno.EACCES)
 
-        timestamp = time.time()
+        now_ns = time_ns()
         inode_p = self.inodes[new_id_p]
 
         if inode_p.refcount == 0:
@@ -798,14 +797,14 @@ class Operations(llfuse.Operations):
         if self.failsafe or inode_p.locked:
             raise FUSEError(errno.EPERM)
 
-        inode_p.ctime = timestamp
-        inode_p.mtime = timestamp
+        inode_p.ctime_ns = now_ns
+        inode_p.mtime_ns = now_ns
 
         self.db.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)",
                         (self._add_name(new_name), id_, new_id_p))
         inode = self.inodes[id_]
         inode.refcount += 1
-        inode.ctime = timestamp
+        inode.ctime_ns = now_ns
 
         self.open_inodes[inode.id] += 1
         return inode.entry_attributes()
 
@@ -815,7 +814,7 @@ class Operations(llfuse.Operations):
         inode = self.inodes[id_]
         if fh is not None:
             assert fh == id_
-        timestamp = time.time()
+        now_ns = time_ns()
 
         if self.failsafe or inode.locked:
             raise FUSEError(errno.EPERM)
@@ -861,12 +860,12 @@ class Operations(llfuse.Operations):
             inode.gid = attr.st_gid
 
         if fields.update_atime:
-            inode.atime = attr.st_atime_ns / 1e9
+            inode.atime_ns = attr.st_atime_ns
 
         if fields.update_mtime:
-            inode.mtime = attr.st_mtime_ns / 1e9
+            inode.mtime_ns = attr.st_mtime_ns
 
-        inode.ctime = timestamp
+        inode.ctime_ns = now_ns
 
         return inode.entry_attributes()
@@ -900,11 +899,9 @@ class Operations(llfuse.Operations):
         fs_size = self.db.get_val('SELECT SUM(size) FROM inodes') or 0
         dedup_size = self.db.get_val('SELECT SUM(size) FROM blocks') or 0
         compr_size = self.db.get_val('SELECT SUM(size) FROM objects') or 0
-        (cache_used, cache_dirty) = self.cache.get_usage()
-
-        return struct.pack('QQQQQQQQQ', entries, blocks, inodes, fs_size, dedup_size,
-                           compr_size, self.db.get_size(), cache_used, cache_dirty)
+        return struct.pack('QQQQQQQQQQQQ', entries, blocks, inodes, fs_size, dedup_size,
+                           compr_size, self.db.get_size(), *self.cache.get_usage())
 
     def statfs(self, ctx):
         log.debug('started')
@@ -990,7 +987,7 @@ class Operations(llfuse.Operations):
                       get_path(id_p, self.db, name))
             raise FUSEError(errno.EACCES)
 
-        timestamp = time.time()
+        now_ns = time_ns()
         inode_p = self.inodes[id_p]
 
         if inode_p.locked:
@@ -1000,12 +997,18 @@ class Operations(llfuse.Operations):
             log.warning('Attempted to create entry %s with unlinked parent %d',
                         name, id_p)
             raise FUSEError(errno.EINVAL)
-        inode_p.mtime = timestamp
-        inode_p.ctime = timestamp
+        inode_p.mtime_ns = now_ns
+        inode_p.ctime_ns = now_ns
 
+        if inode_p.mode & stat.S_ISGID:
+            gid = inode_p.gid
+            if stat.S_ISDIR(mode):
+                mode |= stat.S_ISGID
+        else:
+            gid = ctx.gid
         try:
-            inode = self.inodes.create_inode(mtime=timestamp, ctime=timestamp, atime=timestamp,
-                                             uid=ctx.uid, gid=ctx.gid, mode=mode, refcount=1,
+            inode = self.inodes.create_inode(mtime_ns=now_ns, ctime_ns=now_ns, atime_ns=now_ns,
+                                             uid=ctx.uid, gid=gid, mode=mode, refcount=1,
                                              rdev=rdev, size=size)
         except OutOfInodesError:
             log.warning('Could not find a free inode')
@@ -1043,8 +1046,8 @@ class Operations(llfuse.Operations):
             # Inode may have expired from cache
             inode = self.inodes[fh]
 
-            if inode.atime < inode.ctime or inode.atime < inode.mtime:
-                inode.atime = time.time()
+            if inode.atime_ns < inode.ctime_ns or inode.atime_ns < inode.mtime_ns:
+                inode.atime_ns = time_ns()
 
         return buf.getvalue()
@@ -1071,11 +1074,11 @@ class Operations(llfuse.Operations):
         # so we have to be careful not to undo a size extension made by
         # a concurrent write (because _readwrite() releases the global
         # lock).
-        timestamp = time.time()
+        now_ns = time_ns()
         inode = self.inodes[fh]
         inode.size = max(inode.size, minsize)
-        inode.mtime = timestamp
-        inode.ctime = timestamp
+        inode.mtime_ns = now_ns
+        inode.ctime_ns = now_ns
 
         return total
diff --git a/src/s3ql/fsck.py b/src/s3ql/fsck.py
index b46ae59..1235da0 100644
--- a/src/s3ql/fsck.py
+++ b/src/s3ql/fsck.py
@@ -13,14 +13,13 @@ from .backends.comprenc import ComprencBackend
 from .backends.local import Backend as LocalBackend
 from .common import (inode_for_path, sha256_fh, get_path, get_backend_cachedir,
                      get_seq_no, is_mounted, get_backend, load_params,
-                     freeze_basic_mapping)
+                     save_params, time_ns)
 from .database import NoSuchRowError, Connection
 from .metadata import create_tables, dump_and_upload_metadata, download_metadata
 from .parse_args import ArgumentParser
 from os.path import basename
 import apsw
 import os
-import hashlib
 import re
 import shutil
 import itertools
@@ -218,7 +217,7 @@ class Fsck(object):
 
         log.info('Checking lost+found...')
 
-        timestamp = time.time()
+        now_ns = time_ns()
         try:
             (inode_l, name_id) = self.conn.get_row("SELECT inode, name_id FROM contents_v "
                                                    "WHERE name=? AND parent_inode=?", (b"lost+found", ROOT_INODE))
@@ -227,7 +226,7 @@ class Fsck(object):
             self.found_errors = True
             self.log_error("Recreating missing lost+found directory")
             inode_l = self.create_inode(mode=stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
-                                        atime=timestamp, ctime=timestamp, mtime=timestamp,
+                                        atime_ns=now_ns, ctime_ns=now_ns, mtime_ns=now_ns,
                                         refcount=1)
             self.conn.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)",
                               (self._add_name(b"lost+found"), inode_l, ROOT_INODE))
@@ -241,7 +240,7 @@ class Fsck(object):
             # We leave the old inode unassociated, so that it will be added
             # to lost+found later on.
             inode_l = self.create_inode(mode=stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
-                                        atime=timestamp, ctime=timestamp, mtime=timestamp,
+                                        atime_ns=now_ns, ctime_ns=now_ns, mtime_ns=now_ns,
                                         refcount=1)
             self.conn.execute('UPDATE contents SET inode=? WHERE name_id=? AND parent_inode=?',
                               (inode_l, name_id, ROOT_INODE))
@@ -623,10 +622,10 @@ class Fsck(object):
             (id_p, name) = self.resolve_free(b"/lost+found", ("block-%d" % id_).encode())
             self.log_error("Block %d not referenced, adding as /lost+found/%s", id_, to_str(name))
-            timestamp = time.time()
+            now_ns = time_ns()
             size = self.conn.get_val('SELECT size FROM blocks WHERE id=?', (id_,))
             inode = self.create_inode(mode=stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
-                                      mtime=timestamp, atime=timestamp, ctime=timestamp,
+                                      mtime_ns=now_ns, atime_ns=now_ns, ctime_ns=now_ns,
                                       refcount=1, size=size)
             self.conn.execute('INSERT INTO inode_blocks (inode, blockno, block_id) VALUES(?,?,?)',
                               (inode, 0, id_))
@@ -667,13 +666,13 @@ class Fsck(object):
             self.conn.execute('DELETE FROM objects WHERE id=?', (obj_id,))
 
     def create_inode(self, mode, uid=os.getuid(), gid=os.getgid(),
-                     mtime=None, atime=None, ctime=None, refcount=None,
+                     mtime_ns=None, atime_ns=None, ctime_ns=None, refcount=None,
                      size=0):
         '''Create inode'''
 
-        id_ = self.conn.rowid('INSERT INTO inodes (mode,uid,gid,mtime,atime,ctime,'
+        id_ = self.conn.rowid('INSERT INTO inodes (mode,uid,gid,mtime_ns,atime_ns,ctime_ns,'
                               'refcount,size) VALUES (?,?,?,?,?,?,?,?)',
-                              (mode, uid, gid, mtime, atime, ctime, refcount, size))
+                              (mode, uid, gid, mtime_ns, atime_ns, ctime_ns, refcount, size))
 
         return id_
@@ -1051,7 +1050,7 @@ class ROFsck(Fsck):
 
         db = Connection(path + '.db')
         db.execute('PRAGMA journal_mode = WAL')
 
-        param = load_params(path + '.params')
+        param = load_params(path)
         super().__init__(None, None, param, db)
 
     def check(self):
@@ -1165,7 +1164,7 @@ def main(args=None):
 
     if os.path.exists(cachepath + '.params'):
         assert os.path.exists(cachepath + '.db')
-        param = load_params(cachepath + '.params')
+        param = load_params(cachepath)
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
             param = backend.lookup('s3ql_metadata')
@@ -1262,8 +1261,7 @@ def main(args=None):
         param['seq_no'] += 1
         param['needs_fsck'] = True
         backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
-        with open(cachepath + '.params', 'wb') as fh:
-            fh.write(freeze_basic_mapping(param))
+        save_params(cachepath, param)
 
     fsck = Fsck(cachepath + '-cache', backend, param, db)
     fsck.check()
@@ -1289,8 +1287,7 @@ def main(args=None):
     param['last-modified'] = time.time()
     dump_and_upload_metadata(backend, db, param)
-    with open(cachepath + '.params', 'wb') as fh:
-        fh.write(freeze_basic_mapping(param))
+    save_params(cachepath, param)
 
     log.info('Cleaning up local metadata...')
     db.execute('ANALYZE')
@@ -1328,9 +1325,9 @@ def renumber_inodes(db):
                (CTRL_INODE,))
 
     log.info('..inodes..')
-    db.execute('INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev) '
+    db.execute('INSERT INTO inodes (id,mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount,size,locked,rdev) '
               'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = inodes_old.id), '
-              '       mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev FROM inodes_old')
+              '       mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount,size,locked,rdev FROM inodes_old')
 
     log.info('..inode_blocks..')
     db.execute('INSERT INTO inode_blocks (inode, blockno, block_id) '
diff --git a/src/s3ql/inode_cache.py b/src/s3ql/inode_cache.py
index fda0491..77c8985 100644
--- a/src/s3ql/inode_cache.py
+++ b/src/s3ql/inode_cache.py
@@ -9,15 +9,16 @@ This work can be distributed under the terms of the GNU GPLv3.
 from .logging import logging # Ensure use of custom logger class
 from .database import NoSuchRowError
 import llfuse
+import sys
 
 log = logging.getLogger(__name__)
 
 CACHE_SIZE = 100
 ATTRIBUTES = ('mode', 'refcount', 'uid', 'gid', 'size', 'locked',
-              'rdev', 'atime', 'mtime', 'ctime', 'id')
+              'rdev', 'atime_ns', 'mtime_ns', 'ctime_ns', 'id')
 ATTRIBUTE_STR = ', '.join(ATTRIBUTES)
 UPDATE_ATTRS = ('mode', 'refcount', 'uid', 'gid', 'size', 'locked',
-                'rdev', 'atime', 'mtime', 'ctime')
+                'rdev', 'atime_ns', 'mtime_ns', 'ctime_ns')
 UPDATE_STR = ', '.join('%s=?' % x for x in UPDATE_ATTRS)
 
 MAX_INODE = 2 ** 32 - 1
@@ -52,9 +53,9 @@ class _Inode:
         attr.st_gid = self.gid
         attr.st_size = self.size
         attr.st_rdev = self.rdev
-        attr.st_atime_ns = int(self.atime*1e9+0.5)
-        attr.st_mtime_ns = int(self.mtime*1e9+0.5)
-        attr.st_ctime_ns = int(self.ctime*1e9+0.5)
+        attr.st_atime_ns = self.atime_ns
+        attr.st_mtime_ns = self.mtime_ns
+        attr.st_ctime_ns = self.ctime_ns
         attr.generation = self.generation
 
         return attr
@@ -81,6 +82,18 @@ class _Inode:
             object.__setattr__(self, 'dirty', True)
         object.__setattr__(self, name, value)
 
+    def __del__(self):
+        if not self.dirty:
+            return
+
+        # Force execution of sys.excepthook (exceptions raised
+        # by __del__ are ignored)
+        try:
+            raise RuntimeError('BUG ALERT: Dirty inode was destroyed!')
+        except RuntimeError:
+            exc_info = sys.exc_info()
+
+        sys.excepthook(*exc_info)
 
 class InodeCache(object):
     '''
@@ -136,10 +149,9 @@ class InodeCache(object):
     def __delitem__(self, inode):
         if self.db.execute('DELETE FROM inodes WHERE id=?', (inode,)) != 1:
             raise KeyError('No such inode')
-        try:
-            del self.attrs[inode]
-        except KeyError:
-            pass
+        inode = self.attrs.pop(inode, None)
+        if inode is not None:
+            inode.dirty = False
 
     def __getitem__(self, id_):
         try:
@@ -239,8 +251,17 @@ class InodeCache(object):
                 self.setattr(inode)
 
     def __del__(self):
-        if len(self.attrs) > 0:
+        if len(self.attrs) == 0:
+            return
+
+        # Force execution of sys.excepthook (exceptions raised
+        # by __del__ are ignored)
+        try:
             raise RuntimeError('InodeCache instance was destroyed without calling destroy()')
+        except RuntimeError:
+            exc_info = sys.exc_info()
+
+        sys.excepthook(*exc_info)
diff --git a/src/s3ql/metadata.py b/src/s3ql/metadata.py
index 498721b..7e0d2f2 100644
--- a/src/s3ql/metadata.py
+++ b/src/s3ql/metadata.py
@@ -10,7 +10,7 @@ from .logging import logging # Ensure use of custom logger class
 from .database import Connection
 from . import BUFSIZE
 from .common import pretty_print_size
-from .deltadump import INTEGER, BLOB, TIME, dump_table, load_table
+from .deltadump import INTEGER, BLOB, dump_table, load_table
 from .backends.common import NoSuchObject, CorruptedObjectError
 import os
 import tempfile
@@ -35,9 +35,9 @@ DUMP_SPEC = [
              ('uid', INTEGER),
              ('gid', INTEGER),
              ('mode', INTEGER),
-             ('mtime', TIME),
-             ('atime', TIME),
-             ('ctime', TIME),
+             ('mtime_ns', INTEGER),
+             ('atime_ns', INTEGER),
+             ('ctime_ns', INTEGER),
              ('size', INTEGER),
              ('rdev', INTEGER),
              ('locked', INTEGER),
@@ -78,8 +78,6 @@ def restore_metadata(fh, dbfile):
     *dbfile* will be created with 0600 permissions. Data is
     first written into a temporary file *dbfile* + '.tmp', and
     the file is renamed once all data has been loaded.
-
-
     '''
 
     tmpfile = dbfile + '.tmp'
@@ -195,9 +193,9 @@ def create_tables(conn):
         uid       INT NOT NULL,
         gid       INT NOT NULL,
         mode      INT NOT NULL,
-        mtime     REAL NOT NULL,
-        atime     REAL NOT NULL,
-        ctime     REAL NOT NULL,
+        mtime_ns  INT NOT NULL,
+        atime_ns  INT NOT NULL,
+        ctime_ns  INT NOT NULL,
         refcount  INT NOT NULL,
         size      INT NOT NULL DEFAULT 0,
         rdev      INT NOT NULL DEFAULT 0,
diff --git a/src/s3ql/mkfs.py b/src/s3ql/mkfs.py
index 3281106..b6da7ce 100644
--- a/src/s3ql/mkfs.py
+++ b/src/s3ql/mkfs.py
@@ -11,7 +11,7 @@ from . import CURRENT_FS_REV, CTRL_INODE, ROOT_INODE
 from .backends.comprenc import ComprencBackend
 from .backends import s3
 from .common import (get_backend_cachedir, get_backend, split_by_n,
-                     freeze_basic_mapping)
+                     freeze_basic_mapping, time_ns)
 from .database import Connection
 from .metadata import dump_and_upload_metadata, create_tables
 from .parse_args import ArgumentParser
@@ -57,24 +57,24 @@ def parse_args(args):
 
 def init_tables(conn):
     # Insert root directory
-    timestamp = time.time()
-    conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount) "
+    now_ns = time_ns()
+    conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount) "
                  "VALUES (?,?,?,?,?,?,?,?)",
                  (ROOT_INODE, stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
                   | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH,
-                  os.getuid(), os.getgid(), timestamp, timestamp, timestamp, 1))
+                  os.getuid(), os.getgid(), now_ns, now_ns, now_ns, 1))
 
     # Insert control inode, the actual values don't matter that much
-    conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount) "
+    conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount) "
                  "VALUES (?,?,?,?,?,?,?,?)",
                  (CTRL_INODE, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR,
-                  0, 0, timestamp, timestamp, timestamp, 42))
+                  0, 0, now_ns, now_ns, now_ns, 42))
 
     # Insert lost+found directory
-    inode = conn.rowid("INSERT INTO inodes (mode,uid,gid,mtime,atime,ctime,refcount) "
+    inode = conn.rowid("INSERT INTO inodes (mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount) "
                        "VALUES (?,?,?,?,?,?,?)",
                        (stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
-                        os.getuid(), os.getgid(), timestamp, timestamp, timestamp, 1))
+                        os.getuid(), os.getgid(), now_ns, now_ns, now_ns, 1))
     name_id = conn.rowid('INSERT INTO names (name, refcount) VALUES(?,?)',
                          (b'lost+found', 1))
     conn.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)",
diff --git a/src/s3ql/mount.py b/src/s3ql/mount.py
index 7a3ed61..ffb9b2f 100644
--- a/src/s3ql/mount.py
+++ b/src/s3ql/mount.py
@@ -11,7 +11,7 @@ from . import fs, CURRENT_FS_REV
 from .backends.pool import BackendPool
 from .block_cache import BlockCache
 from .common import (get_backend_cachedir, get_seq_no, get_backend_factory,
-                     load_params, freeze_basic_mapping)
+                     load_params, save_params)
 from .daemonize import daemonize
 from .database import Connection
 from .inode_cache import InodeCache
@@ -170,10 +170,7 @@ def main(args=None):
     unmount_clean = False
     def unmount():
         log.info("Unmounting file system...")
-        # Acquire lock so that Operations.destroy() is called with the
-        # global lock like all other handlers
-        with llfuse.lock:
-            llfuse.close(unmount=unmount_clean)
+        llfuse.close(unmount=unmount_clean)
     cm.callback(unmount)
 
     if options.fg:
@@ -244,13 +241,11 @@ def main(args=None):
             log.info('File system unchanged, not uploading metadata.')
             del backend['s3ql_seq_no_%d' % param['seq_no']]
             param['seq_no'] -= 1
-            with open(cachepath + '.params', 'wb') as fh:
-                fh.write(freeze_basic_mapping(param))
+            save_params(cachepath, param)
         elif seq_no == param['seq_no']:
             param['last-modified'] = time.time()
             dump_and_upload_metadata(backend, db, param)
-            with open(cachepath + '.params', 'wb') as fh:
-                fh.write(freeze_basic_mapping(param))
+            save_params(cachepath, param)
         else:
             log.error('Remote metadata is newer than local (%d vs %d), '
                       'refusing to overwrite!', seq_no, param['seq_no'])
@@ -365,7 +360,7 @@ def get_metadata(backend, cachepath):
     # Check for cached metadata
     db = None
     if os.path.exists(cachepath + '.params'):
-        param = load_params(cachepath + '.params')
+        param = load_params(cachepath)
         if param['seq_no'] < seq_no:
             log.info('Ignoring locally cached metadata (outdated).')
             param = backend.lookup('s3ql_metadata')
@@ -419,9 +414,7 @@ def get_metadata(backend, cachepath):
     # Download metadata
     if not db:
         db = download_metadata(backend, cachepath + '.db')
-
-    with open(cachepath + '.params', 'wb') as fh:
-        fh.write(freeze_basic_mapping(param))
+    save_params(cachepath, param)
 
     return (param, db)
@@ -430,14 +423,7 @@ def mark_metadata_dirty(backend, cachepath, param):
     param['seq_no'] += 1
     param['needs_fsck'] = True
-    with open(cachepath + '.params', 'wb') as fh:
-        fh.write(freeze_basic_mapping(param))
-
-    # Fsync to make sure that the updated sequence number is committed to
-    # disk. Otherwise, a crash immediately after mount could result in both
-    # the local and remote metadata appearing to be out of date.
-    fh.flush()
-    os.fsync(fh.fileno())
+    save_params(cachepath, param)
 
     backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
     param['needs_fsck'] = False
diff --git a/src/s3ql/statfs.py b/src/s3ql/statfs.py
index 0a9e455..533c68a 100644
--- a/src/s3ql/statfs.py
+++ b/src/s3ql/statfs.py
@@ -30,6 +30,9 @@ def parse_args(args):
                         type=(lambda x: x.rstrip('/')),
                         help='Mount point of the file system to examine')
 
+    parser.add_argument("--raw", action="store_true", default=False,
+                        help="Do not pretty-print numbers")
+
     return parser.parse_args(args)
 
 def main(args=None):
@@ -41,29 +44,36 @@ def main(args=None):
     options = parse_args(args)
    setup_logging(options)
 
+    if options.raw:
+        pprint = lambda x: '%d bytes' % x
+    else:
+        pprint = pretty_print_size
+
     ctrlfile = assert_fs_owner(options.mountpoint, mountpoint=True)
 
     # Use a decent sized buffer, otherwise the statistics have to be
-    # calculated thee(!) times because we need to invoce getxattr
+    # calculated three(!) times because we need to invoke getxattr
     # three times.
     buf = llfuse.getxattr(ctrlfile, 's3qlstat', size_guess=256)
 
     (entries, blocks, inodes, fs_size, dedup_size,
-     compr_size, db_size, cache_used, cache_dirty) = struct.unpack('QQQQQQQQQ', buf)
+     compr_size, db_size, cache_cnt, cache_size, dirty_cnt,
+     dirty_size, removal_cnt) = struct.unpack('QQQQQQQQQQQQ', buf)
 
     p_dedup = dedup_size * 100 / fs_size if fs_size else 0
     p_compr_1 = compr_size * 100 / fs_size if fs_size else 0
     p_compr_2 = compr_size * 100 / dedup_size if dedup_size else 0
 
     print ('Directory entries:    %d' % entries,
            'Inodes:               %d' % inodes,
            'Data blocks:          %d' % blocks,
-           'Total data size:      %s' % pretty_print_size(fs_size),
+           'Total data size:      %s' % pprint(fs_size),
            'After de-duplication: %s (%.2f%% of total)'
-             % (pretty_print_size(dedup_size), p_dedup),
+             % (pprint(dedup_size), p_dedup),
            'After compression:    %s (%.2f%% of total, %.2f%% of de-duplicated)'
-             % (pretty_print_size(compr_size), p_compr_1, p_compr_2),
-           'Database size:        %s (uncompressed)' % pretty_print_size(db_size),
-           'Cache usage:          %s (dirty: %s)' % (pretty_print_size(cache_used),
-                                                     pretty_print_size(cache_dirty)),
+             % (pprint(compr_size), p_compr_1, p_compr_2),
+           'Database size:        %s (uncompressed)' % pprint(db_size),
+           'Cache size:           %s, %d entries' % (pprint(cache_size), cache_cnt),
+           'Cache size (dirty):   %s, %d entries' % (pprint(dirty_size), dirty_cnt),
+           'Queued object removals: %d' % (removal_cnt,),
            sep='\n')
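With the enlarged get_usage() tuple, the 's3qlstat' extended attribute now carries twelve unsigned 64-bit fields instead of nine. A sketch of a client-side reader, mirroring the unpacking in statfs.py above; the path used here is a hypothetical stand-in for the control file that s3ql's assert_fs_owner() resolves:

    # Sketch only: reads the 's3qlstat' attribute of a mounted S3QL file
    # system. '/mnt/s3ql/ctrlfile' is a made-up path; use the control file
    # that assert_fs_owner() returns for the mount point.
    import struct
    import llfuse

    ctrlfile = '/mnt/s3ql/ctrlfile'  # hypothetical
    buf = llfuse.getxattr(ctrlfile, 's3qlstat', size_guess=256)
    (entries, blocks, inodes, fs_size, dedup_size, compr_size, db_size,
     cache_cnt, cache_size, dirty_cnt, dirty_size, removal_cnt) = \
        struct.unpack('QQQQQQQQQQQQ', buf)
    print('dirty cache: %d bytes in %d entries, %d removals queued'
          % (dirty_size, dirty_cnt, removal_cnt))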
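Both block_cache.py and inode_cache.py adopt the same destructor idiom in this patch: an exception raised inside __del__ is swallowed by the interpreter (only printed as a warning), so the error is raised, caught, and handed to sys.excepthook explicitly, where a custom hook can still record the bug. A self-contained sketch of the pattern, with names invented for illustration:

    # Sketch only; Resource and close() are hypothetical.
    import sys

    class Resource:
        def __init__(self):
            self.closed = False

        def close(self):
            self.closed = True

        def __del__(self):
            if self.closed:
                return
            # An exception escaping __del__ would be ignored, so raise it,
            # capture it, and route it through sys.excepthook, where a
            # custom hook can log the problem or abort the process.
            try:
                raise RuntimeError('Resource was destroyed without close()!')
            except RuntimeError:
                exc_info = sys.exc_info()
            sys.excepthook(*exc_info)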