summaryrefslogtreecommitdiff
path: root/src/silx/io/spech5.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/silx/io/spech5.py')
-rw-r--r--src/silx/io/spech5.py907
1 files changed, 907 insertions, 0 deletions
diff --git a/src/silx/io/spech5.py b/src/silx/io/spech5.py
new file mode 100644
index 0000000..df2021c
--- /dev/null
+++ b/src/silx/io/spech5.py
@@ -0,0 +1,907 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""This module provides a h5py-like API to access SpecFile data.
+
+API description
++++++++++++++++
+
+Specfile data structure exposed by this API:
+
+::
+
+ /
+ 1.1/
+ title = "…"
+ start_time = "…"
+ instrument/
+ specfile/
+ file_header = "…"
+ scan_header = "…"
+ positioners/
+ motor_name = value
+ …
+ mca_0/
+ data = …
+ calibration = …
+ channels = …
+ preset_time = …
+ elapsed_time = …
+ live_time = …
+
+ mca_1/
+ …
+ …
+ measurement/
+ colname0 = …
+ colname1 = …
+ …
+ mca_0/
+ data -> /1.1/instrument/mca_0/data
+ info -> /1.1/instrument/mca_0/
+ …
+ sample/
+ ub_matrix = …
+ unit_cell = …
+ unit_cell_abc = …
+ unit_cell_alphabetagamma = …
+ 2.1/
+ …
+
+``file_header`` and ``scan_header`` are the raw headers as they
+appear in the original file, as a string of lines separated by newline (``\\n``) characters.
+
+The title is the content of the ``#S`` scan header line without the leading
+``#S`` and without the scan number (e.g ``"ascan ss1vo -4.55687 -0.556875 40 0.2"``).
+
+The start time is converted to ISO8601 format (``"2016-02-23T22:49:05Z"``),
+if the original date format is standard.
+
+Numeric datasets are stored in *float32* format, except for scalar integers
+which are stored as *int64*.
+
+Motor positions (e.g. ``/1.1/instrument/positioners/motor_name``) can be
+1D numpy arrays if they are measured as scan data, or else scalars as defined
+on ``#P`` scan header lines. A simple test is done to check if the motor name
+is also a data column header defined in the ``#L`` scan header line.
+
+Scan data (e.g. ``/1.1/measurement/colname0``) is accessed by column,
+the dataset name ``colname0`` being the column label as defined in the ``#L``
+scan header line.
+
+If a ``/`` character is present in a column label or in a motor name in the
+original SPEC file, it will be substituted with a ``%`` character in the
+corresponding dataset name.
+
+MCA data is exposed as a 2D numpy array containing all spectra for a given
+analyser. The number of analysers is calculated as the number of MCA spectra
+per scan data line. Demultiplexing is then performed to assign the correct
+spectra to a given analyser.
+
+MCA calibration is an array of 3 scalars, from the ``#@CALIB`` header line.
+It is identical for all MCA analysers, as there can be only one
+``#@CALIB`` line per scan.
+
+MCA channels is an array containing all channel numbers. This information is
+computed from the ``#@CHANN`` scan header line (if present), or computed from
+the shape of the first spectrum in a scan (``[0, … len(first_spectrum] - 1]``).
+
+Accessing data
+++++++++++++++
+
+Data and groups are accessed in :mod:`h5py` fashion::
+
+ from silx.io.spech5 import SpecH5
+
+ # Open a SpecFile
+ sfh5 = SpecH5("test.dat")
+
+ # using SpecH5 as a regular group to access scans
+ scan1group = sfh5["1.1"]
+ instrument_group = scan1group["instrument"]
+
+ # alternative: full path access
+ measurement_group = sfh5["/1.1/measurement"]
+
+ # accessing a scan data column by name as a 1D numpy array
+ data_array = measurement_group["Pslit HGap"]
+
+ # accessing all mca-spectra for one MCA device
+ mca_0_spectra = measurement_group["mca_0/data"]
+
+:class:`SpecH5` files and groups provide a :meth:`keys` method::
+
+ >>> sfh5.keys()
+ ['96.1', '97.1', '98.1']
+ >>> sfh5['96.1'].keys()
+ ['title', 'start_time', 'instrument', 'measurement']
+
+They can also be treated as iterators:
+
+.. code-block:: python
+
+ from silx.io import is_dataset
+
+ for scan_group in SpecH5("test.dat"):
+ dataset_names = [item.name in scan_group["measurement"] if
+ is_dataset(item)]
+ print("Found data columns in scan " + scan_group.name)
+ print(", ".join(dataset_names))
+
+You can test for existence of data or groups::
+
+ >>> "/1.1/measurement/Pslit HGap" in sfh5
+ True
+ >>> "positioners" in sfh5["/2.1/instrument"]
+ True
+ >>> "spam" in sfh5["1.1"]
+ False
+
+.. note::
+
+ Text used to be stored with a dtype ``numpy.string_`` in silx versions
+ prior to *0.7.0*. The type ``numpy.string_`` is a byte-string format.
+ The consequence of this is that you had to decode strings before using
+ them in **Python 3**::
+
+ >>> from silx.io.spech5 import SpecH5
+ >>> sfh5 = SpecH5("31oct98.dat")
+ >>> sfh5["/68.1/title"]
+ b'68 ascan tx3 -28.5 -24.5 20 0.5'
+ >>> sfh5["/68.1/title"].decode()
+ '68 ascan tx3 -28.5 -24.5 20 0.5'
+
+ From silx version *0.7.0* onwards, text is now stored as unicode. This
+ corresponds to the default text type in python 3, and to the *unicode*
+ type in Python 2.
+
+ To be on the safe side, you can test for the presence of a *decode*
+ attribute, to ensure that you always work with unicode text::
+
+ >>> title = sfh5["/68.1/title"]
+ >>> if hasattr(title, "decode"):
+ ... title = title.decode()
+
+"""
+
+import datetime
+import logging
+import re
+import io
+
+import h5py
+import numpy
+
+from silx import version as silx_version
+from .specfile import SpecFile, SfErrColNotFound
+from . import commonh5
+
+__authors__ = ["P. Knobel", "D. Naudet"]
+__license__ = "MIT"
+__date__ = "17/07/2018"
+
+logger1 = logging.getLogger(__name__)
+
+
+text_dtype = h5py.special_dtype(vlen=str)
+
+
+def to_h5py_utf8(str_list):
+ """Convert a string or a list of strings to a numpy array of
+ unicode strings that can be written to HDF5 as utf-8.
+
+ This ensures that the type will be consistent between python 2 and
+ python 3, if attributes or datasets are saved to an HDF5 file.
+ """
+ return numpy.array(str_list, dtype=text_dtype)
+
+
+def _get_number_of_mca_analysers(scan):
+ """
+ :param SpecFile sf: :class:`SpecFile` instance
+ """
+ number_of_mca_spectra = len(scan.mca)
+ # Scan.data is transposed
+ number_of_data_lines = scan.data.shape[1]
+
+ if not number_of_data_lines == 0:
+ # Number of MCA spectra must be a multiple of number of data lines
+ assert number_of_mca_spectra % number_of_data_lines == 0
+ return number_of_mca_spectra // number_of_data_lines
+ elif number_of_mca_spectra:
+ # Case of a scan without data lines, only MCA.
+ # Our only option is to assume that the number of analysers
+ # is the number of #@CHANN lines
+ return len(scan.mca.channels)
+ else:
+ return 0
+
+
+def _motor_in_scan(sf, scan_key, motor_name):
+ """
+ :param sf: :class:`SpecFile` instance
+ :param scan_key: Scan identification key (e.g. ``1.1``)
+ :param motor_name: Name of motor as defined in file header lines
+ :return: ``True`` if motor exists in scan, else ``False``
+ :raise: ``KeyError`` if scan_key not found in SpecFile
+ """
+ if scan_key not in sf:
+ raise KeyError("Scan key %s " % scan_key +
+ "does not exist in SpecFile %s" % sf.filename)
+ ret = motor_name in sf[scan_key].motor_names
+ if not ret and "%" in motor_name:
+ motor_name = motor_name.replace("%", "/")
+ ret = motor_name in sf[scan_key].motor_names
+ return ret
+
+
+def _column_label_in_scan(sf, scan_key, column_label):
+ """
+ :param sf: :class:`SpecFile` instance
+ :param scan_key: Scan identification key (e.g. ``1.1``)
+ :param column_label: Column label as defined in scan header
+ :return: ``True`` if data column label exists in scan, else ``False``
+ :raise: ``KeyError`` if scan_key not found in SpecFile
+ """
+ if scan_key not in sf:
+ raise KeyError("Scan key %s " % scan_key +
+ "does not exist in SpecFile %s" % sf.filename)
+ ret = column_label in sf[scan_key].labels
+ if not ret and "%" in column_label:
+ column_label = column_label.replace("%", "/")
+ ret = column_label in sf[scan_key].labels
+ return ret
+
+
+def _parse_UB_matrix(header_line):
+ """Parse G3 header line and return UB matrix
+
+ :param str header_line: G3 header line
+ :return: UB matrix
+ :raises ValueError: For malformed UB matrix header line
+ """
+ values = list(map(float, header_line.split())) # Can raise ValueError
+ if len(values) < 9:
+ raise ValueError("Not enough values in UB matrix")
+ return numpy.array(values).reshape((1, 3, 3))
+
+
+def _ub_matrix_in_scan(scan):
+ """Return True if scan header has a G3 line and all values are not 0.
+
+ :param scan: specfile.Scan instance
+ :return: True or False
+ """
+ header_line = scan.scan_header_dict.get("G3", None)
+ if header_line is None:
+ return False
+ try:
+ ub_matrix = _parse_UB_matrix(header_line)
+ except ValueError:
+ logger1.warning("Malformed G3 header line")
+ return False
+ return numpy.any(ub_matrix)
+
+
+def _parse_unit_cell(header_line):
+ """Parse G1 header line and return unit cell
+
+ :param str header_line: G1 header line
+ :return: unit cell
+ :raises ValueError: For malformed unit cell header line
+ """
+ values = list(map(float, header_line.split()[0:6])) # can raise ValueError
+ if len(values) < 6:
+ raise ValueError("Not enough values in unit cell")
+ return numpy.array(values).reshape((1, 6))
+
+
+def _unit_cell_in_scan(scan):
+ """Return True if scan header has a G1 line and all values are not 0.
+
+ :param scan: specfile.Scan instance
+ :return: True or False
+ """
+ header_line = scan.scan_header_dict.get("G1", None)
+ if header_line is None:
+ return False
+ try:
+ unit_cell = _parse_unit_cell(header_line)
+ except ValueError:
+ logger1.warning("Malformed G1 header line")
+ return False
+ return numpy.any(unit_cell)
+
+
+def _parse_ctime(ctime_lines, analyser_index=0):
+ """
+ :param ctime_lines: e.g ``@CTIME %f %f %f``, first word ``@CTIME`` optional
+ When multiple CTIME lines are present in a scan header, this argument
+ is a concatenation of them separated by a ``\\n`` character.
+ :param analyser_index: MCA device/analyser index, when multiple devices
+ are in a scan.
+ :return: (preset_time, live_time, elapsed_time)
+ """
+ ctime_lines = ctime_lines.lstrip("@CTIME ")
+ ctimes_lines_list = ctime_lines.split("\n")
+ if len(ctimes_lines_list) == 1:
+ # single @CTIME line for all devices
+ ctime_line = ctimes_lines_list[0]
+ else:
+ ctime_line = ctimes_lines_list[analyser_index]
+ if not len(ctime_line.split()) == 3:
+ raise ValueError("Incorrect format for @CTIME header line " +
+ '(expected "@CTIME %f %f %f").')
+ return list(map(float, ctime_line.split()))
+
+
+def spec_date_to_iso8601(date, zone=None):
+ """Convert SpecFile date to Iso8601.
+
+ :param date: Date (see supported formats below)
+ :type date: str
+ :param zone: Time zone as it appears in a ISO8601 date
+
+ Supported formats:
+
+ * ``DDD MMM dd hh:mm:ss YYYY``
+ * ``DDD YYYY/MM/dd hh:mm:ss YYYY``
+
+ where `DDD` is the abbreviated weekday, `MMM` is the month abbreviated
+ name, `MM` is the month number (zero padded), `dd` is the weekday number
+ (zero padded) `YYYY` is the year, `hh` the hour (zero padded), `mm` the
+ minute (zero padded) and `ss` the second (zero padded).
+ All names are expected to be in english.
+
+ Examples::
+
+ >>> spec_date_to_iso8601("Thu Feb 11 09:54:35 2016")
+ '2016-02-11T09:54:35'
+
+ >>> spec_date_to_iso8601("Sat 2015/03/14 03:53:50")
+ '2015-03-14T03:53:50'
+ """
+ months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
+ 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
+ days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
+
+ days_rx = '(?P<day>' + '|'.join(days) + ')'
+ months_rx = '(?P<month>' + '|'.join(months) + ')'
+ year_rx = r'(?P<year>\d{4})'
+ day_nb_rx = r'(?P<day_nb>[0-3 ]\d)'
+ month_nb_rx = r'(?P<month_nb>[0-1]\d)'
+ hh_rx = r'(?P<hh>[0-2]\d)'
+ mm_rx = r'(?P<mm>[0-5]\d)'
+ ss_rx = r'(?P<ss>[0-5]\d)'
+ tz_rx = r'(?P<tz>[+-]\d\d:\d\d){0,1}'
+
+ # date formats must have either month_nb (1..12) or month (Jan, Feb, ...)
+ re_tpls = ['{days} {months} {day_nb} {hh}:{mm}:{ss}{tz} {year}',
+ '{days} {year}/{month_nb}/{day_nb} {hh}:{mm}:{ss}{tz}']
+
+ grp_d = None
+
+ for rx in re_tpls:
+ full_rx = rx.format(days=days_rx,
+ months=months_rx,
+ year=year_rx,
+ day_nb=day_nb_rx,
+ month_nb=month_nb_rx,
+ hh=hh_rx,
+ mm=mm_rx,
+ ss=ss_rx,
+ tz=tz_rx)
+ m = re.match(full_rx, date)
+
+ if m:
+ grp_d = m.groupdict()
+ break
+
+ if not grp_d:
+ raise ValueError('Date format not recognized : {0}'.format(date))
+
+ year = grp_d['year']
+
+ month = grp_d.get('month_nb')
+
+ if not month:
+ month = '{0:02d}'.format(months.index(grp_d.get('month')) + 1)
+
+ day = grp_d['day_nb']
+
+ tz = grp_d['tz']
+ if not tz:
+ tz = zone
+
+ time = '{0}:{1}:{2}'.format(grp_d['hh'],
+ grp_d['mm'],
+ grp_d['ss'])
+
+ full_date = '{0}-{1}-{2}T{3}{4}'.format(year,
+ month,
+ day,
+ time,
+ tz if tz else '')
+ return full_date
+
+
+def _demultiplex_mca(scan, analyser_index):
+ """Return MCA data for a single analyser.
+
+ Each MCA spectrum is a 1D array. For each analyser, there is one
+ spectrum recorded per scan data line. When there are more than a single
+ MCA analyser in a scan, the data will be multiplexed. For instance if
+ there are 3 analysers, the consecutive spectra for the first analyser must
+ be accessed as ``mca[0], mca[3], mca[6]…``.
+
+ :param scan: :class:`Scan` instance containing the MCA data
+ :param analyser_index: 0-based index referencing the analyser
+ :type analyser_index: int
+ :return: 2D numpy array containing all spectra for one analyser
+ """
+ number_of_analysers = _get_number_of_mca_analysers(scan)
+ number_of_spectra = len(scan.mca)
+ number_of_spectra_per_analyser = number_of_spectra // number_of_analysers
+ len_spectrum = len(scan.mca[analyser_index])
+
+ mca_array = numpy.empty((number_of_spectra_per_analyser, len_spectrum))
+
+ for i in range(number_of_spectra_per_analyser):
+ mca_array[i, :] = scan.mca[analyser_index + i * number_of_analysers]
+
+ return mca_array
+
+
+# Node classes
+class SpecH5Dataset(object):
+ """This convenience class is to be inherited by all datasets, for
+ compatibility purpose with code that tests for
+ ``isinstance(obj, SpecH5Dataset)``.
+
+ This legacy behavior is deprecated. The correct way to test
+ if an object is a dataset is to use :meth:`silx.io.utils.is_dataset`.
+
+ Datasets must also inherit :class:`SpecH5NodeDataset` or
+ :class:`SpecH5LazyNodeDataset` which actually implement all the
+ API."""
+ pass
+
+
+class SpecH5NodeDataset(commonh5.Dataset, SpecH5Dataset):
+ """This class inherits :class:`commonh5.Dataset`, to which it adds
+ little extra functionality. The main additional functionality is the
+ proxy behavior that allows to mimic the numpy array stored in this
+ class.
+ """
+ def __init__(self, name, data, parent=None, attrs=None):
+ # get proper value types, to inherit from numpy
+ # attributes (dtype, shape, size)
+ if isinstance(data, str):
+ # use unicode (utf-8 when saved to HDF5 output)
+ value = to_h5py_utf8(data)
+ elif isinstance(data, float):
+ # use 32 bits for float scalars
+ value = numpy.float32(data)
+ elif isinstance(data, int):
+ value = numpy.int_(data)
+ else:
+ # Enforce numpy array
+ array = numpy.array(data)
+ data_kind = array.dtype.kind
+
+ if data_kind in ["S", "U"]:
+ value = numpy.asarray(array,
+ dtype=text_dtype)
+ elif data_kind in ["f"]:
+ value = numpy.asarray(array, dtype=numpy.float32)
+ else:
+ value = array
+ commonh5.Dataset.__init__(self, name, value, parent, attrs)
+
+ def __getattr__(self, item):
+ """Proxy to underlying numpy array methods.
+ """
+ if hasattr(self[()], item):
+ return getattr(self[()], item)
+
+ raise AttributeError("SpecH5Dataset has no attribute %s" % item)
+
+
+class SpecH5LazyNodeDataset(commonh5.LazyLoadableDataset, SpecH5Dataset):
+ """This class inherits :class:`commonh5.LazyLoadableDataset`,
+ to which it adds a proxy behavior that allows to mimic the numpy
+ array stored in this class.
+
+ The class has to be inherited and the :meth:`_create_data` method has to be
+ implemented to return the numpy data exposed by the dataset. This factory
+ method is only called once, when the data is needed.
+ """
+ def __getattr__(self, item):
+ """Proxy to underlying numpy array methods.
+ """
+ if hasattr(self[()], item):
+ return getattr(self[()], item)
+
+ raise AttributeError("SpecH5Dataset has no attribute %s" % item)
+
+ def _create_data(self):
+ """
+ Factory to create the data exposed by the dataset when it is needed.
+
+ It has to be implemented for the class to work.
+
+ :rtype: numpy.ndarray
+ """
+ raise NotImplementedError()
+
+
+class SpecH5Group(object):
+ """This convenience class is to be inherited by all groups, for
+ compatibility purposes with code that tests for
+ ``isinstance(obj, SpecH5Group)``.
+
+ This legacy behavior is deprecated. The correct way to test
+ if an object is a group is to use :meth:`silx.io.utils.is_group`.
+
+ Groups must also inherit :class:`silx.io.commonh5.Group`, which
+ actually implements all the methods and attributes."""
+ pass
+
+
+class SpecH5(commonh5.File, SpecH5Group):
+ """This class opens a SPEC file and exposes it as a *h5py.File*.
+
+ It inherits :class:`silx.io.commonh5.Group` (via :class:`commonh5.File`),
+ which implements most of its API.
+ """
+
+ def __init__(self, filename):
+ """
+ :param filename: Path to SpecFile in filesystem
+ :type filename: str
+ """
+ if isinstance(filename, io.IOBase):
+ # see https://github.com/silx-kit/silx/issues/858
+ filename = filename.name
+
+ self._sf = SpecFile(filename)
+
+ attrs = {"NX_class": to_h5py_utf8("NXroot"),
+ "file_time": to_h5py_utf8(
+ datetime.datetime.now().isoformat()),
+ "file_name": to_h5py_utf8(filename),
+ "creator": to_h5py_utf8("silx spech5 %s" % silx_version)}
+ commonh5.File.__init__(self, filename, attrs=attrs)
+
+ for scan_key in self._sf.keys():
+ scan = self._sf[scan_key]
+ scan_group = ScanGroup(scan_key, parent=self, scan=scan)
+ self.add_node(scan_group)
+
+ def close(self):
+ self._sf.close()
+ self._sf = None
+
+
+class ScanGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, scan_key, parent, scan):
+ """
+
+ :param parent: parent Group
+ :param str scan_key: Scan key (e.g. "1.1")
+ :param scan: specfile.Scan object
+ """
+ commonh5.Group.__init__(self, scan_key, parent=parent,
+ attrs={"NX_class": to_h5py_utf8("NXentry")})
+
+ # take title in #S after stripping away scan number and spaces
+ s_hdr_line = scan.scan_header_dict["S"]
+ title = s_hdr_line.lstrip("0123456789").lstrip()
+ self.add_node(SpecH5NodeDataset(name="title",
+ data=to_h5py_utf8(title),
+ parent=self))
+
+ if "D" in scan.scan_header_dict:
+ try:
+ start_time_str = spec_date_to_iso8601(scan.scan_header_dict["D"])
+ except (IndexError, ValueError):
+ logger1.warning("Could not parse date format in scan %s header." +
+ " Using original date not converted to ISO-8601",
+ scan_key)
+ start_time_str = scan.scan_header_dict["D"]
+ elif "D" in scan.file_header_dict:
+ logger1.warning("No #D line in scan %s header. " +
+ "Using file header for start_time.",
+ scan_key)
+ try:
+ start_time_str = spec_date_to_iso8601(scan.file_header_dict["D"])
+ except (IndexError, ValueError):
+ logger1.warning("Could not parse date format in scan %s header. " +
+ "Using original date not converted to ISO-8601",
+ scan_key)
+ start_time_str = scan.file_header_dict["D"]
+ else:
+ logger1.warning("No #D line in %s header. Setting date to empty string.",
+ scan_key)
+ start_time_str = ""
+ self.add_node(SpecH5NodeDataset(name="start_time",
+ data=to_h5py_utf8(start_time_str),
+ parent=self))
+
+ self.add_node(InstrumentGroup(parent=self, scan=scan))
+ self.add_node(MeasurementGroup(parent=self, scan=scan))
+ if _unit_cell_in_scan(scan) or _ub_matrix_in_scan(scan):
+ self.add_node(SampleGroup(parent=self, scan=scan))
+
+
+class InstrumentGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
+ """
+
+ :param parent: parent Group
+ :param scan: specfile.Scan object
+ """
+ commonh5.Group.__init__(self, name="instrument", parent=parent,
+ attrs={"NX_class": to_h5py_utf8("NXinstrument")})
+
+ self.add_node(InstrumentSpecfileGroup(parent=self, scan=scan))
+ self.add_node(PositionersGroup(parent=self, scan=scan))
+
+ num_analysers = _get_number_of_mca_analysers(scan)
+ for anal_idx in range(num_analysers):
+ self.add_node(InstrumentMcaGroup(parent=self,
+ analyser_index=anal_idx,
+ scan=scan))
+
+
+class InstrumentSpecfileGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
+ commonh5.Group.__init__(self, name="specfile", parent=parent,
+ attrs={"NX_class": to_h5py_utf8("NXcollection")})
+ self.add_node(SpecH5NodeDataset(
+ name="file_header",
+ data=to_h5py_utf8(scan.file_header),
+ parent=self,
+ attrs={}))
+ self.add_node(SpecH5NodeDataset(
+ name="scan_header",
+ data=to_h5py_utf8(scan.scan_header),
+ parent=self,
+ attrs={}))
+
+
+class PositionersGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
+ commonh5.Group.__init__(self, name="positioners", parent=parent,
+ attrs={"NX_class": to_h5py_utf8("NXcollection")})
+
+ dataset_info = [] # Store list of positioner's (name, value)
+ is_error = False # True if error encountered
+
+ for motor_name in scan.motor_names:
+ safe_motor_name = motor_name.replace("/", "%")
+ if motor_name in scan.labels and scan.data.shape[0] > 0:
+ # return a data column if one has the same label as the motor
+ motor_value = scan.data_column_by_name(motor_name)
+ else:
+ # Take value from #P scan header.
+ # (may return float("inf") if #P line is missing from scan hdr)
+ try:
+ motor_value = scan.motor_position_by_name(motor_name)
+ except SfErrColNotFound:
+ is_error = True
+ motor_value = float('inf')
+ dataset_info.append((safe_motor_name, motor_value))
+
+ if is_error: # Filter-out scalar values
+ logger1.warning("Mismatching number of elements in #P and #O: Ignoring")
+ dataset_info = [
+ (name, value) for name, value in dataset_info
+ if not isinstance(value, float)]
+
+ for name, value in dataset_info:
+ self.add_node(SpecH5NodeDataset(
+ name=name,
+ data=value,
+ parent=self))
+
+
+class InstrumentMcaGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, analyser_index, scan):
+ name = "mca_%d" % analyser_index
+ commonh5.Group.__init__(self, name=name, parent=parent,
+ attrs={"NX_class": to_h5py_utf8("NXdetector")})
+
+ mcaDataDataset = McaDataDataset(parent=self,
+ analyser_index=analyser_index,
+ scan=scan)
+ self.add_node(mcaDataDataset)
+ spectrum_length = mcaDataDataset.shape[-1]
+ mcaDataDataset = None
+
+ if len(scan.mca.channels) == 1:
+ # single @CALIB line applying to multiple devices
+ calibration_dataset = scan.mca.calibration[0]
+ channels_dataset = scan.mca.channels[0]
+ else:
+ calibration_dataset = scan.mca.calibration[analyser_index]
+ channels_dataset = scan.mca.channels[analyser_index]
+
+ channels_length = len(channels_dataset)
+ if (channels_length > 1) and (spectrum_length > 0):
+ logger1.info("Spectrum and channels length mismatch")
+ # this should always be the case
+ if channels_length > spectrum_length:
+ channels_dataset = channels_dataset[:spectrum_length]
+ elif channels_length < spectrum_length:
+ # only trust first channel and increment
+ channel0 = channels_dataset[0]
+ increment = channels_dataset[1] - channels_dataset[0]
+ channels_dataset = numpy.linspace(channel0,
+ channel0 + increment * spectrum_length,
+ spectrum_length, endpoint=False)
+
+ self.add_node(SpecH5NodeDataset(name="calibration",
+ data=calibration_dataset,
+ parent=self))
+ self.add_node(SpecH5NodeDataset(name="channels",
+ data=channels_dataset,
+ parent=self))
+
+ if "CTIME" in scan.mca_header_dict:
+ ctime_line = scan.mca_header_dict['CTIME']
+ preset_time, live_time, elapsed_time = _parse_ctime(ctime_line, analyser_index)
+ self.add_node(SpecH5NodeDataset(name="preset_time",
+ data=preset_time,
+ parent=self))
+ self.add_node(SpecH5NodeDataset(name="live_time",
+ data=live_time,
+ parent=self))
+ self.add_node(SpecH5NodeDataset(name="elapsed_time",
+ data=elapsed_time,
+ parent=self))
+
+
+class McaDataDataset(SpecH5LazyNodeDataset):
+ """Lazy loadable dataset for MCA data"""
+ def __init__(self, parent, analyser_index, scan):
+ commonh5.LazyLoadableDataset.__init__(
+ self, name="data", parent=parent,
+ attrs={"interpretation": to_h5py_utf8("spectrum"),})
+ self._scan = scan
+ self._analyser_index = analyser_index
+ self._shape = None
+ self._num_analysers = _get_number_of_mca_analysers(self._scan)
+
+ def _create_data(self):
+ return _demultiplex_mca(self._scan, self._analyser_index)
+
+ @property
+ def shape(self):
+ if self._shape is None:
+ num_spectra_in_file = len(self._scan.mca)
+ num_spectra_per_analyser = num_spectra_in_file // self._num_analysers
+ len_spectrum = len(self._scan.mca[self._analyser_index])
+ self._shape = num_spectra_per_analyser, len_spectrum
+ return self._shape
+
+ @property
+ def size(self):
+ return numpy.prod(self.shape, dtype=numpy.intp)
+
+ @property
+ def dtype(self):
+ # we initialize the data with numpy.empty() without specifying a dtype
+ # in _demultiplex_mca()
+ return numpy.empty((1, )).dtype
+
+ def __len__(self):
+ return self.shape[0]
+
+ def __getitem__(self, item):
+ # optimization for fetching a single spectrum if data not already loaded
+ if not self._is_initialized:
+ if isinstance(item, int):
+ if item < 0:
+ # negative indexing
+ item += len(self)
+ return self._scan.mca[self._analyser_index +
+ item * self._num_analysers]
+ # accessing a slice or element of a single spectrum [i, j:k]
+ try:
+ spectrum_idx, channel_idx_or_slice = item
+ assert isinstance(spectrum_idx, int)
+ except (ValueError, TypeError, AssertionError):
+ pass
+ else:
+ if spectrum_idx < 0:
+ item += len(self)
+ idx = self._analyser_index + spectrum_idx * self._num_analysers
+ return self._scan.mca[idx][channel_idx_or_slice]
+
+ return super(McaDataDataset, self).__getitem__(item)
+
+
+class MeasurementGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
+ """
+
+ :param parent: parent Group
+ :param scan: specfile.Scan object
+ """
+ commonh5.Group.__init__(self, name="measurement", parent=parent,
+ attrs={"NX_class": to_h5py_utf8("NXcollection"),})
+ for label in scan.labels:
+ safe_label = label.replace("/", "%")
+ self.add_node(SpecH5NodeDataset(name=safe_label,
+ data=scan.data_column_by_name(label),
+ parent=self))
+
+ num_analysers = _get_number_of_mca_analysers(scan)
+ for anal_idx in range(num_analysers):
+ self.add_node(MeasurementMcaGroup(parent=self, analyser_index=anal_idx))
+
+
+class MeasurementMcaGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, analyser_index):
+ basename = "mca_%d" % analyser_index
+ commonh5.Group.__init__(self, name=basename, parent=parent,
+ attrs={})
+
+ target_name = self.name.replace("measurement", "instrument")
+ self.add_node(commonh5.SoftLink(name="data",
+ path=target_name + "/data",
+ parent=self))
+ self.add_node(commonh5.SoftLink(name="info",
+ path=target_name,
+ parent=self))
+
+
+class SampleGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
+ """
+
+ :param parent: parent Group
+ :param scan: specfile.Scan object
+ """
+ commonh5.Group.__init__(self, name="sample", parent=parent,
+ attrs={"NX_class": to_h5py_utf8("NXsample"),})
+
+ if _unit_cell_in_scan(scan):
+ self.add_node(SpecH5NodeDataset(name="unit_cell",
+ data=_parse_unit_cell(scan.scan_header_dict["G1"]),
+ parent=self,
+ attrs={"interpretation": to_h5py_utf8("scalar")}))
+ self.add_node(SpecH5NodeDataset(name="unit_cell_abc",
+ data=_parse_unit_cell(scan.scan_header_dict["G1"])[0, 0:3],
+ parent=self,
+ attrs={"interpretation": to_h5py_utf8("scalar")}))
+ self.add_node(SpecH5NodeDataset(name="unit_cell_alphabetagamma",
+ data=_parse_unit_cell(scan.scan_header_dict["G1"])[0, 3:6],
+ parent=self,
+ attrs={"interpretation": to_h5py_utf8("scalar")}))
+ if _ub_matrix_in_scan(scan):
+ self.add_node(SpecH5NodeDataset(name="ub_matrix",
+ data=_parse_UB_matrix(scan.scan_header_dict["G3"]),
+ parent=self,
+ attrs={"interpretation": to_h5py_utf8("scalar")}))