mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-31 20:13:46 +00:00 
			
		
		
		
	Fixes #2665. Regenerated by tabbott with `lint --fix` after a rebase and change in parameters. Note from tabbott: In a few cases, this converts technical debt in the form of unsorted imports into different technical debt in the form of our largest files having very long, ugly import sequences at the start. I expect this change will increase pressure for us to split those files, which isn't a bad thing. Signed-off-by: Anders Kaseorg <anders@zulip.com>
		
			
				
	
	
		
			455 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			455 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import re
 | |
| import unicodedata
 | |
| from collections import defaultdict
 | |
| from typing import Any, Dict, List, Optional, Tuple, Union, cast
 | |
| 
 | |
| from django.conf import settings
 | |
| from django.db.models.query import QuerySet
 | |
| from django.forms.models import model_to_dict
 | |
| from django.utils.translation import ugettext as _
 | |
| from zulip_bots.custom_exceptions import ConfigValidationError
 | |
| 
 | |
| from zerver.lib.avatar import avatar_url, get_avatar_field
 | |
| from zerver.lib.cache import (
 | |
|     generic_bulk_cached_fetch,
 | |
|     realm_user_dict_fields,
 | |
|     user_profile_by_id_cache_key,
 | |
|     user_profile_cache_key_id,
 | |
| )
 | |
| from zerver.lib.exceptions import OrganizationAdministratorRequired
 | |
| from zerver.lib.request import JsonableError
 | |
| from zerver.models import (
 | |
|     CustomProfileField,
 | |
|     CustomProfileFieldValue,
 | |
|     Realm,
 | |
|     Service,
 | |
|     UserProfile,
 | |
|     get_realm_user_dicts,
 | |
|     get_user_profile_by_id_in_realm,
 | |
| )
 | |
| 
 | |
| 
 | |
| def check_full_name(full_name_raw: str) -> str:
 | |
|     full_name = full_name_raw.strip()
 | |
|     if len(full_name) > UserProfile.MAX_NAME_LENGTH:
 | |
|         raise JsonableError(_("Name too long!"))
 | |
|     if len(full_name) < UserProfile.MIN_NAME_LENGTH:
 | |
|         raise JsonableError(_("Name too short!"))
 | |
|     for character in full_name:
 | |
|         if (unicodedata.category(character)[0] == 'C' or
 | |
|                 character in UserProfile.NAME_INVALID_CHARS):
 | |
|             raise JsonableError(_("Invalid characters in name!"))
 | |
|     # Names ending with e.g. `|15` could be ambiguous for
 | |
|     # sloppily-written parsers of our markdown syntax for mentioning
 | |
|     # users with ambigious names, and likely have no real use, so we
 | |
|     # ban them.
 | |
|     if re.search(r"\|\d+$", full_name_raw):
 | |
|         raise JsonableError(_("Invalid format!"))
 | |
|     return full_name
 | |
| 
 | |
| # NOTE: We don't try to absolutely prevent 2 bots from having the same
 | |
| # name (e.g. you can get there by reactivating a deactivated bot after
 | |
| # making a new bot with the same name).  This is just a check designed
 | |
| # to make it unlikely to happen by accident.
 | |
| def check_bot_name_available(realm_id: int, full_name: str) -> None:
 | |
|     dup_exists = UserProfile.objects.filter(
 | |
|         realm_id=realm_id,
 | |
|         full_name=full_name.strip(),
 | |
|         is_active=True,
 | |
|     ).exists()
 | |
| 
 | |
|     if dup_exists:
 | |
|         raise JsonableError(_("Name is already in use!"))
 | |
| 
 | |
| def check_short_name(short_name_raw: str) -> str:
 | |
|     short_name = short_name_raw.strip()
 | |
|     if len(short_name) == 0:
 | |
|         raise JsonableError(_("Bad name or username"))
 | |
|     return short_name
 | |
| 
 | |
| def check_valid_bot_config(bot_type: int, service_name: str,
 | |
|                            config_data: Dict[str, str]) -> None:
 | |
|     if bot_type == UserProfile.INCOMING_WEBHOOK_BOT:
 | |
