preview: Use a dataclass for the embed data.

This is significantly cleaner than passing around `Dict[str, Any]` all
of the time.
This commit is contained in:
Alex Vandiver
2022-04-14 12:52:41 -07:00
committed by Tim Abbott
parent ede4a88b49
commit 327ff9ea0f
8 changed files with 180 additions and 190 deletions

View File

@@ -64,6 +64,7 @@ from zerver.lib.timezone import common_timezones
from zerver.lib.types import LinkifierDict
from zerver.lib.url_encoding import encode_stream, hash_util_encode
from zerver.lib.url_preview import preview as link_preview
from zerver.lib.url_preview.types import UrlEmbedData, UrlOEmbedData
from zerver.models import EmojiInfo, Message, Realm, linkifiers_for_realm
ReturnT = TypeVar("ReturnT")
@@ -682,54 +683,44 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
desc_div = SubElement(summary_div, "desc")
desc_div.set("class", "message_inline_image_desc")
def add_oembed_data(self, root: Element, link: str, extracted_data: Dict[str, Any]) -> bool:
oembed_resource_type = extracted_data.get("type", "")
title = extracted_data.get("title")
if oembed_resource_type == "photo":
image = extracted_data.get("image")
if image:
self.add_a(
root,
image_url=image,
link=link,
title=title,
)
return True
elif oembed_resource_type == "video":
html = extracted_data["html"]
image = extracted_data["image"]
title = extracted_data.get("title")
description = extracted_data.get("description")
self.add_a(
root,
image_url=image,
link=link,
title=title,
desc=description,
class_attr="embed-video message_inline_image",
data_id=html,
already_thumbnailed=True,
)
return True
return False
def add_embed(self, root: Element, link: str, extracted_data: Dict[str, Any]) -> None:
oembed = extracted_data.get("oembed", False)
if oembed and self.add_oembed_data(root, link, extracted_data):
def add_oembed_data(self, root: Element, link: str, extracted_data: UrlOEmbedData) -> None:
if extracted_data.image is None:
# Don't add an embed if an image is not found
return
img_link = extracted_data.get("image")
if not img_link:
if extracted_data.type == "photo":
self.add_a(
root,
image_url=extracted_data.image,
link=link,
title=extracted_data.title,
)
elif extracted_data.type == "video":
self.add_a(
root,
image_url=extracted_data.image,
link=link,
title=extracted_data.title,
desc=extracted_data.description,
class_attr="embed-video message_inline_image",
data_id=extracted_data.html,
already_thumbnailed=True,
)
def add_embed(self, root: Element, link: str, extracted_data: UrlEmbedData) -> None:
if isinstance(extracted_data, UrlOEmbedData):
self.add_oembed_data(root, link, extracted_data)
return
if extracted_data.image is None:
# Don't add an embed if an image is not found
return
container = SubElement(root, "div")
container.set("class", "message_embed")
img_link = get_camo_url(img_link)
img_link = get_camo_url(extracted_data.image)
img = SubElement(container, "a")
img.set("style", "background-image: url(" + css_escape(img_link) + ")")
img.set("href", link)
@@ -738,19 +729,17 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
data_container = SubElement(container, "div")
data_container.set("class", "data-container")
title = extracted_data.get("title")
if title:
if extracted_data.title:
title_elm = SubElement(data_container, "div")
title_elm.set("class", "message_embed_title")
a = SubElement(title_elm, "a")
a.set("href", link)
a.set("title", title)
a.text = title
description = extracted_data.get("description")
if description:
a.set("title", extracted_data.title)
a.text = extracted_data.title
if extracted_data.description:
description_elm = SubElement(data_container, "div")
description_elm.set("class", "message_embed_description")
description_elm.text = description
description_elm.text = extracted_data.description
def get_actual_image_url(self, url: str) -> str:
# Add specific per-site cases to convert image-preview URLs to image URLs.
@@ -868,10 +857,9 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return None
return match.group(2)
def youtube_title(self, extracted_data: Dict[str, Any]) -> Optional[str]:
title = extracted_data.get("title")
if title is not None:
return f"YouTube - {title}"
def youtube_title(self, extracted_data: UrlEmbedData) -> Optional[str]:
if extracted_data.title is not None:
return f"YouTube - {extracted_data.title}"
return None
def youtube_image(self, url: str) -> Optional[str]:
@@ -897,10 +885,9 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return None
return match.group(5)
def vimeo_title(self, extracted_data: Dict[str, Any]) -> Optional[str]:
title = extracted_data.get("title")
if title is not None:
return f"Vimeo - {title}"
def vimeo_title(self, extracted_data: UrlEmbedData) -> Optional[str]:
if extracted_data.title is not None:
return f"Vimeo - {extracted_data.title}"
return None
def twitter_text(

View File

@@ -1,13 +1,13 @@
import json
from typing import Any, Dict, Optional
from typing import Optional
import requests
from pyoembed import PyOembedException, oEmbed
from zerver.lib.url_preview.types import UrlEmbedData, UrlOEmbedData
def get_oembed_data(
url: str, maxwidth: int = 640, maxheight: int = 480
) -> Optional[Dict[str, Any]]:
def get_oembed_data(url: str, maxwidth: int = 640, maxheight: int = 480) -> Optional[UrlEmbedData]:
try:
data = oEmbed(url, maxwidth=maxwidth, maxheight=maxheight)
except (PyOembedException, json.decoder.JSONDecodeError, requests.exceptions.ConnectionError):
@@ -16,29 +16,27 @@ def get_oembed_data(
oembed_resource_type = data.get("type", "")
image = data.get("url", data.get("image"))
thumbnail = data.get("thumbnail_url")
html = data.pop("html", "")
html = data.get("html", "")
if oembed_resource_type == "photo" and image:
return dict(
oembed=True,
return UrlOEmbedData(
image=image,
type=oembed_resource_type,
type="photo",
title=data.get("title"),
description=data.get("description"),
)
if oembed_resource_type == "video" and html and thumbnail:
return dict(
oembed=True,
return UrlOEmbedData(
image=thumbnail,
type=oembed_resource_type,
type="video",
html=strip_cdata(html),
title=data.get("title"),
description=data.get("description"),
)
# Otherwise, start with just the embed type.
return dict(
type=oembed_resource_type,
# Otherwise, use the title/description from pyembed as the basis
# for our other parsers
return UrlEmbedData(
title=data.get("title"),
description=data.get("description"),
)

View File

@@ -1,5 +1,7 @@
import cgi
from typing import Any, Optional
from typing import Optional
from zerver.lib.url_preview.types import UrlEmbedData
class BaseParser:
@@ -14,5 +16,5 @@ class BaseParser:
charset = cgi.parse_header(content_type)[1].get("charset")
self._soup = BeautifulSoup(html_source, "lxml", from_encoding=charset)
def extract_data(self) -> Any:
def extract_data(self) -> UrlEmbedData:
raise NotImplementedError()

View File

@@ -1,18 +1,19 @@
from typing import Dict, Optional
from typing import Optional
from urllib.parse import urlparse
from bs4.element import Tag
from zerver.lib.url_preview.parsers.base import BaseParser
from zerver.lib.url_preview.types import UrlEmbedData
class GenericParser(BaseParser):
def extract_data(self) -> Dict[str, Optional[str]]:
return {
"title": self._get_title(),
"description": self._get_description(),
"image": self._get_image(),
}
def extract_data(self) -> UrlEmbedData:
return UrlEmbedData(
title=self._get_title(),
description=self._get_description(),
image=self._get_image(),
)
def _get_title(self) -> Optional[str]:
soup = self._soup

View File

@@ -1,37 +1,33 @@
from typing import Dict
from urllib.parse import urlparse
from zerver.lib.url_preview.types import UrlEmbedData
from .base import BaseParser
class OpenGraphParser(BaseParser):
allowed_og_properties = {
"og:title",
"og:description",
"og:image",
}
def extract_data(self) -> Dict[str, str]:
def extract_data(self) -> UrlEmbedData:
meta = self._soup.findAll("meta")
result = {}
data = UrlEmbedData()
for tag in meta:
if not tag.has_attr("property"):
continue
if tag["property"] not in self.allowed_og_properties:
continue
og_property_name = tag["property"][len("og:") :]
if not tag.has_attr("content"):
continue
if og_property_name == "image":
if tag["property"] == "og:title":
data.title = tag["content"]
elif tag["property"] == "og:description":
data.description = tag["content"]
elif tag["property"] == "og:image":
try:
# We use urlparse and not URLValidator because we
# need to support relative URLs.
urlparse(tag["content"])
except ValueError:
continue
data.image = tag["content"]
result[og_property_name] = tag["content"]
return result
return data

View File

@@ -1,5 +1,5 @@
import re
from typing import Any, Callable, Dict, Match, Optional
from typing import Any, Callable, Match, Optional
from urllib.parse import urljoin
import magic
@@ -13,6 +13,7 @@ from zerver.lib.outgoing_http import OutgoingSession
from zerver.lib.pysa import mark_sanitized
from zerver.lib.url_preview.oembed import get_oembed_data
from zerver.lib.url_preview.parsers import GenericParser, OpenGraphParser
from zerver.lib.url_preview.types import UrlEmbedData, UrlOEmbedData
# FIXME: Should we use a database cache or a memcached in production? What if
# OpenGraph data is changed for a site?
@@ -87,41 +88,39 @@ def catch_network_errors(func: Callable[..., Any]) -> Callable[..., Any]:
@cache_with_key(preview_url_cache_key, cache_name=CACHE_NAME, with_statsd_key="urlpreview_data")
def get_link_embed_data(
url: str, maxwidth: int = 640, maxheight: int = 480
) -> Optional[Dict[str, Any]]:
) -> Optional[UrlEmbedData]:
if not is_link(url):
return None
if not valid_content_type(url):
return None
# We are using two different mechanisms to get the embed data
# 1. Use oEmbed data, if found, for photo and video "type" sites
# 2. Otherwise, use a combination of Open Graph tags and Meta tags
data = get_oembed_data(url, maxwidth=maxwidth, maxheight=maxheight) or {}
if data.get("oembed"):
# The oembed data from pyoembed may be complete enough to return
# as-is; if so, we use it. Otherwise, we use it as a _base_ for
# the other, less sophisticated techniques which we apply as
# successive fallbacks.
data = get_oembed_data(url, maxwidth=maxwidth, maxheight=maxheight)
if data is not None and isinstance(data, UrlOEmbedData):
return data
response = PreviewSession().get(mark_sanitized(url), stream=True)
if response.ok:
og_data = OpenGraphParser(
response.content, response.headers.get("Content-Type")
).extract_data()
for key in ["title", "description", "image"]:
if not data.get(key) and og_data.get(key):
data[key] = og_data[key]
if not response.ok:
return None
generic_data = (
GenericParser(response.content, response.headers.get("Content-Type")).extract_data()
or {}
)
for key in ["title", "description", "image"]:
if not data.get(key) and generic_data.get(key):
data[key] = generic_data[key]
if "image" in data:
data["image"] = urljoin(response.url, data["image"])
if data is None:
data = UrlEmbedData()
for parser_class in (OpenGraphParser, GenericParser):
parser = parser_class(response.content, response.headers.get("Content-Type"))
data.merge(parser.extract_data())
if data.image:
data.image = urljoin(response.url, data.image)
return data
@get_cache_with_key(preview_url_cache_key, cache_name=CACHE_NAME)
def link_embed_data_from_cache(url: str, maxwidth: int = 640, maxheight: int = 480) -> Any:
return
def link_embed_data_from_cache(
url: str, maxwidth: int = 640, maxheight: int = 480
) -> Optional[UrlEmbedData]:
return None

View File

@@ -0,0 +1,27 @@
from dataclasses import dataclass
from typing import Optional
from typing_extensions import Literal
@dataclass
class UrlEmbedData:
type: Optional[str] = None
html: Optional[str] = None
title: Optional[str] = None
description: Optional[str] = None
image: Optional[str] = None
def merge(self, other: "UrlEmbedData") -> None:
if self.title is None and other.title is not None:
self.title = other.title
if self.description is None and other.description is not None:
self.description = other.description
if self.image is None and other.image is not None:
self.image = other.image
@dataclass
class UrlOEmbedData(UrlEmbedData):
type: Literal["photo", "video"]
html: Optional[str] = None

View File

@@ -18,6 +18,7 @@ from zerver.lib.test_helpers import mock_queue_publish
from zerver.lib.url_preview.oembed import get_oembed_data, strip_cdata
from zerver.lib.url_preview.parsers import GenericParser, OpenGraphParser
from zerver.lib.url_preview.preview import get_link_embed_data, link_embed_data_from_cache
from zerver.lib.url_preview.types import UrlEmbedData, UrlOEmbedData
from zerver.models import Message, Realm, UserProfile
from zerver.worker.queue_processors import FetchLinksEmbedData
@@ -77,10 +78,9 @@ class OembedTestCase(ZulipTestCase):
)
data = get_oembed_data(url)
self.assertIsInstance(data, dict)
self.assertIn("title", data)
assert data is not None # allow mypy to infer data is indexable
self.assertEqual(data["title"], response_data["title"])
assert data is not None
self.assertIsInstance(data, UrlEmbedData)
self.assertEqual(data.title, response_data["title"])
@responses.activate
def test_photo_provider(self) -> None:
@@ -107,11 +107,9 @@ class OembedTestCase(ZulipTestCase):
)
data = get_oembed_data(url)
self.assertIsInstance(data, dict)
self.assertIn("title", data)
assert data is not None # allow mypy to infer data is indexable
self.assertEqual(data["title"], response_data["title"])
self.assertTrue(data["oembed"])
assert data is not None
self.assertIsInstance(data, UrlOEmbedData)
self.assertEqual(data.title, response_data["title"])
@responses.activate
def test_video_provider(self) -> None:
@@ -136,10 +134,9 @@ class OembedTestCase(ZulipTestCase):
)
data = get_oembed_data(url)
self.assertIsInstance(data, dict)
self.assertIn("title", data)
assert data is not None # allow mypy to infer data is indexable
self.assertEqual(data["title"], response_data["title"])
assert data is not None
self.assertIsInstance(data, UrlOEmbedData)
self.assertEqual(data.title, response_data["title"])
@responses.activate
def test_connect_error_request(self) -> None:
@@ -204,30 +201,8 @@ class OpenGraphParserTestCase(ZulipTestCase):
parser = OpenGraphParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertIn("title", result)
self.assertEqual(result["title"], "The Rock")
self.assertEqual(result.get("description"), "The Rock film")
def test_page_with_evil_og_tags(self) -> None:
html = b"""<html>
<head>
<meta property="og:title" content="The Rock" />
<meta property="og:type" content="video.movie" />
<meta property="og:url" content="http://www.imdb.com/title/tt0117500/" />
<meta property="og:image" content="http://ia.media-imdb.com/images/rock.jpg" />
<meta property="og:description" content="The Rock film" />
<meta property="og:html" content="<script>alert(window.location)</script>" />
<meta property="og:oembed" content="True" />
</head>
</html>"""
parser = OpenGraphParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertIn("title", result)
self.assertEqual(result["title"], "The Rock")
self.assertEqual(result.get("description"), "The Rock film")
self.assertEqual(result.get("oembed"), None)
self.assertEqual(result.get("html"), None)
self.assertEqual(result.title, "The Rock")
self.assertEqual(result.description, "The Rock film")
def test_charset_in_header(self) -> None:
html = """<html>
@@ -239,7 +214,7 @@ class OpenGraphParserTestCase(ZulipTestCase):
)
parser = OpenGraphParser(html, "text/html; charset=Big5")
result = parser.extract_data()
self.assertEqual(result["title"], "中文")
self.assertEqual(result.title, "中文")
def test_charset_in_meta(self) -> None:
html = """<html>
@@ -252,7 +227,7 @@ class OpenGraphParserTestCase(ZulipTestCase):
)
parser = OpenGraphParser(html, "text/html")
result = parser.extract_data()
self.assertEqual(result["title"], "中文")
self.assertEqual(result.title, "中文")
class GenericParserTestCase(ZulipTestCase):
@@ -268,8 +243,8 @@ class GenericParserTestCase(ZulipTestCase):
"""
parser = GenericParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertEqual(result.get("title"), "Test title")
self.assertEqual(result.get("description"), "Description text")
self.assertEqual(result.title, "Test title")
self.assertEqual(result.description, "Description text")
def test_extract_image(self) -> None:
html = b"""
@@ -286,9 +261,9 @@ class GenericParserTestCase(ZulipTestCase):
"""
parser = GenericParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertEqual(result.get("title"), "Main header")
self.assertEqual(result.get("description"), "Description text")
self.assertEqual(result.get("image"), "http://test.com/test.jpg")
self.assertEqual(result.title, "Main header")
self.assertEqual(result.description, "Description text")
self.assertEqual(result.image, "http://test.com/test.jpg")
def test_extract_bad_image(self) -> None:
html = b"""
@@ -305,9 +280,9 @@ class GenericParserTestCase(ZulipTestCase):
"""
parser = GenericParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertEqual(result.get("title"), "Main header")
self.assertEqual(result.get("description"), "Description text")
self.assertIsNone(result.get("image"))
self.assertEqual(result.title, "Main header")
self.assertEqual(result.description, "Description text")
self.assertIsNone(result.image)
def test_extract_description(self) -> None:
html = b"""
@@ -323,7 +298,7 @@ class GenericParserTestCase(ZulipTestCase):
"""
parser = GenericParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertEqual(result.get("description"), "Description text")
self.assertEqual(result.description, "Description text")
html = b"""
<html>
@@ -333,12 +308,12 @@ class GenericParserTestCase(ZulipTestCase):
"""
parser = GenericParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertEqual(result.get("description"), "description 123")
self.assertEqual(result.description, "description 123")
html = b"<html><body></body></html>"
parser = GenericParser(html, "text/html; charset=UTF-8")
result = parser.extract_data()
self.assertIsNone(result.get("description"))
self.assertIsNone(result.description)
class PreviewTestCase(ZulipTestCase):
@@ -730,8 +705,9 @@ class PreviewTestCase(ZulipTestCase):
in info_logs.output[0]
)
self.assertIn("title", cached_data)
self.assertNotIn("image", cached_data)
assert cached_data
self.assertIsNotNone(cached_data.title)
self.assertIsNone(cached_data.image)
msg = Message.objects.select_related("sender").get(id=msg_id)
self.assertEqual(
('<p><a href="http://test.org/foo.html">' "http://test.org/foo.html</a></p>"),
@@ -768,8 +744,9 @@ class PreviewTestCase(ZulipTestCase):
in info_logs.output[0]
)
self.assertIn("title", cached_data)
self.assertNotIn("image", cached_data)
assert cached_data
self.assertIsNotNone(cached_data.title)
self.assertIsNone(cached_data.image)
msg = Message.objects.select_related("sender").get(id=msg_id)
self.assertEqual(
('<p><a href="http://test.org/foo.html">' "http://test.org/foo.html</a></p>"),
@@ -804,8 +781,9 @@ class PreviewTestCase(ZulipTestCase):
in info_logs.output[0]
)
self.assertIn("title", cached_data)
self.assertNotIn("image", cached_data)
assert cached_data
self.assertIsNotNone(cached_data.title)
self.assertIsNone(cached_data.image)
msg = Message.objects.select_related("sender").get(id=msg_id)
self.assertEqual(
('<p><a href="http://test.org/foo.html">' "http://test.org/foo.html</a></p>"),
@@ -836,12 +814,11 @@ class PreviewTestCase(ZulipTestCase):
in info_logs.output[0]
)
self.assertIn("title", data)
self.assertIn("image", data)
assert data is not None
msg = Message.objects.select_related("sender").get(id=msg_id)
self.assertIn(data["title"], msg.rendered_content)
self.assertIn(re.sub(r"([^\w-])", r"\\\1", data["image"]), msg.rendered_content)
self.assertIn(data.title, msg.rendered_content)
assert data.image is not None
self.assertIn(re.sub(r"([^\w-])", r"\\\1", data.image), msg.rendered_content)
@responses.activate
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
@@ -941,12 +918,11 @@ class PreviewTestCase(ZulipTestCase):
"message_content": url,
}
mocked_data = {
"html": f'<iframe src="{url}"></iframe>',
"oembed": True,
"type": "video",
"image": f"{url}/image.png",
}
mocked_data = UrlOEmbedData(
html=f'<iframe src="{url}"></iframe>',
type="video",
image=f"{url}/image.png",
)
self.create_mock_response(url)
with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
with self.assertLogs(level="INFO") as info_logs:
@@ -963,7 +939,7 @@ class PreviewTestCase(ZulipTestCase):
self.assertEqual(data, mocked_data)
msg.refresh_from_db()
self.assertIn('a data-id="{}"'.format(escape(mocked_data["html"])), msg.rendered_content)
self.assertIn('a data-id="{}"'.format(escape(mocked_data.html)), msg.rendered_content)
@responses.activate
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
@@ -983,7 +959,9 @@ class PreviewTestCase(ZulipTestCase):
"message_content": url,
}
mocked_data = {"title": "Clearer Code at Scale - Static Types at Zulip and Dropbox"}
mocked_data = UrlEmbedData(
title="Clearer Code at Scale - Static Types at Zulip and Dropbox"
)
self.create_mock_response(url)
with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
with self.assertLogs(level="INFO") as info_logs:
@@ -1019,7 +997,9 @@ class PreviewTestCase(ZulipTestCase):
"message_content": url,
}
mocked_data = {"title": "Clearer Code at Scale - Static Types at Zulip and Dropbox"}
mocked_data = UrlEmbedData(
title="Clearer Code at Scale - Static Types at Zulip and Dropbox"
)
self.create_mock_response(url)
with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
with self.assertLogs(level="INFO") as info_logs: