summaryrefslogtreecommitdiff
path: root/silx/io/test/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/test/test_utils.py')
-rw-r--r--silx/io/test/test_utils.py244
1 files changed, 222 insertions, 22 deletions
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py
index 6c70636..13ab532 100644
--- a/silx/io/test/test_utils.py
+++ b/silx/io/test/test_utils.py
@@ -33,6 +33,7 @@ import unittest
import sys
from .. import utils
+from ..._version import calc_hexversion
import silx.io.url
import h5py
@@ -40,11 +41,9 @@ from ..utils import h5ls
import fabio
-
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "12/02/2018"
-
+__date__ = "03/12/2020"
expected_spec1 = r"""#F .*
#D .*
@@ -67,6 +66,28 @@ expected_spec2 = expected_spec1 + r"""
2 8\.00
3 9\.00
"""
+
+expected_spec2reg = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 3
+#L Abscissa Ordinate1 Ordinate2
+1 4\.00 7\.00
+2 5\.00 8\.00
+3 6\.00 9\.00
+"""
+
+expected_spec2irr = expected_spec1 + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+"""
+
expected_csv = r"""Abscissa;Ordinate1;Ordinate2
1;4\.00;7\.00e\+00
2;5\.00;8\.00e\+00
@@ -83,6 +104,7 @@ expected_csv2 = r"""x;y0;y1
class TestSave(unittest.TestCase):
"""Test saving curves as SpecFile:
"""
+
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
@@ -92,6 +114,7 @@ class TestSave(unittest.TestCase):
self.x = [1, 2, 3]
self.xlab = "Abscissa"
self.y = [[4, 5, 6], [7, 8, 9]]
+ self.y_irr = [[4, 5, 6], [7, 8]]
self.ylabs = ["Ordinate1", "Ordinate2"]
def tearDown(self):
@@ -103,13 +126,6 @@ class TestSave(unittest.TestCase):
os.unlink(self.npy_fname)
shutil.rmtree(self.tempdir)
- def assertRegex(self, *args, **kwargs):
- # Python 2 compatibility
- if sys.version_info.major >= 3:
- return super(TestSave, self).assertRegex(*args, **kwargs)
- else:
- return self.assertRegexpMatches(*args, **kwargs)
-
def test_save_csv(self):
utils.save1D(self.csv_fname, self.x, self.y,
xlabel=self.xlab, ylabels=self.ylabs,
@@ -145,7 +161,6 @@ class TestSave(unittest.TestCase):
specf = open(self.spec_fname)
actual_spec = specf.read()
specf.close()
-
self.assertRegex(actual_spec, expected_spec1)
def test_savespec_file_handle(self):
@@ -165,18 +180,30 @@ class TestSave(unittest.TestCase):
specf = open(self.spec_fname)
actual_spec = specf.read()
specf.close()
-
self.assertRegex(actual_spec, expected_spec2)
- def test_save_spec(self):
- """Save SpecFile using save()"""
+ def test_save_spec_reg(self):
+ """Save SpecFile using save() on a regular pattern"""
utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab,
ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
specf = open(self.spec_fname)
actual_spec = specf.read()
specf.close()
- self.assertRegex(actual_spec, expected_spec2)
+
+ self.assertRegex(actual_spec, expected_spec2reg)
+
+ def test_save_spec_irr(self):
+ """Save SpecFile using save() on an irregular pattern"""
+ # invalid test case ?!
+ return
+ utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab,
+ ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2irr)
def test_save_csv_no_labels(self):
"""Save csv using save(), with autoheader=True but
@@ -217,6 +244,7 @@ class TestH5Ls(unittest.TestCase):
<HDF5 dataset "data": shape (1,), type "<f8">
"""
+
def assertMatchAnyStringInList(self, pattern, list_of_strings):
for string_ in list_of_strings:
if re.match(pattern, string_):
@@ -395,6 +423,7 @@ class TestOpen(unittest.TestCase):
class TestNodes(unittest.TestCase):
"""Test `silx.io.utils.is_` functions."""
+
def test_real_h5py_objects(self):
name = tempfile.mktemp(suffix=".h5")
try:
@@ -417,45 +446,60 @@ class TestNodes(unittest.TestCase):
os.unlink(name)
def test_h5py_like_file(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = utils.H5Type.FILE
+
obj = Foo()
self.assertTrue(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
def test_h5py_like_group(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = utils.H5Type.GROUP
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
def test_h5py_like_dataset(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = utils.H5Type.DATASET
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
self.assertTrue(utils.is_dataset(obj))
def test_bad(self):
+
class Foo(object):
+
def __init__(self):
pass
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
def test_bad_api(self):
+
class Foo(object):
+
def __init__(self):
self.h5_class = int
+
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
@@ -513,18 +557,20 @@ class TestGetData(unittest.TestCase):
def test_hdf5_array(self):
url = "silx:%s?/group/group/array" % self.h5_filename
data = utils.get_data(url=url)
- self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data.shape, (5,))
self.assertEqual(data[0], 1)
def test_hdf5_array_slice(self):
url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
data = utils.get_data(url=url)
- self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data.shape, (5,))
self.assertEqual(data[0], 6)
def test_hdf5_array_slice_out_of_range(self):
url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
- self.assertRaises(ValueError, utils.get_data, url)
+ # ValueError: h5py 2.x
+ # IndexError: h5py 3.x
+ self.assertRaises((ValueError, IndexError), utils.get_data, url)
def test_edf_using_silx(self):
url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
@@ -568,14 +614,15 @@ class TestGetData(unittest.TestCase):
def _h5_py_version_older_than(version):
- v_majeur, v_mineur, v_micro = h5py.version.version.split('.')[:3]
- r_majeur, r_mineur, r_micro = version.split('.')
- return v_majeur >= r_majeur and v_mineur >= r_mineur
+ v_majeur, v_mineur, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]]
+ r_majeur, r_mineur, r_micro = [int(i) for i in version.split('.')]
+ return calc_hexversion(v_majeur, v_mineur, v_micro) >= calc_hexversion(r_majeur, r_mineur, r_micro)
@unittest.skipUnless(_h5_py_version_older_than('2.9.0'), 'h5py version < 2.9.0')
class TestRawFileToH5(unittest.TestCase):
"""Test conversion of .vol file to .h5 external dataset"""
+
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self._vol_file = os.path.join(self.tempdir, 'test_vol.vol')
@@ -589,7 +636,7 @@ class TestRawFileToH5(unittest.TestCase):
assert os.path.exists(self._vol_file + '.npy')
os.rename(self._vol_file + '.npy', self._vol_file)
self.h5_file = os.path.join(self.tempdir, 'test_h5.h5')
- self.external_dataset_path= '/root/my_external_dataset'
+ self.external_dataset_path = '/root/my_external_dataset'
self._data_url = silx.io.url.DataUrl(file_path=self.h5_file,
data_path=self.external_dataset_path)
with open(self._file_info, 'w') as _fi:
@@ -672,6 +719,158 @@ class TestRawFileToH5(unittest.TestCase):
shape=self._dataset_shape))
+class TestH5Strings(unittest.TestCase):
+ """Test HDF5 str and bytes writing and reading"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.vlenstr = h5py.special_dtype(vlen=str)
+ cls.vlenbytes = h5py.special_dtype(vlen=bytes)
+ try:
+ cls.unicode = unicode
+ except NameError:
+ cls.unicode = str
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir)
+
+ def setUp(self):
+ self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w")
+
+ def tearDown(self):
+ self.file.close()
+
+ @classmethod
+ def _make_array(cls, value, n):
+ if isinstance(value, bytes):
+ dtype = cls.vlenbytes
+ elif isinstance(value, cls.unicode):
+ dtype = cls.vlenstr
+ else:
+ return numpy.array([value] * n)
+ return numpy.array([value] * n, dtype=dtype)
+
+ @classmethod
+ def _get_charset(cls, value):
+ if isinstance(value, bytes):
+ return h5py.h5t.CSET_ASCII
+ elif isinstance(value, cls.unicode):
+ return h5py.h5t.CSET_UTF8
+ else:
+ return None
+
+ def _check_dataset(self, value, result=None):
+ # Write+read scalar
+ if result:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+ charset = self._get_charset(value)
+ self.file["data"] = value
+ data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+ if charset:
+ assert self.file["data"].id.get_type().get_cset() == charset
+
+ # Write+read variable length
+ self.file["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0)
+ assert type(data) == type(result), data
+ assert data == result, data
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii)
+ numpy.testing.assert_array_equal(data, [result] * 2)
+ if charset:
+ assert self.file["vlen_data"].id.get_type().get_cset() == charset
+
+ def _check_attribute(self, value, result=None):
+ if result:
+ decode_ascii = True
+ else:
+ decode_ascii = False
+ result = value
+ self.file.attrs["data"] = value
+ data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+
+ self.file.attrs["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii)
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"]
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ def test_dataset_ascii_bytes(self):
+ self._check_dataset(b"abc")
+
+ def test_attribute_ascii_bytes(self):
+ self._check_attribute(b"abc")
+
+ def test_dataset_ascii_bytes_decode(self):
+ self._check_dataset(b"abc", result="abc")
+
+ def test_attribute_ascii_bytes_decode(self):
+ self._check_attribute(b"abc", result="abc")
+
+ def test_dataset_ascii_str(self):
+ self._check_dataset("abc")
+
+ def test_attribute_ascii_str(self):
+ self._check_attribute("abc")
+
+ def test_dataset_utf8_str(self):
+ self._check_dataset("\u0101bc")
+
+ def test_attribute_utf8_str(self):
+ self._check_attribute("\u0101bc")
+
+ def test_dataset_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc")
+
+ def test_attribute_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc")
+
+ def test_dataset_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_attribute_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_dataset_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_dataset(b"\xe423")
+
+ def test_attribute_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_attribute(b"\xe423")
+
+ def test_dataset_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_dataset(b"\xe423", result="\udce423")
+
+ def test_attribute_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_attribute(b"\xe423", result="\udce423")
+
+ def test_dataset_no_string(self):
+ self._check_dataset(numpy.int64(10))
+
+ def test_attribute_no_string(self):
+ self._check_attribute(numpy.int64(10))
+
+
def suite():
loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
test_suite = unittest.TestSuite()
@@ -681,6 +880,7 @@ def suite():
test_suite.addTest(loadTests(TestNodes))
test_suite.addTest(loadTests(TestGetData))
test_suite.addTest(loadTests(TestRawFileToH5))
+ test_suite.addTest(loadTests(TestH5Strings))
return test_suite