|         from zerver.lib.integrations import WEBHOOK_INTEGRATIONS
 | |
|         config_options = None
 | |
|         for integration in WEBHOOK_INTEGRATIONS:
 | |
|             if integration.name == service_name:
 | |
|                 # key: validator
 | |
|                 config_options = {c[1]: c[2] for c in integration.config_options}
 | |
|                 break
 | |
|         if not config_options:
 | |
|             raise JsonableError(_("Invalid integration '%s'.") % (service_name,))
 | |
| 
 | |
|         missing_keys = set(config_options.keys()) - set(config_data.keys())
 | |
|         if missing_keys:
 | |
|             raise JsonableError(_("Missing configuration parameters: %s") % (
 | |
|                 missing_keys,))
 | |
| 
 | |
|         for key, validator in config_options.items():
 | |
|             value = config_data[key]
 | |
|             error = validator(key, value)
 | |
|             if error:
 | |
|                 raise JsonableError(_("Invalid {} value {} ({})").format(
 | |
|                                     key, value, error))
 | |
| 
 | |
|     elif bot_type == UserProfile.EMBEDDED_BOT:
 | |
|         try:
 | |
|             from zerver.lib.bot_lib import get_bot_handler
 | |
|             bot_handler = get_bot_handler(service_name)
 | |
|             if hasattr(bot_handler, 'validate_config'):
 | |
|                 bot_handler.validate_config(config_data)
 | |
|         except ConfigValidationError:
 | |
|             # The exception provides a specific error message, but that
 | |
|             # message is not tagged translatable, because it is
 | |
|             # triggered in the external zulip_bots package.
 | |
|             # TODO: Think of some clever way to provide a more specific
 | |
|             # error message.
 | |
|             raise JsonableError(_("Invalid configuration data!"))
 | |
| 
 | |
| # Adds an outgoing webhook or embedded bot service.
 | |
| def add_service(name: str, user_profile: UserProfile, base_url: Optional[str]=None,
 | |
|                 interface: Optional[int]=None, token: Optional[str]=None) -> None:
 | |
|     Service.objects.create(name=name,
 | |
|                            user_profile=user_profile,
 | |
|                            base_url=base_url,
 | |
|                            interface=interface,
 | |
|                            token=token)
 | |
| 
 | |
| def check_bot_creation_policy(user_profile: UserProfile, bot_type: int) -> None:
 | |
|     # Realm administrators can always add bot
 | |
|     if user_profile.is_realm_admin:
 | |
|         return
 | |
| 
 | |
|     if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_EVERYONE:
 | |
|         return
 | |
|     if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_ADMINS_ONLY:
 | |
|         raise OrganizationAdministratorRequired()
 | |
|     if user_profile.realm.bot_creation_policy == Realm.BOT_CREATION_LIMIT_GENERIC_BOTS and \
 | |
|             bot_type == UserProfile.DEFAULT_BOT:
 | |
|         raise OrganizationAdministratorRequired()
 | |
| 
 | |
| def check_valid_bot_type(user_profile: UserProfile, bot_type: int) -> None:
 | |
|     if bot_type not in user_profile.allowed_bot_types:
 | |
|         raise JsonableError(_('Invalid bot type'))
 | |
| 
 | |
| def check_valid_interface_type(interface_type: Optional[int]) -> None:
 | |
|     if interface_type not in Service.ALLOWED_INTERFACE_TYPES:
 | |
|         raise JsonableError(_('Invalid interface type'))
 | |
| 
 | |
| def is_administrator_role(role: int) -> bool:
 | |
|     return role in {UserProfile.ROLE_REALM_ADMINISTRATOR, UserProfile.ROLE_REALM_OWNER}
 | |
| 
 | |
| def bulk_get_users(emails: List[str], realm: Optional[Realm],
 | |
|                    base_query: 'QuerySet[UserProfile]'=None) -> Dict[str, UserProfile]:
 | |
|     if base_query is None:
 | |
|         assert realm is not None
 | |
|         query = UserProfile.objects.filter(realm=realm, is_active=True)
 | |
