summaryrefslogtreecommitdiff
path: root/src/s3ql
diff options
context:
space:
mode:
authorNikolaus Rath <Nikolaus@rath.org>2016-03-09 10:36:48 -0800
committerNikolaus Rath <Nikolaus@rath.org>2016-03-09 10:36:48 -0800
commit3134cd3962e1a0d52a74a5ba8be8b8f74b4898e4 (patch)
tree3eee070d1db8d3311dbcb221e9fdc03cade34425 /src/s3ql
parentee66f49d060aae35cd189b540bd556876fe746c6 (diff)
Import s3ql_2.17+dfsg.orig.tar.gz
Diffstat (limited to 'src/s3ql')
-rw-r--r--src/s3ql/__init__.py9
-rw-r--r--src/s3ql/adm.py80
-rw-r--r--src/s3ql/backends/s3.py24
-rw-r--r--src/s3ql/block_cache.py39
-rw-r--r--src/s3ql/common.py26
-rw-r--r--src/s3ql/ctrl.py5
-rw-r--r--src/s3ql/endian_indep.h5
-rw-r--r--src/s3ql/fs.py111
-rw-r--r--src/s3ql/fsck.py33
-rw-r--r--src/s3ql/inode_cache.py41
-rw-r--r--src/s3ql/metadata.py16
-rw-r--r--src/s3ql/mkfs.py16
-rw-r--r--src/s3ql/mount.py28
-rw-r--r--src/s3ql/statfs.py26
14 files changed, 284 insertions, 175 deletions
diff --git a/src/s3ql/__init__.py b/src/s3ql/__init__.py
index fabc193..5e17dab 100644
--- a/src/s3ql/__init__.py
+++ b/src/s3ql/__init__.py
@@ -22,12 +22,13 @@ __all__ = [ 'adm', 'backends', 'block_cache', 'common', 'calc_mro',
'REV_VER_MAP', 'RELEASE', 'BUFSIZE',
'CTRL_NAME', 'CTRL_INODE' ]
-VERSION = '2.16'
+VERSION = '2.17'
RELEASE = '%s' % VERSION
-# TODO: On next upgrade, remove pickle support from
-# common.py:load_params().
-CURRENT_FS_REV = 22
+# TODO: On next revision bump, consider removing support for TIME
+# values from deltadump.pyx. It is only present to allow upgrades
+# from revisions <= 22.
+CURRENT_FS_REV = 23
# Buffer size when writing objects
BUFSIZE = 64 * 1024
diff --git a/src/s3ql/adm.py b/src/s3ql/adm.py
index 2025e5e..b0dd334 100644
--- a/src/s3ql/adm.py
+++ b/src/s3ql/adm.py
@@ -10,14 +10,18 @@ from .logging import logging, QuietError, setup_logging
from . import CURRENT_FS_REV, REV_VER_MAP
from .backends.comprenc import ComprencBackend
from .database import Connection
+from .deltadump import TIME, INTEGER
from .common import (get_backend_cachedir, get_seq_no, is_mounted, get_backend,
- freeze_basic_mapping, load_params)
+ load_params, save_params)
from .metadata import dump_and_upload_metadata, download_metadata
+from . import metadata
from .parse_args import ArgumentParser
from datetime import datetime as Datetime
from getpass import getpass
+from contextlib import contextmanager
import os
import shutil
+import functools
import sys
import textwrap
import time
@@ -139,8 +143,7 @@ def download_metadata_cmd(backend, storage_url):
# downloaded backup
seq_nos = [ int(x[len('s3ql_seq_no_'):]) for x in backend.list('s3ql_seq_no_') ]
param['seq_no'] = max(seq_nos) + 1
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
+ save_params(cachepath, param)
def change_passphrase(backend):
'''Change file system passphrase'''
@@ -199,7 +202,7 @@ def get_old_rev_msg(rev, prog):
$ wget http://s3ql.googlecode.com/files/s3ql-%(version)s.tar.bz2 \
|| wget https://bitbucket.org/nikratio/s3ql/downloads/s3ql-%(version)s.tar.bz2
$ tar xjf s3ql-%(version)s.tar.bz2
- $ (cd s3ql-%(version)s; ./setup.py build_ext)
+ $ (cd s3ql-%(version)s; ./setup.py build_ext --inplace)
$ s3ql-%(version)s/bin/%(prog)s <options>
''' % { 'version': REV_VER_MAP[rev],
'prog': prog })
@@ -213,7 +216,7 @@ def upgrade(backend, cachepath):
db = None
seq_no = get_seq_no(backend)
if os.path.exists(cachepath + '.params'):
- param = load_params(cachepath + '.params')
+ param = load_params(cachepath)
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
param = backend.lookup('s3ql_metadata')
@@ -247,10 +250,7 @@ def upgrade(backend, cachepath):
raise QuietError()
# Check revision
- # Upgrade from 21 to 22 is only possible with release 2.13,
- # because we removed support for reading the old storage object
- # format after 2.13.
- if param['revision'] == 21 or param['revision'] < CURRENT_FS_REV-1:
+ if param['revision'] < CURRENT_FS_REV-1:
print(textwrap.dedent('''
File system revision too old to upgrade!
@@ -283,8 +283,8 @@ def upgrade(backend, cachepath):
raise QuietError()
if not db:
- # Need to download metadata
- db = download_metadata(backend, cachepath + '.db')
+ with monkeypatch_metadata_retrieval():
+ db = download_metadata(backend, cachepath + '.db')
log.info('Upgrading from revision %d to %d...', param['revision'], CURRENT_FS_REV)
@@ -292,14 +292,17 @@ def upgrade(backend, cachepath):
param['last-modified'] = time.time()
param['seq_no'] += 1
- # Upgrade code goes here
+ # Upgrade
+ db.execute('ALTER TABLE inodes ADD COLUMN atime_ns INT NOT NULL DEFAULT 0')
+ db.execute('ALTER TABLE inodes ADD COLUMN mtime_ns INT NOT NULL DEFAULT 0')
+ db.execute('ALTER TABLE inodes ADD COLUMN ctime_ns INT NOT NULL DEFAULT 0')
+ db.execute('UPDATE inodes SET atime_ns = atime * 1e9')
+ db.execute('UPDATE inodes SET mtime_ns = mtime * 1e9')
+ db.execute('UPDATE inodes SET ctime_ns = ctime * 1e9')
dump_and_upload_metadata(backend, db, param)
-
backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
-
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
+ save_params(cachepath, param)
log.info('Cleaning up local metadata...')
db.execute('ANALYZE')
@@ -307,5 +310,50 @@ def upgrade(backend, cachepath):
print('File system upgrade complete.')
+
+@contextmanager
+def monkeypatch_metadata_retrieval():
+ DUMP_SPEC_bak = metadata.DUMP_SPEC
+ create_tables_bak = metadata.create_tables
+
+ @functools.wraps(metadata.create_tables)
+ def create_tables(conn):
+ create_tables_bak(conn)
+ conn.execute('DROP TABLE inodes')
+ conn.execute("""
+ CREATE TABLE inodes (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ uid INT NOT NULL,
+ gid INT NOT NULL,
+ mode INT NOT NULL,
+ mtime REAL NOT NULL,
+ atime REAL NOT NULL,
+ ctime REAL NOT NULL,
+ refcount INT NOT NULL,
+ size INT NOT NULL DEFAULT 0,
+ rdev INT NOT NULL DEFAULT 0,
+ locked BOOLEAN NOT NULL DEFAULT 0
+ )""")
+ metadata.create_tables = create_tables
+
+ assert metadata.DUMP_SPEC[2][0] == 'inodes'
+ metadata.DUMP_SPEC[2] = ('inodes', 'id', (('id', INTEGER, 1),
+ ('uid', INTEGER),
+ ('gid', INTEGER),
+ ('mode', INTEGER),
+ ('mtime', TIME),
+ ('atime', TIME),
+ ('ctime', TIME),
+ ('size', INTEGER),
+ ('rdev', INTEGER),
+ ('locked', INTEGER),
+ ('refcount', INTEGER)))
+
+ try:
+ yield
+ finally:
+ metadata.DUMP_SPEC = DUMP_SPEC_bak
+ metadata.create_tables = create_tables_bak
+
if __name__ == '__main__':
main(sys.argv[1:])
diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py
index 1a20228..a1e6d20 100644
--- a/src/s3ql/backends/s3.py
+++ b/src/s3ql/backends/s3.py
@@ -32,7 +32,7 @@ class Backend(s3c.Backend):
may or may not be available and can be queried for with instance methods.
"""
- known_options = ((s3c.Backend.known_options | { 'sse', 'rrs' })
+ known_options = ((s3c.Backend.known_options | { 'sse', 'rrs', 'ia' })
- {'dumb-copy', 'disable-expect100'})
def __init__(self, storage_url, login, password, options):
@@ -83,23 +83,29 @@ class Backend(s3c.Backend):
finally:
keys[:MAX_KEYS] = tmp
+ def _set_storage_options(self, headers):
+ if 'sse' in self.options:
+ headers['x-amz-server-side-encryption'] = 'AES256'
+
+ if 'ia' in self.options:
+ sc = 'STANDARD_IA'
+ elif 'rrs' in self.options:
+ sc = 'REDUCED_REDUNDANCY'
+ else:
+ sc = 'STANDARD'
+ headers['x-amz-storage-class'] = sc
+
@copy_ancestor_docstring
def copy(self, src, dest, metadata=None):
extra_headers = {}
- if 'sse' in self.options:
- extra_headers['x-amz-server-side-encryption'] = 'AES256'
- if 'rrs' in self.options:
- extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY'
+ self._set_storage_options(extra_headers)
return super().copy(src, dest, metadata=metadata,
extra_headers=extra_headers)
@copy_ancestor_docstring
def open_write(self, key, metadata=None, is_compressed=False):
extra_headers = {}
- if 'sse' in self.options:
- extra_headers['x-amz-server-side-encryption'] = 'AES256'
- if 'rrs' in self.options:
- extra_headers['x-amz-storage-class'] = 'REDUCED_REDUNDANCY'
+ self._set_storage_options(extra_headers)
return super().open_write(key, metadata=metadata, is_compressed=is_compressed,
extra_headers=extra_headers)
diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py
index f258f3c..c5dc566 100644
--- a/src/s3ql/block_cache.py
+++ b/src/s3ql/block_cache.py
@@ -19,6 +19,7 @@ import hashlib
import shutil
import threading
import time
+import sys
# standard logger for this module
log = logging.getLogger(__name__)
@@ -930,19 +931,45 @@ class BlockCache(object):
log.debug('finished')
def get_usage(self):
- '''Return cache size and dirty cache size
+ '''Get cache usage information.
+
+ Return a tuple of
+
+ * cache entries
+ * cache size
+ * dirty cache entries
+ * dirty cache size
+ * pending removals
This method is O(n) in the number of cache entries.
'''
used = self.cache.size
- dirty = 0
+ dirty_size = 0
+ dirty_cnt = 0
for el in self.cache.values():
if el.dirty:
- dirty += el.size
+ dirty_size += el.size
+ dirty_cnt += 1
+
+ if self.to_remove is None:
+ remove_cnt = 0
+ else:
+ # This is an estimate which may be negative
+ remove_cnt = max(0, self.to_remove.qsize())
- return (used, dirty)
+ return (len(self.cache), used, dirty_cnt, dirty_size, remove_cnt)
def __del__(self):
- if len(self.cache) > 0:
- raise RuntimeError("BlockManager instance was destroyed without calling destroy()!")
+ if len(self.cache) == 0:
+ return
+
+ # Force execution of sys.excepthook (exceptions raised
+ # by __del__ are ignored)
+ try:
+ raise RuntimeError("BlockManager instance was destroyed without "
+ "calling destroy()!")
+ except RuntimeError:
+ exc_info = sys.exc_info()
+
+ sys.excepthook(*exc_info)
diff --git a/src/s3ql/common.py b/src/s3ql/common.py
index c8a3fc1..09216cb 100644
--- a/src/s3ql/common.py
+++ b/src/s3ql/common.py
@@ -20,8 +20,8 @@ import threading
import traceback
import sys
import os
+import time
import subprocess
-import pickle
import errno
import hashlib
import llfuse
@@ -611,11 +611,19 @@ def freeze_basic_mapping(d):
buf = '{ %s }' % ', '.join(els)
return buf.encode('utf-8')
-def load_params(fname):
- with open(fname, 'rb') as fh:
- proto = fh.read(2)
- fh.seek(0)
- if proto == b'\x80\x02':
- return pickle.load(fh)
- else:
- return thaw_basic_mapping(fh.read())
+def load_params(cachepath):
+ with open(cachepath + '.params' , 'rb') as fh:
+ return thaw_basic_mapping(fh.read())
+
+def save_params(cachepath, param):
+ with open(cachepath + '.params', 'wb') as fh:
+ fh.write(freeze_basic_mapping(param))
+
+ # Fsync to make sure that the updated sequence number is committed to
+ # disk. Otherwise, a crash immediately after mount could result in both
+ # the local and remote metadata appearing to be out of date.
+ fh.flush()
+ os.fsync(fh.fileno())
+
+def time_ns():
+ return int(time.time() * 1e9)
diff --git a/src/s3ql/ctrl.py b/src/s3ql/ctrl.py
index 38d4f0d..ad5f4ae 100644
--- a/src/s3ql/ctrl.py
+++ b/src/s3ql/ctrl.py
@@ -89,11 +89,12 @@ def main(args=None):
llfuse.setxattr(ctrlfile, 'upload-meta', b'dummy')
elif options.action == 'log':
- cmd = ('(%r, %r)' % (options.level, ','.join(options.modules))).encode()
+ level = getattr(logging, options.level.upper())
+ cmd = ('(%r, %r)' % (level, ','.join(options.modules))).encode()
llfuse.setxattr(ctrlfile, 'logging', cmd)
elif options.action == 'cachesize':
- llfuse.setxattr(ctrlfile, 'cachesize', ('%d' % options.cachesize * 1024).encode())
+ llfuse.setxattr(ctrlfile, 'cachesize', ('%d' % (options.cachesize * 1024,)).encode())
if __name__ == '__main__':
main(sys.argv[1:])
diff --git a/src/s3ql/endian_indep.h b/src/s3ql/endian_indep.h
index ee8b917..91283fc 100644
--- a/src/s3ql/endian_indep.h
+++ b/src/s3ql/endian_indep.h
@@ -27,7 +27,10 @@ This program can be distributed under the terms of the GNU GPLv3.
#define be64toh(x) OSSwapBigToHostInt64(x)
#define le64toh(x) OSSwapLittleToHostInt64(x)
-#else /* __APPLE__ */
+#elif __FreeBSD__
+#include <sys/endian.h>
+
+#else
#include <endian.h>
#endif
diff --git a/src/s3ql/fs.py b/src/s3ql/fs.py
index f00f35d..916a2a8 100644
--- a/src/s3ql/fs.py
+++ b/src/s3ql/fs.py
@@ -9,7 +9,7 @@ This work can be distributed under the terms of the GNU GPLv3.
from .logging import logging
from . import deltadump, CTRL_NAME, CTRL_INODE
from .backends.common import NoSuchObject, CorruptedObjectError
-from .common import get_path, parse_literal
+from .common import get_path, parse_literal, time_ns
from .database import NoSuchRowError
from .inode_cache import OutOfInodesError
from io import BytesIO
@@ -56,7 +56,6 @@ if not hasattr(errno, 'EOPNOTSUPP'):
else:
ACL_ERRNO = errno.EOPNOTSUPP
-
class Operations(llfuse.Operations):
"""A full-featured file system for online data storage
@@ -168,10 +167,10 @@ class Operations(llfuse.Operations):
def readlink(self, id_, ctx):
log.debug('started with %d', id_)
- timestamp = time.time()
+ now_ns = time_ns()
inode = self.inodes[id_]
- if inode.atime < inode.ctime or inode.atime < inode.mtime:
- inode.atime = timestamp
+ if inode.atime_ns < inode.ctime_ns or inode.atime_ns < inode.mtime_ns:
+ inode.atime_ns = now_ns
try:
return self.db.get_val("SELECT target FROM symlink_targets WHERE inode=?", (id_,))
except NoSuchRowError:
@@ -195,8 +194,8 @@ class Operations(llfuse.Operations):
off = -1
inode = self.inodes[id_]
- if inode.atime < inode.ctime or inode.atime < inode.mtime:
- inode.atime = time.time()
+ if inode.atime_ns < inode.ctime_ns or inode.atime_ns < inode.mtime_ns:
+ inode.atime_ns = time_ns()
with self.db.query("SELECT name_id, name, inode FROM contents_v "
'WHERE parent_inode=? AND name_id > ? ORDER BY name_id',
@@ -277,8 +276,7 @@ class Operations(llfuse.Operations):
elif name == b'logging':
try:
- (lvl, modules)= parse_literal(value, (str, str))
- lvl = logging._levelNames[lvl.upper()]
+ (lvl, modules)= parse_literal(value, (int, str))
except (ValueError, KeyError):
log.warning('Received malformed command via control inode')
raise FUSEError.EINVAL()
@@ -290,6 +288,7 @@ class Operations(llfuse.Operations):
except ValueError:
log.warning('Received malformed command via control inode')
raise FUSEError.EINVAL()
+ log.debug('updated cache size to %d bytes', self.cache.cache.max_size)
else:
log.warning('Received unknown command via control inode')
@@ -309,7 +308,7 @@ class Operations(llfuse.Operations):
self.db.execute('INSERT OR REPLACE INTO ext_attributes (inode, name_id, value) '
'VALUES(?, ?, ?)', (id_, self._add_name(name), value))
- self.inodes[id_].ctime = time.time()
+ self.inodes[id_].ctime_ns = time_ns()
def removexattr(self, id_, name, ctx):
log.debug('started with %d, %r', id_, name)
@@ -327,7 +326,7 @@ class Operations(llfuse.Operations):
if changes == 0:
raise llfuse.FUSEError(llfuse.ENOATTR)
- self.inodes[id_].ctime = time.time()
+ self.inodes[id_].ctime_ns = time_ns()
def lock_tree(self, id0):
'''Lock directory tree'''
@@ -339,7 +338,7 @@ class Operations(llfuse.Operations):
queue = [ id0 ]
self.inodes[id0].locked = True
processed = 0 # Number of steps since last GIL release
- stamp = time.time() # Time of last GIL release
+ stamp = time_ns() # Time of last GIL release
gil_step = 250 # Approx. number of steps between GIL releases
while True:
id_p = queue.pop()
@@ -457,13 +456,13 @@ class Operations(llfuse.Operations):
target_inode = self.inodes[target_id]
except KeyError:
raise FUSEError(errno.ENOENT)
- for attr in ('atime', 'ctime', 'mtime', 'mode', 'uid', 'gid'):
+ for attr in ('atime_ns', 'ctime_ns', 'mtime_ns', 'mode', 'uid', 'gid'):
setattr(target_inode, attr, getattr(src_inode, attr))
# We first replicate into a dummy inode, so that we
# need to invalidate only once.
- timestamp = time.time()
- tmp = make_inode(mtime=timestamp, ctime=timestamp, atime=timestamp,
+ now_ns = time_ns()
+ tmp = make_inode(mtime_ns=now_ns, ctime_ns=now_ns, atime_ns=now_ns,
uid=0, gid=0, mode=0, refcount=0)
queue = [ (src_id, tmp.id, 0) ]
@@ -484,8 +483,8 @@ class Operations(llfuse.Operations):
try:
inode_new = make_inode(refcount=1, mode=inode.mode, size=inode.size,
uid=inode.uid, gid=inode.gid,
- mtime=inode.mtime, atime=inode.atime,
- ctime=inode.ctime, rdev=inode.rdev)
+ mtime_ns=inode.mtime_ns, atime_ns=inode.atime_ns,
+ ctime_ns=inode.ctime_ns, rdev=inode.rdev)
except OutOfInodesError:
log.warning('Could not find a free inode')
raise FUSEError(errno.ENOSPC)
@@ -591,7 +590,7 @@ class Operations(llfuse.Operations):
log.debug('started with %d, %r', id_p, name)
- timestamp = time.time()
+ now_ns = time_ns()
# Check that there are no child entries
if self.db.has_val("SELECT 1 FROM contents WHERE parent_inode=?", (id_,)):
@@ -608,11 +607,11 @@ class Operations(llfuse.Operations):
inode = self.inodes[id_]
inode.refcount -= 1
- inode.ctime = timestamp
+ inode.ctime_ns = now_ns
inode_p = self.inodes[id_p]
- inode_p.mtime = timestamp
- inode_p.ctime = timestamp
+ inode_p.mtime_ns = now_ns
+ inode_p.ctime_ns = now_ns
if inode.refcount == 0 and id_ not in self.open_inodes:
log.debug('removing from cache')
@@ -717,7 +716,7 @@ class Operations(llfuse.Operations):
return name_id
def _rename(self, id_p_old, name_old, id_p_new, name_new):
- timestamp = time.time()
+ now_ns = time_ns()
name_id_new = self._add_name(name_new)
name_id_old = self._del_name(name_old)
@@ -728,15 +727,15 @@ class Operations(llfuse.Operations):
inode_p_old = self.inodes[id_p_old]
inode_p_new = self.inodes[id_p_new]
- inode_p_old.mtime = timestamp
- inode_p_new.mtime = timestamp
- inode_p_old.ctime = timestamp
- inode_p_new.ctime = timestamp
+ inode_p_old.mtime_ns = now_ns
+ inode_p_new.mtime_ns = now_ns
+ inode_p_old.ctime_ns = now_ns
+ inode_p_new.ctime_ns = now_ns
def _replace(self, id_p_old, name_old, id_p_new, name_new,
id_old, id_new):
- timestamp = time.time()
+ now_ns = time_ns()
if self.db.has_val("SELECT 1 FROM contents WHERE parent_inode=?", (id_new,)):
log.info("Attempted to overwrite entry with children: %s",
@@ -755,15 +754,15 @@ class Operations(llfuse.Operations):
inode_new = self.inodes[id_new]
inode_new.refcount -= 1
- inode_new.ctime = timestamp
+ inode_new.ctime_ns = now_ns
inode_p_old = self.inodes[id_p_old]
- inode_p_old.ctime = timestamp
- inode_p_old.mtime = timestamp
+ inode_p_old.ctime_ns = now_ns
+ inode_p_old.mtime_ns = now_ns
inode_p_new = self.inodes[id_p_new]
- inode_p_new.ctime = timestamp
- inode_p_new.mtime = timestamp
+ inode_p_new.ctime_ns = now_ns
+ inode_p_new.mtime_ns = now_ns
if inode_new.refcount == 0 and id_new not in self.open_inodes:
self.cache.remove(id_new, 0,
@@ -787,7 +786,7 @@ class Operations(llfuse.Operations):
get_path(new_id_p, self.db, new_name))
raise llfuse.FUSEError(errno.EACCES)
- timestamp = time.time()
+ now_ns = time_ns()
inode_p = self.inodes[new_id_p]
if inode_p.refcount == 0:
@@ -798,14 +797,14 @@ class Operations(llfuse.Operations):
if self.failsafe or inode_p.locked:
raise FUSEError(errno.EPERM)
- inode_p.ctime = timestamp
- inode_p.mtime = timestamp
+ inode_p.ctime_ns = now_ns
+ inode_p.mtime_ns = now_ns
self.db.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)",
(self._add_name(new_name), id_, new_id_p))
inode = self.inodes[id_]
inode.refcount += 1
- inode.ctime = timestamp
+ inode.ctime_ns = now_ns
self.open_inodes[inode.id] += 1
return inode.entry_attributes()
@@ -815,7 +814,7 @@ class Operations(llfuse.Operations):
inode = self.inodes[id_]
if fh is not None:
assert fh == id_
- timestamp = time.time()
+ now_ns = time_ns()
if self.failsafe or inode.locked:
raise FUSEError(errno.EPERM)
@@ -861,12 +860,12 @@ class Operations(llfuse.Operations):
inode.gid = attr.st_gid
if fields.update_atime:
- inode.atime = attr.st_atime_ns / 1e9
+ inode.atime_ns = attr.st_atime_ns
if fields.update_mtime:
- inode.mtime = attr.st_mtime_ns / 1e9
+ inode.mtime_ns = attr.st_mtime_ns
- inode.ctime = timestamp
+ inode.ctime_ns = now_ns
return inode.entry_attributes()
@@ -900,11 +899,9 @@ class Operations(llfuse.Operations):
fs_size = self.db.get_val('SELECT SUM(size) FROM inodes') or 0
dedup_size = self.db.get_val('SELECT SUM(size) FROM blocks') or 0
compr_size = self.db.get_val('SELECT SUM(size) FROM objects') or 0
- (cache_used, cache_dirty) = self.cache.get_usage()
-
- return struct.pack('QQQQQQQQQ', entries, blocks, inodes, fs_size, dedup_size,
- compr_size, self.db.get_size(), cache_used, cache_dirty)
+ return struct.pack('QQQQQQQQQQQQ', entries, blocks, inodes, fs_size, dedup_size,
+ compr_size, self.db.get_size(), *self.cache.get_usage())
def statfs(self, ctx):
log.debug('started')
@@ -990,7 +987,7 @@ class Operations(llfuse.Operations):
get_path(id_p, self.db, name))
raise FUSEError(errno.EACCES)
- timestamp = time.time()
+ now_ns = time_ns()
inode_p = self.inodes[id_p]
if inode_p.locked:
@@ -1000,12 +997,18 @@ class Operations(llfuse.Operations):
log.warning('Attempted to create entry %s with unlinked parent %d',
name, id_p)
raise FUSEError(errno.EINVAL)
- inode_p.mtime = timestamp
- inode_p.ctime = timestamp
+ inode_p.mtime_ns = now_ns
+ inode_p.ctime_ns = now_ns
+ if inode_p.mode & stat.S_ISGID:
+ gid = inode_p.gid
+ if stat.S_ISDIR(mode):
+ mode |= stat.S_ISGID
+ else:
+ gid = ctx.gid
try:
- inode = self.inodes.create_inode(mtime=timestamp, ctime=timestamp, atime=timestamp,
- uid=ctx.uid, gid=ctx.gid, mode=mode, refcount=1,
+ inode = self.inodes.create_inode(mtime_ns=now_ns, ctime_ns=now_ns, atime_ns=now_ns,
+ uid=ctx.uid, gid=gid, mode=mode, refcount=1,
rdev=rdev, size=size)
except OutOfInodesError:
log.warning('Could not find a free inode')
@@ -1043,8 +1046,8 @@ class Operations(llfuse.Operations):
# Inode may have expired from cache
inode = self.inodes[fh]
- if inode.atime < inode.ctime or inode.atime < inode.mtime:
- inode.atime = time.time()
+ if inode.atime_ns < inode.ctime_ns or inode.atime_ns < inode.mtime_ns:
+ inode.atime_ns = time_ns()
return buf.getvalue()
@@ -1071,11 +1074,11 @@ class Operations(llfuse.Operations):
# so we have to be careful not to undo a size extension made by
# a concurrent write (because _readwrite() releases the global
# lock).
- timestamp = time.time()
+ now_ns = time_ns()
inode = self.inodes[fh]
inode.size = max(inode.size, minsize)
- inode.mtime = timestamp
- inode.ctime = timestamp
+ inode.mtime_ns = now_ns
+ inode.ctime_ns = now_ns
return total
diff --git a/src/s3ql/fsck.py b/src/s3ql/fsck.py
index b46ae59..1235da0 100644
--- a/src/s3ql/fsck.py
+++ b/src/s3ql/fsck.py
@@ -13,14 +13,13 @@ from .backends.comprenc import ComprencBackend
from .backends.local import Backend as LocalBackend
from .common import (inode_for_path, sha256_fh, get_path, get_backend_cachedir,
get_seq_no, is_mounted, get_backend, load_params,
- freeze_basic_mapping)
+ save_params, time_ns)
from .database import NoSuchRowError, Connection
from .metadata import create_tables, dump_and_upload_metadata, download_metadata
from .parse_args import ArgumentParser
from os.path import basename
import apsw
import os
-import hashlib
import re
import shutil
import itertools
@@ -218,7 +217,7 @@ class Fsck(object):
log.info('Checking lost+found...')
- timestamp = time.time()
+ now_ns = time_ns()
try:
(inode_l, name_id) = self.conn.get_row("SELECT inode, name_id FROM contents_v "
"WHERE name=? AND parent_inode=?", (b"lost+found", ROOT_INODE))
@@ -227,7 +226,7 @@ class Fsck(object):
self.found_errors = True
self.log_error("Recreating missing lost+found directory")
inode_l = self.create_inode(mode=stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
- atime=timestamp, ctime=timestamp, mtime=timestamp,
+ atime_ns=now_ns, ctime_ns=now_ns, mtime_ns=now_ns,
refcount=1)
self.conn.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)",
(self._add_name(b"lost+found"), inode_l, ROOT_INODE))
@@ -241,7 +240,7 @@ class Fsck(object):
# We leave the old inode unassociated, so that it will be added
# to lost+found later on.
inode_l = self.create_inode(mode=stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
- atime=timestamp, ctime=timestamp, mtime=timestamp,
+ atime_ns=now_ns, ctime_ns=now_ns, mtime_ns=now_ns,
refcount=1)
self.conn.execute('UPDATE contents SET inode=? WHERE name_id=? AND parent_inode=?',
(inode_l, name_id, ROOT_INODE))
@@ -623,10 +622,10 @@ class Fsck(object):
(id_p, name) = self.resolve_free(b"/lost+found", ("block-%d" % id_).encode())
self.log_error("Block %d not referenced, adding as /lost+found/%s",
id_, to_str(name))
- timestamp = time.time()
+ now_ns = time_ns()
size = self.conn.get_val('SELECT size FROM blocks WHERE id=?', (id_,))
inode = self.create_inode(mode=stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
- mtime=timestamp, atime=timestamp, ctime=timestamp,
+ mtime_ns=now_ns, atime_ns=now_ns, ctime_ns=now_ns,
refcount=1, size=size)
self.conn.execute('INSERT INTO inode_blocks (inode, blockno, block_id) VALUES(?,?,?)',
(inode, 0, id_))
@@ -667,13 +666,13 @@ class Fsck(object):
self.conn.execute('DELETE FROM objects WHERE id=?', (obj_id,))
def create_inode(self, mode, uid=os.getuid(), gid=os.getgid(),
- mtime=None, atime=None, ctime=None, refcount=None,
+ mtime_ns=None, atime_ns=None, ctime_ns=None, refcount=None,
size=0):
'''Create inode'''
- id_ = self.conn.rowid('INSERT INTO inodes (mode,uid,gid,mtime,atime,ctime,'
+ id_ = self.conn.rowid('INSERT INTO inodes (mode,uid,gid,mtime_ns,atime_ns,ctime_ns,'
'refcount,size) VALUES (?,?,?,?,?,?,?,?)',
- (mode, uid, gid, mtime, atime, ctime, refcount, size))
+ (mode, uid, gid, mtime_ns, atime_ns, ctime_ns, refcount, size))
return id_
@@ -1051,7 +1050,7 @@ class ROFsck(Fsck):
db = Connection(path + '.db')
db.execute('PRAGMA journal_mode = WAL')
- param = load_params(path + '.params')
+ param = load_params(path)
super().__init__(None, None, param, db)
def check(self):
@@ -1165,7 +1164,7 @@ def main(args=None):
if os.path.exists(cachepath + '.params'):
assert os.path.exists(cachepath + '.db')
- param = load_params(cachepath + '.params')
+ param = load_params(cachepath)
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
param = backend.lookup('s3ql_metadata')
@@ -1262,8 +1261,7 @@ def main(args=None):
param['seq_no'] += 1
param['needs_fsck'] = True
backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
+ save_params(cachepath, param)
fsck = Fsck(cachepath + '-cache', backend, param, db)
fsck.check()
@@ -1289,8 +1287,7 @@ def main(args=None):
param['last-modified'] = time.time()
dump_and_upload_metadata(backend, db, param)
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
+ save_params(cachepath, param)
log.info('Cleaning up local metadata...')
db.execute('ANALYZE')
@@ -1328,9 +1325,9 @@ def renumber_inodes(db):
(CTRL_INODE,))
log.info('..inodes..')
- db.execute('INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev) '
+ db.execute('INSERT INTO inodes (id,mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount,size,locked,rdev) '
'SELECT (SELECT rowid FROM inode_map WHERE inode_map.id = inodes_old.id), '
- ' mode,uid,gid,mtime,atime,ctime,refcount,size,locked,rdev FROM inodes_old')
+ ' mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount,size,locked,rdev FROM inodes_old')
log.info('..inode_blocks..')
db.execute('INSERT INTO inode_blocks (inode, blockno, block_id) '
diff --git a/src/s3ql/inode_cache.py b/src/s3ql/inode_cache.py
index fda0491..77c8985 100644
--- a/src/s3ql/inode_cache.py
+++ b/src/s3ql/inode_cache.py
@@ -9,15 +9,16 @@ This work can be distributed under the terms of the GNU GPLv3.
from .logging import logging # Ensure use of custom logger class
from .database import NoSuchRowError
import llfuse
+import sys
log = logging.getLogger(__name__)
CACHE_SIZE = 100
ATTRIBUTES = ('mode', 'refcount', 'uid', 'gid', 'size', 'locked',
- 'rdev', 'atime', 'mtime', 'ctime', 'id')
+ 'rdev', 'atime_ns', 'mtime_ns', 'ctime_ns', 'id')
ATTRIBUTE_STR = ', '.join(ATTRIBUTES)
UPDATE_ATTRS = ('mode', 'refcount', 'uid', 'gid', 'size', 'locked',
- 'rdev', 'atime', 'mtime', 'ctime')
+ 'rdev', 'atime_ns', 'mtime_ns', 'ctime_ns')
UPDATE_STR = ', '.join('%s=?' % x for x in UPDATE_ATTRS)
MAX_INODE = 2 ** 32 - 1
@@ -52,9 +53,9 @@ class _Inode:
attr.st_gid = self.gid
attr.st_size = self.size
attr.st_rdev = self.rdev
- attr.st_atime_ns = int(self.atime*1e9+0.5)
- attr.st_mtime_ns = int(self.mtime*1e9+0.5)
- attr.st_ctime_ns = int(self.ctime*1e9+0.5)
+ attr.st_atime_ns = self.atime_ns
+ attr.st_mtime_ns = self.mtime_ns
+ attr.st_ctime_ns = self.ctime_ns
attr.generation = self.generation
return attr
@@ -81,6 +82,18 @@ class _Inode:
object.__setattr__(self, 'dirty', True)
object.__setattr__(self, name, value)
+ def __del__(self):
+ if not self.dirty:
+ return
+
+ # Force execution of sys.excepthook (exceptions raised
+ # by __del__ are ignored)
+ try:
+ raise RuntimeError('BUG ALERT: Dirty inode was destroyed!')
+ except RuntimeError:
+ exc_info = sys.exc_info()
+
+ sys.excepthook(*exc_info)
class InodeCache(object):
'''
@@ -136,10 +149,9 @@ class InodeCache(object):
def __delitem__(self, inode):
if self.db.execute('DELETE FROM inodes WHERE id=?', (inode,)) != 1:
raise KeyError('No such inode')
- try:
- del self.attrs[inode]
- except KeyError:
- pass
+ inode = self.attrs.pop(inode, None)
+ if inode is not None:
+ inode.dirty = False
def __getitem__(self, id_):
try:
@@ -239,8 +251,17 @@ class InodeCache(object):
self.setattr(inode)
def __del__(self):
- if len(self.attrs) > 0:
+ if len(self.attrs) == 0:
+ return
+
+ # Force execution of sys.excepthook (exceptions raised
+ # by __del__ are ignored)
+ try:
raise RuntimeError('InodeCache instance was destroyed without calling destroy()')
+ except RuntimeError:
+ exc_info = sys.exc_info()
+
+ sys.excepthook(*exc_info)
diff --git a/src/s3ql/metadata.py b/src/s3ql/metadata.py
index 498721b..7e0d2f2 100644
--- a/src/s3ql/metadata.py
+++ b/src/s3ql/metadata.py
@@ -10,7 +10,7 @@ from .logging import logging # Ensure use of custom logger class
from .database import Connection
from . import BUFSIZE
from .common import pretty_print_size
-from .deltadump import INTEGER, BLOB, TIME, dump_table, load_table
+from .deltadump import INTEGER, BLOB, dump_table, load_table
from .backends.common import NoSuchObject, CorruptedObjectError
import os
import tempfile
@@ -35,9 +35,9 @@ DUMP_SPEC = [
('uid', INTEGER),
('gid', INTEGER),
('mode', INTEGER),
- ('mtime', TIME),
- ('atime', TIME),
- ('ctime', TIME),
+ ('mtime_ns', INTEGER),
+ ('atime_ns', INTEGER),
+ ('ctime_ns', INTEGER),
('size', INTEGER),
('rdev', INTEGER),
('locked', INTEGER),
@@ -78,8 +78,6 @@ def restore_metadata(fh, dbfile):
*dbfile* will be created with 0600 permissions. Data is
first written into a temporary file *dbfile* + '.tmp', and
the file is renamed once all data has been loaded.
-
-
'''
tmpfile = dbfile + '.tmp'
@@ -195,9 +193,9 @@ def create_tables(conn):
uid INT NOT NULL,
gid INT NOT NULL,
mode INT NOT NULL,
- mtime REAL NOT NULL,
- atime REAL NOT NULL,
- ctime REAL NOT NULL,
+ mtime_ns INT NOT NULL,
+ atime_ns INT NOT NULL,
+ ctime_ns INT NOT NULL,
refcount INT NOT NULL,
size INT NOT NULL DEFAULT 0,
rdev INT NOT NULL DEFAULT 0,
diff --git a/src/s3ql/mkfs.py b/src/s3ql/mkfs.py
index 3281106..b6da7ce 100644
--- a/src/s3ql/mkfs.py
+++ b/src/s3ql/mkfs.py
@@ -11,7 +11,7 @@ from . import CURRENT_FS_REV, CTRL_INODE, ROOT_INODE
from .backends.comprenc import ComprencBackend
from .backends import s3
from .common import (get_backend_cachedir, get_backend, split_by_n,
- freeze_basic_mapping)
+ freeze_basic_mapping, time_ns)
from .database import Connection
from .metadata import dump_and_upload_metadata, create_tables
from .parse_args import ArgumentParser
@@ -57,24 +57,24 @@ def parse_args(args):
def init_tables(conn):
# Insert root directory
- timestamp = time.time()
- conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount) "
+ now_ns = time_ns()
+ conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount) "
"VALUES (?,?,?,?,?,?,?,?)",
(ROOT_INODE, stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
| stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH,
- os.getuid(), os.getgid(), timestamp, timestamp, timestamp, 1))
+ os.getuid(), os.getgid(), now_ns, now_ns, now_ns, 1))
# Insert control inode, the actual values don't matter that much
- conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime,atime,ctime,refcount) "
+ conn.execute("INSERT INTO inodes (id,mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount) "
"VALUES (?,?,?,?,?,?,?,?)",
(CTRL_INODE, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR,
- 0, 0, timestamp, timestamp, timestamp, 42))
+ 0, 0, now_ns, now_ns, now_ns, 42))
# Insert lost+found directory
- inode = conn.rowid("INSERT INTO inodes (mode,uid,gid,mtime,atime,ctime,refcount) "
+ inode = conn.rowid("INSERT INTO inodes (mode,uid,gid,mtime_ns,atime_ns,ctime_ns,refcount) "
"VALUES (?,?,?,?,?,?,?)",
(stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR,
- os.getuid(), os.getgid(), timestamp, timestamp, timestamp, 1))
+ os.getuid(), os.getgid(), now_ns, now_ns, now_ns, 1))
name_id = conn.rowid('INSERT INTO names (name, refcount) VALUES(?,?)',
(b'lost+found', 1))
conn.execute("INSERT INTO contents (name_id, inode, parent_inode) VALUES(?,?,?)",
diff --git a/src/s3ql/mount.py b/src/s3ql/mount.py
index 7a3ed61..ffb9b2f 100644
--- a/src/s3ql/mount.py
+++ b/src/s3ql/mount.py
@@ -11,7 +11,7 @@ from . import fs, CURRENT_FS_REV
from .backends.pool import BackendPool
from .block_cache import BlockCache
from .common import (get_backend_cachedir, get_seq_no, get_backend_factory,
- load_params, freeze_basic_mapping)
+ load_params, save_params)
from .daemonize import daemonize
from .database import Connection
from .inode_cache import InodeCache
@@ -170,10 +170,7 @@ def main(args=None):
unmount_clean = False
def unmount():
log.info("Unmounting file system...")
- # Acquire lock so that Operations.destroy() is called with the
- # global lock like all other handlers
- with llfuse.lock:
- llfuse.close(unmount=unmount_clean)
+ llfuse.close(unmount=unmount_clean)
cm.callback(unmount)
if options.fg:
@@ -244,13 +241,11 @@ def main(args=None):
log.info('File system unchanged, not uploading metadata.')
del backend['s3ql_seq_no_%d' % param['seq_no']]
param['seq_no'] -= 1
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
+ save_params(cachepath, param)
elif seq_no == param['seq_no']:
param['last-modified'] = time.time()
dump_and_upload_metadata(backend, db, param)
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
+ save_params(cachepath, param)
else:
log.error('Remote metadata is newer than local (%d vs %d), '
'refusing to overwrite!', seq_no, param['seq_no'])
@@ -365,7 +360,7 @@ def get_metadata(backend, cachepath):
# Check for cached metadata
db = None
if os.path.exists(cachepath + '.params'):
- param = load_params(cachepath + '.params')
+ param = load_params(cachepath)
if param['seq_no'] < seq_no:
log.info('Ignoring locally cached metadata (outdated).')
param = backend.lookup('s3ql_metadata')
@@ -419,9 +414,7 @@ def get_metadata(backend, cachepath):
# Download metadata
if not db:
db = download_metadata(backend, cachepath + '.db')
-
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
+ save_params(cachepath, param)
return (param, db)
@@ -430,14 +423,7 @@ def mark_metadata_dirty(backend, cachepath, param):
param['seq_no'] += 1
param['needs_fsck'] = True
- with open(cachepath + '.params', 'wb') as fh:
- fh.write(freeze_basic_mapping(param))
-
- # Fsync to make sure that the updated sequence number is committed to
- # disk. Otherwise, a crash immediately after mount could result in both
- # the local and remote metadata appearing to be out of date.
- fh.flush()
- os.fsync(fh.fileno())
+ save_params(cachepath, param)
backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
param['needs_fsck'] = False
diff --git a/src/s3ql/statfs.py b/src/s3ql/statfs.py
index 0a9e455..533c68a 100644
--- a/src/s3ql/statfs.py
+++ b/src/s3ql/statfs.py
@@ -30,6 +30,9 @@ def parse_args(args):
type=(lambda x: x.rstrip('/')),
help='Mount point of the file system to examine')
+ parser.add_argument("--raw", action="store_true", default=False,
+ help="Do not pretty-print numbers")
+
return parser.parse_args(args)
def main(args=None):
@@ -41,29 +44,36 @@ def main(args=None):
options = parse_args(args)
setup_logging(options)
+ if options.raw:
+ pprint = lambda x: '%d bytes' % x
+ else:
+ pprint = pretty_print_size
+
ctrlfile = assert_fs_owner(options.mountpoint, mountpoint=True)
# Use a decent sized buffer, otherwise the statistics have to be
- # calculated thee(!) times because we need to invoce getxattr
+ # calculated three(!) times because we need to invoke getxattr
# three times.
buf = llfuse.getxattr(ctrlfile, 's3qlstat', size_guess=256)
(entries, blocks, inodes, fs_size, dedup_size,
- compr_size, db_size, cache_used, cache_dirty) = struct.unpack('QQQQQQQQQ', buf)
+ compr_size, db_size, cache_cnt, cache_size, dirty_cnt,
+ dirty_size, removal_cnt) = struct.unpack('QQQQQQQQQQQQ', buf)
p_dedup = dedup_size * 100 / fs_size if fs_size else 0
p_compr_1 = compr_size * 100 / fs_size if fs_size else 0
p_compr_2 = compr_size * 100 / dedup_size if dedup_size else 0
print ('Directory entries: %d' % entries,
'Inodes: %d' % inodes,
'Data blocks: %d' % blocks,
- 'Total data size: %s' % pretty_print_size(fs_size),
+ 'Total data size: %s' % pprint(fs_size),
'After de-duplication: %s (%.2f%% of total)'
- % (pretty_print_size(dedup_size), p_dedup),
+ % (pprint(dedup_size), p_dedup),
'After compression: %s (%.2f%% of total, %.2f%% of de-duplicated)'
- % (pretty_print_size(compr_size), p_compr_1, p_compr_2),
- 'Database size: %s (uncompressed)' % pretty_print_size(db_size),
- 'Cache usage: %s (dirty: %s)' % (pretty_print_size(cache_used),
- pretty_print_size(cache_dirty)),
+ % (pprint(compr_size), p_compr_1, p_compr_2),
+ 'Database size: %s (uncompressed)' % pprint(db_size),
+ 'Cache size: %s, %d entries' % (pprint(cache_size), cache_cnt),
+ 'Cache size (dirty): %s, %d entries' % (pprint(dirty_size), dirty_cnt),
+ 'Queued object removals: %d' % (removal_cnt,),
sep='\n')