mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 14:03:30 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			331 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			331 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from typing import Annotated
 | 
						|
 | 
						|
import orjson
 | 
						|
from django.core.exceptions import ValidationError
 | 
						|
from django.db import IntegrityError, transaction
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from pydantic import Json, StringConstraints
 | 
						|
 | 
						|
from zerver.actions.custom_profile_fields import (
 | 
						|
    check_remove_custom_profile_field_value,
 | 
						|
    do_remove_realm_custom_profile_field,
 | 
						|
    do_update_user_custom_profile_data_if_changed,
 | 
						|
    try_add_realm_custom_profile_field,
 | 
						|
    try_add_realm_default_custom_profile_field,
 | 
						|
    try_reorder_realm_custom_profile_fields,
 | 
						|
    try_update_realm_custom_profile_field,
 | 
						|
)
 | 
						|
from zerver.decorator import human_users_only, require_realm_admin
 | 
						|
from zerver.lib.exceptions import JsonableError
 | 
						|
from zerver.lib.external_accounts import validate_external_account_field_data
 | 
						|
from zerver.lib.response import json_success
 | 
						|
from zerver.lib.typed_endpoint import PathOnly, typed_endpoint
 | 
						|
from zerver.lib.types import ProfileDataElementUpdateDict, ProfileFieldData
 | 
						|
from zerver.lib.users import validate_user_custom_profile_data
 | 
						|
from zerver.lib.validator import check_capped_string, validate_select_field_data
 | 
						|
from zerver.models import CustomProfileField, Realm, UserProfile
 | 
						|
from zerver.models.custom_profile_fields import custom_profile_fields_for_realm
 | 
						|
 | 
						|
 | 
						|
def list_realm_custom_profile_fields(
    request: HttpRequest, user_profile: UserProfile
) -> HttpResponse:
    """Return the custom profile fields of the requesting user's realm.

    Each field is serialized with ``as_dict`` and the resulting list is sent
    under the ``custom_fields`` key of the JSON success response.
    """
    realm_fields = custom_profile_fields_for_realm(user_profile.realm_id)
    serialized_fields = [realm_field.as_dict() for realm_field in realm_fields]
    return json_success(request, data={"custom_fields": serialized_fields})
 | 
						|
 | 
						|
 | 
						|
# Module-level validators built by check_capped_string from the model's
# maximum name and hint lengths; used by validate_field_name_and_hint.
name_validator = check_capped_string(CustomProfileField.NAME_MAX_LENGTH)
hint_validator = check_capped_string(CustomProfileField.HINT_MAX_LENGTH)
 | 
						|
 | 
						|
 | 
						|
def validate_field_name_and_hint(name: str, hint: str) -> None:
    """Reject a blank label, then run the hint and name validators.

    Raises:
        JsonableError: if ``name`` is empty or whitespace-only, or if either
            validator raises ``ValidationError`` (its message is reused).
    """
    label_is_blank = name.strip() == ""
    if label_is_blank:
        raise JsonableError(_("Label cannot be blank."))

    # The hint is checked before the name, so a hint error is the one
    # reported when both values are invalid.
    checks = (
        (hint_validator, "hint", hint),
        (name_validator, "name", name),
    )
    try:
        for validator, var_name, value in checks:
            validator(var_name, value)
    except ValidationError as error:
        raise JsonableError(error.message)
 | 
						|
 | 
						|
 | 
						|
def validate_custom_field_data(field_type: int, field_data: ProfileFieldData) -> None:
    """Validate the type-specific ``field_data`` of a custom profile field.

    Only SELECT and EXTERNAL_ACCOUNT fields are checked here; no data
    validation is performed for any other field type.

    Raises:
        JsonableError: if a SELECT field has no choices, or if the
            type-specific validator raises ``ValidationError`` (its message
            is reused).
    """
    if field_type == CustomProfileField.SELECT:
        # A select field must offer at least one choice.
        if len(field_data) < 1:
            raise JsonableError(_("Field must have at least one choice."))
        data_validator = validate_select_field_data
    elif field_type == CustomProfileField.EXTERNAL_ACCOUNT:
        data_validator = validate_external_account_field_data
    else:
        return

    try:
        data_validator(field_data)
    except ValidationError as error:
        raise JsonableError(error.message)
 | 
						|
 | 
						|
 | 
						|
