mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	Extract function `validate_user_custom_profile_data` to validate user's custom profile field values according to field type.
		
			
				
	
	
		
			235 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			235 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from typing import Dict, List, Optional, Union, cast
 | 
						|
 | 
						|
from django.db.models.query import QuerySet
 | 
						|
from django.utils.translation import ugettext as _
 | 
						|
from django.conf import settings
 | 
						|
 | 
						|
from zerver.lib.cache import generic_bulk_cached_fetch, user_profile_cache_key_id, \
 | 
						|
    user_profile_by_id_cache_key
 | 
						|
from zerver.lib.request import JsonableError
 | 
						|
from zerver.lib.avatar import avatar_url
 | 
						|
from zerver.models import UserProfile, Service, Realm, \
 | 
						|
    get_user_profile_by_id, query_for_ids, get_user_profile_by_id_in_realm, \
 | 
						|
    CustomProfileField
 | 
						|
 | 
						|
from zulip_bots.custom_exceptions import ConfigValidationError
 | 
						|
 | 
						|
def check_full_name(full_name_raw: str) -> str:
 | 
						|
    full_name = full_name_raw.strip()
 | 
						|
    if len(full_name) > UserProfile.MAX_NAME_LENGTH:
 | 
						|
        raise JsonableError(_("Name too long!"))
 | 
						|
    if len(full_name) < UserProfile.MIN_NAME_LENGTH:
 | 
						|
        raise JsonableError(_("Name too short!"))
 | 
						|
    if list(set(full_name).intersection(UserProfile.NAME_INVALID_CHARS)):
 | 
						|
        raise JsonableError(_("Invalid characters in name!"))
 | 
						|
    return full_name
 | 
						|
 | 
						|
# NOTE: We don't try to absolutely prevent 2 bots from having the same
 | 
						|
# name (e.g. you can get there by reactivating a deactivated bot after
 | 
						|
# making a new bot with the same name).  This is just a check designed
 | 
						|
# to make it unlikely to happen by accident.
 | 
						|
def check_bot_name_available(realm_id: int, full_name: str) -> None:
 | 
						|
    dup_exists = UserProfile.objects.filter(
 | 
						|
        realm_id=realm_id,
 | 
						|
        full_name=full_name.strip(),
 | 
						|
        is_active=True,
 | 
						|
    ).exists()
 | 
						|
 | 
						|
    if dup_exists:
 | 
						|
        raise JsonableError(_("Name is already in use!"))
 | 
						|
 | 
						|
def check_short_name(short_name_raw: str) -> str:
 | 
						|
    short_name = short_name_raw.strip()
 | 
						|
    if len(short_name) == 0:
 | 
						|
        raise JsonableError(_("Bad name or username"))
 | 
						|
    return short_name
 | 
						|
 | 
						|
def check_valid_bot_config(service_name: str, config_data: Dict[str, str]) -> None:
 | 
						|
    try:
 | 
						|
        from zerver.lib.bot_lib import get_bot_handler
 | 
						|
        bot_handler = get_bot_handler(service_name)
 | 
						|
        if hasattr(bot_handler, 'validate_config'):
 | 
						|
            bot_handler.validate_config(config_data)
 | 
						|
    except ConfigValidationError:
 | 
						|
        # The exception provides a specific error message, but that
 | 
						|
        # message is not tagged translatable, because it is
 | 
						|
        # triggered in the external zulip_bots package.
 | 
						|
        # TODO: Think of some clever way to provide a more specific
 | 
						|
        # error message.
 | 
						|
        raise JsonableError(_("Invalid configuration data!"))
 | 
						|
 | 
						|
# Adds an outgoing webhook or embedded bot service.
 | 
						|
def add_service(name: str, user_profile: UserProfile, base_url: Optional[str]=None,
 | 
						|
                interface: Optional[int]=None, token: Optional[str]=None) -> None:
 | 
						|
    Service.objects.create(name=name,
 | 
						|
                           user_profile=user_profile,
 | 
						|
                           base_url=base_url,
 | 
						|
                           interface=interface,
 | 
						|
                           token=token)
 | 
						|
 | 
						|
def check_bot_creation_policy(user_profile: UserProfile, bot_type: int) -> None:
 | 
						|
    # Realm administrators can always add bot
 | 
						|
    if user_profile.is_realm_admin:
 | 
						|
        return
 | 
						|
 | 
						|
    if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_EVERYONE:
 | 
						|
        return
 | 
						|
    if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_ADMINS_ONLY:
 | 
						|
        raise JsonableError(_("Must be an organization administrator"))
 | 
						|
    if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_LIMIT_GENERIC_BOTS and \
 | 
						|
            bot_type == UserProfile.DEFAULT_BOT:
 | 
						|
        raise JsonableError(_("Must be an organization administrator"))
 | 
						|
 | 
						|
