mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 14:03:30 +00:00 
			
		
		
		
	zerver/views: Use python 3 syntax for typing.
This commit is contained in:
		@@ -11,12 +11,10 @@ from zerver.lib.validator import check_list, check_string
 | 
			
		||||
from zerver.lib.actions import do_add_alert_words, do_remove_alert_words
 | 
			
		||||
from zerver.lib.alert_words import user_alert_words
 | 
			
		||||
 | 
			
		||||
def list_alert_words(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def list_alert_words(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    return json_success({'alert_words': user_alert_words(user_profile)})
 | 
			
		||||
 | 
			
		||||
def clean_alert_words(alert_words):
 | 
			
		||||
    # type: (List[Text]) -> List[Text]
 | 
			
		||||
def clean_alert_words(alert_words: List[Text]) -> List[Text]:
 | 
			
		||||
    alert_words = [w.strip() for w in alert_words]
 | 
			
		||||
    return [w for w in alert_words if w != ""]
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -5,8 +5,7 @@ from typing import Any, List, Dict, Optional, Text
 | 
			
		||||
from zerver.lib.response import json_error, json_success
 | 
			
		||||
from zerver.lib.user_agent import parse_user_agent
 | 
			
		||||
 | 
			
		||||
def check_compatibility(request):
 | 
			
		||||
    # type: (HttpRequest) -> HttpResponse
 | 
			
		||||
def check_compatibility(request: HttpRequest) -> HttpResponse:
 | 
			
		||||
    user_agent = parse_user_agent(request.META["HTTP_USER_AGENT"])
 | 
			
		||||
    if user_agent is None or user_agent['name'] == "ZulipInvalid":
 | 
			
		||||
        return json_error("Client is too old")
 | 
			
		||||
 
 | 
			
		||||
@@ -19,8 +19,7 @@ from zerver.lib.validator import check_dict, check_list, check_int
 | 
			
		||||
from zerver.models import (custom_profile_fields_for_realm, UserProfile,
 | 
			
		||||
                           CustomProfileField, custom_profile_fields_for_realm)
 | 
			
		||||
 | 
			
		||||
def list_realm_custom_profile_fields(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def list_realm_custom_profile_fields(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    fields = custom_profile_fields_for_realm(user_profile.realm_id)
 | 
			
		||||
    return json_success({'custom_fields': [f.as_dict() for f in fields]})
 | 
			
		||||
 | 
			
		||||
@@ -46,8 +45,8 @@ def create_realm_custom_profile_field(request, user_profile, name=REQ(),
 | 
			
		||||
        return json_error(_("A field with that name already exists."))
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
def delete_realm_custom_profile_field(request, user_profile, field_id):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, int) -> HttpResponse
 | 
			
		||||
def delete_realm_custom_profile_field(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                                      field_id: int) -> HttpResponse:
 | 
			
		||||
    try:
 | 
			
		||||
        field = CustomProfileField.objects.get(id=field_id)
 | 
			
		||||
    except CustomProfileField.DoesNotExist:
 | 
			
		||||
@@ -59,9 +58,8 @@ def delete_realm_custom_profile_field(request, user_profile, field_id):
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def update_realm_custom_profile_field(request, user_profile, field_id,
 | 
			
		||||
                                      name=REQ()):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, int, Text) -> HttpResponse
 | 
			
		||||
def update_realm_custom_profile_field(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                                      field_id: int, name: Text=REQ()) -> HttpResponse:
 | 
			
		||||
    if not name.strip():
 | 
			
		||||
        return json_error(_("Name cannot be blank."))
 | 
			
		||||
 | 
			
		||||
@@ -80,10 +78,10 @@ def update_realm_custom_profile_field(request, user_profile, field_id,
 | 
			
		||||
@human_users_only
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def update_user_custom_profile_data(
 | 
			
		||||
        request,
 | 
			
		||||
        user_profile,
 | 
			
		||||
        data=REQ(validator=check_list(check_dict([('id', check_int)])))):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, List[Dict[str, Union[int, Text]]]) -> HttpResponse
 | 
			
		||||
        request: HttpRequest,
 | 
			
		||||
        user_profile: UserProfile,
 | 
			
		||||
        data: List[Dict[str, Union[int, Text]]]=REQ(validator=check_list(
 | 
			
		||||
            check_dict([('id', check_int)])))) -> HttpResponse:
 | 
			
		||||
    for item in data:
 | 
			
		||||
        field_id = item['id']
 | 
			
		||||
        try:
 | 
			
		||||
 
 | 
			
		||||
@@ -8,15 +8,15 @@ from zerver.lib.response import json_success
 | 
			
		||||
from zerver.lib.validator import check_string, check_list, check_bool
 | 
			
		||||
from zerver.models import Stream, UserProfile
 | 
			
		||||
 | 
			
		||||
def _default_all_public_streams(user_profile, all_public_streams):
 | 
			
		||||
    # type: (UserProfile, Optional[bool]) -> bool
 | 
			
		||||
def _default_all_public_streams(user_profile: UserProfile,
 | 
			
		||||
                                all_public_streams: Optional[bool]) -> bool:
 | 
			
		||||
    if all_public_streams is not None:
 | 
			
		||||
        return all_public_streams
 | 
			
		||||
    else:
 | 
			
		||||
        return user_profile.default_all_public_streams
 | 
			
		||||
 | 
			
		||||
def _default_narrow(user_profile, narrow):
 | 
			
		||||
    # type: (UserProfile, Iterable[Sequence[Text]]) -> Iterable[Sequence[Text]]
 | 
			
		||||
def _default_narrow(user_profile: UserProfile,
 | 
			
		||||
                    narrow: Iterable[Sequence[Text]]) -> Iterable[Sequence[Text]]:
 | 
			
		||||
    default_stream = user_profile.default_events_register_stream  # type: Optional[Stream]
 | 
			
		||||
    if not narrow and default_stream is not None:
 | 
			
		||||
        narrow = [['stream', default_stream.name]]
 | 
			
		||||
 
 | 
			
		||||
@@ -10,8 +10,8 @@ from zerver.models import UserProfile
 | 
			
		||||
 | 
			
		||||
@human_users_only
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def mark_hotspot_as_read(request, user, hotspot=REQ(validator=check_string)):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, str) -> HttpResponse
 | 
			
		||||
def mark_hotspot_as_read(request: HttpRequest, user: UserProfile,
 | 
			
		||||
                         hotspot: str=REQ(validator=check_string)) -> HttpResponse:
 | 
			
		||||
    if hotspot not in ALL_HOTSPOTS:
 | 
			
		||||
        return json_error(_('Unknown hotspot: %s') % (hotspot,))
 | 
			
		||||
    do_mark_hotspot_as_read(user, hotspot)
 | 
			
		||||
 
 | 
			
		||||
@@ -14,8 +14,8 @@ from zerver.lib.streams import access_stream_by_name, access_stream_for_unmute_t
 | 
			
		||||
from zerver.lib.validator import check_string, check_list
 | 
			
		||||
from zerver.models import get_stream, Stream, UserProfile
 | 
			
		||||
 | 
			
		||||
def mute_topic(user_profile, stream_name, topic_name):
 | 
			
		||||
    # type: (UserProfile, str, str) -> HttpResponse
 | 
			
		||||
def mute_topic(user_profile: UserProfile, stream_name: str,
 | 
			
		||||
               topic_name: str) -> HttpResponse:
 | 
			
		||||
    (stream, recipient, sub) = access_stream_by_name(user_profile, stream_name)
 | 
			
		||||
 | 
			
		||||
    if topic_is_muted(user_profile, stream.id, topic_name):
 | 
			
		||||
@@ -24,8 +24,8 @@ def mute_topic(user_profile, stream_name, topic_name):
 | 
			
		||||
    do_mute_topic(user_profile, stream, recipient, topic_name)
 | 
			
		||||
    return json_success()
 | 
			
		||||
 | 
			
		||||
def unmute_topic(user_profile, stream_name, topic_name):
 | 
			
		||||
    # type: (UserProfile, str, str) -> HttpResponse
 | 
			
		||||
def unmute_topic(user_profile: UserProfile, stream_name: str,
 | 
			
		||||
                 topic_name: str) -> HttpResponse:
 | 
			
		||||
    error = _("Topic is not there in the muted_topics list")
 | 
			
		||||
    stream = access_stream_for_unmute_topic(user_profile, stream_name, error)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -9,8 +9,7 @@ from zerver.lib.request import has_request_variables, JsonableError, REQ
 | 
			
		||||
from zerver.lib.response import json_success
 | 
			
		||||
from zerver.models import UserProfile, UserMessage
 | 
			
		||||
 | 
			
		||||
def get_pointer_backend(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def get_pointer_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    return json_success({'pointer': user_profile.pointer})
 | 
			
		||||
 | 
			
		||||
@has_request_variables
 | 
			
		||||
 
 | 
			
		||||
@@ -17,13 +17,12 @@ from zerver.lib.timestamp import datetime_to_timestamp
 | 
			
		||||
from zerver.lib.validator import check_bool
 | 
			
		||||
from zerver.models import UserActivity, UserPresence, UserProfile, get_user
 | 
			
		||||
 | 
			
		||||
def get_status_list(requesting_user_profile):
 | 
			
		||||
    # type: (UserProfile) -> Dict[str, Any]
 | 
			
		||||
def get_status_list(requesting_user_profile: UserProfile) -> Dict[str, Any]:
 | 
			
		||||
    return {'presences': get_status_dict(requesting_user_profile),
 | 
			
		||||
            'server_timestamp': time.time()}
 | 
			
		||||
 | 
			
		||||
def get_presence_backend(request, user_profile, email):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
 | 
			
		||||
def get_presence_backend(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                         email: Text) -> HttpResponse:
 | 
			
		||||
    try:
 | 
			
		||||
        target = get_user(email, user_profile.realm)
 | 
			
		||||
    except UserProfile.DoesNotExist:
 | 
			
		||||
 
 | 
			
		||||
@@ -16,8 +16,7 @@ from zerver.lib.response import json_success, json_error
 | 
			
		||||
from zerver.lib.validator import check_string, check_list, check_bool
 | 
			
		||||
from zerver.models import PushDeviceToken, UserProfile
 | 
			
		||||
 | 
			
		||||
def validate_token(token_str, kind):
 | 
			
		||||
    # type: (bytes, int) -> None
 | 
			
		||||
def validate_token(token_str: bytes, kind: int) -> None:
 | 
			
		||||
    if token_str == '' or len(token_str) > 4096:
 | 
			
		||||
        raise JsonableError(_('Empty or invalid length token'))
 | 
			
		||||
    if kind == PushDeviceToken.APNS:
 | 
			
		||||
@@ -38,24 +37,24 @@ def add_apns_device_token(request, user_profile, token=REQ(),
 | 
			
		||||
 | 
			
		||||
@human_users_only
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def add_android_reg_id(request, user_profile, token=REQ()):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, bytes) -> HttpResponse
 | 
			
		||||
def add_android_reg_id(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                       token: bytes=REQ()) -> HttpResponse:
 | 
			
		||||
    validate_token(token, PushDeviceToken.GCM)
 | 
			
		||||
    add_push_device_token(user_profile, token, PushDeviceToken.GCM)
 | 
			
		||||
    return json_success()
 | 
			
		||||
 | 
			
		||||
@human_users_only
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def remove_apns_device_token(request, user_profile, token=REQ()):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, bytes) -> HttpResponse
 | 
			
		||||
def remove_apns_device_token(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                             token: bytes=REQ()) -> HttpResponse:
 | 
			
		||||
    validate_token(token, PushDeviceToken.APNS)
 | 
			
		||||
    remove_push_device_token(user_profile, token, PushDeviceToken.APNS)
 | 
			
		||||
    return json_success()
 | 
			
		||||
 | 
			
		||||
@human_users_only
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def remove_android_reg_id(request, user_profile, token=REQ()):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, bytes) -> HttpResponse
 | 
			
		||||
