summaryrefslogtreecommitdiff
path: root/silx/io/url.py
diff options
context:
space:
mode:
Diffstat (limited to 'silx/io/url.py')
-rw-r--r--silx/io/url.py366
1 files changed, 366 insertions, 0 deletions
diff --git a/silx/io/url.py b/silx/io/url.py
new file mode 100644
index 0000000..ba62a81
--- /dev/null
+++ b/silx/io/url.py
@@ -0,0 +1,366 @@
+# coding: utf-8
+# /*##########################################################################
+#
+# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# ###########################################################################*/
+"""URL module"""
+
+__authors__ = ["V. Valls"]
+__license__ = "MIT"
+__date__ = "29/01/2018"
+
+import logging
+from silx.third_party import six
+parse = six.moves.urllib.parse
+
+
+_logger = logging.getLogger(__name__)
+
+
+class DataUrl(object):
+ """Non-mutable object to parse a string representing a resource data
+ locator.
+
+ It supports:
+
+ - path to file and path inside file to the data
+ - data slicing
+ - fabio or silx access to the data
+ - absolute and relative file access
+
+ >>> # fabio access using absolute path
+ >>> DataUrl("fabio:///data/image.edf?slice=2")
+ >>> DataUrl("fabio:///C:/data/image.edf?slice=2")
+
+ >>> # silx access using absolute path
+ >>> DataUrl("silx:///data/image.h5?path=/data/dataset&slice=1,5")
+ >>> DataUrl("silx:///data/image.edf?path=/scan_0/detector/data")
+ >>> DataUrl("silx:///C:/data/image.edf?path=/scan_0/detector/data")
+
+ >>> # `path=` can be omited if there is no other query keys
+ >>> DataUrl("silx:///data/image.h5?/data/dataset")
+ >>> # is the same as
+ >>> DataUrl("silx:///data/image.h5?path=/data/dataset")
+
+ >>> # `::` can be used instead of `?` which can be useful with shell in
+ >>> # command lines
+ >>> DataUrl("silx:///data/image.h5::/data/dataset")
+ >>> # is the same as
+ >>> DataUrl("silx:///data/image.h5?/data/dataset")
+
+ >>> # Relative path access
+ >>> DataUrl("silx:./image.h5")
+ >>> DataUrl("fabio:./image.edf")
+ >>> DataUrl("silx:image.h5")
+ >>> DataUrl("fabio:image.edf")
+
+ >>> # Is also support parsing of file access for convenience
+ >>> DataUrl("./foo/bar/image.edf")
+ >>> DataUrl("C:/data/")
+
+ :param str path: Path representing a link to a data. If specified, other
+ arguments are not used.
+ :param str file_path: Link to the file containing the the data.
+ None if there is no data selection.
+ :param str data_path: Data selection applyed to the data file selected.
+ None if there is no data selection.
+ :param Tuple[int,slice,Ellipse] data_slice: Slicing applyed of the selected
+ data. None if no slicing applyed.
+ :param Union[str,None] scheme: Scheme of the URL. "silx", "fabio"
+ is supported. Other strings can be provided, but :meth:`is_valid` will
+ be false.
+ """
+ def __init__(self, path=None, file_path=None, data_path=None, data_slice=None, scheme=None):
+ self.__is_valid = False
+ if path is not None:
+ assert(file_path is None)
+ assert(data_path is None)
+ assert(data_slice is None)
+ assert(scheme is None)
+ self.__parse_from_path(path)
+ else:
+ self.__file_path = file_path
+ self.__data_path = data_path
+ self.__data_slice = data_slice
+ self.__scheme = scheme
+ self.__path = None
+ self.__check_validity()
+
+ def __eq__(self, other):
+ if self.is_valid() != other.is_valid():
+ return False
+ if self.is_valid():
+ if self.__scheme != other.scheme():
+ return False
+ if self.__file_path != other.file_path():
+ return False
+ if self.__data_path != other.data_path():
+ return False
+ if self.__data_slice != other.data_slice():
+ return False
+ return True
+ else:
+ return self.__path == other.path()
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __repr__(self):
+ return str(self)
+
+ def __str__(self):
+ if self.is_valid() or self.__path is None:
+ def quote_string(string):
+ if isinstance(string, six.string_types):
+ return "'%s'" % string
+ else:
+ return string
+
+ template = "DataUrl(valid=%s, scheme=%s, file_path=%s, data_path=%s, data_slice=%s)"
+ return template % (self.__is_valid,
+ quote_string(self.__scheme),
+ quote_string(self.__file_path),
+ quote_string(self.__data_path),
+ self.__data_slice)
+ else:
+ template = "DataUrl(valid=%s, string=%s)"
+ return template % (self.__is_valid, self.__path)
+
+ def __check_validity(self):
+ """Check the validity of the attributes."""
+ if self.__file_path in [None, ""]:
+ self.__is_valid = False
+ return
+
+ if self.__scheme is None:
+ self.__is_valid = True
+ elif self.__scheme == "fabio":
+ self.__is_valid = self.__data_path is None
+ elif self.__scheme == "silx":
+ # If there is a slice you must have a data path
+ # But you can have a data path without slice
+ slice_implies_data = (self.__data_path is None and self.__data_slice is None) or self.__data_path is not None
+ self.__is_valid = slice_implies_data
+ else:
+ self.__is_valid = False
+
+ @staticmethod
+ def _parse_slice(slice_string):
+ """Parse a slicing sequence and return an associated tuple.
+
+ It supports a sequence of `...`, `:`, and integers separated by a coma.
+
+ :rtype: tuple
+ """
+ def str_to_slice(string):
+ if string == "...":
+ return Ellipsis
+ elif string == ":":
+ return slice(None)
+ else:
+ return int(string)
+
+ if slice_string == "":
+ raise ValueError("An empty slice is not valid")
+
+ tokens = slice_string.split(",")
+ data_slice = []
+ for t in tokens:
+ try:
+ data_slice.append(str_to_slice(t))
+ except ValueError:
+ raise ValueError("'%s' is not a valid slicing" % t)
+ return tuple(data_slice)
+
+ def __parse_from_path(self, path):
+ """Parse the path and initialize attributes.
+
+ :param str path: Path representing the URL.
+ """
+ self.__path = path
+ path = path.replace("::", "?", 1)
+ url = parse.urlparse(path)
+
+ is_valid = True
+
+ if len(url.scheme) <= 2:
+ # Windows driver
+ scheme = None
+ pos = self.__path.index(url.path)
+ file_path = self.__path[0:pos] + url.path
+ else:
+ scheme = url.scheme if url.scheme is not "" else None
+ file_path = url.path
+
+ # Check absolute windows path
+ if len(file_path) > 2 and file_path[0] == '/':
+ if file_path[1] == ":" or file_path[2] == ":":
+ file_path = file_path[1:]
+
+ self.__scheme = scheme
+ self.__file_path = file_path
+
+ query = parse.parse_qsl(url.query, keep_blank_values=True)
+ if len(query) == 1 and query[0][1] == "":
+ # there is no query keys
+ data_path = query[0][0]
+ data_slice = None
+ else:
+ merged_query = {}
+ for name, value in query:
+ if name in query:
+ merged_query[name].append(value)
+ else:
+ merged_query[name] = [value]
+
+ def pop_single_value(merged_query, name):
+ if name in merged_query:
+ values = merged_query.pop(name)
+ if len(values) > 1:
+ _logger.warning("More than one query key named '%s'. The last one is used.", name)
+ value = values[-1]
+ else:
+ value = None
+ return value
+
+ data_path = pop_single_value(merged_query, "path")
+ data_slice = pop_single_value(merged_query, "slice")
+ if data_slice is not None:
+ try:
+ data_slice = self._parse_slice(data_slice)
+ except ValueError:
+ is_valid = False
+ data_slice = None
+
+ for key in merged_query.keys():
+ _logger.warning("Query key %s unsupported. Key skipped.", key)
+
+ self.__data_path = data_path
+ self.__data_slice = data_slice
+
+ if is_valid:
+ self.__check_validity()
+ else:
+ self.__is_valid = False
+
+ def is_valid(self):
+ """Returns true if the URL is valid. Else attributes can be None.
+
+ :rtype: bool
+ """
+ return self.__is_valid
+
+ def path(self):
+ """Returns the string representing the URL.
+
+ :rtype: str
+ """
+ if self.__path is not None:
+ return self.__path
+
+ def slice_to_string(data_slice):
+ if data_slice == Ellipsis:
+ return "..."
+ elif data_slice == slice(None):
+ return ":"
+ elif isinstance(data_slice, int):
+ return str(data_slice)
+ else:
+ raise TypeError("Unexpected slicing type. Found %s" % type(data_slice))
+
+ if self.__data_path is not None and self.__data_slice is None:
+ query = self.__data_path
+ else:
+ queries = []
+ if self.__data_path is not None:
+ queries.append("path=" + self.__data_path)
+ if self.__data_slice is not None:
+ data_slice = ",".join([slice_to_string(s) for s in self.__data_slice])
+ queries.append("slice=" + data_slice)
+ query = "&".join(queries)
+
+ path = ""
+ if self.__file_path is not None:
+ path += self.__file_path
+
+ if query != "":
+ path = path + "?" + query
+
+ if self.__scheme is not None:
+ if self.is_absolute():
+ if path.startswith("/"):
+ path = self.__scheme + "://" + path
+ else:
+ path = self.__scheme + ":///" + path
+ else:
+ path = self.__scheme + ":" + path
+
+ return path
+
+ def is_absolute(self):
+ """Returns true if the file path is an absolute path.
+
+ :rtype: bool
+ """
+ file_path = self.file_path()
+ if len(file_path) > 0:
+ if file_path[0] == "/":
+ return True
+ if len(file_path) > 2:
+ # Windows
+ if file_path[1] == ":" or file_path[2] == ":":
+ return True
+ elif len(file_path) > 1:
+ # Windows
+ if file_path[1] == ":":
+ return True
+ return False
+
+ def file_path(self):
+ """Returns the path to the file containing the data.
+
+ :rtype: str
+ """
+ return self.__file_path
+
+ def data_path(self):
+ """Returns the path inside the file to the data.
+
+ :rtype: str
+ """
+ return self.__data_path
+
+ def data_slice(self):
+ """Returns the slicing applyed to the data.
+
+ It is a tuple containing numbers, slice or ellipses.
+
+ :rtype: Tuple[int, slice, Ellipse]
+ """
+ return self.__data_slice
+
+ def scheme(self):
+ """Returns the scheme. It can be None if no scheme is specified.
+
+ :rtype: Union[str, None]
+ """
+ return self.__scheme