mypy: Convert zerver/views to use typing.Text.

This commit is contained in:
Juan Verhook
2016-12-24 15:44:26 -08:00
committed by Tim Abbott
parent 259e2f0ab4
commit 535ce90272
19 changed files with 89 additions and 86 deletions

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from typing import Any, List, Dict, Optional
from typing import Any, List, Dict, Optional, Text
from django.utils import translation
from django.utils.translation import ugettext as _
@@ -58,7 +58,6 @@ import calendar
import datetime
import simplejson
import re
from six import text_type
from six.moves import urllib, zip_longest, zip, range
import time
import logging
@@ -66,7 +65,7 @@ import logging
from zproject.jinja2 import render_to_response
def redirect_and_log_into_subdomain(realm, full_name, email_address):
# type: (Realm, text_type, text_type) -> HttpResponse
# type: (Realm, Text, Text) -> HttpResponse
subdomain_login_uri = ''.join([
realm.uri,
reverse('zerver.views.auth.log_into_subdomain')
@@ -315,7 +314,7 @@ def create_homepage_form(request, user_info=None):
return HomepageForm(string_id = string_id)
def create_preregistration_user(email, request, realm_creation=False):
# type: (text_type, HttpRequest, bool) -> HttpResponse
# type: (Text, HttpRequest, bool) -> HttpResponse
domain = request.session.get("domain")
if completely_open(domain):
# Clear the "domain" from the session object; it's no longer needed
@@ -367,7 +366,7 @@ not be the member of any current realm. The realm is created with domain same as
When there is no unique_open_realm user registrations are made by visiting /register/domain_of_the_realm.
"""
def create_realm(request, creation_key=None):
# type: (HttpRequest, Optional[text_type]) -> HttpResponse
# type: (HttpRequest, Optional[Text]) -> HttpResponse
if not settings.OPEN_REALM_CREATION:
if creation_key is None:
return render_to_response("zerver/realm_creation_failed.html",
@@ -481,7 +480,7 @@ def home_real(request):
int(settings.TOS_VERSION.split('.')[0]) > user_profile.major_tos_version():
return accounts_accept_terms(request)
narrow = [] # type: List[List[text_type]]
narrow = [] # type: List[List[Text]]
narrow_stream = None
narrow_topic = request.GET.get("topic")
if request.GET.get("stream"):
@@ -709,7 +708,7 @@ def is_buggy_ua(agent):
@has_request_variables
def json_set_muted_topics(request, user_profile,
muted_topics=REQ(validator=check_list(check_list(check_string, length=2)), default=[])):
# type: (HttpRequest, UserProfile, List[List[text_type]]) -> HttpResponse
# type: (HttpRequest, UserProfile, List[List[Text]]) -> HttpResponse
do_set_muted_topics(user_profile, muted_topics)
return json_success()

View File

@@ -12,7 +12,7 @@ from zerver.lib.validator import check_list, check_string
from zerver.lib.actions import do_add_alert_words, do_remove_alert_words, do_set_alert_words
from zerver.lib.alert_words import user_alert_words
from six import text_type
from typing import Text
def list_alert_words(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
@@ -21,7 +21,7 @@ def list_alert_words(request, user_profile):
@has_request_variables
def set_alert_words(request, user_profile,
alert_words=REQ(validator=check_list(check_string), default=[])):
# type: (HttpRequest, UserProfile, List[text_type]) -> HttpResponse
# type: (HttpRequest, UserProfile, List[Text]) -> HttpResponse
do_set_alert_words(user_profile, alert_words)
return json_success()

View File

@@ -12,7 +12,7 @@ from django.shortcuts import redirect
from django.views.decorators.csrf import csrf_exempt
from django.utils.translation import ugettext as _
from django.core import signing
from six import text_type
from typing import Text
from six.moves import urllib
from typing import Any, Dict, Optional, Tuple, Text
@@ -36,7 +36,7 @@ import time
import ujson
def maybe_send_to_registration(request, email, full_name=''):
# type: (HttpRequest, text_type, text_type) -> HttpResponse
# type: (HttpRequest, Text, Text) -> HttpResponse
form = create_homepage_form(request, user_info={'email': email})
request.verified_email = None
if form.is_valid():
@@ -75,7 +75,7 @@ def redirect_to_subdomain_login_url():
def login_or_register_remote_user(request, remote_username, user_profile, full_name='',
invalid_subdomain=False):
# type: (HttpRequest, text_type, UserProfile, text_type, Optional[bool]) -> HttpResponse
# type: (HttpRequest, Text, UserProfile, Text, Optional[bool]) -> HttpResponse
if invalid_subdomain:
# Show login page with an error message
return redirect_to_subdomain_login_url()

View File

@@ -1,7 +1,7 @@
from __future__ import absolute_import
from django.http import HttpRequest, HttpResponse
from six import text_type
from typing import Text
from typing import Iterable, Optional, Sequence
from zerver.lib.actions import do_events_register
@@ -18,7 +18,7 @@ def _default_all_public_streams(user_profile, all_public_streams):
return user_profile.default_all_public_streams
def _default_narrow(user_profile, narrow):
# type: (UserProfile, Iterable[Sequence[text_type]]) -> Iterable[Sequence[text_type]]
# type: (UserProfile, Iterable[Sequence[Text]]) -> Iterable[Sequence[Text]]
default_stream = user_profile.default_events_register_stream
if not narrow and user_profile.default_events_register_stream is not None:
narrow = [['stream', default_stream.name]]
@@ -40,7 +40,7 @@ def events_register_backend(request, user_profile, apply_markdown=True,
event_types=REQ(validator=check_list(check_string), default=None),
narrow=REQ(validator=check_list(check_list(check_string, length=2)), default=[]),
queue_lifespan_secs=REQ(converter=int, default=0)):
# type: (HttpRequest, UserProfile, bool, Optional[bool], Optional[Iterable[str]], Iterable[Sequence[text_type]], int) -> HttpResponse
# type: (HttpRequest, UserProfile, bool, Optional[bool], Optional[Iterable[str]], Iterable[Sequence[Text]], int) -> HttpResponse
all_public_streams = _default_all_public_streams(user_profile, all_public_streams)
narrow = _default_narrow(user_profile, narrow)

View File

@@ -4,7 +4,7 @@ from django.conf import settings
from django.core.exceptions import ValidationError
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
from six import text_type
from typing import Text
from typing import Set
from zerver.decorator import authenticated_json_post_view

View File

@@ -8,7 +8,7 @@ from django.core.exceptions import ValidationError
from django.db import connection
from django.db.models import Q
from django.http import HttpRequest, HttpResponse
from six import text_type
from typing import Text
from typing import Any, AnyStr, Callable, Iterable, Optional, Tuple, Union
from zerver.lib.str_utils import force_bytes, force_text
@@ -138,7 +138,7 @@ class NarrowBuilder(object):
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
def _pg_re_escape(self, pattern):
# type: (text_type) -> text_type
# type: (Text) -> Text
"""
Escape user input to place in a regex
@@ -312,7 +312,7 @@ class NarrowBuilder(object):
# unicode characters, not in bytes, so we do our processing with text,
# not bytes.
def highlight_string_text_offsets(text, locs):
# type: (AnyStr, Iterable[Tuple[int, int]]) -> text_type
# type: (AnyStr, Iterable[Tuple[int, int]]) -> Text
string = force_text(text)
highlight_start = u'<span class="highlight">'
highlight_stop = u'</span>'
@@ -329,7 +329,7 @@ def highlight_string_text_offsets(text, locs):
return result
def highlight_string_bytes_offsets(text, locs):
# type: (AnyStr, Iterable[Tuple[int, int]]) -> text_type
# type: (AnyStr, Iterable[Tuple[int, int]]) -> Text
string = force_bytes(text)
highlight_start = b'<span class="highlight">'
highlight_stop = b'</span>'
@@ -346,14 +346,14 @@ def highlight_string_bytes_offsets(text, locs):
return force_text(result)
def highlight_string(text, locs):
# type: (AnyStr, Iterable[Tuple[int, int]]) -> text_type
# type: (AnyStr, Iterable[Tuple[int, int]]) -> Text
if settings.USING_PGROONGA:
return highlight_string_bytes_offsets(text, locs)
else:
return highlight_string_text_offsets(text, locs)
def get_search_fields(rendered_content, subject, content_matches, subject_matches):
# type: (text_type, text_type, Iterable[Tuple[int, int]], Iterable[Tuple[int, int]]) -> Dict[str, text_type]
# type: (Text, Text, Iterable[Tuple[int, int]], Iterable[Tuple[int, int]]) -> Dict[str, Text]
return dict(match_content=highlight_string(rendered_content, content_matches),
match_subject=highlight_string(escape_html(subject), subject_matches))
@@ -374,7 +374,7 @@ def narrow_parameter(json):
# We have to support a legacy tuple format.
if isinstance(elem, list):
if (len(elem) != 2
or any(not isinstance(x, str) and not isinstance(x, six.text_type)
or any(not isinstance(x, str) and not isinstance(x, Text)
for x in elem)):
raise ValueError("element is not a string pair")
return dict(operator=elem[0], operand=elem[1])
@@ -401,7 +401,7 @@ def narrow_parameter(json):
return list(map(convert_term, data))
def is_public_stream(stream_name, realm):
# type: (text_type, Realm) -> bool
# type: (Text, Realm) -> bool
"""
Determine whether a stream is public, so that
our caller can decide whether we can get
@@ -447,7 +447,7 @@ def ok_to_include_history(narrow, realm):
return include_history
def get_stream_name_from_narrow(narrow):
# type: (Iterable[Dict[str, Any]]) -> Optional[text_type]
# type: (Iterable[Dict[str, Any]]) -> Optional[Text]
for term in narrow:
if term['operator'] == 'stream':
return term['operand'].lower()
@@ -612,7 +612,7 @@ def get_old_messages_backend(request, user_profile,
# rendered message dict before returning it. We attempt to
# bulk-fetch rendered message dicts from remote cache using the
# 'messages' list.
search_fields = dict() # type: Dict[int, Dict[str, text_type]]
search_fields = dict() # type: Dict[int, Dict[str, Text]]
message_ids = [] # type: List[int]
user_message_flags = {} # type: Dict[int, List[str]]
if include_history:
@@ -674,7 +674,7 @@ def update_message_flags(request, user_profile,
all=REQ(validator=check_bool, default=False),
stream_name=REQ(default=None),
topic_name=REQ(default=None)):
# type: (HttpRequest, UserProfile, List[int], text_type, text_type, bool, Optional[text_type], Optional[text_type]) -> HttpResponse
# type: (HttpRequest, UserProfile, List[int], Text, Text, bool, Optional[Text], Optional[Text]) -> HttpResponse
if all:
target_count_str = "all"
else:
@@ -707,7 +707,7 @@ def update_message_flags(request, user_profile,
'msg': ''})
def create_mirrored_message_users(request, user_profile, recipients):
# type: (HttpResponse, UserProfile, Iterable[text_type]) -> Tuple[bool, UserProfile]
# type: (HttpResponse, UserProfile, Iterable[Text]) -> Tuple[bool, UserProfile]
if "sender" not in request.POST:
return (False, None)
@@ -743,7 +743,7 @@ def create_mirrored_message_users(request, user_profile, recipients):
return (True, sender)
def same_realm_zephyr_user(user_profile, email):
# type: (UserProfile, text_type) -> bool
# type: (UserProfile, Text) -> bool
#
# Are the sender and recipient both addresses in the same Zephyr
# mirroring realm? We have to handle this specially, inferring
@@ -761,7 +761,7 @@ def same_realm_zephyr_user(user_profile, email):
RealmAlias.objects.filter(realm=user_profile.realm, domain=domain).exists()
def same_realm_irc_user(user_profile, email):
# type: (UserProfile, text_type) -> bool
# type: (UserProfile, Text) -> bool
# Check whether the target email address is an IRC user in the
# same realm as user_profile, i.e. if the domain were example.com,
# the IRC user would need to be username@irc.example.com
@@ -775,7 +775,7 @@ def same_realm_irc_user(user_profile, email):
return RealmAlias.objects.filter(realm=user_profile.realm, domain=domain).exists()
def same_realm_jabber_user(user_profile, email):
# type: (UserProfile, text_type) -> bool
# type: (UserProfile, Text) -> bool
try:
validators.validate_email(email)
except ValidationError:
@@ -801,7 +801,7 @@ def send_message_backend(request, user_profile,
domain = REQ('domain', default=None),
local_id = REQ(default=None),
queue_id = REQ(default=None)):
# type: (HttpRequest, UserProfile, text_type, List[text_type], bool, Optional[text_type], text_type, Optional[text_type], Optional[text_type], Optional[text_type]) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, List[Text], bool, Optional[Text], Text, Optional[Text], Optional[Text], Optional[Text]) -> HttpResponse
client = request.client
is_super_user = request.user.is_api_super_user
if forged and not is_super_user:
@@ -868,7 +868,7 @@ def update_message_backend(request, user_profile,
subject=REQ(default=None),
propagate_mode=REQ(default="change_one"),
content=REQ(default=None)):
# type: (HttpRequest, UserProfile, int, Optional[text_type], Optional[str], Optional[text_type]) -> HttpResponse
# type: (HttpRequest, UserProfile, int, Optional[Text], Optional[str], Optional[Text]) -> HttpResponse
if not user_profile.realm.allow_message_editing:
return json_error(_("Your organization has turned off message editing."))
@@ -907,7 +907,7 @@ def update_message_backend(request, user_profile,
if subject == "":
raise JsonableError(_("Topic can't be empty"))
rendered_content = None
links_for_embed = set() # type: Set[text_type]
links_for_embed = set() # type: Set[Text]
if content is not None:
content = content.strip()
if content == "":
@@ -946,7 +946,7 @@ def json_fetch_raw_message(request, user_profile,
@has_request_variables
def render_message_backend(request, user_profile, content=REQ()):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
message = Message()
message.sender = user_profile
message.content = content

View File

@@ -2,7 +2,7 @@ from __future__ import absolute_import
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
from six import text_type
from typing import Text
from zerver.decorator import to_non_negative_int
from zerver.lib.actions import do_update_pointer

View File

@@ -1,6 +1,8 @@
from __future__ import absolute_import
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
from six import text_type
from typing import Text
from zerver.decorator import authenticated_json_post_view,\
has_request_variables, REQ, to_non_negative_int
@@ -13,7 +15,7 @@ from zerver.models import Reaction, UserProfile
@has_request_variables
def add_reaction_backend(request, user_profile, message_id, emoji_name):
# type: (HttpRequest, UserProfile, int, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, int, Text) -> HttpResponse
# access_message will throw a JsonableError exception if the user
# cannot see the message (e.g. for messages to private streams).
@@ -36,7 +38,7 @@ def add_reaction_backend(request, user_profile, message_id, emoji_name):
@has_request_variables
def remove_reaction_backend(request, user_profile, message_id, emoji_name):
# type: (HttpRequest, UserProfile, int, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, int, Text) -> HttpResponse
# access_message will throw a JsonableError exception if the user
# cannot see the message (e.g. for messages to private streams).

View File

@@ -1,12 +1,13 @@
from __future__ import absolute_import
from django.http import HttpRequest, HttpResponse
from django.core.exceptions import ValidationError
from typing import Text
from zerver.models import UserProfile
from zerver.lib.response import json_success, json_error
from zerver.lib.actions import check_add_realm_emoji, do_remove_realm_emoji
from six import text_type
def list_emoji(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
@@ -23,6 +24,6 @@ def upload_emoji(request, user_profile):
return json_success()
def delete_emoji(request, user_profile, emoji_name):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
do_remove_realm_emoji(user_profile.realm, emoji_name)
return json_success()

View File

@@ -1,6 +1,6 @@
from __future__ import absolute_import
from six import text_type
from typing import Text
from django.core.exceptions import ValidationError
from django.http import HttpRequest, HttpResponse
from django.views.decorators.csrf import csrf_exempt
@@ -25,7 +25,7 @@ def list_filters(request, user_profile):
@has_request_variables
def create_filter(request, user_profile, pattern=REQ(),
url_format_string=REQ()):
# type: (HttpRequest, UserProfile, text_type, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Text) -> HttpResponse
try:
filter_id = do_add_realm_filter(
realm=user_profile.realm,

View File

@@ -13,7 +13,7 @@ from zerver.lib.utils import statsd, statsd_key
from zerver.lib.validator import check_bool, check_dict
from zerver.models import UserProfile
from six import text_type
from typing import Text
import subprocess
import os
@@ -79,7 +79,7 @@ def json_report_error(request, user_profile, message=REQ(), stacktrace=REQ(),
ui_message=REQ(validator=check_bool), user_agent=REQ(),
href=REQ(), log=REQ(),
more_info=REQ(validator=check_dict([]), default=None)):
# type: (HttpRequest, UserProfile, text_type, text_type, bool, text_type, text_type, text_type, Dict[str, Any]) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Text, bool, Text, Text, Text, Dict[str, Any]) -> HttpResponse
if not settings.ERROR_REPORTING:
return json_success()

View File

@@ -29,7 +29,7 @@ import ujson
from six.moves import urllib
import six
from six import text_type
from typing import Text
def is_active_subscriber(user_profile, recipient):
# type: (UserProfile, Recipient) -> bool
@@ -104,17 +104,17 @@ def list_to_streams(streams_raw, user_profile, autocreate=False):
class PrincipalError(JsonableError):
def __init__(self, principal, status_code=403):
# type: (text_type, int) -> None
self.principal = principal # type: text_type
# type: (Text, int) -> None
self.principal = principal # type: Text
self.status_code = status_code # type: int
def to_json_error_msg(self):
# type: () -> text_type
# type: () -> Text
return ("User not authorized to execute queries on behalf of '%s'"
% (self.principal,))
def principal_to_user_profile(agent, principal):
# type: (UserProfile, text_type) -> UserProfile
# type: (UserProfile, Text) -> UserProfile
principal_doesnt_exist = False
try:
principal_user_profile = get_user_profile_by_email(principal)
@@ -133,7 +133,7 @@ def principal_to_user_profile(agent, principal):
@require_realm_admin
def deactivate_stream_backend(request, user_profile, stream_name):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
target = get_stream(stream_name, user_profile.realm)
if not target:
return json_error(_('No such stream name'))
@@ -147,14 +147,14 @@ def deactivate_stream_backend(request, user_profile, stream_name):
@require_realm_admin
@has_request_variables
def add_default_stream(request, user_profile, stream_name=REQ()):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
do_add_default_stream(user_profile.realm, stream_name)
return json_success()
@require_realm_admin
@has_request_variables
def remove_default_stream(request, user_profile, stream_name=REQ()):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
do_remove_default_stream(user_profile.realm, stream_name)
return json_success()
@@ -164,7 +164,7 @@ def update_stream_backend(request, user_profile, stream_name,
description=REQ(validator=check_string, default=None),
is_private=REQ(validator=check_bool, default=None),
new_name=REQ(validator=check_string, default=None)):
# type: (HttpRequest, UserProfile, text_type, Optional[text_type], Optional[bool], Optional[text_type]) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Optional[Text], Optional[bool], Optional[Text]) -> HttpResponse
if description is not None:
do_change_stream_description(user_profile.realm, stream_name, description)
if stream_name is not None and new_name is not None:
@@ -186,7 +186,7 @@ FuncKwargPair = Tuple[Callable[..., HttpResponse], Dict[str, Iterable[Any]]]
def update_subscriptions_backend(request, user_profile,
delete=REQ(validator=check_list(check_string), default=[]),
add=REQ(validator=check_list(check_dict([('name', check_string)])), default=[])):
# type: (HttpRequest, UserProfile, Iterable[text_type], Iterable[Mapping[str, Any]]) -> HttpResponse
# type: (HttpRequest, UserProfile, Iterable[Text], Iterable[Mapping[str, Any]]) -> HttpResponse
if not add and not delete:
return json_error(_('Nothing to do. Specify at least one of "add" or "delete".'))
@@ -226,7 +226,7 @@ def json_remove_subscriptions(request, user_profile):
def remove_subscriptions_backend(request, user_profile,
streams_raw = REQ("subscriptions", validator=check_list(check_string)),
principals = REQ(validator=check_list(check_string), default=None)):
# type: (HttpRequest, UserProfile, Iterable[text_type], Optional[Iterable[text_type]]) -> HttpResponse
# type: (HttpRequest, UserProfile, Iterable[Text], Optional[Iterable[Text]]) -> HttpResponse
removing_someone_else = principals and \
set(principals) != set((user_profile.email,))
@@ -254,7 +254,7 @@ def remove_subscriptions_backend(request, user_profile,
else:
people_to_unsub = set([user_profile])
result = dict(removed=[], not_subscribed=[]) # type: Dict[str, List[text_type]]
result = dict(removed=[], not_subscribed=[]) # type: Dict[str, List[Text]]
(removed, not_subscribed) = bulk_remove_subscriptions(people_to_unsub, streams)
for (subscriber, stream) in removed:
@@ -297,7 +297,7 @@ def add_subscriptions_backend(request, user_profile,
announce = REQ(validator=check_bool, default=False),
principals = REQ(validator=check_list(check_string), default=None),
authorization_errors_fatal = REQ(validator=check_bool, default=True)):
# type: (HttpRequest, UserProfile, Iterable[Mapping[str, text_type]], bool, bool, Optional[List[text_type]], bool) -> HttpResponse
# type: (HttpRequest, UserProfile, Iterable[Mapping[str, Text]], bool, bool, Optional[List[Text]], bool) -> HttpResponse
stream_dicts = []
for stream_dict in streams_raw:
stream_dict_copy = {} # type: Dict[str, Any]
@@ -407,7 +407,7 @@ def add_subscriptions_backend(request, user_profile,
@has_request_variables
def get_subscribers_backend(request, user_profile, stream_name=REQ('stream')):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
stream = get_stream(stream_name, user_profile.realm)
if stream is None:
raise JsonableError(_("Stream does not exist: %s") % (stream_name,))
@@ -467,11 +467,11 @@ def get_topics_backend(request, user_profile,
@has_request_variables
def json_stream_exists(request, user_profile, stream=REQ(),
autosubscribe=REQ(default=False)):
# type: (HttpRequest, UserProfile, text_type, bool) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, bool) -> HttpResponse
return stream_exists_backend(request, user_profile, stream, autosubscribe)
def stream_exists_backend(request, user_profile, stream_name, autosubscribe):
# type: (HttpRequest, UserProfile, text_type, bool) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, bool) -> HttpResponse
if not valid_stream_name(stream_name):
return json_error(_("Invalid characters in stream name"))
stream = get_stream(stream_name, user_profile.realm)
@@ -488,7 +488,7 @@ def stream_exists_backend(request, user_profile, stream_name, autosubscribe):
return json_response(data=result, status=404)
def get_subscription_or_die(stream_name, user_profile):
# type: (text_type, UserProfile) -> Subscription
# type: (Text, UserProfile) -> Subscription
stream = get_stream(stream_name, user_profile.realm)
if not stream:
raise JsonableError(_("Invalid stream %s") % (stream_name,))

View File

@@ -1,5 +1,7 @@
from __future__ import absolute_import
from django.http import HttpRequest, HttpResponse
from six import text_type
from typing import Text
from zerver.decorator import authenticated_json_post_view,\
has_request_variables, REQ, JsonableError
@@ -11,6 +13,6 @@ from zerver.models import UserProfile
@has_request_variables
def send_notification_backend(request, user_profile, operator=REQ('op'),
notification_to = REQ('to', converter=extract_recipients, default=[])):
# type: (HttpRequest, UserProfile, text_type, List[text_type]) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, List[Text]) -> HttpResponse
check_send_typing_notification(user_profile, notification_to, operator)
return json_success()

View File

@@ -1,6 +1,6 @@
from __future__ import absolute_import
from typing import Optional, Any
from six import text_type
from typing import Text
from django.utils.translation import ugettext as _
from django.conf import settings
@@ -54,7 +54,7 @@ def json_change_settings(request, user_profile,
old_password=REQ(default=""),
new_password=REQ(default=""),
confirm_password=REQ(default="")):
# type: (HttpRequest, UserProfile, text_type, text_type, text_type, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Text, Text, Text) -> HttpResponse
if not (full_name or new_password):
return json_error(_("No new data supplied"))

View File

@@ -21,11 +21,11 @@ from zerver.lib.utils import generate_random_token
from zerver.models import UserProfile, Stream, Realm, Message, get_user_profile_by_email, \
get_stream, email_allowed_for_realm
from six import text_type
from typing import Text
from typing import Optional, Dict, Any
def deactivate_user_backend(request, user_profile, email):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
try:
target = get_user_profile_by_email(email)
except UserProfile.DoesNotExist:
@@ -50,7 +50,7 @@ def check_last_admin(user_profile):
return user_profile.is_realm_admin and len(admins) == 1
def deactivate_bot_backend(request, user_profile, email):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
try:
target = get_user_profile_by_email(email)
except UserProfile.DoesNotExist:
@@ -68,7 +68,7 @@ def _deactivate_user_profile_backend(request, user_profile, target):
return json_success()
def reactivate_user_backend(request, user_profile, email):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
try:
target = get_user_profile_by_email(email)
except UserProfile.DoesNotExist:
@@ -84,7 +84,7 @@ def reactivate_user_backend(request, user_profile, email):
def update_user_backend(request, user_profile, email,
full_name=REQ(default="", validator=check_string),
is_admin=REQ(default=None, validator=check_bool)):
# type: (HttpRequest, UserProfile, text_type, Optional[text_type], Optional[bool]) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Optional[Text], Optional[bool]) -> HttpResponse
try:
target = get_user_profile_by_email(email)
except UserProfile.DoesNotExist:
@@ -127,7 +127,7 @@ def avatar(request, email):
return redirect(url)
def get_stream_name(stream):
# type: (Stream) -> Optional[text_type]
# type: (Stream) -> Optional[Text]
if stream:
name = stream.name
else:
@@ -135,7 +135,7 @@ def get_stream_name(stream):
return name
def stream_or_none(stream_name, realm):
# type: (text_type, Realm) -> Optional[Stream]
# type: (Text, Realm) -> Optional[Stream]
if stream_name == '':
return None
else:
@@ -150,7 +150,7 @@ def patch_bot_backend(request, user_profile, email,
default_sending_stream=REQ(default=None),
default_events_register_stream=REQ(default=None),
default_all_public_streams=REQ(default=None, validator=check_bool)):
# type: (HttpRequest, UserProfile, text_type, Optional[text_type], Optional[text_type], Optional[text_type], Optional[bool]) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Optional[Text], Optional[Text], Optional[Text], Optional[bool]) -> HttpResponse
try:
bot = get_user_profile_by_email(email)
except:
@@ -191,7 +191,7 @@ def patch_bot_backend(request, user_profile, email,
@has_request_variables
def regenerate_bot_api_key(request, user_profile, email):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
try:
bot = get_user_profile_by_email(email)
except:
@@ -211,7 +211,7 @@ def add_bot_backend(request, user_profile, full_name=REQ(), short_name=REQ(),
default_sending_stream_name=REQ('default_sending_stream', default=None),
default_events_register_stream_name=REQ('default_events_register_stream', default=None),
default_all_public_streams=REQ(validator=check_bool, default=None)):
# type: (HttpRequest, UserProfile, text_type, text_type, Optional[text_type], Optional[text_type], Optional[bool]) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Text, Optional[Text], Optional[Text], Optional[bool]) -> HttpResponse
short_name += "-bot"
email = short_name + "@" + user_profile.realm.domain
form = CreateUserForm({'full_name': full_name, 'email': email})
@@ -318,7 +318,7 @@ def get_members_backend(request, user_profile):
@has_request_variables
def create_user_backend(request, user_profile, email=REQ(), password=REQ(),
full_name=REQ(), short_name=REQ()):
# type: (HttpRequest, UserProfile, text_type, text_type, text_type, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text, Text, Text, Text) -> HttpResponse
form = CreateUserForm({'full_name': full_name, 'email': email})
if not form.is_valid():
return json_error(_('Bad name or username'))
@@ -341,7 +341,7 @@ def create_user_backend(request, user_profile, email=REQ(), password=REQ(),
return json_success()
def generate_client_id():
# type: () -> text_type
# type: () -> Text
return generate_random_token(32)
def get_profile_backend(request, user_profile):

View File

@@ -10,7 +10,7 @@ from zerver.lib.response import json_success, json_error
from zerver.decorator import REQ, has_request_variables, api_key_only_webhook_view
from zerver.models import UserProfile, Client
from typing import Text, Dict, Any
from typing import Dict, Any, Text
@api_key_only_webhook_view("AppFollow")
@has_request_variables

View File

@@ -7,7 +7,7 @@ from zerver.decorator import REQ, has_request_variables, api_key_only_webhook_vi
from zerver.models import Client, UserProfile
from django.http import HttpRequest, HttpResponse
from six import text_type
from typing import Text
from typing import Dict, Any, Optional
BODY_TEMPLATE = '[{website_name}]({website_url}) has {user_num} visitors online.'
@@ -18,7 +18,7 @@ def api_gosquared_webhook(request, user_profile, client,
payload=REQ(argument_type='body'),
stream=REQ(default='gosquared'),
topic=REQ(default=None)):
# type: (HttpRequest, UserProfile, Client, Dict[str, Dict[str, Any]], text_type, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Client, Dict[str, Dict[str, Any]], Text, Text) -> HttpResponse
try:
domain_name = payload['siteDetails']['domain']
user_num = payload['concurrents']

View File

@@ -8,7 +8,7 @@ from zerver.lib.validator import check_dict, check_string
from zerver.models import Client, UserProfile
from django.http import HttpRequest, HttpResponse
from six import text_type
from typing import Text
from typing import Dict, Any, Iterable, Optional
@api_key_only_webhook_view('Papertrail')
@@ -17,7 +17,7 @@ def api_papertrail_webhook(request, user_profile, client,
payload=REQ(argument_type='body'),
stream=REQ(default='papertrail'),
topic=REQ(default='logs')):
# type: (HttpRequest, UserProfile, Client, Dict[str, Any], text_type, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Client, Dict[str, Any], Text, Text) -> HttpResponse
# construct the message of the message
try:

View File

@@ -1,5 +1,5 @@
from __future__ import absolute_import
from typing import Any, List, Dict, Optional, Callable, Tuple, Iterable, Sequence
from typing import Any, List, Dict, Optional, Callable, Tuple, Iterable, Sequence, Text
from django.conf import settings
from django.http import HttpResponse, HttpRequest
@@ -16,13 +16,12 @@ import logging
import subprocess
import ujson
from six import text_type
@authenticated_json_view
@has_request_variables
def webathena_kerberos_login(request, user_profile,
cred=REQ(default=None)):
# type: (HttpRequest, UserProfile, text_type) -> HttpResponse
# type: (HttpRequest, UserProfile, Text) -> HttpResponse
if cred is None:
return json_error(_("Could not find Kerberos credential"))
if not user_profile.realm.webathena_enabled: