summaryrefslogtreecommitdiff
path: root/src/silx/io/test/test_spech5.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/silx/io/test/test_spech5.py')
-rw-r--r--src/silx/io/test/test_spech5.py912
1 files changed, 912 insertions, 0 deletions
diff --git a/src/silx/io/test/test_spech5.py b/src/silx/io/test/test_spech5.py
new file mode 100644
index 0000000..93175f7
--- /dev/null
+++ b/src/silx/io/test/test_spech5.py
@@ -0,0 +1,912 @@
+# /*##########################################################################
+# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for spech5"""
+import numpy
+import os
+import io
+import tempfile
+import unittest
+import datetime
+from functools import partial
+
+from silx.utils import testutils
+
+from .. import spech5
+from ..spech5 import SpecH5, SpecH5Dataset, spec_date_to_iso8601
+from .. import specfile
+
+import h5py
+
+__authors__ = ["P. Knobel"]
+__license__ = "MIT"
+__date__ = "12/02/2018"
+
+sftext = """#F /tmp/sf.dat
+#E 1455180875
+#D Thu Feb 11 09:54:35 2016
+#C imaging User = opid17
+#O0 Pslit HGap MRTSlit UP MRTSlit DOWN
+#O1 Sslit1 VOff Sslit1 HOff Sslit1 VGap
+#o0 pshg mrtu mrtd
+#o2 ss1vo ss1ho ss1vg
+
+#J0 Seconds IA ion.mono Current
+#J1 xbpmc2 idgap1 Inorm
+
+#S 1 ascan ss1vo -4.55687 -0.556875 40 0.2
+#D Thu Feb 11 09:55:20 2016
+#T 0.2 (Seconds)
+#P0 180.005 -0.66875 0.87125
+#P1 14.74255 16.197579 12.238283
+#N 4
+#L MRTSlit UP second column 3rd_col
+-1.23 5.89 8
+8.478100E+01 5 1.56
+3.14 2.73 -3.14
+1.2 2.3 3.4
+
+#S 25 ascan c3th 1.33245 1.52245 40 0.15
+#D Sat 2015/03/14 03:53:50
+#P0 80.005 -1.66875 1.87125
+#P1 4.74255 6.197579 2.238283
+#N 5
+#L column0 column1 col2 col3
+0.0 0.1 0.2 0.3
+1.0 1.1 1.2 1.3
+2.0 2.1 2.2 2.3
+3.0 3.1 3.2 3.3
+
+#S 1 aaaaaa
+#D Thu Feb 11 10:00:32 2016
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+3 4
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+5 6
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+
+#S 1000 bbbbb
+#G1 3.25 3.25 5.207 90 90 120 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
+#G3 0.0106337923671 0.027529133 1.206191273 -1.43467075 0.7633438883 0.02401568018 -1.709143587 -2.097621783 0.02456954971
+#L a b
+1 2
+
+#S 1001 ccccc
+#G1 0. 0. 0. 0 0 0 2.232368448 2.232368448 1.206680489 90 90 60 1 1 2 -1 2 2 26.132 7.41 -88.96 1.11 1.000012861 15.19 26.06 67.355 -88.96 1.11 1.000012861 15.11 0.723353 0.723353
+#G3 0. 0. 0. 0. 0.0 0. 0. 0. 0.
+#L a b
+1 2
+
+"""
+
+
+class TestSpecDate(unittest.TestCase):
+ """
+ Test of the spec_date_to_iso8601 function.
+ """
+
+ # TODO : time zone tests
+ # TODO : error cases
+
+ @classmethod
+ def setUpClass(cls):
+ import locale
+
+ # FYI : not threadsafe
+ cls.locale_saved = locale.setlocale(locale.LC_TIME)
+ locale.setlocale(locale.LC_TIME, "C")
+
+ @classmethod
+ def tearDownClass(cls):
+ import locale
+
+ # FYI : not threadsafe
+ locale.setlocale(locale.LC_TIME, cls.locale_saved)
+
+ def setUp(self):
+ # covering all week days
+ self.n_days = range(1, 10)
+ # covering all months
+ self.n_months = range(1, 13)
+
+ self.n_years = [1999, 2016, 2020]
+ self.n_seconds = [0, 5, 26, 59]
+ self.n_minutes = [0, 9, 42, 59]
+ self.n_hours = [0, 2, 17, 23]
+
+ self.formats = ["%a %b %d %H:%M:%S %Y", "%a %Y/%m/%d %H:%M:%S"]
+
+ self.check_date_formats = partial(
+ self.__check_date_formats,
+ year=self.n_years[0],
+ month=self.n_months[0],
+ day=self.n_days[0],
+ hour=self.n_hours[0],
+ minute=self.n_minutes[0],
+ second=self.n_seconds[0],
+ msg=None,
+ )
+
+ def __check_date_formats(self, year, month, day, hour, minute, second, msg=None):
+ dt = datetime.datetime(year, month, day, hour, minute, second)
+ expected_date = dt.isoformat()
+
+ for i_fmt, fmt in enumerate(self.formats):
+ spec_date = dt.strftime(fmt)
+ iso_date = spec_date_to_iso8601(spec_date)
+ self.assertEqual(
+ iso_date,
+ expected_date,
+ msg="Testing {0}. format={1}. "
+ 'Expected "{2}", got "{3} ({4})" (dt={5}).'
+ "".format(msg, i_fmt, expected_date, iso_date, spec_date, dt),
+ )
+
+ def testYearsNominal(self):
+ for year in self.n_years:
+ self.check_date_formats(year=year, msg="year")
+
+ def testMonthsNominal(self):
+ for month in self.n_months:
+ self.check_date_formats(month=month, msg="month")
+
+ def testDaysNominal(self):
+ for day in self.n_days:
+ self.check_date_formats(day=day, msg="day")
+
+ def testHoursNominal(self):
+ for hour in self.n_hours:
+ self.check_date_formats(hour=hour, msg="hour")
+
+ def testMinutesNominal(self):
+ for minute in self.n_minutes:
+ self.check_date_formats(minute=minute, msg="minute")
+
+ def testSecondsNominal(self):
+ for second in self.n_seconds:
+ self.check_date_formats(second=second, msg="second")
+
+
+class TestSpecH5(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ os.write(fd, bytes(sftext, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testContainsFile(self):
+ self.assertIn("/1.2/measurement", self.sfh5)
+ self.assertIn("/25.1", self.sfh5)
+ self.assertIn("25.1", self.sfh5)
+ self.assertNotIn("25.2", self.sfh5)
+ # measurement is a child of a scan, full path would be required to
+ # access from root level
+ self.assertNotIn("measurement", self.sfh5)
+ # Groups may or may not have a trailing /
+ self.assertIn("/1.2/measurement/mca_1/", self.sfh5)
+ self.assertIn("/1.2/measurement/mca_1", self.sfh5)
+ # Datasets can't have a trailing /
+ self.assertNotIn("/1.2/measurement/mca_0/info/calibration/ ", self.sfh5)
+ # No mca_8
+ self.assertNotIn("/1.2/measurement/mca_8/info/calibration", self.sfh5)
+ # Link
+ self.assertIn("/1.2/measurement/mca_0/info/calibration", self.sfh5)
+
+ def testContainsGroup(self):
+ self.assertIn("measurement", self.sfh5["/1.2/"])
+ self.assertIn("measurement", self.sfh5["/1.2"])
+ self.assertIn("25.1", self.sfh5["/"])
+ self.assertNotIn("25.2", self.sfh5["/"])
+ self.assertIn("instrument/positioners/Sslit1 HOff", self.sfh5["/1.1"])
+ # illegal trailing "/" after dataset name
+ self.assertNotIn("instrument/positioners/Sslit1 HOff/", self.sfh5["/1.1"])
+ # full path to element in group (OK)
+ self.assertIn(
+ "/1.1/instrument/positioners/Sslit1 HOff", self.sfh5["/1.1/instrument"]
+ )
+
+ def testDataColumn(self):
+ self.assertAlmostEqual(sum(self.sfh5["/1.2/measurement/duo"]), 12.0)
+ self.assertAlmostEqual(
+ sum(self.sfh5["1.1"]["measurement"]["MRTSlit UP"]), 87.891, places=4
+ )
+
+ def testDate(self):
+ # start time is in Iso8601 format
+ self.assertEqual(self.sfh5["/1.1/start_time"], "2016-02-11T09:55:20")
+ self.assertEqual(self.sfh5["25.1/start_time"], "2015-03-14T03:53:50")
+
+ def assertRaisesRegex(self, *args, **kwargs):
+ return super(TestSpecH5, self).assertRaisesRegex(*args, **kwargs)
+
+ def testDatasetInstanceAttr(self):
+ """The SpecH5Dataset objects must implement some dummy attributes
+ to improve compatibility with widgets dealing with h5py datasets."""
+ self.assertIsNone(self.sfh5["/1.1/start_time"].compression)
+ self.assertIsNone(self.sfh5["1.1"]["measurement"]["MRTSlit UP"].chunks)
+
+ # error message must be explicit
+ with self.assertRaisesRegex(
+ AttributeError, "SpecH5Dataset has no attribute tOTo"
+ ):
+ dummy = self.sfh5["/1.1/start_time"].tOTo
+
+ def testGet(self):
+ """Test :meth:`SpecH5Group.get`"""
+ # default value of param *default* is None
+ self.assertIsNone(self.sfh5.get("toto"))
+ self.assertEqual(self.sfh5["25.1"].get("toto", default=-3), -3)
+
+ self.assertEqual(
+ self.sfh5.get("/1.1/start_time", default=-3), "2016-02-11T09:55:20"
+ )
+
+ def testGetClass(self):
+ """Test :meth:`SpecH5Group.get`"""
+ self.assertIs(self.sfh5["1.1"].get("start_time", getclass=True), h5py.Dataset)
+ self.assertIs(self.sfh5["1.1"].get("instrument", getclass=True), h5py.Group)
+
+ # spech5 does not define external link, so there is no way
+ # a group can *get* a SpecH5 class
+
+ def testGetApi(self):
+ result = self.sfh5.get("1.1", getclass=True, getlink=True)
+ self.assertIs(result, h5py.HardLink)
+ result = self.sfh5.get("1.1", getclass=False, getlink=True)
+ self.assertIsInstance(result, h5py.HardLink)
+ result = self.sfh5.get("1.1", getclass=True, getlink=False)
+ self.assertIs(result, h5py.Group)
+ result = self.sfh5.get("1.1", getclass=False, getlink=False)
+ self.assertIsInstance(result, spech5.SpecH5Group)
+
+ def testGetItemGroup(self):
+ group = self.sfh5["25.1"]["instrument"]
+ self.assertEqual(
+ list(group["positioners"].keys()),
+ [
+ "Pslit HGap",
+ "MRTSlit UP",
+ "MRTSlit DOWN",
+ "Sslit1 VOff",
+ "Sslit1 HOff",
+ "Sslit1 VGap",
+ ],
+ )
+ with self.assertRaises(KeyError):
+ group["Holy Grail"]
+
+ def testGetitemSpecH5(self):
+ self.assertEqual(
+ self.sfh5["/1.2/instrument/positioners"],
+ self.sfh5["1.2"]["instrument"]["positioners"],
+ )
+
+ def testH5pyClass(self):
+ """Test :attr:`h5py_class` returns the corresponding h5py class
+ (h5py.File, h5py.Group, h5py.Dataset)"""
+ a_file = self.sfh5
+ self.assertIs(a_file.h5py_class, h5py.File)
+
+ a_group = self.sfh5["/1.2/measurement"]
+ self.assertIs(a_group.h5py_class, h5py.Group)
+
+ a_dataset = self.sfh5["/1.1/instrument/positioners/Sslit1 HOff"]
+ self.assertIs(a_dataset.h5py_class, h5py.Dataset)
+
+ def testHeader(self):
+ file_header = self.sfh5["/1.2/instrument/specfile/file_header"]
+ scan_header = self.sfh5["/1.2/instrument/specfile/scan_header"]
+
+ # File header has 10 lines
+ self.assertEqual(len(file_header), 10)
+ # 1.2 has 9 scan & mca header lines
+ self.assertEqual(len(scan_header), 9)
+
+ # line 4 of file header
+ self.assertEqual(file_header[3], "#C imaging User = opid17")
+ # line 4 of scan header
+ scan_header = self.sfh5["25.1/instrument/specfile/scan_header"]
+
+ self.assertEqual(scan_header[3], "#P1 4.74255 6.197579 2.238283")
+
+ def testLinks(self):
+ self.assertTrue(
+ numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/data"],
+ self.sfh5["/1.2/instrument/mca_0/data"],
+ )
+ )
+ self.assertTrue(
+ numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/info/data"],
+ self.sfh5["/1.2/instrument/mca_0/data"],
+ )
+ )
+ self.assertTrue(
+ numpy.array_equal(
+ self.sfh5["/1.2/measurement/mca_0/info/channels"],
+ self.sfh5["/1.2/instrument/mca_0/channels"],
+ )
+ )
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/"].keys(),
+ self.sfh5["/1.2/instrument/mca_0/"].keys(),
+ )
+
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/preset_time"],
+ self.sfh5["/1.2/instrument/mca_0/preset_time"],
+ )
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/live_time"],
+ self.sfh5["/1.2/instrument/mca_0/live_time"],
+ )
+ self.assertEqual(
+ self.sfh5["/1.2/measurement/mca_0/info/elapsed_time"],
+ self.sfh5["/1.2/instrument/mca_0/elapsed_time"],
+ )
+
+ def testListScanIndices(self):
+ self.assertEqual(
+ list(self.sfh5.keys()), ["1.1", "25.1", "1.2", "1000.1", "1001.1"]
+ )
+ self.assertEqual(
+ self.sfh5["1.2"].attrs,
+ {
+ "NX_class": "NXentry",
+ },
+ )
+
+ def testMcaAbsent(self):
+ def access_absent_mca():
+ """This must raise a KeyError, because scan 1.1 has no MCA"""
+ return self.sfh5["/1.1/measurement/mca_0/"]
+
+ self.assertRaises(KeyError, access_absent_mca)
+
+ def testMcaCalib(self):
+ mca0_calib = self.sfh5["/1.2/measurement/mca_0/info/calibration"]
+ mca1_calib = self.sfh5["/1.2/measurement/mca_1/info/calibration"]
+ self.assertEqual(mca0_calib.tolist(), [1, 2, 3])
+ # calibration is unique in this scan and applies to all analysers
+ self.assertEqual(mca0_calib.tolist(), mca1_calib.tolist())
+
+ def testMcaChannels(self):
+ mca0_chann = self.sfh5["/1.2/measurement/mca_0/info/channels"]
+ mca1_chann = self.sfh5["/1.2/measurement/mca_1/info/channels"]
+ self.assertEqual(mca0_chann.tolist(), [0, 1, 2])
+ self.assertEqual(mca0_chann.tolist(), mca1_chann.tolist())
+
+ def testMcaCtime(self):
+ """Tests for #@CTIME mca header"""
+ datasets = ["preset_time", "live_time", "elapsed_time"]
+ for ds in datasets:
+ self.assertNotIn("/1.1/instrument/mca_0/" + ds, self.sfh5)
+ self.assertIn("/1.2/instrument/mca_0/" + ds, self.sfh5)
+
+ mca0_preset_time = self.sfh5["/1.2/instrument/mca_0/preset_time"]
+ mca1_preset_time = self.sfh5["/1.2/instrument/mca_1/preset_time"]
+ self.assertLess(mca0_preset_time - 123.4, 10**-5)
+ # ctime is unique in a this scan and applies to all analysers
+ self.assertEqual(mca0_preset_time, mca1_preset_time)
+
+ mca0_live_time = self.sfh5["/1.2/instrument/mca_0/live_time"]
+ mca1_live_time = self.sfh5["/1.2/instrument/mca_1/live_time"]
+ self.assertLess(mca0_live_time - 234.5, 10**-5)
+ self.assertEqual(mca0_live_time, mca1_live_time)
+
+ mca0_elapsed_time = self.sfh5["/1.2/instrument/mca_0/elapsed_time"]
+ mca1_elapsed_time = self.sfh5["/1.2/instrument/mca_1/elapsed_time"]
+ self.assertLess(mca0_elapsed_time - 345.6, 10**-5)
+ self.assertEqual(mca0_elapsed_time, mca1_elapsed_time)
+
+ def testMcaData(self):
+ # sum 1st MCA in scan 1.2 over rows
+ mca_0_data = self.sfh5["/1.2/measurement/mca_0/data"]
+ for summed_row, expected in zip(
+ mca_0_data.sum(axis=1).tolist(), [3.0, 12.1, 21.7]
+ ):
+ self.assertAlmostEqual(summed_row, expected, places=4)
+
+ # sum 3rd MCA in scan 1.2 along both axis
+ mca_2_data = self.sfh5["1.2"]["measurement"]["mca_2"]["data"]
+ self.assertAlmostEqual(sum(sum(mca_2_data)), 9.1, places=5)
+ # attrs
+ self.assertEqual(mca_0_data.attrs, {"interpretation": "spectrum"})
+
+ def testMotorPosition(self):
+ positioners_group = self.sfh5["/1.1/instrument/positioners"]
+ # MRTSlit DOWN position is defined in #P0 san header line
+ self.assertAlmostEqual(float(positioners_group["MRTSlit DOWN"]), 0.87125)
+ # MRTSlit UP position is defined in first data column
+ for a, b in zip(
+ positioners_group["MRTSlit UP"].tolist(), [-1.23, 8.478100e01, 3.14, 1.2]
+ ):
+ self.assertAlmostEqual(float(a), b, places=4)
+
+ def testNumberMcaAnalysers(self):
+ """Scan 1.2 has 2 data columns + 3 mca spectra per data line."""
+ self.assertEqual(len(self.sfh5["1.2"]["measurement"]), 5)
+
+ def testTitle(self):
+ self.assertEqual(
+ self.sfh5["/25.1/title"], "ascan c3th 1.33245 1.52245 40 0.15"
+ )
+
+ def testValues(self):
+ group = self.sfh5["/25.1"]
+ self.assertTrue(hasattr(group, "values"))
+ self.assertTrue(callable(group.values))
+ self.assertIn(self.sfh5["/25.1/title"], self.sfh5["/25.1"].values())
+
+ # visit and visititems ignore links
+ def testVisit(self):
+ name_list = []
+ self.sfh5.visit(name_list.append)
+ self.assertIn("1.2/instrument/positioners/Pslit HGap", name_list)
+ self.assertIn("1.2/instrument/specfile/scan_header", name_list)
+ self.assertEqual(len(name_list), 117)
+
+ # test also visit of a subgroup, with various group name formats
+ name_list_leading_and_trailing_slash = []
+ self.sfh5["/1.2/instrument/"].visit(name_list_leading_and_trailing_slash.append)
+ name_list_leading_slash = []
+ self.sfh5["/1.2/instrument"].visit(name_list_leading_slash.append)
+ name_list_trailing_slash = []
+ self.sfh5["1.2/instrument/"].visit(name_list_trailing_slash.append)
+ name_list_no_slash = []
+ self.sfh5["1.2/instrument"].visit(name_list_no_slash.append)
+
+ # no differences expected in the output names
+ self.assertEqual(name_list_leading_and_trailing_slash, name_list_leading_slash)
+ self.assertEqual(name_list_leading_slash, name_list_trailing_slash)
+ self.assertEqual(name_list_leading_slash, name_list_no_slash)
+ self.assertIn("positioners/Pslit HGap", name_list_no_slash)
+ self.assertIn("positioners", name_list_no_slash)
+
+ def testVisitItems(self):
+ dataset_name_list = []
+
+ def func_generator(l):
+ """return a function appending names to list l"""
+
+ def func(name, obj):
+ if isinstance(obj, SpecH5Dataset):
+ l.append(name)
+
+ return func
+
+ self.sfh5.visititems(func_generator(dataset_name_list))
+ self.assertIn("1.2/instrument/positioners/Pslit HGap", dataset_name_list)
+ self.assertEqual(len(dataset_name_list), 85)
+
+ # test also visit of a subgroup, with various group name formats
+ name_list_leading_and_trailing_slash = []
+ self.sfh5["/1.2/instrument/"].visititems(
+ func_generator(name_list_leading_and_trailing_slash)
+ )
+ name_list_leading_slash = []
+ self.sfh5["/1.2/instrument"].visititems(func_generator(name_list_leading_slash))
+ name_list_trailing_slash = []
+ self.sfh5["1.2/instrument/"].visititems(
+ func_generator(name_list_trailing_slash)
+ )
+ name_list_no_slash = []
+ self.sfh5["1.2/instrument"].visititems(func_generator(name_list_no_slash))
+
+ # no differences expected in the output names
+ self.assertEqual(name_list_leading_and_trailing_slash, name_list_leading_slash)
+ self.assertEqual(name_list_leading_slash, name_list_trailing_slash)
+ self.assertEqual(name_list_leading_slash, name_list_no_slash)
+ self.assertIn("positioners/Pslit HGap", name_list_no_slash)
+
+ def testNotSpecH5(self):
+ fd, fname = tempfile.mkstemp()
+ os.write(fd, b"Not a spec file!")
+ os.close(fd)
+ self.assertRaises(specfile.SfErrFileOpen, SpecH5, fname)
+ self.assertRaises(IOError, SpecH5, fname)
+ os.unlink(fname)
+
+ def testSample(self):
+ self.assertNotIn("sample", self.sfh5["/1.1"])
+ self.assertIn("sample", self.sfh5["/1000.1"])
+ self.assertIn("ub_matrix", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell_abc", self.sfh5["/1000.1/sample"])
+ self.assertIn("unit_cell_alphabetagamma", self.sfh5["/1000.1/sample"])
+
+ # All 0 values
+ self.assertNotIn("sample", self.sfh5["/1001.1"])
+ with self.assertRaises(KeyError):
+ self.sfh5["/1001.1/sample/unit_cell"]
+
+ @testutils.validate_logging(spech5.logger1.name, warning=2)
+ def testOpenFileDescriptor(self):
+ """Open a SpecH5 file from a file descriptor"""
+ with io.open(self.sfh5.filename) as f:
+ sfh5 = SpecH5(f)
+ self.assertIsNotNone(sfh5)
+ name_list = []
+ # check if the object is working
+ self.sfh5.visit(name_list.append)
+ sfh5.close()
+
+
+sftext_multi_mca_headers = """
+#S 1 aaaaaa
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCA %16C
+#@CHANN 3 1 3 1
+#@CALIB 5.5 6.6 7.7
+#@CTIME 10 11 12
+#N 3
+#L uno duo
+1 2
+@A 0 1 2
+@A 10 9 8
+3 4
+@A 3.1 4 5
+@A 7 6 5
+5 6
+@A 6 7.7 8
+@A 4 3 2
+
+"""
+
+
+class TestSpecH5MultiMca(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp(text=False)
+ os.write(fd, bytes(sftext_multi_mca_headers, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testMcaCalib(self):
+ mca0_calib = self.sfh5["/1.1/measurement/mca_0/info/calibration"]
+ mca1_calib = self.sfh5["/1.1/measurement/mca_1/info/calibration"]
+ self.assertEqual(mca0_calib.tolist(), [1, 2, 3])
+ self.assertAlmostEqual(sum(mca1_calib.tolist()), sum([5.5, 6.6, 7.7]), places=5)
+
+ def testMcaChannels(self):
+ mca0_chann = self.sfh5["/1.1/measurement/mca_0/info/channels"]
+ mca1_chann = self.sfh5["/1.1/measurement/mca_1/info/channels"]
+ self.assertEqual(mca0_chann.tolist(), [0.0, 1.0, 2.0])
+ # @CHANN is unique in this scan and applies to all analysers
+ self.assertEqual(mca1_chann.tolist(), [1.0, 2.0, 3.0])
+
+ def testMcaCtime(self):
+ """Tests for #@CTIME mca header"""
+ mca0_preset_time = self.sfh5["/1.1/instrument/mca_0/preset_time"]
+ mca1_preset_time = self.sfh5["/1.1/instrument/mca_1/preset_time"]
+ self.assertLess(mca0_preset_time - 123.4, 10**-5)
+ self.assertLess(mca1_preset_time - 10, 10**-5)
+
+ mca0_live_time = self.sfh5["/1.1/instrument/mca_0/live_time"]
+ mca1_live_time = self.sfh5["/1.1/instrument/mca_1/live_time"]
+ self.assertLess(mca0_live_time - 234.5, 10**-5)
+ self.assertLess(mca1_live_time - 11, 10**-5)
+
+ mca0_elapsed_time = self.sfh5["/1.1/instrument/mca_0/elapsed_time"]
+ mca1_elapsed_time = self.sfh5["/1.1/instrument/mca_1/elapsed_time"]
+ self.assertLess(mca0_elapsed_time - 345.6, 10**-5)
+ self.assertLess(mca1_elapsed_time - 12, 10**-5)
+
+
+sftext_no_cols = r"""#F C:/DATA\test.mca
+#D Thu Jul 7 08:40:19 2016
+
+#S 1 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, one mca analyser, single spectrum
+#@MCA %16C
+#@CHANN 151 0 150 1
+#@CALIB 0 2 0
+@A 789 784 788 814 847 862 880 904 925 955 987 1015 1031 1070 1111 1139 \
+1203 1236 1290 1392 1492 1558 1688 1813 1977 2119 2346 2699 3121 3542 4102 4970 \
+6071 7611 10426 16188 28266 40348 50539 55555 56162 54162 47102 35718 24588 17034 12994 11444 \
+11808 13461 15687 18885 23827 31578 41999 49556 58084 59415 59456 55698 44525 28219 17680 12881 \
+9518 7415 6155 5246 4646 3978 3612 3299 3020 2761 2670 2472 2500 2310 2286 2106 \
+1989 1890 1782 1655 1421 1293 1135 990 879 757 672 618 532 488 445 424 \
+414 373 351 325 307 284 270 247 228 213 199 187 183 176 164 156 \
+153 140 142 130 118 118 103 101 97 86 90 86 87 81 75 82 \
+80 76 77 75 76 77 62 69 74 60 65 68 65 58 63 64 \
+63 59 60 56 57 60 55
+
+#S 2 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, one mca analyser, multiple spectra
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+
+#S 3 31oct98.dat 22.1 If4
+#D Thu Jul 7 08:40:19 2016
+#C no data cols, 3 mca analysers, multiple spectra
+#@MCADEV 1
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCADEV 2
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+#@MCADEV 3
+#@MCA %16C
+#@CHANN 3 0 2 1
+#@CALIB 1 2 3
+#@CTIME 123.4 234.5 345.6
+@A 0 1 2
+@A 10 9 8
+@A 1 1 1.1
+@A 3.1 4 5
+@A 7 6 5
+@A 1 1 1
+@A 6 7.7 8
+@A 4 3 2
+@A 1 1 1
+"""
+
+
+class TestSpecH5NoDataCols(unittest.TestCase):
+ """Test reading SPEC files with only MCA data"""
+
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ os.write(fd, bytes(sftext_no_cols, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testScan1(self):
+ # 1.1: single analyser, single spectrum, 151 channels
+ self.assertIn("mca_0", self.sfh5["1.1/instrument/"])
+ self.assertEqual(self.sfh5["1.1/instrument/mca_0/data"].shape, (1, 151))
+ self.assertNotIn("mca_1", self.sfh5["1.1/instrument/"])
+
+ def testScan2(self):
+ # 2.1: single analyser, 9 spectra, 3 channels
+ self.assertIn("mca_0", self.sfh5["2.1/instrument/"])
+ self.assertEqual(self.sfh5["2.1/instrument/mca_0/data"].shape, (9, 3))
+ self.assertNotIn("mca_1", self.sfh5["2.1/instrument/"])
+
+ def testScan3(self):
+ # 3.1: 3 analysers, 3 spectra/analyser, 3 channels
+ for i in range(3):
+ self.assertIn("mca_%d" % i, self.sfh5["3.1/instrument/"])
+ self.assertEqual(self.sfh5["3.1/instrument/mca_%d/data" % i].shape, (3, 3))
+
+ self.assertNotIn("mca_3", self.sfh5["3.1/instrument/"])
+
+
+sf_text_slash = r"""#F /data/id09/archive/logspecfiles/laue/2016/scan_231_laue_16-11-29.dat
+#D Sat Dec 10 22:20:59 2016
+#O0 Pslit/HGap MRTSlit%UP
+
+#S 1 laue_16-11-29.log 231.1 PD3/A
+#D Sat Dec 10 22:20:59 2016
+#P0 180.005 -0.66875
+#N 2
+#L GONY/mm PD3%A
+-2.015 5.250424e-05
+-2.01 5.30798e-05
+-2.005 5.281903e-05
+-2 5.220436e-05
+"""
+
+
+class TestSpecH5SlashInLabels(unittest.TestCase):
+ """Test reading SPEC files with labels containing a / character
+
+ The / character must be substituted with a %
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ fd, cls.fname = tempfile.mkstemp()
+ os.write(fd, bytes(sf_text_slash, "ascii"))
+ os.close(fd)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.unlink(cls.fname)
+
+ def setUp(self):
+ self.sfh5 = SpecH5(self.fname)
+
+ def tearDown(self):
+ self.sfh5.close()
+
+ def testLabels(self):
+ """Ensure `/` is substituted with `%` and
+ ensure legitimate `%` in names are still working"""
+ self.assertEqual(
+ list(self.sfh5["1.1/measurement/"].keys()), ["GONY%mm", "PD3%A"]
+ )
+
+ # substituted "%"
+ self.assertIn("GONY%mm", self.sfh5["1.1/measurement/"])
+ self.assertNotIn("GONY/mm", self.sfh5["1.1/measurement/"])
+ self.assertAlmostEqual(
+ self.sfh5["1.1/measurement/GONY%mm"][0], -2.015, places=4
+ )
+ # legitimate "%"
+ self.assertIn("PD3%A", self.sfh5["1.1/measurement/"])
+
+ def testMotors(self):
+ """Ensure `/` is substituted with `%` and
+ ensure legitimate `%` in names are still working"""
+ self.assertEqual(
+ list(self.sfh5["1.1/instrument/positioners"].keys()),
+ ["Pslit%HGap", "MRTSlit%UP"],
+ )
+ # substituted "%"
+ self.assertIn("Pslit%HGap", self.sfh5["1.1/instrument/positioners"])
+ self.assertNotIn("Pslit/HGap", self.sfh5["1.1/instrument/positioners"])
+ self.assertAlmostEqual(
+ self.sfh5["1.1/instrument/positioners/Pslit%HGap"], 180.005, places=4
+ )
+ # legitimate "%"
+ self.assertIn("MRTSlit%UP", self.sfh5["1.1/instrument/positioners"])
+
+
+def testUnitCellUBMatrix(tmp_path):
+ """Test unit cell (#G1) and UB matrix (#G3)"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 OK
+#G1 0 1 2 3 4 5
+#G3 0 1 2 3 4 5 6 7 8
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert numpy.array_equal(
+ spech5["/1.1/sample/ub_matrix"], numpy.arange(9).reshape(1, 3, 3)
+ )
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]])
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5]
+ )
+
+
+def testMalformedUnitCellUBMatrix(tmp_path):
+ """Test malformed unit cell (#G1) and UB matrix (#G3): 1 value"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 all malformed=0
+#G1 0
+#G3 0
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert "sample" not in spech5["1.1"]
+
+
+def testMalformedUBMatrix(tmp_path):
+ """Test malformed UB matrix (#G3): all zeros"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 G3 all 0
+#G1 0 1 2 3 4 5
+#G3 0 0 0 0 0 0 0 0 0
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert "ub_matrix" not in spech5["/1.1/sample"]
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell"], [[0, 1, 2, 3, 4, 5]])
+ assert numpy.array_equal(spech5["/1.1/sample/unit_cell_abc"], [0, 1, 2])
+ assert numpy.array_equal(
+ spech5["/1.1/sample/unit_cell_alphabetagamma"], [3, 4, 5]
+ )
+
+
+def testMalformedUnitCell(tmp_path):
+ """Test malformed unit cell (#G1): missing values"""
+ file_path = tmp_path / "spec.dat"
+ file_path.write_bytes(
+ bytes(
+ """
+#S 1 G1 malformed missing values
+#G1 0 1 2
+#G3 0 1 2 3 4 5 6 7 8
+""",
+ encoding="ascii",
+ )
+ )
+ with SpecH5(str(file_path)) as spech5:
+ assert "unit_cell" not in spech5["/1.1/sample"]
+ assert "unit_cell_abc" not in spech5["/1.1/sample"]
+ assert "unit_cell_alphabetagamma" not in spech5["/1.1/sample"]
+ assert numpy.array_equal(
+ spech5["/1.1/sample/ub_matrix"], numpy.arange(9).reshape(1, 3, 3)
+ )