mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	We don't want really long urls to lead to truncated keys, or we could theoretically have two different urls get mixed up previews. Also, this suppresses warnings about exceeding the 250 char limit. Finally, this gives the key a proper prefix.
		
			
				
	
	
		
			375 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			375 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
import mock
 | 
						|
import ujson
 | 
						|
from typing import Any
 | 
						|
from requests.exceptions import ConnectionError
 | 
						|
from django.test import override_settings
 | 
						|
 | 
						|
from zerver.models import Recipient, Message
 | 
						|
from zerver.lib.actions import queue_json_publish
 | 
						|
from zerver.lib.test_classes import ZulipTestCase
 | 
						|
from zerver.lib.test_helpers import MockPythonResponse
 | 
						|
from zerver.worker.queue_processors import FetchLinksEmbedData
 | 
						|
from zerver.lib.url_preview.preview import (
 | 
						|
    get_link_embed_data, link_embed_data_from_cache)
 | 
						|
from zerver.lib.url_preview.oembed import get_oembed_data
 | 
						|
from zerver.lib.url_preview.parsers import (
 | 
						|
    OpenGraphParser, GenericParser)
 | 
						|
from zerver.lib.cache import cache_set, NotFoundInCache, preview_url_cache_key
 | 
						|
 | 
						|
 | 
						|