def remove_android_reg_id(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                          token: bytes=REQ()) -> HttpResponse:
 | 
			
		||||
    validate_token(token, PushDeviceToken.GCM)
 | 
			
		||||
    remove_push_device_token(user_profile, token, PushDeviceToken.GCM)
 | 
			
		||||
    return json_success()
 | 
			
		||||
 
 | 
			
		||||
@@ -13,16 +13,15 @@ from zerver.models import RealmDomain, UserProfile, get_realm_domains
 | 
			
		||||
 | 
			
		||||
from typing import Text
 | 
			
		||||
 | 
			
		||||
def list_realm_domains(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> (HttpResponse)
 | 
			
		||||
def list_realm_domains(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    domains = get_realm_domains(user_profile.realm)
 | 
			
		||||
    return json_success({'domains': domains})
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def create_realm_domain(request, user_profile, domain=REQ(validator=check_string),
 | 
			
		||||
                        allow_subdomains=REQ(validator=check_bool)):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, Text, bool) -> (HttpResponse)
 | 
			
		||||
def create_realm_domain(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                        domain: Text=REQ(validator=check_string),
 | 
			
		||||
                        allow_subdomains: bool=REQ(validator=check_bool)) -> HttpResponse:
 | 
			
		||||
    domain = domain.strip().lower()
 | 
			
		||||
    try:
 | 
			
		||||
        validate_domain(domain)
 | 
			
		||||
@@ -35,9 +34,8 @@ def create_realm_domain(request, user_profile, domain=REQ(validator=check_string
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def patch_realm_domain(request, user_profile, domain,
 | 
			
		||||
                       allow_subdomains=REQ(validator=check_bool)):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, Text, bool) -> (HttpResponse)
 | 
			
		||||
def patch_realm_domain(request: HttpRequest, user_profile: UserProfile, domain: Text,
 | 
			
		||||
                       allow_subdomains: bool=REQ(validator=check_bool)) -> HttpResponse:
 | 
			
		||||
    try:
 | 
			
		||||
        realm_domain = RealmDomain.objects.get(realm=user_profile.realm, domain=domain)
 | 
			
		||||
        do_change_realm_domain(realm_domain, allow_subdomains)
 | 
			
		||||
@@ -47,8 +45,8 @@ def patch_realm_domain(request, user_profile, domain,
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def delete_realm_domain(request, user_profile, domain):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, Text) -> (HttpResponse)
 | 
			
		||||
def delete_realm_domain(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                        domain: Text) -> HttpResponse:
 | 
			
		||||
    try:
 | 
			
		||||
        realm_domain = RealmDomain.objects.get(realm=user_profile.realm, domain=domain)
 | 
			
		||||
        do_remove_realm_domain(realm_domain)
 | 
			
		||||
 
 | 
			
		||||
@@ -14,8 +14,7 @@ from zerver.lib.response import json_success, json_error
 | 
			
		||||
from zerver.lib.actions import check_add_realm_emoji, do_remove_realm_emoji
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def list_emoji(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def list_emoji(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
 | 
			
		||||
    # We don't call check_emoji_admin here because the list of realm
 | 
			
		||||
    # emoji is public.
 | 
			
		||||
@@ -23,8 +22,8 @@ def list_emoji(request, user_profile):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def upload_emoji(request, user_profile, emoji_name=REQ()):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
 | 
			
		||||
def upload_emoji(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                 emoji_name: Text=REQ()) -> HttpResponse:
 | 
			
		||||
    check_valid_emoji_name(emoji_name)
 | 
			
		||||
    check_emoji_admin(user_profile)
 | 
			
		||||
    if len(request.FILES) != 1:
 | 
			
		||||
@@ -42,8 +41,8 @@ def upload_emoji(request, user_profile, emoji_name=REQ()):
 | 
			
		||||
    return json_success()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def delete_emoji(request, user_profile, emoji_name):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
 | 
			
		||||
def delete_emoji(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                 emoji_name: Text) -> HttpResponse:
 | 
			
		||||
    check_valid_emoji(user_profile.realm, emoji_name)
 | 
			
		||||
    check_emoji_admin(user_profile, emoji_name)
 | 
			
		||||
    do_remove_realm_emoji(user_profile.realm, emoji_name)
 | 
			
		||||
 
 | 
			
		||||
@@ -14,8 +14,7 @@ from zerver.models import realm_filters_for_realm, UserProfile, RealmFilter
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Custom realm filters
 | 
			
		||||
def list_filters(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def list_filters(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    filters = realm_filters_for_realm(user_profile.realm_id)
 | 
			
		||||
    return json_success({'filters': filters})
 | 
			
		||||
 | 
			
		||||
@@ -37,8 +36,8 @@ def create_filter(request, user_profile, pattern=REQ(),
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
def delete_filter(request, user_profile, filter_id):
 | 
			
		||||
    # type: (HttpRequest, UserProfile, int) -> HttpResponse
 | 
			
		||||
def delete_filter(request: HttpRequest, user_profile: UserProfile,
 | 
			
		||||
                  filter_id: int) -> HttpResponse:
 | 
			
		||||
    try:
 | 
			
		||||
        do_remove_realm_filter(realm=user_profile.realm, id=filter_id)
 | 
			
		||||
    except RealmFilter.DoesNotExist:
 | 
			
		||||
 
 | 
			
		||||
@@ -12,8 +12,7 @@ from zerver.models import UserProfile
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
def upload_icon(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def upload_icon(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
 | 
			
		||||
    if len(request.FILES) != 1:
 | 
			
		||||
        return json_error(_("You must upload exactly one icon."))
 | 
			
		||||
@@ -33,8 +32,7 @@ def upload_icon(request, user_profile):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@require_realm_admin
 | 
			
		||||
def delete_icon_backend(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def delete_icon_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    # We don't actually delete the icon because it might still
 | 
			
		||||
    # be needed if the URL was cached and it is rewrited
 | 
			
		||||
    # in any case after next update.
 | 
			
		||||
@@ -46,8 +44,7 @@ def delete_icon_backend(request, user_profile):
 | 
			
		||||
    return json_success(json_result)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_icon_backend(request, user_profile):
 | 
			
		||||
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
			
		||||
def get_icon_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
			
		||||
    url = realm_icon_url(user_profile.realm)
 | 
			
		||||
 | 
			
		||||
    # We can rely on the url already having query parameters. Because
 | 
			
		||||
 
 | 
			
		||||
@@ -21,8 +21,7 @@ import os
 | 
			
		||||
js_source_map = None
 | 
			
		||||
 | 
			
		||||
# Read the source map information for decoding JavaScript backtraces.
 | 
			
		||||
def get_js_source_map():
 | 
			
		||||
    # type: () -> Optional[SourceMap]
 | 
			
		||||
def get_js_source_map() -> Optional[SourceMap]:
 | 
			
		||||
    global js_source_map
 | 
			
		||||
    if not js_source_map and not (settings.DEVELOPMENT or settings.TEST_SUITE):
 | 
			
		||||
        js_source_map = SourceMap([
 | 
			
		||||
 
 | 
			
		||||
@@ -9,8 +9,8 @@ from zerver.lib.actions import do_change_notification_settings, clear_scheduled_
 | 
			
		||||
from zerver.models import UserProfile, ScheduledEmail
 | 
			
		||||
from zerver.context_processors import common_context
 | 
			
		||||
 | 
			
		||||
def process_unsubscribe(request, token, subscription_type, unsubscribe_function):
 | 
			
		||||
    # type: (HttpRequest, str, str, Callable[[UserProfile], None]) -> HttpResponse
 | 
			
		||||
def process_unsubscribe(request: HttpRequest, token: str, subscription_type: str,
 | 
			
		||||
                        unsubscribe_function: Callable[[UserProfile], None]) -> HttpResponse:
 | 
			
		||||
    try:
 | 
			
		||||
        confirmation = Confirmation.objects.get(confirmation_key=token)
 | 
			
		||||
    except Confirmation.DoesNotExist:
 | 
			
		||||
@@ -25,16 +25,13 @@ def process_unsubscribe(request, token, subscription_type, unsubscribe_function)
 | 
			
		||||
# Email unsubscribe functions. All have the function signature
 | 
			
		||||
# processor(user_profile).
 | 
			
		||||
 | 
			
		||||
def do_missedmessage_unsubscribe(user_profile):
 | 
			
		||||
    # type: (UserProfile) -> None
 | 
			
		||||
def do_missedmessage_unsubscribe(user_profile: UserProfile) -> None:
 | 
			
		||||
    do_change_notification_settings(user_profile, 'enable_offline_email_notifications', False)
 | 
			
		||||
 | 
			
		||||
def do_welcome_unsubscribe(user_profile):
 | 
			
		||||
    # type: (UserProfile) -> None
 | 
			
		||||
def do_welcome_unsubscribe(user_profile: UserProfile) -> None:
 | 
			
		||||
    clear_scheduled_emails(user_profile.id, ScheduledEmail.WELCOME)
 | 
			
		||||
 | 
			
		||||
def do_digest_unsubscribe(user_profile):
 | 
			
		||||
    # type: (UserProfile) -> None
 | 
			
		||||
def do_digest_unsubscribe(user_profile: UserProfile) -> None:
 | 
			
		||||
    do_change_notification_settings(user_profile, 'enable_digest_emails', False)
 | 
			
		||||
 | 
			
		||||
# The keys are part of the URL for the unsubscribe link and must be valid
 | 
			
		||||
@@ -48,8 +45,8 @@ email_unsubscribers = {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
# Login NOT required. These are for one-click unsubscribes.
 | 
			
		||||
def email_unsubscribe(request, email_type, confirmation_key):
 | 
			
		||||
    # type: (HttpRequest, str, str) -> HttpResponse
 | 
			
		||||
def email_unsubscribe(request: HttpRequest, email_type: str,
 | 
			
		||||
                      confirmation_key: str) -> HttpResponse:
 | 
			
		||||
    if email_type in email_unsubscribers:
 | 
			
		||||
        display_name, unsubscribe_function = email_unsubscribers[email_type]
 | 
			
		||||
        return process_unsubscribe(request, confirmation_key, display_name, unsubscribe_function)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user