Diffstat (limited to 'src/silx/io/test')
-rw-r--r--  src/silx/io/test/__init__.py                23
-rw-r--r--  src/silx/io/test/test_commonh5.py          285
-rw-r--r--  src/silx/io/test/test_dictdump.py         1009
-rwxr-xr-x  src/silx/io/test/test_fabioh5.py           615
-rw-r--r--  src/silx/io/test/test_fioh5.py             299
-rw-r--r--  src/silx/io/test/test_h5py_utils.py        451
-rw-r--r--  src/silx/io/test/test_nxdata.py            563
-rw-r--r--  src/silx/io/test/test_octaveh5.py          156
-rw-r--r--  src/silx/io/test/test_rawh5.py              85
-rw-r--r--  src/silx/io/test/test_specfile.py          420
-rw-r--r--  src/silx/io/test/test_specfilewrapper.py   195
-rw-r--r--  src/silx/io/test/test_spech5.py            929
-rw-r--r--  src/silx/io/test/test_spectoh5.py          183
-rw-r--r--  src/silx/io/test/test_url.py               217
-rw-r--r--  src/silx/io/test/test_utils.py             923
-rw-r--r--  src/silx/io/test/test_write_to_h5.py       118
16 files changed, 6471 insertions, 0 deletions
diff --git a/src/silx/io/test/__init__.py b/src/silx/io/test/__init__.py
new file mode 100644
index 0000000..244d090
--- /dev/null
+++ b/src/silx/io/test/__init__.py
@@ -0,0 +1,23 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
diff --git a/src/silx/io/test/test_commonh5.py b/src/silx/io/test/test_commonh5.py
new file mode 100644
index 0000000..27f6e8c
--- /dev/null
+++ b/src/silx/io/test/test_commonh5.py
@@ -0,0 +1,285 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for commonh5 wrapper"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "21/09/2017"
+
+import logging
+import numpy
+import unittest
+import tempfile
+import shutil
+
+_logger = logging.getLogger(__name__)
+
+import silx.io
+import silx.io.utils
+import h5py
+
+try:
+ from .. import commonh5
+except ImportError:
+ commonh5 = None
+
+
+class _TestCommonFeatures(unittest.TestCase):
+ """Test common features supported by h5py and our implementation."""
+ __test__ = False # ignore abstract class tests
+
+ @classmethod
+ def createFile(cls):
+ return None
+
+ @classmethod
+ def setUpClass(cls):
+ # Set to None because create_resource can raise an exception
+ cls.h5 = None
+ cls.h5 = cls.create_resource()
+ if cls.h5 is None:
+ raise unittest.SkipTest("File not created")
+
+ @classmethod
+ def create_resource(cls):
+ """Must be implemented"""
+ return None
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.h5 = None
+
+ def test_file(self):
+ node = self.h5
+ self.assertTrue(silx.io.is_file(node))
+ self.assertTrue(silx.io.is_group(node))
+ self.assertFalse(silx.io.is_dataset(node))
+ self.assertEqual(len(node.attrs), 0)
+
+ def test_group(self):
+ node = self.h5["group"]
+ self.assertFalse(silx.io.is_file(node))
+ self.assertTrue(silx.io.is_group(node))
+ self.assertFalse(silx.io.is_dataset(node))
+ self.assertEqual(len(node.attrs), 0)
+ class_ = self.h5.get("group", getclass=True)
+ classlink = self.h5.get("group", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Group)
+ self.assertEqual(classlink, h5py.HardLink)
+
+ def test_dataset(self):
+ node = self.h5["group/dataset"]
+ self.assertFalse(silx.io.is_file(node))
+ self.assertFalse(silx.io.is_group(node))
+ self.assertTrue(silx.io.is_dataset(node))
+ self.assertEqual(len(node.attrs), 0)
+ class_ = self.h5.get("group/dataset", getclass=True)
+ classlink = self.h5.get("group/dataset", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertEqual(classlink, h5py.HardLink)
+
+ def test_soft_link(self):
+ node = self.h5["link/soft_link"]
+ self.assertEqual(node.name, "/link/soft_link")
+ class_ = self.h5.get("link/soft_link", getclass=True)
+ link = self.h5.get("link/soft_link", getlink=True)
+ classlink = self.h5.get("link/soft_link", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink)))
+ self.assertTrue(silx.io.utils.is_softlink(link))
+ self.assertEqual(classlink, h5py.SoftLink)
+
+ def test_external_link(self):
+ node = self.h5["link/external_link"]
+ self.assertEqual(node.name, "/target/dataset")
+ class_ = self.h5.get("link/external_link", getclass=True)
+ classlink = self.h5.get("link/external_link", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertEqual(classlink, h5py.ExternalLink)
+
+ def test_external_link_to_link(self):
+ node = self.h5["link/external_link_to_link"]
+ self.assertEqual(node.name, "/target/link")
+ class_ = self.h5.get("link/external_link_to_link", getclass=True)
+ classlink = self.h5.get("link/external_link_to_link", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertEqual(classlink, h5py.ExternalLink)
+
+ def test_create_groups(self):
+ c = self.h5.create_group(self.id() + "/a/b/c")
+ d = c.create_group("/" + self.id() + "/a/b/d")
+
+ self.assertRaises(ValueError, self.h5.create_group, self.id() + "/a/b/d")
+ self.assertEqual(c.name, "/" + self.id() + "/a/b/c")
+ self.assertEqual(d.name, "/" + self.id() + "/a/b/d")
+
+ def test_setitem_python_object_dataset(self):
+ group = self.h5.create_group(self.id())
+ group["a"] = 10
+ self.assertEqual(group["a"].dtype.kind, "i")
+
+ def test_setitem_numpy_dataset(self):
+ group = self.h5.create_group(self.id())
+ group["a"] = numpy.array([10, 20, 30])
+ self.assertEqual(group["a"].dtype.kind, "i")
+ self.assertEqual(group["a"].shape, (3,))
+
+ def test_setitem_link(self):
+ group = self.h5.create_group(self.id())
+ group["a"] = 10
+ group["b"] = group["a"]
+ self.assertEqual(group["b"].dtype.kind, "i")
+
+ def test_setitem_dataset_is_sub_group(self):
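+ # Assigning through a path implicitly creates the intermediate groups.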
+ self.h5[self.id() + "/a"] = 10
+
+
+class TestCommonFeatures_h5py(_TestCommonFeatures):
+ """Check if h5py is compliant with what we expect."""
+ __test__ = True # because _TestCommonFeatures is ignored
+
+ @classmethod
+ def create_resource(cls):
+ cls.tmp_dir = tempfile.mkdtemp()
+
+ externalh5 = h5py.File(cls.tmp_dir + "/external.h5", mode="w")
+ externalh5["target/dataset"] = 50
+ externalh5["target/link"] = h5py.SoftLink("/target/dataset")
+ externalh5.close()
+
+ h5 = h5py.File(cls.tmp_dir + "/base.h5", mode="w")
+ h5["group/dataset"] = 50
+ h5["link/soft_link"] = h5py.SoftLink("/group/dataset")
+ h5["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset")
+ h5["link/external_link_to_link"] = h5py.ExternalLink("external.h5", "/target/link")
+
+ return h5
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCommonFeatures_h5py, cls).tearDownClass()
+ if hasattr(cls, "tmp_dir") and cls.tmp_dir is not None:
+ shutil.rmtree(cls.tmp_dir)
+
+
+class TestCommonFeatures_commonH5(_TestCommonFeatures):
+ """Check if commonh5 is compliant with h5py."""
+ __test__ = True # because _TestCommonFeatures is ignored
+
+ @classmethod
+ def create_resource(cls):
+ h5 = commonh5.File("base.h5", "w")
+ h5.create_group("group").create_dataset("dataset", data=numpy.int32(50))
+
+ link = h5.create_group("link")
+ link.add_node(commonh5.SoftLink("soft_link", "/group/dataset"))
+
+ return h5
+
+ def test_external_link(self):
+ # not applicable
+ pass
+
+ def test_external_link_to_link(self):
+ # not applicable
+ pass
+
+
+class TestSpecificCommonH5(unittest.TestCase):
+ """Test specific features from commonh5.
+
+ Tests of shared features are covered by TestCommonFeatures."""
+
+ def setUp(self):
+ if commonh5 is None:
+ self.skipTest("silx.io.commonh5 is needed")
+
+ def test_node_attrs(self):
+ node = commonh5.Node("Foo", attrs={"a": 1})
+ self.assertEqual(node.attrs["a"], 1)
+ node.attrs["b"] = 8
+ self.assertEqual(node.attrs["b"], 8)
+ node.attrs["b"] = 2
+ self.assertEqual(node.attrs["b"], 2)
+
+ def test_node_readonly_attrs(self):
+ f = commonh5.File(name="Foo", mode="r")
+ node = commonh5.Node("Foo", attrs={"a": 1})
+ node.attrs["b"] = 8
+ f.add_node(node)
+ self.assertEqual(node.attrs["b"], 8)
+ try:
+ node.attrs["b"] = 1
+ self.fail()
+ except RuntimeError:
+ pass
+
+ def test_create_dataset(self):
+ f = commonh5.File(name="Foo", mode="w")
+ node = f.create_dataset("foo", data=numpy.array([1]))
+ self.assertIs(node.parent, f)
+ self.assertIs(f["foo"], node)
+
+ def test_create_group(self):
+ f = commonh5.File(name="Foo", mode="w")
+ node = f.create_group("foo")
+ self.assertIs(node.parent, f)
+ self.assertIs(f["foo"], node)
+
+ def test_readonly_create_dataset(self):
+ f = commonh5.File(name="Foo", mode="r")
+ try:
+ f.create_dataset("foo", data=numpy.array([1]))
+ self.fail()
+ except RuntimeError:
+ pass
+
+ def test_readonly_create_group(self):
+ f = commonh5.File(name="Foo", mode="r")
+ try:
+ f.create_group("foo")
+ self.fail()
+ except RuntimeError:
+ pass
+
+ def test_create_unicode_dataset(self):
+ f = commonh5.File(name="Foo", mode="w")
+ try:
+ f.create_dataset("foo", data=numpy.array(u"aaaa"))
+ self.fail()
+ except TypeError:
+ pass
+
+ def test_setitem_dataset(self):
+ self.h5 = commonh5.File(name="Foo", mode="w")
+ group = self.h5.create_group(self.id())
+ group["a"] = commonh5.Dataset(None, data=numpy.array(10))
+ self.assertEqual(group["a"].dtype.kind, "i")
+
+ def test_setitem_explicit_link(self):
+ self.h5 = commonh5.File(name="Foo", mode="w")
+ group = self.h5.create_group(self.id())
+ group["a"] = 10
+ group["b"] = commonh5.SoftLink(None, path="/" + self.id() + "/a")
+ self.assertEqual(group["b"].dtype.kind, "i")
diff --git a/src/silx/io/test/test_dictdump.py b/src/silx/io/test/test_dictdump.py
new file mode 100644
index 0000000..4cafa9b
--- /dev/null
+++ b/src/silx/io/test/test_dictdump.py
@@ -0,0 +1,1009 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for dicttoh5 module"""
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "17/01/2018"
+
+from collections import OrderedDict
+import numpy
+import os
+import tempfile
+import unittest
+import h5py
+from copy import deepcopy
+
+from collections import defaultdict
+
+from silx.utils.testutils import LoggingValidator
+
+from ..configdict import ConfigDict
+from .. import dictdump
+from ..dictdump import dicttoh5, dicttojson, dump
+from ..dictdump import h5todict, load
+from ..dictdump import logger as dictdump_logger
+from ..utils import is_link
+from ..utils import h5py_read_dataset
+
+
+def tree():
+ """Tree data structure as a recursive nested dictionary"""
+ return defaultdict(tree)
+
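+# Example: assigning through missing keys auto-creates intermediate nodes,
+# e.g. t = tree(); t["a"]["b"] = 1 yields {"a": {"b": 1}}; city_attrs below
+# relies on this to build its nested structure.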
+
+inhabitants = 160215
+
+city_attrs = tree()
+city_attrs["Europe"]["France"]["Grenoble"]["area"] = "18.44 km2"
+city_attrs["Europe"]["France"]["Grenoble"]["inhabitants"] = inhabitants
+city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196]
+city_attrs["Europe"]["France"]["Tourcoing"]["area"]
+
+ext_attrs = tree()
+ext_attrs["ext_group"]["dataset"] = 10
+ext_filename = "ext.h5"
+
+link_attrs = tree()
+link_attrs["links"]["group"]["dataset"] = 10
+link_attrs["links"]["group"]["relative_softlink"] = h5py.SoftLink("dataset")
+link_attrs["links"]["relative_softlink"] = h5py.SoftLink("group/dataset")
+link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset")
+link_attrs["links"]["external_link"] = h5py.ExternalLink(ext_filename, "/ext_group/dataset")
+
+
+class DictTestCase(unittest.TestCase):
+
+ def assertRecursiveEqual(self, expected, actual, nodes=tuple()):
+ err_msg = "\n\n Tree nodes: {}".format(nodes)
+ if isinstance(expected, dict):
+ self.assertTrue(isinstance(actual, dict), msg=err_msg)
+ self.assertEqual(
+ set(expected.keys()),
+ set(actual.keys()),
+ msg=err_msg
+ )
+ for k in actual:
+ self.assertRecursiveEqual(
+ expected[k],
+ actual[k],
+ nodes=nodes + (k,),
+ )
+ return
+ if isinstance(actual, numpy.ndarray):
+ actual = actual.tolist()
+ if isinstance(expected, numpy.ndarray):
+ expected = expected.tolist()
+
+ self.assertEqual(expected, actual, msg=err_msg)
+
+
+class H5DictTestCase(DictTestCase):
+
+ def _dictRoundTripNormalize(self, treedict):
+ """Convert the dictionary as expected from a round-trip
+ treedict -> dicttoh5 -> h5todict -> newtreedict
+ """
+ for key, value in list(treedict.items()):
+ if isinstance(value, dict):
+ self._dictRoundTripNormalize(value)
+
+ # Expand treedict[("group", "attr_name")]
+ # to treedict["group"]["attr_name"]
+ for key, value in list(treedict.items()):
+ if not isinstance(key, tuple):
+ continue
+ # Put the attribute inside the group
+ grpname, attr = key
+ if not grpname:
+ continue
+ group = treedict.setdefault(grpname, dict())
+ if isinstance(group, dict):
+ del treedict[key]
+ group[("", attr)] = value
+
+ def dictRoundTripNormalize(self, treedict):
+ treedict2 = deepcopy(treedict)
+ self._dictRoundTripNormalize(treedict2)
+ return treedict2
+
+
+class TestDictToH5(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testH5CityAttrs(self):
+ filters = {'shuffle': True,
+ 'fletcher32': True}
+ dicttoh5(city_attrs, self.h5_fname, h5path='/city attributes',
+ mode="w", create_dataset_args=filters)
+
+ h5f = h5py.File(self.h5_fname, mode='r')
+
+ self.assertIn("Tourcoing/area", h5f["/city attributes/Europe/France"])
+ ds = h5f["/city attributes/Europe/France/Grenoble/inhabitants"]
+ self.assertEqual(ds[...], 160215)
+
+ # filters only apply to datasets that are not scalars (shape != ())
+ ds = h5f["/city attributes/Europe/France/Grenoble/coordinates"]
+ #self.assertEqual(ds.compression, "gzip")
+ self.assertTrue(ds.fletcher32)
+ self.assertTrue(ds.shuffle)
+
+ h5f.close()
+
+ ddict = load(self.h5_fname, fmat="hdf5")
+ self.assertAlmostEqual(
+ min(ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]),
+ 5.7196)
+
+ def testH5OverwriteDeprecatedApi(self):
+ dd = ConfigDict({'t': True})
+
+ dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a')
+ dd = ConfigDict({'t': False})
+ dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
+ overwrite_data=False)
+
+ res = h5todict(self.h5_fname)
+ self.assertEqual(res['t'], True)
+
+ dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
+ overwrite_data=True)
+
+ res = h5todict(self.h5_fname)
+ self.assertEqual(res['t'], False)
+
+ def testAttributes(self):
+ """Any kind of attribute can be described"""
+ ddict = {
+ "group": {"datatset": "hmmm", ("", "group_attr"): 10},
+ "dataset": "aaaaaaaaaaaaaaa",
+ ("", "root_attr"): 11,
+ ("dataset", "dataset_attr"): 12,
+ ("group", "group_attr2"): 13,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ self.assertEqual(h5file["group"].attrs['group_attr'], 10)
+ self.assertEqual(h5file.attrs['root_attr'], 11)
+ self.assertEqual(h5file["dataset"].attrs['dataset_attr'], 12)
+ self.assertEqual(h5file["group"].attrs['group_attr2'], 13)
+
+ def testPathAttributes(self):
+ """A group is requested at a path"""
+ ddict = {
+ ("", "NX_class"): 'NXcollection',
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ # This should not warn
+ with LoggingValidator(dictdump_logger, warning=0):
+ dictdump.dicttoh5(ddict, h5file, h5path="foo/bar")
+
+ def testKeyOrder(self):
+ ddict1 = {
+ "d": "plow",
+ ("d", "a"): "ox",
+ }
+ ddict2 = {
+ ("d", "a"): "ox",
+ "d": "plow",
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict1, h5file, h5path="g1")
+ dictdump.dicttoh5(ddict2, h5file, h5path="g2")
+ self.assertEqual(h5file["g1/d"].attrs['a'], "ox")
+ self.assertEqual(h5file["g2/d"].attrs['a'], "ox")
+
+ def testAttributeValues(self):
+ """Any NX data types can be used"""
+ ddict = {
+ ("", "bool"): True,
+ ("", "int"): 11,
+ ("", "float"): 1.1,
+ ("", "str"): "a",
+ ("", "boollist"): [True, False, True],
+ ("", "intlist"): [11, 22, 33],
+ ("", "floatlist"): [1.1, 2.2, 3.3],
+ ("", "strlist"): ["a", "bb", "ccc"],
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ for k, expected in ddict.items():
+ result = h5file.attrs[k[1]]
+ if isinstance(expected, list):
+ if isinstance(expected[0], str):
+ numpy.testing.assert_array_equal(result, expected)
+ else:
+ numpy.testing.assert_array_almost_equal(result, expected)
+ else:
+ self.assertEqual(result, expected)
+
+ def testAttributeAlreadyExists(self):
+ """A duplicated attribute warns if overwriting is not enabled"""
+ ddict = {
+ "group": {"dataset": "hmmm", ("", "attr"): 10},
+ ("group", "attr"): 10,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ self.assertEqual(h5file["group"].attrs['attr'], 10)
+
+ def testFlatDict(self):
+ """Description of a tree with a single level of keys"""
+ ddict = {
+ "group/group/dataset": 10,
+ ("group/group/dataset", "attr"): 11,
+ ("group/group", "attr"): 12,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ self.assertEqual(h5file["group/group/dataset"][()], 10)
+ self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11)
+ self.assertEqual(h5file["group/group"].attrs['attr'], 12)
+
+ def testLinks(self):
+ with h5py.File(self.h5_ext_fname, "w") as h5file:
+ dictdump.dicttoh5(ext_attrs, h5file)
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(link_attrs, h5file)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["links/group/dataset"][()], 10)
+ self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/absolute_softlink"][()], 10)
+ self.assertEqual(h5file["links/external_link"][()], 10)
+
+ def testDumpNumpyArray(self):
+ ddict = {
+ 'darks': {
+ '0': numpy.array([[0, 0, 0], [0, 0, 0]], dtype=numpy.uint16)
+ }
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ numpy.testing.assert_array_equal(h5py_read_dataset(h5file["darks"]["0"]),
+ ddict['darks']['0'])
+
+ def testOverwrite(self):
+ # Tree structure that will be tested
+ group1 = {
+ ("", "attr2"): "original2",
+ "dset1": 0,
+ "dset2": [0, 1],
+ ("dset1", "attr1"): "original1",
+ ("dset1", "attr2"): "original2",
+ ("dset2", "attr1"): "original1",
+ ("dset2", "attr2"): "original2",
+ }
+ group2 = {
+ "subgroup1": group1.copy(),
+ "subgroup2": group1.copy(),
+ ("subgroup1", "attr1"): "original1",
+ ("subgroup2", "attr1"): "original1"
+ }
+ group2.update(group1)
+ # initial HDF5 tree
+ otreedict = {
+ ('', 'attr1'): "original1",
+ ('', 'attr2'): "original2",
+ 'group1': group1,
+ 'group2': group2,
+ ('group1', 'attr1'): "original1",
+ ('group2', 'attr1'): "original1"
+ }
+ wtreedict = None # dumped dictionary
+ etreedict = None # expected HDF5 tree after dump
+
+ def reset_file():
+ dicttoh5(
+ otreedict,
+ h5file=self.h5_fname,
+ mode="w",
+ )
+
+ def append_file(update_mode):
+ dicttoh5(
+ wtreedict,
+ h5file=self.h5_fname,
+ mode="a",
+ update_mode=update_mode
+ )
+
+ def assert_file():
+ rtreedict = h5todict(
+ self.h5_fname,
+ include_attributes=True,
+ asarray=False
+ )
+ netreedict = self.dictRoundTripNormalize(etreedict)
+ try:
+ self.assertRecursiveEqual(netreedict, rtreedict)
+ except AssertionError:
+ from pprint import pprint
+ print("\nDUMP:")
+ pprint(wtreedict)
+ print("\nEXPECTED:")
+ pprint(netreedict)
+ print("\nHDF5:")
+ pprint(rtreedict)
+ raise
+
+ def assert_append(update_mode):
+ append_file(update_mode)
+ assert_file()
+
+ # Test wrong arguments
+ with self.assertRaises(ValueError):
+ dicttoh5(
+ otreedict,
+ h5file=self.h5_fname,
+ mode="w",
+ update_mode="wrong-value"
+ )
+
+ # No writing
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ assert_file()
+
+ # Write identical dictionary
+ wtreedict = deepcopy(otreedict)
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify", "replace"]:
+ assert_append(update_mode)
+
+ # Write empty dictionary
+ wtreedict = dict()
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify", "replace"]:
+ assert_append(update_mode)
+
+ # Modified dataset
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
+ wtreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
+ etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+ assert_append("replace")
+
+ # Modified group
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = [0, 1]
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify"]:
+ assert_append(update_mode)
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = [0, 1]
+ assert_append("replace")
+
+ # Modified attribute
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ etreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"][("", "attr1")] = "modified"
+ assert_append("replace")
+
+ # Delete group
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"]
+ del etreedict["group2"][("subgroup2", "attr1")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ assert_append("replace")
+
+ # Delete dataset
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"]["dset2"] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"]["dset2"]
+ del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
+ del etreedict["group2"]["subgroup2"][("dset2", "attr2")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ assert_append("replace")
+
+ # Delete attribute
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"][("dset2", "attr1")] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset2"] = dict()
+ assert_append("replace")
+
+
+class TestH5ToDict(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
+ dicttoh5(city_attrs, self.h5_fname)
+ dicttoh5(link_attrs, self.h5_fname, mode="a")
+ dicttoh5(ext_attrs, self.h5_ext_fname)
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testExcludeNames(self):
+ ddict = h5todict(self.h5_fname, path="/Europe/France",
+ exclude_names=["ourcoing", "inhab", "toto"])
+ self.assertNotIn("Tourcoing", ddict)
+ self.assertIn("Grenoble", ddict)
+
+ self.assertNotIn("inhabitants", ddict["Grenoble"])
+ self.assertIn("coordinates", ddict["Grenoble"])
+ self.assertIn("area", ddict["Grenoble"])
+
+ def testAsArrayTrue(self):
+ """Test with asarray=True, the default"""
+ ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble")
+ self.assertTrue(numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants)))
+
+ def testAsArrayFalse(self):
+ """Test with asarray=False"""
+ ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False)
+ self.assertEqual(ddict["inhabitants"], inhabitants)
+
+ def testDereferenceLinks(self):
+ ddict = h5todict(self.h5_fname, path="links", dereference_links=True)
+ self.assertTrue(ddict["absolute_softlink"], 10)
+ self.assertTrue(ddict["relative_softlink"], 10)
+ self.assertTrue(ddict["external_link"], 10)
+ self.assertTrue(ddict["group"]["relative_softlink"], 10)
+
+ def testPreserveLinks(self):
+ ddict = h5todict(self.h5_fname, path="links", dereference_links=False)
+ self.assertTrue(is_link(ddict["absolute_softlink"]))
+ self.assertTrue(is_link(ddict["relative_softlink"]))
+ self.assertTrue(is_link(ddict["external_link"]))
+ self.assertTrue(is_link(ddict["group"]["relative_softlink"]))
+
+ def testStrings(self):
+ ddict = {"dset_bytes": b"bytes",
+ "dset_utf8": "utf8",
+ "dset_2bytes": [b"bytes", b"bytes"],
+ "dset_2utf8": ["utf8", "utf8"],
+ ("", "attr_bytes"): b"bytes",
+ ("", "attr_utf8"): "utf8",
+ ("", "attr_2bytes"): [b"bytes", b"bytes"],
+ ("", "attr_2utf8"): ["utf8", "utf8"]}
+ dicttoh5(ddict, self.h5_fname, mode="w")
+ adict = h5todict(self.h5_fname, include_attributes=True, asarray=False)
+ self.assertEqual(ddict["dset_bytes"], adict["dset_bytes"])
+ self.assertEqual(ddict["dset_utf8"], adict["dset_utf8"])
+ self.assertEqual(ddict[("", "attr_bytes")], adict[("", "attr_bytes")])
+ self.assertEqual(ddict[("", "attr_utf8")], adict[("", "attr_utf8")])
+ numpy.testing.assert_array_equal(ddict["dset_2bytes"], adict["dset_2bytes"])
+ numpy.testing.assert_array_equal(ddict["dset_2utf8"], adict["dset_2utf8"])
+ numpy.testing.assert_array_equal(ddict[("", "attr_2bytes")], adict[("", "attr_2bytes")])
+ numpy.testing.assert_array_equal(ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")])
+
+
+class TestDictToNx(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "nx.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testAttributes(self):
+ """Any kind of attribute can be described"""
+ ddict = {
+ "group": {"dataset": 100, "@group_attr1": 10},
+ "dataset": 200,
+ "@root_attr": 11,
+ "dataset@dataset_attr": "12",
+ "group@group_attr2": 13,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict, h5file)
+ self.assertEqual(h5file["group"].attrs['group_attr1'], 10)
+ self.assertEqual(h5file.attrs['root_attr'], 11)
+ self.assertEqual(h5file["dataset"].attrs['dataset_attr'], "12")
+ self.assertEqual(h5file["group"].attrs['group_attr2'], 13)
+
+ def testKeyOrder(self):
+ ddict1 = {
+ "d": "plow",
+ "d@a": "ox",
+ }
+ ddict2 = {
+ "d@a": "ox",
+ "d": "plow",
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict1, h5file, h5path="g1")
+ dictdump.dicttonx(ddict2, h5file, h5path="g2")
+ self.assertEqual(h5file["g1/d"].attrs['a'], "ox")
+ self.assertEqual(h5file["g2/d"].attrs['a'], "ox")
+
+ def testAttributeValues(self):
+ """Any NX data types can be used"""
+ ddict = {
+ "@bool": True,
+ "@int": 11,
+ "@float": 1.1,
+ "@str": "a",
+ "@boollist": [True, False, True],
+ "@intlist": [11, 22, 33],
+ "@floatlist": [1.1, 2.2, 3.3],
+ "@strlist": ["a", "bb", "ccc"],
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict, h5file)
+ for k, expected in ddict.items():
+ result = h5file.attrs[k[1:]]
+ if isinstance(expected, list):
+ if isinstance(expected[0], str):
+ numpy.testing.assert_array_equal(result, expected)
+ else:
+ numpy.testing.assert_array_almost_equal(result, expected)
+ else:
+ self.assertEqual(result, expected)
+
+ def testFlatDict(self):
+ """Description of a tree with a single level of keys"""
+ ddict = {
+ "group/group/dataset": 10,
+ "group/group/dataset@attr": 11,
+ "group/group@attr": 12,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict, h5file)
+ self.assertEqual(h5file["group/group/dataset"][()], 10)
+ self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11)
+ self.assertEqual(h5file["group/group"].attrs['attr'], 12)
+
+ def testLinks(self):
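+ # dicttonx convention used here: ">name" creates a link whose value is
+ # the target path; "file.h5::/path" denotes an external link.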
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset"}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["links/group/dataset"][()], 10)
+ self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/absolute_softlink"][()], 10)
+ self.assertEqual(h5file["links/external_link"][()], 10)
+
+ def testUpLinks(self):
+ ddict = {"data": {"group": {"dataset": 10, ">relative_softlink": "dataset"}},
+ "links": {"group": {"subgroup": {">relative_softlink": "../../../data/group/dataset"}}}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10)
+
+ def testOverwrite(self):
+ entry_name = "entry"
+ wtreedict = {
+ "group1": {"a": 1, "b": 2},
+ "group2@attr3": "attr3",
+ "group2@attr4": "attr4",
+ "group2": {
+ "@attr1": "attr1",
+ "@attr2": "attr2",
+ "c": 3,
+ "d": 4,
+ "dataset4": 8,
+ "dataset4@units": "keV",
+ },
+ "group3": {"subgroup": {"e": 9, "f": 10}},
+ "dataset1": 5,
+ "dataset2": 6,
+ "dataset3": 7,
+ "dataset3@units": "mm",
+ }
+ esubtree = {
+ "@NX_class": "NXentry",
+ "group1": {"@NX_class": "NXcollection", "a": 1, "b": 2},
+ "group2": {
+ "@NX_class": "NXcollection",
+ "@attr1": "attr1",
+ "@attr2": "attr2",
+ "@attr3": "attr3",
+ "@attr4": "attr4",
+ "c": 3,
+ "d": 4,
+ "dataset4": 8,
+ "dataset4@units": "keV",
+ },
+ "group3": {
+ "@NX_class": "NXcollection",
+ "subgroup": {"@NX_class": "NXcollection", "e": 9, "f": 10},
+ },
+ "dataset1": 5,
+ "dataset2": 6,
+ "dataset3": 7,
+ "dataset3@units": "mm",
+ }
+ etreedict = {entry_name: esubtree}
+
+ def append_file(update_mode, add_nx_class):
+ dictdump.dicttonx(
+ wtreedict,
+ h5file=self.h5_fname,
+ mode="a",
+ h5path=entry_name,
+ update_mode=update_mode,
+ add_nx_class=add_nx_class
+ )
+
+ def assert_file():
+ rtreedict = dictdump.nxtodict(
+ self.h5_fname,
+ include_attributes=True,
+ asarray=False,
+ )
+ netreedict = self.dictRoundTripNormalize(etreedict)
+ try:
+ self.assertRecursiveEqual(netreedict, rtreedict)
+ except AssertionError:
+ from pprint import pprint
+ print("\nDUMP:")
+ pprint(wtreedict)
+ print("\nEXPECTED:")
+ pprint(netreedict)
+ print("\nHDF5:")
+ pprint(rtreedict)
+ raise
+
+ def assert_append(update_mode, add_nx_class=None):
+ append_file(update_mode, add_nx_class=add_nx_class)
+ assert_file()
+
+ # First write to an empty file
+ assert_append(None)
+
+ # Add non-existing attributes/datasets/groups
+ wtreedict["group1"].pop("a")
+ wtreedict["group2"].pop("@attr1")
+ wtreedict["group2"]["@attr2"] = "attr3" # only for update
+ wtreedict["group2"]["@type"] = "test"
+ wtreedict["group2"]["dataset4"] = 9 # only for update
+ del wtreedict["group2"]["dataset4@units"]
+ wtreedict["group3"] = {}
+ esubtree["group2"]["@type"] = "test"
+ assert_append("add")
+
+ # Update existing attributes and datasets
+ esubtree["group2"]["@attr2"] = "attr3"
+ esubtree["group2"]["dataset4"] = 9
+ assert_append("modify")
+
+ # Do not add missing NX_class by default when updating
+ wtreedict["group2"]["@NX_class"] = "NXprocess"
+ esubtree["group2"]["@NX_class"] = "NXprocess"
+ assert_append("modify")
+ del wtreedict["group2"]["@NX_class"]
+ assert_append("modify")
+
+ # Overwrite existing groups/datasets/attributes
+ esubtree["group1"].pop("a")
+ esubtree["group2"].pop("@attr1")
+ esubtree["group2"]["@NX_class"] = "NXcollection"
+ esubtree["group2"]["dataset4"] = 9
+ del esubtree["group2"]["dataset4@units"]
+ esubtree["group3"] = {"@NX_class": "NXcollection"}
+ assert_append("replace", add_nx_class=True)
+
+
+class TestNxToDict(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "nx.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testAttributes(self):
+ """Any kind of attribute can be described"""
+ ddict = {
+ "group": {"dataset": 100, "@group_attr1": 10},
+ "dataset": 200,
+ "@root_attr": 11,
+ "dataset@dataset_attr": "12",
+ "group@group_attr2": 13,
+ }
+ dictdump.dicttonx(ddict, self.h5_fname)
+ ddict = dictdump.nxtodict(self.h5_fname, include_attributes=True)
+ self.assertEqual(ddict["group"]["@group_attr1"], 10)
+ self.assertEqual(ddict["@root_attr"], 11)
+ self.assertEqual(ddict["dataset@dataset_attr"], "12")
+ self.assertEqual(ddict["group"]["@group_attr2"], 13)
+
+ def testDereferenceLinks(self):
+ """Write links and dereference on read"""
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset"}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+
+ ddict = dictdump.h5todict(self.h5_fname, dereference_links=True)
+ self.assertTrue(ddict["links"]["absolute_softlink"], 10)
+ self.assertTrue(ddict["links"]["relative_softlink"], 10)
+ self.assertTrue(ddict["links"]["external_link"], 10)
+ self.assertTrue(ddict["links"]["group"]["relative_softlink"], 10)
+
+ def testPreserveLinks(self):
+ """Write/read links"""
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset"}}
+ dictdump.dicttonx(ddict, self.h5_fname)
+
+ ddict = dictdump.nxtodict(self.h5_fname, dereference_links=False)
+ self.assertTrue(ddict["links"][">absolute_softlink"], "dataset")
+ self.assertTrue(ddict["links"][">relative_softlink"], "group/dataset")
+ self.assertTrue(ddict["links"][">external_link"], "/links/group/dataset")
+ self.assertTrue(ddict["links"]["group"][">relative_softlink"], "nx_ext.h5::/ext_group/datase")
+
+ def testNotExistingPath(self):
+ """Test converting not existing path"""
+ with h5py.File(self.h5_fname, 'a') as f:
+ f['data'] = 1
+
+ ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='ignore')
+ self.assertFalse(ddict)
+
+ with LoggingValidator(dictdump_logger, error=1):
+ ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='log')
+ self.assertFalse(ddict)
+
+ with self.assertRaises(KeyError):
+ h5todict(self.h5_fname, path="/I/am/not/a/path", errors='raise')
+
+ def testBrokenLinks(self):
+ """Test with broken links"""
+ with h5py.File(self.h5_fname, 'a') as f:
+ f["/Mars/BrokenSoftLink"] = h5py.SoftLink("/Idontexists")
+ f["/Mars/BrokenExternalLink"] = h5py.ExternalLink("notexistingfile.h5", "/Idontexists")
+
+ ddict = h5todict(self.h5_fname, path="/Mars", errors='ignore')
+ self.assertFalse(ddict)
+
+ with LoggingValidator(dictdump_logger, error=2):
+ ddict = h5todict(self.h5_fname, path="/Mars", errors='log')
+ self.assertFalse(ddict)
+
+ with self.assertRaises(KeyError):
+ h5todict(self.h5_fname, path="/Mars", errors='raise')
+
+
+class TestDictToJson(DictTestCase):
+ def setUp(self):
+ self.dir_path = tempfile.mkdtemp()
+ self.json_fname = os.path.join(self.dir_path, "cityattrs.json")
+
+ def tearDown(self):
+ os.unlink(self.json_fname)
+ os.rmdir(self.dir_path)
+
+ def testJsonCityAttrs(self):
+ self.json_fname = os.path.join(self.dir_path, "cityattrs.json")
+ dicttojson(city_attrs, self.json_fname, indent=3)
+
+ with open(self.json_fname, "r") as f:
+ json_content = f.read()
+ self.assertIn('"inhabitants": 160215', json_content)
+
+
+class TestDictToIni(DictTestCase):
+ def setUp(self):
+ self.dir_path = tempfile.mkdtemp()
+ self.ini_fname = os.path.join(self.dir_path, "test.ini")
+
+ def tearDown(self):
+ os.unlink(self.ini_fname)
+ os.rmdir(self.dir_path)
+
+ def testConfigDictIO(self):
+ """Ensure values and types of data is preserved when dictionary is
+ written to file and read back."""
+ testdict = {
+ 'simple_types': {
+ 'float': 1.0,
+ 'int': 1,
+ 'percent string': '5 % is too much',
+ 'backslash string': 'i can use \\',
+ 'empty_string': '',
+ 'nonestring': 'None',
+ 'nonetype': None,
+ 'interpstring': 'interpolation: %(percent string)s',
+ },
+ 'containers': {
+ 'list': [-1, 'string', 3.0, False, None],
+ 'array': numpy.array([1.0, 2.0, 3.0]),
+ 'dict': {
+ 'key1': 'Hello World',
+ 'key2': 2.0,
+ }
+ }
+ }
+
+ dump(testdict, self.ini_fname)
+
+ # Read the data back
+ readdict = load(self.ini_fname)
+
+ testdictkeys = list(testdict.keys())
+ readkeys = list(readdict.keys())
+
+ self.assertTrue(len(readkeys) == len(testdictkeys),
+ "Number of read keys not equal")
+
+ self.assertEqual(readdict['simple_types']["interpstring"],
+ "interpolation: 5 % is too much")
+
+ testdict['simple_types']["interpstring"] = "interpolation: 5 % is too much"
+
+ for key in testdict["simple_types"]:
+ original = testdict['simple_types'][key]
+ read = readdict['simple_types'][key]
+ self.assertEqual(read, original,
+ "Read <%s> instead of <%s>" % (read, original))
+
+ for key in testdict["containers"]:
+ original = testdict["containers"][key]
+ read = readdict["containers"][key]
+ if key == 'array':
+ numpy.testing.assert_array_equal(read, original,
+ "Read <%s> instead of <%s>" % (read, original))
+ else:
+ self.assertEqual(read, original,
+ "Read <%s> instead of <%s>" % (read, original))
+
+ def testConfigDictOrder(self):
+ """Ensure order is preserved when dictionary is
+ written to file and read back."""
+ test_dict = {'banana': 3, 'apple': 4, 'pear': 1, 'orange': 2}
+ # sort by key
+ test_ordered_dict1 = OrderedDict(sorted(test_dict.items(),
+ key=lambda t: t[0]))
+ # sort by value
+ test_ordered_dict2 = OrderedDict(sorted(test_dict.items(),
+ key=lambda t: t[1]))
+ # add the two ordered dict as sections of a third ordered dict
+ test_ordered_dict3 = OrderedDict()
+ test_ordered_dict3["section1"] = test_ordered_dict1
+ test_ordered_dict3["section2"] = test_ordered_dict2
+
+ # write to ini and read back as a ConfigDict (inherits OrderedDict)
+ dump(test_ordered_dict3,
+ self.ini_fname, fmat="ini")
+ read_instance = ConfigDict()
+ read_instance.read(self.ini_fname)
+
+ # loop through original and read-back dictionaries,
+ # test identical order for key/value pairs
+ for orig_key, section in zip(test_ordered_dict3.keys(),
+ read_instance.keys()):
+ self.assertEqual(orig_key, section)
+ for orig_key2, read_key in zip(test_ordered_dict3[section].keys(),
+ read_instance[section].keys()):
+ self.assertEqual(orig_key2, read_key)
+ self.assertEqual(test_ordered_dict3[section][orig_key2],
+ read_instance[section][read_key])
diff --git a/src/silx/io/test/test_fabioh5.py b/src/silx/io/test/test_fabioh5.py
new file mode 100755
index 0000000..c410024
--- /dev/null
+++ b/src/silx/io/test/test_fabioh5.py
@@ -0,0 +1,615 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2018 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for fabioh5 wrapper"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "02/07/2018"
+
+import os
+import logging
+import numpy
+import unittest
+import tempfile
+import shutil
+
+_logger = logging.getLogger(__name__)
+
+import fabio
+import h5py
+
+from .. import commonh5
+from .. import fabioh5
+
+
+class TestFabioH5(unittest.TestCase):
+
+ def setUp(self):
+
+ header = {
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ }
+ data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ self.fabio_image = fabio.numpyimage.NumpyImage(data, header)
+ self.h5_image = fabioh5.File(fabio_image=self.fabio_image)
+
+ def test_main_groups(self):
+ self.assertEqual(self.h5_image.h5py_class, h5py.File)
+ self.assertEqual(self.h5_image["/"].h5py_class, h5py.File)
+ self.assertEqual(self.h5_image["/scan_0"].h5py_class, h5py.Group)
+ self.assertEqual(self.h5_image["/scan_0/instrument"].h5py_class, h5py.Group)
+ self.assertEqual(self.h5_image["/scan_0/measurement"].h5py_class, h5py.Group)
+
+ def test_wrong_path_syntax(self):
+ # expected behaviour verified against a real h5py file
+ self.assertRaises(ValueError, lambda: self.h5_image[""])
+
+ def test_wrong_root_name(self):
+ # expected behaviour verified against a real h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["/foo"])
+
+ def test_wrong_root_path(self):
+ # expected behaviour verified against a real h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["/foo/foo"])
+
+ def test_wrong_name(self):
+ # expected behaviour verified against a real h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["foo"])
+
+ def test_wrong_path(self):
+ # expected behaviour verified against a real h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["foo/foo"])
+
+ def test_single_frame(self):
+ data = numpy.arange(2 * 3)
+ data.shape = 2, 3
+ fabio_image = fabio.edfimage.edfimage(data=data)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (2, 3))
+ self.assertEqual(dataset[...][0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_multi_frames(self):
+ data = numpy.arange(2 * 3)
+ data.shape = 2, 3
+ fabio_image = fabio.edfimage.edfimage(data=data)
+ fabio_image.append_frame(data=data)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (2, 2, 3))
+ self.assertEqual(dataset[...][0, 0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_heterogeneous_frames(self):
+ """Frames containing 2 images with different sizes and a cube"""
+ data1 = numpy.arange(2 * 3)
+ data1.shape = 2, 3
+ data2 = numpy.arange(2 * 5)
+ data2.shape = 2, 5
+ data3 = numpy.arange(2 * 5 * 1)
+ data3.shape = 2, 5, 1
+ fabio_image = fabio.edfimage.edfimage(data=data1)
+ fabio_image.append_frame(data=data2)
+ fabio_image.append_frame(data=data3)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (3, 2, 5, 1))
+ self.assertEqual(dataset[...][0, 0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_single_3d_frame(self):
+ """Image source contains a cube"""
+ data = numpy.arange(2 * 3 * 4)
+ data.shape = 2, 3, 4
+ # Do not provide the data to the constructor, to avoid slicing of the
+ # data. This way the result stays a cube rather than a multi-frame stack.
+ fabio_image = fabio.edfimage.edfimage()
+ fabio_image.data = data
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (2, 3, 4))
+ self.assertEqual(dataset[...][0, 0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_metadata_int(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/integer"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], -100)
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_metadata_float(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/float"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], 1.0)
+ self.assertEqual(dataset.dtype.kind, "f")
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_metadata_string(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/string"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], numpy.string_("hi!"))
+ self.assertEqual(dataset.dtype.type, numpy.string_)
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_metadata_list_integer(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_integer"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset.dtype.kind, "u")
+ self.assertEqual(dataset.shape, (1, 3))
+ self.assertEqual(dataset[0, 0], 100)
+ self.assertEqual(dataset[0, 1], 50)
+
+ def test_metadata_list_float(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_float"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset.dtype.kind, "f")
+ self.assertEqual(dataset.shape, (1, 3))
+ self.assertEqual(dataset[0, 0], 1.0)
+ self.assertEqual(dataset[0, 1], 2.0)
+
+ def test_metadata_list_looks_like_list(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/string_looks_like_list"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], numpy.string_("2000 hi!"))
+ self.assertEqual(dataset.dtype.type, numpy.string_)
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_float_32(self):
+ float_list = [u'1.2', u'1.3', u'1.4']
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # All parsed values must remain distinct
+ self.assertEqual(len(data), len(set(data)))
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ['d', 'f'])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+
+ def test_float_64(self):
+ float_list = [
+ u'1469117129.082226',
+ u'1469117136.684986', u'1469117144.312749', u'1469117151.892507',
+ u'1469117159.474265', u'1469117167.100027', u'1469117174.815799',
+ u'1469117182.437561', u'1469117190.094326', u'1469117197.721089']
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"time_of_day": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/time_of_day"]
+ # All parsed values must remain distinct (requires float64 precision)
+ self.assertEqual(len(data), len(set(data)))
+ # At least a float64
+ self.assertIn(data.dtype.kind, ['d', 'f'])
+ self.assertGreaterEqual(data.dtype.itemsize, 64 / 8)
+
+ def test_mixed_float_size__scalar(self):
+ # We expect to have a precision of 32 bits
+ float_list = [u'1.2', u'1.3001']
+ expected_float_result = [1.2, 1.3001]
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ['d', 'f'])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+ for computed, expected in zip(data, expected_float_result):
+ numpy.testing.assert_almost_equal(computed, expected, 5)
+
+ def test_mixed_float_size__list(self):
+ # We expect to have a precision of 32 bits
+ float_list = [u'1.2 1.3001']
+ expected_float_result = numpy.array([[1.2, 1.3001]])
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ['d', 'f'])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+ for computed, expected in zip(data, expected_float_result):
+ numpy.testing.assert_almost_equal(computed, expected, 5)
+
+ def test_mixed_float_size__list_of_list(self):
+ # We expect to have a precision of 32 bits
+ float_list = [u'1.2 1.3001', u'1.3001 1.3001']
+ expected_float_result = numpy.array([[1.2, 1.3001], [1.3001, 1.3001]])
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ['d', 'f'])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+ for computed, expected in zip(data, expected_float_result):
+ numpy.testing.assert_almost_equal(computed, expected, 5)
+
+ def test_ub_matrix(self):
+ """Data from mediapix.edf"""
+ header = {}
+ header["UB_mne"] = 'UB0 UB1 UB2 UB3 UB4 UB5 UB6 UB7 UB8'
+ header["UB_pos"] = '1.99593e-16 2.73682e-16 -1.54 -1.08894 1.08894 1.6083e-16 1.08894 1.08894 9.28619e-17'
+ header["sample_mne"] = 'U0 U1 U2 U3 U4 U5'
+ header["sample_pos"] = '4.08 4.08 4.08 90 90 90'
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ sample = h5_image["/scan_0/sample"]
+ self.assertIsNotNone(sample)
+ self.assertEqual(sample.attrs["NXclass"], "NXsample")
+
+ d = sample['unit_cell_abc']
+ expected = numpy.array([4.08, 4.08, 4.08])
+ self.assertIsNotNone(d)
+ self.assertEqual(d.shape, (3, ))
+ self.assertIn(d.dtype.kind, ['d', 'f'])
+ numpy.testing.assert_array_almost_equal(d[...], expected)
+
+ d = sample['unit_cell_alphabetagamma']
+ expected = numpy.array([90.0, 90.0, 90.0])
+ self.assertIsNotNone(d)
+ self.assertEqual(d.shape, (3, ))
+ self.assertIn(d.dtype.kind, ['d', 'f'])
+ numpy.testing.assert_array_almost_equal(d[...], expected)
+
+ d = sample['ub_matrix']
+ expected = numpy.array([[[1.99593e-16, 2.73682e-16, -1.54],
+ [-1.08894, 1.08894, 1.6083e-16],
+ [1.08894, 1.08894, 9.28619e-17]]])
+ self.assertIsNotNone(d)
+ self.assertEqual(d.shape, (1, 3, 3))
+ self.assertIn(d.dtype.kind, ['d', 'f'])
+ numpy.testing.assert_array_almost_equal(d[...], expected)
+
+ def test_interpretation_mca_edf(self):
+ """EDF files with two or more headers starting with "MCA"
+        must have @interpretation = "spectrum" set on the data."""
+ header = {
+ "Title": "zapimage samy -4.975 -5.095 80 500 samz -4.091 -4.171 70 0",
+ "MCA a": -23.812,
+ "MCA b": 2.7107,
+ "MCA c": 8.1164e-06}
+
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ data_dataset = h5_image["/scan_0/measurement/image_0/data"]
+ self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
+
+ data_dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
+
+ data_dataset = h5_image["/scan_0/measurement/image_0/info/data"]
+ self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
+
+ def test_get_api(self):
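+        # Mirrors h5py's Group.get API: getlink inspects the link object
+        # itself, getclass returns the class rather than an instance.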
+ result = self.h5_image.get("scan_0", getclass=True, getlink=True)
+ self.assertIs(result, h5py.HardLink)
+ result = self.h5_image.get("scan_0", getclass=False, getlink=True)
+ self.assertIsInstance(result, h5py.HardLink)
+ result = self.h5_image.get("scan_0", getclass=True, getlink=False)
+ self.assertIs(result, h5py.Group)
+ result = self.h5_image.get("scan_0", getclass=False, getlink=False)
+ self.assertIsInstance(result, commonh5.Group)
+
+ def test_detector_link(self):
+ detector1 = self.h5_image["/scan_0/instrument/detector_0"]
+ detector2 = self.h5_image["/scan_0/measurement/image_0/info"]
+ self.assertIsNot(detector1, detector2)
+ self.assertEqual(list(detector1.items()), list(detector2.items()))
+ self.assertEqual(self.h5_image.get(detector2.name, getlink=True).path, detector1.name)
+
+ def test_detector_data_link(self):
+ data1 = self.h5_image["/scan_0/instrument/detector_0/data"]
+ data2 = self.h5_image["/scan_0/measurement/image_0/data"]
+ self.assertIsNot(data1, data2)
+ self.assertIs(data1._get_data(), data2._get_data())
+ self.assertEqual(self.h5_image.get(data2.name, getlink=True).path, data1.name)
+
+ def test_dirty_header(self):
+ """Test that it does not fail"""
+ try:
+ header = {}
+ header["foo"] = b'abc'
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.edfimage(data=data, header=header)
+ header = {}
+ header["foo"] = b'a\x90bc\xFE'
+ fabio_image.append_frame(data=data, header=header)
+ except Exception as e:
+ _logger.error(e.args[0])
+ _logger.debug("Backtrace", exc_info=True)
+ self.skipTest("fabio do not allow to create the resource")
+
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ scan_header_path = "/scan_0/instrument/file/scan_header"
+ self.assertIn(scan_header_path, h5_image)
+ data = h5_image[scan_header_path]
+ self.assertIsInstance(data[...], numpy.ndarray)
+
+ def test_unicode_header(self):
+ """Test that it does not fail"""
+ try:
+ header = {}
+ header["foo"] = b'abc'
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.edfimage(data=data, header=header)
+ header = {}
+ header["foo"] = u'abc\u2764'
+ fabio_image.append_frame(data=data, header=header)
+ except Exception as e:
+ _logger.error(e.args[0])
+ _logger.debug("Backtrace", exc_info=True)
+ self.skipTest("fabio do not allow to create the resource")
+
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ scan_header_path = "/scan_0/instrument/file/scan_header"
+ self.assertIn(scan_header_path, h5_image)
+ data = h5_image[scan_header_path]
+ self.assertIsInstance(data[...], numpy.ndarray)
+
+
+class TestFabioH5MultiFrames(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+
+ names = ["A", "B", "C", "D"]
+ values = [["32000", "-10", "5.0", "1"],
+ ["-32000", "-10", "5.0", "1"]]
+
+ fabio_file = None
+
+ for i in range(10):
+ header = {
+ "image_id": "%d" % i,
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ "motor_mne": " ".join(names),
+ "motor_pos": " ".join(values[i % len(values)]),
+ "counter_mne": " ".join(names),
+ "counter_pos": " ".join(values[i % len(values)])
+ }
+ for iname, name in enumerate(names):
+ header[name] = values[i % len(values)][iname]
+
+ data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ if fabio_file is None:
+ fabio_file = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_file.append_frame(data=data, header=header)
+
+ cls.fabio_file = fabio_file
+ cls.fabioh5 = fabioh5.File(fabio_image=fabio_file)
+
+ def test_others(self):
+ others = self.fabioh5["/scan_0/instrument/detector_0/others"]
+ dataset = others["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = others["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = others["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = others["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "u")
+
+ def test_positioners(self):
+ counters = self.fabioh5["/scan_0/instrument/positioners"]
+ # At least 32 bits, no unsigned values
+ dataset = counters["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = counters["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+
+ def test_counters(self):
+ counters = self.fabioh5["/scan_0/measurement"]
+ # At least 32 bits, no unsigned values
+ dataset = counters["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = counters["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+
+
+class TestFabioH5WithEdf(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+
+ cls.tmp_directory = tempfile.mkdtemp()
+
+ cls.edf_filename = os.path.join(cls.tmp_directory, "test.edf")
+
+ header = {
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ }
+ data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ fabio_image = fabio.edfimage.edfimage(data, header)
+ fabio_image.write(cls.edf_filename)
+
+ cls.fabio_image = fabio.open(cls.edf_filename)
+ cls.h5_image = fabioh5.File(fabio_image=cls.fabio_image)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.fabio_image = None
+ cls.h5_image = None
+ shutil.rmtree(cls.tmp_directory)
+
+ def test_reserved_format_metadata(self):
+ if fabio.hexversion < 327920: # 0.5.0 final
+ self.skipTest("fabio >= 0.5.0 final is needed")
+
+ # The EDF contains reserved keys in the header
+ self.assertIn("HeaderID", self.fabio_image.header)
+ # We do not expose them in FabioH5
+ self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image)
+
+
+class _TestableFrameData(fabioh5.FrameData):
+ """Allow to test if the full data is reached."""
+ def _create_data(self):
+ raise RuntimeError("Not supposed to be called")
+
+
+class TestFabioH5WithFileSeries(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+
+ cls.tmp_directory = tempfile.mkdtemp()
+
+ cls.edf_filenames = []
+
+ for i in range(10):
+ filename = os.path.join(cls.tmp_directory, "test_%04d.edf" % i)
+ cls.edf_filenames.append(filename)
+
+ header = {
+ "image_id": "%d" % i,
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ }
+ data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ fabio_image = fabio.edfimage.edfimage(data, header)
+ fabio_image.write(filename)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def _testH5Image(self, h5_image):
+ # test data
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (10, 3, 2))
+ self.assertEqual(list(dataset[:, 0, 0]), list(range(10)))
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+        # test metadata
+ dataset = h5_image["/scan_0/instrument/detector_0/others/image_id"]
+ self.assertEqual(list(dataset[...]), list(range(10)))
+
+ def testFileList(self):
+ h5_image = fabioh5.File(file_series=self.edf_filenames)
+ self._testH5Image(h5_image)
+
+ def testFileSeries(self):
+ file_series = fabioh5._FileSeries(self.edf_filenames)
+ h5_image = fabioh5.File(file_series=file_series)
+ self._testH5Image(h5_image)
+
+ def testFrameDataCache(self):
+ file_series = fabioh5._FileSeries(self.edf_filenames)
+ reader = fabioh5.FabioReader(file_series=file_series)
+ frameData = _TestableFrameData("foo", reader)
+ self.assertEqual(frameData.dtype.kind, "i")
+ self.assertEqual(frameData.shape, (10, 3, 2))
diff --git a/src/silx/io/test/test_fioh5.py b/src/silx/io/test/test_fioh5.py
new file mode 100644
index 0000000..8ffb4ad
--- /dev/null
+++ b/src/silx/io/test/test_fioh5.py
@@ -0,0 +1,299 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2021 Timo Fuchs
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for fioh5"""
+import numpy
+import os
+import io
+import sys
+import tempfile
+import unittest
+import datetime
+import logging
+
+from silx.utils import testutils
+
+from .. import fioh5
+from ..fioh5 import (FioH5, FioH5NodeDataset, is_fiofile, logger1, dtypeConverter)
+
+import h5py
+
+__authors__ = ["T. Fuchs"]
+__license__ = "MIT"
+__date__ = "15/10/2021"
+
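+# A .fio file holds a comment section (%c), a parameter section (%p) and a
+# data section (%d) whose "Col" lines give each column's name and dtype,
+# mapped to numpy dtypes through fioh5.dtypeConverter.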
+fioftext = """
+!
+! Comments
+!
+%c
+ascan omega 180.0 180.5 3:10/1 4
+user username, acquisition started at Thu Dec 12 18:00:00 2021
+sweep motor lag: 1.0e-03
+channel 3: Detector
+!
+! Parameter
+!
+%p
+channel3_exposure = 1.000000e+00
+ScanName = ascan
+!
+! Data
+!
+%d
+ Col 1 omega(encoder) DOUBLE
+ Col 2 channel INTEGER
+ Col 3 filename STRING
+ Col 4 type STRING
+ Col 5 unix time DOUBLE
+ Col 6 enable BOOLEAN
+ Col 7 time_s FLOAT
+ 179.998418821 3 00001 exposure 1576165741.20308 1 1.243
+ 180.048418821 3 00002 exposure 1576165742.20308 1 1.243
+ 180.098418821 3 00003 exposure 1576165743.20308 1 1.243
+ 180.148418821 3 00004 exposure 1576165744.20308 1 1.243
+ 180.198418821 3 00005 exposure 1576165745.20308 1 1.243
+ 180.248418821 3 00006 exposure 1576165746.20308 1 1.243
+ 180.298418821 3 00007 exposure 1576165747.20308 1 1.243
+ 180.348418821 3 00008 exposure 1576165748.20308 1 1.243
+ 180.398418821 3 00009 exposure 1576165749.20308 1 1.243
+ 180.448418821 3 00010 exposure 1576165750.20308 1 1.243
+"""
+
+
+class TestFioH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.TemporaryDirectory()
+ cls.fname_numbered = os.path.join(cls.temp_dir.name, "eh1scan_00005.fio")
+
+ with open(cls.fname_numbered, 'w') as fiof:
+ fiof.write(fioftext)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.temp_dir.cleanup()
+ del cls.temp_dir
+
+ def setUp(self):
+ self.fioh5 = FioH5(self.fname_numbered)
+
+ def tearDown(self):
+ self.fioh5.close()
+
+ def testScanNumber(self):
+ # scan number is parsed from the file name.
+ self.assertIn("/5.1", self.fioh5)
+ self.assertIn("5.1", self.fioh5)
+
+ def testContainsFile(self):
+ self.assertIn("/5.1/measurement", self.fioh5)
+ self.assertNotIn("25.2", self.fioh5)
+ # measurement is a child of a scan, full path would be required to
+ # access from root level
+ self.assertNotIn("measurement", self.fioh5)
+ # Groups may or may not have a trailing /
+ self.assertIn("/5.1/measurement/", self.fioh5)
+ self.assertIn("/5.1/measurement", self.fioh5)
+ # Datasets can't have a trailing /
+ self.assertIn("/5.1/measurement/omega(encoder)", self.fioh5)
+ self.assertNotIn("/5.1/measurement/omega(encoder)/", self.fioh5)
+ # No gamma
+ self.assertNotIn("/5.1/measurement/gamma", self.fioh5)
+
+ def testContainsGroup(self):
+ self.assertIn("measurement", self.fioh5["/5.1/"])
+ self.assertIn("measurement", self.fioh5["/5.1"])
+ self.assertIn("5.1", self.fioh5["/"])
+ self.assertNotIn("5.2", self.fioh5["/"])
+ self.assertIn("measurement/filename", self.fioh5["/5.1"])
+ # illegal trailing "/" after dataset name
+ self.assertNotIn("measurement/filename/",
+ self.fioh5["/5.1"])
+ # full path to element in group (OK)
+ self.assertIn("/5.1/measurement/filename",
+ self.fioh5["/5.1/measurement"])
+
+ def testDataType(self):
+ meas = self.fioh5["/5.1/measurement/"]
+ self.assertEqual(meas["omega(encoder)"].dtype, dtypeConverter['DOUBLE'])
+ self.assertEqual(meas["channel"].dtype, dtypeConverter['INTEGER'])
+ self.assertEqual(meas["filename"].dtype, dtypeConverter['STRING'])
+ self.assertEqual(meas["time_s"].dtype, dtypeConverter['FLOAT'])
+ self.assertEqual(meas["enable"].dtype, dtypeConverter['BOOLEAN'])
+
+ def testDataColumn(self):
+ self.assertAlmostEqual(sum(self.fioh5["/5.1/measurement/omega(encoder)"]),
+ 1802.23418821)
+ self.assertTrue(numpy.all(self.fioh5["/5.1/measurement/enable"]))
+
+ # --- comment section tests ---
+
+ def testComment(self):
+ # should hold the complete comment section
+ self.assertEqual(self.fioh5["/5.1/instrument/fiofile/comments"],
+"""ascan omega 180.0 180.5 3:10/1 4
+user username, acquisition started at Thu Dec 12 18:00:00 2021
+sweep motor lag: 1.0e-03
+channel 3: Detector
+""")
+
+ def testDate(self):
+ # there is no convention on how to format the time. So just check its existence.
+ self.assertEqual(self.fioh5["/5.1/start_time"],
+ u"Thu Dec 12 18:00:00 2021")
+
+ def testTitle(self):
+ self.assertEqual(self.fioh5["/5.1/title"],
+ u"ascan omega 180.0 180.5 3:10/1 4")
+
+
+ # --- parameter section tests ---
+
+ def testParameter(self):
+ # should hold the complete parameter section
+ self.assertEqual(self.fioh5["/5.1/instrument/fiofile/parameter"],
+"""channel3_exposure = 1.000000e+00
+ScanName = ascan
+""")
+
+ def testParsedParameter(self):
+ # no dtype is given, so everything is str.
+ self.assertEqual(self.fioh5["/5.1/instrument/parameter/channel3_exposure"],
+ u"1.000000e+00")
+ self.assertEqual(self.fioh5["/5.1/instrument/parameter/ScanName"], u"ascan")
+
+ def testNotFioH5(self):
+ testfilename = os.path.join(self.temp_dir.name, "eh1scan_00010.fio")
+ with open(testfilename, 'w') as fiof:
+ fiof.write("!Not a fio file!")
+
+ self.assertRaises(IOError, FioH5, testfilename)
+
+ self.assertTrue(is_fiofile(self.fname_numbered))
+ self.assertFalse(is_fiofile(testfilename))
+
+ os.unlink(testfilename)
+
+
+class TestUnnumberedFioH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.TemporaryDirectory()
+ cls.fname_nosuffix = os.path.join(cls.temp_dir.name, "eh1scan_nosuffix.fio")
+
+ with open(cls.fname_nosuffix, 'w') as fiof:
+ fiof.write(fioftext)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.temp_dir.cleanup()
+ del cls.temp_dir
+
+ def setUp(self):
+ self.fioh5 = FioH5(self.fname_nosuffix)
+
+ def testLogMissingScanno(self):
+        with self.assertLogs(logger1, level='WARNING') as cm:
+ fioh5 = FioH5(self.fname_nosuffix)
+ self.assertIn("Cannot parse scan number of file", cm.output[0])
+
+ def testFallbackName(self):
+ self.assertIn("/eh1scan_nosuffix", self.fioh5)
+
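+
+# The comment and parameter sections below are deliberately corrupted (an
+# unparseable date line, a parameter line without "="); FioH5 is expected to
+# keep the raw text and log warnings instead of failing.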
+brokenHeaderText = """
+!
+! Comments
+!
+%c
+ascan omega 180.0 180.5 3:10/1 4
+user username, acquisited at Thu Dec 12 100 2021
+sweep motor lavgvf.0e-03
+channel 3: Detector
+!
+! Parameter
+!
+%p
+channel3_exposu65 1.000000e+00
+ScanName = ascan
+!
+! Data
+!
+%d
+ Col 1 omega(encoder) DOUBLE
+ Col 2 channel INTEGER
+ Col 3 filename STRING
+ Col 4 type STRING
+ Col 5 unix time DOUBLE
+ 179.998418821 3 00001 exposure 1576165741.20308
+ 180.048418821 3 00002 exposure 1576165742.20308
+ 180.098418821 3 00003 exposure 1576165743.20308
+ 180.148418821 3 00004 exposure 1576165744.20308
+ 180.198418821 3 00005 exposure 1576165745.20308
+ 180.248418821 3 00006 exposure 1576165746.20308
+ 180.298418821 3 00007 exposure 1576165747.20308
+ 180.348418821 3 00008 exposure 1576165748.20308
+ 180.398418821 3 00009 exposure 1576165749.20308
+ 180.448418821 3 00010 exposure 1576165750.20308
+"""
+
+
+class TestBrokenHeaderFioH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.TemporaryDirectory()
+ cls.fname_numbered = os.path.join(cls.temp_dir.name, "eh1scan_00005.fio")
+
+ with open(cls.fname_numbered, 'w') as fiof:
+ fiof.write(brokenHeaderText)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.temp_dir.cleanup()
+ del cls.temp_dir
+
+ def setUp(self):
+ self.fioh5 = FioH5(self.fname_numbered)
+
+ def testLogBrokenHeader(self):
+        with self.assertLogs(logger1, level='WARNING') as cm:
+ fioh5 = FioH5(self.fname_numbered)
+ self.assertIn("Cannot parse parameter section", cm.output[0])
+ self.assertIn("Cannot parse default comment section", cm.output[1])
+
+ def testComment(self):
+ # should hold the complete comment section
+ self.assertEqual(self.fioh5["/5.1/instrument/fiofile/comments"],
+"""ascan omega 180.0 180.5 3:10/1 4
+user username, acquisited at Thu Dec 12 100 2021
+sweep motor lavgvf.0e-03
+channel 3: Detector
+""")
+
+ def testParameter(self):
+ # should hold the complete parameter section
+ self.assertEqual(self.fioh5["/5.1/instrument/fiofile/parameter"],
+"""channel3_exposu65 1.000000e+00
+ScanName = ascan
+""")
diff --git a/src/silx/io/test/test_h5py_utils.py b/src/silx/io/test/test_h5py_utils.py
new file mode 100644
index 0000000..ea46eca
--- /dev/null
+++ b/src/silx/io/test/test_h5py_utils.py
@@ -0,0 +1,451 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for h5py utilities"""
+
+__authors__ = ["W. de Nolf"]
+__license__ = "MIT"
+__date__ = "27/01/2020"
+
+
+import unittest
+import os
+import sys
+import time
+import shutil
+import logging
+import tempfile
+import multiprocessing
+from contextlib import contextmanager
+
+from .. import h5py_utils
+from ...utils.retry import RetryError, RetryTimeoutError
+
+IS_WINDOWS = sys.platform == "win32"
+logger = logging.getLogger()
+
+
+def _subprocess_context_main(queue, contextmgr, *args, **kw):
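+    # Child process: enter the context manager, signal readiness through the
+    # queue, then block until the parent allows the context to exit.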
+ try:
+ with contextmgr(*args, **kw):
+ queue.put(None)
+ queue.get()
+ except Exception:
+ queue.put(None)
+ raise
+
+
+@contextmanager
+def _subprocess_context(contextmgr, *args, **kw):
+ print("\nSTART", os.getpid())
+ timeout = kw.pop("timeout", 10)
+ queue = multiprocessing.Queue(maxsize=1)
+ p = multiprocessing.Process(
+ target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw
+ )
+ p.start()
+ try:
+ queue.get(timeout=timeout)
+ yield
+ finally:
+ queue.put(None)
+ p.join(timeout)
+ print(" EXIT", os.getpid())
+
+
+@contextmanager
+def _open_context(filename, **kw):
+ try:
+ print(os.getpid(), "OPEN", filename, kw)
+ with h5py_utils.File(filename, **kw) as f:
+ if kw.get("mode") == "w":
+ f["check"] = True
+ f.flush()
+ yield f
+ except Exception:
+ print(" ", os.getpid(), "FAILED", filename, kw)
+ raise
+ else:
+ print(" ", os.getpid(), "CLOSED", filename, kw)
+
+
+def _cause_segfault():
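+    """Write past a one-byte ctypes buffer until the process segfaults."""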
+ import ctypes
+
+ i = ctypes.c_char(b"a")
+ j = ctypes.pointer(i)
+ c = 0
+ while True:
+ j[c] = b"a"
+ c += 1
+
+
+def _top_level_names_test(txtfilename, *args, **kw):
+ sys.stderr = open(os.devnull, "w")
+
+ with open(txtfilename, mode="r") as f:
+ failcounter = int(f.readline().strip())
+
+ ncausefailure = kw.pop("ncausefailure")
+ faildelay = kw.pop("faildelay")
+ if failcounter < ncausefailure:
+ time.sleep(faildelay)
+ failcounter += 1
+ with open(txtfilename, mode="w") as f:
+ f.write(str(failcounter))
+ if failcounter % 2:
+ raise RetryError
+ else:
+ _cause_segfault()
+ return h5py_utils._top_level_names(*args, **kw)
+
+
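+# Run the helper in a subprocess so that even a segfault surfaces as a
+# retryable failure instead of killing the test process.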
+top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test)
+
+
+def subtests(test):
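+    """Decorator running the decorated test once per option set yielded by
+    self._subtests()."""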
+ def wrapper(self):
+ for subtest_options in self._subtests():
+ print("\n====SUB TEST===\n")
+ print(f"sub test options: {subtest_options}")
+ with self.subTest(str(subtest_options)):
+ test(self)
+
+ return wrapper
+
+
+class TestH5pyUtils(unittest.TestCase):
+ def setUp(self):
+ self.test_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.test_dir)
+
+ def _subtests(self):
+ self._subtest_options = {"mode": "w"}
+ self.filename_generator = self._filenames()
+ yield self._subtest_options
+ self._subtest_options = {"mode": "w", "libver": "latest"}
+ self.filename_generator = self._filenames()
+        yield self._subtest_options
+
+ def _filenames(self):
+ i = 1
+ while True:
+ filename = os.path.join(self.test_dir, "file{}.h5".format(i))
+ with self._open_context(filename):
+ pass
+ yield filename
+ i += 1
+
+ def _new_filename(self):
+ return next(self.filename_generator)
+
+ @contextmanager
+ def _open_context(self, filename, **kwargs):
+ kw = dict(self._subtest_options)
+ kw.update(kwargs)
+ with _open_context(filename, **kw) as f:
+ yield f
+
+ @contextmanager
+ def _open_context_subprocess(self, filename, **kwargs):
+ kw = dict(self._subtest_options)
+ kw.update(kwargs)
+ with _subprocess_context(_open_context, filename, **kw):
+ yield
+
+ def _assert_hdf5_data(self, f):
+ self.assertTrue(f["check"][()])
+
+ def _validate_hdf5_data(self, filename, swmr=False):
+ with self._open_context(filename, mode="r") as f:
+ self.assertEqual(f.swmr_mode, swmr)
+ self._assert_hdf5_data(f)
+
+ @subtests
+ def test_modes_single_process(self):
+ """Test concurrent access to the different files from the same process"""
+ # When using HDF5_USE_FILE_LOCKING, open files with and without
+ # locking should raise an exception. HDF5_USE_FILE_LOCKING should
+ # be reset when all files are closed.
+
+ orig = os.environ.get("HDF5_USE_FILE_LOCKING")
+ filename1 = self._new_filename()
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+ filename2 = self._new_filename()
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ with self._open_context(filename1, mode="r"):
+ locking1 = False
+ for mode in ["r", "w", "a"]:
+ locking2 = mode != "r"
+ raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT
+ raise_condition &= locking1 != locking2
+ with self.assertRaisesIf(raise_condition, RuntimeError):
+ with self._open_context(filename2, mode=mode):
+ pass
+ self._validate_hdf5_data(filename1)
+ self._validate_hdf5_data(filename2)
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ with self._open_context(filename1, mode="a"):
+ locking1 = True
+ for mode in ["r", "w", "a"]:
+ locking2 = mode != "r"
+ raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT
+ raise_condition &= locking1 != locking2
+ with self.assertRaisesIf(raise_condition, RuntimeError):
+ with self._open_context(filename2, mode=mode):
+ pass
+ self._validate_hdf5_data(filename1)
+ self._validate_hdf5_data(filename2)
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ @property
+ def _libver_low_bound_is_v108(self):
+ libver = self._subtest_options.get("libver")
+ return h5py_utils._libver_low_bound_is_v108(libver)
+
+ @property
+ def _nonlocking_reader_before_writer(self):
+ """A non-locking reader must open the file before it is locked by a writer"""
+ if IS_WINDOWS and h5py_utils.HDF5_HAS_LOCKING_ARGUMENT:
+ return True
+ if not self._libver_low_bound_is_v108:
+ return True
+ return False
+
+ @contextmanager
+ def assertRaisesIf(self, condition, *args, **kw):
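+        """Like assertRaises when condition is true, otherwise a no-op context."""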
+ if condition:
+ with self.assertRaises(*args, **kw):
+ yield
+ else:
+ yield
+
+ @unittest.skipIf(
+ h5py_utils.HDF5_HAS_LOCKING_ARGUMENT != h5py_utils.H5PY_HAS_LOCKING_ARGUMENT,
+ "Versions of libhdf5 and h5py use incompatible file locking behaviour",
+ )
+ @subtests
+ def test_modes_multi_process(self):
+ """Test concurrent access to the same file from different processes"""
+ filename = self._new_filename()
+
+ nonlocking_reader_before_writer = self._nonlocking_reader_before_writer
+ writer_before_nonlocking_reader_exception = OSError
+ old_hdf5_on_windows = IS_WINDOWS and not h5py_utils.HDF5_HAS_LOCKING_ARGUMENT
+ locked_exception = OSError
+
+ # File locked by a writer
+ unexpected_access = old_hdf5_on_windows and self._libver_low_bound_is_v108
+ for wmode in ["w", "a"]:
+ with self._open_context_subprocess(filename, mode=wmode):
+ # Access by a second non-locking reader
+ with self.assertRaisesIf(
+ nonlocking_reader_before_writer,
+ writer_before_nonlocking_reader_exception,
+ ):
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # No access by a second locking reader
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a locking reader")
+ with self.assertRaisesIf(not unexpected_access, locked_exception):
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # No access by a second writer
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a writer")
+ with self.assertRaisesIf(not unexpected_access, locked_exception):
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ if not nonlocking_reader_before_writer:
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ # File locked by a reader
+ unexpected_access = old_hdf5_on_windows
+ with _subprocess_context(_open_context, filename, mode="r", locking=True):
+ # Access by a non-locking reader
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # Access by a locking reader
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # No access by a second writer
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a writer")
+ raise_condition = not unexpected_access
+ with self.assertRaisesIf(raise_condition, locked_exception):
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ # File open by a non-locking reader
+ with self._open_context_subprocess(filename, mode="r"):
+ # Access by a second non-locking reader
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # Access by a second locking reader
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # Access by a second writer
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ @subtests
+ @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported")
+ def test_modes_multi_process_swmr(self):
+ filename = self._new_filename()
+
+ with self._open_context(filename, mode="w", libver="latest") as f:
+ pass
+
+ # File open by SWMR writer
+ with self._open_context_subprocess(filename, mode="a", swmr=True):
+ with self._open_context(filename, mode="r") as f:
+ assert f.swmr_mode
+ self._assert_hdf5_data(f)
+ with self.assertRaises(OSError):
+ with self._open_context(filename, mode="a") as f:
+ pass
+ self._validate_hdf5_data(filename, swmr=True)
+
+ @subtests
+ def test_retry_defaults(self):
+ filename = self._new_filename()
+
+ names = h5py_utils.top_level_names(filename)
+ self.assertEqual(names, [])
+
+ names = h5py_utils.safe_top_level_names(filename)
+ self.assertEqual(names, [])
+
+ names = h5py_utils.top_level_names(filename, include_only=None)
+ self.assertEqual(names, ["check"])
+
+ names = h5py_utils.safe_top_level_names(filename, include_only=None)
+ self.assertEqual(names, ["check"])
+
+ with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item:
+ self.assertEqual(item, None)
+
+ with h5py_utils.open_item(filename, "/check", validate=None) as item:
+ self.assertTrue(item[()])
+
+ with self.assertRaises(RetryTimeoutError):
+ with h5py_utils.open_item(
+ filename,
+ "/check",
+ retry_timeout=0.1,
+ retry_invalid=True,
+ validate=lambda x: False,
+ ) as item:
+ pass
+
+ ncall = 0
+
+ def validate(item):
+ nonlocal ncall
+ if ncall >= 1:
+ return True
+ else:
+ ncall += 1
+ raise RetryError
+
+ with h5py_utils.open_item(
+ filename,
+ "/check",
+ validate=validate,
+ retry_timeout=1,
+ retry_invalid=True,
+ ) as item:
+ self.assertTrue(item[()])
+
+ @subtests
+ def test_retry_custom(self):
+ filename = self._new_filename()
+ ncausefailure = 3
+ faildelay = 0.1
+ sufficient_timeout = ncausefailure * (faildelay + 10)
+ insufficient_timeout = ncausefailure * faildelay * 0.5
+
+ @h5py_utils.retry_contextmanager()
+ def open_item(filename, name):
+ nonlocal failcounter
+ if failcounter < ncausefailure:
+ time.sleep(faildelay)
+ failcounter += 1
+ raise RetryError
+ with h5py_utils.File(filename) as h5file:
+ yield h5file[name]
+
+ failcounter = 0
+ kw = {"retry_timeout": sufficient_timeout}
+ with open_item(filename, "/check", **kw) as item:
+ self.assertTrue(item[()])
+
+ failcounter = 0
+ kw = {"retry_timeout": insufficient_timeout}
+ with self.assertRaises(RetryTimeoutError):
+ with open_item(filename, "/check", **kw) as item:
+ pass
+
+ @subtests
+ def test_retry_in_subprocess(self):
+ filename = self._new_filename()
+ txtfilename = os.path.join(self.test_dir, "failcounter.txt")
+ ncausefailure = 3
+ faildelay = 0.1
+ sufficient_timeout = ncausefailure * (faildelay + 10)
+ insufficient_timeout = ncausefailure * faildelay * 0.5
+
+ kw = {
+ "retry_timeout": sufficient_timeout,
+ "include_only": None,
+ "ncausefailure": ncausefailure,
+ "faildelay": faildelay,
+ }
+ with open(txtfilename, mode="w") as f:
+ f.write("0")
+ names = top_level_names_test(txtfilename, filename, **kw)
+ self.assertEqual(names, ["check"])
+
+ kw = {
+ "retry_timeout": insufficient_timeout,
+ "include_only": None,
+ "ncausefailure": ncausefailure,
+ "faildelay": faildelay,
+ }
+ with open(txtfilename, mode="w") as f:
+ f.write("0")
+ with self.assertRaises(RetryTimeoutError):
+ top_level_names_test(txtfilename, filename, **kw)
diff --git a/src/silx/io/test/test_nxdata.py b/src/silx/io/test/test_nxdata.py
new file mode 100644
index 0000000..9025d6d
--- /dev/null
+++ b/src/silx/io/test/test_nxdata.py
@@ -0,0 +1,563 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for NXdata parsing"""
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "24/03/2020"
+
+
+import tempfile
+import unittest
+import h5py
+import numpy
+
+from .. import nxdata
+
+
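+# Variable-length string dtype, so that text attributes round-trip as str.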
+text_dtype = h5py.special_dtype(vlen=str)
+
+
+class TestNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(prefix="nxdata_examples_", suffix=".h5", delete=True)
+ tmp.file.close()
+ self.h5fname = tmp.name
+ self.h5f = h5py.File(tmp.name, "w")
+
+ # SCALARS
+ g0d = self.h5f.create_group("scalars")
+
+ g0d0 = g0d.create_group("0D_scalar")
+ g0d0.attrs["NX_class"] = "NXdata"
+ g0d0.attrs["signal"] = "scalar"
+ g0d0.create_dataset("scalar", data=10)
+ g0d0.create_dataset("scalar_errors", data=0.1)
+
+ g0d1 = g0d.create_group("2D_scalars")
+ g0d1.attrs["NX_class"] = "NXdata"
+ g0d1.attrs["signal"] = "scalars"
+ ds = g0d1.create_dataset("scalars", data=numpy.arange(3 * 10).reshape((3, 10)))
+ ds.attrs["interpretation"] = "scalar"
+
+ g0d1 = g0d.create_group("4D_scalars")
+ g0d1.attrs["NX_class"] = "NXdata"
+ g0d1.attrs["signal"] = "scalars"
+ ds = g0d1.create_dataset("scalars", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10)))
+ ds.attrs["interpretation"] = "scalar"
+
+ # SPECTRA
+ g1d = self.h5f.create_group("spectra")
+
+ g1d0 = g1d.create_group("1D_spectrum")
+ g1d0.attrs["NX_class"] = "NXdata"
+ g1d0.attrs["signal"] = "count"
+ g1d0.attrs["auxiliary_signals"] = numpy.array(["count2", "count3"],
+ dtype=text_dtype)
+ g1d0.attrs["axes"] = "energy_calib"
+ g1d0.attrs["uncertainties"] = numpy.array(["energy_errors", ],
+ dtype=text_dtype)
+ g1d0.create_dataset("count", data=numpy.arange(10))
+ g1d0.create_dataset("count2", data=0.5 * numpy.arange(10))
+ d = g1d0.create_dataset("count3", data=0.4 * numpy.arange(10))
+ d.attrs["long_name"] = "3rd counter"
+ g1d0.create_dataset("title", data="Title as dataset (like nexpy)")
+ g1d0.create_dataset("energy_calib", data=(10, 5)) # 10 * idx + 5
+ g1d0.create_dataset("energy_errors", data=3.14 * numpy.random.rand(10))
+
+ g1d1 = g1d.create_group("2D_spectra")
+ g1d1.attrs["NX_class"] = "NXdata"
+ g1d1.attrs["signal"] = "counts"
+ ds = g1d1.create_dataset("counts", data=numpy.arange(3 * 10).reshape((3, 10)))
+ ds.attrs["interpretation"] = "spectrum"
+
+ g1d2 = g1d.create_group("4D_spectra")
+ g1d2.attrs["NX_class"] = "NXdata"
+ g1d2.attrs["signal"] = "counts"
+ g1d2.attrs["axes"] = numpy.array(["energy", ], dtype=text_dtype)
+ ds = g1d2.create_dataset("counts", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10)))
+ ds.attrs["interpretation"] = "spectrum"
+ ds = g1d2.create_dataset("errors", data=4.5 * numpy.random.rand(2, 2, 3, 10))
+ ds = g1d2.create_dataset("energy", data=5 + 10 * numpy.arange(15),
+ shuffle=True, compression="gzip")
+ ds.attrs["long_name"] = "Calibrated energy"
+ ds.attrs["first_good"] = 3
+ ds.attrs["last_good"] = 12
+ g1d2.create_dataset("energy_errors", data=10 * numpy.random.rand(15))
+
+ # IMAGES
+ g2d = self.h5f.create_group("images")
+
+ g2d0 = g2d.create_group("2D_regular_image")
+ g2d0.attrs["NX_class"] = "NXdata"
+ g2d0.attrs["signal"] = "image"
+ g2d0.attrs["auxiliary_signals"] = "image2"
+ g2d0.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"],
+ dtype=text_dtype)
+ g2d0.create_dataset("image", data=numpy.arange(4 * 6).reshape((4, 6)))
+ g2d0.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds = g2d0.create_dataset("rows_calib", data=(10, 5))
+ ds.attrs["long_name"] = "Calibrated Y"
+ g2d0.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
+
+ g2d1 = g2d.create_group("2D_irregular_data")
+ g2d1.attrs["NX_class"] = "NXdata"
+ g2d1.attrs["signal"] = "data"
+ g2d1.attrs["title"] = "Title as group attr"
+ g2d1.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"],
+ dtype=text_dtype)
+ g2d1.create_dataset("data", data=numpy.arange(64 * 128).reshape((64, 128)))
+ g2d1.create_dataset("rows_coordinates", data=numpy.arange(64) + numpy.random.rand(64))
+ g2d1.create_dataset("columns_coordinates", data=numpy.arange(128) + 2.5 * numpy.random.rand(128))
+
+ g2d2 = g2d.create_group("3D_images")
+ g2d2.attrs["NX_class"] = "NXdata"
+ g2d2.attrs["signal"] = "images"
+ ds = g2d2.create_dataset("images", data=numpy.arange(2 * 4 * 6).reshape((2, 4, 6)))
+ ds.attrs["interpretation"] = "image"
+
+ g2d3 = g2d.create_group("5D_images")
+ g2d3.attrs["NX_class"] = "NXdata"
+ g2d3.attrs["signal"] = "images"
+ g2d3.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"],
+ dtype=text_dtype)
+ ds = g2d3.create_dataset("images", data=numpy.arange(2 * 2 * 2 * 4 * 6).reshape((2, 2, 2, 4, 6)))
+ ds.attrs["interpretation"] = "image"
+ g2d3.create_dataset("rows_coordinates", data=5 + 10 * numpy.arange(4))
+ g2d3.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
+
+ g2d4 = g2d.create_group("RGBA_image")
+ g2d4.attrs["NX_class"] = "NXdata"
+ g2d4.attrs["signal"] = "image"
+ g2d4.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"],
+ dtype=text_dtype)
+ rgba_image = numpy.linspace(0, 1, num=7*8*3).reshape((7, 8, 3))
+ rgba_image[:, :, 1] = 1 - rgba_image[:, :, 1] # invert G channel to add some color
+ ds = g2d4.create_dataset("image", data=rgba_image)
+ ds.attrs["interpretation"] = "rgba-image"
+ ds = g2d4.create_dataset("rows_calib", data=(10, 5))
+ ds.attrs["long_name"] = "Calibrated Y"
+ g2d4.create_dataset("columns_coordinates", data=0.5+0.02*numpy.arange(8))
+
+ # SCATTER
+ g = self.h5f.create_group("scatters")
+
+ gd0 = g.create_group("x_y_scatter")
+ gd0.attrs["NX_class"] = "NXdata"
+ gd0.attrs["signal"] = "y"
+ gd0.attrs["axes"] = numpy.array(["x", ], dtype=text_dtype)
+ gd0.create_dataset("y", data=numpy.random.rand(128) - 0.5)
+ gd0.create_dataset("x", data=2 * numpy.random.rand(128))
+ gd0.create_dataset("x_errors", data=0.05 * numpy.random.rand(128))
+ gd0.create_dataset("errors", data=0.05 * numpy.random.rand(128))
+
+ gd1 = g.create_group("x_y_value_scatter")
+ gd1.attrs["NX_class"] = "NXdata"
+ gd1.attrs["signal"] = "values"
+ gd1.attrs["axes"] = numpy.array(["x", "y"], dtype=text_dtype)
+ gd1.create_dataset("values", data=3.14 * numpy.random.rand(128))
+ gd1.create_dataset("y", data=numpy.random.rand(128))
+ gd1.create_dataset("y_errors", data=0.02 * numpy.random.rand(128))
+ gd1.create_dataset("x", data=numpy.random.rand(128))
+ gd1.create_dataset("x_errors", data=0.02 * numpy.random.rand(128))
+
+ def tearDown(self):
+ self.h5f.close()
+
+ def testValidity(self):
+ for group in self.h5f:
+ for subgroup in self.h5f[group]:
+ self.assertTrue(
+ nxdata.is_valid_nxdata(self.h5f[group][subgroup]),
+ "%s/%s not found to be a valid NXdata group" % (group, subgroup))
+
+ def testScalars(self):
+ nxd = nxdata.NXdata(self.h5f["scalars/0D_scalar"])
+ self.assertTrue(nxd.signal_is_0d)
+ self.assertEqual(nxd.signal[()], 10)
+ self.assertEqual(nxd.axes_names, [])
+ self.assertEqual(nxd.axes_dataset_names, [])
+ self.assertEqual(nxd.axes, [])
+ self.assertIsNotNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+
+ nxd = nxdata.NXdata(self.h5f["scalars/2D_scalars"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertEqual(nxd.signal[1, 2], 12)
+ self.assertEqual(nxd.axes_names, [None, None])
+ self.assertEqual(nxd.axes_dataset_names, [None, None])
+ self.assertEqual(nxd.axes, [None, None])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "scalar")
+
+ nxd = nxdata.NXdata(self.h5f["scalars/4D_scalars"])
+ self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
+ nxd.signal_is_2d or nxd.signal_is_3d)
+ self.assertEqual(nxd.signal[1, 0, 1, 4], 74)
+ self.assertEqual(nxd.axes_names, [None, None, None, None])
+ self.assertEqual(nxd.axes_dataset_names, [None, None, None, None])
+ self.assertEqual(nxd.axes, [None, None, None, None])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "scalar")
+
+ def testSpectra(self):
+ nxd = nxdata.NXdata(self.h5f["spectra/1D_spectrum"])
+ self.assertTrue(nxd.signal_is_1d)
+ self.assertTrue(nxd.is_curve)
+ self.assertTrue(numpy.array_equal(numpy.array(nxd.signal),
+ numpy.arange(10)))
+ self.assertEqual(nxd.axes_names, ["energy_calib"])
+ self.assertEqual(nxd.axes_dataset_names, ["energy_calib"])
+ self.assertEqual(nxd.axes[0][0], 10)
+ self.assertEqual(nxd.axes[0][1], 5)
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+ self.assertEqual(nxd.title, "Title as dataset (like nexpy)")
+
+ self.assertEqual(nxd.auxiliary_signals_dataset_names,
+ ["count2", "count3"])
+ self.assertEqual(nxd.auxiliary_signals_names,
+ ["count2", "3rd counter"])
+ self.assertAlmostEqual(nxd.auxiliary_signals[1][2],
+ 0.8) # numpy.arange(10) * 0.4
+
+ nxd = nxdata.NXdata(self.h5f["spectra/2D_spectra"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_curve)
+ self.assertEqual(nxd.axes_names, [None, None])
+ self.assertEqual(nxd.axes_dataset_names, [None, None])
+ self.assertEqual(nxd.axes, [None, None])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "spectrum")
+
+ nxd = nxdata.NXdata(self.h5f["spectra/4D_spectra"])
+ self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
+ nxd.signal_is_2d or nxd.signal_is_3d)
+ self.assertTrue(nxd.is_curve)
+ self.assertEqual(nxd.axes_names,
+ [None, None, None, "Calibrated energy"])
+ self.assertEqual(nxd.axes_dataset_names,
+ [None, None, None, "energy"])
+ self.assertEqual(nxd.axes[:3], [None, None, None])
+ self.assertEqual(nxd.axes[3].shape, (10, )) # dataset shape (15, ) sliced [3:12]
+ self.assertIsNotNone(nxd.errors)
+ self.assertEqual(nxd.errors.shape, (2, 2, 3, 10))
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "spectrum")
+ self.assertEqual(nxd.get_axis_errors("energy").shape,
+ (10,))
+ # test getting axis errors by long_name
+ self.assertTrue(numpy.array_equal(nxd.get_axis_errors("Calibrated energy"),
+ nxd.get_axis_errors("energy")))
+ self.assertTrue(numpy.array_equal(nxd.get_axis_errors(b"Calibrated energy"),
+ nxd.get_axis_errors("energy")))
+
+ def testImages(self):
+ nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_image)
+ self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates"])
+ self.assertEqual(list(nxd.axes_dataset_names),
+ ["rows_calib", "columns_coordinates"])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+ self.assertEqual(len(nxd.auxiliary_signals), 1)
+ self.assertEqual(nxd.auxiliary_signals_names, ["image2"])
+
+ nxd = nxdata.NXdata(self.h5f["images/2D_irregular_data"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_image)
+
+ self.assertEqual(nxd.axes_dataset_names, nxd.axes_names)
+ self.assertEqual(list(nxd.axes_dataset_names),
+ ["rows_coordinates", "columns_coordinates"])
+ self.assertEqual(len(nxd.axes), 2)
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+ self.assertEqual(nxd.title, "Title as group attr")
+
+ nxd = nxdata.NXdata(self.h5f["images/5D_images"])
+ self.assertTrue(nxd.is_image)
+ self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
+ nxd.signal_is_2d or nxd.signal_is_3d)
+ self.assertEqual(nxd.axes_names,
+ [None, None, None, 'rows_coordinates', 'columns_coordinates'])
+ self.assertEqual(nxd.axes_dataset_names,
+ [None, None, None, 'rows_coordinates', 'columns_coordinates'])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "image")
+
+ nxd = nxdata.NXdata(self.h5f["images/RGBA_image"])
+ self.assertTrue(nxd.is_image)
+ self.assertEqual(nxd.interpretation, "rgba-image")
+ self.assertTrue(nxd.signal_is_3d)
+ self.assertEqual(nxd.axes_names, ["Calibrated Y",
+ "columns_coordinates",
+ None])
+ self.assertEqual(list(nxd.axes_dataset_names),
+ ["rows_calib", "columns_coordinates", None])
+
+ def testScatters(self):
+ nxd = nxdata.NXdata(self.h5f["scatters/x_y_scatter"])
+ self.assertTrue(nxd.signal_is_1d)
+ self.assertEqual(nxd.axes_names, ["x"])
+ self.assertEqual(nxd.axes_dataset_names,
+ ["x"])
+ self.assertIsNotNone(nxd.errors)
+ self.assertEqual(nxd.get_axis_errors("x").shape,
+ (128, ))
+ self.assertTrue(nxd.is_scatter)
+ self.assertFalse(nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+
+ nxd = nxdata.NXdata(self.h5f["scatters/x_y_value_scatter"])
+ self.assertFalse(nxd.signal_is_1d)
+        self.assertEqual(nxd.axes_dataset_names,
+                         nxd.axes_names)
+ self.assertEqual(nxd.axes_dataset_names,
+ ["x", "y"])
+ self.assertEqual(nxd.get_axis_errors("x").shape,
+ (128, ))
+ self.assertEqual(nxd.get_axis_errors("y").shape,
+ (128, ))
+ self.assertEqual(len(nxd.axes), 2)
+ self.assertIsNone(nxd.errors)
+ self.assertTrue(nxd.is_scatter)
+ self.assertTrue(nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+
+
+class TestLegacyNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(prefix="nxdata_legacy_examples_",
+ suffix=".h5", delete=True)
+ tmp.file.close()
+ self.h5fname = tmp.name
+ self.h5f = h5py.File(tmp.name, "w")
+
+ def tearDown(self):
+ self.h5f.close()
+
+ def testSignalAttrOnDataset(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds0.attrs["long_name"] = "My first image"
+
+ ds1 = g.create_dataset("image1",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds1.attrs["signal"] = "2"
+ ds1.attrs["long_name"] = "My 2nd image"
+
+ ds2 = g.create_dataset("image2",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds2.attrs["signal"] = 3
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+
+ self.assertEqual(nxd.signal_dataset_name, "image0")
+ self.assertEqual(nxd.signal_name, "My first image")
+ self.assertEqual(nxd.signal.shape,
+ (4, 6))
+
+ self.assertEqual(len(nxd.auxiliary_signals), 2)
+ self.assertEqual(nxd.auxiliary_signals[1].shape,
+ (4, 6))
+
+ self.assertEqual(nxd.auxiliary_signals_dataset_names,
+ ["image1", "image2"])
+ self.assertEqual(nxd.auxiliary_signals_names,
+ ["My 2nd image", "image2"])
+
+ def testAxesOnSignalDataset(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds0.attrs["axes"] = "yaxis:xaxis"
+
+ ds1 = g.create_dataset("yaxis",
+ data=numpy.arange(4))
+ ds2 = g.create_dataset("xaxis",
+ data=numpy.arange(6))
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+
+ self.assertEqual(nxd.axes_dataset_names,
+ ["yaxis", "xaxis"])
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ numpy.arange(4)))
+ self.assertTrue(numpy.array_equal(nxd.axes[1],
+ numpy.arange(6)))
+
+ def testAxesOnAxesDatasets(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds1 = g.create_dataset("yaxis",
+ data=numpy.arange(4))
+ ds1.attrs["axis"] = 0
+ ds2 = g.create_dataset("xaxis",
+ data=numpy.arange(6))
+ ds2.attrs["axis"] = "1"
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+ self.assertEqual(nxd.axes_dataset_names,
+ ["yaxis", "xaxis"])
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ numpy.arange(4)))
+ self.assertTrue(numpy.array_equal(nxd.axes[1],
+ numpy.arange(6)))
+
+ def testAsciiUndefinedAxesAttrs(self):
+ """Some files may not be using utf8 for str attrs"""
+ g = self.h5f.create_group("bytes_attrs")
+ g.attrs["NX_class"] = b"NXdata"
+ g.attrs["signal"] = b"image0"
+ g.attrs["axes"] = b"yaxis", b"."
+
+ g.create_dataset("image0",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ g.create_dataset("yaxis",
+ data=numpy.arange(4))
+
+ nxd = nxdata.NXdata(self.h5f["bytes_attrs"])
+ self.assertEqual(nxd.axes_dataset_names,
+ ["yaxis", None])
+
+
+class TestSaveNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(prefix="nxdata",
+ suffix=".h5", delete=True)
+ tmp.file.close()
+ self.h5fname = tmp.name
+
+ def testSimpleSave(self):
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=["a0", "a1"],
+ nxentry_name="a",
+ nxdata_name="mydata")
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
+
+ nxd = nxdata.NXdata(h5f["/a/mydata"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ a0))
+
+ h5f.close()
+
+ def testSimplestSave(self):
+ sig = numpy.array([0, 1, 2])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig)
+
+ h5f = h5py.File(self.h5fname, "r")
+
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["/entry/data0"]))
+
+ nxd = nxdata.NXdata(h5f["/entry/data0"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ h5f.close()
+
+ def testSaveDefaultAxesNames(self):
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=None,
+ axes_long_names=["a", "b"],
+ nxentry_name="a",
+ nxdata_name="mydata")
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
+
+ nxd = nxdata.NXdata(h5f["/a/mydata"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ a0))
+ self.assertEqual(nxd.axes_dataset_names,
+ [u"dim0", u"dim1"])
+ self.assertEqual(nxd.axes_names,
+ [u"a", u"b"])
+
+ h5f.close()
+
+ def testSaveToExistingEntry(self):
+ h5f = h5py.File(self.h5fname, "w")
+ g = h5f.create_group("myentry")
+ g.attrs["NX_class"] = "NXentry"
+ h5f.close()
+
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=["a0", "a1"],
+ nxentry_name="myentry",
+ nxdata_name="toto")
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["myentry/toto"]))
+
+ nxd = nxdata.NXdata(h5f["myentry/toto"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ a0))
+ h5f.close()
diff --git a/src/silx/io/test/test_octaveh5.py b/src/silx/io/test/test_octaveh5.py
new file mode 100644
index 0000000..1c3b3e0
--- /dev/null
+++ b/src/silx/io/test/test_octaveh5.py
@@ -0,0 +1,156 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""
+Tests for the octaveh5 module
+"""
+
+__authors__ = ["C. Nemoz", "H. Payno"]
+__license__ = "MIT"
+__date__ = "12/07/2016"
+
+import unittest
+import os
+import tempfile
+
+try:
+ from ..octaveh5 import Octaveh5
+except ImportError:
+ Octaveh5 = None
+
+
+@unittest.skipIf(Octaveh5 is None, "Could not import Octaveh5 (h5py is required)")
+class TestOctaveH5(unittest.TestCase):
+ @staticmethod
+ def _get_struct_FT():
+ return {
+ 'NO_CHECK': 0.0, 'SHOWSLICE': 1.0, 'DOTOMO': 1.0, 'DATABASE': 0.0, 'ANGLE_OFFSET': 0.0,
+ 'VOLSELECTION_REMEMBER': 0.0, 'NUM_PART': 4.0, 'VOLOUTFILE': 0.0, 'RINGSCORRECTION': 0.0,
+ 'DO_TEST_SLICE': 1.0, 'ZEROOFFMASK': 1.0, 'VERSION': 'fastomo3 version 2.0',
+ 'CORRECT_SPIKES_THRESHOLD': 0.040000000000000001, 'SHOWPROJ': 0.0, 'HALF_ACQ': 0.0,
+            'ANGLE_OFFSET_VALUE': 0.0, 'FIXEDSLICE': 'middle', 'VOLSELECT': 'total'}
+
+    @staticmethod
+ def _get_struct_PYHSTEXE():
+ return {
+ 'EXE': 'PyHST2_2015d', 'VERBOSE': 0.0, 'OFFV': 'PyHST2_2015d', 'TOMO': 0.0,
+ 'VERBOSE_FILE': 'pyhst_out.txt', 'DIR': '/usr/bin/', 'OFFN': 'pyhst2'}
+
+ @staticmethod
+ def _get_struct_FTAXIS():
+ return {
+ 'POSITION_VALUE': 12345.0, 'COR_ERROR': 0.0, 'FILESDURINGSCAN': 0.0, 'PLOTFIGURE': 1.0,
+ 'DIM1': 0.0, 'OVERSAMPLING': 5.0, 'TO_THE_CENTER': 1.0, 'POSITION': 'fixed',
+            'COR_POSITION': 0.0, 'HA': 0.0}
+
+ @staticmethod
+ def _get_struct_PAGANIN():
+ return {
+ 'MKEEP_MASK': 0.0, 'UNSHARP_SIGMA': 0.80000000000000004, 'DILATE': 2.0, 'UNSHARP_COEFF': 3.0,
+ 'MEDIANR': 4.0, 'DB': 500.0, 'MKEEP_ABS': 0.0, 'MODE': 0.0, 'THRESHOLD': 0.5,
+            'MKEEP_BONE': 0.0, 'DB2': 100.0, 'MKEEP_CORR': 0.0, 'MKEEP_SOFT': 0.0}
+
+ @staticmethod
+ def _get_struct_BEAMGEO():
+ return {'DIST': 55.0, 'SY': 0.0, 'SX': 0.0, 'TYPE': 'p'}
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.test_3_6_fname = os.path.join(self.tempdir, "silx_tmp_t00_octaveTest_3_6.h5")
+ self.test_3_8_fname = os.path.join(self.tempdir, "silx_tmp_t00_octaveTest_3_8.h5")
+
+ def tearDown(self):
+ if os.path.isfile(self.test_3_6_fname):
+ os.unlink(self.test_3_6_fname)
+ if os.path.isfile(self.test_3_8_fname):
+ os.unlink(self.test_3_8_fname)
+
+ def testWritedIsReaded(self):
+ """
+        Simple test writing a structure compatible with the octave h5
+        format, then reading it back. This test targets octave versions >= 3.8.
+ """
+ writer = Octaveh5()
+
+ writer.open(self.test_3_8_fname, 'a')
+ # step 1 writing the file
+ writer.write('FT', self._get_struct_FT())
+ writer.write('PYHSTEXE', self._get_struct_PYHSTEXE())
+ writer.write('FTAXIS', self._get_struct_FTAXIS())
+ writer.write('PAGANIN', self._get_struct_PAGANIN())
+ writer.write('BEAMGEO', self._get_struct_BEAMGEO())
+ writer.close()
+
+ # step 2 reading the file
+ reader = Octaveh5().open(self.test_3_8_fname)
+ # 2.1 check FT
+        data_read = reader.get('FT')
+        self.assertEqual(data_read, self._get_struct_FT())
+        # 2.2 check PYHSTEXE
+        data_read = reader.get('PYHSTEXE')
+        self.assertEqual(data_read, self._get_struct_PYHSTEXE())
+        # 2.3 check FTAXIS
+        data_read = reader.get('FTAXIS')
+        self.assertEqual(data_read, self._get_struct_FTAXIS())
+        # 2.4 check PAGANIN
+        data_read = reader.get('PAGANIN')
+        self.assertEqual(data_read, self._get_struct_PAGANIN())
+        # 2.5 check BEAMGEO
+        data_read = reader.get('BEAMGEO')
+        self.assertEqual(data_read, self._get_struct_BEAMGEO())
+ reader.close()
+
+ def testWritedIsReadedOldOctaveVersion(self):
+ """The same test as testWritedIsReaded but for octave version < 3.8
+ """
+ # test for octave version < 3.8
+ writer = Octaveh5(3.6)
+
+ writer.open(self.test_3_6_fname, 'a')
+
+ # step 1 writing the file
+ writer.write('FT', self._get_struct_FT())
+ writer.write('PYHSTEXE', self._get_struct_PYHSTEXE())
+ writer.write('FTAXIS', self._get_struct_FTAXIS())
+ writer.write('PAGANIN', self._get_struct_PAGANIN())
+ writer.write('BEAMGEO', self._get_struct_BEAMGEO())
+ writer.close()
+
+ # step 2 reading the file
+ reader = Octaveh5(3.6).open(self.test_3_6_fname)
+ # 2.1 check FT
+        data_read = reader.get('FT')
+        self.assertEqual(data_read, self._get_struct_FT())
+        # 2.2 check PYHSTEXE
+        data_read = reader.get('PYHSTEXE')
+        self.assertEqual(data_read, self._get_struct_PYHSTEXE())
+        # 2.3 check FTAXIS
+        data_read = reader.get('FTAXIS')
+        self.assertEqual(data_read, self._get_struct_FTAXIS())
+        # 2.4 check PAGANIN
+        data_read = reader.get('PAGANIN')
+        self.assertEqual(data_read, self._get_struct_PAGANIN())
+        # 2.5 check BEAMGEO
+        data_read = reader.get('BEAMGEO')
+        self.assertEqual(data_read, self._get_struct_BEAMGEO())
+ reader.close()
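For reference, the write/read round trip exercised by these two tests reduces to a minimal sketch like the following (temporary file name arbitrary; the API calls are exactly those used above):

    import os
    import tempfile

    from silx.io.octaveh5 import Octaveh5

    fname = os.path.join(tempfile.mkdtemp(), "octave.h5")

    # write a flat dict as an octave-compatible HDF5 structure
    writer = Octaveh5()  # default targets octave >= 3.8; pass 3.6 for the older layout
    writer.open(fname, 'a')
    writer.write('BEAMGEO', {'DIST': 55.0, 'SY': 0.0, 'SX': 0.0, 'TYPE': 'p'})
    writer.close()

    # open() returns the instance, so construction and opening can be chained
    reader = Octaveh5().open(fname)
    assert reader.get('BEAMGEO')['DIST'] == 55.0
    reader.close()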
diff --git a/src/silx/io/test/test_rawh5.py b/src/silx/io/test/test_rawh5.py
new file mode 100644
index 0000000..236484d
--- /dev/null
+++ b/src/silx/io/test/test_rawh5.py
@@ -0,0 +1,85 @@
+# coding: utf-8
+# /*##########################################################################
+#
+# Copyright (c) 2016 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ###########################################################################*/
+"""Test for silx.gui.hdf5 module"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "21/09/2017"
+
+
+import unittest
+import tempfile
+import numpy
+import shutil
+from .. import rawh5
+
+
+class TestNumpyFile(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmpDirectory = tempfile.mkdtemp()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmpDirectory)
+
+ def testNumpyFile(self):
+ filename = "%s/%s.npy" % (self.tmpDirectory, self.id())
+ c = numpy.random.rand(5, 5)
+ numpy.save(filename, c)
+ h5 = rawh5.NumpyFile(filename)
+ self.assertIn("data", h5)
+ self.assertEqual(h5["data"].dtype.kind, "f")
+
+ def testNumpyZFile(self):
+ filename = "%s/%s.npz" % (self.tmpDirectory, self.id())
+ a = numpy.array(u"aaaaa")
+ b = numpy.array([1, 2, 3, 4])
+ c = numpy.random.rand(5, 5)
+ d = numpy.array(b"aaaaa")
+ e = numpy.array(u"i \u2661 my mother")
+ numpy.savez(filename, a, b=b, c=c, d=d, e=e)
+ h5 = rawh5.NumpyFile(filename)
+ self.assertIn("arr_0", h5)
+ self.assertIn("b", h5)
+ self.assertIn("c", h5)
+ self.assertIn("d", h5)
+ self.assertIn("e", h5)
+ self.assertEqual(h5["arr_0"].dtype.kind, "U")
+ self.assertEqual(h5["b"].dtype.kind, "i")
+ self.assertEqual(h5["c"].dtype.kind, "f")
+ self.assertEqual(h5["d"].dtype.kind, "S")
+ self.assertEqual(h5["e"].dtype.kind, "U")
+
+ def testNumpyZFileContainingDirectories(self):
+ filename = "%s/%s.npz" % (self.tmpDirectory, self.id())
+ data = {}
+ data['a/b/c'] = numpy.arange(10)
+ data['a/b/e'] = numpy.arange(10)
+ numpy.savez(filename, **data)
+ h5 = rawh5.NumpyFile(filename)
+ self.assertIn("a/b/c", h5)
+ self.assertIn("a/b/e", h5)
diff --git a/src/silx/io/test/test_specfile.py b/src/silx/io/test/test_specfile.py
new file mode 100644
index 0000000..44cb08c
--- /dev/null
+++ b/src/silx/io/test/test_specfile.py
@@ -0,0 +1,420 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for specfile wrapper"""
+
+__authors__ = ["P. Knobel", "V.A. Sole"]
+__license__ = "MIT"
+__date__ = "17/01/2018"
+
+
+import locale
+import logging
+import numpy
+import os
+import sys
+import tempfile
+import unittest
+
+from silx.utils import testutils
+
+from ..specfile import SpecFile, Scan
+from .. import specfile
+
+
+logger1 = logging.getLogger(__name__)
+
+sftext = """#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#U00 user comment first line
+#U01 This is a dummy file to test SpecFile parsing
+#U02
+#U03 last line
+
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#G0 0
+#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
+#G3 0 0 0 0 0 0 0 0 0
+#G4 0
+#Q
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#UMI0 Current AutoM Shutter
+#UMI1 192.51 OFF FE open
+#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00;
+#N 4
+#L first column second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 25 ascan c3th 1.33245 1.52245 40 0.15
+#D Thu Feb 11 10:00:31 2016
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 5
+#L column0 column1 col2 col3
+0.0 0.1 0.2 0.3
+1.0 1.1 1.2 1.3
+2.0 2.1 2.2 2.3
+3.0 3.1 3.2 3.3
+
+#S 26 yyyyyy
+#D Thu Feb 11 09:55:20 2016
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 4
+#L first column second column 3rd_col
+#C Sat Oct 31 15:51:47 1998. Scan aborted after 0 points.
+
+#F /tmp/sf.dat
+#E 1455180876
+#D Thu Feb 11 09:54:36 2016
+
+#S 1 aaaaaa
+#U first duplicate line
+#U second duplicate line
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+3 4
+@A 3.1 4 5
+5 6
+@A 6 7.7 8
+"""
+
+
+loc = locale.getlocale(locale.LC_NUMERIC)
+try:
+ locale.setlocale(locale.LC_NUMERIC, 'de_DE.utf8')
+except locale.Error:
+ try_DE = False
+else:
+ try_DE = True
+ locale.setlocale(locale.LC_NUMERIC, loc)
+
+
+class TestSpecFile(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname1 = tempfile.mkstemp(text=False)
+ if sys.version_info < (3, ):
+ os.write(fd, sftext)
+ else:
+ os.write(fd, bytes(sftext, 'ascii'))
+ os.close(fd)
+
+ fd2, cls.fname2 = tempfile.mkstemp(text=False)
+ if sys.version_info < (3, ):
+ os.write(fd2, sftext[370:923])
+ else:
+ os.write(fd2, bytes(sftext[370:923], 'ascii'))
+ os.close(fd2)
+
+ fd3, cls.fname3 = tempfile.mkstemp(text=False)
+ txt = sftext[371:923]
+ if sys.version_info < (3, ):
+ os.write(fd3, txt)
+ else:
+ os.write(fd3, bytes(txt, 'ascii'))
+ os.close(fd3)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname1)
+ os.unlink(cls.fname2)
+ os.unlink(cls.fname3)
+
+ def setUp(self):
+ self.sf = SpecFile(self.fname1)
+ self.scan1 = self.sf[0]
+ self.scan1_2 = self.sf["1.2"]
+ self.scan25 = self.sf["25.1"]
+ self.empty_scan = self.sf["26.1"]
+
+ self.sf_no_fhdr = SpecFile(self.fname2)
+ self.scan1_no_fhdr = self.sf_no_fhdr[0]
+
+ self.sf_no_fhdr_crash = SpecFile(self.fname3)
+ self.scan1_no_fhdr_crash = self.sf_no_fhdr_crash[0]
+
+ def tearDown(self):
+ self.sf.close()
+ self.sf_no_fhdr.close()
+ self.sf_no_fhdr_crash.close()
+
+ def test_open(self):
+ self.assertIsInstance(self.sf, SpecFile)
+ with self.assertRaises(specfile.SfErrFileOpen):
+ SpecFile("doesnt_exist.dat")
+
+ # test filename types unicode and bytes
+ if sys.version_info[0] < 3:
+ try:
+ SpecFile(self.fname1)
+ except TypeError:
+ self.fail("failed to handle filename as python2 str")
+ try:
+ SpecFile(unicode(self.fname1))
+ except TypeError:
+ self.fail("failed to handle filename as python2 unicode")
+ else:
+ try:
+ SpecFile(self.fname1)
+ except TypeError:
+ self.fail("failed to handle filename as python3 str")
+ try:
+ SpecFile(bytes(self.fname1, 'utf-8'))
+ except TypeError:
+ self.fail("failed to handle filename as python3 bytes")
+
+ def test_number_of_scans(self):
+ self.assertEqual(4, len(self.sf))
+
+ def test_list_of_scan_indices(self):
+ self.assertEqual(self.sf.list(),
+ [1, 25, 26, 1])
+ self.assertEqual(self.sf.keys(),
+ ["1.1", "25.1", "26.1", "1.2"])
+
+ def test_index_number_order(self):
+ self.assertEqual(self.sf.index(1, 2), 3) # sf["1.2"]==sf[3]
+ self.assertEqual(self.sf.number(1), 25) # sf[1]==sf["25"]
+ self.assertEqual(self.sf.order(3), 2) # sf[3]==sf["1.2"]
+ with self.assertRaises(specfile.SfErrScanNotFound):
+ self.sf.index(3, 2)
+ with self.assertRaises(specfile.SfErrScanNotFound):
+ self.sf.index(99)
+
+ def assertRaisesRegex(self, *args, **kwargs):
+ # Python 2 compatibility
+ if sys.version_info.major >= 3:
+ return super(TestSpecFile, self).assertRaisesRegex(*args, **kwargs)
+ else:
+ return self.assertRaisesRegexp(*args, **kwargs)
+
+ def test_getitem(self):
+ self.assertIsInstance(self.sf[2], Scan)
+ self.assertIsInstance(self.sf["1.2"], Scan)
+ # int out of range
+ with self.assertRaisesRegex(IndexError, 'Scan index must be in ran'):
+ self.sf[107]
+ # float indexing not allowed
+ with self.assertRaisesRegex(TypeError, 'The scan identification k'):
+ self.sf[1.2]
+        # non-existent scan with "N.M" indexing
+ with self.assertRaises(KeyError):
+ self.sf["3.2"]
+
+ def test_specfile_iterator(self):
+ i = 0
+ for scan in self.sf:
+ if i == 1:
+ self.assertEqual(scan.motor_positions,
+ self.sf[1].motor_positions)
+ i += 1
+ # number of returned scans
+ self.assertEqual(i, len(self.sf))
+
+ def test_scan_index(self):
+ self.assertEqual(self.scan1.index, 0)
+ self.assertEqual(self.scan1_2.index, 3)
+ self.assertEqual(self.scan25.index, 1)
+
+ def test_scan_headers(self):
+ self.assertEqual(self.scan25.scan_header_dict['S'],
+ "25 ascan c3th 1.33245 1.52245 40 0.15")
+ self.assertEqual(self.scan1.header[17], '#G0 0')
+ self.assertEqual(len(self.scan1.header), 29)
+ # parsing headers with long keys
+ self.assertEqual(self.scan1.scan_header_dict['UMI0'],
+ 'Current AutoM Shutter')
+ # parsing empty headers
+ self.assertEqual(self.scan1.scan_header_dict['Q'], '')
+ # duplicate headers: concatenated (with newline)
+ self.assertEqual(self.scan1_2.scan_header_dict["U"],
+ "first duplicate line\nsecond duplicate line")
+
+ def test_file_headers(self):
+ self.assertEqual(self.scan1.header[1],
+ '#E 1455180875')
+ self.assertEqual(self.scan1.file_header_dict['F'],
+ '/tmp/sf.dat')
+
+ def test_multiple_file_headers(self):
+ """Scan 1.2 is after the second file header, with a different
+ Epoch"""
+ self.assertEqual(self.scan1_2.header[1],
+ '#E 1455180876')
+
+ def test_scan_labels(self):
+ self.assertEqual(self.scan1.labels,
+ ['first column', 'second column', '3rd_col'])
+
+ def test_data(self):
+ # data_line() and data_col() take 1-based indices as arg
+ self.assertAlmostEqual(self.scan1.data_line(1)[2],
+ 1.56)
+ # tests for data transposition between original file and .data attr
+ self.assertAlmostEqual(self.scan1.data[2, 0],
+ 8)
+ self.assertEqual(self.scan1.data.shape, (3, 4))
+ self.assertAlmostEqual(numpy.sum(self.scan1.data), 113.631)
+
+ def test_data_column_by_name(self):
+ self.assertAlmostEqual(self.scan25.data_column_by_name("col2")[1],
+ 1.2)
+        # Scan.data is transposed after reading, so column is the first index
+ self.assertAlmostEqual(numpy.sum(self.scan25.data_column_by_name("col2")),
+ numpy.sum(self.scan25.data[2, :]))
+ with self.assertRaises(specfile.SfErrColNotFound):
+ self.scan25.data_column_by_name("ygfxgfyxg")
+
+ def test_motors(self):
+ self.assertEqual(len(self.scan1.motor_names), 6)
+ self.assertEqual(len(self.scan1.motor_positions), 6)
+ self.assertAlmostEqual(sum(self.scan1.motor_positions),
+ 223.385912)
+ self.assertEqual(self.scan1.motor_names[1], 'MRTSlit UP')
+ self.assertAlmostEqual(
+ self.scan25.motor_position_by_name('MRTSlit UP'),
+ -1.66875)
+
+ def test_absence_of_file_header(self):
+ """We expect Scan.file_header to be an empty list in the absence
+ of a file header.
+ """
+ self.assertEqual(len(self.scan1_no_fhdr.motor_names), 0)
+ # motor positions can still be read in the scan header
+ # even in the absence of motor names
+ self.assertAlmostEqual(sum(self.scan1_no_fhdr.motor_positions),
+ 223.385912)
+ self.assertEqual(len(self.scan1_no_fhdr.header), 15)
+ self.assertEqual(len(self.scan1_no_fhdr.scan_header), 15)
+ self.assertEqual(len(self.scan1_no_fhdr.file_header), 0)
+
+ def test_crash_absence_of_file_header(self):
+ """Test no crash in absence of file header and no leading newline
+ character
+ """
+ self.assertEqual(len(self.scan1_no_fhdr_crash.motor_names), 0)
+ # motor positions can still be read in the scan header
+ # even in the absence of motor names
+ self.assertAlmostEqual(sum(self.scan1_no_fhdr_crash.motor_positions),
+ 223.385912)
+ self.assertEqual(len(self.scan1_no_fhdr_crash.scan_header), 15)
+ self.assertEqual(len(self.scan1_no_fhdr_crash.file_header), 0)
+
+ def test_mca(self):
+ self.assertEqual(len(self.scan1.mca), 0)
+ self.assertEqual(len(self.scan1_2.mca), 3)
+ self.assertEqual(self.scan1_2.mca[1][2], 5)
+ self.assertEqual(sum(self.scan1_2.mca[2]), 21.7)
+
+ # Negative indexing
+ self.assertEqual(sum(self.scan1_2.mca[len(self.scan1_2.mca) - 1]),
+ sum(self.scan1_2.mca[-1]))
+
+ # Test iterator
+ line_count, total_sum = (0, 0)
+ for mca_line in self.scan1_2.mca:
+ line_count += 1
+ total_sum += sum(mca_line)
+ self.assertEqual(line_count, 3)
+ self.assertAlmostEqual(total_sum, 36.8)
+
+ def test_mca_header(self):
+ self.assertEqual(self.scan1.mca_header_dict, {})
+ self.assertEqual(len(self.scan1_2.mca_header_dict), 4)
+ self.assertEqual(self.scan1_2.mca_header_dict["CALIB"], "1 2 3")
+ self.assertEqual(self.scan1_2.mca.calibration,
+ [[1., 2., 3.]])
+ # default calib in the absence of #@CALIB
+ self.assertEqual(self.scan25.mca.calibration,
+ [[0., 1., 0.]])
+ self.assertEqual(self.scan1_2.mca.channels,
+ [[0, 1, 2]])
+ # absence of #@CHANN and spectra
+ self.assertEqual(self.scan25.mca.channels,
+ [])
+
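The three #@CALIB coefficients checked here follow the usual SPEC convention of a second-order polynomial mapping channel number to calibrated value; a sketch of that interpretation (the convention itself, not a silx API):

    import numpy

    def calibrated(channels, calib):
        # SPEC #@CALIB coefficients (a, b, c): value = a + b*ch + c*ch**2
        a, b, c = calib
        ch = numpy.asarray(channels, dtype=float)
        return a + b * ch + c * ch ** 2

    # with "#@CALIB 1 2 3" and channels 0, 1, 2 as in sftext above:
    print(calibrated([0, 1, 2], [1.0, 2.0, 3.0]))  # -> [ 1.  6. 17.]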
+ @testutils.validate_logging(specfile._logger.name, warning=1)
+ def test_empty_scan(self):
+ """Test reading a scan with no data points"""
+ self.assertEqual(len(self.empty_scan.labels),
+ 3)
+ col1 = self.empty_scan.data_column_by_name("second column")
+ self.assertEqual(col1.shape, (0, ))
+
+
+class TestSFLocale(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp(text=False)
+ if sys.version_info < (3, ):
+ os.write(fd, sftext)
+ else:
+ os.write(fd, bytes(sftext, 'ascii'))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+ locale.setlocale(locale.LC_NUMERIC, loc) # restore saved locale
+
+ def crunch_data(self):
+ self.sf3 = SpecFile(self.fname)
+ self.assertAlmostEqual(self.sf3[0].data_line(1)[2],
+ 1.56)
+ self.sf3.close()
+
+ @unittest.skipIf(not try_DE, "de_DE.utf8 locale not installed")
+ def test_locale_de_DE(self):
+ locale.setlocale(locale.LC_NUMERIC, 'de_DE.utf8')
+ self.crunch_data()
+
+ def test_locale_user(self):
+ locale.setlocale(locale.LC_NUMERIC, '') # use user's preferred locale
+ self.crunch_data()
+
+ def test_locale_C(self):
+ locale.setlocale(locale.LC_NUMERIC, 'C') # use default (C) locale
+ self.crunch_data()
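To summarise the indexing conventions pinned down by TestSpecFile: scans are reachable both by 0-based position and by a "scan_number.order" key, and index()/number()/order() convert between the two. A sketch against a file with the same scan layout as the fixture (path hypothetical):

    from silx.io.specfile import SpecFile

    sf = SpecFile("/tmp/sf.dat")   # scans numbered 1, 25, 26, then 1 again
    assert sf.keys() == ["1.1", "25.1", "26.1", "1.2"]
    assert sf.index(1, 2) == 3     # scan number 1, 2nd occurrence -> 4th scan
    assert sf.number(1) == 25      # 2nd scan in the file carries number 25
    assert sf.order(3) == 2        # 4th scan is the 2nd occurrence of number 1
    sf.close()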
diff --git a/src/silx/io/test/test_specfilewrapper.py b/src/silx/io/test/test_specfilewrapper.py
new file mode 100644
index 0000000..a1ba5f4
--- /dev/null
+++ b/src/silx/io/test/test_specfilewrapper.py
@@ -0,0 +1,195 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for old specfile wrapper"""
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "15/05/2017"
+
+import locale
+import logging
+import numpy
+import os
+import sys
+import tempfile
+import unittest
+
+logger1 = logging.getLogger(__name__)
+
+from ..specfilewrapper import Specfile
+
+sftext = """#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#U00 user comment first line
+#U01 This is a dummy file to test SpecFile parsing
+#U02
+#U03 last line
+
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#G0 0
+#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
+#G3 0 0 0 0 0 0 0 0 0
+#G4 0
+#Q
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#UMI0 Current AutoM Shutter
+#UMI1 192.51 OFF FE open
+#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00;
+#N 4
+#L first column second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 25 ascan c3th 1.33245 1.52245 40 0.15
+#D Thu Feb 11 10:00:31 2016
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 5
+#L column0 column1 col2 col3
+0.0 0.1 0.2 0.3
+1.0 1.1 1.2 1.3
+2.0 2.1 2.2 2.3
+3.0 3.1 3.2 3.3
+
+#F /tmp/sf.dat
+#E 1455180876
+#D Thu Feb 11 09:54:36 2016
+
+#S 1 aaaaaa
+#U first duplicate line
+#U second duplicate line
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+3 4
+@A 3.1 4 5
+5 6
+@A 6 7.7 8
+"""
+
+
+class TestSpecfilewrapper(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname1 = tempfile.mkstemp(text=False)
+ if sys.version_info < (3, ):
+ os.write(fd, sftext)
+ else:
+ os.write(fd, bytes(sftext, 'ascii'))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname1)
+
+ def setUp(self):
+ self.sf = Specfile(self.fname1)
+ self.scan1 = self.sf[0]
+ self.scan1_2 = self.sf.select("1.2")
+ self.scan25 = self.sf.select("25.1")
+
+ def tearDown(self):
+ self.sf.close()
+
+ def test_number_of_scans(self):
+ self.assertEqual(3, len(self.sf))
+
+ def test_list_of_scan_indices(self):
+ self.assertEqual(self.sf.list(),
+ '1,25,1')
+ self.assertEqual(self.sf.keys(),
+ ["1.1", "25.1", "1.2"])
+
+ def test_scan_headers(self):
+ self.assertEqual(self.scan25.header('S'),
+ ["#S 25 ascan c3th 1.33245 1.52245 40 0.15"])
+ self.assertEqual(self.scan1.header("G0"), ['#G0 0'])
+ # parsing headers with long keys
+ # parsing empty headers
+ self.assertEqual(self.scan1.header('Q'), ['#Q '])
+
+ def test_file_headers(self):
+ self.assertEqual(self.scan1.header("E"),
+ ['#E 1455180875'])
+ self.assertEqual(self.sf.title(),
+ "imaging")
+ self.assertEqual(self.sf.epoch(),
+ 1455180875)
+ self.assertEqual(self.sf.allmotors(),
+ ["Pslit HGap", "MRTSlit UP", "MRTSlit DOWN",
+ "Sslit1 VOff", "Sslit1 HOff", "Sslit1 VGap"])
+
+ def test_scan_labels(self):
+ self.assertEqual(self.scan1.alllabels(),
+ ['first column', 'second column', '3rd_col'])
+
+ def test_data(self):
+ self.assertAlmostEqual(self.scan1.dataline(3)[2],
+ -3.14)
+ self.assertAlmostEqual(self.scan1.datacol(1)[2],
+ 3.14)
+ # tests for data transposition between original file and .data attr
+ self.assertAlmostEqual(self.scan1.data()[2, 0],
+ 8)
+ self.assertEqual(self.scan1.data().shape, (3, 4))
+ self.assertAlmostEqual(numpy.sum(self.scan1.data()), 113.631)
+
+ def test_date(self):
+ self.assertEqual(self.scan1.date(),
+ "Thu Feb 11 09:55:20 2016")
+
+ def test_motors(self):
+ self.assertEqual(len(self.sf.allmotors()), 6)
+ self.assertEqual(len(self.scan1.allmotorpos()), 6)
+ self.assertAlmostEqual(sum(self.scan1.allmotorpos()),
+ 223.385912)
+ self.assertEqual(self.sf.allmotors()[1], 'MRTSlit UP')
+
+ def test_mca(self):
+ self.assertEqual(self.scan1_2.mca(2)[2], 5)
+ self.assertEqual(sum(self.scan1_2.mca(3)), 21.7)
+
+ def test_mca_header(self):
+ self.assertEqual(self.scan1_2.header("CALIB"),
+ ["#@CALIB 1 2 3"])
diff --git a/src/silx/io/test/test_spech5.py b/src/silx/io/test/test_spech5.py
new file mode 100644
index 0000000..1e67961
--- /dev/null
+++ b/src/silx/io/test/test_spech5.py
@@ -0,0 +1,929 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for spech5"""
+import numpy
+import os
+import io
+import sys
+import tempfile
+import unittest
+import datetime
+from functools import partial
+
+from silx.utils import testutils
+
+from .. import spech5
+from ..spech5 import (SpecH5, SpecH5Dataset, spec_date_to_iso8601)
+from .. import specfile
+
+import h5py
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "12/02/2018"
+
+sftext = """#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#N 4
+#L MRTSlit UP second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 25 ascan c3th 1.33245 1.52245 40 0.15
+#D Sat 2015/03/14 03:53:50
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 5
+#L column0 column1 col2 col3
+0.0 0.1 0.2 0.3
+1.0 1.1 1.2 1.3
+2.0 2.1 2.2 2.3
+3.0 3.1 3.2 3.3
+
+#S 1 aaaaaa
+#D Thu Feb 11 10:00:32 2016
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+3 4
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+5 6
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+
+#S 1000 bbbbb
+#G1 3.25 3.25 5.207 90 90 120 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
+#G3 0.0106337923671 0.027529133 1.206191273 -1.43467075 0.7633438883 0.02401568018 -1.709143587 -2.097621783 0.02456954971
+#L a b
+1 2
+
+#S 1001 ccccc
+#G1 0. 0. 0. 0 0 0 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
+#G3 0. 0. 0. 0. 0.0 0. 0. 0. 0.
+#L a b
+1 2
+
+"""
+
+
+class TestSpecDate(unittest.TestCase):
+ """
+ Test of the spec_date_to_iso8601 function.
+ """
+ # TODO : time zone tests
+ # TODO : error cases
+
+ @classmethod
+ def setUpClass(cls):
+ import locale
+ # FYI : not threadsafe
+ cls.locale_saved = locale.setlocale(locale.LC_TIME)
+ locale.setlocale(locale.LC_TIME, 'C')
+
+ @classmethod
+ def tearDownClass(cls):
+ import locale
+ # FYI : not threadsafe
+ locale.setlocale(locale.LC_TIME, cls.locale_saved)
+
+ def setUp(self):
+ # covering all week days
+ self.n_days = range(1, 10)
+ # covering all months
+ self.n_months = range(1, 13)
+
+ self.n_years = [1999, 2016, 2020]
+ self.n_seconds = [0, 5, 26, 59]
+ self.n_minutes = [0, 9, 42, 59]
+ self.n_hours = [0, 2, 17, 23]
+
+ self.formats = ['%a %b %d %H:%M:%S %Y', '%a %Y/%m/%d %H:%M:%S']
+
+ self.check_date_formats = partial(self.__check_date_formats,
+ year=self.n_years[0],
+ month=self.n_months[0],
+ day=self.n_days[0],
+ hour=self.n_hours[0],
+ minute=self.n_minutes[0],
+ second=self.n_seconds[0],
+ msg=None)
+
+ def __check_date_formats(self,
+ year,
+ month,
+ day,
+ hour,
+ minute,
+ second,
+ msg=None):
+ dt = datetime.datetime(year, month, day, hour, minute, second)
+ expected_date = dt.isoformat()
+
+ for i_fmt, fmt in enumerate(self.formats):
+ spec_date = dt.strftime(fmt)
+ iso_date = spec_date_to_iso8601(spec_date)
+ self.assertEqual(iso_date,
+ expected_date,
+ msg='Testing {0}. format={1}. '
+ 'Expected "{2}", got "{3} ({4})" (dt={5}).'
+ ''.format(msg,
+ i_fmt,
+ expected_date,
+ iso_date,
+ spec_date,
+ dt))
+
+ def testYearsNominal(self):
+ for year in self.n_years:
+ self.check_date_formats(year=year, msg='year')
+
+ def testMonthsNominal(self):
+ for month in self.n_months:
+ self.check_date_formats(month=month, msg='month')
+
+ def testDaysNominal(self):
+ for day in self.n_days:
+ self.check_date_formats(day=day, msg='day')
+
+ def testHoursNominal(self):
+ for hour in self.n_hours:
+ self.check_date_formats(hour=hour, msg='hour')
+
+ def testMinutesNominal(self):
+ for minute in self.n_minutes:
+ self.check_date_formats(minute=minute, msg='minute')
+
+ def testSecondsNominal(self):
+ for second in self.n_seconds:
+ self.check_date_formats(second=second, msg='second')
+
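Concretely, the conversion under test maps both SPEC date flavours in self.formats to the same ISO 8601 string; two examples taken from the fixtures in this file:

    from silx.io.spech5 import spec_date_to_iso8601

    assert spec_date_to_iso8601("Thu Feb 11 09:55:20 2016") == "2016-02-11T09:55:20"
    assert spec_date_to_iso8601("Sat 2015/03/14 03:53:50") == "2015-03-14T03:53:50"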
+
+class TestSpecH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ if sys.version_info < (3, ):
+ os.write(fd, sftext)
+ else:
+ os.write(fd, bytes(sftext, 'ascii'))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testContainsFile(self):
+ self.assertIn("/1.2/measurement", self.sfh5)
+ self.assertIn("/25.1", self.sfh5)
+ self.assertIn("25.1", self.sfh5)
+ self.assertNotIn("25.2", self.sfh5)
+        # measurement is a child of a scan; the full path is required to
+        # access it from the root level
+ self.assertNotIn("measurement", self.sfh5)
+ # Groups may or may not have a trailing /
+ self.assertIn("/1.2/measurement/mca_1/", self.sfh5)
+ self.assertIn("/1.2/measurement/mca_1", self.sfh5)
+ # Datasets can't have a trailing /
+ self.assertNotIn("/1.2/measurement/mca_0/info/calibration/ ", self.sfh5)
+ # No mca_8
+ self.assertNotIn("/1.2/measurement/mca_8/info/calibration", self.sfh5)
+ # Link
+ self.assertIn("/1.2/measurement/mca_0/info/calibration", self.sfh5)
+
+ def testContainsGroup(self):
+ self.assertIn("measurement", self.sfh5["/1.2/"])
+ self.assertIn("measurement", self.sfh5["/1.2"])
+ self.assertIn("25.1", self.sfh5["/"])
+ self.assertNotIn("25.2", self.sfh5["/"])
+ self.assertIn("instrument/positioners/Sslit1 HOff", self.sfh5["/1.1"])
+ # illegal trailing "/" after dataset name
+ self.assertNotIn("instrument/positioners/Sslit1 HOff/",
+ self.sfh5["/1.1"])
+ # full path to element in group (OK)
+ self.assertIn("/1.1/instrument/positioners/Sslit1 HOff",
+ self.sfh5["/1.1/instrument"])
+
+ def testDataColumn(self):
+ self.assertAlmostEqual(sum(self.sfh5["/1.2/measurement/duo"]),
+ 12.0)
+ self.assertAlmostEqual(
+ sum(self.sfh5["1.1"]["measurement"]["MRTSlit UP"]),
+ 87.891, places=4)
+
+ def testDate(self):
+ # start time is in Iso8601 format
+ self.assertEqual(self.sfh5["/1.1/start_time"],
+ u"2016-02-11T09:55:20")
+ self.assertEqual(self.sfh5["25.1/start_time"],
+ u"2015-03-14T03:53:50")
+
+ def assertRaisesRegex(self, *args, **kwargs):
+ # Python 2 compatibility
+ if sys.version_info.major >= 3:
+ return super(TestSpecH5, self).assertRaisesRegex(*args, **kwargs)
+ else:
+ return self.assertRaisesRegexp(*args, **kwargs)
+
+ def testDatasetInstanceAttr(self):
+ """The SpecH5Dataset objects must implement some dummy attributes
+ to improve compatibility with widgets dealing with h5py datasets."""
+ self.assertIsNone(self.sfh5["/1.1/start_time"].compression)
+ self.assertIsNone(self.sfh5["1.1"]["measurement"]["MRTSlit UP"].chunks)
+
+ # error message must be explicit
+ with self.assertRaisesRegex(
+ AttributeError,
+ "SpecH5Dataset has no attribute tOTo"):
+ dummy = self.sfh5["/1.1/start_time"].tOTo
+
+ def testGet(self):
+ """Test :meth:`SpecH5Group.get`"""
+ # default value of param *default* is None
+ self.assertIsNone(self.sfh5.get("toto"))
+ self.assertEqual(self.sfh5["25.1"].get("toto", default=-3),
+ -3)
+
+ self.assertEqual(self.sfh5.get("/1.1/start_time", default=-3),
+ u"2016-02-11T09:55:20")
+
+ def testGetClass(self):
+ """Test :meth:`SpecH5Group.get`"""
+ self.assertIs(self.sfh5["1.1"].get("start_time", getclass=True),
+ h5py.Dataset)
+ self.assertIs(self.sfh5["1.1"].get("instrument", getclass=True),
+ h5py.Group)
+
+ # spech5 does not define external link, so there is no way
+ # a group can *get* a SpecH5 class
+
+ def testGetApi(self):
+ result = self.sfh5.get("1.1", getclass=True, getlink=True)
+ self.assertIs(result, h5py.HardLink)
+ result = self.sfh5.get("1.1", getclass=False, getlink=True)
+ self.assertIsInstance(result, h5py.HardLink)
+ result = self.sfh5.get("1.1", getclass=True, getlink=False)
+ self.assertIs(result, h5py.Group)
+ result = self.sfh5.get("1.1", getclass=False, getlink=False)
+ self.assertIsInstance(result, spech5.SpecH5Group)
+
+ def testGetItemGroup(self):
+ group = self.sfh5["25.1"]["instrument"]
+ self.assertEqual(list(group["positioners"].keys()),
+ ["Pslit HGap", "MRTSlit UP", "MRTSlit DOWN",
+ "Sslit1 VOff", "Sslit1 HOff", "Sslit1 VGap"])
+ with self.assertRaises(KeyError):
+ group["Holy Grail"]
+
+ def testGetitemSpecH5(self):
+ self.assertEqual(self.sfh5["/1.2/instrument/positioners"],
+ self.sfh5["1.2"]["instrument"]["positioners"])
+
+ def testH5pyClass(self):
+ """Test :attr:`h5py_class` returns the corresponding h5py class
+ (h5py.File, h5py.Group, h5py.Dataset)"""
+ a_file = self.sfh5
+ self.assertIs(a_file.h5py_class,
+ h5py.File)
+
+ a_group = self.sfh5["/1.2/measurement"]
+ self.assertIs(a_group.h5py_class,
+ h5py.Group)
+
+ a_dataset = self.sfh5["/1.1/instrument/positioners/Sslit1 HOff"]
+ self.assertIs(a_dataset.h5py_class,
+ h5py.Dataset)
+
+ def testHeader(self):
+ file_header = self.sfh5["/1.2/instrument/specfile/file_header"]
+ scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"]
+
+ # File header has 10 lines
+ self.assertEqual(len(file_header), 10)
+ # 1.2 has 9 scan & mca header lines
+ self.assertEqual(len(scan_header), 9)
+
+ # line 4 of file header
+ self.assertEqual(
+ file_header[3],
+ u"#C imaging User = opid17")
+ # line 4 of scan header
+ scan_header = self.sfh5["25.1/instrument/specfile/scan_header"]
+
+ self.assertEqual(
+ scan_header[3],
+ u"#P1 4.74255 6.197579 2.238283")
+
+ def testLinks(self):
+ self.assertTrue(numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/data"],
+ self.sfh5["/1.2/instrument/mca_0/data"])
+ )
+ self.assertTrue(numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/info/data"],
+ self.sfh5["/1.2/instrument/mca_0/data"])
+ )
+ self.assertTrue(numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/info/channels"],
+ self.sfh5["/1.2/instrument/mca_0/channels"])
+ )
+ self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/"].keys(),
+ self.sfh5["/1.2/instrument/mca_0/"].keys())
+
+ self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/preset_time"],
+ self.sfh5["/1.2/instrument/mca_0/preset_time"])
+ self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/live_time"],
+ self.sfh5["/1.2/instrument/mca_0/live_time"])
+ self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/elapsed_time"],
+ self.sfh5["/1.2/instrument/mca_0/elapsed_time"])
+
+ def testListScanIndices(self):
+ self.assertEqual(list(self.sfh5.keys()),
+ ["1.1", "25.1", "1.2", "1000.1", "1001.1"])
+ self.assertEqual(self.sfh5["1.2"].attrs,
+ {"NX_class": "NXentry", })
+
+ def testMcaAbsent(self):
+ def access_absent_mca():
+ """This must raise a KeyError, because scan 1.1 has no MCA"""
+ return self.sfh5["/1.1/measurement/mca_0/"]
+ self.assertRaises(KeyError, access_absent_mca)
+
+ def testMcaCalib(self):
+ mca0_calib = self.sfh5["/1.2/measurement/mca_0/info/calibration"]
+ mca1_calib = self.sfh5["/1.2/measurement/mca_1/info/calibration"]
+ self.assertEqual(mca0_calib.tolist(),
+ [1, 2, 3])
+ # calibration is unique in this scan and applies to all analysers
+ self.assertEqual(mca0_calib.tolist(),
+ mca1_calib.tolist())
+
+ def testMcaChannels(self):
+ mca0_chann = self.sfh5["/1.2/measurement/mca_0/info/channels"]
+ mca1_chann = self.sfh5["/1.2/measurement/mca_1/info/channels"]
+ self.assertEqual(mca0_chann.tolist(),
+ [0, 1, 2])
+ self.assertEqual(mca0_chann.tolist(),
+ mca1_chann.tolist())
+
+ def testMcaCtime(self):
+ """Tests for #@CTIME mca header"""
+ datasets = ["preset_time", "live_time", "elapsed_time"]
+ for ds in datasets:
+ self.assertNotIn("/1.1/instrument/mca_0/" + ds, self.sfh5)
+ self.assertIn("/1.2/instrument/mca_0/" + ds, self.sfh5)
+
+ mca0_preset_time = self.sfh5["/1.2/instrument/mca_0/preset_time"]
+ mca1_preset_time = self.sfh5["/1.2/instrument/mca_1/preset_time"]
+        self.assertLess(abs(mca0_preset_time - 123.4),
+                        10**-5)
+        # ctime is unique in this scan and applies to all analysers
+ self.assertEqual(mca0_preset_time,
+ mca1_preset_time)
+
+ mca0_live_time = self.sfh5["/1.2/instrument/mca_0/live_time"]
+ mca1_live_time = self.sfh5["/1.2/instrument/mca_1/live_time"]
+        self.assertLess(abs(mca0_live_time - 234.5),
+                        10**-5)
+ self.assertEqual(mca0_live_time,
+ mca1_live_time)
+
+ mca0_elapsed_time = self.sfh5["/1.2/instrument/mca_0/elapsed_time"]
+ mca1_elapsed_time = self.sfh5["/1.2/instrument/mca_1/elapsed_time"]
+        self.assertLess(abs(mca0_elapsed_time - 345.6),
+                        10**-5)
+ self.assertEqual(mca0_elapsed_time,
+ mca1_elapsed_time)
+
+ def testMcaData(self):
+ # sum 1st MCA in scan 1.2 over rows
+ mca_0_data = self.sfh5["/1.2/measurement/mca_0/data"]
+ for summed_row, expected in zip(mca_0_data.sum(axis=1).tolist(),
+ [3.0, 12.1, 21.7]):
+ self.assertAlmostEqual(summed_row, expected, places=4)
+
+ # sum 3rd MCA in scan 1.2 along both axis
+ mca_2_data = self.sfh5["1.2"]["measurement"]["mca_2"]["data"]
+ self.assertAlmostEqual(sum(sum(mca_2_data)), 9.1, places=5)
+ # attrs
+ self.assertEqual(mca_0_data.attrs, {"interpretation": "spectrum"})
+
+ def testMotorPosition(self):
+ positioners_group = self.sfh5["/1.1/instrument/positioners"]
+        # MRTSlit DOWN position is defined in the #P0 scan header line
+ self.assertAlmostEqual(float(positioners_group["MRTSlit DOWN"]),
+ 0.87125)
+ # MRTSlit UP position is defined in first data column
+ for a, b in zip(positioners_group["MRTSlit UP"].tolist(),
+ [-1.23, 8.478100E+01, 3.14, 1.2]):
+ self.assertAlmostEqual(float(a), b, places=4)
+
+ def testNumberMcaAnalysers(self):
+ """Scan 1.2 has 2 data columns + 3 mca spectra per data line."""
+ self.assertEqual(len(self.sfh5["1.2"]["measurement"]), 5)
+
+ def testTitle(self):
+ self.assertEqual(self.sfh5["/25.1/title"],
+ u"ascan c3th 1.33245 1.52245 40 0.15")
+
+ def testValues(self):
+ group = self.sfh5["/25.1"]
+ self.assertTrue(hasattr(group, "values"))
+ self.assertTrue(callable(group.values))
+ self.assertIn(self.sfh5["/25.1/title"],
+ self.sfh5["/25.1"].values())
+
+ # visit and visititems ignore links
+ def testVisit(self):
+ name_list = []
+ self.sfh5.visit(name_list.append)
+ self.assertIn('1.2/instrument/positioners/Pslit HGap', name_list)
+ self.assertIn("1.2/instrument/specfile/scan_header", name_list)
+ self.assertEqual(len(name_list), 117)
+
+ # test also visit of a subgroup, with various group name formats
+ name_list_leading_and_trailing_slash = []
+ self.sfh5['/1.2/instrument/'].visit(name_list_leading_and_trailing_slash.append)
+ name_list_leading_slash = []
+ self.sfh5['/1.2/instrument'].visit(name_list_leading_slash.append)
+ name_list_trailing_slash = []
+ self.sfh5['1.2/instrument/'].visit(name_list_trailing_slash.append)
+ name_list_no_slash = []
+ self.sfh5['1.2/instrument'].visit(name_list_no_slash.append)
+
+ # no differences expected in the output names
+ self.assertEqual(name_list_leading_and_trailing_slash,
+ name_list_leading_slash)
+ self.assertEqual(name_list_leading_slash,
+ name_list_trailing_slash)
+ self.assertEqual(name_list_leading_slash,
+ name_list_no_slash)
+ self.assertIn("positioners/Pslit HGap", name_list_no_slash)
+ self.assertIn("positioners", name_list_no_slash)
+
+ def testVisitItems(self):
+ dataset_name_list = []
+
+        def func_generator(name_list):
+            """Return a function appending dataset names to name_list"""
+            def func(name, obj):
+                if isinstance(obj, SpecH5Dataset):
+                    name_list.append(name)
+            return func
+
+ self.sfh5.visititems(func_generator(dataset_name_list))
+ self.assertIn('1.2/instrument/positioners/Pslit HGap', dataset_name_list)
+ self.assertEqual(len(dataset_name_list), 85)
+
+ # test also visit of a subgroup, with various group name formats
+ name_list_leading_and_trailing_slash = []
+ self.sfh5['/1.2/instrument/'].visititems(func_generator(name_list_leading_and_trailing_slash))
+ name_list_leading_slash = []
+ self.sfh5['/1.2/instrument'].visititems(func_generator(name_list_leading_slash))
+ name_list_trailing_slash = []
+ self.sfh5['1.2/instrument/'].visititems(func_generator(name_list_trailing_slash))
+ name_list_no_slash = []
+ self.sfh5['1.2/instrument'].visititems(func_generator(name_list_no_slash))
+
+ # no differences expected in the output names
+ self.assertEqual(name_list_leading_and_trailing_slash,
+ name_list_leading_slash)
+ self.assertEqual(name_list_leading_slash,
+ name_list_trailing_slash)
+ self.assertEqual(name_list_leading_slash,
+ name_list_no_slash)
+ self.assertIn("positioners/Pslit HGap", name_list_no_slash)
+
+ def testNotSpecH5(self):
+ fd, fname = tempfile.mkstemp()
+ os.write(fd, b"Not a spec file!")
+ os.close(fd)
+ self.assertRaises(specfile.SfErrFileOpen, SpecH5, fname)
+ self.assertRaises(IOError, SpecH5, fname)
+ os.unlink(fname)
+
+ def testSample(self):
+ self.assertNotIn("sample", self.sfh5["/1.1"])
+ self.assertIn("sample", self.sfh5["/1000.1"])
+ self.assertIn("ub_matrix", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell_abc", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell_alphabetagamma", self.sfh5["/1000.1/sample"])
+
+ # All 0 values
+ self.assertNotIn("sample", self.sfh5["/1001.1"])
+ with self.assertRaises(KeyError):
+ self.sfh5["/1001.1/sample/unit_cell"]
+
+ @testutils.validate_logging(spech5.logger1.name, warning=2)
+ def testOpenFileDescriptor(self):
+ """Open a SpecH5 file from a file descriptor"""
+ with io.open(self.sfh5.filename) as f:
+ sfh5 = SpecH5(f)
+ self.assertIsNotNone(sfh5)
+ name_list = []
+ # check if the object is working
+ self.sfh5.visit(name_list.append)
+ sfh5.close()
+
+
+sftext_multi_mca_headers = """
+#S 1 aaaaaa
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCA %16C
+#@CHANN 3 1 3 1
+#@CALIB 5.5 6.6 7.7
+#@CTIME 10 11 12
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+3 4
+@A 3.1 4 5
+@A 7 6 5
+5 6
+@A 6 7.7 8
+@A 4 3 2
+
+"""
+
+
+class TestSpecH5MultiMca(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp(text=False)
+ if sys.version_info < (3, ):
+ os.write(fd, sftext_multi_mca_headers)
+ else:
+ os.write(fd, bytes(sftext_multi_mca_headers, 'ascii'))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testMcaCalib(self):
+ mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"]
+ mca1_calib = self.sfh5["/1.1/measurement/mca_1/info/calibration"]
+ self.assertEqual(mca0_calib.tolist(),
+ [1, 2, 3])
+ self.assertAlmostEqual(sum(mca1_calib.tolist()),
+ sum([5.5, 6.6, 7.7]),
+ places=5)
+
+ def testMcaChannels(self):
+ mca0_chann = self.sfh5["/1.1/measurement/mca_0/info/channels"]
+ mca1_chann = self.sfh5["/1.1/measurement/mca_1/info/channels"]
+ self.assertEqual(mca0_chann.tolist(),
+ [0., 1., 2.])
+        # each analyser has its own #@CHANN header in this scan
+ self.assertEqual(mca1_chann.tolist(),
+ [1., 2., 3.])
+
+ def testMcaCtime(self):
+ """Tests for #@CTIME mca header"""
+ mca0_preset_time = self.sfh5["/1.1/instrument/mca_0/preset_time"]
+ mca1_preset_time = self.sfh5["/1.1/instrument/mca_1/preset_time"]
+        self.assertLess(abs(mca0_preset_time - 123.4),
+                        10**-5)
+        self.assertLess(abs(mca1_preset_time - 10),
+                        10**-5)
+
+        mca0_live_time = self.sfh5["/1.1/instrument/mca_0/live_time"]
+        mca1_live_time = self.sfh5["/1.1/instrument/mca_1/live_time"]
+        self.assertLess(abs(mca0_live_time - 234.5),
+                        10**-5)
+        self.assertLess(abs(mca1_live_time - 11),
+                        10**-5)
+
+        mca0_elapsed_time = self.sfh5["/1.1/instrument/mca_0/elapsed_time"]
+        mca1_elapsed_time = self.sfh5["/1.1/instrument/mca_1/elapsed_time"]
+        self.assertLess(abs(mca0_elapsed_time - 345.6),
+                        10**-5)
+        self.assertLess(abs(mca1_elapsed_time - 12),
+                        10**-5)
+
+
+sftext_no_cols = r"""#F C:/DATA\test.mca
+#D Thu Jul 7 08:40:19 2016
+
+#S 1 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, one mca analyser, single spectrum
+#@MCA %16C
+#@CHANN 151 0 150 1
+#@CALIB 0 2 0
+@A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \
+1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \
+6071 7611 10426 16188 28266 40348 50539 55555 56162 54162 47102 35718 24588 17034 12994 11444 \
+11808 13461 15687 18885 23827 31578 41999 49556 58084 59415 59456 55698 44525 28219 17680 12881 \
+9518 7415 6155 5246 4646 3978 3612 3299 3020 2761 2670 2472 2500 2310 2286 2106 \
+1989 1890 1782 1655 1421 1293 1135 990 879 757 672 618 532 488 445 424 \
+414 373 351 325 307 284 270 247 228 213 199 187 183 176 164 156 \
+153 140 142 130 118 118 103 101 97 86 90 86 87 81 75 82 \
+80 76 77 75 76 77 62 69 74 60 65 68 65 58 63 64 \
+63 59 60 56 57 60 55
+
+#S 2 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, one mca analyser, multiple spectra
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+
+#S 3 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, 3 mca analysers, multiple spectra
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCADEV 2
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCADEV 3
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+"""
+
+
+class TestSpecH5NoDataCols(unittest.TestCase):
+ """Test reading SPEC files with only MCA data"""
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ if sys.version_info < (3, ):
+ os.write(fd, sftext_no_cols)
+ else:
+ os.write(fd, bytes(sftext_no_cols, 'ascii'))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testScan1(self):
+ # 1.1: single analyser, single spectrum, 151 channels
+ self.assertIn("mca_0",
+ self.sfh5["1.1/instrument/"])
+ self.assertEqual(self.sfh5["1.1/instrument/mca_0/data"].shape,
+ (1, 151))
+ self.assertNotIn("mca_1",
+ self.sfh5["1.1/instrument/"])
+
+ def testScan2(self):
+ # 2.1: single analyser, 9 spectra, 3 channels
+ self.assertIn("mca_0",
+ self.sfh5["2.1/instrument/"])
+ self.assertEqual(self.sfh5["2.1/instrument/mca_0/data"].shape,
+ (9, 3))
+ self.assertNotIn("mca_1",
+ self.sfh5["2.1/instrument/"])
+
+ def testScan3(self):
+ # 3.1: 3 analysers, 3 spectra/analyser, 3 channels
+ for i in range(3):
+ self.assertIn("mca_%d" % i,
+ self.sfh5["3.1/instrument/"])
+ self.assertEqual(
+ self.sfh5["3.1/instrument/mca_%d/data" % i].shape,
+ (3, 3))
+
+ self.assertNotIn("mca_3",
+ self.sfh5["3.1/instrument/"])
+
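The shape convention these three tests pin down: mca_N/data is always two-dimensional, (number of spectra, number of channels), even when a single spectrum was recorded. A sketch (file path hypothetical):

    from silx.io.spech5 import SpecH5

    with SpecH5("/tmp/mca_only.dat") as sfh5:  # an MCA-only file such as sftext_no_cols
        data = sfh5["1.1/instrument/mca_0/data"]
        print(data.shape)                      # (n_spectra, n_channels), e.g. (1, 151)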
+
+sf_text_slash = r"""#F /data/id09/archive/logspecfiles/laue/2016/scan_231_laue_16-11-29.dat
+#D Sat Dec 10 22:20:59 2016
+#O0 Pslit/HGap MRTSlit%UP
+
+#S 1 laue_16-11-29.log 231.1 PD3/A
+#D Sat Dec 10 22:20:59 2016
+#P0 180.005 -0.66875
+#N 2
+#L GONY/mm PD3%A
+-2.015 5.250424e-05
+-2.01 5.30798e-05
+-2.005 5.281903e-05
+-2 5.220436e-05
+"""
+
+
+class TestSpecH5SlashInLabels(unittest.TestCase):
+ """Test reading SPEC files with labels containing a / character
+
+ The / character must be substituted with a %
+ """
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ if sys.version_info < (3, ):
+ os.write(fd, sf_text_slash)
+ else:
+ os.write(fd, bytes(sf_text_slash, 'ascii'))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testLabels(self):
+ """Ensure `/` is substituted with `%` and
+ ensure legitimate `%` in names are still working"""
+ self.assertEqual(list(self.sfh5["1.1/measurement/"].keys()),
+ ["GONY%mm", "PD3%A"])
+
+ # substituted "%"
+ self.assertIn("GONY%mm",
+ self.sfh5["1.1/measurement/"])
+ self.assertNotIn("GONY/mm",
+ self.sfh5["1.1/measurement/"])
+ self.assertAlmostEqual(self.sfh5["1.1/measurement/GONY%mm"][0],
+ -2.015, places=4)
+ # legitimate "%"
+ self.assertIn("PD3%A",
+ self.sfh5["1.1/measurement/"])
+
+ def testMotors(self):
+ """Ensure `/` is substituted with `%` and
+ ensure legitimate `%` in names are still working"""
+ self.assertEqual(list(self.sfh5["1.1/instrument/positioners"].keys()),
+ ["Pslit%HGap", "MRTSlit%UP"])
+ # substituted "%"
+ self.assertIn("Pslit%HGap",
+ self.sfh5["1.1/instrument/positioners"])
+ self.assertNotIn("Pslit/HGap",
+ self.sfh5["1.1/instrument/positioners"])
+ self.assertAlmostEqual(
+ self.sfh5["1.1/instrument/positioners/Pslit%HGap"],
+ 180.005, places=4)
+ # legitimate "%"
+ self.assertIn("MRTSlit%UP",
+ self.sfh5["1.1/instrument/positioners"])
+
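In other words, a counter or motor whose SPEC label contains "/" becomes addressable under "%" in the HDF5-like tree, since "/" is reserved as the path separator; a sketch (file path hypothetical):

    from silx.io.spech5 import SpecH5

    with SpecH5("/tmp/scan_231.dat") as sfh5:   # a file with a "GONY/mm" label
        gony = sfh5["1.1/measurement/GONY%mm"]  # "/" replaced by "%"
        assert "GONY/mm" not in sfh5["1.1/measurement"]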
+
+def testUnitCellUBMatrix(tmp_path):
+ """Test unit cell (#G1) and UB matrix (#G3)"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(bytes("""
+#S 1 OK
+#G1 0 1 2 3 4 5
+#G3 0 1 2 3 4 5 6 7 8
+""", encoding="ascii"))
+ with SpecH5(str(file_path)) as spech5:
+ assert numpy.array_equal(
+ spech5["/1.1/sample/ub_matrix"],
+ numpy.arange(9).reshape(1, 3, 3))
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5])
+
+
+def testMalformedUnitCellUBMatrix(tmp_path):
+ """Test malformed unit cell (#G1) and UB matrix (#G3): 1 value"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(bytes("""
+#S 1 all malformed=0
+#G1 0
+#G3 0
+""", encoding="ascii"))
+ with SpecH5(str(file_path)) as spech5:
+ assert "sample" not in spech5["1.1"]
+
+
+def testMalformedUBMatrix(tmp_path):
+ """Test malformed UB matrix (#G3): all zeros"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(bytes("""
+#S 1 G3 all 0
+#G1 0 1 2 3 4 5
+#G3 0 0 0 0 0 0 0 0 0
+""", encoding="ascii"))
+ with SpecH5(str(file_path)) as spech5:
+ assert "ub_matrix" not in spech5["/1.1/sample"]
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5])
+
+
+def testMalformedUnitCell(tmp_path):
+ """Test malformed unit cell (#G1): missing values"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(bytes("""
+#S 1 G1 malformed missing values
+#G1 0 1 2
+#G3 0 1 2 3 4 5 6 7 8
+""", encoding="ascii"))
+ with SpecH5(str(file_path)) as spech5:
+ assert "unit_cell" not in spech5["/1.1/sample"]
+ assert "unit_cell_abc" not in spech5["/1.1/sample"]
+ assert "unit_cell_alphabetagamma" not in spech5["/1.1/sample"]
+ assert numpy.array_equal(
+ spech5["/1.1/sample/ub_matrix"],
+ numpy.arange(9).reshape(1, 3, 3))
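Taken together, these tests specify a read-only, h5py-like view of a SPEC file; a typical traversal looks like this sketch (file path hypothetical):

    from silx.io.spech5 import SpecH5

    with SpecH5("/tmp/sf.dat") as sfh5:
        for key in sfh5.keys():                    # e.g. "1.1", "25.1", "1.2"
            scan = sfh5[key]
            print(key, scan["title"])
            if "instrument/positioners" in scan:   # present when motors are defined
                positioners = scan["instrument/positioners"]
                for name in positioners.keys():
                    print("  ", name, positioners[name])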
diff --git a/src/silx/io/test/test_spectoh5.py b/src/silx/io/test/test_spectoh5.py
new file mode 100644
index 0000000..66bf8d6
--- /dev/null
+++ b/src/silx/io/test/test_spectoh5.py
@@ -0,0 +1,183 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for SpecFile to HDF5 converter"""
+
+from numpy import array_equal
+import os
+import sys
+import tempfile
+import unittest
+
+import h5py
+
+from ..spech5 import SpecH5, SpecH5Group
+from ..convert import convert, write_to_h5
+from ..utils import h5py_read_dataset
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "12/02/2018"
+
+
+sfdata = b"""#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#N 4
+#L MRTSlit UP second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 1 aaaaaa
+#D Thu Feb 11 10:00:32 2016
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+3 4
+@A 3.1 4 5
+@A 7 6 5
+5 6
+@A 6 7.7 8
+@A 4 3 2
+"""
+
+
+class TestConvertSpecHDF5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.spec_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5")
+ os.write(fd, sfdata)
+ os.close(fd)
+
+ fd, cls.h5_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5")
+ # Close and delete (we just need the name)
+ os.close(fd)
+ os.unlink(cls.h5_fname)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.spec_fname)
+
+ def setUp(self):
+ convert(self.spec_fname, self.h5_fname)
+
+ self.sfh5 = SpecH5(self.spec_fname)
+ self.h5f = h5py.File(self.h5_fname, "a")
+
+ def tearDown(self):
+ self.h5f.close()
+ self.sfh5.close()
+ os.unlink(self.h5_fname)
+
+ def testAppendToHDF5(self):
+ write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam")
+ self.assertTrue(
+ array_equal(self.h5f["/1.2/measurement/mca_1/data"],
+ self.h5f["/foo/bar/spam/1.2/measurement/mca_1/data"])
+ )
+
+ def testWriteSpecH5Group(self):
+ """Test passing a SpecH5Group as parameter, instead of a Spec filename
+ or a SpecH5."""
+ g = self.sfh5["1.1/instrument"]
+ self.assertIsInstance(g, SpecH5Group) # let's be paranoid
+ write_to_h5(g, self.h5f, h5path="my instruments")
+
+ self.assertAlmostEqual(self.h5f["my instruments/positioners/Sslit1 HOff"][tuple()],
+ 16.197579, places=4)
+
+ def testTitle(self):
+ """Test the value of a dataset"""
+ title12 = h5py_read_dataset(self.h5f["/1.2/title"])
+ self.assertEqual(title12,
+ u"aaaaaa")
+
+ def testAttrs(self):
+ # Test root group (file) attributes
+ self.assertEqual(self.h5f.attrs["NX_class"],
+ u"NXroot")
+ # Test dataset attributes
+ ds = self.h5f["/1.2/instrument/mca_1/data"]
+ self.assertTrue("interpretation" in ds.attrs)
+ self.assertEqual(list(ds.attrs.values()),
+ [u"spectrum"])
+ # Test group attributes
+ grp = self.h5f["1.1"]
+ self.assertEqual(grp.attrs["NX_class"],
+ u"NXentry")
+ self.assertEqual(len(list(grp.attrs.keys())),
+ 1)
+
+ def testHdf5HasSameMembers(self):
+ spec_member_list = []
+
+ def append_spec_members(name):
+ spec_member_list.append(name)
+ self.sfh5.visit(append_spec_members)
+
+ hdf5_member_list = []
+
+ def append_hdf5_members(name):
+ hdf5_member_list.append(name)
+ self.h5f.visit(append_hdf5_members)
+
+        # For some reason, the h5py visit method does not include the
+        # leading "/" character when it passes the member name to the
+        # function, even though the .name attribute of a member does
+        # have a leading "/"
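+        # e.g. visit() passes "1.1/title" to the callback, whereas
+        # self.h5f["1.1/title"].name == "/1.1/title"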
+ spec_member_list = [m.lstrip("/") for m in spec_member_list]
+
+ self.assertEqual(set(hdf5_member_list),
+ set(spec_member_list))
+
+ def testLinks(self):
+ self.assertTrue(
+ array_equal(self.sfh5["/1.2/measurement/mca_0/data"],
+ self.h5f["/1.2/measurement/mca_0/data"])
+ )
+ self.assertTrue(
+ array_equal(self.h5f["/1.2/instrument/mca_1/channels"],
+ self.h5f["/1.2/measurement/mca_1/info/channels"])
+ )
diff --git a/src/silx/io/test/test_url.py b/src/silx/io/test/test_url.py
new file mode 100644
index 0000000..7346391
--- /dev/null
+++ b/src/silx/io/test/test_url.py
@@ -0,0 +1,217 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for url module"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "29/01/2018"
+
+
+import unittest
+from ..url import DataUrl
+
+
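+# A minimal round-trip sketch (illustrative, not collected by unittest):
+# a DataUrl built from components can be serialized with path() and parsed
+# back to an equal DataUrl, as the test_create_* cases below verify.
+def _example_roundtrip():
+    url = DataUrl(scheme="silx", file_path="/data/image.h5",
+                  data_path="/data/dataset", data_slice=(1, 5))
+    assert DataUrl(url.path()) == url
+    return url.path()
+
+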
+class TestDataUrl(unittest.TestCase):
+
+ def assertUrl(self, url, expected):
+ self.assertEqual(url.is_valid(), expected[0])
+ self.assertEqual(url.is_absolute(), expected[1])
+ self.assertEqual(url.scheme(), expected[2])
+ self.assertEqual(url.file_path(), expected[3])
+ self.assertEqual(url.data_path(), expected[4])
+ self.assertEqual(url.data_slice(), expected[5])
+
+ def test_fabio_absolute(self):
+ url = DataUrl("fabio:///data/image.edf?slice=2")
+ expected = [True, True, "fabio", "/data/image.edf", None, (2, )]
+ self.assertUrl(url, expected)
+
+ def test_fabio_absolute_windows(self):
+ url = DataUrl("fabio:///C:/data/image.edf?slice=2")
+ expected = [True, True, "fabio", "C:/data/image.edf", None, (2, )]
+ self.assertUrl(url, expected)
+
+ def test_silx_absolute(self):
+ url = DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5")
+ expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
+ self.assertUrl(url, expected)
+
+ def test_commandline_shell_separator(self):
+ url = DataUrl("silx:///data/image.h5::path=/data/dataset&slice=1,5")
+ expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
+ self.assertUrl(url, expected)
+
+ def test_silx_absolute2(self):
+ url = DataUrl("silx:///data/image.edf?/scan_0/detector/data")
+ expected = [True, True, "silx", "/data/image.edf", "/scan_0/detector/data", None]
+ self.assertUrl(url, expected)
+
+ def test_silx_absolute_windows(self):
+ url = DataUrl("silx:///C:/data/image.h5?/scan_0/detector/data")
+ expected = [True, True, "silx", "C:/data/image.h5", "/scan_0/detector/data", None]
+ self.assertUrl(url, expected)
+
+ def test_silx_relative(self):
+ url = DataUrl("silx:./image.h5")
+ expected = [True, False, "silx", "./image.h5", None, None]
+ self.assertUrl(url, expected)
+
+ def test_fabio_relative(self):
+ url = DataUrl("fabio:./image.edf")
+ expected = [True, False, "fabio", "./image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_silx_relative2(self):
+ url = DataUrl("silx:image.h5")
+ expected = [True, False, "silx", "image.h5", None, None]
+ self.assertUrl(url, expected)
+
+ def test_fabio_relative2(self):
+ url = DataUrl("fabio:image.edf")
+ expected = [True, False, "fabio", "image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_relative(self):
+ url = DataUrl("image.edf")
+ expected = [True, False, None, "image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_relative2(self):
+ url = DataUrl("./foo/bar/image.edf")
+ expected = [True, False, None, "./foo/bar/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_relative3(self):
+ url = DataUrl("foo/bar/image.edf")
+ expected = [True, False, None, "foo/bar/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_absolute(self):
+ url = DataUrl("/data/image.edf")
+ expected = [True, True, None, "/data/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_absolute_windows(self):
+ url = DataUrl("C:/data/image.edf")
+ expected = [True, True, None, "C:/data/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_absolute_with_path(self):
+ url = DataUrl("/foo/foobar.h5?/foo/bar")
+ expected = [True, True, None, "/foo/foobar.h5", "/foo/bar", None]
+ self.assertUrl(url, expected)
+
+ def test_windows_file_data_slice(self):
+ url = DataUrl("C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, None, "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_scheme_file_data_slice(self):
+ url = DataUrl("silx:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, "silx", "/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_scheme_windows_file_data_slice(self):
+ url = DataUrl("silx:C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, "silx", "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_empty(self):
+ url = DataUrl("")
+ expected = [False, False, None, "", None, None]
+ self.assertUrl(url, expected)
+
+ def test_unknown_scheme(self):
+ url = DataUrl("foo:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [False, True, "foo", "/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_slice(self):
+ url = DataUrl("/a.h5?path=/b&slice=5,1")
+ expected = [True, True, None, "/a.h5", "/b", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_slice2(self):
+ url = DataUrl("/a.h5?path=/b&slice=2:5")
+ expected = [True, True, None, "/a.h5", "/b", (slice(2, 5),)]
+ self.assertUrl(url, expected)
+
+ def test_slice3(self):
+ url = DataUrl("/a.h5?path=/b&slice=::2")
+ expected = [True, True, None, "/a.h5", "/b", (slice(None, None, 2),)]
+ self.assertUrl(url, expected)
+
+ def test_slice_ellipsis(self):
+ url = DataUrl("/a.h5?path=/b&slice=...")
+ expected = [True, True, None, "/a.h5", "/b", (Ellipsis, )]
+ self.assertUrl(url, expected)
+
+ def test_slice_slicing(self):
+ url = DataUrl("/a.h5?path=/b&slice=:")
+ expected = [True, True, None, "/a.h5", "/b", (slice(None), )]
+ self.assertUrl(url, expected)
+
+ def test_slice_missing_element(self):
+ url = DataUrl("/a.h5?path=/b&slice=5,,1")
+ expected = [False, True, None, "/a.h5", "/b", None]
+ self.assertUrl(url, expected)
+
+ def test_slice_no_elements(self):
+ url = DataUrl("/a.h5?path=/b&slice=")
+ expected = [False, True, None, "/a.h5", "/b", None]
+ self.assertUrl(url, expected)
+
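+    # Summary sketch of the slice grammar covered by the test_slice* cases
+    # above (illustrative helper, not collected by unittest):
+    def _example_slice_grammar(self):
+        examples = {
+            "5,1": (5, 1),
+            "2:5": (slice(2, 5),),
+            "::2": (slice(None, None, 2),),
+            ":": (slice(None),),
+            "...": (Ellipsis,),
+        }
+        for text, expected in examples.items():
+            url = DataUrl("/a.h5?path=/b&slice=" + text)
+            assert url.data_slice() == expected
+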
+ def test_create_relative_url(self):
+ url = DataUrl(scheme="silx", file_path="./foo.h5", data_path="/", data_slice=(5, 1))
+ self.assertFalse(url.is_absolute())
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_create_absolute_url(self):
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1))
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_create_absolute_windows_url(self):
+ url = DataUrl(scheme="silx", file_path="C:/foo.h5", data_path="/", data_slice=(5, 1))
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_create_slice_url(self):
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1, Ellipsis, slice(None)))
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_wrong_url(self):
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=(5, 1))
+ self.assertFalse(url.is_valid())
+
+ def test_path_creation(self):
+ """make sure the construction of path succeed and that we can
+ recreate a DataUrl from a path"""
+ for data_slice in (1, (1,)):
+ with self.subTest(data_slice=data_slice):
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=data_slice)
+ path = url.path()
+ DataUrl(path=path)
diff --git a/src/silx/io/test/test_utils.py b/src/silx/io/test/test_utils.py
new file mode 100644
index 0000000..cc34100
--- /dev/null
+++ b/src/silx/io/test/test_utils.py
@@ -0,0 +1,923 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for utils module"""
+
+import io
+import numpy
+import os
+import re
+import shutil
+import tempfile
+import unittest
+
+from .. import utils
+from ..._version import calc_hexversion
+import silx.io.url
+
+import h5py
+from ..utils import h5ls
+from silx.io import commonh5
+
+import fabio
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "03/12/2020"
+
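+# The expected file contents below are regular expressions (checked with
+# assertRegex), because the #F and #D lines contain a path and dates that
+# vary between runs.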
+expected_spec1 = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 2
+#L Abscissa Ordinate1
+1 4\.00
+2 5\.00
+3 6\.00
+"""
+
+expected_spec2 = expected_spec1 + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+3 9\.00
+"""
+
+expected_spec2reg = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 3
+#L Abscissa Ordinate1 Ordinate2
+1 4\.00 7\.00
+2 5\.00 8\.00
+3 6\.00 9\.00
+"""
+
+expected_spec2irr = expected_spec1 + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+"""
+
+expected_csv = r"""Abscissa;Ordinate1;Ordinate2
+1;4\.00;7\.00e\+00
+2;5\.00;8\.00e\+00
+3;6\.00;9\.00e\+00
+"""
+
+expected_csv2 = r"""x;y0;y1
+1;4\.00;7\.00e\+00
+2;5\.00;8\.00e\+00
+3;6\.00;9\.00e\+00
+"""
+
+
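+# A minimal sketch of the save1D call exercised below (illustrative only):
+# one abscissa column plus one column per curve, written as CSV.
+def _example_save1d_csv(fname):
+    utils.save1D(fname, [1, 2, 3], [[4, 5, 6], [7, 8, 9]],
+                 xlabel="Abscissa", ylabels=["Ordinate1", "Ordinate2"],
+                 filetype="csv", fmt=["%d", "%.2f", "%.2e"],
+                 csvdelim=";", autoheader=True)
+
+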
+class TestSave(unittest.TestCase):
+ """Test saving curves as SpecFile:
+ """
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
+ self.csv_fname = os.path.join(self.tempdir, "savecsv.csv")
+ self.npy_fname = os.path.join(self.tempdir, "savenpy.npy")
+
+ self.x = [1, 2, 3]
+ self.xlab = "Abscissa"
+ self.y = [[4, 5, 6], [7, 8, 9]]
+ self.y_irr = [[4, 5, 6], [7, 8]]
+ self.ylabs = ["Ordinate1", "Ordinate2"]
+
+ def tearDown(self):
+ if os.path.isfile(self.spec_fname):
+ os.unlink(self.spec_fname)
+ if os.path.isfile(self.csv_fname):
+ os.unlink(self.csv_fname)
+ if os.path.isfile(self.npy_fname):
+ os.unlink(self.npy_fname)
+ shutil.rmtree(self.tempdir)
+
+ def test_save_csv(self):
+ utils.save1D(self.csv_fname, self.x, self.y,
+ xlabel=self.xlab, ylabels=self.ylabs,
+ filetype="csv", fmt=["%d", "%.2f", "%.2e"],
+ csvdelim=";", autoheader=True)
+
+ csvf = open(self.csv_fname)
+ actual_csv = csvf.read()
+ csvf.close()
+
+ self.assertRegex(actual_csv, expected_csv)
+
+ def test_save_npy(self):
+ """npy file is saved with numpy.save after building a numpy array
+ and converting it to a named record array"""
+ npyf = open(self.npy_fname, "wb")
+ utils.save1D(npyf, self.x, self.y,
+ xlabel=self.xlab, ylabels=self.ylabs)
+ npyf.close()
+
+ npy_recarray = numpy.load(self.npy_fname)
+
+ self.assertEqual(npy_recarray.shape, (3,))
+ self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'],
+ numpy.array((4, 5, 6))))
+
+ def test_savespec_filename(self):
+ """Save SpecFile using savespec()"""
+ utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab,
+ ylabel=self.ylabs[0], fmt=["%d", "%.2f"],
+ close_file=True, scan_number=1)
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec1)
+
+ def test_savespec_file_handle(self):
+ """Save SpecFile using savespec(), passing a file handle"""
+ # first savespec: open, write file header, save y[0] as scan 1,
+ # return file handle
+ specf = utils.savespec(self.spec_fname, self.x, self.y[0],
+ xlabel=self.xlab, ylabel=self.ylabs[0],
+ fmt=["%d", "%.2f"], close_file=False)
+
+ # second savespec: save y[1] as scan 2, close file
+ utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab,
+ ylabel=self.ylabs[1], fmt=["%d", "%.2f"],
+ write_file_header=False, close_file=True,
+ scan_number=2)
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2)
+
+ def test_save_spec_reg(self):
+ """Save SpecFile using save() on a regular pattern"""
+ utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab,
+ ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+
+ self.assertRegex(actual_spec, expected_spec2reg)
+
+ def test_save_spec_irr(self):
+ """Save SpecFile using save() on an irregular pattern"""
+        # Disabled: the expected behavior of save1D for irregular (ragged)
+        # y arrays is not well defined.
+        self.skipTest("irregular y arrays: expected behavior undefined")
+ utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab,
+ ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2irr)
+
+ def test_save_csv_no_labels(self):
+ """Save csv using save(), with autoheader=True but
+ xlabel=None and ylabels=None
+ This is a non-regression test for bug #223"""
+ self.tempdir = tempfile.mkdtemp()
+ self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
+ self.csv_fname = os.path.join(self.tempdir, "savecsv.csv")
+ self.npy_fname = os.path.join(self.tempdir, "savenpy.npy")
+
+ self.x = [1, 2, 3]
+ self.xlab = "Abscissa"
+ self.y = [[4, 5, 6], [7, 8, 9]]
+ self.ylabs = ["Ordinate1", "Ordinate2"]
+ utils.save1D(self.csv_fname, self.x, self.y,
+ autoheader=True, fmt=["%d", "%.2f", "%.2e"])
+
+ csvf = open(self.csv_fname)
+ actual_csv = csvf.read()
+ csvf.close()
+ self.assertRegex(actual_csv, expected_csv2)
+
+
+def assert_match_any_string_in_list(test, pattern, list_of_strings):
+ for string_ in list_of_strings:
+ if re.match(pattern, string_):
+ return True
+ return False
+
+
+class TestH5Ls(unittest.TestCase):
+ """Test displaying the following HDF5 file structure:
+
+ +foo
+ +bar
+ <HDF5 dataset "spam": shape (2, 2), type "<i8">
+ <HDF5 dataset "tmp": shape (3,), type "<i8">
+ <HDF5 dataset "data": shape (1,), type "<f8">
+
+ """
+
+ def assertMatchAnyStringInList(self, pattern, list_of_strings):
+ for string_ in list_of_strings:
+ if re.match(pattern, string_):
+ return None
+ raise AssertionError("regex pattern %s does not match any" % pattern +
+ " string in list " + str(list_of_strings))
+
+ def testHdf5(self):
+ fd, self.h5_fname = tempfile.mkstemp(text=False)
+ # Close and delete (we just want the name)
+ os.close(fd)
+ os.unlink(self.h5_fname)
+ self.h5f = h5py.File(self.h5_fname, "w")
+ self.h5f["/foo/bar/tmp"] = [1, 2, 3]
+ self.h5f["/foo/bar/spam"] = [[1, 2], [3, 4]]
+ self.h5f["/foo/data"] = [3.14]
+ self.h5f.close()
+
+ rep = h5ls(self.h5_fname)
+ lines = rep.split("\n")
+
+ self.assertIn("+foo", lines)
+ self.assertIn("\t+bar", lines)
+
+ match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+
+ os.unlink(self.h5_fname)
+
+    # The following test case is disabled due to errors on AppVeyor:
+ # os.unlink(spec_fname)
+ # PermissionError: [WinError 32] The process cannot access the file because
+ # it is being used by another process: 'C:\\...\\savespec.dat'
+
+ # def testSpec(self):
+ # tempdir = tempfile.mkdtemp()
+ # spec_fname = os.path.join(tempdir, "savespec.dat")
+ #
+ # x = [1, 2, 3]
+ # xlab = "Abscissa"
+ # y = [[4, 5, 6], [7, 8, 9]]
+ # ylabs = ["Ordinate1", "Ordinate2"]
+ # utils.save1D(spec_fname, x, y, xlabel=xlab,
+ # ylabels=ylabs, filetype="spec",
+ # fmt=["%d", "%.2f"])
+ #
+ # rep = h5ls(spec_fname)
+ # lines = rep.split("\n")
+ # self.assertIn("+1.1", lines)
+ # self.assertIn("\t+instrument", lines)
+ #
+ # self.assertMatchAnyStringInList(
+ # r'\t\t\t<SPEC dataset "file_header": shape \(\), type "|S60">',
+ # lines)
+ # self.assertMatchAnyStringInList(
+ # r'\t\t<SPEC dataset "Ordinate1": shape \(3L?,\), type "<f4">',
+ # lines)
+ #
+ # os.unlink(spec_fname)
+ # shutil.rmtree(tempdir)
+
+
+class TestOpen(unittest.TestCase):
+ """Test `silx.io.utils.open` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/dataset"] = 50
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
+ fmt=["%d", "%.2f"], close_file=True, scan_number=1)
+
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write(u"Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def testH5(self):
+ f = utils.open(self.h5_filename)
+ self.assertIsNotNone(f)
+ self.assertIsInstance(f, h5py.File)
+ f.close()
+
+ def testH5With(self):
+ with utils.open(self.h5_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertIsInstance(f, h5py.File)
+
+ def testH5_withPath(self):
+ f = utils.open(self.h5_filename + "::/group/group/dataset")
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.Dataset)
+ self.assertEqual(f[()], 50)
+ f.close()
+
+ def testH5With_withPath(self):
+ with utils.open(self.h5_filename + "::/group/group") as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.Group)
+ self.assertIn("dataset", f)
+
+ def testSpec(self):
+ f = utils.open(self.spec_filename)
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+ f.close()
+
+ def testSpecWith(self):
+ with utils.open(self.spec_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+
+ def testEdf(self):
+ f = utils.open(self.edf_filename)
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+ f.close()
+
+ def testEdfWith(self):
+ with utils.open(self.edf_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+
+ def testUnsupported(self):
+ self.assertRaises(IOError, utils.open, self.txt_filename)
+
+ def testNotExists(self):
+ # load it
+ self.assertRaises(IOError, utils.open, self.missing_filename)
+
+ def test_silx_scheme(self):
+ url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/")
+ with utils.open(url.path()) as f:
+ self.assertIsNotNone(f)
+ self.assertTrue(silx.io.utils.is_file(f))
+
+ def test_fabio_scheme(self):
+ url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_bad_url(self):
+ url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_sliced_url(self):
+ url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,))
+ self.assertRaises(IOError, utils.open, url.path())
+
+
+class TestNodes(unittest.TestCase):
+ """Test `silx.io.utils.is_` functions."""
+
+ def test_real_h5py_objects(self):
+ name = tempfile.mktemp(suffix=".h5")
+ try:
+ with h5py.File(name, "w") as h5file:
+ h5group = h5file.create_group("arrays")
+ h5dataset = h5group.create_dataset("scalar", data=10)
+
+ self.assertTrue(utils.is_file(h5file))
+ self.assertTrue(utils.is_group(h5file))
+ self.assertFalse(utils.is_dataset(h5file))
+
+ self.assertFalse(utils.is_file(h5group))
+ self.assertTrue(utils.is_group(h5group))
+ self.assertFalse(utils.is_dataset(h5group))
+
+ self.assertFalse(utils.is_file(h5dataset))
+ self.assertFalse(utils.is_group(h5dataset))
+ self.assertTrue(utils.is_dataset(h5dataset))
+ finally:
+ os.unlink(name)
+
+ def test_h5py_like_file(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = utils.H5Type.FILE
+
+ obj = Foo()
+ self.assertTrue(utils.is_file(obj))
+ self.assertTrue(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_h5py_like_group(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = utils.H5Type.GROUP
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertTrue(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_h5py_like_dataset(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = utils.H5Type.DATASET
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertTrue(utils.is_dataset(obj))
+
+ def test_bad(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ pass
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_bad_api(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = int
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+
+class TestGetData(unittest.TestCase):
+ """Test `silx.io.utils.get_data` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/scalar"] = 50
+ h5["group/group/array"] = [1, 2, 3, 4, 5]
+ h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
+ fmt=["%d", "%.2f"], close_file=True, scan_number=1)
+
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+ fabiofile.append_frame(data=data, header=header)
+ fabiofile.write(cls.edf_multiframe_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write(u"Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def test_hdf5_scalar(self):
+ url = "silx:%s?/group/group/scalar" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data, 50)
+
+ def test_hdf5_array(self):
+ url = "silx:%s?/group/group/array" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5,))
+ self.assertEqual(data[0], 1)
+
+ def test_hdf5_array_slice(self):
+ url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5,))
+ self.assertEqual(data[0], 6)
+
+ def test_hdf5_array_slice_out_of_range(self):
+ url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ # ValueError: h5py 2.x
+ # IndexError: h5py 3.x
+ self.assertRaises((ValueError, IndexError), utils.get_data, url)
+
+ def test_edf_using_silx(self):
+ url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_frame(self):
+ url = "fabio:%s?slice=1" % self.edf_multiframe_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_singleframe(self):
+ url = "fabio:%s?slice=0" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_too_much_frames(self):
+ url = "fabio:%s?slice=..." % self.edf_multiframe_filename
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_fabio_no_frame(self):
+ url = "fabio:%s" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_unsupported_scheme(self):
+ url = "foo:/foo/bar"
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_no_scheme(self):
+ url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ self.assertRaises((ValueError, IOError), utils.get_data, url)
+
+ def test_file_not_exists(self):
+ url = "silx:/foo/bar"
+ self.assertRaises(IOError, utils.get_data, url)
+
+
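+# Sketch of the two URL schemes exercised by TestGetData above (illustrative
+# only, assuming files like those created in TestGetData.createResources):
+def _example_get_data(h5_filename, edf_filename):
+    # silx scheme: HDF5-like access, with optional path and slice queries
+    row = utils.get_data("silx:%s?path=/group/group/array2d&slice=1" % h5_filename)
+    # fabio scheme: frame-based access, slice selects the frame
+    frame = utils.get_data("fabio:%s?slice=0" % edf_filename)
+    return row, frame
+
+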
+def _h5py_version_at_least(version):
+    """Return True if the installed h5py version is >= ``version``"""
+    v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]]
+    r_majeur, r_mineur, r_micro = [int(i) for i in version.split('.')]
+    return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(r_majeur, r_mineur, r_micro)
+
+
+@unittest.skipUnless(_h5py_version_at_least('2.9.0'),
+                     'h5py >= 2.9.0 required for external datasets')
+class TestRawFileToH5(unittest.TestCase):
+ """Test conversion of .vol file to .h5 external dataset"""
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self._vol_file = os.path.join(self.tempdir, 'test_vol.vol')
+ self._file_info = os.path.join(self.tempdir, 'test_vol.info.vol')
+ self._dataset_shape = 100, 20, 5
+ data = numpy.random.random(self._dataset_shape[0] *
+ self._dataset_shape[1] *
+ self._dataset_shape[2]).astype(dtype=numpy.float32).reshape(self._dataset_shape)
+ numpy.save(file=self._vol_file, arr=data)
+        # numpy.save appends ".npy" to the file name: rename the result
+ assert os.path.exists(self._vol_file + '.npy')
+ os.rename(self._vol_file + '.npy', self._vol_file)
+ self.h5_file = os.path.join(self.tempdir, 'test_h5.h5')
+ self.external_dataset_path = '/root/my_external_dataset'
+ self._data_url = silx.io.url.DataUrl(file_path=self.h5_file,
+ data_path=self.external_dataset_path)
+ with open(self._file_info, 'w') as _fi:
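+            # The .info file stores the dataset shape: NUM_Z, NUM_Y and NUM_X
+            # map to axes 0, 1 and 2 of the volume, respectively.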
+ _fi.write('NUM_X = %s\n' % self._dataset_shape[2])
+ _fi.write('NUM_Y = %s\n' % self._dataset_shape[1])
+ _fi.write('NUM_Z = %s\n' % self._dataset_shape[0])
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def check_dataset(self, h5_file, data_path, shape):
+ """Make sure the external dataset is valid"""
+ with h5py.File(h5_file, 'r') as _file:
+ return data_path in _file and _file[data_path].shape == shape
+
+ def test_h5_file_not_existing(self):
+ """Test that can create a file with external dataset from scratch"""
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+ os.remove(self.h5_file)
+ utils.vol_to_h5_external_dataset(vol_file=self._vol_file,
+ output_url=self._data_url,
+ info_file=self._file_info)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_h5_file_existing(self):
+ """Test that can add the external dataset from an existing file"""
+ with h5py.File(self.h5_file, 'w') as _file:
+ _file['/root/dataset1'] = numpy.zeros((100, 100))
+ _file['/root/group/dataset2'] = numpy.ones((100, 100))
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_vol_file_not_existing(self):
+ """Make sure error is raised if .vol file does not exists"""
+ os.remove(self._vol_file)
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_conflicts(self):
+ """Test several conflict cases"""
+ # test if path already exists
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ with self.assertRaises(ValueError):
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=False,
+ dtype=numpy.float32)
+
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=True,
+ dtype=numpy.float32)
+
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+
+class TestH5Strings(unittest.TestCase):
+ """Test HDF5 str and bytes writing and reading"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.vlenstr = h5py.special_dtype(vlen=str)
+ cls.vlenbytes = h5py.special_dtype(vlen=bytes)
+ try:
+ cls.unicode = unicode
+ except NameError:
+ cls.unicode = str
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir)
+
+ def setUp(self):
+ self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w")
+
+ def tearDown(self):
+ self.file.close()
+
+ @classmethod
+ def _make_array(cls, value, n):
+ if isinstance(value, bytes):
+ dtype = cls.vlenbytes
+ elif isinstance(value, cls.unicode):
+ dtype = cls.vlenstr
+ else:
+ return numpy.array([value] * n)
+ return numpy.array([value] * n, dtype=dtype)
+
+ @classmethod
+ def _get_charset(cls, value):
+ if isinstance(value, bytes):
+ return h5py.h5t.CSET_ASCII
+ elif isinstance(value, cls.unicode):
+ return h5py.h5t.CSET_UTF8
+ else:
+ return None
+
+ def _check_dataset(self, value, result=None):
+ # Write+read scalar
+ if result:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+ charset = self._get_charset(value)
+ self.file["data"] = value
+ data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+ if charset:
+ assert self.file["data"].id.get_type().get_cset() == charset
+
+ # Write+read variable length
+ self.file["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0)
+ assert type(data) == type(result), data
+ assert data == result, data
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii)
+ numpy.testing.assert_array_equal(data, [result] * 2)
+ if charset:
+ assert self.file["vlen_data"].id.get_type().get_cset() == charset
+
+ def _check_attribute(self, value, result=None):
+ if result:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+ self.file.attrs["data"] = value
+ data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+
+ self.file.attrs["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii)
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"]
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ def test_dataset_ascii_bytes(self):
+ self._check_dataset(b"abc")
+
+ def test_attribute_ascii_bytes(self):
+ self._check_attribute(b"abc")
+
+ def test_dataset_ascii_bytes_decode(self):
+ self._check_dataset(b"abc", result="abc")
+
+ def test_attribute_ascii_bytes_decode(self):
+ self._check_attribute(b"abc", result="abc")
+
+ def test_dataset_ascii_str(self):
+ self._check_dataset("abc")
+
+ def test_attribute_ascii_str(self):
+ self._check_attribute("abc")
+
+ def test_dataset_utf8_str(self):
+ self._check_dataset("\u0101bc")
+
+ def test_attribute_utf8_str(self):
+ self._check_attribute("\u0101bc")
+
+ def test_dataset_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc")
+
+ def test_attribute_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc")
+
+ def test_dataset_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_attribute_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_dataset_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_dataset(b"\xe423")
+
+ def test_attribute_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_attribute(b"\xe423")
+
+ def test_dataset_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
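+        # (equivalent to bytes.decode("utf-8", errors="surrogateescape"))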
+ self._check_dataset(b"\xe423", result="\udce423")
+
+ def test_attribute_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_attribute(b"\xe423", result="\udce423")
+
+ def test_dataset_no_string(self):
+ self._check_dataset(numpy.int64(10))
+
+ def test_attribute_no_string(self):
+ self._check_attribute(numpy.int64(10))
+
+
+def test_visitall_hdf5(tmp_path):
+ """visit HDF5 file content not following links"""
+ external_filepath = tmp_path / "external.h5"
+ with h5py.File(external_filepath, mode="w") as h5file:
+ h5file["target/dataset"] = 50
+
+ filepath = tmp_path / "base.h5"
+ with h5py.File(filepath, mode="w") as h5file:
+ h5file["group/dataset"] = 50
+ h5file["link/soft_link"] = h5py.SoftLink("/group/dataset")
+ h5file["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset")
+
+ with h5py.File(filepath, mode="r") as h5file:
+ visited_items = {}
+ for path, item in utils.visitall(h5file):
+ if isinstance(item, h5py.Dataset):
+ content = item[()]
+ elif isinstance(item, h5py.Group):
+ content = None
+ elif isinstance(item, h5py.SoftLink):
+ content = item.path
+ elif isinstance(item, h5py.ExternalLink):
+ content = item.filename, item.path
+            else:
+                raise AssertionError("Unexpected item type at %s" % path)
+ visited_items[path] = (item.__class__, content)
+
+ assert visited_items == {
+ "/group": (h5py.Group, None),
+ "/group/dataset": (h5py.Dataset, 50),
+ "/link": (h5py.Group, None),
+ "/link/soft_link": (h5py.SoftLink, "/group/dataset"),
+ "/link/external_link": (h5py.ExternalLink, ("external.h5", "/target/dataset")),
+ }
+
+
+def test_visitall_commonh5():
+ """Visit commonh5 File object"""
+ fobj = commonh5.File("filename.file", mode="w")
+ group = fobj.create_group("group")
+ dataset = group.create_dataset("dataset", data=numpy.array(50))
+ group["soft_link"] = dataset # Create softlink
+
+ visited_items = dict(utils.visitall(fobj))
+ assert len(visited_items) == 3
+ assert visited_items["/group"] is group
+ assert visited_items["/group/dataset"] is dataset
+ soft_link = visited_items["/group/soft_link"]
+ assert isinstance(soft_link, commonh5.SoftLink)
+ assert soft_link.path == "/group/dataset"
diff --git a/src/silx/io/test/test_write_to_h5.py b/src/silx/io/test/test_write_to_h5.py
new file mode 100644
index 0000000..06149c9
--- /dev/null
+++ b/src/silx/io/test/test_write_to_h5.py
@@ -0,0 +1,118 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Test silx.io.convert.write_to_h5"""
+
+
+import h5py
+import numpy
+from silx.io import spech5
+
+from silx.io.convert import write_to_h5
+from silx.io.dictdump import h5todict
+from silx.io import commonh5
+from silx.io.spech5 import SpecH5
+
+
+def test_with_commonh5(tmp_path):
+ """Test write_to_h5 with commonh5 input"""
+ fobj = commonh5.File("filename.txt", mode="w")
+ group = fobj.create_group("group")
+ dataset = group.create_dataset("dataset", data=numpy.array(50))
+ group["soft_link"] = dataset # Create softlink
+
+ output_filepath = tmp_path / "output.h5"
+ write_to_h5(fobj, str(output_filepath))
+
+ assert h5todict(str(output_filepath)) == {
+ 'group': {'dataset': numpy.array(50), 'soft_link': numpy.array(50)},
+ }
+ with h5py.File(output_filepath, mode="r") as h5file:
+ soft_link = h5file.get("/group/soft_link", getlink=True)
+ assert isinstance(soft_link, h5py.SoftLink)
+ assert soft_link.path == "/group/dataset"
+
+
+def test_with_hdf5(tmp_path):
+ """Test write_to_h5 with HDF5 file input"""
+ filepath = tmp_path / "base.h5"
+ with h5py.File(filepath, mode="w") as h5file:
+ h5file["group/dataset"] = 50
+ h5file["group/soft_link"] = h5py.SoftLink("/group/dataset")
+ h5file["group/external_link"] = h5py.ExternalLink("base.h5", "/group/dataset")
+
+ output_filepath = tmp_path / "output.h5"
+ write_to_h5(str(filepath), str(output_filepath))
+ assert h5todict(str(output_filepath)) == {
+ 'group': {'dataset': 50, 'soft_link': 50},
+ }
+ with h5py.File(output_filepath, mode="r") as h5file:
+ soft_link = h5file.get("group/soft_link", getlink=True)
+ assert isinstance(soft_link, h5py.SoftLink)
+ assert soft_link.path == "/group/dataset"
+
+
+def test_with_spech5(tmp_path):
+ """Test write_to_h5 with SpecH5 input"""
+ filepath = tmp_path / "file.spec"
+ filepath.write_bytes(
+ bytes(
+"""#F /tmp/sf.dat
+
+#S 1 cmd
+#L a b
+1 2
+""",
+ encoding='ascii')
+ )
+
+ output_filepath = tmp_path / "output.h5"
+ with spech5.SpecH5(str(filepath)) as spech5file:
+ write_to_h5(spech5file, str(output_filepath))
+
+ def assert_equal(item1, item2):
+ if isinstance(item1, dict):
+ assert tuple(item1.keys()) == tuple(item2.keys())
+ for key in item1.keys():
+ assert_equal(item1[key], item2[key])
+ else:
+            assert numpy.array_equal(item1, item2)
+
+ assert_equal(h5todict(str(output_filepath)), {
+ '1.1': {
+ 'instrument': {
+ 'positioners': {},
+ 'specfile': {
+ 'file_header': ['#F /tmp/sf.dat'],
+ 'scan_header': ['#S 1 cmd', '#L a b'],
+ },
+ },
+ 'measurement': {
+ 'a': [1.],
+ 'b': [2.],
+ },
+ 'start_time': '',
+ 'title': 'cmd',
+ },
+ })