author     Andrej Shadura <andrewsh@debian.org>  2021-11-08 14:53:03 +0100
committer  Andrej Shadura <andrewsh@debian.org>  2021-11-08 14:53:03 +0100
commit     5a72e26a861b796b09c1b4b4b2bd5688a33d8df5 (patch)
tree       6624948c82b5a926b56eb4f2365066a5f189d3be /synapse/rest/media/v1
parent     62c1a069b73e89b50d80fbbb4bc49f0c247afcdf (diff)
New upstream version 1.46.0
Diffstat (limited to 'synapse/rest/media/v1')
-rw-r--r--  synapse/rest/media/v1/filepath.py              26
-rw-r--r--  synapse/rest/media/v1/oembed.py                13
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py  113
3 files changed, 80 insertions(+), 72 deletions(-)
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 08bd85f6..bec77088 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -16,12 +16,15 @@
import functools
import os
import re
-from typing import Any, Callable, List
+from typing import Any, Callable, List, TypeVar, cast
NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
-def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]:
+F = TypeVar("F", bound=Callable[..., str])
+
+
+def _wrap_in_base_path(func: F) -> F:
"""Takes a function that returns a relative path and turns it into an
absolute path based on the location of the primary media store
"""
@@ -31,7 +34,7 @@ def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]:
path = func(self, *args, **kwargs)
return os.path.join(self.base_path, path)
- return _wrapped
+ return cast(F, _wrapped)
class MediaFilePaths:
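[Editor's note: the TypeVar-plus-cast change above lets the decorator preserve the wrapped method's exact signature for type checkers, instead of widening it to Callable[..., str]. A minimal self-contained sketch of the pattern; the Paths class and avatar_rel method are invented for illustration:

import functools
import os
from typing import Any, Callable, TypeVar, cast

F = TypeVar("F", bound=Callable[..., str])

def _wrap_in_base_path(func: F) -> F:
    @functools.wraps(func)
    def _wrapped(self: Any, *args: Any, **kwargs: Any) -> str:
        return os.path.join(self.base_path, func(self, *args, **kwargs))

    # cast() tells the type checker the wrapper has func's exact type, so
    # callers of the wrapped method keep precise argument checking.
    return cast(F, _wrapped)

class Paths:
    def __init__(self, primary_base_path: str):
        self.base_path = primary_base_path

    def avatar_rel(self, user_id: str) -> str:
        return os.path.join("avatars", user_id)

    avatar = _wrap_in_base_path(avatar_rel)

print(Paths("/var/media").avatar("alice"))  # /var/media/avatars/alice
# Paths("/var/media").avatar(42) is now flagged by mypy; under the old
# Callable[..., str] annotation any arguments were accepted.]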
@@ -45,23 +48,6 @@ class MediaFilePaths:
def __init__(self, primary_base_path: str):
self.base_path = primary_base_path
- def default_thumbnail_rel(
- self,
- default_top_level: str,
- default_sub_type: str,
- width: int,
- height: int,
- content_type: str,
- method: str,
- ) -> str:
- top_level_type, sub_type = content_type.split("/")
- file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
- return os.path.join(
- "default_thumbnails", default_top_level, default_sub_type, file_name
- )
-
- default_thumbnail = _wrap_in_base_path(default_thumbnail_rel)
-
def local_media_filepath_rel(self, media_id: str) -> str:
return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:])
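[Editor's note: for context, the 2/2/rest split of the media ID used here shards files across two directory levels, bounding how many entries any single directory accumulates. A quick illustration; the media ID is made up:

import os

def local_media_filepath_rel(media_id: str) -> str:
    return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:])

print(local_media_filepath_rel("GerZNDnDZVjsOtar"))
# local_content/Ge/rZ/NDnDZVjsOtar]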
diff --git a/synapse/rest/media/v1/oembed.py b/synapse/rest/media/v1/oembed.py
index 78b1603f..2a59552c 100644
--- a/synapse/rest/media/v1/oembed.py
+++ b/synapse/rest/media/v1/oembed.py
@@ -17,7 +17,6 @@ from typing import TYPE_CHECKING, List, Optional
import attr
-from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict
from synapse.util import json_decoder
@@ -48,7 +47,7 @@ class OEmbedProvider:
requesting/parsing oEmbed content.
"""
- def __init__(self, hs: "HomeServer", client: SimpleHttpClient):
+ def __init__(self, hs: "HomeServer"):
self._oembed_patterns = {}
for oembed_endpoint in hs.config.oembed.oembed_patterns:
api_endpoint = oembed_endpoint.api_endpoint
@@ -69,7 +68,6 @@ class OEmbedProvider:
# Iterate through each URL pattern and point it to the endpoint.
for pattern in oembed_endpoint.url_patterns:
self._oembed_patterns[pattern] = api_endpoint
- self._client = client
def get_oembed_url(self, url: str) -> Optional[str]:
"""
@@ -139,10 +137,11 @@ class OEmbedProvider:
# oEmbed responses *must* be UTF-8 according to the spec.
oembed = json_decoder.decode(raw_body.decode("utf-8"))
- # Ensure there's a version of 1.0.
- oembed_version = oembed["version"]
- if oembed_version != "1.0":
- raise RuntimeError(f"Invalid version: {oembed_version}")
+ # The spec requires "version" to be the string "1.0", but some providers
+ # omit it or send it as a number (e.g. 1.0). Be lenient.
+ oembed_version = oembed.get("version", "1.0")
+ if oembed_version != "1.0" and oembed_version != 1:
+ raise RuntimeError(f"Invalid oEmbed version: {oembed_version}")
# Ensure the cache age is None or an int.
cache_age = oembed.get("cache_age")
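[Editor's note: the loosened check above accepts the spec-mandated string "1.0", the numbers 1 and 1.0 that lax providers emit, and a missing field. A standalone sketch of the same validation; the sample payloads are invented:

from typing import Any, Dict

def check_oembed_version(oembed: Dict[str, Any]) -> None:
    # A missing "version" field defaults to "1.0".
    version = oembed.get("version", "1.0")
    # Accept the string "1.0" as well as the numbers 1 / 1.0
    # (in Python, 1 == 1.0, so one comparison covers both).
    if version != "1.0" and version != 1:
        raise RuntimeError(f"Invalid oEmbed version: {version}")

check_oembed_version({"version": "1.0", "type": "rich"})  # ok
check_oembed_version({"version": 1.0, "type": "photo"})   # ok: float version
check_oembed_version({"type": "link"})                    # ok: field omitted
# check_oembed_version({"version": "2.0"})                # raises RuntimeError]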
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 1fe0fc8a..8ca97b5b 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import codecs
import datetime
import errno
import fnmatch
@@ -22,7 +23,7 @@ import re
import shutil
import sys
import traceback
-from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Set, Tuple, Union
from urllib import parse as urlparse
import attr
@@ -140,7 +141,7 @@ class PreviewUrlResource(DirectServeJsonResource):
self.primary_base_path = media_repo.primary_base_path
self.media_storage = media_storage
- self._oembed = OEmbedProvider(hs, self.client)
+ self._oembed = OEmbedProvider(hs)
# We run the background jobs if we're the instance specified (or no
# instance is specified, where we assume there is only one instance
@@ -295,8 +296,7 @@ class PreviewUrlResource(DirectServeJsonResource):
with open(media_info.filename, "rb") as file:
body = file.read()
- encoding = get_html_media_encoding(body, media_info.media_type)
- tree = decode_body(body, encoding)
+ tree = decode_body(body, media_info.uri, media_info.media_type)
if tree is not None:
# Check if this HTML document points to oEmbed information and
# defer to that.
@@ -632,16 +632,27 @@ class PreviewUrlResource(DirectServeJsonResource):
logger.debug("No media removed from url cache")
-def get_html_media_encoding(body: bytes, content_type: str) -> str:
+def _normalise_encoding(encoding: str) -> Optional[str]:
+ """Use the Python codec's name as the normalised entry."""
+ try:
+ return codecs.lookup(encoding).name
+ except LookupError:
+ return None
+
+
+def get_html_media_encodings(body: bytes, content_type: Optional[str]) -> Iterable[str]:
"""
- Get the encoding of the body based on the (presumably) HTML body or media_type.
+ Get potential encodings of the body based on the (presumably) HTML body or the Content-Type header.
The precedence used for finding a character encoding is:
- 1. meta tag with a charset declared.
+ 1. <meta> tag with a charset declared.
2. The XML document's character encoding attribute.
3. The Content-Type header.
- 4. Fallback to UTF-8.
+ 4. Fallback to utf-8.
+ 5. Fallback to windows-1252.
+
+ This roughly follows the algorithm used by BeautifulSoup's bs4.dammit.EncodingDetector.
Args:
body: The HTML document, as bytes.
@@ -650,39 +661,55 @@ def get_html_media_encoding(body: bytes, content_type: str) -> str:
Returns:
The character encodings to attempt, in order of preference.
"""
+ # There's no point in returning an encoding more than once.
+ attempted_encodings: Set[str] = set()
+
# Limit searches to the first 1kb, since it ought to be at the top.
body_start = body[:1024]
- # Let's try and figure out if it has an encoding set in a meta tag.
+ # Check if it has an encoding set in a meta tag.
match = _charset_match.search(body_start)
if match:
- return match.group(1).decode("ascii")
+ encoding = _normalise_encoding(match.group(1).decode("ascii"))
+ if encoding:
+ attempted_encodings.add(encoding)
+ yield encoding
# TODO Support <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
- # If we didn't find a match, see if it an XML document with an encoding.
+ # Check if it is an XML document with a declared encoding.
match = _xml_encoding_match.match(body_start)
if match:
- return match.group(1).decode("ascii")
-
- # If we don't find a match, we'll look at the HTTP Content-Type, and
- # if that doesn't exist, we'll fall back to UTF-8.
- content_match = _content_type_match.match(content_type)
- if content_match:
- return content_match.group(1)
-
- return "utf-8"
+ encoding = _normalise_encoding(match.group(1).decode("ascii"))
+ if encoding and encoding not in attempted_encodings:
+ attempted_encodings.add(encoding)
+ yield encoding
+
+ # Check the HTTP Content-Type header for a character set.
+ if content_type:
+ content_match = _content_type_match.match(content_type)
+ if content_match:
+ encoding = _normalise_encoding(content_match.group(1))
+ if encoding and encoding not in attempted_encodings:
+ attempted_encodings.add(encoding)
+ yield encoding
+
+ # Finally, fall back to UTF-8, then windows-1252.
+ for fallback in ("utf-8", "cp1252"):
+ if fallback not in attempted_encodings:
+ yield fallback
def decode_body(
- body: bytes, request_encoding: Optional[str] = None
+ body: bytes, uri: str, content_type: Optional[str] = None
) -> Optional["etree.Element"]:
"""
This uses lxml to parse the HTML document.
Args:
body: The HTML document, as bytes.
- request_encoding: The character encoding of the body, as a string.
+ uri: The URI used to download the body.
+ content_type: The Content-Type header.
Returns:
The parsed HTML body, or None if an error occurred during processing.
@@ -691,32 +718,28 @@ def decode_body(
if not body:
return None
- from lxml import etree
-
- # Create an HTML parser. If this fails, log and return no metadata.
- try:
- parser = etree.HTMLParser(recover=True, encoding=request_encoding)
- except LookupError:
- # blindly consider the encoding as utf-8.
- parser = etree.HTMLParser(recover=True, encoding="utf-8")
- except Exception as e:
- logger.warning("Unable to create HTML parser: %s" % (e,))
+ # Try each candidate encoding until one can decode the body. The decoded
+ # result itself is discarded: lxml will decode the bytes again itself,
+ # using the encoding that worked.
+ for encoding in get_html_media_encodings(body, content_type):
+ try:
+ body.decode(encoding)
+ except Exception:
+ pass
+ else:
+ break
+ else:
+ logger.warning("Unable to decode HTML body for %s", uri)
return None
- def _attempt_decode_body(
- body_attempt: Union[bytes, str]
- ) -> Optional["etree.Element"]:
- # Attempt to parse the body. Returns None if the body was successfully
- # parsed, but no tree was found.
- return etree.fromstring(body_attempt, parser)
+ from lxml import etree
- # Attempt to parse the body. If this fails, log and return no metadata.
- try:
- return _attempt_decode_body(body)
- except UnicodeDecodeError:
- # blindly try decoding the body as utf-8, which seems to fix
- # the charset mismatches on https://google.com
- return _attempt_decode_body(body.decode("utf-8", "ignore"))
+ # Create an HTML parser.
+ parser = etree.HTMLParser(recover=True, encoding=encoding)
+
+ # Attempt to parse the body. Returns None if the body was successfully
+ # parsed, but no tree was found.
+ return etree.fromstring(body, parser)
def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]:
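[Editor's note: putting the pieces of the rewritten decode_body together, the flow is: probe candidate encodings until bytes.decode() succeeds, then hand the raw bytes plus the winning encoding to lxml, which re-decodes internally. A condensed sketch mirroring the loop above; the sample document is invented:

from lxml import etree

body = b'<html><head><meta charset="utf-8"><title>Hi \xe2\x9c\x93</title></head></html>'

for encoding in ("utf-8", "cp1252"):
    try:
        body.decode(encoding)  # result discarded; this only probes
    except Exception:
        pass
    else:
        break
else:
    raise RuntimeError("no usable encoding found")

# lxml decodes the bytes itself using the encoding we found.
parser = etree.HTMLParser(recover=True, encoding=encoding)
tree = etree.fromstring(body, parser)
print(tree.findtext(".//title"))  # Hi ✓]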