Diffstat (limited to 'silx/io/test')
-rw-r--r--  silx/io/test/test_dictdump.py   257
-rw-r--r--  silx/io/test/test_spectoh5.py     3
-rw-r--r--  silx/io/test/test_url.py         10
-rw-r--r--  silx/io/test/test_utils.py      244
4 files changed, 462 insertions, 52 deletions
diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py
index c0b6914..b99116b 100644
--- a/silx/io/test/test_dictdump.py
+++ b/silx/io/test/test_dictdump.py
@@ -43,6 +43,8 @@ from .. import dictdump
from ..dictdump import dicttoh5, dicttojson, dump
from ..dictdump import h5todict, load
from ..dictdump import logger as dictdump_logger
+from ..utils import is_link
+from ..utils import h5py_read_dataset
def tree():
@@ -58,15 +60,29 @@ city_attrs["Europe"]["France"]["Grenoble"]["inhabitants"] = inhabitants
city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196]
city_attrs["Europe"]["France"]["Tourcoing"]["area"]
+ext_attrs = tree()
+ext_attrs["ext_group"]["dataset"] = 10
+ext_filename = "ext.h5"
+
+link_attrs = tree()
+link_attrs["links"]["group"]["dataset"] = 10
+link_attrs["links"]["group"]["relative_softlink"] = h5py.SoftLink("dataset")
+link_attrs["links"]["relative_softlink"] = h5py.SoftLink("group/dataset")
+link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset")
+link_attrs["links"]["external_link"] = h5py.ExternalLink(ext_filename, "/ext_group/dataset")
+
class TestDictToH5(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
def tearDown(self):
if os.path.exists(self.h5_fname):
os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
os.rmdir(self.tempdir)
def testH5CityAttrs(self):
@@ -201,31 +217,129 @@ class TestDictToH5(unittest.TestCase):
self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11)
self.assertEqual(h5file["group/group"].attrs['attr'], 12)
+ def testLinks(self):
+ with h5py.File(self.h5_ext_fname, "w") as h5file:
+ dictdump.dicttoh5(ext_attrs, h5file)
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(link_attrs, h5file)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["links/group/dataset"][()], 10)
+ self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/absolute_softlink"][()], 10)
+ self.assertEqual(h5file["links/external_link"][()], 10)
+
+ def testDumpNumpyArray(self):
+ ddict = {
+ 'darks': {
+ '0': numpy.array([[0, 0, 0], [0, 0, 0]], dtype=numpy.uint16)
+ }
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ numpy.testing.assert_array_equal(h5py_read_dataset(h5file["darks"]["0"]),
+ ddict['darks']['0'])
+
+
+class TestH5ToDict(unittest.TestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
+ dicttoh5(city_attrs, self.h5_fname)
+ dicttoh5(link_attrs, self.h5_fname, mode="a")
+ dicttoh5(ext_attrs, self.h5_ext_fname)
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testExcludeNames(self):
+ ddict = h5todict(self.h5_fname, path="/Europe/France",
+ exclude_names=["ourcoing", "inhab", "toto"])
+ self.assertNotIn("Tourcoing", ddict)
+ self.assertIn("Grenoble", ddict)
+
+ self.assertNotIn("inhabitants", ddict["Grenoble"])
+ self.assertIn("coordinates", ddict["Grenoble"])
+ self.assertIn("area", ddict["Grenoble"])
+
+ def testAsArrayTrue(self):
+ """Test with asarray=True, the default"""
+ ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble")
+ self.assertTrue(numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants)))
+
+ def testAsArrayFalse(self):
+ """Test with asarray=False"""
+ ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False)
+ self.assertEqual(ddict["inhabitants"], inhabitants)
+
+ def testDereferenceLinks(self):
+ ddict = h5todict(self.h5_fname, path="links", dereference_links=True)
+ self.assertTrue(ddict["absolute_softlink"], 10)
+ self.assertTrue(ddict["relative_softlink"], 10)
+ self.assertTrue(ddict["external_link"], 10)
+ self.assertTrue(ddict["group"]["relative_softlink"], 10)
+
+ def testPreserveLinks(self):
+ ddict = h5todict(self.h5_fname, path="links", dereference_links=False)
+ self.assertTrue(is_link(ddict["absolute_softlink"]))
+ self.assertTrue(is_link(ddict["relative_softlink"]))
+ self.assertTrue(is_link(ddict["external_link"]))
+ self.assertTrue(is_link(ddict["group"]["relative_softlink"]))
+
+ def testStrings(self):
+ ddict = {"dset_bytes": b"bytes",
+ "dset_utf8": "utf8",
+ "dset_2bytes": [b"bytes", b"bytes"],
+ "dset_2utf8": ["utf8", "utf8"],
+ ("", "attr_bytes"): b"bytes",
+ ("", "attr_utf8"): "utf8",
+ ("", "attr_2bytes"): [b"bytes", b"bytes"],
+ ("", "attr_2utf8"): ["utf8", "utf8"]}
+ dicttoh5(ddict, self.h5_fname, mode="w")
+ adict = h5todict(self.h5_fname, include_attributes=True, asarray=False)
+ self.assertEqual(ddict["dset_bytes"], adict["dset_bytes"])
+ self.assertEqual(ddict["dset_utf8"], adict["dset_utf8"])
+ self.assertEqual(ddict[("", "attr_bytes")], adict[("", "attr_bytes")])
+ self.assertEqual(ddict[("", "attr_utf8")], adict[("", "attr_utf8")])
+ numpy.testing.assert_array_equal(ddict["dset_2bytes"], adict["dset_2bytes"])
+ numpy.testing.assert_array_equal(ddict["dset_2utf8"], adict["dset_2utf8"])
+ numpy.testing.assert_array_equal(ddict[("", "attr_2bytes")], adict[("", "attr_2bytes")])
+ numpy.testing.assert_array_equal(ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")])
+
class TestDictToNx(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.h5_fname = os.path.join(self.tempdir, "nx.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
def tearDown(self):
if os.path.exists(self.h5_fname):
os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
os.rmdir(self.tempdir)
def testAttributes(self):
"""Any kind of attribute can be described"""
ddict = {
- "group": {"datatset": "hmmm", "@group_attr": 10},
- "dataset": "aaaaaaaaaaaaaaa",
+ "group": {"dataset": 100, "@group_attr1": 10},
+ "dataset": 200,
"@root_attr": 11,
- "dataset@dataset_attr": 12,
+ "dataset@dataset_attr": "12",
"group@group_attr2": 13,
}
with h5py.File(self.h5_fname, "w") as h5file:
dictdump.dicttonx(ddict, h5file)
- self.assertEqual(h5file["group"].attrs['group_attr'], 10)
+ self.assertEqual(h5file["group"].attrs['group_attr1'], 10)
self.assertEqual(h5file.attrs['root_attr'], 11)
- self.assertEqual(h5file["dataset"].attrs['dataset_attr'], 12)
+ self.assertEqual(h5file["dataset"].attrs['dataset_attr'], "12")
self.assertEqual(h5file["group"].attrs['group_attr2'], 13)
def testKeyOrder(self):
@@ -280,36 +394,120 @@ class TestDictToNx(unittest.TestCase):
self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11)
self.assertEqual(h5file["group/group"].attrs['attr'], 12)
-
-class TestH5ToDict(unittest.TestCase):
+ def testLinks(self):
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset"}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["links/group/dataset"][()], 10)
+ self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/absolute_softlink"][()], 10)
+ self.assertEqual(h5file["links/external_link"][()], 10)
+
+ def testUpLinks(self):
+ ddict = {"data": {"group": {"dataset": 10, ">relative_softlink": "dataset"}},
+ "links": {"group": {"subgroup": {">relative_softlink": "../../../data/group/dataset"}}}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10)
+
+
+class TestNxToDict(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
- self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
- dicttoh5(city_attrs, self.h5_fname)
+ self.h5_fname = os.path.join(self.tempdir, "nx.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
def tearDown(self):
- os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
os.rmdir(self.tempdir)
- def testExcludeNames(self):
- ddict = h5todict(self.h5_fname, path="/Europe/France",
- exclude_names=["ourcoing", "inhab", "toto"])
- self.assertNotIn("Tourcoing", ddict)
- self.assertIn("Grenoble", ddict)
-
- self.assertNotIn("inhabitants", ddict["Grenoble"])
- self.assertIn("coordinates", ddict["Grenoble"])
- self.assertIn("area", ddict["Grenoble"])
-
- def testAsArrayTrue(self):
- """Test with asarray=True, the default"""
- ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble")
- self.assertTrue(numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants)))
-
- def testAsArrayFalse(self):
- """Test with asarray=False"""
- ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False)
- self.assertEqual(ddict["inhabitants"], inhabitants)
+ def testAttributes(self):
+ """Any kind of attribute can be described"""
+ ddict = {
+ "group": {"dataset": 100, "@group_attr1": 10},
+ "dataset": 200,
+ "@root_attr": 11,
+ "dataset@dataset_attr": "12",
+ "group@group_attr2": 13,
+ }
+ dictdump.dicttonx(ddict, self.h5_fname)
+ ddict = dictdump.nxtodict(self.h5_fname, include_attributes=True)
+ self.assertEqual(ddict["group"]["@group_attr1"], 10)
+ self.assertEqual(ddict["@root_attr"], 11)
+ self.assertEqual(ddict["dataset@dataset_attr"], "12")
+ self.assertEqual(ddict["group"]["@group_attr2"], 13)
+
+ def testDereferenceLinks(self):
+ """Write links and dereference on read"""
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset"}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+
+ ddict = dictdump.h5todict(self.h5_fname, dereference_links=True)
+ self.assertTrue(ddict["links"]["absolute_softlink"], 10)
+ self.assertTrue(ddict["links"]["relative_softlink"], 10)
+ self.assertTrue(ddict["links"]["external_link"], 10)
+ self.assertTrue(ddict["links"]["group"]["relative_softlink"], 10)
+
+ def testPreserveLinks(self):
+ """Write/read links"""
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset"}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+
+ ddict = dictdump.nxtodict(self.h5_fname, dereference_links=False)
+ self.assertTrue(ddict["links"][">absolute_softlink"], "dataset")
+ self.assertTrue(ddict["links"][">relative_softlink"], "group/dataset")
+ self.assertTrue(ddict["links"][">external_link"], "/links/group/dataset")
+ self.assertTrue(ddict["links"]["group"][">relative_softlink"], "nx_ext.h5::/ext_group/datase")
+
+ def testNotExistingPath(self):
+ """Test converting not existing path"""
+ with h5py.File(self.h5_fname, 'a') as f:
+ f['data'] = 1
+
+ ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='ignore')
+ self.assertFalse(ddict)
+
+ with TestLogging(dictdump_logger, error=1):
+ ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='log')
+ self.assertFalse(ddict)
+
+ with self.assertRaises(KeyError):
+ h5todict(self.h5_fname, path="/I/am/not/a/path", errors='raise')
+
+ def testBrokenLinks(self):
+ """Test with broken links"""
+ with h5py.File(self.h5_fname, 'a') as f:
+ f["/Mars/BrokenSoftLink"] = h5py.SoftLink("/Idontexists")
+ f["/Mars/BrokenExternalLink"] = h5py.ExternalLink("notexistingfile.h5", "/Idontexists")
+
+ ddict = h5todict(self.h5_fname, path="/Mars", errors='ignore')
+ self.assertFalse(ddict)
+
+ with TestLogging(dictdump_logger, error=2):
+ ddict = h5todict(self.h5_fname, path="/Mars", errors='log')
+ self.assertFalse(ddict)
+
+ with self.assertRaises(KeyError):
+ h5todict(self.h5_fname, path="/Mars", errors='raise')
class TestDictToJson(unittest.TestCase):
@@ -436,6 +634,7 @@ def suite():
test_suite.addTest(loadTests(TestDictToNx))
test_suite.addTest(loadTests(TestDictToJson))
test_suite.addTest(loadTests(TestH5ToDict))
+ test_suite.addTest(loadTests(TestNxToDict))
return test_suite
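
For reference, a minimal sketch of the link syntax these new dictdump tests exercise, assuming the silx.io.dictdump API exactly as used in the hunks above (file names here are illustrative, not part of the diff):

import h5py
from silx.io.dictdump import dicttoh5, h5todict

# Target file for the external link below.
dicttoh5({"ext_group": {"dataset": 10}}, "ext.h5")

# Soft and external links are expressed directly as h5py link objects in the dict.
treedict = {
    "links": {
        "group": {"dataset": 10,
                  "relative_softlink": h5py.SoftLink("dataset")},
        "absolute_softlink": h5py.SoftLink("/links/group/dataset"),
        "external_link": h5py.ExternalLink("ext.h5", "/ext_group/dataset"),
    }
}
dicttoh5(treedict, "links.h5")

# dereference_links=True resolves links to their target values on read;
# with dereference_links=False the returned dict keeps link objects (see is_link).
ddict = h5todict("links.h5", path="links", dereference_links=True)
assert ddict["absolute_softlink"] == 10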
diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py
index c3f03e9..903a62c 100644
--- a/silx/io/test/test_spectoh5.py
+++ b/silx/io/test/test_spectoh5.py
@@ -33,6 +33,7 @@ import h5py
from ..spech5 import SpecH5, SpecH5Group
from ..convert import convert, write_to_h5
+from ..utils import h5py_read_dataset
__authors__ = ["P. Knobel"]
__license__ = "MIT"
@@ -129,7 +130,7 @@ class TestConvertSpecHDF5(unittest.TestCase):
def testTitle(self):
"""Test the value of a dataset"""
- title12 = self.h5f["/1.2/title"][()]
+ title12 = h5py_read_dataset(self.h5f["/1.2/title"])
self.assertEqual(title12,
u"aaaaaa")
diff --git a/silx/io/test/test_url.py b/silx/io/test/test_url.py
index e68c67a..114f6a7 100644
--- a/silx/io/test/test_url.py
+++ b/silx/io/test/test_url.py
@@ -152,6 +152,16 @@ class TestDataUrl(unittest.TestCase):
expected = [True, True, None, "/a.h5", "/b", (5, 1)]
self.assertUrl(url, expected)
+ def test_slice2(self):
+ url = DataUrl("/a.h5?path=/b&slice=2:5")
+ expected = [True, True, None, "/a.h5", "/b", (slice(2, 5),)]
+ self.assertUrl(url, expected)
+
+ def test_slice3(self):
+ url = DataUrl("/a.h5?path=/b&slice=::2")
+ expected = [True, True, None, "/a.h5", "/b", (slice(None, None, 2),)]
+ self.assertUrl(url, expected)
+
def test_slice_ellipsis(self):
url = DataUrl("/a.h5?path=/b&slice=...")
expected = [True, True, None, "/a.h5", "/b", (Ellipsis, )]
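
A minimal sketch of the slice strings the two new DataUrl cases parse, assuming the silx.io.url.DataUrl accessors (file_path, data_path, data_slice) used elsewhere in silx:

from silx.io.url import DataUrl

url = DataUrl("/a.h5?path=/b&slice=2:5")
print(url.file_path())   # "/a.h5"
print(url.data_path())   # "/b"
print(url.data_slice())  # (slice(2, 5),)

url = DataUrl("/a.h5?path=/b&slice=::2")
print(url.data_slice())  # (slice(None, None, 2),)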
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py
index 6c70636..13ab532 100644
--- a/silx/io/test/test_utils.py
+++ b/silx/io/test/test_utils.py
@@ -33,6 +33,7 @@ import unittest
import sys
from .. import utils
+from ..._version import calc_hexversion
import silx.io.url
import h5py
@@ -40,11 +41,9 @@ from ..utils import h5ls
import fabio
-
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "12/02/2018"
-
+__date__ = "03/12/2020"
expected_spec1 = r"""#F .*
#D .*
@@ -67,6 +66,28 @@ expected_spec2 = expected_spec1 + r"""
2 8\.00
3 9\.00
"""
+
+expected_spec2reg = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 3
+#L Abscissa Ordinate1 Ordinate2
+1 4\.00 7\.00
+2 5\.00 8\.00
+3 6\.00 9\.00
+"""
+
+expected_spec2irr = expected_spec1 + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+"""
+
expected_csv = r"""Abscissa;Ordinate1;Ordinate2
1;4\.00;7\.00e\+00
2;5\.00;8\.00e\+00
@@ -83,6 +104,7 @@ expected_csv2 = r"""x;y0;y1
class TestSave(unittest.TestCase):
"""Test saving curves as SpecFile:
"""
+
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
@@ -92,6 +114,7 @@ class TestSave(unittest.TestCase):
self.x = [1, 2, 3]
self.xlab = "Abscissa"
self.y = [[4, 5, 6], [7, 8, 9]]
+ self.y_irr = [[4, 5, 6], [7, 8]]
self.ylabs = ["Ordinate1", "Ordinate2"]
def tearDown(self):
@@ -103,13 +126,6 @@ class TestSave(unittest.TestCase):
os.unlink(self.npy_fname)
shutil.rmtree(self.tempdir)
- def assertRegex(self, *args, **kwargs):
- # Python 2 compatibility
- if sys.version_info.major >= 3:
- return super(TestSave, self).assertRegex(*args, **kwargs)
- else:
- return self.assertRegexpMatches(*args, **kwargs)
-
def test_save_csv(self):
utils.save1D(self.csv_fname, self.x, self.y,
xlabel=self.xlab, ylabels=self.ylabs,
@@ -145,7 +161,6 @@ class TestSave(unittest.TestCase):
specf = open(self.spec_fname)
actual_spec = specf.read()
specf.close()
-
self.assertRegex(actual_spec, expected_spec1)
def test_savespec_file_handle(self):
@@ -165,18 +180,30 @@ class TestSave(unittest.TestCase):
specf = open(self.spec_fname)
actual_spec = specf.read()
specf.close()
-
self.assertRegex(actual_spec, expected_spec2)
- def test_save_spec(self):
- """Save SpecFile using save()"""
+ def test_save_spec_reg(self):
+ """Save SpecFile using save() on a regular pattern"""
utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab,
ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
specf = open(self.spec_fname)
actual_spec = specf.read()
specf.close()
- self.assertRegex(actual_spec, expected_spec2)
+
+ self.assertRegex(actual_spec, expected_spec2reg)
+
+ def test_save_spec_irr(self):
+ """Save SpecFile using save() on an irregular pattern"""
+ # invalid test case ?!
+ return
+ utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab,
+ ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2irr)
def test_save_csv_no_labels(self):
"""Save csv using save(), with autoheader=True but
@@ -217,6 +244,7 @@ class TestH5Ls(unittest.TestCase):
<HDF5 dataset "data": shape (1,), type "<f8">
"""
+
def assertMatchAnyStringInList(self, pattern, list_of_strings):
for string_ in list_of_strings:
if re.match(pattern, string_):
@@ -395,6 +423,7 @@ class TestOpen(unittest.TestCase):
class TestNodes(unittest.TestCase):
"""Test `silx.io.utils.is_` functions."""
+
def test_real_h5py_objects(self):
name = tempfile.mktemp(suffix=".h5")
try:
@@ -417,45 +446,60 @@ class TestNodes(unittest.TestCase):
os.unlink(name)
def test_h5py_like_file(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = utils.H5Type.FILE
+
obj = Foo()
self.assertTrue(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
def test_h5py_like_group(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = utils.H5Type.GROUP
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
def test_h5py_like_dataset(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = utils.H5Type.DATASET
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
self.assertTrue(utils.is_dataset(obj))
def test_bad(self):
+
class Foo(object):
+
def __init__(self):
pass
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
def test_bad_api(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = int
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
@@ -513,18 +557,20 @@ class TestGetData(unittest.TestCase):
def test_hdf5_array(self):
url = "silx:%s?/group/group/array" % self.h5_filename
data = utils.get_data(url=url)
- self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data.shape, (5,))
self.assertEqual(data[0], 1)
def test_hdf5_array_slice(self):
url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
data = utils.get_data(url=url)
- self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data.shape, (5,))
self.assertEqual(data[0], 6)
def test_hdf5_array_slice_out_of_range(self):
url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
- self.assertRaises(ValueError, utils.get_data, url)
+ # ValueError: h5py 2.x
+ # IndexError: h5py 3.x
+ self.assertRaises((ValueError, IndexError), utils.get_data, url)
def test_edf_using_silx(self):
url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
@@ -568,14 +614,15 @@ class TestGetData(unittest.TestCase):
def _h5_py_version_older_than(version):
- v_majeur, v_mineur, v_micro = h5py.version.version.split('.')[:3]
- r_majeur, r_mineur, r_micro = version.split('.')
- return v_majeur >= r_majeur and v_mineur >= r_mineur
+ v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]]
+ r_majeur, r_mineur, r_micro = [int(i) for i in version.split('.')]
+ return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(r_majeur, r_mineur, r_micro)
@unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0')
class TestRawFileToH5(unittest.TestCase):
"""Test conversion of .vol file to .h5 external dataset"""
+
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self._vol_file = os.path.join(self.tempdir, 'test_vol.vol')
@@ -589,7 +636,7 @@ class TestRawFileToH5(unittest.TestCase):
assert os.path.exists(self._vol_file + '.npy')
os.rename(self._vol_file + '.npy', self._vol_file)
self.h5_file = os.path.join(self.tempdir, 'test_h5.h5')
- self.external_dataset_path= '/root/my_external_dataset'
+ self.external_dataset_path = '/root/my_external_dataset'
self._data_url = silx.io.url.DataUrl(file_path=self.h5_file,
data_path=self.external_dataset_path)
with open(self._file_info, 'w') as _fi:
@@ -672,6 +719,158 @@ class TestRawFileToH5(unittest.TestCase):
shape=self._dataset_shape))
+class TestH5Strings(unittest.TestCase):
+ """Test HDF5 str and bytes writing and reading"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.vlenstr = h5py.special_dtype(vlen=str)
+ cls.vlenbytes = h5py.special_dtype(vlen=bytes)
+ try:
+ cls.unicode = unicode
+ except NameError:
+ cls.unicode = str
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir)
+
+ def setUp(self):
+ self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w")
+
+ def tearDown(self):
+ self.file.close()
+
+ @classmethod
+ def _make_array(cls, value, n):
+ if isinstance(value, bytes):
+ dtype = cls.vlenbytes
+ elif isinstance(value, cls.unicode):
+ dtype = cls.vlenstr
+ else:
+ return numpy.array([value] * n)
+ return numpy.array([value] * n, dtype=dtype)
+
+ @classmethod
+ def _get_charset(cls, value):
+ if isinstance(value, bytes):
+ return h5py.h5t.CSET_ASCII
+ elif isinstance(value, cls.unicode):
+ return h5py.h5t.CSET_UTF8
+ else:
+ return None
+
+ def _check_dataset(self, value, result=None):
+ # Write+read scalar
+ if result:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+ charset = self._get_charset(value)
+ self.file["data"] = value
+ data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+ if charset:
+ assert self.file["data"].id.get_type().get_cset() == charset
+
+ # Write+read variable length
+ self.file["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0)
+ assert type(data) == type(result), data
+ assert data == result, data
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii)
+ numpy.testing.assert_array_equal(data, [result] * 2)
+ if charset:
+ assert self.file["vlen_data"].id.get_type().get_cset() == charset
+
+ def _check_attribute(self, value, result=None):
+ if result:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+ self.file.attrs["data"] = value
+ data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+
+ self.file.attrs["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii)
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"]
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ def test_dataset_ascii_bytes(self):
+ self._check_dataset(b"abc")
+
+ def test_attribute_ascii_bytes(self):
+ self._check_attribute(b"abc")
+
+ def test_dataset_ascii_bytes_decode(self):
+ self._check_dataset(b"abc", result="abc")
+
+ def test_attribute_ascii_bytes_decode(self):
+ self._check_attribute(b"abc", result="abc")
+
+ def test_dataset_ascii_str(self):
+ self._check_dataset("abc")
+
+ def test_attribute_ascii_str(self):
+ self._check_attribute("abc")
+
+ def test_dataset_utf8_str(self):
+ self._check_dataset("\u0101bc")
+
+ def test_attribute_utf8_str(self):
+ self._check_attribute("\u0101bc")
+
+ def test_dataset_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc")
+
+ def test_attribute_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc")
+
+ def test_dataset_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_attribute_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_dataset_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_dataset(b"\xe423")
+
+ def test_attribute_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_attribute(b"\xe423")
+
+ def test_dataset_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_dataset(b"\xe423", result="\udce423")
+
+ def test_attribute_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_attribute(b"\xe423", result="\udce423")
+
+ def test_dataset_no_string(self):
+ self._check_dataset(numpy.int64(10))
+
+ def test_attribute_no_string(self):
+ self._check_attribute(numpy.int64(10))
+
+
def suite():
loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
test_suite = unittest.TestSuite()
@@ -681,6 +880,7 @@ def suite():
test_suite.addTest(loadTests(TestNodes))
test_suite.addTest(loadTests(TestGetData))
test_suite.addTest(loadTests(TestRawFileToH5))
+ test_suite.addTest(loadTests(TestH5Strings))
return test_suite
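
Finally, a minimal sketch of the string round-trip behaviour the new TestH5Strings class pins down, assuming silx.io.utils as imported in the diff above (the file name is illustrative):

import h5py
from silx.io import utils

with h5py.File("strings.h5", "w") as f:
    f["ascii"] = b"abc"
    f["utf8"] = "\u0101bc"
    # ASCII byte strings stay bytes unless decode_ascii=True is passed.
    assert utils.h5py_read_dataset(f["ascii"]) == b"abc"
    assert utils.h5py_read_dataset(f["ascii"], decode_ascii=True) == "abc"
    # UTF-8 string datasets come back as str either way.
    assert utils.h5py_read_dataset(f["utf8"]) == "\u0101bc"
    # Attributes behave the same via h5py_read_attribute.
    f.attrs["title"] = b"abc"
    assert utils.h5py_read_attribute(f.attrs, "title", decode_ascii=True) == "abc"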