summaryrefslogtreecommitdiff
path: root/src/silx/io/fabioh5.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/silx/io/fabioh5.py')
-rwxr-xr-xsrc/silx/io/fabioh5.py132
1 files changed, 68 insertions, 64 deletions
diff --git a/src/silx/io/fabioh5.py b/src/silx/io/fabioh5.py
index c5ef964..89e838b 100755
--- a/src/silx/io/fabioh5.py
+++ b/src/silx/io/fabioh5.py
@@ -1,5 +1,5 @@
# /*##########################################################################
-# Copyright (C) 2016-2021 European Synchrotron Radiation Facility
+# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -31,7 +31,6 @@
"""
-import collections
import datetime
import logging
import numbers
@@ -77,19 +76,6 @@ def supported_extensions():
return _fabio_extensions
-class _FileSeries(fabio.file_series.file_series):
- """
- .. note:: Overwrite a function to fix an issue in fabio.
- """
- def jump(self, num):
- """
- Goto a position in sequence
- """
- assert num < len(self) and num >= 0, "num out of range"
- self._current = num
- return self[self._current]
-
-
class FrameData(commonh5.LazyLoadableDataset):
"""Expose a cube of image from a Fabio file using `FabioReader` as
cache."""
@@ -108,8 +94,7 @@ class FrameData(commonh5.LazyLoadableDataset):
return self.__fabio_reader.get_data()
def _update_cache(self):
- if isinstance(self.__fabio_reader.fabio_file(),
- fabio.file_series.file_series):
+ if isinstance(self.__fabio_reader.fabio_file(), fabio.file_series.file_series):
# Reading all the files is taking too much time
# Reach the information from the only first frame
first_image = self.__fabio_reader.fabio_file().first_image()
@@ -140,9 +125,9 @@ class FrameData(commonh5.LazyLoadableDataset):
def __getitem__(self, item):
# optimization for fetching a single frame if data not already loaded
if not self._is_initialized:
- if isinstance(item, int) and \
- isinstance(self.__fabio_reader.fabio_file(),
- fabio.file_series.file_series):
+ if isinstance(item, int) and isinstance(
+ self.__fabio_reader.fabio_file(), fabio.file_series.file_series
+ ):
if item < 0:
# negative indexing
item += len(self)
@@ -158,8 +143,7 @@ class RawHeaderData(commonh5.LazyLoadableDataset):
self.__fabio_reader = fabio_reader
def _create_data(self):
- """Initialize hold data by merging all headers of each frames.
- """
+ """Initialize hold data by merging all headers of each frames."""
headers = []
types = set([])
for fabio_frame in self.__fabio_reader.iter_frames():
@@ -199,8 +183,7 @@ class RawHeaderData(commonh5.LazyLoadableDataset):
class MetadataGroup(commonh5.LazyLoadableGroup):
- """Abstract class for groups containing a reference to a fabio image.
- """
+ """Abstract class for groups containing a reference to a fabio image."""
def __init__(self, name, metadata_reader, kind, parent=None, attrs=None):
commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs)
@@ -220,8 +203,7 @@ class MetadataGroup(commonh5.LazyLoadableGroup):
class DetectorGroup(commonh5.LazyLoadableGroup):
- """Define the detector group (sub group of instrument) using Fabio data.
- """
+ """Define the detector group (sub group of instrument) using Fabio data."""
def __init__(self, name, fabio_reader, parent=None, attrs=None):
if attrs is None:
@@ -241,8 +223,7 @@ class DetectorGroup(commonh5.LazyLoadableGroup):
class ImageGroup(commonh5.LazyLoadableGroup):
- """Define the image group (sub group of measurement) using Fabio data.
- """
+ """Define the image group (sub group of measurement) using Fabio data."""
def __init__(self, name, fabio_reader, parent=None, attrs=None):
commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs)
@@ -281,8 +262,7 @@ class NxDataPreviewGroup(commonh5.LazyLoadableGroup):
class SampleGroup(commonh5.LazyLoadableGroup):
- """Define the image group (sub group of measurement) using Fabio data.
- """
+ """Define the image group (sub group of measurement) using Fabio data."""
def __init__(self, name, fabio_reader, parent=None):
attrs = {"NXclass": "NXsample"}
@@ -309,8 +289,7 @@ class SampleGroup(commonh5.LazyLoadableGroup):
class MeasurementGroup(commonh5.LazyLoadableGroup):
- """Define the measurement group for fabio file.
- """
+ """Define the measurement group for fabio file."""
def __init__(self, name, fabio_reader, parent=None, attrs=None):
commonh5.LazyLoadableGroup.__init__(self, name, parent, attrs)
@@ -369,9 +348,13 @@ class FabioReader(object):
def __load(self, file_name=None, fabio_image=None, file_series=None):
if file_name is not None and fabio_image:
- raise TypeError("Parameters file_name and fabio_image are mutually exclusive.")
+ raise TypeError(
+ "Parameters file_name and fabio_image are mutually exclusive."
+ )
if file_name is not None and fabio_image:
- raise TypeError("Parameters fabio_image and file_series are mutually exclusive.")
+ raise TypeError(
+ "Parameters fabio_image and file_series are mutually exclusive."
+ )
self.__must_be_closed = False
@@ -382,14 +365,18 @@ class FabioReader(object):
if isinstance(fabio_image, fabio.fabioimage.FabioImage):
self.__fabio_file = fabio_image
else:
- raise TypeError("FabioImage expected but %s found.", fabio_image.__class__)
+ raise TypeError(
+ "FabioImage expected but %s found.", fabio_image.__class__
+ )
elif file_series is not None:
if isinstance(file_series, list):
- self.__fabio_file = _FileSeries(file_series)
+ self.__fabio_file = fabio.file_series.file_series(file_series)
elif isinstance(file_series, fabio.file_series.file_series):
self.__fabio_file = file_series
else:
- raise TypeError("file_series or list expected but %s found.", file_series.__class__)
+ raise TypeError(
+ "file_series or list expected but %s found.", file_series.__class__
+ )
def close(self):
"""Close the object, and free up associated resources.
@@ -401,10 +388,7 @@ class FabioReader(object):
may fail.
"""
if self.__must_be_closed:
- # Make sure the API of fabio provide it a 'close' method
- # TODO the test can be removed if fabio version >= 0.8
- if hasattr(self.__fabio_file, "close"):
- self.__fabio_file.close()
+ self.__fabio_file.close()
self.__fabio_file = None
def fabio_file(self):
@@ -428,7 +412,7 @@ class FabioReader(object):
for file_number in range(len(self.__fabio_file)):
with self.__fabio_file.jump_image(file_number) as fabio_image:
# return the first frame only
- assert(fabio_image.nframes == 1)
+ assert fabio_image.nframes == 1
yield fabio_image
elif isinstance(self.__fabio_file, fabio.fabioimage.FabioImage):
for frame_count in range(self.__fabio_file.nframes):
@@ -514,7 +498,9 @@ class FabioReader(object):
if not isinstance(value, numpy.ndarray):
if kind in [self.COUNTER, self.POSITIONER]:
# Force normalization for counters and positioners
- old = self._set_vector_normalization(at_least_32bits=True, signed_type=True)
+ old = self._set_vector_normalization(
+ at_least_32bits=True, signed_type=True
+ )
else:
old = None
value = self._convert_metadata_vector(value)
@@ -590,7 +576,7 @@ class FabioReader(object):
return previous
def _normalize_vector_type(self, dtype):
- """Normalize the """
+ """Normalize the"""
if self.__at_least_32bits:
if numpy.issubdtype(dtype, numpy.signedinteger):
dtype = numpy.result_type(dtype, numpy.uint32)
@@ -602,7 +588,7 @@ class FabioReader(object):
dtype = numpy.result_type(dtype, numpy.complex64)
if self.__signed_type:
if numpy.issubdtype(dtype, numpy.unsignedinteger):
- signed = numpy.dtype("%s%i" % ('i', dtype.itemsize))
+ signed = numpy.dtype("%s%i" % ("i", dtype.itemsize))
dtype = numpy.result_type(dtype, signed)
return dtype
@@ -652,7 +638,7 @@ class FabioReader(object):
if result_type.kind == "S":
none_value = b""
elif result_type.kind == "U":
- none_value = u""
+ none_value = ""
elif result_type.kind == "f":
none_value = numpy.float64("NaN")
elif result_type.kind == "i":
@@ -692,7 +678,7 @@ class FabioReader(object):
# convert to a numpy associative array
key_dtype = numpy.min_scalar_type(list(value.keys()))
value_dtype = numpy.min_scalar_type(list(value.values()))
- associative_type = [('key', key_dtype), ('value', value_dtype)]
+ associative_type = [("key", key_dtype), ("value", value_dtype)]
assert key_dtype.kind != "O" and value_dtype.kind != "O"
return numpy.array(list(value.items()), dtype=associative_type)
if isinstance(value, numbers.Number):
@@ -702,7 +688,7 @@ class FabioReader(object):
if isinstance(value, bytes):
try:
- value = value.decode('utf-8')
+ value = value.decode("utf-8")
except UnicodeDecodeError:
return numpy.void(value)
@@ -818,7 +804,7 @@ class EdfFabioReader(FabioReader):
pos_values = header.get(pos_values_key, "")
pos_values = pos_values.split()
- result = collections.OrderedDict()
+ result = {}
nbitems = max(len(mnemonic_values), len(pos_values))
for i in range(nbitems):
if i < len(mnemonic_values):
@@ -874,7 +860,9 @@ class EdfFabioReader(FabioReader):
if len(ub_data) > 9:
_logger.warning("UB_mne and UB_pos contains more than expected keys.")
if len(s_data) > 6:
- _logger.warning("sample_mne and sample_pos contains more than expected keys.")
+ _logger.warning(
+ "sample_mne and sample_pos contains more than expected keys."
+ )
data = numpy.array([s_data["U0"], s_data["U1"], s_data["U2"]], dtype=float)
unit_cell_abc = data
@@ -882,10 +870,16 @@ class EdfFabioReader(FabioReader):
data = numpy.array([s_data["U3"], s_data["U4"], s_data["U5"]], dtype=float)
unit_cell_alphabetagamma = data
- ub_matrix = numpy.array([[
- [ub_data["UB0"], ub_data["UB1"], ub_data["UB2"]],
- [ub_data["UB3"], ub_data["UB4"], ub_data["UB5"]],
- [ub_data["UB6"], ub_data["UB7"], ub_data["UB8"]]]], dtype=float)
+ ub_matrix = numpy.array(
+ [
+ [
+ [ub_data["UB0"], ub_data["UB1"], ub_data["UB2"]],
+ [ub_data["UB3"], ub_data["UB4"], ub_data["UB5"]],
+ [ub_data["UB6"], ub_data["UB7"], ub_data["UB8"]],
+ ]
+ ],
+ dtype=float,
+ )
self.__unit_cell_abc = unit_cell_abc
self.__unit_cell_alphabetagamma = unit_cell_alphabetagamma
@@ -939,8 +933,7 @@ class EdfFabioReader(FabioReader):
class File(commonh5.File):
- """Class which handle a fabio image as a mimick of a h5py.File.
- """
+ """Class which handle a fabio image as a mimick of a h5py.File."""
def __init__(self, file_name=None, fabio_image=None, file_series=None):
"""
@@ -953,15 +946,19 @@ class File(commonh5.File):
list of file name or a :class:`fabio.file_series.file_series`
instance
"""
- self.__fabio_reader = self.create_fabio_reader(file_name, fabio_image, file_series)
+ self.__fabio_reader = self.create_fabio_reader(
+ file_name, fabio_image, file_series
+ )
if fabio_image is not None:
file_name = fabio_image.filename
scan = self.create_scan_group(self.__fabio_reader)
- attrs = {"NX_class": "NXroot",
- "file_time": datetime.datetime.now().isoformat(),
- "creator": "silx %s" % silx_version,
- "default": scan.basename}
+ attrs = {
+ "NX_class": "NXroot",
+ "file_time": datetime.datetime.now().isoformat(),
+ "creator": "silx %s" % silx_version,
+ "default": scan.basename,
+ }
if file_name is not None:
attrs["file_name"] = file_name
commonh5.File.__init__(self, name=file_name, attrs=attrs)
@@ -981,9 +978,16 @@ class File(commonh5.File):
}
scan = commonh5.Group("scan_0", attrs=scan_attrs)
instrument = commonh5.Group("instrument", attrs={"NX_class": "NXinstrument"})
- measurement = MeasurementGroup("measurement", fabio_reader, attrs={"NX_class": "NXcollection"})
+ measurement = MeasurementGroup(
+ "measurement", fabio_reader, attrs={"NX_class": "NXcollection"}
+ )
file_ = commonh5.Group("file", attrs={"NX_class": "NXcollection"})
- positioners = MetadataGroup("positioners", fabio_reader, FabioReader.POSITIONER, attrs={"NX_class": "NXpositioner"})
+ positioners = MetadataGroup(
+ "positioners",
+ fabio_reader,
+ FabioReader.POSITIONER,
+ attrs={"NX_class": "NXpositioner"},
+ )
raw_header = RawHeaderData("scan_header", fabio_reader, self)
detector = DetectorGroup("detector_0", fabio_reader)
@@ -1031,7 +1035,7 @@ class File(commonh5.File):
elif first_image is not None:
use_edf_reader = isinstance(first_image, fabio.edfimage.EdfImage)
else:
- assert(False)
+ assert False
if use_edf_reader:
reader = EdfFabioReader(file_name, fabio_image, file_series)