mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	This makes it more clearly the pattern in the Zulip codebase, and thus decreases the risk of accidentally doing database queries.
		
			
				
	
	
		
			294 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			294 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
"""WebSocketBaseTestCase is based on combination of Tornado
 | 
						|
and Django test systems. It require to use decorator '@gen.coroutine'
 | 
						|
for each test case method( see documentation: http://www.tornadoweb.org/en/stable/testing.html).
 | 
						|
It requires implementation of 'get_app' method to initialize tornado application and launch it.
 | 
						|
"""
 | 
						|
from __future__ import absolute_import
 | 
						|
from __future__ import print_function
 | 
						|
 | 
						|
 | 
						|
import time
 | 
						|
 | 
						|
import ujson
 | 
						|
from django.conf import settings
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
from tornado.gen import Return
 | 
						|
from tornado.httpclient import HTTPRequest
 | 
						|
 | 
						|
from zerver.lib.test_helpers import POSTRequestMock
 | 
						|
from zerver.lib.test_classes import ZulipTestCase
 | 
						|
 | 
						|
from zerver.models import UserProfile
 | 
						|
 | 
						|
from tornado import gen
 | 
						|
from tornado.testing import AsyncHTTPTestCase, gen_test
 | 
						|
from tornado.web import Application
 | 
						|
from tornado.websocket import websocket_connect
 | 
						|
 | 
						|
from zerver.tornado.application import create_tornado_application
 | 
						|
from zerver.tornado.event_queue import fetch_events
 | 
						|
from zerver.tornado.views import get_events_backend
 | 
						|
 | 
						|
from six.moves.http_cookies import SimpleCookie
 | 
						|
 | 
						|
from typing import Any, Callable, Generator, Optional
 | 
						|
 | 
						|
 | 
						|
class WebSocketBaseTestCase(AsyncHTTPTestCase, ZulipTestCase):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        # type: () -> None
 | 
						|
        #
 | 
						|
        settings.RUNNING_INSIDE_TORNADO = True
 | 
						|
        super(WebSocketBaseTestCase, self).setUp()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        # type: () -> None
 | 
						|
        super(WebSocketBaseTestCase, self).setUp()
 | 
						|
        settings.RUNNING_INSIDE_TORNADO = False
 | 
						|
 | 
						|
    @gen.coroutine
 | 
						|
    def ws_connect(self, path, cookie_header, compression_options=None):
 | 
						|
        # type: (str, str, Optional[Any]) -> Generator[Any, Callable[[HTTPRequest, Optional[Any]], Any], None]
 | 
						|
        request = HTTPRequest(url='ws://127.0.0.1:%d%s' % (self.get_http_port(), path))
 | 
						|
        request.headers.add('Cookie', cookie_header)
 | 
						|
        ws = yield websocket_connect(
 | 
						|
            request,
 | 
						|
            compression_options=compression_options)
 | 
						|
        raise gen.Return(ws)
 | 
						|
 | 
						|
    @gen.coroutine
 | 
						|
    def close(self, ws):
 | 
						|
        # type: (Any) -> Generator[None, Any, None]
 | 
						|
        """Close a websocket connection and wait for the server side.
 | 
						|
 | 
						|
        If we don't wait here, there are sometimes leak warnings in the
 | 
						|
        tests.
 | 
						|
        """
 | 
						|
        ws.close()
 | 
						|
        yield self.close_future
 | 
						|
 | 
						|
 | 
						|
