mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			393 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			393 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import absolute_import
 | 
						|
from typing import Text, Union, Optional, Dict, Any, List, Tuple
 | 
						|
 | 
						|
import os
 | 
						|
import simplejson as json
 | 
						|
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
 | 
						|
from django.utils.translation import ugettext as _
 | 
						|
from django.shortcuts import redirect
 | 
						|
from django.conf import settings
 | 
						|
from six.moves import map
 | 
						|
 | 
						|
from zerver.decorator import has_request_variables, REQ, JsonableError, \
 | 
						|
    require_realm_admin
 | 
						|
from zerver.forms import CreateUserForm
 | 
						|
from zerver.lib.actions import do_change_avatar_fields, do_change_bot_owner, \
 | 
						|
    do_change_is_admin, do_change_default_all_public_streams, \
 | 
						|
    do_change_default_events_register_stream, do_change_default_sending_stream, \
 | 
						|
    do_create_user, do_deactivate_user, do_reactivate_user, do_regenerate_api_key
 | 
						|
from zerver.lib.avatar import avatar_url, get_avatar_url
 | 
						|
from zerver.lib.response import json_error, json_success
 | 
						|
from zerver.lib.streams import access_stream_by_name
 | 
						|
from zerver.lib.upload import upload_avatar_image
 | 
						|
from zerver.lib.validator import check_bool, check_string
 | 
						|
from zerver.lib.users import check_change_full_name, check_full_name
 | 
						|
from zerver.lib.utils import generate_random_token
 | 
						|
from zerver.models import UserProfile, Stream, Realm, Message, get_user_profile_by_email, \
 | 
						|
    email_allowed_for_realm, get_user_profile_by_id
 | 
						|
from zproject.jinja2 import render_to_response
 | 
						|
 | 
						|
 | 
						|
