summaryrefslogtreecommitdiff
path: root/silx/io/test
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/test')
-rw-r--r--silx/io/test/__init__.py61
-rw-r--r--silx/io/test/test_commonh5.py295
-rw-r--r--silx/io/test/test_dictdump.py1025
-rwxr-xr-xsilx/io/test/test_fabioh5.py629
-rw-r--r--silx/io/test/test_h5py_utils.py397
-rw-r--r--silx/io/test/test_nxdata.py579
-rw-r--r--silx/io/test/test_octaveh5.py165
-rw-r--r--silx/io/test/test_rawh5.py96
-rw-r--r--silx/io/test/test_specfile.py433
-rw-r--r--silx/io/test/test_specfilewrapper.py206
-rw-r--r--silx/io/test/test_spech5.py881
-rw-r--r--silx/io/test/test_spectoh5.py194
-rw-r--r--silx/io/test/test_url.py228
-rw-r--r--silx/io/test/test_utils.py888
14 files changed, 0 insertions, 6077 deletions
diff --git a/silx/io/test/__init__.py b/silx/io/test/__init__.py
deleted file mode 100644
index 68b6e9b..0000000
--- a/silx/io/test/__init__.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-
-__authors__ = ["T. Vincent", "P. Knobel"]
-__license__ = "MIT"
-__date__ = "08/12/2017"
-
-import unittest
-
-from .test_specfile import suite as test_specfile_suite
-from .test_specfilewrapper import suite as test_specfilewrapper_suite
-from .test_dictdump import suite as test_dictdump_suite
-from .test_spech5 import suite as test_spech5_suite
-from .test_spectoh5 import suite as test_spectoh5_suite
-from .test_octaveh5 import suite as test_octaveh5_suite
-from .test_fabioh5 import suite as test_fabioh5_suite
-from .test_utils import suite as test_utils_suite
-from .test_nxdata import suite as test_nxdata_suite
-from .test_commonh5 import suite as test_commonh5_suite
-from .test_rawh5 import suite as test_rawh5_suite
-from .test_url import suite as test_url_suite
-from .test_h5py_utils import suite as test_h5py_utils_suite
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(test_dictdump_suite())
- test_suite.addTest(test_specfile_suite())
- test_suite.addTest(test_specfilewrapper_suite())
- test_suite.addTest(test_spech5_suite())
- test_suite.addTest(test_spectoh5_suite())
- test_suite.addTest(test_octaveh5_suite())
- test_suite.addTest(test_utils_suite())
- test_suite.addTest(test_fabioh5_suite())
- test_suite.addTest(test_nxdata_suite())
- test_suite.addTest(test_commonh5_suite())
- test_suite.addTest(test_rawh5_suite())
- test_suite.addTest(test_url_suite())
- test_suite.addTest(test_h5py_utils_suite())
- return test_suite
diff --git a/silx/io/test/test_commonh5.py b/silx/io/test/test_commonh5.py
deleted file mode 100644
index 168ef34..0000000
--- a/silx/io/test/test_commonh5.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for commonh5 wrapper"""
-
-__authors__ = ["V. Valls"]
-__license__ = "MIT"
-__date__ = "21/09/2017"
-
-import logging
-import numpy
-import unittest
-import tempfile
-import shutil
-
-_logger = logging.getLogger(__name__)
-
-import silx.io
-import silx.io.utils
-import h5py
-
-try:
- from .. import commonh5
-except ImportError:
- commonh5 = None
-
-
-class TestCommonFeatures(unittest.TestCase):
- """Test common features supported by h5py and our implementation."""
-
- @classmethod
- def createFile(cls):
- return None
-
- @classmethod
- def setUpClass(cls):
- # Set to None cause create_resource can raise an excpetion
- cls.h5 = None
- cls.h5 = cls.create_resource()
- if cls.h5 is None:
- raise unittest.SkipTest("File not created")
-
- @classmethod
- def create_resource(cls):
- """Must be implemented"""
- return None
-
- @classmethod
- def tearDownClass(cls):
- cls.h5 = None
-
- def test_file(self):
- node = self.h5
- self.assertTrue(silx.io.is_file(node))
- self.assertTrue(silx.io.is_group(node))
- self.assertFalse(silx.io.is_dataset(node))
- self.assertEqual(len(node.attrs), 0)
-
- def test_group(self):
- node = self.h5["group"]
- self.assertFalse(silx.io.is_file(node))
- self.assertTrue(silx.io.is_group(node))
- self.assertFalse(silx.io.is_dataset(node))
- self.assertEqual(len(node.attrs), 0)
- class_ = self.h5.get("group", getclass=True)
- classlink = self.h5.get("group", getlink=True, getclass=True)
- self.assertEqual(class_, h5py.Group)
- self.assertEqual(classlink, h5py.HardLink)
-
- def test_dataset(self):
- node = self.h5["group/dataset"]
- self.assertFalse(silx.io.is_file(node))
- self.assertFalse(silx.io.is_group(node))
- self.assertTrue(silx.io.is_dataset(node))
- self.assertEqual(len(node.attrs), 0)
- class_ = self.h5.get("group/dataset", getclass=True)
- classlink = self.h5.get("group/dataset", getlink=True, getclass=True)
- self.assertEqual(class_, h5py.Dataset)
- self.assertEqual(classlink, h5py.HardLink)
-
- def test_soft_link(self):
- node = self.h5["link/soft_link"]
- self.assertEqual(node.name, "/link/soft_link")
- class_ = self.h5.get("link/soft_link", getclass=True)
- link = self.h5.get("link/soft_link", getlink=True)
- classlink = self.h5.get("link/soft_link", getlink=True, getclass=True)
- self.assertEqual(class_, h5py.Dataset)
- self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink)))
- self.assertTrue(silx.io.utils.is_softlink(link))
- self.assertEqual(classlink, h5py.SoftLink)
-
- def test_external_link(self):
- node = self.h5["link/external_link"]
- self.assertEqual(node.name, "/target/dataset")
- class_ = self.h5.get("link/external_link", getclass=True)
- classlink = self.h5.get("link/external_link", getlink=True, getclass=True)
- self.assertEqual(class_, h5py.Dataset)
- self.assertEqual(classlink, h5py.ExternalLink)
-
- def test_external_link_to_link(self):
- node = self.h5["link/external_link_to_link"]
- self.assertEqual(node.name, "/target/link")
- class_ = self.h5.get("link/external_link_to_link", getclass=True)
- classlink = self.h5.get("link/external_link_to_link", getlink=True, getclass=True)
- self.assertEqual(class_, h5py.Dataset)
- self.assertEqual(classlink, h5py.ExternalLink)
-
- def test_create_groups(self):
- c = self.h5.create_group(self.id() + "/a/b/c")
- d = c.create_group("/" + self.id() + "/a/b/d")
-
- self.assertRaises(ValueError, self.h5.create_group, self.id() + "/a/b/d")
- self.assertEqual(c.name, "/" + self.id() + "/a/b/c")
- self.assertEqual(d.name, "/" + self.id() + "/a/b/d")
-
- def test_setitem_python_object_dataset(self):
- group = self.h5.create_group(self.id())
- group["a"] = 10
- self.assertEqual(group["a"].dtype.kind, "i")
-
- def test_setitem_numpy_dataset(self):
- group = self.h5.create_group(self.id())
- group["a"] = numpy.array([10, 20, 30])
- self.assertEqual(group["a"].dtype.kind, "i")
- self.assertEqual(group["a"].shape, (3,))
-
- def test_setitem_link(self):
- group = self.h5.create_group(self.id())
- group["a"] = 10
- group["b"] = group["a"]
- self.assertEqual(group["b"].dtype.kind, "i")
-
- def test_setitem_dataset_is_sub_group(self):
- self.h5[self.id() + "/a"] = 10
-
-
-class TestCommonFeatures_h5py(TestCommonFeatures):
- """Check if h5py is compliant with what we expect."""
-
- @classmethod
- def create_resource(cls):
- cls.tmp_dir = tempfile.mkdtemp()
-
- externalh5 = h5py.File(cls.tmp_dir + "/external.h5", mode="w")
- externalh5["target/dataset"] = 50
- externalh5["target/link"] = h5py.SoftLink("/target/dataset")
- externalh5.close()
-
- h5 = h5py.File(cls.tmp_dir + "/base.h5", mode="w")
- h5["group/dataset"] = 50
- h5["link/soft_link"] = h5py.SoftLink("/group/dataset")
- h5["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset")
- h5["link/external_link_to_link"] = h5py.ExternalLink("external.h5", "/target/link")
-
- return h5
-
- @classmethod
- def tearDownClass(cls):
- super(TestCommonFeatures_h5py, cls).tearDownClass()
- if hasattr(cls, "tmp_dir") and cls.tmp_dir is not None:
- shutil.rmtree(cls.tmp_dir)
-
-
-class TestCommonFeatures_commonH5(TestCommonFeatures):
- """Check if commonh5 is compliant with h5py."""
-
- @classmethod
- def create_resource(cls):
- h5 = commonh5.File("base.h5", "w")
- h5.create_group("group").create_dataset("dataset", data=numpy.int32(50))
-
- link = h5.create_group("link")
- link.add_node(commonh5.SoftLink("soft_link", "/group/dataset"))
-
- return h5
-
- def test_external_link(self):
- # not applicable
- pass
-
- def test_external_link_to_link(self):
- # not applicable
- pass
-
-
-class TestSpecificCommonH5(unittest.TestCase):
- """Test specific features from commonh5.
-
- Test of shared features should be done by TestCommonFeatures."""
-
- def setUp(self):
- if commonh5 is None:
- self.skipTest("silx.io.commonh5 is needed")
-
- def test_node_attrs(self):
- node = commonh5.Node("Foo", attrs={"a": 1})
- self.assertEqual(node.attrs["a"], 1)
- node.attrs["b"] = 8
- self.assertEqual(node.attrs["b"], 8)
- node.attrs["b"] = 2
- self.assertEqual(node.attrs["b"], 2)
-
- def test_node_readonly_attrs(self):
- f = commonh5.File(name="Foo", mode="r")
- node = commonh5.Node("Foo", attrs={"a": 1})
- node.attrs["b"] = 8
- f.add_node(node)
- self.assertEqual(node.attrs["b"], 8)
- try:
- node.attrs["b"] = 1
- self.fail()
- except RuntimeError:
- pass
-
- def test_create_dataset(self):
- f = commonh5.File(name="Foo", mode="w")
- node = f.create_dataset("foo", data=numpy.array([1]))
- self.assertIs(node.parent, f)
- self.assertIs(f["foo"], node)
-
- def test_create_group(self):
- f = commonh5.File(name="Foo", mode="w")
- node = f.create_group("foo")
- self.assertIs(node.parent, f)
- self.assertIs(f["foo"], node)
-
- def test_readonly_create_dataset(self):
- f = commonh5.File(name="Foo", mode="r")
- try:
- f.create_dataset("foo", data=numpy.array([1]))
- self.fail()
- except RuntimeError:
- pass
-
- def test_readonly_create_group(self):
- f = commonh5.File(name="Foo", mode="r")
- try:
- f.create_group("foo")
- self.fail()
- except RuntimeError:
- pass
-
- def test_create_unicode_dataset(self):
- f = commonh5.File(name="Foo", mode="w")
- try:
- f.create_dataset("foo", data=numpy.array(u"aaaa"))
- self.fail()
- except TypeError:
- pass
-
- def test_setitem_dataset(self):
- self.h5 = commonh5.File(name="Foo", mode="w")
- group = self.h5.create_group(self.id())
- group["a"] = commonh5.Dataset(None, data=numpy.array(10))
- self.assertEqual(group["a"].dtype.kind, "i")
-
- def test_setitem_explicit_link(self):
- self.h5 = commonh5.File(name="Foo", mode="w")
- group = self.h5.create_group(self.id())
- group["a"] = 10
- group["b"] = commonh5.SoftLink(None, path="/" + self.id() + "/a")
- self.assertEqual(group["b"].dtype.kind, "i")
-
-
-def suite():
- loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
- test_suite = unittest.TestSuite()
- test_suite.addTest(loadTests(TestCommonFeatures_h5py))
- test_suite.addTest(loadTests(TestCommonFeatures_commonH5))
- test_suite.addTest(loadTests(TestSpecificCommonH5))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py
deleted file mode 100644
index 93c9183..0000000
--- a/silx/io/test/test_dictdump.py
+++ /dev/null
@@ -1,1025 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2020 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for dicttoh5 module"""
-
-__authors__ = ["P. Knobel"]
-__license__ = "MIT"
-__date__ = "17/01/2018"
-
-from collections import OrderedDict
-import numpy
-import os
-import tempfile
-import unittest
-import h5py
-from copy import deepcopy
-
-from collections import defaultdict
-
-from silx.utils.testutils import TestLogging
-
-from ..configdict import ConfigDict
-from .. import dictdump
-from ..dictdump import dicttoh5, dicttojson, dump
-from ..dictdump import h5todict, load
-from ..dictdump import logger as dictdump_logger
-from ..utils import is_link
-from ..utils import h5py_read_dataset
-
-
-def tree():
- """Tree data structure as a recursive nested dictionary"""
- return defaultdict(tree)
-
-
-inhabitants = 160215
-
-city_attrs = tree()
-city_attrs["Europe"]["France"]["Grenoble"]["area"] = "18.44 km2"
-city_attrs["Europe"]["France"]["Grenoble"]["inhabitants"] = inhabitants
-city_attrs["Europe"]["France"]["Grenoble"]["coordinates"] = [45.1830, 5.7196]
-city_attrs["Europe"]["France"]["Tourcoing"]["area"]
-
-ext_attrs = tree()
-ext_attrs["ext_group"]["dataset"] = 10
-ext_filename = "ext.h5"
-
-link_attrs = tree()
-link_attrs["links"]["group"]["dataset"] = 10
-link_attrs["links"]["group"]["relative_softlink"] = h5py.SoftLink("dataset")
-link_attrs["links"]["relative_softlink"] = h5py.SoftLink("group/dataset")
-link_attrs["links"]["absolute_softlink"] = h5py.SoftLink("/links/group/dataset")
-link_attrs["links"]["external_link"] = h5py.ExternalLink(ext_filename, "/ext_group/dataset")
-
-
-class DictTestCase(unittest.TestCase):
-
- def assertRecursiveEqual(self, expected, actual, nodes=tuple()):
- err_msg = "\n\n Tree nodes: {}".format(nodes)
- if isinstance(expected, dict):
- self.assertTrue(isinstance(actual, dict), msg=err_msg)
- self.assertEqual(
- set(expected.keys()),
- set(actual.keys()),
- msg=err_msg
- )
- for k in actual:
- self.assertRecursiveEqual(
- expected[k],
- actual[k],
- nodes=nodes + (k,),
- )
- return
- if isinstance(actual, numpy.ndarray):
- actual = actual.tolist()
- if isinstance(expected, numpy.ndarray):
- expected = expected.tolist()
-
- self.assertEqual(expected, actual, msg=err_msg)
-
-
-class H5DictTestCase(DictTestCase):
-
- def _dictRoundTripNormalize(self, treedict):
- """Convert the dictionary as expected from a round-trip
- treedict -> dicttoh5 -> h5todict -> newtreedict
- """
- for key, value in list(treedict.items()):
- if isinstance(value, dict):
- self._dictRoundTripNormalize(value)
-
- # Expand treedict[("group", "attr_name")]
- # to treedict["group"]["attr_name"]
- for key, value in list(treedict.items()):
- if not isinstance(key, tuple):
- continue
- # Put the attribute inside the group
- grpname, attr = key
- if not grpname:
- continue
- group = treedict.setdefault(grpname, dict())
- if isinstance(group, dict):
- del treedict[key]
- group[("", attr)] = value
-
- def dictRoundTripNormalize(self, treedict):
- treedict2 = deepcopy(treedict)
- self._dictRoundTripNormalize(treedict2)
- return treedict2
-
-
-class TestDictToH5(H5DictTestCase):
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
- self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
-
- def tearDown(self):
- if os.path.exists(self.h5_fname):
- os.unlink(self.h5_fname)
- if os.path.exists(self.h5_ext_fname):
- os.unlink(self.h5_ext_fname)
- os.rmdir(self.tempdir)
-
- def testH5CityAttrs(self):
- filters = {'shuffle': True,
- 'fletcher32': True}
- dicttoh5(city_attrs, self.h5_fname, h5path='/city attributes',
- mode="w", create_dataset_args=filters)
-
- h5f = h5py.File(self.h5_fname, mode='r')
-
- self.assertIn("Tourcoing/area", h5f["/city attributes/Europe/France"])
- ds = h5f["/city attributes/Europe/France/Grenoble/inhabitants"]
- self.assertEqual(ds[...], 160215)
-
- # filters only apply to datasets that are not scalars (shape != () )
- ds = h5f["/city attributes/Europe/France/Grenoble/coordinates"]
- #self.assertEqual(ds.compression, "gzip")
- self.assertTrue(ds.fletcher32)
- self.assertTrue(ds.shuffle)
-
- h5f.close()
-
- ddict = load(self.h5_fname, fmat="hdf5")
- self.assertAlmostEqual(
- min(ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]),
- 5.7196)
-
- def testH5OverwriteDeprecatedApi(self):
- dd = ConfigDict({'t': True})
-
- dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a')
- dd = ConfigDict({'t': False})
- dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
- overwrite_data=False)
-
- res = h5todict(self.h5_fname)
- assert(res['t'] == True)
-
- dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
- overwrite_data=True)
-
- res = h5todict(self.h5_fname)
- assert(res['t'] == False)
-
- def testAttributes(self):
- """Any kind of attribute can be described"""
- ddict = {
- "group": {"datatset": "hmmm", ("", "group_attr"): 10},
- "dataset": "aaaaaaaaaaaaaaa",
- ("", "root_attr"): 11,
- ("dataset", "dataset_attr"): 12,
- ("group", "group_attr2"): 13,
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttoh5(ddict, h5file)
- self.assertEqual(h5file["group"].attrs['group_attr'], 10)
- self.assertEqual(h5file.attrs['root_attr'], 11)
- self.assertEqual(h5file["dataset"].attrs['dataset_attr'], 12)
- self.assertEqual(h5file["group"].attrs['group_attr2'], 13)
-
- def testPathAttributes(self):
- """A group is requested at a path"""
- ddict = {
- ("", "NX_class"): 'NXcollection',
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- # This should not warn
- with TestLogging(dictdump_logger, warning=0):
- dictdump.dicttoh5(ddict, h5file, h5path="foo/bar")
-
- def testKeyOrder(self):
- ddict1 = {
- "d": "plow",
- ("d", "a"): "ox",
- }
- ddict2 = {
- ("d", "a"): "ox",
- "d": "plow",
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttoh5(ddict1, h5file, h5path="g1")
- dictdump.dicttoh5(ddict2, h5file, h5path="g2")
- self.assertEqual(h5file["g1/d"].attrs['a'], "ox")
- self.assertEqual(h5file["g2/d"].attrs['a'], "ox")
-
- def testAttributeValues(self):
- """Any NX data types can be used"""
- ddict = {
- ("", "bool"): True,
- ("", "int"): 11,
- ("", "float"): 1.1,
- ("", "str"): "a",
- ("", "boollist"): [True, False, True],
- ("", "intlist"): [11, 22, 33],
- ("", "floatlist"): [1.1, 2.2, 3.3],
- ("", "strlist"): ["a", "bb", "ccc"],
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttoh5(ddict, h5file)
- for k, expected in ddict.items():
- result = h5file.attrs[k[1]]
- if isinstance(expected, list):
- if isinstance(expected[0], str):
- numpy.testing.assert_array_equal(result, expected)
- else:
- numpy.testing.assert_array_almost_equal(result, expected)
- else:
- self.assertEqual(result, expected)
-
- def testAttributeAlreadyExists(self):
- """A duplicated attribute warns if overwriting is not enabled"""
- ddict = {
- "group": {"dataset": "hmmm", ("", "attr"): 10},
- ("group", "attr"): 10,
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttoh5(ddict, h5file)
- self.assertEqual(h5file["group"].attrs['attr'], 10)
-
- def testFlatDict(self):
- """Description of a tree with a single level of keys"""
- ddict = {
- "group/group/dataset": 10,
- ("group/group/dataset", "attr"): 11,
- ("group/group", "attr"): 12,
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttoh5(ddict, h5file)
- self.assertEqual(h5file["group/group/dataset"][()], 10)
- self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11)
- self.assertEqual(h5file["group/group"].attrs['attr'], 12)
-
- def testLinks(self):
- with h5py.File(self.h5_ext_fname, "w") as h5file:
- dictdump.dicttoh5(ext_attrs, h5file)
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttoh5(link_attrs, h5file)
- with h5py.File(self.h5_fname, "r") as h5file:
- self.assertEqual(h5file["links/group/dataset"][()], 10)
- self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
- self.assertEqual(h5file["links/relative_softlink"][()], 10)
- self.assertEqual(h5file["links/absolute_softlink"][()], 10)
- self.assertEqual(h5file["links/external_link"][()], 10)
-
- def testDumpNumpyArray(self):
- ddict = {
- 'darks': {
- '0': numpy.array([[0, 0, 0], [0, 0, 0]], dtype=numpy.uint16)
- }
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttoh5(ddict, h5file)
- with h5py.File(self.h5_fname, "r") as h5file:
- numpy.testing.assert_array_equal(h5py_read_dataset(h5file["darks"]["0"]),
- ddict['darks']['0'])
-
- def testOverwrite(self):
- # Tree structure that will be tested
- group1 = {
- ("", "attr2"): "original2",
- "dset1": 0,
- "dset2": [0, 1],
- ("dset1", "attr1"): "original1",
- ("dset1", "attr2"): "original2",
- ("dset2", "attr1"): "original1",
- ("dset2", "attr2"): "original2",
- }
- group2 = {
- "subgroup1": group1.copy(),
- "subgroup2": group1.copy(),
- ("subgroup1", "attr1"): "original1",
- ("subgroup2", "attr1"): "original1"
- }
- group2.update(group1)
- # initial HDF5 tree
- otreedict = {
- ('', 'attr1'): "original1",
- ('', 'attr2'): "original2",
- 'group1': group1,
- 'group2': group2,
- ('group1', 'attr1'): "original1",
- ('group2', 'attr1'): "original1"
- }
- wtreedict = None # dumped dictionary
- etreedict = None # expected HDF5 tree after dump
-
- def reset_file():
- dicttoh5(
- otreedict,
- h5file=self.h5_fname,
- mode="w",
- )
-
- def append_file(update_mode):
- dicttoh5(
- wtreedict,
- h5file=self.h5_fname,
- mode="a",
- update_mode=update_mode
- )
-
- def assert_file():
- rtreedict = h5todict(
- self.h5_fname,
- include_attributes=True,
- asarray=False
- )
- netreedict = self.dictRoundTripNormalize(etreedict)
- try:
- self.assertRecursiveEqual(netreedict, rtreedict)
- except AssertionError:
- from pprint import pprint
- print("\nDUMP:")
- pprint(wtreedict)
- print("\nEXPECTED:")
- pprint(netreedict)
- print("\nHDF5:")
- pprint(rtreedict)
- raise
-
- def assert_append(update_mode):
- append_file(update_mode)
- assert_file()
-
- # Test wrong arguments
- with self.assertRaises(ValueError):
- dicttoh5(
- otreedict,
- h5file=self.h5_fname,
- mode="w",
- update_mode="wrong-value"
- )
-
- # No writing
- reset_file()
- etreedict = deepcopy(otreedict)
- assert_file()
-
- # Write identical dictionary
- wtreedict = deepcopy(otreedict)
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add", "modify", "replace"]:
- assert_append(update_mode)
-
- # Write empty dictionary
- wtreedict = dict()
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add", "modify", "replace"]:
- assert_append(update_mode)
-
- # Modified dataset
- wtreedict = dict()
- wtreedict["group2"] = dict()
- wtreedict["group2"]["subgroup2"] = dict()
- wtreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
- wtreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add"]:
- assert_append(update_mode)
-
- etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
- assert_append("modify")
-
- etreedict["group2"] = dict()
- del etreedict[("group2", "attr1")]
- etreedict["group2"]["subgroup2"] = dict()
- etreedict["group2"]["subgroup2"]["dset1"] = {"dset3": [10, 20]}
- etreedict["group2"]["subgroup2"]["dset2"] = [10, 20]
- assert_append("replace")
-
- # Modified group
- wtreedict = dict()
- wtreedict["group2"] = dict()
- wtreedict["group2"]["subgroup2"] = [0, 1]
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add", "modify"]:
- assert_append(update_mode)
-
- etreedict["group2"] = dict()
- del etreedict[("group2", "attr1")]
- etreedict["group2"]["subgroup2"] = [0, 1]
- assert_append("replace")
-
- # Modified attribute
- wtreedict = dict()
- wtreedict["group2"] = dict()
- wtreedict["group2"]["subgroup2"] = dict()
- wtreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add"]:
- assert_append(update_mode)
-
- etreedict["group2"]["subgroup2"][("dset1", "attr1")] = "modified"
- assert_append("modify")
-
- etreedict["group2"] = dict()
- del etreedict[("group2", "attr1")]
- etreedict["group2"]["subgroup2"] = dict()
- etreedict["group2"]["subgroup2"]["dset1"] = dict()
- etreedict["group2"]["subgroup2"]["dset1"][("", "attr1")] = "modified"
- assert_append("replace")
-
- # Delete group
- wtreedict = dict()
- wtreedict["group2"] = dict()
- wtreedict["group2"]["subgroup2"] = None
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add"]:
- assert_append(update_mode)
-
- del etreedict["group2"]["subgroup2"]
- del etreedict["group2"][("subgroup2", "attr1")]
- assert_append("modify")
-
- etreedict["group2"] = dict()
- del etreedict[("group2", "attr1")]
- assert_append("replace")
-
- # Delete dataset
- wtreedict = dict()
- wtreedict["group2"] = dict()
- wtreedict["group2"]["subgroup2"] = dict()
- wtreedict["group2"]["subgroup2"]["dset2"] = None
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add"]:
- assert_append(update_mode)
-
- del etreedict["group2"]["subgroup2"]["dset2"]
- del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
- del etreedict["group2"]["subgroup2"][("dset2", "attr2")]
- assert_append("modify")
-
- etreedict["group2"] = dict()
- del etreedict[("group2", "attr1")]
- etreedict["group2"]["subgroup2"] = dict()
- assert_append("replace")
-
- # Delete attribute
- wtreedict = dict()
- wtreedict["group2"] = dict()
- wtreedict["group2"]["subgroup2"] = dict()
- wtreedict["group2"]["subgroup2"][("dset2", "attr1")] = None
-
- reset_file()
- etreedict = deepcopy(otreedict)
- for update_mode in [None, "add"]:
- assert_append(update_mode)
-
- del etreedict["group2"]["subgroup2"][("dset2", "attr1")]
- assert_append("modify")
-
- etreedict["group2"] = dict()
- del etreedict[("group2", "attr1")]
- etreedict["group2"]["subgroup2"] = dict()
- etreedict["group2"]["subgroup2"]["dset2"] = dict()
- assert_append("replace")
-
-
-class TestH5ToDict(H5DictTestCase):
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self.h5_fname = os.path.join(self.tempdir, "cityattrs.h5")
- self.h5_ext_fname = os.path.join(self.tempdir, ext_filename)
- dicttoh5(city_attrs, self.h5_fname)
- dicttoh5(link_attrs, self.h5_fname, mode="a")
- dicttoh5(ext_attrs, self.h5_ext_fname)
-
- def tearDown(self):
- if os.path.exists(self.h5_fname):
- os.unlink(self.h5_fname)
- if os.path.exists(self.h5_ext_fname):
- os.unlink(self.h5_ext_fname)
- os.rmdir(self.tempdir)
-
- def testExcludeNames(self):
- ddict = h5todict(self.h5_fname, path="/Europe/France",
- exclude_names=["ourcoing", "inhab", "toto"])
- self.assertNotIn("Tourcoing", ddict)
- self.assertIn("Grenoble", ddict)
-
- self.assertNotIn("inhabitants", ddict["Grenoble"])
- self.assertIn("coordinates", ddict["Grenoble"])
- self.assertIn("area", ddict["Grenoble"])
-
- def testAsArrayTrue(self):
- """Test with asarray=True, the default"""
- ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble")
- self.assertTrue(numpy.array_equal(ddict["inhabitants"], numpy.array(inhabitants)))
-
- def testAsArrayFalse(self):
- """Test with asarray=False"""
- ddict = h5todict(self.h5_fname, path="/Europe/France/Grenoble", asarray=False)
- self.assertEqual(ddict["inhabitants"], inhabitants)
-
- def testDereferenceLinks(self):
- ddict = h5todict(self.h5_fname, path="links", dereference_links=True)
- self.assertTrue(ddict["absolute_softlink"], 10)
- self.assertTrue(ddict["relative_softlink"], 10)
- self.assertTrue(ddict["external_link"], 10)
- self.assertTrue(ddict["group"]["relative_softlink"], 10)
-
- def testPreserveLinks(self):
- ddict = h5todict(self.h5_fname, path="links", dereference_links=False)
- self.assertTrue(is_link(ddict["absolute_softlink"]))
- self.assertTrue(is_link(ddict["relative_softlink"]))
- self.assertTrue(is_link(ddict["external_link"]))
- self.assertTrue(is_link(ddict["group"]["relative_softlink"]))
-
- def testStrings(self):
- ddict = {"dset_bytes": b"bytes",
- "dset_utf8": "utf8",
- "dset_2bytes": [b"bytes", b"bytes"],
- "dset_2utf8": ["utf8", "utf8"],
- ("", "attr_bytes"): b"bytes",
- ("", "attr_utf8"): "utf8",
- ("", "attr_2bytes"): [b"bytes", b"bytes"],
- ("", "attr_2utf8"): ["utf8", "utf8"]}
- dicttoh5(ddict, self.h5_fname, mode="w")
- adict = h5todict(self.h5_fname, include_attributes=True, asarray=False)
- self.assertEqual(ddict["dset_bytes"], adict["dset_bytes"])
- self.assertEqual(ddict["dset_utf8"], adict["dset_utf8"])
- self.assertEqual(ddict[("", "attr_bytes")], adict[("", "attr_bytes")])
- self.assertEqual(ddict[("", "attr_utf8")], adict[("", "attr_utf8")])
- numpy.testing.assert_array_equal(ddict["dset_2bytes"], adict["dset_2bytes"])
- numpy.testing.assert_array_equal(ddict["dset_2utf8"], adict["dset_2utf8"])
- numpy.testing.assert_array_equal(ddict[("", "attr_2bytes")], adict[("", "attr_2bytes")])
- numpy.testing.assert_array_equal(ddict[("", "attr_2utf8")], adict[("", "attr_2utf8")])
-
-
-class TestDictToNx(H5DictTestCase):
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self.h5_fname = os.path.join(self.tempdir, "nx.h5")
- self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
-
- def tearDown(self):
- if os.path.exists(self.h5_fname):
- os.unlink(self.h5_fname)
- if os.path.exists(self.h5_ext_fname):
- os.unlink(self.h5_ext_fname)
- os.rmdir(self.tempdir)
-
- def testAttributes(self):
- """Any kind of attribute can be described"""
- ddict = {
- "group": {"dataset": 100, "@group_attr1": 10},
- "dataset": 200,
- "@root_attr": 11,
- "dataset@dataset_attr": "12",
- "group@group_attr2": 13,
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttonx(ddict, h5file)
- self.assertEqual(h5file["group"].attrs['group_attr1'], 10)
- self.assertEqual(h5file.attrs['root_attr'], 11)
- self.assertEqual(h5file["dataset"].attrs['dataset_attr'], "12")
- self.assertEqual(h5file["group"].attrs['group_attr2'], 13)
-
- def testKeyOrder(self):
- ddict1 = {
- "d": "plow",
- "d@a": "ox",
- }
- ddict2 = {
- "d@a": "ox",
- "d": "plow",
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttonx(ddict1, h5file, h5path="g1")
- dictdump.dicttonx(ddict2, h5file, h5path="g2")
- self.assertEqual(h5file["g1/d"].attrs['a'], "ox")
- self.assertEqual(h5file["g2/d"].attrs['a'], "ox")
-
- def testAttributeValues(self):
- """Any NX data types can be used"""
- ddict = {
- "@bool": True,
- "@int": 11,
- "@float": 1.1,
- "@str": "a",
- "@boollist": [True, False, True],
- "@intlist": [11, 22, 33],
- "@floatlist": [1.1, 2.2, 3.3],
- "@strlist": ["a", "bb", "ccc"],
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttonx(ddict, h5file)
- for k, expected in ddict.items():
- result = h5file.attrs[k[1:]]
- if isinstance(expected, list):
- if isinstance(expected[0], str):
- numpy.testing.assert_array_equal(result, expected)
- else:
- numpy.testing.assert_array_almost_equal(result, expected)
- else:
- self.assertEqual(result, expected)
-
- def testFlatDict(self):
- """Description of a tree with a single level of keys"""
- ddict = {
- "group/group/dataset": 10,
- "group/group/dataset@attr": 11,
- "group/group@attr": 12,
- }
- with h5py.File(self.h5_fname, "w") as h5file:
- dictdump.dicttonx(ddict, h5file)
- self.assertEqual(h5file["group/group/dataset"][()], 10)
- self.assertEqual(h5file["group/group/dataset"].attrs['attr'], 11)
- self.assertEqual(h5file["group/group"].attrs['attr'], 12)
-
- def testLinks(self):
- ddict = {"ext_group": {"dataset": 10}}
- dictdump.dicttonx(ddict, self.h5_ext_fname)
- ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
- ">relative_softlink": "group/dataset",
- ">absolute_softlink": "/links/group/dataset",
- ">external_link": "nx_ext.h5::/ext_group/dataset"}}
- dictdump.dicttonx(ddict, self.h5_fname)
- with h5py.File(self.h5_fname, "r") as h5file:
- self.assertEqual(h5file["links/group/dataset"][()], 10)
- self.assertEqual(h5file["links/group/relative_softlink"][()], 10)
- self.assertEqual(h5file["links/relative_softlink"][()], 10)
- self.assertEqual(h5file["links/absolute_softlink"][()], 10)
- self.assertEqual(h5file["links/external_link"][()], 10)
-
- def testUpLinks(self):
- ddict = {"data": {"group": {"dataset": 10, ">relative_softlink": "dataset"}},
- "links": {"group": {"subgroup": {">relative_softlink": "../../../data/group/dataset"}}}}
- dictdump.dicttonx(ddict, self.h5_fname)
- with h5py.File(self.h5_fname, "r") as h5file:
- self.assertEqual(h5file["/links/group/subgroup/relative_softlink"][()], 10)
-
- def testOverwrite(self):
- entry_name = "entry"
- wtreedict = {
- "group1": {"a": 1, "b": 2},
- "group2@attr3": "attr3",
- "group2@attr4": "attr4",
- "group2": {
- "@attr1": "attr1",
- "@attr2": "attr2",
- "c": 3,
- "d": 4,
- "dataset4": 8,
- "dataset4@units": "keV",
- },
- "group3": {"subgroup": {"e": 9, "f": 10}},
- "dataset1": 5,
- "dataset2": 6,
- "dataset3": 7,
- "dataset3@units": "mm",
- }
- esubtree = {
- "@NX_class": "NXentry",
- "group1": {"@NX_class": "NXcollection", "a": 1, "b": 2},
- "group2": {
- "@NX_class": "NXcollection",
- "@attr1": "attr1",
- "@attr2": "attr2",
- "@attr3": "attr3",
- "@attr4": "attr4",
- "c": 3,
- "d": 4,
- "dataset4": 8,
- "dataset4@units": "keV",
- },
- "group3": {
- "@NX_class": "NXcollection",
- "subgroup": {"@NX_class": "NXcollection", "e": 9, "f": 10},
- },
- "dataset1": 5,
- "dataset2": 6,
- "dataset3": 7,
- "dataset3@units": "mm",
- }
- etreedict = {entry_name: esubtree}
-
- def append_file(update_mode, add_nx_class):
- dictdump.dicttonx(
- wtreedict,
- h5file=self.h5_fname,
- mode="a",
- h5path=entry_name,
- update_mode=update_mode,
- add_nx_class=add_nx_class
- )
-
- def assert_file():
- rtreedict = dictdump.nxtodict(
- self.h5_fname,
- include_attributes=True,
- asarray=False,
- )
- netreedict = self.dictRoundTripNormalize(etreedict)
- try:
- self.assertRecursiveEqual(netreedict, rtreedict)
- except AssertionError:
- from pprint import pprint
- print("\nDUMP:")
- pprint(wtreedict)
- print("\nEXPECTED:")
- pprint(netreedict)
- print("\nHDF5:")
- pprint(rtreedict)
- raise
-
- def assert_append(update_mode, add_nx_class=None):
- append_file(update_mode, add_nx_class=add_nx_class)
- assert_file()
-
- # First to an empty file
- assert_append(None)
-
- # Add non-existing attributes/datasets/groups
- wtreedict["group1"].pop("a")
- wtreedict["group2"].pop("@attr1")
- wtreedict["group2"]["@attr2"] = "attr3" # only for update
- wtreedict["group2"]["@type"] = "test"
- wtreedict["group2"]["dataset4"] = 9 # only for update
- del wtreedict["group2"]["dataset4@units"]
- wtreedict["group3"] = {}
- esubtree["group2"]["@type"] = "test"
- assert_append("add")
-
- # Add update existing attributes and datasets
- esubtree["group2"]["@attr2"] = "attr3"
- esubtree["group2"]["dataset4"] = 9
- assert_append("modify")
-
- # Do not add missing NX_class by default when updating
- wtreedict["group2"]["@NX_class"] = "NXprocess"
- esubtree["group2"]["@NX_class"] = "NXprocess"
- assert_append("modify")
- del wtreedict["group2"]["@NX_class"]
- assert_append("modify")
-
- # Overwrite existing groups/datasets/attributes
- esubtree["group1"].pop("a")
- esubtree["group2"].pop("@attr1")
- esubtree["group2"]["@NX_class"] = "NXcollection"
- esubtree["group2"]["dataset4"] = 9
- del esubtree["group2"]["dataset4@units"]
- esubtree["group3"] = {"@NX_class": "NXcollection"}
- assert_append("replace", add_nx_class=True)
-
-
-class TestNxToDict(H5DictTestCase):
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self.h5_fname = os.path.join(self.tempdir, "nx.h5")
- self.h5_ext_fname = os.path.join(self.tempdir, "nx_ext.h5")
-
- def tearDown(self):
- if os.path.exists(self.h5_fname):
- os.unlink(self.h5_fname)
- if os.path.exists(self.h5_ext_fname):
- os.unlink(self.h5_ext_fname)
- os.rmdir(self.tempdir)
-
- def testAttributes(self):
- """Any kind of attribute can be described"""
- ddict = {
- "group": {"dataset": 100, "@group_attr1": 10},
- "dataset": 200,
- "@root_attr": 11,
- "dataset@dataset_attr": "12",
- "group@group_attr2": 13,
- }
- dictdump.dicttonx(ddict, self.h5_fname)
- ddict = dictdump.nxtodict(self.h5_fname, include_attributes=True)
- self.assertEqual(ddict["group"]["@group_attr1"], 10)
- self.assertEqual(ddict["@root_attr"], 11)
- self.assertEqual(ddict["dataset@dataset_attr"], "12")
- self.assertEqual(ddict["group"]["@group_attr2"], 13)
-
- def testDereferenceLinks(self):
- """Write links and dereference on read"""
- ddict = {"ext_group": {"dataset": 10}}
- dictdump.dicttonx(ddict, self.h5_ext_fname)
- ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
- ">relative_softlink": "group/dataset",
- ">absolute_softlink": "/links/group/dataset",
- ">external_link": "nx_ext.h5::/ext_group/dataset"}}
- dictdump.dicttonx(ddict, self.h5_fname)
-
- ddict = dictdump.h5todict(self.h5_fname, dereference_links=True)
- self.assertTrue(ddict["links"]["absolute_softlink"], 10)
- self.assertTrue(ddict["links"]["relative_softlink"], 10)
- self.assertTrue(ddict["links"]["external_link"], 10)
- self.assertTrue(ddict["links"]["group"]["relative_softlink"], 10)
-
- def testPreserveLinks(self):
- """Write/read links"""
- ddict = {"ext_group": {"dataset": 10}}
- dictdump.dicttonx(ddict, self.h5_ext_fname)
- ddict = {"links": {"group": {"dataset": 10, ">relative_softlink": "dataset"},
- ">relative_softlink": "group/dataset",
- ">absolute_softlink": "/links/group/dataset",
- ">external_link": "nx_ext.h5::/ext_group/dataset"}}
- dictdump.dicttonx(ddict, self.h5_fname)
-
- ddict = dictdump.nxtodict(self.h5_fname, dereference_links=False)
- self.assertTrue(ddict["links"][">absolute_softlink"], "dataset")
- self.assertTrue(ddict["links"][">relative_softlink"], "group/dataset")
- self.assertTrue(ddict["links"][">external_link"], "/links/group/dataset")
- self.assertTrue(ddict["links"]["group"][">relative_softlink"], "nx_ext.h5::/ext_group/datase")
-
- def testNotExistingPath(self):
- """Test converting not existing path"""
- with h5py.File(self.h5_fname, 'a') as f:
- f['data'] = 1
-
- ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='ignore')
- self.assertFalse(ddict)
-
- with TestLogging(dictdump_logger, error=1):
- ddict = h5todict(self.h5_fname, path="/I/am/not/a/path", errors='log')
- self.assertFalse(ddict)
-
- with self.assertRaises(KeyError):
- h5todict(self.h5_fname, path="/I/am/not/a/path", errors='raise')
-
- def testBrokenLinks(self):
- """Test with broken links"""
- with h5py.File(self.h5_fname, 'a') as f:
- f["/Mars/BrokenSoftLink"] = h5py.SoftLink("/Idontexists")
- f["/Mars/BrokenExternalLink"] = h5py.ExternalLink("notexistingfile.h5", "/Idontexists")
-
- ddict = h5todict(self.h5_fname, path="/Mars", errors='ignore')
- self.assertFalse(ddict)
-
- with TestLogging(dictdump_logger, error=2):
- ddict = h5todict(self.h5_fname, path="/Mars", errors='log')
- self.assertFalse(ddict)
-
- with self.assertRaises(KeyError):
- h5todict(self.h5_fname, path="/Mars", errors='raise')
-
-
-class TestDictToJson(DictTestCase):
- def setUp(self):
- self.dir_path = tempfile.mkdtemp()
- self.json_fname = os.path.join(self.dir_path, "cityattrs.json")
-
- def tearDown(self):
- os.unlink(self.json_fname)
- os.rmdir(self.dir_path)
-
- def testJsonCityAttrs(self):
- self.json_fname = os.path.join(self.dir_path, "cityattrs.json")
- dicttojson(city_attrs, self.json_fname, indent=3)
-
- with open(self.json_fname, "r") as f:
- json_content = f.read()
- self.assertIn('"inhabitants": 160215', json_content)
-
-
-class TestDictToIni(DictTestCase):
- def setUp(self):
- self.dir_path = tempfile.mkdtemp()
- self.ini_fname = os.path.join(self.dir_path, "test.ini")
-
- def tearDown(self):
- os.unlink(self.ini_fname)
- os.rmdir(self.dir_path)
-
- def testConfigDictIO(self):
- """Ensure values and types of data is preserved when dictionary is
- written to file and read back."""
- testdict = {
- 'simple_types': {
- 'float': 1.0,
- 'int': 1,
- 'percent string': '5 % is too much',
- 'backslash string': 'i can use \\',
- 'empty_string': '',
- 'nonestring': 'None',
- 'nonetype': None,
- 'interpstring': 'interpolation: %(percent string)s',
- },
- 'containers': {
- 'list': [-1, 'string', 3.0, False, None],
- 'array': numpy.array([1.0, 2.0, 3.0]),
- 'dict': {
- 'key1': 'Hello World',
- 'key2': 2.0,
- }
- }
- }
-
- dump(testdict, self.ini_fname)
-
- #read the data back
- readdict = load(self.ini_fname)
-
- testdictkeys = list(testdict.keys())
- readkeys = list(readdict.keys())
-
- self.assertTrue(len(readkeys) == len(testdictkeys),
- "Number of read keys not equal")
-
- self.assertEqual(readdict['simple_types']["interpstring"],
- "interpolation: 5 % is too much")
-
- testdict['simple_types']["interpstring"] = "interpolation: 5 % is too much"
-
- for key in testdict["simple_types"]:
- original = testdict['simple_types'][key]
- read = readdict['simple_types'][key]
- self.assertEqual(read, original,
- "Read <%s> instead of <%s>" % (read, original))
-
- for key in testdict["containers"]:
- original = testdict["containers"][key]
- read = readdict["containers"][key]
- if key == 'array':
- self.assertEqual(read.all(), original.all(),
- "Read <%s> instead of <%s>" % (read, original))
- else:
- self.assertEqual(read, original,
- "Read <%s> instead of <%s>" % (read, original))
-
- def testConfigDictOrder(self):
- """Ensure order is preserved when dictionary is
- written to file and read back."""
- test_dict = {'banana': 3, 'apple': 4, 'pear': 1, 'orange': 2}
- # sort by key
- test_ordered_dict1 = OrderedDict(sorted(test_dict.items(),
- key=lambda t: t[0]))
- # sort by value
- test_ordered_dict2 = OrderedDict(sorted(test_dict.items(),
- key=lambda t: t[1]))
- # add the two ordered dict as sections of a third ordered dict
- test_ordered_dict3 = OrderedDict()
- test_ordered_dict3["section1"] = test_ordered_dict1
- test_ordered_dict3["section2"] = test_ordered_dict2
-
- # write to ini and read back as a ConfigDict (inherits OrderedDict)
- dump(test_ordered_dict3,
- self.ini_fname, fmat="ini")
- read_instance = ConfigDict()
- read_instance.read(self.ini_fname)
-
- # loop through original and read-back dictionaries,
- # test identical order for key/value pairs
- for orig_key, section in zip(test_ordered_dict3.keys(),
- read_instance.keys()):
- self.assertEqual(orig_key, section)
- for orig_key2, read_key in zip(test_ordered_dict3[section].keys(),
- read_instance[section].keys()):
- self.assertEqual(orig_key2, read_key)
- self.assertEqual(test_ordered_dict3[section][orig_key2],
- read_instance[section][read_key])
-
-
-def suite():
- test_suite = unittest.TestSuite()
- loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
- test_suite.addTest(loadTests(TestDictToIni))
- test_suite.addTest(loadTests(TestDictToH5))
- test_suite.addTest(loadTests(TestDictToNx))
- test_suite.addTest(loadTests(TestDictToJson))
- test_suite.addTest(loadTests(TestH5ToDict))
- test_suite.addTest(loadTests(TestNxToDict))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_fabioh5.py b/silx/io/test/test_fabioh5.py
deleted file mode 100755
index f2c85b1..0000000
--- a/silx/io/test/test_fabioh5.py
+++ /dev/null
@@ -1,629 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2018 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for fabioh5 wrapper"""
-
-__authors__ = ["V. Valls"]
-__license__ = "MIT"
-__date__ = "02/07/2018"
-
-import os
-import logging
-import numpy
-import unittest
-import tempfile
-import shutil
-
-_logger = logging.getLogger(__name__)
-
-import fabio
-import h5py
-
-from .. import commonh5
-from .. import fabioh5
-
-
-class TestFabioH5(unittest.TestCase):
-
- def setUp(self):
-
- header = {
- "integer": "-100",
- "float": "1.0",
- "string": "hi!",
- "list_integer": "100 50 0",
- "list_float": "1.0 2.0 3.5",
- "string_looks_like_list": "2000 hi!",
- }
- data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64)
- self.fabio_image = fabio.numpyimage.NumpyImage(data, header)
- self.h5_image = fabioh5.File(fabio_image=self.fabio_image)
-
- def test_main_groups(self):
- self.assertEqual(self.h5_image.h5py_class, h5py.File)
- self.assertEqual(self.h5_image["/"].h5py_class, h5py.File)
- self.assertEqual(self.h5_image["/scan_0"].h5py_class, h5py.Group)
- self.assertEqual(self.h5_image["/scan_0/instrument"].h5py_class, h5py.Group)
- self.assertEqual(self.h5_image["/scan_0/measurement"].h5py_class, h5py.Group)
-
- def test_wrong_path_syntax(self):
- # result tested with a default h5py file
- self.assertRaises(ValueError, lambda: self.h5_image[""])
-
- def test_wrong_root_name(self):
- # result tested with a default h5py file
- self.assertRaises(KeyError, lambda: self.h5_image["/foo"])
-
- def test_wrong_root_path(self):
- # result tested with a default h5py file
- self.assertRaises(KeyError, lambda: self.h5_image["/foo/foo"])
-
- def test_wrong_name(self):
- # result tested with a default h5py file
- self.assertRaises(KeyError, lambda: self.h5_image["foo"])
-
- def test_wrong_path(self):
- # result tested with a default h5py file
- self.assertRaises(KeyError, lambda: self.h5_image["foo/foo"])
-
- def test_single_frame(self):
- data = numpy.arange(2 * 3)
- data.shape = 2, 3
- fabio_image = fabio.edfimage.edfimage(data=data)
- h5_image = fabioh5.File(fabio_image=fabio_image)
-
- dataset = h5_image["/scan_0/instrument/detector_0/data"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertTrue(isinstance(dataset[()], numpy.ndarray))
- self.assertEqual(dataset.dtype.kind, "i")
- self.assertEqual(dataset.shape, (2, 3))
- self.assertEqual(dataset[...][0, 0], 0)
- self.assertEqual(dataset.attrs["interpretation"], "image")
-
- def test_multi_frames(self):
- data = numpy.arange(2 * 3)
- data.shape = 2, 3
- fabio_image = fabio.edfimage.edfimage(data=data)
- fabio_image.append_frame(data=data)
- h5_image = fabioh5.File(fabio_image=fabio_image)
-
- dataset = h5_image["/scan_0/instrument/detector_0/data"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertTrue(isinstance(dataset[()], numpy.ndarray))
- self.assertEqual(dataset.dtype.kind, "i")
- self.assertEqual(dataset.shape, (2, 2, 3))
- self.assertEqual(dataset[...][0, 0, 0], 0)
- self.assertEqual(dataset.attrs["interpretation"], "image")
-
- def test_heterogeneous_frames(self):
- """Frames containing 2 images with different sizes and a cube"""
- data1 = numpy.arange(2 * 3)
- data1.shape = 2, 3
- data2 = numpy.arange(2 * 5)
- data2.shape = 2, 5
- data3 = numpy.arange(2 * 5 * 1)
- data3.shape = 2, 5, 1
- fabio_image = fabio.edfimage.edfimage(data=data1)
- fabio_image.append_frame(data=data2)
- fabio_image.append_frame(data=data3)
- h5_image = fabioh5.File(fabio_image=fabio_image)
-
- dataset = h5_image["/scan_0/instrument/detector_0/data"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertTrue(isinstance(dataset[()], numpy.ndarray))
- self.assertEqual(dataset.dtype.kind, "i")
- self.assertEqual(dataset.shape, (3, 2, 5, 1))
- self.assertEqual(dataset[...][0, 0, 0], 0)
- self.assertEqual(dataset.attrs["interpretation"], "image")
-
- def test_single_3d_frame(self):
- """Image source contains a cube"""
- data = numpy.arange(2 * 3 * 4)
- data.shape = 2, 3, 4
- # Do not provide the data to the constructor to avoid slicing of the
- # data. In this way the result stay a cube, and not a multi-frame
- fabio_image = fabio.edfimage.edfimage()
- fabio_image.data = data
- h5_image = fabioh5.File(fabio_image=fabio_image)
-
- dataset = h5_image["/scan_0/instrument/detector_0/data"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertTrue(isinstance(dataset[()], numpy.ndarray))
- self.assertEqual(dataset.dtype.kind, "i")
- self.assertEqual(dataset.shape, (2, 3, 4))
- self.assertEqual(dataset[...][0, 0, 0], 0)
- self.assertEqual(dataset.attrs["interpretation"], "image")
-
- def test_metadata_int(self):
- dataset = self.h5_image["/scan_0/instrument/detector_0/others/integer"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertEqual(dataset[()], -100)
- self.assertEqual(dataset.dtype.kind, "i")
- self.assertEqual(dataset.shape, (1,))
-
- def test_metadata_float(self):
- dataset = self.h5_image["/scan_0/instrument/detector_0/others/float"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertEqual(dataset[()], 1.0)
- self.assertEqual(dataset.dtype.kind, "f")
- self.assertEqual(dataset.shape, (1,))
-
- def test_metadata_string(self):
- dataset = self.h5_image["/scan_0/instrument/detector_0/others/string"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertEqual(dataset[()], numpy.string_("hi!"))
- self.assertEqual(dataset.dtype.type, numpy.string_)
- self.assertEqual(dataset.shape, (1,))
-
- def test_metadata_list_integer(self):
- dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_integer"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertEqual(dataset.dtype.kind, "u")
- self.assertEqual(dataset.shape, (1, 3))
- self.assertEqual(dataset[0, 0], 100)
- self.assertEqual(dataset[0, 1], 50)
-
- def test_metadata_list_float(self):
- dataset = self.h5_image["/scan_0/instrument/detector_0/others/list_float"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertEqual(dataset.dtype.kind, "f")
- self.assertEqual(dataset.shape, (1, 3))
- self.assertEqual(dataset[0, 0], 1.0)
- self.assertEqual(dataset[0, 1], 2.0)
-
- def test_metadata_list_looks_like_list(self):
- dataset = self.h5_image["/scan_0/instrument/detector_0/others/string_looks_like_list"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertEqual(dataset[()], numpy.string_("2000 hi!"))
- self.assertEqual(dataset.dtype.type, numpy.string_)
- self.assertEqual(dataset.shape, (1,))
-
- def test_float_32(self):
- float_list = [u'1.2', u'1.3', u'1.4']
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = None
- for float_item in float_list:
- header = {"float_item": float_item}
- if fabio_image is None:
- fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
- else:
- fabio_image.append_frame(data=data, header=header)
- h5_image = fabioh5.File(fabio_image=fabio_image)
- data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
- # There is no equality between items
- self.assertEqual(len(data), len(set(data)))
- # At worst a float32
- self.assertIn(data.dtype.kind, ['d', 'f'])
- self.assertLessEqual(data.dtype.itemsize, 32 / 8)
-
- def test_float_64(self):
- float_list = [
- u'1469117129.082226',
- u'1469117136.684986', u'1469117144.312749', u'1469117151.892507',
- u'1469117159.474265', u'1469117167.100027', u'1469117174.815799',
- u'1469117182.437561', u'1469117190.094326', u'1469117197.721089']
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = None
- for float_item in float_list:
- header = {"time_of_day": float_item}
- if fabio_image is None:
- fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
- else:
- fabio_image.append_frame(data=data, header=header)
- h5_image = fabioh5.File(fabio_image=fabio_image)
- data = h5_image["/scan_0/instrument/detector_0/others/time_of_day"]
- # There is no equality between items
- self.assertEqual(len(data), len(set(data)))
- # At least a float64
- self.assertIn(data.dtype.kind, ['d', 'f'])
- self.assertGreaterEqual(data.dtype.itemsize, 64 / 8)
-
- def test_mixed_float_size__scalar(self):
- # We expect to have a precision of 32 bits
- float_list = [u'1.2', u'1.3001']
- expected_float_result = [1.2, 1.3001]
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = None
- for float_item in float_list:
- header = {"float_item": float_item}
- if fabio_image is None:
- fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
- else:
- fabio_image.append_frame(data=data, header=header)
- h5_image = fabioh5.File(fabio_image=fabio_image)
- data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
- # At worst a float32
- self.assertIn(data.dtype.kind, ['d', 'f'])
- self.assertLessEqual(data.dtype.itemsize, 32 / 8)
- for computed, expected in zip(data, expected_float_result):
- numpy.testing.assert_almost_equal(computed, expected, 5)
-
- def test_mixed_float_size__list(self):
- # We expect to have a precision of 32 bits
- float_list = [u'1.2 1.3001']
- expected_float_result = numpy.array([[1.2, 1.3001]])
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = None
- for float_item in float_list:
- header = {"float_item": float_item}
- if fabio_image is None:
- fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
- else:
- fabio_image.append_frame(data=data, header=header)
- h5_image = fabioh5.File(fabio_image=fabio_image)
- data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
- # At worst a float32
- self.assertIn(data.dtype.kind, ['d', 'f'])
- self.assertLessEqual(data.dtype.itemsize, 32 / 8)
- for computed, expected in zip(data, expected_float_result):
- numpy.testing.assert_almost_equal(computed, expected, 5)
-
- def test_mixed_float_size__list_of_list(self):
- # We expect to have a precision of 32 bits
- float_list = [u'1.2 1.3001', u'1.3001 1.3001']
- expected_float_result = numpy.array([[1.2, 1.3001], [1.3001, 1.3001]])
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = None
- for float_item in float_list:
- header = {"float_item": float_item}
- if fabio_image is None:
- fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
- else:
- fabio_image.append_frame(data=data, header=header)
- h5_image = fabioh5.File(fabio_image=fabio_image)
- data = h5_image["/scan_0/instrument/detector_0/others/float_item"]
- # At worst a float32
- self.assertIn(data.dtype.kind, ['d', 'f'])
- self.assertLessEqual(data.dtype.itemsize, 32 / 8)
- for computed, expected in zip(data, expected_float_result):
- numpy.testing.assert_almost_equal(computed, expected, 5)
-
- def test_ub_matrix(self):
- """Data from mediapix.edf"""
- header = {}
- header["UB_mne"] = 'UB0 UB1 UB2 UB3 UB4 UB5 UB6 UB7 UB8'
- header["UB_pos"] = '1.99593e-16 2.73682e-16 -1.54 -1.08894 1.08894 1.6083e-16 1.08894 1.08894 9.28619e-17'
- header["sample_mne"] = 'U0 U1 U2 U3 U4 U5'
- header["sample_pos"] = '4.08 4.08 4.08 90 90 90'
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
- h5_image = fabioh5.File(fabio_image=fabio_image)
- sample = h5_image["/scan_0/sample"]
- self.assertIsNotNone(sample)
- self.assertEqual(sample.attrs["NXclass"], "NXsample")
-
- d = sample['unit_cell_abc']
- expected = numpy.array([4.08, 4.08, 4.08])
- self.assertIsNotNone(d)
- self.assertEqual(d.shape, (3, ))
- self.assertIn(d.dtype.kind, ['d', 'f'])
- numpy.testing.assert_array_almost_equal(d[...], expected)
-
- d = sample['unit_cell_alphabetagamma']
- expected = numpy.array([90.0, 90.0, 90.0])
- self.assertIsNotNone(d)
- self.assertEqual(d.shape, (3, ))
- self.assertIn(d.dtype.kind, ['d', 'f'])
- numpy.testing.assert_array_almost_equal(d[...], expected)
-
- d = sample['ub_matrix']
- expected = numpy.array([[[1.99593e-16, 2.73682e-16, -1.54],
- [-1.08894, 1.08894, 1.6083e-16],
- [1.08894, 1.08894, 9.28619e-17]]])
- self.assertIsNotNone(d)
- self.assertEqual(d.shape, (1, 3, 3))
- self.assertIn(d.dtype.kind, ['d', 'f'])
- numpy.testing.assert_array_almost_equal(d[...], expected)
-
- def test_interpretation_mca_edf(self):
- """EDF files with two or more headers starting with "MCA"
- must have @interpretation = "spectrum" an the data."""
- header = {
- "Title": "zapimage samy -4.975 -5.095 80 500 samz -4.091 -4.171 70 0",
- "MCA a": -23.812,
- "MCA b": 2.7107,
- "MCA c": 8.1164e-06}
-
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
- h5_image = fabioh5.File(fabio_image=fabio_image)
-
- data_dataset = h5_image["/scan_0/measurement/image_0/data"]
- self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
-
- data_dataset = h5_image["/scan_0/instrument/detector_0/data"]
- self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
-
- data_dataset = h5_image["/scan_0/measurement/image_0/info/data"]
- self.assertEqual(data_dataset.attrs["interpretation"], "spectrum")
-
- def test_get_api(self):
- result = self.h5_image.get("scan_0", getclass=True, getlink=True)
- self.assertIs(result, h5py.HardLink)
- result = self.h5_image.get("scan_0", getclass=False, getlink=True)
- self.assertIsInstance(result, h5py.HardLink)
- result = self.h5_image.get("scan_0", getclass=True, getlink=False)
- self.assertIs(result, h5py.Group)
- result = self.h5_image.get("scan_0", getclass=False, getlink=False)
- self.assertIsInstance(result, commonh5.Group)
-
- def test_detector_link(self):
- detector1 = self.h5_image["/scan_0/instrument/detector_0"]
- detector2 = self.h5_image["/scan_0/measurement/image_0/info"]
- self.assertIsNot(detector1, detector2)
- self.assertEqual(list(detector1.items()), list(detector2.items()))
- self.assertEqual(self.h5_image.get(detector2.name, getlink=True).path, detector1.name)
-
- def test_detector_data_link(self):
- data1 = self.h5_image["/scan_0/instrument/detector_0/data"]
- data2 = self.h5_image["/scan_0/measurement/image_0/data"]
- self.assertIsNot(data1, data2)
- self.assertIs(data1._get_data(), data2._get_data())
- self.assertEqual(self.h5_image.get(data2.name, getlink=True).path, data1.name)
-
- def test_dirty_header(self):
- """Test that it does not fail"""
- try:
- header = {}
- header["foo"] = b'abc'
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = fabio.edfimage.edfimage(data=data, header=header)
- header = {}
- header["foo"] = b'a\x90bc\xFE'
- fabio_image.append_frame(data=data, header=header)
- except Exception as e:
- _logger.error(e.args[0])
- _logger.debug("Backtrace", exc_info=True)
- self.skipTest("fabio do not allow to create the resource")
-
- h5_image = fabioh5.File(fabio_image=fabio_image)
- scan_header_path = "/scan_0/instrument/file/scan_header"
- self.assertIn(scan_header_path, h5_image)
- data = h5_image[scan_header_path]
- self.assertIsInstance(data[...], numpy.ndarray)
-
- def test_unicode_header(self):
- """Test that it does not fail"""
- try:
- header = {}
- header["foo"] = b'abc'
- data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
- fabio_image = fabio.edfimage.edfimage(data=data, header=header)
- header = {}
- header["foo"] = u'abc\u2764'
- fabio_image.append_frame(data=data, header=header)
- except Exception as e:
- _logger.error(e.args[0])
- _logger.debug("Backtrace", exc_info=True)
- self.skipTest("fabio do not allow to create the resource")
-
- h5_image = fabioh5.File(fabio_image=fabio_image)
- scan_header_path = "/scan_0/instrument/file/scan_header"
- self.assertIn(scan_header_path, h5_image)
- data = h5_image[scan_header_path]
- self.assertIsInstance(data[...], numpy.ndarray)
-
-
-class TestFabioH5MultiFrames(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
-
- names = ["A", "B", "C", "D"]
- values = [["32000", "-10", "5.0", "1"],
- ["-32000", "-10", "5.0", "1"]]
-
- fabio_file = None
-
- for i in range(10):
- header = {
- "image_id": "%d" % i,
- "integer": "-100",
- "float": "1.0",
- "string": "hi!",
- "list_integer": "100 50 0",
- "list_float": "1.0 2.0 3.5",
- "string_looks_like_list": "2000 hi!",
- "motor_mne": " ".join(names),
- "motor_pos": " ".join(values[i % len(values)]),
- "counter_mne": " ".join(names),
- "counter_pos": " ".join(values[i % len(values)])
- }
- for iname, name in enumerate(names):
- header[name] = values[i % len(values)][iname]
-
- data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
- if fabio_file is None:
- fabio_file = fabio.edfimage.EdfImage(data=data, header=header)
- else:
- fabio_file.append_frame(data=data, header=header)
-
- cls.fabio_file = fabio_file
- cls.fabioh5 = fabioh5.File(fabio_image=fabio_file)
-
- def test_others(self):
- others = self.fabioh5["/scan_0/instrument/detector_0/others"]
- dataset = others["A"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 1)
- self.assertEqual(dataset.dtype.kind, "i")
- dataset = others["B"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 1)
- self.assertEqual(dataset.dtype.kind, "i")
- dataset = others["C"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 1)
- self.assertEqual(dataset.dtype.kind, "f")
- dataset = others["D"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 1)
- self.assertEqual(dataset.dtype.kind, "u")
-
- def test_positioners(self):
- counters = self.fabioh5["/scan_0/instrument/positioners"]
- # At least 32 bits, no unsigned values
- dataset = counters["A"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "i")
- dataset = counters["B"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "i")
- dataset = counters["C"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "f")
- dataset = counters["D"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "i")
-
- def test_counters(self):
- counters = self.fabioh5["/scan_0/measurement"]
- # At least 32 bits, no unsigned values
- dataset = counters["A"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "i")
- dataset = counters["B"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "i")
- dataset = counters["C"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "f")
- dataset = counters["D"]
- self.assertGreaterEqual(dataset.dtype.itemsize, 4)
- self.assertEqual(dataset.dtype.kind, "i")
-
-
-class TestFabioH5WithEdf(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
-
- cls.tmp_directory = tempfile.mkdtemp()
-
- cls.edf_filename = os.path.join(cls.tmp_directory, "test.edf")
-
- header = {
- "integer": "-100",
- "float": "1.0",
- "string": "hi!",
- "list_integer": "100 50 0",
- "list_float": "1.0 2.0 3.5",
- "string_looks_like_list": "2000 hi!",
- }
- data = numpy.array([[10, 11], [12, 13], [14, 15]], dtype=numpy.int64)
- fabio_image = fabio.edfimage.edfimage(data, header)
- fabio_image.write(cls.edf_filename)
-
- cls.fabio_image = fabio.open(cls.edf_filename)
- cls.h5_image = fabioh5.File(fabio_image=cls.fabio_image)
-
- @classmethod
- def tearDownClass(cls):
- cls.fabio_image = None
- cls.h5_image = None
- shutil.rmtree(cls.tmp_directory)
-
- def test_reserved_format_metadata(self):
- if fabio.hexversion < 327920: # 0.5.0 final
- self.skipTest("fabio >= 0.5.0 final is needed")
-
- # The EDF contains reserved keys in the header
- self.assertIn("HeaderID", self.fabio_image.header)
- # We do not expose them in FabioH5
- self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image)
-
-
-class _TestableFrameData(fabioh5.FrameData):
- """Allow to test if the full data is reached."""
- def _create_data(self):
- raise RuntimeError("Not supposed to be called")
-
-
-class TestFabioH5WithFileSeries(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
-
- cls.tmp_directory = tempfile.mkdtemp()
-
- cls.edf_filenames = []
-
- for i in range(10):
- filename = os.path.join(cls.tmp_directory, "test_%04d.edf" % i)
- cls.edf_filenames.append(filename)
-
- header = {
- "image_id": "%d" % i,
- "integer": "-100",
- "float": "1.0",
- "string": "hi!",
- "list_integer": "100 50 0",
- "list_float": "1.0 2.0 3.5",
- "string_looks_like_list": "2000 hi!",
- }
- data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
- fabio_image = fabio.edfimage.edfimage(data, header)
- fabio_image.write(filename)
-
- @classmethod
- def tearDownClass(cls):
- shutil.rmtree(cls.tmp_directory)
-
- def _testH5Image(self, h5_image):
- # test data
- dataset = h5_image["/scan_0/instrument/detector_0/data"]
- self.assertEqual(dataset.h5py_class, h5py.Dataset)
- self.assertTrue(isinstance(dataset[()], numpy.ndarray))
- self.assertEqual(dataset.dtype.kind, "i")
- self.assertEqual(dataset.shape, (10, 3, 2))
- self.assertEqual(list(dataset[:, 0, 0]), list(range(10)))
- self.assertEqual(dataset.attrs["interpretation"], "image")
- # test metatdata
- dataset = h5_image["/scan_0/instrument/detector_0/others/image_id"]
- self.assertEqual(list(dataset[...]), list(range(10)))
-
- def testFileList(self):
- h5_image = fabioh5.File(file_series=self.edf_filenames)
- self._testH5Image(h5_image)
-
- def testFileSeries(self):
- file_series = fabioh5._FileSeries(self.edf_filenames)
- h5_image = fabioh5.File(file_series=file_series)
- self._testH5Image(h5_image)
-
- def testFrameDataCache(self):
- file_series = fabioh5._FileSeries(self.edf_filenames)
- reader = fabioh5.FabioReader(file_series=file_series)
- frameData = _TestableFrameData("foo", reader)
- self.assertEqual(frameData.dtype.kind, "i")
- self.assertEqual(frameData.shape, (10, 3, 2))
-
-
-def suite():
- loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
- test_suite = unittest.TestSuite()
- test_suite.addTest(loadTests(TestFabioH5))
- test_suite.addTest(loadTests(TestFabioH5MultiFrames))
- test_suite.addTest(loadTests(TestFabioH5WithEdf))
- test_suite.addTest(loadTests(TestFabioH5WithFileSeries))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_h5py_utils.py b/silx/io/test/test_h5py_utils.py
deleted file mode 100644
index 2e2e3dd..0000000
--- a/silx/io/test/test_h5py_utils.py
+++ /dev/null
@@ -1,397 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for h5py utilities"""
-
-__authors__ = ["W. de Nolf"]
-__license__ = "MIT"
-__date__ = "27/01/2020"
-
-
-import unittest
-import os
-import sys
-import time
-import shutil
-import tempfile
-import threading
-import multiprocessing
-from contextlib import contextmanager
-
-from .. import h5py_utils
-from ...utils.retry import RetryError, RetryTimeoutError
-
-IS_WINDOWS = sys.platform == "win32"
-
-
-def _subprocess_context_main(queue, contextmgr, *args, **kw):
- try:
- with contextmgr(*args, **kw):
- queue.put(None)
- threading.Event().wait()
- except Exception:
- queue.put(None)
- raise
-
-
-@contextmanager
-def _subprocess_context(contextmgr, *args, **kw):
- timeout = kw.pop("timeout", 10)
- queue = multiprocessing.Queue(maxsize=1)
- p = multiprocessing.Process(
- target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw
- )
- p.start()
- try:
- queue.get(timeout=timeout)
- yield
- finally:
- try:
- p.kill()
- except AttributeError:
- p.terminate()
- p.join(timeout)
-
-
-@contextmanager
-def _open_context(filename, **kw):
- with h5py_utils.File(filename, **kw) as f:
- if kw.get("mode") == "w":
- f["check"] = True
- f.flush()
- yield f
-
-
-def _cause_segfault():
- import ctypes
-
- i = ctypes.c_char(b"a")
- j = ctypes.pointer(i)
- c = 0
- while True:
- j[c] = b"a"
- c += 1
-
-
-def _top_level_names_test(txtfilename, *args, **kw):
- sys.stderr = open(os.devnull, "w")
-
- with open(txtfilename, mode="r") as f:
- failcounter = int(f.readline().strip())
-
- ncausefailure = kw.pop("ncausefailure")
- faildelay = kw.pop("faildelay")
- if failcounter < ncausefailure:
- time.sleep(faildelay)
- failcounter += 1
- with open(txtfilename, mode="w") as f:
- f.write(str(failcounter))
- if failcounter % 2:
- raise RetryError
- else:
- _cause_segfault()
- return h5py_utils._top_level_names(*args, **kw)
-
-
-top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test)
-
-
-def subtests(test):
- def wrapper(self):
- for _ in self._subtests():
- with self.subTest(**self._subtest_options):
- test(self)
-
- return wrapper
-
-
-class TestH5pyUtils(unittest.TestCase):
- def setUp(self):
- self.test_dir = tempfile.mkdtemp()
-
- def tearDown(self):
- shutil.rmtree(self.test_dir)
-
- def _subtests(self):
- self._subtest_options = {"mode": "w"}
- self.filename_generator = self._filenames()
- yield
- self._subtest_options = {"mode": "w", "libver": "latest"}
- self.filename_generator = self._filenames()
- yield
-
- @property
- def _liber_allows_concurrent_access(self):
- return self._subtest_options.get("libver") in [None, "earliest", "v18"]
-
- def _filenames(self):
- i = 1
- while True:
- filename = os.path.join(self.test_dir, "file{}.h5".format(i))
- with self._open_context(filename):
- pass
- yield filename
- i += 1
-
- def _new_filename(self):
- return next(self.filename_generator)
-
- @contextmanager
- def _open_context(self, filename, **kwargs):
- kw = self._subtest_options
- kw.update(kwargs)
- with _open_context(filename, **kw) as f:
-
- yield f
-
- @contextmanager
- def _open_context_subprocess(self, filename, **kwargs):
- kw = self._subtest_options
- kw.update(kwargs)
- with _subprocess_context(_open_context, filename, **kw):
- yield
-
- def _assert_hdf5_data(self, f):
- self.assertTrue(f["check"][()])
-
- def _validate_hdf5_data(self, filename, swmr=False):
- with self._open_context(filename, mode="r") as f:
- self.assertEqual(f.swmr_mode, swmr)
- self._assert_hdf5_data(f)
-
- @subtests
- def test_modes_single_process(self):
- orig = os.environ.get("HDF5_USE_FILE_LOCKING")
- filename1 = self._new_filename()
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
- filename2 = self._new_filename()
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
- with self._open_context(filename1, mode="r"):
- with self._open_context(filename2, mode="r"):
- pass
- for mode in ["w", "a"]:
- with self.assertRaises(RuntimeError):
- with self._open_context(filename2, mode=mode):
- pass
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
- with self._open_context(filename1, mode="a"):
- for mode in ["w", "a"]:
- with self._open_context(filename2, mode=mode):
- pass
- with self.assertRaises(RuntimeError):
- with self._open_context(filename2, mode="r"):
- pass
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
-
- @subtests
- def test_modes_multi_process(self):
- if not self._liber_allows_concurrent_access:
- # A concurrent reader with HDF5_USE_FILE_LOCKING=FALSE is
- # no longer works with HDF5 >=1.10 (you get an exception
- # when trying to open the file)
- return
- filename = self._new_filename()
-
- # File open by truncating writer
- with self._open_context_subprocess(filename, mode="w"):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- if IS_WINDOWS:
- with self._open_context(filename, mode="a") as f:
- self._assert_hdf5_data(f)
- else:
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- # File open by appending writer
- with self._open_context_subprocess(filename, mode="a"):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- if IS_WINDOWS:
- with self._open_context(filename, mode="a") as f:
- self._assert_hdf5_data(f)
- else:
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- # File open by reader
- with self._open_context_subprocess(filename, mode="r"):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- # File open by locking reader
- with _subprocess_context(
- _open_context, filename, mode="r", enable_file_locking=True
- ):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- if IS_WINDOWS:
- with self._open_context(filename, mode="a") as f:
- self._assert_hdf5_data(f)
- else:
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- @subtests
- @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported")
- def test_modes_multi_process_swmr(self):
- filename = self._new_filename()
-
- with self._open_context(filename, mode="w", libver="latest") as f:
- pass
-
- # File open by SWMR writer
- with self._open_context_subprocess(filename, mode="a", swmr=True):
- with self._open_context(filename, mode="r") as f:
- assert f.swmr_mode
- self._assert_hdf5_data(f)
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename, swmr=True)
-
- @subtests
- def test_retry_defaults(self):
- filename = self._new_filename()
-
- names = h5py_utils.top_level_names(filename)
- self.assertEqual(names, [])
-
- names = h5py_utils.safe_top_level_names(filename)
- self.assertEqual(names, [])
-
- names = h5py_utils.top_level_names(filename, include_only=None)
- self.assertEqual(names, ["check"])
-
- names = h5py_utils.safe_top_level_names(filename, include_only=None)
- self.assertEqual(names, ["check"])
-
- with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item:
- self.assertEqual(item, None)
-
- with h5py_utils.open_item(filename, "/check", validate=None) as item:
- self.assertTrue(item[()])
-
- with self.assertRaises(RetryTimeoutError):
- with h5py_utils.open_item(
- filename,
- "/check",
- retry_timeout=0.1,
- retry_invalid=True,
- validate=lambda x: False,
- ) as item:
- pass
-
- ncall = 0
-
- def validate(item):
- nonlocal ncall
- if ncall >= 1:
- return True
- else:
- ncall += 1
- raise RetryError
-
- with h5py_utils.open_item(
- filename, "/check", validate=validate, retry_timeout=1, retry_invalid=True
- ) as item:
- self.assertTrue(item[()])
-
- @subtests
- def test_retry_custom(self):
- filename = self._new_filename()
- ncausefailure = 3
- faildelay = 0.1
- sufficient_timeout = ncausefailure * (faildelay + 10)
- insufficient_timeout = ncausefailure * faildelay * 0.5
-
- @h5py_utils.retry_contextmanager()
- def open_item(filename, name):
- nonlocal failcounter
- if failcounter < ncausefailure:
- time.sleep(faildelay)
- failcounter += 1
- raise RetryError
- with h5py_utils.File(filename) as h5file:
- yield h5file[name]
-
- failcounter = 0
- kw = {"retry_timeout": sufficient_timeout}
- with open_item(filename, "/check", **kw) as item:
- self.assertTrue(item[()])
-
- failcounter = 0
- kw = {"retry_timeout": insufficient_timeout}
- with self.assertRaises(RetryTimeoutError):
- with open_item(filename, "/check", **kw) as item:
- pass
-
- @subtests
- def test_retry_in_subprocess(self):
- filename = self._new_filename()
- txtfilename = os.path.join(self.test_dir, "failcounter.txt")
- ncausefailure = 3
- faildelay = 0.1
- sufficient_timeout = ncausefailure * (faildelay + 10)
- insufficient_timeout = ncausefailure * faildelay * 0.5
-
- kw = {
- "retry_timeout": sufficient_timeout,
- "include_only": None,
- "ncausefailure": ncausefailure,
- "faildelay": faildelay,
- }
- with open(txtfilename, mode="w") as f:
- f.write("0")
- names = top_level_names_test(txtfilename, filename, **kw)
- self.assertEqual(names, ["check"])
-
- kw = {
- "retry_timeout": insufficient_timeout,
- "include_only": None,
- "ncausefailure": ncausefailure,
- "faildelay": faildelay,
- }
- with open(txtfilename, mode="w") as f:
- f.write("0")
- with self.assertRaises(RetryTimeoutError):
- top_level_names_test(txtfilename, filename, **kw)
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestH5pyUtils))
- return test_suite
-
-
-if __name__ == "__main__":
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_nxdata.py b/silx/io/test/test_nxdata.py
deleted file mode 100644
index 80cc193..0000000
--- a/silx/io/test/test_nxdata.py
+++ /dev/null
@@ -1,579 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2020 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for NXdata parsing"""
-
-__authors__ = ["P. Knobel"]
-__license__ = "MIT"
-__date__ = "24/03/2020"
-
-
-import tempfile
-import unittest
-import h5py
-import numpy
-import six
-
-from .. import nxdata
-
-
-text_dtype = h5py.special_dtype(vlen=six.text_type)
-
-
-class TestNXdata(unittest.TestCase):
- def setUp(self):
- tmp = tempfile.NamedTemporaryFile(prefix="nxdata_examples_", suffix=".h5", delete=True)
- tmp.file.close()
- self.h5fname = tmp.name
- self.h5f = h5py.File(tmp.name, "w")
-
- # SCALARS
- g0d = self.h5f.create_group("scalars")
-
- g0d0 = g0d.create_group("0D_scalar")
- g0d0.attrs["NX_class"] = "NXdata"
- g0d0.attrs["signal"] = "scalar"
- g0d0.create_dataset("scalar", data=10)
- g0d0.create_dataset("scalar_errors", data=0.1)
-
- g0d1 = g0d.create_group("2D_scalars")
- g0d1.attrs["NX_class"] = "NXdata"
- g0d1.attrs["signal"] = "scalars"
- ds = g0d1.create_dataset("scalars", data=numpy.arange(3 * 10).reshape((3, 10)))
- ds.attrs["interpretation"] = "scalar"
-
- g0d1 = g0d.create_group("4D_scalars")
- g0d1.attrs["NX_class"] = "NXdata"
- g0d1.attrs["signal"] = "scalars"
- ds = g0d1.create_dataset("scalars", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10)))
- ds.attrs["interpretation"] = "scalar"
-
- # SPECTRA
- g1d = self.h5f.create_group("spectra")
-
- g1d0 = g1d.create_group("1D_spectrum")
- g1d0.attrs["NX_class"] = "NXdata"
- g1d0.attrs["signal"] = "count"
- g1d0.attrs["auxiliary_signals"] = numpy.array(["count2", "count3"],
- dtype=text_dtype)
- g1d0.attrs["axes"] = "energy_calib"
- g1d0.attrs["uncertainties"] = numpy.array(["energy_errors", ],
- dtype=text_dtype)
- g1d0.create_dataset("count", data=numpy.arange(10))
- g1d0.create_dataset("count2", data=0.5 * numpy.arange(10))
- d = g1d0.create_dataset("count3", data=0.4 * numpy.arange(10))
- d.attrs["long_name"] = "3rd counter"
- g1d0.create_dataset("title", data="Title as dataset (like nexpy)")
- g1d0.create_dataset("energy_calib", data=(10, 5)) # 10 * idx + 5
- g1d0.create_dataset("energy_errors", data=3.14 * numpy.random.rand(10))
-
- g1d1 = g1d.create_group("2D_spectra")
- g1d1.attrs["NX_class"] = "NXdata"
- g1d1.attrs["signal"] = "counts"
- ds = g1d1.create_dataset("counts", data=numpy.arange(3 * 10).reshape((3, 10)))
- ds.attrs["interpretation"] = "spectrum"
-
- g1d2 = g1d.create_group("4D_spectra")
- g1d2.attrs["NX_class"] = "NXdata"
- g1d2.attrs["signal"] = "counts"
- g1d2.attrs["axes"] = numpy.array(["energy", ], dtype=text_dtype)
- ds = g1d2.create_dataset("counts", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10)))
- ds.attrs["interpretation"] = "spectrum"
- ds = g1d2.create_dataset("errors", data=4.5 * numpy.random.rand(2, 2, 3, 10))
- ds = g1d2.create_dataset("energy", data=5 + 10 * numpy.arange(15),
- shuffle=True, compression="gzip")
- ds.attrs["long_name"] = "Calibrated energy"
- ds.attrs["first_good"] = 3
- ds.attrs["last_good"] = 12
- g1d2.create_dataset("energy_errors", data=10 * numpy.random.rand(15))
-
- # IMAGES
- g2d = self.h5f.create_group("images")
-
- g2d0 = g2d.create_group("2D_regular_image")
- g2d0.attrs["NX_class"] = "NXdata"
- g2d0.attrs["signal"] = "image"
- g2d0.attrs["auxiliary_signals"] = "image2"
- g2d0.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"],
- dtype=text_dtype)
- g2d0.create_dataset("image", data=numpy.arange(4 * 6).reshape((4, 6)))
- g2d0.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6)))
- ds = g2d0.create_dataset("rows_calib", data=(10, 5))
- ds.attrs["long_name"] = "Calibrated Y"
- g2d0.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
-
- g2d1 = g2d.create_group("2D_irregular_data")
- g2d1.attrs["NX_class"] = "NXdata"
- g2d1.attrs["signal"] = "data"
- g2d1.attrs["title"] = "Title as group attr"
- g2d1.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"],
- dtype=text_dtype)
- g2d1.create_dataset("data", data=numpy.arange(64 * 128).reshape((64, 128)))
- g2d1.create_dataset("rows_coordinates", data=numpy.arange(64) + numpy.random.rand(64))
- g2d1.create_dataset("columns_coordinates", data=numpy.arange(128) + 2.5 * numpy.random.rand(128))
-
- g2d2 = g2d.create_group("3D_images")
- g2d2.attrs["NX_class"] = "NXdata"
- g2d2.attrs["signal"] = "images"
- ds = g2d2.create_dataset("images", data=numpy.arange(2 * 4 * 6).reshape((2, 4, 6)))
- ds.attrs["interpretation"] = "image"
-
- g2d3 = g2d.create_group("5D_images")
- g2d3.attrs["NX_class"] = "NXdata"
- g2d3.attrs["signal"] = "images"
- g2d3.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"],
- dtype=text_dtype)
- ds = g2d3.create_dataset("images", data=numpy.arange(2 * 2 * 2 * 4 * 6).reshape((2, 2, 2, 4, 6)))
- ds.attrs["interpretation"] = "image"
- g2d3.create_dataset("rows_coordinates", data=5 + 10 * numpy.arange(4))
- g2d3.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
-
- g2d4 = g2d.create_group("RGBA_image")
- g2d4.attrs["NX_class"] = "NXdata"
- g2d4.attrs["signal"] = "image"
- g2d4.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"],
- dtype=text_dtype)
- rgba_image = numpy.linspace(0, 1, num=7*8*3).reshape((7, 8, 3))
- rgba_image[:, :, 1] = 1 - rgba_image[:, :, 1] # invert G channel to add some color
- ds = g2d4.create_dataset("image", data=rgba_image)
- ds.attrs["interpretation"] = "rgba-image"
- ds = g2d4.create_dataset("rows_calib", data=(10, 5))
- ds.attrs["long_name"] = "Calibrated Y"
- g2d4.create_dataset("columns_coordinates", data=0.5+0.02*numpy.arange(8))
-
- # SCATTER
- g = self.h5f.create_group("scatters")
-
- gd0 = g.create_group("x_y_scatter")
- gd0.attrs["NX_class"] = "NXdata"
- gd0.attrs["signal"] = "y"
- gd0.attrs["axes"] = numpy.array(["x", ], dtype=text_dtype)
- gd0.create_dataset("y", data=numpy.random.rand(128) - 0.5)
- gd0.create_dataset("x", data=2 * numpy.random.rand(128))
- gd0.create_dataset("x_errors", data=0.05 * numpy.random.rand(128))
- gd0.create_dataset("errors", data=0.05 * numpy.random.rand(128))
-
- gd1 = g.create_group("x_y_value_scatter")
- gd1.attrs["NX_class"] = "NXdata"
- gd1.attrs["signal"] = "values"
- gd1.attrs["axes"] = numpy.array(["x", "y"], dtype=text_dtype)
- gd1.create_dataset("values", data=3.14 * numpy.random.rand(128))
- gd1.create_dataset("y", data=numpy.random.rand(128))
- gd1.create_dataset("y_errors", data=0.02 * numpy.random.rand(128))
- gd1.create_dataset("x", data=numpy.random.rand(128))
- gd1.create_dataset("x_errors", data=0.02 * numpy.random.rand(128))
-
- def tearDown(self):
- self.h5f.close()
-
- def testValidity(self):
- for group in self.h5f:
- for subgroup in self.h5f[group]:
- self.assertTrue(
- nxdata.is_valid_nxdata(self.h5f[group][subgroup]),
- "%s/%s not found to be a valid NXdata group" % (group, subgroup))
-
- def testScalars(self):
- nxd = nxdata.NXdata(self.h5f["scalars/0D_scalar"])
- self.assertTrue(nxd.signal_is_0d)
- self.assertEqual(nxd.signal[()], 10)
- self.assertEqual(nxd.axes_names, [])
- self.assertEqual(nxd.axes_dataset_names, [])
- self.assertEqual(nxd.axes, [])
- self.assertIsNotNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertIsNone(nxd.interpretation)
-
- nxd = nxdata.NXdata(self.h5f["scalars/2D_scalars"])
- self.assertTrue(nxd.signal_is_2d)
- self.assertEqual(nxd.signal[1, 2], 12)
- self.assertEqual(nxd.axes_names, [None, None])
- self.assertEqual(nxd.axes_dataset_names, [None, None])
- self.assertEqual(nxd.axes, [None, None])
- self.assertIsNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertEqual(nxd.interpretation, "scalar")
-
- nxd = nxdata.NXdata(self.h5f["scalars/4D_scalars"])
- self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
- nxd.signal_is_2d or nxd.signal_is_3d)
- self.assertEqual(nxd.signal[1, 0, 1, 4], 74)
- self.assertEqual(nxd.axes_names, [None, None, None, None])
- self.assertEqual(nxd.axes_dataset_names, [None, None, None, None])
- self.assertEqual(nxd.axes, [None, None, None, None])
- self.assertIsNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertEqual(nxd.interpretation, "scalar")
-
- def testSpectra(self):
- nxd = nxdata.NXdata(self.h5f["spectra/1D_spectrum"])
- self.assertTrue(nxd.signal_is_1d)
- self.assertTrue(nxd.is_curve)
- self.assertTrue(numpy.array_equal(numpy.array(nxd.signal),
- numpy.arange(10)))
- self.assertEqual(nxd.axes_names, ["energy_calib"])
- self.assertEqual(nxd.axes_dataset_names, ["energy_calib"])
- self.assertEqual(nxd.axes[0][0], 10)
- self.assertEqual(nxd.axes[0][1], 5)
- self.assertIsNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertIsNone(nxd.interpretation)
- self.assertEqual(nxd.title, "Title as dataset (like nexpy)")
-
- self.assertEqual(nxd.auxiliary_signals_dataset_names,
- ["count2", "count3"])
- self.assertEqual(nxd.auxiliary_signals_names,
- ["count2", "3rd counter"])
- self.assertAlmostEqual(nxd.auxiliary_signals[1][2],
- 0.8) # numpy.arange(10) * 0.4
-
- nxd = nxdata.NXdata(self.h5f["spectra/2D_spectra"])
- self.assertTrue(nxd.signal_is_2d)
- self.assertTrue(nxd.is_curve)
- self.assertEqual(nxd.axes_names, [None, None])
- self.assertEqual(nxd.axes_dataset_names, [None, None])
- self.assertEqual(nxd.axes, [None, None])
- self.assertIsNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertEqual(nxd.interpretation, "spectrum")
-
- nxd = nxdata.NXdata(self.h5f["spectra/4D_spectra"])
- self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
- nxd.signal_is_2d or nxd.signal_is_3d)
- self.assertTrue(nxd.is_curve)
- self.assertEqual(nxd.axes_names,
- [None, None, None, "Calibrated energy"])
- self.assertEqual(nxd.axes_dataset_names,
- [None, None, None, "energy"])
- self.assertEqual(nxd.axes[:3], [None, None, None])
- self.assertEqual(nxd.axes[3].shape, (10, )) # dataset shape (15, ) sliced [3:12]
- self.assertIsNotNone(nxd.errors)
- self.assertEqual(nxd.errors.shape, (2, 2, 3, 10))
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertEqual(nxd.interpretation, "spectrum")
- self.assertEqual(nxd.get_axis_errors("energy").shape,
- (10,))
- # test getting axis errors by long_name
- self.assertTrue(numpy.array_equal(nxd.get_axis_errors("Calibrated energy"),
- nxd.get_axis_errors("energy")))
- self.assertTrue(numpy.array_equal(nxd.get_axis_errors(b"Calibrated energy"),
- nxd.get_axis_errors("energy")))
-
- def testImages(self):
- nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"])
- self.assertTrue(nxd.signal_is_2d)
- self.assertTrue(nxd.is_image)
- self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates"])
- self.assertEqual(list(nxd.axes_dataset_names),
- ["rows_calib", "columns_coordinates"])
- self.assertIsNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertIsNone(nxd.interpretation)
- self.assertEqual(len(nxd.auxiliary_signals), 1)
- self.assertEqual(nxd.auxiliary_signals_names, ["image2"])
-
- nxd = nxdata.NXdata(self.h5f["images/2D_irregular_data"])
- self.assertTrue(nxd.signal_is_2d)
- self.assertTrue(nxd.is_image)
-
- self.assertEqual(nxd.axes_dataset_names, nxd.axes_names)
- self.assertEqual(list(nxd.axes_dataset_names),
- ["rows_coordinates", "columns_coordinates"])
- self.assertEqual(len(nxd.axes), 2)
- self.assertIsNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertIsNone(nxd.interpretation)
- self.assertEqual(nxd.title, "Title as group attr")
-
- nxd = nxdata.NXdata(self.h5f["images/5D_images"])
- self.assertTrue(nxd.is_image)
- self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
- nxd.signal_is_2d or nxd.signal_is_3d)
- self.assertEqual(nxd.axes_names,
- [None, None, None, 'rows_coordinates', 'columns_coordinates'])
- self.assertEqual(nxd.axes_dataset_names,
- [None, None, None, 'rows_coordinates', 'columns_coordinates'])
- self.assertIsNone(nxd.errors)
- self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
- self.assertEqual(nxd.interpretation, "image")
-
- nxd = nxdata.NXdata(self.h5f["images/RGBA_image"])
- self.assertTrue(nxd.is_image)
- self.assertEqual(nxd.interpretation, "rgba-image")
- self.assertTrue(nxd.signal_is_3d)
- self.assertEqual(nxd.axes_names, ["Calibrated Y",
- "columns_coordinates",
- None])
- self.assertEqual(list(nxd.axes_dataset_names),
- ["rows_calib", "columns_coordinates", None])
-
- def testScatters(self):
- nxd = nxdata.NXdata(self.h5f["scatters/x_y_scatter"])
- self.assertTrue(nxd.signal_is_1d)
- self.assertEqual(nxd.axes_names, ["x"])
- self.assertEqual(nxd.axes_dataset_names,
- ["x"])
- self.assertIsNotNone(nxd.errors)
- self.assertEqual(nxd.get_axis_errors("x").shape,
- (128, ))
- self.assertTrue(nxd.is_scatter)
- self.assertFalse(nxd.is_x_y_value_scatter)
- self.assertIsNone(nxd.interpretation)
-
- nxd = nxdata.NXdata(self.h5f["scatters/x_y_value_scatter"])
- self.assertFalse(nxd.signal_is_1d)
- self.assertTrue(nxd.axes_dataset_names,
- nxd.axes_names)
- self.assertEqual(nxd.axes_dataset_names,
- ["x", "y"])
- self.assertEqual(nxd.get_axis_errors("x").shape,
- (128, ))
- self.assertEqual(nxd.get_axis_errors("y").shape,
- (128, ))
- self.assertEqual(len(nxd.axes), 2)
- self.assertIsNone(nxd.errors)
- self.assertTrue(nxd.is_scatter)
- self.assertTrue(nxd.is_x_y_value_scatter)
- self.assertIsNone(nxd.interpretation)
-
-
-class TestLegacyNXdata(unittest.TestCase):
- def setUp(self):
- tmp = tempfile.NamedTemporaryFile(prefix="nxdata_legacy_examples_",
- suffix=".h5", delete=True)
- tmp.file.close()
- self.h5fname = tmp.name
- self.h5f = h5py.File(tmp.name, "w")
-
- def tearDown(self):
- self.h5f.close()
-
- def testSignalAttrOnDataset(self):
- g = self.h5f.create_group("2D")
- g.attrs["NX_class"] = "NXdata"
-
- ds0 = g.create_dataset("image0",
- data=numpy.arange(4 * 6).reshape((4, 6)))
- ds0.attrs["signal"] = 1
- ds0.attrs["long_name"] = "My first image"
-
- ds1 = g.create_dataset("image1",
- data=numpy.arange(4 * 6).reshape((4, 6)))
- ds1.attrs["signal"] = "2"
- ds1.attrs["long_name"] = "My 2nd image"
-
- ds2 = g.create_dataset("image2",
- data=numpy.arange(4 * 6).reshape((4, 6)))
- ds2.attrs["signal"] = 3
-
- nxd = nxdata.NXdata(self.h5f["2D"])
-
- self.assertEqual(nxd.signal_dataset_name, "image0")
- self.assertEqual(nxd.signal_name, "My first image")
- self.assertEqual(nxd.signal.shape,
- (4, 6))
-
- self.assertEqual(len(nxd.auxiliary_signals), 2)
- self.assertEqual(nxd.auxiliary_signals[1].shape,
- (4, 6))
-
- self.assertEqual(nxd.auxiliary_signals_dataset_names,
- ["image1", "image2"])
- self.assertEqual(nxd.auxiliary_signals_names,
- ["My 2nd image", "image2"])
-
- def testAxesOnSignalDataset(self):
- g = self.h5f.create_group("2D")
- g.attrs["NX_class"] = "NXdata"
-
- ds0 = g.create_dataset("image0",
- data=numpy.arange(4 * 6).reshape((4, 6)))
- ds0.attrs["signal"] = 1
- ds0.attrs["axes"] = "yaxis:xaxis"
-
- ds1 = g.create_dataset("yaxis",
- data=numpy.arange(4))
- ds2 = g.create_dataset("xaxis",
- data=numpy.arange(6))
-
- nxd = nxdata.NXdata(self.h5f["2D"])
-
- self.assertEqual(nxd.axes_dataset_names,
- ["yaxis", "xaxis"])
- self.assertTrue(numpy.array_equal(nxd.axes[0],
- numpy.arange(4)))
- self.assertTrue(numpy.array_equal(nxd.axes[1],
- numpy.arange(6)))
-
- def testAxesOnAxesDatasets(self):
- g = self.h5f.create_group("2D")
- g.attrs["NX_class"] = "NXdata"
-
- ds0 = g.create_dataset("image0",
- data=numpy.arange(4 * 6).reshape((4, 6)))
- ds0.attrs["signal"] = 1
- ds1 = g.create_dataset("yaxis",
- data=numpy.arange(4))
- ds1.attrs["axis"] = 0
- ds2 = g.create_dataset("xaxis",
- data=numpy.arange(6))
- ds2.attrs["axis"] = "1"
-
- nxd = nxdata.NXdata(self.h5f["2D"])
- self.assertEqual(nxd.axes_dataset_names,
- ["yaxis", "xaxis"])
- self.assertTrue(numpy.array_equal(nxd.axes[0],
- numpy.arange(4)))
- self.assertTrue(numpy.array_equal(nxd.axes[1],
- numpy.arange(6)))
-
- def testAsciiUndefinedAxesAttrs(self):
- """Some files may not be using utf8 for str attrs"""
- g = self.h5f.create_group("bytes_attrs")
- g.attrs["NX_class"] = b"NXdata"
- g.attrs["signal"] = b"image0"
- g.attrs["axes"] = b"yaxis", b"."
-
- g.create_dataset("image0",
- data=numpy.arange(4 * 6).reshape((4, 6)))
- g.create_dataset("yaxis",
- data=numpy.arange(4))
-
- nxd = nxdata.NXdata(self.h5f["bytes_attrs"])
- self.assertEqual(nxd.axes_dataset_names,
- ["yaxis", None])
-
-
-class TestSaveNXdata(unittest.TestCase):
- def setUp(self):
- tmp = tempfile.NamedTemporaryFile(prefix="nxdata",
- suffix=".h5", delete=True)
- tmp.file.close()
- self.h5fname = tmp.name
-
- def testSimpleSave(self):
- sig = numpy.array([0, 1, 2])
- a0 = numpy.array([2, 3, 4])
- a1 = numpy.array([3, 4, 5])
- nxdata.save_NXdata(filename=self.h5fname,
- signal=sig,
- axes=[a0, a1],
- signal_name="sig",
- axes_names=["a0", "a1"],
- nxentry_name="a",
- nxdata_name="mydata")
-
- h5f = h5py.File(self.h5fname, "r")
- self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
-
- nxd = nxdata.NXdata(h5f["/a/mydata"])
- self.assertTrue(numpy.array_equal(nxd.signal,
- sig))
- self.assertTrue(numpy.array_equal(nxd.axes[0],
- a0))
-
- h5f.close()
-
- def testSimplestSave(self):
- sig = numpy.array([0, 1, 2])
- nxdata.save_NXdata(filename=self.h5fname,
- signal=sig)
-
- h5f = h5py.File(self.h5fname, "r")
-
- self.assertTrue(nxdata.is_valid_nxdata(h5f["/entry/data0"]))
-
- nxd = nxdata.NXdata(h5f["/entry/data0"])
- self.assertTrue(numpy.array_equal(nxd.signal,
- sig))
- h5f.close()
-
- def testSaveDefaultAxesNames(self):
- sig = numpy.array([0, 1, 2])
- a0 = numpy.array([2, 3, 4])
- a1 = numpy.array([3, 4, 5])
- nxdata.save_NXdata(filename=self.h5fname,
- signal=sig,
- axes=[a0, a1],
- signal_name="sig",
- axes_names=None,
- axes_long_names=["a", "b"],
- nxentry_name="a",
- nxdata_name="mydata")
-
- h5f = h5py.File(self.h5fname, "r")
- self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
-
- nxd = nxdata.NXdata(h5f["/a/mydata"])
- self.assertTrue(numpy.array_equal(nxd.signal,
- sig))
- self.assertTrue(numpy.array_equal(nxd.axes[0],
- a0))
- self.assertEqual(nxd.axes_dataset_names,
- [u"dim0", u"dim1"])
- self.assertEqual(nxd.axes_names,
- [u"a", u"b"])
-
- h5f.close()
-
- def testSaveToExistingEntry(self):
- h5f = h5py.File(self.h5fname, "w")
- g = h5f.create_group("myentry")
- g.attrs["NX_class"] = "NXentry"
- h5f.close()
-
- sig = numpy.array([0, 1, 2])
- a0 = numpy.array([2, 3, 4])
- a1 = numpy.array([3, 4, 5])
- nxdata.save_NXdata(filename=self.h5fname,
- signal=sig,
- axes=[a0, a1],
- signal_name="sig",
- axes_names=["a0", "a1"],
- nxentry_name="myentry",
- nxdata_name="toto")
-
- h5f = h5py.File(self.h5fname, "r")
- self.assertTrue(nxdata.is_valid_nxdata(h5f["myentry/toto"]))
-
- nxd = nxdata.NXdata(h5f["myentry/toto"])
- self.assertTrue(numpy.array_equal(nxd.signal,
- sig))
- self.assertTrue(numpy.array_equal(nxd.axes[0],
- a0))
- h5f.close()
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestNXdata))
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestLegacyNXdata))
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSaveNXdata))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_octaveh5.py b/silx/io/test/test_octaveh5.py
deleted file mode 100644
index 2e65820..0000000
--- a/silx/io/test/test_octaveh5.py
+++ /dev/null
@@ -1,165 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""
-Tests for the octaveh5 module
-"""
-
-__authors__ = ["C. Nemoz", "H. Payno"]
-__license__ = "MIT"
-__date__ = "12/07/2016"
-
-import unittest
-import os
-import tempfile
-
-try:
- from ..octaveh5 import Octaveh5
-except ImportError:
- Octaveh5 = None
-
-
-@unittest.skipIf(Octaveh5 is None, "Could not import h5py")
-class TestOctaveH5(unittest.TestCase):
- @staticmethod
- def _get_struct_FT():
- return {
- 'NO_CHECK': 0.0, 'SHOWSLICE': 1.0, 'DOTOMO': 1.0, 'DATABASE': 0.0, 'ANGLE_OFFSET': 0.0,
- 'VOLSELECTION_REMEMBER': 0.0, 'NUM_PART': 4.0, 'VOLOUTFILE': 0.0, 'RINGSCORRECTION': 0.0,
- 'DO_TEST_SLICE': 1.0, 'ZEROOFFMASK': 1.0, 'VERSION': 'fastomo3 version 2.0',
- 'CORRECT_SPIKES_THRESHOLD': 0.040000000000000001, 'SHOWPROJ': 0.0, 'HALF_ACQ': 0.0,
- 'ANGLE_OFFSET_VALUE': 0.0, 'FIXEDSLICE': 'middle', 'VOLSELECT': 'total' }
- @staticmethod
- def _get_struct_PYHSTEXE():
- return {
- 'EXE': 'PyHST2_2015d', 'VERBOSE': 0.0, 'OFFV': 'PyHST2_2015d', 'TOMO': 0.0,
- 'VERBOSE_FILE': 'pyhst_out.txt', 'DIR': '/usr/bin/', 'OFFN': 'pyhst2'}
-
- @staticmethod
- def _get_struct_FTAXIS():
- return {
- 'POSITION_VALUE': 12345.0, 'COR_ERROR': 0.0, 'FILESDURINGSCAN': 0.0, 'PLOTFIGURE': 1.0,
- 'DIM1': 0.0, 'OVERSAMPLING': 5.0, 'TO_THE_CENTER': 1.0, 'POSITION': 'fixed',
- 'COR_POSITION': 0.0, 'HA': 0.0 }
-
- @staticmethod
- def _get_struct_PAGANIN():
- return {
- 'MKEEP_MASK': 0.0, 'UNSHARP_SIGMA': 0.80000000000000004, 'DILATE': 2.0, 'UNSHARP_COEFF': 3.0,
- 'MEDIANR': 4.0, 'DB': 500.0, 'MKEEP_ABS': 0.0, 'MODE': 0.0, 'THRESHOLD': 0.5,
- 'MKEEP_BONE': 0.0, 'DB2': 100.0, 'MKEEP_CORR': 0.0, 'MKEEP_SOFT': 0.0 }
-
- @staticmethod
- def _get_struct_BEAMGEO():
- return {'DIST': 55.0, 'SY': 0.0, 'SX': 0.0, 'TYPE': 'p'}
-
-
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self.test_3_6_fname = os.path.join(self.tempdir, "silx_tmp_t00_octaveTest_3_6.h5")
- self.test_3_8_fname = os.path.join(self.tempdir, "silx_tmp_t00_octaveTest_3_8.h5")
-
- def tearDown(self):
- if os.path.isfile(self.test_3_6_fname):
- os.unlink(self.test_3_6_fname)
- if os.path.isfile(self.test_3_8_fname):
- os.unlink(self.test_3_8_fname)
-
- def testWritedIsReaded(self):
- """
- Simple test to write and reaf the structure compatible with the octave h5 using structure.
- This test is for # test for octave version > 3.8
- """
- writer = Octaveh5()
-
- writer.open(self.test_3_8_fname, 'a')
- # step 1 writing the file
- writer.write('FT', self._get_struct_FT())
- writer.write('PYHSTEXE', self._get_struct_PYHSTEXE())
- writer.write('FTAXIS', self._get_struct_FTAXIS())
- writer.write('PAGANIN', self._get_struct_PAGANIN())
- writer.write('BEAMGEO', self._get_struct_BEAMGEO())
- writer.close()
-
- # step 2 reading the file
- reader = Octaveh5().open(self.test_3_8_fname)
- # 2.1 check FT
- data_readed = reader.get('FT')
- self.assertEqual(data_readed, self._get_struct_FT() )
- # 2.2 check PYHSTEXE
- data_readed = reader.get('PYHSTEXE')
- self.assertEqual(data_readed, self._get_struct_PYHSTEXE() )
- # 2.3 check FTAXIS
- data_readed = reader.get('FTAXIS')
- self.assertEqual(data_readed, self._get_struct_FTAXIS() )
- # 2.4 check PAGANIN
- data_readed = reader.get('PAGANIN')
- self.assertEqual(data_readed, self._get_struct_PAGANIN() )
- # 2.5 check BEAMGEO
- data_readed = reader.get('BEAMGEO')
- self.assertEqual(data_readed, self._get_struct_BEAMGEO() )
- reader.close()
-
- def testWritedIsReadedOldOctaveVersion(self):
- """The same test as testWritedIsReaded but for octave version < 3.8
- """
- # test for octave version < 3.8
- writer = Octaveh5(3.6)
-
- writer.open(self.test_3_6_fname, 'a')
-
- # step 1 writing the file
- writer.write('FT', self._get_struct_FT())
- writer.write('PYHSTEXE', self._get_struct_PYHSTEXE())
- writer.write('FTAXIS', self._get_struct_FTAXIS())
- writer.write('PAGANIN', self._get_struct_PAGANIN())
- writer.write('BEAMGEO', self._get_struct_BEAMGEO())
- writer.close()
-
- # step 2 reading the file
- reader = Octaveh5(3.6).open(self.test_3_6_fname)
- # 2.1 check FT
- data_readed = reader.get('FT')
- self.assertEqual(data_readed, self._get_struct_FT() )
- # 2.2 check PYHSTEXE
- data_readed = reader.get('PYHSTEXE')
- self.assertEqual(data_readed, self._get_struct_PYHSTEXE() )
- # 2.3 check FTAXIS
- data_readed = reader.get('FTAXIS')
- self.assertEqual(data_readed, self._get_struct_FTAXIS() )
- # 2.4 check PAGANIN
- data_readed = reader.get('PAGANIN')
- self.assertEqual(data_readed, self._get_struct_PAGANIN() )
- # 2.5 check BEAMGEO
- data_readed = reader.get('BEAMGEO')
- self.assertEqual(data_readed, self._get_struct_BEAMGEO() )
- reader.close()
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestOctaveH5))
- return test_suite
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_rawh5.py b/silx/io/test/test_rawh5.py
deleted file mode 100644
index 0f7205c..0000000
--- a/silx/io/test/test_rawh5.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-#
-# Copyright (c) 2016 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ###########################################################################*/
-"""Test for silx.gui.hdf5 module"""
-
-__authors__ = ["V. Valls"]
-__license__ = "MIT"
-__date__ = "21/09/2017"
-
-
-import unittest
-import tempfile
-import numpy
-import shutil
-from ..import rawh5
-
-
-class TestNumpyFile(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.tmpDirectory = tempfile.mkdtemp()
-
- @classmethod
- def tearDownClass(cls):
- shutil.rmtree(cls.tmpDirectory)
-
- def testNumpyFile(self):
- filename = "%s/%s.npy" % (self.tmpDirectory, self.id())
- c = numpy.random.rand(5, 5)
- numpy.save(filename, c)
- h5 = rawh5.NumpyFile(filename)
- self.assertIn("data", h5)
- self.assertEqual(h5["data"].dtype.kind, "f")
-
- def testNumpyZFile(self):
- filename = "%s/%s.npz" % (self.tmpDirectory, self.id())
- a = numpy.array(u"aaaaa")
- b = numpy.array([1, 2, 3, 4])
- c = numpy.random.rand(5, 5)
- d = numpy.array(b"aaaaa")
- e = numpy.array(u"i \u2661 my mother")
- numpy.savez(filename, a, b=b, c=c, d=d, e=e)
- h5 = rawh5.NumpyFile(filename)
- self.assertIn("arr_0", h5)
- self.assertIn("b", h5)
- self.assertIn("c", h5)
- self.assertIn("d", h5)
- self.assertIn("e", h5)
- self.assertEqual(h5["arr_0"].dtype.kind, "U")
- self.assertEqual(h5["b"].dtype.kind, "i")
- self.assertEqual(h5["c"].dtype.kind, "f")
- self.assertEqual(h5["d"].dtype.kind, "S")
- self.assertEqual(h5["e"].dtype.kind, "U")
-
- def testNumpyZFileContainingDirectories(self):
- filename = "%s/%s.npz" % (self.tmpDirectory, self.id())
- data = {}
- data['a/b/c'] = numpy.arange(10)
- data['a/b/e'] = numpy.arange(10)
- numpy.savez(filename, **data)
- h5 = rawh5.NumpyFile(filename)
- self.assertIn("a/b/c", h5)
- self.assertIn("a/b/e", h5)
-
-
-def suite():
- test_suite = unittest.TestSuite()
- loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
- test_suite.addTest(loadTests(TestNumpyFile))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='suite')
diff --git a/silx/io/test/test_specfile.py b/silx/io/test/test_specfile.py
deleted file mode 100644
index 79d5544..0000000
--- a/silx/io/test/test_specfile.py
+++ /dev/null
@@ -1,433 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for specfile wrapper"""
-
-__authors__ = ["P. Knobel", "V.A. Sole"]
-__license__ = "MIT"
-__date__ = "17/01/2018"
-
-
-import locale
-import logging
-import numpy
-import os
-import sys
-import tempfile
-import unittest
-
-from silx.utils import testutils
-
-from ..specfile import SpecFile, Scan
-from .. import specfile
-
-
-logger1 = logging.getLogger(__name__)
-
-sftext = """#F /tmp/sf.dat
-#E 1455180875
-#D Thu Feb 11 09:54:35 2016
-#C imaging User = opid17
-#U00 user comment first line
-#U01 This is a dummy file to test SpecFile parsing
-#U02
-#U03 last line
-
-#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
-#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
-#o0 pshg mrtu mrtd
-#o2 ss1vo ss1ho ss1vg
-
-#J0 Seconds IA ion.mono Current
-#J1 xbpmc2 idgap1 Inorm
-
-#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
-#D Thu Feb 11 09:55:20 2016
-#T 0.2 (Seconds)
-#G0 0
-#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
-#G3 0 0 0 0 0 0 0 0 0
-#G4 0
-#Q
-#P0 180.005 -0.66875 0.87125
-#P1 14.74255 16.197579 12.238283
-#UMI0 Current AutoM Shutter
-#UMI1 192.51 OFF FE open
-#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00;
-#N 4
-#L first column second column 3rd_col
--1.23 5.89 8
-8.478100E+01 5 1.56
-3.14 2.73 -3.14
-1.2 2.3 3.4
-
-#S 25 ascan c3th 1.33245 1.52245 40 0.15
-#D Thu Feb 11 10:00:31 2016
-#P0 80.005 -1.66875 1.87125
-#P1 4.74255 6.197579 2.238283
-#N 5
-#L column0 column1 col2 col3
-0.0 0.1 0.2 0.3
-1.0 1.1 1.2 1.3
-2.0 2.1 2.2 2.3
-3.0 3.1 3.2 3.3
-
-#S 26 yyyyyy
-#D Thu Feb 11 09:55:20 2016
-#P0 80.005 -1.66875 1.87125
-#P1 4.74255 6.197579 2.238283
-#N 4
-#L first column second column 3rd_col
-#C Sat Oct 31 15:51:47 1998. Scan aborted after 0 points.
-
-#F /tmp/sf.dat
-#E 1455180876
-#D Thu Feb 11 09:54:36 2016
-
-#S 1 aaaaaa
-#U first duplicate line
-#U second duplicate line
-#@MCADEV 1
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#N 3
-#L uno duo
-1 2
-@A 0 1 2
-3 4
-@A 3.1 4 5
-5 6
-@A 6 7.7 8
-"""
-
-
-loc = locale.getlocale(locale.LC_NUMERIC)
-try:
- locale.setlocale(locale.LC_NUMERIC, 'de_DE.utf8')
-except locale.Error:
- try_DE = False
-else:
- try_DE = True
- locale.setlocale(locale.LC_NUMERIC, loc)
-
-
-class TestSpecFile(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- fd, cls.fname1 = tempfile.mkstemp(text=False)
- if sys.version_info < (3, ):
- os.write(fd, sftext)
- else:
- os.write(fd, bytes(sftext, 'ascii'))
- os.close(fd)
-
- fd2, cls.fname2 = tempfile.mkstemp(text=False)
- if sys.version_info < (3, ):
- os.write(fd2, sftext[370:923])
- else:
- os.write(fd2, bytes(sftext[370:923], 'ascii'))
- os.close(fd2)
-
- fd3, cls.fname3 = tempfile.mkstemp(text=False)
- txt = sftext[371:923]
- if sys.version_info < (3, ):
- os.write(fd3, txt)
- else:
- os.write(fd3, bytes(txt, 'ascii'))
- os.close(fd3)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.fname1)
- os.unlink(cls.fname2)
- os.unlink(cls.fname3)
-
- def setUp(self):
- self.sf = SpecFile(self.fname1)
- self.scan1 = self.sf[0]
- self.scan1_2 = self.sf["1.2"]
- self.scan25 = self.sf["25.1"]
- self.empty_scan = self.sf["26.1"]
-
- self.sf_no_fhdr = SpecFile(self.fname2)
- self.scan1_no_fhdr = self.sf_no_fhdr[0]
-
- self.sf_no_fhdr_crash = SpecFile(self.fname3)
- self.scan1_no_fhdr_crash = self.sf_no_fhdr_crash[0]
-
- def tearDown(self):
- self.sf.close()
- self.sf_no_fhdr.close()
- self.sf_no_fhdr_crash.close()
-
- def test_open(self):
- self.assertIsInstance(self.sf, SpecFile)
- with self.assertRaises(specfile.SfErrFileOpen):
- SpecFile("doesnt_exist.dat")
-
- # test filename types unicode and bytes
- if sys.version_info[0] < 3:
- try:
- SpecFile(self.fname1)
- except TypeError:
- self.fail("failed to handle filename as python2 str")
- try:
- SpecFile(unicode(self.fname1))
- except TypeError:
- self.fail("failed to handle filename as python2 unicode")
- else:
- try:
- SpecFile(self.fname1)
- except TypeError:
- self.fail("failed to handle filename as python3 str")
- try:
- SpecFile(bytes(self.fname1, 'utf-8'))
- except TypeError:
- self.fail("failed to handle filename as python3 bytes")
-
- def test_number_of_scans(self):
- self.assertEqual(4, len(self.sf))
-
- def test_list_of_scan_indices(self):
- self.assertEqual(self.sf.list(),
- [1, 25, 26, 1])
- self.assertEqual(self.sf.keys(),
- ["1.1", "25.1", "26.1", "1.2"])
-
- def test_index_number_order(self):
- self.assertEqual(self.sf.index(1, 2), 3) # sf["1.2"]==sf[3]
- self.assertEqual(self.sf.number(1), 25) # sf[1]==sf["25"]
- self.assertEqual(self.sf.order(3), 2) # sf[3]==sf["1.2"]
- with self.assertRaises(specfile.SfErrScanNotFound):
- self.sf.index(3, 2)
- with self.assertRaises(specfile.SfErrScanNotFound):
- self.sf.index(99)
-
- def assertRaisesRegex(self, *args, **kwargs):
- # Python 2 compatibility
- if sys.version_info.major >= 3:
- return super(TestSpecFile, self).assertRaisesRegex(*args, **kwargs)
- else:
- return self.assertRaisesRegexp(*args, **kwargs)
-
- def test_getitem(self):
- self.assertIsInstance(self.sf[2], Scan)
- self.assertIsInstance(self.sf["1.2"], Scan)
- # int out of range
- with self.assertRaisesRegex(IndexError, 'Scan index must be in ran'):
- self.sf[107]
- # float indexing not allowed
- with self.assertRaisesRegex(TypeError, 'The scan identification k'):
- self.sf[1.2]
- # non existant scan with "N.M" indexing
- with self.assertRaises(KeyError):
- self.sf["3.2"]
-
- def test_specfile_iterator(self):
- i = 0
- for scan in self.sf:
- if i == 1:
- self.assertEqual(scan.motor_positions,
- self.sf[1].motor_positions)
- i += 1
- # number of returned scans
- self.assertEqual(i, len(self.sf))
-
- def test_scan_index(self):
- self.assertEqual(self.scan1.index, 0)
- self.assertEqual(self.scan1_2.index, 3)
- self.assertEqual(self.scan25.index, 1)
-
- def test_scan_headers(self):
- self.assertEqual(self.scan25.scan_header_dict['S'],
- "25 ascan c3th 1.33245 1.52245 40 0.15")
- self.assertEqual(self.scan1.header[17], '#G0 0')
- self.assertEqual(len(self.scan1.header), 29)
- # parsing headers with long keys
- self.assertEqual(self.scan1.scan_header_dict['UMI0'],
- 'Current AutoM Shutter')
- # parsing empty headers
- self.assertEqual(self.scan1.scan_header_dict['Q'], '')
- # duplicate headers: concatenated (with newline)
- self.assertEqual(self.scan1_2.scan_header_dict["U"],
- "first duplicate line\nsecond duplicate line")
-
- def test_file_headers(self):
- self.assertEqual(self.scan1.header[1],
- '#E 1455180875')
- self.assertEqual(self.scan1.file_header_dict['F'],
- '/tmp/sf.dat')
-
- def test_multiple_file_headers(self):
- """Scan 1.2 is after the second file header, with a different
- Epoch"""
- self.assertEqual(self.scan1_2.header[1],
- '#E 1455180876')
-
- def test_scan_labels(self):
- self.assertEqual(self.scan1.labels,
- ['first column', 'second column', '3rd_col'])
-
- def test_data(self):
- # data_line() and data_col() take 1-based indices as arg
- self.assertAlmostEqual(self.scan1.data_line(1)[2],
- 1.56)
- # tests for data transposition between original file and .data attr
- self.assertAlmostEqual(self.scan1.data[2, 0],
- 8)
- self.assertEqual(self.scan1.data.shape, (3, 4))
- self.assertAlmostEqual(numpy.sum(self.scan1.data), 113.631)
-
- def test_data_column_by_name(self):
- self.assertAlmostEqual(self.scan25.data_column_by_name("col2")[1],
- 1.2)
- # Scan.data is transposed after readinq, so column is the first index
- self.assertAlmostEqual(numpy.sum(self.scan25.data_column_by_name("col2")),
- numpy.sum(self.scan25.data[2, :]))
- with self.assertRaises(specfile.SfErrColNotFound):
- self.scan25.data_column_by_name("ygfxgfyxg")
-
- def test_motors(self):
- self.assertEqual(len(self.scan1.motor_names), 6)
- self.assertEqual(len(self.scan1.motor_positions), 6)
- self.assertAlmostEqual(sum(self.scan1.motor_positions),
- 223.385912)
- self.assertEqual(self.scan1.motor_names[1], 'MRTSlit UP')
- self.assertAlmostEqual(
- self.scan25.motor_position_by_name('MRTSlit UP'),
- -1.66875)
-
- def test_absence_of_file_header(self):
- """We expect Scan.file_header to be an empty list in the absence
- of a file header.
- """
- self.assertEqual(len(self.scan1_no_fhdr.motor_names), 0)
- # motor positions can still be read in the scan header
- # even in the absence of motor names
- self.assertAlmostEqual(sum(self.scan1_no_fhdr.motor_positions),
- 223.385912)
- self.assertEqual(len(self.scan1_no_fhdr.header), 15)
- self.assertEqual(len(self.scan1_no_fhdr.scan_header), 15)
- self.assertEqual(len(self.scan1_no_fhdr.file_header), 0)
-
- def test_crash_absence_of_file_header(self):
- """Test no crash in absence of file header and no leading newline
- character
- """
- self.assertEqual(len(self.scan1_no_fhdr_crash.motor_names), 0)
- # motor positions can still be read in the scan header
- # even in the absence of motor names
- self.assertAlmostEqual(sum(self.scan1_no_fhdr_crash.motor_positions),
- 223.385912)
- self.assertEqual(len(self.scan1_no_fhdr_crash.scan_header), 15)
- self.assertEqual(len(self.scan1_no_fhdr_crash.file_header), 0)
-
- def test_mca(self):
- self.assertEqual(len(self.scan1.mca), 0)
- self.assertEqual(len(self.scan1_2.mca), 3)
- self.assertEqual(self.scan1_2.mca[1][2], 5)
- self.assertEqual(sum(self.scan1_2.mca[2]), 21.7)
-
- # Negative indexing
- self.assertEqual(sum(self.scan1_2.mca[len(self.scan1_2.mca) - 1]),
- sum(self.scan1_2.mca[-1]))
-
- # Test iterator
- line_count, total_sum = (0, 0)
- for mca_line in self.scan1_2.mca:
- line_count += 1
- total_sum += sum(mca_line)
- self.assertEqual(line_count, 3)
- self.assertAlmostEqual(total_sum, 36.8)
-
- def test_mca_header(self):
- self.assertEqual(self.scan1.mca_header_dict, {})
- self.assertEqual(len(self.scan1_2.mca_header_dict), 4)
- self.assertEqual(self.scan1_2.mca_header_dict["CALIB"], "1 2 3")
- self.assertEqual(self.scan1_2.mca.calibration,
- [[1., 2., 3.]])
- # default calib in the absence of #@CALIB
- self.assertEqual(self.scan25.mca.calibration,
- [[0., 1., 0.]])
- self.assertEqual(self.scan1_2.mca.channels,
- [[0, 1, 2]])
- # absence of #@CHANN and spectra
- self.assertEqual(self.scan25.mca.channels,
- [])
-
- @testutils.test_logging(specfile._logger.name, warning=1)
- def test_empty_scan(self):
- """Test reading a scan with no data points"""
- self.assertEqual(len(self.empty_scan.labels),
- 3)
- col1 = self.empty_scan.data_column_by_name("second column")
- self.assertEqual(col1.shape, (0, ))
-
-
-class TestSFLocale(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- fd, cls.fname = tempfile.mkstemp(text=False)
- if sys.version_info < (3, ):
- os.write(fd, sftext)
- else:
- os.write(fd, bytes(sftext, 'ascii'))
- os.close(fd)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.fname)
- locale.setlocale(locale.LC_NUMERIC, loc) # restore saved locale
-
- def crunch_data(self):
- self.sf3 = SpecFile(self.fname)
- self.assertAlmostEqual(self.sf3[0].data_line(1)[2],
- 1.56)
- self.sf3.close()
-
- @unittest.skipIf(not try_DE, "de_DE.utf8 locale not installed")
- def test_locale_de_DE(self):
- locale.setlocale(locale.LC_NUMERIC, 'de_DE.utf8')
- self.crunch_data()
-
- def test_locale_user(self):
- locale.setlocale(locale.LC_NUMERIC, '') # use user's preferred locale
- self.crunch_data()
-
- def test_locale_C(self):
- locale.setlocale(locale.LC_NUMERIC, 'C') # use default (C) locale
- self.crunch_data()
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecFile))
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSFLocale))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_specfilewrapper.py b/silx/io/test/test_specfilewrapper.py
deleted file mode 100644
index 2f463fa..0000000
--- a/silx/io/test/test_specfilewrapper.py
+++ /dev/null
@@ -1,206 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for old specfile wrapper"""
-
-__authors__ = ["P. Knobel"]
-__license__ = "MIT"
-__date__ = "15/05/2017"
-
-import locale
-import logging
-import numpy
-import os
-import sys
-import tempfile
-import unittest
-
-logger1 = logging.getLogger(__name__)
-
-from ..specfilewrapper import Specfile
-
-sftext = """#F /tmp/sf.dat
-#E 1455180875
-#D Thu Feb 11 09:54:35 2016
-#C imaging User = opid17
-#U00 user comment first line
-#U01 This is a dummy file to test SpecFile parsing
-#U02
-#U03 last line
-
-#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
-#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
-#o0 pshg mrtu mrtd
-#o2 ss1vo ss1ho ss1vg
-
-#J0 Seconds IA ion.mono Current
-#J1 xbpmc2 idgap1 Inorm
-
-#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
-#D Thu Feb 11 09:55:20 2016
-#T 0.2 (Seconds)
-#G0 0
-#G1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
-#G3 0 0 0 0 0 0 0 0 0
-#G4 0
-#Q
-#P0 180.005 -0.66875 0.87125
-#P1 14.74255 16.197579 12.238283
-#UMI0 Current AutoM Shutter
-#UMI1 192.51 OFF FE open
-#UMI2 Refill in 39883 sec, Fill Mode: uniform multibunch / Message: Feb 11 08:00 Delivery:Next Refill at 21:00;
-#N 4
-#L first column second column 3rd_col
--1.23 5.89 8
-8.478100E+01 5 1.56
-3.14 2.73 -3.14
-1.2 2.3 3.4
-
-#S 25 ascan c3th 1.33245 1.52245 40 0.15
-#D Thu Feb 11 10:00:31 2016
-#P0 80.005 -1.66875 1.87125
-#P1 4.74255 6.197579 2.238283
-#N 5
-#L column0 column1 col2 col3
-0.0 0.1 0.2 0.3
-1.0 1.1 1.2 1.3
-2.0 2.1 2.2 2.3
-3.0 3.1 3.2 3.3
-
-#F /tmp/sf.dat
-#E 1455180876
-#D Thu Feb 11 09:54:36 2016
-
-#S 1 aaaaaa
-#U first duplicate line
-#U second duplicate line
-#@MCADEV 1
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#N 3
-#L uno duo
-1 2
-@A 0 1 2
-3 4
-@A 3.1 4 5
-5 6
-@A 6 7.7 8
-"""
-
-
-class TestSpecfilewrapper(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- fd, cls.fname1 = tempfile.mkstemp(text=False)
- if sys.version_info < (3, ):
- os.write(fd, sftext)
- else:
- os.write(fd, bytes(sftext, 'ascii'))
- os.close(fd)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.fname1)
-
- def setUp(self):
- self.sf = Specfile(self.fname1)
- self.scan1 = self.sf[0]
- self.scan1_2 = self.sf.select("1.2")
- self.scan25 = self.sf.select("25.1")
-
- def tearDown(self):
- self.sf.close()
-
- def test_number_of_scans(self):
- self.assertEqual(3, len(self.sf))
-
- def test_list_of_scan_indices(self):
- self.assertEqual(self.sf.list(),
- '1,25,1')
- self.assertEqual(self.sf.keys(),
- ["1.1", "25.1", "1.2"])
-
- def test_scan_headers(self):
- self.assertEqual(self.scan25.header('S'),
- ["#S 25 ascan c3th 1.33245 1.52245 40 0.15"])
- self.assertEqual(self.scan1.header("G0"), ['#G0 0'])
- # parsing headers with long keys
- # parsing empty headers
- self.assertEqual(self.scan1.header('Q'), ['#Q '])
-
- def test_file_headers(self):
- self.assertEqual(self.scan1.header("E"),
- ['#E 1455180875'])
- self.assertEqual(self.sf.title(),
- "imaging")
- self.assertEqual(self.sf.epoch(),
- 1455180875)
- self.assertEqual(self.sf.allmotors(),
- ["Pslit HGap", "MRTSlit UP", "MRTSlit DOWN",
- "Sslit1 VOff", "Sslit1 HOff", "Sslit1 VGap"])
-
- def test_scan_labels(self):
- self.assertEqual(self.scan1.alllabels(),
- ['first column', 'second column', '3rd_col'])
-
- def test_data(self):
- self.assertAlmostEqual(self.scan1.dataline(3)[2],
- -3.14)
- self.assertAlmostEqual(self.scan1.datacol(1)[2],
- 3.14)
- # tests for data transposition between original file and .data attr
- self.assertAlmostEqual(self.scan1.data()[2, 0],
- 8)
- self.assertEqual(self.scan1.data().shape, (3, 4))
- self.assertAlmostEqual(numpy.sum(self.scan1.data()), 113.631)
-
- def test_date(self):
- self.assertEqual(self.scan1.date(),
- "Thu Feb 11 09:55:20 2016")
-
- def test_motors(self):
- self.assertEqual(len(self.sf.allmotors()), 6)
- self.assertEqual(len(self.scan1.allmotorpos()), 6)
- self.assertAlmostEqual(sum(self.scan1.allmotorpos()),
- 223.385912)
- self.assertEqual(self.sf.allmotors()[1], 'MRTSlit UP')
-
- def test_mca(self):
- self.assertEqual(self.scan1_2.mca(2)[2], 5)
- self.assertEqual(sum(self.scan1_2.mca(3)), 21.7)
-
- def test_mca_header(self):
- self.assertEqual(self.scan1_2.header("CALIB"),
- ["#@CALIB 1 2 3"])
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecfilewrapper))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_spech5.py b/silx/io/test/test_spech5.py
deleted file mode 100644
index 0263c3c..0000000
--- a/silx/io/test/test_spech5.py
+++ /dev/null
@@ -1,881 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for spech5"""
-from numpy import array_equal
-import os
-import io
-import sys
-import tempfile
-import unittest
-import datetime
-from functools import partial
-
-from silx.utils import testutils
-
-from .. import spech5
-from ..spech5 import (SpecH5, SpecH5Dataset, spec_date_to_iso8601)
-from .. import specfile
-
-import h5py
-
-__authors__ = ["P. Knobel"]
-__license__ = "MIT"
-__date__ = "12/02/2018"
-
-sftext = """#F /tmp/sf.dat
-#E 1455180875
-#D Thu Feb 11 09:54:35 2016
-#C imaging User = opid17
-#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
-#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
-#o0 pshg mrtu mrtd
-#o2 ss1vo ss1ho ss1vg
-
-#J0 Seconds IA ion.mono Current
-#J1 xbpmc2 idgap1 Inorm
-
-#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
-#D Thu Feb 11 09:55:20 2016
-#T 0.2 (Seconds)
-#P0 180.005 -0.66875 0.87125
-#P1 14.74255 16.197579 12.238283
-#N 4
-#L MRTSlit UP second column 3rd_col
--1.23 5.89 8
-8.478100E+01 5 1.56
-3.14 2.73 -3.14
-1.2 2.3 3.4
-
-#S 25 ascan c3th 1.33245 1.52245 40 0.15
-#D Sat 2015/03/14 03:53:50
-#P0 80.005 -1.66875 1.87125
-#P1 4.74255 6.197579 2.238283
-#N 5
-#L column0 column1 col2 col3
-0.0 0.1 0.2 0.3
-1.0 1.1 1.2 1.3
-2.0 2.1 2.2 2.3
-3.0 3.1 3.2 3.3
-
-#S 1 aaaaaa
-#D Thu Feb 11 10:00:32 2016
-#@MCADEV 1
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#@CTIME 123.4 234.5 345.6
-#N 3
-#L uno duo
-1 2
-@A 0 1 2
-@A 10 9 8
-@A 1 1 1.1
-3 4
-@A 3.1 4 5
-@A 7 6 5
-@A 1 1 1
-5 6
-@A 6 7.7 8
-@A 4 3 2
-@A 1 1 1
-
-#S 1000 bbbbb
-#G1 3.25 3.25 5.207 90 90 120 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
-#G3 0.0106337923671 0.027529133 1.206191273 -1.43467075 0.7633438883 0.02401568018 -1.709143587 -2.097621783 0.02456954971
-#L a b
-1 2
-
-#S 1001 ccccc
-#G1 0. 0. 0. 0 0 0 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
-#G3 0. 0. 0. 0. 0.0 0. 0. 0. 0.
-#L a b
-1 2
-
-"""
-
-
-class TestSpecDate(unittest.TestCase):
- """
- Test of the spec_date_to_iso8601 function.
- """
- # TODO : time zone tests
- # TODO : error cases
-
- @classmethod
- def setUpClass(cls):
- import locale
- # FYI : not threadsafe
- cls.locale_saved = locale.setlocale(locale.LC_TIME)
- locale.setlocale(locale.LC_TIME, 'C')
-
- @classmethod
- def tearDownClass(cls):
- import locale
- # FYI : not threadsafe
- locale.setlocale(locale.LC_TIME, cls.locale_saved)
-
- def setUp(self):
- # covering all week days
- self.n_days = range(1, 10)
- # covering all months
- self.n_months = range(1, 13)
-
- self.n_years = [1999, 2016, 2020]
- self.n_seconds = [0, 5, 26, 59]
- self.n_minutes = [0, 9, 42, 59]
- self.n_hours = [0, 2, 17, 23]
-
- self.formats = ['%a %b %d %H:%M:%S %Y', '%a %Y/%m/%d %H:%M:%S']
-
- self.check_date_formats = partial(self.__check_date_formats,
- year=self.n_years[0],
- month=self.n_months[0],
- day=self.n_days[0],
- hour=self.n_hours[0],
- minute=self.n_minutes[0],
- second=self.n_seconds[0],
- msg=None)
-
- def __check_date_formats(self,
- year,
- month,
- day,
- hour,
- minute,
- second,
- msg=None):
- dt = datetime.datetime(year, month, day, hour, minute, second)
- expected_date = dt.isoformat()
-
- for i_fmt, fmt in enumerate(self.formats):
- spec_date = dt.strftime(fmt)
- iso_date = spec_date_to_iso8601(spec_date)
- self.assertEqual(iso_date,
- expected_date,
- msg='Testing {0}. format={1}. '
- 'Expected "{2}", got "{3} ({4})" (dt={5}).'
- ''.format(msg,
- i_fmt,
- expected_date,
- iso_date,
- spec_date,
- dt))
-
- def testYearsNominal(self):
- for year in self.n_years:
- self.check_date_formats(year=year, msg='year')
-
- def testMonthsNominal(self):
- for month in self.n_months:
- self.check_date_formats(month=month, msg='month')
-
- def testDaysNominal(self):
- for day in self.n_days:
- self.check_date_formats(day=day, msg='day')
-
- def testHoursNominal(self):
- for hour in self.n_hours:
- self.check_date_formats(hour=hour, msg='hour')
-
- def testMinutesNominal(self):
- for minute in self.n_minutes:
- self.check_date_formats(minute=minute, msg='minute')
-
- def testSecondsNominal(self):
- for second in self.n_seconds:
- self.check_date_formats(second=second, msg='second')
-
-
-class TestSpecH5(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- fd, cls.fname = tempfile.mkstemp()
- if sys.version_info < (3, ):
- os.write(fd, sftext)
- else:
- os.write(fd, bytes(sftext, 'ascii'))
- os.close(fd)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.fname)
-
- def setUp(self):
- self.sfh5 = SpecH5(self.fname)
-
- def tearDown(self):
- self.sfh5.close()
-
- def testContainsFile(self):
- self.assertIn("/1.2/measurement", self.sfh5)
- self.assertIn("/25.1", self.sfh5)
- self.assertIn("25.1", self.sfh5)
- self.assertNotIn("25.2", self.sfh5)
- # measurement is a child of a scan, full path would be required to
- # access from root level
- self.assertNotIn("measurement", self.sfh5)
- # Groups may or may not have a trailing /
- self.assertIn("/1.2/measurement/mca_1/", self.sfh5)
- self.assertIn("/1.2/measurement/mca_1", self.sfh5)
- # Datasets can't have a trailing /
- self.assertNotIn("/1.2/measurement/mca_0/info/calibration/ ", self.sfh5)
- # No mca_8
- self.assertNotIn("/1.2/measurement/mca_8/info/calibration", self.sfh5)
- # Link
- self.assertIn("/1.2/measurement/mca_0/info/calibration", self.sfh5)
-
- def testContainsGroup(self):
- self.assertIn("measurement", self.sfh5["/1.2/"])
- self.assertIn("measurement", self.sfh5["/1.2"])
- self.assertIn("25.1", self.sfh5["/"])
- self.assertNotIn("25.2", self.sfh5["/"])
- self.assertIn("instrument/positioners/Sslit1 HOff", self.sfh5["/1.1"])
- # illegal trailing "/" after dataset name
- self.assertNotIn("instrument/positioners/Sslit1 HOff/",
- self.sfh5["/1.1"])
- # full path to element in group (OK)
- self.assertIn("/1.1/instrument/positioners/Sslit1 HOff",
- self.sfh5["/1.1/instrument"])
-
- def testDataColumn(self):
- self.assertAlmostEqual(sum(self.sfh5["/1.2/measurement/duo"]),
- 12.0)
- self.assertAlmostEqual(
- sum(self.sfh5["1.1"]["measurement"]["MRTSlit UP"]),
- 87.891, places=4)
-
- def testDate(self):
- # start time is in Iso8601 format
- self.assertEqual(self.sfh5["/1.1/start_time"],
- u"2016-02-11T09:55:20")
- self.assertEqual(self.sfh5["25.1/start_time"],
- u"2015-03-14T03:53:50")
-
- def assertRaisesRegex(self, *args, **kwargs):
- # Python 2 compatibility
- if sys.version_info.major >= 3:
- return super(TestSpecH5, self).assertRaisesRegex(*args, **kwargs)
- else:
- return self.assertRaisesRegexp(*args, **kwargs)
-
- def testDatasetInstanceAttr(self):
- """The SpecH5Dataset objects must implement some dummy attributes
- to improve compatibility with widgets dealing with h5py datasets."""
- self.assertIsNone(self.sfh5["/1.1/start_time"].compression)
- self.assertIsNone(self.sfh5["1.1"]["measurement"]["MRTSlit UP"].chunks)
-
- # error message must be explicit
- with self.assertRaisesRegex(
- AttributeError,
- "SpecH5Dataset has no attribute tOTo"):
- dummy = self.sfh5["/1.1/start_time"].tOTo
-
- def testGet(self):
- """Test :meth:`SpecH5Group.get`"""
- # default value of param *default* is None
- self.assertIsNone(self.sfh5.get("toto"))
- self.assertEqual(self.sfh5["25.1"].get("toto", default=-3),
- -3)
-
- self.assertEqual(self.sfh5.get("/1.1/start_time", default=-3),
- u"2016-02-11T09:55:20")
-
- def testGetClass(self):
- """Test :meth:`SpecH5Group.get`"""
- self.assertIs(self.sfh5["1.1"].get("start_time", getclass=True),
- h5py.Dataset)
- self.assertIs(self.sfh5["1.1"].get("instrument", getclass=True),
- h5py.Group)
-
- # spech5 does not define external link, so there is no way
- # a group can *get* a SpecH5 class
-
- def testGetApi(self):
- result = self.sfh5.get("1.1", getclass=True, getlink=True)
- self.assertIs(result, h5py.HardLink)
- result = self.sfh5.get("1.1", getclass=False, getlink=True)
- self.assertIsInstance(result, h5py.HardLink)
- result = self.sfh5.get("1.1", getclass=True, getlink=False)
- self.assertIs(result, h5py.Group)
- result = self.sfh5.get("1.1", getclass=False, getlink=False)
- self.assertIsInstance(result, spech5.SpecH5Group)
-
- def testGetItemGroup(self):
- group = self.sfh5["25.1"]["instrument"]
- self.assertEqual(list(group["positioners"].keys()),
- ["Pslit HGap", "MRTSlit UP", "MRTSlit DOWN",
- "Sslit1 VOff", "Sslit1 HOff", "Sslit1 VGap"])
- with self.assertRaises(KeyError):
- group["Holy Grail"]
-
- def testGetitemSpecH5(self):
- self.assertEqual(self.sfh5["/1.2/instrument/positioners"],
- self.sfh5["1.2"]["instrument"]["positioners"])
-
- def testH5pyClass(self):
- """Test :attr:`h5py_class` returns the corresponding h5py class
- (h5py.File, h5py.Group, h5py.Dataset)"""
- a_file = self.sfh5
- self.assertIs(a_file.h5py_class,
- h5py.File)
-
- a_group = self.sfh5["/1.2/measurement"]
- self.assertIs(a_group.h5py_class,
- h5py.Group)
-
- a_dataset = self.sfh5["/1.1/instrument/positioners/Sslit1 HOff"]
- self.assertIs(a_dataset.h5py_class,
- h5py.Dataset)
-
- def testHeader(self):
- file_header = self.sfh5["/1.2/instrument/specfile/file_header"]
- scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"]
-
- # File header has 10 lines
- self.assertEqual(len(file_header), 10)
- # 1.2 has 9 scan & mca header lines
- self.assertEqual(len(scan_header), 9)
-
- # line 4 of file header
- self.assertEqual(
- file_header[3],
- u"#C imaging User = opid17")
- # line 4 of scan header
- scan_header = self.sfh5["25.1/instrument/specfile/scan_header"]
-
- self.assertEqual(
- scan_header[3],
- u"#P1 4.74255 6.197579 2.238283")
-
- def testLinks(self):
- self.assertTrue(
- array_equal(self.sfh5["/1.2/measurement/mca_0/data"],
- self.sfh5["/1.2/instrument/mca_0/data"])
- )
- self.assertTrue(
- array_equal(self.sfh5["/1.2/measurement/mca_0/info/data"],
- self.sfh5["/1.2/instrument/mca_0/data"])
- )
- self.assertTrue(
- array_equal(self.sfh5["/1.2/measurement/mca_0/info/channels"],
- self.sfh5["/1.2/instrument/mca_0/channels"])
- )
- self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/"].keys(),
- self.sfh5["/1.2/instrument/mca_0/"].keys())
-
- self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/preset_time"],
- self.sfh5["/1.2/instrument/mca_0/preset_time"])
- self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/live_time"],
- self.sfh5["/1.2/instrument/mca_0/live_time"])
- self.assertEqual(self.sfh5["/1.2/measurement/mca_0/info/elapsed_time"],
- self.sfh5["/1.2/instrument/mca_0/elapsed_time"])
-
- def testListScanIndices(self):
- self.assertEqual(list(self.sfh5.keys()),
- ["1.1", "25.1", "1.2", "1000.1", "1001.1"])
- self.assertEqual(self.sfh5["1.2"].attrs,
- {"NX_class": "NXentry", })
-
- def testMcaAbsent(self):
- def access_absent_mca():
- """This must raise a KeyError, because scan 1.1 has no MCA"""
- return self.sfh5["/1.1/measurement/mca_0/"]
- self.assertRaises(KeyError, access_absent_mca)
-
- def testMcaCalib(self):
- mca0_calib = self.sfh5["/1.2/measurement/mca_0/info/calibration"]
- mca1_calib = self.sfh5["/1.2/measurement/mca_1/info/calibration"]
- self.assertEqual(mca0_calib.tolist(),
- [1, 2, 3])
- # calibration is unique in this scan and applies to all analysers
- self.assertEqual(mca0_calib.tolist(),
- mca1_calib.tolist())
-
- def testMcaChannels(self):
- mca0_chann = self.sfh5["/1.2/measurement/mca_0/info/channels"]
- mca1_chann = self.sfh5["/1.2/measurement/mca_1/info/channels"]
- self.assertEqual(mca0_chann.tolist(),
- [0, 1, 2])
- self.assertEqual(mca0_chann.tolist(),
- mca1_chann.tolist())
-
- def testMcaCtime(self):
- """Tests for #@CTIME mca header"""
- datasets = ["preset_time", "live_time", "elapsed_time"]
- for ds in datasets:
- self.assertNotIn("/1.1/instrument/mca_0/" + ds, self.sfh5)
- self.assertIn("/1.2/instrument/mca_0/" + ds, self.sfh5)
-
- mca0_preset_time = self.sfh5["/1.2/instrument/mca_0/preset_time"]
- mca1_preset_time = self.sfh5["/1.2/instrument/mca_1/preset_time"]
- self.assertLess(mca0_preset_time - 123.4,
- 10**-5)
- # ctime is unique in a this scan and applies to all analysers
- self.assertEqual(mca0_preset_time,
- mca1_preset_time)
-
- mca0_live_time = self.sfh5["/1.2/instrument/mca_0/live_time"]
- mca1_live_time = self.sfh5["/1.2/instrument/mca_1/live_time"]
- self.assertLess(mca0_live_time - 234.5,
- 10**-5)
- self.assertEqual(mca0_live_time,
- mca1_live_time)
-
- mca0_elapsed_time = self.sfh5["/1.2/instrument/mca_0/elapsed_time"]
- mca1_elapsed_time = self.sfh5["/1.2/instrument/mca_1/elapsed_time"]
- self.assertLess(mca0_elapsed_time - 345.6,
- 10**-5)
- self.assertEqual(mca0_elapsed_time,
- mca1_elapsed_time)
-
- def testMcaData(self):
- # sum 1st MCA in scan 1.2 over rows
- mca_0_data = self.sfh5["/1.2/measurement/mca_0/data"]
- for summed_row, expected in zip(mca_0_data.sum(axis=1).tolist(),
- [3.0, 12.1, 21.7]):
- self.assertAlmostEqual(summed_row, expected, places=4)
-
- # sum 3rd MCA in scan 1.2 along both axis
- mca_2_data = self.sfh5["1.2"]["measurement"]["mca_2"]["data"]
- self.assertAlmostEqual(sum(sum(mca_2_data)), 9.1, places=5)
- # attrs
- self.assertEqual(mca_0_data.attrs, {"interpretation": "spectrum"})
-
- def testMotorPosition(self):
- positioners_group = self.sfh5["/1.1/instrument/positioners"]
- # MRTSlit DOWN position is defined in #P0 san header line
- self.assertAlmostEqual(float(positioners_group["MRTSlit DOWN"]),
- 0.87125)
- # MRTSlit UP position is defined in first data column
- for a, b in zip(positioners_group["MRTSlit UP"].tolist(),
- [-1.23, 8.478100E+01, 3.14, 1.2]):
- self.assertAlmostEqual(float(a), b, places=4)
-
- def testNumberMcaAnalysers(self):
- """Scan 1.2 has 2 data columns + 3 mca spectra per data line."""
- self.assertEqual(len(self.sfh5["1.2"]["measurement"]), 5)
-
- def testTitle(self):
- self.assertEqual(self.sfh5["/25.1/title"],
- u"ascan c3th 1.33245 1.52245 40 0.15")
-
- def testValues(self):
- group = self.sfh5["/25.1"]
- self.assertTrue(hasattr(group, "values"))
- self.assertTrue(callable(group.values))
- self.assertIn(self.sfh5["/25.1/title"],
- self.sfh5["/25.1"].values())
-
- # visit and visititems ignore links
- def testVisit(self):
- name_list = []
- self.sfh5.visit(name_list.append)
- self.assertIn('1.2/instrument/positioners/Pslit HGap', name_list)
- self.assertIn("1.2/instrument/specfile/scan_header", name_list)
- self.assertEqual(len(name_list), 117)
-
- # test also visit of a subgroup, with various group name formats
- name_list_leading_and_trailing_slash = []
- self.sfh5['/1.2/instrument/'].visit(name_list_leading_and_trailing_slash.append)
- name_list_leading_slash = []
- self.sfh5['/1.2/instrument'].visit(name_list_leading_slash.append)
- name_list_trailing_slash = []
- self.sfh5['1.2/instrument/'].visit(name_list_trailing_slash.append)
- name_list_no_slash = []
- self.sfh5['1.2/instrument'].visit(name_list_no_slash.append)
-
- # no differences expected in the output names
- self.assertEqual(name_list_leading_and_trailing_slash,
- name_list_leading_slash)
- self.assertEqual(name_list_leading_slash,
- name_list_trailing_slash)
- self.assertEqual(name_list_leading_slash,
- name_list_no_slash)
- self.assertIn("positioners/Pslit HGap", name_list_no_slash)
- self.assertIn("positioners", name_list_no_slash)
-
- def testVisitItems(self):
- dataset_name_list = []
-
- def func_generator(l):
- """return a function appending names to list l"""
- def func(name, obj):
- if isinstance(obj, SpecH5Dataset):
- l.append(name)
- return func
-
- self.sfh5.visititems(func_generator(dataset_name_list))
- self.assertIn('1.2/instrument/positioners/Pslit HGap', dataset_name_list)
- self.assertEqual(len(dataset_name_list), 85)
-
- # test also visit of a subgroup, with various group name formats
- name_list_leading_and_trailing_slash = []
- self.sfh5['/1.2/instrument/'].visititems(func_generator(name_list_leading_and_trailing_slash))
- name_list_leading_slash = []
- self.sfh5['/1.2/instrument'].visititems(func_generator(name_list_leading_slash))
- name_list_trailing_slash = []
- self.sfh5['1.2/instrument/'].visititems(func_generator(name_list_trailing_slash))
- name_list_no_slash = []
- self.sfh5['1.2/instrument'].visititems(func_generator(name_list_no_slash))
-
- # no differences expected in the output names
- self.assertEqual(name_list_leading_and_trailing_slash,
- name_list_leading_slash)
- self.assertEqual(name_list_leading_slash,
- name_list_trailing_slash)
- self.assertEqual(name_list_leading_slash,
- name_list_no_slash)
- self.assertIn("positioners/Pslit HGap", name_list_no_slash)
-
- def testNotSpecH5(self):
- fd, fname = tempfile.mkstemp()
- os.write(fd, b"Not a spec file!")
- os.close(fd)
- self.assertRaises(specfile.SfErrFileOpen, SpecH5, fname)
- self.assertRaises(IOError, SpecH5, fname)
- os.unlink(fname)
-
- def testSample(self):
- self.assertNotIn("sample", self.sfh5["/1.1"])
- self.assertIn("sample", self.sfh5["/1000.1"])
- self.assertIn("ub_matrix", self.sfh5["/1000.1/sample"])
- self.assertIn("unit_cell", self.sfh5["/1000.1/sample"])
- self.assertIn("unit_cell_abc", self.sfh5["/1000.1/sample"])
- self.assertIn("unit_cell_alphabetagamma", self.sfh5["/1000.1/sample"])
-
- # All 0 values
- self.assertNotIn("sample", self.sfh5["/1001.1"])
- with self.assertRaises(KeyError):
- self.sfh5["/1001.1/sample/unit_cell"]
-
- @testutils.test_logging(spech5.logger1.name, warning=2)
- def testOpenFileDescriptor(self):
- """Open a SpecH5 file from a file descriptor"""
- with io.open(self.sfh5.filename) as f:
- sfh5 = SpecH5(f)
- self.assertIsNotNone(sfh5)
- name_list = []
- # check if the object is working
- self.sfh5.visit(name_list.append)
- sfh5.close()
-
-
-sftext_multi_mca_headers = """
-#S 1 aaaaaa
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#@CTIME 123.4 234.5 345.6
-#@MCA %16C
-#@CHANN 3 1 3 1
-#@CALIB 5.5 6.6 7.7
-#@CTIME 10 11 12
-#N 3
-#L uno duo
-1 2
-@A 0 1 2
-@A 10 9 8
-3 4
-@A 3.1 4 5
-@A 7 6 5
-5 6
-@A 6 7.7 8
-@A 4 3 2
-
-"""
-
-
-class TestSpecH5MultiMca(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- fd, cls.fname = tempfile.mkstemp(text=False)
- if sys.version_info < (3, ):
- os.write(fd, sftext_multi_mca_headers)
- else:
- os.write(fd, bytes(sftext_multi_mca_headers, 'ascii'))
- os.close(fd)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.fname)
-
- def setUp(self):
- self.sfh5 = SpecH5(self.fname)
-
- def tearDown(self):
- self.sfh5.close()
-
- def testMcaCalib(self):
- mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"]
- mca1_calib = self.sfh5["/1.1/measurement/mca_1/info/calibration"]
- self.assertEqual(mca0_calib.tolist(),
- [1, 2, 3])
- self.assertAlmostEqual(sum(mca1_calib.tolist()),
- sum([5.5, 6.6, 7.7]),
- places=5)
-
- def testMcaChannels(self):
- mca0_chann = self.sfh5["/1.1/measurement/mca_0/info/channels"]
- mca1_chann = self.sfh5["/1.1/measurement/mca_1/info/channels"]
- self.assertEqual(mca0_chann.tolist(),
- [0., 1., 2.])
- # @CHANN is unique in this scan and applies to all analysers
- self.assertEqual(mca1_chann.tolist(),
- [1., 2., 3.])
-
- def testMcaCtime(self):
- """Tests for #@CTIME mca header"""
- mca0_preset_time = self.sfh5["/1.1/instrument/mca_0/preset_time"]
- mca1_preset_time = self.sfh5["/1.1/instrument/mca_1/preset_time"]
- self.assertLess(mca0_preset_time - 123.4,
- 10**-5)
- self.assertLess(mca1_preset_time - 10,
- 10**-5)
-
- mca0_live_time = self.sfh5["/1.1/instrument/mca_0/live_time"]
- mca1_live_time = self.sfh5["/1.1/instrument/mca_1/live_time"]
- self.assertLess(mca0_live_time - 234.5,
- 10**-5)
- self.assertLess(mca1_live_time - 11,
- 10**-5)
-
- mca0_elapsed_time = self.sfh5["/1.1/instrument/mca_0/elapsed_time"]
- mca1_elapsed_time = self.sfh5["/1.1/instrument/mca_1/elapsed_time"]
- self.assertLess(mca0_elapsed_time - 345.6,
- 10**-5)
- self.assertLess(mca1_elapsed_time - 12,
- 10**-5)
-
-
-sftext_no_cols = r"""#F C:/DATA\test.mca
-#D Thu Jul 7 08:40:19 2016
-
-#S 1 31oct98.dat 22.1 If4
-#D Thu Jul 7 08:40:19 2016
-#C no data cols, one mca analyser, single spectrum
-#@MCA %16C
-#@CHANN 151 0 150 1
-#@CALIB 0 2 0
-@A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \
-1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \
-6071 7611 10426 16188 28266 40348 50539 55555 56162 54162 47102 35718 24588 17034 12994 11444 \
-11808 13461 15687 18885 23827 31578 41999 49556 58084 59415 59456 55698 44525 28219 17680 12881 \
-9518 7415 6155 5246 4646 3978 3612 3299 3020 2761 2670 2472 2500 2310 2286 2106 \
-1989 1890 1782 1655 1421 1293 1135 990 879 757 672 618 532 488 445 424 \
-414 373 351 325 307 284 270 247 228 213 199 187 183 176 164 156 \
-153 140 142 130 118 118 103 101 97 86 90 86 87 81 75 82 \
-80 76 77 75 76 77 62 69 74 60 65 68 65 58 63 64 \
-63 59 60 56 57 60 55
-
-#S 2 31oct98.dat 22.1 If4
-#D Thu Jul 7 08:40:19 2016
-#C no data cols, one mca analyser, multiple spectra
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#@CTIME 123.4 234.5 345.6
-@A 0 1 2
-@A 10 9 8
-@A 1 1 1.1
-@A 3.1 4 5
-@A 7 6 5
-@A 1 1 1
-@A 6 7.7 8
-@A 4 3 2
-@A 1 1 1
-
-#S 3 31oct98.dat 22.1 If4
-#D Thu Jul 7 08:40:19 2016
-#C no data cols, 3 mca analysers, multiple spectra
-#@MCADEV 1
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#@CTIME 123.4 234.5 345.6
-#@MCADEV 2
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#@CTIME 123.4 234.5 345.6
-#@MCADEV 3
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#@CTIME 123.4 234.5 345.6
-@A 0 1 2
-@A 10 9 8
-@A 1 1 1.1
-@A 3.1 4 5
-@A 7 6 5
-@A 1 1 1
-@A 6 7.7 8
-@A 4 3 2
-@A 1 1 1
-"""
-
-
-class TestSpecH5NoDataCols(unittest.TestCase):
- """Test reading SPEC files with only MCA data"""
- @classmethod
- def setUpClass(cls):
- fd, cls.fname = tempfile.mkstemp()
- if sys.version_info < (3, ):
- os.write(fd, sftext_no_cols)
- else:
- os.write(fd, bytes(sftext_no_cols, 'ascii'))
- os.close(fd)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.fname)
-
- def setUp(self):
- self.sfh5 = SpecH5(self.fname)
-
- def tearDown(self):
- self.sfh5.close()
-
- def testScan1(self):
- # 1.1: single analyser, single spectrum, 151 channels
- self.assertIn("mca_0",
- self.sfh5["1.1/instrument/"])
- self.assertEqual(self.sfh5["1.1/instrument/mca_0/data"].shape,
- (1, 151))
- self.assertNotIn("mca_1",
- self.sfh5["1.1/instrument/"])
-
- def testScan2(self):
- # 2.1: single analyser, 9 spectra, 3 channels
- self.assertIn("mca_0",
- self.sfh5["2.1/instrument/"])
- self.assertEqual(self.sfh5["2.1/instrument/mca_0/data"].shape,
- (9, 3))
- self.assertNotIn("mca_1",
- self.sfh5["2.1/instrument/"])
-
- def testScan3(self):
- # 3.1: 3 analysers, 3 spectra/analyser, 3 channels
- for i in range(3):
- self.assertIn("mca_%d" % i,
- self.sfh5["3.1/instrument/"])
- self.assertEqual(
- self.sfh5["3.1/instrument/mca_%d/data" % i].shape,
- (3, 3))
-
- self.assertNotIn("mca_3",
- self.sfh5["3.1/instrument/"])
-
-
-sf_text_slash = r"""#F /data/id09/archive/logspecfiles/laue/2016/scan_231_laue_16-11-29.dat
-#D Sat Dec 10 22:20:59 2016
-#O0 Pslit/HGap MRTSlit%UP
-
-#S 1 laue_16-11-29.log 231.1 PD3/A
-#D Sat Dec 10 22:20:59 2016
-#P0 180.005 -0.66875
-#N 2
-#L GONY/mm PD3%A
--2.015 5.250424e-05
--2.01 5.30798e-05
--2.005 5.281903e-05
--2 5.220436e-05
-"""
-
-
-class TestSpecH5SlashInLabels(unittest.TestCase):
- """Test reading SPEC files with labels containing a / character
-
- The / character must be substituted with a %
- """
- @classmethod
- def setUpClass(cls):
- fd, cls.fname = tempfile.mkstemp()
- if sys.version_info < (3, ):
- os.write(fd, sf_text_slash)
- else:
- os.write(fd, bytes(sf_text_slash, 'ascii'))
- os.close(fd)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.fname)
-
- def setUp(self):
- self.sfh5 = SpecH5(self.fname)
-
- def tearDown(self):
- self.sfh5.close()
-
- def testLabels(self):
- """Ensure `/` is substituted with `%` and
- ensure legitimate `%` in names are still working"""
- self.assertEqual(list(self.sfh5["1.1/measurement/"].keys()),
- ["GONY%mm", "PD3%A"])
-
- # substituted "%"
- self.assertIn("GONY%mm",
- self.sfh5["1.1/measurement/"])
- self.assertNotIn("GONY/mm",
- self.sfh5["1.1/measurement/"])
- self.assertAlmostEqual(self.sfh5["1.1/measurement/GONY%mm"][0],
- -2.015, places=4)
- # legitimate "%"
- self.assertIn("PD3%A",
- self.sfh5["1.1/measurement/"])
-
- def testMotors(self):
- """Ensure `/` is substituted with `%` and
- ensure legitimate `%` in names are still working"""
- self.assertEqual(list(self.sfh5["1.1/instrument/positioners"].keys()),
- ["Pslit%HGap", "MRTSlit%UP"])
- # substituted "%"
- self.assertIn("Pslit%HGap",
- self.sfh5["1.1/instrument/positioners"])
- self.assertNotIn("Pslit/HGap",
- self.sfh5["1.1/instrument/positioners"])
- self.assertAlmostEqual(
- self.sfh5["1.1/instrument/positioners/Pslit%HGap"],
- 180.005, places=4)
- # legitimate "%"
- self.assertIn("MRTSlit%UP",
- self.sfh5["1.1/instrument/positioners"])
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5))
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecDate))
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5MultiMca))
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5NoDataCols))
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestSpecH5SlashInLabels))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py
deleted file mode 100644
index 903a62c..0000000
--- a/silx/io/test/test_spectoh5.py
+++ /dev/null
@@ -1,194 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for SpecFile to HDF5 converter"""
-
-from numpy import array_equal
-import os
-import sys
-import tempfile
-import unittest
-
-import h5py
-
-from ..spech5 import SpecH5, SpecH5Group
-from ..convert import convert, write_to_h5
-from ..utils import h5py_read_dataset
-
-__authors__ = ["P. Knobel"]
-__license__ = "MIT"
-__date__ = "12/02/2018"
-
-
-sfdata = b"""#F /tmp/sf.dat
-#E 1455180875
-#D Thu Feb 11 09:54:35 2016
-#C imaging User = opid17
-#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
-#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
-#o0 pshg mrtu mrtd
-#o2 ss1vo ss1ho ss1vg
-
-#J0 Seconds IA ion.mono Current
-#J1 xbpmc2 idgap1 Inorm
-
-#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
-#D Thu Feb 11 09:55:20 2016
-#T 0.2 (Seconds)
-#P0 180.005 -0.66875 0.87125
-#P1 14.74255 16.197579 12.238283
-#N 4
-#L MRTSlit UP second column 3rd_col
--1.23 5.89 8
-8.478100E+01 5 1.56
-3.14 2.73 -3.14
-1.2 2.3 3.4
-
-#S 1 aaaaaa
-#D Thu Feb 11 10:00:32 2016
-#@MCADEV 1
-#@MCA %16C
-#@CHANN 3 0 2 1
-#@CALIB 1 2 3
-#N 3
-#L uno duo
-1 2
-@A 0 1 2
-@A 10 9 8
-3 4
-@A 3.1 4 5
-@A 7 6 5
-5 6
-@A 6 7.7 8
-@A 4 3 2
-"""
-
-
-class TestConvertSpecHDF5(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- fd, cls.spec_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5")
- os.write(fd, sfdata)
- os.close(fd)
-
- fd, cls.h5_fname = tempfile.mkstemp(prefix="TestConvertSpecHDF5")
- # Close and delete (we just need the name)
- os.close(fd)
- os.unlink(cls.h5_fname)
-
- @classmethod
- def tearDownClass(cls):
- os.unlink(cls.spec_fname)
-
- def setUp(self):
- convert(self.spec_fname, self.h5_fname)
-
- self.sfh5 = SpecH5(self.spec_fname)
- self.h5f = h5py.File(self.h5_fname, "a")
-
- def tearDown(self):
- self.h5f.close()
- self.sfh5.close()
- os.unlink(self.h5_fname)
-
- def testAppendToHDF5(self):
- write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam")
- self.assertTrue(
- array_equal(self.h5f["/1.2/measurement/mca_1/data"],
- self.h5f["/foo/bar/spam/1.2/measurement/mca_1/data"])
- )
-
- def testWriteSpecH5Group(self):
- """Test passing a SpecH5Group as parameter, instead of a Spec filename
- or a SpecH5."""
- g = self.sfh5["1.1/instrument"]
- self.assertIsInstance(g, SpecH5Group) # let's be paranoid
- write_to_h5(g, self.h5f, h5path="my instruments")
-
- self.assertAlmostEqual(self.h5f["my instruments/positioners/Sslit1 HOff"][tuple()],
- 16.197579, places=4)
-
- def testTitle(self):
- """Test the value of a dataset"""
- title12 = h5py_read_dataset(self.h5f["/1.2/title"])
- self.assertEqual(title12,
- u"aaaaaa")
-
- def testAttrs(self):
- # Test root group (file) attributes
- self.assertEqual(self.h5f.attrs["NX_class"],
- u"NXroot")
- # Test dataset attributes
- ds = self.h5f["/1.2/instrument/mca_1/data"]
- self.assertTrue("interpretation" in ds.attrs)
- self.assertEqual(list(ds.attrs.values()),
- [u"spectrum"])
- # Test group attributes
- grp = self.h5f["1.1"]
- self.assertEqual(grp.attrs["NX_class"],
- u"NXentry")
- self.assertEqual(len(list(grp.attrs.keys())),
- 1)
-
- def testHdf5HasSameMembers(self):
- spec_member_list = []
-
- def append_spec_members(name):
- spec_member_list.append(name)
- self.sfh5.visit(append_spec_members)
-
- hdf5_member_list = []
-
- def append_hdf5_members(name):
- hdf5_member_list.append(name)
- self.h5f.visit(append_hdf5_members)
-
- # 1. For some reason, h5py visit method doesn't include the leading
- # "/" character when it passes the member name to the function,
- # even though an explicit the .name attribute of a member will
- # have a leading "/"
- spec_member_list = [m.lstrip("/") for m in spec_member_list]
-
- self.assertEqual(set(hdf5_member_list),
- set(spec_member_list))
-
- def testLinks(self):
- self.assertTrue(
- array_equal(self.sfh5["/1.2/measurement/mca_0/data"],
- self.h5f["/1.2/measurement/mca_0/data"])
- )
- self.assertTrue(
- array_equal(self.h5f["/1.2/instrument/mca_1/channels"],
- self.h5f["/1.2/measurement/mca_1/info/channels"])
- )
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(
- unittest.defaultTestLoader.loadTestsFromTestCase(TestConvertSpecHDF5))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")
diff --git a/silx/io/test/test_url.py b/silx/io/test/test_url.py
deleted file mode 100644
index 114f6a7..0000000
--- a/silx/io/test/test_url.py
+++ /dev/null
@@ -1,228 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for url module"""
-
-__authors__ = ["V. Valls"]
-__license__ = "MIT"
-__date__ = "29/01/2018"
-
-
-import unittest
-from ..url import DataUrl
-
-
-class TestDataUrl(unittest.TestCase):
-
- def assertUrl(self, url, expected):
- self.assertEqual(url.is_valid(), expected[0])
- self.assertEqual(url.is_absolute(), expected[1])
- self.assertEqual(url.scheme(), expected[2])
- self.assertEqual(url.file_path(), expected[3])
- self.assertEqual(url.data_path(), expected[4])
- self.assertEqual(url.data_slice(), expected[5])
-
- def test_fabio_absolute(self):
- url = DataUrl("fabio:///data/image.edf?slice=2")
- expected = [True, True, "fabio", "/data/image.edf", None, (2, )]
- self.assertUrl(url, expected)
-
- def test_fabio_absolute_windows(self):
- url = DataUrl("fabio:///C:/data/image.edf?slice=2")
- expected = [True, True, "fabio", "C:/data/image.edf", None, (2, )]
- self.assertUrl(url, expected)
-
- def test_silx_absolute(self):
- url = DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5")
- expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
- self.assertUrl(url, expected)
-
- def test_commandline_shell_separator(self):
- url = DataUrl("silx:///data/image.h5::path=/data/dataset&slice=1,5")
- expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
- self.assertUrl(url, expected)
-
- def test_silx_absolute2(self):
- url = DataUrl("silx:///data/image.edf?/scan_0/detector/data")
- expected = [True, True, "silx", "/data/image.edf", "/scan_0/detector/data", None]
- self.assertUrl(url, expected)
-
- def test_silx_absolute_windows(self):
- url = DataUrl("silx:///C:/data/image.h5?/scan_0/detector/data")
- expected = [True, True, "silx", "C:/data/image.h5", "/scan_0/detector/data", None]
- self.assertUrl(url, expected)
-
- def test_silx_relative(self):
- url = DataUrl("silx:./image.h5")
- expected = [True, False, "silx", "./image.h5", None, None]
- self.assertUrl(url, expected)
-
- def test_fabio_relative(self):
- url = DataUrl("fabio:./image.edf")
- expected = [True, False, "fabio", "./image.edf", None, None]
- self.assertUrl(url, expected)
-
- def test_silx_relative2(self):
- url = DataUrl("silx:image.h5")
- expected = [True, False, "silx", "image.h5", None, None]
- self.assertUrl(url, expected)
-
- def test_fabio_relative2(self):
- url = DataUrl("fabio:image.edf")
- expected = [True, False, "fabio", "image.edf", None, None]
- self.assertUrl(url, expected)
-
- def test_file_relative(self):
- url = DataUrl("image.edf")
- expected = [True, False, None, "image.edf", None, None]
- self.assertUrl(url, expected)
-
- def test_file_relative2(self):
- url = DataUrl("./foo/bar/image.edf")
- expected = [True, False, None, "./foo/bar/image.edf", None, None]
- self.assertUrl(url, expected)
-
- def test_file_relative3(self):
- url = DataUrl("foo/bar/image.edf")
- expected = [True, False, None, "foo/bar/image.edf", None, None]
- self.assertUrl(url, expected)
-
- def test_file_absolute(self):
- url = DataUrl("/data/image.edf")
- expected = [True, True, None, "/data/image.edf", None, None]
- self.assertUrl(url, expected)
-
- def test_file_absolute_windows(self):
- url = DataUrl("C:/data/image.edf")
- expected = [True, True, None, "C:/data/image.edf", None, None]
- self.assertUrl(url, expected)
-
- def test_absolute_with_path(self):
- url = DataUrl("/foo/foobar.h5?/foo/bar")
- expected = [True, True, None, "/foo/foobar.h5", "/foo/bar", None]
- self.assertUrl(url, expected)
-
- def test_windows_file_data_slice(self):
- url = DataUrl("C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
- expected = [True, True, None, "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
- self.assertUrl(url, expected)
-
- def test_scheme_file_data_slice(self):
- url = DataUrl("silx:/foo/foobar.h5?path=/foo/bar&slice=5,1")
- expected = [True, True, "silx", "/foo/foobar.h5", "/foo/bar", (5, 1)]
- self.assertUrl(url, expected)
-
- def test_scheme_windows_file_data_slice(self):
- url = DataUrl("silx:C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
- expected = [True, True, "silx", "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
- self.assertUrl(url, expected)
-
- def test_empty(self):
- url = DataUrl("")
- expected = [False, False, None, "", None, None]
- self.assertUrl(url, expected)
-
- def test_unknown_scheme(self):
- url = DataUrl("foo:/foo/foobar.h5?path=/foo/bar&slice=5,1")
- expected = [False, True, "foo", "/foo/foobar.h5", "/foo/bar", (5, 1)]
- self.assertUrl(url, expected)
-
- def test_slice(self):
- url = DataUrl("/a.h5?path=/b&slice=5,1")
- expected = [True, True, None, "/a.h5", "/b", (5, 1)]
- self.assertUrl(url, expected)
-
- def test_slice2(self):
- url = DataUrl("/a.h5?path=/b&slice=2:5")
- expected = [True, True, None, "/a.h5", "/b", (slice(2, 5),)]
- self.assertUrl(url, expected)
-
- def test_slice3(self):
- url = DataUrl("/a.h5?path=/b&slice=::2")
- expected = [True, True, None, "/a.h5", "/b", (slice(None, None, 2),)]
- self.assertUrl(url, expected)
-
- def test_slice_ellipsis(self):
- url = DataUrl("/a.h5?path=/b&slice=...")
- expected = [True, True, None, "/a.h5", "/b", (Ellipsis, )]
- self.assertUrl(url, expected)
-
- def test_slice_slicing(self):
- url = DataUrl("/a.h5?path=/b&slice=:")
- expected = [True, True, None, "/a.h5", "/b", (slice(None), )]
- self.assertUrl(url, expected)
-
- def test_slice_missing_element(self):
- url = DataUrl("/a.h5?path=/b&slice=5,,1")
- expected = [False, True, None, "/a.h5", "/b", None]
- self.assertUrl(url, expected)
-
- def test_slice_no_elements(self):
- url = DataUrl("/a.h5?path=/b&slice=")
- expected = [False, True, None, "/a.h5", "/b", None]
- self.assertUrl(url, expected)
-
- def test_create_relative_url(self):
- url = DataUrl(scheme="silx", file_path="./foo.h5", data_path="/", data_slice=(5, 1))
- self.assertFalse(url.is_absolute())
- url2 = DataUrl(url.path())
- self.assertEqual(url, url2)
-
- def test_create_absolute_url(self):
- url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1))
- url2 = DataUrl(url.path())
- self.assertEqual(url, url2)
-
- def test_create_absolute_windows_url(self):
- url = DataUrl(scheme="silx", file_path="C:/foo.h5", data_path="/", data_slice=(5, 1))
- url2 = DataUrl(url.path())
- self.assertEqual(url, url2)
-
- def test_create_slice_url(self):
- url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1, Ellipsis, slice(None)))
- url2 = DataUrl(url.path())
- self.assertEqual(url, url2)
-
- def test_wrong_url(self):
- url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=(5, 1))
- self.assertFalse(url.is_valid())
-
- def test_path_creation(self):
- """make sure the construction of path succeed and that we can
- recreate a DataUrl from a path"""
- for data_slice in (1, (1,)):
- with self.subTest(data_slice=data_slice):
- url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=data_slice)
- path = url.path()
- DataUrl(path=path)
-
-
-def suite():
- test_suite = unittest.TestSuite()
- loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
- test_suite.addTest(loadTests(TestDataUrl))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='suite')
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py
deleted file mode 100644
index 13ab532..0000000
--- a/silx/io/test/test_utils.py
+++ /dev/null
@@ -1,888 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for utils module"""
-
-import io
-import numpy
-import os
-import re
-import shutil
-import tempfile
-import unittest
-import sys
-
-from .. import utils
-from ..._version import calc_hexversion
-import silx.io.url
-
-import h5py
-from ..utils import h5ls
-
-import fabio
-
-__authors__ = ["P. Knobel"]
-__license__ = "MIT"
-__date__ = "03/12/2020"
-
-expected_spec1 = r"""#F .*
-#D .*
-
-#S 1 Ordinate1
-#D .*
-#N 2
-#L Abscissa Ordinate1
-1 4\.00
-2 5\.00
-3 6\.00
-"""
-
-expected_spec2 = expected_spec1 + r"""
-#S 2 Ordinate2
-#D .*
-#N 2
-#L Abscissa Ordinate2
-1 7\.00
-2 8\.00
-3 9\.00
-"""
-
-expected_spec2reg = r"""#F .*
-#D .*
-
-#S 1 Ordinate1
-#D .*
-#N 3
-#L Abscissa Ordinate1 Ordinate2
-1 4\.00 7\.00
-2 5\.00 8\.00
-3 6\.00 9\.00
-"""
-
-expected_spec2irr = expected_spec1 + r"""
-#S 2 Ordinate2
-#D .*
-#N 2
-#L Abscissa Ordinate2
-1 7\.00
-2 8\.00
-"""
-
-expected_csv = r"""Abscissa;Ordinate1;Ordinate2
-1;4\.00;7\.00e\+00
-2;5\.00;8\.00e\+00
-3;6\.00;9\.00e\+00
-"""
-
-expected_csv2 = r"""x;y0;y1
-1;4\.00;7\.00e\+00
-2;5\.00;8\.00e\+00
-3;6\.00;9\.00e\+00
-"""
-
-
-class TestSave(unittest.TestCase):
- """Test saving curves as SpecFile:
- """
-
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
- self.csv_fname = os.path.join(self.tempdir, "savecsv.csv")
- self.npy_fname = os.path.join(self.tempdir, "savenpy.npy")
-
- self.x = [1, 2, 3]
- self.xlab = "Abscissa"
- self.y = [[4, 5, 6], [7, 8, 9]]
- self.y_irr = [[4, 5, 6], [7, 8]]
- self.ylabs = ["Ordinate1", "Ordinate2"]
-
- def tearDown(self):
- if os.path.isfile(self.spec_fname):
- os.unlink(self.spec_fname)
- if os.path.isfile(self.csv_fname):
- os.unlink(self.csv_fname)
- if os.path.isfile(self.npy_fname):
- os.unlink(self.npy_fname)
- shutil.rmtree(self.tempdir)
-
- def test_save_csv(self):
- utils.save1D(self.csv_fname, self.x, self.y,
- xlabel=self.xlab, ylabels=self.ylabs,
- filetype="csv", fmt=["%d", "%.2f", "%.2e"],
- csvdelim=";", autoheader=True)
-
- csvf = open(self.csv_fname)
- actual_csv = csvf.read()
- csvf.close()
-
- self.assertRegex(actual_csv, expected_csv)
-
- def test_save_npy(self):
- """npy file is saved with numpy.save after building a numpy array
- and converting it to a named record array"""
- npyf = open(self.npy_fname, "wb")
- utils.save1D(npyf, self.x, self.y,
- xlabel=self.xlab, ylabels=self.ylabs)
- npyf.close()
-
- npy_recarray = numpy.load(self.npy_fname)
-
- self.assertEqual(npy_recarray.shape, (3,))
- self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'],
- numpy.array((4, 5, 6))))
-
- def test_savespec_filename(self):
- """Save SpecFile using savespec()"""
- utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab,
- ylabel=self.ylabs[0], fmt=["%d", "%.2f"],
- close_file=True, scan_number=1)
-
- specf = open(self.spec_fname)
- actual_spec = specf.read()
- specf.close()
- self.assertRegex(actual_spec, expected_spec1)
-
- def test_savespec_file_handle(self):
- """Save SpecFile using savespec(), passing a file handle"""
- # first savespec: open, write file header, save y[0] as scan 1,
- # return file handle
- specf = utils.savespec(self.spec_fname, self.x, self.y[0],
- xlabel=self.xlab, ylabel=self.ylabs[0],
- fmt=["%d", "%.2f"], close_file=False)
-
- # second savespec: save y[1] as scan 2, close file
- utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab,
- ylabel=self.ylabs[1], fmt=["%d", "%.2f"],
- write_file_header=False, close_file=True,
- scan_number=2)
-
- specf = open(self.spec_fname)
- actual_spec = specf.read()
- specf.close()
- self.assertRegex(actual_spec, expected_spec2)
-
- def test_save_spec_reg(self):
- """Save SpecFile using save() on a regular pattern"""
- utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab,
- ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
-
- specf = open(self.spec_fname)
- actual_spec = specf.read()
- specf.close()
-
- self.assertRegex(actual_spec, expected_spec2reg)
-
- def test_save_spec_irr(self):
- """Save SpecFile using save() on an irregular pattern"""
- # invalid test case ?!
- return
- utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab,
- ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
-
- specf = open(self.spec_fname)
- actual_spec = specf.read()
- specf.close()
- self.assertRegex(actual_spec, expected_spec2irr)
-
- def test_save_csv_no_labels(self):
- """Save csv using save(), with autoheader=True but
- xlabel=None and ylabels=None
- This is a non-regression test for bug #223"""
- self.tempdir = tempfile.mkdtemp()
- self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
- self.csv_fname = os.path.join(self.tempdir, "savecsv.csv")
- self.npy_fname = os.path.join(self.tempdir, "savenpy.npy")
-
- self.x = [1, 2, 3]
- self.xlab = "Abscissa"
- self.y = [[4, 5, 6], [7, 8, 9]]
- self.ylabs = ["Ordinate1", "Ordinate2"]
- utils.save1D(self.csv_fname, self.x, self.y,
- autoheader=True, fmt=["%d", "%.2f", "%.2e"])
-
- csvf = open(self.csv_fname)
- actual_csv = csvf.read()
- csvf.close()
- self.assertRegex(actual_csv, expected_csv2)
-
-
-def assert_match_any_string_in_list(test, pattern, list_of_strings):
- for string_ in list_of_strings:
- if re.match(pattern, string_):
- return True
- return False
-
-
-class TestH5Ls(unittest.TestCase):
- """Test displaying the following HDF5 file structure:
-
- +foo
- +bar
- <HDF5 dataset "spam": shape (2, 2), type "<i8">
- <HDF5 dataset "tmp": shape (3,), type "<i8">
- <HDF5 dataset "data": shape (1,), type "<f8">
-
- """
-
- def assertMatchAnyStringInList(self, pattern, list_of_strings):
- for string_ in list_of_strings:
- if re.match(pattern, string_):
- return None
- raise AssertionError("regex pattern %s does not match any" % pattern +
- " string in list " + str(list_of_strings))
-
- def testHdf5(self):
- fd, self.h5_fname = tempfile.mkstemp(text=False)
- # Close and delete (we just want the name)
- os.close(fd)
- os.unlink(self.h5_fname)
- self.h5f = h5py.File(self.h5_fname, "w")
- self.h5f["/foo/bar/tmp"] = [1, 2, 3]
- self.h5f["/foo/bar/spam"] = [[1, 2], [3, 4]]
- self.h5f["/foo/data"] = [3.14]
- self.h5f.close()
-
- rep = h5ls(self.h5_fname)
- lines = rep.split("\n")
-
- self.assertIn("+foo", lines)
- self.assertIn("\t+bar", lines)
-
- match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">'
- self.assertMatchAnyStringInList(match, lines)
- match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">'
- self.assertMatchAnyStringInList(match, lines)
- match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">'
- self.assertMatchAnyStringInList(match, lines)
-
- os.unlink(self.h5_fname)
-
- # Following test case disabled d/t errors on AppVeyor:
- # os.unlink(spec_fname)
- # PermissionError: [WinError 32] The process cannot access the file because
- # it is being used by another process: 'C:\\...\\savespec.dat'
-
- # def testSpec(self):
- # tempdir = tempfile.mkdtemp()
- # spec_fname = os.path.join(tempdir, "savespec.dat")
- #
- # x = [1, 2, 3]
- # xlab = "Abscissa"
- # y = [[4, 5, 6], [7, 8, 9]]
- # ylabs = ["Ordinate1", "Ordinate2"]
- # utils.save1D(spec_fname, x, y, xlabel=xlab,
- # ylabels=ylabs, filetype="spec",
- # fmt=["%d", "%.2f"])
- #
- # rep = h5ls(spec_fname)
- # lines = rep.split("\n")
- # self.assertIn("+1.1", lines)
- # self.assertIn("\t+instrument", lines)
- #
- # self.assertMatchAnyStringInList(
- # r'\t\t\t<SPEC dataset "file_header": shape \(\), type "|S60">',
- # lines)
- # self.assertMatchAnyStringInList(
- # r'\t\t<SPEC dataset "Ordinate1": shape \(3L?,\), type "<f4">',
- # lines)
- #
- # os.unlink(spec_fname)
- # shutil.rmtree(tempdir)
-
-
-class TestOpen(unittest.TestCase):
- """Test `silx.io.utils.open` function."""
-
- @classmethod
- def setUpClass(cls):
- cls.tmp_directory = tempfile.mkdtemp()
- cls.createResources(cls.tmp_directory)
-
- @classmethod
- def createResources(cls, directory):
-
- cls.h5_filename = os.path.join(directory, "test.h5")
- h5 = h5py.File(cls.h5_filename, mode="w")
- h5["group/group/dataset"] = 50
- h5.close()
-
- cls.spec_filename = os.path.join(directory, "test.dat")
- utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
- fmt=["%d", "%.2f"], close_file=True, scan_number=1)
-
- cls.edf_filename = os.path.join(directory, "test.edf")
- header = fabio.fabioimage.OrderedDict()
- header["integer"] = "10"
- data = numpy.array([[10, 50], [50, 10]])
- fabiofile = fabio.edfimage.EdfImage(data, header)
- fabiofile.write(cls.edf_filename)
-
- cls.txt_filename = os.path.join(directory, "test.txt")
- f = io.open(cls.txt_filename, "w+t")
- f.write(u"Kikoo")
- f.close()
-
- cls.missing_filename = os.path.join(directory, "test.missing")
-
- @classmethod
- def tearDownClass(cls):
- shutil.rmtree(cls.tmp_directory)
-
- def testH5(self):
- f = utils.open(self.h5_filename)
- self.assertIsNotNone(f)
- self.assertIsInstance(f, h5py.File)
- f.close()
-
- def testH5With(self):
- with utils.open(self.h5_filename) as f:
- self.assertIsNotNone(f)
- self.assertIsInstance(f, h5py.File)
-
- def testH5_withPath(self):
- f = utils.open(self.h5_filename + "::/group/group/dataset")
- self.assertIsNotNone(f)
- self.assertEqual(f.h5py_class, h5py.Dataset)
- self.assertEqual(f[()], 50)
- f.close()
-
- def testH5With_withPath(self):
- with utils.open(self.h5_filename + "::/group/group") as f:
- self.assertIsNotNone(f)
- self.assertEqual(f.h5py_class, h5py.Group)
- self.assertIn("dataset", f)
-
- def testSpec(self):
- f = utils.open(self.spec_filename)
- self.assertIsNotNone(f)
- self.assertEqual(f.h5py_class, h5py.File)
- f.close()
-
- def testSpecWith(self):
- with utils.open(self.spec_filename) as f:
- self.assertIsNotNone(f)
- self.assertEqual(f.h5py_class, h5py.File)
-
- def testEdf(self):
- f = utils.open(self.edf_filename)
- self.assertIsNotNone(f)
- self.assertEqual(f.h5py_class, h5py.File)
- f.close()
-
- def testEdfWith(self):
- with utils.open(self.edf_filename) as f:
- self.assertIsNotNone(f)
- self.assertEqual(f.h5py_class, h5py.File)
-
- def testUnsupported(self):
- self.assertRaises(IOError, utils.open, self.txt_filename)
-
- def testNotExists(self):
- # load it
- self.assertRaises(IOError, utils.open, self.missing_filename)
-
- def test_silx_scheme(self):
- url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/")
- with utils.open(url.path()) as f:
- self.assertIsNotNone(f)
- self.assertTrue(silx.io.utils.is_file(f))
-
- def test_fabio_scheme(self):
- url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename)
- self.assertRaises(IOError, utils.open, url.path())
-
- def test_bad_url(self):
- url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename)
- self.assertRaises(IOError, utils.open, url.path())
-
- def test_sliced_url(self):
- url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,))
- self.assertRaises(IOError, utils.open, url.path())
-
-
-class TestNodes(unittest.TestCase):
- """Test `silx.io.utils.is_` functions."""
-
- def test_real_h5py_objects(self):
- name = tempfile.mktemp(suffix=".h5")
- try:
- with h5py.File(name, "w") as h5file:
- h5group = h5file.create_group("arrays")
- h5dataset = h5group.create_dataset("scalar", data=10)
-
- self.assertTrue(utils.is_file(h5file))
- self.assertTrue(utils.is_group(h5file))
- self.assertFalse(utils.is_dataset(h5file))
-
- self.assertFalse(utils.is_file(h5group))
- self.assertTrue(utils.is_group(h5group))
- self.assertFalse(utils.is_dataset(h5group))
-
- self.assertFalse(utils.is_file(h5dataset))
- self.assertFalse(utils.is_group(h5dataset))
- self.assertTrue(utils.is_dataset(h5dataset))
- finally:
- os.unlink(name)
-
- def test_h5py_like_file(self):
-
- class Foo(object):
-
- def __init__(self):
- self.h5_class = utils.H5Type.FILE
-
- obj = Foo()
- self.assertTrue(utils.is_file(obj))
- self.assertTrue(utils.is_group(obj))
- self.assertFalse(utils.is_dataset(obj))
-
- def test_h5py_like_group(self):
-
- class Foo(object):
-
- def __init__(self):
- self.h5_class = utils.H5Type.GROUP
-
- obj = Foo()
- self.assertFalse(utils.is_file(obj))
- self.assertTrue(utils.is_group(obj))
- self.assertFalse(utils.is_dataset(obj))
-
- def test_h5py_like_dataset(self):
-
- class Foo(object):
-
- def __init__(self):
- self.h5_class = utils.H5Type.DATASET
-
- obj = Foo()
- self.assertFalse(utils.is_file(obj))
- self.assertFalse(utils.is_group(obj))
- self.assertTrue(utils.is_dataset(obj))
-
- def test_bad(self):
-
- class Foo(object):
-
- def __init__(self):
- pass
-
- obj = Foo()
- self.assertFalse(utils.is_file(obj))
- self.assertFalse(utils.is_group(obj))
- self.assertFalse(utils.is_dataset(obj))
-
- def test_bad_api(self):
-
- class Foo(object):
-
- def __init__(self):
- self.h5_class = int
-
- obj = Foo()
- self.assertFalse(utils.is_file(obj))
- self.assertFalse(utils.is_group(obj))
- self.assertFalse(utils.is_dataset(obj))
-
-
-class TestGetData(unittest.TestCase):
- """Test `silx.io.utils.get_data` function."""
-
- @classmethod
- def setUpClass(cls):
- cls.tmp_directory = tempfile.mkdtemp()
- cls.createResources(cls.tmp_directory)
-
- @classmethod
- def createResources(cls, directory):
-
- cls.h5_filename = os.path.join(directory, "test.h5")
- h5 = h5py.File(cls.h5_filename, mode="w")
- h5["group/group/scalar"] = 50
- h5["group/group/array"] = [1, 2, 3, 4, 5]
- h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
- h5.close()
-
- cls.spec_filename = os.path.join(directory, "test.dat")
- utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
- fmt=["%d", "%.2f"], close_file=True, scan_number=1)
-
- cls.edf_filename = os.path.join(directory, "test.edf")
- cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
- header = fabio.fabioimage.OrderedDict()
- header["integer"] = "10"
- data = numpy.array([[10, 50], [50, 10]])
- fabiofile = fabio.edfimage.EdfImage(data, header)
- fabiofile.write(cls.edf_filename)
- fabiofile.append_frame(data=data, header=header)
- fabiofile.write(cls.edf_multiframe_filename)
-
- cls.txt_filename = os.path.join(directory, "test.txt")
- f = io.open(cls.txt_filename, "w+t")
- f.write(u"Kikoo")
- f.close()
-
- cls.missing_filename = os.path.join(directory, "test.missing")
-
- @classmethod
- def tearDownClass(cls):
- shutil.rmtree(cls.tmp_directory)
-
- def test_hdf5_scalar(self):
- url = "silx:%s?/group/group/scalar" % self.h5_filename
- data = utils.get_data(url=url)
- self.assertEqual(data, 50)
-
- def test_hdf5_array(self):
- url = "silx:%s?/group/group/array" % self.h5_filename
- data = utils.get_data(url=url)
- self.assertEqual(data.shape, (5,))
- self.assertEqual(data[0], 1)
-
- def test_hdf5_array_slice(self):
- url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
- data = utils.get_data(url=url)
- self.assertEqual(data.shape, (5,))
- self.assertEqual(data[0], 6)
-
- def test_hdf5_array_slice_out_of_range(self):
- url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
- # ValueError: h5py 2.x
- # IndexError: h5py 3.x
- self.assertRaises((ValueError, IndexError), utils.get_data, url)
-
- def test_edf_using_silx(self):
- url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
- data = utils.get_data(url=url)
- self.assertEqual(data.shape, (2, 2))
- self.assertEqual(data[0, 0], 10)
-
- def test_fabio_frame(self):
- url = "fabio:%s?slice=1" % self.edf_multiframe_filename
- data = utils.get_data(url=url)
- self.assertEqual(data.shape, (2, 2))
- self.assertEqual(data[0, 0], 10)
-
- def test_fabio_singleframe(self):
- url = "fabio:%s?slice=0" % self.edf_filename
- data = utils.get_data(url=url)
- self.assertEqual(data.shape, (2, 2))
- self.assertEqual(data[0, 0], 10)
-
- def test_fabio_too_much_frames(self):
- url = "fabio:%s?slice=..." % self.edf_multiframe_filename
- self.assertRaises(ValueError, utils.get_data, url)
-
- def test_fabio_no_frame(self):
- url = "fabio:%s" % self.edf_filename
- data = utils.get_data(url=url)
- self.assertEqual(data.shape, (2, 2))
- self.assertEqual(data[0, 0], 10)
-
- def test_unsupported_scheme(self):
- url = "foo:/foo/bar"
- self.assertRaises(ValueError, utils.get_data, url)
-
- def test_no_scheme(self):
- url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename
- self.assertRaises((ValueError, IOError), utils.get_data, url)
-
- def test_file_not_exists(self):
- url = "silx:/foo/bar"
- self.assertRaises(IOError, utils.get_data, url)
-
-
-def _h5_py_version_older_than(version):
- v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]]
- r_majeur, r_mineur, r_micro = [int(i) for i in version.split('.')]
- return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(r_majeur, r_mineur, r_micro)
-
-
-@unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0')
-class TestRawFileToH5(unittest.TestCase):
- """Test conversion of .vol file to .h5 external dataset"""
-
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self._vol_file = os.path.join(self.tempdir, 'test_vol.vol')
- self._file_info = os.path.join(self.tempdir, 'test_vol.info.vol')
- self._dataset_shape = 100, 20, 5
- data = numpy.random.random(self._dataset_shape[0] *
- self._dataset_shape[1] *
- self._dataset_shape[2]).astype(dtype=numpy.float32).reshape(self._dataset_shape)
- numpy.save(file=self._vol_file, arr=data)
- # those are storing into .noz file
- assert os.path.exists(self._vol_file + '.npy')
- os.rename(self._vol_file + '.npy', self._vol_file)
- self.h5_file = os.path.join(self.tempdir, 'test_h5.h5')
- self.external_dataset_path = '/root/my_external_dataset'
- self._data_url = silx.io.url.DataUrl(file_path=self.h5_file,
- data_path=self.external_dataset_path)
- with open(self._file_info, 'w') as _fi:
- _fi.write('NUM_X = %s\n' % self._dataset_shape[2])
- _fi.write('NUM_Y = %s\n' % self._dataset_shape[1])
- _fi.write('NUM_Z = %s\n' % self._dataset_shape[0])
-
- def tearDown(self):
- shutil.rmtree(self.tempdir)
-
- def check_dataset(self, h5_file, data_path, shape):
- """Make sure the external dataset is valid"""
- with h5py.File(h5_file, 'r') as _file:
- return data_path in _file and _file[data_path].shape == shape
-
- def test_h5_file_not_existing(self):
- """Test that can create a file with external dataset from scratch"""
- utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
- output_url=self._data_url,
- shape=(100, 20, 5),
- dtype=numpy.float32)
- self.assertTrue(self.check_dataset(h5_file=self.h5_file,
- data_path=self.external_dataset_path,
- shape=self._dataset_shape))
- os.remove(self.h5_file)
- utils.vol_to_h5_external_dataset(vol_file=self._vol_file,
- output_url=self._data_url,
- info_file=self._file_info)
- self.assertTrue(self.check_dataset(h5_file=self.h5_file,
- data_path=self.external_dataset_path,
- shape=self._dataset_shape))
-
- def test_h5_file_existing(self):
- """Test that can add the external dataset from an existing file"""
- with h5py.File(self.h5_file, 'w') as _file:
- _file['/root/dataset1'] = numpy.zeros((100, 100))
- _file['/root/group/dataset2'] = numpy.ones((100, 100))
- utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
- output_url=self._data_url,
- shape=(100, 20, 5),
- dtype=numpy.float32)
- self.assertTrue(self.check_dataset(h5_file=self.h5_file,
- data_path=self.external_dataset_path,
- shape=self._dataset_shape))
-
- def test_vol_file_not_existing(self):
- """Make sure error is raised if .vol file does not exists"""
- os.remove(self._vol_file)
- utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
- output_url=self._data_url,
- shape=(100, 20, 5),
- dtype=numpy.float32)
-
- self.assertTrue(self.check_dataset(h5_file=self.h5_file,
- data_path=self.external_dataset_path,
- shape=self._dataset_shape))
-
- def test_conflicts(self):
- """Test several conflict cases"""
- # test if path already exists
- utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
- output_url=self._data_url,
- shape=(100, 20, 5),
- dtype=numpy.float32)
- with self.assertRaises(ValueError):
- utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
- output_url=self._data_url,
- shape=(100, 20, 5),
- overwrite=False,
- dtype=numpy.float32)
-
- utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
- output_url=self._data_url,
- shape=(100, 20, 5),
- overwrite=True,
- dtype=numpy.float32)
-
- self.assertTrue(self.check_dataset(h5_file=self.h5_file,
- data_path=self.external_dataset_path,
- shape=self._dataset_shape))
-
-
-class TestH5Strings(unittest.TestCase):
- """Test HDF5 str and bytes writing and reading"""
-
- @classmethod
- def setUpClass(cls):
- cls.tempdir = tempfile.mkdtemp()
- cls.vlenstr = h5py.special_dtype(vlen=str)
- cls.vlenbytes = h5py.special_dtype(vlen=bytes)
- try:
- cls.unicode = unicode
- except NameError:
- cls.unicode = str
-
- @classmethod
- def tearDownClass(cls):
- shutil.rmtree(cls.tempdir)
-
- def setUp(self):
- self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w")
-
- def tearDown(self):
- self.file.close()
-
- @classmethod
- def _make_array(cls, value, n):
- if isinstance(value, bytes):
- dtype = cls.vlenbytes
- elif isinstance(value, cls.unicode):
- dtype = cls.vlenstr
- else:
- return numpy.array([value] * n)
- return numpy.array([value] * n, dtype=dtype)
-
- @classmethod
- def _get_charset(cls, value):
- if isinstance(value, bytes):
- return h5py.h5t.CSET_ASCII
- elif isinstance(value, cls.unicode):
- return h5py.h5t.CSET_UTF8
- else:
- return None
-
- def _check_dataset(self, value, result=None):
- # Write+read scalar
- if result:
- decode_ascii = True
- else:
- decode_ascii = False
- result = value
- charset = self._get_charset(value)
- self.file["data"] = value
- data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii)
- assert type(data) == type(result), data
- assert data == result, data
- if charset:
- assert self.file["data"].id.get_type().get_cset() == charset
-
- # Write+read variable length
- self.file["vlen_data"] = self._make_array(value, 2)
- data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0)
- assert type(data) == type(result), data
- assert data == result, data
- data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii)
- numpy.testing.assert_array_equal(data, [result] * 2)
- if charset:
- assert self.file["vlen_data"].id.get_type().get_cset() == charset
-
- def _check_attribute(self, value, result=None):
- if result:
- decode_ascii = True
- else:
- decode_ascii = False
- result = value
- self.file.attrs["data"] = value
- data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii)
- assert type(data) == type(result), data
- assert data == result, data
-
- self.file.attrs["vlen_data"] = self._make_array(value, 2)
- data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii)
- assert type(data[0]) == type(result), data[0]
- assert data[0] == result, data[0]
- numpy.testing.assert_array_equal(data, [result] * 2)
-
- data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"]
- assert type(data[0]) == type(result), data[0]
- assert data[0] == result, data[0]
- numpy.testing.assert_array_equal(data, [result] * 2)
-
- def test_dataset_ascii_bytes(self):
- self._check_dataset(b"abc")
-
- def test_attribute_ascii_bytes(self):
- self._check_attribute(b"abc")
-
- def test_dataset_ascii_bytes_decode(self):
- self._check_dataset(b"abc", result="abc")
-
- def test_attribute_ascii_bytes_decode(self):
- self._check_attribute(b"abc", result="abc")
-
- def test_dataset_ascii_str(self):
- self._check_dataset("abc")
-
- def test_attribute_ascii_str(self):
- self._check_attribute("abc")
-
- def test_dataset_utf8_str(self):
- self._check_dataset("\u0101bc")
-
- def test_attribute_utf8_str(self):
- self._check_attribute("\u0101bc")
-
- def test_dataset_utf8_bytes(self):
- # 0xC481 is the byte representation of U+0101
- self._check_dataset(b"\xc4\x81bc")
-
- def test_attribute_utf8_bytes(self):
- # 0xC481 is the byte representation of U+0101
- self._check_attribute(b"\xc4\x81bc")
-
- def test_dataset_utf8_bytes_decode(self):
- # 0xC481 is the byte representation of U+0101
- self._check_dataset(b"\xc4\x81bc", result="\u0101bc")
-
- def test_attribute_utf8_bytes_decode(self):
- # 0xC481 is the byte representation of U+0101
- self._check_attribute(b"\xc4\x81bc", result="\u0101bc")
-
- def test_dataset_latin1_bytes(self):
- # extended ascii character 0xE4
- self._check_dataset(b"\xe423")
-
- def test_attribute_latin1_bytes(self):
- # extended ascii character 0xE4
- self._check_attribute(b"\xe423")
-
- def test_dataset_latin1_bytes_decode(self):
- # U+DCE4: surrogate for extended ascii character 0xE4
- self._check_dataset(b"\xe423", result="\udce423")
-
- def test_attribute_latin1_bytes_decode(self):
- # U+DCE4: surrogate for extended ascii character 0xE4
- self._check_attribute(b"\xe423", result="\udce423")
-
- def test_dataset_no_string(self):
- self._check_dataset(numpy.int64(10))
-
- def test_attribute_no_string(self):
- self._check_attribute(numpy.int64(10))
-
-
-def suite():
- loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
- test_suite = unittest.TestSuite()
- test_suite.addTest(loadTests(TestSave))
- test_suite.addTest(loadTests(TestH5Ls))
- test_suite.addTest(loadTests(TestOpen))
- test_suite.addTest(loadTests(TestNodes))
- test_suite.addTest(loadTests(TestGetData))
- test_suite.addTest(loadTests(TestRawFileToH5))
- test_suite.addTest(loadTests(TestH5Strings))
- return test_suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest="suite")