Annotate zerver/views/__init__.py.

Also fix typing errors in a few related files.

[with tweaks from tabbott]
This commit is contained in:
Deborah Hanus
2016-06-05 02:58:33 +00:00
committed by Tim Abbott
parent c9bb93b0d2
commit a261a6bbac
3 changed files with 93 additions and 33 deletions

View File

@@ -2842,7 +2842,7 @@ def apply_events(state, events, user_profile):
def do_events_register(user_profile, user_client, apply_markdown=True,
event_types=None, queue_lifespan_secs=0, all_public_streams=False,
narrow=[]):
# type: (UserProfile, Client, bool, Optional[Iterable[str]], int, bool, Iterable[Sequence[str]]) -> Dict[str, Any]
# type: (UserProfile, Client, bool, Optional[Iterable[str]], int, bool, Iterable[Sequence[text_type]]) -> Dict[str, Any]
# Technically we don't need to check this here because
# build_narrow_filter will check it, but it's nicer from an error
# handling perspective to do it before contacting Tornado
@@ -3113,7 +3113,7 @@ def do_set_alert_words(user_profile, alert_words):
notify_alert_words(user_profile, alert_words)
def do_set_muted_topics(user_profile, muted_topics):
# type: (UserProfile, List[Union[List[text_type], Tuple[text_type, text_type]]]) -> None
# type: (UserProfile, Union[List[List[text_type]], List[Tuple[text_type, text_type]]]) -> None
user_profile.muted_topics = ujson.dumps(muted_topics)
user_profile.save(update_fields=['muted_topics'])
event = dict(type="muted_topics", muted_topics=muted_topics)

View File

@@ -383,7 +383,7 @@ class EventsRegisterTest(AuthedTestCase):
('type', equals('muted_topics')),
('muted_topics', check_list(check_list(check_string, 2))),
])
events = self.do_test(lambda: do_set_muted_topics(self.user_profile, [["Denmark", "topic"]]))
events = self.do_test(lambda: do_set_muted_topics(self.user_profile, [[u"Denmark", u"topic"]]))
error = muted_topics_checker('events[0]', events[0])
self.assert_on_error(error)
@@ -958,22 +958,22 @@ class TestEventsRegisterNarrowDefaults(TestCase):
# type: () -> None
self.user_profile.default_events_register_stream_id = None
self.user_profile.save()
result = _default_narrow(self.user_profile, [('stream', 'my_stream')])
self.assertEqual(result, [('stream', 'my_stream')])
result = _default_narrow(self.user_profile, [[u'stream', u'my_stream']])
self.assertEqual(result, [[u'stream', u'my_stream']])
def test_use_passed_narrow_with_default(self):
# type: () -> None
self.user_profile.default_events_register_stream_id = self.stream.id
self.user_profile.save()
result = _default_narrow(self.user_profile, [('stream', 'my_stream')])
self.assertEqual(result, [('stream', 'my_stream')])
result = _default_narrow(self.user_profile, [[u'stream', u'my_stream']])
self.assertEqual(result, [[u'stream', u'my_stream']])
def test_use_default_if_narrow_is_empty(self):
# type: () -> None
self.user_profile.default_events_register_stream_id = self.stream.id
self.user_profile.save()
result = _default_narrow(self.user_profile, [])
self.assertEqual(result, [('stream', 'Verona')])
self.assertEqual(result, [[u'stream', u'Verona']])
def test_use_narrow_if_default_is_none(self):
# type: () -> None

View File

