zerver/lib/test_classes.py: Change use of typing.Text to str.

This commit is contained in:
Aditya Bansal
2018-05-11 05:10:45 +05:30
committed by Tim Abbott
parent c02011d7f5
commit 63dff4549f

View File

@@ -1,6 +1,6 @@
from contextlib import contextmanager from contextlib import contextmanager
from typing import (cast, Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, from typing import (cast, Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional,
Sized, Tuple, Union, Text) Sized, Tuple, Union)
from django.urls import resolve from django.urls import resolve
from django.conf import settings from django.conf import settings
@@ -56,7 +56,7 @@ import re
import ujson import ujson
import urllib import urllib
API_KEYS = {} # type: Dict[Text, Text] API_KEYS = {} # type: Dict[str, str]
def flush_caches_for_testing() -> None: def flush_caches_for_testing() -> None:
global API_KEYS global API_KEYS
@@ -107,7 +107,7 @@ class ZulipTestCase(TestCase):
kwargs['HTTP_HOST'] = Realm.host_for_subdomain(self.DEFAULT_SUBDOMAIN) kwargs['HTTP_HOST'] = Realm.host_for_subdomain(self.DEFAULT_SUBDOMAIN)
@instrument_url @instrument_url
def client_patch(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_patch(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
""" """
We need to urlencode, since Django's function won't do it for us. We need to urlencode, since Django's function won't do it for us.
""" """
@@ -117,7 +117,7 @@ class ZulipTestCase(TestCase):
return django_client.patch(url, encoded, **kwargs) return django_client.patch(url, encoded, **kwargs)
@instrument_url @instrument_url
def client_patch_multipart(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_patch_multipart(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
""" """
Use this for patch requests that have file uploads or Use this for patch requests that have file uploads or
that need some sort of multi-part content. In the future that need some sort of multi-part content. In the future
@@ -136,41 +136,41 @@ class ZulipTestCase(TestCase):
**kwargs) **kwargs)
@instrument_url @instrument_url
def client_put(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_put(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info) encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs) self.set_http_host(kwargs)
return django_client.put(url, encoded, **kwargs) return django_client.put(url, encoded, **kwargs)
@instrument_url @instrument_url
def client_delete(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_delete(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info) encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs) self.set_http_host(kwargs)
return django_client.delete(url, encoded, **kwargs) return django_client.delete(url, encoded, **kwargs)
@instrument_url @instrument_url
def client_options(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_options(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info) encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs) self.set_http_host(kwargs)
return django_client.options(url, encoded, **kwargs) return django_client.options(url, encoded, **kwargs)
@instrument_url @instrument_url
def client_head(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_head(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info) encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs) self.set_http_host(kwargs)
return django_client.head(url, encoded, **kwargs) return django_client.head(url, encoded, **kwargs)
@instrument_url @instrument_url
def client_post(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_post(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
django_client = self.client # see WRAPPER_COMMENT django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs) self.set_http_host(kwargs)
return django_client.post(url, info, **kwargs) return django_client.post(url, info, **kwargs)
@instrument_url @instrument_url
def client_post_request(self, url: Text, req: Any) -> HttpResponse: def client_post_request(self, url: str, req: Any) -> HttpResponse:
""" """
We simulate hitting an endpoint here, although we We simulate hitting an endpoint here, although we
actually resolve the URL manually and hit the view actually resolve the URL manually and hit the view
@@ -184,7 +184,7 @@ class ZulipTestCase(TestCase):
return match.func(req) return match.func(req)
@instrument_url @instrument_url
def client_get(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: def client_get(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
django_client = self.client # see WRAPPER_COMMENT django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs) self.set_http_host(kwargs)
return django_client.get(url, info, **kwargs) return django_client.get(url, info, **kwargs)
@@ -234,20 +234,20 @@ class ZulipTestCase(TestCase):
email = self.mit_user_map[name] email = self.mit_user_map[name]
return get_user(email, get_realm('zephyr')) return get_user(email, get_realm('zephyr'))
def nonreg_email(self, name: str) -> Text: def nonreg_email(self, name: str) -> str:
return self.nonreg_user_map[name] return self.nonreg_user_map[name]
def example_email(self, name: str) -> Text: def example_email(self, name: str) -> str:
return self.example_user_map[name] return self.example_user_map[name]
def mit_email(self, name: str) -> Text: def mit_email(self, name: str) -> str:
return self.mit_user_map[name] return self.mit_user_map[name]
def notification_bot(self) -> UserProfile: def notification_bot(self) -> UserProfile:
return get_user('notification-bot@zulip.com', get_realm('zulip')) return get_user('notification-bot@zulip.com', get_realm('zulip'))
def create_test_bot(self, short_name: Text, user_profile: UserProfile, def create_test_bot(self, short_name: str, user_profile: UserProfile,
assert_json_error_msg: Text=None, **extras: Any) -> Optional[UserProfile]: assert_json_error_msg: str=None, **extras: Any) -> Optional[UserProfile]:
self.login(user_profile.email) self.login(user_profile.email)
bot_info = { bot_info = {
'short_name': short_name, 'short_name': short_name,
@@ -264,7 +264,7 @@ class ZulipTestCase(TestCase):
bot_profile = get_user(bot_email, user_profile.realm) bot_profile = get_user(bot_email, user_profile.realm)
return bot_profile return bot_profile
def login_with_return(self, email: Text, password: Optional[Text]=None, def login_with_return(self, email: str, password: Optional[str]=None,
**kwargs: Any) -> HttpResponse: **kwargs: Any) -> HttpResponse:
if password is None: if password is None:
password = initial_password(email) password = initial_password(email)
@@ -272,7 +272,7 @@ class ZulipTestCase(TestCase):
{'username': email, 'password': password}, {'username': email, 'password': password},
**kwargs) **kwargs)
def login(self, email: Text, password: Optional[Text]=None, fails: bool=False, def login(self, email: str, password: Optional[str]=None, fails: bool=False,
realm: Optional[Realm]=None) -> HttpResponse: realm: Optional[Realm]=None) -> HttpResponse:
if realm is None: if realm is None:
realm = get_realm("zulip") realm = get_realm("zulip")
@@ -288,18 +288,18 @@ class ZulipTestCase(TestCase):
def logout(self) -> None: def logout(self) -> None:
self.client.logout() self.client.logout()
def register(self, email: Text, password: Text, **kwargs: Any) -> HttpResponse: def register(self, email: str, password: str, **kwargs: Any) -> HttpResponse:
self.client_post('/accounts/home/', {'email': email}, self.client_post('/accounts/home/', {'email': email},
**kwargs) **kwargs)
return self.submit_reg_form_for_user(email, password, **kwargs) return self.submit_reg_form_for_user(email, password, **kwargs)
def submit_reg_form_for_user( def submit_reg_form_for_user(
self, email: Text, password: Text, self, email: str, password: str,
realm_name: Optional[Text]="Zulip Test", realm_name: Optional[str]="Zulip Test",
realm_subdomain: Optional[Text]="zuliptest", realm_subdomain: Optional[str]="zuliptest",
from_confirmation: Optional[Text]='', full_name: Optional[Text]=None, from_confirmation: Optional[str]='', full_name: Optional[str]=None,
timezone: Optional[Text]='', realm_in_root_domain: Optional[Text]=None, timezone: Optional[str]='', realm_in_root_domain: Optional[str]=None,
default_stream_groups: Optional[List[Text]]=[], **kwargs: Any) -> HttpResponse: default_stream_groups: Optional[List[str]]=[], **kwargs: Any) -> HttpResponse:
""" """
Stage two of the two-step registration process. Stage two of the two-step registration process.
@@ -325,8 +325,8 @@ class ZulipTestCase(TestCase):
payload['realm_in_root_domain'] = realm_in_root_domain payload['realm_in_root_domain'] = realm_in_root_domain
return self.client_post('/accounts/register/', payload, **kwargs) return self.client_post('/accounts/register/', payload, **kwargs)
def get_confirmation_url_from_outbox(self, email_address: Text, *, def get_confirmation_url_from_outbox(self, email_address: str, *,
url_pattern: Text=None) -> Text: url_pattern: str=None) -> str:
from django.core.mail import outbox from django.core.mail import outbox
if url_pattern is None: if url_pattern is None:
# This is a bit of a crude heuristic, but good enough for most tests. # This is a bit of a crude heuristic, but good enough for most tests.
@@ -337,7 +337,7 @@ class ZulipTestCase(TestCase):
else: else:
raise AssertionError("Couldn't find a confirmation email.") raise AssertionError("Couldn't find a confirmation email.")
def encode_credentials(self, identifier: Text, realm: Text="zulip") -> Text: def encode_credentials(self, identifier: str, realm: str="zulip") -> str:
""" """
identifier: Can be an email or a remote server uuid. identifier: Can be an email or a remote server uuid.
""" """
@@ -353,27 +353,27 @@ class ZulipTestCase(TestCase):
credentials = "%s:%s" % (identifier, api_key) credentials = "%s:%s" % (identifier, api_key)
return 'Basic ' + base64.b64encode(credentials.encode('utf-8')).decode('utf-8') return 'Basic ' + base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
def api_get(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: def api_get(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email)
return self.client_get(*args, **kwargs) return self.client_get(*args, **kwargs)
def api_post(self, identifier: Text, *args: Any, **kwargs: Any) -> HttpResponse: def api_post(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(identifier, kwargs.get('realm', 'zulip')) kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(identifier, kwargs.get('realm', 'zulip'))
return self.client_post(*args, **kwargs) return self.client_post(*args, **kwargs)
def api_patch(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: def api_patch(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email)
return self.client_patch(*args, **kwargs) return self.client_patch(*args, **kwargs)
def api_put(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: def api_put(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email)
return self.client_put(*args, **kwargs) return self.client_put(*args, **kwargs)
def api_delete(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: def api_delete(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email)
return self.client_delete(*args, **kwargs) return self.client_delete(*args, **kwargs)
def get_streams(self, email: Text, realm: Realm) -> List[Text]: def get_streams(self, email: str, realm: Realm) -> List[str]:
""" """
Helper function to get the stream names for a user Helper function to get the stream names for a user
""" """
@@ -381,10 +381,10 @@ class ZulipTestCase(TestCase):
subs = get_stream_subscriptions_for_user(user_profile).filter( subs = get_stream_subscriptions_for_user(user_profile).filter(
active=True, active=True,
) )
return [cast(Text, get_display_recipient(sub.recipient)) for sub in subs] return [cast(str, get_display_recipient(sub.recipient)) for sub in subs]
def send_personal_message(self, from_email: Text, to_email: Text, content: Text="test content", def send_personal_message(self, from_email: str, to_email: str, content: str="test content",
sender_realm: Text="zulip") -> int: sender_realm: str="zulip") -> int:
sender = get_user(from_email, get_realm(sender_realm)) sender = get_user(from_email, get_realm(sender_realm))
recipient_list = [to_email] recipient_list = [to_email]
@@ -395,8 +395,8 @@ class ZulipTestCase(TestCase):
content content
) )
def send_huddle_message(self, from_email: Text, to_emails: List[Text], content: Text="test content", def send_huddle_message(self, from_email: str, to_emails: List[str], content: str="test content",
sender_realm: Text="zulip") -> int: sender_realm: str="zulip") -> int:
sender = get_user(from_email, get_realm(sender_realm)) sender = get_user(from_email, get_realm(sender_realm))
assert(len(to_emails) >= 2) assert(len(to_emails) >= 2)
@@ -408,8 +408,8 @@ class ZulipTestCase(TestCase):
content content
) )
def send_stream_message(self, sender_email: Text, stream_name: Text, content: Text="test content", def send_stream_message(self, sender_email: str, stream_name: str, content: str="test content",
topic_name: Text="test", sender_realm: Text="zulip") -> int: topic_name: str="test", sender_realm: str="zulip") -> int:
sender = get_user(sender_email, get_realm(sender_realm)) sender = get_user(sender_email, get_realm(sender_realm))
(sending_client, _) = Client.objects.get_or_create(name="test suite") (sending_client, _) = Client.objects.get_or_create(name="test suite")
@@ -436,7 +436,7 @@ class ZulipTestCase(TestCase):
data = self.get_messages_response(anchor, num_before, num_after, use_first_unread_anchor) data = self.get_messages_response(anchor, num_before, num_after, use_first_unread_anchor)
return data['messages'] return data['messages']
def users_subscribed_to_stream(self, stream_name: Text, realm: Realm) -> List[UserProfile]: def users_subscribed_to_stream(self, stream_name: str, realm: Realm) -> List[UserProfile]:
stream = Stream.objects.get(name=stream_name, realm=realm) stream = Stream.objects.get(name=stream_name, realm=realm)
recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM) recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
subscriptions = Subscription.objects.filter(recipient=recipient, active=True) subscriptions = Subscription.objects.filter(recipient=recipient, active=True)
@@ -474,7 +474,7 @@ class ZulipTestCase(TestCase):
self.assertEqual(json.get("result"), "error") self.assertEqual(json.get("result"), "error")
return json['msg'] return json['msg']
def assert_json_error(self, result: HttpResponse, msg: Text, status_code: int=400) -> None: def assert_json_error(self, result: HttpResponse, msg: str, status_code: int=400) -> None:
""" """
Invalid POSTs return an error status code and JSON of the form Invalid POSTs return an error status code and JSON of the form
{"result": "error", "msg": "reason"}. {"result": "error", "msg": "reason"}.
@@ -490,42 +490,42 @@ class ZulipTestCase(TestCase):
print("\nexpected length: %s\nactual length: %s" % (count, actual_count)) print("\nexpected length: %s\nactual length: %s" % (count, actual_count))
raise AssertionError('List is unexpected size!') raise AssertionError('List is unexpected size!')
def assert_json_error_contains(self, result: HttpResponse, msg_substring: Text, def assert_json_error_contains(self, result: HttpResponse, msg_substring: str,
status_code: int=400) -> None: status_code: int=400) -> None:
self.assertIn(msg_substring, self.get_json_error(result, status_code=status_code)) self.assertIn(msg_substring, self.get_json_error(result, status_code=status_code))
def assert_in_response(self, substring: Text, response: HttpResponse) -> None: def assert_in_response(self, substring: str, response: HttpResponse) -> None:
self.assertIn(substring, response.content.decode('utf-8')) self.assertIn(substring, response.content.decode('utf-8'))
def assert_in_success_response(self, substrings: List[Text], def assert_in_success_response(self, substrings: List[str],
response: HttpResponse) -> None: response: HttpResponse) -> None:
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
decoded = response.content.decode('utf-8') decoded = response.content.decode('utf-8')
for substring in substrings: for substring in substrings:
self.assertIn(substring, decoded) self.assertIn(substring, decoded)
def assert_not_in_success_response(self, substrings: List[Text], def assert_not_in_success_response(self, substrings: List[str],
response: HttpResponse) -> None: response: HttpResponse) -> None:
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
decoded = response.content.decode('utf-8') decoded = response.content.decode('utf-8')
for substring in substrings: for substring in substrings:
self.assertNotIn(substring, decoded) self.assertNotIn(substring, decoded)
def webhook_fixture_data(self, type: Text, action: Text, file_type: Text='json') -> Text: def webhook_fixture_data(self, type: str, action: str, file_type: str='json') -> str:
fn = os.path.join( fn = os.path.join(
os.path.dirname(__file__), os.path.dirname(__file__),
"../webhooks/%s/fixtures/%s.%s" % (type, action, file_type) "../webhooks/%s/fixtures/%s.%s" % (type, action, file_type)
) )
return open(fn).read() return open(fn).read()
def fixture_data(self, file_name: Text, type: Text='') -> Text: def fixture_data(self, file_name: str, type: str='') -> str:
fn = os.path.join( fn = os.path.join(
os.path.dirname(__file__), os.path.dirname(__file__),
"../tests/fixtures/%s/%s" % (type, file_name) "../tests/fixtures/%s/%s" % (type, file_name)
) )
return open(fn).read() return open(fn).read()
def make_stream(self, stream_name: Text, realm: Optional[Realm]=None, def make_stream(self, stream_name: str, realm: Optional[Realm]=None,
invite_only: Optional[bool]=False, invite_only: Optional[bool]=False,
history_public_to_subscribers: Optional[bool]=None) -> Stream: history_public_to_subscribers: Optional[bool]=None) -> Stream:
if realm is None: if realm is None:
@@ -552,7 +552,7 @@ class ZulipTestCase(TestCase):
return stream return stream
# Subscribe to a stream directly # Subscribe to a stream directly
def subscribe(self, user_profile: UserProfile, stream_name: Text) -> Stream: def subscribe(self, user_profile: UserProfile, stream_name: str) -> Stream:
try: try:
stream = get_stream(stream_name, user_profile.realm) stream = get_stream(stream_name, user_profile.realm)
from_stream_creation = False from_stream_creation = False
@@ -561,12 +561,12 @@ class ZulipTestCase(TestCase):
bulk_add_subscriptions([stream], [user_profile], from_stream_creation=from_stream_creation) bulk_add_subscriptions([stream], [user_profile], from_stream_creation=from_stream_creation)
return stream return stream
def unsubscribe(self, user_profile: UserProfile, stream_name: Text) -> None: def unsubscribe(self, user_profile: UserProfile, stream_name: str) -> None:
stream = get_stream(stream_name, user_profile.realm) stream = get_stream(stream_name, user_profile.realm)
bulk_remove_subscriptions([user_profile], [stream]) bulk_remove_subscriptions([user_profile], [stream])
# Subscribe to a stream by making an API request # Subscribe to a stream by making an API request
def common_subscribe_to_streams(self, email: Text, streams: Iterable[Text], def common_subscribe_to_streams(self, email: str, streams: Iterable[str],
extra_post_data: Dict[str, Any]={}, invite_only: bool=False, extra_post_data: Dict[str, Any]={}, invite_only: bool=False,
**kwargs: Any) -> HttpResponse: **kwargs: Any) -> HttpResponse:
post_data = {'subscriptions': ujson.dumps([{"name": stream} for stream in streams]), post_data = {'subscriptions': ujson.dumps([{"name": stream} for stream in streams]),
@@ -576,7 +576,7 @@ class ZulipTestCase(TestCase):
result = self.api_post(email, "/api/v1/users/me/subscriptions", post_data, **kwargs) result = self.api_post(email, "/api/v1/users/me/subscriptions", post_data, **kwargs)
return result return result
def check_user_subscribed_only_to_streams(self, user_name: Text, def check_user_subscribed_only_to_streams(self, user_name: str,
streams: List[Stream]) -> None: streams: List[Stream]) -> None:
streams = sorted(streams, key=lambda x: x.name) streams = sorted(streams, key=lambda x: x.name)
subscribed_streams = gather_subscriptions(self.nonreg_user(user_name))[0] subscribed_streams = gather_subscriptions(self.nonreg_user(user_name))[0]
@@ -586,9 +586,9 @@ class ZulipTestCase(TestCase):
for x, y in zip(subscribed_streams, streams): for x, y in zip(subscribed_streams, streams):
self.assertEqual(x["name"], y.name) self.assertEqual(x["name"], y.name)
def send_json_payload(self, user_profile: UserProfile, url: Text, def send_json_payload(self, user_profile: UserProfile, url: str,
payload: Union[Text, Dict[str, Any]], payload: Union[str, Dict[str, Any]],
stream_name: Optional[Text]=None, **post_params: Any) -> Message: stream_name: Optional[str]=None, **post_params: Any) -> Message:
if stream_name is not None: if stream_name is not None:
self.subscribe(user_profile, stream_name) self.subscribe(user_profile, stream_name)
@@ -630,10 +630,10 @@ class WebhookTestCase(ZulipTestCase):
If you create your url in uncommon way you can override build_webhook_url method If you create your url in uncommon way you can override build_webhook_url method
In case that you need modify body or create it without using fixture you can also override get_body method In case that you need modify body or create it without using fixture you can also override get_body method
""" """
STREAM_NAME = None # type: Optional[Text] STREAM_NAME = None # type: Optional[str]
TEST_USER_EMAIL = 'webhook-bot@zulip.com' TEST_USER_EMAIL = 'webhook-bot@zulip.com'
URL_TEMPLATE = None # type: Optional[Text] URL_TEMPLATE = None # type: Optional[str]
FIXTURE_DIR_NAME = None # type: Optional[Text] FIXTURE_DIR_NAME = None # type: Optional[str]
@property @property
def test_user(self) -> UserProfile: def test_user(self) -> UserProfile:
@@ -642,13 +642,13 @@ class WebhookTestCase(ZulipTestCase):
def setUp(self) -> None: def setUp(self) -> None:
self.url = self.build_webhook_url() self.url = self.build_webhook_url()
def api_stream_message(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: def api_stream_message(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email)
return self.send_and_test_stream_message(*args, **kwargs) return self.send_and_test_stream_message(*args, **kwargs)
def send_and_test_stream_message(self, fixture_name: Text, expected_subject: Optional[Text]=None, def send_and_test_stream_message(self, fixture_name: str, expected_subject: Optional[str]=None,
expected_message: Optional[Text]=None, expected_message: Optional[str]=None,
content_type: Optional[Text]="application/json", **kwargs: Any) -> Message: content_type: Optional[str]="application/json", **kwargs: Any) -> Message:
payload = self.get_body(fixture_name) payload = self.get_body(fixture_name)
if content_type is not None: if content_type is not None:
kwargs['content_type'] = content_type kwargs['content_type'] = content_type
@@ -659,8 +659,8 @@ class WebhookTestCase(ZulipTestCase):
return msg return msg
def send_and_test_private_message(self, fixture_name: Text, expected_subject: Text=None, def send_and_test_private_message(self, fixture_name: str, expected_subject: str=None,
expected_message: Text=None, content_type: str="application/json", expected_message: str=None, content_type: str="application/json",
**kwargs: Any)-> Message: **kwargs: Any)-> Message:
payload = self.get_body(fixture_name) payload = self.get_body(fixture_name)
if content_type is not None: if content_type is not None:
@@ -673,7 +673,7 @@ class WebhookTestCase(ZulipTestCase):
return msg return msg
def build_webhook_url(self, *args: Any, **kwargs: Any) -> Text: def build_webhook_url(self, *args: Any, **kwargs: Any) -> str:
url = self.URL_TEMPLATE url = self.URL_TEMPLATE
if url.find("api_key") >= 0: if url.find("api_key") >= 0:
api_key = self.test_user.api_key api_key = self.test_user.api_key
@@ -696,15 +696,15 @@ class WebhookTestCase(ZulipTestCase):
return url[:-1] if has_arguments else url return url[:-1] if has_arguments else url
def get_body(self, fixture_name: Text) -> Union[Text, Dict[str, Text]]: def get_body(self, fixture_name: str) -> Union[str, Dict[str, str]]:
"""Can be implemented either as returning a dictionary containing the """Can be implemented either as returning a dictionary containing the
post parameters or as string containing the body of the request.""" post parameters or as string containing the body of the request."""
return ujson.dumps(ujson.loads(self.webhook_fixture_data(self.FIXTURE_DIR_NAME, fixture_name))) return ujson.dumps(ujson.loads(self.webhook_fixture_data(self.FIXTURE_DIR_NAME, fixture_name)))
def do_test_subject(self, msg: Message, expected_subject: Optional[Text]) -> None: def do_test_subject(self, msg: Message, expected_subject: Optional[str]) -> None:
if expected_subject is not None: if expected_subject is not None:
self.assertEqual(msg.topic_name(), expected_subject) self.assertEqual(msg.topic_name(), expected_subject)
def do_test_message(self, msg: Message, expected_message: Optional[Text]) -> None: def do_test_message(self, msg: Message, expected_message: Optional[str]) -> None:
if expected_message is not None: if expected_message is not None:
self.assertEqual(msg.content, expected_message) self.assertEqual(msg.content, expected_message)