summaryrefslogtreecommitdiff
path: root/tests/t4_fuse.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/t4_fuse.py')
-rw-r--r--tests/t4_fuse.py301
1 files changed, 301 insertions, 0 deletions
diff --git a/tests/t4_fuse.py b/tests/t4_fuse.py
new file mode 100644
index 0000000..8c5daa1
--- /dev/null
+++ b/tests/t4_fuse.py
@@ -0,0 +1,301 @@
+'''
+t4_fuse.py - this file is part of S3QL (http://s3ql.googlecode.com)
+
+Copyright (C) 2008-2009 Nikolaus Rath <Nikolaus@rath.org>
+
+This program can be distributed under the terms of the GNU LGPL.
+'''
+
+from __future__ import division, print_function
+from _common import TestCase
+from cStringIO import StringIO
+from os.path import basename
+from s3ql.common import retry, AsyncFn
+import filecmp
+import os.path
+import s3ql.cli.fsck
+import s3ql.cli.mkfs
+import s3ql.cli.mount
+import s3ql.cli.umount
+import shutil
+import stat
+import llfuse
+import subprocess
+import sys
+import tempfile
+import time
+import unittest2 as unittest
+
+# For debugging
+USE_VALGRIND = False
+
+class fuse_tests(TestCase):
+
+ def setUp(self):
+ # We need this to test multi block operations
+ self.src = __file__
+ if os.path.getsize(self.src) < 1048:
+ raise RuntimeError("test file %s should be bigger than 1 kb" % self.src)
+
+ self.mnt_dir = tempfile.mkdtemp()
+ self.cache_dir = tempfile.mkdtemp()
+ self.bucket_dir = tempfile.mkdtemp()
+
+ self.bucketname = 'local://' + os.path.join(self.bucket_dir, 'mybucket')
+ self.passphrase = 'oeut3d'
+
+ self.mount_thread = None
+ self.name_cnt = 0
+
+ def tearDown(self):
+ # Umount if still mounted
+ if os.path.ismount(self.mnt_dir):
+ subprocess.call(['fusermount', '-z', '-u', self.mnt_dir])
+
+ # Try to wait for mount thread to prevent spurious errors
+ # because the db file is being removed
+ if self.mount_thread and USE_VALGRIND:
+ retry(60, lambda: self.mount_thread.poll() is not None)
+ elif self.mount_thread:
+ self.mount_thread.join(60)
+
+ shutil.rmtree(self.mnt_dir)
+ shutil.rmtree(self.cache_dir)
+ shutil.rmtree(self.bucket_dir)
+
+ if not USE_VALGRIND and not self.mount_thread.is_alive():
+ self.mount_thread.join_and_raise()
+
+ def mount(self):
+
+ sys.stdin = StringIO('%s\n%s\n' % (self.passphrase, self.passphrase))
+ try:
+ s3ql.cli.mkfs.main(['-L', 'test fs', '--blocksize', '500',
+ '--homedir', self.cache_dir, self.bucketname ])
+ except BaseException as exc:
+ self.fail("mkfs.s3ql failed: %s" % exc)
+
+
+ # Note: When running inside test suite, we have less available
+ # file descriptors
+ if USE_VALGRIND:
+ if __name__ == '__main__':
+ mypath = sys.argv[0]
+ else:
+ mypath = __file__
+ basedir = os.path.abspath(os.path.join(os.path.dirname(mypath), '..'))
+ self.mount_thread = subprocess.Popen(['valgrind', 'python-dbg',
+ os.path.join(basedir, 'bin', 'mount.s3ql'),
+ "--fg", '--homedir', self.cache_dir,
+ '--max-cache-entries', '500',
+ self.bucketname, self.mnt_dir],
+ stdin=subprocess.PIPE)
+ print(self.passphrase, file=self.mount_thread.stdin)
+ retry(30, os.path.ismount, self.mnt_dir)
+ else:
+ sys.stdin = StringIO('%s\n' % self.passphrase)
+ self.mount_thread = AsyncFn(s3ql.cli.mount.main,
+ ["--fg", '--homedir', self.cache_dir,
+ '--max-cache-entries', '500',
+ self.bucketname, self.mnt_dir])
+ self.mount_thread.start()
+
+ # Wait for mountpoint to come up
+ try:
+ retry(3, os.path.ismount, self.mnt_dir)
+ except:
+ self.mount_thread.join_and_raise()
+
+ def umount(self):
+ time.sleep(0.5)
+ devnull = open('/dev/null', 'wb')
+ retry(5, lambda: subprocess.call(['fuser', '-m', self.mnt_dir],
+ stdout=devnull, stderr=devnull) == 1)
+ s3ql.cli.umount.DONTWAIT = True
+ try:
+ s3ql.cli.umount.main([self.mnt_dir])
+ except BaseException as exc:
+ self.fail("Umount failed: %s" % exc)
+
+ # Now wait for server process
+ if USE_VALGRIND:
+ self.assertEqual(self.mount_thread.wait(), 0)
+ else:
+ exc = self.mount_thread.join_get_exc()
+ self.assertIsNone(exc)
+ self.assertFalse(os.path.ismount(self.mnt_dir))
+
+ # Now run an fsck
+ sys.stdin = StringIO('%s\n' % self.passphrase)
+ try:
+ s3ql.cli.fsck.main(['--force', '--homedir', self.cache_dir,
+ self.bucketname])
+ except BaseException as exc:
+ self.fail("fsck failed: %s" % exc)
+
+ def runTest(self):
+ # Run all tests in same environment, mounting and umounting
+ # just takes too long otherwise
+
+ self.mount()
+ self.tst_chown()
+ self.tst_link()
+ self.tst_mkdir()
+ self.tst_mknod()
+ self.tst_readdir()
+ self.tst_statvfs()
+ self.tst_symlink()
+ self.tst_truncate()
+ self.tst_write()
+ self.umount()
+
+ def newname(self):
+ self.name_cnt += 1
+ return "s3ql_%d" % self.name_cnt
+
+ def tst_mkdir(self):
+ dirname = self.newname()
+ fullname = self.mnt_dir + "/" + dirname
+ os.mkdir(fullname)
+ fstat = os.stat(fullname)
+ self.assertTrue(stat.S_ISDIR(fstat.st_mode))
+ self.assertEquals(llfuse.listdir(fullname), [])
+ self.assertEquals(fstat.st_nlink, 1)
+ self.assertTrue(dirname in llfuse.listdir(self.mnt_dir))
+ os.rmdir(fullname)
+ self.assertRaises(OSError, os.stat, fullname)
+ self.assertTrue(dirname not in llfuse.listdir(self.mnt_dir))
+
+ def tst_symlink(self):
+ linkname = self.newname()
+ fullname = self.mnt_dir + "/" + linkname
+ os.symlink("/imaginary/dest", fullname)
+ fstat = os.lstat(fullname)
+ self.assertTrue(stat.S_ISLNK(fstat.st_mode))
+ self.assertEquals(os.readlink(fullname), "/imaginary/dest")
+ self.assertEquals(fstat.st_nlink, 1)
+ self.assertTrue(linkname in llfuse.listdir(self.mnt_dir))
+ os.unlink(fullname)
+ self.assertRaises(OSError, os.lstat, fullname)
+ self.assertTrue(linkname not in llfuse.listdir(self.mnt_dir))
+
+ def tst_mknod(self):
+ filename = os.path.join(self.mnt_dir, self.newname())
+ src = self.src
+ shutil.copyfile(src, filename)
+ fstat = os.lstat(filename)
+ self.assertTrue(stat.S_ISREG(fstat.st_mode))
+ self.assertEquals(fstat.st_nlink, 1)
+ self.assertTrue(basename(filename) in llfuse.listdir(self.mnt_dir))
+ self.assertTrue(filecmp.cmp(src, filename, False))
+ os.unlink(filename)
+ self.assertRaises(OSError, os.stat, filename)
+ self.assertTrue(basename(filename) not in llfuse.listdir(self.mnt_dir))
+
+ def tst_chown(self):
+ filename = os.path.join(self.mnt_dir, self.newname())
+ os.mkdir(filename)
+ fstat = os.lstat(filename)
+ uid = fstat.st_uid
+ gid = fstat.st_gid
+
+ uid_new = uid + 1
+ os.chown(filename, uid_new, -1)
+ fstat = os.lstat(filename)
+ self.assertEquals(fstat.st_uid, uid_new)
+ self.assertEquals(fstat.st_gid, gid)
+
+ gid_new = gid + 1
+ os.chown(filename, -1, gid_new)
+ fstat = os.lstat(filename)
+ self.assertEquals(fstat.st_uid, uid_new)
+ self.assertEquals(fstat.st_gid, gid_new)
+
+ os.rmdir(filename)
+ self.assertRaises(OSError, os.stat, filename)
+ self.assertTrue(basename(filename) not in llfuse.listdir(self.mnt_dir))
+
+
+ def tst_write(self):
+ name = os.path.join(self.mnt_dir, self.newname())
+ src = self.src
+ shutil.copyfile(src, name)
+ self.assertTrue(filecmp.cmp(name, src, False))
+
+ # Don't unlink file, we want to see if cache flushing
+ # works
+
+ def tst_statvfs(self):
+ os.statvfs(self.mnt_dir)
+
+ def tst_link(self):
+ name1 = os.path.join(self.mnt_dir, self.newname())
+ name2 = os.path.join(self.mnt_dir, self.newname())
+ src = self.src
+ shutil.copyfile(src, name1)
+ self.assertTrue(filecmp.cmp(name1, src, False))
+ os.link(name1, name2)
+
+ fstat1 = os.lstat(name1)
+ fstat2 = os.lstat(name2)
+
+ self.assertEquals(fstat1, fstat2)
+ self.assertEquals(fstat1.st_nlink, 2)
+
+ self.assertTrue(basename(name2) in llfuse.listdir(self.mnt_dir))
+ self.assertTrue(filecmp.cmp(name1, name2, False))
+ os.unlink(name2)
+ fstat1 = os.lstat(name1)
+ self.assertEquals(fstat1.st_nlink, 1)
+ os.unlink(name1)
+
+ def tst_readdir(self):
+ dir_ = os.path.join(self.mnt_dir, self.newname())
+ file_ = dir_ + "/" + self.newname()
+ subdir = dir_ + "/" + self.newname()
+ subfile = subdir + "/" + self.newname()
+ src = self.src
+
+ os.mkdir(dir_)
+ shutil.copyfile(src, file_)
+ os.mkdir(subdir)
+ shutil.copyfile(src, subfile)
+
+ listdir_is = llfuse.listdir(dir_)
+ listdir_is.sort()
+ listdir_should = [ basename(file_), basename(subdir) ]
+ listdir_should.sort()
+ self.assertEquals(listdir_is, listdir_should)
+
+ os.unlink(file_)
+ os.unlink(subfile)
+ os.rmdir(subdir)
+ os.rmdir(dir_)
+
+ def tst_truncate(self):
+ filename = os.path.join(self.mnt_dir, self.newname())
+ src = self.src
+ shutil.copyfile(src, filename)
+ self.assertTrue(filecmp.cmp(filename, src, False))
+ fstat = os.stat(filename)
+ size = fstat.st_size
+ fd = os.open(filename, os.O_RDWR)
+
+ os.ftruncate(fd, size + 1024) # add > 1 block
+ self.assertEquals(os.stat(filename).st_size, size + 1024)
+
+ os.ftruncate(fd, size - 1024) # Truncate > 1 block
+ self.assertEquals(os.stat(filename).st_size, size - 1024)
+
+ os.close(fd)
+ os.unlink(filename)
+
+
+# Somehow important according to pyunit documentation
+def suite():
+ return unittest.makeSuite(fuse_tests)
+
+
+# Allow calling from command line
+if __name__ == "__main__":
+ unittest.main()