Diffstat (limited to 'src/silx/io/fabioh5.py')
-rwxr-xr-x | src/silx/io/fabioh5.py | 1050 |
1 file changed, 1050 insertions, 0 deletions
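For orientation before reading the patch: the new module exposes any fabio-readable image as an h5py-like tree rooted at a scan group. A minimal usage sketch (not part of the patch; "image.edf" is a placeholder for any file fabio can open, and the printed shape depends on the file):

import silx.io.fabioh5

f = silx.io.fabioh5.File("image.edf")      # placeholder file name
print(list(f["/scan_0"].keys()))           # 'instrument', 'measurement', 'image', ...
frames = f["/scan_0/instrument/detector_0/data"]
print(frames.shape, frames.dtype)          # (n_frames, dim1, dim2), or (dim1, dim2) for a single frame
positioners = f["/scan_0/instrument/positioners"]
for name in positioners.keys():
    print(name, positioners[name][...])
f.close()

Frame data is only assembled into a cube when the dataset is actually read; the headers are parsed once, when the File object is created.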
diff --git a/src/silx/io/fabioh5.py b/src/silx/io/fabioh5.py new file mode 100755 index 0000000..af9b29a --- /dev/null +++ b/src/silx/io/fabioh5.py @@ -0,0 +1,1050 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2021 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""This module provides functions to read fabio images as an HDF5 file. + + >>> import silx.io.fabioh5 + >>> f = silx.io.fabioh5.File("foobar.edf") + +.. note:: This module has a dependency on the `h5py <http://www.h5py.org/>`_ + and `fabio <https://github.com/silx-kit/fabio>`_ libraries, + which are not mandatory dependencies for `silx`. + +""" + +import collections +import datetime +import logging +import numbers +import os + +import fabio.file_series +import numpy + +from . import commonh5 +from silx import version as silx_version +import silx.utils.number +import h5py + + +_logger = logging.getLogger(__name__) + + +_fabio_extensions = set([]) + + +def supported_extensions(): + """Returns all extensions supported by fabio. + + :returns: A set containing extensions like "*.edf". + :rtype: Set[str] + """ + global _fabio_extensions + if len(_fabio_extensions) > 0: + return _fabio_extensions + + formats = fabio.fabioformats.get_classes(reader=True) + all_extensions = set([]) + + for reader in formats: + if not hasattr(reader, "DEFAULT_EXTENSIONS"): + continue + + ext = reader.DEFAULT_EXTENSIONS + ext = ["*.%s" % e for e in ext] + all_extensions.update(ext) + + _fabio_extensions = set(all_extensions) + return _fabio_extensions + + +class _FileSeries(fabio.file_series.file_series): + """ + .. note:: Overwrite a function to fix an issue in fabio. 
+ """ + def jump(self, num): + """ + Goto a position in sequence + """ + assert num < len(self) and num >= 0, "num out of range" + self._current = num + return self[self._current] + + +class FrameData(commonh5.LazyLoadableDataset): + """Expose a cube of image from a Fabio file using `FabioReader` as + cache.""" + + def __init__(self, name, fabio_reader, parent=None): + if fabio_reader.is_spectrum(): + attrs = {"interpretation": "spectrum"} + else: + attrs = {"interpretation": "image"} + commonh5.LazyLoadableDataset.__init__(self, name, parent, attrs=attrs) + self.__fabio_reader = fabio_reader + self._shape = None + self._dtype = None + + def _create_data(self): + return self.__fabio_reader.get_data() + + def _update_cache(self): + if isinstance(self.__fabio_reader.fabio_file(), + fabio.file_series.file_series): + # Reading all the files is taking too much time + # Reach the information from the only first frame + first_image = self.__fabio_reader.fabio_file().first_image() + self._dtype = first_image.data.dtype + shape0 = self.__fabio_reader.frame_count() + shape1, shape2 = first_image.data.shape + self._shape = shape0, shape1, shape2 + else: + self._dtype = super(commonh5.LazyLoadableDataset, self).dtype + self._shape = super(commonh5.LazyLoadableDataset, self).shape + + @property + def dtype(self): + if self._dtype is None: + self._update_cache() + return self._dtype + + @property + def shape(self): + if self._shape is None: + self._update_cache() + return self._shape + + def __iter__(self): + for frame in self.__fabio_reader.iter_frames(): + yield frame.data + + def __getitem__(self, item): + # optimization for fetching a single frame if data not already loaded + if not self._is_initialized: + if isinstance(item, int) and \ + isinstance(self.__fabio_reader.fabio_file(), + fabio.file_series.file_series): + if item < 0: + # negative indexing + item += len(self) + return self.__fabio_reader.fabio_file().jump_image(item).data + return super(FrameData, self).__getitem__(item) + + +class RawHeaderData(commonh5.LazyLoadableDataset): + """Lazy loadable raw header""" + + def __init__(self, name, fabio_reader, parent=None): + commonh5.LazyLoadableDataset.__init__(self, name, parent) + self.__fabio_reader = fabio_reader + + def _create_data(self): + """Initialize hold data by merging all headers of each frames. + """ + headers = [] + types = set([]) + for fabio_frame in self.__fabio_reader.iter_frames(): + header = fabio_frame.header + + data = [] + for key, value in header.items(): + data.append("%s: %s" % (str(key), str(value))) + + data = "\n".join(data) + try: + line = data.encode("ascii") + types.add(numpy.string_) + except UnicodeEncodeError: + try: + line = data.encode("utf-8") + types.add(numpy.unicode_) + except UnicodeEncodeError: + # Fallback in void + line = numpy.void(data) + types.add(numpy.void) + + headers.append(line) + + if numpy.void in types: + dtype = numpy.void + elif numpy.unicode_ in types: + dtype = numpy.unicode_ + else: + dtype = numpy.string_ + + if dtype == numpy.unicode_: + # h5py only support vlen unicode + dtype = h5py.special_dtype(vlen=str) + + return numpy.array(headers, dtype=dtype) + + +class MetadataGroup(commonh5.LazyLoadableGroup): + """Abstract class for groups containing a reference to a fabio image. 
+ """ + + def __init__(self, name, metadata_reader, kind, parent=None, attrs=None): + commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__metadata_reader = metadata_reader + self.__kind = kind + + def _create_child(self): + keys = self.__metadata_reader.get_keys(self.__kind) + for name in keys: + data = self.__metadata_reader.get_value(self.__kind, name) + dataset = commonh5.Dataset(name, data) + self.add_node(dataset) + + @property + def _metadata_reader(self): + return self.__metadata_reader + + +class DetectorGroup(commonh5.LazyLoadableGroup): + """Define the detector group (sub group of instrument) using Fabio data. + """ + + def __init__(self, name, fabio_reader, parent=None, attrs=None): + if attrs is None: + attrs = {"NX_class": "NXdetector"} + commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + data = FrameData("data", self.__fabio_reader) + self.add_node(data) + + # TODO we should add here Nexus informations we can extract from the + # metadata + + others = MetadataGroup("others", self.__fabio_reader, kind=FabioReader.DEFAULT) + self.add_node(others) + + +class ImageGroup(commonh5.LazyLoadableGroup): + """Define the image group (sub group of measurement) using Fabio data. + """ + + def __init__(self, name, fabio_reader, parent=None, attrs=None): + commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + basepath = self.parent.parent.name + data = commonh5.SoftLink("data", path=basepath + "/instrument/detector_0/data") + self.add_node(data) + detector = commonh5.SoftLink("info", path=basepath + "/instrument/detector_0") + self.add_node(detector) + + +class NxDataPreviewGroup(commonh5.LazyLoadableGroup): + """Define the NxData group which is used as the default NXdata to show the + content of the file. + """ + + def __init__(self, name, fabio_reader, parent=None): + if fabio_reader.is_spectrum(): + interpretation = "spectrum" + else: + interpretation = "image" + attrs = { + "NX_class": "NXdata", + "interpretation": interpretation, + "signal": "data", + } + commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + basepath = self.parent.name + data = commonh5.SoftLink("data", path=basepath + "/instrument/detector_0/data") + self.add_node(data) + + +class SampleGroup(commonh5.LazyLoadableGroup): + """Define the image group (sub group of measurement) using Fabio data. 
+ """ + + def __init__(self, name, fabio_reader, parent=None): + attrs = {"NXclass": "NXsample"} + commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + if self.__fabio_reader.has_ub_matrix(): + scalar = {"interpretation": "scalar"} + data = self.__fabio_reader.get_unit_cell_abc() + data = commonh5.Dataset("unit_cell_abc", data, attrs=scalar) + self.add_node(data) + unit_cell_data = numpy.zeros((1, 6), numpy.float32) + unit_cell_data[0, :3] = data + data = self.__fabio_reader.get_unit_cell_alphabetagamma() + data = commonh5.Dataset("unit_cell_alphabetagamma", data, attrs=scalar) + self.add_node(data) + unit_cell_data[0, 3:] = data + data = commonh5.Dataset("unit_cell", unit_cell_data, attrs=scalar) + self.add_node(data) + data = self.__fabio_reader.get_ub_matrix() + data = commonh5.Dataset("ub_matrix", data, attrs=scalar) + self.add_node(data) + + +class MeasurementGroup(commonh5.LazyLoadableGroup): + """Define the measurement group for fabio file. + """ + + def __init__(self, name, fabio_reader, parent=None, attrs=None): + commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + keys = self.__fabio_reader.get_keys(FabioReader.COUNTER) + + # create image measurement but take care that no other metadata use + # this name + for i in range(1000): + name = "image_%i" % i + if name not in keys: + data = ImageGroup(name, self.__fabio_reader) + self.add_node(data) + break + else: + raise Exception("image_i for 0..1000 already used") + + # add all counters + for name in keys: + data = self.__fabio_reader.get_value(FabioReader.COUNTER, name) + dataset = commonh5.Dataset(name, data) + self.add_node(dataset) + + +class FabioReader(object): + """Class which read and cache data and metadata from a fabio image.""" + + DEFAULT = 0 + COUNTER = 1 + POSITIONER = 2 + + def __init__(self, file_name=None, fabio_image=None, file_series=None): + """ + Constructor + + :param str file_name: File name of the image file to read + :param fabio.fabioimage.FabioImage fabio_image: An already openned + :class:`fabio.fabioimage.FabioImage` instance. 
+ :param Union[list[str],fabio.file_series.file_series] file_series: An + list of file name or a :class:`fabio.file_series.file_series` + instance + """ + self.__at_least_32bits = False + self.__signed_type = False + + self.__load(file_name, fabio_image, file_series) + self.__counters = {} + self.__positioners = {} + self.__measurements = {} + self.__key_filters = set([]) + self.__data = None + self.__frame_count = self.frame_count() + self._read() + + def __load(self, file_name=None, fabio_image=None, file_series=None): + if file_name is not None and fabio_image: + raise TypeError("Parameters file_name and fabio_image are mutually exclusive.") + if file_name is not None and fabio_image: + raise TypeError("Parameters fabio_image and file_series are mutually exclusive.") + + self.__must_be_closed = False + + if file_name is not None: + self.__fabio_file = fabio.open(file_name) + self.__must_be_closed = True + elif fabio_image is not None: + if isinstance(fabio_image, fabio.fabioimage.FabioImage): + self.__fabio_file = fabio_image + else: + raise TypeError("FabioImage expected but %s found.", fabio_image.__class__) + elif file_series is not None: + if isinstance(file_series, list): + self.__fabio_file = _FileSeries(file_series) + elif isinstance(file_series, fabio.file_series.file_series): + self.__fabio_file = file_series + else: + raise TypeError("file_series or list expected but %s found.", file_series.__class__) + + def close(self): + """Close the object, and free up associated resources. + + The associated FabioImage is closed only if the object was created from + a filename by this class itself. + + After calling this method, attempts to use the object (and children) + may fail. + """ + if self.__must_be_closed: + # Make sure the API of fabio provide it a 'close' method + # TODO the test can be removed if fabio version >= 0.8 + if hasattr(self.__fabio_file, "close"): + self.__fabio_file.close() + self.__fabio_file = None + + def fabio_file(self): + return self.__fabio_file + + def frame_count(self): + """Returns the number of frames available.""" + if isinstance(self.__fabio_file, fabio.file_series.file_series): + return len(self.__fabio_file) + elif isinstance(self.__fabio_file, fabio.fabioimage.FabioImage): + return self.__fabio_file.nframes + else: + raise TypeError("Unsupported type %s", self.__fabio_file.__class__) + + def iter_frames(self): + """Iter all the available frames. + + A frame provides at least `data` and `header` attributes. + """ + if isinstance(self.__fabio_file, fabio.file_series.file_series): + for file_number in range(len(self.__fabio_file)): + with self.__fabio_file.jump_image(file_number) as fabio_image: + # return the first frame only + assert(fabio_image.nframes == 1) + yield fabio_image + elif isinstance(self.__fabio_file, fabio.fabioimage.FabioImage): + for frame_count in range(self.__fabio_file.nframes): + if self.__fabio_file.nframes == 1: + yield self.__fabio_file + else: + yield self.__fabio_file.getframe(frame_count) + else: + raise TypeError("Unsupported type %s", self.__fabio_file.__class__) + + def _create_data(self): + """Initialize hold data by merging all frames into a single cube. + + Choose the cube size which fit the best the data. If some images are + smaller than expected, the empty space is set to 0. + + The computation is cached into the class, and only done ones. 
+ """ + images = [] + for fabio_frame in self.iter_frames(): + images.append(fabio_frame.data) + + # returns the data without extra dim in case of single frame + if len(images) == 1: + return images[0] + + # get the max size + max_dim = max([i.ndim for i in images]) + max_shape = [0] * max_dim + for image in images: + for dim in range(image.ndim): + if image.shape[dim] > max_shape[dim]: + max_shape[dim] = image.shape[dim] + max_shape = tuple(max_shape) + + # fix smallest images + for index, image in enumerate(images): + if image.shape == max_shape: + continue + location = [slice(0, i) for i in image.shape] + while len(location) < max_dim: + location.append(0) + normalized_image = numpy.zeros(max_shape, dtype=image.dtype) + normalized_image[tuple(location)] = image + images[index] = normalized_image + + # create a cube + return numpy.array(images) + + def __get_dict(self, kind): + """Returns a dictionary from according to an expected kind""" + if kind == self.DEFAULT: + return self.__measurements + elif kind == self.COUNTER: + return self.__counters + elif kind == self.POSITIONER: + return self.__positioners + else: + raise Exception("Unexpected kind %s", kind) + + def get_data(self): + """Returns a cube from all available data from frames + + :rtype: numpy.ndarray + """ + if self.__data is None: + self.__data = self._create_data() + return self.__data + + def get_keys(self, kind): + """Get all available keys according to a kind of metadata. + + :rtype: list + """ + return self.__get_dict(kind).keys() + + def get_value(self, kind, name): + """Get a metadata value according to the kind and the name. + + :rtype: numpy.ndarray + """ + value = self.__get_dict(kind)[name] + if not isinstance(value, numpy.ndarray): + if kind in [self.COUNTER, self.POSITIONER]: + # Force normalization for counters and positioners + old = self._set_vector_normalization(at_least_32bits=True, signed_type=True) + else: + old = None + value = self._convert_metadata_vector(value) + self.__get_dict(kind)[name] = value + if old is not None: + self._set_vector_normalization(*old) + return value + + def _set_counter_value(self, frame_id, name, value): + """Set a counter metadata according to the frame id""" + if name not in self.__counters: + self.__counters[name] = [None] * self.__frame_count + self.__counters[name][frame_id] = value + + def _set_positioner_value(self, frame_id, name, value): + """Set a positioner metadata according to the frame id""" + if name not in self.__positioners: + self.__positioners[name] = [None] * self.__frame_count + self.__positioners[name][frame_id] = value + + def _set_measurement_value(self, frame_id, name, value): + """Set a measurement metadata according to the frame id""" + if name not in self.__measurements: + self.__measurements[name] = [None] * self.__frame_count + self.__measurements[name][frame_id] = value + + def _enable_key_filters(self, fabio_file): + self.__key_filters.clear() + if hasattr(fabio_file, "RESERVED_HEADER_KEYS"): + # Provided in fabio 0.5 + for key in fabio_file.RESERVED_HEADER_KEYS: + self.__key_filters.add(key.lower()) + + def _read(self): + """Read all metadata from the fabio file and store it into this + object.""" + + file_series = isinstance(self.__fabio_file, fabio.file_series.file_series) + if not file_series: + self._enable_key_filters(self.__fabio_file) + + for frame_id, fabio_frame in enumerate(self.iter_frames()): + if file_series: + self._enable_key_filters(fabio_frame) + self._read_frame(frame_id, fabio_frame.header) + + def _is_filtered_key(self, 
key): + """ + If this function returns True, the :meth:`_read_key` while not be + called with this `key`while reading the metatdata frame. + + :param str key: A key of the metadata + :rtype: bool + """ + return key.lower() in self.__key_filters + + def _read_frame(self, frame_id, header): + """Read all metadata from a frame and store it into this + object.""" + for key, value in header.items(): + if self._is_filtered_key(key): + continue + self._read_key(frame_id, key, value) + + def _read_key(self, frame_id, name, value): + """Read a key from the metadata and cache it into this object.""" + self._set_measurement_value(frame_id, name, value) + + def _set_vector_normalization(self, at_least_32bits, signed_type): + previous = self.__at_least_32bits, self.__signed_type + self.__at_least_32bits = at_least_32bits + self.__signed_type = signed_type + return previous + + def _normalize_vector_type(self, dtype): + """Normalize the """ + if self.__at_least_32bits: + if numpy.issubdtype(dtype, numpy.signedinteger): + dtype = numpy.result_type(dtype, numpy.uint32) + if numpy.issubdtype(dtype, numpy.unsignedinteger): + dtype = numpy.result_type(dtype, numpy.uint32) + elif numpy.issubdtype(dtype, numpy.floating): + dtype = numpy.result_type(dtype, numpy.float32) + elif numpy.issubdtype(dtype, numpy.complexfloating): + dtype = numpy.result_type(dtype, numpy.complex64) + if self.__signed_type: + if numpy.issubdtype(dtype, numpy.unsignedinteger): + signed = numpy.dtype("%s%i" % ('i', dtype.itemsize)) + dtype = numpy.result_type(dtype, signed) + return dtype + + def _convert_metadata_vector(self, values): + """Convert a list of numpy data into a numpy array with the better + fitting type.""" + converted = [] + types = set([]) + has_none = False + is_array = False + array = [] + + for v in values: + if v is None: + converted.append(None) + has_none = True + array.append(None) + else: + c = self._convert_value(v) + if c.shape != tuple(): + array.append(v.split(" ")) + is_array = True + else: + array.append(v) + converted.append(c) + types.add(c.dtype) + + if has_none and len(types) == 0: + # That's a list of none values + return numpy.array([0] * len(values), numpy.int8) + + result_type = numpy.result_type(*types) + + if issubclass(result_type.type, numpy.string_): + # use the raw data to create the array + result = values + elif issubclass(result_type.type, numpy.unicode_): + # use the raw data to create the array + result = values + else: + result = converted + + result_type = self._normalize_vector_type(result_type) + + if has_none: + # Fix missing data according to the array type + if result_type.kind == "S": + none_value = b"" + elif result_type.kind == "U": + none_value = u"" + elif result_type.kind == "f": + none_value = numpy.float64("NaN") + elif result_type.kind == "i": + none_value = numpy.int64(0) + elif result_type.kind == "u": + none_value = numpy.int64(0) + elif result_type.kind == "b": + none_value = numpy.bool_(False) + else: + none_value = None + + for index, r in enumerate(result): + if r is not None: + continue + result[index] = none_value + values[index] = none_value + array[index] = none_value + + if result_type.kind in "uifd" and len(types) > 1 and len(values) > 1: + # Catch numerical precision + if is_array and len(array) > 1: + return numpy.array(array, dtype=result_type) + else: + return numpy.array(values, dtype=result_type) + return numpy.array(result, dtype=result_type) + + def _convert_value(self, value): + """Convert a string into a numpy object (scalar or array). 
+ + The value is most of the time a string, but it can be python object + in case if TIFF decoder for example. + """ + if isinstance(value, list): + # convert to a numpy array + return numpy.array(value) + if isinstance(value, dict): + # convert to a numpy associative array + key_dtype = numpy.min_scalar_type(list(value.keys())) + value_dtype = numpy.min_scalar_type(list(value.values())) + associative_type = [('key', key_dtype), ('value', value_dtype)] + assert key_dtype.kind != "O" and value_dtype.kind != "O" + return numpy.array(list(value.items()), dtype=associative_type) + if isinstance(value, numbers.Number): + dtype = numpy.min_scalar_type(value) + assert dtype.kind != "O" + return dtype.type(value) + + if isinstance(value, bytes): + try: + value = value.decode('utf-8') + except UnicodeDecodeError: + return numpy.void(value) + + if " " in value: + result = self._convert_list(value) + else: + result = self._convert_scalar_value(value) + return result + + def _convert_scalar_value(self, value): + """Convert a string into a numpy int or float. + + If it is not possible it returns a numpy string. + """ + try: + numpy_type = silx.utils.number.min_numerical_convertible_type(value) + converted = numpy_type(value) + except ValueError: + converted = numpy.string_(value) + return converted + + def _convert_list(self, value): + """Convert a string into a typed numpy array. + + If it is not possible it returns a numpy string. + """ + try: + numpy_values = [] + values = value.split(" ") + types = set([]) + for string_value in values: + v = self._convert_scalar_value(string_value) + numpy_values.append(v) + types.add(v.dtype.type) + + result_type = numpy.result_type(*types) + + if issubclass(result_type.type, (numpy.string_, bytes)): + # use the raw data to create the result + return numpy.string_(value) + elif issubclass(result_type.type, (numpy.unicode_, str)): + # use the raw data to create the result + return numpy.unicode_(value) + else: + if len(types) == 1: + return numpy.array(numpy_values, dtype=result_type) + else: + return numpy.array(values, dtype=result_type) + except ValueError: + return numpy.string_(value) + + def has_sample_information(self): + """Returns true if there is information about the sample in the + file + + :rtype: bool + """ + return self.has_ub_matrix() + + def has_ub_matrix(self): + """Returns true if a UB matrix is available. + + :rtype: bool + """ + return False + + def is_spectrum(self): + """Returns true if the data should be interpreted as + MCA data. + + :rtype: bool + """ + return False + + +class EdfFabioReader(FabioReader): + """Class which read and cache data and metadata from a fabio image. + + It is mostly the same as FabioReader, but counter_mne and + motor_mne are parsed using a special way. 
+ """ + + def __init__(self, file_name=None, fabio_image=None, file_series=None): + FabioReader.__init__(self, file_name, fabio_image, file_series) + self.__unit_cell_abc = None + self.__unit_cell_alphabetagamma = None + self.__ub_matrix = None + + def _read_frame(self, frame_id, header): + """Overwrite the method to check and parse special keys: counter and + motors keys.""" + self.__catch_keys = set([]) + if "motor_pos" in header and "motor_mne" in header: + self.__catch_keys.add("motor_pos") + self.__catch_keys.add("motor_mne") + self._read_mnemonic_key(frame_id, "motor", header) + if "counter_pos" in header and "counter_mne" in header: + self.__catch_keys.add("counter_pos") + self.__catch_keys.add("counter_mne") + self._read_mnemonic_key(frame_id, "counter", header) + FabioReader._read_frame(self, frame_id, header) + + def _is_filtered_key(self, key): + if key in self.__catch_keys: + return True + return FabioReader._is_filtered_key(self, key) + + def _get_mnemonic_key(self, base_key, header): + mnemonic_values_key = base_key + "_mne" + mnemonic_values = header.get(mnemonic_values_key, "") + mnemonic_values = mnemonic_values.split() + pos_values_key = base_key + "_pos" + pos_values = header.get(pos_values_key, "") + pos_values = pos_values.split() + + result = collections.OrderedDict() + nbitems = max(len(mnemonic_values), len(pos_values)) + for i in range(nbitems): + if i < len(mnemonic_values): + mnemonic = mnemonic_values[i] + else: + # skip the element + continue + + if i < len(pos_values): + pos = pos_values[i] + else: + pos = None + + result[mnemonic] = pos + return result + + def _read_mnemonic_key(self, frame_id, base_key, header): + """Parse a mnemonic key""" + is_counter = base_key == "counter" + is_positioner = base_key == "motor" + data = self._get_mnemonic_key(base_key, header) + + for mnemonic, pos in data.items(): + if is_counter: + self._set_counter_value(frame_id, mnemonic, pos) + elif is_positioner: + self._set_positioner_value(frame_id, mnemonic, pos) + else: + raise Exception("State unexpected (base_key: %s)" % base_key) + + def _get_first_header(self): + """ + ..note:: This function can be cached + """ + fabio_file = self.fabio_file() + if isinstance(fabio_file, fabio.file_series.file_series): + return fabio_file.jump_image(0).header + return fabio_file.header + + def has_ub_matrix(self): + """Returns true if a UB matrix is available. 
+ + :rtype: bool + """ + header = self._get_first_header() + expected_keys = set(["UB_mne", "UB_pos", "sample_mne", "sample_pos"]) + return expected_keys.issubset(header) + + def parse_ub_matrix(self): + header = self._get_first_header() + ub_data = self._get_mnemonic_key("UB", header) + s_data = self._get_mnemonic_key("sample", header) + if len(ub_data) > 9: + _logger.warning("UB_mne and UB_pos contains more than expected keys.") + if len(s_data) > 6: + _logger.warning("sample_mne and sample_pos contains more than expected keys.") + + data = numpy.array([s_data["U0"], s_data["U1"], s_data["U2"]], dtype=float) + unit_cell_abc = data + + data = numpy.array([s_data["U3"], s_data["U4"], s_data["U5"]], dtype=float) + unit_cell_alphabetagamma = data + + ub_matrix = numpy.array([[ + [ub_data["UB0"], ub_data["UB1"], ub_data["UB2"]], + [ub_data["UB3"], ub_data["UB4"], ub_data["UB5"]], + [ub_data["UB6"], ub_data["UB7"], ub_data["UB8"]]]], dtype=float) + + self.__unit_cell_abc = unit_cell_abc + self.__unit_cell_alphabetagamma = unit_cell_alphabetagamma + self.__ub_matrix = ub_matrix + + def get_unit_cell_abc(self): + """Get a numpy array data as defined for the dataset unit_cell_abc + from the NXsample dataset. + + :rtype: numpy.ndarray + """ + if self.__unit_cell_abc is None: + self.parse_ub_matrix() + return self.__unit_cell_abc + + def get_unit_cell_alphabetagamma(self): + """Get a numpy array data as defined for the dataset + unit_cell_alphabetagamma from the NXsample dataset. + + :rtype: numpy.ndarray + """ + if self.__unit_cell_alphabetagamma is None: + self.parse_ub_matrix() + return self.__unit_cell_alphabetagamma + + def get_ub_matrix(self): + """Get a numpy array data as defined for the dataset ub_matrix + from the NXsample dataset. + + :rtype: numpy.ndarray + """ + if self.__ub_matrix is None: + self.parse_ub_matrix() + return self.__ub_matrix + + def is_spectrum(self): + """Returns true if the data should be interpreted as + MCA data. + EDF files or file series, with two or more header names starting with + "MCA", should be interpreted as MCA data. + + :rtype: bool + """ + count = 0 + for key in self._get_first_header(): + if key.lower().startswith("mca"): + count += 1 + if count >= 2: + return True + return False + + +class File(commonh5.File): + """Class which handle a fabio image as a mimick of a h5py.File. + """ + + def __init__(self, file_name=None, fabio_image=None, file_series=None): + """ + Constructor + + :param str file_name: File name of the image file to read + :param fabio.fabioimage.FabioImage fabio_image: An already openned + :class:`fabio.fabioimage.FabioImage` instance. + :param Union[list[str],fabio.file_series.file_series] file_series: An + list of file name or a :class:`fabio.file_series.file_series` + instance + """ + self.__fabio_reader = self.create_fabio_reader(file_name, fabio_image, file_series) + if fabio_image is not None: + file_name = fabio_image.filename + scan = self.create_scan_group(self.__fabio_reader) + + attrs = {"NX_class": "NXroot", + "file_time": datetime.datetime.now().isoformat(), + "creator": "silx %s" % silx_version, + "default": scan.basename} + if file_name is not None: + attrs["file_name"] = file_name + commonh5.File.__init__(self, name=file_name, attrs=attrs) + self.add_node(scan) + + def create_scan_group(self, fabio_reader): + """Factory to create the scan group. 
+ + :param FabioImage fabio_image: A Fabio image + :param FabioReader fabio_reader: A reader for the Fabio image + :rtype: commonh5.Group + """ + nxdata = NxDataPreviewGroup("image", fabio_reader) + scan_attrs = { + "NX_class": "NXentry", + "default": nxdata.basename, + } + scan = commonh5.Group("scan_0", attrs=scan_attrs) + instrument = commonh5.Group("instrument", attrs={"NX_class": "NXinstrument"}) + measurement = MeasurementGroup("measurement", fabio_reader, attrs={"NX_class": "NXcollection"}) + file_ = commonh5.Group("file", attrs={"NX_class": "NXcollection"}) + positioners = MetadataGroup("positioners", fabio_reader, FabioReader.POSITIONER, attrs={"NX_class": "NXpositioner"}) + raw_header = RawHeaderData("scan_header", fabio_reader, self) + detector = DetectorGroup("detector_0", fabio_reader) + + scan.add_node(instrument) + instrument.add_node(positioners) + instrument.add_node(file_) + instrument.add_node(detector) + file_.add_node(raw_header) + scan.add_node(measurement) + scan.add_node(nxdata) + + if fabio_reader.has_sample_information(): + sample = SampleGroup("sample", fabio_reader) + scan.add_node(sample) + + return scan + + def create_fabio_reader(self, file_name, fabio_image, file_series): + """Factory to create fabio reader. + + :rtype: FabioReader""" + use_edf_reader = False + first_file_name = None + first_image = None + + if isinstance(file_series, list): + first_file_name = file_series[0] + elif isinstance(file_series, fabio.file_series.file_series): + first_image = file_series.first_image() + elif fabio_image is not None: + first_image = fabio_image + else: + first_file_name = file_name + + if first_file_name is not None: + _, ext = os.path.splitext(first_file_name) + ext = ext[1:] + edfimage = fabio.edfimage.EdfImage + if hasattr(edfimage, "DEFAULT_EXTENTIONS"): + # Typo on fabio 0.5 + edf_extensions = edfimage.DEFAULT_EXTENTIONS + else: + edf_extensions = edfimage.DEFAULT_EXTENSIONS + use_edf_reader = ext in edf_extensions + elif first_image is not None: + use_edf_reader = isinstance(first_image, fabio.edfimage.EdfImage) + else: + assert(False) + + if use_edf_reader: + reader = EdfFabioReader(file_name, fabio_image, file_series) + else: + reader = FabioReader(file_name, fabio_image, file_series) + return reader + + def close(self): + """Close the object, and free up associated resources. + + After calling this method, attempts to use the object (and children) + may fail. + """ + self.__fabio_reader.close() + self.__fabio_reader = None |
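A second sketch of the EDF-specific behaviour added by EdfFabioReader: "motor_mne"/"motor_pos" and "counter_mne"/"counter_pos" header pairs are split into individual positioner and counter datasets. The header values below are invented for illustration only:

import fabio.edfimage
import numpy
import silx.io.fabioh5

header = {
    "motor_mne": "omega gamma",
    "motor_pos": "1.5 2.0",
    "counter_mne": "mon det",
    "counter_pos": "1000 256",
}
image = fabio.edfimage.EdfImage(data=numpy.zeros((4, 4), dtype=numpy.int32),
                                header=header)
f = silx.io.fabioh5.File(fabio_image=image)
print(f["/scan_0/instrument/positioners/omega"][...])   # motors end up under positioners
print(f["/scan_0/measurement/mon"][...])                # counters end up under measurement
f.close()

Counter and positioner values are normalized to at least 32-bit signed types, so the string header values above come back as numeric arrays.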