mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	This now uses our standard acting_user convention for functions called only from management commands.
		
			
				
	
	
		
			384 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			384 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from collections import defaultdict
 | 
						|
from typing import Any, Dict, List, Optional
 | 
						|
 | 
						|
import orjson
 | 
						|
from django.conf import settings
 | 
						|
from django.db import transaction
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
 | 
						|
from analytics.lib.counts import COUNT_STATS, do_increment_logging_stat
 | 
						|
from zerver.actions.invites import revoke_invites_generated_by_user
 | 
						|
from zerver.actions.user_groups import (
 | 
						|
    do_send_user_group_members_update_event,
 | 
						|
    update_users_in_full_members_system_group,
 | 
						|
)
 | 
						|
from zerver.lib.avatar import avatar_url_from_dict
 | 
						|
from zerver.lib.bot_config import ConfigError, get_bot_config, get_bot_configs, set_bot_config
 | 
						|
from zerver.lib.cache import bot_dict_fields
 | 
						|
from zerver.lib.create_user import create_user
 | 
						|
from zerver.lib.send_email import clear_scheduled_emails
 | 
						|
from zerver.lib.sessions import delete_user_sessions
 | 
						|
from zerver.lib.user_counts import realm_user_count_by_role
 | 
						|
from zerver.lib.user_groups import get_system_user_group_for_user
 | 
						|
from zerver.lib.users import get_active_bots_owned_by_user
 | 
						|
from zerver.models import (
 | 
						|
    Realm,
 | 
						|
    RealmAuditLog,
 | 
						|
    Recipient,
 | 
						|
    Service,
 | 
						|
    Subscription,
 | 
						|
    UserGroupMembership,
 | 
						|
    UserProfile,
 | 
						|
    active_user_ids,
 | 
						|
    bot_owner_user_ids,
 | 
						|
    get_bot_dicts_in_realm,
 | 
						|
    get_bot_services,
 | 
						|
    get_fake_email_domain,
 | 
						|
    get_user_profile_by_id,
 | 
						|
)
 | 
						|
from zerver.tornado.django_api import send_event
 | 
						|
 | 
						|
if settings.BILLING_ENABLED:
 | 
						|
    from corporate.lib.stripe import update_license_ledger_if_needed
 | 
						|
 | 
						|
 | 
						|
def do_delete_user(user_profile: UserProfile, *, acting_user: Optional[UserProfile]) -> None:
 | 
						|
    if user_profile.realm.is_zephyr_mirror_realm:
 | 
						|
        raise AssertionError("Deleting zephyr mirror users is not supported")
 | 
						|
 | 
						|
    do_deactivate_user(user_profile, acting_user=acting_user)
 | 
						|
 | 
						|
    subscribed_huddle_recipient_ids = set(
 | 
						|
        Subscription.objects.filter(
 | 
						|
            user_profile=user_profile, recipient__type=Recipient.HUDDLE
 | 
						|
        ).values_list("recipient_id", flat=True)
 | 
						|
    )
 | 
						|
    user_id = user_profile.id
 | 
						|
    realm = user_profile.realm
 | 
						|
    date_joined = user_profile.date_joined
 | 
						|
    personal_recipient = user_profile.recipient
 | 
						|
 | 
						|
    with transaction.atomic():
 | 
						|
        user_profile.delete()
 | 
						|
        # Recipient objects don't get deleted through CASCADE, so we need to handle
 | 
						|
        # the user's personal recipient manually. This will also delete all Messages pointing
 | 
						|
        # to this recipient (all private messages sent to the user).
 | 
						|
        assert personal_recipient is not None
 | 
						|
        personal_recipient.delete()
 | 
						|
        replacement_user = create_user(
 | 
						|
            force_id=user_id,
 | 
						|
            email=f"deleteduser{user_id}@{get_fake_email_domain(realm)}",
 | 
						|
            password=None,
 | 
						|
            realm=realm,
 | 
						|
            full_name=f"Deleted User {user_id}",
 | 
						|
            active=False,
 | 
						|
            is_mirror_dummy=True,
 | 
						|
            force_date_joined=date_joined,
 | 
						|
        )
 | 
						|
        subs_to_recreate = [
 | 
						|
            Subscription(
 | 
						|
                user_profile=replacement_user,
 | 
						|
                recipient=recipient,
 | 
						|
                is_user_active=replacement_user.is_active,
 | 
						|
            )
 | 
						|
            for recipient in Recipient.objects.filter(id__in=subscribed_huddle_recipient_ids)
 | 
						|
        ]
 | 
						|
        Subscription.objects.bulk_create(subs_to_recreate)
 | 
						|
 | 
						|
        RealmAuditLog.objects.create(
 | 
						|
            realm=replacement_user.realm,
 | 
						|
            modified_user=replacement_user,
 | 
						|
            acting_user=acting_user,
 | 
						|
            event_type=RealmAuditLog.USER_DELETED,
 | 
						|
            event_time=timezone_now(),
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
def change_user_is_active(user_profile: UserProfile, value: bool) -> None:
 | 
						|
    """
 | 
						|
    Helper function for changing the .is_active field. Not meant as a standalone function
 | 
						|
    in production code as properly activating/deactivating users requires more steps.
 | 
						|
    This changes the is_active value and saves it, while ensuring
 | 
						|
    Subscription.is_user_active values are updated in the same db transaction.
 | 
						|
    """
 | 
						|
    with transaction.atomic(savepoint=False):
 | 
						|
        user_profile.is_active = value
 | 
						|
        user_profile.save(update_fields=["is_active"])
 | 
						|
        Subscription.objects.filter(user_profile=user_profile).update(is_user_active=value)
 | 
						|
 | 
						|
 | 
						|
def do_deactivate_user(
 | 
						|
    user_profile: UserProfile, _cascade: bool = True, *, acting_user: Optional[UserProfile]
 | 
						|
) -> None:
 | 
						|
    if not user_profile.is_active:
 | 
						|
        return
 | 
						|
 | 
						|
    if _cascade:
 | 
						|
        # We need to deactivate bots before the target user, to ensure
 | 
						|
        # that a failure partway through this function cannot result
 | 
						|
        # in only the user being deactivated.
 | 
						|
        bot_profiles = get_active_bots_owned_by_user(user_profile)
 | 
						|
        for profile in bot_profiles:
 | 
						|
            do_deactivate_user(profile, _cascade=False, acting_user=acting_user)
 | 
						|
 | 
						|
    with transaction.atomic():
 | 
						|
        if user_profile.realm.is_zephyr_mirror_realm:  # nocoverage
 | 
						|
            # For zephyr mirror users, we need to make them a mirror dummy
 | 
						|
            # again; otherwise, other users won't get the correct behavior
 | 
						|
            # when trying to send messages to this person inside Zulip.
 | 
						|
            #
 | 
						|
            # Ideally, we need to also ensure their zephyr mirroring bot
 | 
						|
            # isn't running, but that's a separate issue.
 | 
						|
            user_profile.is_mirror_dummy = True
 | 
						|
            user_profile.save(update_fields=["is_mirror_dummy"])
 | 
						|
 | 
						|
        change_user_is_active(user_profile, False)
 | 
						|
 | 
						|
        clear_scheduled_emails(user_profile.id)
 | 
						|
        revoke_invites_generated_by_user(user_profile)
 | 
						|
 | 
						|
        event_time = timezone_now()
 | 
						|
        RealmAuditLog.objects.create(
 | 
						|
            realm=user_profile.realm,
 | 
						|
            modified_user=user_profile,
 | 
						|
            acting_user=acting_user,
 | 
						|
            event_type=RealmAuditLog.USER_DEACTIVATED,
 | 
						|
            event_time=event_time,
 | 
						|
            extra_data=orjson.dumps(
 | 
						|
                {
 | 
						|
                    RealmAuditLog.ROLE_COUNT: realm_user_count_by_role(user_profile.realm),
 | 
						|
                }
 | 
						|
            ).decode(),
 | 
						|
        )
 | 
						|
        do_increment_logging_stat(
 | 
						|
            user_profile.realm,
 | 
						|
            COUNT_STATS["active_users_log:is_bot:day"],
 | 
						|
            user_profile.is_bot,
 | 
						|
            event_time,
 | 
						|
            increment=-1,
 | 
						|
        )
 | 
						|
        if settings.BILLING_ENABLED:
 | 
						|
            update_license_ledger_if_needed(user_profile.realm, event_time)
 | 
						|
 | 
						|
    delete_user_sessions(user_profile)
 | 
						|
    event = dict(
 | 
						|
        type="realm_user",
 | 
						|
        op="remove",
 | 
						|
        person=dict(user_id=user_profile.id, full_name=user_profile.full_name),
 | 
						|
    )
 | 
						|
    send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
 | 
						|
 | 
						|
    if user_profile.is_bot:
 | 
						|
        event = dict(
 | 
						|
            type="realm_bot",
 | 
						|
            op="remove",
 | 
						|
            bot=dict(user_id=user_profile.id, full_name=user_profile.full_name),
 | 
						|
        )
 | 
						|
        send_event(user_profile.realm, event, bot_owner_user_ids(user_profile))
 | 
						|
 | 
						|
 | 
						|
@transaction.atomic(durable=True)
 | 
						|
def do_change_user_role(
 | 
						|
    user_profile: UserProfile, value: int, *, acting_user: Optional[UserProfile]
 | 
						|
) -> None:
 | 
						|
    old_value = user_profile.role
 | 
						|
    old_system_group = get_system_user_group_for_user(user_profile)
 | 
						|
 | 
						|
    user_profile.role = value
 | 
						|
    user_profile.save(update_fields=["role"])
 | 
						|
    RealmAuditLog.objects.create(
 | 
						|
        realm=user_profile.realm,
 | 
						|
        modified_user=user_profile,
 | 
						|
        acting_user=acting_user,
 | 
						|
        event_type=RealmAuditLog.USER_ROLE_CHANGED,
 | 
						|
        event_time=timezone_now(),
 | 
						|
        extra_data=orjson.dumps(
 | 
						|
            {
 | 
						|
                RealmAuditLog.OLD_VALUE: old_value,
 | 
						|
                RealmAuditLog.NEW_VALUE: value,
 | 
						|
                RealmAuditLog.ROLE_COUNT: realm_user_count_by_role(user_profile.realm),
 | 
						|
            }
 | 
						|
        ).decode(),
 | 
						|
    )
 | 
						|
    event = dict(
 | 
						|
        type="realm_user", op="update", person=dict(user_id=user_profile.id, role=user_profile.role)
 | 
						|
    )
 | 
						|
    transaction.on_commit(
 | 
						|
        lambda: send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
 | 
						|
    )
 | 
						|
 | 
						|
    UserGroupMembership.objects.filter(
 | 
						|
        user_profile=user_profile, user_group=old_system_group
 | 
						|
    ).delete()
 | 
						|
 | 
						|
    system_group = get_system_user_group_for_user(user_profile)
 | 
						|
    UserGroupMembership.objects.create(user_profile=user_profile, user_group=system_group)
 | 
						|
 | 
						|
    do_send_user_group_members_update_event("remove_members", old_system_group, [user_profile.id])
 | 
						|
 | 
						|
    do_send_user_group_members_update_event("add_members", system_group, [user_profile.id])
 | 
						|
 | 
						|
    if UserProfile.ROLE_MEMBER in [old_value, value]:
 | 
						|
        update_users_in_full_members_system_group(user_profile.realm, [user_profile.id])
 | 
						|
 | 
						|
 | 
						|
def do_make_user_billing_admin(user_profile: UserProfile) -> None:
 | 
						|
    user_profile.is_billing_admin = True
 | 
						|
    user_profile.save(update_fields=["is_billing_admin"])
 | 
						|
    event = dict(
 | 
						|
        type="realm_user", op="update", person=dict(user_id=user_profile.id, is_billing_admin=True)
 | 
						|
    )
 | 
						|
    send_event(user_profile.realm, event, active_user_ids(user_profile.realm_id))
 | 
						|
 | 
						|
 | 
						|
def do_change_can_forge_sender(user_profile: UserProfile, value: bool) -> None:
 | 
						|
    user_profile.can_forge_sender = value
 | 
						|
    user_profile.save(update_fields=["can_forge_sender"])
 | 
						|
 | 
						|
 | 
						|
def do_change_can_create_users(user_profile: UserProfile, value: bool) -> None:
 | 
						|
    user_profile.can_create_users = value
 | 
						|
    user_profile.save(update_fields=["can_create_users"])
 | 
						|
 | 
						|
 | 
						|
def do_update_outgoing_webhook_service(
 | 
						|
    bot_profile: UserProfile, service_interface: int, service_payload_url: str
 | 
						|
) -> None:
 | 
						|
    # TODO: First service is chosen because currently one bot can only have one service.
 | 
						|
    # Update this once multiple services are supported.
 | 
						|
    service = get_bot_services(bot_profile.id)[0]
 | 
						|
    service.base_url = service_payload_url
 | 
						|
    service.interface = service_interface
 | 
						|
    service.save()
 | 
						|
    send_event(
 | 
						|
        bot_profile.realm,
 | 
						|
        dict(
 | 
						|
            type="realm_bot",
 | 
						|
            op="update",
 | 
						|
            bot=dict(
 | 
						|
                user_id=bot_profile.id,
 | 
						|
                services=[
 | 
						|
                    dict(
 | 
						|
                        base_url=service.base_url, interface=service.interface, token=service.token
 | 
						|
                    )
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
        ),
 | 
						|
        bot_owner_user_ids(bot_profile),
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def do_update_bot_config_data(bot_profile: UserProfile, config_data: Dict[str, str]) -> None:
 | 
						|
    for key, value in config_data.items():
 | 
						|
        set_bot_config(bot_profile, key, value)
 | 
						|
    updated_config_data = get_bot_config(bot_profile)
 | 
						|
    send_event(
 | 
						|
        bot_profile.realm,
 | 
						|
        dict(
 | 
						|
            type="realm_bot",
 | 
						|
            op="update",
 | 
						|
            bot=dict(
 | 
						|
                user_id=bot_profile.id,
 | 
						|
                services=[dict(config_data=updated_config_data)],
 | 
						|
            ),
 | 
						|
        ),
 | 
						|
        bot_owner_user_ids(bot_profile),
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def get_service_dicts_for_bot(user_profile_id: int) -> List[Dict[str, Any]]:
 | 
						|
    user_profile = get_user_profile_by_id(user_profile_id)
 | 
						|
    services = get_bot_services(user_profile_id)
 | 
						|
    service_dicts: List[Dict[str, Any]] = []
 | 
						|
    if user_profile.bot_type == UserProfile.OUTGOING_WEBHOOK_BOT:
 | 
						|
        service_dicts = [
 | 
						|
            {
 | 
						|
                "base_url": service.base_url,
 | 
						|
                "interface": service.interface,
 | 
						|
                "token": service.token,
 | 
						|
            }
 | 
						|
            for service in services
 | 
						|
        ]
 | 
						|
    elif user_profile.bot_type == UserProfile.EMBEDDED_BOT:
 | 
						|
        try:
 | 
						|
            service_dicts = [
 | 
						|
                {
 | 
						|
                    "config_data": get_bot_config(user_profile),
 | 
						|
                    "service_name": services[0].name,
 | 
						|
                }
 | 
						|
            ]
 | 
						|
        # A ConfigError just means that there are no config entries for user_profile.
 | 
						|
        except ConfigError:
 | 
						|
            pass
 | 
						|
    return service_dicts
 | 
						|
 | 
						|
 | 
						|
def get_service_dicts_for_bots(
 | 
						|
    bot_dicts: List[Dict[str, Any]], realm: Realm
 | 
						|
) -> Dict[int, List[Dict[str, Any]]]:
 | 
						|
    bot_profile_ids = [bot_dict["id"] for bot_dict in bot_dicts]
 | 
						|
    bot_services_by_uid: Dict[int, List[Service]] = defaultdict(list)
 | 
						|
    for service in Service.objects.filter(user_profile_id__in=bot_profile_ids):
 | 
						|
        bot_services_by_uid[service.user_profile_id].append(service)
 | 
						|
 | 
						|
    embedded_bot_ids = [
 | 
						|
        bot_dict["id"] for bot_dict in bot_dicts if bot_dict["bot_type"] == UserProfile.EMBEDDED_BOT
 | 
						|
    ]
 | 
						|
    embedded_bot_configs = get_bot_configs(embedded_bot_ids)
 | 
						|
 | 
						|
    service_dicts_by_uid: Dict[int, List[Dict[str, Any]]] = {}
 | 
						|
    for bot_dict in bot_dicts:
 | 
						|
        bot_profile_id = bot_dict["id"]
 | 
						|
        bot_type = bot_dict["bot_type"]
 | 
						|
        services = bot_services_by_uid[bot_profile_id]
 | 
						|
        service_dicts: List[Dict[str, Any]] = []
 | 
						|
        if bot_type == UserProfile.OUTGOING_WEBHOOK_BOT:
 | 
						|
            service_dicts = [
 | 
						|
                {
 | 
						|
                    "base_url": service.base_url,
 | 
						|
                    "interface": service.interface,
 | 
						|
                    "token": service.token,
 | 
						|
                }
 | 
						|
                for service in services
 | 
						|
            ]
 | 
						|
        elif bot_type == UserProfile.EMBEDDED_BOT:
 | 
						|
            if bot_profile_id in embedded_bot_configs.keys():
 | 
						|
                bot_config = embedded_bot_configs[bot_profile_id]
 | 
						|
                service_dicts = [
 | 
						|
                    {
 | 
						|
                        "config_data": bot_config,
 | 
						|
                        "service_name": services[0].name,
 | 
						|
                    }
 | 
						|
                ]
 | 
						|
        service_dicts_by_uid[bot_profile_id] = service_dicts
 | 
						|
    return service_dicts_by_uid
 | 
						|
 | 
						|
 | 
						|
def get_owned_bot_dicts(
 | 
						|
    user_profile: UserProfile, include_all_realm_bots_if_admin: bool = True
 | 
						|
) -> List[Dict[str, Any]]:
 | 
						|
    if user_profile.is_realm_admin and include_all_realm_bots_if_admin:
 | 
						|
        result = get_bot_dicts_in_realm(user_profile.realm)
 | 
						|
    else:
 | 
						|
        result = UserProfile.objects.filter(
 | 
						|
            realm=user_profile.realm, is_bot=True, bot_owner=user_profile
 | 
						|
        ).values(*bot_dict_fields)
 | 
						|
    services_by_ids = get_service_dicts_for_bots(result, user_profile.realm)
 | 
						|
    return [
 | 
						|
        {
 | 
						|
            "email": botdict["email"],
 | 
						|
            "user_id": botdict["id"],
 | 
						|
            "full_name": botdict["full_name"],
 | 
						|
            "bot_type": botdict["bot_type"],
 | 
						|
            "is_active": botdict["is_active"],
 | 
						|
            "api_key": botdict["api_key"],
 | 
						|
            "default_sending_stream": botdict["default_sending_stream__name"],
 | 
						|
            "default_events_register_stream": botdict["default_events_register_stream__name"],
 | 
						|
            "default_all_public_streams": botdict["default_all_public_streams"],
 | 
						|
            "owner_id": botdict["bot_owner_id"],
 | 
						|
            "avatar_url": avatar_url_from_dict(botdict),
 | 
						|
            "services": services_by_ids[botdict["id"]],
 | 
						|
        }
 | 
						|
        for botdict in result
 | 
						|
    ]
 |