diff options
Diffstat (limited to 'tests/t4_fuse.py')
-rw-r--r-- | tests/t4_fuse.py | 273 |
1 files changed, 187 insertions, 86 deletions
diff --git a/tests/t4_fuse.py b/tests/t4_fuse.py index 8c5daa1..aa1a90c 100644 --- a/tests/t4_fuse.py +++ b/tests/t4_fuse.py @@ -3,32 +3,130 @@ t4_fuse.py - this file is part of S3QL (http://s3ql.googlecode.com) Copyright (C) 2008-2009 Nikolaus Rath <Nikolaus@rath.org> -This program can be distributed under the terms of the GNU LGPL. +This program can be distributed under the terms of the GNU GPLv3. ''' -from __future__ import division, print_function +from __future__ import absolute_import, division, print_function from _common import TestCase -from cStringIO import StringIO from os.path import basename -from s3ql.common import retry, AsyncFn import filecmp +import llfuse import os.path -import s3ql.cli.fsck -import s3ql.cli.mkfs -import s3ql.cli.mount -import s3ql.cli.umount import shutil import stat -import llfuse import subprocess import sys import tempfile +import threading import time +import traceback import unittest2 as unittest +import logging + +log = logging.getLogger() # For debugging USE_VALGRIND = False +class ExceptionStoringThread(threading.Thread): + def __init__(self): + super(ExceptionStoringThread, self).__init__() + self._exc_info = None + self._joined = False + + def run_protected(self): + pass + + def run(self): + try: + self.run_protected() + except: + # This creates a circular reference chain + self._exc_info = sys.exc_info() + + def join_get_exc(self): + self._joined = True + self.join() + return self._exc_info + + def join_and_raise(self): + '''Wait for the thread to finish, raise any occurred exceptions''' + + self._joined = True + if self.is_alive(): + self.join() + + if self._exc_info is not None: + # Break reference chain + exc_info = self._exc_info + self._exc_info = None + raise EmbeddedException(exc_info, self.name) + + def __del__(self): + if not self._joined: + raise RuntimeError("ExceptionStoringThread instance was destroyed " + "without calling join_and_raise()!") + +class EmbeddedException(Exception): + '''Encapsulates an exception that happened in a different thread + ''' + + def __init__(self, exc_info, threadname): + super(EmbeddedException, self).__init__() + self.exc_info = exc_info + self.threadname = threadname + + log.error('Thread %s terminated with exception:\n%s', + self.threadname, ''.join(traceback.format_exception(*self.exc_info))) + + def __str__(self): + return ''.join(['caused by an exception in thread %s.\n' % self.threadname, + 'Original/inner traceback (most recent call last): \n' ] + + traceback.format_exception(*self.exc_info)) + +class AsyncFn(ExceptionStoringThread): + def __init__(self, fn, *args, **kwargs): + super(AsyncFn, self).__init__() + self.target = fn + self.args = args + self.kwargs = kwargs + + def run_protected(self): + self.target(*self.args, **self.kwargs) + +def retry(timeout, fn, *a, **kw): + """Wait for fn(*a, **kw) to return True. + + If the return value of fn() returns something True, this value + is returned. Otherwise, the function is called repeatedly for + `timeout` seconds. If the timeout is reached, `TimeoutError` is + raised. + """ + + step = 0.2 + waited = 0 + while waited < timeout: + ret = fn(*a, **kw) + if ret: + return ret + time.sleep(step) + waited += step + if step < waited / 30: + step *= 2 + + raise TimeoutError() + +class TimeoutError(Exception): + '''Raised by `retry()` when a timeout is reached.''' + + pass + +if __name__ == '__main__': + mypath = sys.argv[0] +else: + mypath = __file__ +BASEDIR = os.path.abspath(os.path.join(os.path.dirname(mypath), '..')) + class fuse_tests(TestCase): def setUp(self): @@ -41,102 +139,70 @@ class fuse_tests(TestCase): self.cache_dir = tempfile.mkdtemp() self.bucket_dir = tempfile.mkdtemp() - self.bucketname = 'local://' + os.path.join(self.bucket_dir, 'mybucket') + self.bucketname = 'local://' + self.bucket_dir self.passphrase = 'oeut3d' - self.mount_thread = None + self.mount_process = None self.name_cnt = 0 - - def tearDown(self): - # Umount if still mounted - if os.path.ismount(self.mnt_dir): - subprocess.call(['fusermount', '-z', '-u', self.mnt_dir]) - - # Try to wait for mount thread to prevent spurious errors - # because the db file is being removed - if self.mount_thread and USE_VALGRIND: - retry(60, lambda: self.mount_thread.poll() is not None) - elif self.mount_thread: - self.mount_thread.join(60) - - shutil.rmtree(self.mnt_dir) - shutil.rmtree(self.cache_dir) - shutil.rmtree(self.bucket_dir) - - if not USE_VALGRIND and not self.mount_thread.is_alive(): - self.mount_thread.join_and_raise() - def mount(self): + def mkfs(self): + proc = subprocess.Popen([os.path.join(BASEDIR, 'bin', 'mkfs.s3ql'), + '-L', 'test fs', '--blocksize', '500', + '--cachedir', self.cache_dir, '--quiet', + self.bucketname ], stdin=subprocess.PIPE) - sys.stdin = StringIO('%s\n%s\n' % (self.passphrase, self.passphrase)) - try: - s3ql.cli.mkfs.main(['-L', 'test fs', '--blocksize', '500', - '--homedir', self.cache_dir, self.bucketname ]) - except BaseException as exc: - self.fail("mkfs.s3ql failed: %s" % exc) - + print(self.passphrase, file=proc.stdin) + print(self.passphrase, file=proc.stdin) + proc.stdin.close() + + self.assertEqual(proc.wait(), 0) - # Note: When running inside test suite, we have less available - # file descriptors - if USE_VALGRIND: - if __name__ == '__main__': - mypath = sys.argv[0] - else: - mypath = __file__ - basedir = os.path.abspath(os.path.join(os.path.dirname(mypath), '..')) - self.mount_thread = subprocess.Popen(['valgrind', 'python-dbg', - os.path.join(basedir, 'bin', 'mount.s3ql'), - "--fg", '--homedir', self.cache_dir, - '--max-cache-entries', '500', + def mount(self): + self.mount_process = subprocess.Popen([os.path.join(BASEDIR, 'bin', 'mount.s3ql'), + "--fg", '--cachedir', self.cache_dir, + '--log', 'none', '--quiet', self.bucketname, self.mnt_dir], stdin=subprocess.PIPE) - print(self.passphrase, file=self.mount_thread.stdin) - retry(30, os.path.ismount, self.mnt_dir) - else: - sys.stdin = StringIO('%s\n' % self.passphrase) - self.mount_thread = AsyncFn(s3ql.cli.mount.main, - ["--fg", '--homedir', self.cache_dir, - '--max-cache-entries', '500', - self.bucketname, self.mnt_dir]) - self.mount_thread.start() - - # Wait for mountpoint to come up - try: - retry(3, os.path.ismount, self.mnt_dir) - except: - self.mount_thread.join_and_raise() + print(self.passphrase, file=self.mount_process.stdin) + self.mount_process.stdin.close() + retry(30, os.path.ismount, self.mnt_dir) def umount(self): - time.sleep(0.5) devnull = open('/dev/null', 'wb') retry(5, lambda: subprocess.call(['fuser', '-m', self.mnt_dir], stdout=devnull, stderr=devnull) == 1) - s3ql.cli.umount.DONTWAIT = True - try: - s3ql.cli.umount.main([self.mnt_dir]) - except BaseException as exc: - self.fail("Umount failed: %s" % exc) - - # Now wait for server process - if USE_VALGRIND: - self.assertEqual(self.mount_thread.wait(), 0) - else: - exc = self.mount_thread.join_get_exc() - self.assertIsNone(exc) + + subprocess.check_call([os.path.join(BASEDIR, 'bin', 'umount.s3ql'), + '--quiet', self.mnt_dir]) + self.assertEqual(self.mount_process.wait(), 0) self.assertFalse(os.path.ismount(self.mnt_dir)) - # Now run an fsck - sys.stdin = StringIO('%s\n' % self.passphrase) - try: - s3ql.cli.fsck.main(['--force', '--homedir', self.cache_dir, - self.bucketname]) - except BaseException as exc: - self.fail("fsck failed: %s" % exc) + def fsck(self): + proc = subprocess.Popen([os.path.join(BASEDIR, 'bin', 'fsck.s3ql'), + '--force', '--quiet', '--log', 'none', + '--cachedir', self.cache_dir, + self.bucketname ], stdin=subprocess.PIPE) + print(self.passphrase, file=proc.stdin) + proc.stdin.close() + self.assertEqual(proc.wait(), 0) + + def tearDown(self): + subprocess.call(['fusermount', '-z', '-u', self.mnt_dir], + stderr=open('/dev/null', 'wb')) + os.rmdir(self.mnt_dir) + + # Give mount process a little while to terminate + retry(10, lambda : self.mount_process.poll() is not None) + + shutil.rmtree(self.cache_dir) + shutil.rmtree(self.bucket_dir) + def runTest(self): # Run all tests in same environment, mounting and umounting # just takes too long otherwise + self.mkfs() self.mount() self.tst_chown() self.tst_link() @@ -146,9 +212,23 @@ class fuse_tests(TestCase): self.tst_statvfs() self.tst_symlink() self.tst_truncate() + self.tst_truncate_nocache() self.tst_write() self.umount() - + self.fsck() + + # Empty cache + shutil.rmtree(self.cache_dir) + self.cache_dir = tempfile.mkdtemp() + + self.mount() + self.umount() + + # Empty cache + shutil.rmtree(self.cache_dir) + self.cache_dir = tempfile.mkdtemp() + self.fsck() + def newname(self): self.name_cnt += 1 return "s3ql_%d" % self.name_cnt @@ -290,7 +370,28 @@ class fuse_tests(TestCase): os.close(fd) os.unlink(filename) + def tst_truncate_nocache(self): + filename = os.path.join(self.mnt_dir, self.newname()) + src = self.src + shutil.copyfile(src, filename) + self.assertTrue(filecmp.cmp(filename, src, False)) + fstat = os.stat(filename) + size = fstat.st_size + subprocess.check_call([os.path.join(BASEDIR, 'bin', 's3qlctrl'), + '--quiet', 'flushcache', self.mnt_dir ]) + + fd = os.open(filename, os.O_RDWR) + + os.ftruncate(fd, size + 1024) # add > 1 block + self.assertEquals(os.stat(filename).st_size, size + 1024) + + os.ftruncate(fd, size - 1024) # Truncate > 1 block + self.assertEquals(os.stat(filename).st_size, size - 1024) + + os.close(fd) + os.unlink(filename) + # Somehow important according to pyunit documentation def suite(): return unittest.makeSuite(fuse_tests) |