diff options
Diffstat (limited to 'src/silx/io/h5py_utils.py')
-rw-r--r-- | src/silx/io/h5py_utils.py | 440 |
1 files changed, 440 insertions, 0 deletions
diff --git a/src/silx/io/h5py_utils.py b/src/silx/io/h5py_utils.py new file mode 100644 index 0000000..fb04152 --- /dev/null +++ b/src/silx/io/h5py_utils.py @@ -0,0 +1,440 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2021 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +""" +This module provides utility methods on top of h5py, mainly to handle +parallel writing and reading. +""" + +__authors__ = ["W. de Nolf"] +__license__ = "MIT" +__date__ = "27/01/2020" + + +import os +import sys +import traceback +import logging +import h5py + +from .._version import calc_hexversion +from ..utils import retry as retry_mod +from silx.utils.deprecation import deprecated_warning + +_logger = logging.getLogger(__name__) + +IS_WINDOWS = sys.platform == "win32" + +H5PY_HEX_VERSION = calc_hexversion(*h5py.version.version_tuple[:3]) +HDF5_HEX_VERSION = calc_hexversion(*h5py.version.hdf5_version_tuple[:3]) + +HDF5_SWMR_VERSION = calc_hexversion(*h5py.get_config().swmr_min_hdf5_version[:3]) +HAS_SWMR = HDF5_HEX_VERSION >= HDF5_SWMR_VERSION + +HAS_TRACK_ORDER = H5PY_HEX_VERSION >= calc_hexversion(2, 9, 0) + +if h5py.version.hdf5_version_tuple[:2] == (1, 10): + HDF5_HAS_LOCKING_ARGUMENT = HDF5_HEX_VERSION >= calc_hexversion(1, 10, 7) +else: + HDF5_HAS_LOCKING_ARGUMENT = HDF5_HEX_VERSION >= calc_hexversion(1, 12, 1) +H5PY_HAS_LOCKING_ARGUMENT = H5PY_HEX_VERSION >= calc_hexversion(3, 5, 0) +HAS_LOCKING_ARGUMENT = HDF5_HAS_LOCKING_ARGUMENT & H5PY_HAS_LOCKING_ARGUMENT + +LATEST_LIBVER_IS_V108 = HDF5_HEX_VERSION < calc_hexversion(1, 10, 0) + + +def _libver_low_bound_is_v108(libver) -> bool: + if libver is None: + return True + if LATEST_LIBVER_IS_V108: + return True + if isinstance(libver, str): + low = libver + else: + low = libver[0] + if low == "latest": + return False + return low == "v108" + + +def _hdf5_file_locking(mode="r", locking=None, swmr=None, libver=None, **_): + """Concurrent access by disabling file locking is not supported + in these cases: + + * mode != "r": causes file corruption + * SWMR: does not work + * libver > v108 and file already locked: does not work + * windows and HDF5_HAS_LOCKING_ARGUMENT and file already locked: does not work + + :param str or None mode: read-only by default + :param bool or None locking: by default it is disabled for `mode='r'` + and `swmr=False` and enabled for all + other modes. + :param bool or None swmr: try both modes when `mode='r'` and `swmr=None` + :param None or str or tuple libver: + :returns bool: + """ + if locking is None: + locking = bool(mode != "r" or swmr) + if not locking: + if mode != "r": + raise ValueError("Locking is mandatory for HDF5 writing") + if swmr: + raise ValueError("Locking is mandatory for HDF5 SWMR mode") + if IS_WINDOWS and HDF5_HAS_LOCKING_ARGUMENT: + _logger.debug( + "Non-locking readers will fail when a writer has already locked the HDF5 file (this restriction applies to libhdf5 >= 1.12.1 or libhdf5 >= 1.10.7 on Windows)" + ) + if not _libver_low_bound_is_v108(libver): + _logger.debug( + "Non-locking readers will fail when a writer has already locked the HDF5 file (this restriction applies to libver >= v110)" + ) + return locking + + +def _is_h5py_exception(e): + """ + :param BaseException e: + :returns bool: + """ + for frame in traceback.walk_tb(e.__traceback__): + if frame[0].f_locals.get("__package__", None) == "h5py": + return True + return False + + +def _retry_h5py_error(e): + """ + :param BaseException e: + :returns bool: + """ + if _is_h5py_exception(e): + if isinstance(e, (OSError, RuntimeError)): + return True + elif isinstance(e, KeyError): + # For example this needs to be retried: + # KeyError: 'Unable to open object (bad object header version number)' + return "Unable to open object" in str(e) + elif isinstance(e, retry_mod.RetryError): + return True + return False + + +def retry(**kw): + r"""Decorator for a method that needs to be executed until it not longer + fails on HDF5 IO. Mainly used for reading an HDF5 file that is being + written. + + :param \**kw: see `silx.utils.retry` + """ + kw.setdefault("retry_on_error", _retry_h5py_error) + return retry_mod.retry(**kw) + + +def retry_contextmanager(**kw): + r"""Decorator to make a context manager from a method that needs to be + entered until it not longer fails on HDF5 IO. Mainly used for reading + an HDF5 file that is being written. + + :param \**kw: see `silx.utils.retry_contextmanager` + """ + kw.setdefault("retry_on_error", _retry_h5py_error) + return retry_mod.retry_contextmanager(**kw) + + +def retry_in_subprocess(**kw): + r"""Same as `retry` but it also retries segmentation faults. + + On Window you cannot use this decorator with the "@" syntax: + + .. code-block:: python + + def _method(*args, **kw): + ... + + method = retry_in_subprocess()(_method) + + :param \**kw: see `silx.utils.retry_in_subprocess` + """ + kw.setdefault("retry_on_error", _retry_h5py_error) + return retry_mod.retry_in_subprocess(**kw) + + +def group_has_end_time(h5item): + """Returns True when the HDF5 item is a Group with an "end_time" + dataset. A reader can use this as an indication that the Group + has been fully written (at least if the writer supports this). + + :param Union[h5py.Group,h5py.Dataset] h5item: + :returns bool: + """ + if isinstance(h5item, h5py.Group): + return "end_time" in h5item + else: + return False + + +@retry_contextmanager() +def open_item(filename, name, retry_invalid=False, validate=None, **open_options): + r"""Yield an HDF5 dataset or group (retry until it can be instantiated). + + :param str filename: + :param bool retry_invalid: retry when item is missing or not valid + :param callable or None validate: + :param \**open_options: see `File.__init__` + :yields Dataset, Group or None: + """ + with File(filename, **open_options) as h5file: + try: + item = h5file[name] + except KeyError as e: + if "doesn't exist" in str(e): + if retry_invalid: + raise retry_mod.RetryError + else: + item = None + else: + raise + if callable(validate) and item is not None: + if not validate(item): + if retry_invalid: + raise retry_mod.RetryError + else: + item = None + yield item + + +def _top_level_names(filename, include_only=group_has_end_time, **open_options): + r"""Return all valid top-level HDF5 names. + + :param str filename: + :param callable or None include_only: + :param \**open_options: see `File.__init__` + :returns list(str): + """ + with File(filename, **open_options) as h5file: + try: + if callable(include_only): + return [name for name in h5file["/"] if include_only(h5file[name])] + else: + return list(h5file["/"]) + except KeyError: + raise retry_mod.RetryError + + +top_level_names = retry()(_top_level_names) +safe_top_level_names = retry_in_subprocess()(_top_level_names) + + +class Hdf5FileLockingManager: + """Manage HDF5 file locking in the current process through the HDF5_USE_FILE_LOCKING + environment variable. + """ + + def __init__(self) -> None: + self._hdf5_file_locking = None + self._nfiles_open = 0 + + def opened(self): + self._add_nopen(1) + + def closed(self): + self._add_nopen(-1) + if not self._nfiles_open: + self._restore_locking_env() + + def set_locking(self, locking): + if self._nfiles_open: + self._check_locking_env(locking) + else: + self._set_locking_env(locking) + + def _add_nopen(self, v): + self._nfiles_open = max(self._nfiles_open + v, 0) + + def _set_locking_env(self, enable): + self._backup_locking_env() + if enable: + os.environ["HDF5_USE_FILE_LOCKING"] = "TRUE" + elif enable is None: + try: + del os.environ["HDF5_USE_FILE_LOCKING"] + except KeyError: + pass + else: + os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" + + def _get_locking_env(self): + v = os.environ.get("HDF5_USE_FILE_LOCKING") + if v == "TRUE": + return True + elif v is None: + return None + else: + return False + + def _check_locking_env(self, enable): + if enable != self._get_locking_env(): + if enable: + raise RuntimeError( + "Close all HDF5 files before enabling HDF5 file locking" + ) + else: + raise RuntimeError( + "Close all HDF5 files before disabling HDF5 file locking" + ) + + def _backup_locking_env(self): + v = os.environ.get("HDF5_USE_FILE_LOCKING") + if v is None: + self._hdf5_file_locking = None + else: + self._hdf5_file_locking = v == "TRUE" + + def _restore_locking_env(self): + self._set_locking_env(self._hdf5_file_locking) + self._hdf5_file_locking = None + + +class File(h5py.File): + """Takes care of HDF5 file locking and SWMR mode without the need + to handle those explicitely. + + When file locking is managed through the HDF5_USE_FILE_LOCKING environment + variable, you cannot open different files simultaneously with different modes. + """ + + _SWMR_LIBVER = "latest" + + if HAS_LOCKING_ARGUMENT: + _LOCKING_MGR = None + else: + _LOCKING_MGR = Hdf5FileLockingManager() + + def __init__( + self, + filename, + mode=None, + locking=None, + enable_file_locking=None, + swmr=None, + libver=None, + **kwargs, + ): + r"""The arguments `locking` and `swmr` should not be + specified explicitly for normal use cases. + + :param str filename: + :param str or None mode: read-only by default + :param bool or None locking: by default it is disabled for `mode='r'` + and `swmr=False` and enabled for all + other modes. + :param bool or None enable_file_locking: deprecated + :param bool or None swmr: try both modes when `mode='r'` and `swmr=None` + :param None or str or tuple libver: + :param \**kwargs: see `h5py.File.__init__` + """ + # File locking behavior has changed in recent versions of libhdf5 + if HDF5_HAS_LOCKING_ARGUMENT != H5PY_HAS_LOCKING_ARGUMENT: + _logger.critical( + "The version of libhdf5 ({}) used by h5py ({}) is not supported: " + "Do not expect file locking to work.".format( + h5py.version.hdf5_version, h5py.version.version + ) + ) + + if mode is None: + mode = "r" + elif mode not in ("r", "w", "w-", "x", "a", "r+"): + raise ValueError("invalid mode {}".format(mode)) + if not HAS_SWMR: + swmr = False + if swmr and libver is None: + libver = self._SWMR_LIBVER + + if enable_file_locking is not None: + deprecated_warning( + type_="argument", + name="enable_file_locking", + replacement="locking", + since_version="1.0", + ) + if locking is None: + locking = enable_file_locking + locking = _hdf5_file_locking( + mode=mode, locking=locking, swmr=swmr, libver=libver + ) + if self._LOCKING_MGR is None: + kwargs.setdefault("locking", locking) + else: + self._LOCKING_MGR.set_locking(locking) + + if HAS_TRACK_ORDER: + kwargs.setdefault("track_order", True) + try: + super().__init__(filename, mode=mode, swmr=swmr, libver=libver, **kwargs) + except OSError as e: + # wlock wSWMR rlock rSWMR OSError: Unable to open file (...) + # 1 TRUE FALSE FALSE FALSE - + # 2 TRUE FALSE FALSE TRUE - + # 3 TRUE FALSE TRUE FALSE unable to lock file, errno = 11, error message = 'Resource temporarily unavailable' + # 4 TRUE FALSE TRUE TRUE unable to lock file, errno = 11, error message = 'Resource temporarily unavailable' + # 5 TRUE TRUE FALSE FALSE file is already open for write (may use <h5clear file> to clear file consistency flags) + # 6 TRUE TRUE FALSE TRUE - + # 7 TRUE TRUE TRUE FALSE file is already open for write (may use <h5clear file> to clear file consistency flags) + # 8 TRUE TRUE TRUE TRUE - + if ( + mode == "r" + and swmr is None + and "file is already open for write" in str(e) + ): + # Try reading in SWMR mode (situation 5 and 7) + swmr = True + if libver is None: + libver = self._SWMR_LIBVER + super().__init__( + filename, mode=mode, swmr=swmr, libver=libver, **kwargs + ) + else: + raise + else: + self._file_open_callback() + try: + if mode != "r" and swmr: + # Try setting writer in SWMR mode + self.swmr_mode = True + except Exception: + self.close() + raise + + def close(self): + super().close() + self._file_close_callback() + + def _file_open_callback(self): + if self._LOCKING_MGR is not None: + self._LOCKING_MGR.opened() + + def _file_close_callback(self): + if self._LOCKING_MGR is not None: + self._LOCKING_MGR.closed() |