Diffstat (limited to 'src/silx/io/test')
-rw-r--r--  src/silx/io/test/__init__.py                22
-rw-r--r--  src/silx/io/test/test_commonh5.py          291
-rw-r--r--  src/silx/io/test/test_dictdump.py         1069
-rwxr-xr-x  src/silx/io/test/test_fabioh5.py           627
-rw-r--r--  src/silx/io/test/test_fioh5.py             296
-rw-r--r--  src/silx/io/test/test_h5link_utils.py      116
-rw-r--r--  src/silx/io/test/test_h5py_utils.py        482
-rw-r--r--  src/silx/io/test/test_nxdata.py            727
-rw-r--r--  src/silx/io/test/test_octaveh5.py          197
-rw-r--r--  src/silx/io/test/test_rawh5.py              83
-rw-r--r--  src/silx/io/test/test_sliceh5.py           104
-rw-r--r--  src/silx/io/test/test_specfile.py          386
-rw-r--r--  src/silx/io/test/test_specfilewrapper.py   189
-rw-r--r--  src/silx/io/test/test_spech5.py            912
-rw-r--r--  src/silx/io/test/test_spectoh5.py          186
-rw-r--r--  src/silx/io/test/test_url.py               300
-rw-r--r--  src/silx/io/test/test_utils.py            1141
-rw-r--r--  src/silx/io/test/test_write_to_h5.py       120
18 files changed, 7248 insertions, 0 deletions
diff --git a/src/silx/io/test/__init__.py b/src/silx/io/test/__init__.py
new file mode 100644
index 0000000..3c723bb
--- /dev/null
+++ b/src/silx/io/test/__init__.py
@@ -0,0 +1,22 @@
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
diff --git a/src/silx/io/test/test_commonh5.py b/src/silx/io/test/test_commonh5.py
new file mode 100644
index 0000000..1b0a3a6
--- /dev/null
+++ b/src/silx/io/test/test_commonh5.py
@@ -0,0 +1,291 @@
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for commonh5 wrapper"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "21/09/2017"
+
+import logging
+import numpy
+import unittest
+import tempfile
+import shutil
+
+_logger = logging.getLogger(__name__)
+
+import silx.io
+import silx.io.utils
+import h5py
+
+try:
+ from .. import commonh5
+except ImportError:
+ commonh5 = None
+
+
+class _TestCommonFeatures(unittest.TestCase):
+ """Test common features supported by h5py and our implementation."""
+
+ __test__ = False # ignore abstract class tests
+
+ @classmethod
+ def createFile(cls):
+ return None
+
+ @classmethod
+ def setUpClass(cls):
+ # Set to None because create_resource can raise an exception
+ cls.h5 = None
+ cls.h5 = cls.create_resource()
+ if cls.h5 is None:
+ raise unittest.SkipTest("File not created")
+
+ @classmethod
+ def create_resource(cls):
+ """Must be implemented"""
+ return None
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.h5 = None
+
+ def test_file(self):
+ node = self.h5
+ self.assertTrue(silx.io.is_file(node))
+ self.assertTrue(silx.io.is_group(node))
+ self.assertFalse(silx.io.is_dataset(node))
+ self.assertEqual(len(node.attrs), 0)
+
+ def test_group(self):
+ node = self.h5["group"]
+ self.assertFalse(silx.io.is_file(node))
+ self.assertTrue(silx.io.is_group(node))
+ self.assertFalse(silx.io.is_dataset(node))
+ self.assertEqual(len(node.attrs), 0)
+ class_ = self.h5.get("group", getclass=True)
+ classlink = self.h5.get("group", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Group)
+ self.assertEqual(classlink, h5py.HardLink)
+
+ def test_dataset(self):
+ node = self.h5["group/dataset"]
+ self.assertFalse(silx.io.is_file(node))
+ self.assertFalse(silx.io.is_group(node))
+ self.assertTrue(silx.io.is_dataset(node))
+ self.assertEqual(len(node.attrs), 0)
+ class_ = self.h5.get("group/dataset", getclass=True)
+ classlink = self.h5.get("group/dataset", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertEqual(classlink, h5py.HardLink)
+
+ def test_soft_link(self):
+ node = self.h5["link/soft_link"]
+ self.assertEqual(node.name, "/link/soft_link")
+ class_ = self.h5.get("link/soft_link", getclass=True)
+ link = self.h5.get("link/soft_link", getlink=True)
+ classlink = self.h5.get("link/soft_link", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink)))
+ self.assertTrue(silx.io.utils.is_softlink(link))
+ self.assertEqual(classlink, h5py.SoftLink)
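+
+ # For reference, the four get() combinations exercised in these tests
+ # (summarizing h5py's documented behaviour):
+ #
+ # get(name) -> the node itself
+ # get(name, getclass=True) -> its class, e.g. h5py.Dataset
+ # get(name, getlink=True) -> the link object, e.g. a h5py.SoftLink
+ # get(name, getlink=True, getclass=True) -> the link class, e.g. h5py.SoftLink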
+
+ def test_external_link(self):
+ node = self.h5["link/external_link"]
+ self.assertEqual(node.name, "/target/dataset")
+ class_ = self.h5.get("link/external_link", getclass=True)
+ classlink = self.h5.get("link/external_link", getlink=True, getclass=True)
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertEqual(classlink, h5py.ExternalLink)
+
+ def test_external_link_to_link(self):
+ node = self.h5["link/external_link_to_link"]
+ self.assertEqual(node.name, "/target/link")
+ class_ = self.h5.get("link/external_link_to_link", getclass=True)
+ classlink = self.h5.get(
+ "link/external_link_to_link", getlink=True, getclass=True
+ )
+ self.assertEqual(class_, h5py.Dataset)
+ self.assertEqual(classlink, h5py.ExternalLink)
+
+ def test_create_groups(self):
+ c = self.h5.create_group(self.id() + "/a/b/c")
+ d = c.create_group("/" + self.id() + "/a/b/d")
+
+ self.assertRaises(ValueError, self.h5.create_group, self.id() + "/a/b/d")
+ self.assertEqual(c.name, "/" + self.id() + "/a/b/c")
+ self.assertEqual(d.name, "/" + self.id() + "/a/b/d")
+
+ def test_setitem_python_object_dataset(self):
+ group = self.h5.create_group(self.id())
+ group["a"] = 10
+ self.assertEqual(group["a"].dtype.kind, "i")
+
+ def test_setitem_numpy_dataset(self):
+ group = self.h5.create_group(self.id())
+ group["a"] = numpy.array([10, 20, 30])
+ self.assertEqual(group["a"].dtype.kind, "i")
+ self.assertEqual(group["a"].shape, (3,))
+
+ def test_setitem_link(self):
+ group = self.h5.create_group(self.id())
+ group["a"] = 10
+ group["b"] = group["a"]
+ self.assertEqual(group["b"].dtype.kind, "i")
+
+ def test_setitem_dataset_is_sub_group(self):
+ self.h5[self.id() + "/a"] = 10
+
+
+class TestCommonFeatures_h5py(_TestCommonFeatures):
+ """Check if h5py is compliant with what we expect."""
+
+ __test__ = True # because _TestCommonFeatures is ignored
+
+ @classmethod
+ def create_resource(cls):
+ cls.tmp_dir = tempfile.mkdtemp()
+
+ externalh5 = h5py.File(cls.tmp_dir + "/external.h5", mode="w")
+ externalh5["target/dataset"] = 50
+ externalh5["target/link"] = h5py.SoftLink("/target/dataset")
+ externalh5.close()
+
+ h5 = h5py.File(cls.tmp_dir + "/base.h5", mode="w")
+ h5["group/dataset"] = 50
+ h5["link/soft_link"] = h5py.SoftLink("/group/dataset")
+ h5["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset")
+ h5["link/external_link_to_link"] = h5py.ExternalLink(
+ "external.h5", "/target/link"
+ )
+
+ return h5
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCommonFeatures_h5py, cls).tearDownClass()
+ if hasattr(cls, "tmp_dir") and cls.tmp_dir is not None:
+ shutil.rmtree(cls.tmp_dir)
+
+
+class TestCommonFeatures_commonH5(_TestCommonFeatures):
+ """Check if commonh5 is compliant with h5py."""
+
+ __test__ = True # because _TestCommonFeatures is ignored
+
+ @classmethod
+ def create_resource(cls):
+ h5 = commonh5.File("base.h5", "w")
+ h5.create_group("group").create_dataset("dataset", data=numpy.int32(50))
+
+ link = h5.create_group("link")
+ link.add_node(commonh5.SoftLink("soft_link", "/group/dataset"))
+
+ return h5
+
+ def test_external_link(self):
+ # not applicable
+ pass
+
+ def test_external_link_to_link(self):
+ # not applicable
+ pass
+
+
+class TestSpecificCommonH5(unittest.TestCase):
+ """Test specific features from commonh5.
+
+ Tests of shared features are covered by TestCommonFeatures."""
+
+ def setUp(self):
+ if commonh5 is None:
+ self.skipTest("silx.io.commonh5 is needed")
+
+ def test_node_attrs(self):
+ node = commonh5.Node("Foo", attrs={"a": 1})
+ self.assertEqual(node.attrs["a"], 1)
+ node.attrs["b"] = 8
+ self.assertEqual(node.attrs["b"], 8)
+ node.attrs["b"] = 2
+ self.assertEqual(node.attrs["b"], 2)
+
+ def test_node_readonly_attrs(self):
+ f = commonh5.File(name="Foo", mode="r")
+ node = commonh5.Node("Foo", attrs={"a": 1})
+ node.attrs["b"] = 8
+ f.add_node(node)
+ self.assertEqual(node.attrs["b"], 8)
+ try:
+ node.attrs["b"] = 1
+ self.fail()
+ except RuntimeError:
+ pass
+
+ def test_create_dataset(self):
+ f = commonh5.File(name="Foo", mode="w")
+ node = f.create_dataset("foo", data=numpy.array([1]))
+ self.assertIs(node.parent, f)
+ self.assertIs(f["foo"], node)
+
+ def test_create_group(self):
+ f = commonh5.File(name="Foo", mode="w")
+ node = f.create_group("foo")
+ self.assertIs(node.parent, f)
+ self.assertIs(f["foo"], node)
+
+ def test_readonly_create_dataset(self):
+ f = commonh5.File(name="Foo", mode="r")
+ try:
+ f.create_dataset("foo", data=numpy.array([1]))
+ self.fail()
+ except RuntimeError:
+ pass
+
+ def test_readonly_create_group(self):
+ f = commonh5.File(name="Foo", mode="r")
+ try:
+ f.create_group("foo")
+ self.fail()
+ except RuntimeError:
+ pass
+
+ def test_create_unicode_dataset(self):
+ f = commonh5.File(name="Foo", mode="w")
+ try:
+ f.create_dataset("foo", data=numpy.array("aaaa"))
+ self.fail()
+ except TypeError:
+ pass
+
+ def test_setitem_dataset(self):
+ self.h5 = commonh5.File(name="Foo", mode="w")
+ group = self.h5.create_group(self.id())
+ group["a"] = commonh5.Dataset(None, data=numpy.array(10))
+ self.assertEqual(group["a"].dtype.kind, "i")
+
+ def test_setitem_explicit_link(self):
+ self.h5 = commonh5.File(name="Foo", mode="w")
+ group = self.h5.create_group(self.id())
+ group["a"] = 10
+ group["b"] = commonh5.SoftLink(None, path="/" + self.id() + "/a")
+ self.assertEqual(group["b"].dtype.kind, "i")
diff --git a/src/silx/io/test/test_dictdump.py b/src/silx/io/test/test_dictdump.py
new file mode 100644
index 0000000..2bd376e
--- /dev/null
+++ b/src/silx/io/test/test_dictdump.py
@@ -0,0 +1,1069 @@
+# /*##########################################################################
+# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for dicttoh5 module"""
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "17/01/2018"
+
+
+from collections import defaultdict
+from copy import deepcopy
+import os
+import re
+import tempfile
+import unittest
+
+import h5py
+import numpy
+
+try:
+ import pint
+except ImportError:
+ pint = None
+import pytest
+
+from silx.utils.testutils import LoggingValidator
+
+from ..configdict import ConfigDict
+from .. import dictdump
+from ..dictdump import dicttoh5, dicttojson, dump
+from ..dictdump import h5todict, load
+from ..dictdump import logger as dictdump_logger
+from ..utils import is_link
+from ..utils import h5py_read_dataset
+
+
+def tree():
+ """Tree data structure as a recursive nested dictionary"""
+ return defaultdict(tree)
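+
+# A minimal sketch of the recursive defaultdict behaviour (illustrative
+# only, not part of the tested API): accessing a missing key implicitly
+# creates a new subtree, so nested paths can be assigned without building
+# the intermediate dictionaries first:
+#
+#     t = tree()
+#     t["a"]["b"]["c"] = 1   # creates t["a"] and t["a"]["b"] on the fly
+#     assert isinstance(t["a"]["b"], defaultdict)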
+
+
+inhabitants = 160215
+
+city_attrs = tree()
+city_attrs["Europe"]["France"]["Grenoble"]["area"] = "18.44 km2"
+city_attrs["Europe"]["France"]["Grenoble"]["inhabitants"] = inhabitants
+city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196]
+city_attrs["Europe"]["France"]["Tourcoing"]["area"]
+
+ext_attrs = tree()
+ext_attrs["ext_group"]["dataset"] = 10
+ext_filename = "ext.h5"
+
+link_attrs = tree()
+link_attrs["links"]["group"]["dataset"] = 10
+link_attrs["links"]["group"]["relative_softlink"] = h5py.SoftLink("dataset")
+link_attrs["links"]["relative_softlink"] = h5py.SoftLink("group/dataset")
+link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset")
+link_attrs["links"]["external_link"] = h5py.ExternalLink(
+ ext_filename, "/ext_group/dataset"
+)
+
+
+class DictTestCase(unittest.TestCase):
+ def assertRecursiveEqual(self, expected, actual, nodes=tuple()):
+ err_msg = "\n\n Tree nodes: {}".format(nodes)
+ if isinstance(expected, dict):
+ self.assertTrue(isinstance(actual, dict), msg=err_msg)
+ self.assertEqual(set(expected.keys()), set(actual.keys()), msg=err_msg)
+ for k in actual:
+ self.assertRecursiveEqual(
+ expected[k],
+ actual[k],
+ nodes=nodes + (k,),
+ )
+ return
+ if isinstance(actual, numpy.ndarray):
+ actual = actual.tolist()
+ if isinstance(expected, numpy.ndarray):
+ expected = expected.tolist()
+
+ self.assertEqual(expected, actual, msg=err_msg)
+
+
+class H5DictTestCase(DictTestCase):
+ def _dictRoundTripNormalize(self, treedict):
+ """Convert the dictionary as expected from a round-trip
+ treedict -> dicttoh5 -> h5todict -> newtreedict
+ """
+ for key, value in list(treedict.items()):
+ if isinstance(value, dict):
+ self._dictRoundTripNormalize(value)
+
+ # Expand treedict[("group", "attr_name")]
+ # to treedict["group"]["attr_name"]
+ for key, value in list(treedict.items()):
+ if not isinstance(key, tuple):
+ continue
+ # Put the attribute inside the group
+ grpname, attr = key
+ if not grpname:
+ continue
+ group = treedict.setdefault(grpname, dict())
+ if isinstance(group, dict):
+ del treedict[key]
+ group[("", attr)] = value
+
+ def dictRoundTripNormalize(self, treedict):
+ treedict2 = deepcopy(treedict)
+ self._dictRoundTripNormalize(treedict2)
+ return treedict2
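+
+ # A worked example of the normalization above (illustrative only):
+ #
+ # before = {"group": {}, ("group", "attr"): 1}
+ # after = {"group": {("", "attr"): 1}}
+ #
+ # i.e. an attribute addressed as ("group", "attr") at one level is
+ # moved inside "group" and re-addressed as ("", "attr"), which is the
+ # layout expected from h5todict(include_attributes=True).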
+
+
+class TestDictToH5(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testH5CityAttrs(self):
+ filters = {"shuffle": True, "fletcher32": True}
+ dicttoh5(
+ city_attrs,
+ self.h5_fname,
+ h5path="/city attributes",
+ mode="w",
+ create_dataset_args=filters,
+ )
+
+ h5f = h5py.File(self.h5_fname, mode="r")
+
+ self.assertIn("Tourcoing/area", h5f["/city attributes/Europe/France"])
+ ds = h5f["/city attributes/Europe/France/Grenoble/inhabitants"]
+ self.assertEqual(ds[...], 160215)
+
+ # filters only apply to datasets that are not scalars (shape != () )
+ ds = h5f["/city attributes/Europe/France/Grenoble/coordinates"]
+ # self.assertEqual(ds.compression, "gzip")
+ self.assertTrue(ds.fletcher32)
+ self.assertTrue(ds.shuffle)
+
+ h5f.close()
+
+ ddict = load(self.h5_fname, fmat="hdf5")
+ self.assertAlmostEqual(
+ min(
+ ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]
+ ),
+ 5.7196,
+ )
+
+ def testAttributes(self):
+ """Any kind of attribute can be described"""
+ ddict = {
+ "group": {"datatset": "hmmm", ("", "group_attr"): 10},
+ "dataset": "aaaaaaaaaaaaaaa",
+ ("", "root_attr"): 11,
+ ("dataset", "dataset_attr"): 12,
+ ("group", "group_attr2"): 13,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ self.assertEqual(h5file["group"].attrs["group_attr"], 10)
+ self.assertEqual(h5file.attrs["root_attr"], 11)
+ self.assertEqual(h5file["dataset"].attrs["dataset_attr"], 12)
+ self.assertEqual(h5file["group"].attrs["group_attr2"], 13)
+
+ def testPathAttributes(self):
+ """A group is requested at a path"""
+ ddict = {
+ ("", "NX_class"): "NXcollection",
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ # This should not warn
+ with LoggingValidator(dictdump_logger, warning=0):
+ dictdump.dicttoh5(ddict, h5file, h5path="foo/bar")
+
+ def testKeyOrder(self):
+ ddict1 = {
+ "d": "plow",
+ ("d", "a"): "ox",
+ }
+ ddict2 = {
+ ("d", "a"): "ox",
+ "d": "plow",
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict1, h5file, h5path="g1")
+ dictdump.dicttoh5(ddict2, h5file, h5path="g2")
+ self.assertEqual(h5file["g1/d"].attrs["a"], "ox")
+ self.assertEqual(h5file["g2/d"].attrs["a"], "ox")
+
+ def testAttributeValues(self):
+ """Any NX data types can be used"""
+ ddict = {
+ ("", "bool"): True,
+ ("", "int"): 11,
+ ("", "float"): 1.1,
+ ("", "str"): "a",
+ ("", "boollist"): [True, False, True],
+ ("", "intlist"): [11, 22, 33],
+ ("", "floatlist"): [1.1, 2.2, 3.3],
+ ("", "strlist"): ["a", "bb", "ccc"],
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ for k, expected in ddict.items():
+ result = h5file.attrs[k[1]]
+ if isinstance(expected, list):
+ if isinstance(expected[0], str):
+ numpy.testing.assert_array_equal(result, expected)
+ else:
+ numpy.testing.assert_array_almost_equal(result, expected)
+ else:
+ self.assertEqual(result, expected)
+
+ def testAttributeAlreadyExists(self):
+ """A duplicated attribute warns if overwriting is not enabled"""
+ ddict = {
+ "group": {"dataset": "hmmm", ("", "attr"): 10},
+ ("group", "attr"): 10,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ self.assertEqual(h5file["group"].attrs["attr"], 10)
+
+ def testFlatDict(self):
+ """Description of a tree with a single level of keys"""
+ ddict = {
+ "group/group/dataset": 10,
+ ("group/group/dataset", "attr"): 11,
+ ("group/group", "attr"): 12,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ self.assertEqual(h5file["group/group/dataset"][()], 10)
+ self.assertEqual(h5file["group/group/dataset"].attrs["attr"], 11)
+ self.assertEqual(h5file["group/group"].attrs["attr"], 12)
+
+ def testLinks(self):
+ with h5py.File(self.h5_ext_fname, "w") as h5file:
+ dictdump.dicttoh5(ext_attrs, h5file)
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(link_attrs, h5file)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["links/group/dataset"][()], 10)
+ self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/absolute_softlink"][()], 10)
+ self.assertEqual(h5file["links/external_link"][()], 10)
+
+ def testDumpNumpyArray(self):
+ ddict = {
+ "darks": {"0": numpy.array([[0, 0, 0], [0, 0, 0]], dtype=numpy.uint16)}
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttoh5(ddict, h5file)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ numpy.testing.assert_array_equal(
+ h5py_read_dataset(h5file["darks"]["0"]), ddict["darks"]["0"]
+ )
+
+ def testOverwrite(self):
+ # Tree structure that will be tested
+ group1 = {
+ ("", "attr2"): "original2",
+ "dset1": 0,
+ "dset2": [0, 1],
+ ("dset1", "attr1"): "original1",
+ ("dset1", "attr2"): "original2",
+ ("dset2", "attr1"): "original1",
+ ("dset2", "attr2"): "original2",
+ }
+ group2 = {
+ "subgroup1": group1.copy(),
+ "subgroup2": group1.copy(),
+ ("subgroup1", "attr1"): "original1",
+ ("subgroup2", "attr1"): "original1",
+ }
+ group2.update(group1)
+ # initial HDF5 tree
+ otreedict = {
+ ("", "attr1"): "original1",
+ ("", "attr2"): "original2",
+ "group1": group1,
+ "group2": group2,
+ ("group1", "attr1"): "original1",
+ ("group2", "attr1"): "original1",
+ }
+ wtreedict = None # dumped dictionary
+ etreedict = None # expected HDF5 tree after dump
+
+ def reset_file():
+ dicttoh5(
+ otreedict,
+ h5file=self.h5_fname,
+ mode="w",
+ )
+
+ def append_file(update_mode):
+ dicttoh5(wtreedict, h5file=self.h5_fname, mode="a", update_mode=update_mode)
+
+ def assert_file():
+ rtreedict = h5todict(self.h5_fname, include_attributes=True, asarray=False)
+ netreedict = self.dictRoundTripNormalize(etreedict)
+ try:
+ self.assertRecursiveEqual(netreedict, rtreedict)
+ except AssertionError:
+ from pprint import pprint
+
+ print("\nDUMP:")
+ pprint(wtreedict)
+ print("\nEXPECTED:")
+ pprint(netreedict)
+ print("\nHDF5:")
+ pprint(rtreedict)
+ raise
+
+ def assert_append(update_mode):
+ append_file(update_mode)
+ assert_file()
+
+ # Test wrong arguments
+ with self.assertRaises(ValueError):
+ dicttoh5(
+ otreedict, h5file=self.h5_fname, mode="w", update_mode="wrong-value"
+ )
+
+ # No writing
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ assert_file()
+
+ # Write identical dictionary
+ wtreedict = deepcopy(otreedict)
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify", "replace"]:
+ assert_append(update_mode)
+
+ # Write empty dictionary
+ wtreedict = dict()
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify", "replace"]:
+ assert_append(update_mode)
+
+ # Modified dataset
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
+ wtreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
+ etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
+ assert_append("replace")
+
+ # Modified group
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = [0, 1]
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add", "modify"]:
+ assert_append(update_mode)
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = [0, 1]
+ assert_append("replace")
+
+ # Modified attribute
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ etreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"] = dict()
+ etreedict["group2"]["subgroup2"]["dset1"][("", "attr1")] = "modified"
+ assert_append("replace")
+
+ # Delete group
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"]
+ del etreedict["group2"][("subgroup2", "attr1")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ assert_append("replace")
+
+ # Delete dataset
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"]["dset2"] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"]["dset2"]
+ del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
+ del etreedict["group2"]["subgroup2"][("dset2", "attr2")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ assert_append("replace")
+
+ # Delete attribute
+ wtreedict = dict()
+ wtreedict["group2"] = dict()
+ wtreedict["group2"]["subgroup2"] = dict()
+ wtreedict["group2"]["subgroup2"][("dset2", "attr1")] = None
+
+ reset_file()
+ etreedict = deepcopy(otreedict)
+ for update_mode in [None, "add"]:
+ assert_append(update_mode)
+
+ del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
+ assert_append("modify")
+
+ etreedict["group2"] = dict()
+ del etreedict[("group2", "attr1")]
+ etreedict["group2"]["subgroup2"] = dict()
+ etreedict["group2"]["subgroup2"]["dset2"] = dict()
+ assert_append("replace")
+
+
+@pytest.mark.skipif(pint is None, reason="Require pint")
+def test_dicttoh5_pint(tmp_h5py_file):
+ ureg = pint.UnitRegistry()
+ treedict = {
+ "array_mm": pint.Quantity([1, 2, 3], ureg.mm),
+ "value_kg": 3 * ureg.kg,
+ }
+
+ dicttoh5(treedict, tmp_h5py_file)
+
+ result = h5todict(tmp_h5py_file)
+ assert set(treedict.keys()) == set(result.keys())
+ for key, value in treedict.items():
+ assert numpy.array_equal(result[key], value.magnitude)
+
+
+def test_dicttoh5_not_serializable(tmp_h5py_file):
+ treedict = {"group": {"dset": [{"a": 1}]}}
+ err_msg = "Failed to create dataset '/group/dset' with data (numpy.ndarray-object) = [{'a': 1}]"
+ with pytest.raises(ValueError, match=re.escape(err_msg)):
+ dicttoh5(treedict, tmp_h5py_file)
+
+
+class TestH5ToDict(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
+ dicttoh5(city_attrs, self.h5_fname)
+ dicttoh5(link_attrs, self.h5_fname, mode="a")
+ dicttoh5(ext_attrs, self.h5_ext_fname)
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testExcludeNames(self):
+ ddict = h5todict(
+ self.h5_fname,
+ path="/Europe/France",
+ exclude_names=["ourcoing", "inhab", "toto"],
+ )
+ self.assertNotIn("Tourcoing", ddict)
+ self.assertIn("Grenoble", ddict)
+
+ self.assertNotIn("inhabitants", ddict["Grenoble"])
+ self.assertIn("coordinates", ddict["Grenoble"])
+ self.assertIn("area", ddict["Grenoble"])
+
+ def testAsArrayTrue(self):
+ """Test with asarray=True, the default"""
+ ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble")
+ self.assertTrue(
+ numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants))
+ )
+
+ def testAsArrayFalse(self):
+ """Test with asarray=False"""
+ ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False)
+ self.assertEqual(ddict["inhabitants"], inhabitants)
+
+ def testDereferenceLinks(self):
+ ddict = h5todict(self.h5_fname, path="links", dereference_links=True)
+ self.assertTrue(ddict["absolute_softlink"], 10)
+ self.assertTrue(ddict["relative_softlink"], 10)
+ self.assertTrue(ddict["external_link"], 10)
+ self.assertTrue(ddict["group"]["relative_softlink"], 10)
+
+ def testPreserveLinks(self):
+ ddict = h5todict(self.h5_fname, path="links", dereference_links=False)
+ self.assertTrue(is_link(ddict["absolute_softlink"]))
+ self.assertTrue(is_link(ddict["relative_softlink"]))
+ self.assertTrue(is_link(ddict["external_link"]))
+ self.assertTrue(is_link(ddict["group"]["relative_softlink"]))
+
+ def testStrings(self):
+ ddict = {
+ "dset_bytes": b"bytes",
+ "dset_utf8": "utf8",
+ "dset_2bytes": [b"bytes", b"bytes"],
+ "dset_2utf8": ["utf8", "utf8"],
+ ("", "attr_bytes"): b"bytes",
+ ("", "attr_utf8"): "utf8",
+ ("", "attr_2bytes"): [b"bytes", b"bytes"],
+ ("", "attr_2utf8"): ["utf8", "utf8"],
+ }
+ dicttoh5(ddict, self.h5_fname, mode="w")
+ adict = h5todict(self.h5_fname, include_attributes=True, asarray=False)
+ self.assertEqual(ddict["dset_bytes"], adict["dset_bytes"])
+ self.assertEqual(ddict["dset_utf8"], adict["dset_utf8"])
+ self.assertEqual(ddict[("", "attr_bytes")], adict[("", "attr_bytes")])
+ self.assertEqual(ddict[("", "attr_utf8")], adict[("", "attr_utf8")])
+ numpy.testing.assert_array_equal(ddict["dset_2bytes"], adict["dset_2bytes"])
+ numpy.testing.assert_array_equal(ddict["dset_2utf8"], adict["dset_2utf8"])
+ numpy.testing.assert_array_equal(
+ ddict[("", "attr_2bytes")], adict[("", "attr_2bytes")]
+ )
+ numpy.testing.assert_array_equal(
+ ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")]
+ )
+
+
+class TestDictToNx(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "nx.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testAttributes(self):
+ """Any kind of attribute can be described"""
+ ddict = {
+ "group": {"dataset": 100, "@group_attr1": 10},
+ "dataset": 200,
+ "@root_attr": 11,
+ "dataset@dataset_attr": "12",
+ "group@group_attr2": 13,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict, h5file)
+ self.assertEqual(h5file["group"].attrs["group_attr1"], 10)
+ self.assertEqual(h5file.attrs["root_attr"], 11)
+ self.assertEqual(h5file["dataset"].attrs["dataset_attr"], "12")
+ self.assertEqual(h5file["group"].attrs["group_attr2"], 13)
+
+ def testKeyOrder(self):
+ ddict1 = {
+ "d": "plow",
+ "d@a": "ox",
+ }
+ ddict2 = {
+ "d@a": "ox",
+ "d": "plow",
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict1, h5file, h5path="g1")
+ dictdump.dicttonx(ddict2, h5file, h5path="g2")
+ self.assertEqual(h5file["g1/d"].attrs["a"], "ox")
+ self.assertEqual(h5file["g2/d"].attrs["a"], "ox")
+
+ def testAttributeValues(self):
+ """Any NX data types can be used"""
+ ddict = {
+ "@bool": True,
+ "@int": 11,
+ "@float": 1.1,
+ "@str": "a",
+ "@boollist": [True, False, True],
+ "@intlist": [11, 22, 33],
+ "@floatlist": [1.1, 2.2, 3.3],
+ "@strlist": ["a", "bb", "ccc"],
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict, h5file)
+ for k, expected in ddict.items():
+ result = h5file.attrs[k[1:]]
+ if isinstance(expected, list):
+ if isinstance(expected[0], str):
+ numpy.testing.assert_array_equal(result, expected)
+ else:
+ numpy.testing.assert_array_almost_equal(result, expected)
+ else:
+ self.assertEqual(result, expected)
+
+ def testFlatDict(self):
+ """Description of a tree with a single level of keys"""
+ ddict = {
+ "group/group/dataset": 10,
+ "group/group/dataset@attr": 11,
+ "group/group@attr": 12,
+ }
+ with h5py.File(self.h5_fname, "w") as h5file:
+ dictdump.dicttonx(ddict, h5file)
+ self.assertEqual(h5file["group/group/dataset"][()], 10)
+ self.assertEqual(h5file["group/group/dataset"].attrs["attr"], 11)
+ self.assertEqual(h5file["group/group"].attrs["attr"], 12)
+
+ def testLinks(self):
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {
+ "links": {
+ "group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset",
+ }
+ }
+ dictdump.dicttonx(ddict, self.h5_fname)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["links/group/dataset"][()], 10)
+ self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/relative_softlink"][()], 10)
+ self.assertEqual(h5file["links/absolute_softlink"][()], 10)
+ self.assertEqual(h5file["links/external_link"][()], 10)
+
+ def testUpLinks(self):
+ ddict = {
+ "data": {"group": {"dataset": 10, ">relative_softlink": "dataset"}},
+ "links": {
+ "group": {
+ "subgroup": {">relative_softlink": "../../../data/group/dataset"}
+ }
+ },
+ }
+ dictdump.dicttonx(ddict, self.h5_fname)
+ with h5py.File(self.h5_fname, "r") as h5file:
+ self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10)
+
+ def testOverwrite(self):
+ entry_name = "entry"
+ wtreedict = {
+ "group1": {"a": 1, "b": 2},
+ "group2@attr3": "attr3",
+ "group2@attr4": "attr4",
+ "group2": {
+ "@attr1": "attr1",
+ "@attr2": "attr2",
+ "c": 3,
+ "d": 4,
+ "dataset4": 8,
+ "dataset4@units": "keV",
+ },
+ "group3": {"subgroup": {"e": 9, "f": 10}},
+ "dataset1": 5,
+ "dataset2": 6,
+ "dataset3": 7,
+ "dataset3@units": "mm",
+ }
+ esubtree = {
+ "@NX_class": "NXentry",
+ "group1": {"@NX_class": "NXcollection", "a": 1, "b": 2},
+ "group2": {
+ "@NX_class": "NXcollection",
+ "@attr1": "attr1",
+ "@attr2": "attr2",
+ "@attr3": "attr3",
+ "@attr4": "attr4",
+ "c": 3,
+ "d": 4,
+ "dataset4": 8,
+ "dataset4@units": "keV",
+ },
+ "group3": {
+ "@NX_class": "NXcollection",
+ "subgroup": {"@NX_class": "NXcollection", "e": 9, "f": 10},
+ },
+ "dataset1": 5,
+ "dataset2": 6,
+ "dataset3": 7,
+ "dataset3@units": "mm",
+ }
+ etreedict = {entry_name: esubtree}
+
+ def append_file(update_mode, add_nx_class):
+ dictdump.dicttonx(
+ wtreedict,
+ h5file=self.h5_fname,
+ mode="a",
+ h5path=entry_name,
+ update_mode=update_mode,
+ add_nx_class=add_nx_class,
+ )
+
+ def assert_file():
+ rtreedict = dictdump.nxtodict(
+ self.h5_fname,
+ include_attributes=True,
+ asarray=False,
+ )
+ netreedict = self.dictRoundTripNormalize(etreedict)
+ try:
+ self.assertRecursiveEqual(netreedict, rtreedict)
+ except AssertionError:
+ from pprint import pprint
+
+ print("\nDUMP:")
+ pprint(wtreedict)
+ print("\nEXPECTED:")
+ pprint(netreedict)
+ print("\nHDF5:")
+ pprint(rtreedict)
+ raise
+
+ def assert_append(update_mode, add_nx_class=None):
+ append_file(update_mode, add_nx_class=add_nx_class)
+ assert_file()
+
+ # First dump to an empty file
+ assert_append(None)
+
+ # Add non-existing attributes/datasets/groups
+ wtreedict["group1"].pop("a")
+ wtreedict["group2"].pop("@attr1")
+ wtreedict["group2"]["@attr2"] = "attr3" # only for update
+ wtreedict["group2"]["@type"] = "test"
+ wtreedict["group2"]["dataset4"] = 9 # only for update
+ del wtreedict["group2"]["dataset4@units"]
+ wtreedict["group3"] = {}
+ esubtree["group2"]["@type"] = "test"
+ assert_append("add")
+
+ # Update existing attributes and datasets
+ esubtree["group2"]["@attr2"] = "attr3"
+ esubtree["group2"]["dataset4"] = 9
+ assert_append("modify")
+
+ # Do not add missing NX_class by default when updating
+ wtreedict["group2"]["@NX_class"] = "NXprocess"
+ esubtree["group2"]["@NX_class"] = "NXprocess"
+ assert_append("modify")
+ del wtreedict["group2"]["@NX_class"]
+ assert_append("modify")
+
+ # Overwrite existing groups/datasets/attributes
+ esubtree["group1"].pop("a")
+ esubtree["group2"].pop("@attr1")
+ esubtree["group2"]["@NX_class"] = "NXcollection"
+ esubtree["group2"]["dataset4"] = 9
+ del esubtree["group2"]["dataset4@units"]
+ esubtree["group3"] = {"@NX_class": "NXcollection"}
+ assert_append("replace", add_nx_class=True)
+
+
+@pytest.mark.skipif(pint is None, reason="Require pint")
+def test_dicttonx_pint(tmp_h5py_file):
+ ureg = pint.UnitRegistry()
+ treedict = {
+ "array_mm": pint.Quantity([1, 2, 3], ureg.mm),
+ "value_kg": 3 * ureg.kg,
+ }
+
+ dictdump.dicttonx(treedict, tmp_h5py_file)
+
+ result = dictdump.nxtodict(tmp_h5py_file)
+ for key, value in treedict.items():
+ assert numpy.array_equal(result[key], value.magnitude)
+ assert result[f"{key}@units"] == f"{value.units:~C}"
+
+
+class TestNxToDict(H5DictTestCase):
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.h5_fname = os.path.join(self.tempdir, "nx.h5")
+ self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
+
+ def tearDown(self):
+ if os.path.exists(self.h5_fname):
+ os.unlink(self.h5_fname)
+ if os.path.exists(self.h5_ext_fname):
+ os.unlink(self.h5_ext_fname)
+ os.rmdir(self.tempdir)
+
+ def testAttributes(self):
+ """Any kind of attribute can be described"""
+ ddict = {
+ "group": {"dataset": 100, "@group_attr1": 10},
+ "dataset": 200,
+ "@root_attr": 11,
+ "dataset@dataset_attr": "12",
+ "group@group_attr2": 13,
+ }
+ dictdump.dicttonx(ddict, self.h5_fname)
+ ddict = dictdump.nxtodict(self.h5_fname, include_attributes=True)
+ self.assertEqual(ddict["group"]["@group_attr1"], 10)
+ self.assertEqual(ddict["@root_attr"], 11)
+ self.assertEqual(ddict["dataset@dataset_attr"], "12")
+ self.assertEqual(ddict["group"]["@group_attr2"], 13)
+
+ def testDereferenceLinks(self):
+ """Write links and dereference on read"""
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {
+ "links": {
+ "group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset",
+ }
+ }
+ dictdump.dicttonx(ddict, self.h5_fname)
+
+ ddict = dictdump.h5todict(self.h5_fname, dereference_links=True)
+ self.assertTrue(ddict["links"]["absolute_softlink"], 10)
+ self.assertTrue(ddict["links"]["relative_softlink"], 10)
+ self.assertTrue(ddict["links"]["external_link"], 10)
+ self.assertTrue(ddict["links"]["group"]["relative_softlink"], 10)
+
+ def testPreserveLinks(self):
+ """Write/read links"""
+ ddict = {"ext_group": {"dataset": 10}}
+ dictdump.dicttonx(ddict, self.h5_ext_fname)
+ ddict = {
+ "links": {
+ "group": {"dataset": 10, ">relative_softlink": "dataset"},
+ ">relative_softlink": "group/dataset",
+ ">absolute_softlink": "/links/group/dataset",
+ ">external_link": "nx_ext.h5::/ext_group/dataset",
+ }
+ }
+ dictdump.dicttonx(ddict, self.h5_fname)
+
+ ddict = dictdump.nxtodict(self.h5_fname, dereference_links=False)
+ self.assertTrue(ddict["links"][">absolute_softlink"], "dataset")
+ self.assertTrue(ddict["links"][">relative_softlink"], "group/dataset")
+ self.assertTrue(ddict["links"][">external_link"], "/links/group/dataset")
+ self.assertTrue(
+ ddict["links"]["group"][">relative_softlink"],
+ "nx_ext.h5::/ext_group/datase",
+ )
+
+ def testNotExistingPath(self):
+ """Test converting not existing path"""
+ with h5py.File(self.h5_fname, "a") as f:
+ f["data"] = 1
+
+ ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors="ignore")
+ self.assertFalse(ddict)
+
+ with LoggingValidator(dictdump_logger, error=1):
+ ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors="log")
+ self.assertFalse(ddict)
+
+ with self.assertRaises(KeyError):
+ h5todict(self.h5_fname, path="/I/am/not/a/path", errors="raise")
+
+ def testBrokenLinks(self):
+ """Test with broken links"""
+ with h5py.File(self.h5_fname, "a") as f:
+ f["/Mars/BrokenSoftLink"] = h5py.SoftLink("/Idontexists")
+ f["/Mars/BrokenExternalLink"] = h5py.ExternalLink(
+ "notexistingfile.h5", "/Idontexists"
+ )
+
+ ddict = h5todict(self.h5_fname, path="/Mars", errors="ignore")
+ self.assertFalse(ddict)
+
+ with LoggingValidator(dictdump_logger, error=2):
+ ddict = h5todict(self.h5_fname, path="/Mars", errors="log")
+ self.assertFalse(ddict)
+
+ with self.assertRaises(KeyError):
+ h5todict(self.h5_fname, path="/Mars", errors="raise")
+
+
+class TestDictToJson(DictTestCase):
+ def setUp(self):
+ self.dir_path = tempfile.mkdtemp()
+ self.json_fname = os.path.join(self.dir_path, "cityattrs.json")
+
+ def tearDown(self):
+ os.unlink(self.json_fname)
+ os.rmdir(self.dir_path)
+
+ def testJsonCityAttrs(self):
+ self.json_fname = os.path.join(self.dir_path, "cityattrs.json")
+ dicttojson(city_attrs, self.json_fname, indent=3)
+
+ with open(self.json_fname, "r") as f:
+ json_content = f.read()
+ self.assertIn('"inhabitants": 160215', json_content)
+
+
+class TestDictToIni(DictTestCase):
+ def setUp(self):
+ self.dir_path = tempfile.mkdtemp()
+ self.ini_fname = os.path.join(self.dir_path, "test.ini")
+
+ def tearDown(self):
+ os.unlink(self.ini_fname)
+ os.rmdir(self.dir_path)
+
+ def testConfigDictIO(self):
+ """Ensure values and types of data is preserved when dictionary is
+ written to file and read back."""
+ testdict = {
+ "simple_types": {
+ "float": 1.0,
+ "int": 1,
+ "percent string": "5 % is too much",
+ "backslash string": "i can use \\",
+ "empty_string": "",
+ "nonestring": "None",
+ "nonetype": None,
+ "interpstring": "interpolation: %(percent string)s",
+ },
+ "containers": {
+ "list": [-1, "string", 3.0, False, None],
+ "array": numpy.array([1.0, 2.0, 3.0]),
+ "dict": {
+ "key1": "Hello World",
+ "key2": 2.0,
+ },
+ },
+ }
+
+ dump(testdict, self.ini_fname)
+
+ # read the data back
+ readdict = load(self.ini_fname)
+
+ testdictkeys = list(testdict.keys())
+ readkeys = list(readdict.keys())
+
+ self.assertTrue(
+ len(readkeys) == len(testdictkeys), "Number of read keys not equal"
+ )
+
+ self.assertEqual(
+ readdict["simple_types"]["interpstring"], "interpolation: 5 % is too much"
+ )
+
+ testdict["simple_types"]["interpstring"] = "interpolation: 5 % is too much"
+
+ for key in testdict["simple_types"]:
+ original = testdict["simple_types"][key]
+ read = readdict["simple_types"][key]
+ self.assertEqual(
+ read, original, "Read <%s> instead of <%s>" % (read, original)
+ )
+
+ for key in testdict["containers"]:
+ original = testdict["containers"][key]
+ read = readdict["containers"][key]
+ if key == "array":
+ self.assertEqual(
+ read.all(),
+ original.all(),
+ "Read <%s> instead of <%s>" % (read, original),
+ )
+ else:
+ self.assertEqual(
+ read, original, "Read <%s> instead of <%s>" % (read, original)
+ )
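+
+ # For reference, a rough sketch of the ini layout produced by
+ # dump(..., fmat="ini") for the dictionary above (illustrative, not
+ # an exact dump):
+ #
+ # [simple_types]
+ # float = 1.0
+ # int = 1
+ #
+ # [containers]
+ # list = [-1, 'string', 3.0, False, None]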
+
+ def testConfigDictOrder(self):
+ """Ensure order is preserved when dictionary is
+ written to file and read back."""
+ test_dict = {"banana": 3, "apple": 4, "pear": 1, "orange": 2}
+ # sort by key
+ test_ordered_dict1 = dict(sorted(test_dict.items(), key=lambda t: t[0]))
+ # sort by value
+ test_ordered_dict2 = dict(sorted(test_dict.items(), key=lambda t: t[1]))
+ # add the two ordered dict as sections of a third ordered dict
+ test_ordered_dict3 = {}
+ test_ordered_dict3["section1"] = test_ordered_dict1
+ test_ordered_dict3["section2"] = test_ordered_dict2
+
+ # write to ini and read back as a ConfigDict
+ dump(test_ordered_dict3, self.ini_fname, fmat="ini")
+ read_instance = ConfigDict()
+ read_instance.read(self.ini_fname)
+
+ # loop through original and read-back dictionaries,
+ # test identical order for key/value pairs
+ for orig_key, section in zip(test_ordered_dict3.keys(), read_instance.keys()):
+ self.assertEqual(orig_key, section)
+ for orig_key2, read_key in zip(
+ test_ordered_dict3[section].keys(), read_instance[section].keys()
+ ):
+ self.assertEqual(orig_key2, read_key)
+ self.assertEqual(
+ test_ordered_dict3[section][orig_key2],
+ read_instance[section][read_key],
+ )
diff --git a/src/silx/io/test/test_fabioh5.py b/src/silx/io/test/test_fabioh5.py
new file mode 100755
index 0000000..9c92f15
--- /dev/null
+++ b/src/silx/io/test/test_fabioh5.py
@@ -0,0 +1,627 @@
+# /*##########################################################################
+# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for fabioh5 wrapper"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "02/07/2018"
+
+import os
+import logging
+import numpy
+import unittest
+import tempfile
+import shutil
+
+_logger = logging.getLogger(__name__)
+
+import fabio
+import fabio.file_series
+import h5py
+
+from .. import commonh5
+from .. import fabioh5
+
+
+class TestFabioH5(unittest.TestCase):
+ def setUp(self):
+ header = {
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ }
+ data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ self.fabio_image = fabio.numpyimage.NumpyImage(data, header)
+ self.h5_image = fabioh5.File(fabio_image=self.fabio_image)
+
+ def test_main_groups(self):
+ self.assertEqual(self.h5_image.h5py_class, h5py.File)
+ self.assertEqual(self.h5_image["/"].h5py_class, h5py.File)
+ self.assertEqual(self.h5_image["/scan_0"].h5py_class, h5py.Group)
+ self.assertEqual(self.h5_image["/scan_0/instrument"].h5py_class, h5py.Group)
+ self.assertEqual(self.h5_image["/scan_0/measurement"].h5py_class, h5py.Group)
+
+ def test_wrong_path_syntax(self):
+ # result tested with a default h5py file
+ self.assertRaises(ValueError, lambda: self.h5_image[""])
+
+ def test_wrong_root_name(self):
+ # result tested with a default h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["/foo"])
+
+ def test_wrong_root_path(self):
+ # result tested with a default h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["/foo/foo"])
+
+ def test_wrong_name(self):
+ # result tested with a default h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["foo"])
+
+ def test_wrong_path(self):
+ # result tested with a default h5py file
+ self.assertRaises(KeyError, lambda: self.h5_image["foo/foo"])
+
+ def test_single_frame(self):
+ data = numpy.arange(2 * 3)
+ data.shape = 2, 3
+ fabio_image = fabio.edfimage.edfimage(data=data)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (2, 3))
+ self.assertEqual(dataset[...][0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_multi_frames(self):
+ data = numpy.arange(2 * 3)
+ data.shape = 2, 3
+ fabio_image = fabio.edfimage.edfimage(data=data)
+ fabio_image.append_frame(data=data)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (2, 2, 3))
+ self.assertEqual(dataset[...][0, 0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_heterogeneous_frames(self):
+ """Frames containing 2 images with different sizes and a cube"""
+ data1 = numpy.arange(2 * 3)
+ data1.shape = 2, 3
+ data2 = numpy.arange(2 * 5)
+ data2.shape = 2, 5
+ data3 = numpy.arange(2 * 5 * 1)
+ data3.shape = 2, 5, 1
+ fabio_image = fabio.edfimage.edfimage(data=data1)
+ fabio_image.append_frame(data=data2)
+ fabio_image.append_frame(data=data3)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (3, 2, 5, 1))
+ self.assertEqual(dataset[...][0, 0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_single_3d_frame(self):
+ """Image source contains a cube"""
+ data = numpy.arange(2 * 3 * 4)
+ data.shape = 2, 3, 4
+ # Do not provide the data to the constructor to avoid slicing of the
+ # data. In this way the result stays a cube, not a multi-frame image
+ fabio_image = fabio.edfimage.edfimage()
+ fabio_image.data = data
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (2, 3, 4))
+ self.assertEqual(dataset[...][0, 0, 0], 0)
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+
+ def test_metadata_int(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/integer"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], -100)
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_metadata_float(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/float"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], 1.0)
+ self.assertEqual(dataset.dtype.kind, "f")
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_metadata_string(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/string"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], numpy.string_("hi!"))
+ self.assertEqual(dataset.dtype.type, numpy.string_)
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_metadata_list_integer(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_integer"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset.dtype.kind, "u")
+ self.assertEqual(dataset.shape, (1, 3))
+ self.assertEqual(dataset[0, 0], 100)
+ self.assertEqual(dataset[0, 1], 50)
+
+ def test_metadata_list_float(self):
+ dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_float"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset.dtype.kind, "f")
+ self.assertEqual(dataset.shape, (1, 3))
+ self.assertEqual(dataset[0, 0], 1.0)
+ self.assertEqual(dataset[0, 1], 2.0)
+
+ def test_metadata_list_looks_like_list(self):
+ dataset = self.h5_image[
+ "/scan_0/instrument/detector_0/others/string_looks_like_list"
+ ]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+ self.assertEqual(dataset[()], numpy.string_("2000 hi!"))
+ self.assertEqual(dataset.dtype.type, numpy.string_)
+ self.assertEqual(dataset.shape, (1,))
+
+ def test_float_32(self):
+ float_list = ["1.2", "1.3", "1.4"]
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # No two items are equal (all values are distinct)
+ self.assertEqual(len(data), len(set(data)))
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ["d", "f"])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+
+ def test_float_64(self):
+ float_list = [
+ "1469117129.082226",
+ "1469117136.684986",
+ "1469117144.312749",
+ "1469117151.892507",
+ "1469117159.474265",
+ "1469117167.100027",
+ "1469117174.815799",
+ "1469117182.437561",
+ "1469117190.094326",
+ "1469117197.721089",
+ ]
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"time_of_day": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/time_of_day"]
+ # No two items are equal (all values are distinct)
+ self.assertEqual(len(data), len(set(data)))
+ # At least a float64
+ self.assertIn(data.dtype.kind, ["d", "f"])
+ self.assertGreaterEqual(data.dtype.itemsize, 64 / 8)
+
+ def test_mixed_float_size__scalar(self):
+ # We expect to have a precision of 32 bits
+ float_list = ["1.2", "1.3001"]
+ expected_float_result = [1.2, 1.3001]
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ["d", "f"])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+ for computed, expected in zip(data, expected_float_result):
+ numpy.testing.assert_almost_equal(computed, expected, 5)
+
+ def test_mixed_float_size__list(self):
+ # We expect to have a precision of 32 bits
+ float_list = ["1.2 1.3001"]
+ expected_float_result = numpy.array([[1.2, 1.3001]])
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ["d", "f"])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+ for computed, expected in zip(data, expected_float_result):
+ numpy.testing.assert_almost_equal(computed, expected, 5)
+
+ def test_mixed_float_size__list_of_list(self):
+ # We expect to have a precision of 32 bits
+ float_list = ["1.2 1.3001", "1.3001 1.3001"]
+ expected_float_result = numpy.array([[1.2, 1.3001], [1.3001, 1.3001]])
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = None
+ for float_item in float_list:
+ header = {"float_item": float_item}
+ if fabio_image is None:
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_image.append_frame(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
+ # At worst a float32
+ self.assertIn(data.dtype.kind, ["d", "f"])
+ self.assertLessEqual(data.dtype.itemsize, 32 / 8)
+ for computed, expected in zip(data, expected_float_result):
+ numpy.testing.assert_almost_equal(computed, expected, 5)
+
+ def test_ub_matrix(self):
+ """Data from mediapix.edf"""
+ header = {}
+ header["UB_mne"] = "UB0 UB1 UB2 UB3 UB4 UB5 UB6 UB7 UB8"
+ header[
+ "UB_pos"
+ ] = "1.99593e-16 2.73682e-16 -1.54 -1.08894 1.08894 1.6083e-16 1.08894 1.08894 9.28619e-17"
+ header["sample_mne"] = "U0 U1 U2 U3 U4 U5"
+ header["sample_pos"] = "4.08 4.08 4.08 90 90 90"
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ sample = h5_image["/scan_0/sample"]
+ self.assertIsNotNone(sample)
+ self.assertEqual(sample.attrs["NXclass"], "NXsample")
+
+ d = sample["unit_cell_abc"]
+ expected = numpy.array([4.08, 4.08, 4.08])
+ self.assertIsNotNone(d)
+ self.assertEqual(d.shape, (3,))
+ self.assertIn(d.dtype.kind, ["d", "f"])
+ numpy.testing.assert_array_almost_equal(d[...], expected)
+
+ d = sample["unit_cell_alphabetagamma"]
+ expected = numpy.array([90.0, 90.0, 90.0])
+ self.assertIsNotNone(d)
+ self.assertEqual(d.shape, (3,))
+ self.assertIn(d.dtype.kind, ["d", "f"])
+ numpy.testing.assert_array_almost_equal(d[...], expected)
+
+ d = sample["ub_matrix"]
+ expected = numpy.array(
+ [
+ [
+ [1.99593e-16, 2.73682e-16, -1.54],
+ [-1.08894, 1.08894, 1.6083e-16],
+ [1.08894, 1.08894, 9.28619e-17],
+ ]
+ ]
+ )
+ self.assertIsNotNone(d)
+ self.assertEqual(d.shape, (1, 3, 3))
+ self.assertIn(d.dtype.kind, ["d", "f"])
+ numpy.testing.assert_array_almost_equal(d[...], expected)
+
+ def test_interpretation_mca_edf(self):
+ """EDF files with two or more headers starting with "MCA"
+ must have @interpretation = "spectrum" an the data."""
+ header = {
+ "Title": "zapimage samy -4.975 -5.095 80 500 samz -4.091 -4.171 70 0",
+ "MCA a": -23.812,
+ "MCA b": 2.7107,
+ "MCA c": 8.1164e-06,
+ }
+
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ data_dataset = h5_image["/scan_0/measurement/image_0/data"]
+ self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
+
+ data_dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
+
+ data_dataset = h5_image["/scan_0/measurement/image_0/info/data"]
+ self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
+
+    def test_get_api(self):
+        # The four getclass/getlink combinations: getlink selects the link
+        # information over the target object, getclass the class over an instance
+        result = self.h5_image.get("scan_0", getclass=True, getlink=True)
+ self.assertIs(result, h5py.HardLink)
+ result = self.h5_image.get("scan_0", getclass=False, getlink=True)
+ self.assertIsInstance(result, h5py.HardLink)
+ result = self.h5_image.get("scan_0", getclass=True, getlink=False)
+ self.assertIs(result, h5py.Group)
+ result = self.h5_image.get("scan_0", getclass=False, getlink=False)
+ self.assertIsInstance(result, commonh5.Group)
+
+ def test_detector_link(self):
+ detector1 = self.h5_image["/scan_0/instrument/detector_0"]
+ detector2 = self.h5_image["/scan_0/measurement/image_0/info"]
+ self.assertIsNot(detector1, detector2)
+ self.assertEqual(list(detector1.items()), list(detector2.items()))
+ self.assertEqual(
+ self.h5_image.get(detector2.name, getlink=True).path, detector1.name
+ )
+
+ def test_detector_data_link(self):
+ data1 = self.h5_image["/scan_0/instrument/detector_0/data"]
+ data2 = self.h5_image["/scan_0/measurement/image_0/data"]
+ self.assertIsNot(data1, data2)
+ self.assertIs(data1._get_data(), data2._get_data())
+ self.assertEqual(self.h5_image.get(data2.name, getlink=True).path, data1.name)
+
+ def test_dirty_header(self):
+ """Test that it does not fail"""
+ try:
+ header = {}
+ header["foo"] = b"abc"
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.edfimage(data=data, header=header)
+ header = {}
+ header["foo"] = b"a\x90bc\xFE"
+ fabio_image.append_frame(data=data, header=header)
+ except Exception as e:
+ _logger.error(e.args[0])
+ _logger.debug("Backtrace", exc_info=True)
+ self.skipTest("fabio do not allow to create the resource")
+
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ scan_header_path = "/scan_0/instrument/file/scan_header"
+ self.assertIn(scan_header_path, h5_image)
+ data = h5_image[scan_header_path]
+ self.assertIsInstance(data[...], numpy.ndarray)
+
+ def test_unicode_header(self):
+ """Test that it does not fail"""
+ try:
+ header = {}
+ header["foo"] = b"abc"
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.edfimage(data=data, header=header)
+ header = {}
+ header["foo"] = "abc\u2764"
+ fabio_image.append_frame(data=data, header=header)
+ except Exception as e:
+ _logger.error(e.args[0])
+ _logger.debug("Backtrace", exc_info=True)
+ self.skipTest("fabio do not allow to create the resource")
+
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+ scan_header_path = "/scan_0/instrument/file/scan_header"
+ self.assertIn(scan_header_path, h5_image)
+ data = h5_image[scan_header_path]
+ self.assertIsInstance(data[...], numpy.ndarray)
+
+
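+
+# Illustrative sketch (not part of the silx test suite; the helper name is
+# made up): why test_float_64 above requires at least a float64. The epoch
+# timestamps are a few seconds apart at a magnitude of ~1.5e9, where the
+# spacing between consecutive float32 values is 128, so they collapse to
+# equal values unless a wider float is used.
+def _demo_float_header_precision():
+    timestamps = numpy.array([1469117129.082226, 1469117136.684986])
+    as_f32 = timestamps.astype(numpy.float32)
+    assert as_f32[0] == as_f32[1]  # distinct timestamps collapse in float32
+    assert timestamps[0] != timestamps[1]  # float64 keeps them distinct
+
+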
+class TestFabioH5MultiFrames(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ names = ["A", "B", "C", "D"]
+ values = [["32000", "-10", "5.0", "1"], ["-32000", "-10", "5.0", "1"]]
+
+ fabio_file = None
+
+ for i in range(10):
+ header = {
+ "image_id": "%d" % i,
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ "motor_mne": " ".join(names),
+ "motor_pos": " ".join(values[i % len(values)]),
+ "counter_mne": " ".join(names),
+ "counter_pos": " ".join(values[i % len(values)]),
+ }
+ for iname, name in enumerate(names):
+ header[name] = values[i % len(values)][iname]
+
+ data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ if fabio_file is None:
+ fabio_file = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_file.append_frame(data=data, header=header)
+
+ cls.fabio_file = fabio_file
+ cls.fabioh5 = fabioh5.File(fabio_image=fabio_file)
+
+ def test_others(self):
+ others = self.fabioh5["/scan_0/instrument/detector_0/others"]
+ dataset = others["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = others["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = others["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = others["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "u")
+
+ def test_positioners(self):
+ counters = self.fabioh5["/scan_0/instrument/positioners"]
+ # At least 32 bits, no unsigned values
+ dataset = counters["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = counters["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+
+ def test_counters(self):
+ counters = self.fabioh5["/scan_0/measurement"]
+ # At least 32 bits, no unsigned values
+ dataset = counters["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = counters["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+
+
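+# Illustrative sketch (not part of the silx test suite; the helper name is
+# made up): the SPEC-style "*_mne"/"*_pos" header convention built in
+# setUpClass above. The mnemonic key lists names and the position key lists
+# values; pairing them per frame gives one positioner/counter column per name.
+def _demo_mne_pos_pairing():
+    header = {"motor_mne": "A B C D", "motor_pos": "32000 -10 5.0 1"}
+    names = header["motor_mne"].split()
+    values = header["motor_pos"].split()
+    assert dict(zip(names, values)) == {"A": "32000", "B": "-10", "C": "5.0", "D": "1"}
+
+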
+class TestFabioH5WithEdf(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+
+ cls.edf_filename = os.path.join(cls.tmp_directory, "test.edf")
+
+ header = {
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ }
+ data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ fabio_image = fabio.edfimage.edfimage(data, header)
+ fabio_image.write(cls.edf_filename)
+
+ cls.fabio_image = fabio.open(cls.edf_filename)
+ cls.h5_image = fabioh5.File(fabio_image=cls.fabio_image)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.fabio_image = None
+ cls.h5_image = None
+ shutil.rmtree(cls.tmp_directory)
+
+ def test_reserved_format_metadata(self):
+ if fabio.hexversion < 327920: # 0.5.0 final
+ self.skipTest("fabio >= 0.5.0 final is needed")
+
+ # The EDF contains reserved keys in the header
+ self.assertIn("HeaderID", self.fabio_image.header)
+ # We do not expose them in FabioH5
+ self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image)
+
+
+class _TestableFrameData(fabioh5.FrameData):
+    """FrameData whose data can never be materialized: metadata such as
+    dtype and shape must be computed without calling _create_data."""
+
+ def _create_data(self):
+ raise RuntimeError("Not supposed to be called")
+
+
+class TestFabioH5WithFileSeries(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+
+ cls.edf_filenames = []
+
+ for i in range(10):
+ filename = os.path.join(cls.tmp_directory, "test_%04d.edf" % i)
+ cls.edf_filenames.append(filename)
+
+ header = {
+ "image_id": "%d" % i,
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ }
+ data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ fabio_image = fabio.edfimage.edfimage(data, header)
+ fabio_image.write(filename)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def _testH5Image(self, h5_image):
+ # test data
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEqual(dataset.h5py_class, h5py.Dataset)
+        self.assertIsInstance(dataset[()], numpy.ndarray)
+ self.assertEqual(dataset.dtype.kind, "i")
+ self.assertEqual(dataset.shape, (10, 3, 2))
+ self.assertEqual(list(dataset[:, 0, 0]), list(range(10)))
+ self.assertEqual(dataset.attrs["interpretation"], "image")
+        # test metadata
+ dataset = h5_image["/scan_0/instrument/detector_0/others/image_id"]
+ self.assertEqual(list(dataset[...]), list(range(10)))
+
+ def testFileList(self):
+ h5_image = fabioh5.File(file_series=self.edf_filenames)
+ self._testH5Image(h5_image)
+
+ def testFileSeries(self):
+ file_series = fabio.file_series.file_series(self.edf_filenames)
+ h5_image = fabioh5.File(file_series=file_series)
+ self._testH5Image(h5_image)
+
+ def testFrameDataCache(self):
+ file_series = fabio.file_series.file_series(self.edf_filenames)
+ reader = fabioh5.FabioReader(file_series=file_series)
+ frameData = _TestableFrameData("foo", reader)
+ self.assertEqual(frameData.dtype.kind, "i")
+ self.assertEqual(frameData.shape, (10, 3, 2))
diff --git a/src/silx/io/test/test_fioh5.py b/src/silx/io/test/test_fioh5.py
new file mode 100644
index 0000000..fed22a2
--- /dev/null
+++ b/src/silx/io/test/test_fioh5.py
@@ -0,0 +1,296 @@
+# /*##########################################################################
+# Copyright (C) 2021 Timo Fuchs
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for fioh5"""
+import numpy
+import os
+import tempfile
+import unittest
+
+from ..fioh5 import FioH5, is_fiofile, logger1, dtypeConverter
+
+
+__authors__ = ["T. Fuchs"]
+__license__ = "MIT"
+__date__ = "15/10/2021"
+
+fioftext = """
+!
+! Comments
+!
+%c
+ascan omega 180.0 180.5 3:10/1 4
+user username, acquisition started at Thu Dec 12 18:00:00 2021
+sweep motor lag: 1.0e-03
+channel 3: Detector
+!
+! Parameter
+!
+%p
+channel3_exposure = 1.000000e+00
+ScanName = ascan
+!
+! Data
+!
+%d
+ Col 1 omega(encoder) DOUBLE
+ Col 2 channel INTEGER
+ Col 3 filename STRING
+ Col 4 type STRING
+ Col 5 unix time DOUBLE
+ Col 6 enable BOOLEAN
+ Col 7 time_s FLOAT
+ 179.998418821 3 00001 exposure 1576165741.20308 1 1.243
+ 180.048418821 3 00002 exposure 1576165742.20308 1 1.243
+ 180.098418821 3 00003 exposure 1576165743.20308 1 1.243
+ 180.148418821 3 00004 exposure 1576165744.20308 1 1.243
+ 180.198418821 3 00005 exposure 1576165745.20308 1 1.243
+ 180.248418821 3 00006 exposure 1576165746.20308 1 1.243
+ 180.298418821 3 00007 exposure 1576165747.20308 1 1.243
+ 180.348418821 3 00008 exposure 1576165748.20308 1 1.243
+ 180.398418821 3 00009 exposure 1576165749.20308 1 1.243
+ 180.448418821 3 00010 exposure 1576165750.20308 1 1.243
+"""
+
+
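+# Illustrative sketch (not part of the silx test suite; the helper name is
+# made up): the two .fio conventions exercised below. A "Col <n> <name> <DTYPE>"
+# line in the %d section declares one data column, and the scan number
+# ("5.1") is parsed from the trailing digits of the file name. This is a
+# simplified re-implementation for illustration, not the FioH5 parser itself.
+def _demo_fio_conventions():
+    import re
+
+    match = re.match(r"\s*Col\s+(\d+)\s+(\S+)\s+(\w+)", " Col 1 omega(encoder) DOUBLE")
+    number, name, dtype = match.groups()
+    assert (number, name) == ("1", "omega(encoder)")
+    assert dtype in dtypeConverter  # the declared type maps to a numpy dtype
+
+    scanno = int(re.search(r"_(\d+)\.fio$", "eh1scan_00005.fio").group(1))
+    assert "/%d.1" % scanno == "/5.1"
+
+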
+class TestFioH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.TemporaryDirectory()
+ cls.fname_numbered = os.path.join(cls.temp_dir.name, "eh1scan_00005.fio")
+
+ with open(cls.fname_numbered, "w") as fiof:
+ fiof.write(fioftext)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.temp_dir.cleanup()
+ del cls.temp_dir
+
+ def setUp(self):
+ self.fioh5 = FioH5(self.fname_numbered)
+
+ def tearDown(self):
+ self.fioh5.close()
+
+ def testScanNumber(self):
+ # scan number is parsed from the file name.
+ self.assertIn("/5.1", self.fioh5)
+ self.assertIn("5.1", self.fioh5)
+
+ def testContainsFile(self):
+ self.assertIn("/5.1/measurement", self.fioh5)
+ self.assertNotIn("25.2", self.fioh5)
+        # measurement is a child of a scan; the full path is required to
+        # access it from the root level
+ self.assertNotIn("measurement", self.fioh5)
+ # Groups may or may not have a trailing /
+ self.assertIn("/5.1/measurement/", self.fioh5)
+ self.assertIn("/5.1/measurement", self.fioh5)
+ # Datasets can't have a trailing /
+ self.assertIn("/5.1/measurement/omega(encoder)", self.fioh5)
+ self.assertNotIn("/5.1/measurement/omega(encoder)/", self.fioh5)
+ # No gamma
+ self.assertNotIn("/5.1/measurement/gamma", self.fioh5)
+
+ def testContainsGroup(self):
+ self.assertIn("measurement", self.fioh5["/5.1/"])
+ self.assertIn("measurement", self.fioh5["/5.1"])
+ self.assertIn("5.1", self.fioh5["/"])
+ self.assertNotIn("5.2", self.fioh5["/"])
+ self.assertIn("measurement/filename", self.fioh5["/5.1"])
+ # illegal trailing "/" after dataset name
+ self.assertNotIn("measurement/filename/", self.fioh5["/5.1"])
+ # full path to element in group (OK)
+ self.assertIn("/5.1/measurement/filename", self.fioh5["/5.1/measurement"])
+
+ def testDataType(self):
+ meas = self.fioh5["/5.1/measurement/"]
+ self.assertEqual(meas["omega(encoder)"].dtype, dtypeConverter["DOUBLE"])
+ self.assertEqual(meas["channel"].dtype, dtypeConverter["INTEGER"])
+ self.assertEqual(meas["filename"].dtype, dtypeConverter["STRING"])
+ self.assertEqual(meas["time_s"].dtype, dtypeConverter["FLOAT"])
+ self.assertEqual(meas["enable"].dtype, dtypeConverter["BOOLEAN"])
+
+ def testDataColumn(self):
+ self.assertAlmostEqual(
+ sum(self.fioh5["/5.1/measurement/omega(encoder)"]), 1802.23418821
+ )
+ self.assertTrue(numpy.all(self.fioh5["/5.1/measurement/enable"]))
+
+ # --- comment section tests ---
+
+ def testComment(self):
+ # should hold the complete comment section
+ self.assertEqual(
+ self.fioh5["/5.1/instrument/fiofile/comments"],
+ """ascan omega 180.0 180.5 3:10/1 4
+user username, acquisition started at Thu Dec 12 18:00:00 2021
+sweep motor lag: 1.0e-03
+channel 3: Detector
+""",
+ )
+
+ def testDate(self):
+        # there is no convention on how to format the time, so just compare the raw string
+ self.assertEqual(self.fioh5["/5.1/start_time"], "Thu Dec 12 18:00:00 2021")
+
+ def testTitle(self):
+ self.assertEqual(self.fioh5["/5.1/title"], "ascan omega 180.0 180.5 3:10/1 4")
+
+ # --- parameter section tests ---
+
+ def testParameter(self):
+ # should hold the complete parameter section
+ self.assertEqual(
+ self.fioh5["/5.1/instrument/fiofile/parameter"],
+ """channel3_exposure = 1.000000e+00
+ScanName = ascan
+""",
+ )
+
+ def testParsedParameter(self):
+ # no dtype is given, so everything is str.
+ self.assertEqual(
+ self.fioh5["/5.1/instrument/parameter/channel3_exposure"], "1.000000e+00"
+ )
+ self.assertEqual(self.fioh5["/5.1/instrument/parameter/ScanName"], "ascan")
+
+ def testNotFioH5(self):
+ testfilename = os.path.join(self.temp_dir.name, "eh1scan_00010.fio")
+ with open(testfilename, "w") as fiof:
+ fiof.write("!Not a fio file!")
+
+ self.assertRaises(IOError, FioH5, testfilename)
+
+ self.assertTrue(is_fiofile(self.fname_numbered))
+ self.assertFalse(is_fiofile(testfilename))
+
+ os.unlink(testfilename)
+
+
+class TestUnnumberedFioH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.TemporaryDirectory()
+ cls.fname_nosuffix = os.path.join(cls.temp_dir.name, "eh1scan_nosuffix.fio")
+
+ with open(cls.fname_nosuffix, "w") as fiof:
+ fiof.write(fioftext)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.temp_dir.cleanup()
+ del cls.temp_dir
+
+ def setUp(self):
+ self.fioh5 = FioH5(self.fname_nosuffix)
+
+ def testLogMissingScanno(self):
+ with self.assertLogs(logger1, level="WARNING") as cm:
+ fioh5 = FioH5(self.fname_nosuffix)
+ self.assertIn("Cannot parse scan number of file", cm.output[0])
+
+ def testFallbackName(self):
+ self.assertIn("/eh1scan_nosuffix", self.fioh5)
+
+
+brokenHeaderText = """
+!
+! Comments
+!
+%c
+ascan omega 180.0 180.5 3:10/1 4
+user username, acquisited at Thu Dec 12 100 2021
+sweep motor lavgvf.0e-03
+channel 3: Detector
+!
+! Parameter
+!
+%p
+channel3_exposu65 1.000000e+00
+ScanName = ascan
+!
+! Data
+!
+%d
+ Col 1 omega(encoder) DOUBLE
+ Col 2 channel INTEGER
+ Col 3 filename STRING
+ Col 4 type STRING
+ Col 5 unix time DOUBLE
+ 179.998418821 3 00001 exposure 1576165741.20308
+ 180.048418821 3 00002 exposure 1576165742.20308
+ 180.098418821 3 00003 exposure 1576165743.20308
+ 180.148418821 3 00004 exposure 1576165744.20308
+ 180.198418821 3 00005 exposure 1576165745.20308
+ 180.248418821 3 00006 exposure 1576165746.20308
+ 180.298418821 3 00007 exposure 1576165747.20308
+ 180.348418821 3 00008 exposure 1576165748.20308
+ 180.398418821 3 00009 exposure 1576165749.20308
+ 180.448418821 3 00010 exposure 1576165750.20308
+"""
+
+
+class TestBrokenHeaderFioH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.TemporaryDirectory()
+ cls.fname_numbered = os.path.join(cls.temp_dir.name, "eh1scan_00005.fio")
+
+ with open(cls.fname_numbered, "w") as fiof:
+ fiof.write(brokenHeaderText)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.temp_dir.cleanup()
+ del cls.temp_dir
+
+ def setUp(self):
+ self.fioh5 = FioH5(self.fname_numbered)
+
+ def testLogBrokenHeader(self):
+ with self.assertLogs(logger1, level="WARNING") as cm:
+ fioh5 = FioH5(self.fname_numbered)
+ self.assertIn("Cannot parse parameter section", cm.output[0])
+ self.assertIn("Cannot parse default comment section", cm.output[1])
+
+ def testComment(self):
+ # should hold the complete comment section
+ self.assertEqual(
+ self.fioh5["/5.1/instrument/fiofile/comments"],
+ """ascan omega 180.0 180.5 3:10/1 4
+user username, acquisited at Thu Dec 12 100 2021
+sweep motor lavgvf.0e-03
+channel 3: Detector
+""",
+ )
+
+ def testParameter(self):
+ # should hold the complete parameter section
+ self.assertEqual(
+ self.fioh5["/5.1/instrument/fiofile/parameter"],
+ """channel3_exposu65 1.000000e+00
+ScanName = ascan
+""",
+ )
diff --git a/src/silx/io/test/test_h5link_utils.py b/src/silx/io/test/test_h5link_utils.py
new file mode 100644
index 0000000..4140003
--- /dev/null
+++ b/src/silx/io/test/test_h5link_utils.py
@@ -0,0 +1,116 @@
+import os
+import pytest
+import h5py
+import numpy
+from silx.io import open
+from silx.io import h5link_utils
+
+
+@pytest.fixture(scope="module")
+def hdf5_with_external_data(tmpdir_factory):
+ tmpdir = tmpdir_factory.mktemp("hdf5_with_external_data")
+ master = str(tmpdir / "master.h5")
+ external_h5 = str(tmpdir / "external.h5")
+ external_raw = str(tmpdir / "external.raw")
+
+ data = numpy.array([100, 1000, 10000], numpy.uint16)
+ tshape = (1,) + data.shape
+
+ with h5py.File(master, "w") as fmaster:
+ dset = fmaster.create_dataset("data", data=data)
+
+ fmaster["int"] = h5py.SoftLink("data")
+
+ layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype)
+ layout[0] = h5py.VirtualSource(".", "data", shape=data.shape)
+ fmaster.create_virtual_dataset("vds0", layout)
+
+ with h5py.File(external_h5, "w") as f:
+ dset = f.create_dataset("data", data=data)
+ layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype)
+ layout[0] = h5py.VirtualSource(dset)
+ fmaster.create_virtual_dataset("vds1", layout)
+
+ layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype)
+ layout[0] = h5py.VirtualSource(
+ external_h5,
+ "data",
+ shape=data.shape,
+ )
+ fmaster.create_virtual_dataset("vds2", layout)
+ fmaster["ext1"] = h5py.ExternalLink(external_h5, "data")
+
+ layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype)
+ layout[0] = h5py.VirtualSource(
+ "external.h5",
+ "data",
+ shape=data.shape,
+ )
+ fmaster.create_virtual_dataset("vds3", layout)
+ fmaster["ext2"] = h5py.ExternalLink("external.h5", "data")
+
+ layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype)
+ layout[0] = h5py.VirtualSource(
+ "./external.h5",
+ "data",
+ shape=data.shape,
+ )
+ fmaster.create_virtual_dataset("vds4", layout)
+ fmaster["ext3"] = h5py.ExternalLink("./external.h5", "data")
+
+ data.tofile(external_raw)
+
+ external = [(external_raw, 0, 16 * 3)]
+ fmaster.create_dataset(
+ "raw1", external=external, shape=tshape, dtype=data.dtype
+ )
+
+ external = [("external.raw", 0, 16 * 3)]
+ fmaster.create_dataset(
+ "raw2", external=external, shape=tshape, dtype=data.dtype
+ )
+
+ external = [("./external.raw", 0, 16 * 3)]
+ fmaster.create_dataset(
+ "raw3", external=external, shape=tshape, dtype=data.dtype
+ )
+
+ # Validate links
+ expected = data.tolist()
+ cwd = os.getcwd()
+ with h5py.File(master, "r") as master:
+ for name in master:
+ if name in ("raw2", "raw3"):
+ os.chdir(str(tmpdir))
+ try:
+ data = master[name][()].flatten().tolist()
+ except Exception:
+ assert False, name
+ finally:
+ if name in ("raw2", "raw3"):
+ os.chdir(cwd)
+ assert data == expected, name
+
+ return tmpdir
+
+
+@pytest.mark.skipif("VirtualLayout" not in dir(h5py), reason="h5py is too old")
+def test_external_dataset_info(hdf5_with_external_data):
+ tmpdir = hdf5_with_external_data
+ master = str(tmpdir / "master.h5")
+ external_h5 = str(tmpdir / "external.h5")
+ external_raw = str(tmpdir / "external.raw")
+ with open(master) as f:
+ for name in f:
+ hdf5obj = f[name]
+ info = h5link_utils.external_dataset_info(hdf5obj)
+ if name in ("data", "int", "ext1", "ext2", "ext3"):
+ assert info is None, name
+ elif name == "vds0":
+ assert info.first_source_url == f"{master}::/data"
+ elif name in ("vds1", "vds2", "vds3", "vds4"):
+ assert info.first_source_url == f"{external_h5}::/data"
+ elif name in ("raw1", "raw2", "raw3"):
+ assert info.first_source_url == external_raw
+ else:
+ assert False, name
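+
+
+# Illustrative sketch (not part of the silx test suite; the helper name is
+# made up): what external_dataset_info reports for each flavour of reference,
+# as asserted above. Plain datasets, soft links and external links resolve
+# through h5py itself and yield None; virtual and raw-external datasets expose
+# their first source as "<file>::<path>" or as a plain file path respectively.
+def _demo_external_dataset_info(master_filename):
+    with open(master_filename) as h5file:  # silx.io.open, imported above
+        vds_info = h5link_utils.external_dataset_info(h5file["vds2"])
+        raw_info = h5link_utils.external_dataset_info(h5file["raw1"])
+        link_info = h5link_utils.external_dataset_info(h5file["ext1"])
+    # e.g. (".../external.h5::/data", ".../external.raw", None)
+    return vds_info.first_source_url, raw_info.first_source_url, link_info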
diff --git a/src/silx/io/test/test_h5py_utils.py b/src/silx/io/test/test_h5py_utils.py
new file mode 100644
index 0000000..0d10a78
--- /dev/null
+++ b/src/silx/io/test/test_h5py_utils.py
@@ -0,0 +1,482 @@
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for h5py utilities"""
+
+__authors__ = ["W. de Nolf"]
+__license__ = "MIT"
+__date__ = "27/01/2020"
+
+
+import unittest
+import os
+import sys
+import time
+import shutil
+import logging
+import tempfile
+import multiprocessing
+from contextlib import contextmanager
+
+from .. import h5py_utils
+from ...utils.retry import RetryError, RetryTimeoutError
+
+IS_WINDOWS = sys.platform == "win32"
+logger = logging.getLogger()
+
+
+def _subprocess_context_main(queue, contextmgr, *args, **kw):
+    """Child process: enter the context, signal readiness, then hold the
+    context open until the parent sends the release token."""
+    try:
+        with contextmgr(*args, **kw):
+            queue.put(None)  # tell the parent the context is entered
+            queue.get()  # block until the parent releases us
+    except Exception:
+        queue.put(None)  # unblock the parent even on failure
+        raise
+
+
+@contextmanager
+def _subprocess_context(contextmgr, *args, **kw):
+ print("\nSTART", os.getpid())
+ timeout = kw.pop("timeout", 10)
+ queue = multiprocessing.Queue(maxsize=1)
+ p = multiprocessing.Process(
+ target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw
+ )
+ p.start()
+ try:
+ queue.get(timeout=timeout)
+ yield
+ finally:
+ queue.put(None)
+ p.join(timeout)
+ print(" EXIT", os.getpid())
+
+
+@contextmanager
+def _open_context(filename, **kw):
+ try:
+ print(os.getpid(), "OPEN", filename, kw)
+ with h5py_utils.File(filename, **kw) as f:
+ if kw.get("mode") == "w":
+ f["check"] = True
+ f.flush()
+ yield f
+ except Exception:
+ print(" ", os.getpid(), "FAILED", filename, kw)
+ raise
+ else:
+ print(" ", os.getpid(), "CLOSED", filename, kw)
+
+
+def _cause_segfault():
+    """Write past a one-byte ctypes buffer until the interpreter is killed
+    by a segmentation fault (simulates a hard crash in a subprocess)."""
+    import ctypes
+
+    i = ctypes.c_char(b"a")
+    j = ctypes.pointer(i)
+    c = 0
+    while True:
+        j[c] = b"a"  # out-of-bounds write
+        c += 1
+
+
+def _top_level_names_test(txtfilename, *args, **kw):
+    # silence traceback output: this helper runs in a throw-away subprocess
+    sys.stderr = open(os.devnull, "w")
+
+    with open(txtfilename, mode="r") as f:
+        failcounter = int(f.readline().strip())
+
+    ncausefailure = kw.pop("ncausefailure")
+    faildelay = kw.pop("faildelay")
+    if failcounter < ncausefailure:
+        time.sleep(faildelay)
+        failcounter += 1
+        with open(txtfilename, mode="w") as f:
+            f.write(str(failcounter))
+        # alternate between a retryable error and a hard crash of the process
+        if failcounter % 2:
+            raise RetryError
+        else:
+            _cause_segfault()
+    return h5py_utils._top_level_names(*args, **kw)
+
+
+# Run in a fresh subprocess for each attempt, so that even a segfault of the
+# worker can be retried
+top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test)
+
+
+def subtests(test):
+    """Run the decorated test once per option set yielded by self._subtests()."""
+
+    def wrapper(self):
+ for subtest_options in self._subtests():
+ print("\n====SUB TEST===\n")
+ print(f"sub test options: {subtest_options}")
+ with self.subTest(str(subtest_options)):
+ test(self)
+
+ return wrapper
+
+
+class TestH5pyUtils(unittest.TestCase):
+ def setUp(self):
+ self.test_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.test_dir)
+
+ def _subtests(self):
+ self._subtest_options = {"mode": "w"}
+ self.filename_generator = self._filenames()
+ yield self._subtest_options
+ self._subtest_options = {"mode": "w", "libver": "latest"}
+ self.filename_generator = self._filenames()
+        yield self._subtest_options
+
+ def _filenames(self):
+ i = 1
+ while True:
+ filename = os.path.join(self.test_dir, "file{}.h5".format(i))
+ with self._open_context(filename):
+ pass
+ yield filename
+ i += 1
+
+ def _new_filename(self):
+ return next(self.filename_generator)
+
+ @contextmanager
+ def _open_context(self, filename, **kwargs):
+ kw = dict(self._subtest_options)
+ kw.update(kwargs)
+ with _open_context(filename, **kw) as f:
+ yield f
+
+ @contextmanager
+ def _open_context_subprocess(self, filename, **kwargs):
+ kw = dict(self._subtest_options)
+ kw.update(kwargs)
+ with _subprocess_context(_open_context, filename, **kw):
+ yield
+
+ def _assert_hdf5_data(self, f):
+ self.assertTrue(f["check"][()])
+
+ def _validate_hdf5_data(self, filename, swmr=False):
+ with self._open_context(filename, mode="r") as f:
+ self.assertEqual(f.swmr_mode, swmr)
+ self._assert_hdf5_data(f)
+
+ @subtests
+ def test_modes_single_process(self):
+ """Test concurrent access to the different files from the same process"""
+ # When using HDF5_USE_FILE_LOCKING, open files with and without
+ # locking should raise an exception. HDF5_USE_FILE_LOCKING should
+ # be reset when all files are closed.
+
+ orig = os.environ.get("HDF5_USE_FILE_LOCKING")
+ filename1 = self._new_filename()
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+ filename2 = self._new_filename()
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ with self._open_context(filename1, mode="r"):
+ locking1 = False
+ for mode in ["r", "w", "a"]:
+ locking2 = mode != "r"
+ raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT
+ raise_condition &= locking1 != locking2
+ with self.assertRaisesIf(raise_condition, RuntimeError):
+ with self._open_context(filename2, mode=mode):
+ pass
+ self._validate_hdf5_data(filename1)
+ self._validate_hdf5_data(filename2)
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ with self._open_context(filename1, mode="a"):
+ locking1 = True
+ for mode in ["r", "w", "a"]:
+ locking2 = mode != "r"
+ raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT
+ raise_condition &= locking1 != locking2
+ with self.assertRaisesIf(raise_condition, RuntimeError):
+ with self._open_context(filename2, mode=mode):
+ pass
+ self._validate_hdf5_data(filename1)
+ self._validate_hdf5_data(filename2)
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ @property
+ def _libver_low_bound_is_v108(self):
+ libver = self._subtest_options.get("libver")
+ return h5py_utils._libver_low_bound_is_v108(libver)
+
+ @property
+ def _nonlocking_reader_before_writer(self):
+ """A non-locking reader must open the file before it is locked by a writer"""
+ if IS_WINDOWS and h5py_utils.HDF5_HAS_LOCKING_ARGUMENT:
+ return True
+ if not self._libver_low_bound_is_v108:
+ return True
+ return False
+
+ @contextmanager
+ def assertRaisesIf(self, condition, *args, **kw):
+ if condition:
+ with self.assertRaises(*args, **kw):
+ yield
+ else:
+ yield
+
+ @unittest.skipIf(
+ h5py_utils.HDF5_HAS_LOCKING_ARGUMENT != h5py_utils.H5PY_HAS_LOCKING_ARGUMENT,
+ "Versions of libhdf5 and h5py use incompatible file locking behaviour",
+ )
+ @subtests
+ def test_modes_multi_process(self):
+ """Test concurrent access to the same file from different processes"""
+ filename = self._new_filename()
+
+ nonlocking_reader_before_writer = self._nonlocking_reader_before_writer
+ writer_before_nonlocking_reader_exception = OSError
+ old_hdf5_on_windows = IS_WINDOWS and not h5py_utils.HDF5_HAS_LOCKING_ARGUMENT
+ locked_exception = OSError
+
+ # File locked by a writer
+ unexpected_access = old_hdf5_on_windows and self._libver_low_bound_is_v108
+ for wmode in ["w", "a"]:
+ with self._open_context_subprocess(filename, mode=wmode):
+ # Access by a second non-locking reader
+ with self.assertRaisesIf(
+ nonlocking_reader_before_writer,
+ writer_before_nonlocking_reader_exception,
+ ):
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # No access by a second locking reader
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a locking reader")
+ with self.assertRaisesIf(not unexpected_access, locked_exception):
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # No access by a second writer
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a writer")
+ with self.assertRaisesIf(not unexpected_access, locked_exception):
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ if not nonlocking_reader_before_writer:
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ # File locked by a reader
+ unexpected_access = old_hdf5_on_windows
+ with _subprocess_context(_open_context, filename, mode="r", locking=True):
+ # Access by a non-locking reader
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # Access by a locking reader
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # No access by a second writer
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a writer")
+ raise_condition = not unexpected_access
+ with self.assertRaisesIf(raise_condition, locked_exception):
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ # File open by a non-locking reader
+ with self._open_context_subprocess(filename, mode="r"):
+ # Access by a second non-locking reader
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # Access by a second locking reader
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # Access by a second writer
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ @subtests
+ @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported")
+ def test_modes_multi_process_swmr(self):
+ filename = self._new_filename()
+
+ with self._open_context(filename, mode="w", libver="latest") as f:
+ pass
+
+ # File open by SWMR writer
+ with self._open_context_subprocess(filename, mode="a", swmr=True):
+ with self._open_context(filename, mode="r") as f:
+ assert f.swmr_mode
+ self._assert_hdf5_data(f)
+ with self.assertRaises(OSError):
+ with self._open_context(filename, mode="a") as f:
+ pass
+ self._validate_hdf5_data(filename, swmr=True)
+
+ @subtests
+ def test_retry_defaults(self):
+ filename = self._new_filename()
+
+ names = h5py_utils.top_level_names(filename)
+ self.assertEqual(names, [])
+
+ names = h5py_utils.safe_top_level_names(filename)
+ self.assertEqual(names, [])
+
+ names = h5py_utils.top_level_names(filename, include_only=None)
+ self.assertEqual(names, ["check"])
+
+ names = h5py_utils.safe_top_level_names(filename, include_only=None)
+ self.assertEqual(names, ["check"])
+
+ with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item:
+ self.assertEqual(item, None)
+
+ with h5py_utils.open_item(filename, "/check", validate=None) as item:
+ self.assertTrue(item[()])
+
+ with self.assertRaises(RetryTimeoutError):
+ with h5py_utils.open_item(
+ filename,
+ "/check",
+ retry_timeout=0.1,
+ retry_invalid=True,
+ validate=lambda x: False,
+ ) as item:
+ pass
+
+ ncall = 0
+
+ def validate(item):
+ nonlocal ncall
+ if ncall >= 1:
+ return True
+ else:
+ ncall += 1
+ raise RetryError
+
+ with h5py_utils.open_item(
+ filename,
+ "/check",
+ validate=validate,
+ retry_timeout=1,
+ retry_invalid=True,
+ ) as item:
+ self.assertTrue(item[()])
+
+ @subtests
+ def test_retry_custom(self):
+ filename = self._new_filename()
+ ncausefailure = 3
+ faildelay = 0.1
+ sufficient_timeout = ncausefailure * (faildelay + 10)
+ insufficient_timeout = ncausefailure * faildelay * 0.5
+
+ @h5py_utils.retry_contextmanager()
+ def open_item(filename, name):
+ nonlocal failcounter
+ if failcounter < ncausefailure:
+ time.sleep(faildelay)
+ failcounter += 1
+ raise RetryError
+ with h5py_utils.File(filename) as h5file:
+ yield h5file[name]
+
+ failcounter = 0
+ kw = {"retry_timeout": sufficient_timeout}
+ with open_item(filename, "/check", **kw) as item:
+ self.assertTrue(item[()])
+
+ failcounter = 0
+ kw = {"retry_timeout": insufficient_timeout}
+ with self.assertRaises(RetryTimeoutError):
+ with open_item(filename, "/check", **kw) as item:
+ pass
+
+ @subtests
+ def test_retry_in_subprocess(self):
+ filename = self._new_filename()
+ txtfilename = os.path.join(self.test_dir, "failcounter.txt")
+ ncausefailure = 3
+ faildelay = 0.1
+ sufficient_timeout = ncausefailure * (faildelay + 10)
+ insufficient_timeout = ncausefailure * faildelay * 0.5
+
+ kw = {
+ "retry_timeout": sufficient_timeout,
+ "include_only": None,
+ "ncausefailure": ncausefailure,
+ "faildelay": faildelay,
+ }
+ with open(txtfilename, mode="w") as f:
+ f.write("0")
+ names = top_level_names_test(txtfilename, filename, **kw)
+ self.assertEqual(names, ["check"])
+
+ kw = {
+ "retry_timeout": insufficient_timeout,
+ "include_only": None,
+ "ncausefailure": ncausefailure,
+ "faildelay": faildelay,
+ }
+ with open(txtfilename, mode="w") as f:
+ f.write("0")
+ with self.assertRaises(RetryTimeoutError):
+ top_level_names_test(txtfilename, filename, **kw)
+
+ @subtests
+ def test_retry_generator(self):
+ filename = self._new_filename()
+ ncausefailure = 3
+ faildelay = 0.1
+ sufficient_timeout = ncausefailure * (faildelay + 10)
+ insufficient_timeout = ncausefailure * faildelay * 0.5
+
+ @h5py_utils.retry()
+ def iter_data(filename, name, start_index=0):
+ nonlocal failcounter
+ if start_index <= 0:
+ with h5py_utils.File(filename) as h5file:
+ yield h5file[name][()]
+ if failcounter < ncausefailure:
+ time.sleep(faildelay)
+ failcounter += 1
+ raise RetryError
+ if start_index <= 1:
+ with h5py_utils.File(filename) as h5file:
+ yield not h5file[name][()]
+
+ failcounter = 0
+ kw = {"retry_timeout": sufficient_timeout}
+ data = list(iter_data(filename, "/check", **kw))
+ self.assertEqual(data, [True, False])
+
+ failcounter = 0
+ kw = {"retry_timeout": insufficient_timeout}
+ with self.assertRaises(RetryTimeoutError):
+ list(iter_data(filename, "/check", **kw))
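+
+
+# Illustrative sketch (not part of the silx test suite; the helper name is
+# made up, and a "check" dataset as written by the fixtures above is assumed):
+# the minimal shape of the retry pattern exercised in this file. A callable
+# decorated with h5py_utils.retry() signals a transient condition by raising
+# RetryError; the decorator re-invokes it until it succeeds or the
+# retry_timeout passed at call time expires.
+def _demo_retry_pattern(filename):
+    attempts = 0
+
+    @h5py_utils.retry()
+    def read_check(filename):
+        nonlocal attempts
+        attempts += 1
+        if attempts < 3:
+            raise RetryError  # transient condition: try again
+        with h5py_utils.File(filename) as h5file:
+            return h5file["check"][()]
+
+    return read_check(filename, retry_timeout=10)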
diff --git a/src/silx/io/test/test_nxdata.py b/src/silx/io/test/test_nxdata.py
new file mode 100644
index 0000000..1c64a71
--- /dev/null
+++ b/src/silx/io/test/test_nxdata.py
@@ -0,0 +1,727 @@
+# /*##########################################################################
+# Copyright (C) 2016-2022 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for NXdata parsing"""
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "24/03/2020"
+
+
+import tempfile
+import unittest
+
+import h5py
+import numpy
+import pytest
+
+from .. import nxdata
+from ..dictdump import dicttoh5
+
+
+text_dtype = h5py.special_dtype(vlen=str)
+
+
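+# Illustrative sketch (not part of the silx test suite; the helper name is
+# made up): the minimal structure the setUp methods below build by hand. An
+# NXdata group needs NX_class="NXdata" and a "signal" attribute naming a
+# dataset; "axes" is optional. dicttoh5 (used in TestGetDefault below)
+# expresses attributes as ("", name) keys.
+def _demo_minimal_nxdata(h5f):
+    dicttoh5(
+        {
+            "mydata": {
+                ("", "NX_class"): "NXdata",
+                ("", "signal"): "data",
+                ("", "axes"): "x",
+                "data": (1, 2, 3),
+                "x": (0.0, 0.5, 1.0),
+            }
+        },
+        h5f,
+    )
+    return nxdata.is_valid_nxdata(h5f["mydata"])
+
+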
+class TestNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(
+ prefix="nxdata_examples_", suffix=".h5", delete=True
+ )
+ tmp.file.close()
+ self.h5fname = tmp.name
+ self.h5f = h5py.File(tmp.name, "w")
+
+ # SCALARS
+ g0d = self.h5f.create_group("scalars")
+
+ g0d0 = g0d.create_group("0D_scalar")
+ g0d0.attrs["NX_class"] = "NXdata"
+ g0d0.attrs["signal"] = "scalar"
+ g0d0.create_dataset("scalar", data=10)
+ g0d0.create_dataset("scalar_errors", data=0.1)
+
+ g0d1 = g0d.create_group("2D_scalars")
+ g0d1.attrs["NX_class"] = "NXdata"
+ g0d1.attrs["signal"] = "scalars"
+ ds = g0d1.create_dataset("scalars", data=numpy.arange(3 * 10).reshape((3, 10)))
+ ds.attrs["interpretation"] = "scalar"
+
+ g0d1 = g0d.create_group("4D_scalars")
+ g0d1.attrs["NX_class"] = "NXdata"
+ g0d1.attrs["signal"] = "scalars"
+ ds = g0d1.create_dataset(
+ "scalars", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10))
+ )
+ ds.attrs["interpretation"] = "scalar"
+
+ # SPECTRA
+ g1d = self.h5f.create_group("spectra")
+
+ g1d0 = g1d.create_group("1D_spectrum")
+ g1d0.attrs["NX_class"] = "NXdata"
+ g1d0.attrs["signal"] = "count"
+ g1d0.attrs["auxiliary_signals"] = numpy.array(
+ ["count2", "count3"], dtype=text_dtype
+ )
+ g1d0.attrs["axes"] = "energy_calib"
+ g1d0.attrs["uncertainties"] = numpy.array(
+ [
+ "energy_errors",
+ ],
+ dtype=text_dtype,
+ )
+ g1d0.create_dataset("count", data=numpy.arange(10))
+ g1d0.create_dataset("count2", data=0.5 * numpy.arange(10))
+ d = g1d0.create_dataset("count3", data=0.4 * numpy.arange(10))
+ d.attrs["long_name"] = "3rd counter"
+ g1d0.create_dataset("title", data="Title as dataset (like nexpy)")
+ g1d0.create_dataset("energy_calib", data=(10, 5)) # 10 * idx + 5
+ g1d0.create_dataset("energy_errors", data=3.14 * numpy.random.rand(10))
+
+ g1d1 = g1d.create_group("2D_spectra")
+ g1d1.attrs["NX_class"] = "NXdata"
+ g1d1.attrs["signal"] = "counts"
+ ds = g1d1.create_dataset("counts", data=numpy.arange(3 * 10).reshape((3, 10)))
+ ds.attrs["interpretation"] = "spectrum"
+
+ g1d2 = g1d.create_group("4D_spectra")
+ g1d2.attrs["NX_class"] = "NXdata"
+ g1d2.attrs["signal"] = "counts"
+ g1d2.attrs["axes"] = numpy.array(
+ [
+ "energy",
+ ],
+ dtype=text_dtype,
+ )
+ ds = g1d2.create_dataset(
+ "counts", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10))
+ )
+ ds.attrs["interpretation"] = "spectrum"
+ ds = g1d2.create_dataset("errors", data=4.5 * numpy.random.rand(2, 2, 3, 10))
+ ds = g1d2.create_dataset(
+ "energy", data=5 + 10 * numpy.arange(15), shuffle=True, compression="gzip"
+ )
+ ds.attrs["long_name"] = "Calibrated energy"
+ ds.attrs["first_good"] = 3
+ ds.attrs["last_good"] = 12
+ g1d2.create_dataset("energy_errors", data=10 * numpy.random.rand(15))
+
+ # IMAGES
+ g2d = self.h5f.create_group("images")
+
+ g2d0 = g2d.create_group("2D_regular_image")
+ g2d0.attrs["NX_class"] = "NXdata"
+ g2d0.attrs["signal"] = "image"
+ g2d0.attrs["auxiliary_signals"] = "image2"
+ g2d0.attrs["axes"] = numpy.array(
+ ["rows_calib", "columns_coordinates"], dtype=text_dtype
+ )
+ g2d0.create_dataset("image", data=numpy.arange(4 * 6).reshape((4, 6)))
+ g2d0.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds = g2d0.create_dataset("rows_calib", data=(10, 5))
+ ds.attrs["long_name"] = "Calibrated Y"
+ g2d0.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
+
+ g2d1 = g2d.create_group("2D_irregular_data")
+ g2d1.attrs["NX_class"] = "NXdata"
+ g2d1.attrs["signal"] = "data"
+ g2d1.attrs["title"] = "Title as group attr"
+ g2d1.attrs["axes"] = numpy.array(
+ ["rows_coordinates", "columns_coordinates"], dtype=text_dtype
+ )
+ g2d1.create_dataset("data", data=numpy.arange(64 * 128).reshape((64, 128)))
+ g2d1.create_dataset(
+ "rows_coordinates", data=numpy.arange(64) + numpy.random.rand(64)
+ )
+ g2d1.create_dataset(
+ "columns_coordinates", data=numpy.arange(128) + 2.5 * numpy.random.rand(128)
+ )
+
+ g2d2 = g2d.create_group("3D_images")
+ g2d2.attrs["NX_class"] = "NXdata"
+ g2d2.attrs["signal"] = "images"
+ ds = g2d2.create_dataset(
+ "images", data=numpy.arange(2 * 4 * 6).reshape((2, 4, 6))
+ )
+ ds.attrs["interpretation"] = "image"
+
+ g2d3 = g2d.create_group("5D_images")
+ g2d3.attrs["NX_class"] = "NXdata"
+ g2d3.attrs["signal"] = "images"
+ g2d3.attrs["axes"] = numpy.array(
+ ["rows_coordinates", "columns_coordinates"], dtype=text_dtype
+ )
+ ds = g2d3.create_dataset(
+ "images", data=numpy.arange(2 * 2 * 2 * 4 * 6).reshape((2, 2, 2, 4, 6))
+ )
+ ds.attrs["interpretation"] = "image"
+ g2d3.create_dataset("rows_coordinates", data=5 + 10 * numpy.arange(4))
+ g2d3.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
+
+ g2d4 = g2d.create_group("RGBA_image")
+ g2d4.attrs["NX_class"] = "NXdata"
+ g2d4.attrs["signal"] = "image"
+ g2d4.attrs["axes"] = numpy.array(
+ ["rows_calib", "columns_coordinates"], dtype=text_dtype
+ )
+ rgba_image = numpy.linspace(0, 1, num=7 * 8 * 3).reshape((7, 8, 3))
+ rgba_image[:, :, 1] = (
+ 1 - rgba_image[:, :, 1]
+ ) # invert G channel to add some color
+ ds = g2d4.create_dataset("image", data=rgba_image)
+ ds.attrs["interpretation"] = "rgba-image"
+ ds = g2d4.create_dataset("rows_calib", data=(10, 5))
+ ds.attrs["long_name"] = "Calibrated Y"
+ g2d4.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(8))
+
+ # SCATTER
+ g = self.h5f.create_group("scatters")
+
+ gd0 = g.create_group("x_y_scatter")
+ gd0.attrs["NX_class"] = "NXdata"
+ gd0.attrs["signal"] = "y"
+ gd0.attrs["axes"] = numpy.array(
+ [
+ "x",
+ ],
+ dtype=text_dtype,
+ )
+ gd0.create_dataset("y", data=numpy.random.rand(128) - 0.5)
+ gd0.create_dataset("x", data=2 * numpy.random.rand(128))
+ gd0.create_dataset("x_errors", data=0.05 * numpy.random.rand(128))
+ gd0.create_dataset("errors", data=0.05 * numpy.random.rand(128))
+
+ gd1 = g.create_group("x_y_value_scatter")
+ gd1.attrs["NX_class"] = "NXdata"
+ gd1.attrs["signal"] = "values"
+ gd1.attrs["axes"] = numpy.array(["x", "y"], dtype=text_dtype)
+ gd1.create_dataset("values", data=3.14 * numpy.random.rand(128))
+ gd1.create_dataset("y", data=numpy.random.rand(128))
+ gd1.create_dataset("y_errors", data=0.02 * numpy.random.rand(128))
+ gd1.create_dataset("x", data=numpy.random.rand(128))
+ gd1.create_dataset("x_errors", data=0.02 * numpy.random.rand(128))
+
+ def tearDown(self):
+ self.h5f.close()
+
+ def testValidity(self):
+ for group in self.h5f:
+ for subgroup in self.h5f[group]:
+ self.assertTrue(
+ nxdata.is_valid_nxdata(self.h5f[group][subgroup]),
+ "%s/%s not found to be a valid NXdata group" % (group, subgroup),
+ )
+
+ def testScalars(self):
+ nxd = nxdata.NXdata(self.h5f["scalars/0D_scalar"])
+ self.assertTrue(nxd.signal_is_0d)
+ self.assertEqual(nxd.signal[()], 10)
+ self.assertEqual(nxd.axes_names, [])
+ self.assertEqual(nxd.axes_dataset_names, [])
+ self.assertEqual(nxd.axes, [])
+ self.assertIsNotNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+
+ nxd = nxdata.NXdata(self.h5f["scalars/2D_scalars"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertEqual(nxd.signal[1, 2], 12)
+ self.assertEqual(nxd.axes_names, [None, None])
+ self.assertEqual(nxd.axes_dataset_names, [None, None])
+ self.assertEqual(nxd.axes, [None, None])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "scalar")
+
+ nxd = nxdata.NXdata(self.h5f["scalars/4D_scalars"])
+ self.assertFalse(
+ nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d
+ )
+ self.assertEqual(nxd.signal[1, 0, 1, 4], 74)
+ self.assertEqual(nxd.axes_names, [None, None, None, None])
+ self.assertEqual(nxd.axes_dataset_names, [None, None, None, None])
+ self.assertEqual(nxd.axes, [None, None, None, None])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "scalar")
+
+ def testSpectra(self):
+ nxd = nxdata.NXdata(self.h5f["spectra/1D_spectrum"])
+ self.assertTrue(nxd.signal_is_1d)
+ self.assertTrue(nxd.is_curve)
+ self.assertTrue(numpy.array_equal(numpy.array(nxd.signal), numpy.arange(10)))
+ self.assertEqual(nxd.axes_names, ["energy_calib"])
+ self.assertEqual(nxd.axes_dataset_names, ["energy_calib"])
+ self.assertEqual(nxd.axes[0][0], 10)
+ self.assertEqual(nxd.axes[0][1], 5)
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+ self.assertEqual(nxd.title, "Title as dataset (like nexpy)")
+
+ self.assertEqual(nxd.auxiliary_signals_dataset_names, ["count2", "count3"])
+ self.assertEqual(nxd.auxiliary_signals_names, ["count2", "3rd counter"])
+ self.assertAlmostEqual(
+ nxd.auxiliary_signals[1][2], 0.8
+ ) # numpy.arange(10) * 0.4
+
+ nxd = nxdata.NXdata(self.h5f["spectra/2D_spectra"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_curve)
+ self.assertEqual(nxd.axes_names, [None, None])
+ self.assertEqual(nxd.axes_dataset_names, [None, None])
+ self.assertEqual(nxd.axes, [None, None])
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "spectrum")
+
+ nxd = nxdata.NXdata(self.h5f["spectra/4D_spectra"])
+ self.assertFalse(
+ nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d
+ )
+ self.assertTrue(nxd.is_curve)
+ self.assertEqual(nxd.axes_names, [None, None, None, "Calibrated energy"])
+ self.assertEqual(nxd.axes_dataset_names, [None, None, None, "energy"])
+ self.assertEqual(nxd.axes[:3], [None, None, None])
+        self.assertEqual(nxd.axes[3].shape, (10,))  # shape (15,) cut to first_good..last_good inclusive
+ self.assertIsNotNone(nxd.errors)
+ self.assertEqual(nxd.errors.shape, (2, 2, 3, 10))
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "spectrum")
+ self.assertEqual(nxd.get_axis_errors("energy").shape, (10,))
+ # test getting axis errors by long_name
+ self.assertTrue(
+ numpy.array_equal(
+ nxd.get_axis_errors("Calibrated energy"), nxd.get_axis_errors("energy")
+ )
+ )
+ self.assertTrue(
+ numpy.array_equal(
+ nxd.get_axis_errors(b"Calibrated energy"), nxd.get_axis_errors("energy")
+ )
+ )
+
+ def testImages(self):
+ nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_image)
+ self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates"])
+ self.assertEqual(
+ list(nxd.axes_dataset_names), ["rows_calib", "columns_coordinates"]
+ )
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+ self.assertEqual(len(nxd.auxiliary_signals), 1)
+ self.assertEqual(nxd.auxiliary_signals_names, ["image2"])
+
+ nxd = nxdata.NXdata(self.h5f["images/2D_irregular_data"])
+ self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_image)
+
+ self.assertEqual(nxd.axes_dataset_names, nxd.axes_names)
+ self.assertEqual(
+ list(nxd.axes_dataset_names), ["rows_coordinates", "columns_coordinates"]
+ )
+ self.assertEqual(len(nxd.axes), 2)
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+ self.assertEqual(nxd.title, "Title as group attr")
+
+ nxd = nxdata.NXdata(self.h5f["images/5D_images"])
+ self.assertTrue(nxd.is_image)
+ self.assertFalse(
+ nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d
+ )
+ self.assertEqual(
+ nxd.axes_names,
+ [None, None, None, "rows_coordinates", "columns_coordinates"],
+ )
+ self.assertEqual(
+ nxd.axes_dataset_names,
+ [None, None, None, "rows_coordinates", "columns_coordinates"],
+ )
+ self.assertIsNone(nxd.errors)
+ self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
+ self.assertEqual(nxd.interpretation, "image")
+
+ nxd = nxdata.NXdata(self.h5f["images/RGBA_image"])
+ self.assertTrue(nxd.is_image)
+ self.assertEqual(nxd.interpretation, "rgba-image")
+ self.assertTrue(nxd.signal_is_3d)
+ self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates", None])
+ self.assertEqual(
+ list(nxd.axes_dataset_names), ["rows_calib", "columns_coordinates", None]
+ )
+
+ def testScatters(self):
+ nxd = nxdata.NXdata(self.h5f["scatters/x_y_scatter"])
+ self.assertTrue(nxd.signal_is_1d)
+ self.assertEqual(nxd.axes_names, ["x"])
+ self.assertEqual(nxd.axes_dataset_names, ["x"])
+ self.assertIsNotNone(nxd.errors)
+ self.assertEqual(nxd.get_axis_errors("x").shape, (128,))
+ self.assertTrue(nxd.is_scatter)
+ self.assertFalse(nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+
+ nxd = nxdata.NXdata(self.h5f["scatters/x_y_value_scatter"])
+ self.assertFalse(nxd.signal_is_1d)
+ self.assertTrue(nxd.axes_dataset_names, nxd.axes_names)
+ self.assertEqual(nxd.axes_dataset_names, ["x", "y"])
+ self.assertEqual(nxd.get_axis_errors("x").shape, (128,))
+ self.assertEqual(nxd.get_axis_errors("y").shape, (128,))
+ self.assertEqual(len(nxd.axes), 2)
+ self.assertIsNone(nxd.errors)
+ self.assertTrue(nxd.is_scatter)
+ self.assertTrue(nxd.is_x_y_value_scatter)
+ self.assertIsNone(nxd.interpretation)
+
+
+class TestLegacyNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(
+ prefix="nxdata_legacy_examples_", suffix=".h5", delete=True
+ )
+ tmp.file.close()
+ self.h5fname = tmp.name
+ self.h5f = h5py.File(tmp.name, "w")
+
+ def tearDown(self):
+ self.h5f.close()
+
+ def testSignalAttrOnDataset(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds0.attrs["long_name"] = "My first image"
+
+ ds1 = g.create_dataset("image1", data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds1.attrs["signal"] = "2"
+ ds1.attrs["long_name"] = "My 2nd image"
+
+ ds2 = g.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds2.attrs["signal"] = 3
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+
+ self.assertEqual(nxd.signal_dataset_name, "image0")
+ self.assertEqual(nxd.signal_name, "My first image")
+ self.assertEqual(nxd.signal.shape, (4, 6))
+
+ self.assertEqual(len(nxd.auxiliary_signals), 2)
+ self.assertEqual(nxd.auxiliary_signals[1].shape, (4, 6))
+
+ self.assertEqual(nxd.auxiliary_signals_dataset_names, ["image1", "image2"])
+ self.assertEqual(nxd.auxiliary_signals_names, ["My 2nd image", "image2"])
+
+ def testAxesOnSignalDataset(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds0.attrs["axes"] = "yaxis:xaxis"
+
+ ds1 = g.create_dataset("yaxis", data=numpy.arange(4))
+ ds2 = g.create_dataset("xaxis", data=numpy.arange(6))
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+
+ self.assertEqual(nxd.axes_dataset_names, ["yaxis", "xaxis"])
+ self.assertTrue(numpy.array_equal(nxd.axes[0], numpy.arange(4)))
+ self.assertTrue(numpy.array_equal(nxd.axes[1], numpy.arange(6)))
+
+ def testAxesOnAxesDatasets(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds1 = g.create_dataset("yaxis", data=numpy.arange(4))
+ ds1.attrs["axis"] = 0
+ ds2 = g.create_dataset("xaxis", data=numpy.arange(6))
+ ds2.attrs["axis"] = "1"
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+ self.assertEqual(nxd.axes_dataset_names, ["yaxis", "xaxis"])
+ self.assertTrue(numpy.array_equal(nxd.axes[0], numpy.arange(4)))
+ self.assertTrue(numpy.array_equal(nxd.axes[1], numpy.arange(6)))
+
+ def testAsciiUndefinedAxesAttrs(self):
+ """Some files may not be using utf8 for str attrs"""
+ g = self.h5f.create_group("bytes_attrs")
+ g.attrs["NX_class"] = b"NXdata"
+ g.attrs["signal"] = b"image0"
+ g.attrs["axes"] = b"yaxis", b"."
+
+ g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6)))
+ g.create_dataset("yaxis", data=numpy.arange(4))
+
+ nxd = nxdata.NXdata(self.h5f["bytes_attrs"])
+ self.assertEqual(nxd.axes_dataset_names, ["yaxis", None])
+
+
+class TestSaveNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(prefix="nxdata", suffix=".h5", delete=True)
+ tmp.file.close()
+ self.h5fname = tmp.name
+
+ def testSimpleSave(self):
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(
+ filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=["a0", "a1"],
+ nxentry_name="a",
+ nxdata_name="mydata",
+ )
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
+
+ nxd = nxdata.NXdata(h5f["/a/mydata"])
+ self.assertTrue(numpy.array_equal(nxd.signal, sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0], a0))
+
+ h5f.close()
+
+ def testSimplestSave(self):
+ sig = numpy.array([0, 1, 2])
+ nxdata.save_NXdata(filename=self.h5fname, signal=sig)
+
+ h5f = h5py.File(self.h5fname, "r")
+
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["/entry/data0"]))
+
+ nxd = nxdata.NXdata(h5f["/entry/data0"])
+ self.assertTrue(numpy.array_equal(nxd.signal, sig))
+ h5f.close()
+
+ def testSaveDefaultAxesNames(self):
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(
+ filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=None,
+ axes_long_names=["a", "b"],
+ nxentry_name="a",
+ nxdata_name="mydata",
+ )
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
+
+ nxd = nxdata.NXdata(h5f["/a/mydata"])
+ self.assertTrue(numpy.array_equal(nxd.signal, sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0], a0))
+ self.assertEqual(nxd.axes_dataset_names, ["dim0", "dim1"])
+ self.assertEqual(nxd.axes_names, ["a", "b"])
+
+ h5f.close()
+
+ def testSaveToExistingEntry(self):
+ h5f = h5py.File(self.h5fname, "w")
+ g = h5f.create_group("myentry")
+ g.attrs["NX_class"] = "NXentry"
+ h5f.close()
+
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(
+ filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=["a0", "a1"],
+ nxentry_name="myentry",
+ nxdata_name="toto",
+ )
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["myentry/toto"]))
+
+ nxd = nxdata.NXdata(h5f["myentry/toto"])
+ self.assertTrue(numpy.array_equal(nxd.signal, sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0], a0))
+ h5f.close()
+
+
+class TestGetDefault:
+ """Test silx.io.nxdata.get_default function"""
+
+ @pytest.fixture
+ def hdf5_file(self, tmp_path):
+ with h5py.File(tmp_path / "test_file.h5", "w") as h5f:
+ yield h5f
+
+ def testDirectPath(self, hdf5_file):
+ dicttoh5(
+ {
+ ("", "default"): "/nxentry/nxprocess/nxdata",
+ "nxentry": {
+ "nxprocess": {
+ "nxdata": {
+ ("", "NX_class"): "NXdata",
+ ("", "signal"): "data",
+ "data": (1, 2, 3),
+ }
+ }
+ },
+ },
+ hdf5_file,
+ )
+ default = nxdata.get_default(hdf5_file)
+ assert isinstance(default, nxdata.NXdata)
+ assert default.group.name == "/nxentry/nxprocess/nxdata"
+
+ def testAbsolutePath(self, hdf5_file):
+ dicttoh5(
+ {
+ ("", "default"): "/nxentry",
+ "nxentry": {
+ ("", "default"): "/nxentry/nxprocess/nxdata",
+ "nxprocess": {
+ "nxdata": {
+ ("", "NX_class"): "NXdata",
+ ("", "signal"): "data",
+ "data": (1, 2, 3),
+ }
+ },
+ },
+ },
+ hdf5_file,
+ )
+ default = nxdata.get_default(hdf5_file)
+ assert isinstance(default, nxdata.NXdata)
+ assert default.group.name == "/nxentry/nxprocess/nxdata"
+
+ def testRelativePath(self, hdf5_file):
+ dicttoh5(
+ {
+ ("", "default"): "nxentry",
+ "nxentry": {
+ ("", "default"): "nxdata",
+ "nxdata": {
+ ("", "NX_class"): "NXdata",
+ ("", "signal"): "data",
+ "data": (1, 2, 3),
+ },
+ },
+ },
+ hdf5_file,
+ )
+ default = nxdata.get_default(hdf5_file)
+ assert isinstance(default, nxdata.NXdata)
+ assert default.group.name == "/nxentry/nxdata"
+
+ def testRelativePathSubdir(self, hdf5_file):
+ dicttoh5(
+ {
+ ("", "default"): "nxentry",
+ "nxentry": {
+ ("", "default"): "nxprocess/nxdata",
+ "nxprocess": {
+ "nxdata": {
+ ("", "NX_class"): "NXdata",
+ ("", "signal"): "data",
+ "data": (1, 2, 3),
+ }
+ },
+ },
+ },
+ hdf5_file,
+ )
+ default = nxdata.get_default(hdf5_file)
+ assert isinstance(default, nxdata.NXdata)
+ assert default.group.name == "/nxentry/nxprocess/nxdata"
+
+ def testRecursiveAbsolutePath(self, hdf5_file):
+ dicttoh5(
+ {
+ ("", "default"): "/nxentry",
+ "nxentry": {
+ ("", "default"): "/nxentry/nxprocess",
+ "nxprocess": {
+ ("", "default"): "/nxentry/nxprocess/nxdata",
+ "nxdata": {
+ ("", "NX_class"): "NXdata",
+ ("", "signal"): "data",
+ "data": (1, 2, 3),
+ },
+ },
+ },
+ },
+ hdf5_file,
+ )
+ default = nxdata.get_default(hdf5_file)
+ assert isinstance(default, nxdata.NXdata)
+ assert default.group.name == "/nxentry/nxprocess/nxdata"
+
+ def testRecursiveRelativePath(self, hdf5_file):
+ dicttoh5(
+ {
+ ("", "default"): "nxentry",
+ "nxentry": {
+ ("", "default"): "nxprocess",
+ "nxprocess": {
+ ("", "default"): "nxdata",
+ "nxdata": {
+ ("", "NX_class"): "NXdata",
+ ("", "signal"): "data",
+ "data": (1, 2, 3),
+ },
+ },
+ },
+ },
+ hdf5_file,
+ )
+ default = nxdata.get_default(hdf5_file)
+ assert isinstance(default, nxdata.NXdata)
+ assert default.group.name == "/nxentry/nxprocess/nxdata"
+
+ def testLoop(self, hdf5_file):
+ """Infinite loop of @default"""
+ dicttoh5(
+ {
+ ("", "default"): "/nxentry",
+ "nxentry": {
+ ("", "default"): "/nxentry",
+ },
+ },
+ hdf5_file,
+ )
+ default = nxdata.get_default(hdf5_file)
+ assert default is None
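+
+
+# A minimal usage sketch (not a test), assuming a file written with one of
+# the layouts above: resolve the default plottable NXdata by following
+# @default attributes from the file root, as exercised by TestGetDefault.
+def _example_get_default(filename="example.h5"):  # illustrative filename
+    with h5py.File(filename, "r") as h5f:
+        default = nxdata.get_default(h5f)  # None if no valid @default chain
+        return None if default is None else default.group.name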
diff --git a/src/silx/io/test/test_octaveh5.py b/src/silx/io/test/test_octaveh5.py
new file mode 100644
index 0000000..479ef85
--- /dev/null
+++ b/src/silx/io/test/test_octaveh5.py
@@ -0,0 +1,197 @@
+# /*##########################################################################
+# Copyright (C) 2016 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""
+Tests for the octaveh5 module
+"""
+
+__authors__ = ["C. Nemoz", "H. Payno"]
+__license__ = "MIT"
+__date__ = "12/07/2016"
+
+import unittest
+import os
+import tempfile
+
+try:
+ from ..octaveh5 import Octaveh5
+except ImportError:
+ Octaveh5 = None
+
+
+@unittest.skipIf(Octaveh5 is None, "Could not import octaveh5 (h5py is missing)")
+class TestOctaveH5(unittest.TestCase):
+ @staticmethod
+ def _get_struct_FT():
+ return {
+ "NO_CHECK": 0.0,
+ "SHOWSLICE": 1.0,
+ "DOTOMO": 1.0,
+ "DATABASE": 0.0,
+ "ANGLE_OFFSET": 0.0,
+ "VOLSELECTION_REMEMBER": 0.0,
+ "NUM_PART": 4.0,
+ "VOLOUTFILE": 0.0,
+ "RINGSCORRECTION": 0.0,
+ "DO_TEST_SLICE": 1.0,
+ "ZEROOFFMASK": 1.0,
+ "VERSION": "fastomo3 version 2.0",
+ "CORRECT_SPIKES_THRESHOLD": 0.040000000000000001,
+ "SHOWPROJ": 0.0,
+ "HALF_ACQ": 0.0,
+ "ANGLE_OFFSET_VALUE": 0.0,
+ "FIXEDSLICE": "middle",
+ "VOLSELECT": "total",
+ }
+
+ @staticmethod
+ def _get_struct_PYHSTEXE():
+ return {
+ "EXE": "PyHST2_2015d",
+ "VERBOSE": 0.0,
+ "OFFV": "PyHST2_2015d",
+ "TOMO": 0.0,
+ "VERBOSE_FILE": "pyhst_out.txt",
+ "DIR": "/usr/bin/",
+ "OFFN": "pyhst2",
+ }
+
+ @staticmethod
+ def _get_struct_FTAXIS():
+ return {
+ "POSITION_VALUE": 12345.0,
+ "COR_ERROR": 0.0,
+ "FILESDURINGSCAN": 0.0,
+ "PLOTFIGURE": 1.0,
+ "DIM1": 0.0,
+ "OVERSAMPLING": 5.0,
+ "TO_THE_CENTER": 1.0,
+ "POSITION": "fixed",
+ "COR_POSITION": 0.0,
+ "HA": 0.0,
+ }
+
+ @staticmethod
+ def _get_struct_PAGANIN():
+ return {
+ "MKEEP_MASK": 0.0,
+ "UNSHARP_SIGMA": 0.80000000000000004,
+ "DILATE": 2.0,
+ "UNSHARP_COEFF": 3.0,
+ "MEDIANR": 4.0,
+ "DB": 500.0,
+ "MKEEP_ABS": 0.0,
+ "MODE": 0.0,
+ "THRESHOLD": 0.5,
+ "MKEEP_BONE": 0.0,
+ "DB2": 100.0,
+ "MKEEP_CORR": 0.0,
+ "MKEEP_SOFT": 0.0,
+ }
+
+ @staticmethod
+ def _get_struct_BEAMGEO():
+ return {"DIST": 55.0, "SY": 0.0, "SX": 0.0, "TYPE": "p"}
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.test_3_6_fname = os.path.join(
+ self.tempdir, "silx_tmp_t00_octaveTest_3_6.h5"
+ )
+ self.test_3_8_fname = os.path.join(
+ self.tempdir, "silx_tmp_t00_octaveTest_3_8.h5"
+ )
+
+ def tearDown(self):
+ if os.path.isfile(self.test_3_6_fname):
+ os.unlink(self.test_3_6_fname)
+ if os.path.isfile(self.test_3_8_fname):
+ os.unlink(self.test_3_8_fname)
+
+ def testWritedIsReaded(self):
+ """
+ Simple test writing the structure compatible with octave h5 and reading it back.
+ This test is for octave versions >= 3.8.
+ """
+ writer = Octaveh5()
+
+ writer.open(self.test_3_8_fname, "a")
+ # step 1 writing the file
+ writer.write("FT", self._get_struct_FT())
+ writer.write("PYHSTEXE", self._get_struct_PYHSTEXE())
+ writer.write("FTAXIS", self._get_struct_FTAXIS())
+ writer.write("PAGANIN", self._get_struct_PAGANIN())
+ writer.write("BEAMGEO", self._get_struct_BEAMGEO())
+ writer.close()
+
+ # step 2 reading the file
+ reader = Octaveh5().open(self.test_3_8_fname)
+ # 2.1 check FT
+ data_read = reader.get("FT")
+ self.assertEqual(data_read, self._get_struct_FT())
+ # 2.2 check PYHSTEXE
+ data_read = reader.get("PYHSTEXE")
+ self.assertEqual(data_read, self._get_struct_PYHSTEXE())
+ # 2.3 check FTAXIS
+ data_read = reader.get("FTAXIS")
+ self.assertEqual(data_read, self._get_struct_FTAXIS())
+ # 2.4 check PAGANIN
+ data_read = reader.get("PAGANIN")
+ self.assertEqual(data_read, self._get_struct_PAGANIN())
+ # 2.5 check BEAMGEO
+ data_read = reader.get("BEAMGEO")
+ self.assertEqual(data_read, self._get_struct_BEAMGEO())
+ reader.close()
+
+ def testWritedIsReadedOldOctaveVersion(self):
+ """The same test as testWritedIsReaded but for octave version < 3.8"""
+ # test for octave version < 3.8
+ writer = Octaveh5(3.6)
+
+ writer.open(self.test_3_6_fname, "a")
+
+ # step 1 writing the file
+ writer.write("FT", self._get_struct_FT())
+ writer.write("PYHSTEXE", self._get_struct_PYHSTEXE())
+ writer.write("FTAXIS", self._get_struct_FTAXIS())
+ writer.write("PAGANIN", self._get_struct_PAGANIN())
+ writer.write("BEAMGEO", self._get_struct_BEAMGEO())
+ writer.close()
+
+ # step 2 reading the file
+ reader = Octaveh5(3.6).open(self.test_3_6_fname)
+ # 2.1 check FT
+ data_read = reader.get("FT")
+ self.assertEqual(data_read, self._get_struct_FT())
+ # 2.2 check PYHSTEXE
+ data_read = reader.get("PYHSTEXE")
+ self.assertEqual(data_read, self._get_struct_PYHSTEXE())
+ # 2.3 check FTAXIS
+ data_read = reader.get("FTAXIS")
+ self.assertEqual(data_read, self._get_struct_FTAXIS())
+ # 2.4 check PAGANIN
+ data_read = reader.get("PAGANIN")
+ self.assertEqual(data_read, self._get_struct_PAGANIN())
+ # 2.5 check BEAMGEO
+ data_read = reader.get("BEAMGEO")
+ self.assertEqual(data_read, self._get_struct_BEAMGEO())
+ reader.close()
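+
+
+# A minimal usage sketch (not a test) of the write/read round trip exercised
+# above; "out.h5" is an illustrative filename. Pass Octaveh5(3.6) instead to
+# target octave versions < 3.8.
+def _example_roundtrip(filename="out.h5"):
+    writer = Octaveh5()
+    writer.open(filename, "a")
+    writer.write("FT", {"NO_CHECK": 0.0, "VERSION": "fastomo3 version 2.0"})
+    writer.close()
+    reader = Octaveh5().open(filename)  # open() returns the instance
+    data_read = reader.get("FT")
+    reader.close()
+    return data_read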
diff --git a/src/silx/io/test/test_rawh5.py b/src/silx/io/test/test_rawh5.py
new file mode 100644
index 0000000..fb5caec
--- /dev/null
+++ b/src/silx/io/test/test_rawh5.py
@@ -0,0 +1,83 @@
+# /*##########################################################################
+#
+# Copyright (c) 2016 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ###########################################################################*/
+"""Test for silx.gui.hdf5 module"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "21/09/2017"
+
+
+import unittest
+import tempfile
+import numpy
+import shutil
+from .. import rawh5
+
+
+class TestNumpyFile(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tmpDirectory = tempfile.mkdtemp()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmpDirectory)
+
+ def testNumpyFile(self):
+ filename = "%s/%s.npy" % (self.tmpDirectory, self.id())
+ c = numpy.random.rand(5, 5)
+ numpy.save(filename, c)
+ h5 = rawh5.NumpyFile(filename)
+ self.assertIn("data", h5)
+ self.assertEqual(h5["data"].dtype.kind, "f")
+
+ def testNumpyZFile(self):
+ filename = "%s/%s.npz" % (self.tmpDirectory, self.id())
+ a = numpy.array("aaaaa")
+ b = numpy.array([1, 2, 3, 4])
+ c = numpy.random.rand(5, 5)
+ d = numpy.array(b"aaaaa")
+ e = numpy.array("i \u2661 my mother")
+ numpy.savez(filename, a, b=b, c=c, d=d, e=e)
+ h5 = rawh5.NumpyFile(filename)
+ self.assertIn("arr_0", h5)
+ self.assertIn("b", h5)
+ self.assertIn("c", h5)
+ self.assertIn("d", h5)
+ self.assertIn("e", h5)
+ self.assertEqual(h5["arr_0"].dtype.kind, "U")
+ self.assertEqual(h5["b"].dtype.kind, "i")
+ self.assertEqual(h5["c"].dtype.kind, "f")
+ self.assertEqual(h5["d"].dtype.kind, "S")
+ self.assertEqual(h5["e"].dtype.kind, "U")
+
+ def testNumpyZFileContainingDirectories(self):
+ filename = "%s/%s.npz" % (self.tmpDirectory, self.id())
+ data = {}
+ data["a/b/c"] = numpy.arange(10)
+ data["a/b/e"] = numpy.arange(10)
+ numpy.savez(filename, **data)
+ h5 = rawh5.NumpyFile(filename)
+ self.assertIn("a/b/c", h5)
+ self.assertIn("a/b/e", h5)
diff --git a/src/silx/io/test/test_sliceh5.py b/src/silx/io/test/test_sliceh5.py
new file mode 100644
index 0000000..8ccf14a
--- /dev/null
+++ b/src/silx/io/test/test_sliceh5.py
@@ -0,0 +1,104 @@
+# /*##########################################################################
+# Copyright (C) 2022-2023 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+import contextlib
+from io import BytesIO
+
+import h5py
+import numpy
+import pytest
+
+import silx.io
+from silx.io import commonh5
+from silx.io._sliceh5 import DatasetSlice, _combine_indices
+
+
+@contextlib.contextmanager
+def h5py_file(filename, mode):
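+    # "filename" is unused: the file is backed by an in-memory buffer; the
+    # parameter only mirrors the commonh5.File signature for the fixture below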
+ with BytesIO() as buffer:
+ with h5py.File(buffer, mode) as h5file:
+ yield h5file
+
+
+@pytest.fixture(params=[commonh5.File, h5py_file])
+def temp_h5file(request):
+ temp_file_context = request.param
+ with temp_file_context("tempfile.h5", "w") as h5file:
+ yield h5file
+
+
+@pytest.mark.parametrize("indices", [1, slice(None), (1, slice(1, 4))])
+def test_datasetslice(temp_h5file, indices):
+ data = numpy.arange(50).reshape(10, 5)
+ ref_data = numpy.array(data[indices], copy=False)
+
+ h5dataset = temp_h5file.create_group("group").create_dataset("dataset", data=data)
+
+ with DatasetSlice(h5dataset, indices, attrs={}) as dset:
+ assert silx.io.is_dataset(dset)
+ assert dset.file == temp_h5file
+ assert dset.shape == ref_data.shape
+ assert dset.size == ref_data.size
+ assert dset.dtype == ref_data.dtype
+ assert len(dset) == len(ref_data)
+ assert numpy.array_equal(dset[()], ref_data)
+ assert dset.name == h5dataset.name
+
+
+def test_datasetslice_on_external_link(tmp_path):
+ data = numpy.arange(10).reshape(5, 2)
+
+ external_filename = str(tmp_path / "external.h5")
+ ext_dataset_name = "/external_data"
+ with h5py.File(external_filename, "w") as h5file:
+ h5file[ext_dataset_name] = data
+
+ with h5py.File(tmp_path / "test.h5", "w") as h5file:
+ h5file["group/data"] = h5py.ExternalLink(external_filename, ext_dataset_name)
+
+ with DatasetSlice(h5file["group/data"], slice(None), attrs={}) as dset:
+ assert dset.name == ext_dataset_name
+ assert numpy.array_equal(dset[()], data)
+
+
+@pytest.mark.parametrize(
+ "shape,outer_indices,indices",
+ [
+ ((2, 5, 10), (-1, slice(None), slice(None)), slice(None)),
+ ((2, 5, 10), (-1, slice(None), slice(None)), Ellipsis),
+ # negative strides
+ ((5, 10), (slice(1, 5, 2), slice(2, 8)), (slice(2, 3), slice(4, None, -2))),
+ (
+ (5, 10),
+ (slice(4, None, -1), slice(9, 3, -2)),
+ (slice(1, 3), slice(3, 0, -1)),
+ ),
+ ((5, 10), (slice(1, 8, 2), slice(None)), slice(2, 8)), # slice overflow
+ ],
+)
+def test_combine_indices(shape, outer_indices, indices):
+ data = numpy.arange(numpy.prod(shape)).reshape(shape)
+ ref_data = data[outer_indices][indices]
+
+ combined_indices = _combine_indices(shape, outer_indices, indices)
+
+ assert numpy.array_equal(data[combined_indices], ref_data)
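+
+
+# A minimal usage sketch (not a test): DatasetSlice wraps a dataset plus an
+# indexing expression and behaves like a smaller dataset, as checked by
+# test_datasetslice above.
+def _example_dataset_slice(h5dataset):
+    with DatasetSlice(h5dataset, slice(1, 4), attrs={}) as dset:
+        return dset.shape, dset[()]  # shape and content of rows 1..3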
diff --git a/src/silx/io/test/test_specfile.py b/src/silx/io/test/test_specfile.py
new file mode 100644
index 0000000..1b84a65
--- /dev/null
+++ b/src/silx/io/test/test_specfile.py
@@ -0,0 +1,386 @@
+# /*##########################################################################
+# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for specfile wrapper"""
+
+__authors__ = ["P. Knobel", "V.A. Sole"]
+__license__ = "MIT"
+__date__ = "17/01/2018"
+
+
+import locale
+import logging
+import numpy
+import os
+import sys
+import tempfile
+import unittest
+
+from silx.utils import testutils
+
+from ..specfile import SpecFile, Scan
+from .. import specfile
+
+
+logger1 = logging.getLogger(__name__)
+
+sftext = """#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#U00 user comment first line
+#U01 This is a dummy file to test SpecFile parsing
+#U02
+#U03 last line
+
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#G0 0
+#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
+#G3 0 0 0 0 0 0 0 0 0
+#G4 0
+#Q
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#UMI0 Current AutoM Shutter
+#UMI1 192.51 OFF FE open
+#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00;
+#N 4
+#L first column second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 25 ascan c3th 1.33245 1.52245 40 0.15
+#D Thu Feb 11 10:00:31 2016
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 5
+#L column0 column1 col2 col3
+0.0 0.1 0.2 0.3
+1.0 1.1 1.2 1.3
+2.0 2.1 2.2 2.3
+3.0 3.1 3.2 3.3
+
+#S 26 yyyyyy
+#D Thu Feb 11 09:55:20 2016
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 4
+#L first column second column 3rd_col
+#C Sat Oct 31 15:51:47 1998. Scan aborted after 0 points.
+
+#F /tmp/sf.dat
+#E 1455180876
+#D Thu Feb 11 09:54:36 2016
+
+#S 1 aaaaaa
+#U first duplicate line
+#U second duplicate line
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+3 4
+@A 3.1 4 5
+5 6
+@A 6 7.7 8
+"""
+
+
+loc = locale.getlocale(locale.LC_NUMERIC)
+try:
+ locale.setlocale(locale.LC_NUMERIC, "de_DE.utf8")
+except locale.Error:
+ try_DE = False
+else:
+ try_DE = True
+ locale.setlocale(locale.LC_NUMERIC, loc)
+
+
+class TestSpecFile(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname1 = tempfile.mkstemp(text=False)
+ os.write(fd, bytes(sftext, "ascii"))
+ os.close(fd)
+
+ fd2, cls.fname2 = tempfile.mkstemp(text=False)
+ os.write(fd2, bytes(sftext[370:923], "ascii"))
+ os.close(fd2)
+
+ fd3, cls.fname3 = tempfile.mkstemp(text=False)
+ txt = sftext[371:923]
+ os.write(fd3, bytes(txt, "ascii"))
+ os.close(fd3)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname1)
+ os.unlink(cls.fname2)
+ os.unlink(cls.fname3)
+
+ def setUp(self):
+ self.sf = SpecFile(self.fname1)
+ self.scan1 = self.sf[0]
+ self.scan1_2 = self.sf["1.2"]
+ self.scan25 = self.sf["25.1"]
+ self.empty_scan = self.sf["26.1"]
+
+ self.sf_no_fhdr = SpecFile(self.fname2)
+ self.scan1_no_fhdr = self.sf_no_fhdr[0]
+
+ self.sf_no_fhdr_crash = SpecFile(self.fname3)
+ self.scan1_no_fhdr_crash = self.sf_no_fhdr_crash[0]
+
+ def tearDown(self):
+ self.sf.close()
+ self.sf_no_fhdr.close()
+ self.sf_no_fhdr_crash.close()
+
+ def test_open(self):
+ self.assertIsInstance(self.sf, SpecFile)
+ with self.assertRaises(specfile.SfErrFileOpen):
+ SpecFile("doesnt_exist.dat")
+
+ # test filename types unicode and bytes
+ try:
+ SpecFile(self.fname1)
+ except TypeError:
+ self.fail("failed to handle filename as python3 str")
+ try:
+ SpecFile(bytes(self.fname1, "utf-8"))
+ except TypeError:
+ self.fail("failed to handle filename as python3 bytes")
+
+ def test_number_of_scans(self):
+ self.assertEqual(4, len(self.sf))
+
+ def test_list_of_scan_indices(self):
+ self.assertEqual(self.sf.list(), [1, 25, 26, 1])
+ self.assertEqual(self.sf.keys(), ["1.1", "25.1", "26.1", "1.2"])
+
+ def test_index_number_order(self):
+ self.assertEqual(self.sf.index(1, 2), 3) # sf["1.2"]==sf[3]
+ self.assertEqual(self.sf.number(1), 25) # sf[1]==sf["25"]
+ self.assertEqual(self.sf.order(3), 2) # sf[3]==sf["1.2"]
+ with self.assertRaises(specfile.SfErrScanNotFound):
+ self.sf.index(3, 2)
+ with self.assertRaises(specfile.SfErrScanNotFound):
+ self.sf.index(99)
+
+ def assertRaisesRegex(self, *args, **kwargs):
+ return super(TestSpecFile, self).assertRaisesRegex(*args, **kwargs)
+
+ def test_getitem(self):
+ self.assertIsInstance(self.sf[2], Scan)
+ self.assertIsInstance(self.sf["1.2"], Scan)
+ # int out of range
+ with self.assertRaisesRegex(IndexError, "Scan index must be in ran"):
+ self.sf[107]
+ # float indexing not allowed
+ with self.assertRaisesRegex(TypeError, "The scan identification k"):
+ self.sf[1.2]
+ # non-existent scan with "N.M" indexing
+ with self.assertRaises(KeyError):
+ self.sf["3.2"]
+
+ def test_specfile_iterator(self):
+ i = 0
+ for scan in self.sf:
+ if i == 1:
+ self.assertEqual(scan.motor_positions, self.sf[1].motor_positions)
+ i += 1
+ # number of returned scans
+ self.assertEqual(i, len(self.sf))
+
+ def test_scan_index(self):
+ self.assertEqual(self.scan1.index, 0)
+ self.assertEqual(self.scan1_2.index, 3)
+ self.assertEqual(self.scan25.index, 1)
+
+ def test_scan_headers(self):
+ self.assertEqual(
+ self.scan25.scan_header_dict["S"],
+ "25 ascan c3th 1.33245 1.52245 40 0.15",
+ )
+ self.assertEqual(self.scan1.header[17], "#G0 0")
+ self.assertEqual(len(self.scan1.header), 29)
+ # parsing headers with long keys
+ self.assertEqual(
+ self.scan1.scan_header_dict["UMI0"], "Current AutoM Shutter"
+ )
+ # parsing empty headers
+ self.assertEqual(self.scan1.scan_header_dict["Q"], "")
+ # duplicate headers: concatenated (with newline)
+ self.assertEqual(
+ self.scan1_2.scan_header_dict["U"],
+ "first duplicate line\nsecond duplicate line",
+ )
+
+ def test_file_headers(self):
+ self.assertEqual(self.scan1.header[1], "#E 1455180875")
+ self.assertEqual(self.scan1.file_header_dict["F"], "/tmp/sf.dat")
+
+ def test_multiple_file_headers(self):
+ """Scan 1.2 is after the second file header, with a different
+ Epoch"""
+ self.assertEqual(self.scan1_2.header[1], "#E 1455180876")
+
+ def test_scan_labels(self):
+ self.assertEqual(
+ self.scan1.labels, ["first column", "second column", "3rd_col"]
+ )
+
+ def test_data(self):
+ # data_line() and data_col() take 1-based indices as arg
+ self.assertAlmostEqual(self.scan1.data_line(1)[2], 1.56)
+ # tests for data transposition between original file and .data attr
+ self.assertAlmostEqual(self.scan1.data[2, 0], 8)
+ self.assertEqual(self.scan1.data.shape, (3, 4))
+ self.assertAlmostEqual(numpy.sum(self.scan1.data), 113.631)
+
+ def test_data_column_by_name(self):
+ self.assertAlmostEqual(self.scan25.data_column_by_name("col2")[1], 1.2)
+ # Scan.data is transposed after reading, so the column is the first index
+ self.assertAlmostEqual(
+ numpy.sum(self.scan25.data_column_by_name("col2")),
+ numpy.sum(self.scan25.data[2, :]),
+ )
+ with self.assertRaises(specfile.SfErrColNotFound):
+ self.scan25.data_column_by_name("ygfxgfyxg")
+
+ def test_motors(self):
+ self.assertEqual(len(self.scan1.motor_names), 6)
+ self.assertEqual(len(self.scan1.motor_positions), 6)
+ self.assertAlmostEqual(sum(self.scan1.motor_positions), 223.385912)
+ self.assertEqual(self.scan1.motor_names[1], "MRTSlit UP")
+ self.assertAlmostEqual(
+ self.scan25.motor_position_by_name("MRTSlit UP"), -1.66875
+ )
+
+ def test_absence_of_file_header(self):
+ """We expect Scan.file_header to be an empty list in the absence
+ of a file header.
+ """
+ self.assertEqual(len(self.scan1_no_fhdr.motor_names), 0)
+ # motor positions can still be read in the scan header
+ # even in the absence of motor names
+ self.assertAlmostEqual(sum(self.scan1_no_fhdr.motor_positions), 223.385912)
+ self.assertEqual(len(self.scan1_no_fhdr.header), 15)
+ self.assertEqual(len(self.scan1_no_fhdr.scan_header), 15)
+ self.assertEqual(len(self.scan1_no_fhdr.file_header), 0)
+
+ def test_crash_absence_of_file_header(self):
+ """Test no crash in absence of file header and no leading newline
+ character
+ """
+ self.assertEqual(len(self.scan1_no_fhdr_crash.motor_names), 0)
+ # motor positions can still be read in the scan header
+ # even in the absence of motor names
+ self.assertAlmostEqual(
+ sum(self.scan1_no_fhdr_crash.motor_positions), 223.385912
+ )
+ self.assertEqual(len(self.scan1_no_fhdr_crash.scan_header), 15)
+ self.assertEqual(len(self.scan1_no_fhdr_crash.file_header), 0)
+
+ def test_mca(self):
+ self.assertEqual(len(self.scan1.mca), 0)
+ self.assertEqual(len(self.scan1_2.mca), 3)
+ self.assertEqual(self.scan1_2.mca[1][2], 5)
+ self.assertEqual(sum(self.scan1_2.mca[2]), 21.7)
+
+ # Negative indexing
+ self.assertEqual(
+ sum(self.scan1_2.mca[len(self.scan1_2.mca) - 1]), sum(self.scan1_2.mca[-1])
+ )
+
+ # Test iterator
+ line_count, total_sum = (0, 0)
+ for mca_line in self.scan1_2.mca:
+ line_count += 1
+ total_sum += sum(mca_line)
+ self.assertEqual(line_count, 3)
+ self.assertAlmostEqual(total_sum, 36.8)
+
+ def test_mca_header(self):
+ self.assertEqual(self.scan1.mca_header_dict, {})
+ self.assertEqual(len(self.scan1_2.mca_header_dict), 4)
+ self.assertEqual(self.scan1_2.mca_header_dict["CALIB"], "1 2 3")
+ self.assertEqual(self.scan1_2.mca.calibration, [[1.0, 2.0, 3.0]])
+ # default calib in the absence of #@CALIB
+ self.assertEqual(self.scan25.mca.calibration, [[0.0, 1.0, 0.0]])
+ self.assertEqual(self.scan1_2.mca.channels, [[0, 1, 2]])
+ # absence of #@CHANN and spectra
+ self.assertEqual(self.scan25.mca.channels, [])
+
+ @testutils.validate_logging(specfile._logger.name, warning=1)
+ def test_empty_scan(self):
+ """Test reading a scan with no data points"""
+ self.assertEqual(len(self.empty_scan.labels), 3)
+ col1 = self.empty_scan.data_column_by_name("second column")
+ self.assertEqual(col1.shape, (0,))
+
+
+class TestSFLocale(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp(text=False)
+ os.write(fd, bytes(sftext, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+ locale.setlocale(locale.LC_NUMERIC, loc) # restore saved locale
+
+ def crunch_data(self):
+ self.sf3 = SpecFile(self.fname)
+ self.assertAlmostEqual(self.sf3[0].data_line(1)[2], 1.56)
+ self.sf3.close()
+
+ @unittest.skipIf(not try_DE, "de_DE.utf8 locale not installed")
+ def test_locale_de_DE(self):
+ locale.setlocale(locale.LC_NUMERIC, "de_DE.utf8")
+ self.crunch_data()
+
+ def test_locale_user(self):
+ locale.setlocale(locale.LC_NUMERIC, "") # use user's preferred locale
+ self.crunch_data()
+
+ def test_locale_C(self):
+ locale.setlocale(locale.LC_NUMERIC, "C") # use default (C) locale
+ self.crunch_data()
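+
+
+# A minimal usage sketch (not a test): open a SPEC file, take its first scan
+# and sum the first data column. "spec.dat" is an illustrative path.
+def _example_sum_first_column(path="spec.dat"):
+    sf = SpecFile(path)
+    try:
+        scan = sf[0]  # also reachable with "number.order" keys, e.g. sf["1.1"]
+        return numpy.sum(scan.data[0, :])  # data is transposed: [column, row]
+    finally:
+        sf.close()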
diff --git a/src/silx/io/test/test_specfilewrapper.py b/src/silx/io/test/test_specfilewrapper.py
new file mode 100644
index 0000000..e830023
--- /dev/null
+++ b/src/silx/io/test/test_specfilewrapper.py
@@ -0,0 +1,189 @@
+# /*##########################################################################
+# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for old specfile wrapper"""
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "15/05/2017"
+
+import logging
+import numpy
+import os
+import sys
+import tempfile
+import unittest
+
+logger1 = logging.getLogger(__name__)
+
+from ..specfilewrapper import Specfile
+
+sftext = """#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#U00 user comment first line
+#U01 This is a dummy file to test SpecFile parsing
+#U02
+#U03 last line
+
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#G0 0
+#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
+#G3 0 0 0 0 0 0 0 0 0
+#G4 0
+#Q
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#UMI0 Current AutoM Shutter
+#UMI1 192.51 OFF FE open
+#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00;
+#N 4
+#L first column second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 25 ascan c3th 1.33245 1.52245 40 0.15
+#D Thu Feb 11 10:00:31 2016
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 5
+#L column0 column1 col2 col3
+0.0 0.1 0.2 0.3
+1.0 1.1 1.2 1.3
+2.0 2.1 2.2 2.3
+3.0 3.1 3.2 3.3
+
+#F /tmp/sf.dat
+#E 1455180876
+#D Thu Feb 11 09:54:36 2016
+
+#S 1 aaaaaa
+#U first duplicate line
+#U second duplicate line
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+3 4
+@A 3.1 4 5
+5 6
+@A 6 7.7 8
+"""
+
+
+class TestSpecfilewrapper(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname1 = tempfile.mkstemp(text=False)
+ os.write(fd, bytes(sftext, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname1)
+
+ def setUp(self):
+ self.sf = Specfile(self.fname1)
+ self.scan1 = self.sf[0]
+ self.scan1_2 = self.sf.select("1.2")
+ self.scan25 = self.sf.select("25.1")
+
+ def tearDown(self):
+ self.sf.close()
+
+ def test_number_of_scans(self):
+ self.assertEqual(3, len(self.sf))
+
+ def test_list_of_scan_indices(self):
+ self.assertEqual(self.sf.list(), "1,25,1")
+ self.assertEqual(self.sf.keys(), ["1.1", "25.1", "1.2"])
+
+ def test_scan_headers(self):
+ self.assertEqual(
+ self.scan25.header("S"), ["#S 25 ascan c3th 1.33245 1.52245 40 0.15"]
+ )
+ self.assertEqual(self.scan1.header("G0"), ["#G0 0"])
+ # parsing empty headers
+ self.assertEqual(self.scan1.header("Q"), ["#Q "])
+
+ def test_file_headers(self):
+ self.assertEqual(self.scan1.header("E"), ["#E 1455180875"])
+ self.assertEqual(self.sf.title(), "imaging")
+ self.assertEqual(self.sf.epoch(), 1455180875)
+ self.assertEqual(
+ self.sf.allmotors(),
+ [
+ "Pslit HGap",
+ "MRTSlit UP",
+ "MRTSlit DOWN",
+ "Sslit1 VOff",
+ "Sslit1 HOff",
+ "Sslit1 VGap",
+ ],
+ )
+
+ def test_scan_labels(self):
+ self.assertEqual(
+ self.scan1.alllabels(), ["first column", "second column", "3rd_col"]
+ )
+
+ def test_data(self):
+ self.assertAlmostEqual(self.scan1.dataline(3)[2], -3.14)
+ self.assertAlmostEqual(self.scan1.datacol(1)[2], 3.14)
+ # tests for data transposition between original file and .data attr
+ self.assertAlmostEqual(self.scan1.data()[2, 0], 8)
+ self.assertEqual(self.scan1.data().shape, (3, 4))
+ self.assertAlmostEqual(numpy.sum(self.scan1.data()), 113.631)
+
+ def test_date(self):
+ self.assertEqual(self.scan1.date(), "Thu Feb 11 09:55:20 2016")
+
+ def test_motors(self):
+ self.assertEqual(len(self.sf.allmotors()), 6)
+ self.assertEqual(len(self.scan1.allmotorpos()), 6)
+ self.assertAlmostEqual(sum(self.scan1.allmotorpos()), 223.385912)
+ self.assertEqual(self.sf.allmotors()[1], "MRTSlit UP")
+
+ def test_mca(self):
+ self.assertEqual(self.scan1_2.mca(2)[2], 5)
+ self.assertEqual(sum(self.scan1_2.mca(3)), 21.7)
+
+ def test_mca_header(self):
+ self.assertEqual(self.scan1_2.header("CALIB"), ["#@CALIB 1 2 3"])
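+
+
+# A minimal usage sketch (not a test) of the legacy wrapper API exercised
+# above; select() takes "scan.order" strings and datacol() is 1-based.
+# "spec.dat" is an illustrative path.
+def _example_legacy_access(path="spec.dat"):
+    sf = Specfile(path)
+    try:
+        scan = sf.select("1.1")
+        return scan.alllabels(), scan.datacol(1)
+    finally:
+        sf.close()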
diff --git a/src/silx/io/test/test_spech5.py b/src/silx/io/test/test_spech5.py
new file mode 100644
index 0000000..93175f7
--- /dev/null
+++ b/src/silx/io/test/test_spech5.py
@@ -0,0 +1,912 @@
+# /*##########################################################################
+# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for spech5"""
+import numpy
+import os
+import io
+import tempfile
+import unittest
+import datetime
+from functools import partial
+
+from silx.utils import testutils
+
+from .. import spech5
+from ..spech5 import SpecH5, SpecH5Dataset, spec_date_to_iso8601
+from .. import specfile
+
+import h5py
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "12/02/2018"
+
+sftext = """#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#N 4
+#L MRTSlit UP second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 25 ascan c3th 1.33245 1.52245 40 0.15
+#D Sat 2015/03/14 03:53:50
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 5
+#L column0 column1 col2 col3
+0.0 0.1 0.2 0.3
+1.0 1.1 1.2 1.3
+2.0 2.1 2.2 2.3
+3.0 3.1 3.2 3.3
+
+#S 1 aaaaaa
+#D Thu Feb 11 10:00:32 2016
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+3 4
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+5 6
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+
+#S 1000 bbbbb
+#G1 3.25 3.25 5.207 90 90 120 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
+#G3 0.0106337923671 0.027529133 1.206191273 -1.43467075 0.7633438883 0.02401568018 -1.709143587 -2.097621783 0.02456954971
+#L a b
+1 2
+
+#S 1001 ccccc
+#G1 0. 0. 0. 0 0 0 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
+#G3 0. 0. 0. 0. 0.0 0. 0. 0. 0.
+#L a b
+1 2
+
+"""
+
+
+class TestSpecDate(unittest.TestCase):
+ """
+ Test of the spec_date_to_iso8601 function.
+ """
+
+ # TODO : time zone tests
+ # TODO : error cases
+
+ @classmethod
+ def setUpClass(cls):
+ import locale
+
+ # FYI : not threadsafe
+ cls.locale_saved = locale.setlocale(locale.LC_TIME)
+ locale.setlocale(locale.LC_TIME, "C")
+
+ @classmethod
+ def tearDownClass(cls):
+ import locale
+
+ # FYI : not threadsafe
+ locale.setlocale(locale.LC_TIME, cls.locale_saved)
+
+ def setUp(self):
+ # covering all week days
+ self.n_days = range(1, 10)
+ # covering all months
+ self.n_months = range(1, 13)
+
+ self.n_years = [1999, 2016, 2020]
+ self.n_seconds = [0, 5, 26, 59]
+ self.n_minutes = [0, 9, 42, 59]
+ self.n_hours = [0, 2, 17, 23]
+
+ self.formats = ["%a %b %d %H:%M:%S %Y", "%a %Y/%m/%d %H:%M:%S"]
+
+ self.check_date_formats = partial(
+ self.__check_date_formats,
+ year=self.n_years[0],
+ month=self.n_months[0],
+ day=self.n_days[0],
+ hour=self.n_hours[0],
+ minute=self.n_minutes[0],
+ second=self.n_seconds[0],
+ msg=None,
+ )
+
+ def __check_date_formats(self, year, month, day, hour, minute, second, msg=None):
+ dt = datetime.datetime(year, month, day, hour, minute, second)
+ expected_date = dt.isoformat()
+
+ for i_fmt, fmt in enumerate(self.formats):
+ spec_date = dt.strftime(fmt)
+ iso_date = spec_date_to_iso8601(spec_date)
+ self.assertEqual(
+ iso_date,
+ expected_date,
+ msg="Testing {0}. format={1}. "
+ 'Expected "{2}", got "{3} ({4})" (dt={5}).'
+ "".format(msg, i_fmt, expected_date, iso_date, spec_date, dt),
+ )
+
+ def testYearsNominal(self):
+ for year in self.n_years:
+ self.check_date_formats(year=year, msg="year")
+
+ def testMonthsNominal(self):
+ for month in self.n_months:
+ self.check_date_formats(month=month, msg="month")
+
+ def testDaysNominal(self):
+ for day in self.n_days:
+ self.check_date_formats(day=day, msg="day")
+
+ def testHoursNominal(self):
+ for hour in self.n_hours:
+ self.check_date_formats(hour=hour, msg="hour")
+
+ def testMinutesNominal(self):
+ for minute in self.n_minutes:
+ self.check_date_formats(minute=minute, msg="minute")
+
+ def testSecondsNominal(self):
+ for second in self.n_seconds:
+ self.check_date_formats(second=second, msg="second")
+
+
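+# A minimal usage sketch (not a test): both SPEC date flavours covered by
+# TestSpecDate convert to the same ISO8601 form.
+def _example_spec_dates():
+    return (
+        spec_date_to_iso8601("Thu Feb 11 09:55:20 2016"),  # "2016-02-11T09:55:20"
+        spec_date_to_iso8601("Sat 2015/03/14 03:53:50"),  # "2015-03-14T03:53:50"
+    )
+
+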
+class TestSpecH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ os.write(fd, bytes(sftext, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testContainsFile(self):
+ self.assertIn("/1.2/measurement", self.sfh5)
+ self.assertIn("/25.1", self.sfh5)
+ self.assertIn("25.1", self.sfh5)
+ self.assertNotIn("25.2", self.sfh5)
+ # measurement is a child of a scan; the full path is required to
+ # access it from the root level
+ self.assertNotIn("measurement", self.sfh5)
+ # Groups may or may not have a trailing /
+ self.assertIn("/1.2/measurement/mca_1/", self.sfh5)
+ self.assertIn("/1.2/measurement/mca_1", self.sfh5)
+ # Datasets can't have a trailing /
+ self.assertNotIn("/1.2/measurement/mca_0/info/calibration/ ", self.sfh5)
+ # No mca_8
+ self.assertNotIn("/1.2/measurement/mca_8/info/calibration", self.sfh5)
+ # Link
+ self.assertIn("/1.2/measurement/mca_0/info/calibration", self.sfh5)
+
+ def testContainsGroup(self):
+ self.assertIn("measurement", self.sfh5["/1.2/"])
+ self.assertIn("measurement", self.sfh5["/1.2"])
+ self.assertIn("25.1", self.sfh5["/"])
+ self.assertNotIn("25.2", self.sfh5["/"])
+ self.assertIn("instrument/positioners/Sslit1 HOff", self.sfh5["/1.1"])
+ # illegal trailing "/" after dataset name
+ self.assertNotIn("instrument/positioners/Sslit1 HOff/", self.sfh5["/1.1"])
+ # full path to element in group (OK)
+ self.assertIn(
+ "/1.1/instrument/positioners/Sslit1 HOff", self.sfh5["/1.1/instrument"]
+ )
+
+ def testDataColumn(self):
+ self.assertAlmostEqual(sum(self.sfh5["/1.2/measurement/duo"]), 12.0)
+ self.assertAlmostEqual(
+ sum(self.sfh5["1.1"]["measurement"]["MRTSlit UP"]), 87.891, places=4
+ )
+
+ def testDate(self):
+ # start time is in ISO8601 format
+ self.assertEqual(self.sfh5["/1.1/start_time"], "2016-02-11T09:55:20")
+ self.assertEqual(self.sfh5["25.1/start_time"], "2015-03-14T03:53:50")
+
+ def assertRaisesRegex(self, *args, **kwargs):
+ return super(TestSpecH5, self).assertRaisesRegex(*args, **kwargs)
+
+ def testDatasetInstanceAttr(self):
+ """The SpecH5Dataset objects must implement some dummy attributes
+ to improve compatibility with widgets dealing with h5py datasets."""
+ self.assertIsNone(self.sfh5["/1.1/start_time"].compression)
+ self.assertIsNone(self.sfh5["1.1"]["measurement"]["MRTSlit UP"].chunks)
+
+ # error message must be explicit
+ with self.assertRaisesRegex(
+ AttributeError, "SpecH5Dataset has no attribute tOTo"
+ ):
+ dummy = self.sfh5["/1.1/start_time"].tOTo
+
+ def testGet(self):
+ """Test :meth:`SpecH5Group.get`"""
+ # default value of param *default* is None
+ self.assertIsNone(self.sfh5.get("toto"))
+ self.assertEqual(self.sfh5["25.1"].get("toto", default=-3), -3)
+
+ self.assertEqual(
+ self.sfh5.get("/1.1/start_time", default=-3), "2016-02-11T09:55:20"
+ )
+
+ def testGetClass(self):
+ """Test :meth:`SpecH5Group.get`"""
+ self.assertIs(self.sfh5["1.1"].get("start_time", getclass=True), h5py.Dataset)
+ self.assertIs(self.sfh5["1.1"].get("instrument", getclass=True), h5py.Group)
+
+ # spech5 does not define external link, so there is no way
+ # a group can *get* a SpecH5 class
+
+ def testGetApi(self):
+ result = self.sfh5.get("1.1", getclass=True, getlink=True)
+ self.assertIs(result, h5py.HardLink)
+ result = self.sfh5.get("1.1", getclass=False, getlink=True)
+ self.assertIsInstance(result, h5py.HardLink)
+ result = self.sfh5.get("1.1", getclass=True, getlink=False)
+ self.assertIs(result, h5py.Group)
+ result = self.sfh5.get("1.1", getclass=False, getlink=False)
+ self.assertIsInstance(result, spech5.SpecH5Group)
+
+ def testGetItemGroup(self):
+ group = self.sfh5["25.1"]["instrument"]
+ self.assertEqual(
+ list(group["positioners"].keys()),
+ [
+ "Pslit HGap",
+ "MRTSlit UP",
+ "MRTSlit DOWN",
+ "Sslit1 VOff",
+ "Sslit1 HOff",
+ "Sslit1 VGap",
+ ],
+ )
+ with self.assertRaises(KeyError):
+ group["Holy Grail"]
+
+ def testGetitemSpecH5(self):
+ self.assertEqual(
+ self.sfh5["/1.2/instrument/positioners"],
+ self.sfh5["1.2"]["instrument"]["positioners"],
+ )
+
+ def testH5pyClass(self):
+ """Test :attr:`h5py_class` returns the corresponding h5py class
+ (h5py.File, h5py.Group, h5py.Dataset)"""
+ a_file = self.sfh5
+ self.assertIs(a_file.h5py_class, h5py.File)
+
+ a_group = self.sfh5["/1.2/measurement"]
+ self.assertIs(a_group.h5py_class, h5py.Group)
+
+ a_dataset = self.sfh5["/1.1/instrument/positioners/Sslit1 HOff"]
+ self.assertIs(a_dataset.h5py_class, h5py.Dataset)
+
+ def testHeader(self):
+ file_header = self.sfh5["/1.2/instrument/specfile/file_header"]
+ scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"]
+
+ # File header has 10 lines
+ self.assertEqual(len(file_header), 10)
+ # 1.2 has 9 scan & mca header lines
+ self.assertEqual(len(scan_header), 9)
+
+ # line 4 of file header
+ self.assertEqual(file_header[3], "#C imaging User = opid17")
+ # line 4 of scan header
+ scan_header = self.sfh5["25.1/instrument/specfile/scan_header"]
+
+ self.assertEqual(scan_header[3], "#P1 4.74255 6.197579 2.238283")
+
+ def testLinks(self):
+ self.assertTrue(
+ numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/data"],
+ self.sfh5["/1.2/instrument/mca_0/data"],
+ )
+ )
+ self.assertTrue(
+ numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/info/data"],
+ self.sfh5["/1.2/instrument/mca_0/data"],
+ )
+ )
+ self.assertTrue(
+ numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/info/channels"],
+ self.sfh5["/1.2/instrument/mca_0/channels"],
+ )
+ )
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/"].keys(),
+ self.sfh5["/1.2/instrument/mca_0/"].keys(),
+ )
+
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/preset_time"],
+ self.sfh5["/1.2/instrument/mca_0/preset_time"],
+ )
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/live_time"],
+ self.sfh5["/1.2/instrument/mca_0/live_time"],
+ )
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/elapsed_time"],
+ self.sfh5["/1.2/instrument/mca_0/elapsed_time"],
+ )
+
+ def testListScanIndices(self):
+ self.assertEqual(
+ list(self.sfh5.keys()), ["1.1", "25.1", "1.2", "1000.1", "1001.1"]
+ )
+ self.assertEqual(
+ self.sfh5["1.2"].attrs,
+ {
+ "NX_class": "NXentry",
+ },
+ )
+
+ def testMcaAbsent(self):
+ def access_absent_mca():
+ """This must raise a KeyError, because scan 1.1 has no MCA"""
+ return self.sfh5["/1.1/measurement/mca_0/"]
+
+ self.assertRaises(KeyError, access_absent_mca)
+
+ def testMcaCalib(self):
+ mca0_calib = self.sfh5["/1.2/measurement/mca_0/info/calibration"]
+ mca1_calib = self.sfh5["/1.2/measurement/mca_1/info/calibration"]
+ self.assertEqual(mca0_calib.tolist(), [1, 2, 3])
+ # calibration is unique in this scan and applies to all analysers
+ self.assertEqual(mca0_calib.tolist(), mca1_calib.tolist())
+
+ def testMcaChannels(self):
+ mca0_chann = self.sfh5["/1.2/measurement/mca_0/info/channels"]
+ mca1_chann = self.sfh5["/1.2/measurement/mca_1/info/channels"]
+ self.assertEqual(mca0_chann.tolist(), [0, 1, 2])
+ self.assertEqual(mca0_chann.tolist(), mca1_chann.tolist())
+
+ def testMcaCtime(self):
+ """Tests for #@CTIME mca header"""
+ datasets = ["preset_time", "live_time", "elapsed_time"]
+ for ds in datasets:
+ self.assertNotIn("/1.1/instrument/mca_0/" + ds, self.sfh5)
+ self.assertIn("/1.2/instrument/mca_0/" + ds, self.sfh5)
+
+ mca0_preset_time = self.sfh5["/1.2/instrument/mca_0/preset_time"]
+ mca1_preset_time = self.sfh5["/1.2/instrument/mca_1/preset_time"]
+ self.assertLess(mca0_preset_time - 123.4, 10**-5)
+ # ctime is unique in this scan and applies to all analysers
+ self.assertEqual(mca0_preset_time, mca1_preset_time)
+
+ mca0_live_time = self.sfh5["/1.2/instrument/mca_0/live_time"]
+ mca1_live_time = self.sfh5["/1.2/instrument/mca_1/live_time"]
+ self.assertLess(mca0_live_time - 234.5, 10**-5)
+ self.assertEqual(mca0_live_time, mca1_live_time)
+
+ mca0_elapsed_time = self.sfh5["/1.2/instrument/mca_0/elapsed_time"]
+ mca1_elapsed_time = self.sfh5["/1.2/instrument/mca_1/elapsed_time"]
+ self.assertLess(mca0_elapsed_time - 345.6, 10**-5)
+ self.assertEqual(mca0_elapsed_time, mca1_elapsed_time)
+
+ def testMcaData(self):
+ # sum 1st MCA in scan 1.2 over rows
+ mca_0_data = self.sfh5["/1.2/measurement/mca_0/data"]
+ for summed_row, expected in zip(
+ mca_0_data.sum(axis=1).tolist(), [3.0, 12.1, 21.7]
+ ):
+ self.assertAlmostEqual(summed_row, expected, places=4)
+
+ # sum 3rd MCA in scan 1.2 along both axis
+ mca_2_data = self.sfh5["1.2"]["measurement"]["mca_2"]["data"]
+ self.assertAlmostEqual(sum(sum(mca_2_data)), 9.1, places=5)
+ # attrs
+ self.assertEqual(mca_0_data.attrs, {"interpretation": "spectrum"})
+
+ def testMotorPosition(self):
+ positioners_group = self.sfh5["/1.1/instrument/positioners"]
+ # MRTSlit DOWN position is defined in the #P0 scan header line
+ self.assertAlmostEqual(float(positioners_group["MRTSlit DOWN"]), 0.87125)
+ # MRTSlit UP position is defined in the first data column
+ for a, b in zip(
+ positioners_group["MRTSlit UP"].tolist(), [-1.23, 8.478100e01, 3.14, 1.2]
+ ):
+ self.assertAlmostEqual(float(a), b, places=4)
+
+ def testNumberMcaAnalysers(self):
+ """Scan 1.2 has 2 data columns + 3 mca spectra per data line."""
+ self.assertEqual(len(self.sfh5["1.2"]["measurement"]), 5)
+
+ def testTitle(self):
+ self.assertEqual(
+ self.sfh5["/25.1/title"], "ascan c3th 1.33245 1.52245 40 0.15"
+ )
+
+ def testValues(self):
+ group = self.sfh5["/25.1"]
+ self.assertTrue(hasattr(group, "values"))
+ self.assertTrue(callable(group.values))
+ self.assertIn(self.sfh5["/25.1/title"], self.sfh5["/25.1"].values())
+
+ # visit and visititems ignore links
+ def testVisit(self):
+ name_list = []
+ self.sfh5.visit(name_list.append)
+ self.assertIn("1.2/instrument/positioners/Pslit HGap", name_list)
+ self.assertIn("1.2/instrument/specfile/scan_header", name_list)
+ self.assertEqual(len(name_list), 117)
+
+ # test also visit of a subgroup, with various group name formats
+ name_list_leading_and_trailing_slash = []
+ self.sfh5["/1.2/instrument/"].visit(name_list_leading_and_trailing_slash.append)
+ name_list_leading_slash = []
+ self.sfh5["/1.2/instrument"].visit(name_list_leading_slash.append)
+ name_list_trailing_slash = []
+ self.sfh5["1.2/instrument/"].visit(name_list_trailing_slash.append)
+ name_list_no_slash = []
+ self.sfh5["1.2/instrument"].visit(name_list_no_slash.append)
+
+ # no differences expected in the output names
+ self.assertEqual(name_list_leading_and_trailing_slash, name_list_leading_slash)
+ self.assertEqual(name_list_leading_slash, name_list_trailing_slash)
+ self.assertEqual(name_list_leading_slash, name_list_no_slash)
+ self.assertIn("positioners/Pslit HGap", name_list_no_slash)
+ self.assertIn("positioners", name_list_no_slash)
+
+ def testVisitItems(self):
+ dataset_name_list = []
+
+ def func_generator(lst):
+ """return a function appending names to the list lst"""
+
+ def func(name, obj):
+ if isinstance(obj, SpecH5Dataset):
+ lst.append(name)
+
+ return func
+
+ self.sfh5.visititems(func_generator(dataset_name_list))
+ self.assertIn("1.2/instrument/positioners/Pslit HGap", dataset_name_list)
+ self.assertEqual(len(dataset_name_list), 85)
+
+ # test also visit of a subgroup, with various group name formats
+ name_list_leading_and_trailing_slash = []
+ self.sfh5["/1.2/instrument/"].visititems(
+ func_generator(name_list_leading_and_trailing_slash)
+ )
+ name_list_leading_slash = []
+ self.sfh5["/1.2/instrument"].visititems(func_generator(name_list_leading_slash))
+ name_list_trailing_slash = []
+ self.sfh5["1.2/instrument/"].visititems(
+ func_generator(name_list_trailing_slash)
+ )
+ name_list_no_slash = []
+ self.sfh5["1.2/instrument"].visititems(func_generator(name_list_no_slash))
+
+ # no differences expected in the output names
+ self.assertEqual(name_list_leading_and_trailing_slash, name_list_leading_slash)
+ self.assertEqual(name_list_leading_slash, name_list_trailing_slash)
+ self.assertEqual(name_list_leading_slash, name_list_no_slash)
+ self.assertIn("positioners/Pslit HGap", name_list_no_slash)
+
+ def testNotSpecH5(self):
+ fd, fname = tempfile.mkstemp()
+ os.write(fd, b"Not a spec file!")
+ os.close(fd)
+ self.assertRaises(specfile.SfErrFileOpen, SpecH5, fname)
+ self.assertRaises(IOError, SpecH5, fname)
+ os.unlink(fname)
+
+ def testSample(self):
+ self.assertNotIn("sample", self.sfh5["/1.1"])
+ self.assertIn("sample", self.sfh5["/1000.1"])
+ self.assertIn("ub_matrix", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell_abc", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell_alphabetagamma", self.sfh5["/1000.1/sample"])
+
+ # All 0 values
+ self.assertNotIn("sample", self.sfh5["/1001.1"])
+ with self.assertRaises(KeyError):
+ self.sfh5["/1001.1/sample/unit_cell"]
+
+ @testutils.validate_logging(spech5.logger1.name, warning=2)
+ def testOpenFileDescriptor(self):
+ """Open a SpecH5 file from a file descriptor"""
+ with io.open(self.sfh5.filename) as f:
+ sfh5 = SpecH5(f)
+ self.assertIsNotNone(sfh5)
+ name_list = []
+ # check if the object is working
+ self.sfh5.visit(name_list.append)
+ sfh5.close()
+
+
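+# A minimal usage sketch (not a test): SpecH5 exposes a SPEC file as an
+# h5py-like tree, with scans as groups and data columns as datasets.
+# "spec.dat" is an illustrative path.
+def _example_spech5(path="spec.dat"):
+    sfh5 = SpecH5(path)
+    try:
+        title = sfh5["/1.1/title"]
+        motors = list(sfh5["/1.1/instrument/positioners"].keys())
+        return title, motors
+    finally:
+        sfh5.close()
+
+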
+sftext_multi_mca_headers = """
+#S 1 aaaaaa
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCA %16C
+#@CHANN 3 1 3 1
+#@CALIB 5.5 6.6 7.7
+#@CTIME 10 11 12
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+3 4
+@A 3.1 4 5
+@A 7 6 5
+5 6
+@A 6 7.7 8
+@A 4 3 2
+
+"""
+
+
+class TestSpecH5MultiMca(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp(text=False)
+ os.write(fd, bytes(sftext_multi_mca_headers, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testMcaCalib(self):
+ mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"]
+ mca1_calib = self.sfh5["/1.1/measurement/mca_1/info/calibration"]
+ self.assertEqual(mca0_calib.tolist(), [1, 2, 3])
+ self.assertAlmostEqual(sum(mca1_calib.tolist()), sum([5.5, 6.6, 7.7]), places=5)
+
+ def testMcaChannels(self):
+ mca0_chann = self.sfh5["/1.1/measurement/mca_0/info/channels"]
+ mca1_chann = self.sfh5["/1.1/measurement/mca_1/info/channels"]
+ self.assertEqual(mca0_chann.tolist(), [0.0, 1.0, 2.0])
+ # each analyser has its own @CHANN header in this scan
+ self.assertEqual(mca1_chann.tolist(), [1.0, 2.0, 3.0])
+
+ def testMcaCtime(self):
+ """Tests for #@CTIME mca header"""
+ mca0_preset_time = self.sfh5["/1.1/instrument/mca_0/preset_time"]
+ mca1_preset_time = self.sfh5["/1.1/instrument/mca_1/preset_time"]
+        self.assertLess(abs(mca0_preset_time - 123.4), 10**-5)
+        self.assertLess(abs(mca1_preset_time - 10), 10**-5)
+
+ mca0_live_time = self.sfh5["/1.1/instrument/mca_0/live_time"]
+ mca1_live_time = self.sfh5["/1.1/instrument/mca_1/live_time"]
+        self.assertLess(abs(mca0_live_time - 234.5), 10**-5)
+        self.assertLess(abs(mca1_live_time - 11), 10**-5)
+
+ mca0_elapsed_time = self.sfh5["/1.1/instrument/mca_0/elapsed_time"]
+ mca1_elapsed_time = self.sfh5["/1.1/instrument/mca_1/elapsed_time"]
+        self.assertLess(abs(mca0_elapsed_time - 345.6), 10**-5)
+        self.assertLess(abs(mca1_elapsed_time - 12), 10**-5)
+
+
+sftext_no_cols = r"""#F C:/DATA\test.mca
+#D Thu Jul 7 08:40:19 2016
+
+#S 1 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, one mca analyser, single spectrum
+#@MCA %16C
+#@CHANN 151 0 150 1
+#@CALIB 0 2 0
+@A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \
+1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \
+6071 7611 10426 16188 28266 40348 50539 55555 56162 54162 47102 35718 24588 17034 12994 11444 \
+11808 13461 15687 18885 23827 31578 41999 49556 58084 59415 59456 55698 44525 28219 17680 12881 \
+9518 7415 6155 5246 4646 3978 3612 3299 3020 2761 2670 2472 2500 2310 2286 2106 \
+1989 1890 1782 1655 1421 1293 1135 990 879 757 672 618 532 488 445 424 \
+414 373 351 325 307 284 270 247 228 213 199 187 183 176 164 156 \
+153 140 142 130 118 118 103 101 97 86 90 86 87 81 75 82 \
+80 76 77 75 76 77 62 69 74 60 65 68 65 58 63 64 \
+63 59 60 56 57 60 55
+
+#S 2 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, one mca analyser, multiple spectra
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+
+#S 3 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, 3 mca analysers, multiple spectra
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCADEV 2
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCADEV 3
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+"""
+
+
+class TestSpecH5NoDataCols(unittest.TestCase):
+ """Test reading SPEC files with only MCA data"""
+
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ os.write(fd, bytes(sftext_no_cols, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testScan1(self):
+ # 1.1: single analyser, single spectrum, 151 channels
+ self.assertIn("mca_0", self.sfh5["1.1/instrument/"])
+ self.assertEqual(self.sfh5["1.1/instrument/mca_0/data"].shape, (1, 151))
+ self.assertNotIn("mca_1", self.sfh5["1.1/instrument/"])
+
+ def testScan2(self):
+ # 2.1: single analyser, 9 spectra, 3 channels
+ self.assertIn("mca_0", self.sfh5["2.1/instrument/"])
+ self.assertEqual(self.sfh5["2.1/instrument/mca_0/data"].shape, (9, 3))
+ self.assertNotIn("mca_1", self.sfh5["2.1/instrument/"])
+
+ def testScan3(self):
+ # 3.1: 3 analysers, 3 spectra/analyser, 3 channels
+ for i in range(3):
+ self.assertIn("mca_%d" % i, self.sfh5["3.1/instrument/"])
+ self.assertEqual(self.sfh5["3.1/instrument/mca_%d/data" % i].shape, (3, 3))
+
+ self.assertNotIn("mca_3", self.sfh5["3.1/instrument/"])
+
+
+sf_text_slash = r"""#F /data/id09/archive/logspecfiles/laue/2016/scan_231_laue_16-11-29.dat
+#D Sat Dec 10 22:20:59 2016
+#O0 Pslit/HGap MRTSlit%UP
+
+#S 1 laue_16-11-29.log 231.1 PD3/A
+#D Sat Dec 10 22:20:59 2016
+#P0 180.005 -0.66875
+#N 2
+#L GONY/mm PD3%A
+-2.015 5.250424e-05
+-2.01 5.30798e-05
+-2.005 5.281903e-05
+-2 5.220436e-05
+"""
+
+
+class TestSpecH5SlashInLabels(unittest.TestCase):
+ """Test reading SPEC files with labels containing a / character
+
+ The / character must be substituted with a %
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ os.write(fd, bytes(sf_text_slash, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testLabels(self):
+ """Ensure `/` is substituted with `%` and
+ ensure legitimate `%` in names are still working"""
+ self.assertEqual(
+ list(self.sfh5["1.1/measurement/"].keys()), ["GONY%mm", "PD3%A"]
+ )
+
+ # substituted "%"
+ self.assertIn("GONY%mm", self.sfh5["1.1/measurement/"])
+ self.assertNotIn("GONY/mm", self.sfh5["1.1/measurement/"])
+ self.assertAlmostEqual(
+ self.sfh5["1.1/measurement/GONY%mm"][0], -2.015, places=4
+ )
+ # legitimate "%"
+ self.assertIn("PD3%A", self.sfh5["1.1/measurement/"])
+
+ def testMotors(self):
+ """Ensure `/` is substituted with `%` and
+ ensure legitimate `%` in names are still working"""
+ self.assertEqual(
+ list(self.sfh5["1.1/instrument/positioners"].keys()),
+ ["Pslit%HGap", "MRTSlit%UP"],
+ )
+ # substituted "%"
+ self.assertIn("Pslit%HGap", self.sfh5["1.1/instrument/positioners"])
+ self.assertNotIn("Pslit/HGap", self.sfh5["1.1/instrument/positioners"])
+ self.assertAlmostEqual(
+ self.sfh5["1.1/instrument/positioners/Pslit%HGap"], 180.005, places=4
+ )
+ # legitimate "%"
+ self.assertIn("MRTSlit%UP", self.sfh5["1.1/instrument/positioners"])
+
+
+def testUnitCellUBMatrix(tmp_path):
+ """Test unit cell (#G1) and UB matrix (#G3)"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 OK
+#G1 0 1 2 3 4 5
+#G3 0 1 2 3 4 5 6 7 8
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert numpy.array_equal(
+ spech5["/1.1/sample/ub_matrix"], numpy.arange(9).reshape(1, 3, 3)
+ )
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]])
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5]
+ )
+
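+# Header semantics assumed above: "#G1 a b c alpha beta gamma" carries the
+# unit cell, split into unit_cell_abc = values[:3] and
+# unit_cell_alphabetagamma = values[3:], while "#G3" carries the 9 UB matrix
+# values in row-major order:
+#
+#     ub_matrix = numpy.array([0, 1, 2, 3, 4, 5, 6, 7, 8]).reshape(1, 3, 3)
+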
+
+def testMalformedUnitCellUBMatrix(tmp_path):
+ """Test malformed unit cell (#G1) and UB matrix (#G3): 1 value"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 all malformed=0
+#G1 0
+#G3 0
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert "sample" not in spech5["1.1"]
+
+
+def testMalformedUBMatrix(tmp_path):
+ """Test malformed UB matrix (#G3): all zeros"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 G3 all 0
+#G1 0 1 2 3 4 5
+#G3 0 0 0 0 0 0 0 0 0
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert "ub_matrix" not in spech5["/1.1/sample"]
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]])
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5]
+ )
+
+
+def testMalformedUnitCell(tmp_path):
+ """Test malformed unit cell (#G1): missing values"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 G1 malformed missing values
+#G1 0 1 2
+#G3 0 1 2 3 4 5 6 7 8
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert "unit_cell" not in spech5["/1.1/sample"]
+ assert "unit_cell_abc" not in spech5["/1.1/sample"]
+ assert "unit_cell_alphabetagamma" not in spech5["/1.1/sample"]
+ assert numpy.array_equal(
+ spech5["/1.1/sample/ub_matrix"], numpy.arange(9).reshape(1, 3, 3)
+ )
diff --git a/src/silx/io/test/test_spectoh5.py b/src/silx/io/test/test_spectoh5.py
new file mode 100644
index 0000000..a3426ea
--- /dev/null
+++ b/src/silx/io/test/test_spectoh5.py
@@ -0,0 +1,186 @@
+# /*##########################################################################
+# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for SpecFile to HDF5 converter"""
+
+from numpy import array_equal
+import os
+import tempfile
+import unittest
+
+import h5py
+
+from ..spech5 import SpecH5, SpecH5Group
+from ..convert import convert, write_to_h5
+from ..utils import h5py_read_dataset
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "12/02/2018"
+
+
+sfdata = b"""#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#N 4
+#L MRTSlit UP second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 1 aaaaaa
+#D Thu Feb 11 10:00:32 2016
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+3 4
+@A 3.1 4 5
+@A 7 6 5
+5 6
+@A 6 7.7 8
+@A 4 3 2
+"""
+
+
+class TestConvertSpecHDF5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.spec_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5")
+ os.write(fd, sfdata)
+ os.close(fd)
+
+ fd, cls.h5_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5")
+ # Close and delete (we just need the name)
+ os.close(fd)
+ os.unlink(cls.h5_fname)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.spec_fname)
+
+ def setUp(self):
+ convert(self.spec_fname, self.h5_fname)
+
+ self.sfh5 = SpecH5(self.spec_fname)
+ self.h5f = h5py.File(self.h5_fname, "a")
+
+ def tearDown(self):
+ self.h5f.close()
+ self.sfh5.close()
+ os.unlink(self.h5_fname)
+
+ def testAppendToHDF5(self):
+ write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam")
+ self.assertTrue(
+ array_equal(
+ self.h5f["/1.2/measurement/mca_1/data"],
+ self.h5f["/foo/bar/spam/1.2/measurement/mca_1/data"],
+ )
+ )
+
+ def testWriteSpecH5Group(self):
+ """Test passing a SpecH5Group as parameter, instead of a Spec filename
+ or a SpecH5."""
+ g = self.sfh5["1.1/instrument"]
+ self.assertIsInstance(g, SpecH5Group) # let's be paranoid
+ write_to_h5(g, self.h5f, h5path="my instruments")
+
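+        # ds[tuple()] below (equivalent to ds[()]) reads a scalar HDF5
+        # dataset back as a single number for comparison with the #P1 value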
+ self.assertAlmostEqual(
+ self.h5f["my instruments/positioners/Sslit1 HOff"][tuple()],
+ 16.197579,
+ places=4,
+ )
+
+ def testTitle(self):
+ """Test the value of a dataset"""
+ title12 = h5py_read_dataset(self.h5f["/1.2/title"])
+ self.assertEqual(title12, "aaaaaa")
+
+ def testAttrs(self):
+ # Test root group (file) attributes
+ self.assertEqual(self.h5f.attrs["NX_class"], "NXroot")
+ # Test dataset attributes
+ ds = self.h5f["/1.2/instrument/mca_1/data"]
+ self.assertTrue("interpretation" in ds.attrs)
+ self.assertEqual(list(ds.attrs.values()), ["spectrum"])
+ # Test group attributes
+ grp = self.h5f["1.1"]
+ self.assertEqual(grp.attrs["NX_class"], "NXentry")
+ self.assertEqual(len(list(grp.attrs.keys())), 1)
+
+ def testHdf5HasSameMembers(self):
+ spec_member_list = []
+
+ def append_spec_members(name):
+ spec_member_list.append(name)
+
+ self.sfh5.visit(append_spec_members)
+
+ hdf5_member_list = []
+
+ def append_hdf5_members(name):
+ hdf5_member_list.append(name)
+
+ self.h5f.visit(append_hdf5_members)
+
+        # For some reason, the h5py visit method doesn't include the leading
+        # "/" character when it passes the member name to the function,
+        # even though the .name attribute of a member does have a
+        # leading "/"
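+        # (e.g. visit() passes "1.1/instrument" while that member's .name
+        # attribute reads "/1.1/instrument")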
+ spec_member_list = [m.lstrip("/") for m in spec_member_list]
+
+ self.assertEqual(set(hdf5_member_list), set(spec_member_list))
+
+ def testLinks(self):
+ self.assertTrue(
+ array_equal(
+ self.sfh5["/1.2/measurement/mca_0/data"],
+ self.h5f["/1.2/measurement/mca_0/data"],
+ )
+ )
+ self.assertTrue(
+ array_equal(
+ self.h5f["/1.2/instrument/mca_1/channels"],
+ self.h5f["/1.2/measurement/mca_1/info/channels"],
+ )
+ )
diff --git a/src/silx/io/test/test_url.py b/src/silx/io/test/test_url.py
new file mode 100644
index 0000000..61f9883
--- /dev/null
+++ b/src/silx/io/test/test_url.py
@@ -0,0 +1,300 @@
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for url module"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "29/01/2018"
+
+
+import pytest
+from ..url import DataUrl
+
+
+def assert_url(url, expected):
+ assert url.is_valid() == expected[0]
+ assert url.is_absolute() == expected[1]
+ assert url.scheme() == expected[2]
+ assert url.file_path() == expected[3]
+ assert url.data_path() == expected[4]
+ assert url.data_slice() == expected[5]
+
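+# URL anatomy exercised by the tests below, annotated on one example
+# (an informal sketch, not a complete grammar):
+#
+#     silx:///data/image.h5?path=/data/dataset&slice=1,5
+#     scheme="silx", file_path="/data/image.h5",
+#     data_path="/data/dataset", data_slice=(1, 5)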
+
+def test_fabio_absolute():
+ url = DataUrl("fabio:///data/image.edf?slice=2")
+ expected = [True, True, "fabio", "/data/image.edf", None, (2,)]
+ assert_url(url, expected)
+
+
+def test_fabio_absolute_windows():
+ url = DataUrl("fabio:///C:/data/image.edf?slice=2")
+ expected = [True, True, "fabio", "C:/data/image.edf", None, (2,)]
+ assert_url(url, expected)
+
+
+def test_silx_absolute():
+ url = DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5")
+ expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
+ assert_url(url, expected)
+
+
+def test_commandline_shell_separator():
+ url = DataUrl("silx:///data/image.h5::path=/data/dataset&slice=1,5")
+ expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
+ assert_url(url, expected)
+
+
+def test_silx_absolute2():
+ url = DataUrl("silx:///data/image.edf?/scan_0/detector/data")
+ expected = [True, True, "silx", "/data/image.edf", "/scan_0/detector/data", None]
+ assert_url(url, expected)
+
+
+def test_silx_absolute_windows():
+ url = DataUrl("silx:///C:/data/image.h5?/scan_0/detector/data")
+ expected = [True, True, "silx", "C:/data/image.h5", "/scan_0/detector/data", None]
+ assert_url(url, expected)
+
+
+def test_silx_relative():
+ url = DataUrl("silx:./image.h5")
+ expected = [True, False, "silx", "./image.h5", None, None]
+ assert_url(url, expected)
+
+
+def test_fabio_relative():
+ url = DataUrl("fabio:./image.edf")
+ expected = [True, False, "fabio", "./image.edf", None, None]
+ assert_url(url, expected)
+
+
+def test_silx_relative2():
+ url = DataUrl("silx:image.h5")
+ expected = [True, False, "silx", "image.h5", None, None]
+ assert_url(url, expected)
+
+
+def test_fabio_relative2():
+ url = DataUrl("fabio:image.edf")
+ expected = [True, False, "fabio", "image.edf", None, None]
+ assert_url(url, expected)
+
+
+def test_file_relative():
+ url = DataUrl("image.edf")
+ expected = [True, False, None, "image.edf", None, None]
+ assert_url(url, expected)
+
+
+def test_file_relative2():
+ url = DataUrl("./foo/bar/image.edf")
+ expected = [True, False, None, "./foo/bar/image.edf", None, None]
+ assert_url(url, expected)
+
+
+def test_file_relative3():
+ url = DataUrl("foo/bar/image.edf")
+ expected = [True, False, None, "foo/bar/image.edf", None, None]
+ assert_url(url, expected)
+
+
+def test_file_absolute():
+ url = DataUrl("/data/image.edf")
+ expected = [True, True, None, "/data/image.edf", None, None]
+ assert_url(url, expected)
+
+
+def test_file_absolute_windows():
+ url = DataUrl("C:/data/image.edf")
+ expected = [True, True, None, "C:/data/image.edf", None, None]
+ assert_url(url, expected)
+
+
+def test_absolute_with_path():
+ url = DataUrl("/foo/foobar.h5?/foo/bar")
+ expected = [True, True, None, "/foo/foobar.h5", "/foo/bar", None]
+ assert_url(url, expected)
+
+
+def test_windows_file_data_slice():
+ url = DataUrl("C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, None, "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
+ assert_url(url, expected)
+
+
+def test_scheme_file_data_slice():
+ url = DataUrl("silx:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, "silx", "/foo/foobar.h5", "/foo/bar", (5, 1)]
+ assert_url(url, expected)
+
+
+def test_scheme_windows_file_data_slice():
+ url = DataUrl("silx:C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, "silx", "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
+ assert_url(url, expected)
+
+
+def test_empty():
+ url = DataUrl("")
+ expected = [False, False, None, "", None, None]
+ assert_url(url, expected)
+
+
+def test_unknown_scheme():
+ url = DataUrl("foo:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [False, True, "foo", "/foo/foobar.h5", "/foo/bar", (5, 1)]
+ assert_url(url, expected)
+
+
+def test_slice():
+ url = DataUrl("/a.h5?path=/b&slice=5,1")
+ expected = [True, True, None, "/a.h5", "/b", (5, 1)]
+ assert_url(url, expected)
+
+
+def test_slice2():
+ url = DataUrl("/a.h5?path=/b&slice=2:5")
+ expected = [True, True, None, "/a.h5", "/b", (slice(2, 5),)]
+ assert_url(url, expected)
+
+
+def test_slice3():
+ url = DataUrl("/a.h5?path=/b&slice=::2")
+ expected = [True, True, None, "/a.h5", "/b", (slice(None, None, 2),)]
+ assert_url(url, expected)
+
+
+def test_slice_ellipsis():
+ url = DataUrl("/a.h5?path=/b&slice=...")
+ expected = [True, True, None, "/a.h5", "/b", (Ellipsis,)]
+ assert_url(url, expected)
+
+
+def test_slice_slicing():
+ url = DataUrl("/a.h5?path=/b&slice=:")
+ expected = [True, True, None, "/a.h5", "/b", (slice(None),)]
+ assert_url(url, expected)
+
+
+def test_slice_missing_element():
+ url = DataUrl("/a.h5?path=/b&slice=5,,1")
+ expected = [False, True, None, "/a.h5", "/b", None]
+ assert_url(url, expected)
+
+
+def test_slice_no_elements():
+ url = DataUrl("/a.h5?path=/b&slice=")
+ expected = [False, True, None, "/a.h5", "/b", None]
+ assert_url(url, expected)
+
+
+def test_create_relative_url():
+ url = DataUrl(scheme="silx", file_path="./foo.h5", data_path="/", data_slice=(5, 1))
+ assert not url.is_absolute()
+ url2 = DataUrl(url.path())
+ assert url == url2
+
+
+def test_create_absolute_url():
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1))
+ url2 = DataUrl(url.path())
+ assert url == url2
+
+
+def test_create_absolute_windows_url():
+ url = DataUrl(
+ scheme="silx", file_path="C:/foo.h5", data_path="/", data_slice=(5, 1)
+ )
+ url2 = DataUrl(url.path())
+ assert url == url2
+
+
+def test_create_slice_url():
+ url = DataUrl(
+ scheme="silx",
+ file_path="/foo.h5",
+ data_path="/",
+ data_slice=(5, 1, Ellipsis, slice(None)),
+ )
+ url2 = DataUrl(url.path())
+ assert url == url2
+
+
+def test_wrong_url():
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=(5, 1))
+ assert not url.is_valid()
+
+
+@pytest.mark.parametrize(
+ "data",
+ [
+ (1, "silx:///foo.h5?slice=1"),
+ ((1,), "silx:///foo.h5?slice=1"),
+ (slice(None), "silx:///foo.h5?slice=:"),
+ (slice(1, None), "silx:///foo.h5?slice=1:"),
+ (slice(None, -2), "silx:///foo.h5?slice=:-2"),
+ (slice(1, None, 3), "silx:///foo.h5?slice=1::3"),
+ (slice(None, 2, 3), "silx:///foo.h5?slice=:2:3"),
+ (slice(None, None, 3), "silx:///foo.h5?slice=::3"),
+ (slice(1, 2, 3), "silx:///foo.h5?slice=1:2:3"),
+ ((1, slice(1, 2)), "silx:///foo.h5?slice=1,1:2"),
+ ],
+)
+def test_path_creation(data):
+ """make sure the construction of path succeed and that we can
+ recreate a DataUrl from a path"""
+ data_slice, expected_path = data
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=data_slice)
+ path = url.path()
+ DataUrl(path=path)
+ assert path == expected_path
+
+
+def test_file_path_none():
+ """
+ make sure a file path can be None
+ """
+ url = DataUrl(scheme="silx", file_path=None, data_path="/path/to/data")
+ assert url.file_path() is None
+ assert url.scheme() == "silx"
+ assert url.data_path() == "/path/to/data"
+
+
+def test_data_path_none():
+ """
+ make sure a data path can be None
+ """
+ url = DataUrl(scheme="silx", file_path="my_file.hdf5", data_path=None)
+ assert url.file_path() == "my_file.hdf5"
+ assert url.scheme() == "silx"
+ assert url.data_path() is None
+
+
+def test_scheme_none():
+ """
+ make sure a scheme can be None
+ """
+ url = DataUrl(scheme=None, file_path="my_file.hdf5", data_path="/path/to/data")
+ assert url.file_path() == "my_file.hdf5"
+ assert url.scheme() is None
+ assert url.data_path() == "/path/to/data"
diff --git a/src/silx/io/test/test_utils.py b/src/silx/io/test/test_utils.py
new file mode 100644
index 0000000..a9c7f6a
--- /dev/null
+++ b/src/silx/io/test/test_utils.py
@@ -0,0 +1,1141 @@
+# /*##########################################################################
+# Copyright (C) 2016-2022 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for utils module"""
+
+import io
+import numpy
+import os
+import re
+import shutil
+import tempfile
+import unittest
+
+from .. import utils
+from ..._version import calc_hexversion
+import silx.io.url
+
+import h5py
+from ..utils import h5ls
+from silx.io import commonh5
+
+
+import fabio
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "03/12/2020"
+
+expected_spec1 = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 2
+#L Abscissa Ordinate1
+1 4\.00
+2 5\.00
+3 6\.00
+"""
+
+expected_spec2 = (
+ expected_spec1
+ + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+3 9\.00
+"""
+)
+
+expected_spec2reg = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 3
+#L Abscissa Ordinate1 Ordinate2
+1 4\.00 7\.00
+2 5\.00 8\.00
+3 6\.00 9\.00
+"""
+
+expected_spec2irr = (
+ expected_spec1
+ + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+"""
+)
+
+expected_csv = r"""Abscissa;Ordinate1;Ordinate2
+1;4\.00;7\.00e\+00
+2;5\.00;8\.00e\+00
+3;6\.00;9\.00e\+00
+"""
+
+expected_csv2 = r"""x;y0;y1
+1;4\.00;7\.00e\+00
+2;5\.00;8\.00e\+00
+3;6\.00;9\.00e\+00
+"""
+
+
+class TestSave(unittest.TestCase):
+ """Test saving curves as SpecFile:"""
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
+ self.csv_fname = os.path.join(self.tempdir, "savecsv.csv")
+ self.npy_fname = os.path.join(self.tempdir, "savenpy.npy")
+
+ self.x = [1, 2, 3]
+ self.xlab = "Abscissa"
+ self.y = [[4, 5, 6], [7, 8, 9]]
+ self.y_irr = [[4, 5, 6], [7, 8]]
+ self.ylabs = ["Ordinate1", "Ordinate2"]
+
+ def tearDown(self):
+ if os.path.isfile(self.spec_fname):
+ os.unlink(self.spec_fname)
+ if os.path.isfile(self.csv_fname):
+ os.unlink(self.csv_fname)
+ if os.path.isfile(self.npy_fname):
+ os.unlink(self.npy_fname)
+ shutil.rmtree(self.tempdir)
+
+ def test_save_csv(self):
+ utils.save1D(
+ self.csv_fname,
+ self.x,
+ self.y,
+ xlabel=self.xlab,
+ ylabels=self.ylabs,
+ filetype="csv",
+ fmt=["%d", "%.2f", "%.2e"],
+ csvdelim=";",
+ autoheader=True,
+ )
+
+ csvf = open(self.csv_fname)
+ actual_csv = csvf.read()
+ csvf.close()
+
+ self.assertRegex(actual_csv, expected_csv)
+
+ def test_save_npy(self):
+ """npy file is saved with numpy.save after building a numpy array
+ and converting it to a named record array"""
+ npyf = open(self.npy_fname, "wb")
+ utils.save1D(npyf, self.x, self.y, xlabel=self.xlab, ylabels=self.ylabs)
+ npyf.close()
+
+ npy_recarray = numpy.load(self.npy_fname)
+
+ self.assertEqual(npy_recarray.shape, (3,))
+ self.assertTrue(
+ numpy.array_equal(npy_recarray["Ordinate1"], numpy.array((4, 5, 6)))
+ )
+
+ def test_savespec_filename(self):
+ """Save SpecFile using savespec()"""
+ utils.savespec(
+ self.spec_fname,
+ self.x,
+ self.y[0],
+ xlabel=self.xlab,
+ ylabel=self.ylabs[0],
+ fmt=["%d", "%.2f"],
+ close_file=True,
+ scan_number=1,
+ )
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec1)
+
+ def test_savespec_file_handle(self):
+ """Save SpecFile using savespec(), passing a file handle"""
+ # first savespec: open, write file header, save y[0] as scan 1,
+ # return file handle
+ specf = utils.savespec(
+ self.spec_fname,
+ self.x,
+ self.y[0],
+ xlabel=self.xlab,
+ ylabel=self.ylabs[0],
+ fmt=["%d", "%.2f"],
+ close_file=False,
+ )
+
+ # second savespec: save y[1] as scan 2, close file
+ utils.savespec(
+ specf,
+ self.x,
+ self.y[1],
+ xlabel=self.xlab,
+ ylabel=self.ylabs[1],
+ fmt=["%d", "%.2f"],
+ write_file_header=False,
+ close_file=True,
+ scan_number=2,
+ )
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2)
+
+ def test_save_spec_reg(self):
+ """Save SpecFile using save() on a regular pattern"""
+ utils.save1D(
+ self.spec_fname,
+ self.x,
+ self.y,
+ xlabel=self.xlab,
+ ylabels=self.ylabs,
+ filetype="spec",
+ fmt=["%d", "%.2f"],
+ )
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+
+ self.assertRegex(actual_spec, expected_spec2reg)
+
+ def test_save_spec_irr(self):
+ """Save SpecFile using save() on an irregular pattern"""
+        # this looks like an invalid test case; skip it explicitly
+        self.skipTest("invalid test case (irregular pattern)")
+ utils.save1D(
+ self.spec_fname,
+ self.x,
+ self.y_irr,
+ xlabel=self.xlab,
+ ylabels=self.ylabs,
+ filetype="spec",
+ fmt=["%d", "%.2f"],
+ )
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2irr)
+
+ def test_save_csv_no_labels(self):
+ """Save csv using save(), with autoheader=True but
+ xlabel=None and ylabels=None
+ This is a non-regression test for bug #223"""
+ self.tempdir = tempfile.mkdtemp()
+ self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
+ self.csv_fname = os.path.join(self.tempdir, "savecsv.csv")
+ self.npy_fname = os.path.join(self.tempdir, "savenpy.npy")
+
+ self.x = [1, 2, 3]
+ self.xlab = "Abscissa"
+ self.y = [[4, 5, 6], [7, 8, 9]]
+ self.ylabs = ["Ordinate1", "Ordinate2"]
+ utils.save1D(
+ self.csv_fname, self.x, self.y, autoheader=True, fmt=["%d", "%.2f", "%.2e"]
+ )
+
+ csvf = open(self.csv_fname)
+ actual_csv = csvf.read()
+ csvf.close()
+ self.assertRegex(actual_csv, expected_csv2)
+
+
+def assert_match_any_string_in_list(test, pattern, list_of_strings):
+ for string_ in list_of_strings:
+ if re.match(pattern, string_):
+ return True
+ return False
+
+
+class TestH5Ls(unittest.TestCase):
+ """Test displaying the following HDF5 file structure:
+
+ +foo
+ +bar
+ <HDF5 dataset "spam": shape (2, 2), type "<i8">
+ <HDF5 dataset "tmp": shape (3,), type "<i8">
+ <HDF5 dataset "data": shape (1,), type "<f8">
+
+ """
+
+ def assertMatchAnyStringInList(self, pattern, list_of_strings):
+ for string_ in list_of_strings:
+ if re.match(pattern, string_):
+ return None
+ raise AssertionError(
+ "regex pattern %s does not match any" % pattern
+ + " string in list "
+ + str(list_of_strings)
+ )
+
+ def testHdf5(self):
+ fd, self.h5_fname = tempfile.mkstemp(text=False)
+ # Close and delete (we just want the name)
+ os.close(fd)
+ os.unlink(self.h5_fname)
+ self.h5f = h5py.File(self.h5_fname, "w")
+ self.h5f["/foo/bar/tmp"] = [1, 2, 3]
+ self.h5f["/foo/bar/spam"] = [[1, 2], [3, 4]]
+ self.h5f["/foo/data"] = [3.14]
+ self.h5f.close()
+
+ rep = h5ls(self.h5_fname)
+ lines = rep.split("\n")
+
+ self.assertIn("+foo", lines)
+ self.assertIn("\t+bar", lines)
+
+ match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "[<>]i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "[<>]i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t<HDF5 dataset "data": shape \(1,\), type "[<>]f[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+
+ os.unlink(self.h5_fname)
+
+    # The following test case is disabled due to errors on AppVeyor:
+ # os.unlink(spec_fname)
+ # PermissionError: [WinError 32] The process cannot access the file because
+ # it is being used by another process: 'C:\\...\\savespec.dat'
+
+ # def testSpec(self):
+ # tempdir = tempfile.mkdtemp()
+ # spec_fname = os.path.join(tempdir, "savespec.dat")
+ #
+ # x = [1, 2, 3]
+ # xlab = "Abscissa"
+ # y = [[4, 5, 6], [7, 8, 9]]
+ # ylabs = ["Ordinate1", "Ordinate2"]
+ # utils.save1D(spec_fname, x, y, xlabel=xlab,
+ # ylabels=ylabs, filetype="spec",
+ # fmt=["%d", "%.2f"])
+ #
+ # rep = h5ls(spec_fname)
+ # lines = rep.split("\n")
+ # self.assertIn("+1.1", lines)
+ # self.assertIn("\t+instrument", lines)
+ #
+ # self.assertMatchAnyStringInList(
+ # r'\t\t\t<SPEC dataset "file_header": shape \(\), type "|S60">',
+ # lines)
+ # self.assertMatchAnyStringInList(
+ # r'\t\t<SPEC dataset "Ordinate1": shape \(3L?,\), type "<f4">',
+ # lines)
+ #
+ # os.unlink(spec_fname)
+ # shutil.rmtree(tempdir)
+
+
+class TestOpen(unittest.TestCase):
+ """Test `silx.io.utils.open` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/dataset"] = 50
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(
+ cls.spec_filename,
+ [1],
+ [1.1],
+ xlabel="x",
+ ylabel="y",
+ fmt=["%d", "%.2f"],
+ close_file=True,
+ scan_number=1,
+ )
+
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write("Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def testH5(self):
+ f = utils.open(self.h5_filename)
+ self.assertIsNotNone(f)
+ self.assertIsInstance(f, h5py.File)
+ f.close()
+
+ def testH5With(self):
+ with utils.open(self.h5_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertIsInstance(f, h5py.File)
+
+ def testH5_withPath(self):
+ f = utils.open(self.h5_filename + "::/group/group/dataset")
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.Dataset)
+ self.assertEqual(f[()], 50)
+ f.close()
+
+ def testH5With_withPath(self):
+ with utils.open(self.h5_filename + "::/group/group") as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.Group)
+ self.assertIn("dataset", f)
+
+ def testSpec(self):
+ f = utils.open(self.spec_filename)
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+ f.close()
+
+ def testSpecWith(self):
+ with utils.open(self.spec_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+
+ def testEdf(self):
+ f = utils.open(self.edf_filename)
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+ f.close()
+
+ def testEdfWith(self):
+ with utils.open(self.edf_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+
+ def testUnsupported(self):
+ self.assertRaises(IOError, utils.open, self.txt_filename)
+
+ def testNotExists(self):
+ # load it
+ self.assertRaises(IOError, utils.open, self.missing_filename)
+
+ def test_silx_scheme(self):
+ url = silx.io.url.DataUrl(
+ scheme="silx", file_path=self.h5_filename, data_path="/"
+ )
+ with utils.open(url.path()) as f:
+ self.assertIsNotNone(f)
+ self.assertTrue(silx.io.utils.is_file(f))
+
+ def test_fabio_scheme(self):
+ url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_bad_url(self):
+ url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_sliced_url(self):
+ url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,))
+ self.assertRaises(IOError, utils.open, url.path())
+
+
+class TestNodes(unittest.TestCase):
+ """Test `silx.io.utils.is_` functions."""
+
+ def test_real_h5py_objects(self):
+ name = tempfile.mktemp(suffix=".h5")
+ try:
+ with h5py.File(name, "w") as h5file:
+ h5group = h5file.create_group("arrays")
+ h5dataset = h5group.create_dataset("scalar", data=10)
+
+ self.assertTrue(utils.is_file(h5file))
+ self.assertTrue(utils.is_group(h5file))
+ self.assertFalse(utils.is_dataset(h5file))
+
+ self.assertFalse(utils.is_file(h5group))
+ self.assertTrue(utils.is_group(h5group))
+ self.assertFalse(utils.is_dataset(h5group))
+
+ self.assertFalse(utils.is_file(h5dataset))
+ self.assertFalse(utils.is_group(h5dataset))
+ self.assertTrue(utils.is_dataset(h5dataset))
+ finally:
+ os.unlink(name)
+
+ def test_h5py_like_file(self):
+ class Foo(object):
+ def __init__(self):
+ self.h5_class = utils.H5Type.FILE
+
+ obj = Foo()
+ self.assertTrue(utils.is_file(obj))
+ self.assertTrue(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_h5py_like_group(self):
+ class Foo(object):
+ def __init__(self):
+ self.h5_class = utils.H5Type.GROUP
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertTrue(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_h5py_like_dataset(self):
+ class Foo(object):
+ def __init__(self):
+ self.h5_class = utils.H5Type.DATASET
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertTrue(utils.is_dataset(obj))
+
+ def test_bad(self):
+ class Foo(object):
+ def __init__(self):
+ pass
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_bad_api(self):
+ class Foo(object):
+ def __init__(self):
+ self.h5_class = int
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+
+class TestGetData(unittest.TestCase):
+ """Test `silx.io.utils.get_data` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/scalar"] = 50
+ h5["group/group/array"] = [1, 2, 3, 4, 5]
+ h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(
+ cls.spec_filename,
+ [1],
+ [1.1],
+ xlabel="x",
+ ylabel="y",
+ fmt=["%d", "%.2f"],
+ close_file=True,
+ scan_number=1,
+ )
+
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+ fabiofile.append_frame(data=data, header=header)
+ fabiofile.write(cls.edf_multiframe_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write("Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def test_hdf5_scalar(self):
+ url = "silx:%s?/group/group/scalar" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data, 50)
+
+ def test_hdf5_array(self):
+ url = "silx:%s?/group/group/array" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5,))
+ self.assertEqual(data[0], 1)
+
+ def test_hdf5_array_slice(self):
+ url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5,))
+ self.assertEqual(data[0], 6)
+
+ def test_hdf5_array_slice_out_of_range(self):
+ url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ # ValueError: h5py 2.x
+ # IndexError: h5py 3.x
+ self.assertRaises((ValueError, IndexError), utils.get_data, url)
+
+ def test_edf_using_silx(self):
+ url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_frame(self):
+ url = "fabio:%s?slice=1" % self.edf_multiframe_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_singleframe(self):
+ url = "fabio:%s?slice=0" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_too_much_frames(self):
+ url = "fabio:%s?slice=..." % self.edf_multiframe_filename
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_fabio_no_frame(self):
+ url = "fabio:%s" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_unsupported_scheme(self):
+ url = "foo:/foo/bar"
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_no_scheme(self):
+ url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ self.assertRaises((ValueError, IOError), utils.get_data, url)
+
+ def test_file_not_exists(self):
+ url = "silx:/foo/bar"
+ self.assertRaises(IOError, utils.get_data, url)
+
+
+def _h5py_version_at_least(version):
+    """Return True if the installed h5py version is at least ``version``"""
+    v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split(".")[:3]]
+    r_majeur, r_mineur, r_micro = [int(i) for i in version.split(".")]
+    return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(
+        r_majeur, r_mineur, r_micro
+    )
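+# calc_hexversion packs (major, minor, micro) into an integer whose ordering
+# matches version ordering, e.g. calc_hexversion(2, 9, 0) < calc_hexversion(3, 1, 0),
+# so the helper can compare versions with a single integer comparison.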
+
+
+@unittest.skipUnless(_h5py_version_at_least("2.9.0"), "h5py version < 2.9.0")
+class TestRawFileToH5(unittest.TestCase):
+ """Test conversion of .vol file to .h5 external dataset"""
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self._vol_file = os.path.join(self.tempdir, "test_vol.vol")
+ self._file_info = os.path.join(self.tempdir, "test_vol.info.vol")
+ self._dataset_shape = 100, 20, 5
+ data = (
+ numpy.random.random(
+ self._dataset_shape[0] * self._dataset_shape[1] * self._dataset_shape[2]
+ )
+ .astype(dtype=numpy.float32)
+ .reshape(self._dataset_shape)
+ )
+ numpy.save(file=self._vol_file, arr=data)
+        # numpy.save appended a ".npy" suffix; rename to the expected name
+ assert os.path.exists(self._vol_file + ".npy")
+ os.rename(self._vol_file + ".npy", self._vol_file)
+ self.h5_file = os.path.join(self.tempdir, "test_h5.h5")
+ self.external_dataset_path = "/root/my_external_dataset"
+ self._data_url = silx.io.url.DataUrl(
+ file_path=self.h5_file, data_path=self.external_dataset_path
+ )
+ with open(self._file_info, "w") as _fi:
+ _fi.write("NUM_X = %s\n" % self._dataset_shape[2])
+ _fi.write("NUM_Y = %s\n" % self._dataset_shape[1])
+ _fi.write("NUM_Z = %s\n" % self._dataset_shape[0])
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def check_dataset(self, h5_file, data_path, shape):
+ """Make sure the external dataset is valid"""
+ with h5py.File(h5_file, "r") as _file:
+ return data_path in _file and _file[data_path].shape == shape
+
+ def test_h5_file_not_existing(self):
+ """Test that can create a file with external dataset from scratch"""
+ utils.rawfile_to_h5_external_dataset(
+ bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32,
+ )
+ self.assertTrue(
+ self.check_dataset(
+ h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape,
+ )
+ )
+ os.remove(self.h5_file)
+ utils.vol_to_h5_external_dataset(
+ vol_file=self._vol_file,
+ output_url=self._data_url,
+ info_file=self._file_info,
+ )
+ self.assertTrue(
+ self.check_dataset(
+ h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape,
+ )
+ )
+
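+    # For reference, a sketch of what the helper is expected to create,
+    # written with plain h5py (requires h5py >= 2.9; an illustration, not
+    # the silx implementation):
+    #
+    #     with h5py.File(h5_file, "a") as f:
+    #         f.create_dataset(
+    #             "/root/my_external_dataset",
+    #             shape=(100, 20, 5),
+    #             dtype=numpy.float32,
+    #             external=[(vol_file, 0, h5py.h5f.UNLIMITED)],
+    #         )
+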
+ def test_h5_file_existing(self):
+ """Test that can add the external dataset from an existing file"""
+ with h5py.File(self.h5_file, "w") as _file:
+ _file["/root/dataset1"] = numpy.zeros((100, 100))
+ _file["/root/group/dataset2"] = numpy.ones((100, 100))
+ utils.rawfile_to_h5_external_dataset(
+ bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32,
+ )
+ self.assertTrue(
+ self.check_dataset(
+ h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape,
+ )
+ )
+
+ def test_vol_file_not_existing(self):
+ """Make sure error is raised if .vol file does not exists"""
+ os.remove(self._vol_file)
+ utils.rawfile_to_h5_external_dataset(
+ bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32,
+ )
+
+ self.assertTrue(
+ self.check_dataset(
+ h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape,
+ )
+ )
+
+ def test_conflicts(self):
+ """Test several conflict cases"""
+ # test if path already exists
+ utils.rawfile_to_h5_external_dataset(
+ bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32,
+ )
+ with self.assertRaises(ValueError):
+ utils.rawfile_to_h5_external_dataset(
+ bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=False,
+ dtype=numpy.float32,
+ )
+
+ utils.rawfile_to_h5_external_dataset(
+ bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=True,
+ dtype=numpy.float32,
+ )
+
+ self.assertTrue(
+ self.check_dataset(
+ h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape,
+ )
+ )
+
+
+class TestH5Strings(unittest.TestCase):
+ """Test HDF5 str and bytes writing and reading"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir)
+
+ def setUp(self):
+ self.file = h5py.File(os.path.join(self.tempdir, "file.h5"), mode="w")
+
+ def tearDown(self):
+ self.file.close()
+
+ @classmethod
+ def _make_array(cls, value, n, vlen=True):
+ if isinstance(value, bytes):
+ if vlen:
+ dtype = h5py.special_dtype(vlen=bytes)
+ else:
+ if hasattr(h5py, "string_dtype"):
+ dtype = h5py.string_dtype("ascii", len(value))
+ else:
+ dtype = f"|S{len(value)}"
+ elif isinstance(value, str):
+ if vlen:
+ dtype = h5py.special_dtype(vlen=str)
+ else:
+ value = value.encode("utf-8")
+ if hasattr(h5py, "string_dtype"):
+ dtype = h5py.string_dtype("utf-8", len(value))
+ else:
+ dtype = f"|S{len(value)}"
+ else:
+ dtype = None
+ return numpy.array([value] * n, dtype=dtype)
+
+ @classmethod
+ def _get_charset(cls, value):
+ if isinstance(value, bytes):
+ return h5py.h5t.CSET_ASCII
+ elif isinstance(value, str):
+ return h5py.h5t.CSET_UTF8
+ else:
+ return None
+
+ def _check_dataset(self, value, result=None):
+ if result is not None:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+
+ # Write+read scalar
+ charset = self._get_charset(value)
+ self.file["data"] = value
+ data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii)
+ assert isinstance(data, type(result)), data
+ assert data == result, data
+ if charset is not None:
+ assert self.file["data"].id.get_type().get_cset() == charset
+
+ # Write+read variable length
+ no_unicode_support = isinstance(value, str) and not hasattr(
+ h5py, "string_dtype"
+ )
+ if no_unicode_support:
+ decode_ascii = True
+ self.file["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_dataset(
+ self.file["vlen_data"], decode_ascii=decode_ascii, index=0
+ )
+ assert isinstance(data, type(result)), data
+ assert data == result, data
+ data = utils.h5py_read_dataset(
+ self.file["vlen_data"], decode_ascii=decode_ascii
+ )
+ numpy.testing.assert_array_equal(data, [result] * 2)
+ if charset is not None:
+ assert self.file["vlen_data"].id.get_type().get_cset() == charset
+
+ self.file["vlen_empty_array"] = self._make_array(value, 0)
+ data = utils.h5py_read_dataset(
+ self.file["vlen_empty_array"], decode_ascii=decode_ascii
+ )
+ assert data.shape == (0,)
+
+ # Write+read fixed length
+ self.file["flen_data"] = self._make_array(value, 2, vlen=False)
+ data = utils.h5py_read_dataset(
+ self.file["flen_data"], decode_ascii=decode_ascii, index=0
+ )
+ assert isinstance(data, type(result)), data
+ assert data == result, data
+ data = utils.h5py_read_dataset(
+ self.file["flen_data"], decode_ascii=decode_ascii
+ )
+ numpy.testing.assert_array_equal(data, [result] * 2)
+ if charset is not None and not no_unicode_support:
+ assert self.file["flen_data"].id.get_type().get_cset() == charset
+
+ def _check_attribute(self, value, result=None):
+ if result is not None:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+
+ # Write+read scalar
+ self.file.attrs["data"] = value
+ data = utils.h5py_read_attribute(
+ self.file.attrs, "data", decode_ascii=decode_ascii
+ )
+ assert isinstance(data, type(result)), data
+ assert data == result, data
+
+ # Write+read variable length
+ no_unicode_support = isinstance(value, str) and not hasattr(
+ h5py, "string_dtype"
+ )
+ if no_unicode_support:
+ decode_ascii = True
+ self.file.attrs["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_attribute(
+ self.file.attrs, "vlen_data", decode_ascii=decode_ascii
+ )
+ assert isinstance(data[0], type(result)), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)[
+ "vlen_data"
+ ]
+ assert isinstance(data[0], type(result)), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ # Write+read fixed length
+ self.file.attrs["flen_data"] = self._make_array(value, 2, vlen=False)
+ data = utils.h5py_read_attribute(
+ self.file.attrs, "flen_data", decode_ascii=decode_ascii
+ )
+ assert isinstance(data[0], type(result)), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)[
+ "flen_data"
+ ]
+ assert isinstance(data[0], type(result)), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ def test_dataset_ascii_bytes(self):
+ self._check_dataset(b"abc")
+
+ def test_attribute_ascii_bytes(self):
+ self._check_attribute(b"abc")
+
+ def test_dataset_ascii_bytes_decode(self):
+ self._check_dataset(b"abc", result="abc")
+
+ def test_attribute_ascii_bytes_decode(self):
+ self._check_attribute(b"abc", result="abc")
+
+ def test_dataset_ascii_str(self):
+ self._check_dataset("abc")
+
+ def test_attribute_ascii_str(self):
+ self._check_attribute("abc")
+
+ def test_dataset_utf8_str(self):
+ self._check_dataset("\u0101bc")
+
+ def test_attribute_utf8_str(self):
+ self._check_attribute("\u0101bc")
+
+ def test_dataset_utf8_bytes(self):
+        # b"\xc4\x81" is the UTF-8 encoding of U+0101
+ self._check_dataset(b"\xc4\x81bc")
+
+ def test_attribute_utf8_bytes(self):
+        # b"\xc4\x81" is the UTF-8 encoding of U+0101
+ self._check_attribute(b"\xc4\x81bc")
+
+ def test_dataset_utf8_bytes_decode(self):
+        # b"\xc4\x81" is the UTF-8 encoding of U+0101
+ self._check_dataset(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_attribute_utf8_bytes_decode(self):
+        # b"\xc4\x81" is the UTF-8 encoding of U+0101
+ self._check_attribute(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_dataset_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_dataset(b"\xe423")
+
+ def test_attribute_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_attribute(b"\xe423")
+
+ def test_dataset_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_dataset(b"\xe423", result="\udce423")
+
+ def test_attribute_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_attribute(b"\xe423", result="\udce423")
+
+ def test_dataset_no_string(self):
+ self._check_dataset(numpy.int64(10))
+
+ def test_attribute_no_string(self):
+ self._check_attribute(numpy.int64(10))
+
+
+def test_visitall_hdf5(tmp_path):
+ """visit HDF5 file content not following links"""
+ external_filepath = tmp_path / "external.h5"
+ with h5py.File(external_filepath, mode="w") as h5file:
+ h5file["target/dataset"] = 50
+
+ filepath = tmp_path / "base.h5"
+ with h5py.File(filepath, mode="w") as h5file:
+ h5file["group/dataset"] = 50
+ h5file["link/soft_link"] = h5py.SoftLink("/group/dataset")
+ h5file["link/external_link"] = h5py.ExternalLink(
+ "external.h5", "/target/dataset"
+ )
+
+ with h5py.File(filepath, mode="r") as h5file:
+ visited_items = {}
+ for path, item in utils.visitall(h5file):
+ if isinstance(item, h5py.Dataset):
+ content = item[()]
+ elif isinstance(item, h5py.Group):
+ content = None
+ elif isinstance(item, h5py.SoftLink):
+ content = item.path
+ elif isinstance(item, h5py.ExternalLink):
+ content = item.filename, item.path
+ else:
+                raise AssertionError("Unexpected item type at: %s" % path)
+ visited_items[path] = (item.__class__, content)
+
+ assert visited_items == {
+ "/group": (h5py.Group, None),
+ "/group/dataset": (h5py.Dataset, 50),
+ "/link": (h5py.Group, None),
+ "/link/soft_link": (h5py.SoftLink, "/group/dataset"),
+ "/link/external_link": (h5py.ExternalLink, ("external.h5", "/target/dataset")),
+ }
+
+
+def test_visitall_commonh5():
+ """Visit commonh5 File object"""
+ fobj = commonh5.File("filename.file", mode="w")
+ group = fobj.create_group("group")
+ dataset = group.create_dataset("dataset", data=numpy.array(50))
+ group["soft_link"] = dataset # Create softlink
+
+ visited_items = dict(utils.visitall(fobj))
+ assert len(visited_items) == 3
+ assert visited_items["/group"] is group
+ assert visited_items["/group/dataset"] is dataset
+ soft_link = visited_items["/group/soft_link"]
+ assert isinstance(soft_link, commonh5.SoftLink)
+ assert soft_link.path == "/group/dataset"
+
+
+def test_match_hdf5(tmp_path):
+ """Test match function with HDF5 file"""
+ with h5py.File(tmp_path / "test_match.h5", "w") as h5f:
+ h5f.create_group("entry_0000/group")
+ h5f["entry_0000/data"] = 0
+ h5f.create_group("entry_0001/group")
+ h5f["entry_0001/data"] = 1
+ h5f.create_group("entry_0002")
+ h5f["entry_0003"] = 3
+
+ result = list(utils.match(h5f, "/entry_*/*"))
+
+ assert sorted(result) == [
+ "entry_0000/data",
+ "entry_0000/group",
+ "entry_0001/data",
+ "entry_0001/group",
+ ]
+
+
+def test_match_commonh5():
+ """Test match function with commonh5 objects"""
+ with commonh5.File("filename.file", mode="w") as fobj:
+ fobj.create_group("entry_0000/group")
+ fobj["entry_0000/data"] = 0
+ fobj.create_group("entry_0001/group")
+ fobj["entry_0001/data"] = 1
+ fobj.create_group("entry_0002")
+ fobj["entry_0003"] = 3
+
+ result = list(utils.match(fobj, "/entry_*/*"))
+
+ assert sorted(result) == [
+ "entry_0000/data",
+ "entry_0000/group",
+ "entry_0001/data",
+ "entry_0001/group",
+ ]
+
+
+def test_recursive_match_commonh5():
+ """Test match function with commonh5 objects"""
+ with commonh5.File("filename.file", mode="w") as fobj:
+ fobj["entry_0000/bar/data"] = 0
+ fobj["entry_0001/foo/data"] = 1
+ fobj["entry_0001/foo/data1"] = 2
+ fobj["entry_0003"] = 3
+
+ result = list(utils.match(fobj, "**/data"))
+ assert result == ["entry_0000/bar/data", "entry_0001/foo/data"]
diff --git a/src/silx/io/test/test_write_to_h5.py b/src/silx/io/test/test_write_to_h5.py
new file mode 100644
index 0000000..b74bf0f
--- /dev/null
+++ b/src/silx/io/test/test_write_to_h5.py
@@ -0,0 +1,120 @@
+# /*##########################################################################
+# Copyright (C) 2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Test silx.io.convert.write_to_h5"""
+
+
+import h5py
+import numpy
+from silx.io import spech5
+
+from silx.io.convert import write_to_h5
+from silx.io.dictdump import h5todict
+from silx.io import commonh5
+
+
+def test_with_commonh5(tmp_path):
+ """Test write_to_h5 with commonh5 input"""
+ fobj = commonh5.File("filename.txt", mode="w")
+ group = fobj.create_group("group")
+ dataset = group.create_dataset("dataset", data=numpy.array(50))
+ group["soft_link"] = dataset # Create softlink
+
+ output_filepath = tmp_path / "output.h5"
+ write_to_h5(fobj, str(output_filepath))
+
+ assert h5todict(str(output_filepath)) == {
+ "group": {"dataset": numpy.array(50), "soft_link": numpy.array(50)},
+ }
+ with h5py.File(output_filepath, mode="r") as h5file:
+ soft_link = h5file.get("/group/soft_link", getlink=True)
+ assert isinstance(soft_link, h5py.SoftLink)
+ assert soft_link.path == "/group/dataset"
+
+
+def test_with_hdf5(tmp_path):
+ """Test write_to_h5 with HDF5 file input"""
+ filepath = tmp_path / "base.h5"
+ with h5py.File(filepath, mode="w") as h5file:
+ h5file["group/dataset"] = 50
+ h5file["group/soft_link"] = h5py.SoftLink("/group/dataset")
+ h5file["group/external_link"] = h5py.ExternalLink("base.h5", "/group/dataset")
+
+ output_filepath = tmp_path / "output.h5"
+ write_to_h5(str(filepath), str(output_filepath))
+ assert h5todict(str(output_filepath)) == {
+ "group": {"dataset": 50, "soft_link": 50},
+ }
+ with h5py.File(output_filepath, mode="r") as h5file:
+ soft_link = h5file.get("group/soft_link", getlink=True)
+ assert isinstance(soft_link, h5py.SoftLink)
+ assert soft_link.path == "/group/dataset"
+
+
+def test_with_spech5(tmp_path):
+ """Test write_to_h5 with SpecH5 input"""
+ filepath = tmp_path / "file.spec"
+ filepath.write_bytes(
+ bytes(
+ """#F /tmp/sf.dat
+
+#S 1 cmd
+#L a b
+1 2
+""",
+ encoding="ascii",
+ )
+ )
+
+ output_filepath = tmp_path / "output.h5"
+ with spech5.SpecH5(str(filepath)) as spech5file:
+ write_to_h5(spech5file, str(output_filepath))
+
+ def assert_equal(item1, item2):
+ if isinstance(item1, dict):
+ assert tuple(item1.keys()) == tuple(item2.keys())
+ for key in item1.keys():
+ assert_equal(item1[key], item2[key])
+ else:
+            assert numpy.array_equal(item1, item2)
+
+ assert_equal(
+ h5todict(str(output_filepath)),
+ {
+ "1.1": {
+ "instrument": {
+ "positioners": {},
+ "specfile": {
+ "file_header": ["#F /tmp/sf.dat"],
+ "scan_header": ["#S 1 cmd", "#L a b"],
+ },
+ },
+ "measurement": {
+ "a": [1.0],
+ "b": [2.0],
+ },
+ "start_time": "",
+ "title": "cmd",
+ },
+ },
+ )