diff options
Diffstat (limited to 'silx/io/test/test_utils.py')
-rw-r--r-- | silx/io/test/test_utils.py | 221 |
1 files changed, 182 insertions, 39 deletions
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py index 09074bb..53e001c 100644 --- a/silx/io/test/test_utils.py +++ b/silx/io/test/test_utils.py @@ -30,9 +30,9 @@ import re import shutil import tempfile import unittest -import sys from .. import utils +import silx.io.url try: import h5py @@ -48,7 +48,7 @@ except ImportError: __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "28/09/2017" +__date__ = "12/02/2018" expected_spec1 = r"""#F .* @@ -110,9 +110,9 @@ class TestSave(unittest.TestCase): def test_save_csv(self): utils.save1D(self.csv_fname, self.x, self.y, - xlabel=self.xlab, ylabels=self.ylabs, - filetype="csv", fmt=["%d", "%.2f", "%.2e"], - csvdelim=";", autoheader=True) + xlabel=self.xlab, ylabels=self.ylabs, + filetype="csv", fmt=["%d", "%.2f", "%.2e"], + csvdelim=";", autoheader=True) csvf = open(self.csv_fname) actual_csv = csvf.read() @@ -125,22 +125,20 @@ class TestSave(unittest.TestCase): and converting it to a named record array""" npyf = open(self.npy_fname, "wb") utils.save1D(npyf, self.x, self.y, - xlabel=self.xlab, ylabels=self.ylabs) + xlabel=self.xlab, ylabels=self.ylabs) npyf.close() npy_recarray = numpy.load(self.npy_fname) self.assertEqual(npy_recarray.shape, (3,)) - self.assertTrue( - numpy.array_equal( - npy_recarray['Ordinate1'], - numpy.array((4, 5, 6)))) + self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'], + numpy.array((4, 5, 6)))) def test_savespec_filename(self): """Save SpecFile using savespec()""" utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab, - ylabel=self.ylabs[0], fmt=["%d", "%.2f"], close_file=True, - scan_number=1) + ylabel=self.ylabs[0], fmt=["%d", "%.2f"], + close_file=True, scan_number=1) specf = open(self.spec_fname) actual_spec = specf.read() @@ -152,15 +150,15 @@ class TestSave(unittest.TestCase): """Save SpecFile using savespec(), passing a file handle""" # first savespec: open, write file header, save y[0] as scan 1, # return file handle - specf = utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab, - ylabel=self.ylabs[0], fmt=["%d", "%.2f"], - close_file=False) + specf = utils.savespec(self.spec_fname, self.x, self.y[0], + xlabel=self.xlab, ylabel=self.ylabs[0], + fmt=["%d", "%.2f"], close_file=False) # second savespec: save y[1] as scan 2, close file utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab, - ylabel=self.ylabs[1], fmt=["%d", "%.2f"], - write_file_header=False, close_file=True, - scan_number=2) + ylabel=self.ylabs[1], fmt=["%d", "%.2f"], + write_file_header=False, close_file=True, + scan_number=2) specf = open(self.spec_fname) actual_spec = specf.read() @@ -171,7 +169,7 @@ class TestSave(unittest.TestCase): def test_save_spec(self): """Save SpecFile using save()""" utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab, - ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) + ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) specf = open(self.spec_fname) actual_spec = specf.read() @@ -192,7 +190,7 @@ class TestSave(unittest.TestCase): self.y = [[4, 5, 6], [7, 8, 9]] self.ylabs = ["Ordinate1", "Ordinate2"] utils.save1D(self.csv_fname, self.x, self.y, - autoheader=True, fmt=["%d", "%.2f", "%.2e"]) + autoheader=True, fmt=["%d", "%.2f", "%.2e"]) csvf = open(self.csv_fname) actual_csv = csvf.read() @@ -242,15 +240,12 @@ class TestH5Ls(unittest.TestCase): self.assertIn("+foo", lines) self.assertIn("\t+bar", lines) - self.assertMatchAnyStringInList( - r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">', - lines) - self.assertMatchAnyStringInList( - r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">', - lines) - self.assertMatchAnyStringInList( - r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">', - lines) + match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">' + self.assertMatchAnyStringInList(match, lines) + match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">' + self.assertMatchAnyStringInList(match, lines) + match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">' + self.assertMatchAnyStringInList(match, lines) os.unlink(self.h5_fname) @@ -325,12 +320,6 @@ class TestOpen(unittest.TestCase): @classmethod def tearDownClass(cls): - if sys.platform == "win32" and fabio is not None: - # gc collect is needed to close a file descriptor - # opened by fabio and not released. - # https://github.com/silx-kit/fabio/issues/167 - import gc - gc.collect() shutil.rmtree(cls.tmp_directory) def testH5(self): @@ -410,6 +399,30 @@ class TestOpen(unittest.TestCase): # load it self.assertRaises(IOError, utils.open, self.missing_filename) + def test_silx_scheme(self): + if h5py is None: + self.skipTest("H5py is missing") + url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/") + with utils.open(url.path()) as f: + self.assertIsNotNone(f) + self.assertTrue(silx.io.utils.is_file(f)) + + def test_fabio_scheme(self): + if h5py is None: + self.skipTest("H5py is missing") + if fabio is None: + self.skipTest("Fabio is missing") + url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename) + self.assertRaises(IOError, utils.open, url.path()) + + def test_bad_url(self): + url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename) + self.assertRaises(IOError, utils.open, url.path()) + + def test_sliced_url(self): + url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,)) + self.assertRaises(IOError, utils.open, url.path()) + class TestNodes(unittest.TestCase): """Test `silx.io.utils.is_` functions.""" @@ -443,7 +456,7 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = h5py.File + self.h5_class = utils.H5Type.FILE obj = Foo() self.assertTrue(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) @@ -455,7 +468,7 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = h5py.Group + self.h5_class = utils.H5Type.GROUP obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) @@ -467,7 +480,7 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = h5py.Dataset + self.h5_class = utils.H5Type.DATASET obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) @@ -491,13 +504,142 @@ class TestNodes(unittest.TestCase): class Foo(object): def __init__(self): - self.h5py_class = int + self.h5_class = int obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) +class TestGetData(unittest.TestCase): + """Test `silx.io.utils.get_data` function.""" + + @classmethod + def setUpClass(cls): + cls.tmp_directory = tempfile.mkdtemp() + cls.createResources(cls.tmp_directory) + + @classmethod + def createResources(cls, directory): + + if h5py is not None: + cls.h5_filename = os.path.join(directory, "test.h5") + h5 = h5py.File(cls.h5_filename, mode="w") + h5["group/group/scalar"] = 50 + h5["group/group/array"] = [1, 2, 3, 4, 5] + h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] + h5.close() + + cls.spec_filename = os.path.join(directory, "test.dat") + utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y", + fmt=["%d", "%.2f"], close_file=True, scan_number=1) + + if fabio is not None: + cls.edf_filename = os.path.join(directory, "test.edf") + cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf") + header = fabio.fabioimage.OrderedDict() + header["integer"] = "10" + data = numpy.array([[10, 50], [50, 10]]) + fabiofile = fabio.edfimage.EdfImage(data, header) + fabiofile.write(cls.edf_filename) + fabiofile.appendFrame(data=data, header=header) + fabiofile.write(cls.edf_multiframe_filename) + + cls.txt_filename = os.path.join(directory, "test.txt") + f = io.open(cls.txt_filename, "w+t") + f.write(u"Kikoo") + f.close() + + cls.missing_filename = os.path.join(directory, "test.missing") + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tmp_directory) + + def test_hdf5_scalar(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?/group/group/scalar" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data, 50) + + def test_hdf5_array(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?/group/group/array" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (5, )) + self.assertEqual(data[0], 1) + + def test_hdf5_array_slice(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (5, )) + self.assertEqual(data[0], 6) + + def test_hdf5_array_slice_out_of_range(self): + if h5py is None: + self.skipTest("H5py is missing") + url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename + self.assertRaises(ValueError, utils.get_data, url) + + def test_edf_using_silx(self): + if h5py is None: + self.skipTest("H5py is missing") + if fabio is None: + self.skipTest("fabio is missing") + url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_frame(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s?slice=1" % self.edf_multiframe_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_singleframe(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s?slice=0" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_fabio_too_much_frames(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s?slice=..." % self.edf_multiframe_filename + self.assertRaises(ValueError, utils.get_data, url) + + def test_fabio_no_frame(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "fabio:%s" % self.edf_filename + data = utils.get_data(url=url) + self.assertEqual(data.shape, (2, 2)) + self.assertEqual(data[0, 0], 10) + + def test_unsupported_scheme(self): + url = "foo:/foo/bar" + self.assertRaises(ValueError, utils.get_data, url) + + def test_no_scheme(self): + if fabio is None: + self.skipTest("fabio is missing") + url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename + self.assertRaises((ValueError, IOError), utils.get_data, url) + + def test_file_not_exists(self): + url = "silx:/foo/bar" + self.assertRaises(IOError, utils.get_data, url) + + def suite(): loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite = unittest.TestSuite() @@ -505,6 +647,7 @@ def suite(): test_suite.addTest(loadTests(TestH5Ls)) test_suite.addTest(loadTests(TestOpen)) test_suite.addTest(loadTests(TestNodes)) + test_suite.addTest(loadTests(TestGetData)) return test_suite |