summaryrefslogtreecommitdiff
path: root/silx/io/spech5.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/spech5.py')
-rw-r--r--silx/io/spech5.py1573
1 files changed, 383 insertions, 1190 deletions
diff --git a/silx/io/spech5.py b/silx/io/spech5.py
index 58a6c6b..81a7a7e 100644
--- a/silx/io/spech5.py
+++ b/silx/io/spech5.py
@@ -24,7 +24,8 @@
"""This module provides a h5py-like API to access SpecFile data.
API description
-===============
++++++++++++++++
+
Specfile data structure exposed by this API:
::
@@ -59,6 +60,11 @@ Specfile data structure exposed by this API:
data -> /1.1/instrument/mca_0/data
info -> /1.1/instrument/mca_0/
+ sample/
+ ub_matrix = …
+ unit_cell = …
+ unit_cell_abc = …
+ unit_cell_alphabetagamma = …
2.1/
@@ -101,7 +107,7 @@ computed from the ``#@CHANN`` scan header line (if present), or computed from
the shape of the first spectrum in a scan (``[0, … len(first_spectrum] - 1]``).
Accessing data
-==============
+++++++++++++++
Data and groups are accessed in :mod:`h5py` fashion::
@@ -123,7 +129,7 @@ Data and groups are accessed in :mod:`h5py` fashion::
# accessing all mca-spectra for one MCA device
mca_0_spectra = measurement_group["mca_0/data"]
-:class:`SpecH5` and :class:`SpecH5Group` provide a :meth:`SpecH5Group.keys` method::
+:class:`SpecH5` files and groups provide a :meth:`keys` method::
>>> sfh5.keys()
['96.1', '97.1', '98.1']
@@ -134,9 +140,11 @@ They can also be treated as iterators:
.. code-block:: python
+ from silx.io import is_dataset
+
for scan_group in SpecH5("test.dat"):
dataset_names = [item.name in scan_group["measurement"] if
- isinstance(item, SpecH5Dataset)]
+ is_dataset(item)]
print("Found data columns in scan " + scan_group.name)
print(", ".join(dataset_names))
@@ -164,30 +172,23 @@ is that you should decode strings before using them in **Python 3**::
>>> sfh5["/68.1/title"].decode()
'68 ascan tx3 -28.5 -24.5 20 0.5'
-
-Classes
-=======
-
-- :class:`SpecH5`
-- :class:`SpecH5Group`
-- :class:`SpecH5Dataset`
-- :class:`SpecH5LinkToGroup`
-- :class:`SpecH5LinkToDataset`
"""
+import datetime
import logging
import numpy
-import posixpath
import re
import sys
+import io
+from silx import version as silx_version
from .specfile import SpecFile
+from . import commonh5
__authors__ = ["P. Knobel", "D. Naudet"]
__license__ = "MIT"
-__date__ = "11/05/2017"
+__date__ = "23/08/2017"
-logging.basicConfig()
logger1 = logging.getLogger(__name__)
try:
@@ -198,303 +199,22 @@ except ImportError:
string_types = (basestring,) if sys.version_info[0] == 2 else (str,) # noqa
-
-# Static subitems: all groups and datasets that are present in any
-# scan (excludes list of scans, data columns, list of mca devices,
-# optional mca headers, optional sample group)
-static_items = {
- "scan": [u"title", u"start_time", u"instrument",
- u"measurement"],
- "scan/instrument": [u"specfile", u"positioners"],
- "scan/instrument/specfile": [u"file_header", u"scan_header"],
- "scan/measurement/mca": [u"data", u"info"],
- "scan/instrument/mca": [u"data", u"calibration", u"channels"],
-}
-
-# Patterns for group keys
-root_pattern = re.compile(r"/$")
-scan_pattern = re.compile(r"/[0-9]+\.[0-9]+/?$")
-instrument_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/?$")
-sample_pattern = re.compile(r"/[0-9]+\.[0-9]+/sample/?$")
-specfile_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/specfile/?$")
-positioners_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/positioners/?$")
-measurement_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/?$")
-measurement_mca_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_[0-9]+/?$")
-instrument_mca_group_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/?$")
-
-
-# Link to group
-measurement_mca_info_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/info/?$")
-
-# Patterns for dataset keys
-header_pattern = re.compile(r"/[0-9]+\.[0-9]+/header$")
-title_pattern = re.compile(r"/[0-9]+\.[0-9]+/title$")
-start_time_pattern = re.compile(r"/[0-9]+\.[0-9]+/start_time$")
-file_header_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/specfile/file_header$")
-scan_header_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/specfile/scan_header$")
-positioners_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/positioners/([^/]+)$")
-measurement_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/([^/]+)$")
-instrument_mca_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_([0-9]+)/data$")
-instrument_mca_calib_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_([0-9]+)/calibration$")
-instrument_mca_chann_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_([0-9])+/channels$")
-instrument_mca_preset_t_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/preset_time$")
-instrument_mca_elapsed_t_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/elapsed_time$")
-instrument_mca_live_t_pattern = re.compile(r"/[0-9]+\.[0-9]+/instrument/mca_[0-9]+/live_time$")
-ub_matrix_pattern = re.compile(r"/[0-9]+\.[0-9]+/sample/ub_matrix$")
-unit_cell_pattern = re.compile(r"/[0-9]+\.[0-9]+/sample/unit_cell$")
-unit_cell_abc_pattern = re.compile(r"/[0-9]+\.[0-9]+/sample/unit_cell_abc$")
-unit_cell_alphabetagamma_pattern = re.compile(r"/[0-9]+\.[0-9]+/sample/unit_cell_alphabetagamma$")
-
-# Links to dataset
-measurement_mca_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/data$")
-# info/ + calibration, channel, preset_time, live_time, elapsed_time (not data)
-measurement_mca_info_dataset_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/info/([^d/][^/]+)$")
-# info/data
-measurement_mca_info_data_pattern = re.compile(r"/[0-9]+\.[0-9]+/measurement/mca_([0-9]+)/info/data$")
-
-
-def _bulk_match(string_, list_of_patterns):
- """Check whether a string matches any regular expression pattern in a list
- """
- for pattern in list_of_patterns:
- if pattern.match(string_):
- return True
- return False
-
-
-def is_group(name):
- """Check if ``name`` matches a valid group name pattern in a
- :class:`SpecH5`.
-
- :param name: Full name of member
- :type name: str
-
- For example:
-
- - ``is_group("/123.456/instrument/")`` returns ``True``.
- - ``is_group("spam")`` returns ``False`` because :literal:`\"spam\"`
- is not at all a valid group name.
- - ``is_group("/1.2/instrument/positioners/xyz")`` returns ``False``
- because this key would point to a motor position, which is a
- dataset and not a group.
- """
- group_patterns = (
- root_pattern, scan_pattern, instrument_pattern,
- specfile_group_pattern, positioners_group_pattern,
- measurement_group_pattern, measurement_mca_group_pattern,
- instrument_mca_group_pattern, sample_pattern
- )
- return _bulk_match(name, group_patterns)
-
-
-def is_dataset(name):
- """Check if ``name`` matches a valid dataset name pattern in a
- :class:`SpecH5`.
-
- :param name: Full name of member
- :type name: str
-
- For example:
-
- - ``is_dataset("/1.2/instrument/positioners/xyz")`` returns ``True``
- because this name could be the key to the dataset recording motor
- positions for motor ``xyz`` in scan ``1.2``.
- - ``is_dataset("/123.456/instrument/")`` returns ``False`` because
- this name points to a group.
- - ``is_dataset("spam")`` returns ``False`` because :literal:`\"spam\"`
- is not at all a valid dataset name.
- """
- # Check groups first because /1.1/measurement/mca_0 could be interpreted
- # as a data column with label "mca_0"
- if measurement_mca_group_pattern.match(name):
- return False
-
- data_patterns = (
- header_pattern, title_pattern, start_time_pattern,
- file_header_data_pattern, scan_header_data_pattern,
- positioners_data_pattern, measurement_data_pattern,
- instrument_mca_data_pattern, instrument_mca_calib_pattern,
- instrument_mca_chann_pattern,
- instrument_mca_preset_t_pattern, instrument_mca_elapsed_t_pattern,
- instrument_mca_live_t_pattern,
- ub_matrix_pattern, unit_cell_pattern, unit_cell_abc_pattern, unit_cell_alphabetagamma_pattern
- )
- return _bulk_match(name, data_patterns)
-
-
-def is_link_to_group(name):
- """Check if ``name`` is a valid link to a group in a :class:`SpecH5`.
- Return ``True`` or ``False``
-
- :param name: Full name of member
- :type name: str
- """
- # so far we only have one type of link to a group
- if measurement_mca_info_pattern.match(name):
- return True
- return False
-
-
-def is_link_to_dataset(name):
- """Check if ``name`` is a valid link to a dataset in a :class:`SpecH5`.
- Return ``True`` or ``False``
-
- :param name: Full name of member
- :type name: str
- """
- list_of_link_patterns = (
- measurement_mca_data_pattern, measurement_mca_info_dataset_pattern,
- measurement_mca_info_data_pattern
- )
- return _bulk_match(name, list_of_link_patterns)
-
-
-def _get_attrs_dict(name):
- """Return attributes dictionary corresponding to the group or dataset
- pointed to by name.
-
- :param name: Full name/path to data or group
- :return: attributes dictionary
- """
- # Associate group and dataset patterns to their attributes
- pattern_attrs = {
- root_pattern:
- {"NX_class": "NXroot",
- },
- scan_pattern:
- {"NX_class": "NXentry", },
- title_pattern:
- {},
- start_time_pattern:
- {},
- instrument_pattern:
- {"NX_class": "NXinstrument", },
- specfile_group_pattern:
- {"NX_class": "NXcollection", },
- file_header_data_pattern:
- {},
- scan_header_data_pattern:
- {},
- positioners_group_pattern:
- {"NX_class": "NXcollection", },
- positioners_data_pattern:
- {},
- instrument_mca_group_pattern:
- {"NX_class": "NXdetector", },
- instrument_mca_data_pattern:
- {"interpretation": "spectrum", },
- instrument_mca_calib_pattern:
- {},
- instrument_mca_chann_pattern:
- {},
- instrument_mca_preset_t_pattern:
- {},
- instrument_mca_elapsed_t_pattern:
- {},
- instrument_mca_live_t_pattern:
- {},
- measurement_group_pattern:
- {"NX_class": "NXcollection", },
- measurement_data_pattern:
- {},
- measurement_mca_group_pattern:
- {},
- measurement_mca_data_pattern:
- {"interpretation": "spectrum", },
- measurement_mca_info_pattern:
- {"NX_class": "NXdetector", },
- measurement_mca_info_dataset_pattern:
- {},
- measurement_mca_info_data_pattern:
- {"interpretation": "spectrum"},
- sample_pattern:
- {"NX_class": "NXsample", },
- ub_matrix_pattern:
- {"interpretation": "scalar"},
- unit_cell_pattern:
- {"interpretation": "scalar"},
- unit_cell_abc_pattern:
- {"interpretation": "scalar"},
- unit_cell_alphabetagamma_pattern:
- {"interpretation": "scalar"},
- }
-
- for pattern in pattern_attrs:
- if pattern.match(name):
- return pattern_attrs[pattern]
-
- logger1.warning("%s not a known pattern, assigning empty dict to attrs",
- name)
- return {}
-
-
-def _get_scan_key_in_name(item_name):
- """
- :param item_name: Name of a group or dataset
- :return: Scan identification key (e.g. ``"1.1"``)
- :rtype: str on None
- """
- scan_match = re.match(r"/([0-9]+\.[0-9]+)", item_name)
- if not scan_match:
- return None
- return scan_match.group(1)
-
-
-def _get_mca_index_in_name(item_name):
- """
- :param item_name: Name of a group or dataset
- :return: MCA analyser index, ``None`` if item name does not reference
- a mca dataset
- :rtype: int or None
- """
- mca_match = re.match(r"/.*/mca_([0-9]+)[^0-9]*", item_name)
- if not mca_match:
- return None
- return int(mca_match.group(1))
-
-
-def _get_motor_in_name(item_name):
- """
- :param item_name: Name of a group or dataset
- :return: Motor name or ``None``
- :rtype: str on None
- """
- motor_match = positioners_data_pattern.match(item_name)
- if not motor_match:
- return None
- return motor_match.group(1)
-
-
-def _get_data_column_label_in_name(item_name):
- """
- :param item_name: Name of a group or dataset
- :return: Data column label or ``None``
- :rtype: str on None
- """
- # /1.1/measurement/mca_0 should not be interpreted as the label of a
- # data column (let's hope no-one ever uses mca_0 as a label)
- if measurement_mca_group_pattern.match(item_name):
- return None
- data_column_match = measurement_data_pattern.match(item_name)
- if not data_column_match:
- return None
- return data_column_match.group(1)
+integer_types = (int, long,) if sys.version_info[0] == 2 else (int,) # noqa
def _get_number_of_mca_analysers(scan):
"""
:param SpecFile sf: :class:`SpecFile` instance
- :param str scan_key: Scan identification key (e.g. ``1.1``)
"""
- number_of_MCA_spectra = len(scan.mca)
+ number_of_mca_spectra = len(scan.mca)
# Scan.data is transposed
number_of_data_lines = scan.data.shape[1]
if not number_of_data_lines == 0:
# Number of MCA spectra must be a multiple of number of data lines
- assert number_of_MCA_spectra % number_of_data_lines == 0
- return number_of_MCA_spectra // number_of_data_lines
- elif number_of_MCA_spectra:
+ assert number_of_mca_spectra % number_of_data_lines == 0
+ return number_of_mca_spectra // number_of_data_lines
+ elif number_of_mca_spectra:
# Case of a scan without data lines, only MCA.
# Our only option is to assume that the number of analysers
# is the number of #@CHANN lines
@@ -503,25 +223,6 @@ def _get_number_of_mca_analysers(scan):
return 0
-def _mca_analyser_in_scan(sf, scan_key, mca_analyser_index):
- """
- :param sf: :class:`SpecFile` instance
- :param scan_key: Scan identification key (e.g. ``1.1``)
- :param mca_analyser_index: 0-based index of MCA analyser
- :return: ``True`` if MCA analyser exists in Scan, else ``False``
- :raise: ``KeyError`` if scan_key not found in SpecFile
- :raise: ``AssertionError`` if number of MCA spectra is not a multiple
- of the number of data lines
- """
- if scan_key not in sf:
- raise KeyError("Scan key %s " % scan_key +
- "does not exist in SpecFile %s" % sf.filename)
-
- number_of_analysers = _get_number_of_mca_analysers(sf[scan_key])
-
- return 0 <= mca_analyser_index < number_of_analysers
-
-
def _motor_in_scan(sf, scan_key, motor_name):
"""
:param sf: :class:`SpecFile` instance
@@ -558,6 +259,41 @@ def _column_label_in_scan(sf, scan_key, column_label):
return ret
+def _parse_UB_matrix(header_line):
+ """Parse G3 header line and return UB matrix
+
+ :param str header_line: G3 header line
+ :return: UB matrix
+ """
+ return numpy.array(list(map(float, header_line.split()))).reshape((1, 3, 3))
+
+
+def _ub_matrix_in_scan(scan):
+ """Return True if scan header has a G3 line and all values are not 0.
+
+ :param scan: specfile.Scan instance
+ :return: True or False
+ """
+ if "G3" not in scan.scan_header_dict:
+ return False
+ return numpy.any(_parse_UB_matrix(scan.scan_header_dict["G3"]))
+
+
+def _parse_unit_cell(header_line):
+ return numpy.array(list(map(float, header_line.split()))[0:6]).reshape((1, 6))
+
+
+def _unit_cell_in_scan(scan):
+ """Return True if scan header has a G1 line and all values are not 0.
+
+ :param scan: specfile.Scan instance
+ :return: True or False
+ """
+ if "G1" not in scan.scan_header_dict:
+ return False
+ return numpy.any(_parse_unit_cell(scan.scan_header_dict["G1"]))
+
+
def _parse_ctime(ctime_lines, analyser_index=0):
"""
:param ctime_lines: e.g ``@CTIME %f %f %f``, first word ``@CTIME`` optional
@@ -670,965 +406,422 @@ def spec_date_to_iso8601(date, zone=None):
return full_date
-def _fixed_length_strings(strings, length=0):
- """Return list of fixed length strings, left-justified and right-padded
- with spaces.
+def _demultiplex_mca(scan, analyser_index):
+ """Return MCA data for a single analyser.
- :param strings: List of variable length strings
- :param length: Length of strings in returned list, defaults to the maximum
- length in the original list if set to 0.
- :type length: int or None
+ Each MCA spectrum is a 1D array. For each analyser, there is one
+ spectrum recorded per scan data line. When there are more than a single
+ MCA analyser in a scan, the data will be multiplexed. For instance if
+ there are 3 analysers, the consecutive spectra for the first analyser must
+ be accessed as ``mca[0], mca[3], mca[6]…``.
+
+ :param scan: :class:`Scan` instance containing the MCA data
+ :param analyser_index: 0-based index referencing the analyser
+ :type analyser_index: int
+ :return: 2D numpy array containing all spectra for one analyser
"""
- if length == 0 and strings:
- length = max(len(s) for s in strings)
- return [s.ljust(length) for s in strings]
+ number_of_analysers = _get_number_of_mca_analysers(scan)
+ number_of_spectra = len(scan.mca)
+ number_of_spectra_per_analyser = number_of_spectra // number_of_analysers
+ len_spectrum = len(scan.mca[analyser_index])
+ mca_array = numpy.empty((number_of_spectra_per_analyser, len_spectrum))
-class SpecH5Dataset(object):
- """Emulate :class:`h5py.Dataset` for a SpecFile object.
+ for i in range(number_of_spectra_per_analyser):
+ mca_array[i, :] = scan.mca[analyser_index + i * number_of_analysers]
- A :class:`SpecH5Dataset` instance is basically a proxy for the numpy
- array :attr:`value` attribute, with additional attributes for
- compatibility with *h5py* datasets.
+ return mca_array
- :param value: Actual dataset value
- :param name: Dataset full name (posix path format, starting with ``/``)
- :type name: str
- :param file_: Parent :class:`SpecH5`
- :param parent: Parent :class:`SpecH5Group` which contains this dataset
- """
- def __init__(self, value, name, file_, parent):
- object.__init__(self)
- self.value = None
- """Actual dataset, can be a *numpy array*, a *numpy.string_*,
- a *numpy.int_* or a *numpy.float32*
+# Node classes
+class SpecH5Dataset(object):
+ """This convenience class is to be inherited by all datasets, for
+ compatibility purpose with code that tests for
+ ``isinstance(obj, SpecH5Dataset)``.
+
+ This legacy behavior is deprecated. The correct way to test
+ if an object is a dataset is to use :meth:`silx.io.utils.is_dataset`.
+
+ Datasets must also inherit :class:`SpecH5NodeDataset` or
+ :class:`SpecH5LazyNodeDataset` which actually implement all the
+ API."""
+ pass
- All operations applied to an instance of the class use this."""
+class SpecH5NodeDataset(commonh5.Dataset, SpecH5Dataset):
+ """This class inherits :class:`commonh5.Dataset`, to which it adds
+ little extra functionality. The main additional functionality is the
+ proxy behavior that allows to mimic the numpy array stored in this
+ class.
+ """
+ def __init__(self, name, data, parent=None, attrs=None):
# get proper value types, to inherit from numpy
# attributes (dtype, shape, size)
- if isinstance(value, string_types):
+ if isinstance(data, string_types):
# use bytes for maximum compatibility
# (see http://docs.h5py.org/en/latest/strings.html)
- self.value = numpy.string_(value)
- elif isinstance(value, float):
+ value = numpy.string_(data)
+ elif isinstance(data, float):
# use 32 bits for float scalars
- self.value = numpy.float32(value)
- elif isinstance(value, int):
- self.value = numpy.int_(value)
+ value = numpy.float32(data)
+ elif isinstance(data, int):
+ value = numpy.int_(data)
else:
# Enforce numpy array
- array = numpy.array(value)
+ array = numpy.array(data)
data_kind = array.dtype.kind
if data_kind in ["S", "U"]:
- self.value = numpy.asarray(array, dtype=numpy.string_)
+ value = numpy.asarray(array, dtype=numpy.string_)
elif data_kind in ["f"]:
- self.value = numpy.asarray(array, dtype=numpy.float32)
+ value = numpy.asarray(array, dtype=numpy.float32)
else:
- self.value = array
-
- # numpy array attributes (more attributes handled in __getattribute__)
- self.shape = self.value.shape
- """Dataset shape, as a tuple with the length of each dimension
- of the dataset."""
-
- self.dtype = self.value.dtype
- """Dataset dtype"""
+ value = array
+ commonh5.Dataset.__init__(self, name, value, parent, attrs)
- self.size = self.value.size
- """Dataset size (number of elements)"""
-
- # h5py dataset specific attributes
- self.name = name
- """"Dataset name (posix path format, starting with ``/``)"""
-
- self.parent = parent
- """Parent :class:`SpecH5Group` object which contains this dataset"""
-
- self.file = file_
- """Parent :class:`SpecH5` object"""
-
- self.attrs = _get_attrs_dict(name)
- """Attributes dictionary"""
-
- self.compression = None
- """Compression attribute as provided by h5py.Dataset"""
-
- self.compression_opts = None
- """Compression options attribute as provided by h5py.Dataset"""
-
- self.chunks = None
-
- @property
- def h5py_class(self):
- """Return h5py class which is mimicked by this class:
- :class:`h5py.dataset`.
-
- Accessing this attribute if :mod:`h5py` is not installed causes
- an ``ImportError`` to be raised
+ def __getattr__(self, item):
+ """Proxy to underlying numpy array methods.
"""
- if h5py is None:
- raise ImportError("Cannot return h5py.Dataset class, " +
- "unable to import h5py module")
- return h5py.Dataset
-
- def __getattribute__(self, item):
- if item in ["value", "name", "parent", "file", "attrs",
- "shape", "dtype", "size", "h5py_class",
- "chunks", "compression", "compression_opts",
- "target"]:
- return object.__getattribute__(self, item)
-
- if hasattr(self.value, item):
- return getattr(self.value, item)
+ if hasattr(self[()], item):
+ return getattr(self[()], item)
raise AttributeError("SpecH5Dataset has no attribute %s" % item)
- def __len__(self):
- return len(self.value)
-
- def __getitem__(self, item):
- if not isinstance(self.value, numpy.ndarray):
- if item == Ellipsis:
- return numpy.array(self.value)
- elif item == tuple():
- return self.value
- else:
- raise ValueError("Scalar can only be reached with an ellipsis or an empty tuple")
- return self.value.__getitem__(item)
-
- def __getslice__(self, i, j):
- # deprecated but still in use for python 2.7
- return self.__getitem__(slice(i, j, None))
-
- def __iter__(self):
- return self.value.__iter__()
-
- def __dir__(self):
- attrs = set(dir(self.value) +
- ["value", "name", "parent", "file",
- "attrs", "shape", "dtype", "size",
- "h5py_class", "chunks", "compression",
- "compression_opts"])
- return sorted(attrs)
-
- # casting
- def __repr__(self):
- return '<SpecH5Dataset "%s": shape %s, type "%s">' % \
- (self.name, self.shape, self.dtype.str)
-
- def __float__(self):
- return float(self.value)
-
- def __int__(self):
- return int(self.value)
-
- def __str__(self):
- basename = self.name.split("/")[-1]
- return '<SPEC dataset "%s": shape %s, type "%s">' % \
- (basename, self.shape, self.dtype.str)
-
- def __bool__(self):
- if self.value:
- return True
- return False
-
- def __nonzero__(self):
- # python 2
- return self.__bool__()
-
- def __array__(self, dtype=None):
- if dtype is None:
- return numpy.array(self.value)
- else:
- return numpy.array(self.value, dtype=dtype)
-
- # comparisons
- def __eq__(self, other):
- if hasattr(other, "value"):
- return self.value == other.value
- else:
- return self.value == other
- def __ne__(self, other):
- if hasattr(other, "value"):
- return self.value != other.value
- else:
- return self.value != other
+class SpecH5LazyNodeDataset(commonh5.LazyLoadableDataset, SpecH5Dataset):
+ """This class inherits :class:`commonh5.LazyLoadableDataset`,
+ to which it adds a proxy behavior that allows to mimic the numpy
+ array stored in this class.
- def __lt__(self, other):
- if hasattr(other, "value"):
- return self.value < other.value
- else:
- return self.value < other
+ The class has to be inherited and the :meth:`_create_data` method has to be
+ implemented to return the numpy data exposed by the dataset. This factory
+ method is only called once, when the data is needed.
+ """
+ def __getattr__(self, item):
+ """Proxy to underlying numpy array methods.
+ """
+ if hasattr(self[()], item):
+ return getattr(self[()], item)
- def __le__(self, other):
- if hasattr(other, "value"):
- return self.value <= other.value
- else:
- return self.value <= other
+ raise AttributeError("SpecH5Dataset has no attribute %s" % item)
- def __gt__(self, other):
- if hasattr(other, "value"):
- return self.value > other.value
- else:
- return self.value > other
+ def _create_data(self):
+ """
+ Factory to create the data exposed by the dataset when it is needed.
- def __ge__(self, other):
- if hasattr(other, "value"):
- return self.value >= other.value
- else:
- return self.value >= other
+ It has to be implemented for the class to work.
- # operations
- def __add__(self, other):
- return self.value + other
+ :rtype: numpy.ndarray
+ """
+ raise NotImplementedError()
- def __radd__(self, other):
- return other + self.value
- def __sub__(self, other):
- return self.value - other
+class SpecH5Group(object):
+ """This convenience class is to be inherited by all groups, for
+ compatibility purposes with code that tests for
+ ``isinstance(obj, SpecH5Group)``.
- def __rsub__(self, other):
- return other - self.value
+ This legacy behavior is deprecated. The correct way to test
+ if an object is a group is to use :meth:`silx.io.utils.is_group`.
- def __mul__(self, other):
- return self.value * other
+ Groups must also inherit :class:`silx.io.commonh5.Group`, which
+ actually implements all the methods and attributes."""
+ pass
- def __rmul__(self, other):
- return other * self.value
- def __truediv__(self, other):
- return self.value / other
+class SpecH5(commonh5.File, SpecH5Group):
+ """This class opens a SPEC file and exposes it as a *h5py.File*.
- def __rtruediv__(self, other):
- return other / self.value
+ It inherits :class:`silx.io.commonh5.Group` (via :class:`commonh5.File`),
+ which implements most of its API.
+ """
- def __floordiv__(self, other):
- return self.value // other
+ def __init__(self, filename):
+ """
+ :param filename: Path to SpecFile in filesystem
+ :type filename: str
+ """
+ if isinstance(filename, io.IOBase):
+ # see https://github.com/silx-kit/silx/issues/858
+ filename = filename.name
- def __rfloordiv__(self, other):
- return other // self.value
+ self._sf = SpecFile(filename)
- # unary operations
- def __neg__(self):
- return -self.value
+ attrs = {"NX_class": "NXroot",
+ "file_time": datetime.datetime.now().isoformat(),
+ "file_name": filename,
+ "creator": "silx %s" % silx_version}
+ commonh5.File.__init__(self, filename, attrs=attrs)
+ assert self.attrs["NX_class"] == "NXroot"
- def __abs__(self):
- return abs(self.value)
+ for scan_key in self._sf.keys():
+ scan = self._sf[scan_key]
+ scan_group = ScanGroup(scan_key, parent=self, scan=scan)
+ self.add_node(scan_group)
+ def close(self):
+ # or del self._sf?
+ self._sf = None
-class SpecH5LinkToDataset(SpecH5Dataset):
- """Special :class:`SpecH5Dataset` representing a link to a dataset. It
- works like a regular dataset, but :meth:`SpecH5Group.visit`
- and :meth:`SpecH5Group.visititems` methods will recognize that it is
- a link and will ignore it.
- A special attribute contains the name of the target dataset:
- :attr:`target`
- """
- def __init__(self, value, name, file_, parent, target):
- """
- :param value: Actual dataset value
- :param name: Dataset full name (posix path format, starting with ``/``)
- :type name: str
- :param file_: Parent :class:`SpecH5`
- :param parent: Parent :class:`SpecH5Group` which contains this dataset
- :param str target: Name of linked dataset
+class ScanGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, scan_key, parent, scan):
"""
- SpecH5Dataset.__init__(self, value, name, file_, parent)
- self.target = target
- """Name of the target dataset"""
-
-def _dataset_builder(name, specfileh5, parent_group):
- """Retrieve dataset from :class:`SpecFile`, based on dataset name, as a
- subclass of :class:`numpy.ndarray`.
-
- :param name: Datatset full name (posix path format, starting with ``/``)
- :type name: str
- :param specfileh5: parent :class:`SpecH5` object
- :type specfileh5: :class:`SpecH5`
- :param parent_group: Parent :class:`SpecH5Group`
-
- :return: Array with the requested data
- :rtype: :class:`SpecH5Dataset`.
- """
- scan_key = _get_scan_key_in_name(name)
- scan = specfileh5._sf[scan_key]
-
- # get dataset in an array-like format (ndarray, str, list…)
- array_like = None
+ :param parent: parent Group
+ :param str scan_key: Scan key (e.g. "1.1")
+ :param scan: specfile.Scan object
+ """
+ commonh5.Group.__init__(self, scan_key, parent=parent,
+ attrs={"NX_class": "NXentry"})
- if title_pattern.match(name):
- array_like = scan.scan_header_dict["S"]
+ self.add_node(SpecH5NodeDataset(name="title",
+ data=scan.scan_header_dict["S"],
+ parent=self))
- elif start_time_pattern.match(name):
if "D" in scan.scan_header_dict:
try:
- array_like = spec_date_to_iso8601(scan.scan_header_dict["D"])
+ start_time_str = spec_date_to_iso8601(scan.scan_header_dict["D"])
except (IndexError, ValueError):
logger1.warn("Could not parse date format in scan %s header." +
" Using original date not converted to ISO-8601",
scan_key)
- array_like = scan.scan_header_dict["D"]
+ start_time_str = scan.scan_header_dict["D"]
elif "D" in scan.file_header_dict:
logger1.warn("No #D line in scan %s header. " +
"Using file header for start_time.",
scan_key)
try:
- array_like = spec_date_to_iso8601(scan.file_header_dict["D"])
+ start_time_str = spec_date_to_iso8601(scan.file_header_dict["D"])
except (IndexError, ValueError):
logger1.warn("Could not parse date format in scan %s header. " +
"Using original date not converted to ISO-8601",
scan_key)
- array_like = scan.file_header_dict["D"]
+ start_time_str = scan.file_header_dict["D"]
else:
logger1.warn("No #D line in %s header. Setting date to empty string.",
scan_key)
- array_like = ""
-
- elif file_header_data_pattern.match(name):
- # array_like = _fixed_length_strings(scan.file_header)
- array_like = "\n".join(scan.file_header)
-
- elif scan_header_data_pattern.match(name):
- # array_like = _fixed_length_strings(scan.scan_header)
- array_like = "\n".join(scan.scan_header)
-
- elif positioners_data_pattern.match(name):
- m = positioners_data_pattern.match(name)
- motor_name = m.group(1)
- if motor_name not in (scan.labels + scan.motor_names):
- if "%" in motor_name:
- motor_name = motor_name.replace("%", "/")
- # if a motor is recorded as a data column, ignore its position in
- # header and return the data column instead
- if motor_name in scan.labels and scan.data.shape[0] > 0:
- array_like = scan.data_column_by_name(motor_name)
- else:
- # may return float("inf") if #P line is missing from scan hdr
- array_like = scan.motor_position_by_name(motor_name)
-
- elif measurement_data_pattern.match(name):
- m = measurement_data_pattern.match(name)
- column_name = m.group(1)
- if column_name not in scan.labels:
- if "%" in column_name:
- column_name = column_name.replace("%", "/")
- array_like = scan.data_column_by_name(column_name)
-
- elif instrument_mca_data_pattern.match(name):
- m = instrument_mca_data_pattern.match(name)
-
- analyser_index = int(m.group(1))
- # retrieve 2D array of all MCA spectra from one analyser
- array_like = _demultiplex_mca(scan, analyser_index)
-
- elif instrument_mca_calib_pattern.match(name):
- m = instrument_mca_calib_pattern.match(name)
- analyser_index = int(m.group(1))
- if len(scan.mca.channels) == 1:
- # single @CALIB line applying to multiple devices
- analyser_index = 0
- array_like = scan.mca.calibration[analyser_index]
-
- elif instrument_mca_chann_pattern.match(name):
- m = instrument_mca_chann_pattern.match(name)
- analyser_index = int(m.group(1))
- if len(scan.mca.channels) == 1:
- # single @CHANN line applying to multiple devices
- analyser_index = 0
- array_like = scan.mca.channels[analyser_index]
-
- elif ub_matrix_pattern.match(name):
- if not "G3" in scan.scan_header_dict:
- raise KeyError("No UB matrix in a scan without a #G3 header line")
- array_like = numpy.array(
- list(map(float, scan.scan_header_dict["G3"].split()))).reshape((1, 3, 3))
- elif unit_cell_pattern.match(name):
- if not "G1" in scan.scan_header_dict:
- raise KeyError(
- "No unit_cell matrix in a scan without a #G1 header line")
- array_like = numpy.array(
- list(map(float, scan.scan_header_dict["G1"].split()))[0:6]).reshape((1, 6))
- elif unit_cell_abc_pattern.match(name):
- if not "G1" in scan.scan_header_dict:
- raise KeyError(
- "No unit_cell matrix in a scan without a #G1 header line")
- array_like = numpy.array(
- list(map(float, scan.scan_header_dict["G1"].split()))[0:3]).reshape((3,))
- elif unit_cell_alphabetagamma_pattern.match(name):
- if not "G1" in scan.scan_header_dict:
- raise KeyError(
- "No unit_cell matrix in a scan without a #G1 header line")
- array_like = numpy.array(
- list(map(float, scan.scan_header_dict["G1"].split()))[3:6]).reshape((3,))
- elif "CTIME" in scan.mca_header_dict and "mca_" in name:
- m = re.compile(r"/.*/mca_([0-9]+)/.*").match(name)
- analyser_index = int(m.group(1))
-
- ctime_line = scan.mca_header_dict['CTIME']
- (preset_time, live_time, elapsed_time) = _parse_ctime(ctime_line, analyser_index)
- if instrument_mca_preset_t_pattern.match(name):
- array_like = preset_time
- elif instrument_mca_live_t_pattern.match(name):
- array_like = live_time
- elif instrument_mca_elapsed_t_pattern.match(name):
- array_like = elapsed_time
-
- if array_like is None:
- raise KeyError("Name " + name + " does not match any known dataset.")
-
- return SpecH5Dataset(array_like, name,
- file_=specfileh5, parent=parent_group)
-
-
-def _link_to_dataset_builder(name, specfileh5, parent_group):
- """Same as :func:`_dataset_builder`, but returns a
- :class:`SpecH5LinkToDataset`
-
- :param name: Datatset full name (posix path format, starting with ``/``)
- :type name: str
- :param specfileh5: parent :class:`SpecH5` object
- :type specfileh5: :class:`SpecH5`
- :param parent_group: Parent :class:`SpecH5Group`
-
- :return: Array with the requested data
- :rtype: :class:`SpecH5LinkToDataset`.
- """
- scan_key = _get_scan_key_in_name(name)
- scan = specfileh5._sf[scan_key]
-
- # get dataset in an array-like format (ndarray, str, list…)
- array_like = None
-
- # /1.1/measurement/mca_0/data -> /1.1/instrument/mca_0/data
- if measurement_mca_data_pattern.match(name):
- m = measurement_mca_data_pattern.match(name)
- analyser_index = int(m.group(1))
- array_like = _demultiplex_mca(scan, analyser_index)
-
- # /1.1/measurement/mca_0/info/X -> /1.1/instrument/mca_0/X
- # X: calibration, channels, preset_time, live_time, elapsed_time
- elif measurement_mca_info_dataset_pattern.match(name):
- m = measurement_mca_info_dataset_pattern.match(name)
- analyser_index = int(m.group(1))
- mca_hdr_type = m.group(2)
-
- if mca_hdr_type == "calibration":
- if len(scan.mca.calibration) == 1:
- # single @CALIB line for multiple devices
- analyser_index = 0
- array_like = scan.mca.calibration[analyser_index]
-
- elif mca_hdr_type == "channels":
- if len(scan.mca.channels) == 1:
- # single @CHANN line for multiple devices
- analyser_index = 0
- array_like = scan.mca.channels[analyser_index]
-
- elif "CTIME" in scan.mca_header_dict:
- ctime_line = scan.mca_header_dict['CTIME']
- (preset_time, live_time, elapsed_time) = _parse_ctime(ctime_line,
- analyser_index)
- if mca_hdr_type == "preset_time":
- array_like = preset_time
- elif mca_hdr_type == "live_time":
- array_like = live_time
- elif mca_hdr_type == "elapsed_time":
- array_like = elapsed_time
-
- # /1.1/measurement/mca_0/info/data -> /1.1/instrument/mca_0/data
- elif measurement_mca_info_data_pattern.match(name):
- m = measurement_mca_info_data_pattern.match(name)
- analyser_index = int(m.group(1))
- array_like = _demultiplex_mca(scan, analyser_index)
-
- if array_like is None:
- raise KeyError("Name " + name + " does not match any known dataset.")
+ start_time_str = ""
+ self.add_node(SpecH5NodeDataset(name="start_time",
+ data=start_time_str,
+ parent=self))
- target = name.replace("measurement", "instrument")
- target = target.replace("info/", "")
+ self.add_node(InstrumentGroup(parent=self, scan=scan))
+ self.add_node(MeasurementGroup(parent=self, scan=scan))
+ if _unit_cell_in_scan(scan) or _ub_matrix_in_scan(scan):
+ self.add_node(SampleGroup(parent=self, scan=scan))
- return SpecH5LinkToDataset(array_like, name,
- file_=specfileh5, parent=parent_group,
- target=target)
-
-def _demultiplex_mca(scan, analyser_index):
- """Return MCA data for a single analyser.
-
- Each MCA spectrum is a 1D array. For each analyser, there is one
- spectrum recorded per scan data line. When there are more than a single
- MCA analyser in a scan, the data will be multiplexed. For instance if
- there are 3 analysers, the consecutive spectra for the first analyser must
- be accessed as ``mca[0], mca[3], mca[6]…``.
-
- :param scan: :class:`Scan` instance containing the MCA data
- :param analyser_index: 0-based index referencing the analyser
- :type analyser_index: int
- :return: 2D numpy array containing all spectra for one analyser
- """
- number_of_analysers = _get_number_of_mca_analysers(scan)
-
- number_of_MCA_spectra = len(scan.mca)
-
- list_of_1D_arrays = []
- for i in range(analyser_index,
- number_of_MCA_spectra,
- number_of_analysers):
- list_of_1D_arrays.append(scan.mca[i])
- # convert list to 2D array
- return numpy.array(list_of_1D_arrays)
-
-
-class SpecH5Group(object):
- """Emulate :class:`h5py.Group` for a SpecFile object
-
- :param name: Group full name (posix path format, starting with ``/``)
- :type name: str
- :param specfileh5: parent :class:`SpecH5` instance
-
- """
- def __init__(self, name, specfileh5):
- self.name = name
- """Full name/path of group"""
-
- self.file = specfileh5
- """Parent SpecH5 object"""
-
- self.attrs = _get_attrs_dict(name)
- """Attributes dictionary"""
-
- if name != "/":
- if name not in specfileh5:
- raise KeyError("File %s does not contain group %s" %
- (specfileh5, name))
- scan_key = _get_scan_key_in_name(name)
- self._scan = self.file._sf[scan_key]
-
- @property
- def h5py_class(self):
- """Return h5py class which is mimicked by this class:
- :class:`h5py.Group`.
-
- Accessing this attribute if :mod:`h5py` is not installed causes
- an ``ImportError`` to be raised
- """
- if h5py is None:
- raise ImportError("Cannot return h5py.Group class, " +
- "unable to import h5py module")
- return h5py.Group
-
- @property
- def parent(self):
- """Parent group (group that contains this group)"""
- if not self.name.strip("/"):
- return None
-
- parent_name = posixpath.dirname(self.name.rstrip("/"))
- return SpecH5Group(parent_name, self.file)
-
- def __contains__(self, key):
+class InstrumentGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
"""
- :param key: Path to child element (e.g. ``"mca_0/info"``) or full name
- of group or dataset (e.g. ``"/2.1/instrument/positioners"``)
- :return: True if key refers to a valid member of this group,
- else False
- """
- # Absolute path to an item outside this group
- if key.startswith("/"):
- if not key.startswith(self.name):
- return False
- # Make sure key is an absolute path by prepending this group's name
- else:
- key = self.name.rstrip("/") + "/" + key
-
- # key not matching any known pattern
- if not is_group(key) and not is_dataset(key) and\
- not is_link_to_group(key) and not is_link_to_dataset(key):
- return False
-
- # nonexistent scan in specfile
- scan_key = _get_scan_key_in_name(key)
- if scan_key not in self.file._sf:
- return False
-
- # nonexistent MCA analyser in scan
- mca_analyser_index = _get_mca_index_in_name(key)
- if mca_analyser_index is not None:
- if not _mca_analyser_in_scan(self.file._sf,
- scan_key,
- mca_analyser_index):
- return False
-
- # nonexistent motor name
- motor_name = _get_motor_in_name(key)
- if motor_name is not None:
- if not _motor_in_scan(self.file._sf,
- scan_key,
- motor_name):
- return False
-
- # nonexistent data column
- column_label = _get_data_column_label_in_name(key)
- if column_label is not None:
- if not _column_label_in_scan(self.file._sf,
- scan_key,
- column_label):
- return False
-
- if key.endswith("preset_time") or\
- key.endswith("elapsed_time") or\
- key.endswith("live_time"):
- return "CTIME" in self.file._sf[scan_key].mca_header_dict
-
- if sample_pattern.match(key):
- return ("G3" in self.file._sf[scan_key].scan_header_dict or
- "G1" in self.file._sf[scan_key].scan_header_dict)
-
- if key.endswith("sample/ub_matrix"):
- return "G3" in self.file._sf[scan_key].scan_header_dict
-
- if key.endswith("sample/unit_cell"):
- return "G1" in self.file._sf[scan_key].scan_header_dict
-
- if key.endswith("sample/unit_cell_abc"):
- return "G1" in self.file._sf[scan_key].scan_header_dict
-
-
- if key.endswith("sample/unit_cell_alphabetagamma"):
- return "G1" in self.file._sf[scan_key].scan_header_dict
-
- # header, title, start_time, existing scan/mca/motor/measurement
- return True
-
- def __eq__(self, other):
- return (isinstance(other, SpecH5Group) and
- self.name == other.name and
- self.file.filename == other.file.filename and
- self.keys() == other.keys())
-
- def get(self, name, default=None, getclass=False, getlink=False):
- """Retrieve an item by name, or a default value if name does not
- point to an existing item.
-
- :param name str: name of the item
- :param default: Default value returned if the name is not found
- :param bool getclass: if *True*, the returned object is the class of
- the item, instead of the item instance.
- :param bool getlink: Not implemented. This method always returns
- an instance of the original class of the requested item (or
- just the class, if *getclass* is *True*)
- :return: The requested item, or its class if *getclass* is *True*,
- or the specified *default* value if the group does not contain
- an item with the requested name.
- """
- if name not in self:
- return default
-
- if getlink and getclass:
- pass
-
- if getclass:
- return self[name].h5py_class
-
- return self[name]
-
- def __getitem__(self, key):
- """Return a :class:`SpecH5Group` or a :class:`SpecH5Dataset`
- if ``key`` is a valid name of a group or dataset.
-
- ``key`` can be a member of ``self.keys()``, i.e. an immediate child of
- the group, or a path reaching into subgroups (e.g.
- ``"instrument/positioners"``)
-
- In the special case were this group is the root group, ``key`` can
- start with a ``/`` character.
- :param key: Name of member
- :type key: str
- :raise: KeyError if ``key`` is not a known member of this group.
+ :param parent: parent Group
+ :param scan: specfile.Scan object
"""
- # accept numbers for scan indices
- if isinstance(key, int):
- number = self.file._sf.number(key)
- order = self.file._sf.order(key)
- full_key = "/%d.%d" % (number, order)
- # Relative path starting from this group (e.g "mca_0/info")
- elif not key.startswith("/"):
- full_key = self.name.rstrip("/") + "/" + key
- # Absolute path called from the root group or from a parent group
- elif key.startswith(self.name):
- full_key = key
- # Absolute path to an element called from a non-parent group
- else:
- raise KeyError(key + " is not a child of " + self.__repr__())
-
- if is_group(full_key):
- return SpecH5Group(full_key, self.file)
- elif is_dataset(full_key):
- return _dataset_builder(full_key, self.file, self)
- elif is_link_to_group(full_key):
- link_target = full_key.replace("measurement", "instrument").rstrip("/")[:-4]
- return SpecH5LinkToGroup(full_key, self.file, link_target)
- elif is_link_to_dataset(full_key):
- return _link_to_dataset_builder(full_key, self.file, self)
- else:
- raise KeyError("unrecognized group or dataset: " + full_key)
+ commonh5.Group.__init__(self, name="instrument", parent=parent,
+ attrs={"NX_class": "NXinstrument"})
+
+ self.add_node(InstrumentSpecfileGroup(parent=self, scan=scan))
+ self.add_node(PositionersGroup(parent=self, scan=scan))
+
+ num_analysers = _get_number_of_mca_analysers(scan)
+ for anal_idx in range(num_analysers):
+ self.add_node(InstrumentMcaGroup(parent=self,
+ analyser_index=anal_idx,
+ scan=scan))
+
+
+class InstrumentSpecfileGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
+ commonh5.Group.__init__(self, name="specfile", parent=parent,
+ attrs={"NX_class": "NXcollection"})
+ self.add_node(SpecH5NodeDataset(name="file_header",
+ data="\n".join(scan.file_header),
+ parent=self,
+ attrs={}))
+ self.add_node(SpecH5NodeDataset(name="scan_header",
+ data="\n".join(scan.scan_header),
+ parent=self,
+ attrs={}))
+
+
+class PositionersGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
+ commonh5.Group.__init__(self, name="positioners", parent=parent,
+ attrs={"NX_class": "NXcollection"})
+ for motor_name in scan.motor_names:
+ safe_motor_name = motor_name.replace("/", "%")
+ if motor_name in scan.labels and scan.data.shape[0] > 0:
+ # return a data column if one has the same label as the motor
+ motor_value = scan.data_column_by_name(motor_name)
+ else:
+ # Take value from #P scan header.
+ # (may return float("inf") if #P line is missing from scan hdr)
+ motor_value = scan.motor_position_by_name(motor_name)
+ self.add_node(SpecH5NodeDataset(name=safe_motor_name,
+ data=motor_value,
+ parent=self))
- def __iter__(self):
- for key in self.keys():
- yield key
- def items(self):
- for key in self.keys():
- yield key, self[key]
+class InstrumentMcaGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, analyser_index, scan):
+ name = "mca_%d" % analyser_index
+ commonh5.Group.__init__(self, name=name, parent=parent,
+ attrs={"NX_class": "NXdetector"})
- def __len__(self):
- """Return number of members, subgroups and datasets, attached to this
- group.
- """
- return len(self.keys())
+ self.add_node(McaDataDataset(parent=self,
+ analyser_index=analyser_index,
+ scan=scan))
- def __repr__(self):
- return '<SpecH5Group "%s" (%d members)>' % (self.name, len(self))
-
- def keys(self):
- """:return: List of all names of members attached to this group
- """
- # keys in hdf5 are unicode
- if self.name == "/":
- return self.file.keys()
-
- if scan_pattern.match(self.name):
- ret = static_items["scan"]
- if "G1" in self._scan.scan_header_dict or "G3" in self._scan.scan_header_dict:
- return ret + [u"sample"]
- return ret
-
- if positioners_group_pattern.match(self.name):
- motor_names = self._scan.motor_names
- return [name.replace("/", "%") for name in motor_names]
-
- if specfile_group_pattern.match(self.name):
- return static_items["scan/instrument/specfile"]
-
- if measurement_mca_group_pattern.match(self.name):
- return static_items["scan/measurement/mca"]
-
- if instrument_mca_group_pattern.match(self.name):
- ret = static_items["scan/instrument/mca"]
- if "CTIME" in self._scan.mca_header_dict:
- return ret + [u"preset_time", u"elapsed_time", u"live_time"]
- return ret
-
- if sample_pattern.match(self.name):
- ret = []
- if "G1" in self._scan.scan_header_dict:
- ret.append(u"unit_cell")
- ret.append(u"unit_cell_abc")
- ret.append(u"unit_cell_alphabetagamma")
- if "G3" in self._scan.scan_header_dict:
- ret.append(u"ub_matrix")
- return ret
-
- number_of_MCA_spectra = len(self._scan.mca)
- number_of_data_lines = self._scan.data.shape[1]
-
- if not number_of_data_lines == 0:
- # Number of MCA spectra must be a multiple of number of data lines
- assert number_of_MCA_spectra % number_of_data_lines == 0
- number_of_MCA_analysers = number_of_MCA_spectra // number_of_data_lines
- elif number_of_MCA_spectra:
- # Case of a scan without data lines, only MCA.
- # Our only option is to assume that the number of analysers
- # is the number of #@CHANN lines
- number_of_MCA_analysers = len(self._scan.mca.channels)
+ if len(scan.mca.channels) == 1:
+ # single @CALIB line applying to multiple devices
+ calibration_dataset = scan.mca.calibration[0]
+ channels_dataset = scan.mca.channels[0]
else:
- number_of_MCA_analysers = 0
-
- mca_list = ["mca_%d" % i for i in range(number_of_MCA_analysers)]
-
- if measurement_group_pattern.match(self.name):
- scan_labels = self._scan.labels
- return [label.replace("/", "%") for label in scan_labels] + mca_list
-
- if instrument_pattern.match(self.name):
- return static_items["scan/instrument"] + mca_list
-
- def visit(self, func, follow_links=False):
- """Recursively visit all names in this group and subgroups.
+ calibration_dataset = scan.mca.calibration[analyser_index]
+ channels_dataset = scan.mca.channels[analyser_index]
+ self.add_node(SpecH5NodeDataset(name="calibration",
+ data=calibration_dataset,
+ parent=self))
+ self.add_node(SpecH5NodeDataset(name="channels",
+ data=channels_dataset,
+ parent=self))
+
+ if "CTIME" in scan.mca_header_dict:
+ ctime_line = scan.mca_header_dict['CTIME']
+ preset_time, live_time, elapsed_time = _parse_ctime(ctime_line, analyser_index)
+ self.add_node(SpecH5NodeDataset(name="preset_time",
+ data=preset_time,
+ parent=self))
+ self.add_node(SpecH5NodeDataset(name="live_time",
+ data=live_time,
+ parent=self))
+ self.add_node(SpecH5NodeDataset(name="elapsed_time",
+ data=elapsed_time,
+ parent=self))
+
+
+class McaDataDataset(SpecH5LazyNodeDataset):
+ """Lazy loadable dataset for MCA data"""
+ def __init__(self, parent, analyser_index, scan):
+ commonh5.LazyLoadableDataset.__init__(
+ self, name="data", parent=parent,
+ attrs={"interpretation": "spectrum", })
+ self._scan = scan
+ self._analyser_index = analyser_index
+ self._shape = None
+ self._num_analysers = _get_number_of_mca_analysers(self._scan)
+
+ def _create_data(self):
+ return _demultiplex_mca(self._scan, self._analyser_index)
- :param func: Callable (function, method or callable object)
- :type func: function
+ @property
+ def shape(self):
+ if self._shape is None:
+ num_spectra_in_file = len(self._scan.mca)
+ num_spectra_per_analyser = num_spectra_in_file // self._num_analysers
+ len_spectrum = len(self._scan.mca[self._analyser_index])
+ self._shape = num_spectra_per_analyser, len_spectrum
+ return self._shape
- You supply a callable (function, method or callable object); it
- will be called exactly once for each link in this group and every
- group below it. Your callable must conform to the signature:
+ @property
+ def size(self):
+ return numpy.prod(self.shape, dtype=numpy.intp)
- ``func(<member name>) => <None or return value>``
+ @property
+ def dtype(self):
+ # we initialize the data with numpy.empty() without specifying a dtype
+ # in _demultiplex_mca()
+ return numpy.empty((1, )).dtype
- Returning ``None`` continues iteration, returning anything else stops
- and immediately returns that value from the visit method. No
- particular order of iteration within groups is guaranteed.
+ def __len__(self):
+ return self.shape[0]
- Example:
+ def __getitem__(self, item):
+ # optimization for fetching a single spectrum if data not already loaded
+ if not self._is_initialized:
+ if isinstance(item, integer_types):
+ if item < 0:
+ # negative indexing
+ item += len(self)
+ return self._scan.mca[self._analyser_index +
+ item * self._num_analysers]
+ # accessing a slice or element of a single spectrum [i, j:k]
+ try:
+ spectrum_idx, channel_idx_or_slice = item
+ assert isinstance(spectrum_idx, integer_types)
+ except (ValueError, TypeError, AssertionError):
+ pass
+ else:
+ if spectrum_idx < 0:
+ item += len(self)
+ idx = self._analyser_index + spectrum_idx * self._num_analysers
+ return self._scan.mca[idx][channel_idx_or_slice]
- .. code-block:: python
+ return super(McaDataDataset, self).__getitem__(item)
- # Get a list of all contents (groups and datasets) in a SpecFile
- mylist = []
- f = File('foo.dat')
- f.visit(mylist.append)
- """
- for member_name in self.keys():
- member = self[member_name]
- ret = None
- if (not is_link_to_dataset(member.name) and
- not is_link_to_group(member.name)) or follow_links:
- ret = func(member.name)
- if ret is not None:
- return ret
- # recurse into subgroups
- if isinstance(member, SpecH5Group):
- if not isinstance(member, SpecH5LinkToGroup) or follow_links:
- self[member_name].visit(func, follow_links)
-
- def visititems(self, func, follow_links=False):
- """Recursively visit names and objects in this group.
-
- :param func: Callable (function, method or callable object)
- :type func: function
-
- You supply a callable (function, method or callable object); it
- will be called exactly once for each
- member in this group and every group below it. Your callable must
- conform to the signature:
-
- ``func(<member name>, <object>) => <None or return value>``
-
- Returning ``None`` continues iteration, returning anything else stops
- and immediately returns that value from the visit method. No
- particular order of iteration within groups is guaranteed.
-
- Example:
-
- .. code-block:: python
-
- # Get a list of all datasets in a specific scan
- mylist = []
- def func(name, obj):
- if isinstance(obj, SpecH5Dataset):
- mylist.append(name)
-
- f = File('foo.dat')
- f["1.1"].visititems(func)
- """
- for member_name in self.keys():
- member = self[member_name]
- ret = None
- if (not is_link_to_dataset(member.name) and
- not is_link_to_group(member.name)) or follow_links:
- ret = func(member.name, member)
- if ret is not None:
- return ret
- # recurse into subgroups
- if isinstance(self[member_name], SpecH5Group):
- if not isinstance(self[member_name], SpecH5LinkToGroup) or follow_links:
- self[member_name].visititems(func, follow_links)
-
-
-class SpecH5LinkToGroup(SpecH5Group):
- """Special :class:`SpecH5Group` representing a link to a group.
-
- It works like a regular group but :meth:`SpecH5Group.visit`
- and :meth:`SpecH5Group.visititems` methods will recognize it as a
- link and will ignore it.
-
- An additional attribute indicates the name of the target group:
- :attr:`target`
- """
- def __init__(self, name, specfileh5, target):
- SpecH5Group.__init__(self, name, specfileh5)
- self.target = target
- """Name of the target group."""
- def keys(self):
- """:return: List of all names of members attached to the target group
+class MeasurementGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
"""
- # we only have a single type of link to a group:
- # /1.1/measurement/mca_0/info/ -> /1.1/instrument/mca_0/
- if measurement_mca_info_pattern.match(self.name):
- # link_target = self.name.replace("measurement", "instrument").rstrip("/")[:-4]
- # return SpecH5Group(link_target, self.file).keys()
- return SpecH5Group(self.target, self.file).keys()
- else:
- raise NameError("Unknown link to SpecH5Group: "
- "%s -> %s" % (self.name, self.target))
-
-
-class SpecH5(SpecH5Group):
- """Special :class:`SpecH5Group` representing the root of a SpecFile.
-
- :param filename: Path to SpecFile in filesystem
- :type filename: str
-
- In addition to all generic :class:`SpecH5Group` attributes, this class
- also keeps a reference to the original :class:`SpecFile` object and
- has a :attr:`filename` attribute.
- Its immediate children are scans, but it also gives access to any group
- or dataset in the entire SpecFile tree by specifying the full path.
- """
- def __init__(self, filename):
- self.filename = filename
- self.attrs = _get_attrs_dict("/")
- self._sf = SpecFile(filename)
-
- SpecH5Group.__init__(self, name="/", specfileh5=self)
- if len(self) == 0:
- # SpecFile library do not raise exception for non specfiles
- raise IOError("Empty specfile. Not a valid spec format.")
-
- def keys(self):
+ :param parent: parent Group
+ :param scan: specfile.Scan object
"""
- :return: List of all scan keys in this SpecFile
- (e.g. ``["1.1", "2.1"…]``)
+ commonh5.Group.__init__(self, name="measurement", parent=parent,
+ attrs={"NX_class": "NXcollection", })
+ for label in scan.labels:
+ safe_label = label.replace("/", "%")
+ self.add_node(SpecH5NodeDataset(name=safe_label,
+ data=scan.data_column_by_name(label),
+ parent=self))
+
+ num_analysers = _get_number_of_mca_analysers(scan)
+ for anal_idx in range(num_analysers):
+ self.add_node(MeasurementMcaGroup(parent=self, analyser_index=anal_idx))
+
+
+class MeasurementMcaGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, analyser_index):
+ basename = "mca_%d" % analyser_index
+ commonh5.Group.__init__(self, name=basename, parent=parent,
+ attrs={})
+
+ target_name = self.name.replace("measurement", "instrument")
+ self.add_node(commonh5.SoftLink(name="data",
+ path=target_name + "/data",
+ parent=self))
+ self.add_node(commonh5.SoftLink(name="info",
+ path=target_name,
+ parent=self))
+
+
+class SampleGroup(commonh5.Group, SpecH5Group):
+ def __init__(self, parent, scan):
"""
- return self._sf.keys()
-
- def __enter__(self):
- """Context manager enter"""
- return self
-
- def __exit__(self, type, value, tb): # pylint: disable=W0622
- """Context manager exit"""
- self.close()
- def close(self):
- """Close the object, and free up associated resources.
-
- After calling this method, attempts to use the object may fail.
+ :param parent: parent Group
+ :param scan: specfile.Scan object
"""
- self._sf = None
-
- def __repr__(self):
- return '<SpecH5 "%s" (%d members)>' % (self.filename, len(self))
-
- def __eq__(self, other):
- return (isinstance(other, SpecH5) and
- self.filename == other.filename and
- self.keys() == other.keys())
-
- @property
- def h5py_class(self):
- """h5py class which is mimicked by this class"""
- if h5py is None:
- raise ImportError("Cannot return h5py.File class, " +
- "unable to import h5py module")
- return h5py.File
+ commonh5.Group.__init__(self, name="sample", parent=parent,
+ attrs={"NX_class": "NXsample", })
+
+ if _unit_cell_in_scan(scan):
+ self.add_node(SpecH5NodeDataset(name="unit_cell",
+ data=_parse_unit_cell(scan.scan_header_dict["G1"]),
+ parent=self,
+ attrs={"interpretation": "scalar"}))
+ self.add_node(SpecH5NodeDataset(name="unit_cell_abc",
+ data=_parse_unit_cell(scan.scan_header_dict["G1"])[0, 0:3],
+ parent=self,
+ attrs={"interpretation": "scalar"}))
+ self.add_node(SpecH5NodeDataset(name="unit_cell_alphabetagamma",
+ data=_parse_unit_cell(scan.scan_header_dict["G1"])[0, 3:6],
+ parent=self,
+ attrs={"interpretation": "scalar"}))
+ if _ub_matrix_in_scan(scan):
+ self.add_node(SpecH5NodeDataset(name="ub_matrix",
+ data=_parse_UB_matrix(scan.scan_header_dict["G3"]),
+ parent=self,
+ attrs={"interpretation": "scalar"}))