zerver: Change use of typing.Text to str.

This commit is contained in:
Aditya Bansal
2018-05-11 05:09:17 +05:30
committed by Tim Abbott
parent 67bf71472a
commit 993d50f5ab
3 changed files with 52 additions and 52 deletions

View File

@@ -31,7 +31,7 @@ import logging
from io import BytesIO
import urllib
from typing import Union, Any, Callable, Sequence, Dict, Optional, TypeVar, Text, Tuple, cast
from typing import Union, Any, Callable, Sequence, Dict, Optional, TypeVar, Tuple, cast
from zerver.lib.logging_util import log_to_file
# This is a hack to ensure that RemoteZulipServer always exists even
@@ -131,7 +131,7 @@ def require_realm_admin(func: ViewFuncT) -> ViewFuncT:
from zerver.lib.user_agent import parse_user_agent
def get_client_name(request: HttpRequest, is_browser_view: bool) -> Text:
def get_client_name(request: HttpRequest, is_browser_view: bool) -> str:
# If the API request specified a client in the request content,
# that has priority. Otherwise, extract the client from the
# User-Agent.
@@ -164,9 +164,9 @@ def get_client_name(request: HttpRequest, is_browser_view: bool) -> Text:
def process_client(request: HttpRequest, user_profile: UserProfile,
*, is_browser_view: bool=False,
client_name: Optional[Text]=None,
client_name: Optional[str]=None,
remote_server_request: bool=False,
query: Optional[Text]=None) -> None:
query: Optional[str]=None) -> None:
if client_name is None:
client_name = get_client_name(request, is_browser_view)
@@ -178,21 +178,21 @@ class InvalidZulipServerError(JsonableError):
code = ErrorCode.INVALID_ZULIP_SERVER
data_fields = ['role']
def __init__(self, role: Text) -> None:
self.role = role # type: Text
def __init__(self, role: str) -> None:
self.role = role # type: str
@staticmethod
def msg_format() -> Text:
def msg_format() -> str:
return "Zulip server auth failure: {role} is not registered"
class InvalidZulipServerKeyError(InvalidZulipServerError):
@staticmethod
def msg_format() -> Text:
def msg_format() -> str:
return "Zulip server auth failure: key does not match role {role}"
def validate_api_key(request: HttpRequest, role: Optional[Text],
api_key: Text, is_webhook: bool=False,
client_name: Optional[Text]=None) -> Union[UserProfile, RemoteZulipServer]:
def validate_api_key(request: HttpRequest, role: Optional[str],
api_key: str, is_webhook: bool=False,
client_name: Optional[str]=None) -> Union[UserProfile, RemoteZulipServer]:
# Remove whitespace to protect users from trivial errors.
api_key = api_key.strip()
if role is not None:
@@ -245,7 +245,7 @@ def validate_account_and_subdomain(request: HttpRequest, user_profile: UserProfi
user_profile.email, user_profile.realm.subdomain, get_subdomain(request)))
raise JsonableError(_("Account is not associated with this subdomain"))
def access_user_by_api_key(request: HttpRequest, api_key: Text, email: Optional[Text]=None) -> UserProfile:
def access_user_by_api_key(request: HttpRequest, api_key: str, email: Optional[str]=None) -> UserProfile:
try:
user_profile = get_user_profile_by_api_key(api_key)
except UserProfile.DoesNotExist:
@@ -261,7 +261,7 @@ def access_user_by_api_key(request: HttpRequest, api_key: Text, email: Optional[
return user_profile
def log_exception_to_webhook_logger(request: HttpRequest, user_profile: UserProfile,
request_body: Optional[Text]=None) -> None:
request_body: Optional[str]=None) -> None:
if request_body is not None:
payload = request_body
else:
@@ -314,14 +314,14 @@ def full_webhook_client_name(raw_client_name: Optional[str]=None) -> Optional[st
return "Zulip{}Webhook".format(raw_client_name)
# Use this for webhook views that don't get an email passed in.
def api_key_only_webhook_view(webhook_client_name: Text) -> Callable[[ViewFuncT], ViewFuncT]:
def api_key_only_webhook_view(webhook_client_name: str) -> Callable[[ViewFuncT], ViewFuncT]:
# TODO The typing here could be improved by using the Extended Callable types:
# https://mypy.readthedocs.io/en/latest/kinds_of_types.html#extended-callable-types
def _wrapped_view_func(view_func: ViewFuncT) -> ViewFuncT:
@csrf_exempt
@has_request_variables
@wraps(view_func)
def _wrapped_func_arguments(request: HttpRequest, api_key: Text=REQ(),
def _wrapped_func_arguments(request: HttpRequest, api_key: str=REQ(),
*args: Any, **kwargs: Any) -> HttpResponse:
user_profile = validate_api_key(request, None, api_key, is_webhook=True,
client_name=full_webhook_client_name(webhook_client_name))
@@ -338,8 +338,8 @@ def api_key_only_webhook_view(webhook_client_name: Text) -> Callable[[ViewFuncT]
return _wrapped_view_func
# From Django 1.8, modified to leave off ?next=/
def redirect_to_login(next: Text, login_url: Optional[Text]=None,
redirect_field_name: Text=REDIRECT_FIELD_NAME) -> HttpResponseRedirect:
def redirect_to_login(next: str, login_url: Optional[str]=None,
redirect_field_name: str=REDIRECT_FIELD_NAME) -> HttpResponseRedirect:
"""
Redirects the user to the login page, passing the given 'next' page
"""
@@ -356,8 +356,8 @@ def redirect_to_login(next: Text, login_url: Optional[Text]=None,
return HttpResponseRedirect(urllib.parse.urlunparse(login_url_parts))
# From Django 1.8
def user_passes_test(test_func: Callable[[HttpResponse], bool], login_url: Optional[Text]=None,
redirect_field_name: Text=REDIRECT_FIELD_NAME) -> Callable[[ViewFuncT], ViewFuncT]:
def user_passes_test(test_func: Callable[[HttpResponse], bool], login_url: Optional[str]=None,
redirect_field_name: str=REDIRECT_FIELD_NAME) -> Callable[[ViewFuncT], ViewFuncT]:
"""
Decorator for views that checks that the user passes the given test,
redirecting to the log-in page if necessary. The test should be a callable
@@ -426,8 +426,8 @@ def human_users_only(view_func: ViewFuncT) -> ViewFuncT:
# Based on Django 1.8's @login_required
def zulip_login_required(
function: Optional[ViewFuncT]=None,
redirect_field_name: Text=REDIRECT_FIELD_NAME,
login_url: Text=settings.HOME_NOT_LOGGED_IN,
redirect_field_name: str=REDIRECT_FIELD_NAME,
login_url: str=settings.HOME_NOT_LOGGED_IN,
) -> Union[Callable[[ViewFuncT], ViewFuncT], ViewFuncT]:
actual_decorator = user_passes_test(
logged_in_and_active,
@@ -489,9 +489,9 @@ def authenticated_api_view(is_webhook: bool=False) -> Callable[[ViewFuncT], View
@require_post
@has_request_variables
@wraps(view_func)
def _wrapped_func_arguments(request: HttpRequest, email: Text=REQ(),
api_key: Optional[Text]=REQ(default=None),
api_key_legacy: Optional[Text]=REQ('api-key', default=None),
def _wrapped_func_arguments(request: HttpRequest, email: str=REQ(),
api_key: Optional[str]=REQ(default=None),
api_key_legacy: Optional[str]=REQ('api-key', default=None),
*args: Any, **kwargs: Any) -> HttpResponse:
if api_key is None:
api_key = api_key_legacy
@@ -643,7 +643,7 @@ def authenticated_json_view(view_func: ViewFuncT) -> ViewFuncT:
return authenticate_log_and_execute_json(request, view_func, *args, **kwargs)
return _wrapped_view_func # type: ignore # https://github.com/python/mypy/issues/1927
def is_local_addr(addr: Text) -> bool:
def is_local_addr(addr: str) -> bool:
return addr in ('127.0.0.1', '::1')
# These views are used by the main Django server to notify the Tornado server
@@ -687,23 +687,23 @@ def internal_notify_view(is_tornado_view: bool) -> Callable[[ViewFuncT], ViewFun
return _wrapped_view_func
# Converter functions for use with has_request_variables
def to_non_negative_int(s: Text) -> int:
def to_non_negative_int(s: str) -> int:
x = int(s)
if x < 0:
raise ValueError("argument is negative")
return x
def to_not_negative_int_or_none(s: Text) -> Optional[int]:
def to_not_negative_int_or_none(s: str) -> Optional[int]:
if s:
return to_non_negative_int(s)
return None
def to_utc_datetime(timestamp: Text) -> datetime.datetime:
def to_utc_datetime(timestamp: str) -> datetime.datetime:
return timestamp_to_datetime(float(timestamp))
def statsd_increment(counter: Text, val: int=1,
def statsd_increment(counter: str, val: int=1,
) -> Callable[[Callable[..., ReturnT]], Callable[..., ReturnT]]:
"""Increments a statsd counter on completion of the
decorated function.
@@ -718,7 +718,7 @@ def statsd_increment(counter: Text, val: int=1,
return wrapped_func
return wrapper
def rate_limit_user(request: HttpRequest, user: UserProfile, domain: Text) -> None:
def rate_limit_user(request: HttpRequest, user: UserProfile, domain: str) -> None:
"""Returns whether or not a user was rate limited. Will raise a RateLimited exception
if the user has been rate limited, otherwise returns and modifies request to contain
the rate limit information"""
@@ -739,7 +739,7 @@ def rate_limit_user(request: HttpRequest, user: UserProfile, domain: Text) -> No
request._ratelimit_remaining = calls_remaining
request._ratelimit_secs_to_freedom = time_reset
def rate_limit(domain: Text='all') -> Callable[[ViewFuncT], ViewFuncT]:
def rate_limit(domain: str='all') -> Callable[[ViewFuncT], ViewFuncT]:
"""Rate-limits a view. Takes an optional 'domain' param if you wish to
rate limit different types of API calls independently.

View File

@@ -32,7 +32,7 @@ import logging
import re
import DNS
from typing import Any, Callable, List, Optional, Text, Dict
from typing import Any, Callable, List, Optional, Dict
MIT_VALIDATION_ERROR = u'That user does not exist at MIT or is a ' + \
u'<a href="https://ist.mit.edu/email-lists">mailing list</a>. ' + \
@@ -42,7 +42,7 @@ WRONG_SUBDOMAIN_ERROR = "Your Zulip account is not a member of the " + \
"organization associated with this subdomain. " + \
"Please contact %s with any questions!" % (FromAddress.SUPPORT,)
def email_is_not_mit_mailing_list(email: Text) -> None:
def email_is_not_mit_mailing_list(email: str) -> None:
"""Prevent MIT mailing lists from signing up for Zulip"""
if "@mit.edu" in email:
username = email.rsplit("@", 1)[0]
@@ -99,7 +99,7 @@ class RegistrationForm(forms.Form):
max_length=Realm.MAX_REALM_NAME_LENGTH,
required=self.realm_creation)
def clean_full_name(self) -> Text:
def clean_full_name(self) -> str:
try:
return check_full_name(self.cleaned_data['full_name'])
except JsonableError as e:
@@ -164,7 +164,7 @@ class HomepageForm(forms.Form):
return email
def email_is_not_disposable(email: Text) -> None:
def email_is_not_disposable(email: str) -> None:
if is_disposable_domain(email_to_domain(email)):
raise ValidationError(_("Please use your real email address."))
@@ -182,13 +182,13 @@ class LoggingSetPasswordForm(SetPasswordForm):
class ZulipPasswordResetForm(PasswordResetForm):
def save(self,
domain_override: Optional[bool]=None,
subject_template_name: Text='registration/password_reset_subject.txt',
email_template_name: Text='registration/password_reset_email.html',
subject_template_name: str='registration/password_reset_subject.txt',
email_template_name: str='registration/password_reset_email.html',
use_https: bool=False,
token_generator: PasswordResetTokenGenerator=default_token_generator,
from_email: Optional[Text]=None,
from_email: Optional[str]=None,
request: HttpRequest=None,
html_email_template_name: Optional[Text]=None,
html_email_template_name: Optional[str]=None,
extra_email_context: Optional[Dict[str, Any]]=None
) -> None:
"""
@@ -286,7 +286,7 @@ class OurAuthenticationForm(AuthenticationForm):
return self.cleaned_data
def add_prefix(self, field_name: Text) -> Text:
def add_prefix(self, field_name: str) -> str:
"""Disable prefix, since Zulip doesn't use this Django forms feature
(and django-two-factor does use it), and we'd like both to be
happy with this form.
@@ -294,14 +294,14 @@ class OurAuthenticationForm(AuthenticationForm):
return field_name
class MultiEmailField(forms.Field):
def to_python(self, emails: Text) -> List[Text]:
def to_python(self, emails: str) -> List[str]:
"""Normalize data to a list of strings."""
if not emails:
return []
return [email.strip() for email in emails.split(',')]
def validate(self, emails: List[Text]) -> None:
def validate(self, emails: List[str]) -> None:
"""Check if value consists only of valid emails."""
super().validate(emails)
for email in emails:
@@ -311,7 +311,7 @@ class FindMyTeamForm(forms.Form):
emails = MultiEmailField(
help_text=_("Add up to 10 comma-separated email addresses."))
def clean_emails(self) -> List[Text]:
def clean_emails(self) -> List[str]:
emails = self.cleaned_data['emails']
if len(emails) > 10:
raise forms.ValidationError(_("Please enter at most 10 emails."))

View File

@@ -4,7 +4,7 @@ import logging
import time
import traceback
from typing import Any, AnyStr, Callable, Dict, \
Iterable, List, MutableMapping, Optional, Text
Iterable, List, MutableMapping, Optional
from django.conf import settings
from django.contrib.sessions.middleware import SessionMiddleware
@@ -78,7 +78,7 @@ def format_timedelta(timedelta: float) -> str:
return "%.1fs" % (timedelta)
return "%.0fms" % (timedelta_ms(timedelta),)
def is_slow_query(time_delta: float, path: Text) -> bool:
def is_slow_query(time_delta: float, path: str) -> bool:
if time_delta < 1.2:
return False
is_exempt = \
@@ -92,8 +92,8 @@ def is_slow_query(time_delta: float, path: Text) -> bool:
return time_delta >= 10
return True
def write_log_line(log_data: MutableMapping[str, Any], path: Text, method: str, remote_ip: str, email: Text,
client_name: Text, status_code: int=200, error_content: Optional[AnyStr]=None,
def write_log_line(log_data: MutableMapping[str, Any], path: str, method: str, remote_ip: str, email: str,
client_name: str, status_code: int=200, error_content: Optional[AnyStr]=None,
error_content_iter: Optional[Iterable[AnyStr]]=None) -> None:
assert error_content is None or error_content_iter is None
if error_content is not None:
@@ -213,7 +213,7 @@ def write_log_line(log_data: MutableMapping[str, Any], path: Text, method: str,
error_content_list = list(error_content_iter)
if error_content_list:
error_data = u''
elif isinstance(error_content_list[0], Text):
elif isinstance(error_content_list[0], str):
error_data = u''.join(error_content_list)
elif isinstance(error_content_list[0], bytes):
error_data = repr(b''.join(error_content_list))
@@ -299,14 +299,14 @@ class CsrfFailureError(JsonableError):
code = ErrorCode.CSRF_FAILED
data_fields = ['reason']
def __init__(self, reason: Text) -> None:
self.reason = reason # type: Text
def __init__(self, reason: str) -> None:
self.reason = reason # type: str
@staticmethod
def msg_format() -> Text:
def msg_format() -> str:
return _("CSRF Error: {reason}")
def csrf_failure(request: HttpRequest, reason: Text="") -> HttpResponse:
def csrf_failure(request: HttpRequest, reason: str="") -> HttpResponse:
if request.error_format == "JSON":
return json_response_from_error(CsrfFailureError(reason))
else: