diff options
Diffstat (limited to 'silx/io/test')
-rw-r--r-- | silx/io/test/__init__.py | 6 | ||||
-rw-r--r-- | silx/io/test/test_commonh5.py | 306 | ||||
-rw-r--r-- | silx/io/test/test_fabioh5.py | 93 | ||||
-rw-r--r-- | silx/io/test/test_nxdata.py | 7 | ||||
-rw-r--r-- | silx/io/test/test_rawh5.py | 96 | ||||
-rw-r--r-- | silx/io/test/test_specfile.py | 9 | ||||
-rw-r--r-- | silx/io/test/test_specfilewrapper.py | 3 | ||||
-rw-r--r-- | silx/io/test/test_spech5.py | 123 | ||||
-rw-r--r-- | silx/io/test/test_spectoh5.py | 23 | ||||
-rw-r--r-- | silx/io/test/test_utils.py | 186 |
10 files changed, 722 insertions, 130 deletions
diff --git a/silx/io/test/__init__.py b/silx/io/test/__init__.py index 72bd8f9..2510ae2 100644 --- a/silx/io/test/__init__.py +++ b/silx/io/test/__init__.py @@ -24,7 +24,7 @@ __authors__ = ["T. Vincent", "P. Knobel"] __license__ = "MIT" -__date__ = "31/08/2016" +__date__ = "20/09/2017" import unittest @@ -37,6 +37,8 @@ from .test_octaveh5 import suite as test_octaveh5_suite from .test_fabioh5 import suite as test_fabioh5_suite from .test_utils import suite as test_utils_suite from .test_nxdata import suite as test_nxdata_suite +from .test_commonh5 import suite as test_commonh5_suite +from .test_rawh5 import suite as test_rawh5_suite def suite(): @@ -50,4 +52,6 @@ def suite(): test_suite.addTest(test_utils_suite()) test_suite.addTest(test_fabioh5_suite()) test_suite.addTest(test_nxdata_suite()) + test_suite.addTest(test_commonh5_suite()) + test_suite.addTest(test_rawh5_suite()) return test_suite diff --git a/silx/io/test/test_commonh5.py b/silx/io/test/test_commonh5.py new file mode 100644 index 0000000..05ef558 --- /dev/null +++ b/silx/io/test/test_commonh5.py @@ -0,0 +1,306 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for commonh5 wrapper""" + +__authors__ = ["V. Valls"] +__license__ = "MIT" +__date__ = "21/09/2017" + +import logging +import numpy +import unittest +import tempfile +import shutil + +_logger = logging.getLogger(__name__) + +import silx.io +import silx.io.utils + +try: + import h5py +except ImportError: + h5py = None + +try: + from .. import commonh5 +except ImportError: + commonh5 = None + + +class TestCommonFeatures(unittest.TestCase): + """Test common features supported by h5py and our implementation.""" + + @classmethod + def createFile(cls): + return None + + @classmethod + def setUpClass(cls): + # Set to None cause create_resource can raise an excpetion + cls.h5 = None + cls.h5 = cls.create_resource() + if cls.h5 is None: + raise unittest.SkipTest("File not created") + + @classmethod + def create_resource(cls): + """Must be implemented""" + return None + + @classmethod + def tearDownClass(cls): + cls.h5 = None + + def test_file(self): + node = self.h5 + self.assertTrue(silx.io.is_file(node)) + self.assertTrue(silx.io.is_group(node)) + self.assertFalse(silx.io.is_dataset(node)) + self.assertEqual(len(node.attrs), 0) + + def test_group(self): + node = self.h5["group"] + self.assertFalse(silx.io.is_file(node)) + self.assertTrue(silx.io.is_group(node)) + self.assertFalse(silx.io.is_dataset(node)) + self.assertEqual(len(node.attrs), 0) + if h5py is not None: + class_ = self.h5.get("group", getclass=True) + classlink = self.h5.get("group", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Group) + self.assertEqual(classlink, h5py.HardLink) + + def test_dataset(self): + node = self.h5["group/dataset"] + self.assertFalse(silx.io.is_file(node)) + self.assertFalse(silx.io.is_group(node)) + self.assertTrue(silx.io.is_dataset(node)) + self.assertEqual(len(node.attrs), 0) + if h5py is not None: + class_ = self.h5.get("group/dataset", getclass=True) + classlink = self.h5.get("group/dataset", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.HardLink) + + def test_soft_link(self): + node = self.h5["link/soft_link"] + self.assertEqual(node.name, "/link/soft_link") + if h5py is not None: + class_ = self.h5.get("link/soft_link", getclass=True) + link = self.h5.get("link/soft_link", getlink=True) + classlink = self.h5.get("link/soft_link", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertTrue(isinstance(link, (h5py.SoftLink, commonh5.SoftLink))) + self.assertTrue(silx.io.utils.is_softlink(link)) + self.assertEqual(classlink, h5py.SoftLink) + + def test_external_link(self): + node = self.h5["link/external_link"] + self.assertEqual(node.name, "/target/dataset") + if h5py is not None: + class_ = self.h5.get("link/external_link", getclass=True) + classlink = self.h5.get("link/external_link", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.ExternalLink) + + def test_external_link_to_link(self): + node = self.h5["link/external_link_to_link"] + self.assertEqual(node.name, "/target/link") + if h5py is not None: + class_ = self.h5.get("link/external_link_to_link", getclass=True) + classlink = self.h5.get("link/external_link_to_link", getlink=True, getclass=True) + self.assertEqual(class_, h5py.Dataset) + self.assertEqual(classlink, h5py.ExternalLink) + + def test_create_groups(self): + c = self.h5.create_group(self.id() + "/a/b/c") + d = c.create_group("/" + self.id() + "/a/b/d") + + self.assertRaises(ValueError, self.h5.create_group, self.id() + "/a/b/d") + self.assertEqual(c.name, "/" + self.id() + "/a/b/c") + self.assertEqual(d.name, "/" + self.id() + "/a/b/d") + + def test_setitem_python_object_dataset(self): + group = self.h5.create_group(self.id()) + group["a"] = 10 + self.assertEqual(group["a"].dtype.kind, "i") + + def test_setitem_numpy_dataset(self): + group = self.h5.create_group(self.id()) + group["a"] = numpy.array([10, 20, 30]) + self.assertEqual(group["a"].dtype.kind, "i") + self.assertEqual(group["a"].shape, (3,)) + + def test_setitem_link(self): + group = self.h5.create_group(self.id()) + group["a"] = 10 + group["b"] = group["a"] + self.assertEqual(group["b"].dtype.kind, "i") + + def test_setitem_dataset_is_sub_group(self): + self.h5[self.id() + "/a"] = 10 + + +class TestCommonFeatures_h5py(TestCommonFeatures): + """Check if h5py is compliant with what we expect.""" + + @classmethod + def create_resource(cls): + cls.tmp_dir = tempfile.mkdtemp() + + externalh5 = h5py.File(cls.tmp_dir + "/external.h5", mode="w") + externalh5["target/dataset"] = 50 + externalh5["target/link"] = h5py.SoftLink("/target/dataset") + externalh5.close() + + h5 = h5py.File(cls.tmp_dir + "/base.h5", mode="w") + h5["group/dataset"] = 50 + h5["link/soft_link"] = h5py.SoftLink("/group/dataset") + h5["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset") + h5["link/external_link_to_link"] = h5py.ExternalLink("external.h5", "/target/link") + + return h5 + + @classmethod + def tearDownClass(cls): + super(TestCommonFeatures_h5py, cls).tearDownClass() + if hasattr(cls, "tmp_dir") and cls.tmp_dir is not None: + shutil.rmtree(cls.tmp_dir) + + +class TestCommonFeatures_commonH5(TestCommonFeatures): + """Check if commonh5 is compliant with h5py.""" + + @classmethod + def create_resource(cls): + h5 = commonh5.File("base.h5", "w") + h5.create_group("group").create_dataset("dataset", data=numpy.int32(50)) + + link = h5.create_group("link") + link.add_node(commonh5.SoftLink("soft_link", "/group/dataset")) + + return h5 + + def test_external_link(self): + # not applicable + pass + + def test_external_link_to_link(self): + # not applicable + pass + + +class TestSpecificCommonH5(unittest.TestCase): + """Test specific features from commonh5. + + Test of shared features should be done by TestCommonFeatures.""" + + def setUp(self): + if h5py is None: + self.skipTest("h5py is needed") + if commonh5 is None: + self.skipTest("silx.io.commonh5 is needed") + + def test_node_attrs(self): + node = commonh5.Node("Foo", attrs={"a": 1}) + self.assertEqual(node.attrs["a"], 1) + node.attrs["b"] = 8 + self.assertEqual(node.attrs["b"], 8) + node.attrs["b"] = 2 + self.assertEqual(node.attrs["b"], 2) + + def test_node_readonly_attrs(self): + f = commonh5.File(name="Foo", mode="r") + node = commonh5.Node("Foo", attrs={"a": 1}) + node.attrs["b"] = 8 + f.add_node(node) + self.assertEqual(node.attrs["b"], 8) + try: + node.attrs["b"] = 1 + self.fail() + except RuntimeError: + pass + + def test_create_dataset(self): + f = commonh5.File(name="Foo", mode="w") + node = f.create_dataset("foo", data=numpy.array([1])) + self.assertIs(node.parent, f) + self.assertIs(f["foo"], node) + + def test_create_group(self): + f = commonh5.File(name="Foo", mode="w") + node = f.create_group("foo") + self.assertIs(node.parent, f) + self.assertIs(f["foo"], node) + + def test_readonly_create_dataset(self): + f = commonh5.File(name="Foo", mode="r") + try: + f.create_dataset("foo", data=numpy.array([1])) + self.fail() + except RuntimeError: + pass + + def test_readonly_create_group(self): + f = commonh5.File(name="Foo", mode="r") + try: + f.create_group("foo") + self.fail() + except RuntimeError: + pass + + def test_create_unicode_dataset(self): + f = commonh5.File(name="Foo", mode="w") + try: + f.create_dataset("foo", data=numpy.array(u"aaaa")) + self.fail() + except TypeError: + pass + + def test_setitem_dataset(self): + self.h5 = commonh5.File(name="Foo", mode="w") + group = self.h5.create_group(self.id()) + group["a"] = commonh5.Dataset(None, data=numpy.array(10)) + self.assertEqual(group["a"].dtype.kind, "i") + + def test_setitem_explicit_link(self): + self.h5 = commonh5.File(name="Foo", mode="w") + group = self.h5.create_group(self.id()) + group["a"] = 10 + group["b"] = commonh5.SoftLink(None, path="/" + self.id() + "/a") + self.assertEqual(group["b"].dtype.kind, "i") + + +def suite(): + loadTests = unittest.defaultTestLoader.loadTestsFromTestCase + test_suite = unittest.TestSuite() + test_suite.addTest(loadTests(TestCommonFeatures_h5py)) + test_suite.addTest(loadTests(TestCommonFeatures_commonH5)) + test_suite.addTest(loadTests(TestSpecificCommonH5)) + return test_suite + + +if __name__ == '__main__': + unittest.main(defaultTest="suite") diff --git a/silx/io/test/test_fabioh5.py b/silx/io/test/test_fabioh5.py index c1f4a43..0237620 100644 --- a/silx/io/test/test_fabioh5.py +++ b/silx/io/test/test_fabioh5.py @@ -25,7 +25,7 @@ __authors__ = ["V. Valls"] __license__ = "MIT" -__date__ = "11/04/2017" +__date__ = "29/08/2017" import logging import numpy @@ -46,6 +46,7 @@ except ImportError: if fabio is not None and h5py is not None: from .. import fabioh5 + from .. import commonh5 class TestFabioH5(unittest.TestCase): @@ -95,12 +96,72 @@ class TestFabioH5(unittest.TestCase): # result tested with a default h5py file self.assertRaises(KeyError, lambda: self.h5_image["foo/foo"]) - def test_frames(self): - dataset = self.h5_image["/scan_0/instrument/detector_0/data"] + def test_single_frame(self): + data = numpy.arange(2 * 3) + data.shape = 2, 3 + fabio_image = fabio.edfimage.edfimage(data=data) + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEquals(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEquals(dataset.dtype.kind, "i") + self.assertEquals(dataset.shape, (2, 3)) + self.assertEquals(dataset[...][0, 0], 0) + self.assertEquals(dataset.attrs["interpretation"], "image") + + def test_multi_frames(self): + data = numpy.arange(2 * 3) + data.shape = 2, 3 + fabio_image = fabio.edfimage.edfimage(data=data) + fabio_image.appendFrame(data=data) + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEquals(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEquals(dataset.dtype.kind, "i") + self.assertEquals(dataset.shape, (2, 2, 3)) + self.assertEquals(dataset[...][0, 0, 0], 0) + self.assertEquals(dataset.attrs["interpretation"], "image") + + def test_heterogeneous_frames(self): + """Frames containing 2 images with different sizes and a cube""" + data1 = numpy.arange(2 * 3) + data1.shape = 2, 3 + data2 = numpy.arange(2 * 5) + data2.shape = 2, 5 + data3 = numpy.arange(2 * 5 * 1) + data3.shape = 2, 5, 1 + fabio_image = fabio.edfimage.edfimage(data=data1) + fabio_image.appendFrame(data=data2) + fabio_image.appendFrame(data=data3) + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEquals(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEquals(dataset.dtype.kind, "i") + self.assertEquals(dataset.shape, (3, 2, 5, 1)) + self.assertEquals(dataset[...][0, 0, 0], 0) + self.assertEquals(dataset.attrs["interpretation"], "image") + + def test_single_3d_frame(self): + """Image source contains a cube""" + data = numpy.arange(2 * 3 * 4) + data.shape = 2, 3, 4 + # Do not provide the data to the constructor to avoid slicing of the + # data. In this way the result stay a cube, and not a multi-frame + fabio_image = fabio.edfimage.edfimage() + fabio_image.data = data + h5_image = fabioh5.File(fabio_image=fabio_image) + + dataset = h5_image["/scan_0/instrument/detector_0/data"] self.assertEquals(dataset.h5py_class, h5py.Dataset) self.assertTrue(isinstance(dataset[()], numpy.ndarray)) self.assertEquals(dataset.dtype.kind, "i") - self.assertEquals(dataset.shape, (1, 3, 2)) + self.assertEquals(dataset.shape, (2, 3, 4)) + self.assertEquals(dataset[...][0, 0, 0], 0) self.assertEquals(dataset.attrs["interpretation"], "image") def test_metadata_int(self): @@ -224,6 +285,30 @@ class TestFabioH5(unittest.TestCase): self.assertIn(d.dtype.char, ['d', 'f']) numpy.testing.assert_array_almost_equal(d[...], expected) + def test_get_api(self): + result = self.h5_image.get("scan_0", getclass=True, getlink=True) + self.assertIs(result, h5py.HardLink) + result = self.h5_image.get("scan_0", getclass=False, getlink=True) + self.assertIsInstance(result, h5py.HardLink) + result = self.h5_image.get("scan_0", getclass=True, getlink=False) + self.assertIs(result, h5py.Group) + result = self.h5_image.get("scan_0", getclass=False, getlink=False) + self.assertIsInstance(result, commonh5.Group) + + def test_detector_link(self): + detector1 = self.h5_image["/scan_0/instrument/detector_0"] + detector2 = self.h5_image["/scan_0/measurement/image_0/info"] + self.assertIsNot(detector1, detector2) + self.assertEqual(list(detector1.items()), list(detector2.items())) + self.assertEqual(self.h5_image.get(detector2.name, getlink=True).path, detector1.name) + + def test_detector_data_link(self): + data1 = self.h5_image["/scan_0/instrument/detector_0/data"] + data2 = self.h5_image["/scan_0/measurement/image_0/data"] + self.assertIsNot(data1, data2) + self.assertIs(data1._get_data(), data2._get_data()) + self.assertEqual(self.h5_image.get(data2.name, getlink=True).path, data1.name) + def suite(): test_suite = unittest.TestSuite() diff --git a/silx/io/test/test_nxdata.py b/silx/io/test/test_nxdata.py index caa8c1e..f891db9 100644 --- a/silx/io/test/test_nxdata.py +++ b/silx/io/test/test_nxdata.py @@ -231,6 +231,13 @@ class TestNXdata(unittest.TestCase): self.assertEqual(nxd.errors.shape, (2, 2, 3, 10)) self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) self.assertEqual(nxd.interpretation, "spectrum") + self.assertEqual(nxd.get_axis_errors("energy").shape, + (10,)) + # test getting axis errors by long_name + self.assertTrue(numpy.array_equal(nxd.get_axis_errors("Calibrated energy"), + nxd.get_axis_errors("energy"))) + self.assertTrue(numpy.array_equal(nxd.get_axis_errors(b"Calibrated energy"), + nxd.get_axis_errors("energy"))) def testImages(self): nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"]) diff --git a/silx/io/test/test_rawh5.py b/silx/io/test/test_rawh5.py new file mode 100644 index 0000000..0f7205c --- /dev/null +++ b/silx/io/test/test_rawh5.py @@ -0,0 +1,96 @@ +# coding: utf-8 +# /*########################################################################## +# +# Copyright (c) 2016 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ###########################################################################*/ +"""Test for silx.gui.hdf5 module""" + +__authors__ = ["V. Valls"] +__license__ = "MIT" +__date__ = "21/09/2017" + + +import unittest +import tempfile +import numpy +import shutil +from ..import rawh5 + + +class TestNumpyFile(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tmpDirectory = tempfile.mkdtemp() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmpDirectory) + + def testNumpyFile(self): + filename = "%s/%s.npy" % (self.tmpDirectory, self.id()) + c = numpy.random.rand(5, 5) + numpy.save(filename, c) + h5 = rawh5.NumpyFile(filename) + self.assertIn("data", h5) + self.assertEqual(h5["data"].dtype.kind, "f") + + def testNumpyZFile(self): + filename = "%s/%s.npz" % (self.tmpDirectory, self.id()) + a = numpy.array(u"aaaaa") + b = numpy.array([1, 2, 3, 4]) + c = numpy.random.rand(5, 5) + d = numpy.array(b"aaaaa") + e = numpy.array(u"i \u2661 my mother") + numpy.savez(filename, a, b=b, c=c, d=d, e=e) + h5 = rawh5.NumpyFile(filename) + self.assertIn("arr_0", h5) + self.assertIn("b", h5) + self.assertIn("c", h5) + self.assertIn("d", h5) + self.assertIn("e", h5) + self.assertEqual(h5["arr_0"].dtype.kind, "U") + self.assertEqual(h5["b"].dtype.kind, "i") + self.assertEqual(h5["c"].dtype.kind, "f") + self.assertEqual(h5["d"].dtype.kind, "S") + self.assertEqual(h5["e"].dtype.kind, "U") + + def testNumpyZFileContainingDirectories(self): + filename = "%s/%s.npz" % (self.tmpDirectory, self.id()) + data = {} + data['a/b/c'] = numpy.arange(10) + data['a/b/e'] = numpy.arange(10) + numpy.savez(filename, **data) + h5 = rawh5.NumpyFile(filename) + self.assertIn("a/b/c", h5) + self.assertIn("a/b/e", h5) + + +def suite(): + test_suite = unittest.TestSuite() + loadTests = unittest.defaultTestLoader.loadTestsFromTestCase + test_suite.addTest(loadTests(TestNumpyFile)) + return test_suite + + +if __name__ == '__main__': + unittest.main(defaultTest='suite') diff --git a/silx/io/test/test_specfile.py b/silx/io/test/test_specfile.py index 884cb04..0aef2e2 100644 --- a/silx/io/test/test_specfile.py +++ b/silx/io/test/test_specfile.py @@ -25,7 +25,7 @@ __authors__ = ["P. Knobel", "V.A. Sole"] __license__ = "MIT" -__date__ = "24/04/2017" +__date__ = "03/08/2017" import gc import locale @@ -36,12 +36,14 @@ import sys import tempfile import unittest -logging.basicConfig() -logger1 = logging.getLogger(__name__) +from silx.test import utils from ..specfile import SpecFile, Scan from .. import specfile + +logger1 = logging.getLogger(__name__) + sftext = """#F /tmp/sf.dat #E 1455180875 #D Thu Feb 11 09:54:35 2016 @@ -374,6 +376,7 @@ class TestSpecFile(unittest.TestCase): self.assertEqual(self.scan25.mca.channels, []) + @utils.test_logging(specfile._logger.name, warning=1) def test_empty_scan(self): """Test reading a scan with no data points""" self.assertEqual(len(self.empty_scan.labels), diff --git a/silx/io/test/test_specfilewrapper.py b/silx/io/test/test_specfilewrapper.py index e9e1a24..c5b0f39 100644 --- a/silx/io/test/test_specfilewrapper.py +++ b/silx/io/test/test_specfilewrapper.py @@ -25,7 +25,7 @@ __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "05/07/2016" +__date__ = "15/05/2017" import gc import locale @@ -36,7 +36,6 @@ import sys import tempfile import unittest -logging.basicConfig() logger1 = logging.getLogger(__name__) from ..specfilewrapper import Specfile diff --git a/silx/io/test/test_spech5.py b/silx/io/test/test_spech5.py index ac250ab..1d04c6f 100644 --- a/silx/io/test/test_spech5.py +++ b/silx/io/test/test_spech5.py @@ -25,12 +25,16 @@ import gc from numpy import array_equal import os +import io import sys import tempfile import unittest import datetime from functools import partial +from silx.test import utils + +from .. import spech5 from ..spech5 import (SpecH5, SpecH5Group, SpecH5Dataset, spec_date_to_iso8601) from .. import specfile @@ -42,7 +46,7 @@ except ImportError: __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "11/05/2017" +__date__ = "26/07/2017" sftext = """#F /tmp/sf.dat #E 1455180875 @@ -107,6 +111,12 @@ sftext = """#F /tmp/sf.dat #L a b 1 2 +#S 1001 ccccc +#G1 0. 0. 0. 0 0 0 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353 +#G3 0. 0. 0. 0. 0.0 0. 0. 0. 0. +#L a b +1 2 + """ @@ -254,9 +264,6 @@ class TestSpecH5(unittest.TestCase): # full path to element in group (OK) self.assertIn("/1.1/instrument/positioners/Sslit1 HOff", self.sfh5["/1.1/instrument"]) - # full path to element outside group (illegal) - self.assertNotIn("/1.1/instrument/positioners/Sslit1 HOff", - self.sfh5["/1.1/measurement"]) def testDataColumn(self): self.assertAlmostEqual(sum(self.sfh5["/1.2/measurement/duo"]), @@ -306,9 +313,20 @@ class TestSpecH5(unittest.TestCase): # spech5 does not define external link, so there is no way # a group can *get* a SpecH5 class + @unittest.skipIf(h5py is None, "test requires h5py (not installed)") + def testGetApi(self): + result = self.sfh5.get("1.1", getclass=True, getlink=True) + self.assertIs(result, h5py.HardLink) + result = self.sfh5.get("1.1", getclass=False, getlink=True) + self.assertIsInstance(result, h5py.HardLink) + result = self.sfh5.get("1.1", getclass=True, getlink=False) + self.assertIs(result, h5py.Group) + result = self.sfh5.get("1.1", getclass=False, getlink=False) + self.assertIsInstance(result, spech5.SpecH5Group) + def testGetItemGroup(self): group = self.sfh5["25.1"]["instrument"] - self.assertEqual(group["positioners"].keys(), + self.assertEqual(list(group["positioners"].keys()), ["Pslit HGap", "MRTSlit UP", "MRTSlit DOWN", "Sslit1 VOff", "Sslit1 HOff", "Sslit1 VGap"]) with self.assertRaises(KeyError): @@ -389,8 +407,8 @@ class TestSpecH5(unittest.TestCase): self.sfh5["/1.2/instrument/mca_0/elapsed_time"]) def testListScanIndices(self): - self.assertEqual(self.sfh5.keys(), - ["1.1", "25.1", "1.2", "1000.1"]) + self.assertEqual(list(self.sfh5.keys()), + ["1.1", "25.1", "1.2", "1000.1", "1001.1"]) self.assertEqual(self.sfh5["1.2"].attrs, {"NX_class": "NXentry", }) @@ -477,30 +495,80 @@ class TestSpecH5(unittest.TestCase): self.assertEqual(self.sfh5["/25.1/title"], b"25 ascan c3th 1.33245 1.52245 40 0.15") + def testValues(self): + group = self.sfh5["/25.1"] + self.assertTrue(hasattr(group, "values")) + self.assertTrue(callable(group.values)) + self.assertIn(self.sfh5["/25.1/title"], + self.sfh5["/25.1"].values()) + # visit and visititems ignore links def testVisit(self): name_list = [] self.sfh5.visit(name_list.append) - self.assertIn('/1.2/instrument/positioners/Pslit HGap', name_list) - self.assertIn("/1.2/instrument/specfile/scan_header", name_list) - self.assertEqual(len(name_list), 100) #, "actual name list: %s" % "\n".join(name_list)) + self.assertIn('1.2/instrument/positioners/Pslit HGap', name_list) + self.assertIn("1.2/instrument/specfile/scan_header", name_list) + self.assertEqual(len(name_list), 117) + + # test also visit of a subgroup, with various group name formats + name_list_leading_and_trailing_slash = [] + self.sfh5['/1.2/instrument/'].visit(name_list_leading_and_trailing_slash.append) + name_list_leading_slash = [] + self.sfh5['/1.2/instrument'].visit(name_list_leading_slash.append) + name_list_trailing_slash = [] + self.sfh5['1.2/instrument/'].visit(name_list_trailing_slash.append) + name_list_no_slash = [] + self.sfh5['1.2/instrument'].visit(name_list_no_slash.append) + + # no differences expected in the output names + self.assertEqual(name_list_leading_and_trailing_slash, + name_list_leading_slash) + self.assertEqual(name_list_leading_slash, + name_list_trailing_slash) + self.assertEqual(name_list_leading_slash, + name_list_no_slash) + self.assertIn("positioners/Pslit HGap", name_list_no_slash) + self.assertIn("positioners", name_list_no_slash) def testVisitItems(self): dataset_name_list = [] - def func(name, obj): - if isinstance(obj, SpecH5Dataset): - dataset_name_list.append(name) - - self.sfh5.visititems(func) - self.assertIn('/1.2/instrument/positioners/Pslit HGap', dataset_name_list) - self.assertEqual(len(dataset_name_list), 73) + def func_generator(l): + """return a function appending names to list l""" + def func(name, obj): + if isinstance(obj, SpecH5Dataset): + l.append(name) + return func + + self.sfh5.visititems(func_generator(dataset_name_list)) + self.assertIn('1.2/instrument/positioners/Pslit HGap', dataset_name_list) + self.assertEqual(len(dataset_name_list), 85) + + # test also visit of a subgroup, with various group name formats + name_list_leading_and_trailing_slash = [] + self.sfh5['/1.2/instrument/'].visititems(func_generator(name_list_leading_and_trailing_slash)) + name_list_leading_slash = [] + self.sfh5['/1.2/instrument'].visititems(func_generator(name_list_leading_slash)) + name_list_trailing_slash = [] + self.sfh5['1.2/instrument/'].visititems(func_generator(name_list_trailing_slash)) + name_list_no_slash = [] + self.sfh5['1.2/instrument'].visititems(func_generator(name_list_no_slash)) + + # no differences expected in the output names + self.assertEqual(name_list_leading_and_trailing_slash, + name_list_leading_slash) + self.assertEqual(name_list_leading_slash, + name_list_trailing_slash) + self.assertEqual(name_list_leading_slash, + name_list_no_slash) + self.assertIn("positioners/Pslit HGap", name_list_no_slash) def testNotSpecH5(self): fd, fname = tempfile.mkstemp() os.write(fd, b"Not a spec file!") os.close(fd) self.assertRaises(specfile.SfErrFileOpen, SpecH5, fname) + self.assertRaises(IOError, SpecH5, fname) os.unlink(fname) def testSample(self): @@ -511,6 +579,21 @@ class TestSpecH5(unittest.TestCase): self.assertIn("unit_cell_abc", self.sfh5["/1000.1/sample"]) self.assertIn("unit_cell_alphabetagamma", self.sfh5["/1000.1/sample"]) + # All 0 values + self.assertNotIn("sample", self.sfh5["/1001.1"]) + with self.assertRaises(KeyError): + uc = self.sfh5["/1001.1/sample/unit_cell"] + + @utils.test_logging(spech5.logger1.name, warning=2) + def testOpenFileDescriptor(self): + """Open a SpecH5 file from a file descriptor""" + with io.open(self.sfh5.filename) as f: + sfh5 = SpecH5(f) + self.assertIsNotNone(sfh5) + name_list = [] + # check if the object is working + self.sfh5.visit(name_list.append) + sftext_multi_mca_headers = """ #S 1 aaaaaa @@ -767,7 +850,7 @@ class TestSpecH5SlashInLabels(unittest.TestCase): def testLabels(self): """Ensure `/` is substituted with `%` and ensure legitimate `%` in names are still working""" - self.assertEqual(self.sfh5["1.1/measurement/"].keys(), + self.assertEqual(list(self.sfh5["1.1/measurement/"].keys()), ["GONY%mm", "PD3%A"]) # substituted "%" @@ -784,7 +867,7 @@ class TestSpecH5SlashInLabels(unittest.TestCase): def testMotors(self): """Ensure `/` is substituted with `%` and ensure legitimate `%` in names are still working""" - self.assertEqual(self.sfh5["1.1/instrument/positioners"].keys(), + self.assertEqual(list(self.sfh5["1.1/instrument/positioners"].keys()), ["Pslit%HGap", "MRTSlit%UP"]) # substituted "%" self.assertIn("Pslit%HGap", @@ -799,8 +882,6 @@ class TestSpecH5SlashInLabels(unittest.TestCase): self.sfh5["1.1/instrument/positioners"]) - - def suite(): test_suite = unittest.TestSuite() test_suite.addTest( diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py index f42f8a8..421c48b 100644 --- a/silx/io/test/test_spectoh5.py +++ b/silx/io/test/test_spectoh5.py @@ -1,6 +1,6 @@ # coding: utf-8 # /*########################################################################## -# Copyright (C) 2016 European Synchrotron Radiation Facility +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -36,12 +36,12 @@ except ImportError: h5py_missing = True else: h5py_missing = False - from ..spech5 import SpecH5 - from ..spectoh5 import convert, write_spec_to_h5 + from ..spech5 import SpecH5, SpecH5Group + from ..convert import convert, write_to_h5 __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "05/10/2016" +__date__ = "02/06/2017" sftext = """#F /tmp/sf.dat @@ -112,7 +112,7 @@ class TestConvertSpecHDF5(unittest.TestCase): convert(self.spec_fname, self.h5_fname) self.sfh5 = SpecH5(self.spec_fname) - self.h5f = h5py.File(self.h5_fname) + self.h5f = h5py.File(self.h5_fname, "a") def tearDown(self): del self.sfh5 @@ -122,13 +122,22 @@ class TestConvertSpecHDF5(unittest.TestCase): gc.collect() def testAppendToHDF5(self): - write_spec_to_h5(self.sfh5, self.h5f, - h5path="/foo/bar/spam") + write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam") self.assertTrue( array_equal(self.h5f["/1.2/measurement/mca_1/data"], self.h5f["/foo/bar/spam/1.2/measurement/mca_1/data"]) ) + def testWriteSpecH5Group(self): + """Test passing a SpecH5Group as parameter, instead of a Spec filename + or a SpecH5.""" + g = self.sfh5["1.1/instrument"] + self.assertIsInstance(g, SpecH5Group) # let's be paranoid + write_to_h5(g, self.h5f, h5path="my instruments") + + self.assertAlmostEqual(self.h5f["my instruments/positioners/Sslit1 HOff"][tuple()], + 16.197579, places=4) + def testTitle(self): """Test the value of a dataset""" title12 = self.h5f["/1.2/title"].value diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py index 15d0005..09074bb 100644 --- a/silx/io/test/test_utils.py +++ b/silx/io/test/test_utils.py @@ -23,23 +23,22 @@ # ############################################################################*/ """Tests for utils module""" - +import io import numpy import os import re import shutil import tempfile import unittest +import sys from .. import utils try: import h5py -except ImportError: - h5py_missing = True -else: - h5py_missing = False from ..utils import h5ls +except ImportError: + h5py = None try: import fabio @@ -49,7 +48,7 @@ except ImportError: __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "11/01/2017" +__date__ = "28/09/2017" expected_spec1 = r"""#F .* @@ -208,7 +207,7 @@ def assert_match_any_string_in_list(test, pattern, list_of_strings): return False -@unittest.skipIf(h5py_missing, "Could not import h5py") +@unittest.skipIf(h5py is None, "Could not import h5py") class TestH5Ls(unittest.TestCase): """Test displaying the following HDF5 file structure: @@ -291,125 +290,131 @@ class TestH5Ls(unittest.TestCase): class TestOpen(unittest.TestCase): """Test `silx.io.utils.open` function.""" + @classmethod + def setUpClass(cls): + cls.tmp_directory = tempfile.mkdtemp() + cls.createResources(cls.tmp_directory) + + @classmethod + def createResources(cls, directory): + + if h5py is not None: + cls.h5_filename = os.path.join(directory, "test.h5") + h5 = h5py.File(cls.h5_filename, mode="w") + h5["group/group/dataset"] = 50 + h5.close() + + cls.spec_filename = os.path.join(directory, "test.dat") + utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y", + fmt=["%d", "%.2f"], close_file=True, scan_number=1) + + if fabio is not None: + cls.edf_filename = os.path.join(directory, "test.edf") + header = fabio.fabioimage.OrderedDict() + header["integer"] = "10" + data = numpy.array([[10, 50], [50, 10]]) + fabiofile = fabio.edfimage.EdfImage(data, header) + fabiofile.write(cls.edf_filename) + + cls.txt_filename = os.path.join(directory, "test.txt") + f = io.open(cls.txt_filename, "w+t") + f.write(u"Kikoo") + f.close() + + cls.missing_filename = os.path.join(directory, "test.missing") + + @classmethod + def tearDownClass(cls): + if sys.platform == "win32" and fabio is not None: + # gc collect is needed to close a file descriptor + # opened by fabio and not released. + # https://github.com/silx-kit/fabio/issues/167 + import gc + gc.collect() + shutil.rmtree(cls.tmp_directory) + def testH5(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") - # create a file - tmp = tempfile.NamedTemporaryFile(suffix=".h5", delete=True) - tmp.file.close() - h5 = h5py.File(tmp.name, "w") - g = h5.create_group("arrays") - g.create_dataset("scalar", data=10) - h5.close() - - # load it - f = utils.open(tmp.name) + f = utils.open(self.h5_filename) self.assertIsNotNone(f) self.assertIsInstance(f, h5py.File) f.close() def testH5With(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") - # create a file - tmp = tempfile.NamedTemporaryFile(suffix=".h5", delete=True) - tmp.file.close() - h5 = h5py.File(tmp.name, "w") - g = h5.create_group("arrays") - g.create_dataset("scalar", data=10) - h5.close() - - # load it - with utils.open(tmp.name) as f: + with utils.open(self.h5_filename) as f: self.assertIsNotNone(f) self.assertIsInstance(f, h5py.File) - def testSpec(self): - # create a file - tmp = tempfile.NamedTemporaryFile(mode="w+t", suffix=".dat", delete=True) - tmp.file.close() - utils.savespec(tmp.name, [1], [1.1], xlabel="x", ylabel="y", - fmt=["%d", "%.2f"], close_file=True, scan_number=1) + def testH5_withPath(self): + if h5py is None: + self.skipTest("H5py is missing") - # load it - f = utils.open(tmp.name) + f = utils.open(self.h5_filename + "::/group/group/dataset") self.assertIsNotNone(f) - self.assertEquals(f.h5py_class, h5py.File) + self.assertEqual(f.h5py_class, h5py.Dataset) + self.assertEqual(f[()], 50) f.close() - def testSpecWith(self): - # create a file - tmp = tempfile.NamedTemporaryFile(mode="w+t", suffix=".dat", delete=True) - tmp.file.close() - utils.savespec(tmp.name, [1], [1.1], xlabel="x", ylabel="y", - fmt=["%d", "%.2f"], close_file=True, scan_number=1) + def testH5With_withPath(self): + if h5py is None: + self.skipTest("H5py is missing") - # load it - with utils.open(tmp.name) as f: + with utils.open(self.h5_filename + "::/group/group") as f: self.assertIsNotNone(f) + self.assertEqual(f.h5py_class, h5py.Group) + self.assertIn("dataset", f) + + def testSpec(self): + f = utils.open(self.spec_filename) + self.assertIsNotNone(f) + if h5py is not None: self.assertEquals(f.h5py_class, h5py.File) + f.close() + + def testSpecWith(self): + with utils.open(self.spec_filename) as f: + self.assertIsNotNone(f) + if h5py is not None: + self.assertEquals(f.h5py_class, h5py.File) def testEdf(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") if fabio is None: self.skipTest("Fabio is missing") - # create a file - tmp = tempfile.NamedTemporaryFile(suffix=".edf", delete=True) - tmp.file.close() - header = fabio.fabioimage.OrderedDict() - header["integer"] = "10" - data = numpy.array([[10, 50], [50, 10]]) - fabiofile = fabio.edfimage.EdfImage(data, header) - fabiofile.write(tmp.name) - - # load it - f = utils.open(tmp.name) + f = utils.open(self.edf_filename) self.assertIsNotNone(f) self.assertEquals(f.h5py_class, h5py.File) f.close() def testEdfWith(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") if fabio is None: self.skipTest("Fabio is missing") - # create a file - tmp = tempfile.NamedTemporaryFile(suffix=".edf", delete=True) - tmp.file.close() - header = fabio.fabioimage.OrderedDict() - header["integer"] = "10" - data = numpy.array([[10, 50], [50, 10]]) - fabiofile = fabio.edfimage.EdfImage(data, header) - fabiofile.write(tmp.name) - - # load it - with utils.open(tmp.name) as f: + with utils.open(self.edf_filename) as f: self.assertIsNotNone(f) self.assertEquals(f.h5py_class, h5py.File) def testUnsupported(self): - # create a file - tmp = tempfile.NamedTemporaryFile(mode="w+t", suffix=".txt", delete=True) - tmp.write("Kikoo") - tmp.close() - - # load it - self.assertRaises(IOError, utils.open, tmp.name) + self.assertRaises(IOError, utils.open, self.txt_filename) def testNotExists(self): # load it - self.assertRaises(IOError, utils.open, "#$.") + self.assertRaises(IOError, utils.open, self.missing_filename) class TestNodes(unittest.TestCase): """Test `silx.io.utils.is_` functions.""" def test_real_h5py_objects(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") name = tempfile.mktemp(suffix=".h5") @@ -433,7 +438,7 @@ class TestNodes(unittest.TestCase): os.unlink(name) def test_h5py_like_file(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") class Foo(object): @@ -445,7 +450,7 @@ class TestNodes(unittest.TestCase): self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_group(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") class Foo(object): @@ -457,7 +462,7 @@ class TestNodes(unittest.TestCase): self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_dataset(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") class Foo(object): @@ -469,7 +474,7 @@ class TestNodes(unittest.TestCase): self.assertTrue(utils.is_dataset(obj)) def test_bad(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") class Foo(object): @@ -481,7 +486,7 @@ class TestNodes(unittest.TestCase): self.assertFalse(utils.is_dataset(obj)) def test_bad_api(self): - if h5py_missing: + if h5py is None: self.skipTest("H5py is missing") class Foo(object): @@ -494,15 +499,12 @@ class TestNodes(unittest.TestCase): def suite(): + loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite = unittest.TestSuite() - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestSave)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestH5Ls)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestOpen)) - test_suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestNodes)) + test_suite.addTest(loadTests(TestSave)) + test_suite.addTest(loadTests(TestH5Ls)) + test_suite.addTest(loadTests(TestOpen)) + test_suite.addTest(loadTests(TestNodes)) return test_suite |