mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	This adds `access_invite_by_id` to retrieve the preregistration user for email invites by id, and a parallel function for multi-user invites.
		
			
				
	
	
		
			254 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			254 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import re
 | 
						|
from typing import Annotated
 | 
						|
 | 
						|
from django.conf import settings
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from pydantic import Json
 | 
						|
 | 
						|
from confirmation import settings as confirmation_settings
 | 
						|
from zerver.actions.invites import (
 | 
						|
    do_create_multiuse_invite_link,
 | 
						|
    do_get_invites_controlled_by_user,
 | 
						|
    do_invite_users,
 | 
						|
    do_revoke_multi_use_invite,
 | 
						|
    do_revoke_user_invite,
 | 
						|
    do_send_user_invite_email,
 | 
						|
)
 | 
						|
from zerver.decorator import require_member_or_admin
 | 
						|
from zerver.lib.exceptions import InvitationError, JsonableError, OrganizationOwnerRequiredError
 | 
						|
from zerver.lib.response import json_success
 | 
						|
from zerver.lib.streams import access_stream_by_id
 | 
						|
from zerver.lib.typed_endpoint import ApiParamConfig, PathOnly, typed_endpoint
 | 
						|
from zerver.lib.typed_endpoint_validators import check_int_in_validator
 | 
						|
from zerver.models import MultiuseInvite, PreregistrationUser, Stream, UserProfile
 | 
						|
 | 
						|
# Convert INVITATION_LINK_VALIDITY_DAYS into minutes.
 | 
						|
# Because mypy fails to correctly infer the type of the validator, we want this constant
 | 
						|
# to be Optional[int] to avoid a mypy error when using it as the default value.
 | 
						|
# https://github.com/python/mypy/issues/13234
 | 
						|
INVITATION_LINK_VALIDITY_MINUTES: int | None = 24 * 60 * settings.INVITATION_LINK_VALIDITY_DAYS
 | 
						|
 | 
						|
 | 
						|
def check_role_based_permissions(
 | 
						|
    invited_as: int, user_profile: UserProfile, *, require_admin: bool
 | 
						|
) -> None:
 | 
						|
    if (
 | 
						|
        invited_as == PreregistrationUser.INVITE_AS["REALM_OWNER"]
 | 
						|
        and not user_profile.is_realm_owner
 | 
						|
    ):
 | 
						|
        raise OrganizationOwnerRequiredError
 | 
						|
 | 
						|
    if require_admin and not user_profile.is_realm_admin:
 | 
						|
        raise JsonableError(_("Must be an organization administrator"))
 | 
						|
 | 
						|
 | 
						|
def access_invite_by_id(user_profile: UserProfile, invite_id: int) -> PreregistrationUser:
 | 
						|
    try:
 | 
						|
        prereg_user = PreregistrationUser.objects.get(id=invite_id)
 | 
						|
    except PreregistrationUser.DoesNotExist:
 | 
						|
        raise JsonableError(_("No such invitation"))
 | 
						|
 | 
						|
    # Structurally, any invitation the user can actually access should
 | 
						|
    # have a referred_by set for the user who created it.
 | 
						|
    if prereg_user.referred_by is None or prereg_user.referred_by.realm != user_profile.realm:
 | 
						|
        raise JsonableError(_("No such invitation"))
 | 
						|
 | 
						|
    if prereg_user.referred_by_id != user_profile.id:
 | 
						|
        check_role_based_permissions(prereg_user.invited_as, user_profile, require_admin=True)
 | 
						|
    return prereg_user
 | 
						|
 | 
						|
 | 
						|
def access_multiuse_invite_by_id(user_profile: UserProfile, invite_id: int) -> MultiuseInvite:
 | 
						|
    try:
 | 
						|
        invite = MultiuseInvite.objects.get(id=invite_id)
 | 
						|
    except MultiuseInvite.DoesNotExist:
 | 
						|
        raise JsonableError(_("No such invitation"))
 | 
						|
 | 
						|
    if invite.realm != user_profile.realm:
 | 
						|
        raise JsonableError(_("No such invitation"))
 | 
						|
 | 
						|
    if invite.referred_by_id != user_profile.id:
 | 
						|
        check_role_based_permissions(invite.invited_as, user_profile, require_admin=True)
 | 
						|
 | 
						|
    if invite.status == confirmation_settings.STATUS_REVOKED:
 | 
						|
        raise JsonableError(_("Invitation has already been revoked"))
 | 
						|
    return invite
 | 
						|
 | 
						|
 | 
						|