def check_valid_bot_type(user_profile: UserProfile, bot_type: int) -> None:
 | 
						|
    if bot_type not in user_profile.allowed_bot_types:
 | 
						|
        raise JsonableError(_('Invalid bot type'))
 | 
						|
 | 
						|
def check_valid_interface_type(interface_type: Optional[int]) -> None:
 | 
						|
    if interface_type not in Service.ALLOWED_INTERFACE_TYPES:
 | 
						|
        raise JsonableError(_('Invalid interface type'))
 | 
						|
 | 
						|
def bulk_get_users(emails: List[str], realm: Optional[Realm],
 | 
						|
                   base_query: 'QuerySet[UserProfile]'=None) -> Dict[str, UserProfile]:
 | 
						|
    if base_query is None:
 | 
						|
        assert realm is not None
 | 
						|
        query = UserProfile.objects.filter(realm=realm, is_active=True)
 | 
						|
        realm_id = realm.id
 | 
						|
    else:
 | 
						|
        # WARNING: Currently, this code path only really supports one
 | 
						|
        # version of `base_query` being used (because otherwise,
 | 
						|
        # they'll share the cache, which can screw up the filtering).
 | 
						|
        # If you're using this flow, you'll need to re-do any filters
 | 
						|
        # in base_query in the code itself; base_query is just a perf
 | 
						|
        # optimization.
 | 
						|
        query = base_query
 | 
						|
        realm_id = 0
 | 
						|
 | 
						|
    def fetch_users_by_email(emails: List[str]) -> List[UserProfile]:
 | 
						|
        # This should be just
 | 
						|
        #
 | 
						|
        # UserProfile.objects.select_related("realm").filter(email__iexact__in=emails,
 | 
						|
        #                                                    realm=realm)
 | 
						|
        #
 | 
						|
        # But chaining __in and __iexact doesn't work with Django's
 | 
						|
        # ORM, so we have the following hack to construct the relevant where clause
 | 
						|
        if len(emails) == 0:
 | 
						|
            return []
 | 
						|
 | 
						|
        upper_list = ", ".join(["UPPER(%s)"] * len(emails))
 | 
						|
        where_clause = "UPPER(zerver_userprofile.email::text) IN (%s)" % (upper_list,)
 | 
						|
        return query.select_related("realm").extra(
 | 
						|
            where=[where_clause],
 | 
						|
            params=emails)
 | 
						|
 | 
						|
    return generic_bulk_cached_fetch(
 | 
						|
        # Use a separate cache key to protect us from conflicts with
 | 
						|
        # the get_user cache.
 | 
						|
        lambda email: 'bulk_get_users:' + user_profile_cache_key_id(email, realm_id),
 | 
						|
        fetch_users_by_email,
 | 
						|
        [email.lower() for email in emails],
 | 
						|
        id_fetcher=lambda user_profile: user_profile.email.lower()
 | 
						|
    )
 | 
						|
 | 
						|
def user_ids_to_users(user_ids: List[int], realm: Realm) -> List[UserProfile]:
 | 
						|
    # TODO: Consider adding a flag to control whether deactivated
 | 
						|
    # users should be included.
 | 
						|
 | 
						|
    def fetch_users_by_id(user_ids: List[int]) -> List[UserProfile]:
 | 
						|
        if len(user_ids) == 0:
 | 
						|
            return []
 | 
						|
 | 
						|
        return list(UserProfile.objects.filter(id__in=user_ids).select_related())
 | 
						|
 | 
						|
    user_profiles_by_id = generic_bulk_cached_fetch(
 | 
						|
        cache_key_function=user_profile_by_id_cache_key,
 | 
						|
        query_function=fetch_users_by_id,
 | 
						|
        object_ids=user_ids
 | 
						|
    )  # type: Dict[int, UserProfile]
 | 
						|
 | 
						|
    found_user_ids = user_profiles_by_id.keys()
 | 
						|
    missed_user_ids = [user_id for user_id in user_ids if user_id not in found_user_ids]
 | 
						|
    if missed_user_ids:
 | 
						|
        raise JsonableError(_("Invalid user ID: %s" % (missed_user_ids[0])))
 | 
						|
 | 
						|
    user_profiles = list(user_profiles_by_id.values())
 | 
						|
    for user_profile in user_profiles:
 | 
						|
        if user_profile.realm != realm:
 | 
						|
            raise JsonableError(_("Invalid user ID: %s" % (user_profile.id,)))
 | 
						|
    return user_profiles
 | 
						|
 | 
						|
