mirror of
https://github.com/zulip/zulip.git
synced 2025-11-13 10:26:28 +00:00
users: Pass bogus data for inaccessible users.
We now pass bogus data for inaccessible users when sending the users data in "realm_users" field of "register" response or when using endpoints like "GET /users" to get data of all the users in realm. We would add a client capability field in future commits such that new clients would receive data only for accessible users and they can form the bogus data by themselves.
This commit is contained in:
@@ -147,3 +147,7 @@ def is_avatar_new(ldap_avatar: bytes, user_profile: UserProfile) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def get_avatar_for_inaccessible_user() -> str:
|
||||||
|
return staticfiles_storage.url("images/default-avatar.png")
|
||||||
|
|||||||
@@ -49,7 +49,10 @@ from typing_extensions import override
|
|||||||
|
|
||||||
from corporate.models import Customer, CustomerPlan, LicenseLedger
|
from corporate.models import Customer, CustomerPlan, LicenseLedger
|
||||||
from zerver.actions.message_send import check_send_message, check_send_stream_message
|
from zerver.actions.message_send import check_send_message, check_send_stream_message
|
||||||
from zerver.actions.realm_settings import do_set_realm_property
|
from zerver.actions.realm_settings import (
|
||||||
|
do_change_realm_permission_group_setting,
|
||||||
|
do_set_realm_property,
|
||||||
|
)
|
||||||
from zerver.actions.streams import bulk_add_subscriptions, bulk_remove_subscriptions
|
from zerver.actions.streams import bulk_add_subscriptions, bulk_remove_subscriptions
|
||||||
from zerver.decorator import do_two_factor_login
|
from zerver.decorator import do_two_factor_login
|
||||||
from zerver.lib.cache import bounce_key_prefix_for_testing
|
from zerver.lib.cache import bounce_key_prefix_for_testing
|
||||||
@@ -1941,6 +1944,41 @@ Output:
|
|||||||
do_soft_deactivate_users([user])
|
do_soft_deactivate_users([user])
|
||||||
assert user.long_term_idle
|
assert user.long_term_idle
|
||||||
|
|
||||||
|
def set_up_db_for_testing_user_access(self) -> None:
|
||||||
|
polonius = self.example_user("polonius")
|
||||||
|
hamlet = self.example_user("hamlet")
|
||||||
|
othello = self.example_user("othello")
|
||||||
|
iago = self.example_user("iago")
|
||||||
|
prospero = self.example_user("prospero")
|
||||||
|
aaron = self.example_user("aaron")
|
||||||
|
zoe = self.example_user("ZOE")
|
||||||
|
shiva = self.example_user("shiva")
|
||||||
|
realm = get_realm("zulip")
|
||||||
|
# Polonius is subscribed to "Verona" by default, so we unsubscribe
|
||||||
|
# it so that it becomes easier to test the restricted access.
|
||||||
|
self.unsubscribe(polonius, "Verona")
|
||||||
|
|
||||||
|
self.make_stream("test_stream1")
|
||||||
|
self.make_stream("test_stream2", invite_only=True)
|
||||||
|
|
||||||
|
self.subscribe(othello, "test_stream1")
|
||||||
|
self.send_stream_message(othello, "test_stream1", content="test message", topic_name="test")
|
||||||
|
self.unsubscribe(othello, "test_stream1")
|
||||||
|
|
||||||
|
self.subscribe(polonius, "test_stream1")
|
||||||
|
self.subscribe(polonius, "test_stream2")
|
||||||
|
self.subscribe(hamlet, "test_stream1")
|
||||||
|
self.subscribe(iago, "test_stream2")
|
||||||
|
|
||||||
|
self.send_personal_message(polonius, prospero)
|
||||||
|
self.send_personal_message(shiva, polonius)
|
||||||
|
self.send_huddle_message(aaron, [polonius, zoe])
|
||||||
|
|
||||||
|
members_group = UserGroup.objects.get(name="role:members", realm=realm)
|
||||||
|
do_change_realm_permission_group_setting(
|
||||||
|
realm, "can_access_all_users_group", members_group, acting_user=None
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ZulipTestCase(ZulipTestCaseMixin, TestCase):
|
class ZulipTestCase(ZulipTestCaseMixin, TestCase):
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
|||||||
@@ -1,33 +1,42 @@
|
|||||||
|
import itertools
|
||||||
import re
|
import re
|
||||||
import unicodedata
|
import unicodedata
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, TypedDict
|
from email.headerregistry import Address
|
||||||
|
from operator import itemgetter
|
||||||
|
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Set, Tuple, TypedDict
|
||||||
|
|
||||||
import dateutil.parser as date_parser
|
import dateutil.parser as date_parser
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
from django.db.models import QuerySet
|
from django.db.models import Q, QuerySet
|
||||||
from django.utils.translation import gettext as _
|
from django.utils.translation import gettext as _
|
||||||
from django_otp.middleware import is_verified
|
from django_otp.middleware import is_verified
|
||||||
from typing_extensions import NotRequired
|
from typing_extensions import NotRequired
|
||||||
from zulip_bots.custom_exceptions import ConfigValidationError
|
from zulip_bots.custom_exceptions import ConfigValidationError
|
||||||
|
|
||||||
from zerver.lib.avatar import avatar_url, get_avatar_field
|
from zerver.lib.avatar import avatar_url, get_avatar_field, get_avatar_for_inaccessible_user
|
||||||
from zerver.lib.cache import cache_with_key, get_cross_realm_dicts_key
|
from zerver.lib.cache import cache_with_key, get_cross_realm_dicts_key
|
||||||
from zerver.lib.exceptions import (
|
from zerver.lib.exceptions import (
|
||||||
JsonableError,
|
JsonableError,
|
||||||
OrganizationAdministratorRequiredError,
|
OrganizationAdministratorRequiredError,
|
||||||
OrganizationOwnerRequiredError,
|
OrganizationOwnerRequiredError,
|
||||||
)
|
)
|
||||||
|
from zerver.lib.timestamp import timestamp_to_datetime
|
||||||
from zerver.lib.timezone import canonicalize_timezone
|
from zerver.lib.timezone import canonicalize_timezone
|
||||||
from zerver.lib.types import ProfileDataElementUpdateDict, ProfileDataElementValue, RawUserDict
|
from zerver.lib.types import ProfileDataElementUpdateDict, ProfileDataElementValue, RawUserDict
|
||||||
|
from zerver.lib.user_groups import is_user_in_group
|
||||||
from zerver.models import (
|
from zerver.models import (
|
||||||
CustomProfileField,
|
CustomProfileField,
|
||||||
CustomProfileFieldValue,
|
CustomProfileFieldValue,
|
||||||
|
Message,
|
||||||
Realm,
|
Realm,
|
||||||
|
Recipient,
|
||||||
Service,
|
Service,
|
||||||
|
Subscription,
|
||||||
UserMessage,
|
UserMessage,
|
||||||
UserProfile,
|
UserProfile,
|
||||||
|
get_fake_email_domain,
|
||||||
get_realm_user_dicts,
|
get_realm_user_dicts,
|
||||||
get_user,
|
get_user,
|
||||||
get_user_profile_by_id_in_realm,
|
get_user_profile_by_id_in_realm,
|
||||||
@@ -501,6 +510,120 @@ def format_user_row(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def check_user_can_access_all_users(acting_user: Optional[UserProfile]) -> bool:
|
||||||
|
if acting_user is None:
|
||||||
|
# We allow spectators to access all users since they
|
||||||
|
# have very limited access to the user already.
|
||||||
|
return True
|
||||||
|
|
||||||
|
if not acting_user.is_guest:
|
||||||
|
return True
|
||||||
|
|
||||||
|
realm = acting_user.realm
|
||||||
|
if is_user_in_group(realm.can_access_all_users_group, acting_user):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_subscribers_of_target_user_subscriptions(
|
||||||
|
target_users: List[UserProfile],
|
||||||
|
) -> Dict[int, Set[int]]:
|
||||||
|
target_user_ids = [user.id for user in target_users]
|
||||||
|
target_user_subscriptions = (
|
||||||
|
Subscription.objects.filter(
|
||||||
|
user_profile__in=target_user_ids,
|
||||||
|
active=True,
|
||||||
|
recipient__type__in=[Recipient.STREAM, Recipient.HUDDLE],
|
||||||
|
)
|
||||||
|
.order_by("user_profile_id")
|
||||||
|
.values("user_profile_id", "recipient_id")
|
||||||
|
)
|
||||||
|
|
||||||
|
target_users_subbed_recipient_ids = set()
|
||||||
|
target_user_subscriptions_dict: Dict[int, Set[int]] = defaultdict(set)
|
||||||
|
|
||||||
|
for user_profile_id, sub_rows in itertools.groupby(
|
||||||
|
target_user_subscriptions, itemgetter("user_profile_id")
|
||||||
|
):
|
||||||
|
recipient_ids = {row["recipient_id"] for row in sub_rows}
|
||||||
|
target_user_subscriptions_dict[user_profile_id] = recipient_ids
|
||||||
|
target_users_subbed_recipient_ids |= recipient_ids
|
||||||
|
|
||||||
|
subs_in_target_user_subscriptions_query = Subscription.objects.filter(
|
||||||
|
recipient_id__in=list(target_users_subbed_recipient_ids),
|
||||||
|
active=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
subs_in_target_user_subscriptions_query = subs_in_target_user_subscriptions_query.filter(
|
||||||
|
Q(recipient__type=Recipient.STREAM, is_user_active=True)
|
||||||
|
| Q(recipient__type=Recipient.HUDDLE)
|
||||||
|
)
|
||||||
|
|
||||||
|
subs_in_target_user_subscriptions = subs_in_target_user_subscriptions_query.order_by(
|
||||||
|
"recipient_id"
|
||||||
|
).values("user_profile_id", "recipient_id")
|
||||||
|
|
||||||
|
subscribers_dict_by_recipient_ids: Dict[int, Set[int]] = defaultdict(set)
|
||||||
|
for recipient_id, sub_rows in itertools.groupby(
|
||||||
|
subs_in_target_user_subscriptions, itemgetter("recipient_id")
|
||||||
|
):
|
||||||
|
user_ids = {row["user_profile_id"] for row in sub_rows}
|
||||||
|
subscribers_dict_by_recipient_ids[recipient_id] = user_ids
|
||||||
|
|
||||||
|
users_subbed_to_target_user_subscriptions_dict: Dict[int, Set[int]] = defaultdict(set)
|
||||||
|
for user_id in target_user_ids:
|
||||||
|
target_user_subbed_recipients = target_user_subscriptions_dict[user_id]
|
||||||
|
for recipient_id in target_user_subbed_recipients:
|
||||||
|
users_subbed_to_target_user_subscriptions_dict[
|
||||||
|
user_id
|
||||||
|
] |= subscribers_dict_by_recipient_ids[recipient_id]
|
||||||
|
|
||||||
|
return users_subbed_to_target_user_subscriptions_dict
|
||||||
|
|
||||||
|
|
||||||
|
def get_users_involved_in_dms_with_target_users(
|
||||||
|
target_users: List[UserProfile], realm: Realm
|
||||||
|
) -> Dict[int, Set[int]]:
|
||||||
|
target_user_ids = [user.id for user in target_users]
|
||||||
|
|
||||||
|
direct_messages_recipient_users = (
|
||||||
|
Message.objects.filter(
|
||||||
|
sender_id__in=target_user_ids, realm=realm, recipient__type=Recipient.PERSONAL
|
||||||
|
)
|
||||||
|
.order_by("sender_id")
|
||||||
|
.distinct("sender_id", "recipient__type_id")
|
||||||
|
.values("sender_id", "recipient__type_id")
|
||||||
|
)
|
||||||
|
|
||||||
|
direct_message_participants_dict: Dict[int, Set[int]] = defaultdict(set)
|
||||||
|
for sender_id, message_rows in itertools.groupby(
|
||||||
|
direct_messages_recipient_users, itemgetter("sender_id")
|
||||||
|
):
|
||||||
|
recipient_user_ids = {row["recipient__type_id"] for row in message_rows}
|
||||||
|
direct_message_participants_dict[sender_id] = recipient_user_ids
|
||||||
|
|
||||||
|
personal_recipient_ids_for_target_users = [user.recipient_id for user in target_users]
|
||||||
|
direct_messages_senders = (
|
||||||
|
Message.objects.filter(
|
||||||
|
realm=realm,
|
||||||
|
recipient_id__in=personal_recipient_ids_for_target_users,
|
||||||
|
recipient__type=Recipient.PERSONAL,
|
||||||
|
)
|
||||||
|
.order_by("recipient__type_id")
|
||||||
|
.distinct("sender_id", "recipient__type_id")
|
||||||
|
.values("sender_id", "recipient__type_id")
|
||||||
|
)
|
||||||
|
|
||||||
|
for recipient_user_id, message_rows in itertools.groupby(
|
||||||
|
direct_messages_senders, itemgetter("recipient__type_id")
|
||||||
|
):
|
||||||
|
sender_ids = {row["sender_id"] for row in message_rows}
|
||||||
|
direct_message_participants_dict[recipient_user_id] |= sender_ids
|
||||||
|
|
||||||
|
return direct_message_participants_dict
|
||||||
|
|
||||||
|
|
||||||
def user_profile_to_user_row(user_profile: UserProfile) -> RawUserDict:
|
def user_profile_to_user_row(user_profile: UserProfile) -> RawUserDict:
|
||||||
return RawUserDict(
|
return RawUserDict(
|
||||||
id=user_profile.id,
|
id=user_profile.id,
|
||||||
@@ -548,6 +671,74 @@ def get_cross_realm_dicts() -> List[APIUserDict]:
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def get_data_for_inaccessible_user(realm: Realm, user_id: int) -> APIUserDict:
|
||||||
|
fake_email = Address(username=f"user{user_id}", domain=get_fake_email_domain(realm)).addr_spec
|
||||||
|
|
||||||
|
# We just set date_joined field to UNIX epoch.
|
||||||
|
user_date_joined = timestamp_to_datetime(0)
|
||||||
|
|
||||||
|
user_dict = APIUserDict(
|
||||||
|
email=fake_email,
|
||||||
|
user_id=user_id,
|
||||||
|
avatar_version=1,
|
||||||
|
is_admin=False,
|
||||||
|
is_owner=False,
|
||||||
|
is_guest=False,
|
||||||
|
is_billing_admin=False,
|
||||||
|
role=UserProfile.ROLE_MEMBER,
|
||||||
|
is_bot=False,
|
||||||
|
full_name=str(UserProfile.INACCESSIBLE_USER_NAME),
|
||||||
|
timezone="",
|
||||||
|
is_active=True,
|
||||||
|
date_joined=user_date_joined.isoformat(),
|
||||||
|
delivery_email=None,
|
||||||
|
avatar_url=get_avatar_for_inaccessible_user(),
|
||||||
|
profile_data={},
|
||||||
|
)
|
||||||
|
return user_dict
|
||||||
|
|
||||||
|
|
||||||
|
def get_accessible_user_ids(realm: Realm, user_profile: UserProfile) -> List[int]:
|
||||||
|
subscribers_dict_of_target_user_subscriptions = get_subscribers_of_target_user_subscriptions(
|
||||||
|
[user_profile]
|
||||||
|
)
|
||||||
|
users_involved_in_dms_dict = get_users_involved_in_dms_with_target_users([user_profile], realm)
|
||||||
|
|
||||||
|
# This does not include bots, because either the caller
|
||||||
|
# wants only human users or it handles bots separately.
|
||||||
|
accessible_user_ids = (
|
||||||
|
{user_profile.id}
|
||||||
|
| subscribers_dict_of_target_user_subscriptions[user_profile.id]
|
||||||
|
| users_involved_in_dms_dict[user_profile.id]
|
||||||
|
)
|
||||||
|
|
||||||
|
return list(accessible_user_ids)
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_dicts_in_realm(
|
||||||
|
realm: Realm, user_profile: Optional[UserProfile]
|
||||||
|
) -> Tuple[List[RawUserDict], List[APIUserDict]]:
|
||||||
|
group_allowed_to_access_all_users = realm.can_access_all_users_group
|
||||||
|
assert group_allowed_to_access_all_users is not None
|
||||||
|
|
||||||
|
all_user_dicts = get_realm_user_dicts(realm.id)
|
||||||
|
if check_user_can_access_all_users(user_profile):
|
||||||
|
return (all_user_dicts, [])
|
||||||
|
|
||||||
|
assert user_profile is not None
|
||||||
|
accessible_user_ids = get_accessible_user_ids(realm, user_profile)
|
||||||
|
|
||||||
|
accessible_user_dicts: List[RawUserDict] = []
|
||||||
|
inaccessible_user_dicts: List[APIUserDict] = []
|
||||||
|
for user_dict in all_user_dicts:
|
||||||
|
if user_dict["id"] in accessible_user_ids or user_dict["is_bot"]:
|
||||||
|
accessible_user_dicts.append(user_dict)
|
||||||
|
else:
|
||||||
|
inaccessible_user_dicts.append(get_data_for_inaccessible_user(realm, user_dict["id"]))
|
||||||
|
|
||||||
|
return (accessible_user_dicts, inaccessible_user_dicts)
|
||||||
|
|
||||||
|
|
||||||
def get_custom_profile_field_values(
|
def get_custom_profile_field_values(
|
||||||
custom_profile_field_values: Iterable[CustomProfileFieldValue],
|
custom_profile_field_values: Iterable[CustomProfileFieldValue],
|
||||||
) -> Dict[int, Dict[str, Any]]:
|
) -> Dict[int, Dict[str, Any]]:
|
||||||
@@ -583,10 +774,12 @@ def get_users_for_api(
|
|||||||
custom_profile_field_data = None
|
custom_profile_field_data = None
|
||||||
# target_user is an optional parameter which is passed when user data of a specific user
|
# target_user is an optional parameter which is passed when user data of a specific user
|
||||||
# is required. It is 'None' otherwise.
|
# is required. It is 'None' otherwise.
|
||||||
|
accessible_user_dicts: List[RawUserDict] = []
|
||||||
|
inaccessible_user_dicts: List[APIUserDict] = []
|
||||||
if target_user is not None:
|
if target_user is not None:
|
||||||
user_dicts = [user_profile_to_user_row(target_user)]
|
accessible_user_dicts = [user_profile_to_user_row(target_user)]
|
||||||
else:
|
else:
|
||||||
user_dicts = get_realm_user_dicts(realm.id)
|
accessible_user_dicts, inaccessible_user_dicts = get_user_dicts_in_realm(realm, acting_user)
|
||||||
|
|
||||||
if include_custom_profile_fields:
|
if include_custom_profile_fields:
|
||||||
base_query = CustomProfileFieldValue.objects.select_related("field")
|
base_query = CustomProfileFieldValue.objects.select_related("field")
|
||||||
@@ -598,7 +791,7 @@ def get_users_for_api(
|
|||||||
profiles_by_user_id = get_custom_profile_field_values(custom_profile_field_values)
|
profiles_by_user_id = get_custom_profile_field_values(custom_profile_field_values)
|
||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
for row in user_dicts:
|
for row in accessible_user_dicts:
|
||||||
if profiles_by_user_id is not None:
|
if profiles_by_user_id is not None:
|
||||||
custom_profile_field_data = profiles_by_user_id.get(row["id"], {})
|
custom_profile_field_data = profiles_by_user_id.get(row["id"], {})
|
||||||
client_gravatar_for_user = (
|
client_gravatar_for_user = (
|
||||||
@@ -613,6 +806,13 @@ def get_users_for_api(
|
|||||||
user_avatar_url_field_optional=user_avatar_url_field_optional,
|
user_avatar_url_field_optional=user_avatar_url_field_optional,
|
||||||
custom_profile_field_data=custom_profile_field_data,
|
custom_profile_field_data=custom_profile_field_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for inaccessible_user_row in inaccessible_user_dicts:
|
||||||
|
# We already have the required data for inaccessible users
|
||||||
|
# in row object, so we can just add it to result directly.
|
||||||
|
user_id = inaccessible_user_row["user_id"]
|
||||||
|
result[user_id] = inaccessible_user_row
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1919,6 +1919,7 @@ class UserProfile(AbstractBaseUser, PermissionsMixin, UserBaseSettings): # type
|
|||||||
# Foreign key to the Recipient object for PERSONAL type messages to this user.
|
# Foreign key to the Recipient object for PERSONAL type messages to this user.
|
||||||
recipient = models.ForeignKey(Recipient, null=True, on_delete=models.SET_NULL)
|
recipient = models.ForeignKey(Recipient, null=True, on_delete=models.SET_NULL)
|
||||||
|
|
||||||
|
INACCESSIBLE_USER_NAME = gettext_lazy("Unknown user")
|
||||||
# The user's name. We prefer the model of a full_name
|
# The user's name. We prefer the model of a full_name
|
||||||
# over first+last because cultures vary on how many
|
# over first+last because cultures vary on how many
|
||||||
# names one has, whether the family name is first or last, etc.
|
# names one has, whether the family name is first or last, etc.
|
||||||
|
|||||||
@@ -2504,6 +2504,83 @@ class GetProfileTest(ZulipTestCase):
|
|||||||
self.assertEqual(result["user"].get("delivery_email"), hamlet.delivery_email)
|
self.assertEqual(result["user"].get("delivery_email"), hamlet.delivery_email)
|
||||||
self.assertEqual(result["user"].get("email"), hamlet.delivery_email)
|
self.assertEqual(result["user"].get("email"), hamlet.delivery_email)
|
||||||
|
|
||||||
|
def test_restricted_access_to_users(self) -> None:
|
||||||
|
othello = self.example_user("othello")
|
||||||
|
cordelia = self.example_user("cordelia")
|
||||||
|
desdemona = self.example_user("desdemona")
|
||||||
|
hamlet = self.example_user("hamlet")
|
||||||
|
iago = self.example_user("iago")
|
||||||
|
prospero = self.example_user("prospero")
|
||||||
|
aaron = self.example_user("aaron")
|
||||||
|
shiva = self.example_user("shiva")
|
||||||
|
zoe = self.example_user("ZOE")
|
||||||
|
polonius = self.example_user("polonius")
|
||||||
|
|
||||||
|
self.set_up_db_for_testing_user_access()
|
||||||
|
|
||||||
|
self.login("polonius")
|
||||||
|
with self.assert_database_query_count(9):
|
||||||
|
result = orjson.loads(self.client_get("/json/users").content)
|
||||||
|
accessible_users = [
|
||||||
|
user
|
||||||
|
for user in result["members"]
|
||||||
|
if user["full_name"] != UserProfile.INACCESSIBLE_USER_NAME
|
||||||
|
]
|
||||||
|
# The user can access 3 bot users and 7 human users.
|
||||||
|
self.assert_length(accessible_users, 10)
|
||||||
|
accessible_human_users = [user for user in accessible_users if not user["is_bot"]]
|
||||||
|
# The user can access the following 7 human users -
|
||||||
|
# 1. Hamlet and Iago - they are subscribed to common streams.
|
||||||
|
# 2. Prospero - Because Polonius sent a DM to Prospero when
|
||||||
|
# they were allowed to access all users.
|
||||||
|
# 3. Aaron and Zoe - Because they are particapting in a
|
||||||
|
# group DM with Polonius.
|
||||||
|
# 4. Shiva - Because Shiva sent a DM to Polonius.
|
||||||
|
# 5. Polonius - A user can obviously access themselves.
|
||||||
|
self.assert_length(accessible_human_users, 7)
|
||||||
|
accessible_user_ids = [user["user_id"] for user in accessible_human_users]
|
||||||
|
self.assertCountEqual(
|
||||||
|
accessible_user_ids,
|
||||||
|
[polonius.id, hamlet.id, iago.id, prospero.id, aaron.id, zoe.id, shiva.id],
|
||||||
|
)
|
||||||
|
|
||||||
|
inaccessible_users = [
|
||||||
|
user
|
||||||
|
for user in result["members"]
|
||||||
|
if user["full_name"] == UserProfile.INACCESSIBLE_USER_NAME
|
||||||
|
]
|
||||||
|
inaccessible_user_ids = [user["user_id"] for user in inaccessible_users]
|
||||||
|
self.assertCountEqual(inaccessible_user_ids, [cordelia.id, desdemona.id, othello.id])
|
||||||
|
|
||||||
|
do_deactivate_user(hamlet, acting_user=None)
|
||||||
|
do_deactivate_user(aaron, acting_user=None)
|
||||||
|
do_deactivate_user(shiva, acting_user=None)
|
||||||
|
result = orjson.loads(self.client_get("/json/users").content)
|
||||||
|
accessible_users = [
|
||||||
|
user
|
||||||
|
for user in result["members"]
|
||||||
|
if user["full_name"] != UserProfile.INACCESSIBLE_USER_NAME
|
||||||
|
]
|
||||||
|
self.assert_length(accessible_users, 9)
|
||||||
|
# Guests can only access those deactivated users who were involved in
|
||||||
|
# DMs and not those who were subscribed to some common streams.
|
||||||
|
accessible_human_users = [user for user in accessible_users if not user["is_bot"]]
|
||||||
|
self.assert_length(accessible_human_users, 6)
|
||||||
|
accessible_user_ids = [user["user_id"] for user in accessible_human_users]
|
||||||
|
self.assertCountEqual(
|
||||||
|
accessible_user_ids, [polonius.id, iago.id, prospero.id, aaron.id, zoe.id, shiva.id]
|
||||||
|
)
|
||||||
|
|
||||||
|
inaccessible_users = [
|
||||||
|
user
|
||||||
|
for user in result["members"]
|
||||||
|
if user["full_name"] == UserProfile.INACCESSIBLE_USER_NAME
|
||||||
|
]
|
||||||
|
inaccessible_user_ids = [user["user_id"] for user in inaccessible_users]
|
||||||
|
self.assertCountEqual(
|
||||||
|
inaccessible_user_ids, [cordelia.id, desdemona.id, othello.id, hamlet.id]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class DeleteUserTest(ZulipTestCase):
|
class DeleteUserTest(ZulipTestCase):
|
||||||
def test_do_delete_user(self) -> None:
|
def test_do_delete_user(self) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user