TEST_CACHES = {
 | 
						|
    'default': {
 | 
						|
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
 | 
						|
        'LOCATION': 'default',
 | 
						|
    },
 | 
						|
    'database': {
 | 
						|
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
 | 
						|
        'LOCATION': 'url-preview',
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
 | 
						|
class OembedTestCase(ZulipTestCase):
 | 
						|
    @mock.patch('pyoembed.requests.get')
 | 
						|
    def test_present_provider(self, get: Any) -> None:
 | 
						|
        get.return_value = response = mock.Mock()
 | 
						|
        response.headers = {'content-type': 'application/json'}
 | 
						|
        response.ok = True
 | 
						|
        response_data = {
 | 
						|
            'type': 'rich',
 | 
						|
            'thumbnail_url': 'https://scontent.cdninstagram.com/t51.2885-15/n.jpg',
 | 
						|
            'thumbnail_width': 640,
 | 
						|
            'thumbnail_height': 426,
 | 
						|
            'title': 'NASA',
 | 
						|
            'html': '<p>test</p>',
 | 
						|
            'version': '1.0',
 | 
						|
            'width': 658,
 | 
						|
            'height': None}
 | 
						|
        response.text = ujson.dumps(response_data)
 | 
						|
        url = 'http://instagram.com/p/BLtI2WdAymy'
 | 
						|
        data = get_oembed_data(url)
 | 
						|
        self.assertIsInstance(data, dict)
 | 
						|
        self.assertIn('title', data)
 | 
						|
        assert data is not None  # allow mypy to infer data is indexable
 | 
						|
        self.assertEqual(data['title'], response_data['title'])
 | 
						|
 | 
						|
    @mock.patch('pyoembed.requests.get')
 | 
						|
    def test_error_request(self, get: Any) -> None:
 | 
						|
        get.return_value = response = mock.Mock()
 | 
						|
        response.ok = False
 | 
						|
        url = 'http://instagram.com/p/BLtI2WdAymy'
 | 
						|
        data = get_oembed_data(url)
 | 
						|
        self.assertIsNone(data)
 | 
						|
 | 
						|
 | 
						|
class OpenGraphParserTestCase(ZulipTestCase):
 | 
						|
    def test_page_with_og(self) -> None:
 | 
						|
        html = """<html>
 | 
						|
          <head>
 | 
						|
          <meta property="og:title" content="The Rock" />
 | 
						|
          <meta property="og:type" content="video.movie" />
 | 
						|
          <meta property="og:url" content="http://www.imdb.com/title/tt0117500/" />
 | 
						|
          <meta property="og:image" content="http://ia.media-imdb.com/images/rock.jpg" />
 | 
						|
          <meta property="og:description" content="The Rock film" />
 | 
						|
          </head>
 | 
						|
        </html>"""
 | 
						|
 | 
						|
        parser = OpenGraphParser(html)
 | 
						|
        result = parser.extract_data()
 | 
						|
        self.assertIn('title', result)
 | 
						|
        self.assertEqual(result['title'], 'The Rock')
 | 
						|
        self.assertEqual(result.get('description'), 'The Rock film')
 | 
						|
 | 
						|
 | 
						|
class GenericParserTestCase(ZulipTestCase):
 | 
						|
    def test_parser(self) -> None:
 | 
						|
        html = """
 | 
						|
          <html>
 | 
						|
            <head><title>Test title</title></head>
 | 
						|
            <body>
 | 
						|
                <h1>Main header</h1>
 | 
						|
                <p>Description text</p>
 | 
						|
            </body>
 | 
						|
          </html>
 | 
						|
        """
 | 
						|
        parser = GenericParser(html)
 | 
						|
        result = parser.extract_data()
 | 
						|
        self.assertEqual(result.get('title'), 'Test title')
 | 
						|
        self.assertEqual(result.get('description'), 'Description text')
 | 
						|
 | 
						|
    def test_extract_image(self) -> None:
 | 
						|
        html = """
 | 
						|
          <html>
 | 
						|
            <body>
 | 
						|
                <h1>Main header</h1>
 | 
						|
                <img src="http://test.com/test.jpg">
 | 
						|
                <div>
 | 
						|
                    <p>Description text</p>
 | 
						|
                </div>
 | 
						|
            </body>
 | 
						|
          </html>
 | 
						|
        """
 | 
						|
        parser = GenericParser(html)
 | 
						|
        result = parser.extract_data()
 | 
						|
        self.assertEqual(result.get('title'), 'Main header')
 | 
						|
        self.assertEqual(result.get('description'), 'Description text')
 | 
						|
        self.assertEqual(result.get('image'), 'http://test.com/test.jpg')
 | 
						|
 | 
						|
    def test_extract_description(self) -> None:
 | 
						|
        html = """
 | 
						|
          <html>
 | 
						|
            <body>
 | 
						|
                <div>
 | 
						|
                    <div>
 | 
						|
                        <p>Description text</p>
 | 
						|
                    </div>
 | 
						|
                </div>
 | 
						|
            </body>
 | 
						|
          </html>
 | 
						|
        """
 | 
						|
        parser = GenericParser(html)
 | 
						|
        result = parser.extract_data()
 | 
						|
        self.assertEqual(result.get('description'), 'Description text')
 | 
						|
 | 
						|
        html = """
 | 
						|
          <html>
 | 
						|
            <head><meta name="description" content="description 123"</head>
 | 
						|
            <body></body>
 | 
						|
          </html>
 | 
						|
        """
 | 
						|
        parser = GenericParser(html)
 | 
						|
        result = parser.extract_data()
 | 
						|
        self.assertEqual(result.get('description'), 'description 123')
 | 
						|
 | 
						|
        html = "<html><body></body></html>"
 | 
						|
        parser = GenericParser(html)
 | 
						|
        result = parser.extract_data()
 | 
						|
        self.assertIsNone(result.get('description'))
 | 
						|
 | 
						|
 | 
						|
class PreviewTestCase(ZulipTestCase):
 | 
						|
    open_graph_html = """
 | 
						|
          <html>
 | 
						|
            <head>
 | 
						|
                <title>Test title</title>
 | 
						|
                <meta property="og:title" content="The Rock" />
 | 
						|
                <meta property="og:type" content="video.movie" />
 | 
						|
                <meta property="og:url" content="http://www.imdb.com/title/tt0117500/" />
 | 
						|
                <meta property="og:image" content="http://ia.media-imdb.com/images/rock.jpg" />
 | 
						|
            </head>
 | 
						|
            <body>
 | 
						|
                <h1>Main header</h1>
 | 
						|
                <p>Description text</p>
 | 
						|
            </body>
 | 
						|
          </html>
 | 
						|
        """
 | 
						|
 | 
						|
    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
 | 
						|
    def test_edit_message_history(self) -> None:
 | 
						|
        email = self.example_email('hamlet')
 | 
						|
        self.login(email)
 | 
						|
        msg_id = self.send_stream_message(email, "Scotland",
 | 
						|
                                          topic_name="editing", content="original")
 | 
						|
 | 
						|
        url = 'http://test.org/'
 | 
						|
        response = MockPythonResponse(self.open_graph_html, 200)
 | 
						|
        mocked_response = mock.Mock(
 | 
						|
            side_effect=lambda k: {url: response}.get(k, MockPythonResponse('', 404)))
 | 
						|
 | 
						|
        with mock.patch('zerver.views.messages.queue_json_publish') as patched:
 | 
						|
            result = self.client_patch("/json/messages/" + str(msg_id), {
 | 
						|
                'message_id': msg_id, 'content': url,
 | 
						|
            })
 | 
						|
            self.assert_json_success(result)
 | 
						|
            patched.assert_called_once()
 | 
						|
            queue = patched.call_args[0][0]
 | 
						|
            self.assertEqual(queue, "embed_links")
 | 
						|
            event = patched.call_args[0][1]
 | 
						|
 | 
						|
        with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
 | 
						|
            with mock.patch('requests.get', mocked_response):
 | 
						|
                FetchLinksEmbedData().consume(event)
 | 
						|
 | 
						|
        embedded_link = '<a href="{0}" target="_blank" title="The Rock">The Rock</a>'.format(url)
 | 
						|
        msg = Message.objects.select_related("sender").get(id=msg_id)
 | 
						|
        self.assertIn(embedded_link, msg.rendered_content)
 | 
						|
 | 
						|
    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
 | 
						|
    def _send_message_with_test_org_url(self, sender_email: str, queue_should_run: bool=True,
 | 
						|
                                        relative_url: bool=False) -> Message:
 | 
						|
        url = 'http://test.org/'
 | 
						|
        with mock.patch('zerver.lib.actions.queue_json_publish') as patched:
 | 
						|
            msg_id = self.send_personal_message(
 | 
						|
                sender_email,
 | 
						|
                self.example_email('cordelia'),
 | 
						|
                content=url,
 | 
						|
            )
 | 
						|
            if queue_should_run:
 | 
						|
                patched.assert_called_once()
 | 
						|
                queue = patched.call_args[0][0]
 | 
						|
                self.assertEqual(queue, "embed_links")
 | 
						|
                event = patched.call_args[0][1]
 | 
						|
            else:
 | 
						|
                patched.assert_not_called()
 | 
						|
                # If we nothing was put in the queue, we don't need to
 | 
						|
                # run the queue processor or any of the following code
 | 
						|
                return Message.objects.select_related("sender").get(id=msg_id)
 | 
						|
 | 
						|
        # Verify the initial message doesn't have the embedded links rendered
 | 
						|
        msg = Message.objects.select_related("sender").get(id=msg_id)
 | 
						|
        self.assertNotIn(
 | 
						|
            '<a href="{0}" target="_blank" title="The Rock">The Rock</a>'.format(url),
 | 
						|
            msg.rendered_content)
 | 
						|
 | 
						|
        # Mock the network request result so the test can be fast without Internet
 | 
						|
        response = MockPythonResponse(self.open_graph_html, 200)
 | 
						|
        if relative_url is True:
 | 
						|
            response = MockPythonResponse(self.open_graph_html.replace('http://ia.media-imdb.com', ''), 200)
 | 
						|
        mocked_response = mock.Mock(
 | 
						|
            side_effect=lambda k: {url: response}.get(k, MockPythonResponse('', 404)))
 | 
						|
 | 
						|
        # Run the queue processor to potentially rerender things
 | 
						|
        with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
 | 
						|
            with mock.patch('requests.get', mocked_response):
 | 
						|
                FetchLinksEmbedData().consume(event)
 | 
						|
        msg = Message.objects.select_related("sender").get(id=msg_id)
 | 
						|
        return msg
 | 
						|
 | 
						|
    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
 | 
						|
    def test_message_update_race_condition(self) -> None:
 | 
						|
        email = self.example_email('hamlet')
 | 
						|
        self.login(email)
 | 
						|
        original_url = 'http://test.org/'
 | 
						|
        edited_url = 'http://edited.org/'
 | 
						|
        with mock.patch('zerver.lib.actions.queue_json_publish') as patched:
 | 
						|
            msg_id = self.send_stream_message(email, "Scotland",
 | 
						|
                                              topic_name="foo", content=original_url)
 | 
						|
            patched.assert_called_once()
 | 
						|
            queue = patched.call_args[0][0]
 | 
						|
            self.assertEqual(queue, "embed_links")
 | 
						|
            event = patched.call_args[0][1]
 | 
						|
 | 
						|
        def wrapped_queue_json_publish(*args: Any, **kwargs: Any) -> None:
 | 
						|
            # Mock the network request result so the test can be fast without Internet
 | 
						|
            response = MockPythonResponse(self.open_graph_html, 200)
 | 
						|
            mocked_response_original = mock.Mock(
 | 
						|
                side_effect=lambda k: {original_url: response}.get(k, MockPythonResponse('', 404)))
 | 
						|
            mocked_response_edited = mock.Mock(
 | 
						|
                side_effect=lambda k: {edited_url: response}.get(k, MockPythonResponse('', 404)))
 | 
						|
            with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
 | 
						|
                with mock.patch('requests.get', mocked_response_original):
 | 
						|
                    # Run the queue processor. This will simulate the event for original_url being
 | 
						|
                    # processed after the message has been edited.
 | 
						|
                    FetchLinksEmbedData().consume(event)
 | 
						|
            msg = Message.objects.select_related("sender").get(id=msg_id)
 | 
						|
            # The content of the message has changed since the event for original_url has been created,
 | 
						|
            # it should not be rendered. Another, up-to-date event will have been sent (edited_url).
 | 
						|
            self.assertNotIn('<a href="{0}" target="_blank" title="The Rock">The Rock</a>'.format(original_url),
 | 
						|
                             msg.rendered_content)
 | 
						|
            mocked_response_edited.assert_not_called()
 | 
						|
 | 
						|
            with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
 | 
						|
                with mock.patch('requests.get', mocked_response_edited):
 | 
						|
                    # Now proceed with the original queue_json_publish and call the
 | 
						|
                    # up-to-date event for edited_url.
 | 
						|
                    queue_json_publish(*args, **kwargs)
 | 
						|
                    msg = Message.objects.select_related("sender").get(id=msg_id)
 | 
						|
                    self.assertIn('<a href="{0}" target="_blank" title="The Rock">The Rock</a>'.format(edited_url),
 | 
						|
                                  msg.rendered_content)
 | 
						|
 | 
						|
        with mock.patch('zerver.views.messages.queue_json_publish', wraps=wrapped_queue_json_publish) as patched:
 | 
						|
            result = self.client_patch("/json/messages/" + str(msg_id), {
 | 
						|
                'message_id': msg_id, 'content': edited_url,
 | 
						|
            })
 | 
						|
            self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_get_link_embed_data(self) -> None:
 | 
						|
        url = 'http://test.org/'
 | 
						|
        embedded_link = '<a href="{0}" target="_blank" title="The Rock">The Rock</a>'.format(url)
 | 
						|
 | 
						|
        # When humans send, we should get embedded content.
 | 
						|
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('hamlet'))
 | 
						|
        self.assertIn(embedded_link, msg.rendered_content)
 | 
						|
 | 
						|
        # We don't want embedded content for bots.
 | 
						|
        msg = self._send_message_with_test_org_url(sender_email='webhook-bot@zulip.com',
 | 
						|
                                                   queue_should_run=False)
 | 
						|
        self.assertNotIn(embedded_link, msg.rendered_content)
 | 
						|
 | 
						|
        # Try another human to make sure bot failure was due to the
 | 
						|
        # bot sending the message and not some other reason.
 | 
						|
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('prospero'))
 | 
						|
        self.assertIn(embedded_link, msg.rendered_content)
 | 
						|
 | 
						|
    def test_inline_url_embed_preview(self) -> None:
 | 
						|
        with_preview = '<p><a href="http://test.org/" target="_blank" title="http://test.org/">http://test.org/</a></p>\n<div class="message_embed"><a class="message_embed_image" href="http://test.org/" style="background-image: url(http://ia.media-imdb.com/images/rock.jpg)" target="_blank"></a><div class="data-container"><div class="message_embed_title"><a href="http://test.org/" target="_blank" title="The Rock">The Rock</a></div><div class="message_embed_description">Description text</div></div></div>'
 | 
						|
        without_preview = '<p><a href="http://test.org/" target="_blank" title="http://test.org/">http://test.org/</a></p>'
 | 
						|
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('hamlet'))
 | 
						|
        self.assertEqual(msg.rendered_content, with_preview)
 | 
						|
 | 
						|
        realm = msg.get_realm()
 | 
						|
        setattr(realm, 'inline_url_embed_preview', False)
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('prospero'), queue_should_run=False)
 | 
						|
        self.assertEqual(msg.rendered_content, without_preview)
 | 
						|
 | 
						|
    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
 | 
						|
    def test_inline_relative_url_embed_preview(self) -> None:
 | 
						|
        # Relative urls should not be sent for url preview.
 | 
						|
        with mock.patch('zerver.lib.actions.queue_json_publish') as patched:
 | 
						|
            self.send_personal_message(
 | 
						|
                self.example_email('prospero'),
 | 
						|
                self.example_email('cordelia'),
 | 
						|
                content="http://zulip.testserver/api/",
 | 
						|
            )
 | 
						|
            patched.assert_not_called()
 | 
						|
 | 
						|
    def test_inline_url_embed_preview_with_relative_image_url(self) -> None:
 | 
						|
        with_preview_relative = '<p><a href="http://test.org/" target="_blank" title="http://test.org/">http://test.org/</a></p>\n<div class="message_embed"><a class="message_embed_image" href="http://test.org/" style="background-image: url(http://test.org/images/rock.jpg)" target="_blank"></a><div class="data-container"><div class="message_embed_title"><a href="http://test.org/" target="_blank" title="The Rock">The Rock</a></div><div class="message_embed_description">Description text</div></div></div>'
 | 
						|
        # Try case where the opengraph image is a relative url.
 | 
						|
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('prospero'), relative_url=True)
 | 
						|
        self.assertEqual(msg.rendered_content, with_preview_relative)
 | 
						|
 | 
						|
    def test_http_error_get_data(self) -> None:
 | 
						|
        url = 'http://test.org/'
 | 
						|
        msg_id = self.send_personal_message(
 | 
						|
            self.example_email('hamlet'),
 | 
						|
            self.example_email('cordelia'),
 | 
						|
            content=url,
 | 
						|
        )
 | 
						|
        msg = Message.objects.select_related("sender").get(id=msg_id)
 | 
						|
        event = {
 | 
						|
            'message_id': msg_id,
 | 
						|
            'urls': [url],
 | 
						|
            'message_realm_id': msg.sender.realm_id,
 | 
						|
            'message_content': url}
 | 
						|
        with self.settings(INLINE_URL_EMBED_PREVIEW=True, TEST_SUITE=False, CACHES=TEST_CACHES):
 | 
						|
            with mock.patch('requests.get', mock.Mock(side_effect=ConnectionError())):
 | 
						|
                with mock.patch('logging.error') as error_mock:
 | 
						|
                    FetchLinksEmbedData().consume(event)
 | 
						|
        self.assertEqual(error_mock.call_count, 1)
 | 
						|
        msg = Message.objects.get(id=msg_id)
 | 
						|
        self.assertEqual(
 | 
						|
            '<p><a href="http://test.org/" target="_blank" title="http://test.org/">http://test.org/</a></p>',
 | 
						|
            msg.rendered_content)
 | 
						|
 | 
						|
    def test_invalid_link(self) -> None:
 | 
						|
        with self.settings(INLINE_URL_EMBED_PREVIEW=True, TEST_SUITE=False, CACHES=TEST_CACHES):
 | 
						|
            self.assertIsNone(get_link_embed_data('com.notvalidlink'))
 | 
						|
            self.assertIsNone(get_link_embed_data(u'μένει.com.notvalidlink'))
 | 
						|
 | 
						|
    def test_link_embed_data_from_cache(self) -> None:
 | 
						|
        url = 'http://test.org/'
 | 
						|
        link_embed_data = 'test data'
 | 
						|
 | 
						|
        with self.assertRaises(NotFoundInCache):
 | 
						|
            link_embed_data_from_cache(url)
 | 
						|
 | 
						|
        with self.settings(CACHES=TEST_CACHES):
 | 
						|
            key = preview_url_cache_key(url)
 | 
						|
            cache_set(key, link_embed_data, 'database')
 | 
						|
            self.assertEqual(link_embed_data, link_embed_data_from_cache(url))
 |