summaryrefslogtreecommitdiff
path: root/silx/io/test/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/test/test_utils.py')
-rw-r--r--silx/io/test/test_utils.py247
1 files changed, 136 insertions, 111 deletions
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py
index 23b1124..5a4c629 100644
--- a/silx/io/test/test_utils.py
+++ b/silx/io/test/test_utils.py
@@ -34,16 +34,10 @@ import unittest
from .. import utils
import silx.io.url
-try:
- import h5py
- from ..utils import h5ls
-except ImportError:
- h5py = None
+import h5py
+from ..utils import h5ls
-try:
- import fabio
-except ImportError:
- fabio = None
+import fabio
__authors__ = ["P. Knobel"]
@@ -205,7 +199,6 @@ def assert_match_any_string_in_list(test, pattern, list_of_strings):
return False
-@unittest.skipIf(h5py is None, "Could not import h5py")
class TestH5Ls(unittest.TestCase):
"""Test displaying the following HDF5 file structure:
@@ -293,23 +286,21 @@ class TestOpen(unittest.TestCase):
@classmethod
def createResources(cls, directory):
- if h5py is not None:
- cls.h5_filename = os.path.join(directory, "test.h5")
- h5 = h5py.File(cls.h5_filename, mode="w")
- h5["group/group/dataset"] = 50
- h5.close()
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/dataset"] = 50
+ h5.close()
cls.spec_filename = os.path.join(directory, "test.dat")
utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
fmt=["%d", "%.2f"], close_file=True, scan_number=1)
- if fabio is not None:
- cls.edf_filename = os.path.join(directory, "test.edf")
- header = fabio.fabioimage.OrderedDict()
- header["integer"] = "10"
- data = numpy.array([[10, 50], [50, 10]])
- fabiofile = fabio.edfimage.EdfImage(data, header)
- fabiofile.write(cls.edf_filename)
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
cls.txt_filename = os.path.join(directory, "test.txt")
f = io.open(cls.txt_filename, "w+t")
@@ -323,26 +314,17 @@ class TestOpen(unittest.TestCase):
shutil.rmtree(cls.tmp_directory)
def testH5(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
f = utils.open(self.h5_filename)
self.assertIsNotNone(f)
self.assertIsInstance(f, h5py.File)
f.close()
def testH5With(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
with utils.open(self.h5_filename) as f:
self.assertIsNotNone(f)
self.assertIsInstance(f, h5py.File)
def testH5_withPath(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
f = utils.open(self.h5_filename + "::/group/group/dataset")
self.assertIsNotNone(f)
self.assertEqual(f.h5py_class, h5py.Dataset)
@@ -350,9 +332,6 @@ class TestOpen(unittest.TestCase):
f.close()
def testH5With_withPath(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
with utils.open(self.h5_filename + "::/group/group") as f:
self.assertIsNotNone(f)
self.assertEqual(f.h5py_class, h5py.Group)
@@ -361,33 +340,21 @@ class TestOpen(unittest.TestCase):
def testSpec(self):
f = utils.open(self.spec_filename)
self.assertIsNotNone(f)
- if h5py is not None:
- self.assertEqual(f.h5py_class, h5py.File)
+ self.assertEqual(f.h5py_class, h5py.File)
f.close()
def testSpecWith(self):
with utils.open(self.spec_filename) as f:
self.assertIsNotNone(f)
- if h5py is not None:
- self.assertEqual(f.h5py_class, h5py.File)
+ self.assertEqual(f.h5py_class, h5py.File)
def testEdf(self):
- if h5py is None:
- self.skipTest("H5py is missing")
- if fabio is None:
- self.skipTest("Fabio is missing")
-
f = utils.open(self.edf_filename)
self.assertIsNotNone(f)
self.assertEqual(f.h5py_class, h5py.File)
f.close()
def testEdfWith(self):
- if h5py is None:
- self.skipTest("H5py is missing")
- if fabio is None:
- self.skipTest("Fabio is missing")
-
with utils.open(self.edf_filename) as f:
self.assertIsNotNone(f)
self.assertEqual(f.h5py_class, h5py.File)
@@ -400,18 +367,12 @@ class TestOpen(unittest.TestCase):
self.assertRaises(IOError, utils.open, self.missing_filename)
def test_silx_scheme(self):
- if h5py is None:
- self.skipTest("H5py is missing")
url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/")
with utils.open(url.path()) as f:
self.assertIsNotNone(f)
self.assertTrue(silx.io.utils.is_file(f))
def test_fabio_scheme(self):
- if h5py is None:
- self.skipTest("H5py is missing")
- if fabio is None:
- self.skipTest("Fabio is missing")
url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename)
self.assertRaises(IOError, utils.open, url.path())
@@ -427,9 +388,6 @@ class TestOpen(unittest.TestCase):
class TestNodes(unittest.TestCase):
"""Test `silx.io.utils.is_` functions."""
def test_real_h5py_objects(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
name = tempfile.mktemp(suffix=".h5")
try:
with h5py.File(name, "w") as h5file:
@@ -451,9 +409,6 @@ class TestNodes(unittest.TestCase):
os.unlink(name)
def test_h5py_like_file(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
class Foo(object):
def __init__(self):
self.h5_class = utils.H5Type.FILE
@@ -463,9 +418,6 @@ class TestNodes(unittest.TestCase):
self.assertFalse(utils.is_dataset(obj))
def test_h5py_like_group(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
class Foo(object):
def __init__(self):
self.h5_class = utils.H5Type.GROUP
@@ -475,9 +427,6 @@ class TestNodes(unittest.TestCase):
self.assertFalse(utils.is_dataset(obj))
def test_h5py_like_dataset(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
class Foo(object):
def __init__(self):
self.h5_class = utils.H5Type.DATASET
@@ -487,9 +436,6 @@ class TestNodes(unittest.TestCase):
self.assertTrue(utils.is_dataset(obj))
def test_bad(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
class Foo(object):
def __init__(self):
pass
@@ -499,9 +445,6 @@ class TestNodes(unittest.TestCase):
self.assertFalse(utils.is_dataset(obj))
def test_bad_api(self):
- if h5py is None:
- self.skipTest("H5py is missing")
-
class Foo(object):
def __init__(self):
self.h5_class = int
@@ -522,28 +465,26 @@ class TestGetData(unittest.TestCase):
@classmethod
def createResources(cls, directory):
- if h5py is not None:
- cls.h5_filename = os.path.join(directory, "test.h5")
- h5 = h5py.File(cls.h5_filename, mode="w")
- h5["group/group/scalar"] = 50
- h5["group/group/array"] = [1, 2, 3, 4, 5]
- h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
- h5.close()
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/scalar"] = 50
+ h5["group/group/array"] = [1, 2, 3, 4, 5]
+ h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
+ h5.close()
cls.spec_filename = os.path.join(directory, "test.dat")
utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
fmt=["%d", "%.2f"], close_file=True, scan_number=1)
- if fabio is not None:
- cls.edf_filename = os.path.join(directory, "test.edf")
- cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
- header = fabio.fabioimage.OrderedDict()
- header["integer"] = "10"
- data = numpy.array([[10, 50], [50, 10]])
- fabiofile = fabio.edfimage.EdfImage(data, header)
- fabiofile.write(cls.edf_filename)
- fabiofile.appendFrame(data=data, header=header)
- fabiofile.write(cls.edf_multiframe_filename)
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+ fabiofile.appendFrame(data=data, header=header)
+ fabiofile.write(cls.edf_multiframe_filename)
cls.txt_filename = os.path.join(directory, "test.txt")
f = io.open(cls.txt_filename, "w+t")
@@ -557,69 +498,49 @@ class TestGetData(unittest.TestCase):
shutil.rmtree(cls.tmp_directory)
def test_hdf5_scalar(self):
- if h5py is None:
- self.skipTest("H5py is missing")
url = "silx:%s?/group/group/scalar" % self.h5_filename
data = utils.get_data(url=url)
self.assertEqual(data, 50)
def test_hdf5_array(self):
- if h5py is None:
- self.skipTest("H5py is missing")
url = "silx:%s?/group/group/array" % self.h5_filename
data = utils.get_data(url=url)
self.assertEqual(data.shape, (5, ))
self.assertEqual(data[0], 1)
def test_hdf5_array_slice(self):
- if h5py is None:
- self.skipTest("H5py is missing")
url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
data = utils.get_data(url=url)
self.assertEqual(data.shape, (5, ))
self.assertEqual(data[0], 6)
def test_hdf5_array_slice_out_of_range(self):
- if h5py is None:
- self.skipTest("H5py is missing")
url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
self.assertRaises(ValueError, utils.get_data, url)
def test_edf_using_silx(self):
- if h5py is None:
- self.skipTest("H5py is missing")
- if fabio is None:
- self.skipTest("fabio is missing")
url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
data = utils.get_data(url=url)
self.assertEqual(data.shape, (2, 2))
self.assertEqual(data[0, 0], 10)
def test_fabio_frame(self):
- if fabio is None:
- self.skipTest("fabio is missing")
url = "fabio:%s?slice=1" % self.edf_multiframe_filename
data = utils.get_data(url=url)
self.assertEqual(data.shape, (2, 2))
self.assertEqual(data[0, 0], 10)
def test_fabio_singleframe(self):
- if fabio is None:
- self.skipTest("fabio is missing")
url = "fabio:%s?slice=0" % self.edf_filename
data = utils.get_data(url=url)
self.assertEqual(data.shape, (2, 2))
self.assertEqual(data[0, 0], 10)
def test_fabio_too_much_frames(self):
- if fabio is None:
- self.skipTest("fabio is missing")
url = "fabio:%s?slice=..." % self.edf_multiframe_filename
self.assertRaises(ValueError, utils.get_data, url)
def test_fabio_no_frame(self):
- if fabio is None:
- self.skipTest("fabio is missing")
url = "fabio:%s" % self.edf_filename
data = utils.get_data(url=url)
self.assertEqual(data.shape, (2, 2))
@@ -630,8 +551,6 @@ class TestGetData(unittest.TestCase):
self.assertRaises(ValueError, utils.get_data, url)
def test_no_scheme(self):
- if fabio is None:
- self.skipTest("fabio is missing")
url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename
self.assertRaises((ValueError, IOError), utils.get_data, url)
@@ -640,6 +559,111 @@ class TestGetData(unittest.TestCase):
self.assertRaises(IOError, utils.get_data, url)
+def _h5_py_version_older_than(version):
+ v_majeur, v_mineur, v_micro = h5py.version.version.split('.')[:3]
+ r_majeur, r_mineur, r_micro = version.split('.')
+ return v_majeur >= r_majeur and v_mineur >= r_mineur
+
+
+@unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0')
+class TestRawFileToH5(unittest.TestCase):
+ """Test conversion of .vol file to .h5 external dataset"""
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self._vol_file = os.path.join(self.tempdir, 'test_vol.vol')
+ self._file_info = os.path.join(self.tempdir, 'test_vol.info.vol')
+ self._dataset_shape = 100, 20, 5
+ data = numpy.random.random(self._dataset_shape[0] *
+ self._dataset_shape[1] *
+ self._dataset_shape[2]).astype(dtype=numpy.float32).reshape(self._dataset_shape)
+ numpy.save(file=self._vol_file, arr=data)
+ # those are storing into .noz file
+ assert os.path.exists(self._vol_file + '.npy')
+ os.rename(self._vol_file + '.npy', self._vol_file)
+ self.h5_file = os.path.join(self.tempdir, 'test_h5.h5')
+ self.external_dataset_path= '/root/my_external_dataset'
+ self._data_url = silx.io.url.DataUrl(file_path=self.h5_file,
+ data_path=self.external_dataset_path)
+ with open(self._file_info, 'w') as _fi:
+ _fi.write('NUM_X = %s\n' % self._dataset_shape[2])
+ _fi.write('NUM_Y = %s\n' % self._dataset_shape[1])
+ _fi.write('NUM_Z = %s\n' % self._dataset_shape[0])
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def check_dataset(self, h5_file, data_path, shape):
+ """Make sure the external dataset is valid"""
+ with h5py.File(h5_file, 'r') as _file:
+ return data_path in _file and _file[data_path].shape == shape
+
+ def test_h5_file_not_existing(self):
+ """Test that can create a file with external dataset from scratch"""
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+ os.remove(self.h5_file)
+ utils.vol_to_h5_external_dataset(vol_file=self._vol_file,
+ output_url=self._data_url,
+ info_file=self._file_info)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_h5_file_existing(self):
+ """Test that can add the external dataset from an existing file"""
+ with h5py.File(self.h5_file, 'w') as _file:
+ _file['/root/dataset1'] = numpy.zeros((100, 100))
+ _file['/root/group/dataset2'] = numpy.ones((100, 100))
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_vol_file_not_existing(self):
+ """Make sure error is raised if .vol file does not exists"""
+ os.remove(self._vol_file)
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_conflicts(self):
+ """Test several conflict cases"""
+ # test if path already exists
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ with self.assertRaises(ValueError):
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=False,
+ dtype=numpy.float32)
+
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=True,
+ dtype=numpy.float32)
+
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+
def suite():
loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
test_suite = unittest.TestSuite()
@@ -648,6 +672,7 @@ def suite():
test_suite.addTest(loadTests(TestOpen))
test_suite.addTest(loadTests(TestNodes))
test_suite.addTest(loadTests(TestGetData))
+ test_suite.addTest(loadTests(TestRawFileToH5))
return test_suite