|         realm_id = realm.id
 | |
|     else:
 | |
|         # WARNING: Currently, this code path only really supports one
 | |
|         # version of `base_query` being used (because otherwise,
 | |
|         # they'll share the cache, which can screw up the filtering).
 | |
|         # If you're using this flow, you'll need to re-do any filters
 | |
|         # in base_query in the code itself; base_query is just a perf
 | |
|         # optimization.
 | |
|         query = base_query
 | |
|         realm_id = 0
 | |
| 
 | |
|     def fetch_users_by_email(emails: List[str]) -> List[UserProfile]:
 | |
|         # This should be just
 | |
|         #
 | |
|         # UserProfile.objects.select_related("realm").filter(email__iexact__in=emails,
 | |
|         #                                                    realm=realm)
 | |
|         #
 | |
|         # But chaining __in and __iexact doesn't work with Django's
 | |
|         # ORM, so we have the following hack to construct the relevant where clause
 | |
|         where_clause = "upper(zerver_userprofile.email::text) IN (SELECT upper(email) FROM unnest(%s) AS email)"
 | |
|         return query.select_related("realm").extra(
 | |
|             where=[where_clause],
 | |
|             params=(emails,))
 | |
| 
 | |
|     def user_to_email(user_profile: UserProfile) -> str:
 | |
|         return user_profile.email.lower()
 | |
| 
 | |
|     return generic_bulk_cached_fetch(
 | |
|         # Use a separate cache key to protect us from conflicts with
 | |
|         # the get_user cache.
 | |
|         lambda email: 'bulk_get_users:' + user_profile_cache_key_id(email, realm_id),
 | |
|         fetch_users_by_email,
 | |
|         [email.lower() for email in emails],
 | |
|         id_fetcher=user_to_email,
 | |
|     )
 | |
| 
 | |
| def user_ids_to_users(user_ids: List[int], realm: Realm) -> List[UserProfile]:
 | |
|     # TODO: Consider adding a flag to control whether deactivated
 | |
|     # users should be included.
 | |
| 
 | |
|     def fetch_users_by_id(user_ids: List[int]) -> List[UserProfile]:
 | |
|         return list(UserProfile.objects.filter(id__in=user_ids).select_related())
 | |
| 
 | |
|     user_profiles_by_id: Dict[int, UserProfile] = generic_bulk_cached_fetch(
 | |
|         cache_key_function=user_profile_by_id_cache_key,
 | |
|         query_function=fetch_users_by_id,
 | |
|         object_ids=user_ids,
 | |
|     )
 | |
| 
 | |
|     found_user_ids = user_profiles_by_id.keys()
 | |
|     missed_user_ids = [user_id for user_id in user_ids if user_id not in found_user_ids]
 | |
|     if missed_user_ids:
 | |
|         raise JsonableError(_("Invalid user ID: %s") % (missed_user_ids[0],))
 | |
| 
 | |
|     user_profiles = list(user_profiles_by_id.values())
 | |
|     for user_profile in user_profiles:
 | |
|         if user_profile.realm != realm:
 | |
|             raise JsonableError(_("Invalid user ID: %s") % (user_profile.id,))
 | |
|     return user_profiles
 | |
| 
 | |
| def access_bot_by_id(user_profile: UserProfile, user_id: int) -> UserProfile:
 | |
|     try:
 | |
|         target = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
 | |
|     except UserProfile.DoesNotExist:
 | |
|         raise JsonableError(_("No such bot"))
 | |
|     if not target.is_bot:
 | |
|         raise JsonableError(_("No such bot"))
 | |
|     if not user_profile.can_admin_user(target):
 | |
|         raise JsonableError(_("Insufficient permission"))
 | |
|     return target
 | |
| 
 | |
| def access_user_by_id(user_profile: UserProfile, user_id: int,
 | |
|                       allow_deactivated: bool=False, allow_bots: bool=False,
 | |
|                       read_only: bool=False) -> UserProfile:
 | |
|     try:
 | |
|         target = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
 | |
|     except UserProfile.DoesNotExist:
 | |