@require_member_or_admin
 | 
						|
@typed_endpoint
 | 
						|
def invite_users_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    invitee_emails_raw: Annotated[str, ApiParamConfig("invitee_emails")],
 | 
						|
    invite_expires_in_minutes: Json[int | None] = INVITATION_LINK_VALIDITY_MINUTES,
 | 
						|
    invite_as: Annotated[
 | 
						|
        Json[int],
 | 
						|
        check_int_in_validator(list(PreregistrationUser.INVITE_AS.values())),
 | 
						|
    ] = PreregistrationUser.INVITE_AS["MEMBER"],
 | 
						|
    notify_referrer_on_join: Json[bool] = True,
 | 
						|
    stream_ids: Json[list[int]],
 | 
						|
    include_realm_default_subscriptions: Json[bool] = False,
 | 
						|
) -> HttpResponse:
 | 
						|
    if not user_profile.can_invite_users_by_email():
 | 
						|
        # Guest users case will not be handled here as it will
 | 
						|
        # be handled by the decorator above.
 | 
						|
        raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
    require_admin = invite_as in [
 | 
						|
        # Owners can only be invited by owners, checked by separate
 | 
						|
        # logic in check_role_based_permissions.
 | 
						|
        PreregistrationUser.INVITE_AS["REALM_OWNER"],
 | 
						|
        PreregistrationUser.INVITE_AS["REALM_ADMIN"],
 | 
						|
        PreregistrationUser.INVITE_AS["MODERATOR"],
 | 
						|
    ]
 | 
						|
    check_role_based_permissions(invite_as, user_profile, require_admin=require_admin)
 | 
						|
 | 
						|
    if not invitee_emails_raw:
 | 
						|
        raise JsonableError(_("You must specify at least one email address."))
 | 
						|
 | 
						|
    invitee_emails = get_invitee_emails_set(invitee_emails_raw)
 | 
						|
 | 
						|
    streams: list[Stream] = []
 | 
						|
    for stream_id in stream_ids:
 | 
						|
        try:
 | 
						|
            (stream, sub) = access_stream_by_id(user_profile, stream_id)
 | 
						|
        except JsonableError:
 | 
						|
            raise JsonableError(
 | 
						|
                _("Invalid channel ID {channel_id}. No invites were sent.").format(
 | 
						|
                    channel_id=stream_id
 | 
						|
                )
 | 
						|
            )
 | 
						|
        streams.append(stream)
 | 
						|
 | 
						|
    if len(streams) and not user_profile.can_subscribe_other_users():
 | 
						|
        raise JsonableError(_("You do not have permission to subscribe other users to channels."))
 | 
						|
 | 
						|
    skipped = do_invite_users(
 | 
						|
        user_profile,
 | 
						|
        invitee_emails,
 | 
						|
        streams,
 | 
						|
        notify_referrer_on_join,
 | 
						|
        invite_expires_in_minutes=invite_expires_in_minutes,
 | 
						|
        include_realm_default_subscriptions=include_realm_default_subscriptions,
 | 
						|
        invite_as=invite_as,
 | 
						|
    )
 | 
						|
 | 
						|
    if skipped:
 | 
						|
        raise InvitationError(
 | 
						|
            _(
 | 
						|
                "Some of those addresses are already using Zulip, "
 | 
						|
                "so we didn't send them an invitation. We did send "
 | 
						|
                "invitations to everyone else!"
 | 
						|
            ),
 | 
						|
            skipped,
 | 
						|
            sent_invitations=True,
 | 
						|
        )
 | 
						|
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
def get_invitee_emails_set(invitee_emails_raw: str) -> set[str]:
 | 
						|
    invitee_emails_list = set(re.split(r"[,\n]", invitee_emails_raw))
 | 
						|
    invitee_emails = set()
 | 
						|
    for email in invitee_emails_list:
 | 
						|
        is_email_with_name = re.search(r"<(?P<email>.*)>", email)
 | 
						|
        if is_email_with_name:
 | 
						|
            email = is_email_with_name.group("email")
 | 
						|
        invitee_emails.add(email.strip())
 | 
						|
    return invitee_emails
 | 
						|
 | 
						|
 | 
						|
@require_member_or_admin
 | 
						|