def access_bot_by_id(user_profile: UserProfile, user_id: int) -> UserProfile:
 | 
						|
    try:
 | 
						|
        target = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        raise JsonableError(_("No such bot"))
 | 
						|
    if not target.is_bot:
 | 
						|
        raise JsonableError(_("No such bot"))
 | 
						|
    if not user_profile.can_admin_user(target):
 | 
						|
        raise JsonableError(_("Insufficient permission"))
 | 
						|
    return target
 | 
						|
 | 
						|
def access_user_by_id(user_profile: UserProfile, user_id: int,
 | 
						|
                      allow_deactivated: bool=False, allow_bots: bool=False) -> UserProfile:
 | 
						|
    try:
 | 
						|
        target = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        raise JsonableError(_("No such user"))
 | 
						|
    if target.is_bot and not allow_bots:
 | 
						|
        raise JsonableError(_("No such user"))
 | 
						|
    if not target.is_active and not allow_deactivated:
 | 
						|
        raise JsonableError(_("User is deactivated"))
 | 
						|
    if not user_profile.can_admin_user(target):
 | 
						|
        raise JsonableError(_("Insufficient permission"))
 | 
						|
    return target
 | 
						|
 | 
						|
def get_accounts_for_email(email: str) -> List[Dict[str, Optional[str]]]:
 | 
						|
    if settings.PRODUCTION:  # nocoverage
 | 
						|
        return []
 | 
						|
    profiles = UserProfile.objects.select_related('realm').filter(email__iexact=email.strip(),
 | 
						|
                                                                  is_active=True,
 | 
						|
                                                                  is_bot=False,
 | 
						|
                                                                  realm__deactivated=False)
 | 
						|
    return [{"realm_name": profile.realm.name,
 | 
						|
             "string_id": profile.realm.string_id,
 | 
						|
             "full_name": profile.full_name,
 | 
						|
             "avatar": avatar_url(profile)}
 | 
						|
            for profile in profiles]
 | 
						|
 | 
						|
def get_api_key(user_profile: UserProfile) -> str:
 | 
						|
    return user_profile.api_key
 | 
						|
 | 
						|
def get_all_api_keys(user_profile: UserProfile) -> List[str]:
 | 
						|
    # Users can only have one API key for now
 | 
						|
    return [user_profile.api_key]
 | 
						|
 | 
						|
def validate_user_custom_profile_data(realm_id: int,
 | 
						|
                                      profile_data: List[Dict[str, Union[int, str, List[int]]]]) -> None:
 | 
						|
    # This function validate all custom field values according to their field type.
 | 
						|
    for item in profile_data:
 | 
						|
        field_id = item['id']
 | 
						|
        try:
 | 
						|
            field = CustomProfileField.objects.get(id=field_id)
 | 
						|
        except CustomProfileField.DoesNotExist:
 | 
						|
            raise JsonableError(_('Field id {id} not found.').format(id=field_id))
 | 
						|
 | 
						|
        validators = CustomProfileField.FIELD_VALIDATORS
 | 
						|
        field_type = field.field_type
 | 
						|
        var_name = '{}'.format(field.name)
 | 
						|
        value = item['value']
 | 
						|
        if field_type in validators:
 | 
						|
            validator = validators[field_type]
 | 
						|
            result = validator(var_name, value)
 | 
						|
        elif field_type == CustomProfileField.CHOICE:
 | 
						|
            choice_field_validator = CustomProfileField.CHOICE_FIELD_VALIDATORS[field_type]
 | 
						|
            field_data = field.field_data
 | 
						|
            result = choice_field_validator(var_name, field_data, value)
 | 
						|
        elif field_type == CustomProfileField.USER:
 | 
						|
            user_field_validator = CustomProfileField.USER_FIELD_VALIDATORS[field_type]
 | 
						|
            result = user_field_validator(realm_id, cast(List[int], value),
 | 
						|
                                          False)
 | 
						|
        else:
 | 
						|
            raise AssertionError("Invalid field type")
 | 
						|
 | 
						|
        if result is not None:
 | 
						|
            raise JsonableError(result)
 |