def validate_display_in_profile_summary_field(
    field_type: int, display_in_profile_summary: bool
) -> None:
    """Ensure a field shown in the profile summary has a supported type.

    Nothing is checked when ``display_in_profile_summary`` is falsy.

    Raises:
        JsonableError: if the flag is set for a LONG_TEXT or USER field.
    """
    # LONG_TEXT doesn't make sense visually in a profile summary, and the
    # USER field type will require some further client support.
    if display_in_profile_summary and field_type in (
        CustomProfileField.LONG_TEXT,
        CustomProfileField.USER,
    ):
        raise JsonableError(_("Field type not supported for display in profile summary."))
 | 
						|
 | 
						|
 | 
						|
def is_default_external_field(field_type: int, field_data: ProfileFieldData) -> bool:
    """Tell whether the field is an external account with a non-custom subtype.

    ``field_data["subtype"]`` is only read for EXTERNAL_ACCOUNT fields, so
    other field types may pass data without that key.
    """
    return (
        field_type == CustomProfileField.EXTERNAL_ACCOUNT
        and field_data["subtype"] != "custom"
    )
 | 
						|
 | 
						|
 | 
						|
def validate_custom_profile_field(
    name: str,
    hint: str,
    field_type: int,
    field_data: ProfileFieldData,
    display_in_profile_summary: bool,
) -> None:
    """Run all checks on the properties of a custom profile field.

    The checks run in this order and the first failure is the one reported:
    type-specific field data, name and hint (skipped for default external
    account fields), field type, and profile-summary eligibility.

    Raises:
        JsonableError: if ``field_type`` is not one of the model's field type
            choices; the helper validators called here also raise it.
    """
    validate_custom_field_data(field_type, field_data)

    # A default external account field takes all of its data from our
    # default field dictionary, so its name and hint need no validation.
    uses_default_metadata = is_default_external_field(field_type, field_data)
    if not uses_default_metadata:
        validate_field_name_and_hint(name, hint)

    is_known_type = any(
        field_type == choice[0] for choice in CustomProfileField.FIELD_TYPE_CHOICES
    )
    if not is_known_type:
        raise JsonableError(_("Invalid field type."))

    validate_display_in_profile_summary_field(field_type, display_in_profile_summary)
 | 
						|
 | 
						|
 | 
						|
def validate_custom_profile_field_update(
    field: CustomProfileField,
    display_in_profile_summary: bool | None = None,
    field_data: ProfileFieldData | None = None,
    name: str | None = None,
    hint: str | None = None,
) -> None:
    """Validate an update to ``field`` as if it were a complete definition.

    Every argument left as ``None`` falls back to the value currently stored
    on ``field``; the merged values are then passed to
    ``validate_custom_profile_field`` together with the field's own type.
    """
    effective_name = field.name if name is None else name
    effective_hint = field.hint if hint is None else hint

    if field_data is not None:
        effective_field_data = field_data
    elif field.field_data == "":
        # The validator won't accept a string, so an empty dict stands in
        # for validation only. This won't change the stored value.
        effective_field_data = {}
    else:
        effective_field_data = orjson.loads(field.field_data)

    effective_summary_flag = (
        field.display_in_profile_summary
        if display_in_profile_summary is None
        else display_in_profile_summary
    )

    assert effective_field_data is not None
    validate_custom_profile_field(
        effective_name,
        effective_hint,
        field.field_type,
        effective_field_data,
        effective_summary_flag,
    )
 | 
						|
 | 
						|
 | 
						|
def update_only_display_in_profile_summary(
    existing_field: CustomProfileField,
    requested_field_data: ProfileFieldData | None = None,
    requested_name: str | None = None,
    requested_hint: str | None = None,
) -> bool:
    """Tell whether a request leaves the name, hint and field data unchanged.

    A requested value counts as unchanged when it is ``None`` (not sent) or
    equal to what ``existing_field`` currently stores.

    Args:
        existing_field: The stored field; its ``name``, ``hint`` and
            JSON-encoded ``field_data`` are read.
        requested_field_data: Field data sent by the client, if any.
        requested_name: Name sent by the client, if any.
        requested_hint: Hint sent by the client, if any.

    Returns:
        ``True`` if none of the three properties would change, else ``False``.
    """
    if requested_name is not None and requested_name != existing_field.name:
        return False
    if requested_hint is not None and requested_hint != existing_field.hint:
        return False
    if requested_field_data is None:
        return True

    # The stored value can be the empty string, which is not valid JSON.
    # Treat it as an empty dict, as validate_custom_profile_field_update
    # does, instead of letting orjson.loads raise.
    if existing_field.field_data == "":
        stored_field_data = {}
    else:
        stored_field_data = orjson.loads(existing_field.field_data)
    return requested_field_data == stored_field_data
 | 
						|
 | 
						|
 | 
						|
