summaryrefslogtreecommitdiff
path: root/silx/utils/ExternalResources.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/utils/ExternalResources.py')
-rw-r--r--silx/utils/ExternalResources.py321
1 files changed, 321 insertions, 0 deletions
diff --git a/silx/utils/ExternalResources.py b/silx/utils/ExternalResources.py
new file mode 100644
index 0000000..7d9008b
--- /dev/null
+++ b/silx/utils/ExternalResources.py
@@ -0,0 +1,321 @@
+# coding: utf-8
+# /*##########################################################################
+#
+# Copyright (c) 2016-2018 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ###########################################################################*/
+"""Helper to access to external resources.
+"""
+
+__authors__ = ["Thomas Vincent", "J. Kieffer"]
+__license__ = "MIT"
+__date__ = "08/03/2019"
+
+
+import os
+import threading
+import json
+import logging
+import tempfile
+import unittest
+import six
+
+logger = logging.getLogger(__name__)
+
+
+class ExternalResources(object):
+ """Utility class which allows to download test-data from www.silx.org
+ and manage the temporary data during the tests.
+
+ """
+
+ def __init__(self, project,
+ url_base,
+ env_key=None,
+ timeout=60):
+ """Constructor of the class
+
+ :param str project: name of the project, like "silx"
+ :param str url_base: base URL for the data, like "http://www.silx.org/pub"
+ :param str env_key: name of the environment variable which contains the
+ test_data directory, like "SILX_DATA".
+ If None (default), then the name of the
+ environment variable is built from the project argument:
+ "<PROJECT>_DATA".
+ The environment variable is optional: in case it is not set,
+ a directory in the temporary folder is used.
+ :param timeout: time in seconds before it breaks
+ """
+ self.project = project
+ self._initialized = False
+ self.sem = threading.Semaphore()
+
+ self.env_key = env_key or (self.project.upper() + "_TESTDATA")
+ self.url_base = url_base
+ self.all_data = set()
+ self.timeout = timeout
+ self._data_home = None
+
+ @property
+ def data_home(self):
+ """Returns the data_home path and make sure it exists in the file
+ system."""
+ if self._data_home is not None:
+ return self._data_home
+
+ data_home = os.environ.get(self.env_key)
+ if data_home is None:
+ try:
+ import getpass
+ name = getpass.getuser()
+ except Exception:
+ if "getlogin" in dir(os):
+ name = os.getlogin()
+ elif "USER" in os.environ:
+ name = os.environ["USER"]
+ elif "USERNAME" in os.environ:
+ name = os.environ["USERNAME"]
+ else:
+ name = "uid" + str(os.getuid())
+
+ basename = "%s_testdata_%s" % (self.project, name)
+ data_home = os.path.join(tempfile.gettempdir(), basename)
+ if not os.path.exists(data_home):
+ os.makedirs(data_home)
+ self._data_home = data_home
+ return data_home
+
+ def _initialize_data(self):
+ """Initialize for downloading test data"""
+ if not self._initialized:
+ with self.sem:
+ if not self._initialized:
+ self.testdata = os.path.join(self.data_home, "all_testdata.json")
+ if os.path.exists(self.testdata):
+ with open(self.testdata) as f:
+ self.all_data = set(json.load(f))
+ self._initialized = True
+
+ def clean_up(self):
+ pass
+
+ def getfile(self, filename):
+ """Downloads the requested file from web-server available
+ at https://www.silx.org/pub/silx/
+
+ :param: relative name of the image.
+ :return: full path of the locally saved file.
+ """
+ logger.debug("ExternalResources.getfile('%s')", filename)
+
+ if not self._initialized:
+ self._initialize_data()
+
+ fullfilename = os.path.abspath(os.path.join(self.data_home, filename))
+
+ if not os.path.isfile(fullfilename):
+ logger.debug("Trying to download image %s, timeout set to %ss",
+ filename, self.timeout)
+ dictProxies = {}
+ if "http_proxy" in os.environ:
+ dictProxies['http'] = os.environ["http_proxy"]
+ dictProxies['https'] = os.environ["http_proxy"]
+ if "https_proxy" in os.environ:
+ dictProxies['https'] = os.environ["https_proxy"]
+ if dictProxies:
+ proxy_handler = six.moves.urllib.request.ProxyHandler(dictProxies)
+ opener = six.moves.urllib.request.build_opener(proxy_handler).open
+ else:
+ opener = six.moves.urllib.request.urlopen
+
+ logger.debug("wget %s/%s", self.url_base, filename)
+ try:
+ data = opener("%s/%s" % (self.url_base, filename),
+ data=None, timeout=self.timeout).read()
+ logger.info("Image %s successfully downloaded.", filename)
+ except six.moves.urllib.error.URLError:
+ raise unittest.SkipTest("network unreachable.")
+
+ if not os.path.isdir(os.path.dirname(fullfilename)):
+ # Create sub-directory if needed
+ os.makedirs(os.path.dirname(fullfilename))
+
+ try:
+ with open(fullfilename, "wb") as outfile:
+ outfile.write(data)
+ except IOError:
+ raise IOError("unable to write downloaded \
+ data to disk at %s" % self.data_home)
+
+ if not os.path.isfile(fullfilename):
+ raise RuntimeError(
+ "Could not automatically \
+ download test images %s!\n \ If you are behind a firewall, \
+ please set both environment variable http_proxy and https_proxy.\
+ This even works under windows ! \n \
+ Otherwise please try to download the images manually from \n%s/%s"
+ % (filename, self.url_base, filename))
+
+ if filename not in self.all_data:
+ self.all_data.add(filename)
+ image_list = list(self.all_data)
+ image_list.sort()
+ try:
+ with open(self.testdata, "w") as fp:
+ json.dump(image_list, fp, indent=4)
+ except IOError:
+ logger.debug("Unable to save JSON list")
+
+ return fullfilename
+
+ def getdir(self, dirname):
+ """Downloads the requested tarball from the server
+ https://www.silx.org/pub/silx/
+ and unzips it into the data directory
+
+ :param: relative name of the image.
+ :return: list of files with their full path.
+ """
+ lodn = dirname.lower()
+ if (lodn.endswith("tar") or lodn.endswith("tgz") or
+ lodn.endswith("tbz2") or lodn.endswith("tar.gz") or
+ lodn.endswith("tar.bz2")):
+ import tarfile
+ engine = tarfile.TarFile.open
+ elif lodn.endswith("zip"):
+ import zipfile
+ engine = zipfile.ZipFile
+ else:
+ raise RuntimeError("Unsupported archive format. Only tar and zip "
+ "are currently supported")
+ full_path = self.getfile(dirname)
+ with engine(full_path, mode="r") as fd:
+ output = os.path.join(self.data_home, dirname + "__content")
+ fd.extractall(output)
+ if lodn.endswith("zip"):
+ result = [os.path.join(output, i) for i in fd.namelist()]
+ else:
+ result = [os.path.join(output, i) for i in fd.getnames()]
+ return result
+
+ def get_file_and_repack(self, filename):
+ """
+ Download the requested file, decompress and repack it to bz2 and gz.
+
+ :param str filename: name of the image.
+ :rtype: str
+ :return: full path of the locally saved file
+ """
+ if not self._initialized:
+ self._initialize_data()
+ if filename not in self.all_data:
+ self.all_data.add(filename)
+ image_list = list(self.all_data)
+ image_list.sort()
+ try:
+ with open(self.testdata, "w") as fp:
+ json.dump(image_list, fp, indent=4)
+ except IOError:
+ logger.debug("Unable to save JSON list")
+ baseimage = os.path.basename(filename)
+ logger.info("UtilsTest.getimage('%s')" % baseimage)
+
+ if not os.path.exists(self.data_home):
+ os.makedirs(self.data_home)
+ fullimagename = os.path.abspath(os.path.join(self.data_home, baseimage))
+
+ if baseimage.endswith(".bz2"):
+ bzip2name = baseimage
+ basename = baseimage[:-4]
+ gzipname = basename + ".gz"
+ elif baseimage.endswith(".gz"):
+ gzipname = baseimage
+ basename = baseimage[:-3]
+ bzip2name = basename + ".bz2"
+ else:
+ basename = baseimage
+ gzipname = baseimage + "gz2"
+ bzip2name = basename + ".bz2"
+
+ fullimagename_gz = os.path.abspath(os.path.join(self.data_home, gzipname))
+ fullimagename_raw = os.path.abspath(os.path.join(self.data_home, basename))
+ fullimagename_bz2 = os.path.abspath(os.path.join(self.data_home, bzip2name))
+
+ # The files are recreated from the bz2 file
+ if not os.path.isfile(fullimagename_bz2):
+ self.getfile(bzip2name)
+ if not os.path.isfile(fullimagename_bz2):
+ raise RuntimeError("Could not automatically \
+ download test images %s!\n \ If you are behind a firewall, \
+ please set the environment variable http_proxy.\n \
+ Otherwise please try to download the images manually from \n \
+ %s" % (self.url_base, filename))
+
+ try:
+ import bz2
+ except ImportError:
+ raise RuntimeError("bz2 library is needed to decompress data")
+ try:
+ import gzip
+ except ImportError:
+ gzip = None
+
+ raw_file_exists = os.path.isfile(fullimagename_raw)
+ gz_file_exists = os.path.isfile(fullimagename_gz)
+ if not raw_file_exists or not gz_file_exists:
+ with open(fullimagename_bz2, "rb") as f:
+ data = f.read()
+ decompressed = bz2.decompress(data)
+
+ if not raw_file_exists:
+ try:
+ with open(fullimagename_raw, "wb") as fullimage:
+ fullimage.write(decompressed)
+ except IOError:
+ raise IOError("unable to write decompressed \
+ data to disk at %s" % self.data_home)
+
+ if not gz_file_exists:
+ if gzip is None:
+ raise RuntimeError("gzip library is expected to recompress data")
+ try:
+ gzip.open(fullimagename_gz, "wb").write(decompressed)
+ except IOError:
+ raise IOError("unable to write gzipped \
+ data to disk at %s" % self.data_home)
+
+ return fullimagename
+
+ def download_all(self, imgs=None):
+ """Download all data needed for the test/benchmarks
+
+ :param imgs: list of files to download, by default all
+ :return: list of path with all files
+ """
+ if not self._initialized:
+ self._initialize_data()
+ if not imgs:
+ imgs = self.all_data
+ res = []
+ for fn in imgs:
+ logger.info("Downloading from silx.org: %s", fn)
+ res.append(self.getfile(fn))
+ return res