diff options
Diffstat (limited to 'silx/io/test/test_utils.py')
-rw-r--r-- | silx/io/test/test_utils.py | 244 |
1 files changed, 222 insertions, 22 deletions
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py index 6c70636..13ab532 100644 --- a/silx/io/test/test_utils.py +++ b/silx/io/test/test_utils.py @@ -33,6 +33,7 @@ import unittest import sys from .. import utils +from ..._version import calc_hexversion import silx.io.url import h5py @@ -40,11 +41,9 @@ from ..utils import h5ls import fabio - __authors__ = ["P. Knobel"] __license__ = "MIT" -__date__ = "12/02/2018" - +__date__ = "03/12/2020" expected_spec1 = r"""#F .* #D .* @@ -67,6 +66,28 @@ expected_spec2 = expected_spec1 + r""" 2 8\.00 3 9\.00 """ + +expected_spec2reg = r"""#F .* +#D .* + +#S 1 Ordinate1 +#D .* +#N 3 +#L Abscissa Ordinate1 Ordinate2 +1 4\.00 7\.00 +2 5\.00 8\.00 +3 6\.00 9\.00 +""" + +expected_spec2irr = expected_spec1 + r""" +#S 2 Ordinate2 +#D .* +#N 2 +#L Abscissa Ordinate2 +1 7\.00 +2 8\.00 +""" + expected_csv = r"""Abscissa;Ordinate1;Ordinate2 1;4\.00;7\.00e\+00 2;5\.00;8\.00e\+00 @@ -83,6 +104,7 @@ expected_csv2 = r"""x;y0;y1 class TestSave(unittest.TestCase): """Test saving curves as SpecFile: """ + def setUp(self): self.tempdir = tempfile.mkdtemp() self.spec_fname = os.path.join(self.tempdir, "savespec.dat") @@ -92,6 +114,7 @@ class TestSave(unittest.TestCase): self.x = [1, 2, 3] self.xlab = "Abscissa" self.y = [[4, 5, 6], [7, 8, 9]] + self.y_irr = [[4, 5, 6], [7, 8]] self.ylabs = ["Ordinate1", "Ordinate2"] def tearDown(self): @@ -103,13 +126,6 @@ class TestSave(unittest.TestCase): os.unlink(self.npy_fname) shutil.rmtree(self.tempdir) - def assertRegex(self, *args, **kwargs): - # Python 2 compatibility - if sys.version_info.major >= 3: - return super(TestSave, self).assertRegex(*args, **kwargs) - else: - return self.assertRegexpMatches(*args, **kwargs) - def test_save_csv(self): utils.save1D(self.csv_fname, self.x, self.y, xlabel=self.xlab, ylabels=self.ylabs, @@ -145,7 +161,6 @@ class TestSave(unittest.TestCase): specf = open(self.spec_fname) actual_spec = specf.read() specf.close() - self.assertRegex(actual_spec, expected_spec1) def test_savespec_file_handle(self): @@ -165,18 +180,30 @@ class TestSave(unittest.TestCase): specf = open(self.spec_fname) actual_spec = specf.read() specf.close() - self.assertRegex(actual_spec, expected_spec2) - def test_save_spec(self): - """Save SpecFile using save()""" + def test_save_spec_reg(self): + """Save SpecFile using save() on a regular pattern""" utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab, ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) specf = open(self.spec_fname) actual_spec = specf.read() specf.close() - self.assertRegex(actual_spec, expected_spec2) + + self.assertRegex(actual_spec, expected_spec2reg) + + def test_save_spec_irr(self): + """Save SpecFile using save() on an irregular pattern""" + # invalid test case ?! + return + utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab, + ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"]) + + specf = open(self.spec_fname) + actual_spec = specf.read() + specf.close() + self.assertRegex(actual_spec, expected_spec2irr) def test_save_csv_no_labels(self): """Save csv using save(), with autoheader=True but @@ -217,6 +244,7 @@ class TestH5Ls(unittest.TestCase): <HDF5 dataset "data": shape (1,), type "<f8"> """ + def assertMatchAnyStringInList(self, pattern, list_of_strings): for string_ in list_of_strings: if re.match(pattern, string_): @@ -395,6 +423,7 @@ class TestOpen(unittest.TestCase): class TestNodes(unittest.TestCase): """Test `silx.io.utils.is_` functions.""" + def test_real_h5py_objects(self): name = tempfile.mktemp(suffix=".h5") try: @@ -417,45 +446,60 @@ class TestNodes(unittest.TestCase): os.unlink(name) def test_h5py_like_file(self): + class Foo(object): + def __init__(self): self.h5_class = utils.H5Type.FILE + obj = Foo() self.assertTrue(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_group(self): + class Foo(object): + def __init__(self): self.h5_class = utils.H5Type.GROUP + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertTrue(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) def test_h5py_like_dataset(self): + class Foo(object): + def __init__(self): self.h5_class = utils.H5Type.DATASET + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) self.assertTrue(utils.is_dataset(obj)) def test_bad(self): + class Foo(object): + def __init__(self): pass + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) self.assertFalse(utils.is_dataset(obj)) def test_bad_api(self): + class Foo(object): + def __init__(self): self.h5_class = int + obj = Foo() self.assertFalse(utils.is_file(obj)) self.assertFalse(utils.is_group(obj)) @@ -513,18 +557,20 @@ class TestGetData(unittest.TestCase): def test_hdf5_array(self): url = "silx:%s?/group/group/array" % self.h5_filename data = utils.get_data(url=url) - self.assertEqual(data.shape, (5, )) + self.assertEqual(data.shape, (5,)) self.assertEqual(data[0], 1) def test_hdf5_array_slice(self): url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename data = utils.get_data(url=url) - self.assertEqual(data.shape, (5, )) + self.assertEqual(data.shape, (5,)) self.assertEqual(data[0], 6) def test_hdf5_array_slice_out_of_range(self): url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename - self.assertRaises(ValueError, utils.get_data, url) + # ValueError: h5py 2.x + # IndexError: h5py 3.x + self.assertRaises((ValueError, IndexError), utils.get_data, url) def test_edf_using_silx(self): url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename @@ -568,14 +614,15 @@ class TestGetData(unittest.TestCase): def _h5_py_version_older_than(version): - v_majeur, v_mineur, v_micro = h5py.version.version.split('.')[:3] - r_majeur, r_mineur, r_micro = version.split('.') - return v_majeur >= r_majeur and v_mineur >= r_mineur + v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]] + r_majeur, r_mineur, r_micro = [int(i) for i in version.split('.')] + return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(r_majeur, r_mineur, r_micro) @unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0') class TestRawFileToH5(unittest.TestCase): """Test conversion of .vol file to .h5 external dataset""" + def setUp(self): self.tempdir = tempfile.mkdtemp() self._vol_file = os.path.join(self.tempdir, 'test_vol.vol') @@ -589,7 +636,7 @@ class TestRawFileToH5(unittest.TestCase): assert os.path.exists(self._vol_file + '.npy') os.rename(self._vol_file + '.npy', self._vol_file) self.h5_file = os.path.join(self.tempdir, 'test_h5.h5') - self.external_dataset_path= '/root/my_external_dataset' + self.external_dataset_path = '/root/my_external_dataset' self._data_url = silx.io.url.DataUrl(file_path=self.h5_file, data_path=self.external_dataset_path) with open(self._file_info, 'w') as _fi: @@ -672,6 +719,158 @@ class TestRawFileToH5(unittest.TestCase): shape=self._dataset_shape)) +class TestH5Strings(unittest.TestCase): + """Test HDF5 str and bytes writing and reading""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.vlenstr = h5py.special_dtype(vlen=str) + cls.vlenbytes = h5py.special_dtype(vlen=bytes) + try: + cls.unicode = unicode + except NameError: + cls.unicode = str + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir) + + def setUp(self): + self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w") + + def tearDown(self): + self.file.close() + + @classmethod + def _make_array(cls, value, n): + if isinstance(value, bytes): + dtype = cls.vlenbytes + elif isinstance(value, cls.unicode): + dtype = cls.vlenstr + else: + return numpy.array([value] * n) + return numpy.array([value] * n, dtype=dtype) + + @classmethod + def _get_charset(cls, value): + if isinstance(value, bytes): + return h5py.h5t.CSET_ASCII + elif isinstance(value, cls.unicode): + return h5py.h5t.CSET_UTF8 + else: + return None + + def _check_dataset(self, value, result=None): + # Write+read scalar + if result: + decode_ascii = True + else: + decode_ascii = False + result = value + charset = self._get_charset(value) + self.file["data"] = value + data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii) + assert type(data) == type(result), data + assert data == result, data + if charset: + assert self.file["data"].id.get_type().get_cset() == charset + + # Write+read variable length + self.file["vlen_data"] = self._make_array(value, 2) + data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0) + assert type(data) == type(result), data + assert data == result, data + data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii) + numpy.testing.assert_array_equal(data, [result] * 2) + if charset: + assert self.file["vlen_data"].id.get_type().get_cset() == charset + + def _check_attribute(self, value, result=None): + if result: + decode_ascii = True + else: + decode_ascii = False + result = value + self.file.attrs["data"] = value + data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii) + assert type(data) == type(result), data + assert data == result, data + + self.file.attrs["vlen_data"] = self._make_array(value, 2) + data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii) + assert type(data[0]) == type(result), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"] + assert type(data[0]) == type(result), data[0] + assert data[0] == result, data[0] + numpy.testing.assert_array_equal(data, [result] * 2) + + def test_dataset_ascii_bytes(self): + self._check_dataset(b"abc") + + def test_attribute_ascii_bytes(self): + self._check_attribute(b"abc") + + def test_dataset_ascii_bytes_decode(self): + self._check_dataset(b"abc", result="abc") + + def test_attribute_ascii_bytes_decode(self): + self._check_attribute(b"abc", result="abc") + + def test_dataset_ascii_str(self): + self._check_dataset("abc") + + def test_attribute_ascii_str(self): + self._check_attribute("abc") + + def test_dataset_utf8_str(self): + self._check_dataset("\u0101bc") + + def test_attribute_utf8_str(self): + self._check_attribute("\u0101bc") + + def test_dataset_utf8_bytes(self): + # 0xC481 is the byte representation of U+0101 + self._check_dataset(b"\xc4\x81bc") + + def test_attribute_utf8_bytes(self): + # 0xC481 is the byte representation of U+0101 + self._check_attribute(b"\xc4\x81bc") + + def test_dataset_utf8_bytes_decode(self): + # 0xC481 is the byte representation of U+0101 + self._check_dataset(b"\xc4\x81bc", result="\u0101bc") + + def test_attribute_utf8_bytes_decode(self): + # 0xC481 is the byte representation of U+0101 + self._check_attribute(b"\xc4\x81bc", result="\u0101bc") + + def test_dataset_latin1_bytes(self): + # extended ascii character 0xE4 + self._check_dataset(b"\xe423") + + def test_attribute_latin1_bytes(self): + # extended ascii character 0xE4 + self._check_attribute(b"\xe423") + + def test_dataset_latin1_bytes_decode(self): + # U+DCE4: surrogate for extended ascii character 0xE4 + self._check_dataset(b"\xe423", result="\udce423") + + def test_attribute_latin1_bytes_decode(self): + # U+DCE4: surrogate for extended ascii character 0xE4 + self._check_attribute(b"\xe423", result="\udce423") + + def test_dataset_no_string(self): + self._check_dataset(numpy.int64(10)) + + def test_attribute_no_string(self): + self._check_attribute(numpy.int64(10)) + + def suite(): loadTests = unittest.defaultTestLoader.loadTestsFromTestCase test_suite = unittest.TestSuite() @@ -681,6 +880,7 @@ def suite(): test_suite.addTest(loadTests(TestNodes)) test_suite.addTest(loadTests(TestGetData)) test_suite.addTest(loadTests(TestRawFileToH5)) + test_suite.addTest(loadTests(TestH5Strings)) return test_suite |