def get_user_invites(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
 | 
						|
    all_users = do_get_invites_controlled_by_user(user_profile)
 | 
						|
    return json_success(request, data={"invites": all_users})
 | 
						|
 | 
						|
 | 
						|
@require_member_or_admin
 | 
						|
@typed_endpoint
 | 
						|
def revoke_user_invite(
 | 
						|
    request: HttpRequest, user_profile: UserProfile, *, invite_id: PathOnly[int]
 | 
						|
) -> HttpResponse:
 | 
						|
    prereg_user = access_invite_by_id(user_profile, invite_id)
 | 
						|
    do_revoke_user_invite(prereg_user)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@require_member_or_admin
 | 
						|
@typed_endpoint
 | 
						|
def revoke_multiuse_invite(
 | 
						|
    request: HttpRequest, user_profile: UserProfile, *, invite_id: PathOnly[int]
 | 
						|
) -> HttpResponse:
 | 
						|
    invite = access_multiuse_invite_by_id(user_profile, invite_id)
 | 
						|
    do_revoke_multi_use_invite(invite)
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@require_member_or_admin
 | 
						|
@typed_endpoint
 | 
						|
def resend_user_invite_email(
 | 
						|
    request: HttpRequest, user_profile: UserProfile, *, invite_id: PathOnly[int]
 | 
						|
) -> HttpResponse:
 | 
						|
    prereg_user = access_invite_by_id(user_profile, invite_id)
 | 
						|
    do_send_user_invite_email(prereg_user, event_time=timezone_now())
 | 
						|
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
@require_member_or_admin
 | 
						|
@typed_endpoint
 | 
						|
def generate_multiuse_invite_backend(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    *,
 | 
						|
    invite_expires_in_minutes: Json[int | None] = INVITATION_LINK_VALIDITY_MINUTES,
 | 
						|
    invite_as: Annotated[
 | 
						|
        Json[int],
 | 
						|
        check_int_in_validator(list(PreregistrationUser.INVITE_AS.values())),
 | 
						|
    ] = PreregistrationUser.INVITE_AS["MEMBER"],
 | 
						|
    stream_ids: Json[list[int]] | None = None,
 | 
						|
    include_realm_default_subscriptions: Json[bool] = False,
 | 
						|
) -> HttpResponse:
 | 
						|
    if stream_ids is None:
 | 
						|
        stream_ids = []
 | 
						|
    if not user_profile.can_create_multiuse_invite_to_realm():
 | 
						|
        # Guest users case will not be handled here as it will
 | 
						|
        # be handled by the decorator above.
 | 
						|
        raise JsonableError(_("Insufficient permission"))
 | 
						|
 | 
						|
    require_admin = invite_as in [
 | 
						|
        # Owners can only be invited by owners, checked by separate
 | 
						|
        # logic in check_role_based_permissions.
 | 
						|
        PreregistrationUser.INVITE_AS["REALM_OWNER"],
 | 
						|
        PreregistrationUser.INVITE_AS["REALM_ADMIN"],
 | 
						|
        PreregistrationUser.INVITE_AS["MODERATOR"],
 | 
						|
    ]
 | 
						|
    check_role_based_permissions(invite_as, user_profile, require_admin=require_admin)
 | 
						|
 | 
						|
    streams = []
 | 
						|
    for stream_id in stream_ids:
 | 
						|
        try:
 | 
						|
            (stream, sub) = access_stream_by_id(user_profile, stream_id)
 | 
						|
        except JsonableError:
 | 
						|
            raise JsonableError(
 | 
						|
                _("Invalid channel ID {channel_id}. No invites were sent.").format(
 | 
						|
                    channel_id=stream_id
 | 
						|
                )
 | 
						|
            )
 | 
						|
        streams.append(stream)
 | 
						|
 | 
						|
    if len(streams) and not user_profile.can_subscribe_other_users():
 | 
						|
        raise JsonableError(_("You do not have permission to subscribe other users to channels."))
 | 
						|
 | 
						|
    invite_link = do_create_multiuse_invite_link(
 | 
						|
        user_profile,
 | 
						|
        invite_as,
 | 
						|
        invite_expires_in_minutes,
 | 
						|
        include_realm_default_subscriptions,
 | 
						|
        streams,
 | 
						|
    )
 | 
						|
    return json_success(request, data={"invite_link": invite_link})
 |