diff options
Diffstat (limited to 'src/silx/io/test/test_h5py_utils.py')
-rw-r--r-- | src/silx/io/test/test_h5py_utils.py | 451 |
1 files changed, 451 insertions, 0 deletions
diff --git a/src/silx/io/test/test_h5py_utils.py b/src/silx/io/test/test_h5py_utils.py new file mode 100644 index 0000000..ea46eca --- /dev/null +++ b/src/silx/io/test/test_h5py_utils.py @@ -0,0 +1,451 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for h5py utilities""" + +__authors__ = ["W. de Nolf"] +__license__ = "MIT" +__date__ = "27/01/2020" + + +import unittest +import os +import sys +import time +import shutil +import logging +import tempfile +import multiprocessing +from contextlib import contextmanager + +from .. import h5py_utils +from ...utils.retry import RetryError, RetryTimeoutError + +IS_WINDOWS = sys.platform == "win32" +logger = logging.getLogger() + + +def _subprocess_context_main(queue, contextmgr, *args, **kw): + try: + with contextmgr(*args, **kw): + queue.put(None) + queue.get() + except Exception: + queue.put(None) + raise + + +@contextmanager +def _subprocess_context(contextmgr, *args, **kw): + print("\nSTART", os.getpid()) + timeout = kw.pop("timeout", 10) + queue = multiprocessing.Queue(maxsize=1) + p = multiprocessing.Process( + target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw + ) + p.start() + try: + queue.get(timeout=timeout) + yield + finally: + queue.put(None) + p.join(timeout) + print(" EXIT", os.getpid()) + + +@contextmanager +def _open_context(filename, **kw): + try: + print(os.getpid(), "OPEN", filename, kw) + with h5py_utils.File(filename, **kw) as f: + if kw.get("mode") == "w": + f["check"] = True + f.flush() + yield f + except Exception: + print(" ", os.getpid(), "FAILED", filename, kw) + raise + else: + print(" ", os.getpid(), "CLOSED", filename, kw) + + +def _cause_segfault(): + import ctypes + + i = ctypes.c_char(b"a") + j = ctypes.pointer(i) + c = 0 + while True: + j[c] = b"a" + c += 1 + + +def _top_level_names_test(txtfilename, *args, **kw): + sys.stderr = open(os.devnull, "w") + + with open(txtfilename, mode="r") as f: + failcounter = int(f.readline().strip()) + + ncausefailure = kw.pop("ncausefailure") + faildelay = kw.pop("faildelay") + if failcounter < ncausefailure: + time.sleep(faildelay) + failcounter += 1 + with open(txtfilename, mode="w") as f: + f.write(str(failcounter)) + if failcounter % 2: + raise RetryError + else: + _cause_segfault() + return h5py_utils._top_level_names(*args, **kw) + + +top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test) + + +def subtests(test): + def wrapper(self): + for subtest_options in self._subtests(): + print("\n====SUB TEST===\n") + print(f"sub test options: {subtest_options}") + with self.subTest(str(subtest_options)): + test(self) + + return wrapper + + +class TestH5pyUtils(unittest.TestCase): + def setUp(self): + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.test_dir) + + def _subtests(self): + self._subtest_options = {"mode": "w"} + self.filename_generator = self._filenames() + yield self._subtest_options + self._subtest_options = {"mode": "w", "libver": "latest"} + self.filename_generator = self._filenames() + yield + + def _filenames(self): + i = 1 + while True: + filename = os.path.join(self.test_dir, "file{}.h5".format(i)) + with self._open_context(filename): + pass + yield filename + i += 1 + + def _new_filename(self): + return next(self.filename_generator) + + @contextmanager + def _open_context(self, filename, **kwargs): + kw = dict(self._subtest_options) + kw.update(kwargs) + with _open_context(filename, **kw) as f: + yield f + + @contextmanager + def _open_context_subprocess(self, filename, **kwargs): + kw = dict(self._subtest_options) + kw.update(kwargs) + with _subprocess_context(_open_context, filename, **kw): + yield + + def _assert_hdf5_data(self, f): + self.assertTrue(f["check"][()]) + + def _validate_hdf5_data(self, filename, swmr=False): + with self._open_context(filename, mode="r") as f: + self.assertEqual(f.swmr_mode, swmr) + self._assert_hdf5_data(f) + + @subtests + def test_modes_single_process(self): + """Test concurrent access to the different files from the same process""" + # When using HDF5_USE_FILE_LOCKING, open files with and without + # locking should raise an exception. HDF5_USE_FILE_LOCKING should + # be reset when all files are closed. + + orig = os.environ.get("HDF5_USE_FILE_LOCKING") + filename1 = self._new_filename() + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + filename2 = self._new_filename() + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + + with self._open_context(filename1, mode="r"): + locking1 = False + for mode in ["r", "w", "a"]: + locking2 = mode != "r" + raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT + raise_condition &= locking1 != locking2 + with self.assertRaisesIf(raise_condition, RuntimeError): + with self._open_context(filename2, mode=mode): + pass + self._validate_hdf5_data(filename1) + self._validate_hdf5_data(filename2) + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + + with self._open_context(filename1, mode="a"): + locking1 = True + for mode in ["r", "w", "a"]: + locking2 = mode != "r" + raise_condition = not h5py_utils.HAS_LOCKING_ARGUMENT + raise_condition &= locking1 != locking2 + with self.assertRaisesIf(raise_condition, RuntimeError): + with self._open_context(filename2, mode=mode): + pass + self._validate_hdf5_data(filename1) + self._validate_hdf5_data(filename2) + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + + @property + def _libver_low_bound_is_v108(self): + libver = self._subtest_options.get("libver") + return h5py_utils._libver_low_bound_is_v108(libver) + + @property + def _nonlocking_reader_before_writer(self): + """A non-locking reader must open the file before it is locked by a writer""" + if IS_WINDOWS and h5py_utils.HDF5_HAS_LOCKING_ARGUMENT: + return True + if not self._libver_low_bound_is_v108: + return True + return False + + @contextmanager + def assertRaisesIf(self, condition, *args, **kw): + if condition: + with self.assertRaises(*args, **kw): + yield + else: + yield + + @unittest.skipIf( + h5py_utils.HDF5_HAS_LOCKING_ARGUMENT != h5py_utils.H5PY_HAS_LOCKING_ARGUMENT, + "Versions of libhdf5 and h5py use incompatible file locking behaviour", + ) + @subtests + def test_modes_multi_process(self): + """Test concurrent access to the same file from different processes""" + filename = self._new_filename() + + nonlocking_reader_before_writer = self._nonlocking_reader_before_writer + writer_before_nonlocking_reader_exception = OSError + old_hdf5_on_windows = IS_WINDOWS and not h5py_utils.HDF5_HAS_LOCKING_ARGUMENT + locked_exception = OSError + + # File locked by a writer + unexpected_access = old_hdf5_on_windows and self._libver_low_bound_is_v108 + for wmode in ["w", "a"]: + with self._open_context_subprocess(filename, mode=wmode): + # Access by a second non-locking reader + with self.assertRaisesIf( + nonlocking_reader_before_writer, + writer_before_nonlocking_reader_exception, + ): + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + # No access by a second locking reader + if unexpected_access: + logger.warning("unexpected concurrent access by a locking reader") + with self.assertRaisesIf(not unexpected_access, locked_exception): + with self._open_context(filename, mode="r", locking=True) as f: + self._assert_hdf5_data(f) + # No access by a second writer + if unexpected_access: + logger.warning("unexpected concurrent access by a writer") + with self.assertRaisesIf(not unexpected_access, locked_exception): + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + # Check for file corruption + if not nonlocking_reader_before_writer: + self._validate_hdf5_data(filename) + self._validate_hdf5_data(filename) + + # File locked by a reader + unexpected_access = old_hdf5_on_windows + with _subprocess_context(_open_context, filename, mode="r", locking=True): + # Access by a non-locking reader + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + # Access by a locking reader + with self._open_context(filename, mode="r", locking=True) as f: + self._assert_hdf5_data(f) + # No access by a second writer + if unexpected_access: + logger.warning("unexpected concurrent access by a writer") + raise_condition = not unexpected_access + with self.assertRaisesIf(raise_condition, locked_exception): + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + # Check for file corruption + self._validate_hdf5_data(filename) + self._validate_hdf5_data(filename) + + # File open by a non-locking reader + with self._open_context_subprocess(filename, mode="r"): + # Access by a second non-locking reader + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + # Access by a second locking reader + with self._open_context(filename, mode="r", locking=True) as f: + self._assert_hdf5_data(f) + # Access by a second writer + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + # Check for file corruption + self._validate_hdf5_data(filename) + self._validate_hdf5_data(filename) + + @subtests + @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported") + def test_modes_multi_process_swmr(self): + filename = self._new_filename() + + with self._open_context(filename, mode="w", libver="latest") as f: + pass + + # File open by SWMR writer + with self._open_context_subprocess(filename, mode="a", swmr=True): + with self._open_context(filename, mode="r") as f: + assert f.swmr_mode + self._assert_hdf5_data(f) + with self.assertRaises(OSError): + with self._open_context(filename, mode="a") as f: + pass + self._validate_hdf5_data(filename, swmr=True) + + @subtests + def test_retry_defaults(self): + filename = self._new_filename() + + names = h5py_utils.top_level_names(filename) + self.assertEqual(names, []) + + names = h5py_utils.safe_top_level_names(filename) + self.assertEqual(names, []) + + names = h5py_utils.top_level_names(filename, include_only=None) + self.assertEqual(names, ["check"]) + + names = h5py_utils.safe_top_level_names(filename, include_only=None) + self.assertEqual(names, ["check"]) + + with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item: + self.assertEqual(item, None) + + with h5py_utils.open_item(filename, "/check", validate=None) as item: + self.assertTrue(item[()]) + + with self.assertRaises(RetryTimeoutError): + with h5py_utils.open_item( + filename, + "/check", + retry_timeout=0.1, + retry_invalid=True, + validate=lambda x: False, + ) as item: + pass + + ncall = 0 + + def validate(item): + nonlocal ncall + if ncall >= 1: + return True + else: + ncall += 1 + raise RetryError + + with h5py_utils.open_item( + filename, + "/check", + validate=validate, + retry_timeout=1, + retry_invalid=True, + ) as item: + self.assertTrue(item[()]) + + @subtests + def test_retry_custom(self): + filename = self._new_filename() + ncausefailure = 3 + faildelay = 0.1 + sufficient_timeout = ncausefailure * (faildelay + 10) + insufficient_timeout = ncausefailure * faildelay * 0.5 + + @h5py_utils.retry_contextmanager() + def open_item(filename, name): + nonlocal failcounter + if failcounter < ncausefailure: + time.sleep(faildelay) + failcounter += 1 + raise RetryError + with h5py_utils.File(filename) as h5file: + yield h5file[name] + + failcounter = 0 + kw = {"retry_timeout": sufficient_timeout} + with open_item(filename, "/check", **kw) as item: + self.assertTrue(item[()]) + + failcounter = 0 + kw = {"retry_timeout": insufficient_timeout} + with self.assertRaises(RetryTimeoutError): + with open_item(filename, "/check", **kw) as item: + pass + + @subtests + def test_retry_in_subprocess(self): + filename = self._new_filename() + txtfilename = os.path.join(self.test_dir, "failcounter.txt") + ncausefailure = 3 + faildelay = 0.1 + sufficient_timeout = ncausefailure * (faildelay + 10) + insufficient_timeout = ncausefailure * faildelay * 0.5 + + kw = { + "retry_timeout": sufficient_timeout, + "include_only": None, + "ncausefailure": ncausefailure, + "faildelay": faildelay, + } + with open(txtfilename, mode="w") as f: + f.write("0") + names = top_level_names_test(txtfilename, filename, **kw) + self.assertEqual(names, ["check"]) + + kw = { + "retry_timeout": insufficient_timeout, + "include_only": None, + "ncausefailure": ncausefailure, + "faildelay": faildelay, + } + with open(txtfilename, mode="w") as f: + f.write("0") + with self.assertRaises(RetryTimeoutError): + top_level_names_test(txtfilename, filename, **kw) |