@@ -1,11 +1,11 @@
from __future__ import absolute_import
from typing import Any
from typing import Any, List, Dict, Optional, Callable, Tuple
from django.utils.translation import ugettext as _
from django.conf import settings
from django.contrib.auth import authenticate, login, get_backends
from django.core.urlresolvers import reverse
from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse
from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse, HttpRequest
from django.shortcuts import redirect
from django.template import RequestContext, loader
from django.utils.timezone import now
@@ -61,6 +61,7 @@ import datetime
import ujson
import simplejson
import re
from six import text_type
from six.moves import urllib
import base64
import time
@@ -74,10 +75,12 @@ from zerver.lib.rest import rest_dispatch as _rest_dispatch
rest_dispatch = csrf_exempt((lambda request, *args, **kwargs: _rest_dispatch(request, globals(), *args, **kwargs)))
def name_changes_disabled(realm):
# type: (Realm) -> bool
return settings.NAME_CHANGES_DISABLED or realm.name_changes_disabled
@require_post
def accounts_register(request):
# type: (HttpRequest) -> HttpResponse
key = request.POST['key']
confirmation = Confirmation.objects.get(confirmation_key=key)
prereg_user = confirmation.content_object
@@ -267,6 +270,7 @@ from zerver.lib.ccache import make_ccache
@has_request_variables
def webathena_kerberos_login(request, user_profile,
cred=REQ(default=None)):
# type (HttpRequest, UserProfile, str) -> HttpResponse
if cred is None:
return json_error(_("Could not find Kerberos credential"))
if not user_profile.realm.domain == "mit.edu":
@@ -298,6 +302,7 @@ def webathena_kerberos_login(request, user_profile,
return json_success()
def api_endpoint_docs(request):
# type: (HttpRequest) -> HttpResponse
raw_calls = open('templates/zerver/api_content.json', 'r').read()
calls = ujson.loads(raw_calls)
langs = set()
@@ -323,11 +328,12 @@ def api_endpoint_docs(request):
@authenticated_json_post_view
@has_request_variables
def json_invite_users(request, user_profile, invitee_emails=REQ()):
if not invitee_emails:
def json_invite_users(request, user_profile, invitee_emails_raw=REQ("invitee_emails")):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
if not invitee_emails_raw:
return json_error(_("You must specify at least one email address."))
invitee_emails = set(re.split(r'[, \n]', invitee_emails))
invitee_emails = set(re.split(r'[, \n]', invitee_emails_raw))
stream_names = request.POST.getlist('stream')
if not stream_names:
@@ -354,6 +360,7 @@ def json_invite_users(request, user_profile, invitee_emails=REQ()):
return json_success()
def create_homepage_form(request, user_info=None):
# type: (HttpRequest, Optional[Dict[str, Any]]) -> HomepageForm
if user_info:
return HomepageForm(user_info, domain=request.session.get("domain"))
# An empty fields dict is not treated the same way as not
@@ -361,6 +368,7 @@ def create_homepage_form(request, user_info=None):
return HomepageForm(domain=request.session.get("domain"))
def maybe_send_to_registration(request, email, full_name=''):
# type: (HttpRequest, text_type, text_type) -> HttpResponse
form = create_homepage_form(request, user_info={'email': email})
request.verified_email = None
if form.is_valid():
@@ -390,6 +398,7 @@ def maybe_send_to_registration(request, email, full_name=''):
request=request)
def login_or_register_remote_user(request, remote_username, user_profile, full_name=''):
# type: (HttpRequest, str, UserProfile, text_type) -> HttpResponse
if user_profile is None or user_profile.is_mirror_dummy:
# Since execution has reached here, the client specified a remote user
# but no associated user account exists. Send them over to the
@@ -401,6 +410,7 @@ def login_or_register_remote_user(request, remote_username, user_profile, full_n
request.get_host()))
def remote_user_sso(request):
# type: (HttpRequest) -> HttpResponse
try:
remote_user = request.META["REMOTE_USER"]
except KeyError:
@@ -411,6 +421,7 @@ def remote_user_sso(request):
@csrf_exempt
def remote_user_jwt(request):
# type: (HttpRequest) -> HttpResponse
try:
json_web_token = request.POST["json_web_token"]
payload, signing_input, header, signature = jwt.load(json_web_token)
@@ -445,9 +456,11 @@ def remote_user_jwt(request):
return login_or_register_remote_user(request, email, user_profile, remote_user)
def google_oauth2_csrf(request, value):
# type: (HttpRequest, str) -> HttpResponse
return hmac.new(get_token(request).encode('utf-8'), value, hashlib.sha256).hexdigest()
def start_google_oauth2(request):
# type: (HttpRequest) -> HttpResponse
uri = 'https://accounts.google.com/o/oauth2/auth?'
cur_time = str(int(time.time()))
csrf_state = '{}:{}'.format(
@@ -471,12 +484,14 @@ def start_google_oauth2(request):
# from a property to a function
requests_json_is_function = callable(requests.Response.json)
def extract_json_response(resp):
# type: (HttpResponse) -> Dict[str, Any]
if requests_json_is_function:
return resp.json()
else:
return resp.json
def finish_google_oauth2(request):
# type: (HttpRequest) -> HttpResponse
error = request.GET.get('error')
if error == 'access_denied':
return redirect('/')
@@ -538,6 +553,7 @@ def finish_google_oauth2(request):
return login_or_register_remote_user(request, email_address, user_profile, full_name)
def login_page(request, **kwargs):
# type: (HttpRequest, **Any) -> HttpResponse
extra_context = kwargs.pop('extra_context', {})
if dev_auth_enabled():
# Development environments usually have only a few users, but
@@ -559,6 +575,7 @@ def login_page(request, **kwargs):
return template_response
def dev_direct_login(request, **kwargs):
# type: (HttpRequest, **Any) -> HttpResponse
# This function allows logging in without a password and should only be called in development environments.
# It may be called if the DevAuthBackend is included in settings.AUTHENTICATION_BACKENDS
if (not dev_auth_enabled()) or settings.PRODUCTION:
@@ -575,8 +592,10 @@ def dev_direct_login(request, **kwargs):
@authenticated_json_post_view
@has_request_variables
def json_bulk_invite_users(request, user_profile,
invitee_emails=REQ(validator=check_list(check_string))):
invitee_emails = set(invitee_emails)
invitee_emails_list=REQ('invitee_emails',
validator=check_list(check_string))):
# type: (HttpRequest, UserProfile, List[str]) -> HttpResponse
invitee_emails = set(invitee_emails_list)
streams = get_default_subs(user_profile)
ret_error, error_data = do_invite_users(user_profile, invitee_emails, streams)
@@ -594,6 +613,7 @@ def json_bulk_invite_users(request, user_profile,
@zulip_login_required
def initial_invite_page(request):
# type: (HttpRequest) -> HttpResponse
user = request.user
# Only show the bulk-invite page for the first user in a realm
domain_count = len(UserProfile.objects.filter(realm=user.realm))
@@ -610,9 +630,11 @@ def initial_invite_page(request):
@require_post
def logout_then_login(request, **kwargs):
# type: (HttpRequest, **Any) -> HttpResponse
return django_logout_then_login(request, kwargs)
def create_preregistration_user(email, request):
# type: (text_type, HttpRequest) -> HttpResponse
domain = request.session.get("domain")
if completely_open(domain):
# Clear the "domain" from the session object; it's no longer needed
@@ -631,6 +653,7 @@ def create_preregistration_user(email, request):
return PreregistrationUser.objects.create(email=email)
def accounts_home_with_domain(request, domain):
# type: (HttpRequest, str) -> HttpResponse
if completely_open(domain):
# You can sign up for a completely open realm through a
# special registration path that contains the domain in the
@@ -643,6 +666,7 @@ def accounts_home_with_domain(request, domain):
return HttpResponseRedirect(reverse('zerver.views.accounts_home'))
def send_registration_completion_email(email, request):
# type: (str, HttpRequest) -> HttpResponse
"""
Send an email with a confirmation link to the provided e-mail so the user
can complete their registration.
@@ -654,6 +678,7 @@ def send_registration_completion_email(email, request):
additional_context=context)
def accounts_home(request):
# type: (HttpRequest) -> HttpResponse
if request.method == 'POST':
form = create_homepage_form(request, user_info=request.POST)
if form.is_valid():
@@ -673,6 +698,7 @@ def accounts_home(request):
request=request)
def approximate_unread_count(user_profile):
# type: (UserProfile) -> int
not_in_home_view_recipients = [sub.recipient.id for sub in \
Subscription.objects.filter(
user_profile=user_profile, in_home_view=False)]
@@ -692,6 +718,7 @@ def approximate_unread_count(user_profile):
flags=UserMessage.flags.read).count()
def sent_time_in_epoch_seconds(user_message):
# type: (UserMessage) -> float
# user_message is a UserMessage object.
if not user_message:
return None
@@ -701,6 +728,7 @@ def sent_time_in_epoch_seconds(user_message):
@zulip_login_required
def home(request):
# type: (HttpRequest) -> HttpResponse
# We need to modify the session object every two weeks or it will expire.
# This line makes reloading the page a sufficient action to keep the
# session alive.
@@ -892,9 +920,11 @@ def home(request):
@zulip_login_required
def desktop_home(request):
# type: (HttpRequest) -> HttpResponse
return HttpResponseRedirect(reverse('zerver.views.home'))
def is_buggy_ua(agent):
# type: (str) -> bool
"""Discrimiate CSS served to clients based on User Agent
Due to QTBUG-3467, @font-face is not supported in QtWebKit.
@@ -905,11 +935,13 @@ def is_buggy_ua(agent):
"Mac" not in agent
def get_pointer_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
return json_success({'pointer': user_profile.pointer})
@has_request_variables
def update_pointer_backend(request, user_profile,
pointer=REQ(converter=to_non_negative_int)):
# type: (HttpRequest, UserProfile, int) -> HttpResponse
if pointer <= user_profile.pointer:
return json_success()
@@ -928,12 +960,14 @@ def update_pointer_backend(request, user_profile,
return json_success()
def generate_client_id():
# type: () -> str
return generate_random_token(32)
# The order of creation of the various dictionaries are important.
# We filter on {userprofile,stream,subscription_recipient}_ids.
@require_realm_admin
def export(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
if (Message.objects.filter(sender__realm=user_profile.realm).count() > 1000000 or
UserMessage.objects.filter(user_profile__realm=user_profile.realm).count() > 3000000):
return json_error(_("Realm has too much data for non-batched export."))
@@ -1000,6 +1034,7 @@ def export(request, user_profile):
return json_success(response)
def get_profile_backend(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
result = dict(pointer = user_profile.pointer,
client_id = generate_client_id(),
max_message_id = -1)
@@ -1017,8 +1052,9 @@ def update_realm(request, user_profile, name=REQ(validator=check_string, default
invite_required=REQ(validator=check_bool, default=None),
invite_by_admins_only=REQ(validator=check_bool, default=None),
create_stream_by_admins_only=REQ(validator=check_bool, default=None)):
# type: (HttpRequest, UserProfile, Optional[str], Optional[bool], Optional[bool], Optional[bool], Optional[bool]) -> HttpResponse
realm = user_profile.realm
data = {}
data = {} # type: Dict[str, Any]
if name is not None and realm.name != name:
do_set_realm_name(realm, name)
data['name'] = 'updated'
@@ -1039,6 +1075,7 @@ def update_realm(request, user_profile, name=REQ(validator=check_string, default
@authenticated_json_post_view
@has_request_variables
def json_upload_file(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
if len(request.FILES) == 0:
return json_error(_("You must specify a file to upload"))
if len(request.FILES) != 1:
@@ -1053,15 +1090,16 @@ def json_upload_file(request, user_profile):
@zulip_login_required
@has_request_variables
def get_uploaded_file(request, realm_id, filename,
def get_uploaded_file(request, realm_id_str, filename,
redir=REQ(validator=check_bool, default=True)):
# type: (HttpRequest, str, str, bool) -> HttpResponse
if settings.LOCAL_UPLOADS_DIR is not None:
return HttpResponseForbidden() # Should have been served by nginx
user_profile = request.user
url_path = "%s/%s" % (realm_id, filename)
url_path = "%s/%s" % (realm_id_str, filename)
if realm_id == "unk":
if realm_id_str == "unk":
realm_id = get_realm_for_filename(url_path)
if realm_id is None:
# File does not exist
@@ -1081,7 +1119,7 @@ def get_uploaded_file(request, realm_id, filename,
@require_post
@has_request_variables
def api_fetch_api_key(request, username=REQ(), password=REQ()):
# type: (Any, Any, Any) -> Any
# type: (HttpRequest, str, str) -> HttpResponse
return_data = {} # type: Dict[str, bool]
if username == "google-oauth2-token":
user_profile = authenticate(google_oauth2_token=password, return_data=return_data)
@@ -1103,23 +1141,27 @@ def api_fetch_api_key(request, username=REQ(), password=REQ()):
@authenticated_json_post_view
@has_request_variables
def json_fetch_api_key(request, user_profile, password=REQ(default='')):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
if password_auth_enabled(user_profile.realm) and not user_profile.check_password(password):
return json_error(_("Your username or password is incorrect."))
return json_success({"api_key": user_profile.api_key})
@csrf_exempt
def api_fetch_google_client_id(request):
# type: (HttpRequest) -> HttpResponse
if not settings.GOOGLE_CLIENT_ID:
return json_error(_("GOOGLE_CLIENT_ID is not configured"), status=400)
return json_success({"google_client_id": settings.GOOGLE_CLIENT_ID})
def get_status_list(requesting_user_profile):
# type: (UserProfile) -> Dict[str, Any]
return {'presences': get_status_dict(requesting_user_profile),
'server_timestamp': time.time()}
@has_request_variables
def update_active_status_backend(request, user_profile, status=REQ(),
new_user_input=REQ(validator=check_bool, default=False)):
# type: (HttpRequest, UserProfile, str, bool) -> HttpResponse
status_val = UserPresence.status_from_string(status)
if status_val is None:
raise JsonableError(_("Invalid presence status: %s") % (status,))
@@ -1144,6 +1186,7 @@ def update_active_status_backend(request, user_profile, status=REQ(),
@authenticated_json_post_view
def json_get_active_statuses(request, user_profile):
# type: (HttpRequest, UserProfile) -> HttpResponse
return json_success(get_status_list(user_profile))
# Does not need to be authenticated because it's called from rest_dispatch
@@ -1151,20 +1194,23 @@ def json_get_active_statuses(request, user_profile):
def api_events_register(request, user_profile,
apply_markdown=REQ(default=False, validator=check_bool),
all_public_streams=REQ(default=None, validator=check_bool)):
# type: (HttpRequest, UserProfile, bool, Optional[bool]) -> HttpResponse
return events_register_backend(request, user_profile,
apply_markdown=apply_markdown,
all_public_streams=all_public_streams)
def _default_all_public_streams(user_profile, all_public_streams):
# type: (UserProfile, Optional[bool]) -> bool
if all_public_streams is not None:
return all_public_streams
else:
return user_profile.default_all_public_streams
def _default_narrow(user_profile, narrow):
# type: (UserProfile, List[List[text_type]]) -> List[List[text_type]]
default_stream = user_profile.default_events_register_stream
if not narrow and user_profile.default_events_register_stream is not None:
narrow = [('stream', default_stream.name)]
narrow = [['stream', default_stream.name]]
return narrow
@has_request_variables
@@ -1173,7 +1219,7 @@ def events_register_backend(request, user_profile, apply_markdown=True,
event_types=REQ(validator=check_list(check_string), default=None),
narrow=REQ(validator=check_list(check_list(check_string, length=2)), default=[]),
queue_lifespan_secs=REQ(converter=int, default=0)):
# type: (HttpRequest, UserProfile, bool, Optional[bool], Optional[List[str]], List[List[text_type]], int) -> HttpResponse
all_public_streams = _default_all_public_streams(user_profile, all_public_streams)
narrow = _default_narrow(user_profile, narrow)
@@ -1186,6 +1232,7 @@ def events_register_backend(request, user_profile, apply_markdown=True,
@authenticated_json_post_view
@has_request_variables
def json_refer_friend(request, user_profile, email=REQ()):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
if not email:
return json_error(_("No email address specified"))
if user_profile.invites_granted - user_profile.invites_used <= 0:
@@ -1199,20 +1246,22 @@ def json_refer_friend(request, user_profile, email=REQ()):
@has_request_variables
def json_set_muted_topics(request, user_profile,
muted_topics=REQ(validator=check_list(check_list(check_string, length=2)), default=[])):
# type: (HttpRequest, UserProfile, List[List[text_type]]) -> HttpResponse
do_set_muted_topics(user_profile, muted_topics)
return json_success()
def add_push_device_token(request, user_profile, token, kind, ios_app_id=None):
if token == '' or len(token) > 4096:
def add_push_device_token(request, user_profile, token_str, kind, ios_app_id=None):
# type: (HttpRequest, UserProfile, str, int, Optional[str]) -> HttpResponse
if token_str == '' or len(token_str) > 4096:
return json_error(_('Empty or invalid length token'))
# If another user was previously logged in on the same device and didn't
# properly log out, the token will still be registered to the wrong account
PushDeviceToken.objects.filter(token=token).delete()
PushDeviceToken.objects.filter(token=token_str).delete()
# Overwrite with the latest value
token, created = PushDeviceToken.objects.get_or_create(user=user_profile,
token=token,
token=token_str,
kind=kind,
ios_app_id=ios_app_id)
if not created:
@@ -1223,18 +1272,21 @@ def add_push_device_token(request, user_profile, token, kind, ios_app_id=None):
@has_request_variables
def add_apns_device_token(request, user_profile, token=REQ(), appid=REQ(default=settings.ZULIP_IOS_APP_ID)):
# type: (HttpRequest, UserProfile, str, str) -> HttpResponse
return add_push_device_token(request, user_profile, token, PushDeviceToken.APNS, ios_app_id=appid)
@has_request_variables
def add_android_reg_id(request, user_profile, token=REQ()):
return add_push_device_token(request, user_profile, token, PushDeviceToken.GCM)
def add_android_reg_id(request, user_profile, token_str=REQ("token")):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
return add_push_device_token(request, user_profile, token_str, PushDeviceToken.GCM)
def remove_push_device_token(request, user_profile, token, kind):
if token == '' or len(token) > 4096:
def remove_push_device_token(request, user_profile, token_str, kind):
# type: (HttpRequest, UserProfile, str, int) -> HttpResponse
if token_str == '' or len(token_str) > 4096:
return json_error(_('Empty or invalid length token'))
try:
token = PushDeviceToken.objects.get(token=token, kind=kind)
token = PushDeviceToken.objects.get(token=token_str, kind=kind)
token.delete()
except PushDeviceToken.DoesNotExist:
return json_error(_("Token does not exist"))
@@ -1243,17 +1295,21 @@ def remove_push_device_token(request, user_profile, token, kind):
@has_request_variables
def remove_apns_device_token(request, user_profile, token=REQ()):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
return remove_push_device_token(request, user_profile, token, PushDeviceToken.APNS)
@has_request_variables
def remove_android_reg_id(request, user_profile, token=REQ()):
# type: (HttpRequest, UserProfile, str) -> HttpResponse
return remove_push_device_token(request, user_profile, token, PushDeviceToken.GCM)
def generate_204(request):
# type: (HttpRequest) -> HttpResponse
return HttpResponse(content=None, status=204)
def process_unsubscribe(token, type, unsubscribe_function):
def process_unsubscribe(token, subscription_type, unsubscribe_function):
# type: (HttpRequest, str, Callable[[UserProfile], None]) -> HttpResponse
try:
confirmation = Confirmation.objects.get(confirmation_key=token)
except Confirmation.DoesNotExist:
@@ -1262,19 +1318,22 @@ def process_unsubscribe(token, type, unsubscribe_function):
user_profile = confirmation.content_object
unsubscribe_function(user_profile)
return render_to_response('zerver/unsubscribe_success.html',
{"subscription_type": type,
{"subscription_type": subscription_type,
"external_host": settings.EXTERNAL_HOST})
# Email unsubscribe functions. All have the function signature
# processor(user_profile).
def do_missedmessage_unsubscribe(user_profile):
# type: (UserProfile) -> None
do_change_enable_offline_email_notifications(user_profile, False)
def do_welcome_unsubscribe(user_profile):
# type: (UserProfile) -> None
clear_followup_emails_queue(user_profile.email)
def do_digest_unsubscribe(user_profile):
# type: (UserProfile) -> None
do_change_enable_digest_emails(user_profile, False)
# The keys are part of the URL for the unsubscribe link and must be valid
@@ -1289,6 +1348,7 @@ email_unsubscribers = {
# Login NOT required. These are for one-click unsubscribes.
def email_unsubscribe(request, type, token):
# type: (HttpRequest, str, str) -> HttpResponse
if type in email_unsubscribers:
display_name, unsubscribe_function = email_unsubscribers[type]
return process_unsubscribe(token, display_name, unsubscribe_function)