summaryrefslogtreecommitdiff
path: root/src/silx/io/test/test_h5py_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/silx/io/test/test_h5py_utils.py')
-rw-r--r--src/silx/io/test/test_h5py_utils.py451
1 files changed, 451 insertions, 0 deletions
diff --git a/src/silx/io/test/test_h5py_utils.py b/src/silx/io/test/test_h5py_utils.py
new file mode 100644
index 0000000..ea46eca
--- /dev/null
+++ b/src/silx/io/test/test_h5py_utils.py
@@ -0,0 +1,451 @@
+# coding: utf-8
+# /*##########################################################################
+# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ############################################################################*/
+"""Tests for h5py utilities"""
+
+__authors__ = ["W. de Nolf"]
+__license__ = "MIT"
+__date__ = "27/01/2020"
+
+
+import unittest
+import os
+import sys
+import time
+import shutil
+import logging
+import tempfile
+import multiprocessing
+from contextlib import contextmanager
+
+from .. import h5py_utils
+from ...utils.retry import RetryError, RetryTimeoutError
+
+IS_WINDOWS = sys.platform == "win32"
+logger = logging.getLogger()
+
+
+def _subprocess_context_main(queue, contextmgr, *args, **kw):
+ try:
+ with contextmgr(*args, **kw):
+ queue.put(None)
+ queue.get()
+ except Exception:
+ queue.put(None)
+ raise
+
+
+@contextmanager
+def _subprocess_context(contextmgr, *args, **kw):
+ print("\nSTART", os.getpid())
+ timeout = kw.pop("timeout", 10)
+ queue = multiprocessing.Queue(maxsize=1)
+ p = multiprocessing.Process(
+ target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw
+ )
+ p.start()
+ try:
+ queue.get(timeout=timeout)
+ yield
+ finally:
+ queue.put(None)
+ p.join(timeout)
+ print(" EXIT", os.getpid())
+
+
+@contextmanager
+def _open_context(filename, **kw):
+ try:
+ print(os.getpid(), "OPEN", filename, kw)
+ with h5py_utils.File(filename, **kw) as f:
+ if kw.get("mode") == "w":
+ f["check"] = True
+ f.flush()
+ yield f
+ except Exception:
+ print(" ", os.getpid(), "FAILED", filename, kw)
+ raise
+ else:
+ print(" ", os.getpid(), "CLOSED", filename, kw)
+
+
+def _cause_segfault():
+ import ctypes
+
+ i = ctypes.c_char(b"a")
+ j = ctypes.pointer(i)
+ c = 0
+ while True:
+ j[c] = b"a"
+ c += 1
+
+
+def _top_level_names_test(txtfilename, *args, **kw):
+ sys.stderr = open(os.devnull, "w")
+
+ with open(txtfilename, mode="r") as f:
+ failcounter = int(f.readline().strip())
+
+ ncausefailure = kw.pop("ncausefailure")
+ faildelay = kw.pop("faildelay")
+ if failcounter < ncausefailure:
+ time.sleep(faildelay)
+ failcounter += 1
+ with open(txtfilename, mode="w") as f:
+ f.write(str(failcounter))
+ if failcounter % 2:
+ raise RetryError
+ else:
+ _cause_segfault()
+ return h5py_utils._top_level_names(*args, **kw)
+
+
+top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test)
+
+
+def subtests(test):
+ def wrapper(self):
+ for subtest_options in self._subtests():
+ print("\n====SUB TEST===\n")
+ print(f"sub test options: {subtest_options}")
+ with self.subTest(str(subtest_options)):
+ test(self)
+
+ return wrapper
+
+
+class TestH5pyUtils(unittest.TestCase):
+ def setUp(self):
+ self.test_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.test_dir)
+
+ def _subtests(self):
+ self._subtest_options = {"mode": "w"}
+ self.filename_generator = self._filenames()
+ yield self._subtest_options
+ self._subtest_options = {"mode": "w", "libver": "latest"}
+ self.filename_generator = self._filenames()
+ yield
+
+ def _filenames(self):
+ i = 1
+ while True:
+ filename = os.path.join(self.test_dir, "file{}.h5".format(i))
+ with self._open_context(filename):
+ pass
+ yield filename
+ i += 1
+
+ def _new_filename(self):
+ return next(self.filename_generator)
+
+ @contextmanager
+ def _open_context(self, filename, **kwargs):
+ kw = dict(self._subtest_options)
+ kw.update(kwargs)
+ with _open_context(filename, **kw) as f:
+ yield f
+
+ @contextmanager
+ def _open_context_subprocess(self, filename, **kwargs):
+ kw = dict(self._subtest_options)
+ kw.update(kwargs)
+ with _subprocess_context(_open_context, filename, **kw):
+ yield
+
+ def _assert_hdf5_data(self, f):
+ self.assertTrue(f["check"][()])
+
+ def _validate_hdf5_data(self, filename, swmr=False):
+ with self._open_context(filename, mode="r") as f:
+ self.assertEqual(f.swmr_mode, swmr)
+ self._assert_hdf5_data(f)
+
+ @subtests
+ def test_modes_single_process(self):
+ """Test concurrent access to the different files from the same process"""
+ # When using HDF5_USE_FILE_LOCKING, open files with and without
+ # locking should raise an exception. HDF5_USE_FILE_LOCKING should
+ # be reset when all files are closed.
+
+ orig = os.environ.get("HDF5_USE_FILE_LOCKING")
+ filename1 = self._new_filename()
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+ filename2 = self._new_filename()
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ with self._open_context(filename1, mode="r"):
+ locking1 = False
+ for mode in ["r", "w", "a"]:
+ locking2 = mode != "r"
+ raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT
+ raise_condition &= locking1 != locking2
+ with self.assertRaisesIf(raise_condition, RuntimeError):
+ with self._open_context(filename2, mode=mode):
+ pass
+ self._validate_hdf5_data(filename1)
+ self._validate_hdf5_data(filename2)
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ with self._open_context(filename1, mode="a"):
+ locking1 = True
+ for mode in ["r", "w", "a"]:
+ locking2 = mode != "r"
+ raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT
+ raise_condition &= locking1 != locking2
+ with self.assertRaisesIf(raise_condition, RuntimeError):
+ with self._open_context(filename2, mode=mode):
+ pass
+ self._validate_hdf5_data(filename1)
+ self._validate_hdf5_data(filename2)
+ self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
+
+ @property
+ def _libver_low_bound_is_v108(self):
+ libver = self._subtest_options.get("libver")
+ return h5py_utils._libver_low_bound_is_v108(libver)
+
+ @property
+ def _nonlocking_reader_before_writer(self):
+ """A non-locking reader must open the file before it is locked by a writer"""
+ if IS_WINDOWS and h5py_utils.HDF5_HAS_LOCKING_ARGUMENT:
+ return True
+ if not self._libver_low_bound_is_v108:
+ return True
+ return False
+
+ @contextmanager
+ def assertRaisesIf(self, condition, *args, **kw):
+ if condition:
+ with self.assertRaises(*args, **kw):
+ yield
+ else:
+ yield
+
+ @unittest.skipIf(
+ h5py_utils.HDF5_HAS_LOCKING_ARGUMENT != h5py_utils.H5PY_HAS_LOCKING_ARGUMENT,
+ "Versions of libhdf5 and h5py use incompatible file locking behaviour",
+ )
+ @subtests
+ def test_modes_multi_process(self):
+ """Test concurrent access to the same file from different processes"""
+ filename = self._new_filename()
+
+ nonlocking_reader_before_writer = self._nonlocking_reader_before_writer
+ writer_before_nonlocking_reader_exception = OSError
+ old_hdf5_on_windows = IS_WINDOWS and not h5py_utils.HDF5_HAS_LOCKING_ARGUMENT
+ locked_exception = OSError
+
+ # File locked by a writer
+ unexpected_access = old_hdf5_on_windows and self._libver_low_bound_is_v108
+ for wmode in ["w", "a"]:
+ with self._open_context_subprocess(filename, mode=wmode):
+ # Access by a second non-locking reader
+ with self.assertRaisesIf(
+ nonlocking_reader_before_writer,
+ writer_before_nonlocking_reader_exception,
+ ):
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # No access by a second locking reader
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a locking reader")
+ with self.assertRaisesIf(not unexpected_access, locked_exception):
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # No access by a second writer
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a writer")
+ with self.assertRaisesIf(not unexpected_access, locked_exception):
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ if not nonlocking_reader_before_writer:
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ # File locked by a reader
+ unexpected_access = old_hdf5_on_windows
+ with _subprocess_context(_open_context, filename, mode="r", locking=True):
+ # Access by a non-locking reader
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # Access by a locking reader
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # No access by a second writer
+ if unexpected_access:
+ logger.warning("unexpected concurrent access by a writer")
+ raise_condition = not unexpected_access
+ with self.assertRaisesIf(raise_condition, locked_exception):
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ # File open by a non-locking reader
+ with self._open_context_subprocess(filename, mode="r"):
+ # Access by a second non-locking reader
+ with self._open_context(filename, mode="r") as f:
+ self._assert_hdf5_data(f)
+ # Access by a second locking reader
+ with self._open_context(filename, mode="r", locking=True) as f:
+ self._assert_hdf5_data(f)
+ # Access by a second writer
+ with self._open_context(filename, mode="a") as f:
+ self._assert_hdf5_data(f)
+ # Check for file corruption
+ self._validate_hdf5_data(filename)
+ self._validate_hdf5_data(filename)
+
+ @subtests
+ @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported")
+ def test_modes_multi_process_swmr(self):
+ filename = self._new_filename()
+
+ with self._open_context(filename, mode="w", libver="latest") as f:
+ pass
+
+ # File open by SWMR writer
+ with self._open_context_subprocess(filename, mode="a", swmr=True):
+ with self._open_context(filename, mode="r") as f:
+ assert f.swmr_mode
+ self._assert_hdf5_data(f)
+ with self.assertRaises(OSError):
+ with self._open_context(filename, mode="a") as f:
+ pass
+ self._validate_hdf5_data(filename, swmr=True)
+
+ @subtests
+ def test_retry_defaults(self):
+ filename = self._new_filename()
+
+ names = h5py_utils.top_level_names(filename)
+ self.assertEqual(names, [])
+
+ names = h5py_utils.safe_top_level_names(filename)
+ self.assertEqual(names, [])
+
+ names = h5py_utils.top_level_names(filename, include_only=None)
+ self.assertEqual(names, ["check"])
+
+ names = h5py_utils.safe_top_level_names(filename, include_only=None)
+ self.assertEqual(names, ["check"])
+
+ with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item:
+ self.assertEqual(item, None)
+
+ with h5py_utils.open_item(filename, "/check", validate=None) as item:
+ self.assertTrue(item[()])
+
+ with self.assertRaises(RetryTimeoutError):
+ with h5py_utils.open_item(
+ filename,
+ "/check",
+ retry_timeout=0.1,
+ retry_invalid=True,
+ validate=lambda x: False,
+ ) as item:
+ pass
+
+ ncall = 0
+
+ def validate(item):
+ nonlocal ncall
+ if ncall >= 1:
+ return True
+ else:
+ ncall += 1
+ raise RetryError
+
+ with h5py_utils.open_item(
+ filename,
+ "/check",
+ validate=validate,
+ retry_timeout=1,
+ retry_invalid=True,
+ ) as item:
+ self.assertTrue(item[()])
+
+ @subtests
+ def test_retry_custom(self):
+ filename = self._new_filename()
+ ncausefailure = 3
+ faildelay = 0.1
+ sufficient_timeout = ncausefailure * (faildelay + 10)
+ insufficient_timeout = ncausefailure * faildelay * 0.5
+
+ @h5py_utils.retry_contextmanager()
+ def open_item(filename, name):
+ nonlocal failcounter
+ if failcounter < ncausefailure:
+ time.sleep(faildelay)
+ failcounter += 1
+ raise RetryError
+ with h5py_utils.File(filename) as h5file:
+ yield h5file[name]
+
+ failcounter = 0
+ kw = {"retry_timeout": sufficient_timeout}
+ with open_item(filename, "/check", **kw) as item:
+ self.assertTrue(item[()])
+
+ failcounter = 0
+ kw = {"retry_timeout": insufficient_timeout}
+ with self.assertRaises(RetryTimeoutError):
+ with open_item(filename, "/check", **kw) as item:
+ pass
+
+ @subtests
+ def test_retry_in_subprocess(self):
+ filename = self._new_filename()
+ txtfilename = os.path.join(self.test_dir, "failcounter.txt")
+ ncausefailure = 3
+ faildelay = 0.1
+ sufficient_timeout = ncausefailure * (faildelay + 10)
+ insufficient_timeout = ncausefailure * faildelay * 0.5
+
+ kw = {
+ "retry_timeout": sufficient_timeout,
+ "include_only": None,
+ "ncausefailure": ncausefailure,
+ "faildelay": faildelay,
+ }
+ with open(txtfilename, mode="w") as f:
+ f.write("0")
+ names = top_level_names_test(txtfilename, filename, **kw)
+ self.assertEqual(names, ["check"])
+
+ kw = {
+ "retry_timeout": insufficient_timeout,
+ "include_only": None,
+ "ncausefailure": ncausefailure,
+ "faildelay": faildelay,
+ }
+ with open(txtfilename, mode="w") as f:
+ f.write("0")
+ with self.assertRaises(RetryTimeoutError):
+ top_level_names_test(txtfilename, filename, **kw)