mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 14:03:30 +00:00
This extracts a function for computing show_invites and show_add_streams, for better readability and testability. This commit was substantially cleaned up by tabbott.
276 lines
12 KiB
Python
276 lines
12 KiB
Python
from typing import Dict, List, Optional, Tuple, Union, cast
|
|
|
|
import unicodedata
|
|
|
|
from django.db.models.query import QuerySet
|
|
from django.utils.translation import ugettext as _
|
|
|
|
from zerver.lib.cache import generic_bulk_cached_fetch, user_profile_cache_key_id, \
|
|
user_profile_by_id_cache_key
|
|
from zerver.lib.request import JsonableError
|
|
from zerver.lib.avatar import avatar_url
|
|
from zerver.lib.exceptions import OrganizationAdministratorRequired
|
|
from zerver.models import UserProfile, Service, Realm, \
|
|
get_user_profile_by_id_in_realm, \
|
|
CustomProfileField
|
|
|
|
from zulip_bots.custom_exceptions import ConfigValidationError
|
|
|
|
def check_full_name(full_name_raw: str) -> str:
|
|
full_name = full_name_raw.strip()
|
|
if len(full_name) > UserProfile.MAX_NAME_LENGTH:
|
|
raise JsonableError(_("Name too long!"))
|
|
if len(full_name) < UserProfile.MIN_NAME_LENGTH:
|
|
raise JsonableError(_("Name too short!"))
|
|
for character in full_name:
|
|
if (unicodedata.category(character)[0] == 'C' or
|
|
character in UserProfile.NAME_INVALID_CHARS):
|
|
raise JsonableError(_("Invalid characters in name!"))
|
|
return full_name
|
|
|
|
# NOTE: We don't try to absolutely prevent 2 bots from having the same
|
|
# name (e.g. you can get there by reactivating a deactivated bot after
|
|
# making a new bot with the same name). This is just a check designed
|
|
# to make it unlikely to happen by accident.
|
|
def check_bot_name_available(realm_id: int, full_name: str) -> None:
|
|
dup_exists = UserProfile.objects.filter(
|
|
realm_id=realm_id,
|
|
full_name=full_name.strip(),
|
|
is_active=True,
|
|
).exists()
|
|
|
|
if dup_exists:
|
|
raise JsonableError(_("Name is already in use!"))
|
|
|
|
def check_short_name(short_name_raw: str) -> str:
|
|
short_name = short_name_raw.strip()
|
|
if len(short_name) == 0:
|
|
raise JsonableError(_("Bad name or username"))
|
|
return short_name
|
|
|
|
def check_valid_bot_config(bot_type: int, service_name: str,
|
|
config_data: Dict[str, str]) -> None:
|
|
if bot_type == UserProfile.INCOMING_WEBHOOK_BOT:
|
|
from zerver.lib.integrations import WEBHOOK_INTEGRATIONS
|
|
config_options = None
|
|
for integration in WEBHOOK_INTEGRATIONS:
|
|
if integration.name == service_name:
|
|
# key: validator
|
|
config_options = {c[1]: c[2] for c in integration.config_options}
|
|
break
|
|
if not config_options:
|
|
raise JsonableError(_("Invalid integration '%s'.") % (service_name,))
|
|
|
|
missing_keys = set(config_options.keys()) - set(config_data.keys())
|
|
if missing_keys:
|
|
raise JsonableError(_("Missing configuration parameters: %s") % (
|
|
missing_keys,))
|
|
|
|
for key, validator in config_options.items():
|
|
value = config_data[key]
|
|
error = validator(key, value)
|
|
if error:
|
|
raise JsonableError(_("Invalid {} value {} ({})").format(
|
|
key, value, error))
|
|
|
|
elif bot_type == UserProfile.EMBEDDED_BOT:
|
|
try:
|
|
from zerver.lib.bot_lib import get_bot_handler
|
|
bot_handler = get_bot_handler(service_name)
|
|
if hasattr(bot_handler, 'validate_config'):
|
|
bot_handler.validate_config(config_data)
|
|
except ConfigValidationError:
|
|
# The exception provides a specific error message, but that
|
|
# message is not tagged translatable, because it is
|
|
# triggered in the external zulip_bots package.
|
|
# TODO: Think of some clever way to provide a more specific
|
|
# error message.
|
|
raise JsonableError(_("Invalid configuration data!"))
|
|
|
|
# Adds an outgoing webhook or embedded bot service.
|
|
def add_service(name: str, user_profile: UserProfile, base_url: Optional[str]=None,
|
|
interface: Optional[int]=None, token: Optional[str]=None) -> None:
|
|
Service.objects.create(name=name,
|
|
user_profile=user_profile,
|
|
base_url=base_url,
|
|
interface=interface,
|
|
token=token)
|
|
|
|
def check_bot_creation_policy(user_profile: UserProfile, bot_type: int) -> None:
|
|
# Realm administrators can always add bot
|
|
if user_profile.is_realm_admin:
|
|
return
|
|
|
|
if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_EVERYONE:
|
|
return
|
|
if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_ADMINS_ONLY:
|
|
raise OrganizationAdministratorRequired()
|
|
if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_LIMIT_GENERIC_BOTS and \
|
|
bot_type == UserProfile.DEFAULT_BOT:
|
|
raise OrganizationAdministratorRequired()
|
|
|
|
def check_valid_bot_type(user_profile: UserProfile, bot_type: int) -> None:
|
|
if bot_type not in user_profile.allowed_bot_types:
|
|
raise JsonableError(_('Invalid bot type'))
|
|
|
|
def check_valid_interface_type(interface_type: Optional[int]) -> None:
|
|
if interface_type not in Service.ALLOWED_INTERFACE_TYPES:
|
|
raise JsonableError(_('Invalid interface type'))
|
|
|
|
def bulk_get_users(emails: List[str], realm: Optional[Realm],
|
|
base_query: 'QuerySet[UserProfile]'=None) -> Dict[str, UserProfile]:
|
|
if base_query is None:
|
|
assert realm is not None
|
|
query = UserProfile.objects.filter(realm=realm, is_active=True)
|
|
realm_id = realm.id
|
|
else:
|
|
# WARNING: Currently, this code path only really supports one
|
|
# version of `base_query` being used (because otherwise,
|
|
# they'll share the cache, which can screw up the filtering).
|
|
# If you're using this flow, you'll need to re-do any filters
|
|
# in base_query in the code itself; base_query is just a perf
|
|
# optimization.
|
|
query = base_query
|
|
realm_id = 0
|
|
|
|
def fetch_users_by_email(emails: List[str]) -> List[UserProfile]:
|
|
# This should be just
|
|
#
|
|
# UserProfile.objects.select_related("realm").filter(email__iexact__in=emails,
|
|
# realm=realm)
|
|
#
|
|
# But chaining __in and __iexact doesn't work with Django's
|
|
# ORM, so we have the following hack to construct the relevant where clause
|
|
upper_list = ", ".join(["UPPER(%s)"] * len(emails))
|
|
where_clause = "UPPER(zerver_userprofile.email::text) IN (%s)" % (upper_list,)
|
|
return query.select_related("realm").extra(
|
|
where=[where_clause],
|
|
params=emails)
|
|
|
|
def user_to_email(user_profile: UserProfile) -> str:
|
|
return user_profile.email.lower()
|
|
|
|
return generic_bulk_cached_fetch(
|
|
# Use a separate cache key to protect us from conflicts with
|
|
# the get_user cache.
|
|
lambda email: 'bulk_get_users:' + user_profile_cache_key_id(email, realm_id),
|
|
fetch_users_by_email,
|
|
[email.lower() for email in emails],
|
|
id_fetcher=user_to_email,
|
|
)
|
|
|
|
def user_ids_to_users(user_ids: List[int], realm: Realm) -> List[UserProfile]:
|
|
# TODO: Consider adding a flag to control whether deactivated
|
|
# users should be included.
|
|
|
|
def fetch_users_by_id(user_ids: List[int]) -> List[UserProfile]:
|
|
return list(UserProfile.objects.filter(id__in=user_ids).select_related())
|
|
|
|
user_profiles_by_id = generic_bulk_cached_fetch(
|
|
cache_key_function=user_profile_by_id_cache_key,
|
|
query_function=fetch_users_by_id,
|
|
object_ids=user_ids
|
|
) # type: Dict[int, UserProfile]
|
|
|
|
found_user_ids = user_profiles_by_id.keys()
|
|
missed_user_ids = [user_id for user_id in user_ids if user_id not in found_user_ids]
|
|
if missed_user_ids:
|
|
raise JsonableError(_("Invalid user ID: %s") % (missed_user_ids[0],))
|
|
|
|
user_profiles = list(user_profiles_by_id.values())
|
|
for user_profile in user_profiles:
|
|
if user_profile.realm != realm:
|
|
raise JsonableError(_("Invalid user ID: %s") % (user_profile.id,))
|
|
return user_profiles
|
|
|
|
def access_bot_by_id(user_profile: UserProfile, user_id: int) -> UserProfile:
|
|
try:
|
|
target = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
|
|
except UserProfile.DoesNotExist:
|
|
raise JsonableError(_("No such bot"))
|
|
if not target.is_bot:
|
|
raise JsonableError(_("No such bot"))
|
|
if not user_profile.can_admin_user(target):
|
|
raise JsonableError(_("Insufficient permission"))
|
|
return target
|
|
|
|
def access_user_by_id(user_profile: UserProfile, user_id: int,
|
|
allow_deactivated: bool=False, allow_bots: bool=False) -> UserProfile:
|
|
try:
|
|
target = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
|
|
except UserProfile.DoesNotExist:
|
|
raise JsonableError(_("No such user"))
|
|
if target.is_bot and not allow_bots:
|
|
raise JsonableError(_("No such user"))
|
|
if not target.is_active and not allow_deactivated:
|
|
raise JsonableError(_("User is deactivated"))
|
|
if not user_profile.can_admin_user(target):
|
|
raise JsonableError(_("Insufficient permission"))
|
|
return target
|
|
|
|
def get_accounts_for_email(email: str) -> List[Dict[str, Optional[str]]]:
|
|
profiles = UserProfile.objects.select_related('realm').filter(delivery_email__iexact=email.strip(),
|
|
is_active=True,
|
|
realm__deactivated=False,
|
|
is_bot=False).order_by('date_joined')
|
|
return [{"realm_name": profile.realm.name,
|
|
"string_id": profile.realm.string_id,
|
|
"full_name": profile.full_name,
|
|
"avatar": avatar_url(profile)}
|
|
for profile in profiles]
|
|
|
|
def get_api_key(user_profile: UserProfile) -> str:
|
|
return user_profile.api_key
|
|
|
|
def get_all_api_keys(user_profile: UserProfile) -> List[str]:
|
|
# Users can only have one API key for now
|
|
return [user_profile.api_key]
|
|
|
|
def validate_user_custom_profile_field(realm_id: int, field: CustomProfileField,
|
|
value: Union[int, str, List[int]]) -> Optional[str]:
|
|
validators = CustomProfileField.FIELD_VALIDATORS
|
|
field_type = field.field_type
|
|
var_name = '{}'.format(field.name)
|
|
if field_type in validators:
|
|
validator = validators[field_type]
|
|
result = validator(var_name, value)
|
|
elif field_type == CustomProfileField.CHOICE:
|
|
choice_field_validator = CustomProfileField.CHOICE_FIELD_VALIDATORS[field_type]
|
|
field_data = field.field_data
|
|
# Put an assertion so that mypy doesn't complain.
|
|
assert field_data is not None
|
|
result = choice_field_validator(var_name, field_data, value)
|
|
elif field_type == CustomProfileField.USER:
|
|
user_field_validator = CustomProfileField.USER_FIELD_VALIDATORS[field_type]
|
|
result = user_field_validator(realm_id, cast(List[int], value), False)
|
|
else:
|
|
raise AssertionError("Invalid field type")
|
|
return result
|
|
|
|
def validate_user_custom_profile_data(realm_id: int,
|
|
profile_data: List[Dict[str, Union[int, str, List[int]]]]) -> None:
|
|
# This function validate all custom field values according to their field type.
|
|
for item in profile_data:
|
|
field_id = item['id']
|
|
try:
|
|
field = CustomProfileField.objects.get(id=field_id)
|
|
except CustomProfileField.DoesNotExist:
|
|
raise JsonableError(_('Field id {id} not found.').format(id=field_id))
|
|
|
|
result = validate_user_custom_profile_field(realm_id, field, item['value'])
|
|
if result is not None:
|
|
raise JsonableError(result)
|
|
|
|
def compute_show_invites_and_add_streams(user_profile: UserProfile) -> Tuple[bool, bool]:
|
|
show_invites = True
|
|
show_add_streams = True
|
|
|
|
# Some realms only allow admins to invite users
|
|
if user_profile.realm.invite_by_admins_only and not user_profile.is_realm_admin:
|
|
show_invites = False
|
|
if user_profile.is_guest:
|
|
show_invites = False
|
|
show_add_streams = False
|
|
|
|
return (show_invites, show_add_streams)
|