# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2021 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""Helper to access external resources.
"""

__authors__ = ["Thomas Vincent", "J. Kieffer"]
__license__ = "MIT"
__date__ = "08/03/2019"


import os
import threading
import json
import logging
import tempfile
import unittest
import urllib.request
import urllib.error

logger = logging.getLogger(__name__)


class ExternalResources(object):
    """Utility class which downloads test data from a web server and manages
    the local directory in which those files are cached during the tests.
    """

    def __init__(self, project, url_base, env_key=None, timeout=60):
        """Constructor of the class

        :param str project: name of the project, like "silx"
        :param str url_base: base URL for the data, like
            "http://www.silx.org/pub"
        :param str env_key: name of the environment variable which contains
            the test_data directory, like "SILX_DATA".
            If None (default), the name of the environment variable is built
            from the project argument: "<PROJECT>_TESTDATA".
            The environment variable is optional: in case it is not set,
            a directory in the temporary folder is used.
        :param timeout: time in seconds before a download attempt gives up
        """
        self.project = project
        self._initialized = False
        # Guards the one-time loading of the index in _initialize_data()
        self.sem = threading.Semaphore()
        self.env_key = env_key or (self.project.upper() + "_TESTDATA")
        self.url_base = url_base
        # Relative names of every file requested so far
        self.all_data = set()
        self.timeout = timeout
        self._data_home = None

    @staticmethod
    def _get_user_name():
        """Return a name identifying the current user.

        Each source is tried in turn and skipped when it is unavailable or
        fails, so this never raises.

        :rtype: str
        """
        try:
            import getpass
            return getpass.getuser()
        except Exception:
            # getpass can fail in many ways (missing module, no passwd
            # entry, ...): fall back on the alternatives below
            pass
        if hasattr(os, "getlogin"):
            try:
                return os.getlogin()
            except OSError:
                # No controlling terminal: try the next source
                pass
        for key in ("USER", "USERNAME"):
            if key in os.environ:
                return os.environ[key]
        if hasattr(os, "getuid"):
            return "uid" + str(os.getuid())
        return "unknown"

    @property
    def data_home(self):
        """Returns the data_home path and makes sure it exists in the file
        system.

        The path is read from the environment variable named by ``env_key``;
        when that variable is not set, a per-project, per-user directory
        inside the system temporary directory is used. The result is cached.
        """
        if self._data_home is not None:
            return self._data_home

        data_home = os.environ.get(self.env_key)
        if data_home is None:
            basename = "%s_testdata_%s" % (self.project,
                                           self._get_user_name())
            data_home = os.path.join(tempfile.gettempdir(), basename)
        # exist_ok avoids failing when the directory is created concurrently
        os.makedirs(data_home, exist_ok=True)
        self._data_home = data_home
        return data_home

    def _initialize_data(self):
        """Initialize for downloading test data

        Sets ``self.testdata`` to the path of the JSON index stored in
        ``data_home`` and, when that file exists, loads it into
        ``self.all_data``.
        """
        if not self._initialized:
            with self.sem:
                # Checked again once the semaphore is held
                if not self._initialized:
                    self.testdata = os.path.join(self.data_home,
                                                 "all_testdata.json")
                    if os.path.exists(self.testdata):
                        with open(self.testdata) as f:
                            self.all_data = set(json.load(f))
                    self._initialized = True

    def _register(self, filename):
        """Add `filename` to the set of known files and save the JSON index.

        Nothing is done when the name is already known. Failing to write the
        index is only logged.

        :param str filename: relative name of the file
        """
        if filename in self.all_data:
            return
        self.all_data.add(filename)
        try:
            with open(self.testdata, "w") as fp:
                json.dump(sorted(self.all_data), fp, indent=4)
        except IOError:
            logger.debug("Unable to save JSON list")

    def clean_up(self):
        """Does nothing."""
        pass

    def getfile(self, filename):
        """Downloads the requested file from the web server at ``url_base``
        unless it is already present in ``data_home``.

        :param str filename: relative name of the file.
        :return: full path of the locally saved file.
        :raises unittest.SkipTest: if the download fails with an URLError
        :raises IOError: if the downloaded data cannot be written to disk
        :raises RuntimeError: if the file is still missing after download
        """
        logger.debug("ExternalResources.getfile('%s')", filename)

        if not self._initialized:
            self._initialize_data()

        fullfilename = os.path.abspath(os.path.join(self.data_home, filename))

        if not os.path.isfile(fullfilename):
            logger.debug("Trying to download image %s, timeout set to %ss",
                         filename, self.timeout)
            proxies = {}
            if "http_proxy" in os.environ:
                # http_proxy is used for https too, unless https_proxy is set
                proxies['http'] = os.environ["http_proxy"]
                proxies['https'] = os.environ["http_proxy"]
            if "https_proxy" in os.environ:
                proxies['https'] = os.environ["https_proxy"]
            if proxies:
                proxy_handler = urllib.request.ProxyHandler(proxies)
                opener = urllib.request.build_opener(proxy_handler).open
            else:
                opener = urllib.request.urlopen

            logger.debug("wget %s/%s", self.url_base, filename)
            try:
                # The context manager closes the HTTP response after reading
                with opener("%s/%s" % (self.url_base, filename),
                            data=None, timeout=self.timeout) as response:
                    data = response.read()
                logger.info("Image %s successfully downloaded.", filename)
            except urllib.error.URLError as err:
                raise unittest.SkipTest("network unreachable.") from err

            # Create sub-directory if needed
            os.makedirs(os.path.dirname(fullfilename), exist_ok=True)

            try:
                with open(fullfilename, "wb") as outfile:
                    outfile.write(data)
            except IOError as err:
                raise IOError("unable to write downloaded "
                              "data to disk at %s" % self.data_home) from err

            if not os.path.isfile(fullfilename):
                raise RuntimeError(
                    "Could not automatically download test images %s!\n"
                    "If you are behind a firewall, please set both "
                    "environment variable http_proxy and https_proxy.\n"
                    "This even works under windows !\n"
                    "Otherwise please try to download the images manually "
                    "from\n%s/%s" % (filename, self.url_base, filename))

        self._register(filename)
        return fullfilename

    def getdir(self, dirname):
        """Downloads the requested archive from the server at ``url_base``
        and extracts it into the data directory.

        The content is extracted into ``<data_home>/<dirname>__content``.

        :param str dirname: relative name of the archive (tar or zip).
        :return: list of the archive members with their full path.
        :raises RuntimeError: if the extension is neither a tar nor a zip one
        """
        lodn = dirname.lower()
        is_zip = lodn.endswith("zip")
        if lodn.endswith(("tar", "tgz", "tbz2", "tar.gz", "tar.bz2")):
            import tarfile
            engine = tarfile.TarFile.open
        elif is_zip:
            import zipfile
            engine = zipfile.ZipFile
        else:
            raise RuntimeError("Unsupported archive format. Only tar and zip "
                               "are currently supported")

        full_path = self.getfile(dirname)
        output = os.path.join(self.data_home, dirname + "__content")
        with engine(full_path, mode="r") as fd:
            # NOTE(review): extractall() trusts the member names stored in
            # the archive; only use with archives from a trusted server.
            fd.extractall(output)
            members = fd.namelist() if is_zip else fd.getnames()
            result = [os.path.join(output, i) for i in members]
        return result

    def get_file_and_repack(self, filename):
        """
        Download the requested file, decompress and repack it to bz2 and gz.

        The bz2 flavour is the one fetched with :meth:`getfile`; the
        uncompressed and gzip flavours are recreated from it when missing.
        Only the base name of `filename` is used for the local files.

        :param str filename: name of the image.
        :rtype: str
        :return: full path of the locally saved file
        :raises RuntimeError: if the bz2 file is missing after download or a
            compression library is not available
        :raises IOError: if a repacked file cannot be written to disk
        """
        if not self._initialized:
            self._initialize_data()
        self._register(filename)

        baseimage = os.path.basename(filename)
        logger.info("UtilsTest.getimage('%s')", baseimage)

        os.makedirs(self.data_home, exist_ok=True)
        fullimagename = os.path.abspath(os.path.join(self.data_home,
                                                     baseimage))

        # Derive the names of the three flavours from the requested one
        if baseimage.endswith(".bz2"):
            basename = baseimage[:-4]
        elif baseimage.endswith(".gz"):
            basename = baseimage[:-3]
        else:
            basename = baseimage
        gzipname = basename + ".gz"
        bzip2name = basename + ".bz2"

        fullimagename_gz = os.path.abspath(os.path.join(self.data_home,
                                                        gzipname))
        fullimagename_raw = os.path.abspath(os.path.join(self.data_home,
                                                         basename))
        fullimagename_bz2 = os.path.abspath(os.path.join(self.data_home,
                                                         bzip2name))

        # The files are recreated from the bz2 file
        if not os.path.isfile(fullimagename_bz2):
            self.getfile(bzip2name)
            if not os.path.isfile(fullimagename_bz2):
                raise RuntimeError(
                    "Could not automatically download test images %s!\n"
                    "If you are behind a firewall, please set the "
                    "environment variable http_proxy.\n"
                    "Otherwise please try to download the images manually "
                    "from\n%s" % (filename, self.url_base))

        try:
            import bz2
        except ImportError as err:
            raise RuntimeError(
                "bz2 library is needed to decompress data") from err
        try:
            import gzip
        except ImportError:
            # Only an error if the gzip file actually has to be created
            gzip = None

        raw_file_exists = os.path.isfile(fullimagename_raw)
        gz_file_exists = os.path.isfile(fullimagename_gz)
        if not raw_file_exists or not gz_file_exists:
            with open(fullimagename_bz2, "rb") as f:
                data = f.read()
            decompressed = bz2.decompress(data)

            if not raw_file_exists:
                try:
                    with open(fullimagename_raw, "wb") as fullimage:
                        fullimage.write(decompressed)
                except IOError as err:
                    raise IOError(
                        "unable to write decompressed "
                        "data to disk at %s" % self.data_home) from err

            if not gz_file_exists:
                if gzip is None:
                    raise RuntimeError(
                        "gzip library is expected to recompress data")
                try:
                    # The context manager closes (and flushes) the gzip file
                    with gzip.open(fullimagename_gz, "wb") as gz_file:
                        gz_file.write(decompressed)
                except IOError as err:
                    raise IOError(
                        "unable to write gzipped "
                        "data to disk at %s" % self.data_home) from err

        return fullimagename

    def download_all(self, imgs=None):
        """Download all data needed for the test/benchmarks

        :param imgs: list of files to download, by default all the files
            recorded in the index
        :return: list of path with all files
        """
        if not self._initialized:
            self._initialize_data()
        if not imgs:
            imgs = self.all_data
        res = []
        # Iterate over a snapshot: getfile() may add entries to all_data
        for fn in list(imgs):
            logger.info("Downloading from silx.org: %s", fn)
            res.append(self.getfile(fn))
        return res