diff options
Diffstat (limited to 'silx/io/test')
-rw-r--r-- | silx/io/test/__init__.py | 4 | ||||
-rw-r--r-- | silx/io/test/test_dictdump.py | 25 | ||||
-rw-r--r-- | silx/io/test/test_fabioh5.py | 181 | ||||
-rw-r--r-- | silx/io/test/test_nxdata.py | 271 | ||||
-rw-r--r-- | silx/io/test/test_specfile.py | 32 | ||||
-rw-r--r-- | silx/io/test/test_specfilewrapper.py | 17 | ||||
-rw-r--r-- | silx/io/test/test_spech5.py | 72 | ||||
-rw-r--r-- | silx/io/test/test_spectoh5.py | 23 | ||||
-rw-r--r-- | silx/io/test/test_url.py | 209 | ||||
-rw-r--r-- | silx/io/test/test_utils.py | 221 |
10 files changed, 900 insertions, 155 deletions
diff --git a/silx/io/test/__init__.py b/silx/io/test/__init__.py index 2510ae2..a309ee9 100644 --- a/silx/io/test/__init__.py +++ b/silx/io/test/__init__.py @@ -24,7 +24,7 @@ __authors__ = ["T. Vincent", "P. Knobel"] __license__ = "MIT" -__date__ = "20/09/2017" +__date__ = "08/12/2017" import unittest @@ -39,6 +39,7 @@ from .test_utils import suite as test_utils_suite from .test_nxdata import suite as test_nxdata_suite from .test_commonh5 import suite as test_commonh5_suite from .test_rawh5 import suite as test_rawh5_suite +from .test_url import suite as test_url_suite def suite(): @@ -54,4 +55,5 @@ def suite(): test_suite.addTest(test_nxdata_suite()) test_suite.addTest(test_commonh5_suite()) test_suite.addTest(test_rawh5_suite()) + test_suite.addTest(test_url_suite()) return test_suite diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py index 15d5fdc..9cd054c 100644 --- a/silx/io/test/test_dictdump.py +++ b/silx/io/test/test_dictdump.py @@ -25,7 +25,7 @@ __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "10/02/2017" +__date__ = "17/01/2018" from collections import OrderedDict import numpy @@ -41,9 +41,12 @@ except ImportError: from collections import defaultdict +from silx.utils.testutils import TestLogging + from ..configdict import ConfigDict -from ..dictdump import dicttoh5, dicttojson, dicttoini, dump +from ..dictdump import dicttoh5, dicttojson, dump from ..dictdump import h5todict, load +from ..dictdump import logger as dictdump_logger def tree(): @@ -93,6 +96,24 @@ class TestDictToH5(unittest.TestCase): min(ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]), 5.7196) + def testH5Overwrite(self): + dd = ConfigDict({'t': True}) + + dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a') + dd = ConfigDict({'t': False}) + with TestLogging(dictdump_logger, warning=1): + dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a', + overwrite_data=False) + + res = h5todict(self.h5_fname) + assert(res['t'] == True) + + dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a', + overwrite_data=True) + + res = h5todict(self.h5_fname) + assert(res['t'] == False) + @unittest.skipIf(h5py_missing, "Could not import h5py") class TestH5ToDict(unittest.TestCase): diff --git a/silx/io/test/test_fabioh5.py b/silx/io/test/test_fabioh5.py index d9459ae..5fbf0d0 100644 --- a/silx/io/test/test_fabioh5.py +++ b/silx/io/test/test_fabioh5.py @@ -1,6 +1,6 @@ # coding: utf-8 # /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# Copyright (C) 2016-2018 European Synchrotron Radiation Facility # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -25,10 +25,9 @@ __authors__ = ["V. Valls"] __license__ = "MIT" -__date__ = "04/10/2017" +__date__ = "24/02/2018" import os -import sys import logging import numpy import unittest @@ -289,6 +288,28 @@ class TestFabioH5(unittest.TestCase): self.assertIn(d.dtype.char, ['d', 'f']) numpy.testing.assert_array_almost_equal(d[...], expected) + def test_interpretation_mca_edf(self): + """EDF files with two or more headers starting with "MCA" + must have @interpretation = "spectrum" an the data.""" + header = { + "Title": "zapimage samy -4.975 -5.095 80 500 samz -4.091 -4.171 70 0", + "MCA a": -23.812, + "MCA b": 2.7107, + "MCA c": 8.1164e-06} + + data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8) + fabio_image = fabio.edfimage.EdfImage(data=data, header=header) + h5_image = fabioh5.File(fabio_image=fabio_image) + + data_dataset = h5_image["/scan_0/measurement/image_0/data"] + self.assertEquals(data_dataset.attrs["interpretation"], "spectrum") + + data_dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEquals(data_dataset.attrs["interpretation"], "spectrum") + + data_dataset = h5_image["/scan_0/measurement/image_0/info/data"] + self.assertEquals(data_dataset.attrs["interpretation"], "spectrum") + def test_get_api(self): result = self.h5_image.get("scan_0", getclass=True, getlink=True) self.assertIs(result, h5py.HardLink) @@ -356,6 +377,95 @@ class TestFabioH5(unittest.TestCase): self.assertIsInstance(data[...], numpy.ndarray) +class TestFabioH5MultiFrames(unittest.TestCase): + + @classmethod + def setUpClass(cls): + if fabio is None: + raise unittest.SkipTest("fabio is needed") + if h5py is None: + raise unittest.SkipTest("h5py is needed") + + names = ["A", "B", "C", "D"] + values = [["32000", "-10", "5.0", "1"], + ["-32000", "-10", "5.0", "1"]] + + fabio_file = None + + for i in range(10): + header = { + "image_id": "%d" % i, + "integer": "-100", + "float": "1.0", + "string": "hi!", + "list_integer": "100 50 0", + "list_float": "1.0 2.0 3.5", + "string_looks_like_list": "2000 hi!", + "motor_mne": " ".join(names), + "motor_pos": " ".join(values[i % len(values)]), + "counter_mne": " ".join(names), + "counter_pos": " ".join(values[i % len(values)]) + } + for iname, name in enumerate(names): + header[name] = values[i % len(values)][iname] + + data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64) + if fabio_file is None: + fabio_file = fabio.edfimage.EdfImage(data=data, header=header) + else: + fabio_file.appendFrame(data=data, header=header) + + cls.fabio_file = fabio_file + cls.fabioh5 = fabioh5.File(fabio_image=fabio_file) + + def test_others(self): + others = self.fabioh5["/scan_0/instrument/detector_0/others"] + dataset = others["A"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "i") + dataset = others["B"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "i") + dataset = others["C"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "f") + dataset = others["D"] + self.assertGreaterEqual(dataset.dtype.itemsize, 1) + self.assertEqual(dataset.dtype.kind, "u") + + def test_positioners(self): + counters = self.fabioh5["/scan_0/instrument/positioners"] + # At least 32 bits, no unsigned values + dataset = counters["A"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["B"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["C"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "f") + dataset = counters["D"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + + def test_counters(self): + counters = self.fabioh5["/scan_0/measurement"] + # At least 32 bits, no unsigned values + dataset = counters["A"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["B"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + dataset = counters["C"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "f") + dataset = counters["D"] + self.assertGreaterEqual(dataset.dtype.itemsize, 4) + self.assertEqual(dataset.dtype.kind, "i") + + class TestFabioH5WithEdf(unittest.TestCase): @classmethod @@ -388,12 +498,6 @@ class TestFabioH5WithEdf(unittest.TestCase): def tearDownClass(cls): cls.fabio_image = None cls.h5_image = None - if sys.platform == "win32" and fabio is not None: - # gc collect is needed to close a file descriptor - # opened by fabio and not released. - # https://github.com/silx-kit/fabio/issues/167 - import gc - gc.collect() shutil.rmtree(cls.tmp_directory) def test_reserved_format_metadata(self): @@ -406,11 +510,70 @@ class TestFabioH5WithEdf(unittest.TestCase): self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image) +class TestFabioH5WithFileSeries(unittest.TestCase): + + @classmethod + def setUpClass(cls): + if fabio is None: + raise unittest.SkipTest("fabio is needed") + if h5py is None: + raise unittest.SkipTest("h5py is needed") + + cls.tmp_directory = tempfile.mkdtemp() + + cls.edf_filenames = [] + + for i in range(10): + filename = os.path.join(cls.tmp_directory, "test_%04d.edf" % i) + cls.edf_filenames.append(filename) + + header = { + "image_id": "%d" % i, + "integer": "-100", + "float": "1.0", + "string": "hi!", + "list_integer": "100 50 0", + "list_float": "1.0 2.0 3.5", + "string_looks_like_list": "2000 hi!", + } + data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64) + fabio_image = fabio.edfimage.edfimage(data, header) + fabio_image.write(filename) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmp_directory) + + def _testH5Image(self, h5_image): + # test data + dataset = h5_image["/scan_0/instrument/detector_0/data"] + self.assertEquals(dataset.h5py_class, h5py.Dataset) + self.assertTrue(isinstance(dataset[()], numpy.ndarray)) + self.assertEquals(dataset.dtype.kind, "i") + self.assertEquals(dataset.shape, (10, 3, 2)) + self.assertEquals(list(dataset[:, 0, 0]), list(range(10))) + self.assertEquals(dataset.attrs["interpretation"], "image") + # test metatdata + dataset = h5_image["/scan_0/instrument/detector_0/others/image_id"] + self.assertEquals(list(dataset[...]), list(range(10))) + + def testFileList(self): + h5_image = fabioh5.File(file_series=self.edf_filenames) + self._testH5Image(h5_image) + + def testFileSeries(self): + file_series = fabioh5._FileSeries(self.edf_filenames) + h5_image = fabioh5.File(file_series=file_series) + self._testH5Image(h5_image) + + def suite(): loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite = unittest.TestSuite() test_suite.addTest(loadTests(TestFabioH5)) + test_suite.addTest(loadTests(TestFabioH5MultiFrames)) test_suite.addTest(loadTests(TestFabioH5WithEdf)) + test_suite.addTest(loadTests(TestFabioH5WithFileSeries)) return test_suite diff --git a/silx/io/test/test_nxdata.py b/silx/io/test/test_nxdata.py index f891db9..f7ea8a4 100644 --- a/silx/io/test/test_nxdata.py +++ b/silx/io/test/test_nxdata.py @@ -1,6 +1,6 @@ # coding: utf-8 # /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# Copyright (C) 2016-2018 European Synchrotron Radiation Facility # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -25,7 +25,7 @@ __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "27/09/2016" +__date__ = "27/01/2018" try: import h5py @@ -36,6 +36,10 @@ import tempfile import unittest from .. import nxdata +from silx.third_party import six + +text_dtype = h5py.special_dtype(vlen=six.text_type) + @unittest.skipIf(h5py is None, "silx.io.nxdata tests depend on h5py") class TestNXdata(unittest.TestCase): @@ -71,9 +75,16 @@ class TestNXdata(unittest.TestCase): g1d0 = g1d.create_group("1D_spectrum") g1d0.attrs["NX_class"] = "NXdata" g1d0.attrs["signal"] = "count" + g1d0.attrs["auxiliary_signals"] = numpy.array(["count2", "count3"], + dtype=text_dtype) g1d0.attrs["axes"] = "energy_calib" - g1d0.attrs["uncertainties"] = b"energy_errors", + g1d0.attrs["uncertainties"] = numpy.array(["energy_errors", ], + dtype=text_dtype) g1d0.create_dataset("count", data=numpy.arange(10)) + g1d0.create_dataset("count2", data=0.5 * numpy.arange(10)) + d = g1d0.create_dataset("count3", data=0.4 * numpy.arange(10)) + d.attrs["long_name"] = "3rd counter" + g1d0.create_dataset("title", data="Title as dataset (like nexpy)") g1d0.create_dataset("energy_calib", data=(10, 5)) # 10 * idx + 5 g1d0.create_dataset("energy_errors", data=3.14 * numpy.random.rand(10)) @@ -86,7 +97,7 @@ class TestNXdata(unittest.TestCase): g1d2 = g1d.create_group("4D_spectra") g1d2.attrs["NX_class"] = "NXdata" g1d2.attrs["signal"] = "counts" - g1d2.attrs["axes"] = b"energy", + g1d2.attrs["axes"] = numpy.array(["energy", ], dtype=text_dtype) ds = g1d2.create_dataset("counts", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10))) ds.attrs["interpretation"] = "spectrum" ds = g1d2.create_dataset("errors", data=4.5 * numpy.random.rand(2, 2, 3, 10)) @@ -103,8 +114,11 @@ class TestNXdata(unittest.TestCase): g2d0 = g2d.create_group("2D_regular_image") g2d0.attrs["NX_class"] = "NXdata" g2d0.attrs["signal"] = "image" - g2d0.attrs["axes"] = b"rows_calib", b"columns_coordinates" + g2d0.attrs["auxiliary_signals"] = "image2" + g2d0.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"], + dtype=text_dtype) g2d0.create_dataset("image", data=numpy.arange(4 * 6).reshape((4, 6))) + g2d0.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6))) ds = g2d0.create_dataset("rows_calib", data=(10, 5)) ds.attrs["long_name"] = "Calibrated Y" g2d0.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6)) @@ -112,7 +126,9 @@ class TestNXdata(unittest.TestCase): g2d1 = g2d.create_group("2D_irregular_data") g2d1.attrs["NX_class"] = "NXdata" g2d1.attrs["signal"] = "data" - g2d1.attrs["axes"] = b"rows_coordinates", b"columns_coordinates" + g2d1.attrs["title"] = "Title as group attr" + g2d1.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"], + dtype=text_dtype) g2d1.create_dataset("data", data=numpy.arange(64 * 128).reshape((64, 128))) g2d1.create_dataset("rows_coordinates", data=numpy.arange(64) + numpy.random.rand(64)) g2d1.create_dataset("columns_coordinates", data=numpy.arange(128) + 2.5 * numpy.random.rand(128)) @@ -126,19 +142,33 @@ class TestNXdata(unittest.TestCase): g2d3 = g2d.create_group("5D_images") g2d3.attrs["NX_class"] = "NXdata" g2d3.attrs["signal"] = "images" - g2d3.attrs["axes"] = b"rows_coordinates", b"columns_coordinates" + g2d3.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"], + dtype=text_dtype) ds = g2d3.create_dataset("images", data=numpy.arange(2 * 2 * 2 * 4 * 6).reshape((2, 2, 2, 4, 6))) ds.attrs["interpretation"] = "image" g2d3.create_dataset("rows_coordinates", data=5 + 10 * numpy.arange(4)) g2d3.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6)) + g2d4 = g2d.create_group("RGBA_image") + g2d4.attrs["NX_class"] = "NXdata" + g2d4.attrs["signal"] = "image" + g2d4.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"], + dtype=text_dtype) + rgba_image = numpy.linspace(0, 1, num=7*8*3).reshape((7, 8, 3)) + rgba_image[:, :, 1] = 1 - rgba_image[:, :, 1] # invert G channel to add some color + ds = g2d4.create_dataset("image", data=rgba_image) + ds.attrs["interpretation"] = "rgba-image" + ds = g2d4.create_dataset("rows_calib", data=(10, 5)) + ds.attrs["long_name"] = "Calibrated Y" + g2d4.create_dataset("columns_coordinates", data=0.5+0.02*numpy.arange(8)) + # SCATTER g = self.h5f.create_group("scatters") gd0 = g.create_group("x_y_scatter") gd0.attrs["NX_class"] = "NXdata" gd0.attrs["signal"] = "y" - gd0.attrs["axes"] = b"x", + gd0.attrs["axes"] = numpy.array(["x", ], dtype=text_dtype) gd0.create_dataset("y", data=numpy.random.rand(128) - 0.5) gd0.create_dataset("x", data=2 * numpy.random.rand(128)) gd0.create_dataset("x_errors", data=0.05 * numpy.random.rand(128)) @@ -147,7 +177,7 @@ class TestNXdata(unittest.TestCase): gd1 = g.create_group("x_y_value_scatter") gd1.attrs["NX_class"] = "NXdata" gd1.attrs["signal"] = "values" - gd1.attrs["axes"] = b"x", b"y" + gd1.attrs["axes"] = numpy.array(["x", "y"], dtype=text_dtype) gd1.create_dataset("values", data=3.14 * numpy.random.rand(128)) gd1.create_dataset("y", data=numpy.random.rand(128)) gd1.create_dataset("y_errors", data=0.02 * numpy.random.rand(128)) @@ -199,6 +229,7 @@ class TestNXdata(unittest.TestCase): def testSpectra(self): nxd = nxdata.NXdata(self.h5f["spectra/1D_spectrum"]) self.assertTrue(nxd.signal_is_1d) + self.assertTrue(nxd.is_curve) self.assertTrue(numpy.array_equal(numpy.array(nxd.signal), numpy.arange(10))) self.assertEqual(nxd.axes_names, ["energy_calib"]) @@ -208,9 +239,18 @@ class TestNXdata(unittest.TestCase): self.assertIsNone(nxd.errors) self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) self.assertIsNone(nxd.interpretation) + self.assertEqual(nxd.title, "Title as dataset (like nexpy)") + + self.assertEqual(nxd.auxiliary_signals_dataset_names, + ["count2", "count3"]) + self.assertEqual(nxd.auxiliary_signals_names, + ["count2", "3rd counter"]) + self.assertAlmostEqual(nxd.auxiliary_signals[1][2], + 0.8) # numpy.arange(10) * 0.4 nxd = nxdata.NXdata(self.h5f["spectra/2D_spectra"]) self.assertTrue(nxd.signal_is_2d) + self.assertTrue(nxd.is_curve) self.assertEqual(nxd.axes_names, [None, None]) self.assertEqual(nxd.axes_dataset_names, [None, None]) self.assertEqual(nxd.axes, [None, None]) @@ -221,6 +261,7 @@ class TestNXdata(unittest.TestCase): nxd = nxdata.NXdata(self.h5f["spectra/4D_spectra"]) self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d) + self.assertTrue(nxd.is_curve) self.assertEqual(nxd.axes_names, [None, None, None, "Calibrated energy"]) self.assertEqual(nxd.axes_dataset_names, @@ -242,15 +283,19 @@ class TestNXdata(unittest.TestCase): def testImages(self): nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"]) self.assertTrue(nxd.signal_is_2d) + self.assertTrue(nxd.is_image) self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates"]) self.assertEqual(list(nxd.axes_dataset_names), ["rows_calib", "columns_coordinates"]) self.assertIsNone(nxd.errors) self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) self.assertIsNone(nxd.interpretation) + self.assertEqual(len(nxd.auxiliary_signals), 1) + self.assertEqual(nxd.auxiliary_signals_names, ["image2"]) nxd = nxdata.NXdata(self.h5f["images/2D_irregular_data"]) self.assertTrue(nxd.signal_is_2d) + self.assertTrue(nxd.is_image) self.assertEqual(nxd.axes_dataset_names, nxd.axes_names) self.assertEqual(list(nxd.axes_dataset_names), @@ -259,8 +304,10 @@ class TestNXdata(unittest.TestCase): self.assertIsNone(nxd.errors) self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) self.assertIsNone(nxd.interpretation) + self.assertEqual(nxd.title, "Title as group attr") nxd = nxdata.NXdata(self.h5f["images/5D_images"]) + self.assertTrue(nxd.is_image) self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or nxd.signal_is_2d or nxd.signal_is_3d) self.assertEqual(nxd.axes_names, @@ -271,6 +318,16 @@ class TestNXdata(unittest.TestCase): self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter) self.assertEqual(nxd.interpretation, "image") + nxd = nxdata.NXdata(self.h5f["images/RGBA_image"]) + self.assertTrue(nxd.is_image) + self.assertEqual(nxd.interpretation, "rgba-image") + self.assertTrue(nxd.signal_is_3d) + self.assertEqual(nxd.axes_names, ["Calibrated Y", + "columns_coordinates", + None]) + self.assertEqual(list(nxd.axes_dataset_names), + ["rows_calib", "columns_coordinates", None]) + def testScatters(self): nxd = nxdata.NXdata(self.h5f["scatters/x_y_scatter"]) self.assertTrue(nxd.signal_is_1d) @@ -301,10 +358,206 @@ class TestNXdata(unittest.TestCase): self.assertIsNone(nxd.interpretation) +@unittest.skipIf(h5py is None, "silx.io.nxdata tests depend on h5py") +class TestLegacyNXdata(unittest.TestCase): + def setUp(self): + tmp = tempfile.NamedTemporaryFile(prefix="nxdata_legacy_examples_", + suffix=".h5", delete=True) + tmp.file.close() + self.h5fname = tmp.name + self.h5f = h5py.File(tmp.name, "w") + + def tearDown(self): + self.h5f.close() + + def testSignalAttrOnDataset(self): + g = self.h5f.create_group("2D") + g.attrs["NX_class"] = "NXdata" + + ds0 = g.create_dataset("image0", + data=numpy.arange(4 * 6).reshape((4, 6))) + ds0.attrs["signal"] = 1 + ds0.attrs["long_name"] = "My first image" + + ds1 = g.create_dataset("image1", + data=numpy.arange(4 * 6).reshape((4, 6))) + ds1.attrs["signal"] = "2" + ds1.attrs["long_name"] = "My 2nd image" + + ds2 = g.create_dataset("image2", + data=numpy.arange(4 * 6).reshape((4, 6))) + ds2.attrs["signal"] = 3 + + nxd = nxdata.NXdata(self.h5f["2D"]) + + self.assertEqual(nxd.signal_dataset_name, "image0") + self.assertEqual(nxd.signal_name, "My first image") + self.assertEqual(nxd.signal.shape, + (4, 6)) + + self.assertEqual(len(nxd.auxiliary_signals), 2) + self.assertEqual(nxd.auxiliary_signals[1].shape, + (4, 6)) + + self.assertEqual(nxd.auxiliary_signals_dataset_names, + ["image1", "image2"]) + self.assertEqual(nxd.auxiliary_signals_names, + ["My 2nd image", "image2"]) + + def testAxesOnSignalDataset(self): + g = self.h5f.create_group("2D") + g.attrs["NX_class"] = "NXdata" + + ds0 = g.create_dataset("image0", + data=numpy.arange(4 * 6).reshape((4, 6))) + ds0.attrs["signal"] = 1 + ds0.attrs["axes"] = "yaxis:xaxis" + + ds1 = g.create_dataset("yaxis", + data=numpy.arange(4)) + ds2 = g.create_dataset("xaxis", + data=numpy.arange(6)) + + nxd = nxdata.NXdata(self.h5f["2D"]) + + self.assertEqual(nxd.axes_dataset_names, + ["yaxis", "xaxis"]) + self.assertTrue(numpy.array_equal(nxd.axes[0], + numpy.arange(4))) + self.assertTrue(numpy.array_equal(nxd.axes[1], + numpy.arange(6))) + + def testAxesOnAxesDatasets(self): + g = self.h5f.create_group("2D") + g.attrs["NX_class"] = "NXdata" + + ds0 = g.create_dataset("image0", + data=numpy.arange(4 * 6).reshape((4, 6))) + ds0.attrs["signal"] = 1 + ds1 = g.create_dataset("yaxis", + data=numpy.arange(4)) + ds1.attrs["axis"] = 0 + ds2 = g.create_dataset("xaxis", + data=numpy.arange(6)) + ds2.attrs["axis"] = "1" + + nxd = nxdata.NXdata(self.h5f["2D"]) + self.assertEqual(nxd.axes_dataset_names, + ["yaxis", "xaxis"]) + self.assertTrue(numpy.array_equal(nxd.axes[0], + numpy.arange(4))) + self.assertTrue(numpy.array_equal(nxd.axes[1], + numpy.arange(6))) + + +class TestSaveNXdata(unittest.TestCase): + def setUp(self): + tmp = tempfile.NamedTemporaryFile(prefix="nxdata", + suffix=".h5", delete=True) + tmp.file.close() + self.h5fname = tmp.name + + def testSimpleSave(self): + sig = numpy.array([0, 1, 2]) + a0 = numpy.array([2, 3, 4]) + a1 = numpy.array([3, 4, 5]) + nxdata.save_NXdata(filename=self.h5fname, + signal=sig, + axes=[a0, a1], + signal_name="sig", + axes_names=["a0", "a1"], + nxentry_name="a", + nxdata_name="mydata") + + h5f = h5py.File(self.h5fname, "r") + self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"])) + + nxd = nxdata.NXdata(h5f["/a/mydata"]) + self.assertTrue(numpy.array_equal(nxd.signal, + sig)) + self.assertTrue(numpy.array_equal(nxd.axes[0], + a0)) + + h5f.close() + + def testSimplestSave(self): + sig = numpy.array([0, 1, 2]) + nxdata.save_NXdata(filename=self.h5fname, + signal=sig) + + h5f = h5py.File(self.h5fname, "r") + + self.assertTrue(nxdata.is_valid_nxdata(h5f["/entry/data0"])) + + nxd = nxdata.NXdata(h5f["/entry/data0"]) + self.assertTrue(numpy.array_equal(nxd.signal, + sig)) + h5f.close() + + def testSaveDefaultAxesNames(self): + sig = numpy.array([0, 1, 2]) + a0 = numpy.array([2, 3, 4]) + a1 = numpy.array([3, 4, 5]) + nxdata.save_NXdata(filename=self.h5fname, + signal=sig, + axes=[a0, a1], + signal_name="sig", + axes_names=None, + axes_long_names=["a", "b"], + nxentry_name="a", + nxdata_name="mydata") + + h5f = h5py.File(self.h5fname, "r") + self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"])) + + nxd = nxdata.NXdata(h5f["/a/mydata"]) + self.assertTrue(numpy.array_equal(nxd.signal, + sig)) + self.assertTrue(numpy.array_equal(nxd.axes[0], + a0)) + self.assertEqual(nxd.axes_dataset_names, + [u"dim0", u"dim1"]) + self.assertEqual(nxd.axes_names, + [u"a", u"b"]) + + h5f.close() + + def testSaveToExistingEntry(self): + h5f = h5py.File(self.h5fname, "w") + g = h5f.create_group("myentry") + g.attrs["NX_class"] = "NXentry" + h5f.close() + + sig = numpy.array([0, 1, 2]) + a0 = numpy.array([2, 3, 4]) + a1 = numpy.array([3, 4, 5]) + nxdata.save_NXdata(filename=self.h5fname, + signal=sig, + axes=[a0, a1], + signal_name="sig", + axes_names=["a0", "a1"], + nxentry_name="myentry", + nxdata_name="toto") + + h5f = h5py.File(self.h5fname, "r") + self.assertTrue(nxdata.is_valid_nxdata(h5f["myentry/toto"])) + + nxd = nxdata.NXdata(h5f["myentry/toto"]) + self.assertTrue(numpy.array_equal(nxd.signal, + sig)) + self.assertTrue(numpy.array_equal(nxd.axes[0], + a0)) + h5f.close() + + def suite(): test_suite = unittest.TestSuite() test_suite.addTest( unittest.defaultTestLoader.loadTestsFromTestCase(TestNXdata)) + test_suite.addTest( + unittest.defaultTestLoader.loadTestsFromTestCase(TestLegacyNXdata)) + test_suite.addTest( + unittest.defaultTestLoader.loadTestsFromTestCase(TestSaveNXdata)) return test_suite diff --git a/silx/io/test/test_specfile.py b/silx/io/test/test_specfile.py index 0aef2e2..9236fee 100644 --- a/silx/io/test/test_specfile.py +++ b/silx/io/test/test_specfile.py @@ -25,9 +25,9 @@ __authors__ = ["P. Knobel", "V.A. Sole"] __license__ = "MIT" -__date__ = "03/08/2017" +__date__ = "17/01/2018" + -import gc import locale import logging import numpy @@ -36,7 +36,7 @@ import sys import tempfile import unittest -from silx.test import utils +from silx.utils import testutils from ..specfile import SpecFile, Scan from .. import specfile @@ -136,14 +136,14 @@ class TestSpecFile(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.fname1 = tempfile.mkstemp(text=False) - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sftext) else: os.write(fd, bytes(sftext, 'ascii')) os.close(fd) fd2, cls.fname2 = tempfile.mkstemp(text=False) - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd2, sftext[370:923]) else: os.write(fd2, bytes(sftext[370:923], 'ascii')) @@ -151,7 +151,7 @@ class TestSpecFile(unittest.TestCase): fd3, cls.fname3 = tempfile.mkstemp(text=False) txt = sftext[371:923] - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd3, txt) else: os.write(fd3, bytes(txt, 'ascii')) @@ -177,16 +177,9 @@ class TestSpecFile(unittest.TestCase): self.scan1_no_fhdr_crash = self.sf_no_fhdr_crash[0] def tearDown(self): - del self.sf - del self.sf_no_fhdr - del self.scan1 - del self.scan1_2 - del self.scan25 - del self.scan1_no_fhdr - del self.sf_no_fhdr_crash - del self.scan1_no_fhdr_crash - del self.empty_scan - gc.collect() + self.sf.close() + self.sf_no_fhdr.close() + self.sf_no_fhdr_crash.close() def test_open(self): self.assertIsInstance(self.sf, SpecFile) @@ -376,7 +369,7 @@ class TestSpecFile(unittest.TestCase): self.assertEqual(self.scan25.mca.channels, []) - @utils.test_logging(specfile._logger.name, warning=1) + @testutils.test_logging(specfile._logger.name, warning=1) def test_empty_scan(self): """Test reading a scan with no data points""" self.assertEqual(len(self.empty_scan.labels), @@ -389,7 +382,7 @@ class TestSFLocale(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.fname = tempfile.mkstemp(text=False) - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sftext) else: os.write(fd, bytes(sftext, 'ascii')) @@ -399,13 +392,12 @@ class TestSFLocale(unittest.TestCase): def tearDownClass(cls): os.unlink(cls.fname) locale.setlocale(locale.LC_NUMERIC, loc) # restore saved locale - gc.collect() def crunch_data(self): self.sf3 = SpecFile(self.fname) self.assertAlmostEqual(self.sf3[0].data_line(1)[2], 1.56) - del self.sf3 + self.sf3.close() @unittest.skipIf(not try_DE, "de_DE.utf8 locale not installed") def test_locale_de_DE(self): diff --git a/silx/io/test/test_specfilewrapper.py b/silx/io/test/test_specfilewrapper.py index c5b0f39..2f463fa 100644 --- a/silx/io/test/test_specfilewrapper.py +++ b/silx/io/test/test_specfilewrapper.py @@ -27,7 +27,6 @@ __authors__ = ["P. Knobel"] __license__ = "MIT" __date__ = "15/05/2017" -import gc import locale import logging import numpy @@ -114,23 +113,15 @@ class TestSpecfilewrapper(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.fname1 = tempfile.mkstemp(text=False) - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sftext) else: os.write(fd, bytes(sftext, 'ascii')) os.close(fd) - fd2, cls.fname2 = tempfile.mkstemp(text=False) - if sys.version < '3.0': - os.write(fd2, sftext[370:-97]) - else: - os.write(fd2, bytes(sftext[370:-97], 'ascii')) - os.close(fd2) - @classmethod def tearDownClass(cls): os.unlink(cls.fname1) - os.unlink(cls.fname2) def setUp(self): self.sf = Specfile(self.fname1) @@ -139,11 +130,7 @@ class TestSpecfilewrapper(unittest.TestCase): self.scan25 = self.sf.select("25.1") def tearDown(self): - del self.sf - del self.scan1 - del self.scan1_2 - del self.scan25 - gc.collect() + self.sf.close() def test_number_of_scans(self): self.assertEqual(3, len(self.sf)) diff --git a/silx/io/test/test_spech5.py b/silx/io/test/test_spech5.py index 1d04c6f..b842243 100644 --- a/silx/io/test/test_spech5.py +++ b/silx/io/test/test_spech5.py @@ -1,6 +1,6 @@ # coding: utf-8 # /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# Copyright (C) 2016-2018 European Synchrotron Radiation Facility # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,6 @@ # # ############################################################################*/ """Tests for spech5""" -import gc from numpy import array_equal import os import io @@ -32,11 +31,10 @@ import unittest import datetime from functools import partial -from silx.test import utils +from silx.utils import testutils from .. import spech5 -from ..spech5 import (SpecH5, SpecH5Group, - SpecH5Dataset, spec_date_to_iso8601) +from ..spech5 import (SpecH5, SpecH5Dataset, spec_date_to_iso8601) from .. import specfile try: @@ -46,7 +44,7 @@ except ImportError: __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "26/07/2017" +__date__ = "12/02/2018" sftext = """#F /tmp/sf.dat #E 1455180875 @@ -216,7 +214,7 @@ class TestSpecH5(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.fname = tempfile.mkstemp() - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sftext) else: os.write(fd, bytes(sftext, 'ascii')) @@ -230,9 +228,7 @@ class TestSpecH5(unittest.TestCase): self.sfh5 = SpecH5(self.fname) def tearDown(self): - # fix Win32 permission error when deleting temp file - del self.sfh5 - gc.collect() + self.sfh5.close() def testContainsFile(self): self.assertIn("/1.2/measurement", self.sfh5) @@ -275,9 +271,9 @@ class TestSpecH5(unittest.TestCase): def testDate(self): # start time is in Iso8601 format self.assertEqual(self.sfh5["/1.1/start_time"], - b"2016-02-11T09:55:20") + u"2016-02-11T09:55:20") self.assertEqual(self.sfh5["25.1/start_time"], - b"2015-03-14T03:53:50") + u"2015-03-14T03:53:50") def testDatasetInstanceAttr(self): """The SpecH5Dataset objects must implement some dummy attributes @@ -299,7 +295,7 @@ class TestSpecH5(unittest.TestCase): -3) self.assertEqual(self.sfh5.get("/1.1/start_time", default=-3), - b"2016-02-11T09:55:20") + u"2016-02-11T09:55:20") def testGetClass(self): """Test :meth:`SpecH5Group.get`""" @@ -355,33 +351,22 @@ class TestSpecH5(unittest.TestCase): def testHeader(self): file_header = self.sfh5["/1.2/instrument/specfile/file_header"] scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"] - # convert ndarray(dtype=numpy.string_) to str - if sys.version < '3.0': - file_header = str(file_header[()]) - scan_header = str(scan_header[()]) - else: - file_header = str(file_header.astype(str)) - scan_header = str(scan_header.astype(str)) # File header has 10 lines - self.assertEqual(len(file_header.split("\n")), 10) + self.assertEqual(len(file_header), 10) # 1.2 has 9 scan & mca header lines - self.assertEqual(len(scan_header.split("\n")), 9) + self.assertEqual(len(scan_header), 9) # line 4 of file header self.assertEqual( - file_header.split("\n")[3], - "#C imaging User = opid17") + file_header[3], + u"#C imaging User = opid17") # line 4 of scan header scan_header = self.sfh5["25.1/instrument/specfile/scan_header"] - if sys.version < '3.0': - scan_header = str(scan_header[()]) - else: - scan_header = str(scan_header[()].astype(str)) self.assertEqual( - scan_header.split("\n")[3], - "#P1 4.74255 6.197579 2.238283") + scan_header[3], + u"#P1 4.74255 6.197579 2.238283") def testLinks(self): self.assertTrue( @@ -493,7 +478,7 @@ class TestSpecH5(unittest.TestCase): def testTitle(self): self.assertEqual(self.sfh5["/25.1/title"], - b"25 ascan c3th 1.33245 1.52245 40 0.15") + u"ascan c3th 1.33245 1.52245 40 0.15") def testValues(self): group = self.sfh5["/25.1"] @@ -582,9 +567,9 @@ class TestSpecH5(unittest.TestCase): # All 0 values self.assertNotIn("sample", self.sfh5["/1001.1"]) with self.assertRaises(KeyError): - uc = self.sfh5["/1001.1/sample/unit_cell"] + self.sfh5["/1001.1/sample/unit_cell"] - @utils.test_logging(spech5.logger1.name, warning=2) + @testutils.test_logging(spech5.logger1.name, warning=2) def testOpenFileDescriptor(self): """Open a SpecH5 file from a file descriptor""" with io.open(self.sfh5.filename) as f: @@ -593,6 +578,7 @@ class TestSpecH5(unittest.TestCase): name_list = [] # check if the object is working self.sfh5.visit(name_list.append) + sfh5.close() sftext_multi_mca_headers = """ @@ -624,7 +610,7 @@ class TestSpecH5MultiMca(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.fname = tempfile.mkstemp(text=False) - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sftext_multi_mca_headers) else: os.write(fd, bytes(sftext_multi_mca_headers, 'ascii')) @@ -638,9 +624,7 @@ class TestSpecH5MultiMca(unittest.TestCase): self.sfh5 = SpecH5(self.fname) def tearDown(self): - # fix Win32 permission error when deleting temp file - del self.sfh5 - gc.collect() + self.sfh5.close() def testMcaCalib(self): mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"] @@ -691,7 +675,7 @@ sftext_no_cols = r"""#F C:/DATA\test.mca #D Thu Jul 7 08:40:19 2016 #C no data cols, one mca analyser, single spectrum #@MCA %16C -#@CHANN 151 29 29 1 +#@CHANN 151 0 150 1 #@CALIB 0 2 0 @A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \ 1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \ @@ -756,7 +740,7 @@ class TestSpecH5NoDataCols(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.fname = tempfile.mkstemp() - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sftext_no_cols) else: os.write(fd, bytes(sftext_no_cols, 'ascii')) @@ -770,9 +754,7 @@ class TestSpecH5NoDataCols(unittest.TestCase): self.sfh5 = SpecH5(self.fname) def tearDown(self): - # fix Win32 permission error when deleting temp file - del self.sfh5 - gc.collect() + self.sfh5.close() def testScan1(self): # 1.1: single analyser, single spectrum, 151 channels @@ -829,7 +811,7 @@ class TestSpecH5SlashInLabels(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.fname = tempfile.mkstemp() - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sf_text_slash) else: os.write(fd, bytes(sf_text_slash, 'ascii')) @@ -843,9 +825,7 @@ class TestSpecH5SlashInLabels(unittest.TestCase): self.sfh5 = SpecH5(self.fname) def tearDown(self): - # fix Win32 permission error when deleting temp file - del self.sfh5 - gc.collect() + self.sfh5.close() def testLabels(self): """Ensure `/` is substituted with `%` and diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py index 421c48b..8020731 100644 --- a/silx/io/test/test_spectoh5.py +++ b/silx/io/test/test_spectoh5.py @@ -1,6 +1,6 @@ # coding: utf-8 # /*########################################################################## -# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# Copyright (C) 2016-2018 European Synchrotron Radiation Facility # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -23,8 +23,7 @@ # ############################################################################*/ """Tests for SpecFile to HDF5 converter""" -import gc -from numpy import array_equal, string_ +from numpy import array_equal import os import sys import tempfile @@ -41,7 +40,7 @@ else: __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "02/06/2017" +__date__ = "12/02/2018" sftext = """#F /tmp/sf.dat @@ -93,7 +92,7 @@ class TestConvertSpecHDF5(unittest.TestCase): @classmethod def setUpClass(cls): fd, cls.spec_fname = tempfile.mkstemp(text=False) - if sys.version < '3.0': + if sys.version_info < (3, ): os.write(fd, sftext) else: os.write(fd, bytes(sftext, 'ascii')) @@ -115,11 +114,9 @@ class TestConvertSpecHDF5(unittest.TestCase): self.h5f = h5py.File(self.h5_fname, "a") def tearDown(self): - del self.sfh5 self.h5f.close() - del self.h5f + self.sfh5.close() os.unlink(self.h5_fname) - gc.collect() def testAppendToHDF5(self): write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam") @@ -141,24 +138,22 @@ class TestConvertSpecHDF5(unittest.TestCase): def testTitle(self): """Test the value of a dataset""" title12 = self.h5f["/1.2/title"].value - if sys.version > '3.0': - title12 = title12.decode() self.assertEqual(title12, - "1 aaaaaa") + u"aaaaaa") def testAttrs(self): # Test root group (file) attributes self.assertEqual(self.h5f.attrs["NX_class"], - string_("NXroot")) + u"NXroot") # Test dataset attributes ds = self.h5f["/1.2/instrument/mca_1/data"] self.assertTrue("interpretation" in ds.attrs) self.assertEqual(list(ds.attrs.values()), - [string_("spectrum")]) + [u"spectrum"]) # Test group attributes grp = self.h5f["1.1"] self.assertEqual(grp.attrs["NX_class"], - string_("NXentry")) + u"NXentry") self.assertEqual(len(list(grp.attrs.keys())), 1) diff --git a/silx/io/test/test_url.py b/silx/io/test/test_url.py new file mode 100644 index 0000000..5093fc2 --- /dev/null +++ b/silx/io/test/test_url.py @@ -0,0 +1,209 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for url module""" + +__authors__ = ["V. Valls"] +__license__ = "MIT" +__date__ = "29/01/2018" + + +import unittest +from ..url import DataUrl + + +class TestDataUrl(unittest.TestCase): + + def assertUrl(self, url, expected): + self.assertEqual(url.is_valid(), expected[0]) + self.assertEqual(url.is_absolute(), expected[1]) + self.assertEqual(url.scheme(), expected[2]) + self.assertEqual(url.file_path(), expected[3]) + self.assertEqual(url.data_path(), expected[4]) + self.assertEqual(url.data_slice(), expected[5]) + + def test_fabio_absolute(self): + url = DataUrl("fabio:///data/image.edf?slice=2") + expected = [True, True, "fabio", "/data/image.edf", None, (2, )] + self.assertUrl(url, expected) + + def test_fabio_absolute_windows(self): + url = DataUrl("fabio:///C:/data/image.edf?slice=2") + expected = [True, True, "fabio", "C:/data/image.edf", None, (2, )] + self.assertUrl(url, expected) + + def test_silx_absolute(self): + url = DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5") + expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)] + self.assertUrl(url, expected) + + def test_commandline_shell_separator(self): + url = DataUrl("silx:///data/image.h5::path=/data/dataset&slice=1,5") + expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)] + self.assertUrl(url, expected) + + def test_silx_absolute2(self): + url = DataUrl("silx:///data/image.edf?/scan_0/detector/data") + expected = [True, True, "silx", "/data/image.edf", "/scan_0/detector/data", None] + self.assertUrl(url, expected) + + def test_silx_absolute_windows(self): + url = DataUrl("silx:///C:/data/image.h5?/scan_0/detector/data") + expected = [True, True, "silx", "C:/data/image.h5", "/scan_0/detector/data", None] + self.assertUrl(url, expected) + + def test_silx_relative(self): + url = DataUrl("silx:./image.h5") + expected = [True, False, "silx", "./image.h5", None, None] + self.assertUrl(url, expected) + + def test_fabio_relative(self): + url = DataUrl("fabio:./image.edf") + expected = [True, False, "fabio", "./image.edf", None, None] + self.assertUrl(url, expected) + + def test_silx_relative2(self): + url = DataUrl("silx:image.h5") + expected = [True, False, "silx", "image.h5", None, None] + self.assertUrl(url, expected) + + def test_fabio_relative2(self): + url = DataUrl("fabio:image.edf") + expected = [True, False, "fabio", "image.edf", None, None] + self.assertUrl(url, expected) + + def test_file_relative(self): + url = DataUrl("image.edf") + expected = [True, False, None, "image.edf", None, None] + self.assertUrl(url, expected) + + def test_file_relative2(self): + url = DataUrl("./foo/bar/image.edf") + expected = [True, False, None, "./foo/bar/image.edf", None, None] + self.assertUrl(url, expected) + + def test_file_relative3(self): + url = DataUrl("foo/bar/image.edf") + expected = [True, False, None, "foo/bar/image.edf", None, None] + self.assertUrl(url, expected) + + def test_file_absolute(self): + url = DataUrl("/data/image.edf") + expected = [True, True, None, "/data/image.edf", None, None] + self.assertUrl(url, expected) + + def test_file_absolute_windows(self): + url = DataUrl("C:/data/image.edf") + expected = [True, True, None, "C:/data/image.edf", None, None] + self.assertUrl(url, expected) + + def test_absolute_with_path(self): + url = DataUrl("/foo/foobar.h5?/foo/bar") + expected = [True, True, None, "/foo/foobar.h5", "/foo/bar", None] + self.assertUrl(url, expected) + + def test_windows_file_data_slice(self): + url = DataUrl("C:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [True, True, None, "C:/foo/foobar.h5", "/foo/bar", (5, 1)] + self.assertUrl(url, expected) + + def test_scheme_file_data_slice(self): + url = DataUrl("silx:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [True, True, "silx", "/foo/foobar.h5", "/foo/bar", (5, 1)] + self.assertUrl(url, expected) + + def test_scheme_windows_file_data_slice(self): + url = DataUrl("silx:C:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [True, True, "silx", "C:/foo/foobar.h5", "/foo/bar", (5, 1)] + self.assertUrl(url, expected) + + def test_empty(self): + url = DataUrl("") + expected = [False, False, None, "", None, None] + self.assertUrl(url, expected) + + def test_unknown_scheme(self): + url = DataUrl("foo:/foo/foobar.h5?path=/foo/bar&slice=5,1") + expected = [False, True, "foo", "/foo/foobar.h5", "/foo/bar", (5, 1)] + self.assertUrl(url, expected) + + def test_slice(self): + url = DataUrl("/a.h5?path=/b&slice=5,1") + expected = [True, True, None, "/a.h5", "/b", (5, 1)] + self.assertUrl(url, expected) + + def test_slice_ellipsis(self): + url = DataUrl("/a.h5?path=/b&slice=...") + expected = [True, True, None, "/a.h5", "/b", (Ellipsis, )] + self.assertUrl(url, expected) + + def test_slice_slicing(self): + url = DataUrl("/a.h5?path=/b&slice=:") + expected = [True, True, None, "/a.h5", "/b", (slice(None), )] + self.assertUrl(url, expected) + + def test_slice_missing_element(self): + url = DataUrl("/a.h5?path=/b&slice=5,,1") + expected = [False, True, None, "/a.h5", "/b", None] + self.assertUrl(url, expected) + + def test_slice_no_elements(self): + url = DataUrl("/a.h5?path=/b&slice=") + expected = [False, True, None, "/a.h5", "/b", None] + self.assertUrl(url, expected) + + def test_create_relative_url(self): + url = DataUrl(scheme="silx", file_path="./foo.h5", data_path="/", data_slice=(5, 1)) + self.assertFalse(url.is_absolute()) + url2 = DataUrl(url.path()) + self.assertEqual(url, url2) + + def test_create_absolute_url(self): + url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1)) + url2 = DataUrl(url.path()) + self.assertEqual(url, url2) + + def test_create_absolute_windows_url(self): + url = DataUrl(scheme="silx", file_path="C:/foo.h5", data_path="/", data_slice=(5, 1)) + url2 = DataUrl(url.path()) + self.assertEqual(url, url2) + + def test_create_slice_url(self): + url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1, Ellipsis, slice(None))) + url2 = DataUrl(url.path()) + self.assertEqual(url, url2) + + def test_wrong_url(self): + url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=(5, 1)) + self.assertFalse(url.is_valid()) + + +def suite(): + test_suite = unittest.TestSuite() + loadTests = unittest.defaultTestLoader.loadTestsFromTestCase + test_suite.addTest(loadTests(TestDataUrl)) + return test_suite + + +if __name__ == '__main__': + unittest.main(defaultTest='suite') diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py index 09074bb..53e001c 100644 --- a/silx/io/test/test_utils.py +++ b/silx/io/test/test_utils.py @@ -30,9 +30,9 @@ import re import shutil import tempfile import unittest -import sys from .. import utils +import silx.io.url try: import h5py @@ -48,7 +48,7 @@ except ImportError: __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "28/09/2017" +__date__ = "12/02/2018" expected_spec1 = r"""#F .* @@ -110,9 +110,9 @@ class TestSave(unittest.TestCase): def test_save_csv(self): utils.save1D(self.csv_fname, self.x, self.y, - xlabel=self.xlab, ylabels=self.ylabs, - filetype="csv", fmt=["%d", "%.2f", "%.2e"], - csvdelim=";", autoheader=True) + xlabel=self.xlab, ylabels=self.ylabs, + filetype="csv", fmt=["%d", "%.2f", "%.2e"], + csvdelim=";", autoheader=True) csvf = open(self.csv_fname) actual_csv = csvf.read() @@ -125,22 +125,20 @@ class TestSave(unittest.TestCase): and converting it to a named record array""" npyf = open(self.npy_fname, "wb") utils.save1D(npyf, self.x, self.y, - xlabel=self.xlab, ylabels=self.ylabs) + xlabel=self.xlab, ylabels=self.ylabs) npyf.close() npy_recarray = numpy.load(self.npy_fname) self.assertEqual(npy_recarray.shape, (3,)) - self.assertTrue( - numpy.array_equal( - npy_recarray['Ordinate1'], - numpy.array((4, 5, 6)))) + self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'], + numpy.array((4, 5, 6)))) def test_savespec_filename(self): """Save SpecFile using savespec()""" utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab, - ylabel=self.ylabs[0], fmt=["%d", "%.2f"], close_file=True, - scan_number=1) + ylabel=self.ylabs[0], fmt=["%d", "%.2f"], + close_file=True, scan_number=1) specf = open(self.spec_fname) actual_spec = specf.read() @@ -152,15 +150,15 @@ class TestSave(unittest.TestCase): """Save SpecFile using savespec(), passing a file handle""" # first savespec: open, write file header, save y[0] as scan 1, # return file handle - specf = utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab, - ylabel=self.ylabs[0], fmt=["%d", "%.2f"], - close_file=False) + specf = utils.savespec(self.spec_fname, self.x, self.y[0], + xlabel=self.xlab, ylabel=self.ylabs[0], + fmt=["%d", "%.2f"], close_file=False) # second savespec: save y[1] as scan 2, close file utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab, - ylabel=self.ylabs[1], fmt=["%d", "%.2f"], - write_file_header=False, close_file=True, - scan_number=2) + ylabel=self.ylabs[1], fmt=["%d", "%.2f"], + write_file_header=False, close_file=True, + scan_number=2) specf = open(self.spec_fname) actual_spec = specf.read() @@ -171,7 +169,7 @@ class TestSave(unittest.TestCase): def test_save_spec(self): """Save SpecFile using save()""" utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab, - ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) + ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) specf = open(self.spec_fname) actual_spec = specf.read() @@ -192,7 +190,7 @@ class TestSave(unittest.TestCase): self.y = [[4, 5, 6], [7, 8, 9]] self.ylabs = ["Ordinate1", "Ordinate2"] utils.save1D(self.csv_fname, self.x, self.y, - autoheader=True, fmt=["%d", "%.2f", "%.2e"]) + autoheader=True, fmt=["%d", "%.2f", "%.2e"]) csvf = open(self.csv_fname) actual_csv = csvf.read() @@ -242,15 +240,12 @@ class TestH5Ls(unittest.TestCase): self.assertIn("+foo", lines) self.assertIn("\t+bar", lines) - self.assertMatchAnyStringInList( - r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">', - lines) - self.assertMatchAnyStringInList( - r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">', - lines) - self.assertMatchAnyStringInList( - r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">', - lines) + match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">' + self.assertMatchAnyStringInList(match, lines) + match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">' + self.assertMatchAnyStringInList(match, lines) + match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">' + self.assertMatchAnyStringInList(match, lines) os.unlink(self.h5_fname) @@ -325,12 +320,6 @@ class TestOpen(unittest.TestCase): @classmethod def tearDownClass(cls): - if sys.platform == "win32" and fabio is not None: - # gc collect is needed to close a file descriptor - # opened by fabio and not released. - # https://github.com/silx-kit/fabio/issues/167 - import gc - gc.collect() shutil.rmtree(cls.tmp_directory) def testH5(self): @@ -410,6 +399,30 @@ class TestOpen(unittest.TestCase): # load it self.assertRaises(IOError, utils.open, self.missing_filename) + def test_silx_scheme(self): + if h5py is None: + self.skipTest("H5py is missing") + url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/") + with utils.open(url.path()) as f: + self.assertIsNotNone(f) + self.assertTrue(silx.io.utils.is_file(f)) + + def test_fabio_scheme(self): + if h5py is None: + self.skipTest("H5py is missing") + if fabio is None: + self.skipTest("Fabio is missing") + url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename) + self.assertRaises(IOError, utils.open, url.path()) + + def test_bad_url(self): + url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename) + self.assertRaises(IOError, utils.open, url.path()) + + def test_sliced_url(self): + url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,)) + self.assertRaises(IOError, utils.open, url.path()) + class TestNodes(unittest.TestCase): """Test `silx.io.utils.is_` functions.""" @@ -443,7 +456,7 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = h5py.File + self.h5_class = utils.H5Type.FILE obj = Foo() self.assertTrue(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) @@ -455,7 +468,7 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = h5py.Group + self.h5_class = utils.H5Type.GROUP obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) @@ -467,7 +480,7 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = h5py.Dataset + self.h5_class = utils.H5Type.DATASET obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) @@ -491,13 +504,142 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = int + self.h5_class = int obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) +class TestGetData(unittest.TestCase): + """Test `silx.io.utils.get_data` function.""" + + @classmethod + def setUpClass(cls): + cls.tmp_directory = tempfile.mkdtemp() + cls.createResources(cls.tmp_directory) + + @classmethod + def createResources(cls, directory): + + if h5py is not None: + cls.h5_filename = os.path.join(directory, "test.h5") + h5 = h5py.File(cls.h5_filename, mode="w") + h5["group/group/scalar"] = 50 + h5["group/group/array"] = [1, 2, 3, 4, 5] + h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] + h5.close() + + cls.spec_filename = os.path.join(directory, "test.dat") + utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y", + fmt=["%d", "%.2f"], close_file=True, scan_number=1) + + if fabio is not None: + cls.edf_filename = os.path.join(directory, "test.edf") + cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf") + header = fabio.fabioimage.OrderedDict() + header["integer"] = "10" + data = numpy.array([[10, 50], [50, 10]]) + fabiofile = fabio.edfimage.EdfImage(data, header) + fabiofile.write(cls.edf_filename) + fabiofile.appendFrame(data=data, header=header) + fabiofile.write(cls.edf_multiframe_filename) + + cls.txt_filename = os.path.join(directory, "test.txt") + f = io.open(cls.txt_filename, "w+t") + f.write(u"Kikoo") + f.close() + + cls.missing_filename = os.path.join(directory, "test.missing") + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmp_directory) + + def test_hdf5_scalar(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?/group/group/scalar" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data, 50) + + def test_hdf5_array(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?/group/group/array" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (5, )) + self.assertEqual(data[0], 1) + + def test_hdf5_array_slice(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (5, )) + self.assertEqual(data[0], 6) + + def test_hdf5_array_slice_out_of_range(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename + self.assertRaises(ValueError, utils.get_data, url) + + def test_edf_using_silx(self): + if h5py is None: + self.skipTest("H5py is missing") + if fabio is None: + self.skipTest("fabio is missing") + url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_frame(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s?slice=1" % self.edf_multiframe_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_singleframe(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s?slice=0" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_too_much_frames(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s?slice=..." % self.edf_multiframe_filename + self.assertRaises(ValueError, utils.get_data, url) + + def test_fabio_no_frame(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_unsupported_scheme(self): + url = "foo:/foo/bar" + self.assertRaises(ValueError, utils.get_data, url) + + def test_no_scheme(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename + self.assertRaises((ValueError, IOError), utils.get_data, url) + + def test_file_not_exists(self): + url = "silx:/foo/bar" + self.assertRaises(IOError, utils.get_data, url) + + def suite(): loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite = unittest.TestSuite() @@ -505,6 +647,7 @@ def suite(): test_suite.addTest(loadTests(TestH5Ls)) test_suite.addTest(loadTests(TestOpen)) test_suite.addTest(loadTests(TestNodes)) + test_suite.addTest(loadTests(TestGetData)) return test_suite |