diff options
author | Picca Frédéric-Emmanuel <picca@synchrotron-soleil.fr> | 2019-05-28 08:16:16 +0200 |
---|---|---|
committer | Picca Frédéric-Emmanuel <picca@synchrotron-soleil.fr> | 2019-05-28 08:16:16 +0200 |
commit | a763e5d1b3921b3194f3d4e94ab9de3fbe08bbdd (patch) | |
tree | 45d462ed36a5522e9f3b9fde6c4ec4918c2ae8e3 /silx/io/test | |
parent | cebdc9244c019224846cb8d2668080fe386a6adc (diff) |
New upstream version 0.10.1+dfsg
Diffstat (limited to 'silx/io/test')
-rw-r--r-- | silx/io/test/test_commonh5.py | 61 | ||||
-rw-r--r-- | silx/io/test/test_dictdump.py | 9 | ||||
-rw-r--r-- | silx/io/test/test_fabioh5.py | 42 | ||||
-rw-r--r-- | silx/io/test/test_nxdata.py | 13 | ||||
-rw-r--r-- | silx/io/test/test_spech5.py | 9 | ||||
-rw-r--r-- | silx/io/test/test_spectoh5.py | 13 | ||||
-rw-r--r-- | silx/io/test/test_utils.py | 247 |
7 files changed, 180 insertions, 214 deletions
diff --git a/silx/io/test/test_commonh5.py b/silx/io/test/test_commonh5.py index 05ef558..168ef34 100644 --- a/silx/io/test/test_commonh5.py +++ b/silx/io/test/test_commonh5.py @@ -37,11 +37,7 @@ _logger = logging.getLogger(__name__) import silx.io import silx.io.utils - -try: - import h5py -except ImportError: - h5py = None +import h5py try: from .. import commonh5 @@ -86,11 +82,10 @@ class TestCommonFeatures(unittest.TestCase): self.assertTrue(silx.io.is_group(node)) self.assertFalse(silx.io.is_dataset(node)) self.assertEqual(len(node.attrs), 0) - if h5py is not None: - class_ = self.h5.get("group", getclass=True) - classlink = self.h5.get("group", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Group) - self.assertEqual(classlink, h5py.HardLink) + class_ = self.h5.get("group", getclass=True) + classlink = self.h5.get("group", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Group) + self.assertEqual(classlink, h5py.HardLink) def test_dataset(self): node = self.h5["group/dataset"] @@ -98,41 +93,37 @@ class TestCommonFeatures(unittest.TestCase): self.assertFalse(silx.io.is_group(node)) self.assertTrue(silx.io.is_dataset(node)) self.assertEqual(len(node.attrs), 0) - if h5py is not None: - class_ = self.h5.get("group/dataset", getclass=True) - classlink = self.h5.get("group/dataset", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertEqual(classlink, h5py.HardLink) + class_ = self.h5.get("group/dataset", getclass=True) + classlink = self.h5.get("group/dataset", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.HardLink) def test_soft_link(self): node = self.h5["link/soft_link"] self.assertEqual(node.name, "/link/soft_link") - if h5py is not None: - class_ = self.h5.get("link/soft_link", getclass=True) - link = self.h5.get("link/soft_link", getlink=True) - classlink = self.h5.get("link/soft_link", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink))) - self.assertTrue(silx.io.utils.is_softlink(link)) - self.assertEqual(classlink, h5py.SoftLink) - + class_ = self.h5.get("link/soft_link", getclass=True) + link = self.h5.get("link/soft_link", getlink=True) + classlink = self.h5.get("link/soft_link", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink))) + self.assertTrue(silx.io.utils.is_softlink(link)) + self.assertEqual(classlink, h5py.SoftLink) + def test_external_link(self): node = self.h5["link/external_link"] self.assertEqual(node.name, "/target/dataset") - if h5py is not None: - class_ = self.h5.get("link/external_link", getclass=True) - classlink = self.h5.get("link/external_link", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertEqual(classlink, h5py.ExternalLink) + class_ = self.h5.get("link/external_link", getclass=True) + classlink = self.h5.get("link/external_link", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.ExternalLink) def test_external_link_to_link(self): node = self.h5["link/external_link_to_link"] self.assertEqual(node.name, "/target/link") - if h5py is not None: - class_ = self.h5.get("link/external_link_to_link", getclass=True) - classlink = self.h5.get("link/external_link_to_link", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertEqual(classlink, h5py.ExternalLink) + class_ = self.h5.get("link/external_link_to_link", getclass=True) + classlink = self.h5.get("link/external_link_to_link", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.ExternalLink) def test_create_groups(self): c = self.h5.create_group(self.id() + "/a/b/c") @@ -218,8 +209,6 @@ class TestSpecificCommonH5(unittest.TestCase): Test of shared features should be done by TestCommonFeatures.""" def setUp(self): - if h5py is None: - self.skipTest("h5py is needed") if commonh5 is None: self.skipTest("silx.io.commonh5 is needed") diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py index 9cd054c..ab8161d 100644 --- a/silx/io/test/test_dictdump.py +++ b/silx/io/test/test_dictdump.py @@ -32,12 +32,7 @@ import numpy import os import tempfile import unittest - -try: - import h5py - h5py_missing = False -except ImportError: - h5py_missing = True +import h5py from collections import defaultdict @@ -61,7 +56,6 @@ city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196] city_attrs["Europe"]["France"]["Tourcoing"]["area"] -@unittest.skipIf(h5py_missing, "Could not import h5py") class TestDictToH5(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() @@ -115,7 +109,6 @@ class TestDictToH5(unittest.TestCase): assert(res['t'] == False) -@unittest.skipIf(h5py_missing, "Could not import h5py") class TestH5ToDict(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() diff --git a/silx/io/test/test_fabioh5.py b/silx/io/test/test_fabioh5.py index 0eb0949..afd1b49 100644 --- a/silx/io/test/test_fabioh5.py +++ b/silx/io/test/test_fabioh5.py @@ -36,29 +36,16 @@ import shutil _logger = logging.getLogger(__name__) +import fabio +import h5py -try: - import fabio -except ImportError: - fabio = None - -try: - import h5py -except ImportError: - h5py = None - -if fabio is not None and h5py is not None: - from .. import fabioh5 - from .. import commonh5 +from .. import commonh5 +from .. import fabioh5 class TestFabioH5(unittest.TestCase): def setUp(self): - if fabio is None: - self.skipTest("fabio is needed") - if h5py is None: - self.skipTest("h5py is needed") header = { "integer": "-100", @@ -381,10 +368,6 @@ class TestFabioH5MultiFrames(unittest.TestCase): @classmethod def setUpClass(cls): - if fabio is None: - raise unittest.SkipTest("fabio is needed") - if h5py is None: - raise unittest.SkipTest("h5py is needed") names = ["A", "B", "C", "D"] values = [["32000", "-10", "5.0", "1"], @@ -470,10 +453,6 @@ class TestFabioH5WithEdf(unittest.TestCase): @classmethod def setUpClass(cls): - if fabio is None: - raise unittest.SkipTest("fabio is needed") - if h5py is None: - raise unittest.SkipTest("h5py is needed") cls.tmp_directory = tempfile.mkdtemp() @@ -510,21 +489,16 @@ class TestFabioH5WithEdf(unittest.TestCase): self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image) -if fabio is not None: - class _TestableFrameData(fabioh5.FrameData): - """Allow to test if the full data is reached.""" - def _create_data(self): - raise RuntimeError("Not supposed to be called") +class _TestableFrameData(fabioh5.FrameData): + """Allow to test if the full data is reached.""" + def _create_data(self): + raise RuntimeError("Not supposed to be called") class TestFabioH5WithFileSeries(unittest.TestCase): @classmethod def setUpClass(cls): - if fabio is None: - raise unittest.SkipTest("fabio is needed") - if h5py is None: - raise unittest.SkipTest("h5py is needed") cls.tmp_directory = tempfile.mkdtemp() diff --git a/silx/io/test/test_nxdata.py b/silx/io/test/test_nxdata.py index fab71dc..a790e36 100644 --- a/silx/io/test/test_nxdata.py +++ b/silx/io/test/test_nxdata.py @@ -27,21 +27,19 @@ __authors__ = ["P. Knobel"] __license__ = "MIT" __date__ = "27/01/2018" -try: - import h5py -except ImportError: - h5py = None -import numpy + import tempfile import unittest +import h5py +import numpy +import six + from .. import nxdata -from silx.third_party import six text_dtype = h5py.special_dtype(vlen=six.text_type) -@unittest.skipIf(h5py is None, "silx.io.nxdata tests depend on h5py") class TestNXdata(unittest.TestCase): def setUp(self): tmp = tempfile.NamedTemporaryFile(prefix="nxdata_examples_", suffix=".h5", delete=True) @@ -358,7 +356,6 @@ class TestNXdata(unittest.TestCase): self.assertIsNone(nxd.interpretation) -@unittest.skipIf(h5py is None, "silx.io.nxdata tests depend on h5py") class TestLegacyNXdata(unittest.TestCase): def setUp(self): tmp = tempfile.NamedTemporaryFile(prefix="nxdata_legacy_examples_", diff --git a/silx/io/test/test_spech5.py b/silx/io/test/test_spech5.py index b842243..7c8ed5d 100644 --- a/silx/io/test/test_spech5.py +++ b/silx/io/test/test_spech5.py @@ -37,10 +37,7 @@ from .. import spech5 from ..spech5 import (SpecH5, SpecH5Dataset, spec_date_to_iso8601) from .. import specfile -try: - import h5py -except ImportError: - h5py = None +import h5py __authors__ = ["P. Knobel"] __license__ = "MIT" @@ -299,8 +296,6 @@ class TestSpecH5(unittest.TestCase): def testGetClass(self): """Test :meth:`SpecH5Group.get`""" - if h5py is None: - self.skipTest("h5py is not available") self.assertIs(self.sfh5["1.1"].get("start_time", getclass=True), h5py.Dataset) self.assertIs(self.sfh5["1.1"].get("instrument", getclass=True), @@ -309,7 +304,6 @@ class TestSpecH5(unittest.TestCase): # spech5 does not define external link, so there is no way # a group can *get* a SpecH5 class - @unittest.skipIf(h5py is None, "test requires h5py (not installed)") def testGetApi(self): result = self.sfh5.get("1.1", getclass=True, getlink=True) self.assertIs(result, h5py.HardLink) @@ -332,7 +326,6 @@ class TestSpecH5(unittest.TestCase): self.assertEqual(self.sfh5["/1.2/instrument/positioners"], self.sfh5["1.2"]["instrument"]["positioners"]) - @unittest.skipIf(h5py is None, "test requires h5py (not installed)") def testH5pyClass(self): """Test :attr:`h5py_class` returns the corresponding h5py class (h5py.File, h5py.Group, h5py.Dataset)""" diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py index 8020731..2b79704 100644 --- a/silx/io/test/test_spectoh5.py +++ b/silx/io/test/test_spectoh5.py @@ -29,14 +29,10 @@ import sys import tempfile import unittest -try: - import h5py -except ImportError: - h5py_missing = True -else: - h5py_missing = False - from ..spech5 import SpecH5, SpecH5Group - from ..convert import convert, write_to_h5 +import h5py + +from ..spech5 import SpecH5, SpecH5Group +from ..convert import convert, write_to_h5 __authors__ = ["P. Knobel"] __license__ = "MIT" @@ -87,7 +83,6 @@ sftext = """#F /tmp/sf.dat """ -@unittest.skipIf(h5py_missing, "Could not import h5py") class TestConvertSpecHDF5(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py index 23b1124..5a4c629 100644 --- a/silx/io/test/test_utils.py +++ b/silx/io/test/test_utils.py @@ -34,16 +34,10 @@ import unittest from .. import utils import silx.io.url -try: - import h5py - from ..utils import h5ls -except ImportError: - h5py = None +import h5py +from ..utils import h5ls -try: - import fabio -except ImportError: - fabio = None +import fabio __authors__ = ["P. Knobel"] @@ -205,7 +199,6 @@ def assert_match_any_string_in_list(test, pattern, list_of_strings): return False -@unittest.skipIf(h5py is None, "Could not import h5py") class TestH5Ls(unittest.TestCase): """Test displaying the following HDF5 file structure: @@ -293,23 +286,21 @@ class TestOpen(unittest.TestCase): @classmethod def createResources(cls, directory): - if h5py is not None: - cls.h5_filename = os.path.join(directory, "test.h5") - h5 = h5py.File(cls.h5_filename, mode="w") - h5["group/group/dataset"] = 50 - h5.close() + cls.h5_filename = os.path.join(directory, "test.h5") + h5 = h5py.File(cls.h5_filename, mode="w") + h5["group/group/dataset"] = 50 + h5.close() cls.spec_filename = os.path.join(directory, "test.dat") utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y", fmt=["%d", "%.2f"], close_file=True, scan_number=1) - if fabio is not None: - cls.edf_filename = os.path.join(directory, "test.edf") - header = fabio.fabioimage.OrderedDict() - header["integer"] = "10" - data = numpy.array([[10, 50], [50, 10]]) - fabiofile = fabio.edfimage.EdfImage(data, header) - fabiofile.write(cls.edf_filename) + cls.edf_filename = os.path.join(directory, "test.edf") + header = fabio.fabioimage.OrderedDict() + header["integer"] = "10" + data = numpy.array([[10, 50], [50, 10]]) + fabiofile = fabio.edfimage.EdfImage(data, header) + fabiofile.write(cls.edf_filename) cls.txt_filename = os.path.join(directory, "test.txt") f = io.open(cls.txt_filename, "w+t") @@ -323,26 +314,17 @@ class TestOpen(unittest.TestCase): shutil.rmtree(cls.tmp_directory) def testH5(self): - if h5py is None: - self.skipTest("H5py is missing") - f = utils.open(self.h5_filename) self.assertIsNotNone(f) self.assertIsInstance(f, h5py.File) f.close() def testH5With(self): - if h5py is None: - self.skipTest("H5py is missing") - with utils.open(self.h5_filename) as f: self.assertIsNotNone(f) self.assertIsInstance(f, h5py.File) def testH5_withPath(self): - if h5py is None: - self.skipTest("H5py is missing") - f = utils.open(self.h5_filename + "::/group/group/dataset") self.assertIsNotNone(f) self.assertEqual(f.h5py_class, h5py.Dataset) @@ -350,9 +332,6 @@ class TestOpen(unittest.TestCase): f.close() def testH5With_withPath(self): - if h5py is None: - self.skipTest("H5py is missing") - with utils.open(self.h5_filename + "::/group/group") as f: self.assertIsNotNone(f) self.assertEqual(f.h5py_class, h5py.Group) @@ -361,33 +340,21 @@ class TestOpen(unittest.TestCase): def testSpec(self): f = utils.open(self.spec_filename) self.assertIsNotNone(f) - if h5py is not None: - self.assertEqual(f.h5py_class, h5py.File) + self.assertEqual(f.h5py_class, h5py.File) f.close() def testSpecWith(self): with utils.open(self.spec_filename) as f: self.assertIsNotNone(f) - if h5py is not None: - self.assertEqual(f.h5py_class, h5py.File) + self.assertEqual(f.h5py_class, h5py.File) def testEdf(self): - if h5py is None: - self.skipTest("H5py is missing") - if fabio is None: - self.skipTest("Fabio is missing") - f = utils.open(self.edf_filename) self.assertIsNotNone(f) self.assertEqual(f.h5py_class, h5py.File) f.close() def testEdfWith(self): - if h5py is None: - self.skipTest("H5py is missing") - if fabio is None: - self.skipTest("Fabio is missing") - with utils.open(self.edf_filename) as f: self.assertIsNotNone(f) self.assertEqual(f.h5py_class, h5py.File) @@ -400,18 +367,12 @@ class TestOpen(unittest.TestCase): self.assertRaises(IOError, utils.open, self.missing_filename) def test_silx_scheme(self): - if h5py is None: - self.skipTest("H5py is missing") url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/") with utils.open(url.path()) as f: self.assertIsNotNone(f) self.assertTrue(silx.io.utils.is_file(f)) def test_fabio_scheme(self): - if h5py is None: - self.skipTest("H5py is missing") - if fabio is None: - self.skipTest("Fabio is missing") url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename) self.assertRaises(IOError, utils.open, url.path()) @@ -427,9 +388,6 @@ class TestOpen(unittest.TestCase): class TestNodes(unittest.TestCase): """Test `silx.io.utils.is_` functions.""" def test_real_h5py_objects(self): - if h5py is None: - self.skipTest("H5py is missing") - name = tempfile.mktemp(suffix=".h5") try: with h5py.File(name, "w") as h5file: @@ -451,9 +409,6 @@ class TestNodes(unittest.TestCase): os.unlink(name) def test_h5py_like_file(self): - if h5py is None: - self.skipTest("H5py is missing") - class Foo(object): def __init__(self): self.h5_class = utils.H5Type.FILE @@ -463,9 +418,6 @@ class TestNodes(unittest.TestCase): self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_group(self): - if h5py is None: - self.skipTest("H5py is missing") - class Foo(object): def __init__(self): self.h5_class = utils.H5Type.GROUP @@ -475,9 +427,6 @@ class TestNodes(unittest.TestCase): self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_dataset(self): - if h5py is None: - self.skipTest("H5py is missing") - class Foo(object): def __init__(self): self.h5_class = utils.H5Type.DATASET @@ -487,9 +436,6 @@ class TestNodes(unittest.TestCase): self.assertTrue(utils.is_dataset(obj)) def test_bad(self): - if h5py is None: - self.skipTest("H5py is missing") - class Foo(object): def __init__(self): pass @@ -499,9 +445,6 @@ class TestNodes(unittest.TestCase): self.assertFalse(utils.is_dataset(obj)) def test_bad_api(self): - if h5py is None: - self.skipTest("H5py is missing") - class Foo(object): def __init__(self): self.h5_class = int @@ -522,28 +465,26 @@ class TestGetData(unittest.TestCase): @classmethod def createResources(cls, directory): - if h5py is not None: - cls.h5_filename = os.path.join(directory, "test.h5") - h5 = h5py.File(cls.h5_filename, mode="w") - h5["group/group/scalar"] = 50 - h5["group/group/array"] = [1, 2, 3, 4, 5] - h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] - h5.close() + cls.h5_filename = os.path.join(directory, "test.h5") + h5 = h5py.File(cls.h5_filename, mode="w") + h5["group/group/scalar"] = 50 + h5["group/group/array"] = [1, 2, 3, 4, 5] + h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] + h5.close() cls.spec_filename = os.path.join(directory, "test.dat") utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y", fmt=["%d", "%.2f"], close_file=True, scan_number=1) - if fabio is not None: - cls.edf_filename = os.path.join(directory, "test.edf") - cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf") - header = fabio.fabioimage.OrderedDict() - header["integer"] = "10" - data = numpy.array([[10, 50], [50, 10]]) - fabiofile = fabio.edfimage.EdfImage(data, header) - fabiofile.write(cls.edf_filename) - fabiofile.appendFrame(data=data, header=header) - fabiofile.write(cls.edf_multiframe_filename) + cls.edf_filename = os.path.join(directory, "test.edf") + cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf") + header = fabio.fabioimage.OrderedDict() + header["integer"] = "10" + data = numpy.array([[10, 50], [50, 10]]) + fabiofile = fabio.edfimage.EdfImage(data, header) + fabiofile.write(cls.edf_filename) + fabiofile.appendFrame(data=data, header=header) + fabiofile.write(cls.edf_multiframe_filename) cls.txt_filename = os.path.join(directory, "test.txt") f = io.open(cls.txt_filename, "w+t") @@ -557,69 +498,49 @@ class TestGetData(unittest.TestCase): shutil.rmtree(cls.tmp_directory) def test_hdf5_scalar(self): - if h5py is None: - self.skipTest("H5py is missing") url = "silx:%s?/group/group/scalar" % self.h5_filename data = utils.get_data(url=url) self.assertEqual(data, 50) def test_hdf5_array(self): - if h5py is None: - self.skipTest("H5py is missing") url = "silx:%s?/group/group/array" % self.h5_filename data = utils.get_data(url=url) self.assertEqual(data.shape, (5, )) self.assertEqual(data[0], 1) def test_hdf5_array_slice(self): - if h5py is None: - self.skipTest("H5py is missing") url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename data = utils.get_data(url=url) self.assertEqual(data.shape, (5, )) self.assertEqual(data[0], 6) def test_hdf5_array_slice_out_of_range(self): - if h5py is None: - self.skipTest("H5py is missing") url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename self.assertRaises(ValueError, utils.get_data, url) def test_edf_using_silx(self): - if h5py is None: - self.skipTest("H5py is missing") - if fabio is None: - self.skipTest("fabio is missing") url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename data = utils.get_data(url=url) self.assertEqual(data.shape, (2, 2)) self.assertEqual(data[0, 0], 10) def test_fabio_frame(self): - if fabio is None: - self.skipTest("fabio is missing") url = "fabio:%s?slice=1" % self.edf_multiframe_filename data = utils.get_data(url=url) self.assertEqual(data.shape, (2, 2)) self.assertEqual(data[0, 0], 10) def test_fabio_singleframe(self): - if fabio is None: - self.skipTest("fabio is missing") url = "fabio:%s?slice=0" % self.edf_filename data = utils.get_data(url=url) self.assertEqual(data.shape, (2, 2)) self.assertEqual(data[0, 0], 10) def test_fabio_too_much_frames(self): - if fabio is None: - self.skipTest("fabio is missing") url = "fabio:%s?slice=..." % self.edf_multiframe_filename self.assertRaises(ValueError, utils.get_data, url) def test_fabio_no_frame(self): - if fabio is None: - self.skipTest("fabio is missing") url = "fabio:%s" % self.edf_filename data = utils.get_data(url=url) self.assertEqual(data.shape, (2, 2)) @@ -630,8 +551,6 @@ class TestGetData(unittest.TestCase): self.assertRaises(ValueError, utils.get_data, url) def test_no_scheme(self): - if fabio is None: - self.skipTest("fabio is missing") url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename self.assertRaises((ValueError, IOError), utils.get_data, url) @@ -640,6 +559,111 @@ class TestGetData(unittest.TestCase): self.assertRaises(IOError, utils.get_data, url) +def _h5_py_version_older_than(version): + v_majeur, v_mineur, v_micro = h5py.version.version.split('.')[:3] + r_majeur, r_mineur, r_micro = version.split('.') + return v_majeur >= r_majeur and v_mineur >= r_mineur + + +@unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0') +class TestRawFileToH5(unittest.TestCase): + """Test conversion of .vol file to .h5 external dataset""" + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self._vol_file = os.path.join(self.tempdir, 'test_vol.vol') + self._file_info = os.path.join(self.tempdir, 'test_vol.info.vol') + self._dataset_shape = 100, 20, 5 + data = numpy.random.random(self._dataset_shape[0] * + self._dataset_shape[1] * + self._dataset_shape[2]).astype(dtype=numpy.float32).reshape(self._dataset_shape) + numpy.save(file=self._vol_file, arr=data) + # those are storing into .noz file + assert os.path.exists(self._vol_file + '.npy') + os.rename(self._vol_file + '.npy', self._vol_file) + self.h5_file = os.path.join(self.tempdir, 'test_h5.h5') + self.external_dataset_path= '/root/my_external_dataset' + self._data_url = silx.io.url.DataUrl(file_path=self.h5_file, + data_path=self.external_dataset_path) + with open(self._file_info, 'w') as _fi: + _fi.write('NUM_X = %s\n' % self._dataset_shape[2]) + _fi.write('NUM_Y = %s\n' % self._dataset_shape[1]) + _fi.write('NUM_Z = %s\n' % self._dataset_shape[0]) + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def check_dataset(self, h5_file, data_path, shape): + """Make sure the external dataset is valid""" + with h5py.File(h5_file, 'r') as _file: + return data_path in _file and _file[data_path].shape == shape + + def test_h5_file_not_existing(self): + """Test that can create a file with external dataset from scratch""" + utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32) + self.assertTrue(self.check_dataset(h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape)) + os.remove(self.h5_file) + utils.vol_to_h5_external_dataset(vol_file=self._vol_file, + output_url=self._data_url, + info_file=self._file_info) + self.assertTrue(self.check_dataset(h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape)) + + def test_h5_file_existing(self): + """Test that can add the external dataset from an existing file""" + with h5py.File(self.h5_file, 'w') as _file: + _file['/root/dataset1'] = numpy.zeros((100, 100)) + _file['/root/group/dataset2'] = numpy.ones((100, 100)) + utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32) + self.assertTrue(self.check_dataset(h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape)) + + def test_vol_file_not_existing(self): + """Make sure error is raised if .vol file does not exists""" + os.remove(self._vol_file) + utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32) + + self.assertTrue(self.check_dataset(h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape)) + + def test_conflicts(self): + """Test several conflict cases""" + # test if path already exists + utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32) + with self.assertRaises(ValueError): + utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + overwrite=False, + dtype=numpy.float32) + + utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + overwrite=True, + dtype=numpy.float32) + + self.assertTrue(self.check_dataset(h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape)) + + def suite(): loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite = unittest.TestSuite() @@ -648,6 +672,7 @@ def suite(): test_suite.addTest(loadTests(TestOpen)) test_suite.addTest(loadTests(TestNodes)) test_suite.addTest(loadTests(TestGetData)) + test_suite.addTest(loadTests(TestRawFileToH5)) return test_suite |