diff options
Diffstat (limited to 'silx/utils/ExternalResources.py')
-rw-r--r-- | silx/utils/ExternalResources.py | 320 |
1 files changed, 0 insertions, 320 deletions
diff --git a/silx/utils/ExternalResources.py b/silx/utils/ExternalResources.py deleted file mode 100644 index e21381c..0000000 --- a/silx/utils/ExternalResources.py +++ /dev/null @@ -1,320 +0,0 @@ -# coding: utf-8 -# /*########################################################################## -# -# Copyright (c) 2016-2018 European Synchrotron Radiation Facility -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# ###########################################################################*/ -"""Helper to access to external resources. -""" - -__authors__ = ["Thomas Vincent", "J. Kieffer"] -__license__ = "MIT" -__date__ = "08/03/2019" - - -import os -import threading -import json -import logging -import tempfile -import unittest -import six - -logger = logging.getLogger(__name__) - - -class ExternalResources(object): - """Utility class which allows to download test-data from www.silx.org - and manage the temporary data during the tests. - - """ - - def __init__(self, project, - url_base, - env_key=None, - timeout=60): - """Constructor of the class - - :param str project: name of the project, like "silx" - :param str url_base: base URL for the data, like "http://www.silx.org/pub" - :param str env_key: name of the environment variable which contains the - test_data directory, like "SILX_DATA". - If None (default), then the name of the - environment variable is built from the project argument: - "<PROJECT>_DATA". - The environment variable is optional: in case it is not set, - a directory in the temporary folder is used. - :param timeout: time in seconds before it breaks - """ - self.project = project - self._initialized = False - self.sem = threading.Semaphore() - - self.env_key = env_key or (self.project.upper() + "_TESTDATA") - self.url_base = url_base - self.all_data = set() - self.timeout = timeout - self._data_home = None - - @property - def data_home(self): - """Returns the data_home path and make sure it exists in the file - system.""" - if self._data_home is not None: - return self._data_home - - data_home = os.environ.get(self.env_key) - if data_home is None: - try: - import getpass - name = getpass.getuser() - except Exception: - if "getlogin" in dir(os): - name = os.getlogin() - elif "USER" in os.environ: - name = os.environ["USER"] - elif "USERNAME" in os.environ: - name = os.environ["USERNAME"] - else: - name = "uid" + str(os.getuid()) - - basename = "%s_testdata_%s" % (self.project, name) - data_home = os.path.join(tempfile.gettempdir(), basename) - if not os.path.exists(data_home): - os.makedirs(data_home) - self._data_home = data_home - return data_home - - def _initialize_data(self): - """Initialize for downloading test data""" - if not self._initialized: - with self.sem: - if not self._initialized: - self.testdata = os.path.join(self.data_home, "all_testdata.json") - if os.path.exists(self.testdata): - with open(self.testdata) as f: - self.all_data = set(json.load(f)) - self._initialized = True - - def clean_up(self): - pass - - def getfile(self, filename): - """Downloads the requested file from web-server available - at https://www.silx.org/pub/silx/ - - :param: relative name of the image. - :return: full path of the locally saved file. - """ - logger.debug("ExternalResources.getfile('%s')", filename) - - if not self._initialized: - self._initialize_data() - - fullfilename = os.path.abspath(os.path.join(self.data_home, filename)) - - if not os.path.isfile(fullfilename): - logger.debug("Trying to download image %s, timeout set to %ss", - filename, self.timeout) - dictProxies = {} - if "http_proxy" in os.environ: - dictProxies['http'] = os.environ["http_proxy"] - dictProxies['https'] = os.environ["http_proxy"] - if "https_proxy" in os.environ: - dictProxies['https'] = os.environ["https_proxy"] - if dictProxies: - proxy_handler = six.moves.urllib.request.ProxyHandler(dictProxies) - opener = six.moves.urllib.request.build_opener(proxy_handler).open - else: - opener = six.moves.urllib.request.urlopen - - logger.debug("wget %s/%s", self.url_base, filename) - try: - data = opener("%s/%s" % (self.url_base, filename), - data=None, timeout=self.timeout).read() - logger.info("Image %s successfully downloaded.", filename) - except six.moves.urllib.error.URLError: - raise unittest.SkipTest("network unreachable.") - - if not os.path.isdir(os.path.dirname(fullfilename)): - # Create sub-directory if needed - os.makedirs(os.path.dirname(fullfilename)) - - try: - with open(fullfilename, "wb") as outfile: - outfile.write(data) - except IOError: - raise IOError("unable to write downloaded \ - data to disk at %s" % self.data_home) - - if not os.path.isfile(fullfilename): - raise RuntimeError( - """Could not automatically download test images %s! - If you are behind a firewall, please set both environment variable http_proxy and https_proxy. - This even works under windows ! - Otherwise please try to download the images manually from - %s/%s""" % (filename, self.url_base, filename)) - - if filename not in self.all_data: - self.all_data.add(filename) - image_list = list(self.all_data) - image_list.sort() - try: - with open(self.testdata, "w") as fp: - json.dump(image_list, fp, indent=4) - except IOError: - logger.debug("Unable to save JSON list") - - return fullfilename - - def getdir(self, dirname): - """Downloads the requested tarball from the server - https://www.silx.org/pub/silx/ - and unzips it into the data directory - - :param: relative name of the image. - :return: list of files with their full path. - """ - lodn = dirname.lower() - if (lodn.endswith("tar") or lodn.endswith("tgz") or - lodn.endswith("tbz2") or lodn.endswith("tar.gz") or - lodn.endswith("tar.bz2")): - import tarfile - engine = tarfile.TarFile.open - elif lodn.endswith("zip"): - import zipfile - engine = zipfile.ZipFile - else: - raise RuntimeError("Unsupported archive format. Only tar and zip " - "are currently supported") - full_path = self.getfile(dirname) - with engine(full_path, mode="r") as fd: - output = os.path.join(self.data_home, dirname + "__content") - fd.extractall(output) - if lodn.endswith("zip"): - result = [os.path.join(output, i) for i in fd.namelist()] - else: - result = [os.path.join(output, i) for i in fd.getnames()] - return result - - def get_file_and_repack(self, filename): - """ - Download the requested file, decompress and repack it to bz2 and gz. - - :param str filename: name of the image. - :rtype: str - :return: full path of the locally saved file - """ - if not self._initialized: - self._initialize_data() - if filename not in self.all_data: - self.all_data.add(filename) - image_list = list(self.all_data) - image_list.sort() - try: - with open(self.testdata, "w") as fp: - json.dump(image_list, fp, indent=4) - except IOError: - logger.debug("Unable to save JSON list") - baseimage = os.path.basename(filename) - logger.info("UtilsTest.getimage('%s')" % baseimage) - - if not os.path.exists(self.data_home): - os.makedirs(self.data_home) - fullimagename = os.path.abspath(os.path.join(self.data_home, baseimage)) - - if baseimage.endswith(".bz2"): - bzip2name = baseimage - basename = baseimage[:-4] - gzipname = basename + ".gz" - elif baseimage.endswith(".gz"): - gzipname = baseimage - basename = baseimage[:-3] - bzip2name = basename + ".bz2" - else: - basename = baseimage - gzipname = baseimage + "gz2" - bzip2name = basename + ".bz2" - - fullimagename_gz = os.path.abspath(os.path.join(self.data_home, gzipname)) - fullimagename_raw = os.path.abspath(os.path.join(self.data_home, basename)) - fullimagename_bz2 = os.path.abspath(os.path.join(self.data_home, bzip2name)) - - # The files are recreated from the bz2 file - if not os.path.isfile(fullimagename_bz2): - self.getfile(bzip2name) - if not os.path.isfile(fullimagename_bz2): - raise RuntimeError( - """Could not automatically download test images %s! - If you are behind a firewall, please set the environment variable http_proxy. - Otherwise please try to download the images manually from - %s""" % (self.url_base, filename)) - - try: - import bz2 - except ImportError: - raise RuntimeError("bz2 library is needed to decompress data") - try: - import gzip - except ImportError: - gzip = None - - raw_file_exists = os.path.isfile(fullimagename_raw) - gz_file_exists = os.path.isfile(fullimagename_gz) - if not raw_file_exists or not gz_file_exists: - with open(fullimagename_bz2, "rb") as f: - data = f.read() - decompressed = bz2.decompress(data) - - if not raw_file_exists: - try: - with open(fullimagename_raw, "wb") as fullimage: - fullimage.write(decompressed) - except IOError: - raise IOError("unable to write decompressed \ - data to disk at %s" % self.data_home) - - if not gz_file_exists: - if gzip is None: - raise RuntimeError("gzip library is expected to recompress data") - try: - gzip.open(fullimagename_gz, "wb").write(decompressed) - except IOError: - raise IOError("unable to write gzipped \ - data to disk at %s" % self.data_home) - - return fullimagename - - def download_all(self, imgs=None): - """Download all data needed for the test/benchmarks - - :param imgs: list of files to download, by default all - :return: list of path with all files - """ - if not self._initialized: - self._initialize_data() - if not imgs: - imgs = self.all_data - res = [] - for fn in imgs: - logger.info("Downloading from silx.org: %s", fn) - res.append(self.getfile(fn)) - return res |