diff options
Diffstat (limited to 'silx/io/fabioh5.py')
-rw-r--r-- | silx/io/fabioh5.py | 1151 |
1 files changed, 1151 insertions, 0 deletions
diff --git a/silx/io/fabioh5.py b/silx/io/fabioh5.py new file mode 100644 index 0000000..092ac0c --- /dev/null +++ b/silx/io/fabioh5.py @@ -0,0 +1,1151 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""This module provides functions to read fabio images as an HDF5 file. + + >>> import silx.io.fabioh5 + >>> f = silx.io.fabioh5.File("foobar.edf") + +.. note:: This module has a dependency on the `h5py <http://www.h5py.org/>`_ + and `fabio <https://github.com/silx-kit/fabio>`_ libraries, + which are not a mandatory dependencies for `silx`. You might need + to install it if you don't already have it. +""" + +import collections +import numpy +import numbers +import logging +from silx.third_party import six + +_logger = logging.getLogger(__name__) + +try: + import fabio +except ImportError as e: + _logger.error("Module %s requires fabio", __name__) + raise e + +try: + import h5py +except ImportError as e: + _logger.error("Module %s requires h5py", __name__) + raise e + + +class Node(object): + """Main class for all fabioh5 classes. Help to manage a tree.""" + + def __init__(self, name, parent=None): + self.__parent = parent + self.__basename = name + + @property + def h5py_class(self): + """Returns the h5py classes which is mimicked by this class. It can be + one of `h5py.File, h5py.Group` or `h5py.Dataset` + + :rtype: Class + """ + raise NotImplementedError() + + @property + def parent(self): + """Returns the parent of the node. + + :rtype: Node + """ + return self.__parent + + @property + def file(self): + """Returns the file node of this node. + + :rtype: Node + """ + node = self + while node.__parent is not None: + node = node.__parent + if isinstance(node, File): + return node + else: + return None + + def _set_parent(self, parent): + """Set the parent of this node. + + It do not update the parent object. + + :param Node parent: New parent for this node + """ + self.__parent = parent + + @property + def attrs(self): + """Returns HDF5 attributes of this node. + + :rtype: dict + """ + return {} + + @property + def name(self): + """Returns the HDF5 name of this node. + """ + if self.__parent is None: + return "/" + if self.__parent.name == "/": + return "/" + self.basename + return self.__parent.name + "/" + self.basename + + @property + def basename(self): + """Returns the HDF5 basename of this node. + """ + return self.__basename + + +class Dataset(Node): + """Class which handle a numpy data as a mimic of a h5py.Dataset. + """ + + def __init__(self, name, data, parent=None, attrs=None): + self.__data = data + Node.__init__(self, name, parent) + if attrs is None: + self.__attrs = {} + else: + self.__attrs = attrs + + def _set_data(self, data): + """Set the data exposed by the dataset. + + It have to be called only one time before the data is used. It should + not be edited after use. + + :param numpy.ndarray data: Data associated to the dataset + """ + self.__data = data + + def _get_data(self): + """Returns the exposed data + + :rtype: numpy.ndarray + """ + return self.__data + + @property + def attrs(self): + """Returns HDF5 attributes of this node. + + :rtype: dict + """ + return self.__attrs + + @property + def h5py_class(self): + """Returns the h5py classes which is mimicked by this class. It can be + one of `h5py.File, h5py.Group` or `h5py.Dataset` + + :rtype: Class + """ + return h5py.Dataset + + @property + def dtype(self): + """Returns the numpy datatype exposed by this dataset. + + :rtype: numpy.dtype + """ + return self._get_data().dtype + + @property + def shape(self): + """Returns the shape of the data exposed by this dataset. + + :rtype: tuple + """ + if isinstance(self._get_data(), numpy.ndarray): + return self._get_data().shape + else: + return tuple() + + @property + def size(self): + """Returns the size of the data exposed by this dataset. + + :rtype: int + """ + if isinstance(self._get_data(), numpy.ndarray): + return self._get_data().size + else: + # It is returned as float64 1.0 by h5py + return numpy.float64(1.0) + + def __len__(self): + """Returns the size of the data exposed by this dataset. + + :rtype: int + """ + if isinstance(self._get_data(), numpy.ndarray): + return len(self._get_data()) + else: + # It is returned as float64 1.0 by h5py + raise TypeError("Attempt to take len() of scalar dataset") + + def __getitem__(self, item): + """Returns the slice of the data exposed by this dataset. + + :rtype: numpy.ndarray + """ + if not isinstance(self._get_data(), numpy.ndarray): + if item == Ellipsis: + return numpy.array(self._get_data()) + elif item == tuple(): + return self._get_data() + else: + raise ValueError("Scalar can only be reached with an ellipsis or an empty tuple") + return self._get_data().__getitem__(item) + + def __str__(self): + basename = self.name.split("/")[-1] + return '<FabIO dataset "%s": shape %s, type "%s">' % \ + (basename, self.shape, self.dtype.str) + + def __getslice__(self, i, j): + """Returns the slice of the data exposed by this dataset. + + Deprecated but still in use for python 2.7 + + :rtype: numpy.ndarray + """ + return self.__getitem__(slice(i, j, None)) + + @property + def value(self): + """Returns the data exposed by this dataset. + + Deprecated by h5py. It is prefered to use indexing `[()]`. + + :rtype: numpy.ndarray + """ + return self._get_data() + + @property + def compression(self): + """Returns compression as provided by `h5py.Dataset`. + + There is no compression.""" + return None + + @property + def compression_opts(self): + """Returns compression options as provided by `h5py.Dataset`. + + There is no compression.""" + return None + + @property + def chunks(self): + """Returns chunks as provided by `h5py.Dataset`. + + There is no chunks.""" + return None + + +class LazyLoadableDataset(Dataset): + """Abstract dataset which provide a lazy loading of the data. + + The class have to be inherited and the :meth:`_create_data` have to be + implemented to return the numpy data exposed by the dataset. This factory + is only called ones, when the data is needed. + """ + + def __init__(self, name, parent=None, attrs=None): + super(LazyLoadableDataset, self).__init__(name, None, parent, attrs=attrs) + self.__is_initialized = False + + def _create_data(self): + """ + Factory to create the data exposed by the dataset when it is needed. + + It have to be implemented to work. + + :rtype: numpy.ndarray + """ + raise NotImplementedError() + + def _get_data(self): + """Returns the data exposed by the dataset. + + Overwrite Dataset method :meth:`_get_data` to implement the lazy + loading feature. + + :rtype: numpy.ndarray + """ + if not self.__is_initialized: + data = self._create_data() + self._set_data(data) + self.__is_initialized = True + return super(LazyLoadableDataset, self)._get_data() + + +class Group(Node): + """Class which mimic a `h5py.Group`.""" + + def __init__(self, name, parent=None, attrs=None): + Node.__init__(self, name, parent) + self.__items = collections.OrderedDict() + if attrs is None: + attrs = {} + self.__attrs = attrs + + def _get_items(self): + """Returns the child items as a name-node dictionary. + + :rtype: dict + """ + return self.__items + + def add_node(self, node): + """Add a child to this group. + + :param Node node: Child to add to this group + """ + self._get_items()[node.basename] = node + node._set_parent(self) + + @property + def h5py_class(self): + """Returns the h5py classes which is mimicked by this class. + + It returns `h5py.Group` + + :rtype: Class + """ + return h5py.Group + + @property + def attrs(self): + """Returns HDF5 attributes of this node. + + :rtype: dict + """ + return self.__attrs + + def items(self): + """Returns items iterator containing name-node mapping. + + :rtype: iterator + """ + return self._get_items().items() + + def get(self, name, default=None, getclass=False, getlink=False): + """ Retrieve an item or other information. + + If getlink only is true, the returned value is always HardLink + cause this implementation do not use links. Like the original + implementation. + + :param str name: name of the item + :param object default: default value returned if the name is not found + :param bool getclass: if true, the returned object is the class of the object found + :param bool getlink: if true, links object are returned instead of the target + :return: An object, else None + :rtype: object + """ + if name not in self._get_items(): + return default + + if getlink: + node = h5py.HardLink() + else: + node = self._get_items()[name] + + if getclass: + obj = node.h5py_class + else: + obj = node + return obj + + def __len__(self): + """Returns the number of child contained in this group. + + :rtype: int + """ + return len(self._get_items()) + + def __iter__(self): + """Iterate over member names""" + for x in self._get_items().__iter__(): + yield x + + def __getitem__(self, name): + """Return a child from is name. + + :param name str: name of a member or a path throug members using '/' + separator. A '/' as a prefix access to the root item of the tree. + :rtype: Node + """ + + if name is None or name == "": + raise ValueError("No name") + + if "/" not in name: + return self._get_items()[name] + + if name.startswith("/"): + root = self + while root.parent is not None: + root = root.parent + if name == "/": + return root + return root[name[1:]] + + path = name.split("/") + result = self + for item_name in path: + if not isinstance(result, Group): + raise KeyError("Unable to open object (Component not found)") + result = result._get_items()[item_name] + + return result + + def __contains__(self, name): + """Returns true is a name is an existing child of this group. + + :rtype: bool + """ + return name in self._get_items() + + def keys(self): + return self._get_items().keys() + + +class LazyLoadableGroup(Group): + """Abstract group which provide a lazy loading of the child. + + The class have to be inherited and the :meth:`_create_child` have to be + implemented to add (:meth:`_add_node`) all child. This factory + is only called ones, when child are needed. + """ + + def __init__(self, name, parent=None, attrs=None): + Group.__init__(self, name, parent, attrs) + self.__is_initialized = False + + def _get_items(self): + """Returns internal structure which contains child. + + It overwrite method :meth:`_get_items` to implement the lazy + loading feature. + + :rtype: dict + """ + if not self.__is_initialized: + self.__is_initialized = True + self._create_child() + return Group._get_items(self) + + def _create_child(self): + """ + Factory to create the child contained by the group when it is needed. + + It have to be implemented to work. + """ + raise NotImplementedError() + + +class FrameData(LazyLoadableDataset): + """Expose a cube of image from a Fabio file using `FabioReader` as + cache.""" + + def __init__(self, name, fabio_reader, parent=None): + attrs = {"interpretation": "image"} + LazyLoadableDataset.__init__(self, name, parent, attrs=attrs) + self.__fabio_reader = fabio_reader + + def _create_data(self): + return self.__fabio_reader.get_data() + + +class RawHeaderData(LazyLoadableDataset): + """Lazy loadable raw header""" + + def __init__(self, name, fabio_file, parent=None): + LazyLoadableDataset.__init__(self, name, parent) + self.__fabio_file = fabio_file + + def _create_data(self): + """Initialize hold data by merging all headers of each frames. + """ + headers = [] + for frame in range(self.__fabio_file.nframes): + if self.__fabio_file.nframes == 1: + header = self.__fabio_file.header + else: + header = self.__fabio_file.getframe(frame).header + + data = [] + for key, value in header.items(): + data.append("%s: %s" % (str(key), str(value))) + + headers.append(u"\n".join(data)) + + # create the header list + return numpy.array(headers) + + +class MetadataGroup(LazyLoadableGroup): + """Abstract class for groups containing a reference to a fabio image. + """ + + def __init__(self, name, metadata_reader, kind, parent=None, attrs=None): + LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__metadata_reader = metadata_reader + self.__kind = kind + + def _create_child(self): + keys = self.__metadata_reader.get_keys(self.__kind) + for name in keys: + data = self.__metadata_reader.get_value(self.__kind, name) + dataset = Dataset(name, data) + self.add_node(dataset) + + @property + def _metadata_reader(self): + return self.__metadata_reader + + +class DetectorGroup(LazyLoadableGroup): + """Define the detector group (sub group of instrument) using Fabio data. + """ + + def __init__(self, name, fabio_reader, parent=None, attrs=None): + if attrs is None: + attrs = {"NX_class": "NXdetector"} + LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + data = FrameData("data", self.__fabio_reader) + self.add_node(data) + + # TODO we should add here Nexus informations we can extract from the + # metadata + + others = MetadataGroup("others", self.__fabio_reader, kind=FabioReader.DEFAULT) + self.add_node(others) + + +class ImageGroup(LazyLoadableGroup): + """Define the image group (sub group of measurement) using Fabio data. + """ + + def __init__(self, name, fabio_reader, parent=None, attrs=None): + LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + data = FrameData("data", self.__fabio_reader) + self.add_node(data) + + # TODO detector should be a real soft-link + detector = DetectorGroup("info", self.__fabio_reader) + self.add_node(detector) + + +class SampleGroup(LazyLoadableGroup): + """Define the image group (sub group of measurement) using Fabio data. + """ + + def __init__(self, name, fabio_reader, parent=None): + attrs = {"NXclass": "NXsample"} + LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + if self.__fabio_reader.has_ub_matrix(): + scalar = {"interpretation": "scalar"} + data = self.__fabio_reader.get_unit_cell_abc() + data = Dataset("unit_cell_abc", data, attrs=scalar) + self.add_node(data) + unit_cell_data = numpy.zeros((1, 6), numpy.float32) + unit_cell_data[0, :3] = data + data = self.__fabio_reader.get_unit_cell_alphabetagamma() + data = Dataset("unit_cell_alphabetagamma", data, attrs=scalar) + self.add_node(data) + unit_cell_data[0, 3:] = data + data = Dataset("unit_cell", unit_cell_data, attrs=scalar) + self.add_node(data) + data = self.__fabio_reader.get_ub_matrix() + data = Dataset("ub_matrix", data, attrs=scalar) + self.add_node(data) + + +class MeasurementGroup(LazyLoadableGroup): + """Define the measurement group for fabio file. + """ + + def __init__(self, name, fabio_reader, parent=None, attrs=None): + LazyLoadableGroup.__init__(self, name, parent, attrs) + self.__fabio_reader = fabio_reader + + def _create_child(self): + keys = self.__fabio_reader.get_keys(FabioReader.COUNTER) + + # create image measurement but take care that no other metadata use + # this name + for i in range(1000): + name = "image_%i" % i + if name not in keys: + data = ImageGroup(name, self.__fabio_reader) + self.add_node(data) + break + else: + raise Exception("image_i for 0..1000 already used") + + # add all counters + for name in keys: + data = self.__fabio_reader.get_value(FabioReader.COUNTER, name) + dataset = Dataset(name, data) + self.add_node(dataset) + + +class FabioReader(object): + """Class which read and cache data and metadata from a fabio image.""" + + DEFAULT = 0 + COUNTER = 1 + POSITIONER = 2 + + def __init__(self, fabio_file): + self.__fabio_file = fabio_file + self.__counters = {} + self.__positioners = {} + self.__measurements = {} + self.__data = None + self.__frame_count = self.__fabio_file.nframes + self._read(self.__fabio_file) + + def fabio_file(self): + return self.__fabio_file + + def _create_data(self): + """Initialize hold data by merging all frames into a single cube. + + Choose the cube size which fit the best the data. If some images are + smaller than expected, the empty space is set to 0. + + The computation is cached into the class, and only done ones. + """ + images = [] + for frame in range(self.__fabio_file.nframes): + if self.__fabio_file.nframes == 1: + image = self.__fabio_file.data + else: + image = self.__fabio_file.getframe(frame).data + images.append(image) + + # get the max size + max_shape = [0, 0] + for image in images: + if image.shape[0] > max_shape[0]: + max_shape[0] = image.shape[0] + if image.shape[1] > max_shape[1]: + max_shape[1] = image.shape[1] + max_shape = tuple(max_shape) + + # fix smallest images + for index, image in enumerate(images): + if image.shape == max_shape: + continue + right_image = numpy.zeros(max_shape) + right_image[0:image.shape[0], 0:image.shape[1]] = image + images[index] = right_image + + # create a cube + return numpy.array(images) + + def __get_dict(self, kind): + """Returns a dictionary from according to an expected kind""" + if kind == self.DEFAULT: + return self.__measurements + elif kind == self.COUNTER: + return self.__counters + elif kind == self.POSITIONER: + return self.__positioners + else: + raise Exception("Unexpected kind %s", kind) + + def get_data(self): + """Returns a cube from all available data from frames + + :rtype: numpy.ndarray + """ + if self.__data is None: + self.__data = self._create_data() + return self.__data + + def get_keys(self, kind): + """Get all available keys according to a kind of metadata. + + :rtype: list + """ + return self.__get_dict(kind).keys() + + def get_value(self, kind, name): + """Get a metadata value according to the kind and the name. + + :rtype: numpy.ndarray + """ + value = self.__get_dict(kind)[name] + if not isinstance(value, numpy.ndarray): + value = self._convert_metadata_vector(value) + self.__get_dict(kind)[name] = value + return value + + def _set_counter_value(self, frame_id, name, value): + """Set a counter metadata according to the frame id""" + if name not in self.__counters: + self.__counters[name] = [None] * self.__frame_count + self.__counters[name][frame_id] = value + + def _set_positioner_value(self, frame_id, name, value): + """Set a positioner metadata according to the frame id""" + if name not in self.__positioners: + self.__positioners[name] = [None] * self.__frame_count + self.__positioners[name][frame_id] = value + + def _set_measurement_value(self, frame_id, name, value): + """Set a measurement metadata according to the frame id""" + if name not in self.__measurements: + self.__measurements[name] = [None] * self.__frame_count + self.__measurements[name][frame_id] = value + + def _read(self, fabio_file): + """Read all metadata from the fabio file and store it into this + object.""" + for frame in range(fabio_file.nframes): + if fabio_file.nframes == 1: + header = fabio_file.header + else: + header = fabio_file.getframe(frame).header + self._read_frame(frame, header) + + def _read_frame(self, frame_id, header): + """Read all metadata from a frame and store it into this + object.""" + for key, value in header.items(): + self._read_key(frame_id, key, value) + + def _read_key(self, frame_id, name, value): + """Read a key from the metadata and cache it into this object.""" + self._set_measurement_value(frame_id, name, value) + + def _convert_metadata_vector(self, values): + """Convert a list of numpy data into a numpy array with the better + fitting type.""" + converted = [] + types = set([]) + has_none = False + for v in values: + if v is None: + converted.append(None) + has_none = True + else: + c = self._convert_value(v) + converted.append(c) + types.add(c.dtype) + + if has_none and len(types) == 0: + # That's a list of none values + return numpy.array([0] * len(values), numpy.int8) + + result_type = numpy.result_type(*types) + + if issubclass(result_type.type, numpy.string_): + # use the raw data to create the array + result = values + elif issubclass(result_type.type, numpy.unicode_): + # use the raw data to create the array + result = values + else: + result = converted + + if has_none: + # Fix missing data according to the array type + if result_type.kind in ["S", "U"]: + none_value = "" + elif result_type.kind == "f": + none_value = numpy.float("NaN") + elif result_type.kind == "i": + none_value = numpy.int(0) + elif result_type.kind == "u": + none_value = numpy.int(0) + elif result_type.kind == "b": + none_value = numpy.bool(False) + else: + none_value = None + + for index, r in enumerate(result): + if r is not None: + continue + result[index] = none_value + + return numpy.array(result, dtype=result_type) + + def _convert_value(self, value): + """Convert a string into a numpy object (scalar or array). + + The value is most of the time a string, but it can be python object + in case if TIFF decoder for example. + """ + if isinstance(value, list): + # convert to a numpy array + return numpy.array(value) + if isinstance(value, dict): + # convert to a numpy associative array + key_dtype = numpy.min_scalar_type(list(value.keys())) + value_dtype = numpy.min_scalar_type(list(value.values())) + associative_type = [('key', key_dtype), ('value', value_dtype)] + assert key_dtype.kind != "O" and value_dtype.kind != "O" + return numpy.array(list(value.items()), dtype=associative_type) + if isinstance(value, numbers.Number): + dtype = numpy.min_scalar_type(value) + assert dtype.kind != "O" + return dtype.type(value) + + if isinstance(value, six.binary_type): + try: + value = value.decode('utf-8') + except UnicodeDecodeError: + return numpy.void(value) + + if " " in value: + result = self._convert_list(value) + else: + result = self._convert_scalar_value(value) + return result + + def _convert_scalar_value(self, value): + """Convert a string into a numpy int or float. + + If it is not possible it returns a numpy string. + """ + try: + value = int(value) + dtype = numpy.min_scalar_type(value) + assert dtype.kind != "O" + return dtype.type(value) + except ValueError: + try: + # numpy.min_scalar_type is not able to do very well the job + # when there is a lot of digit after the dot + # https://github.com/numpy/numpy/issues/8207 + # Let's count the digit of the string + digits = len(value) - 1 # minus the dot + if digits <= 7: + # A float32 is accurate with about 7 digits + return numpy.float32(value) + elif digits <= 16: + # A float64 is accurate with about 16 digits + return numpy.float64(value) + else: + if hasattr(numpy, "float128"): + return numpy.float128(value) + else: + return numpy.float64(value) + except ValueError: + return numpy.string_(value) + + def _convert_list(self, value): + """Convert a string into a typed numpy array. + + If it is not possible it returns a numpy string. + """ + try: + numpy_values = [] + values = value.split(" ") + types = set([]) + for string_value in values: + v = self._convert_scalar_value(string_value) + numpy_values.append(v) + types.add(v.dtype.type) + + result_type = numpy.result_type(*types) + + if issubclass(result_type.type, numpy.string_): + # use the raw data to create the result + return numpy.string_(value) + elif issubclass(result_type.type, numpy.unicode_): + # use the raw data to create the result + return numpy.unicode_(value) + else: + return numpy.array(numpy_values, dtype=result_type) + except ValueError: + return numpy.string_(value) + + def has_sample_information(self): + """Returns true if there is information about the sample in the + file + + :rtype: bool + """ + return self.has_ub_matrix() + + def has_ub_matrix(self): + """Returns true if a UB matrix is available. + + :rtype: bool + """ + return False + + +class EdfFabioReader(FabioReader): + """Class which read and cache data and metadata from a fabio image. + + It is mostly the same as FabioReader, but counter_mne and + motor_mne are parsed using a special way. + """ + + def __init__(self, fabio_file): + FabioReader.__init__(self, fabio_file) + self.__unit_cell_abc = None + self.__unit_cell_alphabetagamma = None + self.__ub_matrix = None + + def _read_frame(self, frame_id, header): + """Overwrite the method to check and parse special keys: counter and + motors keys.""" + self.__catch_keys = set([]) + if "motor_pos" in header and "motor_mne" in header: + self.__catch_keys.add("motor_pos") + self.__catch_keys.add("motor_mne") + self._read_mnemonic_key(frame_id, "motor", header) + if "counter_pos" in header and "counter_mne" in header: + self.__catch_keys.add("counter_pos") + self.__catch_keys.add("counter_mne") + self._read_mnemonic_key(frame_id, "counter", header) + FabioReader._read_frame(self, frame_id, header) + + def _read_key(self, frame_id, name, value): + """Overwrite the method to filter counter or motor keys.""" + if name in self.__catch_keys: + return + FabioReader._read_key(self, frame_id, name, value) + + def _get_mnemonic_key(self, base_key, header): + mnemonic_values_key = base_key + "_mne" + mnemonic_values = header.get(mnemonic_values_key, "") + mnemonic_values = mnemonic_values.split() + pos_values_key = base_key + "_pos" + pos_values = header.get(pos_values_key, "") + pos_values = pos_values.split() + + result = collections.OrderedDict() + nbitems = max(len(mnemonic_values), len(pos_values)) + for i in range(nbitems): + if i < len(mnemonic_values): + mnemonic = mnemonic_values[i] + else: + # skip the element + continue + + if i < len(pos_values): + pos = pos_values[i] + else: + pos = None + + result[mnemonic] = pos + return result + + def _read_mnemonic_key(self, frame_id, base_key, header): + """Parse a mnemonic key""" + is_counter = base_key == "counter" + is_positioner = base_key == "motor" + data = self._get_mnemonic_key(base_key, header) + + for mnemonic, pos in data.items(): + if is_counter: + self._set_counter_value(frame_id, mnemonic, pos) + elif is_positioner: + self._set_positioner_value(frame_id, mnemonic, pos) + else: + raise Exception("State unexpected (base_key: %s)" % base_key) + + def has_ub_matrix(self): + """Returns true if a UB matrix is available. + + :rtype: bool + """ + header = self.fabio_file().header + expected_keys = set(["UB_mne", "UB_pos", "sample_mne", "sample_pos"]) + return expected_keys.issubset(header) + + def parse_ub_matrix(self): + header = self.fabio_file().header + ub_data = self._get_mnemonic_key("UB", header) + s_data = self._get_mnemonic_key("sample", header) + if len(ub_data) > 9: + _logger.warning("UB_mne and UB_pos contains more than expected keys.") + if len(s_data) > 6: + _logger.warning("sample_mne and sample_pos contains more than expected keys.") + + data = numpy.array([s_data["U0"], s_data["U1"], s_data["U2"]], dtype=float) + unit_cell_abc = data + + data = numpy.array([s_data["U3"], s_data["U4"], s_data["U5"]], dtype=float) + unit_cell_alphabetagamma = data + + ub_matrix = numpy.array([[ + [ub_data["UB0"], ub_data["UB1"], ub_data["UB2"]], + [ub_data["UB3"], ub_data["UB4"], ub_data["UB5"]], + [ub_data["UB6"], ub_data["UB7"], ub_data["UB8"]]]], dtype=float) + + self.__unit_cell_abc = unit_cell_abc + self.__unit_cell_alphabetagamma = unit_cell_alphabetagamma + self.__ub_matrix = ub_matrix + + def get_unit_cell_abc(self): + """Get a numpy array data as defined for the dataset unit_cell_abc + from the NXsample dataset. + + :rtype: numpy.ndarray + """ + if self.__unit_cell_abc is None: + self.parse_ub_matrix() + return self.__unit_cell_abc + + def get_unit_cell_alphabetagamma(self): + """Get a numpy array data as defined for the dataset + unit_cell_alphabetagamma from the NXsample dataset. + + :rtype: numpy.ndarray + """ + if self.__unit_cell_alphabetagamma is None: + self.parse_ub_matrix() + return self.__unit_cell_alphabetagamma + + def get_ub_matrix(self): + """Get a numpy array data as defined for the dataset ub_matrix + from the NXsample dataset. + + :rtype: numpy.ndarray + """ + if self.__ub_matrix is None: + self.parse_ub_matrix() + return self.__ub_matrix + + +class File(Group): + """Class which handle a fabio image as a mimick of a h5py.File. + """ + + def __init__(self, file_name=None, fabio_image=None): + self.__must_be_closed = False + if file_name is not None and fabio_image is not None: + raise TypeError("Parameters file_name and fabio_image are mutually exclusive.") + if file_name is not None: + self.__fabio_image = fabio.open(file_name) + self.__must_be_closed = True + elif fabio_image is not None: + self.__fabio_image = fabio_image + Group.__init__(self, name="", parent=None, attrs={"NX_class": "NXroot"}) + self.__fabio_reader = self.create_fabio_reader(self.__fabio_image) + scan = self.create_scan_group(self.__fabio_image, self.__fabio_reader) + self.add_node(scan) + + def create_scan_group(self, fabio_image, fabio_reader): + """Factory to create the scan group. + + :param FabioImage fabio_image: A Fabio image + :param FabioReader fabio_reader: A reader for the Fabio image + :rtype: Group + """ + + scan = Group("scan_0", attrs={"NX_class": "NXentry"}) + instrument = Group("instrument", attrs={"NX_class": "NXinstrument"}) + measurement = MeasurementGroup("measurement", fabio_reader, attrs={"NX_class": "NXcollection"}) + file_ = Group("file", attrs={"NX_class": "NXcollection"}) + positioners = MetadataGroup("positioners", fabio_reader, FabioReader.POSITIONER, attrs={"NX_class": "NXpositioner"}) + raw_header = RawHeaderData("scan_header", fabio_image, self) + detector = DetectorGroup("detector_0", fabio_reader) + + scan.add_node(instrument) + instrument.add_node(positioners) + instrument.add_node(file_) + instrument.add_node(detector) + file_.add_node(raw_header) + scan.add_node(measurement) + + if fabio_reader.has_sample_information(): + sample = SampleGroup("sample", fabio_reader) + scan.add_node(sample) + + return scan + + def create_fabio_reader(self, fabio_file): + """Factory to create fabio reader. + + :rtype: FabioReader""" + if isinstance(fabio_file, fabio.edfimage.EdfImage): + metadata = EdfFabioReader(fabio_file) + else: + metadata = FabioReader(fabio_file) + return metadata + + @property + def h5py_class(self): + return h5py.File + + @property + def filename(self): + return self.__fabio_image.filename + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): # pylint: disable=W0622 + """Called at the end of a `with` statement. + + It will close the internal FabioImage only if the FabioImage was + created by the class itself. The reference to the FabioImage is anyway + broken. + """ + if self.__must_be_closed: + self.close() + else: + self.__fabio_image = None + + def close(self): + """Close the object, and free up associated resources. + + The associated FabioImage is closed anyway the object was created from + a filename or from a FabioImage. + + After calling this method, attempts to use the object may fail. + """ + # It looks like there is no close on FabioImage + # self.__fabio_image.close() + self.__fabio_image = None |