|         raise JsonableError(_("No such user"))
 | |
|     if target.is_bot and not allow_bots:
 | |
|         raise JsonableError(_("No such user"))
 | |
|     if not target.is_active and not allow_deactivated:
 | |
|         raise JsonableError(_("User is deactivated"))
 | |
|     if read_only:
 | |
|         # Administrative access is not required just to read a user.
 | |
|         return target
 | |
|     if not user_profile.can_admin_user(target):
 | |
|         raise JsonableError(_("Insufficient permission"))
 | |
|     return target
 | |
| 
 | |
| def get_accounts_for_email(email: str) -> List[Dict[str, Optional[str]]]:
 | |
|     profiles = UserProfile.objects.select_related('realm').filter(delivery_email__iexact=email.strip(),
 | |
|                                                                   is_active=True,
 | |
|                                                                   realm__deactivated=False,
 | |
|                                                                   is_bot=False).order_by('date_joined')
 | |
|     return [{"realm_name": profile.realm.name,
 | |
|              "string_id": profile.realm.string_id,
 | |
|              "full_name": profile.full_name,
 | |
|              "avatar": avatar_url(profile)}
 | |
|             for profile in profiles]
 | |
| 
 | |
| def get_api_key(user_profile: UserProfile) -> str:
 | |
|     return user_profile.api_key
 | |
| 
 | |
| def get_all_api_keys(user_profile: UserProfile) -> List[str]:
 | |
|     # Users can only have one API key for now
 | |
|     return [user_profile.api_key]
 | |
| 
 | |
| def validate_user_custom_profile_field(realm_id: int, field: CustomProfileField,
 | |
|                                        value: Union[int, str, List[int]]) -> Optional[str]:
 | |
|     validators = CustomProfileField.FIELD_VALIDATORS
 | |
|     field_type = field.field_type
 | |
|     var_name = f'{field.name}'
 | |
|     if field_type in validators:
 | |
|         validator = validators[field_type]
 | |
|         result = validator(var_name, value)
 | |
|     elif field_type == CustomProfileField.CHOICE:
 | |
|         choice_field_validator = CustomProfileField.CHOICE_FIELD_VALIDATORS[field_type]
 | |
|         field_data = field.field_data
 | |
|         # Put an assertion so that mypy doesn't complain.
 | |
|         assert field_data is not None
 | |
|         result = choice_field_validator(var_name, field_data, value)
 | |
|     elif field_type == CustomProfileField.USER:
 | |
|         user_field_validator = CustomProfileField.USER_FIELD_VALIDATORS[field_type]
 | |
|         result = user_field_validator(realm_id, cast(List[int], value), False)
 | |
|     else:
 | |
|         raise AssertionError("Invalid field type")
 | |
|     return result
 | |
| 
 | |
| def validate_user_custom_profile_data(realm_id: int,
 | |
|                                       profile_data: List[Dict[str, Union[int, str, List[int]]]]) -> None:
 | |
|     # This function validate all custom field values according to their field type.
 | |
|     for item in profile_data:
 | |
|         field_id = item['id']
 | |
|         try:
 | |
|             field = CustomProfileField.objects.get(id=field_id)
 | |
|         except CustomProfileField.DoesNotExist:
 | |
|             raise JsonableError(_('Field id {id} not found.').format(id=field_id))
 | |
| 
 | |
|         result = validate_user_custom_profile_field(realm_id, field, item['value'])
 | |
|         if result is not None:
 | |
|             raise JsonableError(result)
 | |
| 
 | |
| def compute_show_invites_and_add_streams(user_profile: Optional[UserProfile]) -> Tuple[bool, bool]:
 | |
|     if user_profile is None:
 | |
|         return False, False
 | |
| 
 | |
|     if user_profile.is_guest:
 | |
|         return False, False
 | |
| 
 | |
|     if user_profile.is_realm_admin:
 | |
|         return True, True
 | |
| 
 | |
|     if user_profile.realm.invite_by_admins_only:
 | |
|         return False, True
 | |
| 
 | |
|     return True, True
 | |
