Files
zulip/zerver/tests/test_tornado.py
Anders Kaseorg e00d4be6d5 events: Check last_event_id for validity.
This verifies that the client passed a last_event_id that actually
came from the queue instead of making up an ID from the future.  It
turns out one of our tests was making up such an ID, but legitimate
clients are expected not to do so.

Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2019-07-26 17:18:28 -07:00

476 lines
19 KiB
Python

# -*- coding: utf-8 -*-
"""WebSocketBaseTestCase is based on a combination of the Tornado
and Django test systems. It requires using the '@gen.coroutine' decorator
for each test case method (see documentation: http://www.tornadoweb.org/en/stable/testing.html).
It requires implementing the 'get_app' method to initialize the Tornado application and launch it.
"""
import time
import ujson
from django.conf import settings
from django.http import HttpRequest, HttpResponse
from django.db import close_old_connections
from django.core import signals
from django.test import override_settings
from tornado.httpclient import HTTPRequest, HTTPResponse
from zerver.lib.test_helpers import POSTRequestMock
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.topic import TOPIC_NAME
from zerver.models import UserProfile
from tornado import gen
from tornado.testing import AsyncHTTPTestCase, gen_test
from tornado.web import Application
from tornado.websocket import websocket_connect
from zerver.tornado.application import create_tornado_application
from zerver.tornado import event_queue
from zerver.tornado.event_queue import fetch_events, \
process_event
from zerver.tornado.views import get_events
from http.cookies import SimpleCookie
import urllib.parse
from unittest.mock import patch
from typing import Any, Callable, Dict, Generator, Optional, List, cast
class TornadoWebTestCase(AsyncHTTPTestCase, ZulipTestCase):
    """Base class for tests that send real HTTP requests to the Tornado app.

    The session cookie captured by ``login`` is attached, together with a
    ``Host`` header, to every request issued through ``client_get`` and
    ``fetch_async``.
    """

    def setUp(self) -> None:
        """Detach Django's ``close_old_connections`` receivers and reset the cookie."""
        super().setUp()
        # NOTE(review): presumably detached so that requests served during the
        # test do not close the test's database connection -- confirm.
        signals.request_started.disconnect(close_old_connections)
        signals.request_finished.disconnect(close_old_connections)
        self.session_cookie = None  # type: Optional[Dict[str, str]]

    def tearDown(self) -> None:
        """Forget the session cookie and reattach the receivers removed in setUp."""
        try:
            super().tearDown()
        finally:
            self.session_cookie = None
            # Undo the setUp disconnects so this test case does not leave the
            # process-wide signal wiring modified for whatever runs next.
            signals.request_started.connect(close_old_connections)
            signals.request_finished.connect(close_old_connections)

    @override_settings(DEBUG=False)
    def get_app(self) -> Application:
        """Return the Tornado application under test."""
        return create_tornado_application(9993)

    def _prepare_fetch_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Add the session cookie and ``Host`` header to ``kwargs`` in place.

        ``HTTP_HOST`` is moved out of ``kwargs`` into ``kwargs['headers']``
        so that it is not forwarded to the HTTP client as a keyword argument.
        """
        self.add_session_cookie(kwargs)
        self.set_http_host(kwargs)
        if 'HTTP_HOST' in kwargs:
            kwargs['headers']['Host'] = kwargs.pop('HTTP_HOST')

    def client_get(self, path: str, **kwargs: Any) -> HTTPResponse:
        """Perform a blocking GET against ``path`` and return the response."""
        self._prepare_fetch_kwargs(kwargs)
        return self.fetch(path, method='GET', **kwargs)

    def fetch_async(self, method: str, path: str, **kwargs: Any) -> None:
        """Start a request without waiting for it.

        ``self.stop`` is passed as the completion callback, so the response is
        retrieved later with ``self.wait()``.
        """
        self._prepare_fetch_kwargs(kwargs)
        self.http_client.fetch(
            self.get_url(path),
            self.stop,
            method=method,
            **kwargs
        )

    def client_get_async(self, path: str, **kwargs: Any) -> None:
        """Start a GET request without waiting for it (see ``fetch_async``)."""
        # NOTE(review): fetch_async calls set_http_host again; this earlier
        # call looks redundant -- confirm before removing it.
        self.set_http_host(kwargs)
        self.fetch_async('GET', path, **kwargs)

    def login(self, *args: Any, **kwargs: Any) -> None:
        """Log in through the parent class and remember the session cookie."""
        super().login(*args, **kwargs)
        session_cookie = settings.SESSION_COOKIE_NAME
        session_key = self.client.session.session_key
        self.session_cookie = {
            "Cookie": "{}={}".format(session_cookie, session_key)
        }

    def get_session_cookie(self) -> Dict[str, str]:
        """Return the stored ``Cookie`` header dict, or ``{}`` if not logged in."""
        return {} if self.session_cookie is None else self.session_cookie

    def add_session_cookie(self, kwargs: Dict[str, Any]) -> None:
        """Merge the session cookie into ``kwargs['headers']`` (created if absent)."""
        # TODO: Currently only allows session cookie
        headers = kwargs.get('headers', {})
        headers.update(self.get_session_cookie())
        kwargs['headers'] = headers

    def create_queue(self, **kwargs: Any) -> str:
        """Create an event queue via ``/json/events`` and return its id.

        Extra keyword arguments are accepted but currently unused.
        """
        response = self.client_get('/json/events?dont_block=true', subdomain="zulip")
        self.assertEqual(response.code, 200)
        body = ujson.loads(response.body)
        self.assertEqual(body['events'], [])
        self.assertIn('queue_id', body)
        return body['queue_id']
class EventsTestCase(TornadoWebTestCase):
    """Tests for the ``/json/events`` endpoint served by Tornado."""

    def test_create_queue(self) -> None:
        """A freshly created queue id appears in ``event_queue.clients``."""
        hamlet_email = self.example_email('hamlet')
        self.login(hamlet_email)
        self.assertIn(self.create_queue(), event_queue.clients)

    def test_events_async(self) -> None:
        """A pending events request completes once an event is processed."""
        hamlet = self.example_user('hamlet')
        self.login(hamlet.email)

        query = urllib.parse.urlencode({
            'queue_id': self.create_queue(),
            'last_event_id': -1,
        })
        self.client_get_async('/json/events?{}'.format(query))

        def deliver_test_event() -> None:
            # Scheduled on the IO loop below, after the GET has been started.
            process_event({'type': 'test', 'data': 'test data'}, [hamlet.id])

        self.io_loop.call_later(0.1, deliver_test_event)
        response = self.wait()

        payload = ujson.loads(response.body)
        received = cast(List[Dict[str, Any]], payload['events'])
        self.assertEqual(len(received), 1)
        self.assertEqual(received[0]['data'], 'test data')
        self.assertEqual(payload['result'], 'success')
class WebSocketBaseTestCase(AsyncHTTPTestCase, ZulipTestCase):
    """Base class for websocket tests.

    ``settings.RUNNING_INSIDE_TORNADO`` is set to ``True`` for the duration
    of each test and put back to its previous value afterwards.
    """

    def setUp(self) -> None:
        """Enable ``RUNNING_INSIDE_TORNADO``, remembering the prior value."""
        self._saved_running_inside_tornado = getattr(
            settings, 'RUNNING_INSIDE_TORNADO', False)
        settings.RUNNING_INSIDE_TORNADO = True
        try:
            super().setUp()
        except Exception:
            # tearDown is not run when setUp fails, so undo the flag here.
            settings.RUNNING_INSIDE_TORNADO = self._saved_running_inside_tornado
            raise

    def tearDown(self) -> None:
        """Restore ``RUNNING_INSIDE_TORNADO`` to the value seen in setUp."""
        try:
            super().tearDown()
        finally:
            settings.RUNNING_INSIDE_TORNADO = self._saved_running_inside_tornado

    @gen.coroutine
    def ws_connect(self, path: str, cookie_header: str,
                   compression_options: Optional[Any]=None
                   ) -> Generator[Any, Callable[[HTTPRequest, Optional[Any]], Any], None]:
        """Open a websocket to ``path`` on the test server.

        ``cookie_header`` is sent as the request's ``Cookie`` header. The
        coroutine resolves to the connection returned by ``websocket_connect``.
        """
        request = HTTPRequest(url='ws://127.0.0.1:%d%s' % (self.get_http_port(), path))
        request.headers.add('Cookie', cookie_header)
        ws = yield websocket_connect(
            request,
            compression_options=compression_options)
        raise gen.Return(ws)

    @gen.coroutine
    def close(self, ws: Any) -> None:
        """Close a websocket connection.

        Only calls ``ws.close()``; nothing here waits for the server side.
        """
        ws.close()
class TornadoTestCase(WebSocketBaseTestCase):
    """Tests of the sockjs websocket endpoint: connect, authenticate, send."""

    # Websocket URL path used by every test in this class.
    SOCKJS_PATH = '/sockjs/366/v8nw22qe/websocket'

    @override_settings(DEBUG=False)
    def get_app(self) -> Application:
        """ Return tornado app to launch for test cases
        """
        return create_tornado_application(9993)

    @staticmethod
    def tornado_call(view_func: Callable[[HttpRequest, UserProfile], HttpResponse],
                     user_profile: UserProfile,
                     post_data: Dict[str, Any]) -> HttpResponse:
        """Call ``view_func`` directly with a mock POST request for ``user_profile``."""
        request = POSTRequestMock(post_data, user_profile)
        return view_func(request, user_profile)

    @staticmethod
    def get_cookie_header(cookies: SimpleCookie) -> str:
        """Render ``cookies`` as ``name=value`` pairs joined by ``;``."""
        return ';'.join(
            "{}={}".format(name, morsel.value) for name, morsel in cookies.items())

    def _get_cookies(self, user_profile: UserProfile) -> SimpleCookie:
        """Log ``user_profile`` in and return the cookies from the login response."""
        resp = self.login_with_return(user_profile.email)
        return resp.cookies

    @staticmethod
    def _queue_id(queue_events_data: Dict[str, Dict[str, str]]) -> str:
        """Return the queue id stored in a ``fetch_events`` result."""
        return queue_events_data['response']['queue_id']

    @classmethod
    def _request_id(cls, queue_events_data: Dict[str, Dict[str, str]],
                    sequence: str) -> str:
        """Build the ``<queue_id>:<sequence>`` request id used in frames."""
        return ':'.join((cls._queue_id(queue_events_data), sequence))

    @staticmethod
    def _send_frame(ws: Any, payload: Dict[str, Any]) -> None:
        """Write ``payload`` to ``ws`` as a JSON list holding one JSON string."""
        ws.write_message(ujson.dumps([ujson.dumps(payload)]))

    @gen.coroutine
    def _websocket_auth(self, ws: Any,
                        queue_events_data: Dict[str, Dict[str, str]],
                        cookies: SimpleCookie) -> Generator[str, str, None]:
        """Send an ``auth`` frame and resolve to ``[ack, response]`` raw frames."""
        message = {
            "req_id": self._request_id(queue_events_data, '0'),
            "type": "auth",
            "request": {
                "csrf_token": cookies.get('csrftoken').coded_value,
                "queue_id": self._queue_id(queue_events_data),
                "status_inquiries": []
            }
        }
        self._send_frame(ws, message)
        response_ack = yield ws.read_message()
        response_message = yield ws.read_message()
        raise gen.Return([response_ack, response_message])

    @staticmethod
    def _get_queue_events_data(email: str) -> Dict[str, Dict[str, str]]:
        """Call ``fetch_events`` with a new-queue query for the user with ``email``."""
        user_profile = UserProfile.objects.filter(email=email).first()
        events_query = {
            'queue_id': None,
            'narrow': [],
            'handler_id': 0,
            'user_profile_email': user_profile.email,
            'all_public_streams': False,
            'client_type_name': 'website',
            'new_queue_data': {
                'apply_markdown': True,
                'client_gravatar': False,
                'narrow': [],
                'user_profile_email': user_profile.email,
                'all_public_streams': False,
                'realm_id': user_profile.realm_id,
                'client_type_name': 'website',
                'event_types': None,
                'user_profile_id': user_profile.id,
                'queue_timeout': 0,
                'last_connection_time': time.time()},
            'last_event_id': -1,
            'event_types': None,
            'user_profile_id': user_profile.id,
            'dont_block': True,
            'lifespan_secs': 0
        }
        result = fetch_events(events_query)
        return result

    def _check_message_sending(self, request_id: str,
                               ack_resp: str,
                               msg_resp: str,
                               profile: UserProfile,
                               queue_events_data: Dict[str, Dict[str, str]]) -> None:
        """Assert that a send request was acknowledged and answered with success.

        Both frames must start with ``'a'`` followed by JSON. The queue must
        hold exactly one event, and its message id must equal the ``id`` in
        the response frame.
        """
        self.assertEqual(ack_resp[0], 'a')
        self.assertEqual(
            ujson.loads(ack_resp[1:]),
            [
                {
                    "type": "ack",
                    "req_id": request_id
                }
            ])
        self.assertEqual(msg_resp[0], 'a')
        result = self.tornado_call(get_events, profile,
                                   {"queue_id": self._queue_id(queue_events_data),
                                    "user_client": "website",
                                    "last_event_id": -1,
                                    "dont_block": ujson.dumps(True),
                                    })
        result_content = ujson.loads(result.content)
        self.assertEqual(len(result_content['events']), 1)
        message_id = result_content['events'][0]['message']['id']
        self.assertEqual(
            ujson.loads(msg_resp[1:]),
            [
                {
                    "type": "response",
                    "response":
                    {
                        "result": "success",
                        "id": message_id,
                        "msg": ""
                    },
                    "req_id": request_id
                }
            ])

    @gen.coroutine
    def _open_authenticated_socket(self, user_profile: UserProfile
                                   ) -> Generator[Any, Any, None]:
        """Log in, create an event queue, connect and authenticate.

        Resolves to ``(ws, queue_events_data)``. The first frame read after
        connecting is discarded (``test_tornado_connect`` shows it is ``'o'``),
        and the two auth reply frames are consumed by ``_websocket_auth``.
        """
        cookies = self._get_cookies(user_profile)
        cookie_header = self.get_cookie_header(cookies)
        queue_events_data = self._get_queue_events_data(user_profile.email)
        ws = yield self.ws_connect(self.SOCKJS_PATH, cookie_header=cookie_header)
        yield ws.read_message()
        yield self._websocket_auth(ws, queue_events_data, cookies)
        raise gen.Return((ws, queue_events_data))

    def _stream_message_body(self, user_profile: UserProfile,
                             queue_events_data: Dict[str, Dict[str, str]]
                             ) -> Dict[str, Any]:
        """Return the ``request`` body for a "hello" message to stream Denmark."""
        return {
            "client": "website",
            "type": "stream",
            TOPIC_NAME: "Stream message",
            "stream": "Denmark",
            "private_message_recipient": "",
            "content": "hello",
            "sender_id": user_profile.id,
            "queue_id": self._queue_id(queue_events_data),
            "to": ujson.dumps(["Denmark"]),
            "reply_to": self.example_email('hamlet'),
            "local_id": -1
        }

    def _send_message_request(self, ws: Any,
                              queue_events_data: Dict[str, Dict[str, str]],
                              body: Dict[str, Any]) -> str:
        """Send ``body`` in a ``request`` frame and return the request id used."""
        request_id = self._request_id(queue_events_data, '1')
        self._send_frame(ws, {
            "req_id": request_id,
            "type": "request",
            "request": body
        })
        return request_id

    @gen.coroutine
    def _send_and_check_message(self, ws: Any,
                                user_profile: UserProfile,
                                queue_events_data: Dict[str, Dict[str, str]],
                                body: Dict[str, Any]) -> Generator[Any, Any, None]:
        """Send ``body``, read the two reply frames, verify them, close ``ws``."""
        request_id = self._send_message_request(ws, queue_events_data, body)
        ack_resp = yield ws.read_message()
        msg_resp = yield ws.read_message()
        self._check_message_sending(request_id, ack_resp, msg_resp, user_profile, queue_events_data)
        yield self.close(ws)

    @gen_test
    def test_tornado_connect(self) -> Generator[str, Any, None]:
        """The first frame received after connecting is ``'o'``."""
        user_profile = self.example_user('hamlet')
        cookies = self._get_cookies(user_profile)
        cookie_header = self.get_cookie_header(cookies)
        ws = yield self.ws_connect(self.SOCKJS_PATH, cookie_header=cookie_header)
        response = yield ws.read_message()
        self.assertEqual(response, 'o')
        yield self.close(ws)

    @gen_test
    def test_tornado_auth(self) -> Generator[str, 'TornadoTestCase', None]:
        """An auth frame yields an ack followed by a success response."""
        user_profile = self.example_user('hamlet')
        cookies = self._get_cookies(user_profile)
        cookie_header = self.get_cookie_header(cookies)
        ws = yield self.ws_connect(self.SOCKJS_PATH, cookie_header=cookie_header)
        yield ws.read_message()
        # Unlike the message tests, the queue is created after connecting.
        queue_events_data = self._get_queue_events_data(user_profile.email)
        request_id = self._request_id(queue_events_data, '0')
        response = yield self._websocket_auth(ws, queue_events_data, cookies)
        self.assertEqual(response[0][0], 'a')
        self.assertEqual(
            ujson.loads(response[0][1:]),
            [
                {
                    "type": "ack",
                    "req_id": request_id
                }
            ])
        self.assertEqual(response[1][0], 'a')
        self.assertEqual(
            ujson.loads(response[1][1:]),
            [
                {"req_id": request_id,
                 "response": {
                     "result": "success",
                     "status_inquiries": {},
                     "msg": ""
                 },
                 "type": "response"}
            ])
        yield self.close(ws)

    @gen_test
    def test_sending_private_message(self) -> Generator[str, Any, None]:
        """A private message to othello is acked and answered with success."""
        user_profile = self.example_user('hamlet')
        ws, queue_events_data = yield self._open_authenticated_socket(user_profile)
        body = {
            "client": "website",
            "type": "private",
            TOPIC_NAME: "(no topic)",
            "stream": "",
            "private_message_recipient": self.example_email('othello'),
            "content": "hello",
            "sender_id": user_profile.id,
            "queue_id": self._queue_id(queue_events_data),
            "to": ujson.dumps([self.example_email('othello')]),
            "reply_to": self.example_email('hamlet'),
            "local_id": -1
        }
        yield self._send_and_check_message(ws, user_profile, queue_events_data, body)

    @gen_test
    def test_sending_stream_message(self) -> Generator[str, Any, None]:
        """A stream message to Denmark is acked and answered with success."""
        user_profile = self.example_user('hamlet')
        ws, queue_events_data = yield self._open_authenticated_socket(user_profile)
        body = self._stream_message_body(user_profile, queue_events_data)
        yield self._send_and_check_message(ws, user_profile, queue_events_data, body)

    @gen_test
    def test_sending_stream_message_from_electron(self) -> Generator[str, Any, None]:
        """Same as the stream test, with an Electron ``socket_user_agent``."""
        user_profile = self.example_user('hamlet')
        ws, queue_events_data = yield self._open_authenticated_socket(user_profile)
        body = self._stream_message_body(user_profile, queue_events_data)
        body["socket_user_agent"] = "ZulipElectron/1.5.0"
        yield self._send_and_check_message(ws, user_profile, queue_events_data, body)

    @gen_test
    def test_sending_message_error(self) -> Any:
        """Read one reply frame while the Django response is patched to an error."""
        user_profile = self.example_user('hamlet')
        ws, queue_events_data = yield self._open_authenticated_socket(user_profile)
        body = self._stream_message_body(user_profile, queue_events_data)
        self._send_message_request(ws, queue_events_data, body)

        def wrap_get_response(request: HttpRequest) -> HttpResponse:
            # NOTE(review): presumably these are the _log_data keys the
            # request-logging code reads -- confirm against that code.
            request._log_data = {'bugdown_requests_start': 0,
                                 'time_started': 0,
                                 'bugdown_time_start': 0,
                                 'remote_cache_time_start': 0,
                                 'remote_cache_requests_start': 0,
                                 'startup_time_delta': 0}

            class ResponseObject(object):
                def __init__(self) -> None:
                    self.content = '{"msg":"","id":0,"result":"error"}'.encode('utf8')
            return ResponseObject()

        # Simulate an error response to cover the respective code paths.
        with patch('django.core.handlers.base.BaseHandler.get_response', wraps=wrap_get_response), \
                patch('django.db.connection.is_usable', return_value=False), \
                patch('os.kill', side_effect=OSError()):
            yield ws.read_message()
        yield self.close(ws)