diff options
Diffstat (limited to 'silx/io/test')
-rw-r--r-- | silx/io/test/test_dictdump.py | 257 | ||||
-rw-r--r-- | silx/io/test/test_spectoh5.py | 3 | ||||
-rw-r--r-- | silx/io/test/test_url.py | 10 | ||||
-rw-r--r-- | silx/io/test/test_utils.py | 244 |
4 files changed, 462 insertions, 52 deletions
diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py index c0b6914..b99116b 100644 --- a/silx/io/test/test_dictdump.py +++ b/silx/io/test/test_dictdump.py @@ -43,6 +43,8 @@ from .. import dictdump from ..dictdump import dicttoh5, dicttojson, dump from ..dictdump import h5todict, load from ..dictdump import logger as dictdump_logger +from ..utils import is_link +from ..utils import h5py_read_dataset def tree(): @@ -58,15 +60,29 @@ city_attrs["Europe"]["France"]["Grenoble"]["inhabitants"] = inhabitants city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196] city_attrs["Europe"]["France"]["Tourcoing"]["area"] +ext_attrs = tree() +ext_attrs["ext_group"]["dataset"] = 10 +ext_filename = "ext.h5" + +link_attrs = tree() +link_attrs["links"]["group"]["dataset"] = 10 +link_attrs["links"]["group"]["relative_softlink"] = h5py.SoftLink("dataset") +link_attrs["links"]["relative_softlink"] = h5py.SoftLink("group/dataset") +link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset") +link_attrs["links"]["external_link"] = h5py.ExternalLink(ext_filename, "/ext_group/dataset") + class TestDictToH5(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") + self.h5_ext_fname = os.path.join(self.tempdir, ext_filename) def tearDown(self): if os.path.exists(self.h5_fname): os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) os.rmdir(self.tempdir) def testH5CityAttrs(self): @@ -201,31 +217,129 @@ class TestDictToH5(unittest.TestCase): self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11) self.assertEqual(h5file["group/group"].attrs['attr'], 12) + def testLinks(self): + with h5py.File(self.h5_ext_fname, "w") as h5file: + dictdump.dicttoh5(ext_attrs, h5file) + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(link_attrs, h5file) + with h5py.File(self.h5_fname, "r") as h5file: + self.assertEqual(h5file["links/group/dataset"][()], 10) + self.assertEqual(h5file["links/group/relative_softlink"][()], 10) + self.assertEqual(h5file["links/relative_softlink"][()], 10) + self.assertEqual(h5file["links/absolute_softlink"][()], 10) + self.assertEqual(h5file["links/external_link"][()], 10) + + def testDumpNumpyArray(self): + ddict = { + 'darks': { + '0': numpy.array([[0, 0, 0], [0, 0, 0]], dtype=numpy.uint16) + } + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(ddict, h5file) + with h5py.File(self.h5_fname, "r") as h5file: + numpy.testing.assert_array_equal(h5py_read_dataset(h5file["darks"]["0"]), + ddict['darks']['0']) + + +class TestH5ToDict(unittest.TestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") + self.h5_ext_fname = os.path.join(self.tempdir, ext_filename) + dicttoh5(city_attrs, self.h5_fname) + dicttoh5(link_attrs, self.h5_fname, mode="a") + dicttoh5(ext_attrs, self.h5_ext_fname) + + def tearDown(self): + if os.path.exists(self.h5_fname): + os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) + os.rmdir(self.tempdir) + + def testExcludeNames(self): + ddict = h5todict(self.h5_fname, path="/Europe/France", + exclude_names=["ourcoing", "inhab", "toto"]) + self.assertNotIn("Tourcoing", ddict) + self.assertIn("Grenoble", ddict) + + self.assertNotIn("inhabitants", ddict["Grenoble"]) + self.assertIn("coordinates", ddict["Grenoble"]) + self.assertIn("area", ddict["Grenoble"]) + + def testAsArrayTrue(self): + """Test with asarray=True, the default""" + ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble") + self.assertTrue(numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants))) + + def testAsArrayFalse(self): + """Test with asarray=False""" + ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False) + self.assertEqual(ddict["inhabitants"], inhabitants) + + def testDereferenceLinks(self): + ddict = h5todict(self.h5_fname, path="links", dereference_links=True) + self.assertTrue(ddict["absolute_softlink"], 10) + self.assertTrue(ddict["relative_softlink"], 10) + self.assertTrue(ddict["external_link"], 10) + self.assertTrue(ddict["group"]["relative_softlink"], 10) + + def testPreserveLinks(self): + ddict = h5todict(self.h5_fname, path="links", dereference_links=False) + self.assertTrue(is_link(ddict["absolute_softlink"])) + self.assertTrue(is_link(ddict["relative_softlink"])) + self.assertTrue(is_link(ddict["external_link"])) + self.assertTrue(is_link(ddict["group"]["relative_softlink"])) + + def testStrings(self): + ddict = {"dset_bytes": b"bytes", + "dset_utf8": "utf8", + "dset_2bytes": [b"bytes", b"bytes"], + "dset_2utf8": ["utf8", "utf8"], + ("", "attr_bytes"): b"bytes", + ("", "attr_utf8"): "utf8", + ("", "attr_2bytes"): [b"bytes", b"bytes"], + ("", "attr_2utf8"): ["utf8", "utf8"]} + dicttoh5(ddict, self.h5_fname, mode="w") + adict = h5todict(self.h5_fname, include_attributes=True, asarray=False) + self.assertEqual(ddict["dset_bytes"], adict["dset_bytes"]) + self.assertEqual(ddict["dset_utf8"], adict["dset_utf8"]) + self.assertEqual(ddict[("", "attr_bytes")], adict[("", "attr_bytes")]) + self.assertEqual(ddict[("", "attr_utf8")], adict[("", "attr_utf8")]) + numpy.testing.assert_array_equal(ddict["dset_2bytes"], adict["dset_2bytes"]) + numpy.testing.assert_array_equal(ddict["dset_2utf8"], adict["dset_2utf8"]) + numpy.testing.assert_array_equal(ddict[("", "attr_2bytes")], adict[("", "attr_2bytes")]) + numpy.testing.assert_array_equal(ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")]) + class TestDictToNx(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.h5_fname = os.path.join(self.tempdir, "nx.h5") + self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5") def tearDown(self): if os.path.exists(self.h5_fname): os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) os.rmdir(self.tempdir) def testAttributes(self): """Any kind of attribute can be described""" ddict = { - "group": {"datatset": "hmmm", "@group_attr": 10}, - "dataset": "aaaaaaaaaaaaaaa", + "group": {"dataset": 100, "@group_attr1": 10}, + "dataset": 200, "@root_attr": 11, - "dataset@dataset_attr": 12, + "dataset@dataset_attr": "12", "group@group_attr2": 13, } with h5py.File(self.h5_fname, "w") as h5file: dictdump.dicttonx(ddict, h5file) - self.assertEqual(h5file["group"].attrs['group_attr'], 10) + self.assertEqual(h5file["group"].attrs['group_attr1'], 10) self.assertEqual(h5file.attrs['root_attr'], 11) - self.assertEqual(h5file["dataset"].attrs['dataset_attr'], 12) + self.assertEqual(h5file["dataset"].attrs['dataset_attr'], "12") self.assertEqual(h5file["group"].attrs['group_attr2'], 13) def testKeyOrder(self): @@ -280,36 +394,120 @@ class TestDictToNx(unittest.TestCase): self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11) self.assertEqual(h5file["group/group"].attrs['attr'], 12) - -class TestH5ToDict(unittest.TestCase): + def testLinks(self): + ddict = {"ext_group": {"dataset": 10}} + dictdump.dicttonx(ddict, self.h5_ext_fname) + ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"}, + ">relative_softlink": "group/dataset", + ">absolute_softlink": "/links/group/dataset", + ">external_link": "nx_ext.h5::/ext_group/dataset"}} + dictdump.dicttonx(ddict, self.h5_fname) + with h5py.File(self.h5_fname, "r") as h5file: + self.assertEqual(h5file["links/group/dataset"][()], 10) + self.assertEqual(h5file["links/group/relative_softlink"][()], 10) + self.assertEqual(h5file["links/relative_softlink"][()], 10) + self.assertEqual(h5file["links/absolute_softlink"][()], 10) + self.assertEqual(h5file["links/external_link"][()], 10) + + def testUpLinks(self): + ddict = {"data": {"group": {"dataset": 10, ">relative_softlink": "dataset"}}, + "links": {"group": {"subgroup": {">relative_softlink": "../../../data/group/dataset"}}}} + dictdump.dicttonx(ddict, self.h5_fname) + with h5py.File(self.h5_fname, "r") as h5file: + self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10) + + +class TestNxToDict(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() - self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") - dicttoh5(city_attrs, self.h5_fname) + self.h5_fname = os.path.join(self.tempdir, "nx.h5") + self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5") def tearDown(self): - os.unlink(self.h5_fname) + if os.path.exists(self.h5_fname): + os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) os.rmdir(self.tempdir) - def testExcludeNames(self): - ddict = h5todict(self.h5_fname, path="/Europe/France", - exclude_names=["ourcoing", "inhab", "toto"]) - self.assertNotIn("Tourcoing", ddict) - self.assertIn("Grenoble", ddict) - - self.assertNotIn("inhabitants", ddict["Grenoble"]) - self.assertIn("coordinates", ddict["Grenoble"]) - self.assertIn("area", ddict["Grenoble"]) - - def testAsArrayTrue(self): - """Test with asarray=True, the default""" - ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble") - self.assertTrue(numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants))) - - def testAsArrayFalse(self): - """Test with asarray=False""" - ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False) - self.assertEqual(ddict["inhabitants"], inhabitants) + def testAttributes(self): + """Any kind of attribute can be described""" + ddict = { + "group": {"dataset": 100, "@group_attr1": 10}, + "dataset": 200, + "@root_attr": 11, + "dataset@dataset_attr": "12", + "group@group_attr2": 13, + } + dictdump.dicttonx(ddict, self.h5_fname) + ddict = dictdump.nxtodict(self.h5_fname, include_attributes=True) + self.assertEqual(ddict["group"]["@group_attr1"], 10) + self.assertEqual(ddict["@root_attr"], 11) + self.assertEqual(ddict["dataset@dataset_attr"], "12") + self.assertEqual(ddict["group"]["@group_attr2"], 13) + + def testDereferenceLinks(self): + """Write links and dereference on read""" + ddict = {"ext_group": {"dataset": 10}} + dictdump.dicttonx(ddict, self.h5_ext_fname) + ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"}, + ">relative_softlink": "group/dataset", + ">absolute_softlink": "/links/group/dataset", + ">external_link": "nx_ext.h5::/ext_group/dataset"}} + dictdump.dicttonx(ddict, self.h5_fname) + + ddict = dictdump.h5todict(self.h5_fname, dereference_links=True) + self.assertTrue(ddict["links"]["absolute_softlink"], 10) + self.assertTrue(ddict["links"]["relative_softlink"], 10) + self.assertTrue(ddict["links"]["external_link"], 10) + self.assertTrue(ddict["links"]["group"]["relative_softlink"], 10) + + def testPreserveLinks(self): + """Write/read links""" + ddict = {"ext_group": {"dataset": 10}} + dictdump.dicttonx(ddict, self.h5_ext_fname) + ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"}, + ">relative_softlink": "group/dataset", + ">absolute_softlink": "/links/group/dataset", + ">external_link": "nx_ext.h5::/ext_group/dataset"}} + dictdump.dicttonx(ddict, self.h5_fname) + + ddict = dictdump.nxtodict(self.h5_fname, dereference_links=False) + self.assertTrue(ddict["links"][">absolute_softlink"], "dataset") + self.assertTrue(ddict["links"][">relative_softlink"], "group/dataset") + self.assertTrue(ddict["links"][">external_link"], "/links/group/dataset") + self.assertTrue(ddict["links"]["group"][">relative_softlink"], "nx_ext.h5::/ext_group/datase") + + def testNotExistingPath(self): + """Test converting not existing path""" + with h5py.File(self.h5_fname, 'a') as f: + f['data'] = 1 + + ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='ignore') + self.assertFalse(ddict) + + with TestLogging(dictdump_logger, error=1): + ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='log') + self.assertFalse(ddict) + + with self.assertRaises(KeyError): + h5todict(self.h5_fname, path="/I/am/not/a/path", errors='raise') + + def testBrokenLinks(self): + """Test with broken links""" + with h5py.File(self.h5_fname, 'a') as f: + f["/Mars/BrokenSoftLink"] = h5py.SoftLink("/Idontexists") + f["/Mars/BrokenExternalLink"] = h5py.ExternalLink("notexistingfile.h5", "/Idontexists") + + ddict = h5todict(self.h5_fname, path="/Mars", errors='ignore') + self.assertFalse(ddict) + + with TestLogging(dictdump_logger, error=2): + ddict = h5todict(self.h5_fname, path="/Mars", errors='log') + self.assertFalse(ddict) + + with self.assertRaises(KeyError): + h5todict(self.h5_fname, path="/Mars", errors='raise') class TestDictToJson(unittest.TestCase): @@ -436,6 +634,7 @@ def suite(): test_suite.addTest(loadTests(TestDictToNx)) test_suite.addTest(loadTests(TestDictToJson)) test_suite.addTest(loadTests(TestH5ToDict)) + test_suite.addTest(loadTests(TestNxToDict)) return test_suite diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py index c3f03e9..903a62c 100644 --- a/silx/io/test/test_spectoh5.py +++ b/silx/io/test/test_spectoh5.py @@ -33,6 +33,7 @@ import h5py from ..spech5 import SpecH5, SpecH5Group from ..convert import convert, write_to_h5 +from ..utils import h5py_read_dataset __authors__ = ["P. Knobel"] __license__ = "MIT" @@ -129,7 +130,7 @@ class TestConvertSpecHDF5(unittest.TestCase): def testTitle(self): """Test the value of a dataset""" - title12 = self.h5f["/1.2/title"][()] + title12 = h5py_read_dataset(self.h5f["/1.2/title"]) self.assertEqual(title12, u"aaaaaa") diff --git a/silx/io/test/test_url.py b/silx/io/test/test_url.py index e68c67a..114f6a7 100644 --- a/silx/io/test/test_url.py +++ b/silx/io/test/test_url.py @@ -152,6 +152,16 @@ class TestDataUrl(unittest.TestCase): expected = [True, True, None, "/a.h5", "/b", (5, 1)] self.assertUrl(url, expected) + def test_slice2(self): + url = DataUrl("/a.h5?path=/b&slice=2:5") + expected = [True, True, None, "/a.h5", "/b", (slice(2, 5),)] + self.assertUrl(url, expected) + + def test_slice3(self): + url = DataUrl("/a.h5?path=/b&slice=::2") + expected = [True, True, None, "/a.h5", "/b", (slice(None, None, 2),)] + self.assertUrl(url, expected) + def test_slice_ellipsis(self): url = DataUrl("/a.h5?path=/b&slice=...") expected = [True, True, None, "/a.h5", "/b", (Ellipsis, )] diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py index 6c70636..13ab532 100644 --- a/silx/io/test/test_utils.py +++ b/silx/io/test/test_utils.py @@ -33,6 +33,7 @@ import unittest import sys from .. import utils +from ..._version import calc_hexversion import silx.io.url import h5py @@ -40,11 +41,9 @@ from ..utils import h5ls import fabio - __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "12/02/2018" - +__date__ = "03/12/2020" expected_spec1 = r"""#F .* #D .* @@ -67,6 +66,28 @@ expected_spec2 = expected_spec1 + r""" 2 8\.00 3 9\.00 """ + +expected_spec2reg = r"""#F .* +#D .* + +#S 1 Ordinate1 +#D .* +#N 3 +#L Abscissa Ordinate1 Ordinate2 +1 4\.00 7\.00 +2 5\.00 8\.00 +3 6\.00 9\.00 +""" + +expected_spec2irr = expected_spec1 + r""" +#S 2 Ordinate2 +#D .* +#N 2 +#L Abscissa Ordinate2 +1 7\.00 +2 8\.00 +""" + expected_csv = r"""Abscissa;Ordinate1;Ordinate2 1;4\.00;7\.00e\+00 2;5\.00;8\.00e\+00 @@ -83,6 +104,7 @@ expected_csv2 = r"""x;y0;y1 class TestSave(unittest.TestCase): """Test saving curves as SpecFile: """ + def setUp(self): self.tempdir = tempfile.mkdtemp() self.spec_fname = os.path.join(self.tempdir, "savespec.dat") @@ -92,6 +114,7 @@ class TestSave(unittest.TestCase): self.x = [1, 2, 3] self.xlab = "Abscissa" self.y = [[4, 5, 6], [7, 8, 9]] + self.y_irr = [[4, 5, 6], [7, 8]] self.ylabs = ["Ordinate1", "Ordinate2"] def tearDown(self): @@ -103,13 +126,6 @@ class TestSave(unittest.TestCase): os.unlink(self.npy_fname) shutil.rmtree(self.tempdir) - def assertRegex(self, *args, **kwargs): - # Python 2 compatibility - if sys.version_info.major >= 3: - return super(TestSave, self).assertRegex(*args, **kwargs) - else: - return self.assertRegexpMatches(*args, **kwargs) - def test_save_csv(self): utils.save1D(self.csv_fname, self.x, self.y, xlabel=self.xlab, ylabels=self.ylabs, @@ -145,7 +161,6 @@ class TestSave(unittest.TestCase): specf = open(self.spec_fname) actual_spec = specf.read() specf.close() - self.assertRegex(actual_spec, expected_spec1) def test_savespec_file_handle(self): @@ -165,18 +180,30 @@ class TestSave(unittest.TestCase): specf = open(self.spec_fname) actual_spec = specf.read() specf.close() - self.assertRegex(actual_spec, expected_spec2) - def test_save_spec(self): - """Save SpecFile using save()""" + def test_save_spec_reg(self): + """Save SpecFile using save() on a regular pattern""" utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab, ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) specf = open(self.spec_fname) actual_spec = specf.read() specf.close() - self.assertRegex(actual_spec, expected_spec2) + + self.assertRegex(actual_spec, expected_spec2reg) + + def test_save_spec_irr(self): + """Save SpecFile using save() on an irregular pattern""" + # invalid test case ?! + return + utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab, + ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) + + specf = open(self.spec_fname) + actual_spec = specf.read() + specf.close() + self.assertRegex(actual_spec, expected_spec2irr) def test_save_csv_no_labels(self): """Save csv using save(), with autoheader=True but @@ -217,6 +244,7 @@ class TestH5Ls(unittest.TestCase): <HDF5 dataset "data": shape (1,), type "<f8"> """ + def assertMatchAnyStringInList(self, pattern, list_of_strings): for string_ in list_of_strings: if re.match(pattern, string_): @@ -395,6 +423,7 @@ class TestOpen(unittest.TestCase): class TestNodes(unittest.TestCase): """Test `silx.io.utils.is_` functions.""" + def test_real_h5py_objects(self): name = tempfile.mktemp(suffix=".h5") try: @@ -417,45 +446,60 @@ class TestNodes(unittest.TestCase): os.unlink(name) def test_h5py_like_file(self): + class Foo(object): + def __init__(self): self.h5_class = utils.H5Type.FILE + obj = Foo() self.assertTrue(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_group(self): + class Foo(object): + def __init__(self): self.h5_class = utils.H5Type.GROUP + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_dataset(self): + class Foo(object): + def __init__(self): self.h5_class = utils.H5Type.DATASET + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) self.assertTrue(utils.is_dataset(obj)) def test_bad(self): + class Foo(object): + def __init__(self): pass + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) def test_bad_api(self): + class Foo(object): + def __init__(self): self.h5_class = int + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) @@ -513,18 +557,20 @@ class TestGetData(unittest.TestCase): def test_hdf5_array(self): url = "silx:%s?/group/group/array" % self.h5_filename data = utils.get_data(url=url) - self.assertEqual(data.shape, (5, )) + self.assertEqual(data.shape, (5,)) self.assertEqual(data[0], 1) def test_hdf5_array_slice(self): url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename data = utils.get_data(url=url) - self.assertEqual(data.shape, (5, )) + self.assertEqual(data.shape, (5,)) self.assertEqual(data[0], 6) def test_hdf5_array_slice_out_of_range(self): url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename - self.assertRaises(ValueError, utils.get_data, url) + # ValueError: h5py 2.x + # IndexError: h5py 3.x + self.assertRaises((ValueError, IndexError), utils.get_data, url) def test_edf_using_silx(self): url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename @@ -568,14 +614,15 @@ class TestGetData(unittest.TestCase): def _h5_py_version_older_than(version): - v_majeur, v_mineur, v_micro = h5py.version.version.split('.')[:3] - r_majeur, r_mineur, r_micro = version.split('.') - return v_majeur >= r_majeur and v_mineur >= r_mineur + v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]] + r_majeur, r_mineur, r_micro = [int(i) for i in version.split('.')] + return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(r_majeur, r_mineur, r_micro) @unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0') class TestRawFileToH5(unittest.TestCase): """Test conversion of .vol file to .h5 external dataset""" + def setUp(self): self.tempdir = tempfile.mkdtemp() self._vol_file = os.path.join(self.tempdir, 'test_vol.vol') @@ -589,7 +636,7 @@ class TestRawFileToH5(unittest.TestCase): assert os.path.exists(self._vol_file + '.npy') os.rename(self._vol_file + '.npy', self._vol_file) self.h5_file = os.path.join(self.tempdir, 'test_h5.h5') - self.external_dataset_path= '/root/my_external_dataset' + self.external_dataset_path = '/root/my_external_dataset' self._data_url = silx.io.url.DataUrl(file_path=self.h5_file, data_path=self.external_dataset_path) with open(self._file_info, 'w') as _fi: @@ -672,6 +719,158 @@ class TestRawFileToH5(unittest.TestCase): shape=self._dataset_shape)) +class TestH5Strings(unittest.TestCase): + """Test HDF5 str and bytes writing and reading""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.vlenstr = h5py.special_dtype(vlen=str) + cls.vlenbytes = h5py.special_dtype(vlen=bytes) + try: + cls.unicode = unicode + except NameError: + cls.unicode = str + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir) + + def setUp(self): + self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w") + + def tearDown(self): + self.file.close() + + @classmethod + def _make_array(cls, value, n): + if isinstance(value, bytes): + dtype = cls.vlenbytes + elif isinstance(value, cls.unicode): + dtype = cls.vlenstr + else: + return numpy.array([value] * n) + return numpy.array([value] * n, dtype=dtype) + + @classmethod + def _get_charset(cls, value): + if isinstance(value, bytes): + return h5py.h5t.CSET_ASCII + elif isinstance(value, cls.unicode): + return h5py.h5t.CSET_UTF8 + else: + return None + + def _check_dataset(self, value, result=None): + # Write+read scalar + if result: + decode_ascii = True + else: + decode_ascii = False + result = value + charset = self._get_charset(value) + self.file["data"] = value + data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii) + assert type(data) == type(result), data + assert data == result, data + if charset: + assert self.file["data"].id.get_type().get_cset() == charset + + # Write+read variable length + self.file["vlen_data"] = self._make_array(value, 2) + data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0) + assert type(data) == type(result), data + assert data == result, data + data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii) + numpy.testing.assert_array_equal(data, [result] * 2) + if charset: + assert self.file["vlen_data"].id.get_type().get_cset() == charset + + def _check_attribute(self, value, result=None): + if result: + decode_ascii = True + else: + decode_ascii = False + result = value + self.file.attrs["data"] = value + data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii) + assert type(data) == type(result), data + assert data == result, data + + self.file.attrs["vlen_data"] = self._make_array(value, 2) + data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii) + assert type(data[0]) == type(result), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"] + assert type(data[0]) == type(result), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + def test_dataset_ascii_bytes(self): + self._check_dataset(b"abc") + + def test_attribute_ascii_bytes(self): + self._check_attribute(b"abc") + + def test_dataset_ascii_bytes_decode(self): + self._check_dataset(b"abc", result="abc") + + def test_attribute_ascii_bytes_decode(self): + self._check_attribute(b"abc", result="abc") + + def test_dataset_ascii_str(self): + self._check_dataset("abc") + + def test_attribute_ascii_str(self): + self._check_attribute("abc") + + def test_dataset_utf8_str(self): + self._check_dataset("\u0101bc") + + def test_attribute_utf8_str(self): + self._check_attribute("\u0101bc") + + def test_dataset_utf8_bytes(self): + # 0xC481 is the byte representation of U+0101 + self._check_dataset(b"\xc4\x81bc") + + def test_attribute_utf8_bytes(self): + # 0xC481 is the byte representation of U+0101 + self._check_attribute(b"\xc4\x81bc") + + def test_dataset_utf8_bytes_decode(self): + # 0xC481 is the byte representation of U+0101 + self._check_dataset(b"\xc4\x81bc", result="\u0101bc") + + def test_attribute_utf8_bytes_decode(self): + # 0xC481 is the byte representation of U+0101 + self._check_attribute(b"\xc4\x81bc", result="\u0101bc") + + def test_dataset_latin1_bytes(self): + # extended ascii character 0xE4 + self._check_dataset(b"\xe423") + + def test_attribute_latin1_bytes(self): + # extended ascii character 0xE4 + self._check_attribute(b"\xe423") + + def test_dataset_latin1_bytes_decode(self): + # U+DCE4: surrogate for extended ascii character 0xE4 + self._check_dataset(b"\xe423", result="\udce423") + + def test_attribute_latin1_bytes_decode(self): + # U+DCE4: surrogate for extended ascii character 0xE4 + self._check_attribute(b"\xe423", result="\udce423") + + def test_dataset_no_string(self): + self._check_dataset(numpy.int64(10)) + + def test_attribute_no_string(self): + self._check_attribute(numpy.int64(10)) + + def suite(): loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite = unittest.TestSuite() @@ -681,6 +880,7 @@ def suite(): test_suite.addTest(loadTests(TestNodes)) test_suite.addTest(loadTests(TestGetData)) test_suite.addTest(loadTests(TestRawFileToH5)) + test_suite.addTest(loadTests(TestH5Strings)) return test_suite |