summaryrefslogtreecommitdiff
path: root/tests/rest/media/v1/test_url_preview.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rest/media/v1/test_url_preview.py')
-rw-r--r--tests/rest/media/v1/test_url_preview.py131
1 files changed, 130 insertions, 1 deletions
diff --git a/tests/rest/media/v1/test_url_preview.py b/tests/rest/media/v1/test_url_preview.py
index 4d09b5d0..8698135a 100644
--- a/tests/rest/media/v1/test_url_preview.py
+++ b/tests/rest/media/v1/test_url_preview.py
@@ -21,11 +21,13 @@ from twisted.internet.error import DNSLookupError
from twisted.test.proto_helpers import AccumulatingProtocol
from synapse.config.oembed import OEmbedEndpointConfig
+from synapse.rest.media.v1.preview_url_resource import IMAGE_CACHE_EXPIRY_MS
from synapse.util.stringutils import parse_and_validate_mxc_uri
from tests import unittest
from tests.server import FakeTransport
from tests.test_utils import SMALL_PNG
+from tests.utils import MockClock
try:
import lxml
@@ -723,9 +725,107 @@ class URLPreviewTests(unittest.HomeserverTestCase):
},
)
+ def test_oembed_autodiscovery(self):
+ """
+ Autodiscovery works by finding the link in the HTML response and then requesting an oEmbed URL.
+ 1. Request a preview of a URL which is not known to the oEmbed code.
+ 2. It returns HTML including a link to an oEmbed preview.
+ 3. The oEmbed preview is requested and returns a URL for an image.
+ 4. The image is requested for thumbnailing.
+ """
+ # This is a little cheesy in that we use the www subdomain (which isn't the
+ # list of oEmbed patterns) to get "raw" HTML response.
+ self.lookups["www.twitter.com"] = [(IPv4Address, "10.1.2.3")]
+ self.lookups["publish.twitter.com"] = [(IPv4Address, "10.1.2.3")]
+ self.lookups["cdn.twitter.com"] = [(IPv4Address, "10.1.2.3")]
+
+ result = b"""
+ <link rel="alternate" type="application/json+oembed"
+ href="http://publish.twitter.com/oembed?url=http%3A%2F%2Fcdn.twitter.com%2Fmatrixdotorg%2Fstatus%2F12345&format=json"
+ title="matrixdotorg" />
+ """
+
+ channel = self.make_request(
+ "GET",
+ "preview_url?url=http://www.twitter.com/matrixdotorg/status/12345",
+ shorthand=False,
+ await_result=False,
+ )
+ self.pump()
+
+ client = self.reactor.tcpClients[0][2].buildProtocol(None)
+ server = AccumulatingProtocol()
+ server.makeConnection(FakeTransport(client, self.reactor))
+ client.makeConnection(FakeTransport(server, self.reactor))
+ client.dataReceived(
+ (
+ b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
+ b'Content-Type: text/html; charset="utf8"\r\n\r\n'
+ )
+ % (len(result),)
+ + result
+ )
+
+ self.pump()
+
+ # The oEmbed response.
+ result2 = {
+ "version": "1.0",
+ "type": "photo",
+ "url": "http://cdn.twitter.com/matrixdotorg",
+ }
+ oembed_content = json.dumps(result2).encode("utf-8")
+
+ # Ensure a second request is made to the oEmbed URL.
+ client = self.reactor.tcpClients[1][2].buildProtocol(None)
+ server = AccumulatingProtocol()
+ server.makeConnection(FakeTransport(client, self.reactor))
+ client.makeConnection(FakeTransport(server, self.reactor))
+ client.dataReceived(
+ (
+ b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
+ b'Content-Type: application/json; charset="utf8"\r\n\r\n'
+ )
+ % (len(oembed_content),)
+ + oembed_content
+ )
+
+ self.pump()
+
+ # Ensure the URL is what was requested.
+ self.assertIn(b"/oembed?", server.data)
+
+ # Ensure a third request is made to the photo URL.
+ client = self.reactor.tcpClients[2][2].buildProtocol(None)
+ server = AccumulatingProtocol()
+ server.makeConnection(FakeTransport(client, self.reactor))
+ client.makeConnection(FakeTransport(server, self.reactor))
+ client.dataReceived(
+ (
+ b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
+ b"Content-Type: image/png\r\n\r\n"
+ )
+ % (len(SMALL_PNG),)
+ + SMALL_PNG
+ )
+
+ self.pump()
+
+ # Ensure the URL is what was requested.
+ self.assertIn(b"/matrixdotorg", server.data)
+
+ self.assertEqual(channel.code, 200)
+ body = channel.json_body
+ self.assertEqual(
+ body["og:url"], "http://www.twitter.com/matrixdotorg/status/12345"
+ )
+ self.assertTrue(body["og:image"].startswith("mxc://"))
+ self.assertEqual(body["og:image:height"], 1)
+ self.assertEqual(body["og:image:width"], 1)
+ self.assertEqual(body["og:image:type"], "image/png")
+
def _download_image(self):
"""Downloads an image into the URL cache.
-
Returns:
A (host, media_id) tuple representing the MXC URI of the image.
"""
@@ -851,3 +951,32 @@ class URLPreviewTests(unittest.HomeserverTestCase):
404,
"URL cache thumbnail was unexpectedly retrieved from a storage provider",
)
+
+ def test_cache_expiry(self):
+ """Test that URL cache files and thumbnails are cleaned up properly on expiry."""
+ self.preview_url.clock = MockClock()
+
+ _host, media_id = self._download_image()
+
+ file_path = self.preview_url.filepaths.url_cache_filepath(media_id)
+ file_dirs = self.preview_url.filepaths.url_cache_filepath_dirs_to_delete(
+ media_id
+ )
+ thumbnail_dir = self.preview_url.filepaths.url_cache_thumbnail_directory(
+ media_id
+ )
+ thumbnail_dirs = self.preview_url.filepaths.url_cache_thumbnail_dirs_to_delete(
+ media_id
+ )
+
+ self.assertTrue(os.path.isfile(file_path))
+ self.assertTrue(os.path.isdir(thumbnail_dir))
+
+ self.preview_url.clock.advance_time_msec(IMAGE_CACHE_EXPIRY_MS + 1)
+ self.get_success(self.preview_url._expire_url_cache_data())
+
+ for path in [file_path] + file_dirs + [thumbnail_dir] + thumbnail_dirs:
+ self.assertFalse(
+ os.path.exists(path),
+ f"{os.path.relpath(path, self.media_store_path)} was not deleted",
+ )