zerver/lib: Use Python 3 syntax for typing for several files.

This adds a number of annotations that had been missed in previous
passes.
This commit is contained in:
rht
2017-11-27 04:27:04 +00:00
committed by Tim Abbott
parent 115f7e6055
commit 229a8b38c0
9 changed files with 90 additions and 144 deletions

View File

@@ -2551,8 +2551,8 @@ def do_change_full_name(user_profile, full_name, acting_user):
send_event(dict(type='realm_bot', op='update', bot=payload),
bot_owner_user_ids(user_profile))
def check_change_full_name(user_profile, full_name_raw, acting_user):
# type: (UserProfile, Text, UserProfile) -> Text
def check_change_full_name(user_profile: UserProfile, full_name_raw: Text,
acting_user: UserProfile) -> Text:
"""Verifies that the user's proposed full name is valid. The caller
is responsible for checking check permissions. Returns the new
full name, which may differ from what was passed in (because this
@@ -4382,8 +4382,8 @@ def do_send_create_user_group_event(user_group: UserGroup, members: List[UserPro
)
send_event(event, active_user_ids(user_group.realm_id))
def check_add_user_group(realm, name, initial_members, description):
# type: (Realm, Text, List[UserProfile], Text) -> None
def check_add_user_group(realm: Realm, name: Text, initial_members: List[UserProfile],
description: Text) -> None:
try:
user_group = create_user_group(name, initial_members, realm, description=description)
do_send_create_user_group_event(user_group, initial_members)
@@ -4394,14 +4394,12 @@ def do_send_user_group_update_event(user_group: UserGroup, data: Dict[str, Any])
event = dict(type="user_group", op='update', group_id=user_group.id, data=data)
send_event(event, active_user_ids(user_group.realm_id))
def do_update_user_group_name(user_group, name):
# type: (UserGroup, Text) -> None
def do_update_user_group_name(user_group: UserGroup, name: Text) -> None:
user_group.name = name
user_group.save(update_fields=['name'])
do_send_user_group_update_event(user_group, dict(name=name))
def do_update_user_group_description(user_group, description):
# type: (UserGroup, Text) -> None
def do_update_user_group_description(user_group: UserGroup, description: Text) -> None:
user_group.description = description
user_group.save(update_fields=['description'])
do_send_user_group_update_event(user_group, dict(description=description))
@@ -4415,8 +4413,7 @@ def do_send_user_group_members_update_event(event_name: Text,
user_ids=user_ids)
send_event(event, active_user_ids(user_group.realm_id))
def bulk_add_members_to_user_group(user_group, user_profiles):
# type: (UserGroup, List[UserProfile]) -> None
def bulk_add_members_to_user_group(user_group: UserGroup, user_profiles: List[UserProfile]) -> None:
memberships = [UserGroupMembership(user_group_id=user_group.id,
user_profile=user_profile)
for user_profile in user_profiles]
@@ -4425,8 +4422,7 @@ def bulk_add_members_to_user_group(user_group, user_profiles):
user_ids = [up.id for up in user_profiles]
do_send_user_group_members_update_event('add_members', user_group, user_ids)
def remove_members_from_user_group(user_group, user_profiles):
# type: (UserGroup, List[UserProfile]) -> None
def remove_members_from_user_group(user_group: UserGroup, user_profiles: List[UserProfile]) -> None:
UserGroupMembership.objects.filter(
user_group_id=user_group.id,
user_profile__in=user_profiles).delete()

View File

@@ -4,9 +4,7 @@ from typing import Text
from zerver.lib.utils import make_safe_digest
if False:
# Typing import inside `if False` to avoid import loop.
from zerver.models import UserProfile
from zerver.models import UserProfile
import hashlib
@@ -30,8 +28,7 @@ def user_avatar_hash(uid: Text) -> Text:
user_key = uid + settings.AVATAR_SALT
return make_safe_digest(user_key, hashlib.sha1)
def user_avatar_path(user_profile):
# type: (UserProfile) -> Text
def user_avatar_path(user_profile: UserProfile) -> Text:
# WARNING: If this method is changed, you may need to do a migration
# similar to zerver/migrations/0060_move_avatars_to_be_uid_based.py .

View File

@@ -9,13 +9,11 @@ from typing import Text, Dict, Optional
class ConfigError(Exception):
pass
def get_bot_config(bot_profile):
# type: (UserProfile) -> Dict[Text, Text]
def get_bot_config(bot_profile: UserProfile) -> Dict[Text, Text]:
entries = BotUserConfigData.objects.filter(bot_profile=bot_profile)
return {entry.key: entry.value for entry in entries}
def get_bot_config_size(bot_profile, key=None):
# type: (UserProfile, Optional[Text]) -> int
def get_bot_config_size(bot_profile: UserProfile, key: Optional[Text]=None) -> int:
if key is None:
return BotUserConfigData.objects.filter(bot_profile=bot_profile) \
.annotate(key_size=Length('key'), value_size=Length('value')) \
@@ -26,8 +24,7 @@ def get_bot_config_size(bot_profile, key=None):
except BotUserConfigData.DoesNotExist:
return 0
def set_bot_config(bot_profile, key, value):
# type: (UserProfile, Text, Text) -> None
def set_bot_config(bot_profile: UserProfile, key: Text, value: Text) -> None:
config_size_limit = settings.BOT_CONFIG_SIZE_LIMIT
old_entry_size = get_bot_config_size(bot_profile, key)
new_entry_size = len(key) + len(value)

View File

@@ -28,8 +28,7 @@ def get_bot_storage_size(bot_profile, key=None):
except BotStorageData.DoesNotExist:
return 0
def set_bot_storage(bot_profile, entries):
# type: (UserProfile, List[Tuple[str, str]]) -> None
def set_bot_storage(bot_profile: UserProfile, entries: List[Tuple[str, str]]) -> None:
storage_size_limit = settings.USER_STATE_SIZE_LIMIT
storage_size_difference = 0
for key, value in entries:
@@ -47,17 +46,14 @@ def set_bot_storage(bot_profile, entries):
BotStorageData.objects.update_or_create(bot_profile=bot_profile, key=key,
defaults={'value': value})
def remove_bot_storage(bot_profile, keys):
# type: (UserProfile, List[Text]) -> None
def remove_bot_storage(bot_profile: UserProfile, keys: List[Text]) -> None:
queryset = BotStorageData.objects.filter(bot_profile=bot_profile, key__in=keys)
if len(queryset) < len(keys):
raise StateError("Key does not exist.")
queryset.delete()
def is_key_in_bot_storage(bot_profile, key):
# type: (UserProfile, Text) -> bool
def is_key_in_bot_storage(bot_profile: UserProfile, key: Text) -> bool:
return BotStorageData.objects.filter(bot_profile=bot_profile, key=key).exists()
def get_keys_in_bot_storage(bot_profile):
# type: (UserProfile) -> List[Text]
def get_keys_in_bot_storage(bot_profile: UserProfile) -> List[Text]:
return list(BotStorageData.objects.filter(bot_profile=bot_profile).values_list('key', flat=True))

View File

@@ -19,7 +19,7 @@ import os
import hashlib
if False:
from zerver.models import UserProfile, Stream, Realm, Message
from zerver.models import UserProfile, Realm, Message
# These modules have to be imported for type annotations but
# they cannot be imported at runtime due to cyclic dependency.

View File

@@ -535,8 +535,7 @@ def handle_push_notification(user_profile_id, missed_message):
apns_payload,
gcm_payload)
except requests.ConnectionError:
def failure_processor(event):
# type: (Dict[str, Any]) -> None
def failure_processor(event: Dict[str, Any]) -> None:
logging.warning(
"Maximum retries exceeded for trigger:%s event:push_notification" % (
event['user_profile_id']))

View File

@@ -9,8 +9,7 @@ from datetime import datetime, timedelta
# Return the amount of Zulip usage for this user between the two
# given dates
def seconds_usage_between(user_profile, begin, end):
# type: (UserProfile, datetime, datetime) -> timedelta
def seconds_usage_between(user_profile: UserProfile, begin: datetime, end: datetime) -> timedelta:
intervals = UserActivityInterval.objects.filter(user_profile=user_profile,
end__gte=begin,
start__lte=end)

View File

@@ -59,8 +59,7 @@ from contextlib import contextmanager
API_KEYS = {} # type: Dict[Text, Text]
def flush_caches_for_testing():
# type: () -> None
def flush_caches_for_testing() -> None:
global API_KEYS
API_KEYS = {}
@@ -75,8 +74,7 @@ class UploadSerializeMixin(SerializeMixin):
lockfile = 'var/upload_lock'
@classmethod
def setUpClass(cls, *args, **kwargs):
# type: (*Any, **Any) -> None
def setUpClass(cls: Any, *args: Any, **kwargs: Any) -> None:
if not os.path.exists(cls.lockfile):
with open(cls.lockfile, 'w'): # nocoverage - rare locking case
pass
@@ -102,8 +100,7 @@ class ZulipTestCase(TestCase):
DEFAULT_SUBDOMAIN = "zulip"
DEFAULT_REALM = Realm.objects.get(string_id='zulip')
def set_http_host(self, kwargs):
# type: (Dict[str, Any]) -> None
def set_http_host(self, kwargs: Dict[str, Any]) -> None:
if 'subdomain' in kwargs:
kwargs['HTTP_HOST'] = Realm.host_for_subdomain(kwargs['subdomain'])
del kwargs['subdomain']
@@ -111,8 +108,7 @@ class ZulipTestCase(TestCase):
kwargs['HTTP_HOST'] = Realm.host_for_subdomain(self.DEFAULT_SUBDOMAIN)
@instrument_url
def client_patch(self, url, info={}, **kwargs):
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
def client_patch(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
"""
We need to urlencode, since Django's function won't do it for us.
"""
@@ -142,47 +138,41 @@ class ZulipTestCase(TestCase):
**kwargs)
@instrument_url
def client_put(self, url, info={}, **kwargs):
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
def client_put(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs)
return django_client.put(url, encoded, **kwargs)
@instrument_url
def client_delete(self, url, info={}, **kwargs):
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
def client_delete(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs)
return django_client.delete(url, encoded, **kwargs)
@instrument_url
def client_options(self, url, info={}, **kwargs):
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
def client_options(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs)
return django_client.options(url, encoded, **kwargs)
@instrument_url
def client_head(self, url, info={}, **kwargs):
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
def client_head(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs)
return django_client.head(url, encoded, **kwargs)
@instrument_url
def client_post(self, url, info={}, **kwargs):
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
def client_post(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs)
return django_client.post(url, info, **kwargs)
@instrument_url
def client_post_request(self, url, req):
# type: (Text, Any) -> HttpResponse
def client_post_request(self, url: Text, req: Any) -> HttpResponse:
"""
We simulate hitting an endpoint here, although we
actually resolve the URL manually and hit the view
@@ -196,8 +186,7 @@ class ZulipTestCase(TestCase):
return match.func(req)
@instrument_url
def client_get(self, url, info={}, **kwargs):
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
def client_get(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
django_client = self.client # see WRAPPER_COMMENT
self.set_http_host(kwargs)
return django_client.get(url, info, **kwargs)
@@ -234,39 +223,32 @@ class ZulipTestCase(TestCase):
me='me@zulip.com',
)
def nonreg_user(self, name):
# type: (str) -> UserProfile
def nonreg_user(self, name: str) -> UserProfile:
email = self.nonreg_user_map[name]
return get_user(email, get_realm("zulip"))
def example_user(self, name):
# type: (str) -> UserProfile
def example_user(self, name: str) -> UserProfile:
email = self.example_user_map[name]
return get_user(email, get_realm('zulip'))
def mit_user(self, name):
# type: (str) -> UserProfile
def mit_user(self, name: str) -> UserProfile:
email = self.mit_user_map[name]
return get_user(email, get_realm('zephyr'))
def nonreg_email(self, name):
# type: (str) -> Text
def nonreg_email(self, name: str) -> Text:
return self.nonreg_user_map[name]
def example_email(self, name):
# type: (str) -> Text
def example_email(self, name: str) -> Text:
return self.example_user_map[name]
def mit_email(self, name):
# type: (str) -> Text
def mit_email(self, name: str) -> Text:
return self.mit_user_map[name]
def notification_bot(self):
# type: () -> UserProfile
def notification_bot(self) -> UserProfile:
return get_user('notification-bot@zulip.com', get_realm('zulip'))
def create_test_bot(self, email, user_profile, full_name, short_name, bot_type, service_name=None):
# type: (Text, UserProfile, Text, Text, int, str) -> UserProfile
def create_test_bot(self, email: Text, user_profile: UserProfile, full_name: Text,
short_name: Text, bot_type: int, service_name: str=None) -> UserProfile:
bot_profile = do_create_user(email=email, password='', realm=user_profile.realm,
full_name=full_name, short_name=short_name,
bot_type=bot_type, bot_owner=user_profile)
@@ -276,16 +258,16 @@ class ZulipTestCase(TestCase):
token='abcdef')
return bot_profile
def login_with_return(self, email, password=None, **kwargs):
# type: (Text, Optional[Text], **Any) -> HttpResponse
def login_with_return(self, email: Text, password: Optional[Text]=None,
**kwargs: Any) -> HttpResponse:
if password is None:
password = initial_password(email)
return self.client_post('/accounts/login/',
{'username': email, 'password': password},
**kwargs)
def login(self, email, password=None, fails=False, realm=None):
# type: (Text, Optional[Text], bool, Optional[Realm]) -> HttpResponse
def login(self, email: Text, password: Optional[Text]=None, fails: bool=False,
realm: Optional[Realm]=None) -> HttpResponse:
if realm is None:
realm = get_realm("zulip")
if password is None:
@@ -297,21 +279,21 @@ class ZulipTestCase(TestCase):
self.assertFalse(self.client.login(username=email, password=password,
realm=realm))
def logout(self):
# type: () -> None
def logout(self) -> None:
self.client.logout()
def register(self, email, password, **kwargs):
# type: (Text, Text, **Any) -> HttpResponse
def register(self, email: Text, password: Text, **kwargs: Any) -> HttpResponse:
self.client_post('/accounts/home/', {'email': email},
**kwargs)
return self.submit_reg_form_for_user(email, password, **kwargs)
def submit_reg_form_for_user(self, email: Text, password: Text, realm_name: Optional[Text]="Zulip Test",
realm_subdomain: Optional[Text]="zuliptest",
from_confirmation: Optional[Text]='', full_name: Optional[Text]=None,
timezone: Optional[Text]='', realm_in_root_domain: Optional[Text]=None,
default_stream_groups: Optional[List[Text]]=[], **kwargs: Any) -> HttpResponse:
def submit_reg_form_for_user(
self, email: Text, password: Text,
realm_name: Optional[Text]="Zulip Test",
realm_subdomain: Optional[Text]="zuliptest",
from_confirmation: Optional[Text]='', full_name: Optional[Text]=None,
timezone: Optional[Text]='', realm_in_root_domain: Optional[Text]=None,
default_stream_groups: Optional[List[Text]]=[], **kwargs: Any) -> HttpResponse:
"""
Stage two of the two-step registration process.
@@ -337,8 +319,8 @@ class ZulipTestCase(TestCase):
payload['realm_in_root_domain'] = realm_in_root_domain
return self.client_post('/accounts/register/', payload, **kwargs)
def get_confirmation_url_from_outbox(self, email_address, *, url_pattern=None):
# type: (Text, Text) -> Text
def get_confirmation_url_from_outbox(self, email_address: Text, *,
url_pattern: Text=None) -> Text:
from django.core.mail import outbox
if url_pattern is None:
# This is a bit of a crude heuristic, but good enough for most tests.
@@ -349,8 +331,7 @@ class ZulipTestCase(TestCase):
else:
raise AssertionError("Couldn't find a confirmation email.")
def api_auth(self, identifier, realm="zulip"):
# type: (Text, Text) -> Dict[str, Text]
def api_auth(self, identifier: Text, realm: Text="zulip") -> Dict[str, Text]:
"""
identifier: Can be an email or a remote server uuid.
"""
@@ -368,8 +349,7 @@ class ZulipTestCase(TestCase):
'HTTP_AUTHORIZATION': 'Basic ' + base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
}
def get_streams(self, email, realm):
# type: (Text, Realm) -> List[Text]
def get_streams(self, email: Text, realm: Realm) -> List[Text]:
"""
Helper function to get the stream names for a user
"""
@@ -431,22 +411,19 @@ class ZulipTestCase(TestCase):
data = result.json()
return data['messages']
def users_subscribed_to_stream(self, stream_name, realm):
# type: (Text, Realm) -> List[UserProfile]
def users_subscribed_to_stream(self, stream_name: Text, realm: Realm) -> List[UserProfile]:
stream = Stream.objects.get(name=stream_name, realm=realm)
recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
subscriptions = Subscription.objects.filter(recipient=recipient, active=True)
return [subscription.user_profile for subscription in subscriptions]
def assert_url_serves_contents_of_file(self, url, result):
# type: (str, bytes) -> None
def assert_url_serves_contents_of_file(self, url: str, result: bytes) -> None:
response = self.client_get(url)
data = b"".join(response.streaming_content)
self.assertEqual(result, data)
def assert_json_success(self, result):
# type: (HttpResponse) -> Dict[str, Any]
def assert_json_success(self, result: HttpResponse) -> Dict[str, Any]:
"""
Successful POSTs return a 200 and JSON of the form {"result": "success",
"msg": ""}.
@@ -463,8 +440,7 @@ class ZulipTestCase(TestCase):
self.assertNotEqual(json["msg"], "Error parsing JSON in response!")
return json
def get_json_error(self, result, status_code=400):
# type: (HttpResponse, int) -> Dict[str, Any]
def get_json_error(self, result: HttpResponse, status_code: int=400) -> Dict[str, Any]:
try:
json = ujson.loads(result.content)
except Exception: # nocoverage
@@ -473,16 +449,14 @@ class ZulipTestCase(TestCase):
self.assertEqual(json.get("result"), "error")
return json['msg']
def assert_json_error(self, result, msg, status_code=400):
# type: (HttpResponse, Text, int) -> None
def assert_json_error(self, result: HttpResponse, msg: Text, status_code: int=400) -> None:
"""
Invalid POSTs return an error status code and JSON of the form
{"result": "error", "msg": "reason"}.
"""
self.assertEqual(self.get_json_error(result, status_code=status_code), msg)
def assert_length(self, items, count):
# type: (List[Any], int) -> None
def assert_length(self, items: List[Any], count: int) -> None:
actual_count = len(items)
if actual_count != count: # nocoverage
print('ITEMS:\n')
@@ -491,38 +465,36 @@ class ZulipTestCase(TestCase):
print("\nexpected length: %s\nactual length: %s" % (count, actual_count))
raise AssertionError('List is unexpected size!')
def assert_json_error_contains(self, result, msg_substring, status_code=400):
# type: (HttpResponse, Text, int) -> None
def assert_json_error_contains(self, result: HttpResponse, msg_substring: Text,
status_code: int=400) -> None:
self.assertIn(msg_substring, self.get_json_error(result, status_code=status_code))
def assert_in_response(self, substring, response):
# type: (Text, HttpResponse) -> None
def assert_in_response(self, substring: Text, response: HttpResponse) -> None:
self.assertIn(substring, response.content.decode('utf-8'))
def assert_in_success_response(self, substrings, response):
# type: (List[Text], HttpResponse) -> None
def assert_in_success_response(self, substrings: List[Text],
response: HttpResponse) -> None:
self.assertEqual(response.status_code, 200)
decoded = response.content.decode('utf-8')
for substring in substrings:
self.assertIn(substring, decoded)
def assert_not_in_success_response(self, substrings, response):
# type: (List[Text], HttpResponse) -> None
def assert_not_in_success_response(self, substrings: List[Text],
response: HttpResponse) -> None:
self.assertEqual(response.status_code, 200)
decoded = response.content.decode('utf-8')
for substring in substrings:
self.assertNotIn(substring, decoded)
def fixture_data(self, type, action, file_type='json'):
# type: (Text, Text, Text) -> Text
def fixture_data(self, type: Text, action: Text, file_type: Text='json') -> Text:
fn = os.path.join(
os.path.dirname(__file__),
"../webhooks/%s/fixtures/%s.%s" % (type, action, file_type)
)
return open(fn).read()
def make_stream(self, stream_name, realm=None, invite_only=False):
# type: (Text, Optional[Realm], Optional[bool]) -> Stream
def make_stream(self, stream_name: Text, realm: Optional[Realm]=None,
invite_only: Optional[bool]=False) -> Stream:
if realm is None:
realm = self.DEFAULT_REALM
@@ -543,8 +515,7 @@ class ZulipTestCase(TestCase):
return stream
# Subscribe to a stream directly
def subscribe(self, user_profile, stream_name):
# type: (UserProfile, Text) -> Stream
def subscribe(self, user_profile: UserProfile, stream_name: Text) -> Stream:
try:
stream = get_stream(stream_name, user_profile.realm)
from_stream_creation = False
@@ -553,8 +524,7 @@ class ZulipTestCase(TestCase):
bulk_add_subscriptions([stream], [user_profile], from_stream_creation=from_stream_creation)
return stream
def unsubscribe(self, user_profile, stream_name):
# type: (UserProfile, Text) -> None
def unsubscribe(self, user_profile: UserProfile, stream_name: Text) -> None:
stream = get_stream(stream_name, user_profile.realm)
bulk_remove_subscriptions([user_profile], [stream])
@@ -571,7 +541,8 @@ class ZulipTestCase(TestCase):
**kw)
return result
def check_user_subscribed_only_to_streams(self, user_name: Text, streams: List[Stream]) -> None:
def check_user_subscribed_only_to_streams(self, user_name: Text,
streams: List[Stream]) -> None:
streams = sorted(streams, key=lambda x: x.name)
subscribed_streams = gather_subscriptions(self.nonreg_user(user_name))[0]
@@ -580,8 +551,9 @@ class ZulipTestCase(TestCase):
for x, y in zip(subscribed_streams, streams):
self.assertEqual(x["name"], y.name)
def send_json_payload(self, user_profile, url, payload, stream_name=None, **post_params):
# type: (UserProfile, Text, Union[Text, Dict[str, Any]], Optional[Text], **Any) -> Message
def send_json_payload(self, user_profile: UserProfile, url: Text,
payload: Union[Text, Dict[str, Any]],
stream_name: Optional[Text]=None, **post_params: Any) -> Message:
if stream_name is not None:
self.subscribe(user_profile, stream_name)
@@ -597,17 +569,14 @@ class ZulipTestCase(TestCase):
return msg
def get_last_message(self):
# type: () -> Message
def get_last_message(self) -> Message:
return Message.objects.latest('id')
def get_second_to_last_message(self):
# type: () -> Message
def get_second_to_last_message(self) -> Message:
return Message.objects.all().order_by('-id')[1]
@contextmanager
def simulated_markdown_failure(self):
# type: () -> Iterator[None]
def simulated_markdown_failure(self) -> Iterator[None]:
'''
This raises a failure inside of the try/except block of
bugdown.__init__.do_convert.
@@ -632,12 +601,10 @@ class WebhookTestCase(ZulipTestCase):
FIXTURE_DIR_NAME = None # type: Optional[Text]
@property
def test_user(self):
# type: () -> UserProfile
def test_user(self) -> UserProfile:
return get_user(self.TEST_USER_EMAIL, get_realm("zulip"))
def setUp(self):
# type: () -> None
def setUp(self) -> None:
self.url = self.build_webhook_url()
def send_and_test_stream_message(self, fixture_name, expected_subject=None,
@@ -666,8 +633,7 @@ class WebhookTestCase(ZulipTestCase):
return msg
def build_webhook_url(self, *args, **kwargs):
# type: (*Any, **Any) -> Text
def build_webhook_url(self, *args: Any, **kwargs: Any) -> Text:
url = self.URL_TEMPLATE
if url.find("api_key") >= 0:
api_key = self.test_user.api_key
@@ -690,18 +656,15 @@ class WebhookTestCase(ZulipTestCase):
return url[:-1] if has_arguments else url
def get_body(self, fixture_name):
# type: (Text) -> Union[Text, Dict[str, Text]]
def get_body(self, fixture_name: Text) -> Union[Text, Dict[str, Text]]:
"""Can be implemented either as returning a dictionary containing the
post parameters or as string containing the body of the request."""
return ujson.dumps(ujson.loads(self.fixture_data(self.FIXTURE_DIR_NAME, fixture_name)))
def do_test_subject(self, msg, expected_subject):
# type: (Message, Optional[Text]) -> None
def do_test_subject(self, msg: Message, expected_subject: Optional[Text]) -> None:
if expected_subject is not None:
self.assertEqual(msg.topic_name(), expected_subject)
def do_test_message(self, msg, expected_message):
# type: (Message, Optional[Text]) -> None
def do_test_message(self, msg: Message, expected_message: Optional[Text]) -> None:
if expected_message is not None:
self.assertEqual(msg.content, expected_message)

View File

@@ -72,8 +72,7 @@ def create_user_group(name: Text, members: List[UserProfile], realm: Realm,
])
return user_group
def get_memberships_of_users(user_group, members):
# type: (UserGroup, List[UserProfile]) -> List[int]
def get_memberships_of_users(user_group: UserGroup, members: List[UserProfile]) -> List[int]:
return list(UserGroupMembership.objects.filter(
user_group=user_group,
user_profile__in=members).values_list('user_profile_id', flat=True))