From 63dff4549f730b2ec1f725be1db6d933806204a2 Mon Sep 17 00:00:00 2001 From: Aditya Bansal Date: Fri, 11 May 2018 05:10:45 +0530 Subject: [PATCH] zerver/lib/test_classes.py: Change use of typing.Text to str. --- zerver/lib/test_classes.py | 140 ++++++++++++++++++------------------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/zerver/lib/test_classes.py b/zerver/lib/test_classes.py index 3bbb9dd9a5..5267474063 100644 --- a/zerver/lib/test_classes.py +++ b/zerver/lib/test_classes.py @@ -1,6 +1,6 @@ from contextlib import contextmanager from typing import (cast, Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, - Sized, Tuple, Union, Text) + Sized, Tuple, Union) from django.urls import resolve from django.conf import settings @@ -56,7 +56,7 @@ import re import ujson import urllib -API_KEYS = {} # type: Dict[Text, Text] +API_KEYS = {} # type: Dict[str, str] def flush_caches_for_testing() -> None: global API_KEYS @@ -107,7 +107,7 @@ class ZulipTestCase(TestCase): kwargs['HTTP_HOST'] = Realm.host_for_subdomain(self.DEFAULT_SUBDOMAIN) @instrument_url - def client_patch(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_patch(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: """ We need to urlencode, since Django's function won't do it for us. """ @@ -117,7 +117,7 @@ class ZulipTestCase(TestCase): return django_client.patch(url, encoded, **kwargs) @instrument_url - def client_patch_multipart(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_patch_multipart(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: """ Use this for patch requests that have file uploads or that need some sort of multi-part content. In the future @@ -136,41 +136,41 @@ class ZulipTestCase(TestCase): **kwargs) @instrument_url - def client_put(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_put(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: encoded = urllib.parse.urlencode(info) django_client = self.client # see WRAPPER_COMMENT self.set_http_host(kwargs) return django_client.put(url, encoded, **kwargs) @instrument_url - def client_delete(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_delete(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: encoded = urllib.parse.urlencode(info) django_client = self.client # see WRAPPER_COMMENT self.set_http_host(kwargs) return django_client.delete(url, encoded, **kwargs) @instrument_url - def client_options(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_options(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: encoded = urllib.parse.urlencode(info) django_client = self.client # see WRAPPER_COMMENT self.set_http_host(kwargs) return django_client.options(url, encoded, **kwargs) @instrument_url - def client_head(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_head(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: encoded = urllib.parse.urlencode(info) django_client = self.client # see WRAPPER_COMMENT self.set_http_host(kwargs) return django_client.head(url, encoded, **kwargs) @instrument_url - def client_post(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_post(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: django_client = self.client # see WRAPPER_COMMENT self.set_http_host(kwargs) return django_client.post(url, info, **kwargs) @instrument_url - def client_post_request(self, url: Text, req: Any) -> HttpResponse: + def client_post_request(self, url: str, req: Any) -> HttpResponse: """ We simulate hitting an endpoint here, although we actually resolve the URL manually and hit the view @@ -184,7 +184,7 @@ class ZulipTestCase(TestCase): return match.func(req) @instrument_url - def client_get(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: + def client_get(self, url: str, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse: django_client = self.client # see WRAPPER_COMMENT self.set_http_host(kwargs) return django_client.get(url, info, **kwargs) @@ -234,20 +234,20 @@ class ZulipTestCase(TestCase): email = self.mit_user_map[name] return get_user(email, get_realm('zephyr')) - def nonreg_email(self, name: str) -> Text: + def nonreg_email(self, name: str) -> str: return self.nonreg_user_map[name] - def example_email(self, name: str) -> Text: + def example_email(self, name: str) -> str: return self.example_user_map[name] - def mit_email(self, name: str) -> Text: + def mit_email(self, name: str) -> str: return self.mit_user_map[name] def notification_bot(self) -> UserProfile: return get_user('notification-bot@zulip.com', get_realm('zulip')) - def create_test_bot(self, short_name: Text, user_profile: UserProfile, - assert_json_error_msg: Text=None, **extras: Any) -> Optional[UserProfile]: + def create_test_bot(self, short_name: str, user_profile: UserProfile, + assert_json_error_msg: str=None, **extras: Any) -> Optional[UserProfile]: self.login(user_profile.email) bot_info = { 'short_name': short_name, @@ -264,7 +264,7 @@ class ZulipTestCase(TestCase): bot_profile = get_user(bot_email, user_profile.realm) return bot_profile - def login_with_return(self, email: Text, password: Optional[Text]=None, + def login_with_return(self, email: str, password: Optional[str]=None, **kwargs: Any) -> HttpResponse: if password is None: password = initial_password(email) @@ -272,7 +272,7 @@ class ZulipTestCase(TestCase): {'username': email, 'password': password}, **kwargs) - def login(self, email: Text, password: Optional[Text]=None, fails: bool=False, + def login(self, email: str, password: Optional[str]=None, fails: bool=False, realm: Optional[Realm]=None) -> HttpResponse: if realm is None: realm = get_realm("zulip") @@ -288,18 +288,18 @@ class ZulipTestCase(TestCase): def logout(self) -> None: self.client.logout() - def register(self, email: Text, password: Text, **kwargs: Any) -> HttpResponse: + def register(self, email: str, password: str, **kwargs: Any) -> HttpResponse: self.client_post('/accounts/home/', {'email': email}, **kwargs) return self.submit_reg_form_for_user(email, password, **kwargs) def submit_reg_form_for_user( - self, email: Text, password: Text, - realm_name: Optional[Text]="Zulip Test", - realm_subdomain: Optional[Text]="zuliptest", - from_confirmation: Optional[Text]='', full_name: Optional[Text]=None, - timezone: Optional[Text]='', realm_in_root_domain: Optional[Text]=None, - default_stream_groups: Optional[List[Text]]=[], **kwargs: Any) -> HttpResponse: + self, email: str, password: str, + realm_name: Optional[str]="Zulip Test", + realm_subdomain: Optional[str]="zuliptest", + from_confirmation: Optional[str]='', full_name: Optional[str]=None, + timezone: Optional[str]='', realm_in_root_domain: Optional[str]=None, + default_stream_groups: Optional[List[str]]=[], **kwargs: Any) -> HttpResponse: """ Stage two of the two-step registration process. @@ -325,8 +325,8 @@ class ZulipTestCase(TestCase): payload['realm_in_root_domain'] = realm_in_root_domain return self.client_post('/accounts/register/', payload, **kwargs) - def get_confirmation_url_from_outbox(self, email_address: Text, *, - url_pattern: Text=None) -> Text: + def get_confirmation_url_from_outbox(self, email_address: str, *, + url_pattern: str=None) -> str: from django.core.mail import outbox if url_pattern is None: # This is a bit of a crude heuristic, but good enough for most tests. @@ -337,7 +337,7 @@ class ZulipTestCase(TestCase): else: raise AssertionError("Couldn't find a confirmation email.") - def encode_credentials(self, identifier: Text, realm: Text="zulip") -> Text: + def encode_credentials(self, identifier: str, realm: str="zulip") -> str: """ identifier: Can be an email or a remote server uuid. """ @@ -353,27 +353,27 @@ class ZulipTestCase(TestCase): credentials = "%s:%s" % (identifier, api_key) return 'Basic ' + base64.b64encode(credentials.encode('utf-8')).decode('utf-8') - def api_get(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: + def api_get(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse: kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) return self.client_get(*args, **kwargs) - def api_post(self, identifier: Text, *args: Any, **kwargs: Any) -> HttpResponse: + def api_post(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse: kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(identifier, kwargs.get('realm', 'zulip')) return self.client_post(*args, **kwargs) - def api_patch(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: + def api_patch(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse: kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) return self.client_patch(*args, **kwargs) - def api_put(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: + def api_put(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse: kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) return self.client_put(*args, **kwargs) - def api_delete(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: + def api_delete(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse: kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) return self.client_delete(*args, **kwargs) - def get_streams(self, email: Text, realm: Realm) -> List[Text]: + def get_streams(self, email: str, realm: Realm) -> List[str]: """ Helper function to get the stream names for a user """ @@ -381,10 +381,10 @@ class ZulipTestCase(TestCase): subs = get_stream_subscriptions_for_user(user_profile).filter( active=True, ) - return [cast(Text, get_display_recipient(sub.recipient)) for sub in subs] + return [cast(str, get_display_recipient(sub.recipient)) for sub in subs] - def send_personal_message(self, from_email: Text, to_email: Text, content: Text="test content", - sender_realm: Text="zulip") -> int: + def send_personal_message(self, from_email: str, to_email: str, content: str="test content", + sender_realm: str="zulip") -> int: sender = get_user(from_email, get_realm(sender_realm)) recipient_list = [to_email] @@ -395,8 +395,8 @@ class ZulipTestCase(TestCase): content ) - def send_huddle_message(self, from_email: Text, to_emails: List[Text], content: Text="test content", - sender_realm: Text="zulip") -> int: + def send_huddle_message(self, from_email: str, to_emails: List[str], content: str="test content", + sender_realm: str="zulip") -> int: sender = get_user(from_email, get_realm(sender_realm)) assert(len(to_emails) >= 2) @@ -408,8 +408,8 @@ class ZulipTestCase(TestCase): content ) - def send_stream_message(self, sender_email: Text, stream_name: Text, content: Text="test content", - topic_name: Text="test", sender_realm: Text="zulip") -> int: + def send_stream_message(self, sender_email: str, stream_name: str, content: str="test content", + topic_name: str="test", sender_realm: str="zulip") -> int: sender = get_user(sender_email, get_realm(sender_realm)) (sending_client, _) = Client.objects.get_or_create(name="test suite") @@ -436,7 +436,7 @@ class ZulipTestCase(TestCase): data = self.get_messages_response(anchor, num_before, num_after, use_first_unread_anchor) return data['messages'] - def users_subscribed_to_stream(self, stream_name: Text, realm: Realm) -> List[UserProfile]: + def users_subscribed_to_stream(self, stream_name: str, realm: Realm) -> List[UserProfile]: stream = Stream.objects.get(name=stream_name, realm=realm) recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM) subscriptions = Subscription.objects.filter(recipient=recipient, active=True) @@ -474,7 +474,7 @@ class ZulipTestCase(TestCase): self.assertEqual(json.get("result"), "error") return json['msg'] - def assert_json_error(self, result: HttpResponse, msg: Text, status_code: int=400) -> None: + def assert_json_error(self, result: HttpResponse, msg: str, status_code: int=400) -> None: """ Invalid POSTs return an error status code and JSON of the form {"result": "error", "msg": "reason"}. @@ -490,42 +490,42 @@ class ZulipTestCase(TestCase): print("\nexpected length: %s\nactual length: %s" % (count, actual_count)) raise AssertionError('List is unexpected size!') - def assert_json_error_contains(self, result: HttpResponse, msg_substring: Text, + def assert_json_error_contains(self, result: HttpResponse, msg_substring: str, status_code: int=400) -> None: self.assertIn(msg_substring, self.get_json_error(result, status_code=status_code)) - def assert_in_response(self, substring: Text, response: HttpResponse) -> None: + def assert_in_response(self, substring: str, response: HttpResponse) -> None: self.assertIn(substring, response.content.decode('utf-8')) - def assert_in_success_response(self, substrings: List[Text], + def assert_in_success_response(self, substrings: List[str], response: HttpResponse) -> None: self.assertEqual(response.status_code, 200) decoded = response.content.decode('utf-8') for substring in substrings: self.assertIn(substring, decoded) - def assert_not_in_success_response(self, substrings: List[Text], + def assert_not_in_success_response(self, substrings: List[str], response: HttpResponse) -> None: self.assertEqual(response.status_code, 200) decoded = response.content.decode('utf-8') for substring in substrings: self.assertNotIn(substring, decoded) - def webhook_fixture_data(self, type: Text, action: Text, file_type: Text='json') -> Text: + def webhook_fixture_data(self, type: str, action: str, file_type: str='json') -> str: fn = os.path.join( os.path.dirname(__file__), "../webhooks/%s/fixtures/%s.%s" % (type, action, file_type) ) return open(fn).read() - def fixture_data(self, file_name: Text, type: Text='') -> Text: + def fixture_data(self, file_name: str, type: str='') -> str: fn = os.path.join( os.path.dirname(__file__), "../tests/fixtures/%s/%s" % (type, file_name) ) return open(fn).read() - def make_stream(self, stream_name: Text, realm: Optional[Realm]=None, + def make_stream(self, stream_name: str, realm: Optional[Realm]=None, invite_only: Optional[bool]=False, history_public_to_subscribers: Optional[bool]=None) -> Stream: if realm is None: @@ -552,7 +552,7 @@ class ZulipTestCase(TestCase): return stream # Subscribe to a stream directly - def subscribe(self, user_profile: UserProfile, stream_name: Text) -> Stream: + def subscribe(self, user_profile: UserProfile, stream_name: str) -> Stream: try: stream = get_stream(stream_name, user_profile.realm) from_stream_creation = False @@ -561,12 +561,12 @@ class ZulipTestCase(TestCase): bulk_add_subscriptions([stream], [user_profile], from_stream_creation=from_stream_creation) return stream - def unsubscribe(self, user_profile: UserProfile, stream_name: Text) -> None: + def unsubscribe(self, user_profile: UserProfile, stream_name: str) -> None: stream = get_stream(stream_name, user_profile.realm) bulk_remove_subscriptions([user_profile], [stream]) # Subscribe to a stream by making an API request - def common_subscribe_to_streams(self, email: Text, streams: Iterable[Text], + def common_subscribe_to_streams(self, email: str, streams: Iterable[str], extra_post_data: Dict[str, Any]={}, invite_only: bool=False, **kwargs: Any) -> HttpResponse: post_data = {'subscriptions': ujson.dumps([{"name": stream} for stream in streams]), @@ -576,7 +576,7 @@ class ZulipTestCase(TestCase): result = self.api_post(email, "/api/v1/users/me/subscriptions", post_data, **kwargs) return result - def check_user_subscribed_only_to_streams(self, user_name: Text, + def check_user_subscribed_only_to_streams(self, user_name: str, streams: List[Stream]) -> None: streams = sorted(streams, key=lambda x: x.name) subscribed_streams = gather_subscriptions(self.nonreg_user(user_name))[0] @@ -586,9 +586,9 @@ class ZulipTestCase(TestCase): for x, y in zip(subscribed_streams, streams): self.assertEqual(x["name"], y.name) - def send_json_payload(self, user_profile: UserProfile, url: Text, - payload: Union[Text, Dict[str, Any]], - stream_name: Optional[Text]=None, **post_params: Any) -> Message: + def send_json_payload(self, user_profile: UserProfile, url: str, + payload: Union[str, Dict[str, Any]], + stream_name: Optional[str]=None, **post_params: Any) -> Message: if stream_name is not None: self.subscribe(user_profile, stream_name) @@ -630,10 +630,10 @@ class WebhookTestCase(ZulipTestCase): If you create your url in uncommon way you can override build_webhook_url method In case that you need modify body or create it without using fixture you can also override get_body method """ - STREAM_NAME = None # type: Optional[Text] + STREAM_NAME = None # type: Optional[str] TEST_USER_EMAIL = 'webhook-bot@zulip.com' - URL_TEMPLATE = None # type: Optional[Text] - FIXTURE_DIR_NAME = None # type: Optional[Text] + URL_TEMPLATE = None # type: Optional[str] + FIXTURE_DIR_NAME = None # type: Optional[str] @property def test_user(self) -> UserProfile: @@ -642,13 +642,13 @@ class WebhookTestCase(ZulipTestCase): def setUp(self) -> None: self.url = self.build_webhook_url() - def api_stream_message(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse: + def api_stream_message(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse: kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email) return self.send_and_test_stream_message(*args, **kwargs) - def send_and_test_stream_message(self, fixture_name: Text, expected_subject: Optional[Text]=None, - expected_message: Optional[Text]=None, - content_type: Optional[Text]="application/json", **kwargs: Any) -> Message: + def send_and_test_stream_message(self, fixture_name: str, expected_subject: Optional[str]=None, + expected_message: Optional[str]=None, + content_type: Optional[str]="application/json", **kwargs: Any) -> Message: payload = self.get_body(fixture_name) if content_type is not None: kwargs['content_type'] = content_type @@ -659,8 +659,8 @@ class WebhookTestCase(ZulipTestCase): return msg - def send_and_test_private_message(self, fixture_name: Text, expected_subject: Text=None, - expected_message: Text=None, content_type: str="application/json", + def send_and_test_private_message(self, fixture_name: str, expected_subject: str=None, + expected_message: str=None, content_type: str="application/json", **kwargs: Any)-> Message: payload = self.get_body(fixture_name) if content_type is not None: @@ -673,7 +673,7 @@ class WebhookTestCase(ZulipTestCase): return msg - def build_webhook_url(self, *args: Any, **kwargs: Any) -> Text: + def build_webhook_url(self, *args: Any, **kwargs: Any) -> str: url = self.URL_TEMPLATE if url.find("api_key") >= 0: api_key = self.test_user.api_key @@ -696,15 +696,15 @@ class WebhookTestCase(ZulipTestCase): return url[:-1] if has_arguments else url - def get_body(self, fixture_name: Text) -> Union[Text, Dict[str, Text]]: + def get_body(self, fixture_name: str) -> Union[str, Dict[str, str]]: """Can be implemented either as returning a dictionary containing the post parameters or as string containing the body of the request.""" return ujson.dumps(ujson.loads(self.webhook_fixture_data(self.FIXTURE_DIR_NAME, fixture_name))) - def do_test_subject(self, msg: Message, expected_subject: Optional[Text]) -> None: + def do_test_subject(self, msg: Message, expected_subject: Optional[str]) -> None: if expected_subject is not None: self.assertEqual(msg.topic_name(), expected_subject) - def do_test_message(self, msg: Message, expected_message: Optional[Text]) -> None: + def do_test_message(self, msg: Message, expected_message: Optional[str]) -> None: if expected_message is not None: self.assertEqual(msg.content, expected_message)