def display_in_profile_summary_limit_reached(
    realm: Realm, profile_field_id: int | None = None
) -> bool:
    """Tell whether the realm already shows the maximum number of summary fields.

    Counts the realm's fields that have ``display_in_profile_summary`` set,
    leaving out the field with id ``profile_field_id`` when one is given, and
    compares the count against
    ``CustomProfileField.MAX_DISPLAY_IN_PROFILE_SUMMARY_FIELDS``.
    """
    summary_fields = CustomProfileField.objects.filter(
        realm=realm, display_in_profile_summary=True
    )
    if profile_field_id is not None:
        summary_fields = summary_fields.exclude(id=profile_field_id)

    shown_count = summary_fields.count()
    return shown_count >= CustomProfileField.MAX_DISPLAY_IN_PROFILE_SUMMARY_FIELDS
 | 
						|
 | 
						|
 | 
						|
@require_realm_admin
@typed_endpoint
def create_realm_custom_profile_field(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    display_in_profile_summary: Json[bool] = False,
    editable_by_user: Json[bool] = True,
    field_data: Json[ProfileFieldData] | None = None,
    field_type: Json[int],
    hint: str = "",
    name: Annotated[str, StringConstraints(strip_whitespace=True)] = "",
    required: Json[bool] = False,
) -> HttpResponse:
    """Create a custom profile field in the requesting user's realm.

    A missing ``field_data`` is treated as an empty dict. The profile-summary
    limit is checked first, then the field is validated, and finally it is
    created through the default-external-account path or the generic path.

    Returns:
        A JSON success response carrying the new field's ``id``.

    Raises:
        JsonableError: if the profile-summary limit is reached, validation
            fails, or creating the field raises ``IntegrityError``.
    """
    if field_data is None:
        field_data = {}

    if display_in_profile_summary and display_in_profile_summary_limit_reached(user_profile.realm):
        raise JsonableError(
            _("Only 2 custom profile fields can be displayed in the profile summary.")
        )

    validate_custom_profile_field(name, hint, field_type, field_data, display_in_profile_summary)

    try:
        if is_default_external_field(field_type, field_data):
            # The default-field path is given only the subtype; the client's
            # name, hint and field data are not passed on.
            field_subtype = field_data["subtype"]
            assert isinstance(field_subtype, str)
            new_field = try_add_realm_default_custom_profile_field(
                realm=user_profile.realm,
                field_subtype=field_subtype,
                display_in_profile_summary=display_in_profile_summary,
                required=required,
                editable_by_user=editable_by_user,
            )
        else:
            new_field = try_add_realm_custom_profile_field(
                realm=user_profile.realm,
                name=name,
                field_data=field_data,
                field_type=field_type,
                hint=hint,
                display_in_profile_summary=display_in_profile_summary,
                required=required,
                editable_by_user=editable_by_user,
            )
    except IntegrityError:
        raise JsonableError(_("A field with that label already exists."))

    return json_success(request, data={"id": new_field.id})
 | 
						|
 | 
						|
 | 
						|