class TornadoTestCase(WebSocketBaseTestCase):
 | 
						|
    def get_app(self):
 | 
						|
        # type: () -> Application
 | 
						|
        """ Return tornado app to launch for test cases
 | 
						|
        """
 | 
						|
        return create_tornado_application()
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def tornado_call(view_func, user_profile, post_data):
 | 
						|
        # type: (Callable[[HttpRequest, UserProfile], HttpResponse], UserProfile, Dict[str, Any]) -> HttpResponse
 | 
						|
        request = POSTRequestMock(post_data, user_profile)
 | 
						|
        return view_func(request, user_profile)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_cookie_header(cookies):
 | 
						|
        # type: (Dict[Any, Any]) -> str
 | 
						|
        return ';'.join(
 | 
						|
            ["{}={}".format(name, value.value) for name, value in cookies.items()])
 | 
						|
 | 
						|
    def _get_cookies(self, user_profile):
 | 
						|
        # type: (UserProfile) -> SimpleCookie
 | 
						|
        resp = self.login_with_return(user_profile.email)
 | 
						|
        return resp.cookies
 | 
						|
 | 
						|
    @gen.coroutine
 | 
						|
    def _websocket_auth(self, ws, queue_events_data, cookies):
 | 
						|
        # type: (Any, Dict[str, Dict[str, str]], SimpleCookie) -> Generator[str, str, None]
 | 
						|
        auth_queue_id = ':'.join((queue_events_data['response']['queue_id'], '0'))
 | 
						|
        message = {
 | 
						|
            "req_id": auth_queue_id,
 | 
						|
            "type": "auth",
 | 
						|
            "request": {
 | 
						|
                "csrf_token": cookies.get('csrftoken').coded_value,
 | 
						|
                "queue_id": queue_events_data['response']['queue_id'],
 | 
						|
                "status_inquiries": []
 | 
						|
            }
 | 
						|
        }
 | 
						|
        auth_frame_str = ujson.dumps(message)
 | 
						|
        ws.write_message(ujson.dumps([auth_frame_str]))
 | 
						|
        response_ack = yield ws.read_message()
 | 
						|
        response_message = yield ws.read_message()
 | 
						|
        raise gen.Return([response_ack, response_message])
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _get_queue_events_data(email):
 | 
						|
        # type: (str) -> Dict[str, Dict[str, str]]
 | 
						|
        user_profile = UserProfile.objects.filter(email=email).first()
 | 
						|
        events_query = {
 | 
						|
            'queue_id': None,
 | 
						|
            'narrow': [],
 | 
						|
            'handler_id': 0,
 | 
						|
            'user_profile_email': user_profile.email,
 | 
						|
            'all_public_streams': False,
 | 
						|
            'client_type_name': 'website',
 | 
						|
            'new_queue_data': {
 | 
						|
                'apply_markdown': True,
 | 
						|
                'narrow': [],
 | 
						|
                'user_profile_email': user_profile.email,
 | 
						|
                'all_public_streams': False,
 | 
						|
                'realm_id': user_profile.realm_id,
 | 
						|
                'client_type_name': 'website',
 | 
						|
                'event_types': None,
 | 
						|
                'user_profile_id': user_profile.id,
 | 
						|
                'queue_timeout': 0,
 | 
						|
                'last_connection_time': time.time()},
 | 
						|
            'last_event_id': -1,
 | 
						|
            'event_types': None,
 | 
						|
            'user_profile_id': user_profile.id,
 | 
						|
            'dont_block': True,
 | 
						|
            'lifespan_secs': 0
 | 
						|
        }
 | 
						|
        result = fetch_events(events_query)
 | 
						|
        return result
 | 
						|
 | 
						|
    def _check_message_sending(self, request_id, ack_resp, msg_resp, profile, queue_events_data):
 | 
						|
        # type: (str, str, str, UserProfile, Dict[str, Dict[str, str]]) -> None
 | 
						|
        self.assertEqual(ack_resp[0], 'a')
 | 
						|
        self.assertEqual(
 | 
						|
            ujson.loads(ack_resp[1:]),
 | 
						|
            [
 | 
						|
                {
 | 
						|
                    "type": "ack",
 | 
						|
                    "req_id": request_id
 | 
						|
                }
 | 
						|
            ])
 | 
						|
        self.assertEqual(msg_resp[0], 'a')
 | 
						|
        result = self.tornado_call(get_events_backend, profile,
 | 
						|
                                   {"queue_id": queue_events_data['response']['queue_id'],
 | 
						|
                                    "user_client": "website",
 | 
						|
                                    "last_event_id": -1,
 | 
						|
                                    "dont_block": ujson.dumps(True),
 | 
						|
                                    })
 | 
						|
        result_content = ujson.loads(result.content)
 | 
						|
        self.assertEqual(len(result_content['events']), 1)
 | 
						|
        message_id = result_content['events'][0]['message']['id']
 | 
						|
        self.assertEqual(
 | 
						|
            ujson.loads(msg_resp[1:]),
 | 
						|
            [
 | 
						|
                {
 | 
						|
                    "type": "response",
 | 
						|
                    "response":
 | 
						|
                        {
 | 
						|
                            "result": "success",
 | 
						|
                            "id": message_id,
 | 
						|
                            "msg": ""
 | 
						|
                        },
 | 
						|
                    "req_id": request_id
 | 
						|
                }
 | 
						|
            ])
 | 
						|
 | 
						|
    @gen_test
 | 
						|
    def test_tornado_connect(self):
 | 
						|
        # type: () -> Generator[str, Any, None]
 | 
						|
        user_profile = UserProfile.objects.filter(email='hamlet@zulip.com').first()
 | 
						|
        cookies = self._get_cookies(user_profile)
 | 
						|
        cookie_header = self.get_cookie_header(cookies)
 | 
						|
        ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
 | 
						|
        response = yield ws.read_message()
 | 
						|
        self.assertEqual(response, 'o')
 | 
						|
 | 
						|
    @gen_test
 | 
						|
    def test_tornado_auth(self):
 | 
						|
        # type: () -> Generator[str, TornadoTestCase, None]
 | 
						|
        user_profile = UserProfile.objects.filter(email='hamlet@zulip.com').first()
 | 
						|
        cookies = self._get_cookies(user_profile)
 | 
						|
        cookie_header = self.get_cookie_header(cookies)
 | 
						|
        ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
 | 
						|
        yield ws.read_message()
 | 
						|
        queue_events_data = self._get_queue_events_data(user_profile.email)
 | 
						|
        request_id = ':'.join((queue_events_data['response']['queue_id'], '0'))
 | 
						|
        response = yield self._websocket_auth(ws, queue_events_data, cookies)
 | 
						|
        self.assertEqual(response[0][0], 'a')
 | 
						|
        self.assertEqual(
 | 
						|
            ujson.loads(response[0][1:]),
 | 
						|
            [
 | 
						|
                {
 | 
						|
                    "type": "ack",
 | 
						|
                    "req_id": request_id
 | 
						|
                }
 | 
						|
            ])
 | 
						|
        self.assertEqual(response[1][0], 'a')
 | 
						|
        self.assertEqual(
 | 
						|
            ujson.loads(response[1][1:]),
 | 
						|
            [
 | 
						|
                {"req_id": request_id,
 | 
						|
                 "response": {
 | 
						|
                     "result": "success",
 | 
						|
                     "status_inquiries": {},
 | 
						|
                     "msg": ""
 | 
						|
                 },
 | 
						|
                 "type": "response"}
 | 
						|
            ])
 | 
						|
 | 
						|
    @gen_test
 | 
						|
    def test_sending_private_message(self):
 | 
						|
        # type: () -> Generator[str, Any, None]
 | 
						|
        user_profile = UserProfile.objects.filter(email='hamlet@zulip.com').first()
 | 
						|
        cookies = self._get_cookies(user_profile)
 | 
						|
        cookie_header = self.get_cookie_header(cookies)
 | 
						|
        queue_events_data = self._get_queue_events_data(user_profile.email)
 | 
						|
        ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
 | 
						|
        yield ws.read_message()
 | 
						|
        yield self._websocket_auth(ws, queue_events_data, cookies)
 | 
						|
        request_id = ':'.join((queue_events_data['response']['queue_id'], '1'))
 | 
						|
        user_message = {
 | 
						|
            "req_id": request_id,
 | 
						|
            "type": "request",
 | 
						|
            "request": {
 | 
						|
                "client": "website",
 | 
						|
                "type": "private",
 | 
						|
                "subject": "(no topic)",
 | 
						|
                "stream": "",
 | 
						|
                "private_message_recipient": "othello@zulip.com",
 | 
						|
                "content": "hello",
 | 
						|
                "sender_id": user_profile.id,
 | 
						|
                "queue_id": queue_events_data['response']['queue_id'],
 | 
						|
                "to": ujson.dumps(["othello@zulip.com"]),
 | 
						|
                "reply_to": "hamlet@zulip.com",
 | 
						|
                "local_id": -1
 | 
						|
            }
 | 
						|
        }
 | 
						|
        user_message_str = ujson.dumps(user_message)
 | 
						|
        ws.write_message(ujson.dumps([user_message_str]))
 | 
						|
        ack_resp = yield ws.read_message()
 | 
						|
        msg_resp = yield ws.read_message()
 | 
						|
        self._check_message_sending(request_id, ack_resp, msg_resp, user_profile, queue_events_data)
 | 
						|
 | 
						|
    @gen_test
 | 
						|
    def test_sending_stream_message(self):
 | 
						|
        # type: () -> Generator[str, Any, None]
 | 
						|
        user_profile = UserProfile.objects.filter(email='hamlet@zulip.com').first()
 | 
						|
        cookies = self._get_cookies(user_profile)
 | 
						|
        cookie_header = self.get_cookie_header(cookies)
 | 
						|
        queue_events_data = self._get_queue_events_data(user_profile.email)
 | 
						|
        ws = yield self.ws_connect('/sockjs/366/v8nw22qe/websocket', cookie_header=cookie_header)
 | 
						|
        yield ws.read_message()
 | 
						|
        yield self._websocket_auth(ws, queue_events_data, cookies)
 | 
						|
        request_id = ':'.join((queue_events_data['response']['queue_id'], '1'))
 | 
						|
        user_message = {
 | 
						|
            "req_id": request_id,
 | 
						|
            "type": "request",
 | 
						|
            "request": {
 | 
						|
                "client": "website",
 | 
						|
                "type": "stream",
 | 
						|
                "subject": "Stream message",
 | 
						|
                "stream": "Denmark",
 | 
						|
                "private_message_recipient": "",
 | 
						|
                "content": "hello",
 | 
						|
                "sender_id": user_profile.id,
 | 
						|
                "queue_id": queue_events_data['response']['queue_id'],
 | 
						|
                "to": ujson.dumps(["Denmark"]),
 | 
						|
                "reply_to": "hamlet@zulip.com",
 | 
						|
                "local_id": -1
 | 
						|
            }
 | 
						|
        }
 | 
						|
        user_message_str = ujson.dumps(user_message)
 | 
						|
        ws.write_message(ujson.dumps([user_message_str]))
 | 
						|
        ack_resp = yield ws.read_message()
 | 
						|
        msg_resp = yield ws.read_message()
 | 
						|
        self._check_message_sending(request_id, ack_resp, msg_resp, user_profile, queue_events_data)
 |