Diffstat (limited to 'src/silx/io/test/test_utils.py')
-rw-r--r--  src/silx/io/test/test_utils.py  923
1 file changed, 923 insertions, 0 deletions
diff --git a/src/silx/io/test/test_utils.py b/src/silx/io/test/test_utils.py
new file mode 100644
index 0000000..cc34100
--- /dev/null
+++ b/src/silx/io/test/test_utils.py
@@ -0,0 +1,923 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2019 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for utils module"""
+
+import io
+import numpy
+import os
+import re
+import shutil
+import tempfile
+import unittest
+
+from .. import utils
+from ..._version import calc_hexversion
+import silx.io.url
+
+import h5py
+from ..utils import h5ls
+from silx.io import commonh5
+
+import fabio
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "03/12/2020"
+
+expected_spec1 = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 2
+#L Abscissa Ordinate1
+1 4\.00
+2 5\.00
+3 6\.00
+"""
+
+expected_spec2 = expected_spec1 + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+3 9\.00
+"""
+
+expected_spec2reg = r"""#F .*
+#D .*
+
+#S 1 Ordinate1
+#D .*
+#N 3
+#L Abscissa Ordinate1 Ordinate2
+1 4\.00 7\.00
+2 5\.00 8\.00
+3 6\.00 9\.00
+"""
+
+expected_spec2irr = expected_spec1 + r"""
+#S 2 Ordinate2
+#D .*
+#N 2
+#L Abscissa Ordinate2
+1 7\.00
+2 8\.00
+"""
+
+expected_csv = r"""Abscissa;Ordinate1;Ordinate2
+1;4\.00;7\.00e\+00
+2;5\.00;8\.00e\+00
+3;6\.00;9\.00e\+00
+"""
+
+expected_csv2 = r"""x;y0;y1
+1;4\.00;7\.00e\+00
+2;5\.00;8\.00e\+00
+3;6\.00;9\.00e\+00
+"""
+
+
+class TestSave(unittest.TestCase):
+ """Test saving curves as SpecFile:
+ """
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.spec_fname = os.path.join(self.tempdir, "savespec.dat")
+ self.csv_fname = os.path.join(self.tempdir, "savecsv.csv")
+ self.npy_fname = os.path.join(self.tempdir, "savenpy.npy")
+
+ self.x = [1, 2, 3]
+ self.xlab = "Abscissa"
+ self.y = [[4, 5, 6], [7, 8, 9]]
+ self.y_irr = [[4, 5, 6], [7, 8]]
+ self.ylabs = ["Ordinate1", "Ordinate2"]
+
+ def tearDown(self):
+ if os.path.isfile(self.spec_fname):
+ os.unlink(self.spec_fname)
+ if os.path.isfile(self.csv_fname):
+ os.unlink(self.csv_fname)
+ if os.path.isfile(self.npy_fname):
+ os.unlink(self.npy_fname)
+ shutil.rmtree(self.tempdir)
+
+ def test_save_csv(self):
+ utils.save1D(self.csv_fname, self.x, self.y,
+ xlabel=self.xlab, ylabels=self.ylabs,
+ filetype="csv", fmt=["%d", "%.2f", "%.2e"],
+ csvdelim=";", autoheader=True)
+
+ csvf = open(self.csv_fname)
+ actual_csv = csvf.read()
+ csvf.close()
+
+ self.assertRegex(actual_csv, expected_csv)
+
+ def test_save_npy(self):
+ """npy file is saved with numpy.save after building a numpy array
+ and converting it to a named record array"""
+ npyf = open(self.npy_fname, "wb")
+ utils.save1D(npyf, self.x, self.y,
+ xlabel=self.xlab, ylabels=self.ylabs)
+ npyf.close()
+
+ npy_recarray = numpy.load(self.npy_fname)
+
+ self.assertEqual(npy_recarray.shape, (3,))
+ self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'],
+ numpy.array((4, 5, 6))))
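+
+        # A sketch of the equivalent construction (an assumption about how
+        # save1D builds the record array, not the library's actual code):
+        #     arr = numpy.core.records.fromarrays(
+        #         [self.x] + list(self.y),
+        #         names=["Abscissa", "Ordinate1", "Ordinate2"])
+        #     numpy.save(npy_fname, arr)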
+
+ def test_savespec_filename(self):
+ """Save SpecFile using savespec()"""
+ utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab,
+ ylabel=self.ylabs[0], fmt=["%d", "%.2f"],
+ close_file=True, scan_number=1)
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec1)
+
+ def test_savespec_file_handle(self):
+ """Save SpecFile using savespec(), passing a file handle"""
+ # first savespec: open, write file header, save y[0] as scan 1,
+ # return file handle
+ specf = utils.savespec(self.spec_fname, self.x, self.y[0],
+ xlabel=self.xlab, ylabel=self.ylabs[0],
+ fmt=["%d", "%.2f"], close_file=False)
+
+ # second savespec: save y[1] as scan 2, close file
+ utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab,
+ ylabel=self.ylabs[1], fmt=["%d", "%.2f"],
+ write_file_header=False, close_file=True,
+ scan_number=2)
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2)
+
+ def test_save_spec_reg(self):
+ """Save SpecFile using save() on a regular pattern"""
+ utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab,
+ ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+
+ self.assertRegex(actual_spec, expected_spec2reg)
+
+    def test_save_spec_irr(self):
+        """Save SpecFile using save1D() on an irregular pattern"""
+        # FIXME: this test case looks invalid; skip it explicitly rather
+        # than returning early
+        self.skipTest("irregular-pattern test case disabled (invalid?)")
+        utils.save1D(self.spec_fname, self.x, self.y_irr, xlabel=self.xlab,
+                     ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+
+ specf = open(self.spec_fname)
+ actual_spec = specf.read()
+ specf.close()
+ self.assertRegex(actual_spec, expected_spec2irr)
+
+    def test_save_csv_no_labels(self):
+        """Save csv using save1D(), with autoheader=True but
+        xlabel=None and ylabels=None.
+
+        This is a non-regression test for bug #223"""
+        utils.save1D(self.csv_fname, self.x, self.y,
+                     autoheader=True, fmt=["%d", "%.2f", "%.2e"])
+
+ csvf = open(self.csv_fname)
+ actual_csv = csvf.read()
+ csvf.close()
+ self.assertRegex(actual_csv, expected_csv2)
+
+
+def match_any_string_in_list(pattern, list_of_strings):
+    """Return True if the regex pattern matches at least one of the strings."""
+    for string_ in list_of_strings:
+        if re.match(pattern, string_):
+            return True
+    return False
+
+
+class TestH5Ls(unittest.TestCase):
+ """Test displaying the following HDF5 file structure:
+
+ +foo
+ +bar
+ <HDF5 dataset "spam": shape (2, 2), type "<i8">
+ <HDF5 dataset "tmp": shape (3,), type "<i8">
+ <HDF5 dataset "data": shape (1,), type "<f8">
+
+ """
+
+    def assertMatchAnyStringInList(self, pattern, list_of_strings):
+        for string_ in list_of_strings:
+            if re.match(pattern, string_):
+                return None
+        raise AssertionError("regex pattern %s does not match any string "
+                             "in list %s" % (pattern, list_of_strings))
+
+ def testHdf5(self):
+ fd, self.h5_fname = tempfile.mkstemp(text=False)
+ # Close and delete (we just want the name)
+ os.close(fd)
+ os.unlink(self.h5_fname)
+ self.h5f = h5py.File(self.h5_fname, "w")
+ self.h5f["/foo/bar/tmp"] = [1, 2, 3]
+ self.h5f["/foo/bar/spam"] = [[1, 2], [3, 4]]
+ self.h5f["/foo/data"] = [3.14]
+ self.h5f.close()
+
+ rep = h5ls(self.h5_fname)
+ lines = rep.split("\n")
+
+ self.assertIn("+foo", lines)
+ self.assertIn("\t+bar", lines)
+
+ match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+
+ os.unlink(self.h5_fname)
+
+    # Following test case disabled due to errors on AppVeyor:
+ # os.unlink(spec_fname)
+ # PermissionError: [WinError 32] The process cannot access the file because
+ # it is being used by another process: 'C:\\...\\savespec.dat'
+
+ # def testSpec(self):
+ # tempdir = tempfile.mkdtemp()
+ # spec_fname = os.path.join(tempdir, "savespec.dat")
+ #
+ # x = [1, 2, 3]
+ # xlab = "Abscissa"
+ # y = [[4, 5, 6], [7, 8, 9]]
+ # ylabs = ["Ordinate1", "Ordinate2"]
+ # utils.save1D(spec_fname, x, y, xlabel=xlab,
+ # ylabels=ylabs, filetype="spec",
+ # fmt=["%d", "%.2f"])
+ #
+ # rep = h5ls(spec_fname)
+ # lines = rep.split("\n")
+ # self.assertIn("+1.1", lines)
+ # self.assertIn("\t+instrument", lines)
+ #
+ # self.assertMatchAnyStringInList(
+ # r'\t\t\t<SPEC dataset "file_header": shape \(\), type "|S60">',
+ # lines)
+ # self.assertMatchAnyStringInList(
+ # r'\t\t<SPEC dataset "Ordinate1": shape \(3L?,\), type "<f4">',
+ # lines)
+ #
+ # os.unlink(spec_fname)
+ # shutil.rmtree(tempdir)
+
+
+class TestOpen(unittest.TestCase):
+ """Test `silx.io.utils.open` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/dataset"] = 50
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
+ fmt=["%d", "%.2f"], close_file=True, scan_number=1)
+
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write(u"Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def testH5(self):
+ f = utils.open(self.h5_filename)
+ self.assertIsNotNone(f)
+ self.assertIsInstance(f, h5py.File)
+ f.close()
+
+ def testH5With(self):
+ with utils.open(self.h5_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertIsInstance(f, h5py.File)
+
+ def testH5_withPath(self):
+ f = utils.open(self.h5_filename + "::/group/group/dataset")
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.Dataset)
+ self.assertEqual(f[()], 50)
+ f.close()
+
+ def testH5With_withPath(self):
+ with utils.open(self.h5_filename + "::/group/group") as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.Group)
+ self.assertIn("dataset", f)
+
+ def testSpec(self):
+ f = utils.open(self.spec_filename)
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+ f.close()
+
+ def testSpecWith(self):
+ with utils.open(self.spec_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+
+ def testEdf(self):
+ f = utils.open(self.edf_filename)
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+ f.close()
+
+ def testEdfWith(self):
+ with utils.open(self.edf_filename) as f:
+ self.assertIsNotNone(f)
+ self.assertEqual(f.h5py_class, h5py.File)
+
+ def testUnsupported(self):
+ self.assertRaises(IOError, utils.open, self.txt_filename)
+
+    def testNotExists(self):
+        self.assertRaises(IOError, utils.open, self.missing_filename)
+
+ def test_silx_scheme(self):
+ url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/")
+ with utils.open(url.path()) as f:
+ self.assertIsNotNone(f)
+ self.assertTrue(silx.io.utils.is_file(f))
+
+ def test_fabio_scheme(self):
+ url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_bad_url(self):
+ url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_sliced_url(self):
+ url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,))
+ self.assertRaises(IOError, utils.open, url.path())
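+
+    # Note on URLs: DataUrl.path() serializes back to a URL string. A
+    # sketch of the assumed form:
+    #     DataUrl(scheme="silx", file_path="/tmp/t.h5",
+    #             data_path="/group").path()
+    #     -> "silx:///tmp/t.h5?path=/group"
+    # utils.open() accepts both this form and the "file.h5::/group"
+    # syntax used above.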
+
+
+class TestNodes(unittest.TestCase):
+ """Test `silx.io.utils.is_` functions."""
+
+ def test_real_h5py_objects(self):
+ name = tempfile.mktemp(suffix=".h5")
+ try:
+ with h5py.File(name, "w") as h5file:
+ h5group = h5file.create_group("arrays")
+ h5dataset = h5group.create_dataset("scalar", data=10)
+
+ self.assertTrue(utils.is_file(h5file))
+ self.assertTrue(utils.is_group(h5file))
+ self.assertFalse(utils.is_dataset(h5file))
+
+ self.assertFalse(utils.is_file(h5group))
+ self.assertTrue(utils.is_group(h5group))
+ self.assertFalse(utils.is_dataset(h5group))
+
+ self.assertFalse(utils.is_file(h5dataset))
+ self.assertFalse(utils.is_group(h5dataset))
+ self.assertTrue(utils.is_dataset(h5dataset))
+ finally:
+ os.unlink(name)
+
+ def test_h5py_like_file(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = utils.H5Type.FILE
+
+ obj = Foo()
+ self.assertTrue(utils.is_file(obj))
+ self.assertTrue(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_h5py_like_group(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = utils.H5Type.GROUP
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertTrue(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_h5py_like_dataset(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = utils.H5Type.DATASET
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertTrue(utils.is_dataset(obj))
+
+ def test_bad(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ pass
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
+
+ def test_bad_api(self):
+
+ class Foo(object):
+
+ def __init__(self):
+ self.h5_class = int
+
+ obj = Foo()
+ self.assertFalse(utils.is_file(obj))
+ self.assertFalse(utils.is_group(obj))
+ self.assertFalse(utils.is_dataset(obj))
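+
+    # These tests document the duck-typing contract assumed for the is_*
+    # helpers: objects advertising an "h5_class" attribute (a utils.H5Type
+    # value) are classified through it, while real h5py objects are
+    # recognized by type (see test_real_h5py_objects above).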
+
+
+class TestGetData(unittest.TestCase):
+ """Test `silx.io.utils.get_data` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/scalar"] = 50
+ h5["group/group/array"] = [1, 2, 3, 4, 5]
+ h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
+ fmt=["%d", "%.2f"], close_file=True, scan_number=1)
+
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+ fabiofile.append_frame(data=data, header=header)
+ fabiofile.write(cls.edf_multiframe_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write(u"Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def test_hdf5_scalar(self):
+ url = "silx:%s?/group/group/scalar" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data, 50)
+
+ def test_hdf5_array(self):
+ url = "silx:%s?/group/group/array" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5,))
+ self.assertEqual(data[0], 1)
+
+ def test_hdf5_array_slice(self):
+ url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5,))
+ self.assertEqual(data[0], 6)
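+
+    # URL grammar exercised in these tests (summarized from silx.io.url):
+    # "<scheme>:<file_path>?<data_path>" as a shorthand, or the explicit
+    # "<scheme>:<file_path>?path=<data_path>&slice=<n>" query form.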
+
+ def test_hdf5_array_slice_out_of_range(self):
+ url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ # ValueError: h5py 2.x
+ # IndexError: h5py 3.x
+ self.assertRaises((ValueError, IndexError), utils.get_data, url)
+
+ def test_edf_using_silx(self):
+ url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_frame(self):
+ url = "fabio:%s?slice=1" % self.edf_multiframe_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_singleframe(self):
+ url = "fabio:%s?slice=0" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+    def test_fabio_too_many_frames(self):
+        url = "fabio:%s?slice=..." % self.edf_multiframe_filename
+        self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_fabio_no_frame(self):
+ url = "fabio:%s" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_unsupported_scheme(self):
+ url = "foo:/foo/bar"
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_no_scheme(self):
+ url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ self.assertRaises((ValueError, IOError), utils.get_data, url)
+
+ def test_file_not_exists(self):
+ url = "silx:/foo/bar"
+ self.assertRaises(IOError, utils.get_data, url)
+
+
+def _h5py_version_at_least(version):
+    """Return True if the installed h5py version is >= `version`."""
+    v_major, v_minor, v_micro = [int(i) for i in h5py.version.version.split('.')[:3]]
+    r_major, r_minor, r_micro = [int(i) for i in version.split('.')]
+    return calc_hexversion(v_major, v_minor, v_micro) >= calc_hexversion(r_major, r_minor, r_micro)
+
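+# For illustration: calc_hexversion packs (major, minor, micro) into a
+# single integer in the spirit of CPython's PY_VERSION_HEX, so tuple-wise
+# version comparison reduces to one integer comparison, e.g.
+# calc_hexversion(2, 9, 0) < calc_hexversion(2, 10, 0).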
+
+@unittest.skipUnless(_h5py_version_at_least('2.9.0'),
+                     'h5py >= 2.9.0 required for external datasets')
+class TestRawFileToH5(unittest.TestCase):
+ """Test conversion of .vol file to .h5 external dataset"""
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self._vol_file = os.path.join(self.tempdir, 'test_vol.vol')
+ self._file_info = os.path.join(self.tempdir, 'test_vol.info.vol')
+ self._dataset_shape = 100, 20, 5
+ data = numpy.random.random(self._dataset_shape[0] *
+ self._dataset_shape[1] *
+ self._dataset_shape[2]).astype(dtype=numpy.float32).reshape(self._dataset_shape)
+        numpy.save(file=self._vol_file, arr=data)
+        # numpy.save appends a '.npy' suffix; rename to the bare .vol name
+        assert os.path.exists(self._vol_file + '.npy')
+        os.rename(self._vol_file + '.npy', self._vol_file)
+ self.h5_file = os.path.join(self.tempdir, 'test_h5.h5')
+ self.external_dataset_path = '/root/my_external_dataset'
+ self._data_url = silx.io.url.DataUrl(file_path=self.h5_file,
+ data_path=self.external_dataset_path)
+ with open(self._file_info, 'w') as _fi:
+ _fi.write('NUM_X = %s\n' % self._dataset_shape[2])
+ _fi.write('NUM_Y = %s\n' % self._dataset_shape[1])
+ _fi.write('NUM_Z = %s\n' % self._dataset_shape[0])
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def check_dataset(self, h5_file, data_path, shape):
+ """Make sure the external dataset is valid"""
+ with h5py.File(h5_file, 'r') as _file:
+ return data_path in _file and _file[data_path].shape == shape
+
+ def test_h5_file_not_existing(self):
+ """Test that can create a file with external dataset from scratch"""
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+ os.remove(self.h5_file)
+ utils.vol_to_h5_external_dataset(vol_file=self._vol_file,
+ output_url=self._data_url,
+ info_file=self._file_info)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_h5_file_existing(self):
+ """Test that can add the external dataset from an existing file"""
+ with h5py.File(self.h5_file, 'w') as _file:
+ _file['/root/dataset1'] = numpy.zeros((100, 100))
+ _file['/root/group/dataset2'] = numpy.ones((100, 100))
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+    def test_vol_file_not_existing(self):
+        """Check that the external dataset is created even if the .vol
+        file does not exist (external datasets may point to files that
+        are not there yet)"""
+ os.remove(self._vol_file)
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+ def test_conflicts(self):
+ """Test several conflict cases"""
+ # test if path already exists
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ dtype=numpy.float32)
+ with self.assertRaises(ValueError):
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=False,
+ dtype=numpy.float32)
+
+ utils.rawfile_to_h5_external_dataset(bin_file=self._vol_file,
+ output_url=self._data_url,
+ shape=(100, 20, 5),
+ overwrite=True,
+ dtype=numpy.float32)
+
+ self.assertTrue(self.check_dataset(h5_file=self.h5_file,
+ data_path=self.external_dataset_path,
+ shape=self._dataset_shape))
+
+
+class TestH5Strings(unittest.TestCase):
+ """Test HDF5 str and bytes writing and reading"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.vlenstr = h5py.special_dtype(vlen=str)
+ cls.vlenbytes = h5py.special_dtype(vlen=bytes)
+ try:
+ cls.unicode = unicode
+ except NameError:
+ cls.unicode = str
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir)
+
+ def setUp(self):
+ self.file = h5py.File(os.path.join(self.tempdir, 'file.h5'), mode="w")
+
+ def tearDown(self):
+ self.file.close()
+
+ @classmethod
+ def _make_array(cls, value, n):
+ if isinstance(value, bytes):
+ dtype = cls.vlenbytes
+ elif isinstance(value, cls.unicode):
+ dtype = cls.vlenstr
+ else:
+ return numpy.array([value] * n)
+ return numpy.array([value] * n, dtype=dtype)
+
+ @classmethod
+ def _get_charset(cls, value):
+ if isinstance(value, bytes):
+ return h5py.h5t.CSET_ASCII
+ elif isinstance(value, cls.unicode):
+ return h5py.h5t.CSET_UTF8
+ else:
+ return None
+
+    def _check_dataset(self, value, result=None):
+        # Write+read scalar
+        if result is not None:
+            decode_ascii = True
+        else:
+            decode_ascii = False
+            result = value
+ charset = self._get_charset(value)
+ self.file["data"] = value
+ data = utils.h5py_read_dataset(self.file["data"], decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+ if charset:
+ assert self.file["data"].id.get_type().get_cset() == charset
+
+ # Write+read variable length
+ self.file["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii, index=0)
+ assert type(data) == type(result), data
+ assert data == result, data
+ data = utils.h5py_read_dataset(self.file["vlen_data"], decode_ascii=decode_ascii)
+ numpy.testing.assert_array_equal(data, [result] * 2)
+ if charset:
+ assert self.file["vlen_data"].id.get_type().get_cset() == charset
+
+    def _check_attribute(self, value, result=None):
+        if result is not None:
+            decode_ascii = True
+        else:
+            decode_ascii = False
+            result = value
+ self.file.attrs["data"] = value
+ data = utils.h5py_read_attribute(self.file.attrs, "data", decode_ascii=decode_ascii)
+ assert type(data) == type(result), data
+ assert data == result, data
+
+ self.file.attrs["vlen_data"] = self._make_array(value, 2)
+ data = utils.h5py_read_attribute(self.file.attrs, "vlen_data", decode_ascii=decode_ascii)
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ data = utils.h5py_read_attributes(self.file.attrs, decode_ascii=decode_ascii)["vlen_data"]
+ assert type(data[0]) == type(result), data[0]
+ assert data[0] == result, data[0]
+ numpy.testing.assert_array_equal(data, [result] * 2)
+
+ def test_dataset_ascii_bytes(self):
+ self._check_dataset(b"abc")
+
+ def test_attribute_ascii_bytes(self):
+ self._check_attribute(b"abc")
+
+ def test_dataset_ascii_bytes_decode(self):
+ self._check_dataset(b"abc", result="abc")
+
+ def test_attribute_ascii_bytes_decode(self):
+ self._check_attribute(b"abc", result="abc")
+
+ def test_dataset_ascii_str(self):
+ self._check_dataset("abc")
+
+ def test_attribute_ascii_str(self):
+ self._check_attribute("abc")
+
+ def test_dataset_utf8_str(self):
+ self._check_dataset("\u0101bc")
+
+ def test_attribute_utf8_str(self):
+ self._check_attribute("\u0101bc")
+
+ def test_dataset_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc")
+
+ def test_attribute_utf8_bytes(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc")
+
+ def test_dataset_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_dataset(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_attribute_utf8_bytes_decode(self):
+ # 0xC481 is the byte representation of U+0101
+ self._check_attribute(b"\xc4\x81bc", result="\u0101bc")
+
+ def test_dataset_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_dataset(b"\xe423")
+
+ def test_attribute_latin1_bytes(self):
+ # extended ascii character 0xE4
+ self._check_attribute(b"\xe423")
+
+ def test_dataset_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_dataset(b"\xe423", result="\udce423")
+
+ def test_attribute_latin1_bytes_decode(self):
+ # U+DCE4: surrogate for extended ascii character 0xE4
+ self._check_attribute(b"\xe423", result="\udce423")
+
+ def test_dataset_no_string(self):
+ self._check_dataset(numpy.int64(10))
+
+ def test_attribute_no_string(self):
+ self._check_attribute(numpy.int64(10))
+
+
+def test_visitall_hdf5(tmp_path):
+ """visit HDF5 file content not following links"""
+ external_filepath = tmp_path / "external.h5"
+ with h5py.File(external_filepath, mode="w") as h5file:
+ h5file["target/dataset"] = 50
+
+ filepath = tmp_path / "base.h5"
+ with h5py.File(filepath, mode="w") as h5file:
+ h5file["group/dataset"] = 50
+ h5file["link/soft_link"] = h5py.SoftLink("/group/dataset")
+ h5file["link/external_link"] = h5py.ExternalLink("external.h5", "/target/dataset")
+
+ with h5py.File(filepath, mode="r") as h5file:
+ visited_items = {}
+ for path, item in utils.visitall(h5file):
+ if isinstance(item, h5py.Dataset):
+ content = item[()]
+ elif isinstance(item, h5py.Group):
+ content = None
+ elif isinstance(item, h5py.SoftLink):
+ content = item.path
+ elif isinstance(item, h5py.ExternalLink):
+ content = item.filename, item.path
+ else:
+ raise AssertionError("Item should not be present: %s" % path)
+ visited_items[path] = (item.__class__, content)
+
+ assert visited_items == {
+ "/group": (h5py.Group, None),
+ "/group/dataset": (h5py.Dataset, 50),
+ "/link": (h5py.Group, None),
+ "/link/soft_link": (h5py.SoftLink, "/group/dataset"),
+ "/link/external_link": (h5py.ExternalLink, ("external.h5", "/target/dataset")),
+ }
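+
+# Note: h5py's own visit()/visititems() traverse hard links only and never
+# yield h5py.SoftLink / h5py.ExternalLink objects; reporting links is what
+# utils.visitall adds here.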
+
+
+def test_visitall_commonh5():
+ """Visit commonh5 File object"""
+ fobj = commonh5.File("filename.file", mode="w")
+ group = fobj.create_group("group")
+ dataset = group.create_dataset("dataset", data=numpy.array(50))
+ group["soft_link"] = dataset # Create softlink
+
+ visited_items = dict(utils.visitall(fobj))
+ assert len(visited_items) == 3
+ assert visited_items["/group"] is group
+ assert visited_items["/group/dataset"] is dataset
+ soft_link = visited_items["/group/soft_link"]
+ assert isinstance(soft_link, commonh5.SoftLink)
+ assert soft_link.path == "/group/dataset"