mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	This is a significant piece of the remaining effort required to eliminate the catch-all zerver/views/__init__.py.
		
			
				
	
	
		
			433 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			433 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
from __future__ import absolute_import
 | 
						|
from typing import Any, List, Dict, Optional, Text
 | 
						|
 | 
						|
from django.utils.translation import ugettext as _
 | 
						|
from django.conf import settings
 | 
						|
from django.contrib.auth import authenticate, login, get_backends
 | 
						|
from django.core.urlresolvers import reverse
 | 
						|
from django.http import HttpResponseRedirect, HttpResponseForbidden, HttpResponse, HttpRequest
 | 
						|
from django.shortcuts import redirect
 | 
						|
from django.template import RequestContext, loader
 | 
						|
from django.utils.timezone import now
 | 
						|
from django.core.exceptions import ValidationError
 | 
						|
from django.core import validators
 | 
						|
from django.core.mail import send_mail
 | 
						|
from zerver.models import UserProfile, Realm, PreregistrationUser, \
 | 
						|
    name_changes_disabled, email_to_username, \
 | 
						|
    completely_open, get_unique_open_realm, email_allowed_for_realm, \
 | 
						|
    get_realm, get_realm_by_email_domain
 | 
						|
from zerver.lib.actions import do_change_password, do_change_full_name, do_change_is_admin, \
 | 
						|
    do_activate_user, do_create_user, do_create_realm, set_default_streams, \
 | 
						|
    do_events_register, user_email_is_unique, \
 | 
						|
    compute_mit_user_fullname, do_set_muted_topics
 | 
						|
from zerver.forms import RegistrationForm, HomepageForm, RealmCreationForm, \
 | 
						|
    CreateUserForm, FindMyTeamForm
 | 
						|
from zerver.lib.actions import is_inactive
 | 
						|
from django_auth_ldap.backend import LDAPBackend, _LDAPUser
 | 
						|
from zerver.lib.validator import check_string, check_list
 | 
						|
from zerver.decorator import require_post, authenticated_json_post_view, \
 | 
						|
    has_request_variables, \
 | 
						|
    JsonableError, get_user_profile_by_email, REQ, \
 | 
						|
    zulip_login_required
 | 
						|
from zerver.lib.response import json_success, json_error
 | 
						|
from zerver.lib.utils import get_subdomain
 | 
						|
from zproject.backends import password_auth_enabled
 | 
						|
 | 
						|
from confirmation.models import Confirmation, RealmCreationKey, check_key_is_valid
 | 
						|
 | 
						|
import logging
 | 
						|
import requests
 | 
						|
import ujson
 | 
						|
 | 
						|
from six.moves import urllib
 | 
						|
from zproject.jinja2 import render_to_response
 | 
						|
 | 
						|
def redirect_and_log_into_subdomain(realm, full_name, email_address):
 | 
						|
    # type: (Realm, Text, Text) -> HttpResponse
 | 
						|
    subdomain_login_uri = ''.join([
 | 
						|
        realm.uri,
 | 
						|
        reverse('zerver.views.auth.log_into_subdomain')
 | 
						|
    ])
 | 
						|
 | 
						|
    domain = '.' + settings.EXTERNAL_HOST.split(':')[0]
 | 
						|
    response = redirect(subdomain_login_uri)
 | 
						|
 | 
						|
    data = {'name': full_name, 'email': email_address, 'subdomain': realm.subdomain}
 | 
						|
    # Creating a singed cookie so that it cannot be tampered with.
 | 
						|
    # Cookie and the signature expire in 15 seconds.
 | 
						|
    response.set_signed_cookie('subdomain.signature',
 | 
						|
                               ujson.dumps(data),
 | 
						|
                               expires=15,
 | 
						|
                               domain=domain,
 | 
						|
                               salt='zerver.views.auth')
 | 
						|
    return response
 | 
						|
 | 
						|
@require_post
 | 
						|
