diff options
author | Andrej Shadura <andrewsh@debian.org> | 2021-11-08 14:53:03 +0100 |
---|---|---|
committer | Andrej Shadura <andrewsh@debian.org> | 2021-11-08 14:53:03 +0100 |
commit | 5a72e26a861b796b09c1b4b4b2bd5688a33d8df5 (patch) | |
tree | 6624948c82b5a926b56eb4f2365066a5f189d3be /synapse/rest/media/v1 | |
parent | 62c1a069b73e89b50d80fbbb4bc49f0c247afcdf (diff) |
New upstream version 1.46.0
Diffstat (limited to 'synapse/rest/media/v1')
-rw-r--r-- | synapse/rest/media/v1/filepath.py | 26 | ||||
-rw-r--r-- | synapse/rest/media/v1/oembed.py | 13 | ||||
-rw-r--r-- | synapse/rest/media/v1/preview_url_resource.py | 113 |
3 files changed, 80 insertions, 72 deletions
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 08bd85f6..bec77088 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -16,12 +16,15 @@ import functools import os import re -from typing import Any, Callable, List +from typing import Any, Callable, List, TypeVar, cast NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d") -def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]: +F = TypeVar("F", bound=Callable[..., str]) + + +def _wrap_in_base_path(func: F) -> F: """Takes a function that returns a relative path and turns it into an absolute path based on the location of the primary media store """ @@ -31,7 +34,7 @@ def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]: path = func(self, *args, **kwargs) return os.path.join(self.base_path, path) - return _wrapped + return cast(F, _wrapped) class MediaFilePaths: @@ -45,23 +48,6 @@ class MediaFilePaths: def __init__(self, primary_base_path: str): self.base_path = primary_base_path - def default_thumbnail_rel( - self, - default_top_level: str, - default_sub_type: str, - width: int, - height: int, - content_type: str, - method: str, - ) -> str: - top_level_type, sub_type = content_type.split("/") - file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method) - return os.path.join( - "default_thumbnails", default_top_level, default_sub_type, file_name - ) - - default_thumbnail = _wrap_in_base_path(default_thumbnail_rel) - def local_media_filepath_rel(self, media_id: str) -> str: return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:]) diff --git a/synapse/rest/media/v1/oembed.py b/synapse/rest/media/v1/oembed.py index 78b1603f..2a59552c 100644 --- a/synapse/rest/media/v1/oembed.py +++ b/synapse/rest/media/v1/oembed.py @@ -17,7 +17,6 @@ from typing import TYPE_CHECKING, List, Optional import attr -from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict from synapse.util import json_decoder @@ -48,7 +47,7 @@ class OEmbedProvider: requesting/parsing oEmbed content. """ - def __init__(self, hs: "HomeServer", client: SimpleHttpClient): + def __init__(self, hs: "HomeServer"): self._oembed_patterns = {} for oembed_endpoint in hs.config.oembed.oembed_patterns: api_endpoint = oembed_endpoint.api_endpoint @@ -69,7 +68,6 @@ class OEmbedProvider: # Iterate through each URL pattern and point it to the endpoint. for pattern in oembed_endpoint.url_patterns: self._oembed_patterns[pattern] = api_endpoint - self._client = client def get_oembed_url(self, url: str) -> Optional[str]: """ @@ -139,10 +137,11 @@ class OEmbedProvider: # oEmbed responses *must* be UTF-8 according to the spec. oembed = json_decoder.decode(raw_body.decode("utf-8")) - # Ensure there's a version of 1.0. - oembed_version = oembed["version"] - if oembed_version != "1.0": - raise RuntimeError(f"Invalid version: {oembed_version}") + # The version is a required string field, but not always provided, + # or sometimes provided as a float. Be lenient. + oembed_version = oembed.get("version", "1.0") + if oembed_version != "1.0" and oembed_version != 1: + raise RuntimeError(f"Invalid oEmbed version: {oembed_version}") # Ensure the cache age is None or an int. cache_age = oembed.get("cache_age") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 1fe0fc8a..8ca97b5b 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import codecs import datetime import errno import fnmatch @@ -22,7 +23,7 @@ import re import shutil import sys import traceback -from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Tuple, Union +from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Set, Tuple, Union from urllib import parse as urlparse import attr @@ -140,7 +141,7 @@ class PreviewUrlResource(DirectServeJsonResource): self.primary_base_path = media_repo.primary_base_path self.media_storage = media_storage - self._oembed = OEmbedProvider(hs, self.client) + self._oembed = OEmbedProvider(hs) # We run the background jobs if we're the instance specified (or no # instance is specified, where we assume there is only one instance @@ -295,8 +296,7 @@ class PreviewUrlResource(DirectServeJsonResource): with open(media_info.filename, "rb") as file: body = file.read() - encoding = get_html_media_encoding(body, media_info.media_type) - tree = decode_body(body, encoding) + tree = decode_body(body, media_info.uri, media_info.media_type) if tree is not None: # Check if this HTML document points to oEmbed information and # defer to that. @@ -632,16 +632,27 @@ class PreviewUrlResource(DirectServeJsonResource): logger.debug("No media removed from url cache") -def get_html_media_encoding(body: bytes, content_type: str) -> str: +def _normalise_encoding(encoding: str) -> Optional[str]: + """Use the Python codec's name as the normalised entry.""" + try: + return codecs.lookup(encoding).name + except LookupError: + return None + + +def get_html_media_encodings(body: bytes, content_type: Optional[str]) -> Iterable[str]: """ - Get the encoding of the body based on the (presumably) HTML body or media_type. + Get potential encoding of the body based on the (presumably) HTML body or the content-type header. The precedence used for finding a character encoding is: - 1. meta tag with a charset declared. + 1. <meta> tag with a charset declared. 2. The XML document's character encoding attribute. 3. The Content-Type header. - 4. Fallback to UTF-8. + 4. Fallback to utf-8. + 5. Fallback to windows-1252. + + This roughly follows the algorithm used by BeautifulSoup's bs4.dammit.EncodingDetector. Args: body: The HTML document, as bytes. @@ -650,39 +661,55 @@ def get_html_media_encoding(body: bytes, content_type: str) -> str: Returns: The character encoding of the body, as a string. """ + # There's no point in returning an encoding more than once. + attempted_encodings: Set[str] = set() + # Limit searches to the first 1kb, since it ought to be at the top. body_start = body[:1024] - # Let's try and figure out if it has an encoding set in a meta tag. + # Check if it has an encoding set in a meta tag. match = _charset_match.search(body_start) if match: - return match.group(1).decode("ascii") + encoding = _normalise_encoding(match.group(1).decode("ascii")) + if encoding: + attempted_encodings.add(encoding) + yield encoding # TODO Support <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> - # If we didn't find a match, see if it an XML document with an encoding. + # Check if it has an XML document with an encoding. match = _xml_encoding_match.match(body_start) if match: - return match.group(1).decode("ascii") - - # If we don't find a match, we'll look at the HTTP Content-Type, and - # if that doesn't exist, we'll fall back to UTF-8. - content_match = _content_type_match.match(content_type) - if content_match: - return content_match.group(1) - - return "utf-8" + encoding = _normalise_encoding(match.group(1).decode("ascii")) + if encoding and encoding not in attempted_encodings: + attempted_encodings.add(encoding) + yield encoding + + # Check the HTTP Content-Type header for a character set. + if content_type: + content_match = _content_type_match.match(content_type) + if content_match: + encoding = _normalise_encoding(content_match.group(1)) + if encoding and encoding not in attempted_encodings: + attempted_encodings.add(encoding) + yield encoding + + # Finally, fallback to UTF-8, then windows-1252. + for fallback in ("utf-8", "cp1252"): + if fallback not in attempted_encodings: + yield fallback def decode_body( - body: bytes, request_encoding: Optional[str] = None + body: bytes, uri: str, content_type: Optional[str] = None ) -> Optional["etree.Element"]: """ This uses lxml to parse the HTML document. Args: body: The HTML document, as bytes. - request_encoding: The character encoding of the body, as a string. + uri: The URI used to download the body. + content_type: The Content-Type header. Returns: The parsed HTML body, or None if an error occurred during processed. @@ -691,32 +718,28 @@ def decode_body( if not body: return None - from lxml import etree - - # Create an HTML parser. If this fails, log and return no metadata. - try: - parser = etree.HTMLParser(recover=True, encoding=request_encoding) - except LookupError: - # blindly consider the encoding as utf-8. - parser = etree.HTMLParser(recover=True, encoding="utf-8") - except Exception as e: - logger.warning("Unable to create HTML parser: %s" % (e,)) + # The idea here is that multiple encodings are tried until one works. + # Unfortunately the result is never used and then LXML will decode the string + # again with the found encoding. + for encoding in get_html_media_encodings(body, content_type): + try: + body.decode(encoding) + except Exception: + pass + else: + break + else: + logger.warning("Unable to decode HTML body for %s", uri) return None - def _attempt_decode_body( - body_attempt: Union[bytes, str] - ) -> Optional["etree.Element"]: - # Attempt to parse the body. Returns None if the body was successfully - # parsed, but no tree was found. - return etree.fromstring(body_attempt, parser) + from lxml import etree - # Attempt to parse the body. If this fails, log and return no metadata. - try: - return _attempt_decode_body(body) - except UnicodeDecodeError: - # blindly try decoding the body as utf-8, which seems to fix - # the charset mismatches on https://google.com - return _attempt_decode_body(body.decode("utf-8", "ignore")) + # Create an HTML parser. + parser = etree.HTMLParser(recover=True, encoding=encoding) + + # Attempt to parse the body. Returns None if the body was successfully + # parsed, but no tree was found. + return etree.fromstring(body, parser) def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]: |