summaryrefslogtreecommitdiff
path: root/silx/io/test
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/test')
-rw-r--r--silx/io/test/__init__.py4
-rw-r--r--silx/io/test/test_dictdump.py25
-rw-r--r--silx/io/test/test_fabioh5.py181
-rw-r--r--silx/io/test/test_nxdata.py271
-rw-r--r--silx/io/test/test_specfile.py32
-rw-r--r--silx/io/test/test_specfilewrapper.py17
-rw-r--r--silx/io/test/test_spech5.py72
-rw-r--r--silx/io/test/test_spectoh5.py23
-rw-r--r--silx/io/test/test_url.py209
-rw-r--r--silx/io/test/test_utils.py221
10 files changed, 900 insertions, 155 deletions
diff --git a/silx/io/test/__init__.py b/silx/io/test/__init__.py
index 2510ae2..a309ee9 100644
--- a/silx/io/test/__init__.py
+++ b/silx/io/test/__init__.py
@@ -24,7 +24,7 @@
__authors__ = ["T. Vincent", "P. Knobel"]
__license__ = "MIT"
-__date__ = "20/09/2017"
+__date__ = "08/12/2017"
import unittest
@@ -39,6 +39,7 @@ from .test_utils import suite as test_utils_suite
from .test_nxdata import suite as test_nxdata_suite
from .test_commonh5 import suite as test_commonh5_suite
from .test_rawh5 import suite as test_rawh5_suite
+from .test_url import suite as test_url_suite
def suite():
@@ -54,4 +55,5 @@ def suite():
test_suite.addTest(test_nxdata_suite())
test_suite.addTest(test_commonh5_suite())
test_suite.addTest(test_rawh5_suite())
+ test_suite.addTest(test_url_suite())
return test_suite
diff --git a/silx/io/test/test_dictdump.py b/silx/io/test/test_dictdump.py
index 15d5fdc..9cd054c 100644
--- a/silx/io/test/test_dictdump.py
+++ b/silx/io/test/test_dictdump.py
@@ -25,7 +25,7 @@
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "10/02/2017"
+__date__ = "17/01/2018"
from collections import OrderedDict
import numpy
@@ -41,9 +41,12 @@ except ImportError:
from collections import defaultdict
+from silx.utils.testutils import TestLogging
+
from ..configdict import ConfigDict
-from ..dictdump import dicttoh5, dicttojson, dicttoini, dump
+from ..dictdump import dicttoh5, dicttojson, dump
from ..dictdump import h5todict, load
+from ..dictdump import logger as dictdump_logger
def tree():
@@ -93,6 +96,24 @@ class TestDictToH5(unittest.TestCase):
min(ddict["city attributes"]["Europe"]["France"]["Grenoble"]["coordinates"]),
5.7196)
+ def testH5Overwrite(self):
+ dd = ConfigDict({'t': True})
+
+ dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a')
+ dd = ConfigDict({'t': False})
+ with TestLogging(dictdump_logger, warning=1):
+ dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
+ overwrite_data=False)
+
+ res = h5todict(self.h5_fname)
+ assert(res['t'] == True)
+
+ dicttoh5(h5file=self.h5_fname, treedict=dd, mode='a',
+ overwrite_data=True)
+
+ res = h5todict(self.h5_fname)
+ assert(res['t'] == False)
+
@unittest.skipIf(h5py_missing, "Could not import h5py")
class TestH5ToDict(unittest.TestCase):
diff --git a/silx/io/test/test_fabioh5.py b/silx/io/test/test_fabioh5.py
index d9459ae..5fbf0d0 100644
--- a/silx/io/test/test_fabioh5.py
+++ b/silx/io/test/test_fabioh5.py
@@ -1,6 +1,6 @@
# coding: utf-8
# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+# Copyright (C) 2016-2018 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -25,10 +25,9 @@
__authors__ = ["V. Valls"]
__license__ = "MIT"
-__date__ = "04/10/2017"
+__date__ = "24/02/2018"
import os
-import sys
import logging
import numpy
import unittest
@@ -289,6 +288,28 @@ class TestFabioH5(unittest.TestCase):
self.assertIn(d.dtype.char, ['d', 'f'])
numpy.testing.assert_array_almost_equal(d[...], expected)
+ def test_interpretation_mca_edf(self):
+ """EDF files with two or more headers starting with "MCA"
+ must have @interpretation = "spectrum" an the data."""
+ header = {
+ "Title": "zapimage samy -4.975 -5.095 80 500 samz -4.091 -4.171 70 0",
+ "MCA a": -23.812,
+ "MCA b": 2.7107,
+ "MCA c": 8.1164e-06}
+
+ data = numpy.array([[0, 0], [0, 0]], dtype=numpy.int8)
+ fabio_image = fabio.edfimage.EdfImage(data=data, header=header)
+ h5_image = fabioh5.File(fabio_image=fabio_image)
+
+ data_dataset = h5_image["/scan_0/measurement/image_0/data"]
+ self.assertEquals(data_dataset.attrs["interpretation"], "spectrum")
+
+ data_dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEquals(data_dataset.attrs["interpretation"], "spectrum")
+
+ data_dataset = h5_image["/scan_0/measurement/image_0/info/data"]
+ self.assertEquals(data_dataset.attrs["interpretation"], "spectrum")
+
def test_get_api(self):
result = self.h5_image.get("scan_0", getclass=True, getlink=True)
self.assertIs(result, h5py.HardLink)
@@ -356,6 +377,95 @@ class TestFabioH5(unittest.TestCase):
self.assertIsInstance(data[...], numpy.ndarray)
+class TestFabioH5MultiFrames(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ if fabio is None:
+ raise unittest.SkipTest("fabio is needed")
+ if h5py is None:
+ raise unittest.SkipTest("h5py is needed")
+
+ names = ["A", "B", "C", "D"]
+ values = [["32000", "-10", "5.0", "1"],
+ ["-32000", "-10", "5.0", "1"]]
+
+ fabio_file = None
+
+ for i in range(10):
+ header = {
+ "image_id": "%d" % i,
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ "motor_mne": " ".join(names),
+ "motor_pos": " ".join(values[i % len(values)]),
+ "counter_mne": " ".join(names),
+ "counter_pos": " ".join(values[i % len(values)])
+ }
+ for iname, name in enumerate(names):
+ header[name] = values[i % len(values)][iname]
+
+ data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ if fabio_file is None:
+ fabio_file = fabio.edfimage.EdfImage(data=data, header=header)
+ else:
+ fabio_file.appendFrame(data=data, header=header)
+
+ cls.fabio_file = fabio_file
+ cls.fabioh5 = fabioh5.File(fabio_image=fabio_file)
+
+ def test_others(self):
+ others = self.fabioh5["/scan_0/instrument/detector_0/others"]
+ dataset = others["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = others["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = others["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = others["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 1)
+ self.assertEqual(dataset.dtype.kind, "u")
+
+ def test_positioners(self):
+ counters = self.fabioh5["/scan_0/instrument/positioners"]
+ # At least 32 bits, no unsigned values
+ dataset = counters["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = counters["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+
+ def test_counters(self):
+ counters = self.fabioh5["/scan_0/measurement"]
+ # At least 32 bits, no unsigned values
+ dataset = counters["A"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["B"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+ dataset = counters["C"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "f")
+ dataset = counters["D"]
+ self.assertGreaterEqual(dataset.dtype.itemsize, 4)
+ self.assertEqual(dataset.dtype.kind, "i")
+
+
class TestFabioH5WithEdf(unittest.TestCase):
@classmethod
@@ -388,12 +498,6 @@ class TestFabioH5WithEdf(unittest.TestCase):
def tearDownClass(cls):
cls.fabio_image = None
cls.h5_image = None
- if sys.platform == "win32" and fabio is not None:
- # gc collect is needed to close a file descriptor
- # opened by fabio and not released.
- # https://github.com/silx-kit/fabio/issues/167
- import gc
- gc.collect()
shutil.rmtree(cls.tmp_directory)
def test_reserved_format_metadata(self):
@@ -406,11 +510,70 @@ class TestFabioH5WithEdf(unittest.TestCase):
self.assertNotIn("/scan_0/instrument/detector_0/others/HeaderID", self.h5_image)
+class TestFabioH5WithFileSeries(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ if fabio is None:
+ raise unittest.SkipTest("fabio is needed")
+ if h5py is None:
+ raise unittest.SkipTest("h5py is needed")
+
+ cls.tmp_directory = tempfile.mkdtemp()
+
+ cls.edf_filenames = []
+
+ for i in range(10):
+ filename = os.path.join(cls.tmp_directory, "test_%04d.edf" % i)
+ cls.edf_filenames.append(filename)
+
+ header = {
+ "image_id": "%d" % i,
+ "integer": "-100",
+ "float": "1.0",
+ "string": "hi!",
+ "list_integer": "100 50 0",
+ "list_float": "1.0 2.0 3.5",
+ "string_looks_like_list": "2000 hi!",
+ }
+ data = numpy.array([[i, 11], [12, 13], [14, 15]], dtype=numpy.int64)
+ fabio_image = fabio.edfimage.edfimage(data, header)
+ fabio_image.write(filename)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def _testH5Image(self, h5_image):
+ # test data
+ dataset = h5_image["/scan_0/instrument/detector_0/data"]
+ self.assertEquals(dataset.h5py_class, h5py.Dataset)
+ self.assertTrue(isinstance(dataset[()], numpy.ndarray))
+ self.assertEquals(dataset.dtype.kind, "i")
+ self.assertEquals(dataset.shape, (10, 3, 2))
+ self.assertEquals(list(dataset[:, 0, 0]), list(range(10)))
+ self.assertEquals(dataset.attrs["interpretation"], "image")
+ # test metatdata
+ dataset = h5_image["/scan_0/instrument/detector_0/others/image_id"]
+ self.assertEquals(list(dataset[...]), list(range(10)))
+
+ def testFileList(self):
+ h5_image = fabioh5.File(file_series=self.edf_filenames)
+ self._testH5Image(h5_image)
+
+ def testFileSeries(self):
+ file_series = fabioh5._FileSeries(self.edf_filenames)
+ h5_image = fabioh5.File(file_series=file_series)
+ self._testH5Image(h5_image)
+
+
def suite():
loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
test_suite = unittest.TestSuite()
test_suite.addTest(loadTests(TestFabioH5))
+ test_suite.addTest(loadTests(TestFabioH5MultiFrames))
test_suite.addTest(loadTests(TestFabioH5WithEdf))
+ test_suite.addTest(loadTests(TestFabioH5WithFileSeries))
return test_suite
diff --git a/silx/io/test/test_nxdata.py b/silx/io/test/test_nxdata.py
index f891db9..f7ea8a4 100644
--- a/silx/io/test/test_nxdata.py
+++ b/silx/io/test/test_nxdata.py
@@ -1,6 +1,6 @@
# coding: utf-8
# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+# Copyright (C) 2016-2018 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -25,7 +25,7 @@
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "27/09/2016"
+__date__ = "27/01/2018"
try:
import h5py
@@ -36,6 +36,10 @@ import tempfile
import unittest
from .. import nxdata
+from silx.third_party import six
+
+text_dtype = h5py.special_dtype(vlen=six.text_type)
+
@unittest.skipIf(h5py is None, "silx.io.nxdata tests depend on h5py")
class TestNXdata(unittest.TestCase):
@@ -71,9 +75,16 @@ class TestNXdata(unittest.TestCase):
g1d0 = g1d.create_group("1D_spectrum")
g1d0.attrs["NX_class"] = "NXdata"
g1d0.attrs["signal"] = "count"
+ g1d0.attrs["auxiliary_signals"] = numpy.array(["count2", "count3"],
+ dtype=text_dtype)
g1d0.attrs["axes"] = "energy_calib"
- g1d0.attrs["uncertainties"] = b"energy_errors",
+ g1d0.attrs["uncertainties"] = numpy.array(["energy_errors", ],
+ dtype=text_dtype)
g1d0.create_dataset("count", data=numpy.arange(10))
+ g1d0.create_dataset("count2", data=0.5 * numpy.arange(10))
+ d = g1d0.create_dataset("count3", data=0.4 * numpy.arange(10))
+ d.attrs["long_name"] = "3rd counter"
+ g1d0.create_dataset("title", data="Title as dataset (like nexpy)")
g1d0.create_dataset("energy_calib", data=(10, 5)) # 10 * idx + 5
g1d0.create_dataset("energy_errors", data=3.14 * numpy.random.rand(10))
@@ -86,7 +97,7 @@ class TestNXdata(unittest.TestCase):
g1d2 = g1d.create_group("4D_spectra")
g1d2.attrs["NX_class"] = "NXdata"
g1d2.attrs["signal"] = "counts"
- g1d2.attrs["axes"] = b"energy",
+ g1d2.attrs["axes"] = numpy.array(["energy", ], dtype=text_dtype)
ds = g1d2.create_dataset("counts", data=numpy.arange(2 * 2 * 3 * 10).reshape((2, 2, 3, 10)))
ds.attrs["interpretation"] = "spectrum"
ds = g1d2.create_dataset("errors", data=4.5 * numpy.random.rand(2, 2, 3, 10))
@@ -103,8 +114,11 @@ class TestNXdata(unittest.TestCase):
g2d0 = g2d.create_group("2D_regular_image")
g2d0.attrs["NX_class"] = "NXdata"
g2d0.attrs["signal"] = "image"
- g2d0.attrs["axes"] = b"rows_calib", b"columns_coordinates"
+ g2d0.attrs["auxiliary_signals"] = "image2"
+ g2d0.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"],
+ dtype=text_dtype)
g2d0.create_dataset("image", data=numpy.arange(4 * 6).reshape((4, 6)))
+ g2d0.create_dataset("image2", data=numpy.arange(4 * 6).reshape((4, 6)))
ds = g2d0.create_dataset("rows_calib", data=(10, 5))
ds.attrs["long_name"] = "Calibrated Y"
g2d0.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
@@ -112,7 +126,9 @@ class TestNXdata(unittest.TestCase):
g2d1 = g2d.create_group("2D_irregular_data")
g2d1.attrs["NX_class"] = "NXdata"
g2d1.attrs["signal"] = "data"
- g2d1.attrs["axes"] = b"rows_coordinates", b"columns_coordinates"
+ g2d1.attrs["title"] = "Title as group attr"
+ g2d1.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"],
+ dtype=text_dtype)
g2d1.create_dataset("data", data=numpy.arange(64 * 128).reshape((64, 128)))
g2d1.create_dataset("rows_coordinates", data=numpy.arange(64) + numpy.random.rand(64))
g2d1.create_dataset("columns_coordinates", data=numpy.arange(128) + 2.5 * numpy.random.rand(128))
@@ -126,19 +142,33 @@ class TestNXdata(unittest.TestCase):
g2d3 = g2d.create_group("5D_images")
g2d3.attrs["NX_class"] = "NXdata"
g2d3.attrs["signal"] = "images"
- g2d3.attrs["axes"] = b"rows_coordinates", b"columns_coordinates"
+ g2d3.attrs["axes"] = numpy.array(["rows_coordinates", "columns_coordinates"],
+ dtype=text_dtype)
ds = g2d3.create_dataset("images", data=numpy.arange(2 * 2 * 2 * 4 * 6).reshape((2, 2, 2, 4, 6)))
ds.attrs["interpretation"] = "image"
g2d3.create_dataset("rows_coordinates", data=5 + 10 * numpy.arange(4))
g2d3.create_dataset("columns_coordinates", data=0.5 + 0.02 * numpy.arange(6))
+ g2d4 = g2d.create_group("RGBA_image")
+ g2d4.attrs["NX_class"] = "NXdata"
+ g2d4.attrs["signal"] = "image"
+ g2d4.attrs["axes"] = numpy.array(["rows_calib", "columns_coordinates"],
+ dtype=text_dtype)
+ rgba_image = numpy.linspace(0, 1, num=7*8*3).reshape((7, 8, 3))
+ rgba_image[:, :, 1] = 1 - rgba_image[:, :, 1] # invert G channel to add some color
+ ds = g2d4.create_dataset("image", data=rgba_image)
+ ds.attrs["interpretation"] = "rgba-image"
+ ds = g2d4.create_dataset("rows_calib", data=(10, 5))
+ ds.attrs["long_name"] = "Calibrated Y"
+ g2d4.create_dataset("columns_coordinates", data=0.5+0.02*numpy.arange(8))
+
# SCATTER
g = self.h5f.create_group("scatters")
gd0 = g.create_group("x_y_scatter")
gd0.attrs["NX_class"] = "NXdata"
gd0.attrs["signal"] = "y"
- gd0.attrs["axes"] = b"x",
+ gd0.attrs["axes"] = numpy.array(["x", ], dtype=text_dtype)
gd0.create_dataset("y", data=numpy.random.rand(128) - 0.5)
gd0.create_dataset("x", data=2 * numpy.random.rand(128))
gd0.create_dataset("x_errors", data=0.05 * numpy.random.rand(128))
@@ -147,7 +177,7 @@ class TestNXdata(unittest.TestCase):
gd1 = g.create_group("x_y_value_scatter")
gd1.attrs["NX_class"] = "NXdata"
gd1.attrs["signal"] = "values"
- gd1.attrs["axes"] = b"x", b"y"
+ gd1.attrs["axes"] = numpy.array(["x", "y"], dtype=text_dtype)
gd1.create_dataset("values", data=3.14 * numpy.random.rand(128))
gd1.create_dataset("y", data=numpy.random.rand(128))
gd1.create_dataset("y_errors", data=0.02 * numpy.random.rand(128))
@@ -199,6 +229,7 @@ class TestNXdata(unittest.TestCase):
def testSpectra(self):
nxd = nxdata.NXdata(self.h5f["spectra/1D_spectrum"])
self.assertTrue(nxd.signal_is_1d)
+ self.assertTrue(nxd.is_curve)
self.assertTrue(numpy.array_equal(numpy.array(nxd.signal),
numpy.arange(10)))
self.assertEqual(nxd.axes_names, ["energy_calib"])
@@ -208,9 +239,18 @@ class TestNXdata(unittest.TestCase):
self.assertIsNone(nxd.errors)
self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
self.assertIsNone(nxd.interpretation)
+ self.assertEqual(nxd.title, "Title as dataset (like nexpy)")
+
+ self.assertEqual(nxd.auxiliary_signals_dataset_names,
+ ["count2", "count3"])
+ self.assertEqual(nxd.auxiliary_signals_names,
+ ["count2", "3rd counter"])
+ self.assertAlmostEqual(nxd.auxiliary_signals[1][2],
+ 0.8) # numpy.arange(10) * 0.4
nxd = nxdata.NXdata(self.h5f["spectra/2D_spectra"])
self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_curve)
self.assertEqual(nxd.axes_names, [None, None])
self.assertEqual(nxd.axes_dataset_names, [None, None])
self.assertEqual(nxd.axes, [None, None])
@@ -221,6 +261,7 @@ class TestNXdata(unittest.TestCase):
nxd = nxdata.NXdata(self.h5f["spectra/4D_spectra"])
self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
nxd.signal_is_2d or nxd.signal_is_3d)
+ self.assertTrue(nxd.is_curve)
self.assertEqual(nxd.axes_names,
[None, None, None, "Calibrated energy"])
self.assertEqual(nxd.axes_dataset_names,
@@ -242,15 +283,19 @@ class TestNXdata(unittest.TestCase):
def testImages(self):
nxd = nxdata.NXdata(self.h5f["images/2D_regular_image"])
self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_image)
self.assertEqual(nxd.axes_names, ["Calibrated Y", "columns_coordinates"])
self.assertEqual(list(nxd.axes_dataset_names),
["rows_calib", "columns_coordinates"])
self.assertIsNone(nxd.errors)
self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
self.assertIsNone(nxd.interpretation)
+ self.assertEqual(len(nxd.auxiliary_signals), 1)
+ self.assertEqual(nxd.auxiliary_signals_names, ["image2"])
nxd = nxdata.NXdata(self.h5f["images/2D_irregular_data"])
self.assertTrue(nxd.signal_is_2d)
+ self.assertTrue(nxd.is_image)
self.assertEqual(nxd.axes_dataset_names, nxd.axes_names)
self.assertEqual(list(nxd.axes_dataset_names),
@@ -259,8 +304,10 @@ class TestNXdata(unittest.TestCase):
self.assertIsNone(nxd.errors)
self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
self.assertIsNone(nxd.interpretation)
+ self.assertEqual(nxd.title, "Title as group attr")
nxd = nxdata.NXdata(self.h5f["images/5D_images"])
+ self.assertTrue(nxd.is_image)
self.assertFalse(nxd.signal_is_0d or nxd.signal_is_1d or
nxd.signal_is_2d or nxd.signal_is_3d)
self.assertEqual(nxd.axes_names,
@@ -271,6 +318,16 @@ class TestNXdata(unittest.TestCase):
self.assertFalse(nxd.is_scatter or nxd.is_x_y_value_scatter)
self.assertEqual(nxd.interpretation, "image")
+ nxd = nxdata.NXdata(self.h5f["images/RGBA_image"])
+ self.assertTrue(nxd.is_image)
+ self.assertEqual(nxd.interpretation, "rgba-image")
+ self.assertTrue(nxd.signal_is_3d)
+ self.assertEqual(nxd.axes_names, ["Calibrated Y",
+ "columns_coordinates",
+ None])
+ self.assertEqual(list(nxd.axes_dataset_names),
+ ["rows_calib", "columns_coordinates", None])
+
def testScatters(self):
nxd = nxdata.NXdata(self.h5f["scatters/x_y_scatter"])
self.assertTrue(nxd.signal_is_1d)
@@ -301,10 +358,206 @@ class TestNXdata(unittest.TestCase):
self.assertIsNone(nxd.interpretation)
+@unittest.skipIf(h5py is None, "silx.io.nxdata tests depend on h5py")
+class TestLegacyNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(prefix="nxdata_legacy_examples_",
+ suffix=".h5", delete=True)
+ tmp.file.close()
+ self.h5fname = tmp.name
+ self.h5f = h5py.File(tmp.name, "w")
+
+ def tearDown(self):
+ self.h5f.close()
+
+ def testSignalAttrOnDataset(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds0.attrs["long_name"] = "My first image"
+
+ ds1 = g.create_dataset("image1",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds1.attrs["signal"] = "2"
+ ds1.attrs["long_name"] = "My 2nd image"
+
+ ds2 = g.create_dataset("image2",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds2.attrs["signal"] = 3
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+
+ self.assertEqual(nxd.signal_dataset_name, "image0")
+ self.assertEqual(nxd.signal_name, "My first image")
+ self.assertEqual(nxd.signal.shape,
+ (4, 6))
+
+ self.assertEqual(len(nxd.auxiliary_signals), 2)
+ self.assertEqual(nxd.auxiliary_signals[1].shape,
+ (4, 6))
+
+ self.assertEqual(nxd.auxiliary_signals_dataset_names,
+ ["image1", "image2"])
+ self.assertEqual(nxd.auxiliary_signals_names,
+ ["My 2nd image", "image2"])
+
+ def testAxesOnSignalDataset(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds0.attrs["axes"] = "yaxis:xaxis"
+
+ ds1 = g.create_dataset("yaxis",
+ data=numpy.arange(4))
+ ds2 = g.create_dataset("xaxis",
+ data=numpy.arange(6))
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+
+ self.assertEqual(nxd.axes_dataset_names,
+ ["yaxis", "xaxis"])
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ numpy.arange(4)))
+ self.assertTrue(numpy.array_equal(nxd.axes[1],
+ numpy.arange(6)))
+
+ def testAxesOnAxesDatasets(self):
+ g = self.h5f.create_group("2D")
+ g.attrs["NX_class"] = "NXdata"
+
+ ds0 = g.create_dataset("image0",
+ data=numpy.arange(4 * 6).reshape((4, 6)))
+ ds0.attrs["signal"] = 1
+ ds1 = g.create_dataset("yaxis",
+ data=numpy.arange(4))
+ ds1.attrs["axis"] = 0
+ ds2 = g.create_dataset("xaxis",
+ data=numpy.arange(6))
+ ds2.attrs["axis"] = "1"
+
+ nxd = nxdata.NXdata(self.h5f["2D"])
+ self.assertEqual(nxd.axes_dataset_names,
+ ["yaxis", "xaxis"])
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ numpy.arange(4)))
+ self.assertTrue(numpy.array_equal(nxd.axes[1],
+ numpy.arange(6)))
+
+
+class TestSaveNXdata(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.NamedTemporaryFile(prefix="nxdata",
+ suffix=".h5", delete=True)
+ tmp.file.close()
+ self.h5fname = tmp.name
+
+ def testSimpleSave(self):
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=["a0", "a1"],
+ nxentry_name="a",
+ nxdata_name="mydata")
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
+
+ nxd = nxdata.NXdata(h5f["/a/mydata"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ a0))
+
+ h5f.close()
+
+ def testSimplestSave(self):
+ sig = numpy.array([0, 1, 2])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig)
+
+ h5f = h5py.File(self.h5fname, "r")
+
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["/entry/data0"]))
+
+ nxd = nxdata.NXdata(h5f["/entry/data0"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ h5f.close()
+
+ def testSaveDefaultAxesNames(self):
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=None,
+ axes_long_names=["a", "b"],
+ nxentry_name="a",
+ nxdata_name="mydata")
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["a/mydata"]))
+
+ nxd = nxdata.NXdata(h5f["/a/mydata"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ a0))
+ self.assertEqual(nxd.axes_dataset_names,
+ [u"dim0", u"dim1"])
+ self.assertEqual(nxd.axes_names,
+ [u"a", u"b"])
+
+ h5f.close()
+
+ def testSaveToExistingEntry(self):
+ h5f = h5py.File(self.h5fname, "w")
+ g = h5f.create_group("myentry")
+ g.attrs["NX_class"] = "NXentry"
+ h5f.close()
+
+ sig = numpy.array([0, 1, 2])
+ a0 = numpy.array([2, 3, 4])
+ a1 = numpy.array([3, 4, 5])
+ nxdata.save_NXdata(filename=self.h5fname,
+ signal=sig,
+ axes=[a0, a1],
+ signal_name="sig",
+ axes_names=["a0", "a1"],
+ nxentry_name="myentry",
+ nxdata_name="toto")
+
+ h5f = h5py.File(self.h5fname, "r")
+ self.assertTrue(nxdata.is_valid_nxdata(h5f["myentry/toto"]))
+
+ nxd = nxdata.NXdata(h5f["myentry/toto"])
+ self.assertTrue(numpy.array_equal(nxd.signal,
+ sig))
+ self.assertTrue(numpy.array_equal(nxd.axes[0],
+ a0))
+ h5f.close()
+
+
def suite():
test_suite = unittest.TestSuite()
test_suite.addTest(
unittest.defaultTestLoader.loadTestsFromTestCase(TestNXdata))
+ test_suite.addTest(
+ unittest.defaultTestLoader.loadTestsFromTestCase(TestLegacyNXdata))
+ test_suite.addTest(
+ unittest.defaultTestLoader.loadTestsFromTestCase(TestSaveNXdata))
return test_suite
diff --git a/silx/io/test/test_specfile.py b/silx/io/test/test_specfile.py
index 0aef2e2..9236fee 100644
--- a/silx/io/test/test_specfile.py
+++ b/silx/io/test/test_specfile.py
@@ -25,9 +25,9 @@
__authors__ = ["P. Knobel", "V.A. Sole"]
__license__ = "MIT"
-__date__ = "03/08/2017"
+__date__ = "17/01/2018"
+
-import gc
import locale
import logging
import numpy
@@ -36,7 +36,7 @@ import sys
import tempfile
import unittest
-from silx.test import utils
+from silx.utils import testutils
from ..specfile import SpecFile, Scan
from .. import specfile
@@ -136,14 +136,14 @@ class TestSpecFile(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.fname1 = tempfile.mkstemp(text=False)
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sftext)
else:
os.write(fd, bytes(sftext, 'ascii'))
os.close(fd)
fd2, cls.fname2 = tempfile.mkstemp(text=False)
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd2, sftext[370:923])
else:
os.write(fd2, bytes(sftext[370:923], 'ascii'))
@@ -151,7 +151,7 @@ class TestSpecFile(unittest.TestCase):
fd3, cls.fname3 = tempfile.mkstemp(text=False)
txt = sftext[371:923]
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd3, txt)
else:
os.write(fd3, bytes(txt, 'ascii'))
@@ -177,16 +177,9 @@ class TestSpecFile(unittest.TestCase):
self.scan1_no_fhdr_crash = self.sf_no_fhdr_crash[0]
def tearDown(self):
- del self.sf
- del self.sf_no_fhdr
- del self.scan1
- del self.scan1_2
- del self.scan25
- del self.scan1_no_fhdr
- del self.sf_no_fhdr_crash
- del self.scan1_no_fhdr_crash
- del self.empty_scan
- gc.collect()
+ self.sf.close()
+ self.sf_no_fhdr.close()
+ self.sf_no_fhdr_crash.close()
def test_open(self):
self.assertIsInstance(self.sf, SpecFile)
@@ -376,7 +369,7 @@ class TestSpecFile(unittest.TestCase):
self.assertEqual(self.scan25.mca.channels,
[])
- @utils.test_logging(specfile._logger.name, warning=1)
+ @testutils.test_logging(specfile._logger.name, warning=1)
def test_empty_scan(self):
"""Test reading a scan with no data points"""
self.assertEqual(len(self.empty_scan.labels),
@@ -389,7 +382,7 @@ class TestSFLocale(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.fname = tempfile.mkstemp(text=False)
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sftext)
else:
os.write(fd, bytes(sftext, 'ascii'))
@@ -399,13 +392,12 @@ class TestSFLocale(unittest.TestCase):
def tearDownClass(cls):
os.unlink(cls.fname)
locale.setlocale(locale.LC_NUMERIC, loc) # restore saved locale
- gc.collect()
def crunch_data(self):
self.sf3 = SpecFile(self.fname)
self.assertAlmostEqual(self.sf3[0].data_line(1)[2],
1.56)
- del self.sf3
+ self.sf3.close()
@unittest.skipIf(not try_DE, "de_DE.utf8 locale not installed")
def test_locale_de_DE(self):
diff --git a/silx/io/test/test_specfilewrapper.py b/silx/io/test/test_specfilewrapper.py
index c5b0f39..2f463fa 100644
--- a/silx/io/test/test_specfilewrapper.py
+++ b/silx/io/test/test_specfilewrapper.py
@@ -27,7 +27,6 @@ __authors__ = ["P. Knobel"]
__license__ = "MIT"
__date__ = "15/05/2017"
-import gc
import locale
import logging
import numpy
@@ -114,23 +113,15 @@ class TestSpecfilewrapper(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.fname1 = tempfile.mkstemp(text=False)
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sftext)
else:
os.write(fd, bytes(sftext, 'ascii'))
os.close(fd)
- fd2, cls.fname2 = tempfile.mkstemp(text=False)
- if sys.version < '3.0':
- os.write(fd2, sftext[370:-97])
- else:
- os.write(fd2, bytes(sftext[370:-97], 'ascii'))
- os.close(fd2)
-
@classmethod
def tearDownClass(cls):
os.unlink(cls.fname1)
- os.unlink(cls.fname2)
def setUp(self):
self.sf = Specfile(self.fname1)
@@ -139,11 +130,7 @@ class TestSpecfilewrapper(unittest.TestCase):
self.scan25 = self.sf.select("25.1")
def tearDown(self):
- del self.sf
- del self.scan1
- del self.scan1_2
- del self.scan25
- gc.collect()
+ self.sf.close()
def test_number_of_scans(self):
self.assertEqual(3, len(self.sf))
diff --git a/silx/io/test/test_spech5.py b/silx/io/test/test_spech5.py
index 1d04c6f..b842243 100644
--- a/silx/io/test/test_spech5.py
+++ b/silx/io/test/test_spech5.py
@@ -1,6 +1,6 @@
# coding: utf-8
# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+# Copyright (C) 2016-2018 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -22,7 +22,6 @@
#
# ############################################################################*/
"""Tests for spech5"""
-import gc
from numpy import array_equal
import os
import io
@@ -32,11 +31,10 @@ import unittest
import datetime
from functools import partial
-from silx.test import utils
+from silx.utils import testutils
from .. import spech5
-from ..spech5 import (SpecH5, SpecH5Group,
- SpecH5Dataset, spec_date_to_iso8601)
+from ..spech5 import (SpecH5, SpecH5Dataset, spec_date_to_iso8601)
from .. import specfile
try:
@@ -46,7 +44,7 @@ except ImportError:
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "26/07/2017"
+__date__ = "12/02/2018"
sftext = """#F /tmp/sf.dat
#E 1455180875
@@ -216,7 +214,7 @@ class TestSpecH5(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.fname = tempfile.mkstemp()
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sftext)
else:
os.write(fd, bytes(sftext, 'ascii'))
@@ -230,9 +228,7 @@ class TestSpecH5(unittest.TestCase):
self.sfh5 = SpecH5(self.fname)
def tearDown(self):
- # fix Win32 permission error when deleting temp file
- del self.sfh5
- gc.collect()
+ self.sfh5.close()
def testContainsFile(self):
self.assertIn("/1.2/measurement", self.sfh5)
@@ -275,9 +271,9 @@ class TestSpecH5(unittest.TestCase):
def testDate(self):
# start time is in Iso8601 format
self.assertEqual(self.sfh5["/1.1/start_time"],
- b"2016-02-11T09:55:20")
+ u"2016-02-11T09:55:20")
self.assertEqual(self.sfh5["25.1/start_time"],
- b"2015-03-14T03:53:50")
+ u"2015-03-14T03:53:50")
def testDatasetInstanceAttr(self):
"""The SpecH5Dataset objects must implement some dummy attributes
@@ -299,7 +295,7 @@ class TestSpecH5(unittest.TestCase):
-3)
self.assertEqual(self.sfh5.get("/1.1/start_time", default=-3),
- b"2016-02-11T09:55:20")
+ u"2016-02-11T09:55:20")
def testGetClass(self):
"""Test :meth:`SpecH5Group.get`"""
@@ -355,33 +351,22 @@ class TestSpecH5(unittest.TestCase):
def testHeader(self):
file_header = self.sfh5["/1.2/instrument/specfile/file_header"]
scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"]
- # convert ndarray(dtype=numpy.string_) to str
- if sys.version < '3.0':
- file_header = str(file_header[()])
- scan_header = str(scan_header[()])
- else:
- file_header = str(file_header.astype(str))
- scan_header = str(scan_header.astype(str))
# File header has 10 lines
- self.assertEqual(len(file_header.split("\n")), 10)
+ self.assertEqual(len(file_header), 10)
# 1.2 has 9 scan & mca header lines
- self.assertEqual(len(scan_header.split("\n")), 9)
+ self.assertEqual(len(scan_header), 9)
# line 4 of file header
self.assertEqual(
- file_header.split("\n")[3],
- "#C imaging User = opid17")
+ file_header[3],
+ u"#C imaging User = opid17")
# line 4 of scan header
scan_header = self.sfh5["25.1/instrument/specfile/scan_header"]
- if sys.version < '3.0':
- scan_header = str(scan_header[()])
- else:
- scan_header = str(scan_header[()].astype(str))
self.assertEqual(
- scan_header.split("\n")[3],
- "#P1 4.74255 6.197579 2.238283")
+ scan_header[3],
+ u"#P1 4.74255 6.197579 2.238283")
def testLinks(self):
self.assertTrue(
@@ -493,7 +478,7 @@ class TestSpecH5(unittest.TestCase):
def testTitle(self):
self.assertEqual(self.sfh5["/25.1/title"],
- b"25 ascan c3th 1.33245 1.52245 40 0.15")
+ u"ascan c3th 1.33245 1.52245 40 0.15")
def testValues(self):
group = self.sfh5["/25.1"]
@@ -582,9 +567,9 @@ class TestSpecH5(unittest.TestCase):
# All 0 values
self.assertNotIn("sample", self.sfh5["/1001.1"])
with self.assertRaises(KeyError):
- uc = self.sfh5["/1001.1/sample/unit_cell"]
+ self.sfh5["/1001.1/sample/unit_cell"]
- @utils.test_logging(spech5.logger1.name, warning=2)
+ @testutils.test_logging(spech5.logger1.name, warning=2)
def testOpenFileDescriptor(self):
"""Open a SpecH5 file from a file descriptor"""
with io.open(self.sfh5.filename) as f:
@@ -593,6 +578,7 @@ class TestSpecH5(unittest.TestCase):
name_list = []
# check if the object is working
self.sfh5.visit(name_list.append)
+ sfh5.close()
sftext_multi_mca_headers = """
@@ -624,7 +610,7 @@ class TestSpecH5MultiMca(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.fname = tempfile.mkstemp(text=False)
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sftext_multi_mca_headers)
else:
os.write(fd, bytes(sftext_multi_mca_headers, 'ascii'))
@@ -638,9 +624,7 @@ class TestSpecH5MultiMca(unittest.TestCase):
self.sfh5 = SpecH5(self.fname)
def tearDown(self):
- # fix Win32 permission error when deleting temp file
- del self.sfh5
- gc.collect()
+ self.sfh5.close()
def testMcaCalib(self):
mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"]
@@ -691,7 +675,7 @@ sftext_no_cols = r"""#F C:/DATA\test.mca
#D Thu Jul 7 08:40:19 2016
#C no data cols, one mca analyser, single spectrum
#@MCA %16C
-#@CHANN 151 29 29 1
+#@CHANN 151 0 150 1
#@CALIB 0 2 0
@A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \
1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \
@@ -756,7 +740,7 @@ class TestSpecH5NoDataCols(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.fname = tempfile.mkstemp()
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sftext_no_cols)
else:
os.write(fd, bytes(sftext_no_cols, 'ascii'))
@@ -770,9 +754,7 @@ class TestSpecH5NoDataCols(unittest.TestCase):
self.sfh5 = SpecH5(self.fname)
def tearDown(self):
- # fix Win32 permission error when deleting temp file
- del self.sfh5
- gc.collect()
+ self.sfh5.close()
def testScan1(self):
# 1.1: single analyser, single spectrum, 151 channels
@@ -829,7 +811,7 @@ class TestSpecH5SlashInLabels(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.fname = tempfile.mkstemp()
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sf_text_slash)
else:
os.write(fd, bytes(sf_text_slash, 'ascii'))
@@ -843,9 +825,7 @@ class TestSpecH5SlashInLabels(unittest.TestCase):
self.sfh5 = SpecH5(self.fname)
def tearDown(self):
- # fix Win32 permission error when deleting temp file
- del self.sfh5
- gc.collect()
+ self.sfh5.close()
def testLabels(self):
"""Ensure `/` is substituted with `%` and
diff --git a/silx/io/test/test_spectoh5.py b/silx/io/test/test_spectoh5.py
index 421c48b..8020731 100644
--- a/silx/io/test/test_spectoh5.py
+++ b/silx/io/test/test_spectoh5.py
@@ -1,6 +1,6 @@
# coding: utf-8
# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+# Copyright (C) 2016-2018 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -23,8 +23,7 @@
# ############################################################################*/
"""Tests for SpecFile to HDF5 converter"""
-import gc
-from numpy import array_equal, string_
+from numpy import array_equal
import os
import sys
import tempfile
@@ -41,7 +40,7 @@ else:
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "02/06/2017"
+__date__ = "12/02/2018"
sftext = """#F /tmp/sf.dat
@@ -93,7 +92,7 @@ class TestConvertSpecHDF5(unittest.TestCase):
@classmethod
def setUpClass(cls):
fd, cls.spec_fname = tempfile.mkstemp(text=False)
- if sys.version < '3.0':
+ if sys.version_info < (3, ):
os.write(fd, sftext)
else:
os.write(fd, bytes(sftext, 'ascii'))
@@ -115,11 +114,9 @@ class TestConvertSpecHDF5(unittest.TestCase):
self.h5f = h5py.File(self.h5_fname, "a")
def tearDown(self):
- del self.sfh5
self.h5f.close()
- del self.h5f
+ self.sfh5.close()
os.unlink(self.h5_fname)
- gc.collect()
def testAppendToHDF5(self):
write_to_h5(self.sfh5, self.h5f, h5path="/foo/bar/spam")
@@ -141,24 +138,22 @@ class TestConvertSpecHDF5(unittest.TestCase):
def testTitle(self):
"""Test the value of a dataset"""
title12 = self.h5f["/1.2/title"].value
- if sys.version > '3.0':
- title12 = title12.decode()
self.assertEqual(title12,
- "1 aaaaaa")
+ u"aaaaaa")
def testAttrs(self):
# Test root group (file) attributes
self.assertEqual(self.h5f.attrs["NX_class"],
- string_("NXroot"))
+ u"NXroot")
# Test dataset attributes
ds = self.h5f["/1.2/instrument/mca_1/data"]
self.assertTrue("interpretation" in ds.attrs)
self.assertEqual(list(ds.attrs.values()),
- [string_("spectrum")])
+ [u"spectrum"])
# Test group attributes
grp = self.h5f["1.1"]
self.assertEqual(grp.attrs["NX_class"],
- string_("NXentry"))
+ u"NXentry")
self.assertEqual(len(list(grp.attrs.keys())),
1)
diff --git a/silx/io/test/test_url.py b/silx/io/test/test_url.py
new file mode 100644
index 0000000..5093fc2
--- /dev/null
+++ b/silx/io/test/test_url.py
@@ -0,0 +1,209 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for url module"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "29/01/2018"
+
+
+import unittest
+from ..url import DataUrl
+
+
+class TestDataUrl(unittest.TestCase):
+
+ def assertUrl(self, url, expected):
+ self.assertEqual(url.is_valid(), expected[0])
+ self.assertEqual(url.is_absolute(), expected[1])
+ self.assertEqual(url.scheme(), expected[2])
+ self.assertEqual(url.file_path(), expected[3])
+ self.assertEqual(url.data_path(), expected[4])
+ self.assertEqual(url.data_slice(), expected[5])
+
+ def test_fabio_absolute(self):
+ url = DataUrl("fabio:///data/image.edf?slice=2")
+ expected = [True, True, "fabio", "/data/image.edf", None, (2, )]
+ self.assertUrl(url, expected)
+
+ def test_fabio_absolute_windows(self):
+ url = DataUrl("fabio:///C:/data/image.edf?slice=2")
+ expected = [True, True, "fabio", "C:/data/image.edf", None, (2, )]
+ self.assertUrl(url, expected)
+
+ def test_silx_absolute(self):
+ url = DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5")
+ expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
+ self.assertUrl(url, expected)
+
+ def test_commandline_shell_separator(self):
+ url = DataUrl("silx:///data/image.h5::path=/data/dataset&slice=1,5")
+ expected = [True, True, "silx", "/data/image.h5", "/data/dataset", (1, 5)]
+ self.assertUrl(url, expected)
+
+ def test_silx_absolute2(self):
+ url = DataUrl("silx:///data/image.edf?/scan_0/detector/data")
+ expected = [True, True, "silx", "/data/image.edf", "/scan_0/detector/data", None]
+ self.assertUrl(url, expected)
+
+ def test_silx_absolute_windows(self):
+ url = DataUrl("silx:///C:/data/image.h5?/scan_0/detector/data")
+ expected = [True, True, "silx", "C:/data/image.h5", "/scan_0/detector/data", None]
+ self.assertUrl(url, expected)
+
+ def test_silx_relative(self):
+ url = DataUrl("silx:./image.h5")
+ expected = [True, False, "silx", "./image.h5", None, None]
+ self.assertUrl(url, expected)
+
+ def test_fabio_relative(self):
+ url = DataUrl("fabio:./image.edf")
+ expected = [True, False, "fabio", "./image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_silx_relative2(self):
+ url = DataUrl("silx:image.h5")
+ expected = [True, False, "silx", "image.h5", None, None]
+ self.assertUrl(url, expected)
+
+ def test_fabio_relative2(self):
+ url = DataUrl("fabio:image.edf")
+ expected = [True, False, "fabio", "image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_relative(self):
+ url = DataUrl("image.edf")
+ expected = [True, False, None, "image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_relative2(self):
+ url = DataUrl("./foo/bar/image.edf")
+ expected = [True, False, None, "./foo/bar/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_relative3(self):
+ url = DataUrl("foo/bar/image.edf")
+ expected = [True, False, None, "foo/bar/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_absolute(self):
+ url = DataUrl("/data/image.edf")
+ expected = [True, True, None, "/data/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_file_absolute_windows(self):
+ url = DataUrl("C:/data/image.edf")
+ expected = [True, True, None, "C:/data/image.edf", None, None]
+ self.assertUrl(url, expected)
+
+ def test_absolute_with_path(self):
+ url = DataUrl("/foo/foobar.h5?/foo/bar")
+ expected = [True, True, None, "/foo/foobar.h5", "/foo/bar", None]
+ self.assertUrl(url, expected)
+
+ def test_windows_file_data_slice(self):
+ url = DataUrl("C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, None, "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_scheme_file_data_slice(self):
+ url = DataUrl("silx:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, "silx", "/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_scheme_windows_file_data_slice(self):
+ url = DataUrl("silx:C:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [True, True, "silx", "C:/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_empty(self):
+ url = DataUrl("")
+ expected = [False, False, None, "", None, None]
+ self.assertUrl(url, expected)
+
+ def test_unknown_scheme(self):
+ url = DataUrl("foo:/foo/foobar.h5?path=/foo/bar&slice=5,1")
+ expected = [False, True, "foo", "/foo/foobar.h5", "/foo/bar", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_slice(self):
+ url = DataUrl("/a.h5?path=/b&slice=5,1")
+ expected = [True, True, None, "/a.h5", "/b", (5, 1)]
+ self.assertUrl(url, expected)
+
+ def test_slice_ellipsis(self):
+ url = DataUrl("/a.h5?path=/b&slice=...")
+ expected = [True, True, None, "/a.h5", "/b", (Ellipsis, )]
+ self.assertUrl(url, expected)
+
+ def test_slice_slicing(self):
+ url = DataUrl("/a.h5?path=/b&slice=:")
+ expected = [True, True, None, "/a.h5", "/b", (slice(None), )]
+ self.assertUrl(url, expected)
+
+ def test_slice_missing_element(self):
+ url = DataUrl("/a.h5?path=/b&slice=5,,1")
+ expected = [False, True, None, "/a.h5", "/b", None]
+ self.assertUrl(url, expected)
+
+ def test_slice_no_elements(self):
+ url = DataUrl("/a.h5?path=/b&slice=")
+ expected = [False, True, None, "/a.h5", "/b", None]
+ self.assertUrl(url, expected)
+
+ def test_create_relative_url(self):
+ url = DataUrl(scheme="silx", file_path="./foo.h5", data_path="/", data_slice=(5, 1))
+ self.assertFalse(url.is_absolute())
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_create_absolute_url(self):
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1))
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_create_absolute_windows_url(self):
+ url = DataUrl(scheme="silx", file_path="C:/foo.h5", data_path="/", data_slice=(5, 1))
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_create_slice_url(self):
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_path="/", data_slice=(5, 1, Ellipsis, slice(None)))
+ url2 = DataUrl(url.path())
+ self.assertEqual(url, url2)
+
+ def test_wrong_url(self):
+ url = DataUrl(scheme="silx", file_path="/foo.h5", data_slice=(5, 1))
+ self.assertFalse(url.is_valid())
+
+
+def suite():
+ test_suite = unittest.TestSuite()
+ loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
+ test_suite.addTest(loadTests(TestDataUrl))
+ return test_suite
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')
diff --git a/silx/io/test/test_utils.py b/silx/io/test/test_utils.py
index 09074bb..53e001c 100644
--- a/silx/io/test/test_utils.py
+++ b/silx/io/test/test_utils.py
@@ -30,9 +30,9 @@ import re
import shutil
import tempfile
import unittest
-import sys
from .. import utils
+import silx.io.url
try:
import h5py
@@ -48,7 +48,7 @@ except ImportError:
__authors__ = ["P. Knobel"]
__license__ = "MIT"
-__date__ = "28/09/2017"
+__date__ = "12/02/2018"
expected_spec1 = r"""#F .*
@@ -110,9 +110,9 @@ class TestSave(unittest.TestCase):
def test_save_csv(self):
utils.save1D(self.csv_fname, self.x, self.y,
- xlabel=self.xlab, ylabels=self.ylabs,
- filetype="csv", fmt=["%d", "%.2f", "%.2e"],
- csvdelim=";", autoheader=True)
+ xlabel=self.xlab, ylabels=self.ylabs,
+ filetype="csv", fmt=["%d", "%.2f", "%.2e"],
+ csvdelim=";", autoheader=True)
csvf = open(self.csv_fname)
actual_csv = csvf.read()
@@ -125,22 +125,20 @@ class TestSave(unittest.TestCase):
and converting it to a named record array"""
npyf = open(self.npy_fname, "wb")
utils.save1D(npyf, self.x, self.y,
- xlabel=self.xlab, ylabels=self.ylabs)
+ xlabel=self.xlab, ylabels=self.ylabs)
npyf.close()
npy_recarray = numpy.load(self.npy_fname)
self.assertEqual(npy_recarray.shape, (3,))
- self.assertTrue(
- numpy.array_equal(
- npy_recarray['Ordinate1'],
- numpy.array((4, 5, 6))))
+ self.assertTrue(numpy.array_equal(npy_recarray['Ordinate1'],
+ numpy.array((4, 5, 6))))
def test_savespec_filename(self):
"""Save SpecFile using savespec()"""
utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab,
- ylabel=self.ylabs[0], fmt=["%d", "%.2f"], close_file=True,
- scan_number=1)
+ ylabel=self.ylabs[0], fmt=["%d", "%.2f"],
+ close_file=True, scan_number=1)
specf = open(self.spec_fname)
actual_spec = specf.read()
@@ -152,15 +150,15 @@ class TestSave(unittest.TestCase):
"""Save SpecFile using savespec(), passing a file handle"""
# first savespec: open, write file header, save y[0] as scan 1,
# return file handle
- specf = utils.savespec(self.spec_fname, self.x, self.y[0], xlabel=self.xlab,
- ylabel=self.ylabs[0], fmt=["%d", "%.2f"],
- close_file=False)
+ specf = utils.savespec(self.spec_fname, self.x, self.y[0],
+ xlabel=self.xlab, ylabel=self.ylabs[0],
+ fmt=["%d", "%.2f"], close_file=False)
# second savespec: save y[1] as scan 2, close file
utils.savespec(specf, self.x, self.y[1], xlabel=self.xlab,
- ylabel=self.ylabs[1], fmt=["%d", "%.2f"],
- write_file_header=False, close_file=True,
- scan_number=2)
+ ylabel=self.ylabs[1], fmt=["%d", "%.2f"],
+ write_file_header=False, close_file=True,
+ scan_number=2)
specf = open(self.spec_fname)
actual_spec = specf.read()
@@ -171,7 +169,7 @@ class TestSave(unittest.TestCase):
def test_save_spec(self):
"""Save SpecFile using save()"""
utils.save1D(self.spec_fname, self.x, self.y, xlabel=self.xlab,
- ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
+ ylabels=self.ylabs, filetype="spec", fmt=["%d", "%.2f"])
specf = open(self.spec_fname)
actual_spec = specf.read()
@@ -192,7 +190,7 @@ class TestSave(unittest.TestCase):
self.y = [[4, 5, 6], [7, 8, 9]]
self.ylabs = ["Ordinate1", "Ordinate2"]
utils.save1D(self.csv_fname, self.x, self.y,
- autoheader=True, fmt=["%d", "%.2f", "%.2e"])
+ autoheader=True, fmt=["%d", "%.2f", "%.2e"])
csvf = open(self.csv_fname)
actual_csv = csvf.read()
@@ -242,15 +240,12 @@ class TestH5Ls(unittest.TestCase):
self.assertIn("+foo", lines)
self.assertIn("\t+bar", lines)
- self.assertMatchAnyStringInList(
- r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">',
- lines)
- self.assertMatchAnyStringInList(
- r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">',
- lines)
- self.assertMatchAnyStringInList(
- r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">',
- lines)
+ match = r'\t\t<HDF5 dataset "tmp": shape \(3,\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t\t<HDF5 dataset "spam": shape \(2, 2\), type "<i[48]">'
+ self.assertMatchAnyStringInList(match, lines)
+ match = r'\t<HDF5 dataset "data": shape \(1,\), type "<f[48]">'
+ self.assertMatchAnyStringInList(match, lines)
os.unlink(self.h5_fname)
@@ -325,12 +320,6 @@ class TestOpen(unittest.TestCase):
@classmethod
def tearDownClass(cls):
- if sys.platform == "win32" and fabio is not None:
- # gc collect is needed to close a file descriptor
- # opened by fabio and not released.
- # https://github.com/silx-kit/fabio/issues/167
- import gc
- gc.collect()
shutil.rmtree(cls.tmp_directory)
def testH5(self):
@@ -410,6 +399,30 @@ class TestOpen(unittest.TestCase):
# load it
self.assertRaises(IOError, utils.open, self.missing_filename)
+ def test_silx_scheme(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = silx.io.url.DataUrl(scheme="silx", file_path=self.h5_filename, data_path="/")
+ with utils.open(url.path()) as f:
+ self.assertIsNotNone(f)
+ self.assertTrue(silx.io.utils.is_file(f))
+
+ def test_fabio_scheme(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ if fabio is None:
+ self.skipTest("Fabio is missing")
+ url = silx.io.url.DataUrl(scheme="fabio", file_path=self.edf_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_bad_url(self):
+ url = silx.io.url.DataUrl(scheme="sil", file_path=self.h5_filename)
+ self.assertRaises(IOError, utils.open, url.path())
+
+ def test_sliced_url(self):
+ url = silx.io.url.DataUrl(file_path=self.h5_filename, data_slice=(5,))
+ self.assertRaises(IOError, utils.open, url.path())
+
class TestNodes(unittest.TestCase):
"""Test `silx.io.utils.is_` functions."""
@@ -443,7 +456,7 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = h5py.File
+ self.h5_class = utils.H5Type.FILE
obj = Foo()
self.assertTrue(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
@@ -455,7 +468,7 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = h5py.Group
+ self.h5_class = utils.H5Type.GROUP
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertTrue(utils.is_group(obj))
@@ -467,7 +480,7 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = h5py.Dataset
+ self.h5_class = utils.H5Type.DATASET
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
@@ -491,13 +504,142 @@ class TestNodes(unittest.TestCase):
class Foo(object):
def __init__(self):
- self.h5py_class = int
+ self.h5_class = int
obj = Foo()
self.assertFalse(utils.is_file(obj))
self.assertFalse(utils.is_group(obj))
self.assertFalse(utils.is_dataset(obj))
+class TestGetData(unittest.TestCase):
+ """Test `silx.io.utils.get_data` function."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tmp_directory = tempfile.mkdtemp()
+ cls.createResources(cls.tmp_directory)
+
+ @classmethod
+ def createResources(cls, directory):
+
+ if h5py is not None:
+ cls.h5_filename = os.path.join(directory, "test.h5")
+ h5 = h5py.File(cls.h5_filename, mode="w")
+ h5["group/group/scalar"] = 50
+ h5["group/group/array"] = [1, 2, 3, 4, 5]
+ h5["group/group/array2d"] = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
+ h5.close()
+
+ cls.spec_filename = os.path.join(directory, "test.dat")
+ utils.savespec(cls.spec_filename, [1], [1.1], xlabel="x", ylabel="y",
+ fmt=["%d", "%.2f"], close_file=True, scan_number=1)
+
+ if fabio is not None:
+ cls.edf_filename = os.path.join(directory, "test.edf")
+ cls.edf_multiframe_filename = os.path.join(directory, "test_multi.edf")
+ header = fabio.fabioimage.OrderedDict()
+ header["integer"] = "10"
+ data = numpy.array([[10, 50], [50, 10]])
+ fabiofile = fabio.edfimage.EdfImage(data, header)
+ fabiofile.write(cls.edf_filename)
+ fabiofile.appendFrame(data=data, header=header)
+ fabiofile.write(cls.edf_multiframe_filename)
+
+ cls.txt_filename = os.path.join(directory, "test.txt")
+ f = io.open(cls.txt_filename, "w+t")
+ f.write(u"Kikoo")
+ f.close()
+
+ cls.missing_filename = os.path.join(directory, "test.missing")
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tmp_directory)
+
+ def test_hdf5_scalar(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?/group/group/scalar" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data, 50)
+
+ def test_hdf5_array(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?/group/group/array" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data[0], 1)
+
+ def test_hdf5_array_slice(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?path=/group/group/array2d&slice=1" % self.h5_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (5, ))
+ self.assertEqual(data[0], 6)
+
+ def test_hdf5_array_slice_out_of_range(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ url = "silx:%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_edf_using_silx(self):
+ if h5py is None:
+ self.skipTest("H5py is missing")
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "silx:%s?/scan_0/instrument/detector_0/data" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_frame(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s?slice=1" % self.edf_multiframe_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_singleframe(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s?slice=0" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_fabio_too_much_frames(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s?slice=..." % self.edf_multiframe_filename
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_fabio_no_frame(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "fabio:%s" % self.edf_filename
+ data = utils.get_data(url=url)
+ self.assertEqual(data.shape, (2, 2))
+ self.assertEqual(data[0, 0], 10)
+
+ def test_unsupported_scheme(self):
+ url = "foo:/foo/bar"
+ self.assertRaises(ValueError, utils.get_data, url)
+
+ def test_no_scheme(self):
+ if fabio is None:
+ self.skipTest("fabio is missing")
+ url = "%s?path=/group/group/array2d&slice=5" % self.h5_filename
+ self.assertRaises((ValueError, IOError), utils.get_data, url)
+
+ def test_file_not_exists(self):
+ url = "silx:/foo/bar"
+ self.assertRaises(IOError, utils.get_data, url)
+
+
def suite():
loadTests = unittest.defaultTestLoader.loadTestsFromTestCase
test_suite = unittest.TestSuite()
@@ -505,6 +647,7 @@ def suite():
test_suite.addTest(loadTests(TestH5Ls))
test_suite.addTest(loadTests(TestOpen))
test_suite.addTest(loadTests(TestNodes))
+ test_suite.addTest(loadTests(TestGetData))
return test_suite