def accounts_register(request):
 | 
						|
    # type: (HttpRequest) -> HttpResponse
 | 
						|
    key = request.POST['key']
 | 
						|
    confirmation = Confirmation.objects.get(confirmation_key=key)
 | 
						|
    prereg_user = confirmation.content_object
 | 
						|
    email = prereg_user.email
 | 
						|
    realm_creation = prereg_user.realm_creation
 | 
						|
    try:
 | 
						|
        existing_user_profile = get_user_profile_by_email(email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        existing_user_profile = None
 | 
						|
 | 
						|
    validators.validate_email(email)
 | 
						|
    # If OPEN_REALM_CREATION is enabled all user sign ups should go through the
 | 
						|
    # special URL with domain name so that REALM can be identified if multiple realms exist
 | 
						|
    unique_open_realm = get_unique_open_realm()
 | 
						|
    if unique_open_realm is not None:
 | 
						|
        realm = unique_open_realm
 | 
						|
    elif prereg_user.referred_by:
 | 
						|
        # If someone invited you, you are joining their realm regardless
 | 
						|
        # of your e-mail address.
 | 
						|
        realm = prereg_user.referred_by.realm
 | 
						|
    elif prereg_user.realm:
 | 
						|
        # You have a realm set, even though nobody referred you. This
 | 
						|
        # happens if you sign up through a special URL for an open realm.
 | 
						|
        realm = prereg_user.realm
 | 
						|
    elif realm_creation:
 | 
						|
        # For creating a new realm, there is no existing realm or domain
 | 
						|
        realm = None
 | 
						|
    elif settings.REALMS_HAVE_SUBDOMAINS:
 | 
						|
        realm = get_realm(get_subdomain(request))
 | 
						|
    else:
 | 
						|
        realm = get_realm_by_email_domain(email)
 | 
						|
 | 
						|
    if realm and not email_allowed_for_realm(email, realm):
 | 
						|
        return render_to_response("zerver/closed_realm.html", {"closed_domain_name": realm.name})
 | 
						|
 | 
						|
    if realm and realm.deactivated:
 | 
						|
        # The user is trying to register for a deactivated realm. Advise them to
 | 
						|
        # contact support.
 | 
						|
        return render_to_response("zerver/deactivated.html",
 | 
						|
                                  {"deactivated_domain_name": realm.name,
 | 
						|
                                   "zulip_administrator": settings.ZULIP_ADMINISTRATOR})
 | 
						|
 | 
						|
    try:
 | 
						|
        if existing_user_profile is not None and existing_user_profile.is_mirror_dummy:
 | 
						|
            # Mirror dummy users to be activated must be inactive
 | 
						|
            is_inactive(email)
 | 
						|
        else:
 | 
						|
            # Other users should not already exist at all.
 | 
						|
            user_email_is_unique(email)
 | 
						|
    except ValidationError:
 | 
						|
        return HttpResponseRedirect(reverse('django.contrib.auth.views.login') + '?email=' +
 | 
						|
                                    urllib.parse.quote_plus(email))
 | 
						|
 | 
						|
    name_validated = False
 | 
						|
    full_name = None
 | 
						|
 | 
						|
    if request.POST.get('from_confirmation'):
 | 
						|
        try:
 | 
						|
            del request.session['authenticated_full_name']
 | 
						|
        except KeyError:
 | 
						|
            pass
 | 
						|
        if realm is not None and realm.is_zephyr_mirror_realm:
 | 
						|
            # For MIT users, we can get an authoritative name from Hesiod.
 | 
						|
            # Technically we should check that this is actually an MIT
 | 
						|
            # realm, but we can cross that bridge if we ever get a non-MIT
 | 
						|
            # zephyr mirroring realm.
 | 
						|
            hesiod_name = compute_mit_user_fullname(email)
 | 
						|
            form = RegistrationForm(
 | 
						|
                    initial={'full_name': hesiod_name if "@" not in hesiod_name else ""})
 | 
						|
            name_validated = True
 | 
						|
        elif settings.POPULATE_PROFILE_VIA_LDAP:
 | 
						|
            for backend in get_backends():
 | 
						|
                if isinstance(backend, LDAPBackend):
 | 
						|
                    ldap_attrs = _LDAPUser(backend, backend.django_to_ldap_username(email)).attrs
 | 
						|
                    try:
 | 
						|
                        ldap_full_name = ldap_attrs[settings.AUTH_LDAP_USER_ATTR_MAP['full_name']][0]
 | 
						|
                        request.session['authenticated_full_name'] = ldap_full_name
 | 
						|
                        name_validated = True
 | 
						|
                        # We don't use initial= here, because if the form is
 | 
						|
                        # complete (that is, no additional fields need to be
 | 
						|
                        # filled out by the user) we want the form to validate,
 | 
						|
                        # so they can be directly registered without having to
 | 
						|
                        # go through this interstitial.
 | 
						|
                        form = RegistrationForm({'full_name': ldap_full_name})
 | 
						|
                        # FIXME: This will result in the user getting
 | 
						|
                        # validation errors if they have to enter a password.
 | 
						|
                        # Not relevant for ONLY_SSO, though.
 | 
						|
                        break
 | 
						|
                    except TypeError:
 | 
						|
                        # Let the user fill out a name and/or try another backend
 | 
						|
                        form = RegistrationForm()
 | 
						|
        elif 'full_name' in request.POST:
 | 
						|
            form = RegistrationForm(
 | 
						|
                initial={'full_name': request.POST.get('full_name')}
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            form = RegistrationForm()
 | 
						|
    else:
 | 
						|
        postdata = request.POST.copy()
 | 
						|
        if name_changes_disabled(realm):
 | 
						|
            # If we populate profile information via LDAP and we have a
 | 
						|
            # verified name from you on file, use that. Otherwise, fall
 | 
						|
            # back to the full name in the request.
 | 
						|
            try:
 | 
						|
                postdata.update({'full_name': request.session['authenticated_full_name']})
 | 
						|
                name_validated = True
 | 
						|
            except KeyError:
 | 
						|
                pass
 | 
						|
        form = RegistrationForm(postdata)
 | 
						|
        if not password_auth_enabled(realm):
 | 
						|
            form['password'].field.required = False
 | 
						|
 | 
						|
    if form.is_valid():
 | 
						|
        if password_auth_enabled(realm):
 | 
						|
            password = form.cleaned_data['password']
 | 
						|
        else:
 | 
						|
            # SSO users don't need no passwords
 | 
						|
            password = None
 | 
						|
 | 
						|
        if realm_creation:
 | 
						|
            string_id = form.cleaned_data['realm_subdomain']
 | 
						|
            realm_name = form.cleaned_data['realm_name']
 | 
						|
            org_type = int(form.cleaned_data['realm_org_type'])
 | 
						|
            realm = do_create_realm(string_id, realm_name, org_type=org_type)[0]
 | 
						|
 | 
						|
            set_default_streams(realm, settings.DEFAULT_NEW_REALM_STREAMS)
 | 
						|
 | 
						|
        full_name = form.cleaned_data['full_name']
 | 
						|
        short_name = email_to_username(email)
 | 
						|
        first_in_realm = len(UserProfile.objects.filter(realm=realm, is_bot=False)) == 0
 | 
						|
 | 
						|
        # FIXME: sanitize email addresses and fullname
 | 
						|
        if existing_user_profile is not None and existing_user_profile.is_mirror_dummy:
 | 
						|
            try:
 | 
						|
                user_profile = existing_user_profile
 | 
						|
                do_activate_user(user_profile)
 | 
						|
                do_change_password(user_profile, password)
 | 
						|
                do_change_full_name(user_profile, full_name)
 | 
						|
            except UserProfile.DoesNotExist:
 | 
						|
                user_profile = do_create_user(email, password, realm, full_name, short_name,
 | 
						|
                                              prereg_user=prereg_user,
 | 
						|
                                              tos_version=settings.TOS_VERSION,
 | 
						|
                                              newsletter_data={"IP": request.META['REMOTE_ADDR']})
 | 
						|
        else:
 | 
						|
            user_profile = do_create_user(email, password, realm, full_name, short_name,
 | 
						|
                                          prereg_user=prereg_user,
 | 
						|
                                          tos_version=settings.TOS_VERSION,
 | 
						|
                                          newsletter_data={"IP": request.META['REMOTE_ADDR']})
 | 
						|
 | 
						|
        if first_in_realm:
 | 
						|
            do_change_is_admin(user_profile, True)
 | 
						|
 | 
						|
        if realm_creation and settings.REALMS_HAVE_SUBDOMAINS:
 | 
						|
            # Because for realm creation, registration happens on the
 | 
						|
            # root domain, we need to log them into the subdomain for
 | 
						|
            # their new realm.
 | 
						|
            return redirect_and_log_into_subdomain(realm, full_name, email)
 | 
						|
 | 
						|
        # This dummy_backend check below confirms the user is
 | 
						|
        # authenticating to the correct subdomain.
 | 
						|
        return_data = {} # type: Dict[str, bool]
 | 
						|
        auth_result = authenticate(username=user_profile.email,
 | 
						|
                                   realm_subdomain=realm.subdomain,
 | 
						|
                                   return_data=return_data,
 | 
						|
                                   use_dummy_backend=True)
 | 
						|
        if return_data.get('invalid_subdomain'):
 | 
						|
            # By construction, this should never happen.
 | 
						|
            logging.error("Subdomain mismatch in registration %s: %s" % (
 | 
						|
                realm.subdomain, user_profile.email,))
 | 
						|
            return redirect('/')
 | 
						|
        login(request, auth_result)
 | 
						|
        return HttpResponseRedirect(realm.uri + reverse('zerver.views.home.home'))
 | 
						|
 | 
						|
    return render_to_response(
 | 
						|
        'zerver/register.html',
 | 
						|
        {'form': form,
 | 
						|
         'email': email,
 | 
						|
         'key': key,
 | 
						|
         'full_name': request.session.get('authenticated_full_name', None),
 | 
						|
         'lock_name': name_validated and name_changes_disabled(realm),
 | 
						|
         # password_auth_enabled is normally set via our context processor,
 | 
						|
         # but for the registration form, there is no logged in user yet, so
 | 
						|
         # we have to set it here.
 | 
						|
         'creating_new_team': realm_creation,
 | 
						|
         'realms_have_subdomains': settings.REALMS_HAVE_SUBDOMAINS,
 | 
						|
         'password_auth_enabled': password_auth_enabled(realm), }, request=request)
 | 
						|
 | 
						|
def create_preregistration_user(email, request, realm_creation=False):
 | 
						|
    # type: (Text, HttpRequest, bool) -> HttpResponse
 | 
						|
    realm_str = request.session.pop('realm_str', None)
 | 
						|
    if realm_str is not None:
 | 
						|
        # realm_str was set in accounts_home_with_realm_str.
 | 
						|
        # The user is trying to sign up for a completely open realm,
 | 
						|
        # so create them a PreregistrationUser for that realm
 | 
						|
        return PreregistrationUser.objects.create(email=email,
 | 
						|
                                                  realm=get_realm(realm_str),
 | 
						|
                                                  realm_creation=realm_creation)
 | 
						|
 | 
						|
    return PreregistrationUser.objects.create(email=email, realm_creation=realm_creation)
 | 
						|
 | 
						|
def accounts_home_with_realm_str(request, realm_str):
 | 
						|
    # type: (HttpRequest, str) -> HttpResponse
 | 
						|
    if not settings.REALMS_HAVE_SUBDOMAINS and completely_open(get_realm(realm_str)):
 | 
						|
        # You can sign up for a completely open realm through a
 | 
						|
        # special registration path that contains the domain in the
 | 
						|
        # URL. We store this information in the session rather than
 | 
						|
        # elsewhere because we don't have control over URL or form
 | 
						|
        # data for folks registering through OpenID.
 | 
						|
        request.session["realm_str"] = realm_str
 | 
						|
        return accounts_home(request)
 | 
						|
    else:
 | 
						|
        return HttpResponseRedirect(reverse('zerver.views.accounts_home'))
 | 
						|
 | 
						|
def send_registration_completion_email(email, request, realm_creation=False):
 | 
						|
    # type: (str, HttpRequest, bool) -> Confirmation
 | 
						|
    """
 | 
						|
    Send an email with a confirmation link to the provided e-mail so the user
 | 
						|
    can complete their registration.
 | 
						|
    """
 | 
						|
    prereg_user = create_preregistration_user(email, request, realm_creation)
 | 
						|
    context = {'support_email': settings.ZULIP_ADMINISTRATOR,
 | 
						|
               'verbose_support_offers': settings.VERBOSE_SUPPORT_OFFERS}
 | 
						|
    return Confirmation.objects.send_confirmation(prereg_user, email,
 | 
						|
                                                  additional_context=context,
 | 
						|
                                                  host=request.get_host())
 | 
						|
 | 
						|
def redirect_to_email_login_url(email):
 | 
						|
    # type: (str) -> HttpResponseRedirect
 | 
						|
    login_url = reverse('django.contrib.auth.views.login')
 | 
						|
    redirect_url = login_url + '?email=' + urllib.parse.quote_plus(email)
 | 
						|
    return HttpResponseRedirect(redirect_url)
 | 
						|
 | 
						|
def create_realm(request, creation_key=None):
 | 
						|
    # type: (HttpRequest, Optional[Text]) -> HttpResponse
 | 
						|
    if not settings.OPEN_REALM_CREATION:
 | 
						|
        if creation_key is None:
 | 
						|
            return render_to_response("zerver/realm_creation_failed.html",
 | 
						|
                                      {'message': _('New organization creation disabled.')})
 | 
						|
        elif not check_key_is_valid(creation_key):
 | 
						|
            return render_to_response("zerver/realm_creation_failed.html",
 | 
						|
                                      {'message': _('The organization creation link has been expired'
 | 
						|
                                                    ' or is not valid.')})
 | 
						|
 | 
						|
    # When settings.OPEN_REALM_CREATION is enabled, anyone can create a new realm,
 | 
						|
    # subject to a few restrictions on their email address.
 | 
						|
    if request.method == 'POST':
 | 
						|
        form = RealmCreationForm(request.POST)
 | 
						|
        if form.is_valid():
 | 
						|
            email = form.cleaned_data['email']
 | 
						|
            confirmation_key = send_registration_completion_email(email, request, realm_creation=True).confirmation_key
 | 
						|
            if settings.DEVELOPMENT:
 | 
						|
                request.session['confirmation_key'] = {'confirmation_key': confirmation_key}
 | 
						|
            if (creation_key is not None and check_key_is_valid(creation_key)):
 | 
						|
                RealmCreationKey.objects.get(creation_key=creation_key).delete()
 | 
						|
            return HttpResponseRedirect(reverse('send_confirm', kwargs={'email': email}))
 | 
						|
        try:
 | 
						|
            email = request.POST['email']
 | 
						|
            user_email_is_unique(email)
 | 
						|
        except ValidationError:
 | 
						|
            # Maybe the user is trying to log in
 | 
						|
            return redirect_to_email_login_url(email)
 | 
						|
    else:
 | 
						|
        form = RealmCreationForm()
 | 
						|
    return render_to_response('zerver/create_realm.html',
 | 
						|
                              {'form': form, 'current_url': request.get_full_path},
 | 
						|
                              request=request)
 | 
						|
 | 
						|
def confirmation_key(request):
 | 
						|
    # type: (HttpRequest) -> HttpResponse
 | 
						|
    return json_success(request.session.get('confirmation_key'))
 | 
						|
 | 
						|
def get_realm_from_request(request):
 | 
						|
    # type: (HttpRequest) -> Realm
 | 
						|
    if settings.REALMS_HAVE_SUBDOMAINS:
 | 
						|
        realm_str = get_subdomain(request)
 | 
						|
    else:
 | 
						|
        realm_str = request.session.get("realm_str")
 | 
						|
    return get_realm(realm_str)
 | 
						|
 | 
						|
def accounts_home(request):
 | 
						|
    # type: (HttpRequest) -> HttpResponse
 | 
						|
    realm = get_realm_from_request(request)
 | 
						|
    if request.method == 'POST':
 | 
						|
        form = HomepageForm(request.POST, realm=realm)
 | 
						|
        if form.is_valid():
 | 
						|
            email = form.cleaned_data['email']
 | 
						|
            send_registration_completion_email(email, request)
 | 
						|
            return HttpResponseRedirect(reverse('send_confirm', kwargs={'email': email}))
 | 
						|
        try:
 | 
						|
            email = request.POST['email']
 | 
						|
            # Note: We don't check for uniqueness
 | 
						|
            is_inactive(email)
 | 
						|
        except ValidationError:
 | 
						|
            return redirect_to_email_login_url(email)
 | 
						|
    else:
 | 
						|
        form = HomepageForm(realm=realm)
 | 
						|
    return render_to_response('zerver/accounts_home.html',
 | 
						|
                              {'form': form, 'current_url': request.get_full_path},
 | 
						|
                              request=request)
 | 
						|
 | 
						|
@authenticated_json_post_view
 | 
						|
@has_request_variables
 | 
						|
def json_set_muted_topics(request, user_profile,
 | 
						|
                          muted_topics=REQ(validator=check_list(check_list(check_string, length=2)), default=[])):
 | 
						|
    # type: (HttpRequest, UserProfile, List[List[Text]]) -> HttpResponse
 | 
						|
    do_set_muted_topics(user_profile, muted_topics)
 | 
						|
    return json_success()
 | 
						|
 | 
						|
def generate_204(request):
 | 
						|
    # type: (HttpRequest) -> HttpResponse
 | 
						|
    return HttpResponse(content=None, status=204)
 | 
						|
 | 
						|
try:
 | 
						|
    import mailer
 | 
						|
    send_mail = mailer.send_mail
 | 
						|
except ImportError:
 | 
						|
    # no mailer app present, stick with default
 | 
						|
    pass
 | 
						|
 | 
						|
def send_find_my_team_emails(user_profile):
 | 
						|
    # type: (UserProfile) -> None
 | 
						|
    text_template = 'zerver/emails/find_team/find_team_email.txt'
 | 
						|
    html_template = 'zerver/emails/find_team/find_team_email.html'
 | 
						|
    context = {'user_profile': user_profile}
 | 
						|
    text_content = loader.render_to_string(text_template, context)
 | 
						|
    html_content = loader.render_to_string(html_template, context)
 | 
						|
    sender = settings.NOREPLY_EMAIL_ADDRESS
 | 
						|
    recipients = [user_profile.email]
 | 
						|
    subject = loader.render_to_string('zerver/emails/find_team/find_team_email.subject').strip()
 | 
						|
 | 
						|
    send_mail(subject, text_content, sender, recipients, html_message=html_content)
 | 
						|
 | 
						|
def find_my_team(request):
 | 
						|
    # type: (HttpRequest) -> HttpResponse
 | 
						|
    url = reverse('zerver.views.find_my_team')
 | 
						|
 | 
						|
    emails = []  # type: List[Text]
 | 
						|
    if request.method == 'POST':
 | 
						|
        form = FindMyTeamForm(request.POST)
 | 
						|
        if form.is_valid():
 | 
						|
            emails = form.cleaned_data['emails']
 | 
						|
            for user_profile in UserProfile.objects.filter(email__in=emails):
 | 
						|
                send_find_my_team_emails(user_profile)
 | 
						|
 | 
						|
            # Note: Show all the emails in the result otherwise this
 | 
						|
            # feature can be used to ascertain which email addresses
 | 
						|
            # are associated with Zulip.
 | 
						|
            data = urllib.parse.urlencode({'emails': ','.join(emails)})
 | 
						|
            return redirect(url + "?" + data)
 | 
						|
    else:
 | 
						|
        form = FindMyTeamForm()
 | 
						|
        result = request.GET.get('emails')
 | 
						|
        if result:
 | 
						|
            for email in result.split(','):
 | 
						|
                try:
 | 
						|
                    validators.validate_email(email)
 | 
						|
                    emails.append(email)
 | 
						|
                except ValidationError:
 | 
						|
                    pass
 | 
						|
 | 
						|
    return render_to_response('zerver/find_my_team.html',
 | 
						|
                              {'form': form, 'current_url': lambda: url,
 | 
						|
                               'emails': emails},
 | 
						|
                              request=request)
 |