summaryrefslogtreecommitdiff
path: root/silx/io/test/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/test/test_utils.py')
-rw-r--r--silx/io/test/test_utils.py221
1 files changed, 182 insertions, 39 deletions
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py
index 09074bb..53e001c 100644
--- a/silx/io/test/test_utils.py
+++ b/silx/io/test/test_utils.py
@@ -30,9 +30,9 @@ import re
import shutil
import tempfile
import unittest
-import sys
from .. import utils
+import silx.io.url
try:
import h5py
@@ -48,7 +48,7 @@ except ImportError:
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "28/09/2017"
+__date__ = "12/02/2018"
expected_spec1 = r"""#F .*
@@ -110,9 +110,9 @@ class TestSave(unittest.TestCase):
def test_save_csv(self):
utils.save1D(self.csv_fname, self.x, self.y,
- xlabel=self.xlab, ylabels=self.ylabs,
- filetype="csv", fmt=["%d", "%.2f", "%.2e"],
- csvdelim=";", autoheader=True)
+ xlabel=self.xlab, ylabels=self.ylabs,
+ filetype="csv", fmt=["%d", "%.2f", "%.2e"],
+ csvdelim=";", autoheader=True)
csvf = open(self.csv_fname)
actual_csv = csvf.read()
@@ -125,22 +125,20 @@ class TestSave(unittest.TestCase):
and converting it to a named record array"""
npyf = open(self.npy_fname, "wb")
utils.save1D(npyf, self.x, self.y,
- xlabel=self.xlab, ylabels=self.ylabs)
+ xlabel=self.xlab, ylabels=self.ylabs)
npyf.close()
npy_recarray = numpy.load(self.npy_fname)
self.assertEqual(npy_recarray.shape, (3,))
- self.assertTrue(
- numpy.array_equal(
- npy_recarray['Ordinate1'],
- numpy.array((4, 5, 6))))
+ self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'],
+ numpy.array((4, 5, 6))))
def test_savespec_filename(self):
"""Save SpecFile using savespec()"""
utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab,
- ylabel=self.ylabs[0], fmt=["%d", "%.2f"], close_file=True,
- scan_number=1)
+ ylabel=self.ylabs[0], fmt=["%d", "%.2f"],
+ close_file=True, scan_number=1)
specf = open(self.spec_fname)
actual_spec = specf.read()
@@ -152,15 +150,15 @@ class TestSave(unittest.TestCase):
"""Save SpecFile using savespec(), passing a file handle"""
# first savespec: open, write file header, save y[0] as scan 1,
# return file handle
- specf = utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab,
- ylabel=self.ylabs[0], fmt=["%d", "%.2f"],
- close_file=False)
+ specf = utils.savespec(self.spec_fname, self.x, self.y[0],
+ xlabel=self.xlab, ylabel=self.ylabs[0],
+ fmt=["%d", "%.2f"], close_file=False)
# second savespec: save y[1] as scan 2, close file
utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab,
- ylabel=self.ylabs[1], fmt=["%d", "%.2f"],
- write_file_header=False, close_file=True,
- scan_number=2)
+ ylabel=self.ylabs[1], fmt=["%d", "%.2f"],
+ write_file_header=False, close_file=True,
+ scan_number=2)
specf = open(self.spec_fname)
actual_spec = specf.read()
@@ -171,7 +169,7 @@ class TestSave(unittest.TestCase):
def test_save_spec(self):
"""Save SpecFile using save()"""
utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab,
- ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+ ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
specf = open(self.spec_fname)
actual_spec = specf.read()
@@ -192,7 +190,7 @@ class TestSave(unittest.TestCase):
self.y = [[4, 5, 6], [7, 8, 9]]
self.ylabs = ["Ordinate1", "Ordinate2"]
utils.save1D(self.csv_fname, self.x, self.y,
- autoheader=True, fmt=["%d", "%.2f", "%.2e"])
+ autoheader=True, fmt=["%d", "%.2f", "%.2e"])
csvf = open(self.csv_fname)
actual_csv = csvf.read()
@@ -242,15 +240,12 @@ class TestH5Ls(unittest.TestCase):
self.assertIn("+foo", lines)
self.assertIn("\t+bar", lines)
- self.assertMatchAnyStringInList(
- r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">',
- lines)
- self.assertMatchAnyStringInList(
- r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">',
- lines)
- self.assertMatchAnyStringInList(
- r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">',
- lines)
+ match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">'
+ self.assertMatchAnyStringInList(match, lines)
os.unlink(self.h5_fname)
@@ -325,12 +320,6 @@ class TestOpen(unittest.TestCase):
@classmethod
def tearDownClass(cls):
- if sys.platform == "win32" and fabio is not None:
- # gc collect is needed to close a file descriptor
- # opened by fabio and not released.
- # https://github.com/silx-kit/fabio/issues/167
- import gc
- gc.collect()
shutil.rmtree(cls.tmp_directory)
def testH5(self):
@@ -410,6 +399,30 @@ class TestOpen(unittest.TestCase):
# load it
self.assertRaises(IOError, utils.open, self.missing_filename)
+ def test_silx_scheme(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/")
+ with utils.open(url.path()) as f:
+ self.assertIsNotNone(f)
+ self.assertTrue(silx.io.utils.is_file(f))
+
+ def test_fabio_scheme(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ if fabio is None:
+ self.skipTest("Fabio is missing")
+ url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_bad_url(self):
+ url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_sliced_url(self):
+ url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,))
+ self.assertRaises(IOError, utils.open, url.path())
+
class TestNodes(unittest.TestCase):
"""Test `silx.io.utils.is_` functions."""
@@ -443,7 +456,7 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = h5py.File
+ self.h5_class = utils.H5Type.FILE
obj = Foo()
self.assertTrue(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
@@ -455,7 +468,7 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = h5py.Group
+ self.h5_class = utils.H5Type.GROUP
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
@@ -467,7 +480,7 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = h5py.Dataset
+ self.h5_class = utils.H5Type.DATASET
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
@@ -491,13 +504,142 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = int
+ self.h5_class = int
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
+class TestGetData(unittest.TestCase):
+ """Test `silx.io.utils.get_data` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+
+ if h5py is not None:
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/scalar"] = 50
+ h5["group/group/array"] = [1, 2, 3, 4, 5]
+ h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
+ fmt=["%d", "%.2f"], close_file=True, scan_number=1)
+
+ if fabio is not None:
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+ fabiofile.appendFrame(data=data, header=header)
+ fabiofile.write(cls.edf_multiframe_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write(u"Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def test_hdf5_scalar(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?/group/group/scalar" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data, 50)
+
+ def test_hdf5_array(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?/group/group/array" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data[0], 1)
+
+ def test_hdf5_array_slice(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data[0], 6)
+
+ def test_hdf5_array_slice_out_of_range(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_edf_using_silx(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_frame(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s?slice=1" % self.edf_multiframe_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_singleframe(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s?slice=0" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_too_much_frames(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s?slice=..." % self.edf_multiframe_filename
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_fabio_no_frame(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_unsupported_scheme(self):
+ url = "foo:/foo/bar"
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_no_scheme(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ self.assertRaises((ValueError, IOError), utils.get_data, url)
+
+ def test_file_not_exists(self):
+ url = "silx:/foo/bar"
+ self.assertRaises(IOError, utils.get_data, url)
+
+
def suite():
loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
test_suite = unittest.TestSuite()
@@ -505,6 +647,7 @@ def suite():
test_suite.addTest(loadTests(TestH5Ls))
test_suite.addTest(loadTests(TestOpen))
test_suite.addTest(loadTests(TestNodes))
+ test_suite.addTest(loadTests(TestGetData))
return test_suite