diff options
Diffstat (limited to 'silx/io/test')
-rw-r--r-- | silx/io/test/__init__.py | 61 | ||||
-rw-r--r-- | silx/io/test/test_commonh5.py | 295 | ||||
-rw-r--r-- | silx/io/test/test_dictdump.py | 1025 | ||||
-rwxr-xr-x | silx/io/test/test_fabioh5.py | 629 | ||||
-rw-r--r-- | silx/io/test/test_h5py_utils.py | 397 | ||||
-rw-r--r-- | silx/io/test/test_nxdata.py | 579 | ||||
-rw-r--r-- | silx/io/test/test_octaveh5.py | 165 | ||||
-rw-r--r-- | silx/io/test/test_rawh5.py | 96 | ||||
-rw-r--r-- | silx/io/test/test_specfile.py | 433 | ||||
-rw-r--r-- | silx/io/test/test_specfilewrapper.py | 206 | ||||
-rw-r--r-- | silx/io/test/test_spech5.py | 881 | ||||
-rw-r--r-- | silx/io/test/test_spectoh5.py | 194 | ||||
-rw-r--r-- | silx/io/test/test_url.py | 228 | ||||
-rw-r--r-- | silx/io/test/test_utils.py | 888 |
14 files changed, 0 insertions, 6077 deletions
diff --git a/silx/io/test/__init__.py b/silx/io/test/__init__.py deleted file mode 100644 index 68b6e9b..0000000 --- a/silx/io/test/__init__.py +++ /dev/null @@ -1,61 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ - -__authors__ = ["T. Vincent", "P. Knobel"] -__license__ = "MIT" -__date__ = "08/12/2017" - -import unittest - -from .test_specfile import suite as test_specfile_suite -from .test_specfilewrapper import suite as test_specfilewrapper_suite -from .test_dictdump import suite as test_dictdump_suite -from .test_spech5 import suite as test_spech5_suite -from .test_spectoh5 import suite as test_spectoh5_suite -from .test_octaveh5 import suite as test_octaveh5_suite -from .test_fabioh5 import suite as test_fabioh5_suite -from .test_utils import suite as test_utils_suite -from .test_nxdata import suite as test_nxdata_suite -from .test_commonh5 import suite as test_commonh5_suite -from .test_rawh5 import suite as test_rawh5_suite -from .test_url import suite as test_url_suite -from .test_h5py_utils import suite as test_h5py_utils_suite - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest(test_dictdump_suite()) - test_suite.addTest(test_specfile_suite()) - test_suite.addTest(test_specfilewrapper_suite()) - test_suite.addTest(test_spech5_suite()) - test_suite.addTest(test_spectoh5_suite()) - test_suite.addTest(test_octaveh5_suite()) - test_suite.addTest(test_utils_suite()) - test_suite.addTest(test_fabioh5_suite()) - test_suite.addTest(test_nxdata_suite()) - test_suite.addTest(test_commonh5_suite()) - test_suite.addTest(test_rawh5_suite()) - test_suite.addTest(test_url_suite()) - test_suite.addTest(test_h5py_utils_suite()) - return test_suite diff --git a/silx/io/test/test_commonh5.py b/silx/io/test/test_commonh5.py deleted file mode 100644 index 168ef34..0000000 --- a/silx/io/test/test_commonh5.py +++ /dev/null @@ -1,295 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for commonh5 wrapper""" - -__authors__ = ["V. Valls"] -__license__ = "MIT" -__date__ = "21/09/2017" - -import logging -import numpy -import unittest -import tempfile -import shutil - -_logger = logging.getLogger(__name__) - -import silx.io -import silx.io.utils -import h5py - -try: - from .. import commonh5 -except ImportError: - commonh5 = None - - -class TestCommonFeatures(unittest.TestCase): - """Test common features supported by h5py and our implementation.""" - - @classmethod - def createFile(cls): - return None - - @classmethod - def setUpClass(cls): - # Set to None cause create_resource can raise an excpetion - cls.h5 = None - cls.h5 = cls.create_resource() - if cls.h5 is None: - raise unittest.SkipTest("File not created") - - @classmethod - def create_resource(cls): - """Must be implemented""" - return None - - @classmethod - def tearDownClass(cls): - cls.h5 = None - - def test_file(self): - node = self.h5 - self.assertTrue(silx.io.is_file(node)) - self.assertTrue(silx.io.is_group(node)) - self.assertFalse(silx.io.is_dataset(node)) - self.assertEqual(len(node.attrs), 0) - - def test_group(self): - node = self.h5["group"] - self.assertFalse(silx.io.is_file(node)) - self.assertTrue(silx.io.is_group(node)) - self.assertFalse(silx.io.is_dataset(node)) - self.assertEqual(len(node.attrs), 0) - class_ = self.h5.get("group", getclass=True) - classlink = self.h5.get("group", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Group) - self.assertEqual(classlink, h5py.HardLink) - - def test_dataset(self): - node = self.h5["group/dataset"] - self.assertFalse(silx.io.is_file(node)) - self.assertFalse(silx.io.is_group(node)) - self.assertTrue(silx.io.is_dataset(node)) - self.assertEqual(len(node.attrs), 0) - class_ = self.h5.get("group/dataset", getclass=True) - classlink = self.h5.get("group/dataset", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertEqual(classlink, h5py.HardLink) - - def test_soft_link(self): - node = self.h5["link/soft_link"] - self.assertEqual(node.name, "/link/soft_link") - class_ = self.h5.get("link/soft_link", getclass=True) - link = self.h5.get("link/soft_link", getlink=True) - classlink = self.h5.get("link/soft_link", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink))) - self.assertTrue(silx.io.utils.is_softlink(link)) - self.assertEqual(classlink, h5py.SoftLink) - - def test_external_link(self): - node = self.h5["link/external_link"] - self.assertEqual(node.name, "/target/dataset") - class_ = self.h5.get("link/external_link", getclass=True) - classlink = self.h5.get("link/external_link", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertEqual(classlink, h5py.ExternalLink) - - def test_external_link_to_link(self): - node = self.h5["link/external_link_to_link"] - self.assertEqual(node.name, "/target/link") - class_ = self.h5.get("link/external_link_to_link", getclass=True) - classlink = self.h5.get("link/external_link_to_link", getlink=True, getclass=True) - self.assertEqual(class_, h5py.Dataset) - self.assertEqual(classlink, h5py.ExternalLink) - - def test_create_groups(self): - c = self.h5.create_group(self.id() + "/a/b/c") - d = c.create_group("/" + self.id() + "/a/b/d") - - self.assertRaises(ValueError, self.h5.create_group, self.id() + "/a/b/d") - self.assertEqual(c.name, "/" + self.id() + "/a/b/c") - self.assertEqual(d.name, "/" + self.id() + "/a/b/d") - - def test_setitem_python_object_dataset(self): - group = self.h5.create_group(self.id()) - group["a"] = 10 - self.assertEqual(group["a"].dtype.kind, "i") - - def test_setitem_numpy_dataset(self): - group = self.h5.create_group(self.id()) - group["a"] = numpy.array([10, 20, 30]) - self.assertEqual(group["a"].dtype.kind, "i") - self.assertEqual(group["a"].shape, (3,)) - - def test_setitem_link(self): - group = self.h5.create_group(self.id()) - group["a"] = 10 - group["b"] = group["a"] - self.assertEqual(group["b"].dtype.kind, "i") - - def test_setitem_dataset_is_sub_group(self): - self.h5[self.id() + "/a"] = 10 - - -class TestCommonFeatures_h5py(TestCommonFeatures): - """Check if h5py is compliant with what we expect.""" - - @classmethod - def create_resource(cls): - cls.tmp_dir = tempfile.mkdtemp() - - externalh5 = h5py.File(cls.tmp_dir + "/external.h5", mode="w") - externalh5["target/dataset"] = 50 - externalh5["target/link"] = h5py.SoftLink("/target/dataset") - externalh5.close() - - h5 = h5py.File(cls.tmp_dir + "/base.h5", mode="w") - h5["group/dataset"] = 50 - h5["link/soft_link"] = h5py.SoftLink("/group/dataset") - h5["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset") - h5["link/external_link_to_link"] = h5py.ExternalLink("external.h5", "/target/link") - - return h5 - - @classmethod - def tearDownClass(cls): - super(TestCommonFeatures_h5py, cls).tearDownClass() - if hasattr(cls, "tmp_dir") and cls.tmp_dir is not None: - shutil.rmtree(cls.tmp_dir) - - -class TestCommonFeatures_commonH5(TestCommonFeatures): - """Check if commonh5 is compliant with h5py.""" - - @classmethod - def create_resource(cls): - h5 = commonh5.File("base.h5", "w") - h5.create_group("group").create_dataset("dataset", data=numpy.int32(50)) - - link = h5.create_group("link") - link.add_node(commonh5.SoftLink("soft_link", "/group/dataset")) - - return h5 - - def test_external_link(self): - # not applicable - pass - - def test_external_link_to_link(self): - # not applicable - pass - - -class TestSpecificCommonH5(unittest.TestCase): - """Test specific features from commonh5. - - Test of shared features should be done by TestCommonFeatures.""" - - def setUp(self): - if commonh5 is None: - self.skipTest("silx.io.commonh5 is needed") - - def test_node_attrs(self): - node = commonh5.Node("Foo", attrs={"a": 1}) - self.assertEqual(node.attrs["a"], 1) - node.attrs["b"] = 8 - self.assertEqual(node.attrs["b"], 8) - node.attrs["b"] = 2 - self.assertEqual(node.attrs["b"], 2) - - def test_node_readonly_attrs(self): - f = commonh5.File(name="Foo", mode="r") - node = commonh5.Node("Foo", attrs={"a": 1}) - node.attrs["b"] = 8 - f.add_node(node) - self.assertEqual(node.attrs["b"], 8) - try: - node.attrs["b"] = 1 - self.fail() - except RuntimeError: - pass - - def test_create_dataset(self): - f = commonh5.File(name="Foo", mode="w") - node = f.create_dataset("foo", data=numpy.array([1])) - self.assertIs(node.parent, f) - self.assertIs(f["foo"], node) - - def test_create_group(self): - f = commonh5.File(name="Foo", mode="w") - node = f.create_group("foo") - self.assertIs(node.parent, f) - self.assertIs(f["foo"], node) - - def test_readonly_create_dataset(self): - f = commonh5.File(name="Foo", mode="r") - try: - f.create_dataset("foo", data=numpy.array([1])) - self.fail() - except RuntimeError: - pass - - def test_readonly_create_group(self): - f = commonh5.File(name="Foo", mode="r") - try: - f.create_group("foo") - self.fail() - except RuntimeError: - pass - - def test_create_unicode_dataset(self): - f = commonh5.File(name="Foo", mode="w") - try: - f.create_dataset("foo", data=numpy.array(u"aaaa")) - self.fail() - except TypeError: - pass - - def test_setitem_dataset(self): - self.h5 = commonh5.File(name="Foo", mode="w") - group = self.h5.create_group(self.id()) - group["a"] = commonh5.Dataset(None, data=numpy.array(10)) - self.assertEqual(group["a"].dtype.kind, "i") - - def test_setitem_explicit_link(self): - self.h5 = commonh5.File(name="Foo", mode="w") - group = self.h5.create_group(self.id()) - group["a"] = 10 - group["b"] = commonh5.SoftLink(None, path="/" + self.id() + "/a") - self.assertEqual(group["b"].dtype.kind, "i") - - -def suite(): - loadTests = unittest.defaultTestLoader.loadTestsFromTestCase - test_suite = unittest.TestSuite() - test_suite.addTest(loadTests(TestCommonFeatures_h5py)) - test_suite.addTest(loadTests(TestCommonFeatures_commonH5)) - test_suite.addTest(loadTests(TestSpecificCommonH5)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py deleted file mode 100644 index 93c9183..0000000 --- a/silx/io/test/test_dictdump.py +++ /dev/null @@ -1,1025 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2020 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for dicttoh5 module""" - -__authors__ = ["P. Knobel"] -__license__ = "MIT" -__date__ = "17/01/2018" - -from collections import OrderedDict -import numpy -import os -import tempfile -import unittest -import h5py -from copy import deepcopy - -from collections import defaultdict - -from silx.utils.testutils import TestLogging - -from ..configdict import ConfigDict -from .. import dictdump -from ..dictdump import dicttoh5, dicttojson, dump -from ..dictdump import h5todict, load -from ..dictdump import logger as dictdump_logger -from ..utils import is_link -from ..utils import h5py_read_dataset - - -def tree(): - """Tree data structure as a recursive nested dictionary""" - return defaultdict(tree) - - -inhabitants = 160215 - -city_attrs = tree() -city_attrs["Europe"]["France"]["Grenoble"]["area"] = "18.44 km2" -city_attrs["Europe"]["France"]["Grenoble"]["inhabitants"] = inhabitants -city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196] -city_attrs["Europe"]["France"]["Tourcoing"]["area"] - -ext_attrs = tree() -ext_attrs["ext_group"]["dataset"] = 10 -ext_filename = "ext.h5" - -link_attrs = tree() -link_attrs["links"]["group"]["dataset"] = 10 -link_attrs["links"]["group"]["relative_softlink"] = h5py.SoftLink("dataset") -link_attrs["links"]["relative_softlink"] = h5py.SoftLink("group/dataset") -link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset") -link_attrs["links"]["external_link"] = h5py.ExternalLink(ext_filename, "/ext_group/dataset") - - -class DictTestCase(unittest.TestCase): - - def assertRecursiveEqual(self, expected, actual, nodes=tuple()): - err_msg = "\n\n Tree nodes: {}".format(nodes) - if isinstance(expected, dict): - self.assertTrue(isinstance(actual, dict), msg=err_msg) - self.assertEqual( - set(expected.keys()), - set(actual.keys()), - msg=err_msg - ) - for k in actual: - self.assertRecursiveEqual( - expected[k], - actual[k], - nodes=nodes + (k,), - ) - return - if isinstance(actual, numpy.ndarray): - actual = actual.tolist() - if isinstance(expected, numpy.ndarray): - expected = expected.tolist() - - self.assertEqual(expected, actual, msg=err_msg) - - -class H5DictTestCase(DictTestCase): - - def _dictRoundTripNormalize(self, treedict): - """Convert the dictionary as expected from a round-trip - treedict -> dicttoh5 -> h5todict -> newtreedict - """ - for key, value in list(treedict.items()): - if isinstance(value, dict): - self._dictRoundTripNormalize(value) - - # Expand treedict[("group", "attr_name")] - # to treedict["group"]["attr_name"] - for key, value in list(treedict.items()): - if not isinstance(key, tuple): - continue - # Put the attribute inside the group - grpname, attr = key - if not grpname: - continue - group = treedict.setdefault(grpname, dict()) - if isinstance(group, dict): - del treedict[key] - group[("", attr)] = value - - def dictRoundTripNormalize(self, treedict): - treedict2 = deepcopy(treedict) - self._dictRoundTripNormalize(treedict2) - return treedict2 - - -class TestDictToH5(H5DictTestCase): - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") - self.h5_ext_fname = os.path.join(self.tempdir, ext_filename) - - def tearDown(self): - if os.path.exists(self.h5_fname): - os.unlink(self.h5_fname) - if os.path.exists(self.h5_ext_fname): - os.unlink(self.h5_ext_fname) - os.rmdir(self.tempdir) - - def testH5CityAttrs(self): - filters = {'shuffle': True, - 'fletcher32': True} - dicttoh5(city_attrs, self.h5_fname, h5path='/city attributes', - mode="w", create_dataset_args=filters) - - h5f = h5py.File(self.h5_fname, mode='r') - - self.assertIn("Tourcoing/area", h5f["/city attributes/Europe/France"]) - ds = h5f["/city attributes/Europe/France/Grenoble/inhabitants"] - self.assertEqual(ds[...], 160215) - - # filters only apply to datasets that are not scalars (shape != () ) - ds = h5f["/city attributes/Europe/France/Grenoble/coordinates"] - #self.assertEqual(ds.compression, "gzip") - self.assertTrue(ds.fletcher32) - self.assertTrue(ds.shuffle) - - h5f.close() - - ddict = load(self.h5_fname, fmat="hdf5") - self.assertAlmostEqual( - min(ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]), - 5.7196) - - def testH5OverwriteDeprecatedApi(self): - dd = ConfigDict({'t': True}) - - dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a') - dd = ConfigDict({'t': False}) - dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a', - overwrite_data=False) - - res = h5todict(self.h5_fname) - assert(res['t'] == True) - - dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a', - overwrite_data=True) - - res = h5todict(self.h5_fname) - assert(res['t'] == False) - - def testAttributes(self): - """Any kind of attribute can be described""" - ddict = { - "group": {"datatset": "hmmm", ("", "group_attr"): 10}, - "dataset": "aaaaaaaaaaaaaaa", - ("", "root_attr"): 11, - ("dataset", "dataset_attr"): 12, - ("group", "group_attr2"): 13, - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttoh5(ddict, h5file) - self.assertEqual(h5file["group"].attrs['group_attr'], 10) - self.assertEqual(h5file.attrs['root_attr'], 11) - self.assertEqual(h5file["dataset"].attrs['dataset_attr'], 12) - self.assertEqual(h5file["group"].attrs['group_attr2'], 13) - - def testPathAttributes(self): - """A group is requested at a path""" - ddict = { - ("", "NX_class"): 'NXcollection', - } - with h5py.File(self.h5_fname, "w") as h5file: - # This should not warn - with TestLogging(dictdump_logger, warning=0): - dictdump.dicttoh5(ddict, h5file, h5path="foo/bar") - - def testKeyOrder(self): - ddict1 = { - "d": "plow", - ("d", "a"): "ox", - } - ddict2 = { - ("d", "a"): "ox", - "d": "plow", - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttoh5(ddict1, h5file, h5path="g1") - dictdump.dicttoh5(ddict2, h5file, h5path="g2") - self.assertEqual(h5file["g1/d"].attrs['a'], "ox") - self.assertEqual(h5file["g2/d"].attrs['a'], "ox") - - def testAttributeValues(self): - """Any NX data types can be used""" - ddict = { - ("", "bool"): True, - ("", "int"): 11, - ("", "float"): 1.1, - ("", "str"): "a", - ("", "boollist"): [True, False, True], - ("", "intlist"): [11, 22, 33], - ("", "floatlist"): [1.1, 2.2, 3.3], - ("", "strlist"): ["a", "bb", "ccc"], - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttoh5(ddict, h5file) - for k, expected in ddict.items(): - result = h5file.attrs[k[1]] - if isinstance(expected, list): - if isinstance(expected[0], str): - numpy.testing.assert_array_equal(result, expected) - else: - numpy.testing.assert_array_almost_equal(result, expected) - else: - self.assertEqual(result, expected) - - def testAttributeAlreadyExists(self): - """A duplicated attribute warns if overwriting is not enabled""" - ddict = { - "group": {"dataset": "hmmm", ("", "attr"): 10}, - ("group", "attr"): 10, - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttoh5(ddict, h5file) - self.assertEqual(h5file["group"].attrs['attr'], 10) - - def testFlatDict(self): - """Description of a tree with a single level of keys""" - ddict = { - "group/group/dataset": 10, - ("group/group/dataset", "attr"): 11, - ("group/group", "attr"): 12, - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttoh5(ddict, h5file) - self.assertEqual(h5file["group/group/dataset"][()], 10) - self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11) - self.assertEqual(h5file["group/group"].attrs['attr'], 12) - - def testLinks(self): - with h5py.File(self.h5_ext_fname, "w") as h5file: - dictdump.dicttoh5(ext_attrs, h5file) - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttoh5(link_attrs, h5file) - with h5py.File(self.h5_fname, "r") as h5file: - self.assertEqual(h5file["links/group/dataset"][()], 10) - self.assertEqual(h5file["links/group/relative_softlink"][()], 10) - self.assertEqual(h5file["links/relative_softlink"][()], 10) - self.assertEqual(h5file["links/absolute_softlink"][()], 10) - self.assertEqual(h5file["links/external_link"][()], 10) - - def testDumpNumpyArray(self): - ddict = { - 'darks': { - '0': numpy.array([[0, 0, 0], [0, 0, 0]], dtype=numpy.uint16) - } - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttoh5(ddict, h5file) - with h5py.File(self.h5_fname, "r") as h5file: - numpy.testing.assert_array_equal(h5py_read_dataset(h5file["darks"]["0"]), - ddict['darks']['0']) - - def testOverwrite(self): - # Tree structure that will be tested - group1 = { - ("", "attr2"): "original2", - "dset1": 0, - "dset2": [0, 1], - ("dset1", "attr1"): "original1", - ("dset1", "attr2"): "original2", - ("dset2", "attr1"): "original1", - ("dset2", "attr2"): "original2", - } - group2 = { - "subgroup1": group1.copy(), - "subgroup2": group1.copy(), - ("subgroup1", "attr1"): "original1", - ("subgroup2", "attr1"): "original1" - } - group2.update(group1) - # initial HDF5 tree - otreedict = { - ('', 'attr1'): "original1", - ('', 'attr2'): "original2", - 'group1': group1, - 'group2': group2, - ('group1', 'attr1'): "original1", - ('group2', 'attr1'): "original1" - } - wtreedict = None # dumped dictionary - etreedict = None # expected HDF5 tree after dump - - def reset_file(): - dicttoh5( - otreedict, - h5file=self.h5_fname, - mode="w", - ) - - def append_file(update_mode): - dicttoh5( - wtreedict, - h5file=self.h5_fname, - mode="a", - update_mode=update_mode - ) - - def assert_file(): - rtreedict = h5todict( - self.h5_fname, - include_attributes=True, - asarray=False - ) - netreedict = self.dictRoundTripNormalize(etreedict) - try: - self.assertRecursiveEqual(netreedict, rtreedict) - except AssertionError: - from pprint import pprint - print("\nDUMP:") - pprint(wtreedict) - print("\nEXPECTED:") - pprint(netreedict) - print("\nHDF5:") - pprint(rtreedict) - raise - - def assert_append(update_mode): - append_file(update_mode) - assert_file() - - # Test wrong arguments - with self.assertRaises(ValueError): - dicttoh5( - otreedict, - h5file=self.h5_fname, - mode="w", - update_mode="wrong-value" - ) - - # No writing - reset_file() - etreedict = deepcopy(otreedict) - assert_file() - - # Write identical dictionary - wtreedict = deepcopy(otreedict) - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add", "modify", "replace"]: - assert_append(update_mode) - - # Write empty dictionary - wtreedict = dict() - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add", "modify", "replace"]: - assert_append(update_mode) - - # Modified dataset - wtreedict = dict() - wtreedict["group2"] = dict() - wtreedict["group2"]["subgroup2"] = dict() - wtreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]} - wtreedict["group2"]["subgroup2"]["dset2"] = [10, 20] - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add"]: - assert_append(update_mode) - - etreedict["group2"]["subgroup2"]["dset2"] = [10, 20] - assert_append("modify") - - etreedict["group2"] = dict() - del etreedict[("group2", "attr1")] - etreedict["group2"]["subgroup2"] = dict() - etreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]} - etreedict["group2"]["subgroup2"]["dset2"] = [10, 20] - assert_append("replace") - - # Modified group - wtreedict = dict() - wtreedict["group2"] = dict() - wtreedict["group2"]["subgroup2"] = [0, 1] - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add", "modify"]: - assert_append(update_mode) - - etreedict["group2"] = dict() - del etreedict[("group2", "attr1")] - etreedict["group2"]["subgroup2"] = [0, 1] - assert_append("replace") - - # Modified attribute - wtreedict = dict() - wtreedict["group2"] = dict() - wtreedict["group2"]["subgroup2"] = dict() - wtreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified" - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add"]: - assert_append(update_mode) - - etreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified" - assert_append("modify") - - etreedict["group2"] = dict() - del etreedict[("group2", "attr1")] - etreedict["group2"]["subgroup2"] = dict() - etreedict["group2"]["subgroup2"]["dset1"] = dict() - etreedict["group2"]["subgroup2"]["dset1"][("", "attr1")] = "modified" - assert_append("replace") - - # Delete group - wtreedict = dict() - wtreedict["group2"] = dict() - wtreedict["group2"]["subgroup2"] = None - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add"]: - assert_append(update_mode) - - del etreedict["group2"]["subgroup2"] - del etreedict["group2"][("subgroup2", "attr1")] - assert_append("modify") - - etreedict["group2"] = dict() - del etreedict[("group2", "attr1")] - assert_append("replace") - - # Delete dataset - wtreedict = dict() - wtreedict["group2"] = dict() - wtreedict["group2"]["subgroup2"] = dict() - wtreedict["group2"]["subgroup2"]["dset2"] = None - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add"]: - assert_append(update_mode) - - del etreedict["group2"]["subgroup2"]["dset2"] - del etreedict["group2"]["subgroup2"][("dset2", "attr1")] - del etreedict["group2"]["subgroup2"][("dset2", "attr2")] - assert_append("modify") - - etreedict["group2"] = dict() - del etreedict[("group2", "attr1")] - etreedict["group2"]["subgroup2"] = dict() - assert_append("replace") - - # Delete attribute - wtreedict = dict() - wtreedict["group2"] = dict() - wtreedict["group2"]["subgroup2"] = dict() - wtreedict["group2"]["subgroup2"][("dset2", "attr1")] = None - - reset_file() - etreedict = deepcopy(otreedict) - for update_mode in [None, "add"]: - assert_append(update_mode) - - del etreedict["group2"]["subgroup2"][("dset2", "attr1")] - assert_append("modify") - - etreedict["group2"] = dict() - del etreedict[("group2", "attr1")] - etreedict["group2"]["subgroup2"] = dict() - etreedict["group2"]["subgroup2"]["dset2"] = dict() - assert_append("replace") - - -class TestH5ToDict(H5DictTestCase): - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5") - self.h5_ext_fname = os.path.join(self.tempdir, ext_filename) - dicttoh5(city_attrs, self.h5_fname) - dicttoh5(link_attrs, self.h5_fname, mode="a") - dicttoh5(ext_attrs, self.h5_ext_fname) - - def tearDown(self): - if os.path.exists(self.h5_fname): - os.unlink(self.h5_fname) - if os.path.exists(self.h5_ext_fname): - os.unlink(self.h5_ext_fname) - os.rmdir(self.tempdir) - - def testExcludeNames(self): - ddict = h5todict(self.h5_fname, path="/Europe/France", - exclude_names=["ourcoing", "inhab", "toto"]) - self.assertNotIn("Tourcoing", ddict) - self.assertIn("Grenoble", ddict) - - self.assertNotIn("inhabitants", ddict["Grenoble"]) - self.assertIn("coordinates", ddict["Grenoble"]) - self.assertIn("area", ddict["Grenoble"]) - - def testAsArrayTrue(self): - """Test with asarray=True, the default""" - ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble") - self.assertTrue(numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants))) - - def testAsArrayFalse(self): - """Test with asarray=False""" - ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False) - self.assertEqual(ddict["inhabitants"], inhabitants) - - def testDereferenceLinks(self): - ddict = h5todict(self.h5_fname, path="links", dereference_links=True) - self.assertTrue(ddict["absolute_softlink"], 10) - self.assertTrue(ddict["relative_softlink"], 10) - self.assertTrue(ddict["external_link"], 10) - self.assertTrue(ddict["group"]["relative_softlink"], 10) - - def testPreserveLinks(self): - ddict = h5todict(self.h5_fname, path="links", dereference_links=False) - self.assertTrue(is_link(ddict["absolute_softlink"])) - self.assertTrue(is_link(ddict["relative_softlink"])) - self.assertTrue(is_link(ddict["external_link"])) - self.assertTrue(is_link(ddict["group"]["relative_softlink"])) - - def testStrings(self): - ddict = {"dset_bytes": b"bytes", - "dset_utf8": "utf8", - "dset_2bytes": [b"bytes", b"bytes"], - "dset_2utf8": ["utf8", "utf8"], - ("", "attr_bytes"): b"bytes", - ("", "attr_utf8"): "utf8", - ("", "attr_2bytes"): [b"bytes", b"bytes"], - ("", "attr_2utf8"): ["utf8", "utf8"]} - dicttoh5(ddict, self.h5_fname, mode="w") - adict = h5todict(self.h5_fname, include_attributes=True, asarray=False) - self.assertEqual(ddict["dset_bytes"], adict["dset_bytes"]) - self.assertEqual(ddict["dset_utf8"], adict["dset_utf8"]) - self.assertEqual(ddict[("", "attr_bytes")], adict[("", "attr_bytes")]) - self.assertEqual(ddict[("", "attr_utf8")], adict[("", "attr_utf8")]) - numpy.testing.assert_array_equal(ddict["dset_2bytes"], adict["dset_2bytes"]) - numpy.testing.assert_array_equal(ddict["dset_2utf8"], adict["dset_2utf8"]) - numpy.testing.assert_array_equal(ddict[("", "attr_2bytes")], adict[("", "attr_2bytes")]) - numpy.testing.assert_array_equal(ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")]) - - -class TestDictToNx(H5DictTestCase): - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.h5_fname = os.path.join(self.tempdir, "nx.h5") - self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5") - - def tearDown(self): - if os.path.exists(self.h5_fname): - os.unlink(self.h5_fname) - if os.path.exists(self.h5_ext_fname): - os.unlink(self.h5_ext_fname) - os.rmdir(self.tempdir) - - def testAttributes(self): - """Any kind of attribute can be described""" - ddict = { - "group": {"dataset": 100, "@group_attr1": 10}, - "dataset": 200, - "@root_attr": 11, - "dataset@dataset_attr": "12", - "group@group_attr2": 13, - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttonx(ddict, h5file) - self.assertEqual(h5file["group"].attrs['group_attr1'], 10) - self.assertEqual(h5file.attrs['root_attr'], 11) - self.assertEqual(h5file["dataset"].attrs['dataset_attr'], "12") - self.assertEqual(h5file["group"].attrs['group_attr2'], 13) - - def testKeyOrder(self): - ddict1 = { - "d": "plow", - "d@a": "ox", - } - ddict2 = { - "d@a": "ox", - "d": "plow", - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttonx(ddict1, h5file, h5path="g1") - dictdump.dicttonx(ddict2, h5file, h5path="g2") - self.assertEqual(h5file["g1/d"].attrs['a'], "ox") - self.assertEqual(h5file["g2/d"].attrs['a'], "ox") - - def testAttributeValues(self): - """Any NX data types can be used""" - ddict = { - "@bool": True, - "@int": 11, - "@float": 1.1, - "@str": "a", - "@boollist": [True, False, True], - "@intlist": [11, 22, 33], - "@floatlist": [1.1, 2.2, 3.3], - "@strlist": ["a", "bb", "ccc"], - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttonx(ddict, h5file) - for k, expected in ddict.items(): - result = h5file.attrs[k[1:]] - if isinstance(expected, list): - if isinstance(expected[0], str): - numpy.testing.assert_array_equal(result, expected) - else: - numpy.testing.assert_array_almost_equal(result, expected) - else: - self.assertEqual(result, expected) - - def testFlatDict(self): - """Description of a tree with a single level of keys""" - ddict = { - "group/group/dataset": 10, - "group/group/dataset@attr": 11, - "group/group@attr": 12, - } - with h5py.File(self.h5_fname, "w") as h5file: - dictdump.dicttonx(ddict, h5file) - self.assertEqual(h5file["group/group/dataset"][()], 10) - self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11) - self.assertEqual(h5file["group/group"].attrs['attr'], 12) - - def testLinks(self): - ddict = {"ext_group": {"dataset": 10}} - dictdump.dicttonx(ddict, self.h5_ext_fname) - ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"}, - ">relative_softlink": "group/dataset", - ">absolute_softlink": "/links/group/dataset", - ">external_link": "nx_ext.h5::/ext_group/dataset"}} - dictdump.dicttonx(ddict, self.h5_fname) - with h5py.File(self.h5_fname, "r") as h5file: - self.assertEqual(h5file["links/group/dataset"][()], 10) - self.assertEqual(h5file["links/group/relative_softlink"][()], 10) - self.assertEqual(h5file["links/relative_softlink"][()], 10) - self.assertEqual(h5file["links/absolute_softlink"][()], 10) - self.assertEqual(h5file["links/external_link"][()], 10) - - def testUpLinks(self): - ddict = {"data": {"group": {"dataset": 10, ">relative_softlink": "dataset"}}, - "links": {"group": {"subgroup": {">relative_softlink": "../../../data/group/dataset"}}}} - dictdump.dicttonx(ddict, self.h5_fname) - with h5py.File(self.h5_fname, "r") as h5file: - self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10) - - def testOverwrite(self): - entry_name = "entry" - wtreedict = { - "group1": {"a": 1, "b": 2}, - "group2@attr3": "attr3", - "group2@attr4": "attr4", - "group2": { - "@attr1": "attr1", - "@attr2": "attr2", - "c": 3, - "d": 4, - "dataset4": 8, - "dataset4@units": "keV", - }, - "group3": {"subgroup": {"e": 9, "f": 10}}, - "dataset1": 5, - "dataset2": 6, - "dataset3": 7, - "dataset3@units": "mm", - } - esubtree = { - "@NX_class": "NXentry", - "group1": {"@NX_class": "NXcollection", "a": 1, "b": 2}, - "group2": { - "@NX_class": "NXcollection", - "@attr1": "attr1", - "@attr2": "attr2", - "@attr3": "attr3", - "@attr4": "attr4", - "c": 3, - "d": 4, - "dataset4": 8, - "dataset4@units": "keV", - }, - "group3": { - "@NX_class": "NXcollection", - "subgroup": {"@NX_class": "NXcollection", "e": 9, "f": 10}, - }, - "dataset1": 5, - "dataset2": 6, - "dataset3": 7, - "dataset3@units": "mm", - } - etreedict = {entry_name: esubtree} - - def append_file(update_mode, add_nx_class): - dictdump.dicttonx( - wtreedict, - h5file=self.h5_fname, - mode="a", - h5path=entry_name, - update_mode=update_mode, - add_nx_class=add_nx_class - ) - - def assert_file(): - rtreedict = dictdump.nxtodict( - self.h5_fname, - include_attributes=True, - asarray=False, - ) - netreedict = self.dictRoundTripNormalize(etreedict) - try: - self.assertRecursiveEqual(netreedict, rtreedict) - except AssertionError: - from pprint import pprint - print("\nDUMP:") - pprint(wtreedict) - print("\nEXPECTED:") - pprint(netreedict) - print("\nHDF5:") - pprint(rtreedict) - raise - - def assert_append(update_mode, add_nx_class=None): - append_file(update_mode, add_nx_class=add_nx_class) - assert_file() - - # First to an empty file - assert_append(None) - - # Add non-existing attributes/datasets/groups - wtreedict["group1"].pop("a") - wtreedict["group2"].pop("@attr1") - wtreedict["group2"]["@attr2"] = "attr3" # only for update - wtreedict["group2"]["@type"] = "test" - wtreedict["group2"]["dataset4"] = 9 # only for update - del wtreedict["group2"]["dataset4@units"] - wtreedict["group3"] = {} - esubtree["group2"]["@type"] = "test" - assert_append("add") - - # Add update existing attributes and datasets - esubtree["group2"]["@attr2"] = "attr3" - esubtree["group2"]["dataset4"] = 9 - assert_append("modify") - - # Do not add missing NX_class by default when updating - wtreedict["group2"]["@NX_class"] = "NXprocess" - esubtree["group2"]["@NX_class"] = "NXprocess" - assert_append("modify") - del wtreedict["group2"]["@NX_class"] - assert_append("modify") - - # Overwrite existing groups/datasets/attributes - esubtree["group1"].pop("a") - esubtree["group2"].pop("@attr1") - esubtree["group2"]["@NX_class"] = "NXcollection" - esubtree["group2"]["dataset4"] = 9 - del esubtree["group2"]["dataset4@units"] - esubtree["group3"] = {"@NX_class": "NXcollection"} - assert_append("replace", add_nx_class=True) - - -class TestNxToDict(H5DictTestCase): - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.h5_fname = os.path.join(self.tempdir, "nx.h5") - self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5") - - def tearDown(self): - if os.path.exists(self.h5_fname): - os.unlink(self.h5_fname) - if os.path.exists(self.h5_ext_fname): - os.unlink(self.h5_ext_fname) - os.rmdir(self.tempdir) - - def testAttributes(self): - """Any kind of attribute can be described""" - ddict = { - "group": {"dataset": 100, "@group_attr1": 10}, - "dataset": 200, - "@root_attr": 11, - "dataset@dataset_attr": "12", - "group@group_attr2": 13, - } - dictdump.dicttonx(ddict, self.h5_fname) - ddict = dictdump.nxtodict(self.h5_fname, include_attributes=True) - self.assertEqual(ddict["group"]["@group_attr1"], 10) - self.assertEqual(ddict["@root_attr"], 11) - self.assertEqual(ddict["dataset@dataset_attr"], "12") - self.assertEqual(ddict["group"]["@group_attr2"], 13) - - def testDereferenceLinks(self): - """Write links and dereference on read""" - ddict = {"ext_group": {"dataset": 10}} - dictdump.dicttonx(ddict, self.h5_ext_fname) - ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"}, - ">relative_softlink": "group/dataset", - ">absolute_softlink": "/links/group/dataset", - ">external_link": "nx_ext.h5::/ext_group/dataset"}} - dictdump.dicttonx(ddict, self.h5_fname) - - ddict = dictdump.h5todict(self.h5_fname, dereference_links=True) - self.assertTrue(ddict["links"]["absolute_softlink"], 10) - self.assertTrue(ddict["links"]["relative_softlink"], 10) - self.assertTrue(ddict["links"]["external_link"], 10) - self.assertTrue(ddict["links"]["group"]["relative_softlink"], 10) - - def testPreserveLinks(self): - """Write/read links""" - ddict = {"ext_group": {"dataset": 10}} - dictdump.dicttonx(ddict, self.h5_ext_fname) - ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"}, - ">relative_softlink": "group/dataset", - ">absolute_softlink": "/links/group/dataset", - ">external_link": "nx_ext.h5::/ext_group/dataset"}} - dictdump.dicttonx(ddict, self.h5_fname) - - ddict = dictdump.nxtodict(self.h5_fname, dereference_links=False) - self.assertTrue(ddict["links"][">absolute_softlink"], "dataset") - self.assertTrue(ddict["links"][">relative_softlink"], "group/dataset") - self.assertTrue(ddict["links"][">external_link"], "/links/group/dataset") - self.assertTrue(ddict["links"]["group"][">relative_softlink"], "nx_ext.h5::/ext_group/datase") - - def testNotExistingPath(self): - """Test converting not existing path""" - with h5py.File(self.h5_fname, 'a') as f: - f['data'] = 1 - - ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='ignore') - self.assertFalse(ddict) - - with TestLogging(dictdump_logger, error=1): - ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='log') - self.assertFalse(ddict) - - with self.assertRaises(KeyError): - h5todict(self.h5_fname, path="/I/am/not/a/path", errors='raise') - - def testBrokenLinks(self): - """Test with broken links""" - with h5py.File(self.h5_fname, 'a') as f: - f["/Mars/BrokenSoftLink"] = h5py.SoftLink("/Idontexists") - f["/Mars/BrokenExternalLink"] = h5py.ExternalLink("notexistingfile.h5", "/Idontexists") - - ddict = h5todict(self.h5_fname, path="/Mars", errors='ignore') - self.assertFalse(ddict) - - with TestLogging(dictdump_logger, error=2): - ddict = h5todict(self.h5_fname, path="/Mars", errors='log') - self.assertFalse(ddict) - - with self.assertRaises(KeyError): - h5todict(self.h5_fname, path="/Mars", errors='raise') - - -class TestDictToJson(DictTestCase): - def setUp(self): - self.dir_path = tempfile.mkdtemp() - self.json_fname = os.path.join(self.dir_path, "cityattrs.json") - - def tearDown(self): - os.unlink(self.json_fname) - os.rmdir(self.dir_path) - - def testJsonCityAttrs(self): - self.json_fname = os.path.join(self.dir_path, "cityattrs.json") - dicttojson(city_attrs, self.json_fname, indent=3) - - with open(self.json_fname, "r") as f: - json_content = f.read() - self.assertIn('"inhabitants": 160215', json_content) - - -class TestDictToIni(DictTestCase): - def setUp(self): - self.dir_path = tempfile.mkdtemp() - self.ini_fname = os.path.join(self.dir_path, "test.ini") - - def tearDown(self): - os.unlink(self.ini_fname) - os.rmdir(self.dir_path) - - def testConfigDictIO(self): - """Ensure values and types of data is preserved when dictionary is - written to file and read back.""" - testdict = { - 'simple_types': { - 'float': 1.0, - 'int': 1, - 'percent string': '5 % is too much', - 'backslash string': 'i can use \\', - 'empty_string': '', - 'nonestring': 'None', - 'nonetype': None, - 'interpstring': 'interpolation: %(percent string)s', - }, - 'containers': { - 'list': [-1, 'string', 3.0, False, None], - 'array': numpy.array([1.0, 2.0, 3.0]), - 'dict': { - 'key1': 'Hello World', - 'key2': 2.0, - } - } - } - - dump(testdict, self.ini_fname) - - #read the data back - readdict = load(self.ini_fname) - - testdictkeys = list(testdict.keys()) - readkeys = list(readdict.keys()) - - self.assertTrue(len(readkeys) == len(testdictkeys), - "Number of read keys not equal") - - self.assertEqual(readdict['simple_types']["interpstring"], - "interpolation: 5 % is too much") - - testdict['simple_types']["interpstring"] = "interpolation: 5 % is too much" - - for key in testdict["simple_types"]: - original = testdict['simple_types'][key] - read = readdict['simple_types'][key] - self.assertEqual(read, original, - "Read <%s> instead of <%s>" % (read, original)) - - for key in testdict["containers"]: - original = testdict["containers"][key] - read = readdict["containers"][key] - if key == 'array': - self.assertEqual(read.all(), original.all(), - "Read <%s> instead of <%s>" % (read, original)) - else: - self.assertEqual(read, original, - "Read <%s> instead of <%s>" % (read, original)) - - def testConfigDictOrder(self): - """Ensure order is preserved when dictionary is - written to file and read back.""" - test_dict = {'banana': 3, 'apple': 4, 'pear': 1, 'orange': 2} - # sort by key - test_ordered_dict1 = OrderedDict(sorted(test_dict.items(), - key=lambda t: t[0])) - # sort by value - test_ordered_dict2 = OrderedDict(sorted(test_dict.items(), - key=lambda t: t[1])) - # add the two ordered dict as sections of a third ordered dict - test_ordered_dict3 = OrderedDict() - test_ordered_dict3["section1"] = test_ordered_dict1 - test_ordered_dict3["section2"] = test_ordered_dict2 - - # write to ini and read back as a ConfigDict (inherits OrderedDict) - dump(test_ordered_dict3, - self.ini_fname, fmat="ini") - read_instance = ConfigDict() - read_instance.read(self.ini_fname) - - # loop through original and read-back dictionaries, - # test identical order for key/value pairs - for orig_key, section in zip(test_ordered_dict3.keys(), - read_instance.keys()): - self.assertEqual(orig_key, section) - for orig_key2, read_key in zip(test_ordered_dict3[section].keys(), - read_instance[section].keys()): - self.assertEqual(orig_key2, read_key) - self.assertEqual(test_ordered_dict3[section][orig_key2], - read_instance[section][read_key]) - - -def suite(): - test_suite = unittest.TestSuite() - loadTests = unittest.defaultTestLoader.loadTestsFromTestCase - test_suite.addTest(loadTests(TestDictToIni)) - test_suite.addTest(loadTests(TestDictToH5)) - test_suite.addTest(loadTests(TestDictToNx)) - test_suite.addTest(loadTests(TestDictToJson)) - test_suite.addTest(loadTests(TestH5ToDict)) - test_suite.addTest(loadTests(TestNxToDict)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_fabioh5.py b/silx/io/test/test_fabioh5.py deleted file mode 100755 index f2c85b1..0000000 --- a/silx/io/test/test_fabioh5.py +++ /dev/null @@ -1,629 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2018 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for fabioh5 wrapper""" - -__authors__ = ["V. Valls"] -__license__ = "MIT" -__date__ = "02/07/2018" - -import os -import logging -import numpy -import unittest -import tempfile -import shutil - -_logger = logging.getLogger(__name__) - -import fabio -import h5py - -from .. import commonh5 -from .. import fabioh5 - - -class TestFabioH5(unittest.TestCase): - - def setUp(self): - - header = { - "integer": "-100", - "float": "1.0", - "string": "hi!", - "list_integer": "100 50 0", - "list_float": "1.0 2.0 3.5", - "string_looks_like_list": "2000 hi!", - } - data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64) - self.fabio_image = fabio.numpyimage.NumpyImage(data, header) - self.h5_image = fabioh5.File(fabio_image=self.fabio_image) - - def test_main_groups(self): - self.assertEqual(self.h5_image.h5py_class, h5py.File) - self.assertEqual(self.h5_image["/"].h5py_class, h5py.File) - self.assertEqual(self.h5_image["/scan_0"].h5py_class, h5py.Group) - self.assertEqual(self.h5_image["/scan_0/instrument"].h5py_class, h5py.Group) - self.assertEqual(self.h5_image["/scan_0/measurement"].h5py_class, h5py.Group) - - def test_wrong_path_syntax(self): - # result tested with a default h5py file - self.assertRaises(ValueError, lambda: self.h5_image[""]) - - def test_wrong_root_name(self): - # result tested with a default h5py file - self.assertRaises(KeyError, lambda: self.h5_image["/foo"]) - - def test_wrong_root_path(self): - # result tested with a default h5py file - self.assertRaises(KeyError, lambda: self.h5_image["/foo/foo"]) - - def test_wrong_name(self): - # result tested with a default h5py file - self.assertRaises(KeyError, lambda: self.h5_image["foo"]) - - def test_wrong_path(self): - # result tested with a default h5py file - self.assertRaises(KeyError, lambda: self.h5_image["foo/foo"]) - - def test_single_frame(self): - data = numpy.arange(2 * 3) - data.shape = 2, 3 - fabio_image = fabio.edfimage.edfimage(data=data) - h5_image = fabioh5.File(fabio_image=fabio_image) - - dataset = h5_image["/scan_0/instrument/detector_0/data"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertTrue(isinstance(dataset[()], numpy.ndarray)) - self.assertEqual(dataset.dtype.kind, "i") - self.assertEqual(dataset.shape, (2, 3)) - self.assertEqual(dataset[...][0, 0], 0) - self.assertEqual(dataset.attrs["interpretation"], "image") - - def test_multi_frames(self): - data = numpy.arange(2 * 3) - data.shape = 2, 3 - fabio_image = fabio.edfimage.edfimage(data=data) - fabio_image.append_frame(data=data) - h5_image = fabioh5.File(fabio_image=fabio_image) - - dataset = h5_image["/scan_0/instrument/detector_0/data"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertTrue(isinstance(dataset[()], numpy.ndarray)) - self.assertEqual(dataset.dtype.kind, "i") - self.assertEqual(dataset.shape, (2, 2, 3)) - self.assertEqual(dataset[...][0, 0, 0], 0) - self.assertEqual(dataset.attrs["interpretation"], "image") - - def test_heterogeneous_frames(self): - """Frames containing 2 images with different sizes and a cube""" - data1 = numpy.arange(2 * 3) - data1.shape = 2, 3 - data2 = numpy.arange(2 * 5) - data2.shape = 2, 5 - data3 = numpy.arange(2 * 5 * 1) - data3.shape = 2, 5, 1 - fabio_image = fabio.edfimage.edfimage(data=data1) - fabio_image.append_frame(data=data2) - fabio_image.append_frame(data=data3) - h5_image = fabioh5.File(fabio_image=fabio_image) - - dataset = h5_image["/scan_0/instrument/detector_0/data"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertTrue(isinstance(dataset[()], numpy.ndarray)) - self.assertEqual(dataset.dtype.kind, "i") - self.assertEqual(dataset.shape, (3, 2, 5, 1)) - self.assertEqual(dataset[...][0, 0, 0], 0) - self.assertEqual(dataset.attrs["interpretation"], "image") - - def test_single_3d_frame(self): - """Image source contains a cube""" - data = numpy.arange(2 * 3 * 4) - data.shape = 2, 3, 4 - # Do not provide the data to the constructor to avoid slicing of the - # data. In this way the result stay a cube, and not a multi-frame - fabio_image = fabio.edfimage.edfimage() - fabio_image.data = data - h5_image = fabioh5.File(fabio_image=fabio_image) - - dataset = h5_image["/scan_0/instrument/detector_0/data"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertTrue(isinstance(dataset[()], numpy.ndarray)) - self.assertEqual(dataset.dtype.kind, "i") - self.assertEqual(dataset.shape, (2, 3, 4)) - self.assertEqual(dataset[...][0, 0, 0], 0) - self.assertEqual(dataset.attrs["interpretation"], "image") - - def test_metadata_int(self): - dataset = self.h5_image["/scan_0/instrument/detector_0/others/integer"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertEqual(dataset[()], -100) - self.assertEqual(dataset.dtype.kind, "i") - self.assertEqual(dataset.shape, (1,)) - - def test_metadata_float(self): - dataset = self.h5_image["/scan_0/instrument/detector_0/others/float"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertEqual(dataset[()], 1.0) - self.assertEqual(dataset.dtype.kind, "f") - self.assertEqual(dataset.shape, (1,)) - - def test_metadata_string(self): - dataset = self.h5_image["/scan_0/instrument/detector_0/others/string"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertEqual(dataset[()], numpy.string_("hi!")) - self.assertEqual(dataset.dtype.type, numpy.string_) - self.assertEqual(dataset.shape, (1,)) - - def test_metadata_list_integer(self): - dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_integer"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertEqual(dataset.dtype.kind, "u") - self.assertEqual(dataset.shape, (1, 3)) - self.assertEqual(dataset[0, 0], 100) - self.assertEqual(dataset[0, 1], 50) - - def test_metadata_list_float(self): - dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_float"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertEqual(dataset.dtype.kind, "f") - self.assertEqual(dataset.shape, (1, 3)) - self.assertEqual(dataset[0, 0], 1.0) - self.assertEqual(dataset[0, 1], 2.0) - - def test_metadata_list_looks_like_list(self): - dataset = self.h5_image["/scan_0/instrument/detector_0/others/string_looks_like_list"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertEqual(dataset[()], numpy.string_("2000 hi!")) - self.assertEqual(dataset.dtype.type, numpy.string_) - self.assertEqual(dataset.shape, (1,)) - - def test_float_32(self): - float_list = [u'1.2', u'1.3', u'1.4'] - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = None - for float_item in float_list: - header = {"float_item": float_item} - if fabio_image is None: - fabio_image = fabio.edfimage.EdfImage(data=data, header=header) - else: - fabio_image.append_frame(data=data, header=header) - h5_image = fabioh5.File(fabio_image=fabio_image) - data = h5_image["/scan_0/instrument/detector_0/others/float_item"] - # There is no equality between items - self.assertEqual(len(data), len(set(data))) - # At worst a float32 - self.assertIn(data.dtype.kind, ['d', 'f']) - self.assertLessEqual(data.dtype.itemsize, 32 / 8) - - def test_float_64(self): - float_list = [ - u'1469117129.082226', - u'1469117136.684986', u'1469117144.312749', u'1469117151.892507', - u'1469117159.474265', u'1469117167.100027', u'1469117174.815799', - u'1469117182.437561', u'1469117190.094326', u'1469117197.721089'] - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = None - for float_item in float_list: - header = {"time_of_day": float_item} - if fabio_image is None: - fabio_image = fabio.edfimage.EdfImage(data=data, header=header) - else: - fabio_image.append_frame(data=data, header=header) - h5_image = fabioh5.File(fabio_image=fabio_image) - data = h5_image["/scan_0/instrument/detector_0/others/time_of_day"] - # There is no equality between items - self.assertEqual(len(data), len(set(data))) - # At least a float64 - self.assertIn(data.dtype.kind, ['d', 'f']) - self.assertGreaterEqual(data.dtype.itemsize, 64 / 8) - - def test_mixed_float_size__scalar(self): - # We expect to have a precision of 32 bits - float_list = [u'1.2', u'1.3001'] - expected_float_result = [1.2, 1.3001] - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = None - for float_item in float_list: - header = {"float_item": float_item} - if fabio_image is None: - fabio_image = fabio.edfimage.EdfImage(data=data, header=header) - else: - fabio_image.append_frame(data=data, header=header) - h5_image = fabioh5.File(fabio_image=fabio_image) - data = h5_image["/scan_0/instrument/detector_0/others/float_item"] - # At worst a float32 - self.assertIn(data.dtype.kind, ['d', 'f']) - self.assertLessEqual(data.dtype.itemsize, 32 / 8) - for computed, expected in zip(data, expected_float_result): - numpy.testing.assert_almost_equal(computed, expected, 5) - - def test_mixed_float_size__list(self): - # We expect to have a precision of 32 bits - float_list = [u'1.2 1.3001'] - expected_float_result = numpy.array([[1.2, 1.3001]]) - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = None - for float_item in float_list: - header = {"float_item": float_item} - if fabio_image is None: - fabio_image = fabio.edfimage.EdfImage(data=data, header=header) - else: - fabio_image.append_frame(data=data, header=header) - h5_image = fabioh5.File(fabio_image=fabio_image) - data = h5_image["/scan_0/instrument/detector_0/others/float_item"] - # At worst a float32 - self.assertIn(data.dtype.kind, ['d', 'f']) - self.assertLessEqual(data.dtype.itemsize, 32 / 8) - for computed, expected in zip(data, expected_float_result): - numpy.testing.assert_almost_equal(computed, expected, 5) - - def test_mixed_float_size__list_of_list(self): - # We expect to have a precision of 32 bits - float_list = [u'1.2 1.3001', u'1.3001 1.3001'] - expected_float_result = numpy.array([[1.2, 1.3001], [1.3001, 1.3001]]) - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = None - for float_item in float_list: - header = {"float_item": float_item} - if fabio_image is None: - fabio_image = fabio.edfimage.EdfImage(data=data, header=header) - else: - fabio_image.append_frame(data=data, header=header) - h5_image = fabioh5.File(fabio_image=fabio_image) - data = h5_image["/scan_0/instrument/detector_0/others/float_item"] - # At worst a float32 - self.assertIn(data.dtype.kind, ['d', 'f']) - self.assertLessEqual(data.dtype.itemsize, 32 / 8) - for computed, expected in zip(data, expected_float_result): - numpy.testing.assert_almost_equal(computed, expected, 5) - - def test_ub_matrix(self): - """Data from mediapix.edf""" - header = {} - header["UB_mne"] = 'UB0 UB1 UB2 UB3 UB4 UB5 UB6 UB7 UB8' - header["UB_pos"] = '1.99593e-16 2.73682e-16 -1.54 -1.08894 1.08894 1.6083e-16 1.08894 1.08894 9.28619e-17' - header["sample_mne"] = 'U0 U1 U2 U3 U4 U5' - header["sample_pos"] = '4.08 4.08 4.08 90 90 90' - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = fabio.edfimage.EdfImage(data=data, header=header) - h5_image = fabioh5.File(fabio_image=fabio_image) - sample = h5_image["/scan_0/sample"] - self.assertIsNotNone(sample) - self.assertEqual(sample.attrs["NXclass"], "NXsample") - - d = sample['unit_cell_abc'] - expected = numpy.array([4.08, 4.08, 4.08]) - self.assertIsNotNone(d) - self.assertEqual(d.shape, (3, )) - self.assertIn(d.dtype.kind, ['d', 'f']) - numpy.testing.assert_array_almost_equal(d[...], expected) - - d = sample['unit_cell_alphabetagamma'] - expected = numpy.array([90.0, 90.0, 90.0]) - self.assertIsNotNone(d) - self.assertEqual(d.shape, (3, )) - self.assertIn(d.dtype.kind, ['d', 'f']) - numpy.testing.assert_array_almost_equal(d[...], expected) - - d = sample['ub_matrix'] - expected = numpy.array([[[1.99593e-16, 2.73682e-16, -1.54], - [-1.08894, 1.08894, 1.6083e-16], - [1.08894, 1.08894, 9.28619e-17]]]) - self.assertIsNotNone(d) - self.assertEqual(d.shape, (1, 3, 3)) - self.assertIn(d.dtype.kind, ['d', 'f']) - numpy.testing.assert_array_almost_equal(d[...], expected) - - def test_interpretation_mca_edf(self): - """EDF files with two or more headers starting with "MCA" - must have @interpretation = "spectrum" an the data.""" - header = { - "Title": "zapimage samy -4.975 -5.095 80 500 samz -4.091 -4.171 70 0", - "MCA a": -23.812, - "MCA b": 2.7107, - "MCA c": 8.1164e-06} - - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = fabio.edfimage.EdfImage(data=data, header=header) - h5_image = fabioh5.File(fabio_image=fabio_image) - - data_dataset = h5_image["/scan_0/measurement/image_0/data"] - self.assertEqual(data_dataset.attrs["interpretation"], "spectrum") - - data_dataset = h5_image["/scan_0/instrument/detector_0/data"] - self.assertEqual(data_dataset.attrs["interpretation"], "spectrum") - - data_dataset = h5_image["/scan_0/measurement/image_0/info/data"] - self.assertEqual(data_dataset.attrs["interpretation"], "spectrum") - - def test_get_api(self): - result = self.h5_image.get("scan_0", getclass=True, getlink=True) - self.assertIs(result, h5py.HardLink) - result = self.h5_image.get("scan_0", getclass=False, getlink=True) - self.assertIsInstance(result, h5py.HardLink) - result = self.h5_image.get("scan_0", getclass=True, getlink=False) - self.assertIs(result, h5py.Group) - result = self.h5_image.get("scan_0", getclass=False, getlink=False) - self.assertIsInstance(result, commonh5.Group) - - def test_detector_link(self): - detector1 = self.h5_image["/scan_0/instrument/detector_0"] - detector2 = self.h5_image["/scan_0/measurement/image_0/info"] - self.assertIsNot(detector1, detector2) - self.assertEqual(list(detector1.items()), list(detector2.items())) - self.assertEqual(self.h5_image.get(detector2.name, getlink=True).path, detector1.name) - - def test_detector_data_link(self): - data1 = self.h5_image["/scan_0/instrument/detector_0/data"] - data2 = self.h5_image["/scan_0/measurement/image_0/data"] - self.assertIsNot(data1, data2) - self.assertIs(data1._get_data(), data2._get_data()) - self.assertEqual(self.h5_image.get(data2.name, getlink=True).path, data1.name) - - def test_dirty_header(self): - """Test that it does not fail""" - try: - header = {} - header["foo"] = b'abc' - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = fabio.edfimage.edfimage(data=data, header=header) - header = {} - header["foo"] = b'a\x90bc\xFE' - fabio_image.append_frame(data=data, header=header) - except Exception as e: - _logger.error(e.args[0]) - _logger.debug("Backtrace", exc_info=True) - self.skipTest("fabio do not allow to create the resource") - - h5_image = fabioh5.File(fabio_image=fabio_image) - scan_header_path = "/scan_0/instrument/file/scan_header" - self.assertIn(scan_header_path, h5_image) - data = h5_image[scan_header_path] - self.assertIsInstance(data[...], numpy.ndarray) - - def test_unicode_header(self): - """Test that it does not fail""" - try: - header = {} - header["foo"] = b'abc' - data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) - fabio_image = fabio.edfimage.edfimage(data=data, header=header) - header = {} - header["foo"] = u'abc\u2764' - fabio_image.append_frame(data=data, header=header) - except Exception as e: - _logger.error(e.args[0]) - _logger.debug("Backtrace", exc_info=True) - self.skipTest("fabio do not allow to create the resource") - - h5_image = fabioh5.File(fabio_image=fabio_image) - scan_header_path = "/scan_0/instrument/file/scan_header" - self.assertIn(scan_header_path, h5_image) - data = h5_image[scan_header_path] - self.assertIsInstance(data[...], numpy.ndarray) - - -class TestFabioH5MultiFrames(unittest.TestCase): - - @classmethod - def setUpClass(cls): - - names = ["A", "B", "C", "D"] - values = [["32000", "-10", "5.0", "1"], - ["-32000", "-10", "5.0", "1"]] - - fabio_file = None - - for i in range(10): - header = { - "image_id": "%d" % i, - "integer": "-100", - "float": "1.0", - "string": "hi!", - "list_integer": "100 50 0", - "list_float": "1.0 2.0 3.5", - "string_looks_like_list": "2000 hi!", - "motor_mne": " ".join(names), - "motor_pos": " ".join(values[i % len(values)]), - "counter_mne": " ".join(names), - "counter_pos": " ".join(values[i % len(values)]) - } - for iname, name in enumerate(names): - header[name] = values[i % len(values)][iname] - - data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64) - if fabio_file is None: - fabio_file = fabio.edfimage.EdfImage(data=data, header=header) - else: - fabio_file.append_frame(data=data, header=header) - - cls.fabio_file = fabio_file - cls.fabioh5 = fabioh5.File(fabio_image=fabio_file) - - def test_others(self): - others = self.fabioh5["/scan_0/instrument/detector_0/others"] - dataset = others["A"] - self.assertGreaterEqual(dataset.dtype.itemsize, 1) - self.assertEqual(dataset.dtype.kind, "i") - dataset = others["B"] - self.assertGreaterEqual(dataset.dtype.itemsize, 1) - self.assertEqual(dataset.dtype.kind, "i") - dataset = others["C"] - self.assertGreaterEqual(dataset.dtype.itemsize, 1) - self.assertEqual(dataset.dtype.kind, "f") - dataset = others["D"] - self.assertGreaterEqual(dataset.dtype.itemsize, 1) - self.assertEqual(dataset.dtype.kind, "u") - - def test_positioners(self): - counters = self.fabioh5["/scan_0/instrument/positioners"] - # At least 32 bits, no unsigned values - dataset = counters["A"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "i") - dataset = counters["B"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "i") - dataset = counters["C"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "f") - dataset = counters["D"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "i") - - def test_counters(self): - counters = self.fabioh5["/scan_0/measurement"] - # At least 32 bits, no unsigned values - dataset = counters["A"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "i") - dataset = counters["B"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "i") - dataset = counters["C"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "f") - dataset = counters["D"] - self.assertGreaterEqual(dataset.dtype.itemsize, 4) - self.assertEqual(dataset.dtype.kind, "i") - - -class TestFabioH5WithEdf(unittest.TestCase): - - @classmethod - def setUpClass(cls): - - cls.tmp_directory = tempfile.mkdtemp() - - cls.edf_filename = os.path.join(cls.tmp_directory, "test.edf") - - header = { - "integer": "-100", - "float": "1.0", - "string": "hi!", - "list_integer": "100 50 0", - "list_float": "1.0 2.0 3.5", - "string_looks_like_list": "2000 hi!", - } - data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64) - fabio_image = fabio.edfimage.edfimage(data, header) - fabio_image.write(cls.edf_filename) - - cls.fabio_image = fabio.open(cls.edf_filename) - cls.h5_image = fabioh5.File(fabio_image=cls.fabio_image) - - @classmethod - def tearDownClass(cls): - cls.fabio_image = None - cls.h5_image = None - shutil.rmtree(cls.tmp_directory) - - def test_reserved_format_metadata(self): - if fabio.hexversion < 327920: # 0.5.0 final - self.skipTest("fabio >= 0.5.0 final is needed") - - # The EDF contains reserved keys in the header - self.assertIn("HeaderID", self.fabio_image.header) - # We do not expose them in FabioH5 - self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image) - - -class _TestableFrameData(fabioh5.FrameData): - """Allow to test if the full data is reached.""" - def _create_data(self): - raise RuntimeError("Not supposed to be called") - - -class TestFabioH5WithFileSeries(unittest.TestCase): - - @classmethod - def setUpClass(cls): - - cls.tmp_directory = tempfile.mkdtemp() - - cls.edf_filenames = [] - - for i in range(10): - filename = os.path.join(cls.tmp_directory, "test_%04d.edf" % i) - cls.edf_filenames.append(filename) - - header = { - "image_id": "%d" % i, - "integer": "-100", - "float": "1.0", - "string": "hi!", - "list_integer": "100 50 0", - "list_float": "1.0 2.0 3.5", - "string_looks_like_list": "2000 hi!", - } - data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64) - fabio_image = fabio.edfimage.edfimage(data, header) - fabio_image.write(filename) - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tmp_directory) - - def _testH5Image(self, h5_image): - # test data - dataset = h5_image["/scan_0/instrument/detector_0/data"] - self.assertEqual(dataset.h5py_class, h5py.Dataset) - self.assertTrue(isinstance(dataset[()], numpy.ndarray)) - self.assertEqual(dataset.dtype.kind, "i") - self.assertEqual(dataset.shape, (10, 3, 2)) - self.assertEqual(list(dataset[:, 0, 0]), list(range(10))) - self.assertEqual(dataset.attrs["interpretation"], "image") - # test metatdata - dataset = h5_image["/scan_0/instrument/detector_0/others/image_id"] - self.assertEqual(list(dataset[...]), list(range(10))) - - def testFileList(self): - h5_image = fabioh5.File(file_series=self.edf_filenames) - self._testH5Image(h5_image) - - def testFileSeries(self): - file_series = fabioh5._FileSeries(self.edf_filenames) - h5_image = fabioh5.File(file_series=file_series) - self._testH5Image(h5_image) - - def testFrameDataCache(self): - file_series = fabioh5._FileSeries(self.edf_filenames) - reader = fabioh5.FabioReader(file_series=file_series) - frameData = _TestableFrameData("foo", reader) - self.assertEqual(frameData.dtype.kind, "i") - self.assertEqual(frameData.shape, (10, 3, 2)) - - -def suite(): - loadTests = unittest.defaultTestLoader.loadTestsFromTestCase - test_suite = unittest.TestSuite() - test_suite.addTest(loadTests(TestFabioH5)) - test_suite.addTest(loadTests(TestFabioH5MultiFrames)) - test_suite.addTest(loadTests(TestFabioH5WithEdf)) - test_suite.addTest(loadTests(TestFabioH5WithFileSeries)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_h5py_utils.py b/silx/io/test/test_h5py_utils.py deleted file mode 100644 index 2e2e3dd..0000000 --- a/silx/io/test/test_h5py_utils.py +++ /dev/null @@ -1,397 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for h5py utilities""" - -__authors__ = ["W. de Nolf"] -__license__ = "MIT" -__date__ = "27/01/2020" - - -import unittest -import os -import sys -import time -import shutil -import tempfile -import threading -import multiprocessing -from contextlib import contextmanager - -from .. import h5py_utils -from ...utils.retry import RetryError, RetryTimeoutError - -IS_WINDOWS = sys.platform == "win32" - - -def _subprocess_context_main(queue, contextmgr, *args, **kw): - try: - with contextmgr(*args, **kw): - queue.put(None) - threading.Event().wait() - except Exception: - queue.put(None) - raise - - -@contextmanager -def _subprocess_context(contextmgr, *args, **kw): - timeout = kw.pop("timeout", 10) - queue = multiprocessing.Queue(maxsize=1) - p = multiprocessing.Process( - target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw - ) - p.start() - try: - queue.get(timeout=timeout) - yield - finally: - try: - p.kill() - except AttributeError: - p.terminate() - p.join(timeout) - - -@contextmanager -def _open_context(filename, **kw): - with h5py_utils.File(filename, **kw) as f: - if kw.get("mode") == "w": - f["check"] = True - f.flush() - yield f - - -def _cause_segfault(): - import ctypes - - i = ctypes.c_char(b"a") - j = ctypes.pointer(i) - c = 0 - while True: - j[c] = b"a" - c += 1 - - -def _top_level_names_test(txtfilename, *args, **kw): - sys.stderr = open(os.devnull, "w") - - with open(txtfilename, mode="r") as f: - failcounter = int(f.readline().strip()) - - ncausefailure = kw.pop("ncausefailure") - faildelay = kw.pop("faildelay") - if failcounter < ncausefailure: - time.sleep(faildelay) - failcounter += 1 - with open(txtfilename, mode="w") as f: - f.write(str(failcounter)) - if failcounter % 2: - raise RetryError - else: - _cause_segfault() - return h5py_utils._top_level_names(*args, **kw) - - -top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test) - - -def subtests(test): - def wrapper(self): - for _ in self._subtests(): - with self.subTest(**self._subtest_options): - test(self) - - return wrapper - - -class TestH5pyUtils(unittest.TestCase): - def setUp(self): - self.test_dir = tempfile.mkdtemp() - - def tearDown(self): - shutil.rmtree(self.test_dir) - - def _subtests(self): - self._subtest_options = {"mode": "w"} - self.filename_generator = self._filenames() - yield - self._subtest_options = {"mode": "w", "libver": "latest"} - self.filename_generator = self._filenames() - yield - - @property - def _liber_allows_concurrent_access(self): - return self._subtest_options.get("libver") in [None, "earliest", "v18"] - - def _filenames(self): - i = 1 - while True: - filename = os.path.join(self.test_dir, "file{}.h5".format(i)) - with self._open_context(filename): - pass - yield filename - i += 1 - - def _new_filename(self): - return next(self.filename_generator) - - @contextmanager - def _open_context(self, filename, **kwargs): - kw = self._subtest_options - kw.update(kwargs) - with _open_context(filename, **kw) as f: - - yield f - - @contextmanager - def _open_context_subprocess(self, filename, **kwargs): - kw = self._subtest_options - kw.update(kwargs) - with _subprocess_context(_open_context, filename, **kw): - yield - - def _assert_hdf5_data(self, f): - self.assertTrue(f["check"][()]) - - def _validate_hdf5_data(self, filename, swmr=False): - with self._open_context(filename, mode="r") as f: - self.assertEqual(f.swmr_mode, swmr) - self._assert_hdf5_data(f) - - @subtests - def test_modes_single_process(self): - orig = os.environ.get("HDF5_USE_FILE_LOCKING") - filename1 = self._new_filename() - self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) - filename2 = self._new_filename() - self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) - with self._open_context(filename1, mode="r"): - with self._open_context(filename2, mode="r"): - pass - for mode in ["w", "a"]: - with self.assertRaises(RuntimeError): - with self._open_context(filename2, mode=mode): - pass - self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) - with self._open_context(filename1, mode="a"): - for mode in ["w", "a"]: - with self._open_context(filename2, mode=mode): - pass - with self.assertRaises(RuntimeError): - with self._open_context(filename2, mode="r"): - pass - self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) - - @subtests - def test_modes_multi_process(self): - if not self._liber_allows_concurrent_access: - # A concurrent reader with HDF5_USE_FILE_LOCKING=FALSE is - # no longer works with HDF5 >=1.10 (you get an exception - # when trying to open the file) - return - filename = self._new_filename() - - # File open by truncating writer - with self._open_context_subprocess(filename, mode="w"): - with self._open_context(filename, mode="r") as f: - self._assert_hdf5_data(f) - if IS_WINDOWS: - with self._open_context(filename, mode="a") as f: - self._assert_hdf5_data(f) - else: - with self.assertRaises(OSError): - with self._open_context(filename, mode="a") as f: - pass - self._validate_hdf5_data(filename) - - # File open by appending writer - with self._open_context_subprocess(filename, mode="a"): - with self._open_context(filename, mode="r") as f: - self._assert_hdf5_data(f) - if IS_WINDOWS: - with self._open_context(filename, mode="a") as f: - self._assert_hdf5_data(f) - else: - with self.assertRaises(OSError): - with self._open_context(filename, mode="a") as f: - pass - self._validate_hdf5_data(filename) - - # File open by reader - with self._open_context_subprocess(filename, mode="r"): - with self._open_context(filename, mode="r") as f: - self._assert_hdf5_data(f) - with self._open_context(filename, mode="a") as f: - pass - self._validate_hdf5_data(filename) - - # File open by locking reader - with _subprocess_context( - _open_context, filename, mode="r", enable_file_locking=True - ): - with self._open_context(filename, mode="r") as f: - self._assert_hdf5_data(f) - if IS_WINDOWS: - with self._open_context(filename, mode="a") as f: - self._assert_hdf5_data(f) - else: - with self.assertRaises(OSError): - with self._open_context(filename, mode="a") as f: - pass - self._validate_hdf5_data(filename) - - @subtests - @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported") - def test_modes_multi_process_swmr(self): - filename = self._new_filename() - - with self._open_context(filename, mode="w", libver="latest") as f: - pass - - # File open by SWMR writer - with self._open_context_subprocess(filename, mode="a", swmr=True): - with self._open_context(filename, mode="r") as f: - assert f.swmr_mode - self._assert_hdf5_data(f) - with self.assertRaises(OSError): - with self._open_context(filename, mode="a") as f: - pass - self._validate_hdf5_data(filename, swmr=True) - - @subtests - def test_retry_defaults(self): - filename = self._new_filename() - - names = h5py_utils.top_level_names(filename) - self.assertEqual(names, []) - - names = h5py_utils.safe_top_level_names(filename) - self.assertEqual(names, []) - - names = h5py_utils.top_level_names(filename, include_only=None) - self.assertEqual(names, ["check"]) - - names = h5py_utils.safe_top_level_names(filename, include_only=None) - self.assertEqual(names, ["check"]) - - with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item: - self.assertEqual(item, None) - - with h5py_utils.open_item(filename, "/check", validate=None) as item: - self.assertTrue(item[()]) - - with self.assertRaises(RetryTimeoutError): - with h5py_utils.open_item( - filename, - "/check", - retry_timeout=0.1, - retry_invalid=True, - validate=lambda x: False, - ) as item: - pass - - ncall = 0 - - def validate(item): - nonlocal ncall - if ncall >= 1: - return True - else: - ncall += 1 - raise RetryError - - with h5py_utils.open_item( - filename, "/check", validate=validate, retry_timeout=1, retry_invalid=True - ) as item: - self.assertTrue(item[()]) - - @subtests - def test_retry_custom(self): - filename = self._new_filename() - ncausefailure = 3 - faildelay = 0.1 - sufficient_timeout = ncausefailure * (faildelay + 10) - insufficient_timeout = ncausefailure * faildelay * 0.5 - - @h5py_utils.retry_contextmanager() - def open_item(filename, name): - nonlocal failcounter - if failcounter < ncausefailure: - time.sleep(faildelay) - failcounter += 1 - raise RetryError - with h5py_utils.File(filename) as h5file: - yield h5file[name] - - failcounter = 0 - kw = {"retry_timeout": sufficient_timeout} - with open_item(filename, "/check", **kw) as item: - self.assertTrue(item[()]) - - failcounter = 0 - kw = {"retry_timeout": insufficient_timeout} - with self.assertRaises(RetryTimeoutError): - with open_item(filename, "/check", **kw) as item: - pass - - @subtests - def test_retry_in_subprocess(self): - filename = self._new_filename() - txtfilename = os.path.join(self.test_dir, "failcounter.txt") - ncausefailure = 3 - faildelay = 0.1 - sufficient_timeout = ncausefailure * (faildelay + 10) - insufficient_timeout = ncausefailure * faildelay * 0.5 - - kw = { - "retry_timeout": sufficient_timeout, - "include_only": None, - "ncausefailure": ncausefailure, - "faildelay": faildelay, - } - with open(txtfilename, mode="w") as f: - f.write("0") - names = top_level_names_test(txtfilename, filename, **kw) - self.assertEqual(names, ["check"]) - - kw = { - "retry_timeout": insufficient_timeout, - "include_only": None, - "ncausefailure": ncausefailure, - "faildelay": faildelay, - } - with open(txtfilename, mode="w") as f: - f.write("0") - with self.assertRaises(RetryTimeoutError): - top_level_names_test(txtfilename, filename, **kw) - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestH5pyUtils)) - return test_suite - - -if __name__ == "__main__": - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_nxdata.py b/silx/io/test/test_nxdata.py deleted file mode 100644 index 80cc193..0000000 --- a/silx/io/test/test_nxdata.py +++ /dev/null @@ -1,579 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2020 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for NXdata parsing""" - -__authors__ = ["P. Knobel"] -__license__ = "MIT" -__date__ = "24/03/2020" - - -import tempfile -import unittest -import h5py -import numpy -import six - -from .. import nxdata - - -text_dtype = h5py.special_dtype(vlen=six.text_type) - - -class TestNXdata(unittest.TestCase): - def setUp(self): - tmp = tempfile.NamedTemporaryFile(prefix="nxdata_examples_", suffix=".h5", delete=True) - tmp.file.close() - self.h5fname = tmp.name - self.h5f = h5py.File(tmp.name, "w") - - # SCALARS - g0d = self.h5f.create_group("scalars") - - g0d0 = g0d.create_group("0D_scalar") - g0d0.attrs["NX_class"] = "NXdata" - g0d0.attrs["signal"] = "scalar" - g0d0.create_dataset("scalar", data=10) - g0d0.create_dataset("scalar_errors", data=0.1) - - g0d1 = g0d.create_group("2D_scalars") - g0d1.attrs["NX_class"] = "NXdata" - g0d1.attrs["signal"] = "scalars" - ds = g0d1.create_dataset("scalars", data=numpy.arange(3 * 10).reshape((3, 10))) - ds.attrs["interpretation"] = "scalar" - - g0d1 = g0d.create_group("4D_scalars") - g0d1.attrs["NX_class"] = "NXdata" - g0d1.attrs["signal"] = "scalars" - ds = g0d1.create_dataset("scalars", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10))) - ds.attrs["interpretation"] = "scalar" - - # SPECTRA - g1d = self.h5f.create_group("spectra") - - g1d0 = g1d.create_group("1D_spectrum") - g1d0.attrs["NX_class"] = "NXdata" - g1d0.attrs["signal"] = "count" - g1d0.attrs["auxiliary_signals"] = numpy.array(["count2", "count3"], - dtype=text_dtype) - g1d0.attrs["axes"] = "energy_calib" - g1d0.attrs["uncertainties"] = numpy.array(["energy_errors", ], - dtype=text_dtype) - g1d0.create_dataset("count", data=numpy.arange(10)) - g1d0.create_dataset("count2", data=0.5 * numpy.arange(10)) - d = g1d0.create_dataset("count3", data=0.4 * numpy.arange(10)) - d.attrs["long_name"] = "3rd counter" - g1d0.create_dataset("title", data="Title as dataset (like nexpy)") - g1d0.create_dataset("energy_calib", data=(10, 5)) # 10 * idx + 5 - g1d0.create_dataset("energy_errors", data=3.14 * numpy.random.rand(10)) - - g1d1 = g1d.create_group("2D_spectra") - g1d1.attrs["NX_class"] = "NXdata" - g1d1.attrs["signal"] = "counts" - ds = g1d1.create_dataset("counts", data=numpy.arange(3 * 10).reshape((3, 10))) - ds.attrs["interpretation"] = "spectrum" - - g1d2 = g1d.create_group("4D_spectra") - g1d2.attrs["NX_class"] = "NXdata" - g1d2.attrs["signal"] = "counts" - g1d2.attrs["axes"] = numpy.array(["energy", ], dtype=text_dtype) - ds = g1d2.create_dataset("counts", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10))) - ds.attrs["interpretation"] = "spectrum" - ds = g1d2.create_dataset("errors", data=4.5 * numpy.random.rand(2, 2, 3, 10)) - ds = g1d2.create_dataset("energy", data=5 + 10 * numpy.arange(15), - shuffle=True, compression="gzip") - ds.attrs["long_name"] = "Calibrated energy" - ds.attrs["first_good"] = 3 - ds.attrs["last_good"] = 12 - g1d2.create_dataset("energy_errors", data=10 * numpy.random.rand(15)) - - # IMAGES - g2d = self.h5f.create_group("images") - - g2d0 = g2d.create_group("2D_regular_image") - g2d0.attrs["NX_class"] = "NXdata" - g2d0.attrs["signal"] = "image" - g2d0.attrs["auxiliary_signals"] = "image2" - g2d0.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"], - dtype=text_dtype) - g2d0.create_dataset("image", data=numpy.arange(4 * 6).reshape((4, 6))) - g2d0.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6))) - ds = g2d0.create_dataset("rows_calib", data=(10, 5)) - ds.attrs["long_name"] = "Calibrated Y" - g2d0.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6)) - - g2d1 = g2d.create_group("2D_irregular_data") - g2d1.attrs["NX_class"] = "NXdata" - g2d1.attrs["signal"] = "data" - g2d1.attrs["title"] = "Title as group attr" - g2d1.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"], - dtype=text_dtype) - g2d1.create_dataset("data", data=numpy.arange(64 * 128).reshape((64, 128))) - g2d1.create_dataset("rows_coordinates", data=numpy.arange(64) + numpy.random.rand(64)) - g2d1.create_dataset("columns_coordinates", data=numpy.arange(128) + 2.5 * numpy.random.rand(128)) - - g2d2 = g2d.create_group("3D_images") - g2d2.attrs["NX_class"] = "NXdata" - g2d2.attrs["signal"] = "images" - ds = g2d2.create_dataset("images", data=numpy.arange(2 * 4 * 6).reshape((2, 4, 6))) - ds.attrs["interpretation"] = "image" - - g2d3 = g2d.create_group("5D_images") - g2d3.attrs["NX_class"] = "NXdata" - g2d3.attrs["signal"] = "images" - g2d3.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"], - dtype=text_dtype) - ds = g2d3.create_dataset("images", data=numpy.arange(2 * 2 * 2 * 4 * 6).reshape((2, 2, 2, 4, 6))) - ds.attrs["interpretation"] = "image" - g2d3.create_dataset("rows_coordinates", data=5 + 10 * numpy.arange(4)) - g2d3.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6)) - - g2d4 = g2d.create_group("RGBA_image") - g2d4.attrs["NX_class"] = "NXdata" - g2d4.attrs["signal"] = "image" - g2d4.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"], - dtype=text_dtype) - rgba_image = numpy.linspace(0, 1, num=7*8*3).reshape((7, 8, 3)) - rgba_image[:, :, 1] = 1 - rgba_image[:, :, 1] # invert G channel to add some color - ds = g2d4.create_dataset("image", data=rgba_image) - ds.attrs["interpretation"] = "rgba-image" - ds = g2d4.create_dataset("rows_calib", data=(10, 5)) - ds.attrs["long_name"] = "Calibrated Y" - g2d4.create_dataset("columns_coordinates", data=0.5+0.02*numpy.arange(8)) - - # SCATTER - g = self.h5f.create_group("scatters") - - gd0 = g.create_group("x_y_scatter") - gd0.attrs["NX_class"] = "NXdata" - gd0.attrs["signal"] = "y" - gd0.attrs["axes"] = numpy.array(["x", ], dtype=text_dtype) - gd0.create_dataset("y", data=numpy.random.rand(128) - 0.5) - gd0.create_dataset("x", data=2 * numpy.random.rand(128)) - gd0.create_dataset("x_errors", data=0.05 * numpy.random.rand(128)) - gd0.create_dataset("errors", data=0.05 * numpy.random.rand(128)) - - gd1 = g.create_group("x_y_value_scatter") - gd1.attrs["NX_class"] = "NXdata" - gd1.attrs["signal"] = "values" - gd1.attrs["axes"] = numpy.array(["x", "y"], dtype=text_dtype) - gd1.create_dataset("values", data=3.14 * numpy.random.rand(128)) - gd1.create_dataset("y", data=numpy.random.rand(128)) - gd1.create_dataset("y_errors", data=0.02 * numpy.random.rand(128)) - gd1.create_dataset("x", data=numpy.random.rand(128)) - gd1.create_dataset("x_errors", data=0.02 * numpy.random.rand(128)) - - def tearDown(self): - self.h5f.close() - - def testValidity(self): - for group in self.h5f: - for subgroup in self.h5f[group]: - self.assertTrue( - nxdata.is_valid_nxdata(self.h5f[group][subgroup]), - "%s/%s not found to be a valid NXdata group" % (group, subgroup)) - - def testScalars(self): - nxd = nxdata.NXdata(self.h5f["scalars/0D_scalar"]) - self.assertTrue(nxd.signal_is_0d) - self.assertEqual(nxd.signal[()], 10) - self.assertEqual(nxd.axes_names, []) - self.assertEqual(nxd.axes_dataset_names, []) - self.assertEqual(nxd.axes, []) - self.assertIsNotNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertIsNone(nxd.interpretation) - - nxd = nxdata.NXdata(self.h5f["scalars/2D_scalars"]) - self.assertTrue(nxd.signal_is_2d) - self.assertEqual(nxd.signal[1, 2], 12) - self.assertEqual(nxd.axes_names, [None, None]) - self.assertEqual(nxd.axes_dataset_names, [None, None]) - self.assertEqual(nxd.axes, [None, None]) - self.assertIsNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertEqual(nxd.interpretation, "scalar") - - nxd = nxdata.NXdata(self.h5f["scalars/4D_scalars"]) - self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or - nxd.signal_is_2d or nxd.signal_is_3d) - self.assertEqual(nxd.signal[1, 0, 1, 4], 74) - self.assertEqual(nxd.axes_names, [None, None, None, None]) - self.assertEqual(nxd.axes_dataset_names, [None, None, None, None]) - self.assertEqual(nxd.axes, [None, None, None, None]) - self.assertIsNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertEqual(nxd.interpretation, "scalar") - - def testSpectra(self): - nxd = nxdata.NXdata(self.h5f["spectra/1D_spectrum"]) - self.assertTrue(nxd.signal_is_1d) - self.assertTrue(nxd.is_curve) - self.assertTrue(numpy.array_equal(numpy.array(nxd.signal), - numpy.arange(10))) - self.assertEqual(nxd.axes_names, ["energy_calib"]) - self.assertEqual(nxd.axes_dataset_names, ["energy_calib"]) - self.assertEqual(nxd.axes[0][0], 10) - self.assertEqual(nxd.axes[0][1], 5) - self.assertIsNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertIsNone(nxd.interpretation) - self.assertEqual(nxd.title, "Title as dataset (like nexpy)") - - self.assertEqual(nxd.auxiliary_signals_dataset_names, - ["count2", "count3"]) - self.assertEqual(nxd.auxiliary_signals_names, - ["count2", "3rd counter"]) - self.assertAlmostEqual(nxd.auxiliary_signals[1][2], - 0.8) # numpy.arange(10) * 0.4 - - nxd = nxdata.NXdata(self.h5f["spectra/2D_spectra"]) - self.assertTrue(nxd.signal_is_2d) - self.assertTrue(nxd.is_curve) - self.assertEqual(nxd.axes_names, [None, None]) - self.assertEqual(nxd.axes_dataset_names, [None, None]) - self.assertEqual(nxd.axes, [None, None]) - self.assertIsNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertEqual(nxd.interpretation, "spectrum") - - nxd = nxdata.NXdata(self.h5f["spectra/4D_spectra"]) - self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or - nxd.signal_is_2d or nxd.signal_is_3d) - self.assertTrue(nxd.is_curve) - self.assertEqual(nxd.axes_names, - [None, None, None, "Calibrated energy"]) - self.assertEqual(nxd.axes_dataset_names, - [None, None, None, "energy"]) - self.assertEqual(nxd.axes[:3], [None, None, None]) - self.assertEqual(nxd.axes[3].shape, (10, )) # dataset shape (15, ) sliced [3:12] - self.assertIsNotNone(nxd.errors) - self.assertEqual(nxd.errors.shape, (2, 2, 3, 10)) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertEqual(nxd.interpretation, "spectrum") - self.assertEqual(nxd.get_axis_errors("energy").shape, - (10,)) - # test getting axis errors by long_name - self.assertTrue(numpy.array_equal(nxd.get_axis_errors("Calibrated energy"), - nxd.get_axis_errors("energy"))) - self.assertTrue(numpy.array_equal(nxd.get_axis_errors(b"Calibrated energy"), - nxd.get_axis_errors("energy"))) - - def testImages(self): - nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"]) - self.assertTrue(nxd.signal_is_2d) - self.assertTrue(nxd.is_image) - self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates"]) - self.assertEqual(list(nxd.axes_dataset_names), - ["rows_calib", "columns_coordinates"]) - self.assertIsNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertIsNone(nxd.interpretation) - self.assertEqual(len(nxd.auxiliary_signals), 1) - self.assertEqual(nxd.auxiliary_signals_names, ["image2"]) - - nxd = nxdata.NXdata(self.h5f["images/2D_irregular_data"]) - self.assertTrue(nxd.signal_is_2d) - self.assertTrue(nxd.is_image) - - self.assertEqual(nxd.axes_dataset_names, nxd.axes_names) - self.assertEqual(list(nxd.axes_dataset_names), - ["rows_coordinates", "columns_coordinates"]) - self.assertEqual(len(nxd.axes), 2) - self.assertIsNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertIsNone(nxd.interpretation) - self.assertEqual(nxd.title, "Title as group attr") - - nxd = nxdata.NXdata(self.h5f["images/5D_images"]) - self.assertTrue(nxd.is_image) - self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or - nxd.signal_is_2d or nxd.signal_is_3d) - self.assertEqual(nxd.axes_names, - [None, None, None, 'rows_coordinates', 'columns_coordinates']) - self.assertEqual(nxd.axes_dataset_names, - [None, None, None, 'rows_coordinates', 'columns_coordinates']) - self.assertIsNone(nxd.errors) - self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) - self.assertEqual(nxd.interpretation, "image") - - nxd = nxdata.NXdata(self.h5f["images/RGBA_image"]) - self.assertTrue(nxd.is_image) - self.assertEqual(nxd.interpretation, "rgba-image") - self.assertTrue(nxd.signal_is_3d) - self.assertEqual(nxd.axes_names, ["Calibrated Y", - "columns_coordinates", - None]) - self.assertEqual(list(nxd.axes_dataset_names), - ["rows_calib", "columns_coordinates", None]) - - def testScatters(self): - nxd = nxdata.NXdata(self.h5f["scatters/x_y_scatter"]) - self.assertTrue(nxd.signal_is_1d) - self.assertEqual(nxd.axes_names, ["x"]) - self.assertEqual(nxd.axes_dataset_names, - ["x"]) - self.assertIsNotNone(nxd.errors) - self.assertEqual(nxd.get_axis_errors("x").shape, - (128, )) - self.assertTrue(nxd.is_scatter) - self.assertFalse(nxd.is_x_y_value_scatter) - self.assertIsNone(nxd.interpretation) - - nxd = nxdata.NXdata(self.h5f["scatters/x_y_value_scatter"]) - self.assertFalse(nxd.signal_is_1d) - self.assertTrue(nxd.axes_dataset_names, - nxd.axes_names) - self.assertEqual(nxd.axes_dataset_names, - ["x", "y"]) - self.assertEqual(nxd.get_axis_errors("x").shape, - (128, )) - self.assertEqual(nxd.get_axis_errors("y").shape, - (128, )) - self.assertEqual(len(nxd.axes), 2) - self.assertIsNone(nxd.errors) - self.assertTrue(nxd.is_scatter) - self.assertTrue(nxd.is_x_y_value_scatter) - self.assertIsNone(nxd.interpretation) - - -class TestLegacyNXdata(unittest.TestCase): - def setUp(self): - tmp = tempfile.NamedTemporaryFile(prefix="nxdata_legacy_examples_", - suffix=".h5", delete=True) - tmp.file.close() - self.h5fname = tmp.name - self.h5f = h5py.File(tmp.name, "w") - - def tearDown(self): - self.h5f.close() - - def testSignalAttrOnDataset(self): - g = self.h5f.create_group("2D") - g.attrs["NX_class"] = "NXdata" - - ds0 = g.create_dataset("image0", - data=numpy.arange(4 * 6).reshape((4, 6))) - ds0.attrs["signal"] = 1 - ds0.attrs["long_name"] = "My first image" - - ds1 = g.create_dataset("image1", - data=numpy.arange(4 * 6).reshape((4, 6))) - ds1.attrs["signal"] = "2" - ds1.attrs["long_name"] = "My 2nd image" - - ds2 = g.create_dataset("image2", - data=numpy.arange(4 * 6).reshape((4, 6))) - ds2.attrs["signal"] = 3 - - nxd = nxdata.NXdata(self.h5f["2D"]) - - self.assertEqual(nxd.signal_dataset_name, "image0") - self.assertEqual(nxd.signal_name, "My first image") - self.assertEqual(nxd.signal.shape, - (4, 6)) - - self.assertEqual(len(nxd.auxiliary_signals), 2) - self.assertEqual(nxd.auxiliary_signals[1].shape, - (4, 6)) - - self.assertEqual(nxd.auxiliary_signals_dataset_names, - ["image1", "image2"]) - self.assertEqual(nxd.auxiliary_signals_names, - ["My 2nd image", "image2"]) - - def testAxesOnSignalDataset(self): - g = self.h5f.create_group("2D") - g.attrs["NX_class"] = "NXdata" - - ds0 = g.create_dataset("image0", - data=numpy.arange(4 * 6).reshape((4, 6))) - ds0.attrs["signal"] = 1 - ds0.attrs["axes"] = "yaxis:xaxis" - - ds1 = g.create_dataset("yaxis", - data=numpy.arange(4)) - ds2 = g.create_dataset("xaxis", - data=numpy.arange(6)) - - nxd = nxdata.NXdata(self.h5f["2D"]) - - self.assertEqual(nxd.axes_dataset_names, - ["yaxis", "xaxis"]) - self.assertTrue(numpy.array_equal(nxd.axes[0], - numpy.arange(4))) - self.assertTrue(numpy.array_equal(nxd.axes[1], - numpy.arange(6))) - - def testAxesOnAxesDatasets(self): - g = self.h5f.create_group("2D") - g.attrs["NX_class"] = "NXdata" - - ds0 = g.create_dataset("image0", - data=numpy.arange(4 * 6).reshape((4, 6))) - ds0.attrs["signal"] = 1 - ds1 = g.create_dataset("yaxis", - data=numpy.arange(4)) - ds1.attrs["axis"] = 0 - ds2 = g.create_dataset("xaxis", - data=numpy.arange(6)) - ds2.attrs["axis"] = "1" - - nxd = nxdata.NXdata(self.h5f["2D"]) - self.assertEqual(nxd.axes_dataset_names, - ["yaxis", "xaxis"]) - self.assertTrue(numpy.array_equal(nxd.axes[0], - numpy.arange(4))) - self.assertTrue(numpy.array_equal(nxd.axes[1], - numpy.arange(6))) - - def testAsciiUndefinedAxesAttrs(self): - """Some files may not be using utf8 for str attrs""" - g = self.h5f.create_group("bytes_attrs") - g.attrs["NX_class"] = b"NXdata" - g.attrs["signal"] = b"image0" - g.attrs["axes"] = b"yaxis", b"." - - g.create_dataset("image0", - data=numpy.arange(4 * 6).reshape((4, 6))) - g.create_dataset("yaxis", - data=numpy.arange(4)) - - nxd = nxdata.NXdata(self.h5f["bytes_attrs"]) - self.assertEqual(nxd.axes_dataset_names, - ["yaxis", None]) - - -class TestSaveNXdata(unittest.TestCase): - def setUp(self): - tmp = tempfile.NamedTemporaryFile(prefix="nxdata", - suffix=".h5", delete=True) - tmp.file.close() - self.h5fname = tmp.name - - def testSimpleSave(self): - sig = numpy.array([0, 1, 2]) - a0 = numpy.array([2, 3, 4]) - a1 = numpy.array([3, 4, 5]) - nxdata.save_NXdata(filename=self.h5fname, - signal=sig, - axes=[a0, a1], - signal_name="sig", - axes_names=["a0", "a1"], - nxentry_name="a", - nxdata_name="mydata") - - h5f = h5py.File(self.h5fname, "r") - self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"])) - - nxd = nxdata.NXdata(h5f["/a/mydata"]) - self.assertTrue(numpy.array_equal(nxd.signal, - sig)) - self.assertTrue(numpy.array_equal(nxd.axes[0], - a0)) - - h5f.close() - - def testSimplestSave(self): - sig = numpy.array([0, 1, 2]) - nxdata.save_NXdata(filename=self.h5fname, - signal=sig) - - h5f = h5py.File(self.h5fname, "r") - - self.assertTrue(nxdata.is_valid_nxdata(h5f["/entry/data0"])) - - nxd = nxdata.NXdata(h5f["/entry/data0"]) - self.assertTrue(numpy.array_equal(nxd.signal, - sig)) - h5f.close() - - def testSaveDefaultAxesNames(self): - sig = numpy.array([0, 1, 2]) - a0 = numpy.array([2, 3, 4]) - a1 = numpy.array([3, 4, 5]) - nxdata.save_NXdata(filename=self.h5fname, - signal=sig, - axes=[a0, a1], - signal_name="sig", - axes_names=None, - axes_long_names=["a", "b"], - nxentry_name="a", - nxdata_name="mydata") - - h5f = h5py.File(self.h5fname, "r") - self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"])) - - nxd = nxdata.NXdata(h5f["/a/mydata"]) - self.assertTrue(numpy.array_equal(nxd.signal, - sig)) - self.assertTrue(numpy.array_equal(nxd.axes[0], - a0)) - self.assertEqual(nxd.axes_dataset_names, - [u"dim0", u"dim1"]) - self.assertEqual(nxd.axes_names, - [u"a", u"b"]) - - h5f.close() - - def testSaveToExistingEntry(self): - h5f = h5py.File(self.h5fname, "w") - g = h5f.create_group("myentry") - g.attrs["NX_class"] = "NXentry" - h5f.close() - - sig = numpy.array([0, 1, 2]) - a0 = numpy.array([2, 3, 4]) - a1 = numpy.array([3, 4, 5]) - nxdata.save_NXdata(filename=self.h5fname, - signal=sig, - axes=[a0, a1], - signal_name="sig", - axes_names=["a0", "a1"], - nxentry_name="myentry", - nxdata_name="toto") - - h5f = h5py.File(self.h5fname, "r") - self.assertTrue(nxdata.is_valid_nxdata(h5f["myentry/toto"])) - - nxd = nxdata.NXdata(h5f["myentry/toto"]) - self.assertTrue(numpy.array_equal(nxd.signal, - sig)) - self.assertTrue(numpy.array_equal(nxd.axes[0], - a0)) - h5f.close() - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestNXdata)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestLegacyNXdata)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSaveNXdata)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_octaveh5.py b/silx/io/test/test_octaveh5.py deleted file mode 100644 index 2e65820..0000000 --- a/silx/io/test/test_octaveh5.py +++ /dev/null @@ -1,165 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -""" -Tests for the octaveh5 module -""" - -__authors__ = ["C. Nemoz", "H. Payno"] -__license__ = "MIT" -__date__ = "12/07/2016" - -import unittest -import os -import tempfile - -try: - from ..octaveh5 import Octaveh5 -except ImportError: - Octaveh5 = None - - -@unittest.skipIf(Octaveh5 is None, "Could not import h5py") -class TestOctaveH5(unittest.TestCase): - @staticmethod - def _get_struct_FT(): - return { - 'NO_CHECK': 0.0, 'SHOWSLICE': 1.0, 'DOTOMO': 1.0, 'DATABASE': 0.0, 'ANGLE_OFFSET': 0.0, - 'VOLSELECTION_REMEMBER': 0.0, 'NUM_PART': 4.0, 'VOLOUTFILE': 0.0, 'RINGSCORRECTION': 0.0, - 'DO_TEST_SLICE': 1.0, 'ZEROOFFMASK': 1.0, 'VERSION': 'fastomo3 version 2.0', - 'CORRECT_SPIKES_THRESHOLD': 0.040000000000000001, 'SHOWPROJ': 0.0, 'HALF_ACQ': 0.0, - 'ANGLE_OFFSET_VALUE': 0.0, 'FIXEDSLICE': 'middle', 'VOLSELECT': 'total' } - @staticmethod - def _get_struct_PYHSTEXE(): - return { - 'EXE': 'PyHST2_2015d', 'VERBOSE': 0.0, 'OFFV': 'PyHST2_2015d', 'TOMO': 0.0, - 'VERBOSE_FILE': 'pyhst_out.txt', 'DIR': '/usr/bin/', 'OFFN': 'pyhst2'} - - @staticmethod - def _get_struct_FTAXIS(): - return { - 'POSITION_VALUE': 12345.0, 'COR_ERROR': 0.0, 'FILESDURINGSCAN': 0.0, 'PLOTFIGURE': 1.0, - 'DIM1': 0.0, 'OVERSAMPLING': 5.0, 'TO_THE_CENTER': 1.0, 'POSITION': 'fixed', - 'COR_POSITION': 0.0, 'HA': 0.0 } - - @staticmethod - def _get_struct_PAGANIN(): - return { - 'MKEEP_MASK': 0.0, 'UNSHARP_SIGMA': 0.80000000000000004, 'DILATE': 2.0, 'UNSHARP_COEFF': 3.0, - 'MEDIANR': 4.0, 'DB': 500.0, 'MKEEP_ABS': 0.0, 'MODE': 0.0, 'THRESHOLD': 0.5, - 'MKEEP_BONE': 0.0, 'DB2': 100.0, 'MKEEP_CORR': 0.0, 'MKEEP_SOFT': 0.0 } - - @staticmethod - def _get_struct_BEAMGEO(): - return {'DIST': 55.0, 'SY': 0.0, 'SX': 0.0, 'TYPE': 'p'} - - - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.test_3_6_fname = os.path.join(self.tempdir, "silx_tmp_t00_octaveTest_3_6.h5") - self.test_3_8_fname = os.path.join(self.tempdir, "silx_tmp_t00_octaveTest_3_8.h5") - - def tearDown(self): - if os.path.isfile(self.test_3_6_fname): - os.unlink(self.test_3_6_fname) - if os.path.isfile(self.test_3_8_fname): - os.unlink(self.test_3_8_fname) - - def testWritedIsReaded(self): - """ - Simple test to write and reaf the structure compatible with the octave h5 using structure. - This test is for # test for octave version > 3.8 - """ - writer = Octaveh5() - - writer.open(self.test_3_8_fname, 'a') - # step 1 writing the file - writer.write('FT', self._get_struct_FT()) - writer.write('PYHSTEXE', self._get_struct_PYHSTEXE()) - writer.write('FTAXIS', self._get_struct_FTAXIS()) - writer.write('PAGANIN', self._get_struct_PAGANIN()) - writer.write('BEAMGEO', self._get_struct_BEAMGEO()) - writer.close() - - # step 2 reading the file - reader = Octaveh5().open(self.test_3_8_fname) - # 2.1 check FT - data_readed = reader.get('FT') - self.assertEqual(data_readed, self._get_struct_FT() ) - # 2.2 check PYHSTEXE - data_readed = reader.get('PYHSTEXE') - self.assertEqual(data_readed, self._get_struct_PYHSTEXE() ) - # 2.3 check FTAXIS - data_readed = reader.get('FTAXIS') - self.assertEqual(data_readed, self._get_struct_FTAXIS() ) - # 2.4 check PAGANIN - data_readed = reader.get('PAGANIN') - self.assertEqual(data_readed, self._get_struct_PAGANIN() ) - # 2.5 check BEAMGEO - data_readed = reader.get('BEAMGEO') - self.assertEqual(data_readed, self._get_struct_BEAMGEO() ) - reader.close() - - def testWritedIsReadedOldOctaveVersion(self): - """The same test as testWritedIsReaded but for octave version < 3.8 - """ - # test for octave version < 3.8 - writer = Octaveh5(3.6) - - writer.open(self.test_3_6_fname, 'a') - - # step 1 writing the file - writer.write('FT', self._get_struct_FT()) - writer.write('PYHSTEXE', self._get_struct_PYHSTEXE()) - writer.write('FTAXIS', self._get_struct_FTAXIS()) - writer.write('PAGANIN', self._get_struct_PAGANIN()) - writer.write('BEAMGEO', self._get_struct_BEAMGEO()) - writer.close() - - # step 2 reading the file - reader = Octaveh5(3.6).open(self.test_3_6_fname) - # 2.1 check FT - data_readed = reader.get('FT') - self.assertEqual(data_readed, self._get_struct_FT() ) - # 2.2 check PYHSTEXE - data_readed = reader.get('PYHSTEXE') - self.assertEqual(data_readed, self._get_struct_PYHSTEXE() ) - # 2.3 check FTAXIS - data_readed = reader.get('FTAXIS') - self.assertEqual(data_readed, self._get_struct_FTAXIS() ) - # 2.4 check PAGANIN - data_readed = reader.get('PAGANIN') - self.assertEqual(data_readed, self._get_struct_PAGANIN() ) - # 2.5 check BEAMGEO - data_readed = reader.get('BEAMGEO') - self.assertEqual(data_readed, self._get_struct_BEAMGEO() ) - reader.close() - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestOctaveH5)) - return test_suite - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_rawh5.py b/silx/io/test/test_rawh5.py deleted file mode 100644 index 0f7205c..0000000 --- a/silx/io/test/test_rawh5.py +++ /dev/null @@ -1,96 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# -# Copyright (c) 2016 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ###########################################################################*/ -"""Test for silx.gui.hdf5 module""" - -__authors__ = ["V. Valls"] -__license__ = "MIT" -__date__ = "21/09/2017" - - -import unittest -import tempfile -import numpy -import shutil -from ..import rawh5 - - -class TestNumpyFile(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.tmpDirectory = tempfile.mkdtemp() - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tmpDirectory) - - def testNumpyFile(self): - filename = "%s/%s.npy" % (self.tmpDirectory, self.id()) - c = numpy.random.rand(5, 5) - numpy.save(filename, c) - h5 = rawh5.NumpyFile(filename) - self.assertIn("data", h5) - self.assertEqual(h5["data"].dtype.kind, "f") - - def testNumpyZFile(self): - filename = "%s/%s.npz" % (self.tmpDirectory, self.id()) - a = numpy.array(u"aaaaa") - b = numpy.array([1, 2, 3, 4]) - c = numpy.random.rand(5, 5) - d = numpy.array(b"aaaaa") - e = numpy.array(u"i \u2661 my mother") - numpy.savez(filename, a, b=b, c=c, d=d, e=e) - h5 = rawh5.NumpyFile(filename) - self.assertIn("arr_0", h5) - self.assertIn("b", h5) - self.assertIn("c", h5) - self.assertIn("d", h5) - self.assertIn("e", h5) - self.assertEqual(h5["arr_0"].dtype.kind, "U") - self.assertEqual(h5["b"].dtype.kind, "i") - self.assertEqual(h5["c"].dtype.kind, "f") - self.assertEqual(h5["d"].dtype.kind, "S") - self.assertEqual(h5["e"].dtype.kind, "U") - - def testNumpyZFileContainingDirectories(self): - filename = "%s/%s.npz" % (self.tmpDirectory, self.id()) - data = {} - data['a/b/c'] = numpy.arange(10) - data['a/b/e'] = numpy.arange(10) - numpy.savez(filename, **data) - h5 = rawh5.NumpyFile(filename) - self.assertIn("a/b/c", h5) - self.assertIn("a/b/e", h5) - - -def suite(): - test_suite = unittest.TestSuite() - loadTests = unittest.defaultTestLoader.loadTestsFromTestCase - test_suite.addTest(loadTests(TestNumpyFile)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest='suite') diff --git a/silx/io/test/test_specfile.py b/silx/io/test/test_specfile.py deleted file mode 100644 index 79d5544..0000000 --- a/silx/io/test/test_specfile.py +++ /dev/null @@ -1,433 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2019 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for specfile wrapper""" - -__authors__ = ["P. Knobel", "V.A. Sole"] -__license__ = "MIT" -__date__ = "17/01/2018" - - -import locale -import logging -import numpy -import os -import sys -import tempfile -import unittest - -from silx.utils import testutils - -from ..specfile import SpecFile, Scan -from .. import specfile - - -logger1 = logging.getLogger(__name__) - -sftext = """#F /tmp/sf.dat -#E 1455180875 -#D Thu Feb 11 09:54:35 2016 -#C imaging User = opid17 -#U00 user comment first line -#U01 This is a dummy file to test SpecFile parsing -#U02 -#U03 last line - -#O0 Pslit HGap MRTSlit UP MRTSlit DOWN -#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap -#o0 pshg mrtu mrtd -#o2 ss1vo ss1ho ss1vg - -#J0 Seconds IA ion.mono Current -#J1 xbpmc2 idgap1 Inorm - -#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 -#D Thu Feb 11 09:55:20 2016 -#T 0.2 (Seconds) -#G0 0 -#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 -#G3 0 0 0 0 0 0 0 0 0 -#G4 0 -#Q -#P0 180.005 -0.66875 0.87125 -#P1 14.74255 16.197579 12.238283 -#UMI0 Current AutoM Shutter -#UMI1 192.51 OFF FE open -#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00; -#N 4 -#L first column second column 3rd_col --1.23 5.89 8 -8.478100E+01 5 1.56 -3.14 2.73 -3.14 -1.2 2.3 3.4 - -#S 25 ascan c3th 1.33245 1.52245 40 0.15 -#D Thu Feb 11 10:00:31 2016 -#P0 80.005 -1.66875 1.87125 -#P1 4.74255 6.197579 2.238283 -#N 5 -#L column0 column1 col2 col3 -0.0 0.1 0.2 0.3 -1.0 1.1 1.2 1.3 -2.0 2.1 2.2 2.3 -3.0 3.1 3.2 3.3 - -#S 26 yyyyyy -#D Thu Feb 11 09:55:20 2016 -#P0 80.005 -1.66875 1.87125 -#P1 4.74255 6.197579 2.238283 -#N 4 -#L first column second column 3rd_col -#C Sat Oct 31 15:51:47 1998. Scan aborted after 0 points. - -#F /tmp/sf.dat -#E 1455180876 -#D Thu Feb 11 09:54:36 2016 - -#S 1 aaaaaa -#U first duplicate line -#U second duplicate line -#@MCADEV 1 -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#N 3 -#L uno duo -1 2 -@A 0 1 2 -3 4 -@A 3.1 4 5 -5 6 -@A 6 7.7 8 -""" - - -loc = locale.getlocale(locale.LC_NUMERIC) -try: - locale.setlocale(locale.LC_NUMERIC, 'de_DE.utf8') -except locale.Error: - try_DE = False -else: - try_DE = True - locale.setlocale(locale.LC_NUMERIC, loc) - - -class TestSpecFile(unittest.TestCase): - @classmethod - def setUpClass(cls): - fd, cls.fname1 = tempfile.mkstemp(text=False) - if sys.version_info < (3, ): - os.write(fd, sftext) - else: - os.write(fd, bytes(sftext, 'ascii')) - os.close(fd) - - fd2, cls.fname2 = tempfile.mkstemp(text=False) - if sys.version_info < (3, ): - os.write(fd2, sftext[370:923]) - else: - os.write(fd2, bytes(sftext[370:923], 'ascii')) - os.close(fd2) - - fd3, cls.fname3 = tempfile.mkstemp(text=False) - txt = sftext[371:923] - if sys.version_info < (3, ): - os.write(fd3, txt) - else: - os.write(fd3, bytes(txt, 'ascii')) - os.close(fd3) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.fname1) - os.unlink(cls.fname2) - os.unlink(cls.fname3) - - def setUp(self): - self.sf = SpecFile(self.fname1) - self.scan1 = self.sf[0] - self.scan1_2 = self.sf["1.2"] - self.scan25 = self.sf["25.1"] - self.empty_scan = self.sf["26.1"] - - self.sf_no_fhdr = SpecFile(self.fname2) - self.scan1_no_fhdr = self.sf_no_fhdr[0] - - self.sf_no_fhdr_crash = SpecFile(self.fname3) - self.scan1_no_fhdr_crash = self.sf_no_fhdr_crash[0] - - def tearDown(self): - self.sf.close() - self.sf_no_fhdr.close() - self.sf_no_fhdr_crash.close() - - def test_open(self): - self.assertIsInstance(self.sf, SpecFile) - with self.assertRaises(specfile.SfErrFileOpen): - SpecFile("doesnt_exist.dat") - - # test filename types unicode and bytes - if sys.version_info[0] < 3: - try: - SpecFile(self.fname1) - except TypeError: - self.fail("failed to handle filename as python2 str") - try: - SpecFile(unicode(self.fname1)) - except TypeError: - self.fail("failed to handle filename as python2 unicode") - else: - try: - SpecFile(self.fname1) - except TypeError: - self.fail("failed to handle filename as python3 str") - try: - SpecFile(bytes(self.fname1, 'utf-8')) - except TypeError: - self.fail("failed to handle filename as python3 bytes") - - def test_number_of_scans(self): - self.assertEqual(4, len(self.sf)) - - def test_list_of_scan_indices(self): - self.assertEqual(self.sf.list(), - [1, 25, 26, 1]) - self.assertEqual(self.sf.keys(), - ["1.1", "25.1", "26.1", "1.2"]) - - def test_index_number_order(self): - self.assertEqual(self.sf.index(1, 2), 3) # sf["1.2"]==sf[3] - self.assertEqual(self.sf.number(1), 25) # sf[1]==sf["25"] - self.assertEqual(self.sf.order(3), 2) # sf[3]==sf["1.2"] - with self.assertRaises(specfile.SfErrScanNotFound): - self.sf.index(3, 2) - with self.assertRaises(specfile.SfErrScanNotFound): - self.sf.index(99) - - def assertRaisesRegex(self, *args, **kwargs): - # Python 2 compatibility - if sys.version_info.major >= 3: - return super(TestSpecFile, self).assertRaisesRegex(*args, **kwargs) - else: - return self.assertRaisesRegexp(*args, **kwargs) - - def test_getitem(self): - self.assertIsInstance(self.sf[2], Scan) - self.assertIsInstance(self.sf["1.2"], Scan) - # int out of range - with self.assertRaisesRegex(IndexError, 'Scan index must be in ran'): - self.sf[107] - # float indexing not allowed - with self.assertRaisesRegex(TypeError, 'The scan identification k'): - self.sf[1.2] - # non existant scan with "N.M" indexing - with self.assertRaises(KeyError): - self.sf["3.2"] - - def test_specfile_iterator(self): - i = 0 - for scan in self.sf: - if i == 1: - self.assertEqual(scan.motor_positions, - self.sf[1].motor_positions) - i += 1 - # number of returned scans - self.assertEqual(i, len(self.sf)) - - def test_scan_index(self): - self.assertEqual(self.scan1.index, 0) - self.assertEqual(self.scan1_2.index, 3) - self.assertEqual(self.scan25.index, 1) - - def test_scan_headers(self): - self.assertEqual(self.scan25.scan_header_dict['S'], - "25 ascan c3th 1.33245 1.52245 40 0.15") - self.assertEqual(self.scan1.header[17], '#G0 0') - self.assertEqual(len(self.scan1.header), 29) - # parsing headers with long keys - self.assertEqual(self.scan1.scan_header_dict['UMI0'], - 'Current AutoM Shutter') - # parsing empty headers - self.assertEqual(self.scan1.scan_header_dict['Q'], '') - # duplicate headers: concatenated (with newline) - self.assertEqual(self.scan1_2.scan_header_dict["U"], - "first duplicate line\nsecond duplicate line") - - def test_file_headers(self): - self.assertEqual(self.scan1.header[1], - '#E 1455180875') - self.assertEqual(self.scan1.file_header_dict['F'], - '/tmp/sf.dat') - - def test_multiple_file_headers(self): - """Scan 1.2 is after the second file header, with a different - Epoch""" - self.assertEqual(self.scan1_2.header[1], - '#E 1455180876') - - def test_scan_labels(self): - self.assertEqual(self.scan1.labels, - ['first column', 'second column', '3rd_col']) - - def test_data(self): - # data_line() and data_col() take 1-based indices as arg - self.assertAlmostEqual(self.scan1.data_line(1)[2], - 1.56) - # tests for data transposition between original file and .data attr - self.assertAlmostEqual(self.scan1.data[2, 0], - 8) - self.assertEqual(self.scan1.data.shape, (3, 4)) - self.assertAlmostEqual(numpy.sum(self.scan1.data), 113.631) - - def test_data_column_by_name(self): - self.assertAlmostEqual(self.scan25.data_column_by_name("col2")[1], - 1.2) - # Scan.data is transposed after readinq, so column is the first index - self.assertAlmostEqual(numpy.sum(self.scan25.data_column_by_name("col2")), - numpy.sum(self.scan25.data[2, :])) - with self.assertRaises(specfile.SfErrColNotFound): - self.scan25.data_column_by_name("ygfxgfyxg") - - def test_motors(self): - self.assertEqual(len(self.scan1.motor_names), 6) - self.assertEqual(len(self.scan1.motor_positions), 6) - self.assertAlmostEqual(sum(self.scan1.motor_positions), - 223.385912) - self.assertEqual(self.scan1.motor_names[1], 'MRTSlit UP') - self.assertAlmostEqual( - self.scan25.motor_position_by_name('MRTSlit UP'), - -1.66875) - - def test_absence_of_file_header(self): - """We expect Scan.file_header to be an empty list in the absence - of a file header. - """ - self.assertEqual(len(self.scan1_no_fhdr.motor_names), 0) - # motor positions can still be read in the scan header - # even in the absence of motor names - self.assertAlmostEqual(sum(self.scan1_no_fhdr.motor_positions), - 223.385912) - self.assertEqual(len(self.scan1_no_fhdr.header), 15) - self.assertEqual(len(self.scan1_no_fhdr.scan_header), 15) - self.assertEqual(len(self.scan1_no_fhdr.file_header), 0) - - def test_crash_absence_of_file_header(self): - """Test no crash in absence of file header and no leading newline - character - """ - self.assertEqual(len(self.scan1_no_fhdr_crash.motor_names), 0) - # motor positions can still be read in the scan header - # even in the absence of motor names - self.assertAlmostEqual(sum(self.scan1_no_fhdr_crash.motor_positions), - 223.385912) - self.assertEqual(len(self.scan1_no_fhdr_crash.scan_header), 15) - self.assertEqual(len(self.scan1_no_fhdr_crash.file_header), 0) - - def test_mca(self): - self.assertEqual(len(self.scan1.mca), 0) - self.assertEqual(len(self.scan1_2.mca), 3) - self.assertEqual(self.scan1_2.mca[1][2], 5) - self.assertEqual(sum(self.scan1_2.mca[2]), 21.7) - - # Negative indexing - self.assertEqual(sum(self.scan1_2.mca[len(self.scan1_2.mca) - 1]), - sum(self.scan1_2.mca[-1])) - - # Test iterator - line_count, total_sum = (0, 0) - for mca_line in self.scan1_2.mca: - line_count += 1 - total_sum += sum(mca_line) - self.assertEqual(line_count, 3) - self.assertAlmostEqual(total_sum, 36.8) - - def test_mca_header(self): - self.assertEqual(self.scan1.mca_header_dict, {}) - self.assertEqual(len(self.scan1_2.mca_header_dict), 4) - self.assertEqual(self.scan1_2.mca_header_dict["CALIB"], "1 2 3") - self.assertEqual(self.scan1_2.mca.calibration, - [[1., 2., 3.]]) - # default calib in the absence of #@CALIB - self.assertEqual(self.scan25.mca.calibration, - [[0., 1., 0.]]) - self.assertEqual(self.scan1_2.mca.channels, - [[0, 1, 2]]) - # absence of #@CHANN and spectra - self.assertEqual(self.scan25.mca.channels, - []) - - @testutils.test_logging(specfile._logger.name, warning=1) - def test_empty_scan(self): - """Test reading a scan with no data points""" - self.assertEqual(len(self.empty_scan.labels), - 3) - col1 = self.empty_scan.data_column_by_name("second column") - self.assertEqual(col1.shape, (0, )) - - -class TestSFLocale(unittest.TestCase): - @classmethod - def setUpClass(cls): - fd, cls.fname = tempfile.mkstemp(text=False) - if sys.version_info < (3, ): - os.write(fd, sftext) - else: - os.write(fd, bytes(sftext, 'ascii')) - os.close(fd) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.fname) - locale.setlocale(locale.LC_NUMERIC, loc) # restore saved locale - - def crunch_data(self): - self.sf3 = SpecFile(self.fname) - self.assertAlmostEqual(self.sf3[0].data_line(1)[2], - 1.56) - self.sf3.close() - - @unittest.skipIf(not try_DE, "de_DE.utf8 locale not installed") - def test_locale_de_DE(self): - locale.setlocale(locale.LC_NUMERIC, 'de_DE.utf8') - self.crunch_data() - - def test_locale_user(self): - locale.setlocale(locale.LC_NUMERIC, '') # use user's preferred locale - self.crunch_data() - - def test_locale_C(self): - locale.setlocale(locale.LC_NUMERIC, 'C') # use default (C) locale - self.crunch_data() - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecFile)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSFLocale)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_specfilewrapper.py b/silx/io/test/test_specfilewrapper.py deleted file mode 100644 index 2f463fa..0000000 --- a/silx/io/test/test_specfilewrapper.py +++ /dev/null @@ -1,206 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for old specfile wrapper""" - -__authors__ = ["P. Knobel"] -__license__ = "MIT" -__date__ = "15/05/2017" - -import locale -import logging -import numpy -import os -import sys -import tempfile -import unittest - -logger1 = logging.getLogger(__name__) - -from ..specfilewrapper import Specfile - -sftext = """#F /tmp/sf.dat -#E 1455180875 -#D Thu Feb 11 09:54:35 2016 -#C imaging User = opid17 -#U00 user comment first line -#U01 This is a dummy file to test SpecFile parsing -#U02 -#U03 last line - -#O0 Pslit HGap MRTSlit UP MRTSlit DOWN -#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap -#o0 pshg mrtu mrtd -#o2 ss1vo ss1ho ss1vg - -#J0 Seconds IA ion.mono Current -#J1 xbpmc2 idgap1 Inorm - -#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 -#D Thu Feb 11 09:55:20 2016 -#T 0.2 (Seconds) -#G0 0 -#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 -#G3 0 0 0 0 0 0 0 0 0 -#G4 0 -#Q -#P0 180.005 -0.66875 0.87125 -#P1 14.74255 16.197579 12.238283 -#UMI0 Current AutoM Shutter -#UMI1 192.51 OFF FE open -#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00; -#N 4 -#L first column second column 3rd_col --1.23 5.89 8 -8.478100E+01 5 1.56 -3.14 2.73 -3.14 -1.2 2.3 3.4 - -#S 25 ascan c3th 1.33245 1.52245 40 0.15 -#D Thu Feb 11 10:00:31 2016 -#P0 80.005 -1.66875 1.87125 -#P1 4.74255 6.197579 2.238283 -#N 5 -#L column0 column1 col2 col3 -0.0 0.1 0.2 0.3 -1.0 1.1 1.2 1.3 -2.0 2.1 2.2 2.3 -3.0 3.1 3.2 3.3 - -#F /tmp/sf.dat -#E 1455180876 -#D Thu Feb 11 09:54:36 2016 - -#S 1 aaaaaa -#U first duplicate line -#U second duplicate line -#@MCADEV 1 -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#N 3 -#L uno duo -1 2 -@A 0 1 2 -3 4 -@A 3.1 4 5 -5 6 -@A 6 7.7 8 -""" - - -class TestSpecfilewrapper(unittest.TestCase): - @classmethod - def setUpClass(cls): - fd, cls.fname1 = tempfile.mkstemp(text=False) - if sys.version_info < (3, ): - os.write(fd, sftext) - else: - os.write(fd, bytes(sftext, 'ascii')) - os.close(fd) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.fname1) - - def setUp(self): - self.sf = Specfile(self.fname1) - self.scan1 = self.sf[0] - self.scan1_2 = self.sf.select("1.2") - self.scan25 = self.sf.select("25.1") - - def tearDown(self): - self.sf.close() - - def test_number_of_scans(self): - self.assertEqual(3, len(self.sf)) - - def test_list_of_scan_indices(self): - self.assertEqual(self.sf.list(), - '1,25,1') - self.assertEqual(self.sf.keys(), - ["1.1", "25.1", "1.2"]) - - def test_scan_headers(self): - self.assertEqual(self.scan25.header('S'), - ["#S 25 ascan c3th 1.33245 1.52245 40 0.15"]) - self.assertEqual(self.scan1.header("G0"), ['#G0 0']) - # parsing headers with long keys - # parsing empty headers - self.assertEqual(self.scan1.header('Q'), ['#Q ']) - - def test_file_headers(self): - self.assertEqual(self.scan1.header("E"), - ['#E 1455180875']) - self.assertEqual(self.sf.title(), - "imaging") - self.assertEqual(self.sf.epoch(), - 1455180875) - self.assertEqual(self.sf.allmotors(), - ["Pslit HGap", "MRTSlit UP", "MRTSlit DOWN", - "Sslit1 VOff", "Sslit1 HOff", "Sslit1 VGap"]) - - def test_scan_labels(self): - self.assertEqual(self.scan1.alllabels(), - ['first column', 'second column', '3rd_col']) - - def test_data(self): - self.assertAlmostEqual(self.scan1.dataline(3)[2], - -3.14) - self.assertAlmostEqual(self.scan1.datacol(1)[2], - 3.14) - # tests for data transposition between original file and .data attr - self.assertAlmostEqual(self.scan1.data()[2, 0], - 8) - self.assertEqual(self.scan1.data().shape, (3, 4)) - self.assertAlmostEqual(numpy.sum(self.scan1.data()), 113.631) - - def test_date(self): - self.assertEqual(self.scan1.date(), - "Thu Feb 11 09:55:20 2016") - - def test_motors(self): - self.assertEqual(len(self.sf.allmotors()), 6) - self.assertEqual(len(self.scan1.allmotorpos()), 6) - self.assertAlmostEqual(sum(self.scan1.allmotorpos()), - 223.385912) - self.assertEqual(self.sf.allmotors()[1], 'MRTSlit UP') - - def test_mca(self): - self.assertEqual(self.scan1_2.mca(2)[2], 5) - self.assertEqual(sum(self.scan1_2.mca(3)), 21.7) - - def test_mca_header(self): - self.assertEqual(self.scan1_2.header("CALIB"), - ["#@CALIB 1 2 3"]) - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecfilewrapper)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_spech5.py b/silx/io/test/test_spech5.py deleted file mode 100644 index 0263c3c..0000000 --- a/silx/io/test/test_spech5.py +++ /dev/null @@ -1,881 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2019 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for spech5""" -from numpy import array_equal -import os -import io -import sys -import tempfile -import unittest -import datetime -from functools import partial - -from silx.utils import testutils - -from .. import spech5 -from ..spech5 import (SpecH5, SpecH5Dataset, spec_date_to_iso8601) -from .. import specfile - -import h5py - -__authors__ = ["P. Knobel"] -__license__ = "MIT" -__date__ = "12/02/2018" - -sftext = """#F /tmp/sf.dat -#E 1455180875 -#D Thu Feb 11 09:54:35 2016 -#C imaging User = opid17 -#O0 Pslit HGap MRTSlit UP MRTSlit DOWN -#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap -#o0 pshg mrtu mrtd -#o2 ss1vo ss1ho ss1vg - -#J0 Seconds IA ion.mono Current -#J1 xbpmc2 idgap1 Inorm - -#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 -#D Thu Feb 11 09:55:20 2016 -#T 0.2 (Seconds) -#P0 180.005 -0.66875 0.87125 -#P1 14.74255 16.197579 12.238283 -#N 4 -#L MRTSlit UP second column 3rd_col --1.23 5.89 8 -8.478100E+01 5 1.56 -3.14 2.73 -3.14 -1.2 2.3 3.4 - -#S 25 ascan c3th 1.33245 1.52245 40 0.15 -#D Sat 2015/03/14 03:53:50 -#P0 80.005 -1.66875 1.87125 -#P1 4.74255 6.197579 2.238283 -#N 5 -#L column0 column1 col2 col3 -0.0 0.1 0.2 0.3 -1.0 1.1 1.2 1.3 -2.0 2.1 2.2 2.3 -3.0 3.1 3.2 3.3 - -#S 1 aaaaaa -#D Thu Feb 11 10:00:32 2016 -#@MCADEV 1 -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#@CTIME 123.4 234.5 345.6 -#N 3 -#L uno duo -1 2 -@A 0 1 2 -@A 10 9 8 -@A 1 1 1.1 -3 4 -@A 3.1 4 5 -@A 7 6 5 -@A 1 1 1 -5 6 -@A 6 7.7 8 -@A 4 3 2 -@A 1 1 1 - -#S 1000 bbbbb -#G1 3.25 3.25 5.207 90 90 120 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353 -#G3 0.0106337923671 0.027529133 1.206191273 -1.43467075 0.7633438883 0.02401568018 -1.709143587 -2.097621783 0.02456954971 -#L a b -1 2 - -#S 1001 ccccc -#G1 0. 0. 0. 0 0 0 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353 -#G3 0. 0. 0. 0. 0.0 0. 0. 0. 0. -#L a b -1 2 - -""" - - -class TestSpecDate(unittest.TestCase): - """ - Test of the spec_date_to_iso8601 function. - """ - # TODO : time zone tests - # TODO : error cases - - @classmethod - def setUpClass(cls): - import locale - # FYI : not threadsafe - cls.locale_saved = locale.setlocale(locale.LC_TIME) - locale.setlocale(locale.LC_TIME, 'C') - - @classmethod - def tearDownClass(cls): - import locale - # FYI : not threadsafe - locale.setlocale(locale.LC_TIME, cls.locale_saved) - - def setUp(self): - # covering all week days - self.n_days = range(1, 10) - # covering all months - self.n_months = range(1, 13) - - self.n_years = [1999, 2016, 2020] - self.n_seconds = [0, 5, 26, 59] - self.n_minutes = [0, 9, 42, 59] - self.n_hours = [0, 2, 17, 23] - - self.formats = ['%a %b %d %H:%M:%S %Y', '%a %Y/%m/%d %H:%M:%S'] - - self.check_date_formats = partial(self.__check_date_formats, - year=self.n_years[0], - month=self.n_months[0], - day=self.n_days[0], - hour=self.n_hours[0], - minute=self.n_minutes[0], - second=self.n_seconds[0], - msg=None) - - def __check_date_formats(self, - year, - month, - day, - hour, - minute, - second, - msg=None): - dt = datetime.datetime(year, month, day, hour, minute, second) - expected_date = dt.isoformat() - - for i_fmt, fmt in enumerate(self.formats): - spec_date = dt.strftime(fmt) - iso_date = spec_date_to_iso8601(spec_date) - self.assertEqual(iso_date, - expected_date, - msg='Testing {0}. format={1}. ' - 'Expected "{2}", got "{3} ({4})" (dt={5}).' - ''.format(msg, - i_fmt, - expected_date, - iso_date, - spec_date, - dt)) - - def testYearsNominal(self): - for year in self.n_years: - self.check_date_formats(year=year, msg='year') - - def testMonthsNominal(self): - for month in self.n_months: - self.check_date_formats(month=month, msg='month') - - def testDaysNominal(self): - for day in self.n_days: - self.check_date_formats(day=day, msg='day') - - def testHoursNominal(self): - for hour in self.n_hours: - self.check_date_formats(hour=hour, msg='hour') - - def testMinutesNominal(self): - for minute in self.n_minutes: - self.check_date_formats(minute=minute, msg='minute') - - def testSecondsNominal(self): - for second in self.n_seconds: - self.check_date_formats(second=second, msg='second') - - -class TestSpecH5(unittest.TestCase): - @classmethod - def setUpClass(cls): - fd, cls.fname = tempfile.mkstemp() - if sys.version_info < (3, ): - os.write(fd, sftext) - else: - os.write(fd, bytes(sftext, 'ascii')) - os.close(fd) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.fname) - - def setUp(self): - self.sfh5 = SpecH5(self.fname) - - def tearDown(self): - self.sfh5.close() - - def testContainsFile(self): - self.assertIn("/1.2/measurement", self.sfh5) - self.assertIn("/25.1", self.sfh5) - self.assertIn("25.1", self.sfh5) - self.assertNotIn("25.2", self.sfh5) - # measurement is a child of a scan, full path would be required to - # access from root level - self.assertNotIn("measurement", self.sfh5) - # Groups may or may not have a trailing / - self.assertIn("/1.2/measurement/mca_1/", self.sfh5) - self.assertIn("/1.2/measurement/mca_1", self.sfh5) - # Datasets can't have a trailing / - self.assertNotIn("/1.2/measurement/mca_0/info/calibration/ ", self.sfh5) - # No mca_8 - self.assertNotIn("/1.2/measurement/mca_8/info/calibration", self.sfh5) - # Link - self.assertIn("/1.2/measurement/mca_0/info/calibration", self.sfh5) - - def testContainsGroup(self): - self.assertIn("measurement", self.sfh5["/1.2/"]) - self.assertIn("measurement", self.sfh5["/1.2"]) - self.assertIn("25.1", self.sfh5["/"]) - self.assertNotIn("25.2", self.sfh5["/"]) - self.assertIn("instrument/positioners/Sslit1 HOff", self.sfh5["/1.1"]) - # illegal trailing "/" after dataset name - self.assertNotIn("instrument/positioners/Sslit1 HOff/", - self.sfh5["/1.1"]) - # full path to element in group (OK) - self.assertIn("/1.1/instrument/positioners/Sslit1 HOff", - self.sfh5["/1.1/instrument"]) - - def testDataColumn(self): - self.assertAlmostEqual(sum(self.sfh5["/1.2/measurement/duo"]), - 12.0) - self.assertAlmostEqual( - sum(self.sfh5["1.1"]["measurement"]["MRTSlit UP"]), - 87.891, places=4) - - def testDate(self): - # start time is in Iso8601 format - self.assertEqual(self.sfh5["/1.1/start_time"], - u"2016-02-11T09:55:20") - self.assertEqual(self.sfh5["25.1/start_time"], - u"2015-03-14T03:53:50") - - def assertRaisesRegex(self, *args, **kwargs): - # Python 2 compatibility - if sys.version_info.major >= 3: - return super(TestSpecH5, self).assertRaisesRegex(*args, **kwargs) - else: - return self.assertRaisesRegexp(*args, **kwargs) - - def testDatasetInstanceAttr(self): - """The SpecH5Dataset objects must implement some dummy attributes - to improve compatibility with widgets dealing with h5py datasets.""" - self.assertIsNone(self.sfh5["/1.1/start_time"].compression) - self.assertIsNone(self.sfh5["1.1"]["measurement"]["MRTSlit UP"].chunks) - - # error message must be explicit - with self.assertRaisesRegex( - AttributeError, - "SpecH5Dataset has no attribute tOTo"): - dummy = self.sfh5["/1.1/start_time"].tOTo - - def testGet(self): - """Test :meth:`SpecH5Group.get`""" - # default value of param *default* is None - self.assertIsNone(self.sfh5.get("toto")) - self.assertEqual(self.sfh5["25.1"].get("toto", default=-3), - -3) - - self.assertEqual(self.sfh5.get("/1.1/start_time", default=-3), - u"2016-02-11T09:55:20") - - def testGetClass(self): - """Test :meth:`SpecH5Group.get`""" - self.assertIs(self.sfh5["1.1"].get("start_time", getclass=True), - h5py.Dataset) - self.assertIs(self.sfh5["1.1"].get("instrument", getclass=True), - h5py.Group) - - # spech5 does not define external link, so there is no way - # a group can *get* a SpecH5 class - - def testGetApi(self): - result = self.sfh5.get("1.1", getclass=True, getlink=True) - self.assertIs(result, h5py.HardLink) - result = self.sfh5.get("1.1", getclass=False, getlink=True) - self.assertIsInstance(result, h5py.HardLink) - result = self.sfh5.get("1.1", getclass=True, getlink=False) - self.assertIs(result, h5py.Group) - result = self.sfh5.get("1.1", getclass=False, getlink=False) - self.assertIsInstance(result, spech5.SpecH5Group) - - def testGetItemGroup(self): - group = self.sfh5["25.1"]["instrument"] - self.assertEqual(list(group["positioners"].keys()), - ["Pslit HGap", "MRTSlit UP", "MRTSlit DOWN", - "Sslit1 VOff", "Sslit1 HOff", "Sslit1 VGap"]) - with self.assertRaises(KeyError): - group["Holy Grail"] - - def testGetitemSpecH5(self): - self.assertEqual(self.sfh5["/1.2/instrument/positioners"], - self.sfh5["1.2"]["instrument"]["positioners"]) - - def testH5pyClass(self): - """Test :attr:`h5py_class` returns the corresponding h5py class - (h5py.File, h5py.Group, h5py.Dataset)""" - a_file = self.sfh5 - self.assertIs(a_file.h5py_class, - h5py.File) - - a_group = self.sfh5["/1.2/measurement"] - self.assertIs(a_group.h5py_class, - h5py.Group) - - a_dataset = self.sfh5["/1.1/instrument/positioners/Sslit1 HOff"] - self.assertIs(a_dataset.h5py_class, - h5py.Dataset) - - def testHeader(self): - file_header = self.sfh5["/1.2/instrument/specfile/file_header"] - scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"] - - # File header has 10 lines - self.assertEqual(len(file_header), 10) - # 1.2 has 9 scan & mca header lines - self.assertEqual(len(scan_header), 9) - - # line 4 of file header - self.assertEqual( - file_header[3], - u"#C imaging User = opid17") - # line 4 of scan header - scan_header = self.sfh5["25.1/instrument/specfile/scan_header"] - - self.assertEqual( - scan_header[3], - u"#P1 4.74255 6.197579 2.238283") - - def testLinks(self): - self.assertTrue( - array_equal(self.sfh5["/1.2/measurement/mca_0/data"], - self.sfh5["/1.2/instrument/mca_0/data"]) - ) - self.assertTrue( - array_equal(self.sfh5["/1.2/measurement/mca_0/info/data"], - self.sfh5["/1.2/instrument/mca_0/data"]) - ) - self.assertTrue( - array_equal(self.sfh5["/1.2/measurement/mca_0/info/channels"], - self.sfh5["/1.2/instrument/mca_0/channels"]) - ) - self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/"].keys(), - self.sfh5["/1.2/instrument/mca_0/"].keys()) - - self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/preset_time"], - self.sfh5["/1.2/instrument/mca_0/preset_time"]) - self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/live_time"], - self.sfh5["/1.2/instrument/mca_0/live_time"]) - self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/elapsed_time"], - self.sfh5["/1.2/instrument/mca_0/elapsed_time"]) - - def testListScanIndices(self): - self.assertEqual(list(self.sfh5.keys()), - ["1.1", "25.1", "1.2", "1000.1", "1001.1"]) - self.assertEqual(self.sfh5["1.2"].attrs, - {"NX_class": "NXentry", }) - - def testMcaAbsent(self): - def access_absent_mca(): - """This must raise a KeyError, because scan 1.1 has no MCA""" - return self.sfh5["/1.1/measurement/mca_0/"] - self.assertRaises(KeyError, access_absent_mca) - - def testMcaCalib(self): - mca0_calib = self.sfh5["/1.2/measurement/mca_0/info/calibration"] - mca1_calib = self.sfh5["/1.2/measurement/mca_1/info/calibration"] - self.assertEqual(mca0_calib.tolist(), - [1, 2, 3]) - # calibration is unique in this scan and applies to all analysers - self.assertEqual(mca0_calib.tolist(), - mca1_calib.tolist()) - - def testMcaChannels(self): - mca0_chann = self.sfh5["/1.2/measurement/mca_0/info/channels"] - mca1_chann = self.sfh5["/1.2/measurement/mca_1/info/channels"] - self.assertEqual(mca0_chann.tolist(), - [0, 1, 2]) - self.assertEqual(mca0_chann.tolist(), - mca1_chann.tolist()) - - def testMcaCtime(self): - """Tests for #@CTIME mca header""" - datasets = ["preset_time", "live_time", "elapsed_time"] - for ds in datasets: - self.assertNotIn("/1.1/instrument/mca_0/" + ds, self.sfh5) - self.assertIn("/1.2/instrument/mca_0/" + ds, self.sfh5) - - mca0_preset_time = self.sfh5["/1.2/instrument/mca_0/preset_time"] - mca1_preset_time = self.sfh5["/1.2/instrument/mca_1/preset_time"] - self.assertLess(mca0_preset_time - 123.4, - 10**-5) - # ctime is unique in a this scan and applies to all analysers - self.assertEqual(mca0_preset_time, - mca1_preset_time) - - mca0_live_time = self.sfh5["/1.2/instrument/mca_0/live_time"] - mca1_live_time = self.sfh5["/1.2/instrument/mca_1/live_time"] - self.assertLess(mca0_live_time - 234.5, - 10**-5) - self.assertEqual(mca0_live_time, - mca1_live_time) - - mca0_elapsed_time = self.sfh5["/1.2/instrument/mca_0/elapsed_time"] - mca1_elapsed_time = self.sfh5["/1.2/instrument/mca_1/elapsed_time"] - self.assertLess(mca0_elapsed_time - 345.6, - 10**-5) - self.assertEqual(mca0_elapsed_time, - mca1_elapsed_time) - - def testMcaData(self): - # sum 1st MCA in scan 1.2 over rows - mca_0_data = self.sfh5["/1.2/measurement/mca_0/data"] - for summed_row, expected in zip(mca_0_data.sum(axis=1).tolist(), - [3.0, 12.1, 21.7]): - self.assertAlmostEqual(summed_row, expected, places=4) - - # sum 3rd MCA in scan 1.2 along both axis - mca_2_data = self.sfh5["1.2"]["measurement"]["mca_2"]["data"] - self.assertAlmostEqual(sum(sum(mca_2_data)), 9.1, places=5) - # attrs - self.assertEqual(mca_0_data.attrs, {"interpretation": "spectrum"}) - - def testMotorPosition(self): - positioners_group = self.sfh5["/1.1/instrument/positioners"] - # MRTSlit DOWN position is defined in #P0 san header line - self.assertAlmostEqual(float(positioners_group["MRTSlit DOWN"]), - 0.87125) - # MRTSlit UP position is defined in first data column - for a, b in zip(positioners_group["MRTSlit UP"].tolist(), - [-1.23, 8.478100E+01, 3.14, 1.2]): - self.assertAlmostEqual(float(a), b, places=4) - - def testNumberMcaAnalysers(self): - """Scan 1.2 has 2 data columns + 3 mca spectra per data line.""" - self.assertEqual(len(self.sfh5["1.2"]["measurement"]), 5) - - def testTitle(self): - self.assertEqual(self.sfh5["/25.1/title"], - u"ascan c3th 1.33245 1.52245 40 0.15") - - def testValues(self): - group = self.sfh5["/25.1"] - self.assertTrue(hasattr(group, "values")) - self.assertTrue(callable(group.values)) - self.assertIn(self.sfh5["/25.1/title"], - self.sfh5["/25.1"].values()) - - # visit and visititems ignore links - def testVisit(self): - name_list = [] - self.sfh5.visit(name_list.append) - self.assertIn('1.2/instrument/positioners/Pslit HGap', name_list) - self.assertIn("1.2/instrument/specfile/scan_header", name_list) - self.assertEqual(len(name_list), 117) - - # test also visit of a subgroup, with various group name formats - name_list_leading_and_trailing_slash = [] - self.sfh5['/1.2/instrument/'].visit(name_list_leading_and_trailing_slash.append) - name_list_leading_slash = [] - self.sfh5['/1.2/instrument'].visit(name_list_leading_slash.append) - name_list_trailing_slash = [] - self.sfh5['1.2/instrument/'].visit(name_list_trailing_slash.append) - name_list_no_slash = [] - self.sfh5['1.2/instrument'].visit(name_list_no_slash.append) - - # no differences expected in the output names - self.assertEqual(name_list_leading_and_trailing_slash, - name_list_leading_slash) - self.assertEqual(name_list_leading_slash, - name_list_trailing_slash) - self.assertEqual(name_list_leading_slash, - name_list_no_slash) - self.assertIn("positioners/Pslit HGap", name_list_no_slash) - self.assertIn("positioners", name_list_no_slash) - - def testVisitItems(self): - dataset_name_list = [] - - def func_generator(l): - """return a function appending names to list l""" - def func(name, obj): - if isinstance(obj, SpecH5Dataset): - l.append(name) - return func - - self.sfh5.visititems(func_generator(dataset_name_list)) - self.assertIn('1.2/instrument/positioners/Pslit HGap', dataset_name_list) - self.assertEqual(len(dataset_name_list), 85) - - # test also visit of a subgroup, with various group name formats - name_list_leading_and_trailing_slash = [] - self.sfh5['/1.2/instrument/'].visititems(func_generator(name_list_leading_and_trailing_slash)) - name_list_leading_slash = [] - self.sfh5['/1.2/instrument'].visititems(func_generator(name_list_leading_slash)) - name_list_trailing_slash = [] - self.sfh5['1.2/instrument/'].visititems(func_generator(name_list_trailing_slash)) - name_list_no_slash = [] - self.sfh5['1.2/instrument'].visititems(func_generator(name_list_no_slash)) - - # no differences expected in the output names - self.assertEqual(name_list_leading_and_trailing_slash, - name_list_leading_slash) - self.assertEqual(name_list_leading_slash, - name_list_trailing_slash) - self.assertEqual(name_list_leading_slash, - name_list_no_slash) - self.assertIn("positioners/Pslit HGap", name_list_no_slash) - - def testNotSpecH5(self): - fd, fname = tempfile.mkstemp() - os.write(fd, b"Not a spec file!") - os.close(fd) - self.assertRaises(specfile.SfErrFileOpen, SpecH5, fname) - self.assertRaises(IOError, SpecH5, fname) - os.unlink(fname) - - def testSample(self): - self.assertNotIn("sample", self.sfh5["/1.1"]) - self.assertIn("sample", self.sfh5["/1000.1"]) - self.assertIn("ub_matrix", self.sfh5["/1000.1/sample"]) - self.assertIn("unit_cell", self.sfh5["/1000.1/sample"]) - self.assertIn("unit_cell_abc", self.sfh5["/1000.1/sample"]) - self.assertIn("unit_cell_alphabetagamma", self.sfh5["/1000.1/sample"]) - - # All 0 values - self.assertNotIn("sample", self.sfh5["/1001.1"]) - with self.assertRaises(KeyError): - self.sfh5["/1001.1/sample/unit_cell"] - - @testutils.test_logging(spech5.logger1.name, warning=2) - def testOpenFileDescriptor(self): - """Open a SpecH5 file from a file descriptor""" - with io.open(self.sfh5.filename) as f: - sfh5 = SpecH5(f) - self.assertIsNotNone(sfh5) - name_list = [] - # check if the object is working - self.sfh5.visit(name_list.append) - sfh5.close() - - -sftext_multi_mca_headers = """ -#S 1 aaaaaa -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#@CTIME 123.4 234.5 345.6 -#@MCA %16C -#@CHANN 3 1 3 1 -#@CALIB 5.5 6.6 7.7 -#@CTIME 10 11 12 -#N 3 -#L uno duo -1 2 -@A 0 1 2 -@A 10 9 8 -3 4 -@A 3.1 4 5 -@A 7 6 5 -5 6 -@A 6 7.7 8 -@A 4 3 2 - -""" - - -class TestSpecH5MultiMca(unittest.TestCase): - @classmethod - def setUpClass(cls): - fd, cls.fname = tempfile.mkstemp(text=False) - if sys.version_info < (3, ): - os.write(fd, sftext_multi_mca_headers) - else: - os.write(fd, bytes(sftext_multi_mca_headers, 'ascii')) - os.close(fd) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.fname) - - def setUp(self): - self.sfh5 = SpecH5(self.fname) - - def tearDown(self): - self.sfh5.close() - - def testMcaCalib(self): - mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"] - mca1_calib = self.sfh5["/1.1/measurement/mca_1/info/calibration"] - self.assertEqual(mca0_calib.tolist(), - [1, 2, 3]) - self.assertAlmostEqual(sum(mca1_calib.tolist()), - sum([5.5, 6.6, 7.7]), - places=5) - - def testMcaChannels(self): - mca0_chann = self.sfh5["/1.1/measurement/mca_0/info/channels"] - mca1_chann = self.sfh5["/1.1/measurement/mca_1/info/channels"] - self.assertEqual(mca0_chann.tolist(), - [0., 1., 2.]) - # @CHANN is unique in this scan and applies to all analysers - self.assertEqual(mca1_chann.tolist(), - [1., 2., 3.]) - - def testMcaCtime(self): - """Tests for #@CTIME mca header""" - mca0_preset_time = self.sfh5["/1.1/instrument/mca_0/preset_time"] - mca1_preset_time = self.sfh5["/1.1/instrument/mca_1/preset_time"] - self.assertLess(mca0_preset_time - 123.4, - 10**-5) - self.assertLess(mca1_preset_time - 10, - 10**-5) - - mca0_live_time = self.sfh5["/1.1/instrument/mca_0/live_time"] - mca1_live_time = self.sfh5["/1.1/instrument/mca_1/live_time"] - self.assertLess(mca0_live_time - 234.5, - 10**-5) - self.assertLess(mca1_live_time - 11, - 10**-5) - - mca0_elapsed_time = self.sfh5["/1.1/instrument/mca_0/elapsed_time"] - mca1_elapsed_time = self.sfh5["/1.1/instrument/mca_1/elapsed_time"] - self.assertLess(mca0_elapsed_time - 345.6, - 10**-5) - self.assertLess(mca1_elapsed_time - 12, - 10**-5) - - -sftext_no_cols = r"""#F C:/DATA\test.mca -#D Thu Jul 7 08:40:19 2016 - -#S 1 31oct98.dat 22.1 If4 -#D Thu Jul 7 08:40:19 2016 -#C no data cols, one mca analyser, single spectrum -#@MCA %16C -#@CHANN 151 0 150 1 -#@CALIB 0 2 0 -@A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \ -1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \ -6071 7611 10426 16188 28266 40348 50539 55555 56162 54162 47102 35718 24588 17034 12994 11444 \ -11808 13461 15687 18885 23827 31578 41999 49556 58084 59415 59456 55698 44525 28219 17680 12881 \ -9518 7415 6155 5246 4646 3978 3612 3299 3020 2761 2670 2472 2500 2310 2286 2106 \ -1989 1890 1782 1655 1421 1293 1135 990 879 757 672 618 532 488 445 424 \ -414 373 351 325 307 284 270 247 228 213 199 187 183 176 164 156 \ -153 140 142 130 118 118 103 101 97 86 90 86 87 81 75 82 \ -80 76 77 75 76 77 62 69 74 60 65 68 65 58 63 64 \ -63 59 60 56 57 60 55 - -#S 2 31oct98.dat 22.1 If4 -#D Thu Jul 7 08:40:19 2016 -#C no data cols, one mca analyser, multiple spectra -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#@CTIME 123.4 234.5 345.6 -@A 0 1 2 -@A 10 9 8 -@A 1 1 1.1 -@A 3.1 4 5 -@A 7 6 5 -@A 1 1 1 -@A 6 7.7 8 -@A 4 3 2 -@A 1 1 1 - -#S 3 31oct98.dat 22.1 If4 -#D Thu Jul 7 08:40:19 2016 -#C no data cols, 3 mca analysers, multiple spectra -#@MCADEV 1 -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#@CTIME 123.4 234.5 345.6 -#@MCADEV 2 -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#@CTIME 123.4 234.5 345.6 -#@MCADEV 3 -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#@CTIME 123.4 234.5 345.6 -@A 0 1 2 -@A 10 9 8 -@A 1 1 1.1 -@A 3.1 4 5 -@A 7 6 5 -@A 1 1 1 -@A 6 7.7 8 -@A 4 3 2 -@A 1 1 1 -""" - - -class TestSpecH5NoDataCols(unittest.TestCase): - """Test reading SPEC files with only MCA data""" - @classmethod - def setUpClass(cls): - fd, cls.fname = tempfile.mkstemp() - if sys.version_info < (3, ): - os.write(fd, sftext_no_cols) - else: - os.write(fd, bytes(sftext_no_cols, 'ascii')) - os.close(fd) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.fname) - - def setUp(self): - self.sfh5 = SpecH5(self.fname) - - def tearDown(self): - self.sfh5.close() - - def testScan1(self): - # 1.1: single analyser, single spectrum, 151 channels - self.assertIn("mca_0", - self.sfh5["1.1/instrument/"]) - self.assertEqual(self.sfh5["1.1/instrument/mca_0/data"].shape, - (1, 151)) - self.assertNotIn("mca_1", - self.sfh5["1.1/instrument/"]) - - def testScan2(self): - # 2.1: single analyser, 9 spectra, 3 channels - self.assertIn("mca_0", - self.sfh5["2.1/instrument/"]) - self.assertEqual(self.sfh5["2.1/instrument/mca_0/data"].shape, - (9, 3)) - self.assertNotIn("mca_1", - self.sfh5["2.1/instrument/"]) - - def testScan3(self): - # 3.1: 3 analysers, 3 spectra/analyser, 3 channels - for i in range(3): - self.assertIn("mca_%d" % i, - self.sfh5["3.1/instrument/"]) - self.assertEqual( - self.sfh5["3.1/instrument/mca_%d/data" % i].shape, - (3, 3)) - - self.assertNotIn("mca_3", - self.sfh5["3.1/instrument/"]) - - -sf_text_slash = r"""#F /data/id09/archive/logspecfiles/laue/2016/scan_231_laue_16-11-29.dat -#D Sat Dec 10 22:20:59 2016 -#O0 Pslit/HGap MRTSlit%UP - -#S 1 laue_16-11-29.log 231.1 PD3/A -#D Sat Dec 10 22:20:59 2016 -#P0 180.005 -0.66875 -#N 2 -#L GONY/mm PD3%A --2.015 5.250424e-05 --2.01 5.30798e-05 --2.005 5.281903e-05 --2 5.220436e-05 -""" - - -class TestSpecH5SlashInLabels(unittest.TestCase): - """Test reading SPEC files with labels containing a / character - - The / character must be substituted with a % - """ - @classmethod - def setUpClass(cls): - fd, cls.fname = tempfile.mkstemp() - if sys.version_info < (3, ): - os.write(fd, sf_text_slash) - else: - os.write(fd, bytes(sf_text_slash, 'ascii')) - os.close(fd) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.fname) - - def setUp(self): - self.sfh5 = SpecH5(self.fname) - - def tearDown(self): - self.sfh5.close() - - def testLabels(self): - """Ensure `/` is substituted with `%` and - ensure legitimate `%` in names are still working""" - self.assertEqual(list(self.sfh5["1.1/measurement/"].keys()), - ["GONY%mm", "PD3%A"]) - - # substituted "%" - self.assertIn("GONY%mm", - self.sfh5["1.1/measurement/"]) - self.assertNotIn("GONY/mm", - self.sfh5["1.1/measurement/"]) - self.assertAlmostEqual(self.sfh5["1.1/measurement/GONY%mm"][0], - -2.015, places=4) - # legitimate "%" - self.assertIn("PD3%A", - self.sfh5["1.1/measurement/"]) - - def testMotors(self): - """Ensure `/` is substituted with `%` and - ensure legitimate `%` in names are still working""" - self.assertEqual(list(self.sfh5["1.1/instrument/positioners"].keys()), - ["Pslit%HGap", "MRTSlit%UP"]) - # substituted "%" - self.assertIn("Pslit%HGap", - self.sfh5["1.1/instrument/positioners"]) - self.assertNotIn("Pslit/HGap", - self.sfh5["1.1/instrument/positioners"]) - self.assertAlmostEqual( - self.sfh5["1.1/instrument/positioners/Pslit%HGap"], - 180.005, places=4) - # legitimate "%" - self.assertIn("MRTSlit%UP", - self.sfh5["1.1/instrument/positioners"]) - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecDate)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5MultiMca)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5NoDataCols)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5SlashInLabels)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py deleted file mode 100644 index 903a62c..0000000 --- a/silx/io/test/test_spectoh5.py +++ /dev/null @@ -1,194 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2019 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for SpecFile to HDF5 converter""" - -from numpy import array_equal -import os -import sys -import tempfile -import unittest - -import h5py - -from ..spech5 import SpecH5, SpecH5Group -from ..convert import convert, write_to_h5 -from ..utils import h5py_read_dataset - -__authors__ = ["P. Knobel"] -__license__ = "MIT" -__date__ = "12/02/2018" - - -sfdata = b"""#F /tmp/sf.dat -#E 1455180875 -#D Thu Feb 11 09:54:35 2016 -#C imaging User = opid17 -#O0 Pslit HGap MRTSlit UP MRTSlit DOWN -#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap -#o0 pshg mrtu mrtd -#o2 ss1vo ss1ho ss1vg - -#J0 Seconds IA ion.mono Current -#J1 xbpmc2 idgap1 Inorm - -#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2 -#D Thu Feb 11 09:55:20 2016 -#T 0.2 (Seconds) -#P0 180.005 -0.66875 0.87125 -#P1 14.74255 16.197579 12.238283 -#N 4 -#L MRTSlit UP second column 3rd_col --1.23 5.89 8 -8.478100E+01 5 1.56 -3.14 2.73 -3.14 -1.2 2.3 3.4 - -#S 1 aaaaaa -#D Thu Feb 11 10:00:32 2016 -#@MCADEV 1 -#@MCA %16C -#@CHANN 3 0 2 1 -#@CALIB 1 2 3 -#N 3 -#L uno duo -1 2 -@A 0 1 2 -@A 10 9 8 -3 4 -@A 3.1 4 5 -@A 7 6 5 -5 6 -@A 6 7.7 8 -@A 4 3 2 -""" - - -class TestConvertSpecHDF5(unittest.TestCase): - @classmethod - def setUpClass(cls): - fd, cls.spec_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5") - os.write(fd, sfdata) - os.close(fd) - - fd, cls.h5_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5") - # Close and delete (we just need the name) - os.close(fd) - os.unlink(cls.h5_fname) - - @classmethod - def tearDownClass(cls): - os.unlink(cls.spec_fname) - - def setUp(self): - convert(self.spec_fname, self.h5_fname) - - self.sfh5 = SpecH5(self.spec_fname) - self.h5f = h5py.File(self.h5_fname, "a") - - def tearDown(self): - self.h5f.close() - self.sfh5.close() - os.unlink(self.h5_fname) - - def testAppendToHDF5(self): - write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam") - self.assertTrue( - array_equal(self.h5f["/1.2/measurement/mca_1/data"], - self.h5f["/foo/bar/spam/1.2/measurement/mca_1/data"]) - ) - - def testWriteSpecH5Group(self): - """Test passing a SpecH5Group as parameter, instead of a Spec filename - or a SpecH5.""" - g = self.sfh5["1.1/instrument"] - self.assertIsInstance(g, SpecH5Group) # let's be paranoid - write_to_h5(g, self.h5f, h5path="my instruments") - - self.assertAlmostEqual(self.h5f["my instruments/positioners/Sslit1 HOff"][tuple()], - 16.197579, places=4) - - def testTitle(self): - """Test the value of a dataset""" - title12 = h5py_read_dataset(self.h5f["/1.2/title"]) - self.assertEqual(title12, - u"aaaaaa") - - def testAttrs(self): - # Test root group (file) attributes - self.assertEqual(self.h5f.attrs["NX_class"], - u"NXroot") - # Test dataset attributes - ds = self.h5f["/1.2/instrument/mca_1/data"] - self.assertTrue("interpretation" in ds.attrs) - self.assertEqual(list(ds.attrs.values()), - [u"spectrum"]) - # Test group attributes - grp = self.h5f["1.1"] - self.assertEqual(grp.attrs["NX_class"], - u"NXentry") - self.assertEqual(len(list(grp.attrs.keys())), - 1) - - def testHdf5HasSameMembers(self): - spec_member_list = [] - - def append_spec_members(name): - spec_member_list.append(name) - self.sfh5.visit(append_spec_members) - - hdf5_member_list = [] - - def append_hdf5_members(name): - hdf5_member_list.append(name) - self.h5f.visit(append_hdf5_members) - - # 1. For some reason, h5py visit method doesn't include the leading - # "/" character when it passes the member name to the function, - # even though an explicit the .name attribute of a member will - # have a leading "/" - spec_member_list = [m.lstrip("/") for m in spec_member_list] - - self.assertEqual(set(hdf5_member_list), - set(spec_member_list)) - - def testLinks(self): - self.assertTrue( - array_equal(self.sfh5["/1.2/measurement/mca_0/data"], - self.h5f["/1.2/measurement/mca_0/data"]) - ) - self.assertTrue( - array_equal(self.h5f["/1.2/instrument/mca_1/channels"], - self.h5f["/1.2/measurement/mca_1/info/channels"]) - ) - - -def suite(): - test_suite = unittest.TestSuite() - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestConvertSpecHDF5)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_url.py b/silx/io/test/test_url.py deleted file mode 100644 index 114f6a7..0000000 --- a/silx/io/test/test_url.py +++ /dev/null @@ -1,228 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for url module""" - -__authors__ = ["V. Valls"] -__license__ = "MIT" -__date__ = "29/01/2018" - - -import unittest -from ..url import DataUrl - - -class TestDataUrl(unittest.TestCase): - - def assertUrl(self, url, expected): - self.assertEqual(url.is_valid(), expected[0]) - self.assertEqual(url.is_absolute(), expected[1]) - self.assertEqual(url.scheme(), expected[2]) - self.assertEqual(url.file_path(), expected[3]) - self.assertEqual(url.data_path(), expected[4]) - self.assertEqual(url.data_slice(), expected[5]) - - def test_fabio_absolute(self): - url = DataUrl("fabio:///data/image.edf?slice=2") - expected = [True, True, "fabio", "/data/image.edf", None, (2, )] - self.assertUrl(url, expected) - - def test_fabio_absolute_windows(self): - url = DataUrl("fabio:///C:/data/image.edf?slice=2") - expected = [True, True, "fabio", "C:/data/image.edf", None, (2, )] - self.assertUrl(url, expected) - - def test_silx_absolute(self): - url = DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5") - expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)] - self.assertUrl(url, expected) - - def test_commandline_shell_separator(self): - url = DataUrl("silx:///data/image.h5::path=/data/dataset&slice=1,5") - expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)] - self.assertUrl(url, expected) - - def test_silx_absolute2(self): - url = DataUrl("silx:///data/image.edf?/scan_0/detector/data") - expected = [True, True, "silx", "/data/image.edf", "/scan_0/detector/data", None] - self.assertUrl(url, expected) - - def test_silx_absolute_windows(self): - url = DataUrl("silx:///C:/data/image.h5?/scan_0/detector/data") - expected = [True, True, "silx", "C:/data/image.h5", "/scan_0/detector/data", None] - self.assertUrl(url, expected) - - def test_silx_relative(self): - url = DataUrl("silx:./image.h5") - expected = [True, False, "silx", "./image.h5", None, None] - self.assertUrl(url, expected) - - def test_fabio_relative(self): - url = DataUrl("fabio:./image.edf") - expected = [True, False, "fabio", "./image.edf", None, None] - self.assertUrl(url, expected) - - def test_silx_relative2(self): - url = DataUrl("silx:image.h5") - expected = [True, False, "silx", "image.h5", None, None] - self.assertUrl(url, expected) - - def test_fabio_relative2(self): - url = DataUrl("fabio:image.edf") - expected = [True, False, "fabio", "image.edf", None, None] - self.assertUrl(url, expected) - - def test_file_relative(self): - url = DataUrl("image.edf") - expected = [True, False, None, "image.edf", None, None] - self.assertUrl(url, expected) - - def test_file_relative2(self): - url = DataUrl("./foo/bar/image.edf") - expected = [True, False, None, "./foo/bar/image.edf", None, None] - self.assertUrl(url, expected) - - def test_file_relative3(self): - url = DataUrl("foo/bar/image.edf") - expected = [True, False, None, "foo/bar/image.edf", None, None] - self.assertUrl(url, expected) - - def test_file_absolute(self): - url = DataUrl("/data/image.edf") - expected = [True, True, None, "/data/image.edf", None, None] - self.assertUrl(url, expected) - - def test_file_absolute_windows(self): - url = DataUrl("C:/data/image.edf") - expected = [True, True, None, "C:/data/image.edf", None, None] - self.assertUrl(url, expected) - - def test_absolute_with_path(self): - url = DataUrl("/foo/foobar.h5?/foo/bar") - expected = [True, True, None, "/foo/foobar.h5", "/foo/bar", None] - self.assertUrl(url, expected) - - def test_windows_file_data_slice(self): - url = DataUrl("C:/foo/foobar.h5?path=/foo/bar&slice=5,1") - expected = [True, True, None, "C:/foo/foobar.h5", "/foo/bar", (5, 1)] - self.assertUrl(url, expected) - - def test_scheme_file_data_slice(self): - url = DataUrl("silx:/foo/foobar.h5?path=/foo/bar&slice=5,1") - expected = [True, True, "silx", "/foo/foobar.h5", "/foo/bar", (5, 1)] - self.assertUrl(url, expected) - - def test_scheme_windows_file_data_slice(self): - url = DataUrl("silx:C:/foo/foobar.h5?path=/foo/bar&slice=5,1") - expected = [True, True, "silx", "C:/foo/foobar.h5", "/foo/bar", (5, 1)] - self.assertUrl(url, expected) - - def test_empty(self): - url = DataUrl("") - expected = [False, False, None, "", None, None] - self.assertUrl(url, expected) - - def test_unknown_scheme(self): - url = DataUrl("foo:/foo/foobar.h5?path=/foo/bar&slice=5,1") - expected = [False, True, "foo", "/foo/foobar.h5", "/foo/bar", (5, 1)] - self.assertUrl(url, expected) - - def test_slice(self): - url = DataUrl("/a.h5?path=/b&slice=5,1") - expected = [True, True, None, "/a.h5", "/b", (5, 1)] - self.assertUrl(url, expected) - - def test_slice2(self): - url = DataUrl("/a.h5?path=/b&slice=2:5") - expected = [True, True, None, "/a.h5", "/b", (slice(2, 5),)] - self.assertUrl(url, expected) - - def test_slice3(self): - url = DataUrl("/a.h5?path=/b&slice=::2") - expected = [True, True, None, "/a.h5", "/b", (slice(None, None, 2),)] - self.assertUrl(url, expected) - - def test_slice_ellipsis(self): - url = DataUrl("/a.h5?path=/b&slice=...") - expected = [True, True, None, "/a.h5", "/b", (Ellipsis, )] - self.assertUrl(url, expected) - - def test_slice_slicing(self): - url = DataUrl("/a.h5?path=/b&slice=:") - expected = [True, True, None, "/a.h5", "/b", (slice(None), )] - self.assertUrl(url, expected) - - def test_slice_missing_element(self): - url = DataUrl("/a.h5?path=/b&slice=5,,1") - expected = [False, True, None, "/a.h5", "/b", None] - self.assertUrl(url, expected) - - def test_slice_no_elements(self): - url = DataUrl("/a.h5?path=/b&slice=") - expected = [False, True, None, "/a.h5", "/b", None] - self.assertUrl(url, expected) - - def test_create_relative_url(self): - url = DataUrl(scheme="silx", file_path="./foo.h5", data_path="/", data_slice=(5, 1)) - self.assertFalse(url.is_absolute()) - url2 = DataUrl(url.path()) - self.assertEqual(url, url2) - - def test_create_absolute_url(self): - url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1)) - url2 = DataUrl(url.path()) - self.assertEqual(url, url2) - - def test_create_absolute_windows_url(self): - url = DataUrl(scheme="silx", file_path="C:/foo.h5", data_path="/", data_slice=(5, 1)) - url2 = DataUrl(url.path()) - self.assertEqual(url, url2) - - def test_create_slice_url(self): - url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1, Ellipsis, slice(None))) - url2 = DataUrl(url.path()) - self.assertEqual(url, url2) - - def test_wrong_url(self): - url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=(5, 1)) - self.assertFalse(url.is_valid()) - - def test_path_creation(self): - """make sure the construction of path succeed and that we can - recreate a DataUrl from a path""" - for data_slice in (1, (1,)): - with self.subTest(data_slice=data_slice): - url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=data_slice) - path = url.path() - DataUrl(path=path) - - -def suite(): - test_suite = unittest.TestSuite() - loadTests = unittest.defaultTestLoader.loadTestsFromTestCase - test_suite.addTest(loadTests(TestDataUrl)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest='suite') diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py deleted file mode 100644 index 13ab532..0000000 --- a/silx/io/test/test_utils.py +++ /dev/null @@ -1,888 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# Copyright (C) 2016-2019 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ############################################################################*/ -"""Tests for utils module""" - -import io -import numpy -import os -import re -import shutil -import tempfile -import unittest -import sys - -from .. import utils -from ..._version import calc_hexversion -import silx.io.url - -import h5py -from ..utils import h5ls - -import fabio - -__authors__ = ["P. Knobel"] -__license__ = "MIT" -__date__ = "03/12/2020" - -expected_spec1 = r"""#F .* -#D .* - -#S 1 Ordinate1 -#D .* -#N 2 -#L Abscissa Ordinate1 -1 4\.00 -2 5\.00 -3 6\.00 -""" - -expected_spec2 = expected_spec1 + r""" -#S 2 Ordinate2 -#D .* -#N 2 -#L Abscissa Ordinate2 -1 7\.00 -2 8\.00 -3 9\.00 -""" - -expected_spec2reg = r"""#F .* -#D .* - -#S 1 Ordinate1 -#D .* -#N 3 -#L Abscissa Ordinate1 Ordinate2 -1 4\.00 7\.00 -2 5\.00 8\.00 -3 6\.00 9\.00 -""" - -expected_spec2irr = expected_spec1 + r""" -#S 2 Ordinate2 -#D .* -#N 2 -#L Abscissa Ordinate2 -1 7\.00 -2 8\.00 -""" - -expected_csv = r"""Abscissa;Ordinate1;Ordinate2 -1;4\.00;7\.00e\+00 -2;5\.00;8\.00e\+00 -3;6\.00;9\.00e\+00 -""" - -expected_csv2 = r"""x;y0;y1 -1;4\.00;7\.00e\+00 -2;5\.00;8\.00e\+00 -3;6\.00;9\.00e\+00 -""" - - -class TestSave(unittest.TestCase): - """Test saving curves as SpecFile: - """ - - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.spec_fname = os.path.join(self.tempdir, "savespec.dat") - self.csv_fname = os.path.join(self.tempdir, "savecsv.csv") - self.npy_fname = os.path.join(self.tempdir, "savenpy.npy") - - self.x = [1, 2, 3] - self.xlab = "Abscissa" - self.y = [[4, 5, 6], [7, 8, 9]] - self.y_irr = [[4, 5, 6], [7, 8]] - self.ylabs = ["Ordinate1", "Ordinate2"] - - def tearDown(self): - if os.path.isfile(self.spec_fname): - os.unlink(self.spec_fname) - if os.path.isfile(self.csv_fname): - os.unlink(self.csv_fname) - if os.path.isfile(self.npy_fname): - os.unlink(self.npy_fname) - shutil.rmtree(self.tempdir) - - def test_save_csv(self): - utils.save1D(self.csv_fname, self.x, self.y, - xlabel=self.xlab, ylabels=self.ylabs, - filetype="csv", fmt=["%d", "%.2f", "%.2e"], - csvdelim=";", autoheader=True) - - csvf = open(self.csv_fname) - actual_csv = csvf.read() - csvf.close() - - self.assertRegex(actual_csv, expected_csv) - - def test_save_npy(self): - """npy file is saved with numpy.save after building a numpy array - and converting it to a named record array""" - npyf = open(self.npy_fname, "wb") - utils.save1D(npyf, self.x, self.y, - xlabel=self.xlab, ylabels=self.ylabs) - npyf.close() - - npy_recarray = numpy.load(self.npy_fname) - - self.assertEqual(npy_recarray.shape, (3,)) - self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'], - numpy.array((4, 5, 6)))) - - def test_savespec_filename(self): - """Save SpecFile using savespec()""" - utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab, - ylabel=self.ylabs[0], fmt=["%d", "%.2f"], - close_file=True, scan_number=1) - - specf = open(self.spec_fname) - actual_spec = specf.read() - specf.close() - self.assertRegex(actual_spec, expected_spec1) - - def test_savespec_file_handle(self): - """Save SpecFile using savespec(), passing a file handle""" - # first savespec: open, write file header, save y[0] as scan 1, - # return file handle - specf = utils.savespec(self.spec_fname, self.x, self.y[0], - xlabel=self.xlab, ylabel=self.ylabs[0], - fmt=["%d", "%.2f"], close_file=False) - - # second savespec: save y[1] as scan 2, close file - utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab, - ylabel=self.ylabs[1], fmt=["%d", "%.2f"], - write_file_header=False, close_file=True, - scan_number=2) - - specf = open(self.spec_fname) - actual_spec = specf.read() - specf.close() - self.assertRegex(actual_spec, expected_spec2) - - def test_save_spec_reg(self): - """Save SpecFile using save() on a regular pattern""" - utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab, - ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) - - specf = open(self.spec_fname) - actual_spec = specf.read() - specf.close() - - self.assertRegex(actual_spec, expected_spec2reg) - - def test_save_spec_irr(self): - """Save SpecFile using save() on an irregular pattern""" - # invalid test case ?! - return - utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab, - ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) - - specf = open(self.spec_fname) - actual_spec = specf.read() - specf.close() - self.assertRegex(actual_spec, expected_spec2irr) - - def test_save_csv_no_labels(self): - """Save csv using save(), with autoheader=True but - xlabel=None and ylabels=None - This is a non-regression test for bug #223""" - self.tempdir = tempfile.mkdtemp() - self.spec_fname = os.path.join(self.tempdir, "savespec.dat") - self.csv_fname = os.path.join(self.tempdir, "savecsv.csv") - self.npy_fname = os.path.join(self.tempdir, "savenpy.npy") - - self.x = [1, 2, 3] - self.xlab = "Abscissa" - self.y = [[4, 5, 6], [7, 8, 9]] - self.ylabs = ["Ordinate1", "Ordinate2"] - utils.save1D(self.csv_fname, self.x, self.y, - autoheader=True, fmt=["%d", "%.2f", "%.2e"]) - - csvf = open(self.csv_fname) - actual_csv = csvf.read() - csvf.close() - self.assertRegex(actual_csv, expected_csv2) - - -def assert_match_any_string_in_list(test, pattern, list_of_strings): - for string_ in list_of_strings: - if re.match(pattern, string_): - return True - return False - - -class TestH5Ls(unittest.TestCase): - """Test displaying the following HDF5 file structure: - - +foo - +bar - <HDF5 dataset "spam": shape (2, 2), type "<i8"> - <HDF5 dataset "tmp": shape (3,), type "<i8"> - <HDF5 dataset "data": shape (1,), type "<f8"> - - """ - - def assertMatchAnyStringInList(self, pattern, list_of_strings): - for string_ in list_of_strings: - if re.match(pattern, string_): - return None - raise AssertionError("regex pattern %s does not match any" % pattern + - " string in list " + str(list_of_strings)) - - def testHdf5(self): - fd, self.h5_fname = tempfile.mkstemp(text=False) - # Close and delete (we just want the name) - os.close(fd) - os.unlink(self.h5_fname) - self.h5f = h5py.File(self.h5_fname, "w") - self.h5f["/foo/bar/tmp"] = [1, 2, 3] - self.h5f["/foo/bar/spam"] = [[1, 2], [3, 4]] - self.h5f["/foo/data"] = [3.14] - self.h5f.close() - - rep = h5ls(self.h5_fname) - lines = rep.split("\n") - - self.assertIn("+foo", lines) - self.assertIn("\t+bar", lines) - - match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">' - self.assertMatchAnyStringInList(match, lines) - match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">' - self.assertMatchAnyStringInList(match, lines) - match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">' - self.assertMatchAnyStringInList(match, lines) - - os.unlink(self.h5_fname) - - # Following test case disabled d/t errors on AppVeyor: - # os.unlink(spec_fname) - # PermissionError: [WinError 32] The process cannot access the file because - # it is being used by another process: 'C:\\...\\savespec.dat' - - # def testSpec(self): - # tempdir = tempfile.mkdtemp() - # spec_fname = os.path.join(tempdir, "savespec.dat") - # - # x = [1, 2, 3] - # xlab = "Abscissa" - # y = [[4, 5, 6], [7, 8, 9]] - # ylabs = ["Ordinate1", "Ordinate2"] - # utils.save1D(spec_fname, x, y, xlabel=xlab, - # ylabels=ylabs, filetype="spec", - # fmt=["%d", "%.2f"]) - # - # rep = h5ls(spec_fname) - # lines = rep.split("\n") - # self.assertIn("+1.1", lines) - # self.assertIn("\t+instrument", lines) - # - # self.assertMatchAnyStringInList( - # r'\t\t\t<SPEC dataset "file_header": shape \(\), type "|S60">', - # lines) - # self.assertMatchAnyStringInList( - # r'\t\t<SPEC dataset "Ordinate1": shape \(3L?,\), type "<f4">', - # lines) - # - # os.unlink(spec_fname) - # shutil.rmtree(tempdir) - - -class TestOpen(unittest.TestCase): - """Test `silx.io.utils.open` function.""" - - @classmethod - def setUpClass(cls): - cls.tmp_directory = tempfile.mkdtemp() - cls.createResources(cls.tmp_directory) - - @classmethod - def createResources(cls, directory): - - cls.h5_filename = os.path.join(directory, "test.h5") - h5 = h5py.File(cls.h5_filename, mode="w") - h5["group/group/dataset"] = 50 - h5.close() - - cls.spec_filename = os.path.join(directory, "test.dat") - utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y", - fmt=["%d", "%.2f"], close_file=True, scan_number=1) - - cls.edf_filename = os.path.join(directory, "test.edf") - header = fabio.fabioimage.OrderedDict() - header["integer"] = "10" - data = numpy.array([[10, 50], [50, 10]]) - fabiofile = fabio.edfimage.EdfImage(data, header) - fabiofile.write(cls.edf_filename) - - cls.txt_filename = os.path.join(directory, "test.txt") - f = io.open(cls.txt_filename, "w+t") - f.write(u"Kikoo") - f.close() - - cls.missing_filename = os.path.join(directory, "test.missing") - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tmp_directory) - - def testH5(self): - f = utils.open(self.h5_filename) - self.assertIsNotNone(f) - self.assertIsInstance(f, h5py.File) - f.close() - - def testH5With(self): - with utils.open(self.h5_filename) as f: - self.assertIsNotNone(f) - self.assertIsInstance(f, h5py.File) - - def testH5_withPath(self): - f = utils.open(self.h5_filename + "::/group/group/dataset") - self.assertIsNotNone(f) - self.assertEqual(f.h5py_class, h5py.Dataset) - self.assertEqual(f[()], 50) - f.close() - - def testH5With_withPath(self): - with utils.open(self.h5_filename + "::/group/group") as f: - self.assertIsNotNone(f) - self.assertEqual(f.h5py_class, h5py.Group) - self.assertIn("dataset", f) - - def testSpec(self): - f = utils.open(self.spec_filename) - self.assertIsNotNone(f) - self.assertEqual(f.h5py_class, h5py.File) - f.close() - - def testSpecWith(self): - with utils.open(self.spec_filename) as f: - self.assertIsNotNone(f) - self.assertEqual(f.h5py_class, h5py.File) - - def testEdf(self): - f = utils.open(self.edf_filename) - self.assertIsNotNone(f) - self.assertEqual(f.h5py_class, h5py.File) - f.close() - - def testEdfWith(self): - with utils.open(self.edf_filename) as f: - self.assertIsNotNone(f) - self.assertEqual(f.h5py_class, h5py.File) - - def testUnsupported(self): - self.assertRaises(IOError, utils.open, self.txt_filename) - - def testNotExists(self): - # load it - self.assertRaises(IOError, utils.open, self.missing_filename) - - def test_silx_scheme(self): - url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/") - with utils.open(url.path()) as f: - self.assertIsNotNone(f) - self.assertTrue(silx.io.utils.is_file(f)) - - def test_fabio_scheme(self): - url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename) - self.assertRaises(IOError, utils.open, url.path()) - - def test_bad_url(self): - url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename) - self.assertRaises(IOError, utils.open, url.path()) - - def test_sliced_url(self): - url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,)) - self.assertRaises(IOError, utils.open, url.path()) - - -class TestNodes(unittest.TestCase): - """Test `silx.io.utils.is_` functions.""" - - def test_real_h5py_objects(self): - name = tempfile.mktemp(suffix=".h5") - try: - with h5py.File(name, "w") as h5file: - h5group = h5file.create_group("arrays") - h5dataset = h5group.create_dataset("scalar", data=10) - - self.assertTrue(utils.is_file(h5file)) - self.assertTrue(utils.is_group(h5file)) - self.assertFalse(utils.is_dataset(h5file)) - - self.assertFalse(utils.is_file(h5group)) - self.assertTrue(utils.is_group(h5group)) - self.assertFalse(utils.is_dataset(h5group)) - - self.assertFalse(utils.is_file(h5dataset)) - self.assertFalse(utils.is_group(h5dataset)) - self.assertTrue(utils.is_dataset(h5dataset)) - finally: - os.unlink(name) - - def test_h5py_like_file(self): - - class Foo(object): - - def __init__(self): - self.h5_class = utils.H5Type.FILE - - obj = Foo() - self.assertTrue(utils.is_file(obj)) - self.assertTrue(utils.is_group(obj)) - self.assertFalse(utils.is_dataset(obj)) - - def test_h5py_like_group(self): - - class Foo(object): - - def __init__(self): - self.h5_class = utils.H5Type.GROUP - - obj = Foo() - self.assertFalse(utils.is_file(obj)) - self.assertTrue(utils.is_group(obj)) - self.assertFalse(utils.is_dataset(obj)) - - def test_h5py_like_dataset(self): - - class Foo(object): - - def __init__(self): - self.h5_class = utils.H5Type.DATASET - - obj = Foo() - self.assertFalse(utils.is_file(obj)) - self.assertFalse(utils.is_group(obj)) - self.assertTrue(utils.is_dataset(obj)) - - def test_bad(self): - - class Foo(object): - - def __init__(self): - pass - - obj = Foo() - self.assertFalse(utils.is_file(obj)) - self.assertFalse(utils.is_group(obj)) - self.assertFalse(utils.is_dataset(obj)) - - def test_bad_api(self): - - class Foo(object): - - def __init__(self): - self.h5_class = int - - obj = Foo() - self.assertFalse(utils.is_file(obj)) - self.assertFalse(utils.is_group(obj)) - self.assertFalse(utils.is_dataset(obj)) - - -class TestGetData(unittest.TestCase): - """Test `silx.io.utils.get_data` function.""" - - @classmethod - def setUpClass(cls): - cls.tmp_directory = tempfile.mkdtemp() - cls.createResources(cls.tmp_directory) - - @classmethod - def createResources(cls, directory): - - cls.h5_filename = os.path.join(directory, "test.h5") - h5 = h5py.File(cls.h5_filename, mode="w") - h5["group/group/scalar"] = 50 - h5["group/group/array"] = [1, 2, 3, 4, 5] - h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] - h5.close() - - cls.spec_filename = os.path.join(directory, "test.dat") - utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y", - fmt=["%d", "%.2f"], close_file=True, scan_number=1) - - cls.edf_filename = os.path.join(directory, "test.edf") - cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf") - header = fabio.fabioimage.OrderedDict() - header["integer"] = "10" - data = numpy.array([[10, 50], [50, 10]]) - fabiofile = fabio.edfimage.EdfImage(data, header) - fabiofile.write(cls.edf_filename) - fabiofile.append_frame(data=data, header=header) - fabiofile.write(cls.edf_multiframe_filename) - - cls.txt_filename = os.path.join(directory, "test.txt") - f = io.open(cls.txt_filename, "w+t") - f.write(u"Kikoo") - f.close() - - cls.missing_filename = os.path.join(directory, "test.missing") - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tmp_directory) - - def test_hdf5_scalar(self): - url = "silx:%s?/group/group/scalar" % self.h5_filename - data = utils.get_data(url=url) - self.assertEqual(data, 50) - - def test_hdf5_array(self): - url = "silx:%s?/group/group/array" % self.h5_filename - data = utils.get_data(url=url) - self.assertEqual(data.shape, (5,)) - self.assertEqual(data[0], 1) - - def test_hdf5_array_slice(self): - url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename - data = utils.get_data(url=url) - self.assertEqual(data.shape, (5,)) - self.assertEqual(data[0], 6) - - def test_hdf5_array_slice_out_of_range(self): - url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename - # ValueError: h5py 2.x - # IndexError: h5py 3.x - self.assertRaises((ValueError, IndexError), utils.get_data, url) - - def test_edf_using_silx(self): - url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename - data = utils.get_data(url=url) - self.assertEqual(data.shape, (2, 2)) - self.assertEqual(data[0, 0], 10) - - def test_fabio_frame(self): - url = "fabio:%s?slice=1" % self.edf_multiframe_filename - data = utils.get_data(url=url) - self.assertEqual(data.shape, (2, 2)) - self.assertEqual(data[0, 0], 10) - - def test_fabio_singleframe(self): - url = "fabio:%s?slice=0" % self.edf_filename - data = utils.get_data(url=url) - self.assertEqual(data.shape, (2, 2)) - self.assertEqual(data[0, 0], 10) - - def test_fabio_too_much_frames(self): - url = "fabio:%s?slice=..." % self.edf_multiframe_filename - self.assertRaises(ValueError, utils.get_data, url) - - def test_fabio_no_frame(self): - url = "fabio:%s" % self.edf_filename - data = utils.get_data(url=url) - self.assertEqual(data.shape, (2, 2)) - self.assertEqual(data[0, 0], 10) - - def test_unsupported_scheme(self): - url = "foo:/foo/bar" - self.assertRaises(ValueError, utils.get_data, url) - - def test_no_scheme(self): - url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename - self.assertRaises((ValueError, IOError), utils.get_data, url) - - def test_file_not_exists(self): - url = "silx:/foo/bar" - self.assertRaises(IOError, utils.get_data, url) - - -def _h5_py_version_older_than(version): - v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]] - r_majeur, r_mineur, r_micro = [int(i) for i in version.split('.')] - return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(r_majeur, r_mineur, r_micro) - - -@unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0') -class TestRawFileToH5(unittest.TestCase): - """Test conversion of .vol file to .h5 external dataset""" - - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self._vol_file = os.path.join(self.tempdir, 'test_vol.vol') - self._file_info = os.path.join(self.tempdir, 'test_vol.info.vol') - self._dataset_shape = 100, 20, 5 - data = numpy.random.random(self._dataset_shape[0] * - self._dataset_shape[1] * - self._dataset_shape[2]).astype(dtype=numpy.float32).reshape(self._dataset_shape) - numpy.save(file=self._vol_file, arr=data) - # those are storing into .noz file - assert os.path.exists(self._vol_file + '.npy') - os.rename(self._vol_file + '.npy', self._vol_file) - self.h5_file = os.path.join(self.tempdir, 'test_h5.h5') - self.external_dataset_path = '/root/my_external_dataset' - self._data_url = silx.io.url.DataUrl(file_path=self.h5_file, - data_path=self.external_dataset_path) - with open(self._file_info, 'w') as _fi: - _fi.write('NUM_X = %s\n' % self._dataset_shape[2]) - _fi.write('NUM_Y = %s\n' % self._dataset_shape[1]) - _fi.write('NUM_Z = %s\n' % self._dataset_shape[0]) - - def tearDown(self): - shutil.rmtree(self.tempdir) - - def check_dataset(self, h5_file, data_path, shape): - """Make sure the external dataset is valid""" - with h5py.File(h5_file, 'r') as _file: - return data_path in _file and _file[data_path].shape == shape - - def test_h5_file_not_existing(self): - """Test that can create a file with external dataset from scratch""" - utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, - output_url=self._data_url, - shape=(100, 20, 5), - dtype=numpy.float32) - self.assertTrue(self.check_dataset(h5_file=self.h5_file, - data_path=self.external_dataset_path, - shape=self._dataset_shape)) - os.remove(self.h5_file) - utils.vol_to_h5_external_dataset(vol_file=self._vol_file, - output_url=self._data_url, - info_file=self._file_info) - self.assertTrue(self.check_dataset(h5_file=self.h5_file, - data_path=self.external_dataset_path, - shape=self._dataset_shape)) - - def test_h5_file_existing(self): - """Test that can add the external dataset from an existing file""" - with h5py.File(self.h5_file, 'w') as _file: - _file['/root/dataset1'] = numpy.zeros((100, 100)) - _file['/root/group/dataset2'] = numpy.ones((100, 100)) - utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, - output_url=self._data_url, - shape=(100, 20, 5), - dtype=numpy.float32) - self.assertTrue(self.check_dataset(h5_file=self.h5_file, - data_path=self.external_dataset_path, - shape=self._dataset_shape)) - - def test_vol_file_not_existing(self): - """Make sure error is raised if .vol file does not exists""" - os.remove(self._vol_file) - utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, - output_url=self._data_url, - shape=(100, 20, 5), - dtype=numpy.float32) - - self.assertTrue(self.check_dataset(h5_file=self.h5_file, - data_path=self.external_dataset_path, - shape=self._dataset_shape)) - - def test_conflicts(self): - """Test several conflict cases""" - # test if path already exists - utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, - output_url=self._data_url, - shape=(100, 20, 5), - dtype=numpy.float32) - with self.assertRaises(ValueError): - utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, - output_url=self._data_url, - shape=(100, 20, 5), - overwrite=False, - dtype=numpy.float32) - - utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file, - output_url=self._data_url, - shape=(100, 20, 5), - overwrite=True, - dtype=numpy.float32) - - self.assertTrue(self.check_dataset(h5_file=self.h5_file, - data_path=self.external_dataset_path, - shape=self._dataset_shape)) - - -class TestH5Strings(unittest.TestCase): - """Test HDF5 str and bytes writing and reading""" - - @classmethod - def setUpClass(cls): - cls.tempdir = tempfile.mkdtemp() - cls.vlenstr = h5py.special_dtype(vlen=str) - cls.vlenbytes = h5py.special_dtype(vlen=bytes) - try: - cls.unicode = unicode - except NameError: - cls.unicode = str - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tempdir) - - def setUp(self): - self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w") - - def tearDown(self): - self.file.close() - - @classmethod - def _make_array(cls, value, n): - if isinstance(value, bytes): - dtype = cls.vlenbytes - elif isinstance(value, cls.unicode): - dtype = cls.vlenstr - else: - return numpy.array([value] * n) - return numpy.array([value] * n, dtype=dtype) - - @classmethod - def _get_charset(cls, value): - if isinstance(value, bytes): - return h5py.h5t.CSET_ASCII - elif isinstance(value, cls.unicode): - return h5py.h5t.CSET_UTF8 - else: - return None - - def _check_dataset(self, value, result=None): - # Write+read scalar - if result: - decode_ascii = True - else: - decode_ascii = False - result = value - charset = self._get_charset(value) - self.file["data"] = value - data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii) - assert type(data) == type(result), data - assert data == result, data - if charset: - assert self.file["data"].id.get_type().get_cset() == charset - - # Write+read variable length - self.file["vlen_data"] = self._make_array(value, 2) - data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0) - assert type(data) == type(result), data - assert data == result, data - data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii) - numpy.testing.assert_array_equal(data, [result] * 2) - if charset: - assert self.file["vlen_data"].id.get_type().get_cset() == charset - - def _check_attribute(self, value, result=None): - if result: - decode_ascii = True - else: - decode_ascii = False - result = value - self.file.attrs["data"] = value - data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii) - assert type(data) == type(result), data - assert data == result, data - - self.file.attrs["vlen_data"] = self._make_array(value, 2) - data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii) - assert type(data[0]) == type(result), data[0] - assert data[0] == result, data[0] - numpy.testing.assert_array_equal(data, [result] * 2) - - data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"] - assert type(data[0]) == type(result), data[0] - assert data[0] == result, data[0] - numpy.testing.assert_array_equal(data, [result] * 2) - - def test_dataset_ascii_bytes(self): - self._check_dataset(b"abc") - - def test_attribute_ascii_bytes(self): - self._check_attribute(b"abc") - - def test_dataset_ascii_bytes_decode(self): - self._check_dataset(b"abc", result="abc") - - def test_attribute_ascii_bytes_decode(self): - self._check_attribute(b"abc", result="abc") - - def test_dataset_ascii_str(self): - self._check_dataset("abc") - - def test_attribute_ascii_str(self): - self._check_attribute("abc") - - def test_dataset_utf8_str(self): - self._check_dataset("\u0101bc") - - def test_attribute_utf8_str(self): - self._check_attribute("\u0101bc") - - def test_dataset_utf8_bytes(self): - # 0xC481 is the byte representation of U+0101 - self._check_dataset(b"\xc4\x81bc") - - def test_attribute_utf8_bytes(self): - # 0xC481 is the byte representation of U+0101 - self._check_attribute(b"\xc4\x81bc") - - def test_dataset_utf8_bytes_decode(self): - # 0xC481 is the byte representation of U+0101 - self._check_dataset(b"\xc4\x81bc", result="\u0101bc") - - def test_attribute_utf8_bytes_decode(self): - # 0xC481 is the byte representation of U+0101 - self._check_attribute(b"\xc4\x81bc", result="\u0101bc") - - def test_dataset_latin1_bytes(self): - # extended ascii character 0xE4 - self._check_dataset(b"\xe423") - - def test_attribute_latin1_bytes(self): - # extended ascii character 0xE4 - self._check_attribute(b"\xe423") - - def test_dataset_latin1_bytes_decode(self): - # U+DCE4: surrogate for extended ascii character 0xE4 - self._check_dataset(b"\xe423", result="\udce423") - - def test_attribute_latin1_bytes_decode(self): - # U+DCE4: surrogate for extended ascii character 0xE4 - self._check_attribute(b"\xe423", result="\udce423") - - def test_dataset_no_string(self): - self._check_dataset(numpy.int64(10)) - - def test_attribute_no_string(self): - self._check_attribute(numpy.int64(10)) - - -def suite(): - loadTests = unittest.defaultTestLoader.loadTestsFromTestCase - test_suite = unittest.TestSuite() - test_suite.addTest(loadTests(TestSave)) - test_suite.addTest(loadTests(TestH5Ls)) - test_suite.addTest(loadTests(TestOpen)) - test_suite.addTest(loadTests(TestNodes)) - test_suite.addTest(loadTests(TestGetData)) - test_suite.addTest(loadTests(TestRawFileToH5)) - test_suite.addTest(loadTests(TestH5Strings)) - return test_suite - - -if __name__ == '__main__': - unittest.main(defaultTest="suite") |