diff options
Diffstat (limited to 'silx/io/test/test_h5py_utils.py')
-rw-r--r-- | silx/io/test/test_h5py_utils.py | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/silx/io/test/test_h5py_utils.py b/silx/io/test/test_h5py_utils.py new file mode 100644 index 0000000..2e2e3dd --- /dev/null +++ b/silx/io/test/test_h5py_utils.py @@ -0,0 +1,397 @@ +# coding: utf-8 +# /*########################################################################## +# Copyright (C) 2016-2017 European Synchrotron Radiation Facility +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# ############################################################################*/ +"""Tests for h5py utilities""" + +__authors__ = ["W. de Nolf"] +__license__ = "MIT" +__date__ = "27/01/2020" + + +import unittest +import os +import sys +import time +import shutil +import tempfile +import threading +import multiprocessing +from contextlib import contextmanager + +from .. import h5py_utils +from ...utils.retry import RetryError, RetryTimeoutError + +IS_WINDOWS = sys.platform == "win32" + + +def _subprocess_context_main(queue, contextmgr, *args, **kw): + try: + with contextmgr(*args, **kw): + queue.put(None) + threading.Event().wait() + except Exception: + queue.put(None) + raise + + +@contextmanager +def _subprocess_context(contextmgr, *args, **kw): + timeout = kw.pop("timeout", 10) + queue = multiprocessing.Queue(maxsize=1) + p = multiprocessing.Process( + target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw + ) + p.start() + try: + queue.get(timeout=timeout) + yield + finally: + try: + p.kill() + except AttributeError: + p.terminate() + p.join(timeout) + + +@contextmanager +def _open_context(filename, **kw): + with h5py_utils.File(filename, **kw) as f: + if kw.get("mode") == "w": + f["check"] = True + f.flush() + yield f + + +def _cause_segfault(): + import ctypes + + i = ctypes.c_char(b"a") + j = ctypes.pointer(i) + c = 0 + while True: + j[c] = b"a" + c += 1 + + +def _top_level_names_test(txtfilename, *args, **kw): + sys.stderr = open(os.devnull, "w") + + with open(txtfilename, mode="r") as f: + failcounter = int(f.readline().strip()) + + ncausefailure = kw.pop("ncausefailure") + faildelay = kw.pop("faildelay") + if failcounter < ncausefailure: + time.sleep(faildelay) + failcounter += 1 + with open(txtfilename, mode="w") as f: + f.write(str(failcounter)) + if failcounter % 2: + raise RetryError + else: + _cause_segfault() + return h5py_utils._top_level_names(*args, **kw) + + +top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test) + + +def subtests(test): + def wrapper(self): + for _ in self._subtests(): + with self.subTest(**self._subtest_options): + test(self) + + return wrapper + + +class TestH5pyUtils(unittest.TestCase): + def setUp(self): + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.test_dir) + + def _subtests(self): + self._subtest_options = {"mode": "w"} + self.filename_generator = self._filenames() + yield + self._subtest_options = {"mode": "w", "libver": "latest"} + self.filename_generator = self._filenames() + yield + + @property + def _liber_allows_concurrent_access(self): + return self._subtest_options.get("libver") in [None, "earliest", "v18"] + + def _filenames(self): + i = 1 + while True: + filename = os.path.join(self.test_dir, "file{}.h5".format(i)) + with self._open_context(filename): + pass + yield filename + i += 1 + + def _new_filename(self): + return next(self.filename_generator) + + @contextmanager + def _open_context(self, filename, **kwargs): + kw = self._subtest_options + kw.update(kwargs) + with _open_context(filename, **kw) as f: + + yield f + + @contextmanager + def _open_context_subprocess(self, filename, **kwargs): + kw = self._subtest_options + kw.update(kwargs) + with _subprocess_context(_open_context, filename, **kw): + yield + + def _assert_hdf5_data(self, f): + self.assertTrue(f["check"][()]) + + def _validate_hdf5_data(self, filename, swmr=False): + with self._open_context(filename, mode="r") as f: + self.assertEqual(f.swmr_mode, swmr) + self._assert_hdf5_data(f) + + @subtests + def test_modes_single_process(self): + orig = os.environ.get("HDF5_USE_FILE_LOCKING") + filename1 = self._new_filename() + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + filename2 = self._new_filename() + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + with self._open_context(filename1, mode="r"): + with self._open_context(filename2, mode="r"): + pass + for mode in ["w", "a"]: + with self.assertRaises(RuntimeError): + with self._open_context(filename2, mode=mode): + pass + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + with self._open_context(filename1, mode="a"): + for mode in ["w", "a"]: + with self._open_context(filename2, mode=mode): + pass + with self.assertRaises(RuntimeError): + with self._open_context(filename2, mode="r"): + pass + self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING")) + + @subtests + def test_modes_multi_process(self): + if not self._liber_allows_concurrent_access: + # A concurrent reader with HDF5_USE_FILE_LOCKING=FALSE is + # no longer works with HDF5 >=1.10 (you get an exception + # when trying to open the file) + return + filename = self._new_filename() + + # File open by truncating writer + with self._open_context_subprocess(filename, mode="w"): + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + if IS_WINDOWS: + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + else: + with self.assertRaises(OSError): + with self._open_context(filename, mode="a") as f: + pass + self._validate_hdf5_data(filename) + + # File open by appending writer + with self._open_context_subprocess(filename, mode="a"): + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + if IS_WINDOWS: + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + else: + with self.assertRaises(OSError): + with self._open_context(filename, mode="a") as f: + pass + self._validate_hdf5_data(filename) + + # File open by reader + with self._open_context_subprocess(filename, mode="r"): + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + with self._open_context(filename, mode="a") as f: + pass + self._validate_hdf5_data(filename) + + # File open by locking reader + with _subprocess_context( + _open_context, filename, mode="r", enable_file_locking=True + ): + with self._open_context(filename, mode="r") as f: + self._assert_hdf5_data(f) + if IS_WINDOWS: + with self._open_context(filename, mode="a") as f: + self._assert_hdf5_data(f) + else: + with self.assertRaises(OSError): + with self._open_context(filename, mode="a") as f: + pass + self._validate_hdf5_data(filename) + + @subtests + @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported") + def test_modes_multi_process_swmr(self): + filename = self._new_filename() + + with self._open_context(filename, mode="w", libver="latest") as f: + pass + + # File open by SWMR writer + with self._open_context_subprocess(filename, mode="a", swmr=True): + with self._open_context(filename, mode="r") as f: + assert f.swmr_mode + self._assert_hdf5_data(f) + with self.assertRaises(OSError): + with self._open_context(filename, mode="a") as f: + pass + self._validate_hdf5_data(filename, swmr=True) + + @subtests + def test_retry_defaults(self): + filename = self._new_filename() + + names = h5py_utils.top_level_names(filename) + self.assertEqual(names, []) + + names = h5py_utils.safe_top_level_names(filename) + self.assertEqual(names, []) + + names = h5py_utils.top_level_names(filename, include_only=None) + self.assertEqual(names, ["check"]) + + names = h5py_utils.safe_top_level_names(filename, include_only=None) + self.assertEqual(names, ["check"]) + + with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item: + self.assertEqual(item, None) + + with h5py_utils.open_item(filename, "/check", validate=None) as item: + self.assertTrue(item[()]) + + with self.assertRaises(RetryTimeoutError): + with h5py_utils.open_item( + filename, + "/check", + retry_timeout=0.1, + retry_invalid=True, + validate=lambda x: False, + ) as item: + pass + + ncall = 0 + + def validate(item): + nonlocal ncall + if ncall >= 1: + return True + else: + ncall += 1 + raise RetryError + + with h5py_utils.open_item( + filename, "/check", validate=validate, retry_timeout=1, retry_invalid=True + ) as item: + self.assertTrue(item[()]) + + @subtests + def test_retry_custom(self): + filename = self._new_filename() + ncausefailure = 3 + faildelay = 0.1 + sufficient_timeout = ncausefailure * (faildelay + 10) + insufficient_timeout = ncausefailure * faildelay * 0.5 + + @h5py_utils.retry_contextmanager() + def open_item(filename, name): + nonlocal failcounter + if failcounter < ncausefailure: + time.sleep(faildelay) + failcounter += 1 + raise RetryError + with h5py_utils.File(filename) as h5file: + yield h5file[name] + + failcounter = 0 + kw = {"retry_timeout": sufficient_timeout} + with open_item(filename, "/check", **kw) as item: + self.assertTrue(item[()]) + + failcounter = 0 + kw = {"retry_timeout": insufficient_timeout} + with self.assertRaises(RetryTimeoutError): + with open_item(filename, "/check", **kw) as item: + pass + + @subtests + def test_retry_in_subprocess(self): + filename = self._new_filename() + txtfilename = os.path.join(self.test_dir, "failcounter.txt") + ncausefailure = 3 + faildelay = 0.1 + sufficient_timeout = ncausefailure * (faildelay + 10) + insufficient_timeout = ncausefailure * faildelay * 0.5 + + kw = { + "retry_timeout": sufficient_timeout, + "include_only": None, + "ncausefailure": ncausefailure, + "faildelay": faildelay, + } + with open(txtfilename, mode="w") as f: + f.write("0") + names = top_level_names_test(txtfilename, filename, **kw) + self.assertEqual(names, ["check"]) + + kw = { + "retry_timeout": insufficient_timeout, + "include_only": None, + "ncausefailure": ncausefailure, + "faildelay": faildelay, + } + with open(txtfilename, mode="w") as f: + f.write("0") + with self.assertRaises(RetryTimeoutError): + top_level_names_test(txtfilename, filename, **kw) + + +def suite(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestH5pyUtils)) + return test_suite + + +if __name__ == "__main__": + unittest.main(defaultTest="suite") |