mirror of
https://github.com/zulip/zulip.git
synced 2025-11-02 04:53:36 +00:00
This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.
We can likely further refine the remaining pieces with some testing.
Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:
- invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+ invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(
-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None
- notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+ author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
- bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+ bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
- default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
- default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}
-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}
-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
124 lines
4.3 KiB
Python
124 lines
4.3 KiB
Python
import ujson
|
|
from django.conf import settings
|
|
from django.db import close_old_connections
|
|
from django.core import signals
|
|
from django.test import override_settings
|
|
from tornado.httpclient import HTTPResponse
|
|
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
|
|
from tornado.testing import AsyncHTTPTestCase
|
|
from tornado.web import Application
|
|
|
|
from zerver.tornado.application import create_tornado_application
|
|
from zerver.tornado import event_queue
|
|
from zerver.tornado.event_queue import process_event
|
|
|
|
import urllib.parse
|
|
|
|
from typing import Any, Dict, Optional, List, cast
|
|
|
|
class TornadoWebTestCase(AsyncHTTPTestCase, ZulipTestCase):
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
signals.request_started.disconnect(close_old_connections)
|
|
signals.request_finished.disconnect(close_old_connections)
|
|
self.session_cookie: Optional[Dict[str, str]] = None
|
|
|
|
def tearDown(self) -> None:
|
|
super().tearDown()
|
|
self.session_cookie = None
|
|
|
|
@override_settings(DEBUG=False)
|
|
def get_app(self) -> Application:
|
|
return create_tornado_application(9993)
|
|
|
|
def client_get(self, path: str, **kwargs: Any) -> HTTPResponse:
|
|
self.add_session_cookie(kwargs)
|
|
kwargs['skip_user_agent'] = True
|
|
self.set_http_headers(kwargs)
|
|
if 'HTTP_HOST' in kwargs:
|
|
kwargs['headers']['Host'] = kwargs['HTTP_HOST']
|
|
del kwargs['HTTP_HOST']
|
|
return self.fetch(path, method='GET', **kwargs)
|
|
|
|
def fetch_async(self, method: str, path: str, **kwargs: Any) -> None:
|
|
self.add_session_cookie(kwargs)
|
|
kwargs['skip_user_agent'] = True
|
|
self.set_http_headers(kwargs)
|
|
if 'HTTP_HOST' in kwargs:
|
|
kwargs['headers']['Host'] = kwargs['HTTP_HOST']
|
|
del kwargs['HTTP_HOST']
|
|
self.http_client.fetch(
|
|
self.get_url(path),
|
|
self.stop,
|
|
method=method,
|
|
**kwargs
|
|
)
|
|
|
|
def client_get_async(self, path: str, **kwargs: Any) -> None:
|
|
kwargs['skip_user_agent'] = True
|
|
self.set_http_headers(kwargs)
|
|
self.fetch_async('GET', path, **kwargs)
|
|
|
|
def login_user(self, *args: Any, **kwargs: Any) -> None:
|
|
super().login_user(*args, **kwargs)
|
|
session_cookie = settings.SESSION_COOKIE_NAME
|
|
session_key = self.client.session.session_key
|
|
self.session_cookie = {
|
|
"Cookie": "{}={}".format(session_cookie, session_key)
|
|
}
|
|
|
|
def get_session_cookie(self) -> Dict[str, str]:
|
|
return {} if self.session_cookie is None else self.session_cookie
|
|
|
|
def add_session_cookie(self, kwargs: Dict[str, Any]) -> None:
|
|
# TODO: Currently only allows session cookie
|
|
headers = kwargs.get('headers', {})
|
|
headers.update(self.get_session_cookie())
|
|
kwargs['headers'] = headers
|
|
|
|
def create_queue(self, **kwargs: Any) -> str:
|
|
response = self.client_get('/json/events?dont_block=true', subdomain="zulip",
|
|
skip_user_agent=True)
|
|
self.assertEqual(response.code, 200)
|
|
body = ujson.loads(response.body)
|
|
self.assertEqual(body['events'], [])
|
|
self.assertIn('queue_id', body)
|
|
return body['queue_id']
|
|
|
|
class EventsTestCase(TornadoWebTestCase):
|
|
def test_create_queue(self) -> None:
|
|
self.login_user(self.example_user('hamlet'))
|
|
queue_id = self.create_queue()
|
|
self.assertIn(queue_id, event_queue.clients)
|
|
|
|
def test_events_async(self) -> None:
|
|
user_profile = self.example_user('hamlet')
|
|
self.login_user(user_profile)
|
|
event_queue_id = self.create_queue()
|
|
data = {
|
|
'queue_id': event_queue_id,
|
|
'last_event_id': -1,
|
|
}
|
|
|
|
path = '/json/events?{}'.format(urllib.parse.urlencode(data))
|
|
self.client_get_async(path)
|
|
|
|
def process_events() -> None:
|
|
users = [user_profile.id]
|
|
event = dict(
|
|
type='test',
|
|
data='test data',
|
|
)
|
|
process_event(event, users)
|
|
|
|
self.io_loop.call_later(0.1, process_events)
|
|
response = self.wait()
|
|
data = ujson.loads(response.body)
|
|
events = data['events']
|
|
events = cast(List[Dict[str, Any]], events)
|
|
self.assertEqual(len(events), 1)
|
|
self.assertEqual(events[0]['data'], 'test data')
|
|
self.assertEqual(data['result'], 'success')
|