def deactivate_user_backend(request, user_profile, email):
 | 
						|
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
 | 
						|
    try:
 | 
						|
        target = get_user_profile_by_email(email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        return json_error(_('No such user'))
 | 
						|
    if target.is_bot:
 | 
						|
        return json_error(_('No such user'))
 | 
						|
    if check_last_admin(target):
 | 
						|
        return json_error(_('Cannot deactivate the only organization administrator'))
 | 
						|
    return _deactivate_user_profile_backend(request, user_profile, target)
 | 
						|
 | 
						|
def deactivate_user_own_backend(request, user_profile):
 | 
						|
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
						|
 | 
						|
    if user_profile.is_realm_admin and check_last_admin(user_profile):
 | 
						|
        return json_error(_('Cannot deactivate the only organization administrator'))
 | 
						|
    do_deactivate_user(user_profile)
 | 
						|
    return json_success()
 | 
						|
 | 
						|
def check_last_admin(user_profile):
 | 
						|
    # type: (UserProfile) -> bool
 | 
						|
    admins = set(user_profile.realm.get_admin_users())
 | 
						|
    return user_profile.is_realm_admin and len(admins) == 1
 | 
						|
 | 
						|
def deactivate_bot_backend(request, user_profile, email):
 | 
						|
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
 | 
						|
    try:
 | 
						|
        target = get_user_profile_by_email(email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        return json_error(_('No such bot'))
 | 
						|
    if not target.is_bot:
 | 
						|
        return json_error(_('No such bot'))
 | 
						|
    return _deactivate_user_profile_backend(request, user_profile, target)
 | 
						|
 | 
						|
def _deactivate_user_profile_backend(request, user_profile, target):
 | 
						|
    # type: (HttpRequest, UserProfile, UserProfile) -> HttpResponse
 | 
						|
    if not user_profile.can_admin_user(target):
 | 
						|
        return json_error(_('Insufficient permission'))
 | 
						|
 | 
						|
    do_deactivate_user(target)
 | 
						|
    return json_success()
 | 
						|
 | 
						|
def reactivate_user_backend(request, user_profile, email):
 | 
						|
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
 | 
						|
    try:
 | 
						|
        target = get_user_profile_by_email(email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        return json_error(_('No such user'))
 | 
						|
 | 
						|
    if not user_profile.can_admin_user(target):
 | 
						|
        return json_error(_('Insufficient permission'))
 | 
						|
 | 
						|
    do_reactivate_user(target)
 | 
						|
    return json_success()
 | 
						|
 | 
						|
@has_request_variables
 | 
						|
def update_user_backend(request, user_profile, email,
 | 
						|
                        full_name=REQ(default="", validator=check_string),
 | 
						|
                        is_admin=REQ(default=None, validator=check_bool)):
 | 
						|
    # type: (HttpRequest, UserProfile, Text, Optional[Text], Optional[bool]) -> HttpResponse
 | 
						|
    try:
 | 
						|
        target = get_user_profile_by_email(email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        return json_error(_('No such user'))
 | 
						|
 | 
						|
    if not user_profile.can_admin_user(target):
 | 
						|
        return json_error(_('Insufficient permission'))
 | 
						|
 | 
						|
    if is_admin is not None:
 | 
						|
        if not is_admin and check_last_admin(user_profile):
 | 
						|
            return json_error(_('Cannot remove the only organization administrator'))
 | 
						|
        do_change_is_admin(target, is_admin)
 | 
						|
 | 
						|
    if (full_name is not None and target.full_name != full_name and
 | 
						|
            full_name.strip() != ""):
 | 
						|
        # We don't respect `name_changes_disabled` here because the request
 | 
						|
        # is on behalf of the administrator.
 | 
						|
        check_change_full_name(target, full_name)
 | 
						|
 | 
						|
    return json_success()
 | 
						|
 | 
						|
# TODO: Since eventually we want to support using the same email with
 | 
						|
# different organizations, we'll eventually want this to be a
 | 
						|
# logged-in endpoint so that we can access the realm_id.
 | 
						|
def avatar(request, email_or_id, medium=None):
 | 
						|
    # type: (HttpRequest, str, bool) -> HttpResponse
 | 
						|
    """Accepts an email address or user ID and returns the avatar"""
 | 
						|
    try:
 | 
						|
        int(email_or_id)
 | 
						|
    except ValueError:
 | 
						|
        get_user_func = get_user_profile_by_email
 | 
						|
    else:
 | 
						|
        get_user_func = get_user_profile_by_id
 | 
						|
 | 
						|
    try:
 | 
						|
        # If there is a valid user account passed in, use its avatar
 | 
						|
        user_profile = get_user_func(email_or_id)
 | 
						|
        url = avatar_url(user_profile, medium=medium)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        # If there is no such user, treat it as a new gravatar
 | 
						|
        email = email_or_id
 | 
						|
        avatar_source = 'G'
 | 
						|
        avatar_version = 1
 | 
						|
        url = get_avatar_url(avatar_source, email, avatar_version, medium=medium)
 | 
						|
 | 
						|
    # We can rely on the url already having query parameters. Because
 | 
						|
    # our templates depend on being able to use the ampersand to
 | 
						|
    # add query parameters to our url, get_avatar_url does '?x=x'
 | 
						|
    # hacks to prevent us from having to jump through decode/encode hoops.
 | 
						|
    assert '?' in url
 | 
						|
    url += '&' + request.META['QUERY_STRING']
 | 
						|
    return redirect(url)
 | 
						|
 | 
						|
def get_stream_name(stream):
 | 
						|
    # type: (Optional[Stream]) -> Optional[Text]
 | 
						|
    if stream:
 | 
						|
        return stream.name
 | 
						|
    return None
 | 
						|
 | 
						|
@has_request_variables
 | 
						|
def patch_bot_backend(request, user_profile, email,
 | 
						|
                      full_name=REQ(default=None),
 | 
						|
                      bot_owner=REQ(default=None),
 | 
						|
                      default_sending_stream=REQ(default=None),
 | 
						|
                      default_events_register_stream=REQ(default=None),
 | 
						|
                      default_all_public_streams=REQ(default=None, validator=check_bool)):
 | 
						|
    # type: (HttpRequest, UserProfile, Text, Optional[Text], Optional[Text], Optional[Text], Optional[Text], Optional[bool]) -> HttpResponse
 | 
						|
    try:
 | 
						|
        bot = get_user_profile_by_email(email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        return json_error(_('No such user'))
 | 
						|
 | 
						|
    if not user_profile.can_admin_user(bot):
 | 
						|
        return json_error(_('Insufficient permission'))
 | 
						|
 | 
						|
    if full_name is not None:
 | 
						|
        check_change_full_name(bot, full_name)
 | 
						|
    if bot_owner is not None:
 | 
						|
        owner = get_user_profile_by_email(bot_owner)
 | 
						|
        do_change_bot_owner(bot, owner)
 | 
						|
    if default_sending_stream is not None:
 | 
						|
        if default_sending_stream == "":
 | 
						|
            stream = None  # type: Optional[Stream]
 | 
						|
        else:
 | 
						|
            (stream, recipient, sub) = access_stream_by_name(
 | 
						|
                user_profile, default_sending_stream)
 | 
						|
        do_change_default_sending_stream(bot, stream)
 | 
						|
    if default_events_register_stream is not None:
 | 
						|
        if default_events_register_stream == "":
 | 
						|
            stream = None
 | 
						|
        else:
 | 
						|
            (stream, recipient, sub) = access_stream_by_name(
 | 
						|
                user_profile, default_events_register_stream)
 | 
						|
        do_change_default_events_register_stream(bot, stream)
 | 
						|
    if default_all_public_streams is not None:
 | 
						|
        do_change_default_all_public_streams(bot, default_all_public_streams)
 | 
						|
 | 
						|
    if len(request.FILES) == 0:
 | 
						|
        pass
 | 
						|
    elif len(request.FILES) == 1:
 | 
						|
        user_file = list(request.FILES.values())[0]
 | 
						|
        upload_avatar_image(user_file, user_profile, bot)
 | 
						|
        avatar_source = UserProfile.AVATAR_FROM_USER
 | 
						|
        do_change_avatar_fields(bot, avatar_source)
 | 
						|
    else:
 | 
						|
        return json_error(_("You may only upload one file at a time"))
 | 
						|
 | 
						|
    json_result = dict(
 | 
						|
        full_name=bot.full_name,
 | 
						|
        avatar_url=avatar_url(bot),
 | 
						|
        default_sending_stream=get_stream_name(bot.default_sending_stream),
 | 
						|
        default_events_register_stream=get_stream_name(bot.default_events_register_stream),
 | 
						|
        default_all_public_streams=bot.default_all_public_streams,
 | 
						|
    )
 | 
						|
 | 
						|
    # Don't include the bot owner in case it is not set.
 | 
						|
    # Default bots have no owner.
 | 
						|
    if bot.bot_owner is not None:
 | 
						|
        json_result['bot_owner'] = bot.bot_owner.email
 | 
						|
 | 
						|
    return json_success(json_result)
 | 
						|
 | 
						|
@has_request_variables
 | 
						|
def regenerate_bot_api_key(request, user_profile, email):
 | 
						|
    # type: (HttpRequest, UserProfile, Text) -> HttpResponse
 | 
						|
    try:
 | 
						|
        bot = get_user_profile_by_email(email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        return json_error(_('No such user'))
 | 
						|
 | 
						|
    if not user_profile.can_admin_user(bot):
 | 
						|
        return json_error(_('Insufficient permission'))
 | 
						|
 | 
						|
    do_regenerate_api_key(bot)
 | 
						|
    json_result = dict(
 | 
						|
        api_key = bot.api_key
 | 
						|
    )
 | 
						|
    return json_success(json_result)
 | 
						|
 | 
						|
@has_request_variables
 | 
						|
def add_bot_backend(request, user_profile, full_name_raw=REQ("full_name"), short_name=REQ(),
 | 
						|
                    default_sending_stream_name=REQ('default_sending_stream', default=None),
 | 
						|
                    default_events_register_stream_name=REQ('default_events_register_stream', default=None),
 | 
						|
                    default_all_public_streams=REQ(validator=check_bool, default=None)):
 | 
						|
    # type: (HttpRequest, UserProfile, Text, Text, Optional[Text], Optional[Text], Optional[bool]) -> HttpResponse
 | 
						|
    short_name += "-bot"
 | 
						|
    full_name = check_full_name(full_name_raw)
 | 
						|
    email = short_name + "@" + user_profile.realm.domain
 | 
						|
    form = CreateUserForm({'full_name': full_name, 'email': email})
 | 
						|
    if not form.is_valid():
 | 
						|
        # We validate client-side as well
 | 
						|
        return json_error(_('Bad name or username'))
 | 
						|
 | 
						|
    try:
 | 
						|
        get_user_profile_by_email(email)
 | 
						|
        return json_error(_("Username already in use"))
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        pass
 | 
						|
 | 
						|
    if len(request.FILES) == 0:
 | 
						|
        avatar_source = UserProfile.AVATAR_FROM_GRAVATAR
 | 
						|
    elif len(request.FILES) != 1:
 | 
						|
        return json_error(_("You may only upload one file at a time"))
 | 
						|
    else:
 | 
						|
        avatar_source = UserProfile.AVATAR_FROM_USER
 | 
						|
 | 
						|
    default_sending_stream = None
 | 
						|
    if default_sending_stream_name is not None:
 | 
						|
        (default_sending_stream, ignored_rec, ignored_sub) = access_stream_by_name(
 | 
						|
            user_profile, default_sending_stream_name)
 | 
						|
 | 
						|
    default_events_register_stream = None
 | 
						|
    if default_events_register_stream_name is not None:
 | 
						|
        (default_events_register_stream, ignored_rec, ignored_sub) = access_stream_by_name(
 | 
						|
            user_profile, default_events_register_stream_name)
 | 
						|
 | 
						|
    bot_profile = do_create_user(email=email, password='',
 | 
						|
                                 realm=user_profile.realm, full_name=full_name,
 | 
						|
                                 short_name=short_name, active=True,
 | 
						|
                                 bot_type=UserProfile.DEFAULT_BOT,
 | 
						|
                                 bot_owner=user_profile,
 | 
						|
                                 avatar_source=avatar_source,
 | 
						|
                                 default_sending_stream=default_sending_stream,
 | 
						|
                                 default_events_register_stream=default_events_register_stream,
 | 
						|
                                 default_all_public_streams=default_all_public_streams)
 | 
						|
    if len(request.FILES) == 1:
 | 
						|
        user_file = list(request.FILES.values())[0]
 | 
						|
        upload_avatar_image(user_file, user_profile, bot_profile)
 | 
						|
    json_result = dict(
 | 
						|
        api_key=bot_profile.api_key,
 | 
						|
        avatar_url=avatar_url(bot_profile),
 | 
						|
        default_sending_stream=get_stream_name(bot_profile.default_sending_stream),
 | 
						|
        default_events_register_stream=get_stream_name(bot_profile.default_events_register_stream),
 | 
						|
        default_all_public_streams=bot_profile.default_all_public_streams,
 | 
						|
    )
 | 
						|
    return json_success(json_result)
 | 
						|
 | 
						|
def get_bots_backend(request, user_profile):
 | 
						|
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
						|
    bot_profiles = UserProfile.objects.filter(is_bot=True, is_active=True,
 | 
						|
                                              bot_owner=user_profile)
 | 
						|
    bot_profiles = bot_profiles.select_related('default_sending_stream', 'default_events_register_stream')
 | 
						|
    bot_profiles = bot_profiles.order_by('date_joined')
 | 
						|
 | 
						|
    def bot_info(bot_profile):
 | 
						|
        # type: (UserProfile) -> Dict[str, Any]
 | 
						|
        default_sending_stream = get_stream_name(bot_profile.default_sending_stream)
 | 
						|
        default_events_register_stream = get_stream_name(bot_profile.default_events_register_stream)
 | 
						|
 | 
						|
        return dict(
 | 
						|
            username=bot_profile.email,
 | 
						|
            full_name=bot_profile.full_name,
 | 
						|
            api_key=bot_profile.api_key,
 | 
						|
            avatar_url=avatar_url(bot_profile),
 | 
						|
            default_sending_stream=default_sending_stream,
 | 
						|
            default_events_register_stream=default_events_register_stream,
 | 
						|
            default_all_public_streams=bot_profile.default_all_public_streams,
 | 
						|
        )
 | 
						|
 | 
						|
    return json_success({'bots': list(map(bot_info, bot_profiles))})
 | 
						|
 | 
						|
def get_members_backend(request, user_profile):
 | 
						|
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
						|
    realm = user_profile.realm
 | 
						|
    admins = set(user_profile.realm.get_admin_users())
 | 
						|
    members = []
 | 
						|
    for profile in UserProfile.objects.select_related().filter(realm=realm):
 | 
						|
        member = {"full_name": profile.full_name,
 | 
						|
                  "is_bot": profile.is_bot,
 | 
						|
                  "is_active": profile.is_active,
 | 
						|
                  "is_admin": (profile in admins),
 | 
						|
                  "email": profile.email,
 | 
						|
                  "user_id": profile.id,
 | 
						|
                  "avatar_url": avatar_url(profile)}
 | 
						|
        if profile.is_bot and profile.bot_owner is not None:
 | 
						|
            member["bot_owner"] = profile.bot_owner.email
 | 
						|
        members.append(member)
 | 
						|
    return json_success({'members': members})
 | 
						|
 | 
						|
@require_realm_admin
 | 
						|
@has_request_variables
 | 
						|
def create_user_backend(request, user_profile, email=REQ(), password=REQ(),
 | 
						|
                        full_name_raw=REQ("full_name"), short_name=REQ()):
 | 
						|
    # type: (HttpRequest, UserProfile, Text, Text, Text, Text) -> HttpResponse
 | 
						|
    full_name = check_full_name(full_name_raw)
 | 
						|
    form = CreateUserForm({'full_name': full_name, 'email': email})
 | 
						|
    if not form.is_valid():
 | 
						|
        return json_error(_('Bad name or username'))
 | 
						|
 | 
						|
    # Check that the new user's email address belongs to the admin's realm
 | 
						|
    # (Since this is an admin API, we don't require the user to have been
 | 
						|
    # invited first.)
 | 
						|
    realm = user_profile.realm
 | 
						|
    if not email_allowed_for_realm(email, user_profile.realm):
 | 
						|
        return json_error(_("Email '%(email)s' does not belong to domain '%(domain)s'") %
 | 
						|
                          {'email': email, 'domain': realm.domain})
 | 
						|
 | 
						|
    try:
 | 
						|
        get_user_profile_by_email(email)
 | 
						|
        return json_error(_("Email '%s' already in use") % (email,))
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        pass
 | 
						|
 | 
						|
    do_create_user(email, password, realm, full_name, short_name)
 | 
						|
    return json_success()
 | 
						|
 | 
						|
def generate_client_id():
 | 
						|
    # type: () -> Text
 | 
						|
    return generate_random_token(32)
 | 
						|
 | 
						|
def get_profile_backend(request, user_profile):
 | 
						|
    # type: (HttpRequest, UserProfile) -> HttpResponse
 | 
						|
    result = dict(pointer        = user_profile.pointer,
 | 
						|
                  client_id      = generate_client_id(),
 | 
						|
                  max_message_id = -1,
 | 
						|
                  user_id        = user_profile.id,
 | 
						|
                  full_name      = user_profile.full_name,
 | 
						|
                  email          = user_profile.email,
 | 
						|
                  is_bot         = user_profile.is_bot,
 | 
						|
                  is_admin       = user_profile.is_realm_admin,
 | 
						|
                  short_name     = user_profile.short_name)
 | 
						|
 | 
						|
    messages = Message.objects.filter(usermessage__user_profile=user_profile).order_by('-id')[:1]
 | 
						|
    if messages:
 | 
						|
        result['max_message_id'] = messages[0].id
 | 
						|
 | 
						|
    return json_success(result)
 | 
						|
 | 
						|
def authors_view(request):
 | 
						|
    # type: (HttpRequest) -> HttpResponse
 | 
						|
 | 
						|
    with open(settings.CONTRIBUTORS_DATA) as f:
 | 
						|
        data = json.load(f)
 | 
						|
 | 
						|
    return render_to_response(
 | 
						|
        'zerver/authors.html',
 | 
						|
        data,
 | 
						|
        request=request
 | 
						|
    )
 |