diff options
Diffstat (limited to 'src/silx/io/test')
-rw-r--r-- | src/silx/io/test/__init__.py | 22 | ||||
-rw-r--r-- | src/silx/io/test/test_commonh5.py | 291 | ||||
-rw-r--r-- | src/silx/io/test/test_dictdump.py | 1069 | ||||
-rwxr-xr-x | src/silx/io/test/test_fabioh5.py | 627 | ||||
-rw-r--r-- | src/silx/io/test/test_fioh5.py | 296 | ||||
-rw-r--r-- | src/silx/io/test/test_h5link_utils.py | 116 | ||||
-rw-r--r-- | src/silx/io/test/test_h5py_utils.py | 482 | ||||
-rw-r--r-- | src/silx/io/test/test_nxdata.py | 727 | ||||
-rw-r--r-- | src/silx/io/test/test_octaveh5.py | 197 | ||||
-rw-r--r-- | src/silx/io/test/test_rawh5.py | 83 | ||||
-rw-r--r-- | src/silx/io/test/test_sliceh5.py | 104 | ||||
-rw-r--r-- | src/silx/io/test/test_specfile.py | 386 | ||||
-rw-r--r-- | src/silx/io/test/test_specfilewrapper.py | 189 | ||||
-rw-r--r-- | src/silx/io/test/test_spech5.py | 912 | ||||
-rw-r--r-- | src/silx/io/test/test_spectoh5.py | 186 | ||||
-rw-r--r-- | src/silx/io/test/test_url.py | 300 | ||||
-rw-r--r-- | src/silx/io/test/test_utils.py | 1141 | ||||
-rw-r--r-- | src/silx/io/test/test_write_to_h5.py | 120 |
18 files changed, 7248 insertions, 0 deletions
diff --git a/src/silx/io/test/__init__.py b/src/silx/io/test/__init__.py new file mode 100644 index 0000000..3c723bb --- /dev/null +++ b/src/silx/io/test/__init__.py @@ -0,0 +1,22 @@ +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. 
+# +# ############################################################################*/ diff --git a/src/silx/io/test/test_commonh5.py b/src/silx/io/test/test_commonh5.py new file mode 100644 index 0000000..1b0a3a6 --- /dev/null +++ b/src/silx/io/test/test_commonh5.py @@ -0,0 +1,291 @@ +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for commonh5 wrapper""" + +__authors__ = ["V. Valls"] +__license__ = "MIT" +__date__ = "21/09/2017" + +import logging +import numpy +import unittest +import tempfile +import shutil + +_logger = logging.getLogger(__name__) + +import silx.io +import silx.io.utils +import h5py + +try: + from .. 
import commonh5 +except ImportError: + commonh5 = None + + +class _TestCommonFeatures(unittest.TestCase): + """Test common features supported by h5py and our implementation.""" + + __test__ = False # ignore abstract class tests + + @classmethod + def createFile(cls): + return None + + @classmethod + def setUpClass(cls): + # Set to None cause create_resource can raise an excpetion + cls.h5 = None + cls.h5 = cls.create_resource() + if cls.h5 is None: + raise unittest.SkipTest("File not created") + + @classmethod + def create_resource(cls): + """Must be implemented""" + return None + + @classmethod + def tearDownClass(cls): + cls.h5 = None + + def test_file(self): + node = self.h5 + self.assertTrue(silx.io.is_file(node)) + self.assertTrue(silx.io.is_group(node)) + self.assertFalse(silx.io.is_dataset(node)) + self.assertEqual(len(node.attrs), 0) + + def test_group(self): + node = self.h5["group"] + self.assertFalse(silx.io.is_file(node)) + self.assertTrue(silx.io.is_group(node)) + self.assertFalse(silx.io.is_dataset(node)) + self.assertEqual(len(node.attrs), 0) + class_ = self.h5.get("group", getclass=True) + classlink = self.h5.get("group", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Group) + self.assertEqual(classlink, h5py.HardLink) + + def test_dataset(self): + node = self.h5["group/dataset"] + self.assertFalse(silx.io.is_file(node)) + self.assertFalse(silx.io.is_group(node)) + self.assertTrue(silx.io.is_dataset(node)) + self.assertEqual(len(node.attrs), 0) + class_ = self.h5.get("group/dataset", getclass=True) + classlink = self.h5.get("group/dataset", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.HardLink) + + def test_soft_link(self): + node = self.h5["link/soft_link"] + self.assertEqual(node.name, "/link/soft_link") + class_ = self.h5.get("link/soft_link", getclass=True) + link = self.h5.get("link/soft_link", getlink=True) + classlink = self.h5.get("link/soft_link", getlink=True, 
getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink))) + self.assertTrue(silx.io.utils.is_softlink(link)) + self.assertEqual(classlink, h5py.SoftLink) + + def test_external_link(self): + node = self.h5["link/external_link"] + self.assertEqual(node.name, "/target/dataset") + class_ = self.h5.get("link/external_link", getclass=True) + classlink = self.h5.get("link/external_link", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.ExternalLink) + + def test_external_link_to_link(self): + node = self.h5["link/external_link_to_link"] + self.assertEqual(node.name, "/target/link") + class_ = self.h5.get("link/external_link_to_link", getclass=True) + classlink = self.h5.get( + "link/external_link_to_link", getlink=True, getclass=True + ) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.ExternalLink) + + def test_create_groups(self): + c = self.h5.create_group(self.id() + "/a/b/c") + d = c.create_group("/" + self.id() + "/a/b/d") + + self.assertRaises(ValueError, self.h5.create_group, self.id() + "/a/b/d") + self.assertEqual(c.name, "/" + self.id() + "/a/b/c") + self.assertEqual(d.name, "/" + self.id() + "/a/b/d") + + def test_setitem_python_object_dataset(self): + group = self.h5.create_group(self.id()) + group["a"] = 10 + self.assertEqual(group["a"].dtype.kind, "i") + + def test_setitem_numpy_dataset(self): + group = self.h5.create_group(self.id()) + group["a"] = numpy.array([10, 20, 30]) + self.assertEqual(group["a"].dtype.kind, "i") + self.assertEqual(group["a"].shape, (3,)) + + def test_setitem_link(self): + group = self.h5.create_group(self.id()) + group["a"] = 10 + group["b"] = group["a"] + self.assertEqual(group["b"].dtype.kind, "i") + + def test_setitem_dataset_is_sub_group(self): + self.h5[self.id() + "/a"] = 10 + + +class TestCommonFeatures_h5py(_TestCommonFeatures): + """Check if h5py is compliant with what we 
expect.""" + + __test__ = True # because _TestCommonFeatures is ignored + + @classmethod + def create_resource(cls): + cls.tmp_dir = tempfile.mkdtemp() + + externalh5 = h5py.File(cls.tmp_dir + "/external.h5", mode="w") + externalh5["target/dataset"] = 50 + externalh5["target/link"] = h5py.SoftLink("/target/dataset") + externalh5.close() + + h5 = h5py.File(cls.tmp_dir + "/base.h5", mode="w") + h5["group/dataset"] = 50 + h5["link/soft_link"] = h5py.SoftLink("/group/dataset") + h5["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset") + h5["link/external_link_to_link"] = h5py.ExternalLink( + "external.h5", "/target/link" + ) + + return h5 + + @classmethod + def tearDownClass(cls): + super(TestCommonFeatures_h5py, cls).tearDownClass() + if hasattr(cls, "tmp_dir") and cls.tmp_dir is not None: + shutil.rmtree(cls.tmp_dir) + + +class TestCommonFeatures_commonH5(_TestCommonFeatures): + """Check if commonh5 is compliant with h5py.""" + + __test__ = True # because _TestCommonFeatures is ignored + + @classmethod + def create_resource(cls): + h5 = commonh5.File("base.h5", "w") + h5.create_group("group").create_dataset("dataset", data=numpy.int32(50)) + + link = h5.create_group("link") + link.add_node(commonh5.SoftLink("soft_link", "/group/dataset")) + + return h5 + + def test_external_link(self): + # not applicable + pass + + def test_external_link_to_link(self): + # not applicable + pass + + +class TestSpecificCommonH5(unittest.TestCase): + """Test specific features from commonh5. 
+ + Test of shared features should be done by TestCommonFeatures.""" + + def setUp(self): + if commonh5 is None: + self.skipTest("silx.io.commonh5 is needed") + + def test_node_attrs(self): + node = commonh5.Node("Foo", attrs={"a": 1}) + self.assertEqual(node.attrs["a"], 1) + node.attrs["b"] = 8 + self.assertEqual(node.attrs["b"], 8) + node.attrs["b"] = 2 + self.assertEqual(node.attrs["b"], 2) + + def test_node_readonly_attrs(self): + f = commonh5.File(name="Foo", mode="r") + node = commonh5.Node("Foo", attrs={"a": 1}) + node.attrs["b"] = 8 + f.add_node(node) + self.assertEqual(node.attrs["b"], 8) + try: + node.attrs["b"] = 1 + self.fail() + except RuntimeError: + pass + + def test_create_dataset(self): + f = commonh5.File(name="Foo", mode="w") + node = f.create_dataset("foo", data=numpy.array([1])) + self.assertIs(node.parent, f) + self.assertIs(f["foo"], node) + + def test_create_group(self): + f = commonh5.File(name="Foo", mode="w") + node = f.create_group("foo") + self.assertIs(node.parent, f) + self.assertIs(f["foo"], node) + + def test_readonly_create_dataset(self): + f = commonh5.File(name="Foo", mode="r") + try: + f.create_dataset("foo", data=numpy.array([1])) + self.fail() + except RuntimeError: + pass + + def test_readonly_create_group(self): + f = commonh5.File(name="Foo", mode="r") + try: + f.create_group("foo") + self.fail() + except RuntimeError: + pass + + def test_create_unicode_dataset(self): + f = commonh5.File(name="Foo", mode="w") + try: + f.create_dataset("foo", data=numpy.array("aaaa")) + self.fail() + except TypeError: + pass + + def test_setitem_dataset(self): + self.h5 = commonh5.File(name="Foo", mode="w") + group = self.h5.create_group(self.id()) + group["a"] = commonh5.Dataset(None, data=numpy.array(10)) + self.assertEqual(group["a"].dtype.kind, "i") + + def test_setitem_explicit_link(self): + self.h5 = commonh5.File(name="Foo", mode="w") + group = self.h5.create_group(self.id()) + group["a"] = 10 + group["b"] = commonh5.SoftLink(None, 
path="/" + self.id() + "/a") + self.assertEqual(group["b"].dtype.kind, "i") diff --git a/src/silx/io/test/test_dictdump.py b/src/silx/io/test/test_dictdump.py new file mode 100644 index 0000000..2bd376e --- /dev/null +++ b/src/silx/io/test/test_dictdump.py @@ -0,0 +1,1069 @@ +# /*########################################################################## +# Copyright (C) 2016-2023 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for dicttoh5 module""" + +__authors__ = ["P. 
Knobel"] +__license__ = "MIT" +__date__ = "17/01/2018" + + +from collections import defaultdict +from copy import deepcopy +import os +import re +import tempfile +import unittest + +import h5py +import numpy + +try: + import pint +except ImportError: + pint = None +import pytest + +from silx.utils.testutils import LoggingValidator + +from ..configdict import ConfigDict +from .. import dictdump +from ..dictdump import dicttoh5, dicttojson, dump +from ..dictdump import h5todict, load +from ..dictdump import logger as dictdump_logger +from ..utils import is_link +from ..utils import h5py_read_dataset + + +def tree(): + """Tree data structure as a recursive nested dictionary""" + return defaultdict(tree) + + +inhabitants = 160215 + +city_attrs = tree() +city_attrs["Europe"]["France"]["Grenoble"]["area"] = "18.44 km2" +city_attrs["Europe"]["France"]["Grenoble"]["inhabitants"] = inhabitants +city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196] +city_attrs["Europe"]["France"]["Tourcoing"]["area"] + +ext_attrs = tree() +ext_attrs["ext_group"]["dataset"] = 10 +ext_filename = "ext.h5" + +link_attrs = tree() +link_attrs["links"]["group"]["dataset"] = 10 +link_attrs["links"]["group"]["relative_softlink"] = h5py.SoftLink("dataset") +link_attrs["links"]["relative_softlink"] = h5py.SoftLink("group/dataset") +link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset") +link_attrs["links"]["external_link"] = h5py.ExternalLink( + ext_filename, "/ext_group/dataset" +) + + +class DictTestCase(unittest.TestCase): + def assertRecursiveEqual(self, expected, actual, nodes=tuple()): + err_msg = "\n\n Tree nodes: {}".format(nodes) + if isinstance(expected, dict): + self.assertTrue(isinstance(actual, dict), msg=err_msg) + self.assertEqual(set(expected.keys()), set(actual.keys()), msg=err_msg) + for k in actual: + self.assertRecursiveEqual( + expected[k], + actual[k], + nodes=nodes + (k,), + ) + return + if isinstance(actual, numpy.ndarray): + 
actual = actual.tolist() + if isinstance(expected, numpy.ndarray): + expected = expected.tolist() + + self.assertEqual(expected, actual, msg=err_msg) + + +class H5DictTestCase(DictTestCase): + def _dictRoundTripNormalize(self, treedict): + """Convert the dictionary as expected from a round-trip + treedict -> dicttoh5 -> h5todict -> newtreedict + """ + for key, value in list(treedict.items()): + if isinstance(value, dict): + self._dictRoundTripNormalize(value) + + # Expand treedict[("group", "attr_name")] + # to treedict["group"]["attr_name"] + for key, value in list(treedict.items()): + if not isinstance(key, tuple): + continue + # Put the attribute inside the group + grpname, attr = key + if not grpname: + continue + group = treedict.setdefault(grpname, dict()) + if isinstance(group, dict): + del treedict[key] + group[("", attr)] = value + + def dictRoundTripNormalize(self, treedict): + treedict2 = deepcopy(treedict) + self._dictRoundTripNormalize(treedict2) + return treedict2 + + +class TestDictToH5(H5DictTestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") + self.h5_ext_fname = os.path.join(self.tempdir, ext_filename) + + def tearDown(self): + if os.path.exists(self.h5_fname): + os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) + os.rmdir(self.tempdir) + + def testH5CityAttrs(self): + filters = {"shuffle": True, "fletcher32": True} + dicttoh5( + city_attrs, + self.h5_fname, + h5path="/city attributes", + mode="w", + create_dataset_args=filters, + ) + + h5f = h5py.File(self.h5_fname, mode="r") + + self.assertIn("Tourcoing/area", h5f["/city attributes/Europe/France"]) + ds = h5f["/city attributes/Europe/France/Grenoble/inhabitants"] + self.assertEqual(ds[...], 160215) + + # filters only apply to datasets that are not scalars (shape != () ) + ds = h5f["/city attributes/Europe/France/Grenoble/coordinates"] + # self.assertEqual(ds.compression, 
"gzip") + self.assertTrue(ds.fletcher32) + self.assertTrue(ds.shuffle) + + h5f.close() + + ddict = load(self.h5_fname, fmat="hdf5") + self.assertAlmostEqual( + min( + ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"] + ), + 5.7196, + ) + + def testAttributes(self): + """Any kind of attribute can be described""" + ddict = { + "group": {"datatset": "hmmm", ("", "group_attr"): 10}, + "dataset": "aaaaaaaaaaaaaaa", + ("", "root_attr"): 11, + ("dataset", "dataset_attr"): 12, + ("group", "group_attr2"): 13, + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(ddict, h5file) + self.assertEqual(h5file["group"].attrs["group_attr"], 10) + self.assertEqual(h5file.attrs["root_attr"], 11) + self.assertEqual(h5file["dataset"].attrs["dataset_attr"], 12) + self.assertEqual(h5file["group"].attrs["group_attr2"], 13) + + def testPathAttributes(self): + """A group is requested at a path""" + ddict = { + ("", "NX_class"): "NXcollection", + } + with h5py.File(self.h5_fname, "w") as h5file: + # This should not warn + with LoggingValidator(dictdump_logger, warning=0): + dictdump.dicttoh5(ddict, h5file, h5path="foo/bar") + + def testKeyOrder(self): + ddict1 = { + "d": "plow", + ("d", "a"): "ox", + } + ddict2 = { + ("d", "a"): "ox", + "d": "plow", + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(ddict1, h5file, h5path="g1") + dictdump.dicttoh5(ddict2, h5file, h5path="g2") + self.assertEqual(h5file["g1/d"].attrs["a"], "ox") + self.assertEqual(h5file["g2/d"].attrs["a"], "ox") + + def testAttributeValues(self): + """Any NX data types can be used""" + ddict = { + ("", "bool"): True, + ("", "int"): 11, + ("", "float"): 1.1, + ("", "str"): "a", + ("", "boollist"): [True, False, True], + ("", "intlist"): [11, 22, 33], + ("", "floatlist"): [1.1, 2.2, 3.3], + ("", "strlist"): ["a", "bb", "ccc"], + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(ddict, h5file) + for k, expected in ddict.items(): + result = 
h5file.attrs[k[1]] + if isinstance(expected, list): + if isinstance(expected[0], str): + numpy.testing.assert_array_equal(result, expected) + else: + numpy.testing.assert_array_almost_equal(result, expected) + else: + self.assertEqual(result, expected) + + def testAttributeAlreadyExists(self): + """A duplicated attribute warns if overwriting is not enabled""" + ddict = { + "group": {"dataset": "hmmm", ("", "attr"): 10}, + ("group", "attr"): 10, + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(ddict, h5file) + self.assertEqual(h5file["group"].attrs["attr"], 10) + + def testFlatDict(self): + """Description of a tree with a single level of keys""" + ddict = { + "group/group/dataset": 10, + ("group/group/dataset", "attr"): 11, + ("group/group", "attr"): 12, + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(ddict, h5file) + self.assertEqual(h5file["group/group/dataset"][()], 10) + self.assertEqual(h5file["group/group/dataset"].attrs["attr"], 11) + self.assertEqual(h5file["group/group"].attrs["attr"], 12) + + def testLinks(self): + with h5py.File(self.h5_ext_fname, "w") as h5file: + dictdump.dicttoh5(ext_attrs, h5file) + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(link_attrs, h5file) + with h5py.File(self.h5_fname, "r") as h5file: + self.assertEqual(h5file["links/group/dataset"][()], 10) + self.assertEqual(h5file["links/group/relative_softlink"][()], 10) + self.assertEqual(h5file["links/relative_softlink"][()], 10) + self.assertEqual(h5file["links/absolute_softlink"][()], 10) + self.assertEqual(h5file["links/external_link"][()], 10) + + def testDumpNumpyArray(self): + ddict = { + "darks": {"0": numpy.array([[0, 0, 0], [0, 0, 0]], dtype=numpy.uint16)} + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttoh5(ddict, h5file) + with h5py.File(self.h5_fname, "r") as h5file: + numpy.testing.assert_array_equal( + h5py_read_dataset(h5file["darks"]["0"]), ddict["darks"]["0"] + ) + + def 
testOverwrite(self): + # Tree structure that will be tested + group1 = { + ("", "attr2"): "original2", + "dset1": 0, + "dset2": [0, 1], + ("dset1", "attr1"): "original1", + ("dset1", "attr2"): "original2", + ("dset2", "attr1"): "original1", + ("dset2", "attr2"): "original2", + } + group2 = { + "subgroup1": group1.copy(), + "subgroup2": group1.copy(), + ("subgroup1", "attr1"): "original1", + ("subgroup2", "attr1"): "original1", + } + group2.update(group1) + # initial HDF5 tree + otreedict = { + ("", "attr1"): "original1", + ("", "attr2"): "original2", + "group1": group1, + "group2": group2, + ("group1", "attr1"): "original1", + ("group2", "attr1"): "original1", + } + wtreedict = None # dumped dictionary + etreedict = None # expected HDF5 tree after dump + + def reset_file(): + dicttoh5( + otreedict, + h5file=self.h5_fname, + mode="w", + ) + + def append_file(update_mode): + dicttoh5(wtreedict, h5file=self.h5_fname, mode="a", update_mode=update_mode) + + def assert_file(): + rtreedict = h5todict(self.h5_fname, include_attributes=True, asarray=False) + netreedict = self.dictRoundTripNormalize(etreedict) + try: + self.assertRecursiveEqual(netreedict, rtreedict) + except AssertionError: + from pprint import pprint + + print("\nDUMP:") + pprint(wtreedict) + print("\nEXPECTED:") + pprint(netreedict) + print("\nHDF5:") + pprint(rtreedict) + raise + + def assert_append(update_mode): + append_file(update_mode) + assert_file() + + # Test wrong arguments + with self.assertRaises(ValueError): + dicttoh5( + otreedict, h5file=self.h5_fname, mode="w", update_mode="wrong-value" + ) + + # No writing + reset_file() + etreedict = deepcopy(otreedict) + assert_file() + + # Write identical dictionary + wtreedict = deepcopy(otreedict) + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add", "modify", "replace"]: + assert_append(update_mode) + + # Write empty dictionary + wtreedict = dict() + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in 
[None, "add", "modify", "replace"]: + assert_append(update_mode) + + # Modified dataset + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]} + wtreedict["group2"]["subgroup2"]["dset2"] = [10, 20] + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + etreedict["group2"]["subgroup2"]["dset2"] = [10, 20] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + etreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]} + etreedict["group2"]["subgroup2"]["dset2"] = [10, 20] + assert_append("replace") + + # Modified group + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = [0, 1] + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add", "modify"]: + assert_append(update_mode) + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = [0, 1] + assert_append("replace") + + # Modified attribute + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified" + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + etreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified" + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + etreedict["group2"]["subgroup2"]["dset1"] = dict() + etreedict["group2"]["subgroup2"]["dset1"][("", "attr1")] = "modified" + assert_append("replace") + + # Delete group + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = None + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode 
in [None, "add"]: + assert_append(update_mode) + + del etreedict["group2"]["subgroup2"] + del etreedict["group2"][("subgroup2", "attr1")] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + assert_append("replace") + + # Delete dataset + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"]["dset2"] = None + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + del etreedict["group2"]["subgroup2"]["dset2"] + del etreedict["group2"]["subgroup2"][("dset2", "attr1")] + del etreedict["group2"]["subgroup2"][("dset2", "attr2")] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + assert_append("replace") + + # Delete attribute + wtreedict = dict() + wtreedict["group2"] = dict() + wtreedict["group2"]["subgroup2"] = dict() + wtreedict["group2"]["subgroup2"][("dset2", "attr1")] = None + + reset_file() + etreedict = deepcopy(otreedict) + for update_mode in [None, "add"]: + assert_append(update_mode) + + del etreedict["group2"]["subgroup2"][("dset2", "attr1")] + assert_append("modify") + + etreedict["group2"] = dict() + del etreedict[("group2", "attr1")] + etreedict["group2"]["subgroup2"] = dict() + etreedict["group2"]["subgroup2"]["dset2"] = dict() + assert_append("replace") + + +@pytest.mark.skipif(pint is None, reason="Require pint") +def test_dicttoh5_pint(tmp_h5py_file): + ureg = pint.UnitRegistry() + treedict = { + "array_mm": pint.Quantity([1, 2, 3], ureg.mm), + "value_kg": 3 * ureg.kg, + } + + dicttoh5(treedict, tmp_h5py_file) + + result = h5todict(tmp_h5py_file) + assert set(treedict.keys()) == set(result.keys()) + for key, value in treedict.items(): + assert numpy.array_equal(result[key], value.magnitude) + + +def test_dicttoh5_not_serializable(tmp_h5py_file): + treedict = {"group": {"dset": 
[{"a": 1}]}} + err_msg = "Failed to create dataset '/group/dset' with data (numpy.ndarray-object) = [{'a': 1}]" + with pytest.raises(ValueError, match=re.escape(err_msg)): + dicttoh5(treedict, tmp_h5py_file) + + +class TestH5ToDict(H5DictTestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") + self.h5_ext_fname = os.path.join(self.tempdir, ext_filename) + dicttoh5(city_attrs, self.h5_fname) + dicttoh5(link_attrs, self.h5_fname, mode="a") + dicttoh5(ext_attrs, self.h5_ext_fname) + + def tearDown(self): + if os.path.exists(self.h5_fname): + os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) + os.rmdir(self.tempdir) + + def testExcludeNames(self): + ddict = h5todict( + self.h5_fname, + path="/Europe/France", + exclude_names=["ourcoing", "inhab", "toto"], + ) + self.assertNotIn("Tourcoing", ddict) + self.assertIn("Grenoble", ddict) + + self.assertNotIn("inhabitants", ddict["Grenoble"]) + self.assertIn("coordinates", ddict["Grenoble"]) + self.assertIn("area", ddict["Grenoble"]) + + def testAsArrayTrue(self): + """Test with asarray=True, the default""" + ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble") + self.assertTrue( + numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants)) + ) + + def testAsArrayFalse(self): + """Test with asarray=False""" + ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False) + self.assertEqual(ddict["inhabitants"], inhabitants) + + def testDereferenceLinks(self): + ddict = h5todict(self.h5_fname, path="links", dereference_links=True) + self.assertTrue(ddict["absolute_softlink"], 10) + self.assertTrue(ddict["relative_softlink"], 10) + self.assertTrue(ddict["external_link"], 10) + self.assertTrue(ddict["group"]["relative_softlink"], 10) + + def testPreserveLinks(self): + ddict = h5todict(self.h5_fname, path="links", dereference_links=False) + 
self.assertTrue(is_link(ddict["absolute_softlink"])) + self.assertTrue(is_link(ddict["relative_softlink"])) + self.assertTrue(is_link(ddict["external_link"])) + self.assertTrue(is_link(ddict["group"]["relative_softlink"])) + + def testStrings(self): + ddict = { + "dset_bytes": b"bytes", + "dset_utf8": "utf8", + "dset_2bytes": [b"bytes", b"bytes"], + "dset_2utf8": ["utf8", "utf8"], + ("", "attr_bytes"): b"bytes", + ("", "attr_utf8"): "utf8", + ("", "attr_2bytes"): [b"bytes", b"bytes"], + ("", "attr_2utf8"): ["utf8", "utf8"], + } + dicttoh5(ddict, self.h5_fname, mode="w") + adict = h5todict(self.h5_fname, include_attributes=True, asarray=False) + self.assertEqual(ddict["dset_bytes"], adict["dset_bytes"]) + self.assertEqual(ddict["dset_utf8"], adict["dset_utf8"]) + self.assertEqual(ddict[("", "attr_bytes")], adict[("", "attr_bytes")]) + self.assertEqual(ddict[("", "attr_utf8")], adict[("", "attr_utf8")]) + numpy.testing.assert_array_equal(ddict["dset_2bytes"], adict["dset_2bytes"]) + numpy.testing.assert_array_equal(ddict["dset_2utf8"], adict["dset_2utf8"]) + numpy.testing.assert_array_equal( + ddict[("", "attr_2bytes")], adict[("", "attr_2bytes")] + ) + numpy.testing.assert_array_equal( + ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")] + ) + + +class TestDictToNx(H5DictTestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.h5_fname = os.path.join(self.tempdir, "nx.h5") + self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5") + + def tearDown(self): + if os.path.exists(self.h5_fname): + os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) + os.rmdir(self.tempdir) + + def testAttributes(self): + """Any kind of attribute can be described""" + ddict = { + "group": {"dataset": 100, "@group_attr1": 10}, + "dataset": 200, + "@root_attr": 11, + "dataset@dataset_attr": "12", + "group@group_attr2": 13, + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttonx(ddict, h5file) + 
self.assertEqual(h5file["group"].attrs["group_attr1"], 10) + self.assertEqual(h5file.attrs["root_attr"], 11) + self.assertEqual(h5file["dataset"].attrs["dataset_attr"], "12") + self.assertEqual(h5file["group"].attrs["group_attr2"], 13) + + def testKeyOrder(self): + ddict1 = { + "d": "plow", + "d@a": "ox", + } + ddict2 = { + "d@a": "ox", + "d": "plow", + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttonx(ddict1, h5file, h5path="g1") + dictdump.dicttonx(ddict2, h5file, h5path="g2") + self.assertEqual(h5file["g1/d"].attrs["a"], "ox") + self.assertEqual(h5file["g2/d"].attrs["a"], "ox") + + def testAttributeValues(self): + """Any NX data types can be used""" + ddict = { + "@bool": True, + "@int": 11, + "@float": 1.1, + "@str": "a", + "@boollist": [True, False, True], + "@intlist": [11, 22, 33], + "@floatlist": [1.1, 2.2, 3.3], + "@strlist": ["a", "bb", "ccc"], + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttonx(ddict, h5file) + for k, expected in ddict.items(): + result = h5file.attrs[k[1:]] + if isinstance(expected, list): + if isinstance(expected[0], str): + numpy.testing.assert_array_equal(result, expected) + else: + numpy.testing.assert_array_almost_equal(result, expected) + else: + self.assertEqual(result, expected) + + def testFlatDict(self): + """Description of a tree with a single level of keys""" + ddict = { + "group/group/dataset": 10, + "group/group/dataset@attr": 11, + "group/group@attr": 12, + } + with h5py.File(self.h5_fname, "w") as h5file: + dictdump.dicttonx(ddict, h5file) + self.assertEqual(h5file["group/group/dataset"][()], 10) + self.assertEqual(h5file["group/group/dataset"].attrs["attr"], 11) + self.assertEqual(h5file["group/group"].attrs["attr"], 12) + + def testLinks(self): + ddict = {"ext_group": {"dataset": 10}} + dictdump.dicttonx(ddict, self.h5_ext_fname) + ddict = { + "links": { + "group": {"dataset": 10, ">relative_softlink": "dataset"}, + ">relative_softlink": "group/dataset", + ">absolute_softlink": 
"/links/group/dataset", + ">external_link": "nx_ext.h5::/ext_group/dataset", + } + } + dictdump.dicttonx(ddict, self.h5_fname) + with h5py.File(self.h5_fname, "r") as h5file: + self.assertEqual(h5file["links/group/dataset"][()], 10) + self.assertEqual(h5file["links/group/relative_softlink"][()], 10) + self.assertEqual(h5file["links/relative_softlink"][()], 10) + self.assertEqual(h5file["links/absolute_softlink"][()], 10) + self.assertEqual(h5file["links/external_link"][()], 10) + + def testUpLinks(self): + ddict = { + "data": {"group": {"dataset": 10, ">relative_softlink": "dataset"}}, + "links": { + "group": { + "subgroup": {">relative_softlink": "../../../data/group/dataset"} + } + }, + } + dictdump.dicttonx(ddict, self.h5_fname) + with h5py.File(self.h5_fname, "r") as h5file: + self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10) + + def testOverwrite(self): + entry_name = "entry" + wtreedict = { + "group1": {"a": 1, "b": 2}, + "group2@attr3": "attr3", + "group2@attr4": "attr4", + "group2": { + "@attr1": "attr1", + "@attr2": "attr2", + "c": 3, + "d": 4, + "dataset4": 8, + "dataset4@units": "keV", + }, + "group3": {"subgroup": {"e": 9, "f": 10}}, + "dataset1": 5, + "dataset2": 6, + "dataset3": 7, + "dataset3@units": "mm", + } + esubtree = { + "@NX_class": "NXentry", + "group1": {"@NX_class": "NXcollection", "a": 1, "b": 2}, + "group2": { + "@NX_class": "NXcollection", + "@attr1": "attr1", + "@attr2": "attr2", + "@attr3": "attr3", + "@attr4": "attr4", + "c": 3, + "d": 4, + "dataset4": 8, + "dataset4@units": "keV", + }, + "group3": { + "@NX_class": "NXcollection", + "subgroup": {"@NX_class": "NXcollection", "e": 9, "f": 10}, + }, + "dataset1": 5, + "dataset2": 6, + "dataset3": 7, + "dataset3@units": "mm", + } + etreedict = {entry_name: esubtree} + + def append_file(update_mode, add_nx_class): + dictdump.dicttonx( + wtreedict, + h5file=self.h5_fname, + mode="a", + h5path=entry_name, + update_mode=update_mode, + add_nx_class=add_nx_class, + ) + 
+ def assert_file(): + rtreedict = dictdump.nxtodict( + self.h5_fname, + include_attributes=True, + asarray=False, + ) + netreedict = self.dictRoundTripNormalize(etreedict) + try: + self.assertRecursiveEqual(netreedict, rtreedict) + except AssertionError: + from pprint import pprint + + print("\nDUMP:") + pprint(wtreedict) + print("\nEXPECTED:") + pprint(netreedict) + print("\nHDF5:") + pprint(rtreedict) + raise + + def assert_append(update_mode, add_nx_class=None): + append_file(update_mode, add_nx_class=add_nx_class) + assert_file() + + # First to an empty file + assert_append(None) + + # Add non-existing attributes/datasets/groups + wtreedict["group1"].pop("a") + wtreedict["group2"].pop("@attr1") + wtreedict["group2"]["@attr2"] = "attr3" # only for update + wtreedict["group2"]["@type"] = "test" + wtreedict["group2"]["dataset4"] = 9 # only for update + del wtreedict["group2"]["dataset4@units"] + wtreedict["group3"] = {} + esubtree["group2"]["@type"] = "test" + assert_append("add") + + # Add update existing attributes and datasets + esubtree["group2"]["@attr2"] = "attr3" + esubtree["group2"]["dataset4"] = 9 + assert_append("modify") + + # Do not add missing NX_class by default when updating + wtreedict["group2"]["@NX_class"] = "NXprocess" + esubtree["group2"]["@NX_class"] = "NXprocess" + assert_append("modify") + del wtreedict["group2"]["@NX_class"] + assert_append("modify") + + # Overwrite existing groups/datasets/attributes + esubtree["group1"].pop("a") + esubtree["group2"].pop("@attr1") + esubtree["group2"]["@NX_class"] = "NXcollection" + esubtree["group2"]["dataset4"] = 9 + del esubtree["group2"]["dataset4@units"] + esubtree["group3"] = {"@NX_class": "NXcollection"} + assert_append("replace", add_nx_class=True) + + +@pytest.mark.skipif(pint is None, reason="Require pint") +def test_dicttonx_pint(tmp_h5py_file): + ureg = pint.UnitRegistry() + treedict = { + "array_mm": pint.Quantity([1, 2, 3], ureg.mm), + "value_kg": 3 * ureg.kg, + } + + 
dictdump.dicttonx(treedict, tmp_h5py_file) + + result = dictdump.nxtodict(tmp_h5py_file) + for key, value in treedict.items(): + assert numpy.array_equal(result[key], value.magnitude) + assert result[f"{key}@units"] == f"{value.units:~C}" + + +class TestNxToDict(H5DictTestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.h5_fname = os.path.join(self.tempdir, "nx.h5") + self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5") + + def tearDown(self): + if os.path.exists(self.h5_fname): + os.unlink(self.h5_fname) + if os.path.exists(self.h5_ext_fname): + os.unlink(self.h5_ext_fname) + os.rmdir(self.tempdir) + + def testAttributes(self): + """Any kind of attribute can be described""" + ddict = { + "group": {"dataset": 100, "@group_attr1": 10}, + "dataset": 200, + "@root_attr": 11, + "dataset@dataset_attr": "12", + "group@group_attr2": 13, + } + dictdump.dicttonx(ddict, self.h5_fname) + ddict = dictdump.nxtodict(self.h5_fname, include_attributes=True) + self.assertEqual(ddict["group"]["@group_attr1"], 10) + self.assertEqual(ddict["@root_attr"], 11) + self.assertEqual(ddict["dataset@dataset_attr"], "12") + self.assertEqual(ddict["group"]["@group_attr2"], 13) + + def testDereferenceLinks(self): + """Write links and dereference on read""" + ddict = {"ext_group": {"dataset": 10}} + dictdump.dicttonx(ddict, self.h5_ext_fname) + ddict = { + "links": { + "group": {"dataset": 10, ">relative_softlink": "dataset"}, + ">relative_softlink": "group/dataset", + ">absolute_softlink": "/links/group/dataset", + ">external_link": "nx_ext.h5::/ext_group/dataset", + } + } + dictdump.dicttonx(ddict, self.h5_fname) + + ddict = dictdump.h5todict(self.h5_fname, dereference_links=True) + self.assertTrue(ddict["links"]["absolute_softlink"], 10) + self.assertTrue(ddict["links"]["relative_softlink"], 10) + self.assertTrue(ddict["links"]["external_link"], 10) + self.assertTrue(ddict["links"]["group"]["relative_softlink"], 10) + + def testPreserveLinks(self): + """Write/read 
links""" + ddict = {"ext_group": {"dataset": 10}} + dictdump.dicttonx(ddict, self.h5_ext_fname) + ddict = { + "links": { + "group": {"dataset": 10, ">relative_softlink": "dataset"}, + ">relative_softlink": "group/dataset", + ">absolute_softlink": "/links/group/dataset", + ">external_link": "nx_ext.h5::/ext_group/dataset", + } + } + dictdump.dicttonx(ddict, self.h5_fname) + + ddict = dictdump.nxtodict(self.h5_fname, dereference_links=False) + self.assertTrue(ddict["links"][">absolute_softlink"], "dataset") + self.assertTrue(ddict["links"][">relative_softlink"], "group/dataset") + self.assertTrue(ddict["links"][">external_link"], "/links/group/dataset") + self.assertTrue( + ddict["links"]["group"][">relative_softlink"], + "nx_ext.h5::/ext_group/datase", + ) + + def testNotExistingPath(self): + """Test converting not existing path""" + with h5py.File(self.h5_fname, "a") as f: + f["data"] = 1 + + ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors="ignore") + self.assertFalse(ddict) + + with LoggingValidator(dictdump_logger, error=1): + ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors="log") + self.assertFalse(ddict) + + with self.assertRaises(KeyError): + h5todict(self.h5_fname, path="/I/am/not/a/path", errors="raise") + + def testBrokenLinks(self): + """Test with broken links""" + with h5py.File(self.h5_fname, "a") as f: + f["/Mars/BrokenSoftLink"] = h5py.SoftLink("/Idontexists") + f["/Mars/BrokenExternalLink"] = h5py.ExternalLink( + "notexistingfile.h5", "/Idontexists" + ) + + ddict = h5todict(self.h5_fname, path="/Mars", errors="ignore") + self.assertFalse(ddict) + + with LoggingValidator(dictdump_logger, error=2): + ddict = h5todict(self.h5_fname, path="/Mars", errors="log") + self.assertFalse(ddict) + + with self.assertRaises(KeyError): + h5todict(self.h5_fname, path="/Mars", errors="raise") + + +class TestDictToJson(DictTestCase): + def setUp(self): + self.dir_path = tempfile.mkdtemp() + self.json_fname = os.path.join(self.dir_path, 
"cityattrs.json") + + def tearDown(self): + os.unlink(self.json_fname) + os.rmdir(self.dir_path) + + def testJsonCityAttrs(self): + self.json_fname = os.path.join(self.dir_path, "cityattrs.json") + dicttojson(city_attrs, self.json_fname, indent=3) + + with open(self.json_fname, "r") as f: + json_content = f.read() + self.assertIn('"inhabitants": 160215', json_content) + + +class TestDictToIni(DictTestCase): + def setUp(self): + self.dir_path = tempfile.mkdtemp() + self.ini_fname = os.path.join(self.dir_path, "test.ini") + + def tearDown(self): + os.unlink(self.ini_fname) + os.rmdir(self.dir_path) + + def testConfigDictIO(self): + """Ensure values and types of data is preserved when dictionary is + written to file and read back.""" + testdict = { + "simple_types": { + "float": 1.0, + "int": 1, + "percent string": "5 % is too much", + "backslash string": "i can use \\", + "empty_string": "", + "nonestring": "None", + "nonetype": None, + "interpstring": "interpolation: %(percent string)s", + }, + "containers": { + "list": [-1, "string", 3.0, False, None], + "array": numpy.array([1.0, 2.0, 3.0]), + "dict": { + "key1": "Hello World", + "key2": 2.0, + }, + }, + } + + dump(testdict, self.ini_fname) + + # read the data back + readdict = load(self.ini_fname) + + testdictkeys = list(testdict.keys()) + readkeys = list(readdict.keys()) + + self.assertTrue( + len(readkeys) == len(testdictkeys), "Number of read keys not equal" + ) + + self.assertEqual( + readdict["simple_types"]["interpstring"], "interpolation: 5 % is too much" + ) + + testdict["simple_types"]["interpstring"] = "interpolation: 5 % is too much" + + for key in testdict["simple_types"]: + original = testdict["simple_types"][key] + read = readdict["simple_types"][key] + self.assertEqual( + read, original, "Read <%s> instead of <%s>" % (read, original) + ) + + for key in testdict["containers"]: + original = testdict["containers"][key] + read = readdict["containers"][key] + if key == "array": + self.assertEqual( + 
read.all(), + original.all(), + "Read <%s> instead of <%s>" % (read, original), + ) + else: + self.assertEqual( + read, original, "Read <%s> instead of <%s>" % (read, original) + ) + + def testConfigDictOrder(self): + """Ensure order is preserved when dictionary is + written to file and read back.""" + test_dict = {"banana": 3, "apple": 4, "pear": 1, "orange": 2} + # sort by key + test_ordered_dict1 = dict(sorted(test_dict.items(), key=lambda t: t[0])) + # sort by value + test_ordered_dict2 = dict(sorted(test_dict.items(), key=lambda t: t[1])) + # add the two ordered dict as sections of a third ordered dict + test_ordered_dict3 = {} + test_ordered_dict3["section1"] = test_ordered_dict1 + test_ordered_dict3["section2"] = test_ordered_dict2 + + # write to ini and read back as a ConfigDict + dump(test_ordered_dict3, self.ini_fname, fmat="ini") + read_instance = ConfigDict() + read_instance.read(self.ini_fname) + + # loop through original and read-back dictionaries, + # test identical order for key/value pairs + for orig_key, section in zip(test_ordered_dict3.keys(), read_instance.keys()): + self.assertEqual(orig_key, section) + for orig_key2, read_key in zip( + test_ordered_dict3[section].keys(), read_instance[section].keys() + ): + self.assertEqual(orig_key2, read_key) + self.assertEqual( + test_ordered_dict3[section][orig_key2], + read_instance[section][read_key], + ) diff --git a/src/silx/io/test/test_fabioh5.py b/src/silx/io/test/test_fabioh5.py new file mode 100755 index 0000000..9c92f15 --- /dev/null +++ b/src/silx/io/test/test_fabioh5.py @@ -0,0 +1,627 @@ +# /*########################################################################## +# Copyright (C) 2016-2023 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, 
modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for fabioh5 wrapper""" + +__authors__ = ["V. Valls"] +__license__ = "MIT" +__date__ = "02/07/2018" + +import os +import logging +import numpy +import unittest +import tempfile +import shutil + +_logger = logging.getLogger(__name__) + +import fabio +import fabio.file_series +import h5py + +from .. import commonh5 +from .. 
import fabioh5 + + +class TestFabioH5(unittest.TestCase): + def setUp(self): + header = { + "integer": "-100", + "float": "1.0", + "string": "hi!", + "list_integer": "100 50 0", + "list_float": "1.0 2.0 3.5", + "string_looks_like_list": "2000 hi!", + } + data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64) + self.fabio_image = fabio.numpyimage.NumpyImage(data, header) + self.h5_image = fabioh5.File(fabio_image=self.fabio_image) + + def test_main_groups(self): + self.assertEqual(self.h5_image.h5py_class, h5py.File) + self.assertEqual(self.h5_image["/"].h5py_class, h5py.File) + self.assertEqual(self.h5_image["/scan_0"].h5py_class, h5py.Group) + self.assertEqual(self.h5_image["/scan_0/instrument"].h5py_class, h5py.Group) + self.assertEqual(self.h5_image["/scan_0/measurement"].h5py_class, h5py.Group) + + def test_wrong_path_syntax(self): + # result tested with a default h5py file + self.assertRaises(ValueError, lambda: self.h5_image[""]) + + def test_wrong_root_name(self): + # result tested with a default h5py file + self.assertRaises(KeyError, lambda: self.h5_image["/foo"]) + + def test_wrong_root_path(self): + # result tested with a default h5py file + self.assertRaises(KeyError, lambda: self.h5_image["/foo/foo"]) + + def test_wrong_name(self): + # result tested with a default h5py file + self.assertRaises(KeyError, lambda: self.h5_image["foo"]) + + def test_wrong_path(self): + # result tested with a default h5py file + self.assertRaises(KeyError, lambda: self.h5_image["foo/foo"]) + + def test_single_frame(self): + data = numpy.arange(2 * 3) + data.shape = 2, 3 + fabio_image = fabio.edfimage.edfimage(data=data) + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEqual(dataset.dtype.kind, "i") + self.assertEqual(dataset.shape, (2, 3)) + self.assertEqual(dataset[...][0, 
0], 0) + self.assertEqual(dataset.attrs["interpretation"], "image") + + def test_multi_frames(self): + data = numpy.arange(2 * 3) + data.shape = 2, 3 + fabio_image = fabio.edfimage.edfimage(data=data) + fabio_image.append_frame(data=data) + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEqual(dataset.dtype.kind, "i") + self.assertEqual(dataset.shape, (2, 2, 3)) + self.assertEqual(dataset[...][0, 0, 0], 0) + self.assertEqual(dataset.attrs["interpretation"], "image") + + def test_heterogeneous_frames(self): + """Frames containing 2 images with different sizes and a cube""" + data1 = numpy.arange(2 * 3) + data1.shape = 2, 3 + data2 = numpy.arange(2 * 5) + data2.shape = 2, 5 + data3 = numpy.arange(2 * 5 * 1) + data3.shape = 2, 5, 1 + fabio_image = fabio.edfimage.edfimage(data=data1) + fabio_image.append_frame(data=data2) + fabio_image.append_frame(data=data3) + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEqual(dataset.dtype.kind, "i") + self.assertEqual(dataset.shape, (3, 2, 5, 1)) + self.assertEqual(dataset[...][0, 0, 0], 0) + self.assertEqual(dataset.attrs["interpretation"], "image") + + def test_single_3d_frame(self): + """Image source contains a cube""" + data = numpy.arange(2 * 3 * 4) + data.shape = 2, 3, 4 + # Do not provide the data to the constructor to avoid slicing of the + # data. 
In this way the result stay a cube, and not a multi-frame + fabio_image = fabio.edfimage.edfimage() + fabio_image.data = data + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEqual(dataset.dtype.kind, "i") + self.assertEqual(dataset.shape, (2, 3, 4)) + self.assertEqual(dataset[...][0, 0, 0], 0) + self.assertEqual(dataset.attrs["interpretation"], "image") + + def test_metadata_int(self): + dataset = self.h5_image["/scan_0/instrument/detector_0/others/integer"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertEqual(dataset[()], -100) + self.assertEqual(dataset.dtype.kind, "i") + self.assertEqual(dataset.shape, (1,)) + + def test_metadata_float(self): + dataset = self.h5_image["/scan_0/instrument/detector_0/others/float"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertEqual(dataset[()], 1.0) + self.assertEqual(dataset.dtype.kind, "f") + self.assertEqual(dataset.shape, (1,)) + + def test_metadata_string(self): + dataset = self.h5_image["/scan_0/instrument/detector_0/others/string"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertEqual(dataset[()], numpy.string_("hi!")) + self.assertEqual(dataset.dtype.type, numpy.string_) + self.assertEqual(dataset.shape, (1,)) + + def test_metadata_list_integer(self): + dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_integer"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertEqual(dataset.dtype.kind, "u") + self.assertEqual(dataset.shape, (1, 3)) + self.assertEqual(dataset[0, 0], 100) + self.assertEqual(dataset[0, 1], 50) + + def test_metadata_list_float(self): + dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_float"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertEqual(dataset.dtype.kind, "f") + 
self.assertEqual(dataset.shape, (1, 3)) + self.assertEqual(dataset[0, 0], 1.0) + self.assertEqual(dataset[0, 1], 2.0) + + def test_metadata_list_looks_like_list(self): + dataset = self.h5_image[ + "/scan_0/instrument/detector_0/others/string_looks_like_list" + ] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertEqual(dataset[()], numpy.string_("2000 hi!")) + self.assertEqual(dataset.dtype.type, numpy.string_) + self.assertEqual(dataset.shape, (1,)) + + def test_float_32(self): + float_list = ["1.2", "1.3", "1.4"] + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = None + for float_item in float_list: + header = {"float_item": float_item} + if fabio_image is None: + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + else: + fabio_image.append_frame(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + data = h5_image["/scan_0/instrument/detector_0/others/float_item"] + # There is no equality between items + self.assertEqual(len(data), len(set(data))) + # At worst a float32 + self.assertIn(data.dtype.kind, ["d", "f"]) + self.assertLessEqual(data.dtype.itemsize, 32 / 8) + + def test_float_64(self): + float_list = [ + "1469117129.082226", + "1469117136.684986", + "1469117144.312749", + "1469117151.892507", + "1469117159.474265", + "1469117167.100027", + "1469117174.815799", + "1469117182.437561", + "1469117190.094326", + "1469117197.721089", + ] + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = None + for float_item in float_list: + header = {"time_of_day": float_item} + if fabio_image is None: + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + else: + fabio_image.append_frame(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + data = h5_image["/scan_0/instrument/detector_0/others/time_of_day"] + # There is no equality between items + self.assertEqual(len(data), len(set(data))) + # At least a float64 + 
self.assertIn(data.dtype.kind, ["d", "f"]) + self.assertGreaterEqual(data.dtype.itemsize, 64 / 8) + + def test_mixed_float_size__scalar(self): + # We expect to have a precision of 32 bits + float_list = ["1.2", "1.3001"] + expected_float_result = [1.2, 1.3001] + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = None + for float_item in float_list: + header = {"float_item": float_item} + if fabio_image is None: + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + else: + fabio_image.append_frame(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + data = h5_image["/scan_0/instrument/detector_0/others/float_item"] + # At worst a float32 + self.assertIn(data.dtype.kind, ["d", "f"]) + self.assertLessEqual(data.dtype.itemsize, 32 / 8) + for computed, expected in zip(data, expected_float_result): + numpy.testing.assert_almost_equal(computed, expected, 5) + + def test_mixed_float_size__list(self): + # We expect to have a precision of 32 bits + float_list = ["1.2 1.3001"] + expected_float_result = numpy.array([[1.2, 1.3001]]) + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = None + for float_item in float_list: + header = {"float_item": float_item} + if fabio_image is None: + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + else: + fabio_image.append_frame(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + data = h5_image["/scan_0/instrument/detector_0/others/float_item"] + # At worst a float32 + self.assertIn(data.dtype.kind, ["d", "f"]) + self.assertLessEqual(data.dtype.itemsize, 32 / 8) + for computed, expected in zip(data, expected_float_result): + numpy.testing.assert_almost_equal(computed, expected, 5) + + def test_mixed_float_size__list_of_list(self): + # We expect to have a precision of 32 bits + float_list = ["1.2 1.3001", "1.3001 1.3001"] + expected_float_result = numpy.array([[1.2, 1.3001], [1.3001, 1.3001]]) + data = numpy.array([[0, 
0], [0, 0]], dtype=numpy.int8) + fabio_image = None + for float_item in float_list: + header = {"float_item": float_item} + if fabio_image is None: + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + else: + fabio_image.append_frame(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + data = h5_image["/scan_0/instrument/detector_0/others/float_item"] + # At worst a float32 + self.assertIn(data.dtype.kind, ["d", "f"]) + self.assertLessEqual(data.dtype.itemsize, 32 / 8) + for computed, expected in zip(data, expected_float_result): + numpy.testing.assert_almost_equal(computed, expected, 5) + + def test_ub_matrix(self): + """Data from mediapix.edf""" + header = {} + header["UB_mne"] = "UB0 UB1 UB2 UB3 UB4 UB5 UB6 UB7 UB8" + header[ + "UB_pos" + ] = "1.99593e-16 2.73682e-16 -1.54 -1.08894 1.08894 1.6083e-16 1.08894 1.08894 9.28619e-17" + header["sample_mne"] = "U0 U1 U2 U3 U4 U5" + header["sample_pos"] = "4.08 4.08 4.08 90 90 90" + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + sample = h5_image["/scan_0/sample"] + self.assertIsNotNone(sample) + self.assertEqual(sample.attrs["NXclass"], "NXsample") + + d = sample["unit_cell_abc"] + expected = numpy.array([4.08, 4.08, 4.08]) + self.assertIsNotNone(d) + self.assertEqual(d.shape, (3,)) + self.assertIn(d.dtype.kind, ["d", "f"]) + numpy.testing.assert_array_almost_equal(d[...], expected) + + d = sample["unit_cell_alphabetagamma"] + expected = numpy.array([90.0, 90.0, 90.0]) + self.assertIsNotNone(d) + self.assertEqual(d.shape, (3,)) + self.assertIn(d.dtype.kind, ["d", "f"]) + numpy.testing.assert_array_almost_equal(d[...], expected) + + d = sample["ub_matrix"] + expected = numpy.array( + [ + [ + [1.99593e-16, 2.73682e-16, -1.54], + [-1.08894, 1.08894, 1.6083e-16], + [1.08894, 1.08894, 9.28619e-17], + ] + ] + ) + self.assertIsNotNone(d) + 
self.assertEqual(d.shape, (1, 3, 3)) + self.assertIn(d.dtype.kind, ["d", "f"]) + numpy.testing.assert_array_almost_equal(d[...], expected) + + def test_interpretation_mca_edf(self): + """EDF files with two or more headers starting with "MCA" + must have @interpretation = "spectrum" an the data.""" + header = { + "Title": "zapimage samy -4.975 -5.095 80 500 samz -4.091 -4.171 70 0", + "MCA a": -23.812, + "MCA b": 2.7107, + "MCA c": 8.1164e-06, + } + + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + + data_dataset = h5_image["/scan_0/measurement/image_0/data"] + self.assertEqual(data_dataset.attrs["interpretation"], "spectrum") + + data_dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEqual(data_dataset.attrs["interpretation"], "spectrum") + + data_dataset = h5_image["/scan_0/measurement/image_0/info/data"] + self.assertEqual(data_dataset.attrs["interpretation"], "spectrum") + + def test_get_api(self): + result = self.h5_image.get("scan_0", getclass=True, getlink=True) + self.assertIs(result, h5py.HardLink) + result = self.h5_image.get("scan_0", getclass=False, getlink=True) + self.assertIsInstance(result, h5py.HardLink) + result = self.h5_image.get("scan_0", getclass=True, getlink=False) + self.assertIs(result, h5py.Group) + result = self.h5_image.get("scan_0", getclass=False, getlink=False) + self.assertIsInstance(result, commonh5.Group) + + def test_detector_link(self): + detector1 = self.h5_image["/scan_0/instrument/detector_0"] + detector2 = self.h5_image["/scan_0/measurement/image_0/info"] + self.assertIsNot(detector1, detector2) + self.assertEqual(list(detector1.items()), list(detector2.items())) + self.assertEqual( + self.h5_image.get(detector2.name, getlink=True).path, detector1.name + ) + + def test_detector_data_link(self): + data1 = self.h5_image["/scan_0/instrument/detector_0/data"] + data2 = 
self.h5_image["/scan_0/measurement/image_0/data"] + self.assertIsNot(data1, data2) + self.assertIs(data1._get_data(), data2._get_data()) + self.assertEqual(self.h5_image.get(data2.name, getlink=True).path, data1.name) + + def test_dirty_header(self): + """Test that it does not fail""" + try: + header = {} + header["foo"] = b"abc" + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = fabio.edfimage.edfimage(data=data, header=header) + header = {} + header["foo"] = b"a\x90bc\xFE" + fabio_image.append_frame(data=data, header=header) + except Exception as e: + _logger.error(e.args[0]) + _logger.debug("Backtrace", exc_info=True) + self.skipTest("fabio do not allow to create the resource") + + h5_image = fabioh5.File(fabio_image=fabio_image) + scan_header_path = "/scan_0/instrument/file/scan_header" + self.assertIn(scan_header_path, h5_image) + data = h5_image[scan_header_path] + self.assertIsInstance(data[...], numpy.ndarray) + + def test_unicode_header(self): + """Test that it does not fail""" + try: + header = {} + header["foo"] = b"abc" + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = fabio.edfimage.edfimage(data=data, header=header) + header = {} + header["foo"] = "abc\u2764" + fabio_image.append_frame(data=data, header=header) + except Exception as e: + _logger.error(e.args[0]) + _logger.debug("Backtrace", exc_info=True) + self.skipTest("fabio do not allow to create the resource") + + h5_image = fabioh5.File(fabio_image=fabio_image) + scan_header_path = "/scan_0/instrument/file/scan_header" + self.assertIn(scan_header_path, h5_image) + data = h5_image[scan_header_path] + self.assertIsInstance(data[...], numpy.ndarray) + + +class TestFabioH5MultiFrames(unittest.TestCase): + @classmethod + def setUpClass(cls): + names = ["A", "B", "C", "D"] + values = [["32000", "-10", "5.0", "1"], ["-32000", "-10", "5.0", "1"]] + + fabio_file = None + + for i in range(10): + header = { + "image_id": "%d" % i, + "integer": "-100", + "float": 
"1.0", + "string": "hi!", + "list_integer": "100 50 0", + "list_float": "1.0 2.0 3.5", + "string_looks_like_list": "2000 hi!", + "motor_mne": " ".join(names), + "motor_pos": " ".join(values[i % len(values)]), + "counter_mne": " ".join(names), + "counter_pos": " ".join(values[i % len(values)]), + } + for iname, name in enumerate(names): + header[name] = values[i % len(values)][iname] + + data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64) + if fabio_file is None: + fabio_file = fabio.edfimage.EdfImage(data=data, header=header) + else: + fabio_file.append_frame(data=data, header=header) + + cls.fabio_file = fabio_file + cls.fabioh5 = fabioh5.File(fabio_image=fabio_file) + + def test_others(self): + others = self.fabioh5["/scan_0/instrument/detector_0/others"] + dataset = others["A"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "i") + dataset = others["B"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "i") + dataset = others["C"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "f") + dataset = others["D"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "u") + + def test_positioners(self): + counters = self.fabioh5["/scan_0/instrument/positioners"] + # At least 32 bits, no unsigned values + dataset = counters["A"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["B"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["C"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "f") + dataset = counters["D"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + + def test_counters(self): + counters = self.fabioh5["/scan_0/measurement"] + # At least 32 bits, no 
unsigned values + dataset = counters["A"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["B"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["C"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "f") + dataset = counters["D"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + + +class TestFabioH5WithEdf(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tmp_directory = tempfile.mkdtemp() + + cls.edf_filename = os.path.join(cls.tmp_directory, "test.edf") + + header = { + "integer": "-100", + "float": "1.0", + "string": "hi!", + "list_integer": "100 50 0", + "list_float": "1.0 2.0 3.5", + "string_looks_like_list": "2000 hi!", + } + data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64) + fabio_image = fabio.edfimage.edfimage(data, header) + fabio_image.write(cls.edf_filename) + + cls.fabio_image = fabio.open(cls.edf_filename) + cls.h5_image = fabioh5.File(fabio_image=cls.fabio_image) + + @classmethod + def tearDownClass(cls): + cls.fabio_image = None + cls.h5_image = None + shutil.rmtree(cls.tmp_directory) + + def test_reserved_format_metadata(self): + if fabio.hexversion < 327920: # 0.5.0 final + self.skipTest("fabio >= 0.5.0 final is needed") + + # The EDF contains reserved keys in the header + self.assertIn("HeaderID", self.fabio_image.header) + # We do not expose them in FabioH5 + self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image) + + +class _TestableFrameData(fabioh5.FrameData): + """Allow to test if the full data is reached.""" + + def _create_data(self): + raise RuntimeError("Not supposed to be called") + + +class TestFabioH5WithFileSeries(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tmp_directory = tempfile.mkdtemp() + + cls.edf_filenames = [] + + for i 
in range(10): + filename = os.path.join(cls.tmp_directory, "test_%04d.edf" % i) + cls.edf_filenames.append(filename) + + header = { + "image_id": "%d" % i, + "integer": "-100", + "float": "1.0", + "string": "hi!", + "list_integer": "100 50 0", + "list_float": "1.0 2.0 3.5", + "string_looks_like_list": "2000 hi!", + } + data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64) + fabio_image = fabio.edfimage.edfimage(data, header) + fabio_image.write(filename) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmp_directory) + + def _testH5Image(self, h5_image): + # test data + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEqual(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEqual(dataset.dtype.kind, "i") + self.assertEqual(dataset.shape, (10, 3, 2)) + self.assertEqual(list(dataset[:, 0, 0]), list(range(10))) + self.assertEqual(dataset.attrs["interpretation"], "image") + # test metatdata + dataset = h5_image["/scan_0/instrument/detector_0/others/image_id"] + self.assertEqual(list(dataset[...]), list(range(10))) + + def testFileList(self): + h5_image = fabioh5.File(file_series=self.edf_filenames) + self._testH5Image(h5_image) + + def testFileSeries(self): + file_series = fabio.file_series.file_series(self.edf_filenames) + h5_image = fabioh5.File(file_series=file_series) + self._testH5Image(h5_image) + + def testFrameDataCache(self): + file_series = fabio.file_series.file_series(self.edf_filenames) + reader = fabioh5.FabioReader(file_series=file_series) + frameData = _TestableFrameData("foo", reader) + self.assertEqual(frameData.dtype.kind, "i") + self.assertEqual(frameData.shape, (10, 3, 2)) diff --git a/src/silx/io/test/test_fioh5.py b/src/silx/io/test/test_fioh5.py new file mode 100644 index 0000000..fed22a2 --- /dev/null +++ b/src/silx/io/test/test_fioh5.py @@ -0,0 +1,296 @@ +# /*########################################################################## +# 
Copyright (C) 2021 Timo Fuchs +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for fioh5""" +import numpy +import os +import tempfile +import unittest + +from ..fioh5 import FioH5, is_fiofile, logger1, dtypeConverter + + +__authors__ = ["T. Fuchs"] +__license__ = "MIT" +__date__ = "15/10/2021" + +fioftext = """ +! +! Comments +! +%c +ascan omega 180.0 180.5 3:10/1 4 +user username, acquisition started at Thu Dec 12 18:00:00 2021 +sweep motor lag: 1.0e-03 +channel 3: Detector +! +! Parameter +! +%p +channel3_exposure = 1.000000e+00 +ScanName = ascan +! +! Data +! 
+%d + Col 1 omega(encoder) DOUBLE + Col 2 channel INTEGER + Col 3 filename STRING + Col 4 type STRING + Col 5 unix time DOUBLE + Col 6 enable BOOLEAN + Col 7 time_s FLOAT + 179.998418821 3 00001 exposure 1576165741.20308 1 1.243 + 180.048418821 3 00002 exposure 1576165742.20308 1 1.243 + 180.098418821 3 00003 exposure 1576165743.20308 1 1.243 + 180.148418821 3 00004 exposure 1576165744.20308 1 1.243 + 180.198418821 3 00005 exposure 1576165745.20308 1 1.243 + 180.248418821 3 00006 exposure 1576165746.20308 1 1.243 + 180.298418821 3 00007 exposure 1576165747.20308 1 1.243 + 180.348418821 3 00008 exposure 1576165748.20308 1 1.243 + 180.398418821 3 00009 exposure 1576165749.20308 1 1.243 + 180.448418821 3 00010 exposure 1576165750.20308 1 1.243 +""" + + +class TestFioH5(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.TemporaryDirectory() + # fd, cls.fname = tempfile.mkstemp() + cls.fname_numbered = os.path.join(cls.temp_dir.name, "eh1scan_00005.fio") + + with open(cls.fname_numbered, "w") as fiof: + fiof.write(fioftext) + + @classmethod + def tearDownClass(cls): + cls.temp_dir.cleanup() + del cls.temp_dir + + def setUp(self): + self.fioh5 = FioH5(self.fname_numbered) + + def tearDown(self): + self.fioh5.close() + + def testScanNumber(self): + # scan number is parsed from the file name. 
+ self.assertIn("/5.1", self.fioh5) + self.assertIn("5.1", self.fioh5) + + def testContainsFile(self): + self.assertIn("/5.1/measurement", self.fioh5) + self.assertNotIn("25.2", self.fioh5) + # measurement is a child of a scan, full path would be required to + # access from root level + self.assertNotIn("measurement", self.fioh5) + # Groups may or may not have a trailing / + self.assertIn("/5.1/measurement/", self.fioh5) + self.assertIn("/5.1/measurement", self.fioh5) + # Datasets can't have a trailing / + self.assertIn("/5.1/measurement/omega(encoder)", self.fioh5) + self.assertNotIn("/5.1/measurement/omega(encoder)/", self.fioh5) + # No gamma + self.assertNotIn("/5.1/measurement/gamma", self.fioh5) + + def testContainsGroup(self): + self.assertIn("measurement", self.fioh5["/5.1/"]) + self.assertIn("measurement", self.fioh5["/5.1"]) + self.assertIn("5.1", self.fioh5["/"]) + self.assertNotIn("5.2", self.fioh5["/"]) + self.assertIn("measurement/filename", self.fioh5["/5.1"]) + # illegal trailing "/" after dataset name + self.assertNotIn("measurement/filename/", self.fioh5["/5.1"]) + # full path to element in group (OK) + self.assertIn("/5.1/measurement/filename", self.fioh5["/5.1/measurement"]) + + def testDataType(self): + meas = self.fioh5["/5.1/measurement/"] + self.assertEqual(meas["omega(encoder)"].dtype, dtypeConverter["DOUBLE"]) + self.assertEqual(meas["channel"].dtype, dtypeConverter["INTEGER"]) + self.assertEqual(meas["filename"].dtype, dtypeConverter["STRING"]) + self.assertEqual(meas["time_s"].dtype, dtypeConverter["FLOAT"]) + self.assertEqual(meas["enable"].dtype, dtypeConverter["BOOLEAN"]) + + def testDataColumn(self): + self.assertAlmostEqual( + sum(self.fioh5["/5.1/measurement/omega(encoder)"]), 1802.23418821 + ) + self.assertTrue(numpy.all(self.fioh5["/5.1/measurement/enable"])) + + # --- comment section tests --- + + def testComment(self): + # should hold the complete comment section + self.assertEqual( + 
self.fioh5["/5.1/instrument/fiofile/comments"], + """ascan omega 180.0 180.5 3:10/1 4 +user username, acquisition started at Thu Dec 12 18:00:00 2021 +sweep motor lag: 1.0e-03 +channel 3: Detector +""", + ) + + def testDate(self): + # there is no convention on how to format the time. So just check its existence. + self.assertEqual(self.fioh5["/5.1/start_time"], "Thu Dec 12 18:00:00 2021") + + def testTitle(self): + self.assertEqual(self.fioh5["/5.1/title"], "ascan omega 180.0 180.5 3:10/1 4") + + # --- parameter section tests --- + + def testParameter(self): + # should hold the complete parameter section + self.assertEqual( + self.fioh5["/5.1/instrument/fiofile/parameter"], + """channel3_exposure = 1.000000e+00 +ScanName = ascan +""", + ) + + def testParsedParameter(self): + # no dtype is given, so everything is str. + self.assertEqual( + self.fioh5["/5.1/instrument/parameter/channel3_exposure"], "1.000000e+00" + ) + self.assertEqual(self.fioh5["/5.1/instrument/parameter/ScanName"], "ascan") + + def testNotFioH5(self): + testfilename = os.path.join(self.temp_dir.name, "eh1scan_00010.fio") + with open(testfilename, "w") as fiof: + fiof.write("!Not a fio file!") + + self.assertRaises(IOError, FioH5, testfilename) + + self.assertTrue(is_fiofile(self.fname_numbered)) + self.assertFalse(is_fiofile(testfilename)) + + os.unlink(testfilename) + + +class TestUnnumberedFioH5(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.TemporaryDirectory() + cls.fname_nosuffix = os.path.join(cls.temp_dir.name, "eh1scan_nosuffix.fio") + + with open(cls.fname_nosuffix, "w") as fiof: + fiof.write(fioftext) + + @classmethod + def tearDownClass(cls): + cls.temp_dir.cleanup() + del cls.temp_dir + + def setUp(self): + self.fioh5 = FioH5(self.fname_nosuffix) + + def testLogMissingScanno(self): + with self.assertLogs(logger1, level="WARNING") as cm: + fioh5 = FioH5(self.fname_nosuffix) + self.assertIn("Cannot parse scan number of file", cm.output[0]) + + def 
testFallbackName(self): + self.assertIn("/eh1scan_nosuffix", self.fioh5) + + +brokenHeaderText = """ +! +! Comments +! +%c +ascan omega 180.0 180.5 3:10/1 4 +user username, acquisited at Thu Dec 12 100 2021 +sweep motor lavgvf.0e-03 +channel 3: Detector +! +! Parameter +! +%p +channel3_exposu65 1.000000e+00 +ScanName = ascan +! +! Data +! +%d + Col 1 omega(encoder) DOUBLE + Col 2 channel INTEGER + Col 3 filename STRING + Col 4 type STRING + Col 5 unix time DOUBLE + 179.998418821 3 00001 exposure 1576165741.20308 + 180.048418821 3 00002 exposure 1576165742.20308 + 180.098418821 3 00003 exposure 1576165743.20308 + 180.148418821 3 00004 exposure 1576165744.20308 + 180.198418821 3 00005 exposure 1576165745.20308 + 180.248418821 3 00006 exposure 1576165746.20308 + 180.298418821 3 00007 exposure 1576165747.20308 + 180.348418821 3 00008 exposure 1576165748.20308 + 180.398418821 3 00009 exposure 1576165749.20308 + 180.448418821 3 00010 exposure 1576165750.20308 +""" + + +class TestBrokenHeaderFioH5(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.TemporaryDirectory() + cls.fname_numbered = os.path.join(cls.temp_dir.name, "eh1scan_00005.fio") + + with open(cls.fname_numbered, "w") as fiof: + fiof.write(brokenHeaderText) + + @classmethod + def tearDownClass(cls): + cls.temp_dir.cleanup() + del cls.temp_dir + + def setUp(self): + self.fioh5 = FioH5(self.fname_numbered) + + def testLogBrokenHeader(self): + with self.assertLogs(logger1, level="WARNING") as cm: + fioh5 = FioH5(self.fname_numbered) + self.assertIn("Cannot parse parameter section", cm.output[0]) + self.assertIn("Cannot parse default comment section", cm.output[1]) + + def testComment(self): + # should hold the complete comment section + self.assertEqual( + self.fioh5["/5.1/instrument/fiofile/comments"], + """ascan omega 180.0 180.5 3:10/1 4 +user username, acquisited at Thu Dec 12 100 2021 +sweep motor lavgvf.0e-03 +channel 3: Detector +""", + ) + + def testParameter(self): + # 
should hold the complete parameter section + self.assertEqual( + self.fioh5["/5.1/instrument/fiofile/parameter"], + """channel3_exposu65 1.000000e+00 +ScanName = ascan +""", + ) diff --git a/src/silx/io/test/test_h5link_utils.py b/src/silx/io/test/test_h5link_utils.py new file mode 100644 index 0000000..4140003 --- /dev/null +++ b/src/silx/io/test/test_h5link_utils.py @@ -0,0 +1,116 @@ +import os +import pytest +import h5py +import numpy +from silx.io import open +from silx.io import h5link_utils + + +@pytest.fixture(scope="module") +def hdf5_with_external_data(tmpdir_factory): + tmpdir = tmpdir_factory.mktemp("hdf5_with_external_data") + master = str(tmpdir / "master.h5") + external_h5 = str(tmpdir / "external.h5") + external_raw = str(tmpdir / "external.raw") + + data = numpy.array([100, 1000, 10000], numpy.uint16) + tshape = (1,) + data.shape + + with h5py.File(master, "w") as fmaster: + dset = fmaster.create_dataset("data", data=data) + + fmaster["int"] = h5py.SoftLink("data") + + layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype) + layout[0] = h5py.VirtualSource(".", "data", shape=data.shape) + fmaster.create_virtual_dataset("vds0", layout) + + with h5py.File(external_h5, "w") as f: + dset = f.create_dataset("data", data=data) + layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype) + layout[0] = h5py.VirtualSource(dset) + fmaster.create_virtual_dataset("vds1", layout) + + layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype) + layout[0] = h5py.VirtualSource( + external_h5, + "data", + shape=data.shape, + ) + fmaster.create_virtual_dataset("vds2", layout) + fmaster["ext1"] = h5py.ExternalLink(external_h5, "data") + + layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype) + layout[0] = h5py.VirtualSource( + "external.h5", + "data", + shape=data.shape, + ) + fmaster.create_virtual_dataset("vds3", layout) + fmaster["ext2"] = h5py.ExternalLink("external.h5", "data") + + layout = h5py.VirtualLayout(shape=tshape, dtype=data.dtype) + 
layout[0] = h5py.VirtualSource( + "./external.h5", + "data", + shape=data.shape, + ) + fmaster.create_virtual_dataset("vds4", layout) + fmaster["ext3"] = h5py.ExternalLink("./external.h5", "data") + + data.tofile(external_raw) + + external = [(external_raw, 0, 16 * 3)] + fmaster.create_dataset( + "raw1", external=external, shape=tshape, dtype=data.dtype + ) + + external = [("external.raw", 0, 16 * 3)] + fmaster.create_dataset( + "raw2", external=external, shape=tshape, dtype=data.dtype + ) + + external = [("./external.raw", 0, 16 * 3)] + fmaster.create_dataset( + "raw3", external=external, shape=tshape, dtype=data.dtype + ) + + # Validate links + expected = data.tolist() + cwd = os.getcwd() + with h5py.File(master, "r") as master: + for name in master: + if name in ("raw2", "raw3"): + os.chdir(str(tmpdir)) + try: + data = master[name][()].flatten().tolist() + except Exception: + assert False, name + finally: + if name in ("raw2", "raw3"): + os.chdir(cwd) + assert data == expected, name + + return tmpdir + + +@pytest.mark.skipif("VirtualLayout" not in dir(h5py), reason="h5py is too old") +def test_external_dataset_info(hdf5_with_external_data): + tmpdir = hdf5_with_external_data + master = str(tmpdir / "master.h5") + external_h5 = str(tmpdir / "external.h5") + external_raw = str(tmpdir / "external.raw") + with open(master) as f: + for name in f: + hdf5obj = f[name] + info = h5link_utils.external_dataset_info(hdf5obj) + if name in ("data", "int", "ext1", "ext2", "ext3"): + assert info is None, name + elif name == "vds0": + assert info.first_source_url == f"{master}::/data" + elif name in ("vds1", "vds2", "vds3", "vds4"): + assert info.first_source_url == f"{external_h5}::/data" + elif name in ("raw1", "raw2", "raw3"): + assert info.first_source_url == external_raw + else: + assert False, name diff --git a/src/silx/io/test/test_h5py_utils.py b/src/silx/io/test/test_h5py_utils.py new file mode 100644 index 0000000..0d10a78 --- /dev/null +++ 
b/src/silx/io/test/test_h5py_utils.py @@ -0,0 +1,482 @@ +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for h5py utilities""" + +__authors__ = ["W. de Nolf"] +__license__ = "MIT" +__date__ = "27/01/2020" + + +import unittest +import os +import sys +import time +import shutil +import logging +import tempfile +import multiprocessing +from contextlib import contextmanager + +from .. 
import h5py_utils +from ...utils.retry import RetryError, RetryTimeoutError + +IS_WINDOWS = sys.platform == "win32" +logger = logging.getLogger() + + +def _subprocess_context_main(queue, contextmgr, *args, **kw): + try: + with contextmgr(*args, **kw): + queue.put(None) + queue.get() + except Exception: + queue.put(None) + raise + + +@contextmanager +def _subprocess_context(contextmgr, *args, **kw): + print("\nSTART", os.getpid()) + timeout = kw.pop("timeout", 10) + queue = multiprocessing.Queue(maxsize=1) + p = multiprocessing.Process( + target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw + ) + p.start() + try: + queue.get(timeout=timeout) + yield + finally: + queue.put(None) + p.join(timeout) + print(" EXIT", os.getpid()) + + +@contextmanager +def _open_context(filename, **kw): + try: + print(os.getpid(), "OPEN", filename, kw) + with h5py_utils.File(filename, **kw) as f: + if kw.get("mode") == "w": + f["check"] = True + f.flush() + yield f + except Exception: + print(" ", os.getpid(), "FAILED", filename, kw) + raise + else: + print(" ", os.getpid(), "CLOSED", filename, kw) + + +def _cause_segfault(): + import ctypes + + i = ctypes.c_char(b"a") + j = ctypes.pointer(i) + c = 0 + while True: + j[c] = b"a" + c += 1 + + +def _top_level_names_test(txtfilename, *args, **kw): + sys.stderr = open(os.devnull, "w") + + with open(txtfilename, mode="r") as f: + failcounter = int(f.readline().strip()) + + ncausefailure = kw.pop("ncausefailure") + faildelay = kw.pop("faildelay") + if failcounter < ncausefailure: + time.sleep(faildelay) + failcounter += 1 + with open(txtfilename, mode="w") as f: + f.write(str(failcounter)) + if failcounter % 2: + raise RetryError + else: + _cause_segfault() + return h5py_utils._top_level_names(*args, **kw) + + +top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test) + + +def subtests(test): + def wrapper(self): + for subtest_options in self._subtests(): + print("\n====SUB TEST===\n") + print(f"sub 
test options: {subtest_options}") + with self.subTest(str(subtest_options)): + test(self) + + return wrapper + + +class TestH5pyUtils(unittest.TestCase): + def setUp(self): + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.test_dir) + + def _subtests(self): + self._subtest_options = {"mode": "w"} + self.filename_generator = self._filenames() + yield self._subtest_options + self._subtest_options = {"mode": "w", "libver": "latest"} + self.filename_generator = self._filenames() + yield + + def _filenames(self): + i = 1 + while True: + filename = os.path.join(self.test_dir, "file{}.h5".format(i)) + with self._open_context(filename): + pass + yield filename + i += 1 + + def _new_filename(self): + return next(self.filename_generator) + + @contextmanager + def _open_context(self, filename, **kwargs): + kw = dict(self._subtest_options) + kw.update(kwargs) + with _open_context(filename, **kw) as f: + yield f + + @contextmanager + def _open_context_subprocess(self, filename, **kwargs): + kw = dict(self._subtest_options) + kw.update(kwargs) + with _subprocess_context(_open_context, filename, **kw): + yield + + def _assert_hdf5_data(self, f): + self.assertTrue(f["check"][()]) + + def _validate_hdf5_data(self, filename, swmr=False): + with self._open_context(filename, mode="r") as f: + self.assertEqual(f.swmr_mode, swmr) + self._assert_hdf5_data(f) + + @subtests + def test_modes_single_process(self): + """Test concurrent access to the different files from the same process""" + # When using HDF5_USE_FILE_LOCKING, open files with and without + # locking should raise an exception. HDF5_USE_FILE_LOCKING should + # be reset when all files are closed. 
+ + orig = os.environ.get("HDF5_USE_FILE_LOCKING") + filename1 = self._new_filename() + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + filename2 = self._new_filename() + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + + with self._open_context(filename1, mode="r"): + locking1 = False + for mode in ["r", "w", "a"]: + locking2 = mode != "r" + raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT + raise_condition &= locking1 != locking2 + with self.assertRaisesIf(raise_condition, RuntimeError): + with self._open_context(filename2, mode=mode): + pass + self._validate_hdf5_data(filename1) + self._validate_hdf5_data(filename2) + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + + with self._open_context(filename1, mode="a"): + locking1 = True + for mode in ["r", "w", "a"]: + locking2 = mode != "r" + raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT + raise_condition &= locking1 != locking2 + with self.assertRaisesIf(raise_condition, RuntimeError): + with self._open_context(filename2, mode=mode): + pass + self._validate_hdf5_data(filename1) + self._validate_hdf5_data(filename2) + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + + @property + def _libver_low_bound_is_v108(self): + libver = self._subtest_options.get("libver") + return h5py_utils._libver_low_bound_is_v108(libver) + + @property + def _nonlocking_reader_before_writer(self): + """A non-locking reader must open the file before it is locked by a writer""" + if IS_WINDOWS and h5py_utils.HDF5_HAS_LOCKING_ARGUMENT: + return True + if not self._libver_low_bound_is_v108: + return True + return False + + @contextmanager + def assertRaisesIf(self, condition, *args, **kw): + if condition: + with self.assertRaises(*args, **kw): + yield + else: + yield + + @unittest.skipIf( + h5py_utils.HDF5_HAS_LOCKING_ARGUMENT != h5py_utils.H5PY_HAS_LOCKING_ARGUMENT, + "Versions of libhdf5 and h5py use incompatible file locking behaviour", + ) + @subtests + def 
test_modes_multi_process(self): + """Test concurrent access to the same file from different processes""" + filename = self._new_filename() + + nonlocking_reader_before_writer = self._nonlocking_reader_before_writer + writer_before_nonlocking_reader_exception = OSError + old_hdf5_on_windows = IS_WINDOWS and not h5py_utils.HDF5_HAS_LOCKING_ARGUMENT + locked_exception = OSError + + # File locked by a writer + unexpected_access = old_hdf5_on_windows and self._libver_low_bound_is_v108 + for wmode in ["w", "a"]: + with self._open_context_subprocess(filename, mode=wmode): + # Access by a second non-locking reader + with self.assertRaisesIf( + nonlocking_reader_before_writer, + writer_before_nonlocking_reader_exception, + ): + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + # No access by a second locking reader + if unexpected_access: + logger.warning("unexpected concurrent access by a locking reader") + with self.assertRaisesIf(not unexpected_access, locked_exception): + with self._open_context(filename, mode="r", locking=True) as f: + self._assert_hdf5_data(f) + # No access by a second writer + if unexpected_access: + logger.warning("unexpected concurrent access by a writer") + with self.assertRaisesIf(not unexpected_access, locked_exception): + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + # Check for file corruption + if not nonlocking_reader_before_writer: + self._validate_hdf5_data(filename) + self._validate_hdf5_data(filename) + + # File locked by a reader + unexpected_access = old_hdf5_on_windows + with _subprocess_context(_open_context, filename, mode="r", locking=True): + # Access by a non-locking reader + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + # Access by a locking reader + with self._open_context(filename, mode="r", locking=True) as f: + self._assert_hdf5_data(f) + # No access by a second writer + if unexpected_access: + logger.warning("unexpected 
concurrent access by a writer") + raise_condition = not unexpected_access + with self.assertRaisesIf(raise_condition, locked_exception): + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + # Check for file corruption + self._validate_hdf5_data(filename) + self._validate_hdf5_data(filename) + + # File open by a non-locking reader + with self._open_context_subprocess(filename, mode="r"): + # Access by a second non-locking reader + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + # Access by a second locking reader + with self._open_context(filename, mode="r", locking=True) as f: + self._assert_hdf5_data(f) + # Access by a second writer + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + # Check for file corruption + self._validate_hdf5_data(filename) + self._validate_hdf5_data(filename) + + @subtests + @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported") + def test_modes_multi_process_swmr(self): + filename = self._new_filename() + + with self._open_context(filename, mode="w", libver="latest") as f: + pass + + # File open by SWMR writer + with self._open_context_subprocess(filename, mode="a", swmr=True): + with self._open_context(filename, mode="r") as f: + assert f.swmr_mode + self._assert_hdf5_data(f) + with self.assertRaises(OSError): + with self._open_context(filename, mode="a") as f: + pass + self._validate_hdf5_data(filename, swmr=True) + + @subtests + def test_retry_defaults(self): + filename = self._new_filename() + + names = h5py_utils.top_level_names(filename) + self.assertEqual(names, []) + + names = h5py_utils.safe_top_level_names(filename) + self.assertEqual(names, []) + + names = h5py_utils.top_level_names(filename, include_only=None) + self.assertEqual(names, ["check"]) + + names = h5py_utils.safe_top_level_names(filename, include_only=None) + self.assertEqual(names, ["check"]) + + with h5py_utils.open_item(filename, "/check", validate=lambda x: False) 
as item: + self.assertEqual(item, None) + + with h5py_utils.open_item(filename, "/check", validate=None) as item: + self.assertTrue(item[()]) + + with self.assertRaises(RetryTimeoutError): + with h5py_utils.open_item( + filename, + "/check", + retry_timeout=0.1, + retry_invalid=True, + validate=lambda x: False, + ) as item: + pass + + ncall = 0 + + def validate(item): + nonlocal ncall + if ncall >= 1: + return True + else: + ncall += 1 + raise RetryError + + with h5py_utils.open_item( + filename, + "/check", + validate=validate, + retry_timeout=1, + retry_invalid=True, + ) as item: + self.assertTrue(item[()]) + + @subtests + def test_retry_custom(self): + filename = self._new_filename() + ncausefailure = 3 + faildelay = 0.1 + sufficient_timeout = ncausefailure * (faildelay + 10) + insufficient_timeout = ncausefailure * faildelay * 0.5 + + @h5py_utils.retry_contextmanager() + def open_item(filename, name): + nonlocal failcounter + if failcounter < ncausefailure: + time.sleep(faildelay) + failcounter += 1 + raise RetryError + with h5py_utils.File(filename) as h5file: + yield h5file[name] + + failcounter = 0 + kw = {"retry_timeout": sufficient_timeout} + with open_item(filename, "/check", **kw) as item: + self.assertTrue(item[()]) + + failcounter = 0 + kw = {"retry_timeout": insufficient_timeout} + with self.assertRaises(RetryTimeoutError): + with open_item(filename, "/check", **kw) as item: + pass + + @subtests + def test_retry_in_subprocess(self): + filename = self._new_filename() + txtfilename = os.path.join(self.test_dir, "failcounter.txt") + ncausefailure = 3 + faildelay = 0.1 + sufficient_timeout = ncausefailure * (faildelay + 10) + insufficient_timeout = ncausefailure * faildelay * 0.5 + + kw = { + "retry_timeout": sufficient_timeout, + "include_only": None, + "ncausefailure": ncausefailure, + "faildelay": faildelay, + } + with open(txtfilename, mode="w") as f: + f.write("0") + names = top_level_names_test(txtfilename, filename, **kw) + self.assertEqual(names, 
["check"]) + + kw = { + "retry_timeout": insufficient_timeout, + "include_only": None, + "ncausefailure": ncausefailure, + "faildelay": faildelay, + } + with open(txtfilename, mode="w") as f: + f.write("0") + with self.assertRaises(RetryTimeoutError): + top_level_names_test(txtfilename, filename, **kw) + + @subtests + def test_retry_generator(self): + filename = self._new_filename() + ncausefailure = 3 + faildelay = 0.1 + sufficient_timeout = ncausefailure * (faildelay + 10) + insufficient_timeout = ncausefailure * faildelay * 0.5 + + @h5py_utils.retry() + def iter_data(filename, name, start_index=0): + nonlocal failcounter + if start_index <= 0: + with h5py_utils.File(filename) as h5file: + yield h5file[name][()] + if failcounter < ncausefailure: + time.sleep(faildelay) + failcounter += 1 + raise RetryError + if start_index <= 1: + with h5py_utils.File(filename) as h5file: + yield not h5file[name][()] + + failcounter = 0 + kw = {"retry_timeout": sufficient_timeout} + data = list(iter_data(filename, "/check", **kw)) + self.assertEqual(data, [True, False]) + + failcounter = 0 + kw = {"retry_timeout": insufficient_timeout} + with self.assertRaises(RetryTimeoutError): + list(iter_data(filename, "/check", **kw)) diff --git a/src/silx/io/test/test_nxdata.py b/src/silx/io/test/test_nxdata.py new file mode 100644 index 0000000..1c64a71 --- /dev/null +++ b/src/silx/io/test/test_nxdata.py @@ -0,0 +1,727 @@ +# /*########################################################################## +# Copyright (C) 2016-2022 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the 
following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for NXdata parsing""" + +__authors__ = ["P. Knobel"] +__license__ = "MIT" +__date__ = "24/03/2020" + + +import tempfile +import unittest + +import h5py +import numpy +import pytest + +from .. import nxdata +from ..dictdump import dicttoh5 + + +text_dtype = h5py.special_dtype(vlen=str) + + +class TestNXdata(unittest.TestCase): + def setUp(self): + tmp = tempfile.NamedTemporaryFile( + prefix="nxdata_examples_", suffix=".h5", delete=True + ) + tmp.file.close() + self.h5fname = tmp.name + self.h5f = h5py.File(tmp.name, "w") + + # SCALARS + g0d = self.h5f.create_group("scalars") + + g0d0 = g0d.create_group("0D_scalar") + g0d0.attrs["NX_class"] = "NXdata" + g0d0.attrs["signal"] = "scalar" + g0d0.create_dataset("scalar", data=10) + g0d0.create_dataset("scalar_errors", data=0.1) + + g0d1 = g0d.create_group("2D_scalars") + g0d1.attrs["NX_class"] = "NXdata" + g0d1.attrs["signal"] = "scalars" + ds = g0d1.create_dataset("scalars", data=numpy.arange(3 * 10).reshape((3, 10))) + ds.attrs["interpretation"] = "scalar" + + g0d1 = g0d.create_group("4D_scalars") + g0d1.attrs["NX_class"] = "NXdata" + g0d1.attrs["signal"] = "scalars" + ds = g0d1.create_dataset( + "scalars", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10)) + ) + ds.attrs["interpretation"] 
= "scalar" + + # SPECTRA + g1d = self.h5f.create_group("spectra") + + g1d0 = g1d.create_group("1D_spectrum") + g1d0.attrs["NX_class"] = "NXdata" + g1d0.attrs["signal"] = "count" + g1d0.attrs["auxiliary_signals"] = numpy.array( + ["count2", "count3"], dtype=text_dtype + ) + g1d0.attrs["axes"] = "energy_calib" + g1d0.attrs["uncertainties"] = numpy.array( + [ + "energy_errors", + ], + dtype=text_dtype, + ) + g1d0.create_dataset("count", data=numpy.arange(10)) + g1d0.create_dataset("count2", data=0.5 * numpy.arange(10)) + d = g1d0.create_dataset("count3", data=0.4 * numpy.arange(10)) + d.attrs["long_name"] = "3rd counter" + g1d0.create_dataset("title", data="Title as dataset (like nexpy)") + g1d0.create_dataset("energy_calib", data=(10, 5)) # 10 * idx + 5 + g1d0.create_dataset("energy_errors", data=3.14 * numpy.random.rand(10)) + + g1d1 = g1d.create_group("2D_spectra") + g1d1.attrs["NX_class"] = "NXdata" + g1d1.attrs["signal"] = "counts" + ds = g1d1.create_dataset("counts", data=numpy.arange(3 * 10).reshape((3, 10))) + ds.attrs["interpretation"] = "spectrum" + + g1d2 = g1d.create_group("4D_spectra") + g1d2.attrs["NX_class"] = "NXdata" + g1d2.attrs["signal"] = "counts" + g1d2.attrs["axes"] = numpy.array( + [ + "energy", + ], + dtype=text_dtype, + ) + ds = g1d2.create_dataset( + "counts", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10)) + ) + ds.attrs["interpretation"] = "spectrum" + ds = g1d2.create_dataset("errors", data=4.5 * numpy.random.rand(2, 2, 3, 10)) + ds = g1d2.create_dataset( + "energy", data=5 + 10 * numpy.arange(15), shuffle=True, compression="gzip" + ) + ds.attrs["long_name"] = "Calibrated energy" + ds.attrs["first_good"] = 3 + ds.attrs["last_good"] = 12 + g1d2.create_dataset("energy_errors", data=10 * numpy.random.rand(15)) + + # IMAGES + g2d = self.h5f.create_group("images") + + g2d0 = g2d.create_group("2D_regular_image") + g2d0.attrs["NX_class"] = "NXdata" + g2d0.attrs["signal"] = "image" + g2d0.attrs["auxiliary_signals"] = "image2" + 
g2d0.attrs["axes"] = numpy.array( + ["rows_calib", "columns_coordinates"], dtype=text_dtype + ) + g2d0.create_dataset("image", data=numpy.arange(4 * 6).reshape((4, 6))) + g2d0.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6))) + ds = g2d0.create_dataset("rows_calib", data=(10, 5)) + ds.attrs["long_name"] = "Calibrated Y" + g2d0.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6)) + + g2d1 = g2d.create_group("2D_irregular_data") + g2d1.attrs["NX_class"] = "NXdata" + g2d1.attrs["signal"] = "data" + g2d1.attrs["title"] = "Title as group attr" + g2d1.attrs["axes"] = numpy.array( + ["rows_coordinates", "columns_coordinates"], dtype=text_dtype + ) + g2d1.create_dataset("data", data=numpy.arange(64 * 128).reshape((64, 128))) + g2d1.create_dataset( + "rows_coordinates", data=numpy.arange(64) + numpy.random.rand(64) + ) + g2d1.create_dataset( + "columns_coordinates", data=numpy.arange(128) + 2.5 * numpy.random.rand(128) + ) + + g2d2 = g2d.create_group("3D_images") + g2d2.attrs["NX_class"] = "NXdata" + g2d2.attrs["signal"] = "images" + ds = g2d2.create_dataset( + "images", data=numpy.arange(2 * 4 * 6).reshape((2, 4, 6)) + ) + ds.attrs["interpretation"] = "image" + + g2d3 = g2d.create_group("5D_images") + g2d3.attrs["NX_class"] = "NXdata" + g2d3.attrs["signal"] = "images" + g2d3.attrs["axes"] = numpy.array( + ["rows_coordinates", "columns_coordinates"], dtype=text_dtype + ) + ds = g2d3.create_dataset( + "images", data=numpy.arange(2 * 2 * 2 * 4 * 6).reshape((2, 2, 2, 4, 6)) + ) + ds.attrs["interpretation"] = "image" + g2d3.create_dataset("rows_coordinates", data=5 + 10 * numpy.arange(4)) + g2d3.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6)) + + g2d4 = g2d.create_group("RGBA_image") + g2d4.attrs["NX_class"] = "NXdata" + g2d4.attrs["signal"] = "image" + g2d4.attrs["axes"] = numpy.array( + ["rows_calib", "columns_coordinates"], dtype=text_dtype + ) + rgba_image = numpy.linspace(0, 1, num=7 * 8 * 3).reshape((7, 8, 3)) 
+ rgba_image[:, :, 1] = ( + 1 - rgba_image[:, :, 1] + ) # invert G channel to add some color + ds = g2d4.create_dataset("image", data=rgba_image) + ds.attrs["interpretation"] = "rgba-image" + ds = g2d4.create_dataset("rows_calib", data=(10, 5)) + ds.attrs["long_name"] = "Calibrated Y" + g2d4.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(8)) + + # SCATTER + g = self.h5f.create_group("scatters") + + gd0 = g.create_group("x_y_scatter") + gd0.attrs["NX_class"] = "NXdata" + gd0.attrs["signal"] = "y" + gd0.attrs["axes"] = numpy.array( + [ + "x", + ], + dtype=text_dtype, + ) + gd0.create_dataset("y", data=numpy.random.rand(128) - 0.5) + gd0.create_dataset("x", data=2 * numpy.random.rand(128)) + gd0.create_dataset("x_errors", data=0.05 * numpy.random.rand(128)) + gd0.create_dataset("errors", data=0.05 * numpy.random.rand(128)) + + gd1 = g.create_group("x_y_value_scatter") + gd1.attrs["NX_class"] = "NXdata" + gd1.attrs["signal"] = "values" + gd1.attrs["axes"] = numpy.array(["x", "y"], dtype=text_dtype) + gd1.create_dataset("values", data=3.14 * numpy.random.rand(128)) + gd1.create_dataset("y", data=numpy.random.rand(128)) + gd1.create_dataset("y_errors", data=0.02 * numpy.random.rand(128)) + gd1.create_dataset("x", data=numpy.random.rand(128)) + gd1.create_dataset("x_errors", data=0.02 * numpy.random.rand(128)) + + def tearDown(self): + self.h5f.close() + + def testValidity(self): + for group in self.h5f: + for subgroup in self.h5f[group]: + self.assertTrue( + nxdata.is_valid_nxdata(self.h5f[group][subgroup]), + "%s/%s not found to be a valid NXdata group" % (group, subgroup), + ) + + def testScalars(self): + nxd = nxdata.NXdata(self.h5f["scalars/0D_scalar"]) + self.assertTrue(nxd.signal_is_0d) + self.assertEqual(nxd.signal[()], 10) + self.assertEqual(nxd.axes_names, []) + self.assertEqual(nxd.axes_dataset_names, []) + self.assertEqual(nxd.axes, []) + self.assertIsNotNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + 
self.assertIsNone(nxd.interpretation) + + nxd = nxdata.NXdata(self.h5f["scalars/2D_scalars"]) + self.assertTrue(nxd.signal_is_2d) + self.assertEqual(nxd.signal[1, 2], 12) + self.assertEqual(nxd.axes_names, [None, None]) + self.assertEqual(nxd.axes_dataset_names, [None, None]) + self.assertEqual(nxd.axes, [None, None]) + self.assertIsNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertEqual(nxd.interpretation, "scalar") + + nxd = nxdata.NXdata(self.h5f["scalars/4D_scalars"]) + self.assertFalse( + nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d + ) + self.assertEqual(nxd.signal[1, 0, 1, 4], 74) + self.assertEqual(nxd.axes_names, [None, None, None, None]) + self.assertEqual(nxd.axes_dataset_names, [None, None, None, None]) + self.assertEqual(nxd.axes, [None, None, None, None]) + self.assertIsNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertEqual(nxd.interpretation, "scalar") + + def testSpectra(self): + nxd = nxdata.NXdata(self.h5f["spectra/1D_spectrum"]) + self.assertTrue(nxd.signal_is_1d) + self.assertTrue(nxd.is_curve) + self.assertTrue(numpy.array_equal(numpy.array(nxd.signal), numpy.arange(10))) + self.assertEqual(nxd.axes_names, ["energy_calib"]) + self.assertEqual(nxd.axes_dataset_names, ["energy_calib"]) + self.assertEqual(nxd.axes[0][0], 10) + self.assertEqual(nxd.axes[0][1], 5) + self.assertIsNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertIsNone(nxd.interpretation) + self.assertEqual(nxd.title, "Title as dataset (like nexpy)") + + self.assertEqual(nxd.auxiliary_signals_dataset_names, ["count2", "count3"]) + self.assertEqual(nxd.auxiliary_signals_names, ["count2", "3rd counter"]) + self.assertAlmostEqual( + nxd.auxiliary_signals[1][2], 0.8 + ) # numpy.arange(10) * 0.4 + + nxd = nxdata.NXdata(self.h5f["spectra/2D_spectra"]) + self.assertTrue(nxd.signal_is_2d) + self.assertTrue(nxd.is_curve) + 
self.assertEqual(nxd.axes_names, [None, None]) + self.assertEqual(nxd.axes_dataset_names, [None, None]) + self.assertEqual(nxd.axes, [None, None]) + self.assertIsNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertEqual(nxd.interpretation, "spectrum") + + nxd = nxdata.NXdata(self.h5f["spectra/4D_spectra"]) + self.assertFalse( + nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d + ) + self.assertTrue(nxd.is_curve) + self.assertEqual(nxd.axes_names, [None, None, None, "Calibrated energy"]) + self.assertEqual(nxd.axes_dataset_names, [None, None, None, "energy"]) + self.assertEqual(nxd.axes[:3], [None, None, None]) + self.assertEqual(nxd.axes[3].shape, (10,)) # dataset shape (15, ) sliced [3:12] + self.assertIsNotNone(nxd.errors) + self.assertEqual(nxd.errors.shape, (2, 2, 3, 10)) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertEqual(nxd.interpretation, "spectrum") + self.assertEqual(nxd.get_axis_errors("energy").shape, (10,)) + # test getting axis errors by long_name + self.assertTrue( + numpy.array_equal( + nxd.get_axis_errors("Calibrated energy"), nxd.get_axis_errors("energy") + ) + ) + self.assertTrue( + numpy.array_equal( + nxd.get_axis_errors(b"Calibrated energy"), nxd.get_axis_errors("energy") + ) + ) + + def testImages(self): + nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"]) + self.assertTrue(nxd.signal_is_2d) + self.assertTrue(nxd.is_image) + self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates"]) + self.assertEqual( + list(nxd.axes_dataset_names), ["rows_calib", "columns_coordinates"] + ) + self.assertIsNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertIsNone(nxd.interpretation) + self.assertEqual(len(nxd.auxiliary_signals), 1) + self.assertEqual(nxd.auxiliary_signals_names, ["image2"]) + + nxd = nxdata.NXdata(self.h5f["images/2D_irregular_data"]) + self.assertTrue(nxd.signal_is_2d) + 
self.assertTrue(nxd.is_image) + + self.assertEqual(nxd.axes_dataset_names, nxd.axes_names) + self.assertEqual( + list(nxd.axes_dataset_names), ["rows_coordinates", "columns_coordinates"] + ) + self.assertEqual(len(nxd.axes), 2) + self.assertIsNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertIsNone(nxd.interpretation) + self.assertEqual(nxd.title, "Title as group attr") + + nxd = nxdata.NXdata(self.h5f["images/5D_images"]) + self.assertTrue(nxd.is_image) + self.assertFalse( + nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d + ) + self.assertEqual( + nxd.axes_names, + [None, None, None, "rows_coordinates", "columns_coordinates"], + ) + self.assertEqual( + nxd.axes_dataset_names, + [None, None, None, "rows_coordinates", "columns_coordinates"], + ) + self.assertIsNone(nxd.errors) + self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) + self.assertEqual(nxd.interpretation, "image") + + nxd = nxdata.NXdata(self.h5f["images/RGBA_image"]) + self.assertTrue(nxd.is_image) + self.assertEqual(nxd.interpretation, "rgba-image") + self.assertTrue(nxd.signal_is_3d) + self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates", None]) + self.assertEqual( + list(nxd.axes_dataset_names), ["rows_calib", "columns_coordinates", None] + ) + + def testScatters(self): + nxd = nxdata.NXdata(self.h5f["scatters/x_y_scatter"]) + self.assertTrue(nxd.signal_is_1d) + self.assertEqual(nxd.axes_names, ["x"]) + self.assertEqual(nxd.axes_dataset_names, ["x"]) + self.assertIsNotNone(nxd.errors) + self.assertEqual(nxd.get_axis_errors("x").shape, (128,)) + self.assertTrue(nxd.is_scatter) + self.assertFalse(nxd.is_x_y_value_scatter) + self.assertIsNone(nxd.interpretation) + + nxd = nxdata.NXdata(self.h5f["scatters/x_y_value_scatter"]) + self.assertFalse(nxd.signal_is_1d) + self.assertTrue(nxd.axes_dataset_names, nxd.axes_names) + self.assertEqual(nxd.axes_dataset_names, ["x", "y"]) + 
self.assertEqual(nxd.get_axis_errors("x").shape, (128,)) + self.assertEqual(nxd.get_axis_errors("y").shape, (128,)) + self.assertEqual(len(nxd.axes), 2) + self.assertIsNone(nxd.errors) + self.assertTrue(nxd.is_scatter) + self.assertTrue(nxd.is_x_y_value_scatter) + self.assertIsNone(nxd.interpretation) + + +class TestLegacyNXdata(unittest.TestCase): + def setUp(self): + tmp = tempfile.NamedTemporaryFile( + prefix="nxdata_legacy_examples_", suffix=".h5", delete=True + ) + tmp.file.close() + self.h5fname = tmp.name + self.h5f = h5py.File(tmp.name, "w") + + def tearDown(self): + self.h5f.close() + + def testSignalAttrOnDataset(self): + g = self.h5f.create_group("2D") + g.attrs["NX_class"] = "NXdata" + + ds0 = g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6))) + ds0.attrs["signal"] = 1 + ds0.attrs["long_name"] = "My first image" + + ds1 = g.create_dataset("image1", data=numpy.arange(4 * 6).reshape((4, 6))) + ds1.attrs["signal"] = "2" + ds1.attrs["long_name"] = "My 2nd image" + + ds2 = g.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6))) + ds2.attrs["signal"] = 3 + + nxd = nxdata.NXdata(self.h5f["2D"]) + + self.assertEqual(nxd.signal_dataset_name, "image0") + self.assertEqual(nxd.signal_name, "My first image") + self.assertEqual(nxd.signal.shape, (4, 6)) + + self.assertEqual(len(nxd.auxiliary_signals), 2) + self.assertEqual(nxd.auxiliary_signals[1].shape, (4, 6)) + + self.assertEqual(nxd.auxiliary_signals_dataset_names, ["image1", "image2"]) + self.assertEqual(nxd.auxiliary_signals_names, ["My 2nd image", "image2"]) + + def testAxesOnSignalDataset(self): + g = self.h5f.create_group("2D") + g.attrs["NX_class"] = "NXdata" + + ds0 = g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6))) + ds0.attrs["signal"] = 1 + ds0.attrs["axes"] = "yaxis:xaxis" + + ds1 = g.create_dataset("yaxis", data=numpy.arange(4)) + ds2 = g.create_dataset("xaxis", data=numpy.arange(6)) + + nxd = nxdata.NXdata(self.h5f["2D"]) + + 
self.assertEqual(nxd.axes_dataset_names, ["yaxis", "xaxis"]) + self.assertTrue(numpy.array_equal(nxd.axes[0], numpy.arange(4))) + self.assertTrue(numpy.array_equal(nxd.axes[1], numpy.arange(6))) + + def testAxesOnAxesDatasets(self): + g = self.h5f.create_group("2D") + g.attrs["NX_class"] = "NXdata" + + ds0 = g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6))) + ds0.attrs["signal"] = 1 + ds1 = g.create_dataset("yaxis", data=numpy.arange(4)) + ds1.attrs["axis"] = 0 + ds2 = g.create_dataset("xaxis", data=numpy.arange(6)) + ds2.attrs["axis"] = "1" + + nxd = nxdata.NXdata(self.h5f["2D"]) + self.assertEqual(nxd.axes_dataset_names, ["yaxis", "xaxis"]) + self.assertTrue(numpy.array_equal(nxd.axes[0], numpy.arange(4))) + self.assertTrue(numpy.array_equal(nxd.axes[1], numpy.arange(6))) + + def testAsciiUndefinedAxesAttrs(self): + """Some files may not be using utf8 for str attrs""" + g = self.h5f.create_group("bytes_attrs") + g.attrs["NX_class"] = b"NXdata" + g.attrs["signal"] = b"image0" + g.attrs["axes"] = b"yaxis", b"." 
+ + g.create_dataset("image0", data=numpy.arange(4 * 6).reshape((4, 6))) + g.create_dataset("yaxis", data=numpy.arange(4)) + + nxd = nxdata.NXdata(self.h5f["bytes_attrs"]) + self.assertEqual(nxd.axes_dataset_names, ["yaxis", None]) + + +class TestSaveNXdata(unittest.TestCase): + def setUp(self): + tmp = tempfile.NamedTemporaryFile(prefix="nxdata", suffix=".h5", delete=True) + tmp.file.close() + self.h5fname = tmp.name + + def testSimpleSave(self): + sig = numpy.array([0, 1, 2]) + a0 = numpy.array([2, 3, 4]) + a1 = numpy.array([3, 4, 5]) + nxdata.save_NXdata( + filename=self.h5fname, + signal=sig, + axes=[a0, a1], + signal_name="sig", + axes_names=["a0", "a1"], + nxentry_name="a", + nxdata_name="mydata", + ) + + h5f = h5py.File(self.h5fname, "r") + self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"])) + + nxd = nxdata.NXdata(h5f["/a/mydata"]) + self.assertTrue(numpy.array_equal(nxd.signal, sig)) + self.assertTrue(numpy.array_equal(nxd.axes[0], a0)) + + h5f.close() + + def testSimplestSave(self): + sig = numpy.array([0, 1, 2]) + nxdata.save_NXdata(filename=self.h5fname, signal=sig) + + h5f = h5py.File(self.h5fname, "r") + + self.assertTrue(nxdata.is_valid_nxdata(h5f["/entry/data0"])) + + nxd = nxdata.NXdata(h5f["/entry/data0"]) + self.assertTrue(numpy.array_equal(nxd.signal, sig)) + h5f.close() + + def testSaveDefaultAxesNames(self): + sig = numpy.array([0, 1, 2]) + a0 = numpy.array([2, 3, 4]) + a1 = numpy.array([3, 4, 5]) + nxdata.save_NXdata( + filename=self.h5fname, + signal=sig, + axes=[a0, a1], + signal_name="sig", + axes_names=None, + axes_long_names=["a", "b"], + nxentry_name="a", + nxdata_name="mydata", + ) + + h5f = h5py.File(self.h5fname, "r") + self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"])) + + nxd = nxdata.NXdata(h5f["/a/mydata"]) + self.assertTrue(numpy.array_equal(nxd.signal, sig)) + self.assertTrue(numpy.array_equal(nxd.axes[0], a0)) + self.assertEqual(nxd.axes_dataset_names, ["dim0", "dim1"]) + self.assertEqual(nxd.axes_names, ["a", "b"]) 
+ + h5f.close() + + def testSaveToExistingEntry(self): + h5f = h5py.File(self.h5fname, "w") + g = h5f.create_group("myentry") + g.attrs["NX_class"] = "NXentry" + h5f.close() + + sig = numpy.array([0, 1, 2]) + a0 = numpy.array([2, 3, 4]) + a1 = numpy.array([3, 4, 5]) + nxdata.save_NXdata( + filename=self.h5fname, + signal=sig, + axes=[a0, a1], + signal_name="sig", + axes_names=["a0", "a1"], + nxentry_name="myentry", + nxdata_name="toto", + ) + + h5f = h5py.File(self.h5fname, "r") + self.assertTrue(nxdata.is_valid_nxdata(h5f["myentry/toto"])) + + nxd = nxdata.NXdata(h5f["myentry/toto"]) + self.assertTrue(numpy.array_equal(nxd.signal, sig)) + self.assertTrue(numpy.array_equal(nxd.axes[0], a0)) + h5f.close() + + +class TestGetDefault: + """Test silx.io.nxdata.get_default function""" + + @pytest.fixture + def hdf5_file(self, tmp_path): + with h5py.File(tmp_path / "test_file.h5", "w") as h5f: + yield h5f + + def testDirectPath(self, hdf5_file): + dicttoh5( + { + ("", "default"): "/nxentry/nxprocess/nxdata", + "nxentry": { + "nxprocess": { + "nxdata": { + ("", "NX_class"): "NXdata", + ("", "signal"): "data", + "data": (1, 2, 3), + } + } + }, + }, + hdf5_file, + ) + default = nxdata.get_default(hdf5_file) + assert isinstance(default, nxdata.NXdata) + assert default.group.name == "/nxentry/nxprocess/nxdata" + + def testAbsolutePath(self, hdf5_file): + dicttoh5( + { + ("", "default"): "/nxentry", + "nxentry": { + ("", "default"): "/nxentry/nxprocess/nxdata", + "nxprocess": { + "nxdata": { + ("", "NX_class"): "NXdata", + ("", "signal"): "data", + "data": (1, 2, 3), + } + }, + }, + }, + hdf5_file, + ) + default = nxdata.get_default(hdf5_file) + assert isinstance(default, nxdata.NXdata) + assert default.group.name == "/nxentry/nxprocess/nxdata" + + def testRelativePath(self, hdf5_file): + dicttoh5( + { + ("", "default"): "nxentry", + "nxentry": { + ("", "default"): "nxdata", + "nxdata": { + ("", "NX_class"): "NXdata", + ("", "signal"): "data", + "data": (1, 2, 3), + }, + }, + 
}, + hdf5_file, + ) + default = nxdata.get_default(hdf5_file) + assert isinstance(default, nxdata.NXdata) + assert default.group.name == "/nxentry/nxdata" + + def testRelativePathSubdir(self, hdf5_file): + dicttoh5( + { + ("", "default"): "nxentry", + "nxentry": { + ("", "default"): "nxprocess/nxdata", + "nxprocess": { + "nxdata": { + ("", "NX_class"): "NXdata", + ("", "signal"): "data", + "data": (1, 2, 3), + } + }, + }, + }, + hdf5_file, + ) + default = nxdata.get_default(hdf5_file) + assert isinstance(default, nxdata.NXdata) + assert default.group.name == "/nxentry/nxprocess/nxdata" + + def testRecursiveAbsolutePath(self, hdf5_file): + dicttoh5( + { + ("", "default"): "/nxentry", + "nxentry": { + ("", "default"): "/nxentry/nxprocess", + "nxprocess": { + ("", "default"): "/nxentry/nxprocess/nxdata", + "nxdata": { + ("", "NX_class"): "NXdata", + ("", "signal"): "data", + "data": (1, 2, 3), + }, + }, + }, + }, + hdf5_file, + ) + default = nxdata.get_default(hdf5_file) + assert isinstance(default, nxdata.NXdata) + assert default.group.name == "/nxentry/nxprocess/nxdata" + + def testRecursiveRelativePath(self, hdf5_file): + dicttoh5( + { + ("", "default"): "nxentry", + "nxentry": { + ("", "default"): "nxprocess", + "nxprocess": { + ("", "default"): "nxdata", + "nxdata": { + ("", "NX_class"): "NXdata", + ("", "signal"): "data", + "data": (1, 2, 3), + }, + }, + }, + }, + hdf5_file, + ) + default = nxdata.get_default(hdf5_file) + assert isinstance(default, nxdata.NXdata) + assert default.group.name == "/nxentry/nxprocess/nxdata" + + def testLoop(self, hdf5_file): + """Infinite loop of @default""" + dicttoh5( + { + ("", "default"): "/nxentry", + "nxentry": { + ("", "default"): "/nxentry", + }, + }, + hdf5_file, + ) + default = nxdata.get_default(hdf5_file) + assert default is None diff --git a/src/silx/io/test/test_octaveh5.py b/src/silx/io/test/test_octaveh5.py new file mode 100644 index 0000000..479ef85 --- /dev/null +++ b/src/silx/io/test/test_octaveh5.py @@ -0,0 
+1,197 @@ +# /*########################################################################## +# Copyright (C) 2016 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +""" +Tests for the octaveh5 module +""" + +__authors__ = ["C. Nemoz", "H. 
Payno"] +__license__ = "MIT" +__date__ = "12/07/2016" + +import unittest +import os +import tempfile + +try: + from ..octaveh5 import Octaveh5 +except ImportError: + Octaveh5 = None + + +@unittest.skipIf(Octaveh5 is None, "Could not import h5py") +class TestOctaveH5(unittest.TestCase): + @staticmethod + def _get_struct_FT(): + return { + "NO_CHECK": 0.0, + "SHOWSLICE": 1.0, + "DOTOMO": 1.0, + "DATABASE": 0.0, + "ANGLE_OFFSET": 0.0, + "VOLSELECTION_REMEMBER": 0.0, + "NUM_PART": 4.0, + "VOLOUTFILE": 0.0, + "RINGSCORRECTION": 0.0, + "DO_TEST_SLICE": 1.0, + "ZEROOFFMASK": 1.0, + "VERSION": "fastomo3 version 2.0", + "CORRECT_SPIKES_THRESHOLD": 0.040000000000000001, + "SHOWPROJ": 0.0, + "HALF_ACQ": 0.0, + "ANGLE_OFFSET_VALUE": 0.0, + "FIXEDSLICE": "middle", + "VOLSELECT": "total", + } + + @staticmethod + def _get_struct_PYHSTEXE(): + return { + "EXE": "PyHST2_2015d", + "VERBOSE": 0.0, + "OFFV": "PyHST2_2015d", + "TOMO": 0.0, + "VERBOSE_FILE": "pyhst_out.txt", + "DIR": "/usr/bin/", + "OFFN": "pyhst2", + } + + @staticmethod + def _get_struct_FTAXIS(): + return { + "POSITION_VALUE": 12345.0, + "COR_ERROR": 0.0, + "FILESDURINGSCAN": 0.0, + "PLOTFIGURE": 1.0, + "DIM1": 0.0, + "OVERSAMPLING": 5.0, + "TO_THE_CENTER": 1.0, + "POSITION": "fixed", + "COR_POSITION": 0.0, + "HA": 0.0, + } + + @staticmethod + def _get_struct_PAGANIN(): + return { + "MKEEP_MASK": 0.0, + "UNSHARP_SIGMA": 0.80000000000000004, + "DILATE": 2.0, + "UNSHARP_COEFF": 3.0, + "MEDIANR": 4.0, + "DB": 500.0, + "MKEEP_ABS": 0.0, + "MODE": 0.0, + "THRESHOLD": 0.5, + "MKEEP_BONE": 0.0, + "DB2": 100.0, + "MKEEP_CORR": 0.0, + "MKEEP_SOFT": 0.0, + } + + @staticmethod + def _get_struct_BEAMGEO(): + return {"DIST": 55.0, "SY": 0.0, "SX": 0.0, "TYPE": "p"} + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.test_3_6_fname = os.path.join( + self.tempdir, "silx_tmp_t00_octaveTest_3_6.h5" + ) + self.test_3_8_fname = os.path.join( + self.tempdir, "silx_tmp_t00_octaveTest_3_8.h5" + ) + + def tearDown(self): + if 
os.path.isfile(self.test_3_6_fname): + os.unlink(self.test_3_6_fname) + if os.path.isfile(self.test_3_8_fname): + os.unlink(self.test_3_8_fname) + + def testWritedIsReaded(self): + """ + Simple test to write and reaf the structure compatible with the octave h5 using structure. + This test is for # test for octave version > 3.8 + """ + writer = Octaveh5() + + writer.open(self.test_3_8_fname, "a") + # step 1 writing the file + writer.write("FT", self._get_struct_FT()) + writer.write("PYHSTEXE", self._get_struct_PYHSTEXE()) + writer.write("FTAXIS", self._get_struct_FTAXIS()) + writer.write("PAGANIN", self._get_struct_PAGANIN()) + writer.write("BEAMGEO", self._get_struct_BEAMGEO()) + writer.close() + + # step 2 reading the file + reader = Octaveh5().open(self.test_3_8_fname) + # 2.1 check FT + data_readed = reader.get("FT") + self.assertEqual(data_readed, self._get_struct_FT()) + # 2.2 check PYHSTEXE + data_readed = reader.get("PYHSTEXE") + self.assertEqual(data_readed, self._get_struct_PYHSTEXE()) + # 2.3 check FTAXIS + data_readed = reader.get("FTAXIS") + self.assertEqual(data_readed, self._get_struct_FTAXIS()) + # 2.4 check PAGANIN + data_readed = reader.get("PAGANIN") + self.assertEqual(data_readed, self._get_struct_PAGANIN()) + # 2.5 check BEAMGEO + data_readed = reader.get("BEAMGEO") + self.assertEqual(data_readed, self._get_struct_BEAMGEO()) + reader.close() + + def testWritedIsReadedOldOctaveVersion(self): + """The same test as testWritedIsReaded but for octave version < 3.8""" + # test for octave version < 3.8 + writer = Octaveh5(3.6) + + writer.open(self.test_3_6_fname, "a") + + # step 1 writing the file + writer.write("FT", self._get_struct_FT()) + writer.write("PYHSTEXE", self._get_struct_PYHSTEXE()) + writer.write("FTAXIS", self._get_struct_FTAXIS()) + writer.write("PAGANIN", self._get_struct_PAGANIN()) + writer.write("BEAMGEO", self._get_struct_BEAMGEO()) + writer.close() + + # step 2 reading the file + reader = Octaveh5(3.6).open(self.test_3_6_fname) + # 
2.1 check FT + data_readed = reader.get("FT") + self.assertEqual(data_readed, self._get_struct_FT()) + # 2.2 check PYHSTEXE + data_readed = reader.get("PYHSTEXE") + self.assertEqual(data_readed, self._get_struct_PYHSTEXE()) + # 2.3 check FTAXIS + data_readed = reader.get("FTAXIS") + self.assertEqual(data_readed, self._get_struct_FTAXIS()) + # 2.4 check PAGANIN + data_readed = reader.get("PAGANIN") + self.assertEqual(data_readed, self._get_struct_PAGANIN()) + # 2.5 check BEAMGEO + data_readed = reader.get("BEAMGEO") + self.assertEqual(data_readed, self._get_struct_BEAMGEO()) + reader.close() diff --git a/src/silx/io/test/test_rawh5.py b/src/silx/io/test/test_rawh5.py new file mode 100644 index 0000000..fb5caec --- /dev/null +++ b/src/silx/io/test/test_rawh5.py @@ -0,0 +1,83 @@ +# /*########################################################################## +# +# Copyright (c) 2016 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ###########################################################################*/ +"""Test for silx.gui.hdf5 module""" + +__authors__ = ["V. Valls"] +__license__ = "MIT" +__date__ = "21/09/2017" + + +import unittest +import tempfile +import numpy +import shutil +from .. import rawh5 + + +class TestNumpyFile(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tmpDirectory = tempfile.mkdtemp() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmpDirectory) + + def testNumpyFile(self): + filename = "%s/%s.npy" % (self.tmpDirectory, self.id()) + c = numpy.random.rand(5, 5) + numpy.save(filename, c) + h5 = rawh5.NumpyFile(filename) + self.assertIn("data", h5) + self.assertEqual(h5["data"].dtype.kind, "f") + + def testNumpyZFile(self): + filename = "%s/%s.npz" % (self.tmpDirectory, self.id()) + a = numpy.array("aaaaa") + b = numpy.array([1, 2, 3, 4]) + c = numpy.random.rand(5, 5) + d = numpy.array(b"aaaaa") + e = numpy.array("i \u2661 my mother") + numpy.savez(filename, a, b=b, c=c, d=d, e=e) + h5 = rawh5.NumpyFile(filename) + self.assertIn("arr_0", h5) + self.assertIn("b", h5) + self.assertIn("c", h5) + self.assertIn("d", h5) + self.assertIn("e", h5) + self.assertEqual(h5["arr_0"].dtype.kind, "U") + self.assertEqual(h5["b"].dtype.kind, "i") + self.assertEqual(h5["c"].dtype.kind, "f") + self.assertEqual(h5["d"].dtype.kind, "S") + self.assertEqual(h5["e"].dtype.kind, "U") + + def testNumpyZFileContainingDirectories(self): + filename = "%s/%s.npz" % (self.tmpDirectory, self.id()) + data = {} + data["a/b/c"] = numpy.arange(10) + data["a/b/e"] = numpy.arange(10) + numpy.savez(filename, **data) + h5 = rawh5.NumpyFile(filename) + self.assertIn("a/b/c", h5) + self.assertIn("a/b/e", h5) 
diff --git a/src/silx/io/test/test_sliceh5.py b/src/silx/io/test/test_sliceh5.py new file mode 100644 index 0000000..8ccf14a --- /dev/null +++ b/src/silx/io/test/test_sliceh5.py @@ -0,0 +1,104 @@ +# /*########################################################################## +# Copyright (C) 2022-2023 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. 
+# +# ############################################################################*/ +import contextlib +from io import BytesIO + +import h5py +import numpy +import pytest + +import silx.io +from silx.io import commonh5 +from silx.io._sliceh5 import DatasetSlice, _combine_indices + + +@contextlib.contextmanager +def h5py_file(filename, mode): + with BytesIO() as buffer: + with h5py.File(buffer, mode) as h5file: + yield h5file + + +@pytest.fixture(params=[commonh5.File, h5py_file]) +def temp_h5file(request): + temp_file_context = request.param + with temp_file_context("tempfile.h5", "w") as h5file: + yield h5file + + +@pytest.mark.parametrize("indices", [1, slice(None), (1, slice(1, 4))]) +def test_datasetslice(temp_h5file, indices): + data = numpy.arange(50).reshape(10, 5) + ref_data = numpy.array(data[indices], copy=False) + + h5dataset = temp_h5file.create_group("group").create_dataset("dataset", data=data) + + with DatasetSlice(h5dataset, indices, attrs={}) as dset: + assert silx.io.is_dataset(dset) + assert dset.file == temp_h5file + assert dset.shape == ref_data.shape + assert dset.size == ref_data.size + assert dset.dtype == ref_data.dtype + assert len(dset) == len(ref_data) + assert numpy.array_equal(dset[()], ref_data) + assert dset.name == h5dataset.name + + +def test_datasetslice_on_external_link(tmp_path): + data = numpy.arange(10).reshape(5, 2) + + external_filename = str(tmp_path / "external.h5") + ext_dataset_name = "/external_data" + with h5py.File(external_filename, "w") as h5file: + h5file[ext_dataset_name] = data + + with h5py.File(tmp_path / "test.h5", "w") as h5file: + h5file["group/data"] = h5py.ExternalLink(external_filename, ext_dataset_name) + + with DatasetSlice(h5file["group/data"], slice(None), attrs={}) as dset: + assert dset.name == ext_dataset_name + assert numpy.array_equal(dset[()], data) + + +@pytest.mark.parametrize( + "shape,outer_indices,indices", + [ + ((2, 5, 10), (-1, slice(None), slice(None)), slice(None)), + ((2, 5, 10), 
(-1, slice(None), slice(None)), Ellipsis), + # negative strides + ((5, 10), (slice(1, 5, 2), slice(2, 8)), (slice(2, 3), slice(4, None, -2))), + ( + (5, 10), + (slice(4, None, -1), slice(9, 3, -2)), + (slice(1, 3), slice(3, 0, -1)), + ), + ((5, 10), (slice(1, 8, 2), slice(None)), slice(2, 8)), # slice overflow + ], +) +def test_combine_indices(shape, outer_indices, indices): + data = numpy.arange(numpy.prod(shape)).reshape(shape) + ref_data = data[outer_indices][indices] + + combined_indices = _combine_indices(shape, outer_indices, indices) + + assert numpy.array_equal(data[combined_indices], ref_data) diff --git a/src/silx/io/test/test_specfile.py b/src/silx/io/test/test_specfile.py new file mode 100644 index 0000000..1b84a65 --- /dev/null +++ b/src/silx/io/test/test_specfile.py @@ -0,0 +1,386 @@ +# /*########################################################################## +# Copyright (C) 2016-2023 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for specfile wrapper""" + +__authors__ = ["P. Knobel", "V.A. Sole"] +__license__ = "MIT" +__date__ = "17/01/2018" + + +import locale +import logging +import numpy +import os +import sys +import tempfile +import unittest + +from silx.utils import testutils + +from ..specfile import SpecFile, Scan +from .. import specfile + + +logger1 = logging.getLogger(__name__) + +sftext = """#F /tmp/sf.dat +#E 1455180875 +#D Thu Feb 11 09:54:35 2016 +#C imaging User = opid17 +#U00 user comment first line +#U01 This is a dummy file to test SpecFile parsing +#U02 +#U03 last line + +#O0 Pslit HGap MRTSlit UP MRTSlit DOWN +#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap +#o0 pshg mrtu mrtd +#o2 ss1vo ss1ho ss1vg + +#J0 Seconds IA ion.mono Current +#J1 xbpmc2 idgap1 Inorm + +#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 +#D Thu Feb 11 09:55:20 2016 +#T 0.2 (Seconds) +#G0 0 +#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +#G3 0 0 0 0 0 0 0 0 0 +#G4 0 +#Q +#P0 180.005 -0.66875 0.87125 +#P1 14.74255 16.197579 12.238283 +#UMI0 Current AutoM Shutter +#UMI1 192.51 OFF FE open +#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00; +#N 4 +#L first column second column 3rd_col +-1.23 5.89 8 +8.478100E+01 5 1.56 +3.14 2.73 -3.14 +1.2 2.3 3.4 + +#S 25 ascan c3th 1.33245 1.52245 40 0.15 +#D Thu Feb 11 10:00:31 2016 +#P0 80.005 -1.66875 1.87125 +#P1 4.74255 6.197579 2.238283 +#N 5 +#L column0 column1 col2 col3 +0.0 0.1 0.2 0.3 +1.0 1.1 1.2 1.3 +2.0 2.1 2.2 2.3 +3.0 3.1 3.2 3.3 + +#S 26 yyyyyy +#D Thu Feb 11 09:55:20 2016 +#P0 80.005 -1.66875 1.87125 +#P1 4.74255 6.197579 
2.238283 +#N 4 +#L first column second column 3rd_col +#C Sat Oct 31 15:51:47 1998. Scan aborted after 0 points. + +#F /tmp/sf.dat +#E 1455180876 +#D Thu Feb 11 09:54:36 2016 + +#S 1 aaaaaa +#U first duplicate line +#U second duplicate line +#@MCADEV 1 +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#N 3 +#L uno duo +1 2 +@A 0 1 2 +3 4 +@A 3.1 4 5 +5 6 +@A 6 7.7 8 +""" + + +loc = locale.getlocale(locale.LC_NUMERIC) +try: + locale.setlocale(locale.LC_NUMERIC, "de_DE.utf8") +except locale.Error: + try_DE = False +else: + try_DE = True + locale.setlocale(locale.LC_NUMERIC, loc) + + +class TestSpecFile(unittest.TestCase): + @classmethod + def setUpClass(cls): + fd, cls.fname1 = tempfile.mkstemp(text=False) + os.write(fd, bytes(sftext, "ascii")) + os.close(fd) + + fd2, cls.fname2 = tempfile.mkstemp(text=False) + os.write(fd2, bytes(sftext[370:923], "ascii")) + os.close(fd2) + + fd3, cls.fname3 = tempfile.mkstemp(text=False) + txt = sftext[371:923] + os.write(fd3, bytes(txt, "ascii")) + os.close(fd3) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.fname1) + os.unlink(cls.fname2) + os.unlink(cls.fname3) + + def setUp(self): + self.sf = SpecFile(self.fname1) + self.scan1 = self.sf[0] + self.scan1_2 = self.sf["1.2"] + self.scan25 = self.sf["25.1"] + self.empty_scan = self.sf["26.1"] + + self.sf_no_fhdr = SpecFile(self.fname2) + self.scan1_no_fhdr = self.sf_no_fhdr[0] + + self.sf_no_fhdr_crash = SpecFile(self.fname3) + self.scan1_no_fhdr_crash = self.sf_no_fhdr_crash[0] + + def tearDown(self): + self.sf.close() + self.sf_no_fhdr.close() + self.sf_no_fhdr_crash.close() + + def test_open(self): + self.assertIsInstance(self.sf, SpecFile) + with self.assertRaises(specfile.SfErrFileOpen): + SpecFile("doesnt_exist.dat") + + # test filename types unicode and bytes + try: + SpecFile(self.fname1) + except TypeError: + self.fail("failed to handle filename as python3 str") + try: + SpecFile(bytes(self.fname1, "utf-8")) + except TypeError: + self.fail("failed to handle filename 
as python3 bytes") + + def test_number_of_scans(self): + self.assertEqual(4, len(self.sf)) + + def test_list_of_scan_indices(self): + self.assertEqual(self.sf.list(), [1, 25, 26, 1]) + self.assertEqual(self.sf.keys(), ["1.1", "25.1", "26.1", "1.2"]) + + def test_index_number_order(self): + self.assertEqual(self.sf.index(1, 2), 3) # sf["1.2"]==sf[3] + self.assertEqual(self.sf.number(1), 25) # sf[1]==sf["25"] + self.assertEqual(self.sf.order(3), 2) # sf[3]==sf["1.2"] + with self.assertRaises(specfile.SfErrScanNotFound): + self.sf.index(3, 2) + with self.assertRaises(specfile.SfErrScanNotFound): + self.sf.index(99) + + def assertRaisesRegex(self, *args, **kwargs): + return super(TestSpecFile, self).assertRaisesRegex(*args, **kwargs) + + def test_getitem(self): + self.assertIsInstance(self.sf[2], Scan) + self.assertIsInstance(self.sf["1.2"], Scan) + # int out of range + with self.assertRaisesRegex(IndexError, "Scan index must be in ran"): + self.sf[107] + # float indexing not allowed + with self.assertRaisesRegex(TypeError, "The scan identification k"): + self.sf[1.2] + # non existant scan with "N.M" indexing + with self.assertRaises(KeyError): + self.sf["3.2"] + + def test_specfile_iterator(self): + i = 0 + for scan in self.sf: + if i == 1: + self.assertEqual(scan.motor_positions, self.sf[1].motor_positions) + i += 1 + # number of returned scans + self.assertEqual(i, len(self.sf)) + + def test_scan_index(self): + self.assertEqual(self.scan1.index, 0) + self.assertEqual(self.scan1_2.index, 3) + self.assertEqual(self.scan25.index, 1) + + def test_scan_headers(self): + self.assertEqual( + self.scan25.scan_header_dict["S"], + "25 ascan c3th 1.33245 1.52245 40 0.15", + ) + self.assertEqual(self.scan1.header[17], "#G0 0") + self.assertEqual(len(self.scan1.header), 29) + # parsing headers with long keys + self.assertEqual( + self.scan1.scan_header_dict["UMI0"], "Current AutoM Shutter" + ) + # parsing empty headers + self.assertEqual(self.scan1.scan_header_dict["Q"], "") + # 
duplicate headers: concatenated (with newline) + self.assertEqual( + self.scan1_2.scan_header_dict["U"], + "first duplicate line\nsecond duplicate line", + ) + + def test_file_headers(self): + self.assertEqual(self.scan1.header[1], "#E 1455180875") + self.assertEqual(self.scan1.file_header_dict["F"], "/tmp/sf.dat") + + def test_multiple_file_headers(self): + """Scan 1.2 is after the second file header, with a different + Epoch""" + self.assertEqual(self.scan1_2.header[1], "#E 1455180876") + + def test_scan_labels(self): + self.assertEqual( + self.scan1.labels, ["first column", "second column", "3rd_col"] + ) + + def test_data(self): + # data_line() and data_col() take 1-based indices as arg + self.assertAlmostEqual(self.scan1.data_line(1)[2], 1.56) + # tests for data transposition between original file and .data attr + self.assertAlmostEqual(self.scan1.data[2, 0], 8) + self.assertEqual(self.scan1.data.shape, (3, 4)) + self.assertAlmostEqual(numpy.sum(self.scan1.data), 113.631) + + def test_data_column_by_name(self): + self.assertAlmostEqual(self.scan25.data_column_by_name("col2")[1], 1.2) + # Scan.data is transposed after readinq, so column is the first index + self.assertAlmostEqual( + numpy.sum(self.scan25.data_column_by_name("col2")), + numpy.sum(self.scan25.data[2, :]), + ) + with self.assertRaises(specfile.SfErrColNotFound): + self.scan25.data_column_by_name("ygfxgfyxg") + + def test_motors(self): + self.assertEqual(len(self.scan1.motor_names), 6) + self.assertEqual(len(self.scan1.motor_positions), 6) + self.assertAlmostEqual(sum(self.scan1.motor_positions), 223.385912) + self.assertEqual(self.scan1.motor_names[1], "MRTSlit UP") + self.assertAlmostEqual( + self.scan25.motor_position_by_name("MRTSlit UP"), -1.66875 + ) + + def test_absence_of_file_header(self): + """We expect Scan.file_header to be an empty list in the absence + of a file header. 
+ """ + self.assertEqual(len(self.scan1_no_fhdr.motor_names), 0) + # motor positions can still be read in the scan header + # even in the absence of motor names + self.assertAlmostEqual(sum(self.scan1_no_fhdr.motor_positions), 223.385912) + self.assertEqual(len(self.scan1_no_fhdr.header), 15) + self.assertEqual(len(self.scan1_no_fhdr.scan_header), 15) + self.assertEqual(len(self.scan1_no_fhdr.file_header), 0) + + def test_crash_absence_of_file_header(self): + """Test no crash in absence of file header and no leading newline + character + """ + self.assertEqual(len(self.scan1_no_fhdr_crash.motor_names), 0) + # motor positions can still be read in the scan header + # even in the absence of motor names + self.assertAlmostEqual( + sum(self.scan1_no_fhdr_crash.motor_positions), 223.385912 + ) + self.assertEqual(len(self.scan1_no_fhdr_crash.scan_header), 15) + self.assertEqual(len(self.scan1_no_fhdr_crash.file_header), 0) + + def test_mca(self): + self.assertEqual(len(self.scan1.mca), 0) + self.assertEqual(len(self.scan1_2.mca), 3) + self.assertEqual(self.scan1_2.mca[1][2], 5) + self.assertEqual(sum(self.scan1_2.mca[2]), 21.7) + + # Negative indexing + self.assertEqual( + sum(self.scan1_2.mca[len(self.scan1_2.mca) - 1]), sum(self.scan1_2.mca[-1]) + ) + + # Test iterator + line_count, total_sum = (0, 0) + for mca_line in self.scan1_2.mca: + line_count += 1 + total_sum += sum(mca_line) + self.assertEqual(line_count, 3) + self.assertAlmostEqual(total_sum, 36.8) + + def test_mca_header(self): + self.assertEqual(self.scan1.mca_header_dict, {}) + self.assertEqual(len(self.scan1_2.mca_header_dict), 4) + self.assertEqual(self.scan1_2.mca_header_dict["CALIB"], "1 2 3") + self.assertEqual(self.scan1_2.mca.calibration, [[1.0, 2.0, 3.0]]) + # default calib in the absence of #@CALIB + self.assertEqual(self.scan25.mca.calibration, [[0.0, 1.0, 0.0]]) + self.assertEqual(self.scan1_2.mca.channels, [[0, 1, 2]]) + # absence of #@CHANN and spectra + 
self.assertEqual(self.scan25.mca.channels, []) + + @testutils.validate_logging(specfile._logger.name, warning=1) + def test_empty_scan(self): + """Test reading a scan with no data points""" + self.assertEqual(len(self.empty_scan.labels), 3) + col1 = self.empty_scan.data_column_by_name("second column") + self.assertEqual(col1.shape, (0,)) + + +class TestSFLocale(unittest.TestCase): + @classmethod + def setUpClass(cls): + fd, cls.fname = tempfile.mkstemp(text=False) + os.write(fd, bytes(sftext, "ascii")) + os.close(fd) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.fname) + locale.setlocale(locale.LC_NUMERIC, loc) # restore saved locale + + def crunch_data(self): + self.sf3 = SpecFile(self.fname) + self.assertAlmostEqual(self.sf3[0].data_line(1)[2], 1.56) + self.sf3.close() + + @unittest.skipIf(not try_DE, "de_DE.utf8 locale not installed") + def test_locale_de_DE(self): + locale.setlocale(locale.LC_NUMERIC, "de_DE.utf8") + self.crunch_data() + + def test_locale_user(self): + locale.setlocale(locale.LC_NUMERIC, "") # use user's preferred locale + self.crunch_data() + + def test_locale_C(self): + locale.setlocale(locale.LC_NUMERIC, "C") # use default (C) locale + self.crunch_data() diff --git a/src/silx/io/test/test_specfilewrapper.py b/src/silx/io/test/test_specfilewrapper.py new file mode 100644 index 0000000..e830023 --- /dev/null +++ b/src/silx/io/test/test_specfilewrapper.py @@ -0,0 +1,189 @@ +# /*########################################################################## +# Copyright (C) 2016-2023 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, 
subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for old specfile wrapper""" + +__authors__ = ["P. Knobel"] +__license__ = "MIT" +__date__ = "15/05/2017" + +import logging +import numpy +import os +import sys +import tempfile +import unittest + +logger1 = logging.getLogger(__name__) + +from ..specfilewrapper import Specfile + +sftext = """#F /tmp/sf.dat +#E 1455180875 +#D Thu Feb 11 09:54:35 2016 +#C imaging User = opid17 +#U00 user comment first line +#U01 This is a dummy file to test SpecFile parsing +#U02 +#U03 last line + +#O0 Pslit HGap MRTSlit UP MRTSlit DOWN +#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap +#o0 pshg mrtu mrtd +#o2 ss1vo ss1ho ss1vg + +#J0 Seconds IA ion.mono Current +#J1 xbpmc2 idgap1 Inorm + +#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 +#D Thu Feb 11 09:55:20 2016 +#T 0.2 (Seconds) +#G0 0 +#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 +#G3 0 0 0 0 0 0 0 0 0 +#G4 0 +#Q +#P0 180.005 -0.66875 0.87125 +#P1 14.74255 16.197579 12.238283 +#UMI0 Current AutoM Shutter +#UMI1 192.51 OFF FE open +#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00; +#N 4 +#L first column second column 3rd_col +-1.23 5.89 8 +8.478100E+01 5 1.56 +3.14 2.73 -3.14 +1.2 2.3 3.4 + +#S 25 ascan c3th 1.33245 
1.52245 40 0.15 +#D Thu Feb 11 10:00:31 2016 +#P0 80.005 -1.66875 1.87125 +#P1 4.74255 6.197579 2.238283 +#N 5 +#L column0 column1 col2 col3 +0.0 0.1 0.2 0.3 +1.0 1.1 1.2 1.3 +2.0 2.1 2.2 2.3 +3.0 3.1 3.2 3.3 + +#F /tmp/sf.dat +#E 1455180876 +#D Thu Feb 11 09:54:36 2016 + +#S 1 aaaaaa +#U first duplicate line +#U second duplicate line +#@MCADEV 1 +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#N 3 +#L uno duo +1 2 +@A 0 1 2 +3 4 +@A 3.1 4 5 +5 6 +@A 6 7.7 8 +""" + + +class TestSpecfilewrapper(unittest.TestCase): + @classmethod + def setUpClass(cls): + fd, cls.fname1 = tempfile.mkstemp(text=False) + os.write(fd, bytes(sftext, "ascii")) + os.close(fd) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.fname1) + + def setUp(self): + self.sf = Specfile(self.fname1) + self.scan1 = self.sf[0] + self.scan1_2 = self.sf.select("1.2") + self.scan25 = self.sf.select("25.1") + + def tearDown(self): + self.sf.close() + + def test_number_of_scans(self): + self.assertEqual(3, len(self.sf)) + + def test_list_of_scan_indices(self): + self.assertEqual(self.sf.list(), "1,25,1") + self.assertEqual(self.sf.keys(), ["1.1", "25.1", "1.2"]) + + def test_scan_headers(self): + self.assertEqual( + self.scan25.header("S"), ["#S 25 ascan c3th 1.33245 1.52245 40 0.15"] + ) + self.assertEqual(self.scan1.header("G0"), ["#G0 0"]) + # parsing headers with long keys + # parsing empty headers + self.assertEqual(self.scan1.header("Q"), ["#Q "]) + + def test_file_headers(self): + self.assertEqual(self.scan1.header("E"), ["#E 1455180875"]) + self.assertEqual(self.sf.title(), "imaging") + self.assertEqual(self.sf.epoch(), 1455180875) + self.assertEqual( + self.sf.allmotors(), + [ + "Pslit HGap", + "MRTSlit UP", + "MRTSlit DOWN", + "Sslit1 VOff", + "Sslit1 HOff", + "Sslit1 VGap", + ], + ) + + def test_scan_labels(self): + self.assertEqual( + self.scan1.alllabels(), ["first column", "second column", "3rd_col"] + ) + + def test_data(self): + self.assertAlmostEqual(self.scan1.dataline(3)[2], -3.14) + 
self.assertAlmostEqual(self.scan1.datacol(1)[2], 3.14) + # tests for data transposition between original file and .data attr + self.assertAlmostEqual(self.scan1.data()[2, 0], 8) + self.assertEqual(self.scan1.data().shape, (3, 4)) + self.assertAlmostEqual(numpy.sum(self.scan1.data()), 113.631) + + def test_date(self): + self.assertEqual(self.scan1.date(), "Thu Feb 11 09:55:20 2016") + + def test_motors(self): + self.assertEqual(len(self.sf.allmotors()), 6) + self.assertEqual(len(self.scan1.allmotorpos()), 6) + self.assertAlmostEqual(sum(self.scan1.allmotorpos()), 223.385912) + self.assertEqual(self.sf.allmotors()[1], "MRTSlit UP") + + def test_mca(self): + self.assertEqual(self.scan1_2.mca(2)[2], 5) + self.assertEqual(sum(self.scan1_2.mca(3)), 21.7) + + def test_mca_header(self): + self.assertEqual(self.scan1_2.header("CALIB"), ["#@CALIB 1 2 3"]) diff --git a/src/silx/io/test/test_spech5.py b/src/silx/io/test/test_spech5.py new file mode 100644 index 0000000..93175f7 --- /dev/null +++ b/src/silx/io/test/test_spech5.py @@ -0,0 +1,912 @@ +# /*########################################################################## +# Copyright (C) 2016-2023 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for spech5""" +import numpy +import os +import io +import tempfile +import unittest +import datetime +from functools import partial + +from silx.utils import testutils + +from .. import spech5 +from ..spech5 import SpecH5, SpecH5Dataset, spec_date_to_iso8601 +from .. import specfile + +import h5py + +__authors__ = ["P. Knobel"] +__license__ = "MIT" +__date__ = "12/02/2018" + +sftext = """#F /tmp/sf.dat +#E 1455180875 +#D Thu Feb 11 09:54:35 2016 +#C imaging User = opid17 +#O0 Pslit HGap MRTSlit UP MRTSlit DOWN +#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap +#o0 pshg mrtu mrtd +#o2 ss1vo ss1ho ss1vg + +#J0 Seconds IA ion.mono Current +#J1 xbpmc2 idgap1 Inorm + +#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 +#D Thu Feb 11 09:55:20 2016 +#T 0.2 (Seconds) +#P0 180.005 -0.66875 0.87125 +#P1 14.74255 16.197579 12.238283 +#N 4 +#L MRTSlit UP second column 3rd_col +-1.23 5.89 8 +8.478100E+01 5 1.56 +3.14 2.73 -3.14 +1.2 2.3 3.4 + +#S 25 ascan c3th 1.33245 1.52245 40 0.15 +#D Sat 2015/03/14 03:53:50 +#P0 80.005 -1.66875 1.87125 +#P1 4.74255 6.197579 2.238283 +#N 5 +#L column0 column1 col2 col3 +0.0 0.1 0.2 0.3 +1.0 1.1 1.2 1.3 +2.0 2.1 2.2 2.3 +3.0 3.1 3.2 3.3 + +#S 1 aaaaaa +#D Thu Feb 11 10:00:32 2016 +#@MCADEV 1 +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#@CTIME 123.4 234.5 345.6 +#N 3 +#L uno duo +1 2 +@A 0 1 2 +@A 10 9 8 +@A 1 1 1.1 +3 4 +@A 3.1 4 5 +@A 7 6 5 +@A 1 1 1 +5 6 +@A 6 7.7 8 +@A 4 3 2 +@A 1 1 1 + 
+#S 1000 bbbbb +#G1 3.25 3.25 5.207 90 90 120 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353 +#G3 0.0106337923671 0.027529133 1.206191273 -1.43467075 0.7633438883 0.02401568018 -1.709143587 -2.097621783 0.02456954971 +#L a b +1 2 + +#S 1001 ccccc +#G1 0. 0. 0. 0 0 0 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353 +#G3 0. 0. 0. 0. 0.0 0. 0. 0. 0. +#L a b +1 2 + +""" + + +class TestSpecDate(unittest.TestCase): + """ + Test of the spec_date_to_iso8601 function. + """ + + # TODO : time zone tests + # TODO : error cases + + @classmethod + def setUpClass(cls): + import locale + + # FYI : not threadsafe + cls.locale_saved = locale.setlocale(locale.LC_TIME) + locale.setlocale(locale.LC_TIME, "C") + + @classmethod + def tearDownClass(cls): + import locale + + # FYI : not threadsafe + locale.setlocale(locale.LC_TIME, cls.locale_saved) + + def setUp(self): + # covering all week days + self.n_days = range(1, 10) + # covering all months + self.n_months = range(1, 13) + + self.n_years = [1999, 2016, 2020] + self.n_seconds = [0, 5, 26, 59] + self.n_minutes = [0, 9, 42, 59] + self.n_hours = [0, 2, 17, 23] + + self.formats = ["%a %b %d %H:%M:%S %Y", "%a %Y/%m/%d %H:%M:%S"] + + self.check_date_formats = partial( + self.__check_date_formats, + year=self.n_years[0], + month=self.n_months[0], + day=self.n_days[0], + hour=self.n_hours[0], + minute=self.n_minutes[0], + second=self.n_seconds[0], + msg=None, + ) + + def __check_date_formats(self, year, month, day, hour, minute, second, msg=None): + dt = datetime.datetime(year, month, day, hour, minute, second) + expected_date = dt.isoformat() + + for i_fmt, fmt in enumerate(self.formats): + spec_date = dt.strftime(fmt) + iso_date = spec_date_to_iso8601(spec_date) + self.assertEqual( + iso_date, + expected_date, + 
msg="Testing {0}. format={1}. " + 'Expected "{2}", got "{3} ({4})" (dt={5}).' + "".format(msg, i_fmt, expected_date, iso_date, spec_date, dt), + ) + + def testYearsNominal(self): + for year in self.n_years: + self.check_date_formats(year=year, msg="year") + + def testMonthsNominal(self): + for month in self.n_months: + self.check_date_formats(month=month, msg="month") + + def testDaysNominal(self): + for day in self.n_days: + self.check_date_formats(day=day, msg="day") + + def testHoursNominal(self): + for hour in self.n_hours: + self.check_date_formats(hour=hour, msg="hour") + + def testMinutesNominal(self): + for minute in self.n_minutes: + self.check_date_formats(minute=minute, msg="minute") + + def testSecondsNominal(self): + for second in self.n_seconds: + self.check_date_formats(second=second, msg="second") + + +class TestSpecH5(unittest.TestCase): + @classmethod + def setUpClass(cls): + fd, cls.fname = tempfile.mkstemp() + os.write(fd, bytes(sftext, "ascii")) + os.close(fd) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.fname) + + def setUp(self): + self.sfh5 = SpecH5(self.fname) + + def tearDown(self): + self.sfh5.close() + + def testContainsFile(self): + self.assertIn("/1.2/measurement", self.sfh5) + self.assertIn("/25.1", self.sfh5) + self.assertIn("25.1", self.sfh5) + self.assertNotIn("25.2", self.sfh5) + # measurement is a child of a scan, full path would be required to + # access from root level + self.assertNotIn("measurement", self.sfh5) + # Groups may or may not have a trailing / + self.assertIn("/1.2/measurement/mca_1/", self.sfh5) + self.assertIn("/1.2/measurement/mca_1", self.sfh5) + # Datasets can't have a trailing / + self.assertNotIn("/1.2/measurement/mca_0/info/calibration/ ", self.sfh5) + # No mca_8 + self.assertNotIn("/1.2/measurement/mca_8/info/calibration", self.sfh5) + # Link + self.assertIn("/1.2/measurement/mca_0/info/calibration", self.sfh5) + + def testContainsGroup(self): + self.assertIn("measurement", 
self.sfh5["/1.2/"]) + self.assertIn("measurement", self.sfh5["/1.2"]) + self.assertIn("25.1", self.sfh5["/"]) + self.assertNotIn("25.2", self.sfh5["/"]) + self.assertIn("instrument/positioners/Sslit1 HOff", self.sfh5["/1.1"]) + # illegal trailing "/" after dataset name + self.assertNotIn("instrument/positioners/Sslit1 HOff/", self.sfh5["/1.1"]) + # full path to element in group (OK) + self.assertIn( + "/1.1/instrument/positioners/Sslit1 HOff", self.sfh5["/1.1/instrument"] + ) + + def testDataColumn(self): + self.assertAlmostEqual(sum(self.sfh5["/1.2/measurement/duo"]), 12.0) + self.assertAlmostEqual( + sum(self.sfh5["1.1"]["measurement"]["MRTSlit UP"]), 87.891, places=4 + ) + + def testDate(self): + # start time is in Iso8601 format + self.assertEqual(self.sfh5["/1.1/start_time"], "2016-02-11T09:55:20") + self.assertEqual(self.sfh5["25.1/start_time"], "2015-03-14T03:53:50") + + def assertRaisesRegex(self, *args, **kwargs): + return super(TestSpecH5, self).assertRaisesRegex(*args, **kwargs) + + def testDatasetInstanceAttr(self): + """The SpecH5Dataset objects must implement some dummy attributes + to improve compatibility with widgets dealing with h5py datasets.""" + self.assertIsNone(self.sfh5["/1.1/start_time"].compression) + self.assertIsNone(self.sfh5["1.1"]["measurement"]["MRTSlit UP"].chunks) + + # error message must be explicit + with self.assertRaisesRegex( + AttributeError, "SpecH5Dataset has no attribute tOTo" + ): + dummy = self.sfh5["/1.1/start_time"].tOTo + + def testGet(self): + """Test :meth:`SpecH5Group.get`""" + # default value of param *default* is None + self.assertIsNone(self.sfh5.get("toto")) + self.assertEqual(self.sfh5["25.1"].get("toto", default=-3), -3) + + self.assertEqual( + self.sfh5.get("/1.1/start_time", default=-3), "2016-02-11T09:55:20" + ) + + def testGetClass(self): + """Test :meth:`SpecH5Group.get`""" + self.assertIs(self.sfh5["1.1"].get("start_time", getclass=True), h5py.Dataset) + self.assertIs(self.sfh5["1.1"].get("instrument", 
getclass=True), h5py.Group) + + # spech5 does not define external link, so there is no way + # a group can *get* a SpecH5 class + + def testGetApi(self): + result = self.sfh5.get("1.1", getclass=True, getlink=True) + self.assertIs(result, h5py.HardLink) + result = self.sfh5.get("1.1", getclass=False, getlink=True) + self.assertIsInstance(result, h5py.HardLink) + result = self.sfh5.get("1.1", getclass=True, getlink=False) + self.assertIs(result, h5py.Group) + result = self.sfh5.get("1.1", getclass=False, getlink=False) + self.assertIsInstance(result, spech5.SpecH5Group) + + def testGetItemGroup(self): + group = self.sfh5["25.1"]["instrument"] + self.assertEqual( + list(group["positioners"].keys()), + [ + "Pslit HGap", + "MRTSlit UP", + "MRTSlit DOWN", + "Sslit1 VOff", + "Sslit1 HOff", + "Sslit1 VGap", + ], + ) + with self.assertRaises(KeyError): + group["Holy Grail"] + + def testGetitemSpecH5(self): + self.assertEqual( + self.sfh5["/1.2/instrument/positioners"], + self.sfh5["1.2"]["instrument"]["positioners"], + ) + + def testH5pyClass(self): + """Test :attr:`h5py_class` returns the corresponding h5py class + (h5py.File, h5py.Group, h5py.Dataset)""" + a_file = self.sfh5 + self.assertIs(a_file.h5py_class, h5py.File) + + a_group = self.sfh5["/1.2/measurement"] + self.assertIs(a_group.h5py_class, h5py.Group) + + a_dataset = self.sfh5["/1.1/instrument/positioners/Sslit1 HOff"] + self.assertIs(a_dataset.h5py_class, h5py.Dataset) + + def testHeader(self): + file_header = self.sfh5["/1.2/instrument/specfile/file_header"] + scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"] + + # File header has 10 lines + self.assertEqual(len(file_header), 10) + # 1.2 has 9 scan & mca header lines + self.assertEqual(len(scan_header), 9) + + # line 4 of file header + self.assertEqual(file_header[3], "#C imaging User = opid17") + # line 4 of scan header + scan_header = self.sfh5["25.1/instrument/specfile/scan_header"] + + self.assertEqual(scan_header[3], "#P1 4.74255 6.197579 
2.238283") + + def testLinks(self): + self.assertTrue( + numpy.array_equal( + self.sfh5["/1.2/measurement/mca_0/data"], + self.sfh5["/1.2/instrument/mca_0/data"], + ) + ) + self.assertTrue( + numpy.array_equal( + self.sfh5["/1.2/measurement/mca_0/info/data"], + self.sfh5["/1.2/instrument/mca_0/data"], + ) + ) + self.assertTrue( + numpy.array_equal( + self.sfh5["/1.2/measurement/mca_0/info/channels"], + self.sfh5["/1.2/instrument/mca_0/channels"], + ) + ) + self.assertEqual( + self.sfh5["/1.2/measurement/mca_0/info/"].keys(), + self.sfh5["/1.2/instrument/mca_0/"].keys(), + ) + + self.assertEqual( + self.sfh5["/1.2/measurement/mca_0/info/preset_time"], + self.sfh5["/1.2/instrument/mca_0/preset_time"], + ) + self.assertEqual( + self.sfh5["/1.2/measurement/mca_0/info/live_time"], + self.sfh5["/1.2/instrument/mca_0/live_time"], + ) + self.assertEqual( + self.sfh5["/1.2/measurement/mca_0/info/elapsed_time"], + self.sfh5["/1.2/instrument/mca_0/elapsed_time"], + ) + + def testListScanIndices(self): + self.assertEqual( + list(self.sfh5.keys()), ["1.1", "25.1", "1.2", "1000.1", "1001.1"] + ) + self.assertEqual( + self.sfh5["1.2"].attrs, + { + "NX_class": "NXentry", + }, + ) + + def testMcaAbsent(self): + def access_absent_mca(): + """This must raise a KeyError, because scan 1.1 has no MCA""" + return self.sfh5["/1.1/measurement/mca_0/"] + + self.assertRaises(KeyError, access_absent_mca) + + def testMcaCalib(self): + mca0_calib = self.sfh5["/1.2/measurement/mca_0/info/calibration"] + mca1_calib = self.sfh5["/1.2/measurement/mca_1/info/calibration"] + self.assertEqual(mca0_calib.tolist(), [1, 2, 3]) + # calibration is unique in this scan and applies to all analysers + self.assertEqual(mca0_calib.tolist(), mca1_calib.tolist()) + + def testMcaChannels(self): + mca0_chann = self.sfh5["/1.2/measurement/mca_0/info/channels"] + mca1_chann = self.sfh5["/1.2/measurement/mca_1/info/channels"] + self.assertEqual(mca0_chann.tolist(), [0, 1, 2]) + self.assertEqual(mca0_chann.tolist(), 
mca1_chann.tolist()) + + def testMcaCtime(self): + """Tests for #@CTIME mca header""" + datasets = ["preset_time", "live_time", "elapsed_time"] + for ds in datasets: + self.assertNotIn("/1.1/instrument/mca_0/" + ds, self.sfh5) + self.assertIn("/1.2/instrument/mca_0/" + ds, self.sfh5) + + mca0_preset_time = self.sfh5["/1.2/instrument/mca_0/preset_time"] + mca1_preset_time = self.sfh5["/1.2/instrument/mca_1/preset_time"] + self.assertLess(mca0_preset_time - 123.4, 10**-5) + # ctime is unique in a this scan and applies to all analysers + self.assertEqual(mca0_preset_time, mca1_preset_time) + + mca0_live_time = self.sfh5["/1.2/instrument/mca_0/live_time"] + mca1_live_time = self.sfh5["/1.2/instrument/mca_1/live_time"] + self.assertLess(mca0_live_time - 234.5, 10**-5) + self.assertEqual(mca0_live_time, mca1_live_time) + + mca0_elapsed_time = self.sfh5["/1.2/instrument/mca_0/elapsed_time"] + mca1_elapsed_time = self.sfh5["/1.2/instrument/mca_1/elapsed_time"] + self.assertLess(mca0_elapsed_time - 345.6, 10**-5) + self.assertEqual(mca0_elapsed_time, mca1_elapsed_time) + + def testMcaData(self): + # sum 1st MCA in scan 1.2 over rows + mca_0_data = self.sfh5["/1.2/measurement/mca_0/data"] + for summed_row, expected in zip( + mca_0_data.sum(axis=1).tolist(), [3.0, 12.1, 21.7] + ): + self.assertAlmostEqual(summed_row, expected, places=4) + + # sum 3rd MCA in scan 1.2 along both axis + mca_2_data = self.sfh5["1.2"]["measurement"]["mca_2"]["data"] + self.assertAlmostEqual(sum(sum(mca_2_data)), 9.1, places=5) + # attrs + self.assertEqual(mca_0_data.attrs, {"interpretation": "spectrum"}) + + def testMotorPosition(self): + positioners_group = self.sfh5["/1.1/instrument/positioners"] + # MRTSlit DOWN position is defined in #P0 san header line + self.assertAlmostEqual(float(positioners_group["MRTSlit DOWN"]), 0.87125) + # MRTSlit UP position is defined in first data column + for a, b in zip( + positioners_group["MRTSlit UP"].tolist(), [-1.23, 8.478100e01, 3.14, 1.2] + ): + 
self.assertAlmostEqual(float(a), b, places=4) + + def testNumberMcaAnalysers(self): + """Scan 1.2 has 2 data columns + 3 mca spectra per data line.""" + self.assertEqual(len(self.sfh5["1.2"]["measurement"]), 5) + + def testTitle(self): + self.assertEqual( + self.sfh5["/25.1/title"], "ascan c3th 1.33245 1.52245 40 0.15" + ) + + def testValues(self): + group = self.sfh5["/25.1"] + self.assertTrue(hasattr(group, "values")) + self.assertTrue(callable(group.values)) + self.assertIn(self.sfh5["/25.1/title"], self.sfh5["/25.1"].values()) + + # visit and visititems ignore links + def testVisit(self): + name_list = [] + self.sfh5.visit(name_list.append) + self.assertIn("1.2/instrument/positioners/Pslit HGap", name_list) + self.assertIn("1.2/instrument/specfile/scan_header", name_list) + self.assertEqual(len(name_list), 117) + + # test also visit of a subgroup, with various group name formats + name_list_leading_and_trailing_slash = [] + self.sfh5["/1.2/instrument/"].visit(name_list_leading_and_trailing_slash.append) + name_list_leading_slash = [] + self.sfh5["/1.2/instrument"].visit(name_list_leading_slash.append) + name_list_trailing_slash = [] + self.sfh5["1.2/instrument/"].visit(name_list_trailing_slash.append) + name_list_no_slash = [] + self.sfh5["1.2/instrument"].visit(name_list_no_slash.append) + + # no differences expected in the output names + self.assertEqual(name_list_leading_and_trailing_slash, name_list_leading_slash) + self.assertEqual(name_list_leading_slash, name_list_trailing_slash) + self.assertEqual(name_list_leading_slash, name_list_no_slash) + self.assertIn("positioners/Pslit HGap", name_list_no_slash) + self.assertIn("positioners", name_list_no_slash) + + def testVisitItems(self): + dataset_name_list = [] + + def func_generator(l): + """return a function appending names to list l""" + + def func(name, obj): + if isinstance(obj, SpecH5Dataset): + l.append(name) + + return func + + self.sfh5.visititems(func_generator(dataset_name_list)) + 
self.assertIn("1.2/instrument/positioners/Pslit HGap", dataset_name_list) + self.assertEqual(len(dataset_name_list), 85) + + # test also visit of a subgroup, with various group name formats + name_list_leading_and_trailing_slash = [] + self.sfh5["/1.2/instrument/"].visititems( + func_generator(name_list_leading_and_trailing_slash) + ) + name_list_leading_slash = [] + self.sfh5["/1.2/instrument"].visititems(func_generator(name_list_leading_slash)) + name_list_trailing_slash = [] + self.sfh5["1.2/instrument/"].visititems( + func_generator(name_list_trailing_slash) + ) + name_list_no_slash = [] + self.sfh5["1.2/instrument"].visititems(func_generator(name_list_no_slash)) + + # no differences expected in the output names + self.assertEqual(name_list_leading_and_trailing_slash, name_list_leading_slash) + self.assertEqual(name_list_leading_slash, name_list_trailing_slash) + self.assertEqual(name_list_leading_slash, name_list_no_slash) + self.assertIn("positioners/Pslit HGap", name_list_no_slash) + + def testNotSpecH5(self): + fd, fname = tempfile.mkstemp() + os.write(fd, b"Not a spec file!") + os.close(fd) + self.assertRaises(specfile.SfErrFileOpen, SpecH5, fname) + self.assertRaises(IOError, SpecH5, fname) + os.unlink(fname) + + def testSample(self): + self.assertNotIn("sample", self.sfh5["/1.1"]) + self.assertIn("sample", self.sfh5["/1000.1"]) + self.assertIn("ub_matrix", self.sfh5["/1000.1/sample"]) + self.assertIn("unit_cell", self.sfh5["/1000.1/sample"]) + self.assertIn("unit_cell_abc", self.sfh5["/1000.1/sample"]) + self.assertIn("unit_cell_alphabetagamma", self.sfh5["/1000.1/sample"]) + + # All 0 values + self.assertNotIn("sample", self.sfh5["/1001.1"]) + with self.assertRaises(KeyError): + self.sfh5["/1001.1/sample/unit_cell"] + + @testutils.validate_logging(spech5.logger1.name, warning=2) + def testOpenFileDescriptor(self): + """Open a SpecH5 file from a file descriptor""" + with io.open(self.sfh5.filename) as f: + sfh5 = SpecH5(f) + self.assertIsNotNone(sfh5) + 
name_list = [] + # check if the object is working + self.sfh5.visit(name_list.append) + sfh5.close() + + +sftext_multi_mca_headers = """ +#S 1 aaaaaa +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#@CTIME 123.4 234.5 345.6 +#@MCA %16C +#@CHANN 3 1 3 1 +#@CALIB 5.5 6.6 7.7 +#@CTIME 10 11 12 +#N 3 +#L uno duo +1 2 +@A 0 1 2 +@A 10 9 8 +3 4 +@A 3.1 4 5 +@A 7 6 5 +5 6 +@A 6 7.7 8 +@A 4 3 2 + +""" + + +class TestSpecH5MultiMca(unittest.TestCase): + @classmethod + def setUpClass(cls): + fd, cls.fname = tempfile.mkstemp(text=False) + os.write(fd, bytes(sftext_multi_mca_headers, "ascii")) + os.close(fd) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.fname) + + def setUp(self): + self.sfh5 = SpecH5(self.fname) + + def tearDown(self): + self.sfh5.close() + + def testMcaCalib(self): + mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"] + mca1_calib = self.sfh5["/1.1/measurement/mca_1/info/calibration"] + self.assertEqual(mca0_calib.tolist(), [1, 2, 3]) + self.assertAlmostEqual(sum(mca1_calib.tolist()), sum([5.5, 6.6, 7.7]), places=5) + + def testMcaChannels(self): + mca0_chann = self.sfh5["/1.1/measurement/mca_0/info/channels"] + mca1_chann = self.sfh5["/1.1/measurement/mca_1/info/channels"] + self.assertEqual(mca0_chann.tolist(), [0.0, 1.0, 2.0]) + # @CHANN is unique in this scan and applies to all analysers + self.assertEqual(mca1_chann.tolist(), [1.0, 2.0, 3.0]) + + def testMcaCtime(self): + """Tests for #@CTIME mca header""" + mca0_preset_time = self.sfh5["/1.1/instrument/mca_0/preset_time"] + mca1_preset_time = self.sfh5["/1.1/instrument/mca_1/preset_time"] + self.assertLess(mca0_preset_time - 123.4, 10**-5) + self.assertLess(mca1_preset_time - 10, 10**-5) + + mca0_live_time = self.sfh5["/1.1/instrument/mca_0/live_time"] + mca1_live_time = self.sfh5["/1.1/instrument/mca_1/live_time"] + self.assertLess(mca0_live_time - 234.5, 10**-5) + self.assertLess(mca1_live_time - 11, 10**-5) + + mca0_elapsed_time = 
self.sfh5["/1.1/instrument/mca_0/elapsed_time"] + mca1_elapsed_time = self.sfh5["/1.1/instrument/mca_1/elapsed_time"] + self.assertLess(mca0_elapsed_time - 345.6, 10**-5) + self.assertLess(mca1_elapsed_time - 12, 10**-5) + + +sftext_no_cols = r"""#F C:/DATA\test.mca +#D Thu Jul 7 08:40:19 2016 + +#S 1 31oct98.dat 22.1 If4 +#D Thu Jul 7 08:40:19 2016 +#C no data cols, one mca analyser, single spectrum +#@MCA %16C +#@CHANN 151 0 150 1 +#@CALIB 0 2 0 +@A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \ +1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \ +6071 7611 10426 16188 28266 40348 50539 55555 56162 54162 47102 35718 24588 17034 12994 11444 \ +11808 13461 15687 18885 23827 31578 41999 49556 58084 59415 59456 55698 44525 28219 17680 12881 \ +9518 7415 6155 5246 4646 3978 3612 3299 3020 2761 2670 2472 2500 2310 2286 2106 \ +1989 1890 1782 1655 1421 1293 1135 990 879 757 672 618 532 488 445 424 \ +414 373 351 325 307 284 270 247 228 213 199 187 183 176 164 156 \ +153 140 142 130 118 118 103 101 97 86 90 86 87 81 75 82 \ +80 76 77 75 76 77 62 69 74 60 65 68 65 58 63 64 \ +63 59 60 56 57 60 55 + +#S 2 31oct98.dat 22.1 If4 +#D Thu Jul 7 08:40:19 2016 +#C no data cols, one mca analyser, multiple spectra +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#@CTIME 123.4 234.5 345.6 +@A 0 1 2 +@A 10 9 8 +@A 1 1 1.1 +@A 3.1 4 5 +@A 7 6 5 +@A 1 1 1 +@A 6 7.7 8 +@A 4 3 2 +@A 1 1 1 + +#S 3 31oct98.dat 22.1 If4 +#D Thu Jul 7 08:40:19 2016 +#C no data cols, 3 mca analysers, multiple spectra +#@MCADEV 1 +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#@CTIME 123.4 234.5 345.6 +#@MCADEV 2 +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#@CTIME 123.4 234.5 345.6 +#@MCADEV 3 +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#@CTIME 123.4 234.5 345.6 +@A 0 1 2 +@A 10 9 8 +@A 1 1 1.1 +@A 3.1 4 5 +@A 7 6 5 +@A 1 1 1 +@A 6 7.7 8 +@A 4 3 2 +@A 1 1 1 +""" + + +class TestSpecH5NoDataCols(unittest.TestCase): + """Test reading SPEC files with only MCA 
data""" + + @classmethod + def setUpClass(cls): + fd, cls.fname = tempfile.mkstemp() + os.write(fd, bytes(sftext_no_cols, "ascii")) + os.close(fd) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.fname) + + def setUp(self): + self.sfh5 = SpecH5(self.fname) + + def tearDown(self): + self.sfh5.close() + + def testScan1(self): + # 1.1: single analyser, single spectrum, 151 channels + self.assertIn("mca_0", self.sfh5["1.1/instrument/"]) + self.assertEqual(self.sfh5["1.1/instrument/mca_0/data"].shape, (1, 151)) + self.assertNotIn("mca_1", self.sfh5["1.1/instrument/"]) + + def testScan2(self): + # 2.1: single analyser, 9 spectra, 3 channels + self.assertIn("mca_0", self.sfh5["2.1/instrument/"]) + self.assertEqual(self.sfh5["2.1/instrument/mca_0/data"].shape, (9, 3)) + self.assertNotIn("mca_1", self.sfh5["2.1/instrument/"]) + + def testScan3(self): + # 3.1: 3 analysers, 3 spectra/analyser, 3 channels + for i in range(3): + self.assertIn("mca_%d" % i, self.sfh5["3.1/instrument/"]) + self.assertEqual(self.sfh5["3.1/instrument/mca_%d/data" % i].shape, (3, 3)) + + self.assertNotIn("mca_3", self.sfh5["3.1/instrument/"]) + + +sf_text_slash = r"""#F /data/id09/archive/logspecfiles/laue/2016/scan_231_laue_16-11-29.dat +#D Sat Dec 10 22:20:59 2016 +#O0 Pslit/HGap MRTSlit%UP + +#S 1 laue_16-11-29.log 231.1 PD3/A +#D Sat Dec 10 22:20:59 2016 +#P0 180.005 -0.66875 +#N 2 +#L GONY/mm PD3%A +-2.015 5.250424e-05 +-2.01 5.30798e-05 +-2.005 5.281903e-05 +-2 5.220436e-05 +""" + + +class TestSpecH5SlashInLabels(unittest.TestCase): + """Test reading SPEC files with labels containing a / character + + The / character must be substituted with a % + """ + + @classmethod + def setUpClass(cls): + fd, cls.fname = tempfile.mkstemp() + os.write(fd, bytes(sf_text_slash, "ascii")) + os.close(fd) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.fname) + + def setUp(self): + self.sfh5 = SpecH5(self.fname) + + def tearDown(self): + self.sfh5.close() + + def testLabels(self): + 
"""Ensure `/` is substituted with `%` and + ensure legitimate `%` in names are still working""" + self.assertEqual( + list(self.sfh5["1.1/measurement/"].keys()), ["GONY%mm", "PD3%A"] + ) + + # substituted "%" + self.assertIn("GONY%mm", self.sfh5["1.1/measurement/"]) + self.assertNotIn("GONY/mm", self.sfh5["1.1/measurement/"]) + self.assertAlmostEqual( + self.sfh5["1.1/measurement/GONY%mm"][0], -2.015, places=4 + ) + # legitimate "%" + self.assertIn("PD3%A", self.sfh5["1.1/measurement/"]) + + def testMotors(self): + """Ensure `/` is substituted with `%` and + ensure legitimate `%` in names are still working""" + self.assertEqual( + list(self.sfh5["1.1/instrument/positioners"].keys()), + ["Pslit%HGap", "MRTSlit%UP"], + ) + # substituted "%" + self.assertIn("Pslit%HGap", self.sfh5["1.1/instrument/positioners"]) + self.assertNotIn("Pslit/HGap", self.sfh5["1.1/instrument/positioners"]) + self.assertAlmostEqual( + self.sfh5["1.1/instrument/positioners/Pslit%HGap"], 180.005, places=4 + ) + # legitimate "%" + self.assertIn("MRTSlit%UP", self.sfh5["1.1/instrument/positioners"]) + + +def testUnitCellUBMatrix(tmp_path): + """Test unit cell (#G1) and UB matrix (#G3)""" + file_path = tmp_path / "spec.dat" + file_path.write_bytes( + bytes( + """ +#S 1 OK +#G1 0 1 2 3 4 5 +#G3 0 1 2 3 4 5 6 7 8 +""", + encoding="ascii", + ) + ) + with SpecH5(str(file_path)) as spech5: + assert numpy.array_equal( + spech5["/1.1/sample/ub_matrix"], numpy.arange(9).reshape(1, 3, 3) + ) + assert numpy.array_equal(spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]]) + assert numpy.array_equal(spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2]) + assert numpy.array_equal( + spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5] + ) + + +def testMalformedUnitCellUBMatrix(tmp_path): + """Test malformed unit cell (#G1) and UB matrix (#G3): 1 value""" + file_path = tmp_path / "spec.dat" + file_path.write_bytes( + bytes( + """ +#S 1 all malformed=0 +#G1 0 +#G3 0 +""", + encoding="ascii", + ) + ) + with 
SpecH5(str(file_path)) as spech5: + assert "sample" not in spech5["1.1"] + + +def testMalformedUBMatrix(tmp_path): + """Test malformed UB matrix (#G3): all zeros""" + file_path = tmp_path / "spec.dat" + file_path.write_bytes( + bytes( + """ +#S 1 G3 all 0 +#G1 0 1 2 3 4 5 +#G3 0 0 0 0 0 0 0 0 0 +""", + encoding="ascii", + ) + ) + with SpecH5(str(file_path)) as spech5: + assert "ub_matrix" not in spech5["/1.1/sample"] + assert numpy.array_equal(spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]]) + assert numpy.array_equal(spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2]) + assert numpy.array_equal( + spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5] + ) + + +def testMalformedUnitCell(tmp_path): + """Test malformed unit cell (#G1): missing values""" + file_path = tmp_path / "spec.dat" + file_path.write_bytes( + bytes( + """ +#S 1 G1 malformed missing values +#G1 0 1 2 +#G3 0 1 2 3 4 5 6 7 8 +""", + encoding="ascii", + ) + ) + with SpecH5(str(file_path)) as spech5: + assert "unit_cell" not in spech5["/1.1/sample"] + assert "unit_cell_abc" not in spech5["/1.1/sample"] + assert "unit_cell_alphabetagamma" not in spech5["/1.1/sample"] + assert numpy.array_equal( + spech5["/1.1/sample/ub_matrix"], numpy.arange(9).reshape(1, 3, 3) + ) diff --git a/src/silx/io/test/test_spectoh5.py b/src/silx/io/test/test_spectoh5.py new file mode 100644 index 0000000..a3426ea --- /dev/null +++ b/src/silx/io/test/test_spectoh5.py @@ -0,0 +1,186 @@ +# /*########################################################################## +# Copyright (C) 2016-2019 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# 
furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for SpecFile to HDF5 converter""" + +from numpy import array_equal +import os +import tempfile +import unittest + +import h5py + +from ..spech5 import SpecH5, SpecH5Group +from ..convert import convert, write_to_h5 +from ..utils import h5py_read_dataset + +__authors__ = ["P. 
Knobel"] +__license__ = "MIT" +__date__ = "12/02/2018" + + +sfdata = b"""#F /tmp/sf.dat +#E 1455180875 +#D Thu Feb 11 09:54:35 2016 +#C imaging User = opid17 +#O0 Pslit HGap MRTSlit UP MRTSlit DOWN +#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap +#o0 pshg mrtu mrtd +#o2 ss1vo ss1ho ss1vg + +#J0 Seconds IA ion.mono Current +#J1 xbpmc2 idgap1 Inorm + +#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 +#D Thu Feb 11 09:55:20 2016 +#T 0.2 (Seconds) +#P0 180.005 -0.66875 0.87125 +#P1 14.74255 16.197579 12.238283 +#N 4 +#L MRTSlit UP second column 3rd_col +-1.23 5.89 8 +8.478100E+01 5 1.56 +3.14 2.73 -3.14 +1.2 2.3 3.4 + +#S 1 aaaaaa +#D Thu Feb 11 10:00:32 2016 +#@MCADEV 1 +#@MCA %16C +#@CHANN 3 0 2 1 +#@CALIB 1 2 3 +#N 3 +#L uno duo +1 2 +@A 0 1 2 +@A 10 9 8 +3 4 +@A 3.1 4 5 +@A 7 6 5 +5 6 +@A 6 7.7 8 +@A 4 3 2 +""" + + +class TestConvertSpecHDF5(unittest.TestCase): + @classmethod + def setUpClass(cls): + fd, cls.spec_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5") + os.write(fd, sfdata) + os.close(fd) + + fd, cls.h5_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5") + # Close and delete (we just need the name) + os.close(fd) + os.unlink(cls.h5_fname) + + @classmethod + def tearDownClass(cls): + os.unlink(cls.spec_fname) + + def setUp(self): + convert(self.spec_fname, self.h5_fname) + + self.sfh5 = SpecH5(self.spec_fname) + self.h5f = h5py.File(self.h5_fname, "a") + + def tearDown(self): + self.h5f.close() + self.sfh5.close() + os.unlink(self.h5_fname) + + def testAppendToHDF5(self): + write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam") + self.assertTrue( + array_equal( + self.h5f["/1.2/measurement/mca_1/data"], + self.h5f["/foo/bar/spam/1.2/measurement/mca_1/data"], + ) + ) + + def testWriteSpecH5Group(self): + """Test passing a SpecH5Group as parameter, instead of a Spec filename + or a SpecH5.""" + g = self.sfh5["1.1/instrument"] + self.assertIsInstance(g, SpecH5Group) # let's be paranoid + write_to_h5(g, self.h5f, h5path="my instruments") + + 
self.assertAlmostEqual( + self.h5f["my instruments/positioners/Sslit1 HOff"][tuple()], + 16.197579, + places=4, + ) + + def testTitle(self): + """Test the value of a dataset""" + title12 = h5py_read_dataset(self.h5f["/1.2/title"]) + self.assertEqual(title12, "aaaaaa") + + def testAttrs(self): + # Test root group (file) attributes + self.assertEqual(self.h5f.attrs["NX_class"], "NXroot") + # Test dataset attributes + ds = self.h5f["/1.2/instrument/mca_1/data"] + self.assertTrue("interpretation" in ds.attrs) + self.assertEqual(list(ds.attrs.values()), ["spectrum"]) + # Test group attributes + grp = self.h5f["1.1"] + self.assertEqual(grp.attrs["NX_class"], "NXentry") + self.assertEqual(len(list(grp.attrs.keys())), 1) + + def testHdf5HasSameMembers(self): + spec_member_list = [] + + def append_spec_members(name): + spec_member_list.append(name) + + self.sfh5.visit(append_spec_members) + + hdf5_member_list = [] + + def append_hdf5_members(name): + hdf5_member_list.append(name) + + self.h5f.visit(append_hdf5_members) + + # 1. 
For some reason, h5py visit method doesn't include the leading + # "/" character when it passes the member name to the function, + # even though an explicit the .name attribute of a member will + # have a leading "/" + spec_member_list = [m.lstrip("/") for m in spec_member_list] + + self.assertEqual(set(hdf5_member_list), set(spec_member_list)) + + def testLinks(self): + self.assertTrue( + array_equal( + self.sfh5["/1.2/measurement/mca_0/data"], + self.h5f["/1.2/measurement/mca_0/data"], + ) + ) + self.assertTrue( + array_equal( + self.h5f["/1.2/instrument/mca_1/channels"], + self.h5f["/1.2/measurement/mca_1/info/channels"], + ) + ) diff --git a/src/silx/io/test/test_url.py b/src/silx/io/test/test_url.py new file mode 100644 index 0000000..61f9883 --- /dev/null +++ b/src/silx/io/test/test_url.py @@ -0,0 +1,300 @@ +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for url module""" + +__authors__ = ["V. Valls"] +__license__ = "MIT" +__date__ = "29/01/2018" + + +import pytest +from ..url import DataUrl + + +def assert_url(url, expected): + assert url.is_valid() == expected[0] + assert url.is_absolute() == expected[1] + assert url.scheme() == expected[2] + assert url.file_path() == expected[3] + assert url.data_path() == expected[4] + assert url.data_slice() == expected[5] + + +def test_fabio_absolute(): + url = DataUrl("fabio:///data/image.edf?slice=2") + expected = [True, True, "fabio", "/data/image.edf", None, (2,)] + assert_url(url, expected) + + +def test_fabio_absolute_windows(): + url = DataUrl("fabio:///C:/data/image.edf?slice=2") + expected = [True, True, "fabio", "C:/data/image.edf", None, (2,)] + assert_url(url, expected) + + +def test_silx_absolute(): + url = DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5") + expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)] + assert_url(url, expected) + + +def test_commandline_shell_separator(): + url = DataUrl("silx:///data/image.h5::path=/data/dataset&slice=1,5") + expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)] + assert_url(url, expected) + + +def test_silx_absolute2(): + url = DataUrl("silx:///data/image.edf?/scan_0/detector/data") + expected = [True, True, "silx", "/data/image.edf", "/scan_0/detector/data", None] + assert_url(url, expected) + + +def test_silx_absolute_windows(): + url = DataUrl("silx:///C:/data/image.h5?/scan_0/detector/data") + expected = [True, True, "silx", "C:/data/image.h5", "/scan_0/detector/data", None] + assert_url(url, 
expected) + + +def test_silx_relative(): + url = DataUrl("silx:./image.h5") + expected = [True, False, "silx", "./image.h5", None, None] + assert_url(url, expected) + + +def test_fabio_relative(): + url = DataUrl("fabio:./image.edf") + expected = [True, False, "fabio", "./image.edf", None, None] + assert_url(url, expected) + + +def test_silx_relative2(): + url = DataUrl("silx:image.h5") + expected = [True, False, "silx", "image.h5", None, None] + assert_url(url, expected) + + +def test_fabio_relative2(): + url = DataUrl("fabio:image.edf") + expected = [True, False, "fabio", "image.edf", None, None] + assert_url(url, expected) + + +def test_file_relative(): + url = DataUrl("image.edf") + expected = [True, False, None, "image.edf", None, None] + assert_url(url, expected) + + +def test_file_relative2(): + url = DataUrl("./foo/bar/image.edf") + expected = [True, False, None, "./foo/bar/image.edf", None, None] + assert_url(url, expected) + + +def test_file_relative3(): + url = DataUrl("foo/bar/image.edf") + expected = [True, False, None, "foo/bar/image.edf", None, None] + assert_url(url, expected) + + +def test_file_absolute(): + url = DataUrl("/data/image.edf") + expected = [True, True, None, "/data/image.edf", None, None] + assert_url(url, expected) + + +def test_file_absolute_windows(): + url = DataUrl("C:/data/image.edf") + expected = [True, True, None, "C:/data/image.edf", None, None] + assert_url(url, expected) + + +def test_absolute_with_path(): + url = DataUrl("/foo/foobar.h5?/foo/bar") + expected = [True, True, None, "/foo/foobar.h5", "/foo/bar", None] + assert_url(url, expected) + + +def test_windows_file_data_slice(): + url = DataUrl("C:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [True, True, None, "C:/foo/foobar.h5", "/foo/bar", (5, 1)] + assert_url(url, expected) + + +def test_scheme_file_data_slice(): + url = DataUrl("silx:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [True, True, "silx", "/foo/foobar.h5", "/foo/bar", (5, 1)] + 
assert_url(url, expected) + + +def test_scheme_windows_file_data_slice(): + url = DataUrl("silx:C:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [True, True, "silx", "C:/foo/foobar.h5", "/foo/bar", (5, 1)] + assert_url(url, expected) + + +def test_empty(): + url = DataUrl("") + expected = [False, False, None, "", None, None] + assert_url(url, expected) + + +def test_unknown_scheme(): + url = DataUrl("foo:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [False, True, "foo", "/foo/foobar.h5", "/foo/bar", (5, 1)] + assert_url(url, expected) + + +def test_slice(): + url = DataUrl("/a.h5?path=/b&slice=5,1") + expected = [True, True, None, "/a.h5", "/b", (5, 1)] + assert_url(url, expected) + + +def test_slice2(): + url = DataUrl("/a.h5?path=/b&slice=2:5") + expected = [True, True, None, "/a.h5", "/b", (slice(2, 5),)] + assert_url(url, expected) + + +def test_slice3(): + url = DataUrl("/a.h5?path=/b&slice=::2") + expected = [True, True, None, "/a.h5", "/b", (slice(None, None, 2),)] + assert_url(url, expected) + + +def test_slice_ellipsis(): + url = DataUrl("/a.h5?path=/b&slice=...") + expected = [True, True, None, "/a.h5", "/b", (Ellipsis,)] + assert_url(url, expected) + + +def test_slice_slicing(): + url = DataUrl("/a.h5?path=/b&slice=:") + expected = [True, True, None, "/a.h5", "/b", (slice(None),)] + assert_url(url, expected) + + +def test_slice_missing_element(): + url = DataUrl("/a.h5?path=/b&slice=5,,1") + expected = [False, True, None, "/a.h5", "/b", None] + assert_url(url, expected) + + +def test_slice_no_elements(): + url = DataUrl("/a.h5?path=/b&slice=") + expected = [False, True, None, "/a.h5", "/b", None] + assert_url(url, expected) + + +def test_create_relative_url(): + url = DataUrl(scheme="silx", file_path="./foo.h5", data_path="/", data_slice=(5, 1)) + assert not url.is_absolute() + url2 = DataUrl(url.path()) + assert url == url2 + + +def test_create_absolute_url(): + url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", 
data_slice=(5, 1)) + url2 = DataUrl(url.path()) + assert url == url2 + + +def test_create_absolute_windows_url(): + url = DataUrl( + scheme="silx", file_path="C:/foo.h5", data_path="/", data_slice=(5, 1) + ) + url2 = DataUrl(url.path()) + assert url == url2 + + +def test_create_slice_url(): + url = DataUrl( + scheme="silx", + file_path="/foo.h5", + data_path="/", + data_slice=(5, 1, Ellipsis, slice(None)), + ) + url2 = DataUrl(url.path()) + assert url == url2 + + +def test_wrong_url(): + url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=(5, 1)) + assert not url.is_valid() + + +@pytest.mark.parametrize( + "data", + [ + (1, "silx:///foo.h5?slice=1"), + ((1,), "silx:///foo.h5?slice=1"), + (slice(None), "silx:///foo.h5?slice=:"), + (slice(1, None), "silx:///foo.h5?slice=1:"), + (slice(None, -2), "silx:///foo.h5?slice=:-2"), + (slice(1, None, 3), "silx:///foo.h5?slice=1::3"), + (slice(None, 2, 3), "silx:///foo.h5?slice=:2:3"), + (slice(None, None, 3), "silx:///foo.h5?slice=::3"), + (slice(1, 2, 3), "silx:///foo.h5?slice=1:2:3"), + ((1, slice(1, 2)), "silx:///foo.h5?slice=1,1:2"), + ], +) +def test_path_creation(data): + """make sure the construction of path succeed and that we can + recreate a DataUrl from a path""" + data_slice, expected_path = data + url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=data_slice) + path = url.path() + DataUrl(path=path) + assert path == expected_path + + +def test_file_path_none(): + """ + make sure a file path can be None + """ + url = DataUrl(scheme="silx", file_path=None, data_path="/path/to/data") + assert url.file_path() is None + assert url.scheme() == "silx" + assert url.data_path() == "/path/to/data" + + +def test_data_path_none(): + """ + make sure a data path can be None + """ + url = DataUrl(scheme="silx", file_path="my_file.hdf5", data_path=None) + assert url.file_path() == "my_file.hdf5" + assert url.scheme() == "silx" + assert url.data_path() is None + + +def test_scheme_none(): + """ + make sure a 
scheme can be None + """ + url = DataUrl(scheme=None, file_path="my_file.hdf5", data_path="/path/to/data") + assert url.file_path() == "my_file.hdf5" + assert url.scheme() is None + assert url.data_path() == "/path/to/data" diff --git a/src/silx/io/test/test_utils.py b/src/silx/io/test/test_utils.py new file mode 100644 index 0000000..a9c7f6a --- /dev/null +++ b/src/silx/io/test/test_utils.py @@ -0,0 +1,1141 @@ +# /*########################################################################## +# Copyright (C) 2016-2022 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for utils module""" + +import io +import numpy +import os +import re +import shutil +import tempfile +import unittest + +from .. 
import utils +from ..._version import calc_hexversion +import silx.io.url + +import h5py +from ..utils import h5ls +from silx.io import commonh5 + + +import fabio + +__authors__ = ["P. Knobel"] +__license__ = "MIT" +__date__ = "03/12/2020" + +expected_spec1 = r"""#F .* +#D .* + +#S 1 Ordinate1 +#D .* +#N 2 +#L Abscissa Ordinate1 +1 4\.00 +2 5\.00 +3 6\.00 +""" + +expected_spec2 = ( + expected_spec1 + + r""" +#S 2 Ordinate2 +#D .* +#N 2 +#L Abscissa Ordinate2 +1 7\.00 +2 8\.00 +3 9\.00 +""" +) + +expected_spec2reg = r"""#F .* +#D .* + +#S 1 Ordinate1 +#D .* +#N 3 +#L Abscissa Ordinate1 Ordinate2 +1 4\.00 7\.00 +2 5\.00 8\.00 +3 6\.00 9\.00 +""" + +expected_spec2irr = ( + expected_spec1 + + r""" +#S 2 Ordinate2 +#D .* +#N 2 +#L Abscissa Ordinate2 +1 7\.00 +2 8\.00 +""" +) + +expected_csv = r"""Abscissa;Ordinate1;Ordinate2 +1;4\.00;7\.00e\+00 +2;5\.00;8\.00e\+00 +3;6\.00;9\.00e\+00 +""" + +expected_csv2 = r"""x;y0;y1 +1;4\.00;7\.00e\+00 +2;5\.00;8\.00e\+00 +3;6\.00;9\.00e\+00 +""" + + +class TestSave(unittest.TestCase): + """Test saving curves as SpecFile:""" + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.spec_fname = os.path.join(self.tempdir, "savespec.dat") + self.csv_fname = os.path.join(self.tempdir, "savecsv.csv") + self.npy_fname = os.path.join(self.tempdir, "savenpy.npy") + + self.x = [1, 2, 3] + self.xlab = "Abscissa" + self.y = [[4, 5, 6], [7, 8, 9]] + self.y_irr = [[4, 5, 6], [7, 8]] + self.ylabs = ["Ordinate1", "Ordinate2"] + + def tearDown(self): + if os.path.isfile(self.spec_fname): + os.unlink(self.spec_fname) + if os.path.isfile(self.csv_fname): + os.unlink(self.csv_fname) + if os.path.isfile(self.npy_fname): + os.unlink(self.npy_fname) + shutil.rmtree(self.tempdir) + + def test_save_csv(self): + utils.save1D( + self.csv_fname, + self.x, + self.y, + xlabel=self.xlab, + ylabels=self.ylabs, + filetype="csv", + fmt=["%d", "%.2f", "%.2e"], + csvdelim=";", + autoheader=True, + ) + + csvf = open(self.csv_fname) + actual_csv = csvf.read() + 
csvf.close() + + self.assertRegex(actual_csv, expected_csv) + + def test_save_npy(self): + """npy file is saved with numpy.save after building a numpy array + and converting it to a named record array""" + npyf = open(self.npy_fname, "wb") + utils.save1D(npyf, self.x, self.y, xlabel=self.xlab, ylabels=self.ylabs) + npyf.close() + + npy_recarray = numpy.load(self.npy_fname) + + self.assertEqual(npy_recarray.shape, (3,)) + self.assertTrue( + numpy.array_equal(npy_recarray["Ordinate1"], numpy.array((4, 5, 6))) + ) + + def test_savespec_filename(self): + """Save SpecFile using savespec()""" + utils.savespec( + self.spec_fname, + self.x, + self.y[0], + xlabel=self.xlab, + ylabel=self.ylabs[0], + fmt=["%d", "%.2f"], + close_file=True, + scan_number=1, + ) + + specf = open(self.spec_fname) + actual_spec = specf.read() + specf.close() + self.assertRegex(actual_spec, expected_spec1) + + def test_savespec_file_handle(self): + """Save SpecFile using savespec(), passing a file handle""" + # first savespec: open, write file header, save y[0] as scan 1, + # return file handle + specf = utils.savespec( + self.spec_fname, + self.x, + self.y[0], + xlabel=self.xlab, + ylabel=self.ylabs[0], + fmt=["%d", "%.2f"], + close_file=False, + ) + + # second savespec: save y[1] as scan 2, close file + utils.savespec( + specf, + self.x, + self.y[1], + xlabel=self.xlab, + ylabel=self.ylabs[1], + fmt=["%d", "%.2f"], + write_file_header=False, + close_file=True, + scan_number=2, + ) + + specf = open(self.spec_fname) + actual_spec = specf.read() + specf.close() + self.assertRegex(actual_spec, expected_spec2) + + def test_save_spec_reg(self): + """Save SpecFile using save() on a regular pattern""" + utils.save1D( + self.spec_fname, + self.x, + self.y, + xlabel=self.xlab, + ylabels=self.ylabs, + filetype="spec", + fmt=["%d", "%.2f"], + ) + + specf = open(self.spec_fname) + actual_spec = specf.read() + specf.close() + + self.assertRegex(actual_spec, expected_spec2reg) + + def test_save_spec_irr(self): 
+ """Save SpecFile using save() on an irregular pattern""" + # invalid test case ?! + return + utils.save1D( + self.spec_fname, + self.x, + self.y_irr, + xlabel=self.xlab, + ylabels=self.ylabs, + filetype="spec", + fmt=["%d", "%.2f"], + ) + + specf = open(self.spec_fname) + actual_spec = specf.read() + specf.close() + self.assertRegex(actual_spec, expected_spec2irr) + + def test_save_csv_no_labels(self): + """Save csv using save(), with autoheader=True but + xlabel=None and ylabels=None + This is a non-regression test for bug #223""" + self.tempdir = tempfile.mkdtemp() + self.spec_fname = os.path.join(self.tempdir, "savespec.dat") + self.csv_fname = os.path.join(self.tempdir, "savecsv.csv") + self.npy_fname = os.path.join(self.tempdir, "savenpy.npy") + + self.x = [1, 2, 3] + self.xlab = "Abscissa" + self.y = [[4, 5, 6], [7, 8, 9]] + self.ylabs = ["Ordinate1", "Ordinate2"] + utils.save1D( + self.csv_fname, self.x, self.y, autoheader=True, fmt=["%d", "%.2f", "%.2e"] + ) + + csvf = open(self.csv_fname) + actual_csv = csvf.read() + csvf.close() + self.assertRegex(actual_csv, expected_csv2) + + +def assert_match_any_string_in_list(test, pattern, list_of_strings): + for string_ in list_of_strings: + if re.match(pattern, string_): + return True + return False + + +class TestH5Ls(unittest.TestCase): + """Test displaying the following HDF5 file structure: + + +foo + +bar + <HDF5 dataset "spam": shape (2, 2), type "<i8"> + <HDF5 dataset "tmp": shape (3,), type "<i8"> + <HDF5 dataset "data": shape (1,), type "<f8"> + + """ + + def assertMatchAnyStringInList(self, pattern, list_of_strings): + for string_ in list_of_strings: + if re.match(pattern, string_): + return None + raise AssertionError( + "regex pattern %s does not match any" % pattern + + " string in list " + + str(list_of_strings) + ) + + def testHdf5(self): + fd, self.h5_fname = tempfile.mkstemp(text=False) + # Close and delete (we just want the name) + os.close(fd) + os.unlink(self.h5_fname) + self.h5f = 
h5py.File(self.h5_fname, "w") + self.h5f["/foo/bar/tmp"] = [1, 2, 3] + self.h5f["/foo/bar/spam"] = [[1, 2], [3, 4]] + self.h5f["/foo/data"] = [3.14] + self.h5f.close() + + rep = h5ls(self.h5_fname) + lines = rep.split("\n") + + self.assertIn("+foo", lines) + self.assertIn("\t+bar", lines) + + match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "[<>]i[48]">' + self.assertMatchAnyStringInList(match, lines) + match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "[<>]i[48]">' + self.assertMatchAnyStringInList(match, lines) + match = r'\t<HDF5 dataset "data": shape \(1,\), type "[<>]f[48]">' + self.assertMatchAnyStringInList(match, lines) + + os.unlink(self.h5_fname) + + # Following test case disabled d/t errors on AppVeyor: + # os.unlink(spec_fname) + # PermissionError: [WinError 32] The process cannot access the file because + # it is being used by another process: 'C:\\...\\savespec.dat' + + # def testSpec(self): + # tempdir = tempfile.mkdtemp() + # spec_fname = os.path.join(tempdir, "savespec.dat") + # + # x = [1, 2, 3] + # xlab = "Abscissa" + # y = [[4, 5, 6], [7, 8, 9]] + # ylabs = ["Ordinate1", "Ordinate2"] + # utils.save1D(spec_fname, x, y, xlabel=xlab, + # ylabels=ylabs, filetype="spec", + # fmt=["%d", "%.2f"]) + # + # rep = h5ls(spec_fname) + # lines = rep.split("\n") + # self.assertIn("+1.1", lines) + # self.assertIn("\t+instrument", lines) + # + # self.assertMatchAnyStringInList( + # r'\t\t\t<SPEC dataset "file_header": shape \(\), type "|S60">', + # lines) + # self.assertMatchAnyStringInList( + # r'\t\t<SPEC dataset "Ordinate1": shape \(3L?,\), type "<f4">', + # lines) + # + # os.unlink(spec_fname) + # shutil.rmtree(tempdir) + + +class TestOpen(unittest.TestCase): + """Test `silx.io.utils.open` function.""" + + @classmethod + def setUpClass(cls): + cls.tmp_directory = tempfile.mkdtemp() + cls.createResources(cls.tmp_directory) + + @classmethod + def createResources(cls, directory): + cls.h5_filename = os.path.join(directory, "test.h5") + h5 = 
h5py.File(cls.h5_filename, mode="w") + h5["group/group/dataset"] = 50 + h5.close() + + cls.spec_filename = os.path.join(directory, "test.dat") + utils.savespec( + cls.spec_filename, + [1], + [1.1], + xlabel="x", + ylabel="y", + fmt=["%d", "%.2f"], + close_file=True, + scan_number=1, + ) + + cls.edf_filename = os.path.join(directory, "test.edf") + header = fabio.fabioimage.OrderedDict() + header["integer"] = "10" + data = numpy.array([[10, 50], [50, 10]]) + fabiofile = fabio.edfimage.EdfImage(data, header) + fabiofile.write(cls.edf_filename) + + cls.txt_filename = os.path.join(directory, "test.txt") + f = io.open(cls.txt_filename, "w+t") + f.write("Kikoo") + f.close() + + cls.missing_filename = os.path.join(directory, "test.missing") + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmp_directory) + + def testH5(self): + f = utils.open(self.h5_filename) + self.assertIsNotNone(f) + self.assertIsInstance(f, h5py.File) + f.close() + + def testH5With(self): + with utils.open(self.h5_filename) as f: + self.assertIsNotNone(f) + self.assertIsInstance(f, h5py.File) + + def testH5_withPath(self): + f = utils.open(self.h5_filename + "::/group/group/dataset") + self.assertIsNotNone(f) + self.assertEqual(f.h5py_class, h5py.Dataset) + self.assertEqual(f[()], 50) + f.close() + + def testH5With_withPath(self): + with utils.open(self.h5_filename + "::/group/group") as f: + self.assertIsNotNone(f) + self.assertEqual(f.h5py_class, h5py.Group) + self.assertIn("dataset", f) + + def testSpec(self): + f = utils.open(self.spec_filename) + self.assertIsNotNone(f) + self.assertEqual(f.h5py_class, h5py.File) + f.close() + + def testSpecWith(self): + with utils.open(self.spec_filename) as f: + self.assertIsNotNone(f) + self.assertEqual(f.h5py_class, h5py.File) + + def testEdf(self): + f = utils.open(self.edf_filename) + self.assertIsNotNone(f) + self.assertEqual(f.h5py_class, h5py.File) + f.close() + + def testEdfWith(self): + with utils.open(self.edf_filename) as f: + 
self.assertIsNotNone(f) + self.assertEqual(f.h5py_class, h5py.File) + + def testUnsupported(self): + self.assertRaises(IOError, utils.open, self.txt_filename) + + def testNotExists(self): + # load it + self.assertRaises(IOError, utils.open, self.missing_filename) + + def test_silx_scheme(self): + url = silx.io.url.DataUrl( + scheme="silx", file_path=self.h5_filename, data_path="/" + ) + with utils.open(url.path()) as f: + self.assertIsNotNone(f) + self.assertTrue(silx.io.utils.is_file(f)) + + def test_fabio_scheme(self): + url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename) + self.assertRaises(IOError, utils.open, url.path()) + + def test_bad_url(self): + url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename) + self.assertRaises(IOError, utils.open, url.path()) + + def test_sliced_url(self): + url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,)) + self.assertRaises(IOError, utils.open, url.path()) + + +class TestNodes(unittest.TestCase): + """Test `silx.io.utils.is_` functions.""" + + def test_real_h5py_objects(self): + name = tempfile.mktemp(suffix=".h5") + try: + with h5py.File(name, "w") as h5file: + h5group = h5file.create_group("arrays") + h5dataset = h5group.create_dataset("scalar", data=10) + + self.assertTrue(utils.is_file(h5file)) + self.assertTrue(utils.is_group(h5file)) + self.assertFalse(utils.is_dataset(h5file)) + + self.assertFalse(utils.is_file(h5group)) + self.assertTrue(utils.is_group(h5group)) + self.assertFalse(utils.is_dataset(h5group)) + + self.assertFalse(utils.is_file(h5dataset)) + self.assertFalse(utils.is_group(h5dataset)) + self.assertTrue(utils.is_dataset(h5dataset)) + finally: + os.unlink(name) + + def test_h5py_like_file(self): + class Foo(object): + def __init__(self): + self.h5_class = utils.H5Type.FILE + + obj = Foo() + self.assertTrue(utils.is_file(obj)) + self.assertTrue(utils.is_group(obj)) + self.assertFalse(utils.is_dataset(obj)) + + def test_h5py_like_group(self): + class 
Foo(object): + def __init__(self): + self.h5_class = utils.H5Type.GROUP + + obj = Foo() + self.assertFalse(utils.is_file(obj)) + self.assertTrue(utils.is_group(obj)) + self.assertFalse(utils.is_dataset(obj)) + + def test_h5py_like_dataset(self): + class Foo(object): + def __init__(self): + self.h5_class = utils.H5Type.DATASET + + obj = Foo() + self.assertFalse(utils.is_file(obj)) + self.assertFalse(utils.is_group(obj)) + self.assertTrue(utils.is_dataset(obj)) + + def test_bad(self): + class Foo(object): + def __init__(self): + pass + + obj = Foo() + self.assertFalse(utils.is_file(obj)) + self.assertFalse(utils.is_group(obj)) + self.assertFalse(utils.is_dataset(obj)) + + def test_bad_api(self): + class Foo(object): + def __init__(self): + self.h5_class = int + + obj = Foo() + self.assertFalse(utils.is_file(obj)) + self.assertFalse(utils.is_group(obj)) + self.assertFalse(utils.is_dataset(obj)) + + +class TestGetData(unittest.TestCase): + """Test `silx.io.utils.get_data` function.""" + + @classmethod + def setUpClass(cls): + cls.tmp_directory = tempfile.mkdtemp() + cls.createResources(cls.tmp_directory) + + @classmethod + def createResources(cls, directory): + cls.h5_filename = os.path.join(directory, "test.h5") + h5 = h5py.File(cls.h5_filename, mode="w") + h5["group/group/scalar"] = 50 + h5["group/group/array"] = [1, 2, 3, 4, 5] + h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] + h5.close() + + cls.spec_filename = os.path.join(directory, "test.dat") + utils.savespec( + cls.spec_filename, + [1], + [1.1], + xlabel="x", + ylabel="y", + fmt=["%d", "%.2f"], + close_file=True, + scan_number=1, + ) + + cls.edf_filename = os.path.join(directory, "test.edf") + cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf") + header = fabio.fabioimage.OrderedDict() + header["integer"] = "10" + data = numpy.array([[10, 50], [50, 10]]) + fabiofile = fabio.edfimage.EdfImage(data, header) + fabiofile.write(cls.edf_filename) + 
fabiofile.append_frame(data=data, header=header) + fabiofile.write(cls.edf_multiframe_filename) + + cls.txt_filename = os.path.join(directory, "test.txt") + f = io.open(cls.txt_filename, "w+t") + f.write("Kikoo") + f.close() + + cls.missing_filename = os.path.join(directory, "test.missing") + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmp_directory) + + def test_hdf5_scalar(self): + url = "silx:%s?/group/group/scalar" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data, 50) + + def test_hdf5_array(self): + url = "silx:%s?/group/group/array" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (5,)) + self.assertEqual(data[0], 1) + + def test_hdf5_array_slice(self): + url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (5,)) + self.assertEqual(data[0], 6) + + def test_hdf5_array_slice_out_of_range(self): + url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename + # ValueError: h5py 2.x + # IndexError: h5py 3.x + self.assertRaises((ValueError, IndexError), utils.get_data, url) + + def test_edf_using_silx(self): + url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_frame(self): + url = "fabio:%s?slice=1" % self.edf_multiframe_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_singleframe(self): + url = "fabio:%s?slice=0" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_too_much_frames(self): + url = "fabio:%s?slice=..." 
% self.edf_multiframe_filename + self.assertRaises(ValueError, utils.get_data, url) + + def test_fabio_no_frame(self): + url = "fabio:%s" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_unsupported_scheme(self): + url = "foo:/foo/bar" + self.assertRaises(ValueError, utils.get_data, url) + + def test_no_scheme(self): + url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename + self.assertRaises((ValueError, IOError), utils.get_data, url) + + def test_file_not_exists(self): + url = "silx:/foo/bar" + self.assertRaises(IOError, utils.get_data, url) + + +def _h5_py_version_older_than(version): + v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split(".")[:3]] + r_majeur, r_mineur, r_micro = [int(i) for i in version.split(".")] + return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion( + r_majeur, r_mineur, r_micro + ) + + +@unittest.skipUnless(_h5_py_version_older_than("2.9.0"), "h5py version < 2.9.0") +class TestRawFileToH5(unittest.TestCase): + """Test conversion of .vol file to .h5 external dataset""" + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self._vol_file = os.path.join(self.tempdir, "test_vol.vol") + self._file_info = os.path.join(self.tempdir, "test_vol.info.vol") + self._dataset_shape = 100, 20, 5 + data = ( + numpy.random.random( + self._dataset_shape[0] * self._dataset_shape[1] * self._dataset_shape[2] + ) + .astype(dtype=numpy.float32) + .reshape(self._dataset_shape) + ) + numpy.save(file=self._vol_file, arr=data) + # those are storing into .noz file + assert os.path.exists(self._vol_file + ".npy") + os.rename(self._vol_file + ".npy", self._vol_file) + self.h5_file = os.path.join(self.tempdir, "test_h5.h5") + self.external_dataset_path = "/root/my_external_dataset" + self._data_url = silx.io.url.DataUrl( + file_path=self.h5_file, data_path=self.external_dataset_path + ) + with open(self._file_info, "w") as _fi: 
+ _fi.write("NUM_X = %s\n" % self._dataset_shape[2]) + _fi.write("NUM_Y = %s\n" % self._dataset_shape[1]) + _fi.write("NUM_Z = %s\n" % self._dataset_shape[0]) + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def check_dataset(self, h5_file, data_path, shape): + """Make sure the external dataset is valid""" + with h5py.File(h5_file, "r") as _file: + return data_path in _file and _file[data_path].shape == shape + + def test_h5_file_not_existing(self): + """Test that can create a file with external dataset from scratch""" + utils.rawfile_to_h5_external_dataset( + bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32, + ) + self.assertTrue( + self.check_dataset( + h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape, + ) + ) + os.remove(self.h5_file) + utils.vol_to_h5_external_dataset( + vol_file=self._vol_file, + output_url=self._data_url, + info_file=self._file_info, + ) + self.assertTrue( + self.check_dataset( + h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape, + ) + ) + + def test_h5_file_existing(self): + """Test that can add the external dataset from an existing file""" + with h5py.File(self.h5_file, "w") as _file: + _file["/root/dataset1"] = numpy.zeros((100, 100)) + _file["/root/group/dataset2"] = numpy.ones((100, 100)) + utils.rawfile_to_h5_external_dataset( + bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32, + ) + self.assertTrue( + self.check_dataset( + h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape, + ) + ) + + def test_vol_file_not_existing(self): + """Make sure error is raised if .vol file does not exists""" + os.remove(self._vol_file) + utils.rawfile_to_h5_external_dataset( + bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32, + ) + + self.assertTrue( + self.check_dataset( + 
h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape, + ) + ) + + def test_conflicts(self): + """Test several conflict cases""" + # test if path already exists + utils.rawfile_to_h5_external_dataset( + bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + dtype=numpy.float32, + ) + with self.assertRaises(ValueError): + utils.rawfile_to_h5_external_dataset( + bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + overwrite=False, + dtype=numpy.float32, + ) + + utils.rawfile_to_h5_external_dataset( + bin_file=self._vol_file, + output_url=self._data_url, + shape=(100, 20, 5), + overwrite=True, + dtype=numpy.float32, + ) + + self.assertTrue( + self.check_dataset( + h5_file=self.h5_file, + data_path=self.external_dataset_path, + shape=self._dataset_shape, + ) + ) + + +class TestH5Strings(unittest.TestCase): + """Test HDF5 str and bytes writing and reading""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir) + + def setUp(self): + self.file = h5py.File(os.path.join(self.tempdir, "file.h5"), mode="w") + + def tearDown(self): + self.file.close() + + @classmethod + def _make_array(cls, value, n, vlen=True): + if isinstance(value, bytes): + if vlen: + dtype = h5py.special_dtype(vlen=bytes) + else: + if hasattr(h5py, "string_dtype"): + dtype = h5py.string_dtype("ascii", len(value)) + else: + dtype = f"|S{len(value)}" + elif isinstance(value, str): + if vlen: + dtype = h5py.special_dtype(vlen=str) + else: + value = value.encode("utf-8") + if hasattr(h5py, "string_dtype"): + dtype = h5py.string_dtype("utf-8", len(value)) + else: + dtype = f"|S{len(value)}" + else: + dtype = None + return numpy.array([value] * n, dtype=dtype) + + @classmethod + def _get_charset(cls, value): + if isinstance(value, bytes): + return h5py.h5t.CSET_ASCII + elif isinstance(value, str): + return h5py.h5t.CSET_UTF8 + else: + 
return None + + def _check_dataset(self, value, result=None): + if result is not None: + decode_ascii = True + else: + decode_ascii = False + result = value + + # Write+read scalar + charset = self._get_charset(value) + self.file["data"] = value + data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii) + assert isinstance(data, type(result)), data + assert data == result, data + if charset is not None: + assert self.file["data"].id.get_type().get_cset() == charset + + # Write+read variable length + no_unicode_support = isinstance(value, str) and not hasattr( + h5py, "string_dtype" + ) + if no_unicode_support: + decode_ascii = True + self.file["vlen_data"] = self._make_array(value, 2) + data = utils.h5py_read_dataset( + self.file["vlen_data"], decode_ascii=decode_ascii, index=0 + ) + assert isinstance(data, type(result)), data + assert data == result, data + data = utils.h5py_read_dataset( + self.file["vlen_data"], decode_ascii=decode_ascii + ) + numpy.testing.assert_array_equal(data, [result] * 2) + if charset is not None: + assert self.file["vlen_data"].id.get_type().get_cset() == charset + + self.file["vlen_empty_array"] = self._make_array(value, 0) + data = utils.h5py_read_dataset( + self.file["vlen_empty_array"], decode_ascii=decode_ascii + ) + assert data.shape == (0,) + + # Write+read fixed length + self.file["flen_data"] = self._make_array(value, 2, vlen=False) + data = utils.h5py_read_dataset( + self.file["flen_data"], decode_ascii=decode_ascii, index=0 + ) + assert isinstance(data, type(result)), data + assert data == result, data + data = utils.h5py_read_dataset( + self.file["flen_data"], decode_ascii=decode_ascii + ) + numpy.testing.assert_array_equal(data, [result] * 2) + if charset is not None and not no_unicode_support: + assert self.file["flen_data"].id.get_type().get_cset() == charset + + def _check_attribute(self, value, result=None): + if result is not None: + decode_ascii = True + else: + decode_ascii = False + result = value 
+ + # Write+read scalar + self.file.attrs["data"] = value + data = utils.h5py_read_attribute( + self.file.attrs, "data", decode_ascii=decode_ascii + ) + assert isinstance(data, type(result)), data + assert data == result, data + + # Write+read variable length + no_unicode_support = isinstance(value, str) and not hasattr( + h5py, "string_dtype" + ) + if no_unicode_support: + decode_ascii = True + self.file.attrs["vlen_data"] = self._make_array(value, 2) + data = utils.h5py_read_attribute( + self.file.attrs, "vlen_data", decode_ascii=decode_ascii + ) + assert isinstance(data[0], type(result)), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)[ + "vlen_data" + ] + assert isinstance(data[0], type(result)), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + # Write+read fixed length + self.file.attrs["flen_data"] = self._make_array(value, 2, vlen=False) + data = utils.h5py_read_attribute( + self.file.attrs, "flen_data", decode_ascii=decode_ascii + ) + assert isinstance(data[0], type(result)), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)[ + "flen_data" + ] + assert isinstance(data[0], type(result)), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + def test_dataset_ascii_bytes(self): + self._check_dataset(b"abc") + + def test_attribute_ascii_bytes(self): + self._check_attribute(b"abc") + + def test_dataset_ascii_bytes_decode(self): + self._check_dataset(b"abc", result="abc") + + def test_attribute_ascii_bytes_decode(self): + self._check_attribute(b"abc", result="abc") + + def test_dataset_ascii_str(self): + self._check_dataset("abc") + + def test_attribute_ascii_str(self): + self._check_attribute("abc") + 
+ def test_dataset_utf8_str(self): + self._check_dataset("\u0101bc") + + def test_attribute_utf8_str(self): + self._check_attribute("\u0101bc") + + def test_dataset_utf8_bytes(self): + # 0xC481 is the byte representation of U+0101 + self._check_dataset(b"\xc4\x81bc") + + def test_attribute_utf8_bytes(self): + # 0xC481 is the byte representation of U+0101 + self._check_attribute(b"\xc4\x81bc") + + def test_dataset_utf8_bytes_decode(self): + # 0xC481 is the byte representation of U+0101 + self._check_dataset(b"\xc4\x81bc", result="\u0101bc") + + def test_attribute_utf8_bytes_decode(self): + # 0xC481 is the byte representation of U+0101 + self._check_attribute(b"\xc4\x81bc", result="\u0101bc") + + def test_dataset_latin1_bytes(self): + # extended ascii character 0xE4 + self._check_dataset(b"\xe423") + + def test_attribute_latin1_bytes(self): + # extended ascii character 0xE4 + self._check_attribute(b"\xe423") + + def test_dataset_latin1_bytes_decode(self): + # U+DCE4: surrogate for extended ascii character 0xE4 + self._check_dataset(b"\xe423", result="\udce423") + + def test_attribute_latin1_bytes_decode(self): + # U+DCE4: surrogate for extended ascii character 0xE4 + self._check_attribute(b"\xe423", result="\udce423") + + def test_dataset_no_string(self): + self._check_dataset(numpy.int64(10)) + + def test_attribute_no_string(self): + self._check_attribute(numpy.int64(10)) + + +def test_visitall_hdf5(tmp_path): + """visit HDF5 file content not following links""" + external_filepath = tmp_path / "external.h5" + with h5py.File(external_filepath, mode="w") as h5file: + h5file["target/dataset"] = 50 + + filepath = tmp_path / "base.h5" + with h5py.File(filepath, mode="w") as h5file: + h5file["group/dataset"] = 50 + h5file["link/soft_link"] = h5py.SoftLink("/group/dataset") + h5file["link/external_link"] = h5py.ExternalLink( + "external.h5", "/target/dataset" + ) + + with h5py.File(filepath, mode="r") as h5file: + visited_items = {} + for path, item in 
utils.visitall(h5file): + if isinstance(item, h5py.Dataset): + content = item[()] + elif isinstance(item, h5py.Group): + content = None + elif isinstance(item, h5py.SoftLink): + content = item.path + elif isinstance(item, h5py.ExternalLink): + content = item.filename, item.path + else: + raise AssertionError("Item should not be present: %s" % path) + visited_items[path] = (item.__class__, content) + + assert visited_items == { + "/group": (h5py.Group, None), + "/group/dataset": (h5py.Dataset, 50), + "/link": (h5py.Group, None), + "/link/soft_link": (h5py.SoftLink, "/group/dataset"), + "/link/external_link": (h5py.ExternalLink, ("external.h5", "/target/dataset")), + } + + +def test_visitall_commonh5(): + """Visit commonh5 File object""" + fobj = commonh5.File("filename.file", mode="w") + group = fobj.create_group("group") + dataset = group.create_dataset("dataset", data=numpy.array(50)) + group["soft_link"] = dataset # Create softlink + + visited_items = dict(utils.visitall(fobj)) + assert len(visited_items) == 3 + assert visited_items["/group"] is group + assert visited_items["/group/dataset"] is dataset + soft_link = visited_items["/group/soft_link"] + assert isinstance(soft_link, commonh5.SoftLink) + assert soft_link.path == "/group/dataset" + + +def test_match_hdf5(tmp_path): + """Test match function with HDF5 file""" + with h5py.File(tmp_path / "test_match.h5", "w") as h5f: + h5f.create_group("entry_0000/group") + h5f["entry_0000/data"] = 0 + h5f.create_group("entry_0001/group") + h5f["entry_0001/data"] = 1 + h5f.create_group("entry_0002") + h5f["entry_0003"] = 3 + + result = list(utils.match(h5f, "/entry_*/*")) + + assert sorted(result) == [ + "entry_0000/data", + "entry_0000/group", + "entry_0001/data", + "entry_0001/group", + ] + + +def test_match_commonh5(): + """Test match function with commonh5 objects""" + with commonh5.File("filename.file", mode="w") as fobj: + fobj.create_group("entry_0000/group") + fobj["entry_0000/data"] = 0 + 
fobj.create_group("entry_0001/group") + fobj["entry_0001/data"] = 1 + fobj.create_group("entry_0002") + fobj["entry_0003"] = 3 + + result = list(utils.match(fobj, "/entry_*/*")) + + assert sorted(result) == [ + "entry_0000/data", + "entry_0000/group", + "entry_0001/data", + "entry_0001/group", + ] + + +def test_recursive_match_commonh5(): + """Test match function with commonh5 objects""" + with commonh5.File("filename.file", mode="w") as fobj: + fobj["entry_0000/bar/data"] = 0 + fobj["entry_0001/foo/data"] = 1 + fobj["entry_0001/foo/data1"] = 2 + fobj["entry_0003"] = 3 + + result = list(utils.match(fobj, "**/data")) + assert result == ["entry_0000/bar/data", "entry_0001/foo/data"] diff --git a/src/silx/io/test/test_write_to_h5.py b/src/silx/io/test/test_write_to_h5.py new file mode 100644 index 0000000..b74bf0f --- /dev/null +++ b/src/silx/io/test/test_write_to_h5.py @@ -0,0 +1,120 @@ +# /*########################################################################## +# Copyright (C) 2021 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Test silx.io.convert.write_to_h5""" + + +import h5py +import numpy +from silx.io import spech5 + +from silx.io.convert import write_to_h5 +from silx.io.dictdump import h5todict +from silx.io import commonh5 + + +def test_with_commonh5(tmp_path): + """Test write_to_h5 with commonh5 input""" + fobj = commonh5.File("filename.txt", mode="w") + group = fobj.create_group("group") + dataset = group.create_dataset("dataset", data=numpy.array(50)) + group["soft_link"] = dataset # Create softlink + + output_filepath = tmp_path / "output.h5" + write_to_h5(fobj, str(output_filepath)) + + assert h5todict(str(output_filepath)) == { + "group": {"dataset": numpy.array(50), "soft_link": numpy.array(50)}, + } + with h5py.File(output_filepath, mode="r") as h5file: + soft_link = h5file.get("/group/soft_link", getlink=True) + assert isinstance(soft_link, h5py.SoftLink) + assert soft_link.path == "/group/dataset" + + +def test_with_hdf5(tmp_path): + """Test write_to_h5 with HDF5 file input""" + filepath = tmp_path / "base.h5" + with h5py.File(filepath, mode="w") as h5file: + h5file["group/dataset"] = 50 + h5file["group/soft_link"] = h5py.SoftLink("/group/dataset") + h5file["group/external_link"] = h5py.ExternalLink("base.h5", "/group/dataset") + + output_filepath = tmp_path / "output.h5" + write_to_h5(str(filepath), str(output_filepath)) + assert h5todict(str(output_filepath)) == { + "group": {"dataset": 50, "soft_link": 50}, + } + with h5py.File(output_filepath, mode="r") as h5file: + soft_link = h5file.get("group/soft_link", getlink=True) + assert isinstance(soft_link, h5py.SoftLink) + assert soft_link.path == "/group/dataset" + + 
def test_with_spech5(tmp_path):
    """Test write_to_h5 with SpecH5 input.

    Writes a minimal SPEC file, converts it to HDF5 via write_to_h5,
    and checks the resulting tree structure and leaf values.

    :param tmp_path: pytest fixture providing a per-test directory.
    """
    filepath = tmp_path / "file.spec"
    filepath.write_bytes(
        bytes(
            """#F /tmp/sf.dat

#S 1 cmd
#L a b
1 2
""",
            encoding="ascii",
        )
    )

    output_filepath = tmp_path / "output.h5"
    with spech5.SpecH5(str(filepath)) as spech5file:
        write_to_h5(spech5file, str(output_filepath))

    def assert_equal(item1, item2):
        # Recursively compare two nested dict trees: keys (and their order)
        # must match at every level; leaves are compared as arrays.
        if isinstance(item1, dict):
            assert tuple(item1.keys()) == tuple(item2.keys())
            for key in item1.keys():
                assert_equal(item1[key], item2[key])
        else:
            # Bug fix: numpy.array_equal returns a bool that the original
            # code discarded, so leaf values were never actually checked.
            # assert_array_equal raises with a diagnostic message on mismatch.
            numpy.testing.assert_array_equal(item1, item2)

    assert_equal(
        h5todict(str(output_filepath)),
        {
            "1.1": {
                "instrument": {
                    "positioners": {},
                    "specfile": {
                        "file_header": ["#F /tmp/sf.dat"],
                        "scan_header": ["#S 1 cmd", "#L a b"],
                    },
                },
                "measurement": {
                    "a": [1.0],
                    "b": [2.0],
                },
                "start_time": "",
                "title": "cmd",
            },
        },
    )