| 
 | |
| def format_user_row(realm: Realm, acting_user: UserProfile, row: Dict[str, Any],
 | |
|                     client_gravatar: bool,
 | |
|                     custom_profile_field_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
 | |
|     """Formats a user row returned by a database fetch using
 | |
|     .values(*realm_user_dict_fields) into a dictionary representation
 | |
|     of that user for API delivery to clients.  The acting_user
 | |
|     argument is used for permissions checks.
 | |
|     """
 | |
| 
 | |
|     avatar_url = get_avatar_field(user_id=row['id'],
 | |
|                                   realm_id=realm.id,
 | |
|                                   email=row['delivery_email'],
 | |
|                                   avatar_source=row['avatar_source'],
 | |
|                                   avatar_version=row['avatar_version'],
 | |
|                                   medium=False,
 | |
|                                   client_gravatar=client_gravatar)
 | |
| 
 | |
|     is_admin = is_administrator_role(row['role'])
 | |
|     is_owner = row['role'] == UserProfile.ROLE_REALM_OWNER
 | |
|     is_guest = row['role'] == UserProfile.ROLE_GUEST
 | |
|     is_bot = row['is_bot']
 | |
|     # This format should align with get_cross_realm_dicts() and notify_created_user
 | |
|     result = dict(
 | |
|         email=row['email'],
 | |
|         user_id=row['id'],
 | |
|         avatar_url=avatar_url,
 | |
|         avatar_version=row['avatar_version'],
 | |
|         is_admin=is_admin,
 | |
|         is_owner=is_owner,
 | |
|         is_guest=is_guest,
 | |
|         is_bot=is_bot,
 | |
|         full_name=row['full_name'],
 | |
|         timezone=row['timezone'],
 | |
|         is_active = row['is_active'],
 | |
|         date_joined = row['date_joined'].isoformat(),
 | |
|     )
 | |
|     if (realm.email_address_visibility == Realm.EMAIL_ADDRESS_VISIBILITY_ADMINS and
 | |
|             acting_user.is_realm_admin):
 | |
|         result['delivery_email'] = row['delivery_email']
 | |
| 
 | |
|     if is_bot:
 | |
|         result["bot_type"] = row["bot_type"]
 | |
|         if row['email'] in settings.CROSS_REALM_BOT_EMAILS:
 | |
|             result['is_cross_realm_bot'] = True
 | |
| 
 | |
|         # Note that bot_owner_id can be None with legacy data.
 | |
|         result['bot_owner_id'] = row['bot_owner_id']
 | |
|     elif custom_profile_field_data is not None:
 | |
|         result['profile_data'] = custom_profile_field_data
 | |
|     return result
 | |
| 
 | |
| def user_profile_to_user_row(user_profile: UserProfile) -> Dict[str, Any]:
 | |
|     # What we're trying to do is simulate the user_profile having been
 | |
|     # fetched from a QuerySet using `.values(*realm_user_dict_fields)`
 | |
|     # even though we fetched UserProfile objects.  This is messier
 | |
|     # than it seems.
 | |
|     #
 | |
|     # What we'd like to do is just call model_to_dict(user,
 | |
|     # fields=realm_user_dict_fields).  The problem with this is
 | |
|     # that model_to_dict has a different convention than
 | |
|     # `.values()` in its handling of foreign keys, naming them as
 | |
|     # e.g. `bot_owner`, not `bot_owner_id`; we work around that
 | |
|     # here.
 | |
|     #
 | |
|     # This could be potentially simplified in the future by
 | |
|     # changing realm_user_dict_fields to name the bot owner with
 | |
|     # the less readable `bot_owner` (instead of `bot_owner_id`).
 | |
|     user_row = model_to_dict(user_profile,
 | |
|                              fields=realm_user_dict_fields + ['bot_owner'])
 | |
|     user_row['bot_owner_id'] = user_row['bot_owner']
 | |
|     del user_row['bot_owner']
 | |
|     return user_row
 | |
| 
 | |
| def get_cross_realm_dicts() -> List[Dict[str, Any]]:
 | |
|     users = bulk_get_users(list(settings.CROSS_REALM_BOT_EMAILS), None,
 | |
|                            base_query=UserProfile.objects.filter(
 | |
|                            realm__string_id=settings.SYSTEM_BOT_REALM)).values()
 | |
|     result = []
 | |
|     for user in users:
 | |
|         # Important: We filter here, is addition to in
 | |
|         # `base_query`, because of how bulk_get_users shares its
 | |
|         # cache with other UserProfile caches.
 | |
|         if user.realm.string_id != settings.SYSTEM_BOT_REALM:  # nocoverage
 | |
|             continue
 | |
|         user_row = user_profile_to_user_row(user)
 | |
|         # Because we want to avoid clients becing exposed to the
 | |
|         # implementation detail that these bots are self-owned, we
 | |
|         # just set bot_owner_id=None.
 | |
|         user_row['bot_owner_id'] = None
 | |
| 
 | |
|         result.append(format_user_row(user.realm,
 | |
|                                       acting_user=user,
 | |
|                                       row=user_row,
 | |
|                                       client_gravatar=False,
 | |
|                                       custom_profile_field_data=None))
 | |
| 
 | |
|     return result
 | |
| 
 | |
| def get_custom_profile_field_values(custom_profile_field_values:
 | |
|                                     List[CustomProfileFieldValue]) -> Dict[int, Dict[str, Any]]:
 | |
|     profiles_by_user_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
 | |
|     for profile_field in custom_profile_field_values:
 | |
|         user_id = profile_field.user_profile_id
 | |
|         if profile_field.field.is_renderable():
 | |
|             profiles_by_user_id[user_id][profile_field.field_id] = {
 | |
|                 "value": profile_field.value,
 | |
|                 "rendered_value": profile_field.rendered_value,
 | |
|             }
 | |
|         else:
 | |
|             profiles_by_user_id[user_id][profile_field.field_id] = {
 | |
|                 "value": profile_field.value,
 | |
|             }
 | |
|     return profiles_by_user_id
 | |
| 
 | |
| def get_raw_user_data(realm: Realm, acting_user: UserProfile, client_gravatar: bool,
 | |
|                       target_user: Optional[UserProfile]=None,
 | |
|                       include_custom_profile_fields: bool=True) -> Dict[int, Dict[str, str]]:
 | |
|     """Fetches data about the target user(s) appropriate for sending to
 | |
|     acting_user via the standard format for the Zulip API.  If
 | |
|     target_user is None, we fetch all users in the realm.
 | |
|     """
 | |
|     profiles_by_user_id = None
 | |
|     custom_profile_field_data = None
 | |
|     # target_user is an optional parameter which is passed when user data of a specific user
 | |
|     # is required. It is 'None' otherwise.
 | |
|     if target_user is not None:
 | |
|         user_dicts = [user_profile_to_user_row(target_user)]
 | |
|     else:
 | |
|         user_dicts = get_realm_user_dicts(realm.id)
 | |
| 
 | |
|     if include_custom_profile_fields:
 | |
|         base_query = CustomProfileFieldValue.objects.select_related("field")
 | |
|         # TODO: Consider optimizing this query away with caching.
 | |
|         if target_user is not None:
 | |
|             custom_profile_field_values = base_query.filter(user_profile=target_user)
 | |
|         else:
 | |
|             custom_profile_field_values = base_query.filter(field__realm_id=realm.id)
 | |
|         profiles_by_user_id = get_custom_profile_field_values(custom_profile_field_values)
 | |
| 
 | |
|     result = {}
 | |
|     for row in user_dicts:
 | |
|         if profiles_by_user_id is not None:
 | |
|             custom_profile_field_data = profiles_by_user_id.get(row['id'], {})
 | |
| 
 | |
|         result[row['id']] = format_user_row(realm,
 | |
|                                             acting_user = acting_user,
 | |
|                                             row=row,
 | |
|                                             client_gravatar= client_gravatar,
 | |
|                                             custom_profile_field_data = custom_profile_field_data,
 | |
|                                             )
 | |
|     return result
 |