@require_realm_admin
def delete_realm_custom_profile_field(
    request: HttpRequest, user_profile: UserProfile, field_id: int
) -> HttpResponse:
    """Delete the custom profile field ``field_id`` from the user's realm.

    The lookup is restricted to the requesting user's realm.

    Raises:
        JsonableError: if no field with that id exists in the realm.
    """
    try:
        target_field = CustomProfileField.objects.get(
            realm_id=user_profile.realm_id, id=field_id
        )
    except CustomProfileField.DoesNotExist:
        not_found_message = _("Field id {id} not found.").format(id=field_id)
        raise JsonableError(not_found_message)

    do_remove_realm_custom_profile_field(realm=user_profile.realm, field=target_field)
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@require_realm_admin
@typed_endpoint
def update_realm_custom_profile_field(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    display_in_profile_summary: Json[bool] | None = None,
    editable_by_user: Json[bool] | None = None,
    field_data: Json[ProfileFieldData] | None = None,
    field_id: PathOnly[int],
    hint: str | None = None,
    name: Annotated[str, StringConstraints(strip_whitespace=True)] | None = None,
    required: Json[bool] | None = None,
) -> HttpResponse:
    """Update properties of an existing custom profile field in the user's realm.

    Parameters left as ``None`` are passed on unchanged to
    ``try_update_realm_custom_profile_field``; for validation they fall back
    to the stored values.

    Raises:
        JsonableError: if the field does not exist in the realm, the
            profile-summary limit is reached, a default external account
            field would have its name, hint or field data changed, validation
            fails, or the update raises ``IntegrityError``.
    """
    realm = user_profile.realm
    try:
        field = CustomProfileField.objects.get(realm=realm, id=field_id)
    except CustomProfileField.DoesNotExist:
        raise JsonableError(_("Field id {id} not found.").format(id=field_id))

    # The field being updated is excluded from the count, so a field that is
    # already shown in the summary is not counted against the limit.
    if display_in_profile_summary and display_in_profile_summary_limit_reached(realm, field.id):
        raise JsonableError(
            _("Only 2 custom profile fields can be displayed in the profile summary.")
        )

    # HACK: Allow changing the display_in_profile_summary property
    # of default external account types, but not any others.
    #
    # TODO: Make the name/hint/field_data parameters optional, and
    # explicitly require that the client passes None for all of them for this case.
    # Right now, for name/hint/field_data we allow the client to send the existing
    # values for the respective fields. After this TODO is done, we will only allow
    # the client to pass None values if the field is unchanged.
    if field.field_type == CustomProfileField.EXTERNAL_ACCOUNT:
        # The stored data is decoded only for external account fields.
        stored_field_data = orjson.loads(field.field_data)
        if is_default_external_field(
            field.field_type, stored_field_data
        ) and not update_only_display_in_profile_summary(field, field_data, name, hint):
            raise JsonableError(_("Default custom field cannot be updated."))

    validate_custom_profile_field_update(field, display_in_profile_summary, field_data, name, hint)

    try:
        try_update_realm_custom_profile_field(
            realm=realm,
            field=field,
            name=name,
            hint=hint,
            field_data=field_data,
            display_in_profile_summary=display_in_profile_summary,
            required=required,
            editable_by_user=editable_by_user,
        )
    except IntegrityError:
        raise JsonableError(_("A field with that label already exists."))

    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@require_realm_admin
@typed_endpoint
def reorder_realm_custom_profile_fields(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    order: Json[list[int]],
) -> HttpResponse:
    """Reorder the realm's custom profile fields.

    ``order`` is a JSON-encoded list of integers that is handed, together
    with the user's realm, to ``try_reorder_realm_custom_profile_fields``.
    """
    realm = user_profile.realm
    try_reorder_realm_custom_profile_fields(realm, order)
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@human_users_only
@typed_endpoint
def remove_user_custom_profile_data(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    data: Json[list[int]],
) -> HttpResponse:
    """Remove the requesting user's own values for the listed profile fields.

    ``data`` is a JSON-encoded list of field ids. All removals run inside a
    single durable transaction, with the user recorded as the acting user.
    """
    with transaction.atomic(durable=True):
        for profile_field_id in data:
            check_remove_custom_profile_field_value(
                user_profile,
                profile_field_id,
                acting_user=user_profile,
            )

    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@human_users_only
@typed_endpoint
def update_user_custom_profile_data(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    data: Json[list[ProfileDataElementUpdateDict]],
) -> HttpResponse:
    """Update the requesting user's own custom profile field values.

    ``data`` is validated against the user's realm before anything is
    written; the write itself runs inside a durable transaction.
    """
    # NOTE(review): the original carried the comment "We need to call this
    # explicitly otherwise constraints are not check" just before the return.
    # It presumably refers to this validation call -- confirm.
    realm_id = user_profile.realm.id
    validate_user_custom_profile_data(realm_id, data, acting_user=user_profile)

    with transaction.atomic(durable=True):
        do_update_user_custom_profile_data_if_changed(user_profile, data)

    return json_success(request)
 |