summaryrefslogtreecommitdiff
path: root/silx/io/test/test_h5py_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/test/test_h5py_utils.py')
-rw-r--r--silx/io/test/test_h5py_utils.py397
1 files changed, 0 insertions, 397 deletions
diff --git a/silx/io/test/test_h5py_utils.py b/silx/io/test/test_h5py_utils.py
deleted file mode 100644
index 2e2e3dd..0000000
--- a/silx/io/test/test_h5py_utils.py
+++ /dev/null
@@ -1,397 +0,0 @@
-# coding: utf-8
-# /*##########################################################################
-# Copyright (C) 2016-2017 European Synchrotron Radiation Facility
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#
-# ############################################################################*/
-"""Tests for h5py utilities"""
-
-__authors__ = ["W. de Nolf"]
-__license__ = "MIT"
-__date__ = "27/01/2020"
-
-
-import unittest
-import os
-import sys
-import time
-import shutil
-import tempfile
-import threading
-import multiprocessing
-from contextlib import contextmanager
-
-from .. import h5py_utils
-from ...utils.retry import RetryError, RetryTimeoutError
-
-IS_WINDOWS = sys.platform == "win32"
-
-
-def _subprocess_context_main(queue, contextmgr, *args, **kw):
- try:
- with contextmgr(*args, **kw):
- queue.put(None)
- threading.Event().wait()
- except Exception:
- queue.put(None)
- raise
-
-
-@contextmanager
-def _subprocess_context(contextmgr, *args, **kw):
- timeout = kw.pop("timeout", 10)
- queue = multiprocessing.Queue(maxsize=1)
- p = multiprocessing.Process(
- target=_subprocess_context_main, args=(queue, contextmgr) + args, kwargs=kw
- )
- p.start()
- try:
- queue.get(timeout=timeout)
- yield
- finally:
- try:
- p.kill()
- except AttributeError:
- p.terminate()
- p.join(timeout)
-
-
-@contextmanager
-def _open_context(filename, **kw):
- with h5py_utils.File(filename, **kw) as f:
- if kw.get("mode") == "w":
- f["check"] = True
- f.flush()
- yield f
-
-
-def _cause_segfault():
- import ctypes
-
- i = ctypes.c_char(b"a")
- j = ctypes.pointer(i)
- c = 0
- while True:
- j[c] = b"a"
- c += 1
-
-
-def _top_level_names_test(txtfilename, *args, **kw):
- sys.stderr = open(os.devnull, "w")
-
- with open(txtfilename, mode="r") as f:
- failcounter = int(f.readline().strip())
-
- ncausefailure = kw.pop("ncausefailure")
- faildelay = kw.pop("faildelay")
- if failcounter < ncausefailure:
- time.sleep(faildelay)
- failcounter += 1
- with open(txtfilename, mode="w") as f:
- f.write(str(failcounter))
- if failcounter % 2:
- raise RetryError
- else:
- _cause_segfault()
- return h5py_utils._top_level_names(*args, **kw)
-
-
-top_level_names_test = h5py_utils.retry_in_subprocess()(_top_level_names_test)
-
-
-def subtests(test):
- def wrapper(self):
- for _ in self._subtests():
- with self.subTest(**self._subtest_options):
- test(self)
-
- return wrapper
-
-
-class TestH5pyUtils(unittest.TestCase):
- def setUp(self):
- self.test_dir = tempfile.mkdtemp()
-
- def tearDown(self):
- shutil.rmtree(self.test_dir)
-
- def _subtests(self):
- self._subtest_options = {"mode": "w"}
- self.filename_generator = self._filenames()
- yield
- self._subtest_options = {"mode": "w", "libver": "latest"}
- self.filename_generator = self._filenames()
- yield
-
- @property
- def _liber_allows_concurrent_access(self):
- return self._subtest_options.get("libver") in [None, "earliest", "v18"]
-
- def _filenames(self):
- i = 1
- while True:
- filename = os.path.join(self.test_dir, "file{}.h5".format(i))
- with self._open_context(filename):
- pass
- yield filename
- i += 1
-
- def _new_filename(self):
- return next(self.filename_generator)
-
- @contextmanager
- def _open_context(self, filename, **kwargs):
- kw = self._subtest_options
- kw.update(kwargs)
- with _open_context(filename, **kw) as f:
-
- yield f
-
- @contextmanager
- def _open_context_subprocess(self, filename, **kwargs):
- kw = self._subtest_options
- kw.update(kwargs)
- with _subprocess_context(_open_context, filename, **kw):
- yield
-
- def _assert_hdf5_data(self, f):
- self.assertTrue(f["check"][()])
-
- def _validate_hdf5_data(self, filename, swmr=False):
- with self._open_context(filename, mode="r") as f:
- self.assertEqual(f.swmr_mode, swmr)
- self._assert_hdf5_data(f)
-
- @subtests
- def test_modes_single_process(self):
- orig = os.environ.get("HDF5_USE_FILE_LOCKING")
- filename1 = self._new_filename()
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
- filename2 = self._new_filename()
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
- with self._open_context(filename1, mode="r"):
- with self._open_context(filename2, mode="r"):
- pass
- for mode in ["w", "a"]:
- with self.assertRaises(RuntimeError):
- with self._open_context(filename2, mode=mode):
- pass
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
- with self._open_context(filename1, mode="a"):
- for mode in ["w", "a"]:
- with self._open_context(filename2, mode=mode):
- pass
- with self.assertRaises(RuntimeError):
- with self._open_context(filename2, mode="r"):
- pass
- self.assertEqual(orig, os.environ.get("HDF5_USE_FILE_LOCKING"))
-
- @subtests
- def test_modes_multi_process(self):
- if not self._liber_allows_concurrent_access:
- # A concurrent reader with HDF5_USE_FILE_LOCKING=FALSE is
- # no longer works with HDF5 >=1.10 (you get an exception
- # when trying to open the file)
- return
- filename = self._new_filename()
-
- # File open by truncating writer
- with self._open_context_subprocess(filename, mode="w"):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- if IS_WINDOWS:
- with self._open_context(filename, mode="a") as f:
- self._assert_hdf5_data(f)
- else:
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- # File open by appending writer
- with self._open_context_subprocess(filename, mode="a"):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- if IS_WINDOWS:
- with self._open_context(filename, mode="a") as f:
- self._assert_hdf5_data(f)
- else:
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- # File open by reader
- with self._open_context_subprocess(filename, mode="r"):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- # File open by locking reader
- with _subprocess_context(
- _open_context, filename, mode="r", enable_file_locking=True
- ):
- with self._open_context(filename, mode="r") as f:
- self._assert_hdf5_data(f)
- if IS_WINDOWS:
- with self._open_context(filename, mode="a") as f:
- self._assert_hdf5_data(f)
- else:
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename)
-
- @subtests
- @unittest.skipIf(not h5py_utils.HAS_SWMR, "SWMR not supported")
- def test_modes_multi_process_swmr(self):
- filename = self._new_filename()
-
- with self._open_context(filename, mode="w", libver="latest") as f:
- pass
-
- # File open by SWMR writer
- with self._open_context_subprocess(filename, mode="a", swmr=True):
- with self._open_context(filename, mode="r") as f:
- assert f.swmr_mode
- self._assert_hdf5_data(f)
- with self.assertRaises(OSError):
- with self._open_context(filename, mode="a") as f:
- pass
- self._validate_hdf5_data(filename, swmr=True)
-
- @subtests
- def test_retry_defaults(self):
- filename = self._new_filename()
-
- names = h5py_utils.top_level_names(filename)
- self.assertEqual(names, [])
-
- names = h5py_utils.safe_top_level_names(filename)
- self.assertEqual(names, [])
-
- names = h5py_utils.top_level_names(filename, include_only=None)
- self.assertEqual(names, ["check"])
-
- names = h5py_utils.safe_top_level_names(filename, include_only=None)
- self.assertEqual(names, ["check"])
-
- with h5py_utils.open_item(filename, "/check", validate=lambda x: False) as item:
- self.assertEqual(item, None)
-
- with h5py_utils.open_item(filename, "/check", validate=None) as item:
- self.assertTrue(item[()])
-
- with self.assertRaises(RetryTimeoutError):
- with h5py_utils.open_item(
- filename,
- "/check",
- retry_timeout=0.1,
- retry_invalid=True,
- validate=lambda x: False,
- ) as item:
- pass
-
- ncall = 0
-
- def validate(item):
- nonlocal ncall
- if ncall >= 1:
- return True
- else:
- ncall += 1
- raise RetryError
-
- with h5py_utils.open_item(
- filename, "/check", validate=validate, retry_timeout=1, retry_invalid=True
- ) as item:
- self.assertTrue(item[()])
-
- @subtests
- def test_retry_custom(self):
- filename = self._new_filename()
- ncausefailure = 3
- faildelay = 0.1
- sufficient_timeout = ncausefailure * (faildelay + 10)
- insufficient_timeout = ncausefailure * faildelay * 0.5
-
- @h5py_utils.retry_contextmanager()
- def open_item(filename, name):
- nonlocal failcounter
- if failcounter < ncausefailure:
- time.sleep(faildelay)
- failcounter += 1
- raise RetryError
- with h5py_utils.File(filename) as h5file:
- yield h5file[name]
-
- failcounter = 0
- kw = {"retry_timeout": sufficient_timeout}
- with open_item(filename, "/check", **kw) as item:
- self.assertTrue(item[()])
-
- failcounter = 0
- kw = {"retry_timeout": insufficient_timeout}
- with self.assertRaises(RetryTimeoutError):
- with open_item(filename, "/check", **kw) as item:
- pass
-
- @subtests
- def test_retry_in_subprocess(self):
- filename = self._new_filename()
- txtfilename = os.path.join(self.test_dir, "failcounter.txt")
- ncausefailure = 3
- faildelay = 0.1
- sufficient_timeout = ncausefailure * (faildelay + 10)
- insufficient_timeout = ncausefailure * faildelay * 0.5
-
- kw = {
- "retry_timeout": sufficient_timeout,
- "include_only": None,
- "ncausefailure": ncausefailure,
- "faildelay": faildelay,
- }
- with open(txtfilename, mode="w") as f:
- f.write("0")
- names = top_level_names_test(txtfilename, filename, **kw)
- self.assertEqual(names, ["check"])
-
- kw = {
- "retry_timeout": insufficient_timeout,
- "include_only": None,
- "ncausefailure": ncausefailure,
- "faildelay": faildelay,
- }
- with open(txtfilename, mode="w") as f:
- f.write("0")
- with self.assertRaises(RetryTimeoutError):
- top_level_names_test(txtfilename, filename, **kw)
-
-
-def suite():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestH5pyUtils))
- return test_suite
-
-
-if __name__ == "__main__":
- unittest.main(defaultTest="suite")