summaryrefslogtreecommitdiff
path: root/silx/io/h5py_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/h5py_utils.py')
-rw-r--r--silx/io/h5py_utils.py317
1 files changed, 317 insertions, 0 deletions
diff --git a/silx/io/h5py_utils.py b/silx/io/h5py_utils.py
new file mode 100644
index 0000000..cbdb44a
--- /dev/null
+++ b/silx/io/h5py_utils.py
@@ -0,0 +1,317 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2021 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""
+This module provides utility methods on top of h5py, mainly to handle
+parallel writing and reading.
+"""
+
+__authors__ = ["W. de Nolf"]
+__license__ = "MIT"
+__date__ = "27/01/2020"
+
+
+import os
+import traceback
+import h5py
+
+from .._version import calc_hexversion
+from ..utils import retry as retry_mod
+
+H5PY_HEX_VERSION = calc_hexversion(*h5py.version.version_tuple[:3])
+HDF5_HEX_VERSION = calc_hexversion(*h5py.version.hdf5_version_tuple[:3])
+
+HDF5_SWMR_VERSION = calc_hexversion(*h5py.get_config().swmr_min_hdf5_version[:3])
+HDF5_TRACK_ORDER_VERSION = calc_hexversion(2, 9, 0)
+
+HAS_SWMR = HDF5_HEX_VERSION >= HDF5_SWMR_VERSION
+HAS_TRACK_ORDER = H5PY_HEX_VERSION >= HDF5_TRACK_ORDER_VERSION
+
+
+def _is_h5py_exception(e):
+ for frame in traceback.walk_tb(e.__traceback__):
+ if frame[0].f_locals.get("__package__", None) == "h5py":
+ return True
+ return False
+
+
+def _retry_h5py_error(e):
+ """
+ :param BaseException e:
+ :returns bool:
+ """
+ if _is_h5py_exception(e):
+ if isinstance(e, (OSError, RuntimeError)):
+ return True
+ elif isinstance(e, KeyError):
+ # For example this needs to be retried:
+ # KeyError: 'Unable to open object (bad object header version number)'
+ return "Unable to open object" in str(e)
+ elif isinstance(e, retry_mod.RetryError):
+ return True
+ return False
+
+
+def retry(**kw):
+ """Decorator for a method that needs to be executed until it not longer
+ fails on HDF5 IO. Mainly used for reading an HDF5 file that is being
+ written.
+
+ :param \**kw: see `silx.utils.retry`
+ """
+ kw.setdefault("retry_on_error", _retry_h5py_error)
+ return retry_mod.retry(**kw)
+
+
+def retry_contextmanager(**kw):
+ """Decorator to make a context manager from a method that needs to be
+ entered until it not longer fails on HDF5 IO. Mainly used for reading
+ an HDF5 file that is being written.
+
+ :param \**kw: see `silx.utils.retry_contextmanager`
+ """
+ kw.setdefault("retry_on_error", _retry_h5py_error)
+ return retry_mod.retry_contextmanager(**kw)
+
+
+def retry_in_subprocess(**kw):
+ """Same as `retry` but it also retries segmentation faults.
+
+ On Window you cannot use this decorator with the "@" syntax:
+
+ .. code-block:: python
+
+ def _method(*args, **kw):
+ ...
+
+ method = retry_in_subprocess()(_method)
+
+ :param \**kw: see `silx.utils.retry_in_subprocess`
+ """
+ kw.setdefault("retry_on_error", _retry_h5py_error)
+ return retry_mod.retry_in_subprocess(**kw)
+
+
+def group_has_end_time(h5item):
+ """Returns True when the HDF5 item is a Group with an "end_time"
+ dataset. A reader can use this as an indication that the Group
+ has been fully written (at least if the writer supports this).
+
+ :param Union[h5py.Group,h5py.Dataset] h5item:
+ :returns bool:
+ """
+ if isinstance(h5item, h5py.Group):
+ return "end_time" in h5item
+ else:
+ return False
+
+
+@retry_contextmanager()
+def open_item(filename, name, retry_invalid=False, validate=None):
+ """Yield an HDF5 dataset or group (retry until it can be instantiated).
+
+ :param str filename:
+ :param bool retry_invalid: retry when item is missing or not valid
+ :param callable or None validate:
+ :yields Dataset, Group or None:
+ """
+ with File(filename) as h5file:
+ try:
+ item = h5file[name]
+ except KeyError as e:
+ if "doesn't exist" in str(e):
+ if retry_invalid:
+ raise retry_mod.RetryError
+ else:
+ item = None
+ else:
+ raise
+ if callable(validate) and item is not None:
+ if not validate(item):
+ if retry_invalid:
+ raise retry_mod.RetryError
+ else:
+ item = None
+ yield item
+
+
+def _top_level_names(filename, include_only=group_has_end_time):
+ """Return all valid top-level HDF5 names.
+
+ :param str filename:
+ :param callable or None include_only:
+ :returns list(str):
+ """
+ with File(filename) as h5file:
+ try:
+ if callable(include_only):
+ return [name for name in h5file["/"] if include_only(h5file[name])]
+ else:
+ return list(h5file["/"])
+ except KeyError:
+ raise retry_mod.RetryError
+
+
+top_level_names = retry()(_top_level_names)
+safe_top_level_names = retry_in_subprocess()(_top_level_names)
+
+
+class File(h5py.File):
+ """Takes care of HDF5 file locking and SWMR mode without the need
+ to handle those explicitely.
+
+ When using this class, you cannot open different files simultatiously
+ with different modes because the locking flag is an environment variable.
+ """
+
+ _HDF5_FILE_LOCKING = None
+ _NOPEN = 0
+ _SWMR_LIBVER = "latest"
+
+ def __init__(
+ self,
+ filename,
+ mode=None,
+ enable_file_locking=None,
+ swmr=None,
+ libver=None,
+ **kwargs
+ ):
+ """The arguments `enable_file_locking` and `swmr` should not be
+ specified explicitly for normal use cases.
+
+ :param str filename:
+ :param str or None mode: read-only by default
+ :param bool or None enable_file_locking: by default it is disabled for `mode='r'`
+ and `swmr=False` and enabled for all
+ other modes.
+ :param bool or None swmr: try both modes when `mode='r'` and `swmr=None`
+ :param **kwargs: see `h5py.File.__init__`
+ """
+ if mode is None:
+ mode = "r"
+ elif mode not in ("r", "w", "w-", "x", "a", "r+"):
+ raise ValueError("invalid mode {}".format(mode))
+ if not HAS_SWMR:
+ swmr = False
+
+ if enable_file_locking is None:
+ enable_file_locking = bool(mode != "r" or swmr)
+ if self._NOPEN:
+ self._check_locking_env(enable_file_locking)
+ else:
+ self._set_locking_env(enable_file_locking)
+
+ if swmr and libver is None:
+ libver = self._SWMR_LIBVER
+
+ if HAS_TRACK_ORDER:
+ kwargs.setdefault("track_order", True)
+ try:
+ super().__init__(filename, mode=mode, swmr=swmr, libver=libver, **kwargs)
+ except OSError as e:
+ # wlock wSWMR rlock rSWMR OSError: Unable to open file (...)
+ # 1 TRUE FALSE FALSE FALSE -
+ # 2 TRUE FALSE FALSE TRUE -
+ # 3 TRUE FALSE TRUE FALSE unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'
+ # 4 TRUE FALSE TRUE TRUE unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'
+ # 5 TRUE TRUE FALSE FALSE file is already open for write (may use <h5clear file> to clear file consistency flags)
+ # 6 TRUE TRUE FALSE TRUE -
+ # 7 TRUE TRUE TRUE FALSE file is already open for write (may use <h5clear file> to clear file consistency flags)
+ # 8 TRUE TRUE TRUE TRUE -
+ if (
+ mode == "r"
+ and swmr is None
+ and "file is already open for write" in str(e)
+ ):
+ # Try reading in SWMR mode (situation 5 and 7)
+ swmr = True
+ if libver is None:
+ libver = self._SWMR_LIBVER
+ super().__init__(
+ filename, mode=mode, swmr=swmr, libver=libver, **kwargs
+ )
+ else:
+ raise
+ else:
+ self._add_nopen(1)
+ try:
+ if mode != "r" and swmr:
+ # Try setting writer in SWMR mode
+ self.swmr_mode = True
+ except Exception:
+ self.close()
+ raise
+
+ @classmethod
+ def _add_nopen(cls, v):
+ cls._NOPEN = max(cls._NOPEN + v, 0)
+
+ def close(self):
+ super().close()
+ self._add_nopen(-1)
+ if not self._NOPEN:
+ self._restore_locking_env()
+
+ def _set_locking_env(self, enable):
+ self._backup_locking_env()
+ if enable:
+ os.environ["HDF5_USE_FILE_LOCKING"] = "TRUE"
+ elif enable is None:
+ try:
+ del os.environ["HDF5_USE_FILE_LOCKING"]
+ except KeyError:
+ pass
+ else:
+ os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
+
+ def _get_locking_env(self):
+ v = os.environ.get("HDF5_USE_FILE_LOCKING")
+ if v == "TRUE":
+ return True
+ elif v is None:
+ return None
+ else:
+ return False
+
+ def _check_locking_env(self, enable):
+ if enable != self._get_locking_env():
+ if enable:
+ raise RuntimeError(
+ "Close all HDF5 files before enabling HDF5 file locking"
+ )
+ else:
+ raise RuntimeError(
+ "Close all HDF5 files before disabling HDF5 file locking"
+ )
+
+ def _backup_locking_env(self):
+ v = os.environ.get("HDF5_USE_FILE_LOCKING")
+ if v is None:
+ self._HDF5_FILE_LOCKING = None
+ else:
+ self._HDF5_FILE_LOCKING = v == "TRUE"
+
+ def _restore_locking_env(self):
+ self._set_locking_env(self._HDF5_FILE_LOCKING)
+ self._HDF5_FILE_LOCKING = None