Files
zulip/zerver/tests/test_audit_log.py
Steve Howell b742f1241f realm emoji: Use a single cache for all lookups.
The active realm emoji are just a subset of all your
realm emoji, so just use a single cache entry per
realm.

Cache misses should be very infrequent per realm.

If a realm has lots of deactivated realm emoji, then
there's a minor expense to deserialize them, but that
is gonna be dwarfed by all the other more expensive
operations in message-send.

I also renamed the two related functions.  I erred on
the side of using somewhat verbose names, as we don't
want folks to confuse the two use cases. Fortunately
there are somewhat natural affordances to use one or
the other, and mypy helps too.

Finally, I use realm_id instead of realm in places
where we don't need the full Realm object.
2023-07-17 09:35:53 -07:00

1373 lines
52 KiB
Python

from datetime import timedelta
from typing import Any, Dict, Union
import orjson
from django.contrib.auth.password_validation import validate_password
from django.utils.timezone import now as timezone_now
from analytics.models import StreamCount
from zerver.actions.bots import (
do_change_bot_owner,
do_change_default_all_public_streams,
do_change_default_events_register_stream,
do_change_default_sending_stream,
)
from zerver.actions.create_user import (
do_activate_mirror_dummy_user,
do_create_user,
do_reactivate_user,
)
from zerver.actions.realm_domains import (
do_add_realm_domain,
do_change_realm_domain,
do_remove_realm_domain,
)
from zerver.actions.realm_emoji import check_add_realm_emoji, do_remove_realm_emoji
from zerver.actions.realm_icon import do_change_icon_source
from zerver.actions.realm_linkifiers import (
do_add_linkifier,
do_remove_linkifier,
do_update_linkifier,
)
from zerver.actions.realm_playgrounds import do_add_realm_playground, do_remove_realm_playground
from zerver.actions.realm_settings import (
do_deactivate_realm,
do_reactivate_realm,
do_set_realm_authentication_methods,
do_set_realm_notifications_stream,
do_set_realm_property,
do_set_realm_signup_notifications_stream,
)
from zerver.actions.streams import (
bulk_add_subscriptions,
bulk_remove_subscriptions,
do_change_subscription_property,
do_deactivate_stream,
do_rename_stream,
)
from zerver.actions.user_groups import (
add_subgroups_to_user_group,
bulk_add_members_to_user_group,
check_add_user_group,
do_change_user_group_permission_setting,
do_update_user_group_description,
do_update_user_group_name,
remove_members_from_user_group,
remove_subgroups_from_user_group,
)
from zerver.actions.user_settings import (
do_change_avatar_fields,
do_change_password,
do_change_tos_version,
do_change_user_delivery_email,
do_change_user_setting,
do_regenerate_api_key,
)
from zerver.actions.users import do_change_user_role, do_deactivate_user
from zerver.lib.emoji import get_emoji_file_name, get_emoji_url
from zerver.lib.message import get_last_message_id
from zerver.lib.stream_traffic import get_streams_traffic
from zerver.lib.streams import create_stream_if_needed
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import get_test_image_file
from zerver.lib.types import LinkifierDict, RealmPlaygroundDict
from zerver.lib.user_groups import create_system_user_groups_for_realm
from zerver.lib.utils import assert_is_not_none
from zerver.models import (
EmojiInfo,
Message,
Realm,
RealmAuditLog,
RealmDomainDict,
RealmPlayground,
Recipient,
Subscription,
UserGroup,
UserProfile,
get_all_custom_emoji_for_realm,
get_realm,
get_realm_domains,
get_realm_playgrounds,
get_stream,
linkifiers_for_realm,
)
class TestRealmAuditLog(ZulipTestCase):
def check_role_count_schema(self, role_counts: Dict[str, Any]) -> None:
for key in [
UserProfile.ROLE_REALM_ADMINISTRATOR,
UserProfile.ROLE_MEMBER,
UserProfile.ROLE_GUEST,
UserProfile.ROLE_REALM_OWNER,
]:
# str(key) since json keys are always strings, and ujson.dumps will have converted
# the UserProfile.role values into strings
self.assertTrue(isinstance(role_counts[RealmAuditLog.ROLE_COUNT_HUMANS][str(key)], int))
self.assertTrue(isinstance(role_counts[RealmAuditLog.ROLE_COUNT_BOTS], int))
def test_user_activation(self) -> None:
realm = get_realm("zulip")
now = timezone_now()
user = do_create_user("email", "password", realm, "full_name", acting_user=None)
do_deactivate_user(user, acting_user=user)
do_activate_mirror_dummy_user(user, acting_user=user)
do_deactivate_user(user, acting_user=user)
do_reactivate_user(user, acting_user=user)
self.assertEqual(RealmAuditLog.objects.filter(event_time__gte=now).count(), 8)
event_types = list(
RealmAuditLog.objects.filter(
realm=realm,
acting_user=user,
modified_user=user,
modified_stream=None,
event_time__gte=now,
event_time__lte=now + timedelta(minutes=60),
)
.order_by("event_time")
.values_list("event_type", flat=True)
)
self.assertEqual(
event_types,
[
RealmAuditLog.USER_CREATED,
RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_ADDED,
RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_ADDED,
RealmAuditLog.USER_DEACTIVATED,
RealmAuditLog.USER_ACTIVATED,
RealmAuditLog.USER_DEACTIVATED,
RealmAuditLog.USER_REACTIVATED,
],
)
modified_user_group_names = []
for event in RealmAuditLog.objects.filter(
realm=realm,
acting_user=user,
modified_user=user,
modified_stream=None,
event_time__gte=now,
event_time__lte=now + timedelta(minutes=60),
):
if event.event_type == RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_ADDED:
self.assertIsNone(event.extra_data)
modified_user_group_names.append(assert_is_not_none(event.modified_user_group).name)
continue
extra_data = orjson.loads(assert_is_not_none(event.extra_data))
self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
self.assertNotIn(RealmAuditLog.OLD_VALUE, extra_data)
self.assertListEqual(
modified_user_group_names,
[
UserGroup.MEMBERS_GROUP_NAME,
UserGroup.FULL_MEMBERS_GROUP_NAME,
],
)
def test_change_role(self) -> None:
realm = get_realm("zulip")
now = timezone_now()
user_profile = self.example_user("hamlet")
acting_user = self.example_user("iago")
do_change_user_role(
user_profile, UserProfile.ROLE_REALM_ADMINISTRATOR, acting_user=acting_user
)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER, acting_user=acting_user)
do_change_user_role(user_profile, UserProfile.ROLE_GUEST, acting_user=acting_user)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER, acting_user=acting_user)
do_change_user_role(user_profile, UserProfile.ROLE_REALM_OWNER, acting_user=acting_user)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER, acting_user=acting_user)
do_change_user_role(user_profile, UserProfile.ROLE_MODERATOR, acting_user=acting_user)
do_change_user_role(user_profile, UserProfile.ROLE_MEMBER, acting_user=acting_user)
old_values_seen = set()
new_values_seen = set()
for event in RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_ROLE_CHANGED,
realm=realm,
modified_user=user_profile,
acting_user=acting_user,
event_time__gte=now,
event_time__lte=now + timedelta(minutes=60),
):
extra_data = orjson.loads(assert_is_not_none(event.extra_data))
self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
self.assertIn(RealmAuditLog.OLD_VALUE, extra_data)
self.assertIn(RealmAuditLog.NEW_VALUE, extra_data)
old_values_seen.add(extra_data[RealmAuditLog.OLD_VALUE])
new_values_seen.add(extra_data[RealmAuditLog.NEW_VALUE])
self.assertEqual(
old_values_seen,
{
UserProfile.ROLE_GUEST,
UserProfile.ROLE_MEMBER,
UserProfile.ROLE_REALM_ADMINISTRATOR,
UserProfile.ROLE_REALM_OWNER,
UserProfile.ROLE_MODERATOR,
},
)
self.assertEqual(old_values_seen, new_values_seen)
expected_system_user_group_names = [
UserGroup.ADMINISTRATORS_GROUP_NAME,
UserGroup.MEMBERS_GROUP_NAME,
UserGroup.FULL_MEMBERS_GROUP_NAME,
UserGroup.EVERYONE_GROUP_NAME,
UserGroup.MEMBERS_GROUP_NAME,
UserGroup.FULL_MEMBERS_GROUP_NAME,
UserGroup.OWNERS_GROUP_NAME,
UserGroup.MEMBERS_GROUP_NAME,
UserGroup.FULL_MEMBERS_GROUP_NAME,
UserGroup.MODERATORS_GROUP_NAME,
]
user_group_modified_names = (
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_ADDED,
realm=realm,
modified_user=user_profile,
acting_user=acting_user,
event_time__gte=now,
event_time__lte=now + timedelta(minutes=60),
)
.order_by("event_time")
.values_list("modified_user_group__name", flat=True)
)
self.assertListEqual(
list(user_group_modified_names),
[
*expected_system_user_group_names,
UserGroup.MEMBERS_GROUP_NAME,
UserGroup.FULL_MEMBERS_GROUP_NAME,
],
)
user_group_modified_names = (
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_REMOVED,
realm=realm,
modified_user=user_profile,
acting_user=acting_user,
event_time__gte=now,
event_time__lte=now + timedelta(minutes=60),
)
.order_by("event_time")
.values_list("modified_user_group__name", flat=True)
)
self.assertListEqual(
list(user_group_modified_names),
[
UserGroup.MEMBERS_GROUP_NAME,
UserGroup.FULL_MEMBERS_GROUP_NAME,
*expected_system_user_group_names,
],
)
def test_change_password(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
password = "test1"
do_change_password(user, password)
self.assertEqual(
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_PASSWORD_CHANGED, event_time__gte=now
).count(),
1,
)
# No error should be raised here
validate_password(password, user)
def test_change_email(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
new_email = "test@example.com"
do_change_user_delivery_email(user, new_email)
self.assertEqual(
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_EMAIL_CHANGED, event_time__gte=now
).count(),
1,
)
self.assertEqual(new_email, user.delivery_email)
# Test the RealmAuditLog stringification
audit_entry = RealmAuditLog.objects.get(
event_type=RealmAuditLog.USER_EMAIL_CHANGED, event_time__gte=now
)
self.assertTrue(
repr(audit_entry).startswith(
f"<RealmAuditLog: <UserProfile: {user.email} {user.realm!r}> {RealmAuditLog.USER_EMAIL_CHANGED} "
)
)
def test_change_avatar_source(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
avatar_source = "G"
do_change_avatar_fields(user, avatar_source, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_AVATAR_SOURCE_CHANGED,
modified_user=user,
acting_user=user,
event_time__gte=now,
).count(),
1,
)
self.assertEqual(avatar_source, user.avatar_source)
def test_change_full_name(self) -> None:
start = timezone_now()
new_name = "George Hamletovich"
self.login("iago")
req = dict(full_name=new_name)
result = self.client_patch("/json/users/{}".format(self.example_user("hamlet").id), req)
self.assertTrue(result.status_code == 200)
query = RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_FULL_NAME_CHANGED, event_time__gte=start
)
self.assertEqual(query.count(), 1)
def test_change_tos_version(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
tos_version = "android"
do_change_tos_version(user, tos_version)
self.assertEqual(
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_TERMS_OF_SERVICE_VERSION_CHANGED, event_time__gte=now
).count(),
1,
)
self.assertEqual(tos_version, user.tos_version)
def test_change_bot_owner(self) -> None:
now = timezone_now()
admin = self.example_user("iago")
bot = self.notification_bot(admin.realm)
bot_owner = self.example_user("hamlet")
do_change_bot_owner(bot, bot_owner, admin)
self.assertEqual(
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_BOT_OWNER_CHANGED, event_time__gte=now
).count(),
1,
)
self.assertEqual(bot_owner, bot.bot_owner)
def test_regenerate_api_key(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
do_regenerate_api_key(user, user)
self.assertEqual(
RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_API_KEY_CHANGED, event_time__gte=now
).count(),
1,
)
self.assertTrue(user.api_key)
def test_get_streams_traffic(self) -> None:
realm = get_realm("zulip")
stream_name = "whatever"
stream = self.make_stream(stream_name, realm)
stream_ids = {stream.id}
result = get_streams_traffic(stream_ids)
self.assertEqual(result, {})
StreamCount.objects.create(
realm=realm,
stream=stream,
property="messages_in_stream:is_bot:day",
end_time=timezone_now(),
value=999,
)
result = get_streams_traffic(stream_ids)
self.assertEqual(result, {stream.id: 999})
def test_subscriptions(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
realm = user.realm
stream = self.make_stream("test_stream")
acting_user = self.example_user("iago")
bulk_add_subscriptions(user.realm, [stream], [user], acting_user=acting_user)
subscription_creation_logs = RealmAuditLog.objects.filter(
event_type=RealmAuditLog.SUBSCRIPTION_CREATED,
event_time__gte=now,
acting_user=acting_user,
modified_user=user,
modified_stream=stream,
)
modified_stream = subscription_creation_logs[0].modified_stream
assert modified_stream is not None
self.assertEqual(subscription_creation_logs.count(), 1)
self.assertEqual(modified_stream.id, stream.id)
self.assertEqual(subscription_creation_logs[0].modified_user, user)
bulk_remove_subscriptions(realm, [user], [stream], acting_user=acting_user)
subscription_deactivation_logs = RealmAuditLog.objects.filter(
event_type=RealmAuditLog.SUBSCRIPTION_DEACTIVATED,
event_time__gte=now,
acting_user=acting_user,
modified_user=user,
modified_stream=stream,
)
modified_stream = subscription_deactivation_logs[0].modified_stream
assert modified_stream is not None
self.assertEqual(subscription_deactivation_logs.count(), 1)
self.assertEqual(modified_stream.id, stream.id)
self.assertEqual(subscription_deactivation_logs[0].modified_user, user)
def test_realm_activation(self) -> None:
realm = get_realm("zulip")
user = self.example_user("desdemona")
do_deactivate_realm(realm, acting_user=user)
log_entry = RealmAuditLog.objects.get(
realm=realm, event_type=RealmAuditLog.REALM_DEACTIVATED, acting_user=user
)
extra_data = orjson.loads(assert_is_not_none(log_entry.extra_data))
self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
do_reactivate_realm(realm)
log_entry = RealmAuditLog.objects.get(
realm=realm, event_type=RealmAuditLog.REALM_REACTIVATED
)
extra_data = orjson.loads(assert_is_not_none(log_entry.extra_data))
self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
def test_create_stream_if_needed(self) -> None:
now = timezone_now()
realm = get_realm("zulip")
user = self.example_user("hamlet")
stream = create_stream_if_needed(
realm,
"test",
invite_only=False,
stream_description="Test description",
acting_user=user,
)[0]
self.assertEqual(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.STREAM_CREATED,
event_time__gte=now,
acting_user=user,
modified_stream=stream,
).count(),
1,
)
def test_deactivate_stream(self) -> None:
now = timezone_now()
realm = get_realm("zulip")
user = self.example_user("hamlet")
stream_name = "test"
stream = self.make_stream(stream_name, realm)
do_deactivate_stream(stream, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.STREAM_DEACTIVATED,
event_time__gte=now,
acting_user=user,
modified_stream=stream,
).count(),
1,
)
self.assertEqual(stream.deactivated, True)
def test_set_realm_authentication_methods(self) -> None:
now = timezone_now()
realm = get_realm("zulip")
user = self.example_user("hamlet")
expected_old_value = realm.authentication_methods_dict()
auth_method_dict = {
"Google": False,
"Email": False,
"GitHub": False,
"Apple": False,
"Dev": True,
"SAML": True,
"GitLab": False,
"OpenID Connect": False,
}
do_set_realm_authentication_methods(realm, auth_method_dict, acting_user=user)
realm_audit_logs = RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.REALM_PROPERTY_CHANGED,
event_time__gte=now,
acting_user=user,
)
self.assertEqual(realm_audit_logs.count(), 1)
extra_data = orjson.loads(assert_is_not_none(realm_audit_logs[0].extra_data))
expected_new_value = auth_method_dict
self.assertEqual(extra_data[RealmAuditLog.OLD_VALUE], expected_old_value)
self.assertEqual(extra_data[RealmAuditLog.NEW_VALUE], expected_new_value)
def test_get_last_message_id(self) -> None:
# get_last_message_id is a helper mainly used for RealmAuditLog
self.assertEqual(
get_last_message_id(),
Message.objects.latest("id").id,
)
Message.objects.all().delete()
self.assertEqual(get_last_message_id(), -1)
def test_set_realm_message_editing(self) -> None:
now = timezone_now()
realm = get_realm("zulip")
user = self.example_user("hamlet")
value_expected = {
RealmAuditLog.OLD_VALUE: realm.message_content_edit_limit_seconds,
RealmAuditLog.NEW_VALUE: 1000,
"property": "message_content_edit_limit_seconds",
}
do_set_realm_property(realm, "message_content_edit_limit_seconds", 1000, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.REALM_PROPERTY_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(value_expected).decode(),
).count(),
1,
)
value_expected = {
RealmAuditLog.OLD_VALUE: Realm.POLICY_EVERYONE,
RealmAuditLog.NEW_VALUE: Realm.POLICY_ADMINS_ONLY,
"property": "edit_topic_policy",
}
do_set_realm_property(
realm, "edit_topic_policy", Realm.POLICY_ADMINS_ONLY, acting_user=user
)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.REALM_PROPERTY_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(value_expected).decode(),
).count(),
1,
)
def test_set_realm_notifications_stream(self) -> None:
now = timezone_now()
realm = get_realm("zulip")
user = self.example_user("hamlet")
old_value = realm.notifications_stream_id
stream_name = "test"
stream = self.make_stream(stream_name, realm)
do_set_realm_notifications_stream(realm, stream, stream.id, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.REALM_PROPERTY_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(
{
RealmAuditLog.OLD_VALUE: old_value,
RealmAuditLog.NEW_VALUE: stream.id,
"property": "notifications_stream",
}
).decode(),
).count(),
1,
)
def test_set_realm_signup_notifications_stream(self) -> None:
now = timezone_now()
realm = get_realm("zulip")
user = self.example_user("hamlet")
old_value = realm.signup_notifications_stream_id
stream_name = "test"
stream = self.make_stream(stream_name, realm)
do_set_realm_signup_notifications_stream(realm, stream, stream.id, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.REALM_PROPERTY_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(
{
RealmAuditLog.OLD_VALUE: old_value,
RealmAuditLog.NEW_VALUE: stream.id,
"property": "signup_notifications_stream",
}
).decode(),
).count(),
1,
)
def test_change_icon_source(self) -> None:
test_start = timezone_now()
realm = get_realm("zulip")
user = self.example_user("hamlet")
icon_source = "G"
do_change_icon_source(realm, icon_source, acting_user=user)
audit_entries = RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.REALM_ICON_SOURCE_CHANGED,
acting_user=user,
event_time__gte=test_start,
)
audit_log = audit_entries.first()
assert audit_log is not None
self.assert_length(audit_entries, 1)
self.assertEqual(icon_source, realm.icon_source)
self.assertEqual(audit_log.extra_data, "{'icon_source': 'G', 'icon_version': 2}")
def test_change_subscription_property(self) -> None:
user = self.example_user("hamlet")
# Fetch the Denmark stream for testing
stream = get_stream("Denmark", user.realm)
sub = Subscription.objects.get(
user_profile=user, recipient__type=Recipient.STREAM, recipient__type_id=stream.id
)
properties = {
"color": True,
"is_muted": True,
"desktop_notifications": False,
"audible_notifications": False,
"push_notifications": True,
"email_notifications": True,
"pin_to_top": True,
"wildcard_mentions_notify": False,
}
for property, value in properties.items():
now = timezone_now()
old_value = getattr(sub, property)
self.assertNotEqual(old_value, value)
do_change_subscription_property(user, sub, stream, property, value, acting_user=user)
expected_extra_data = {
RealmAuditLog.OLD_VALUE: old_value,
RealmAuditLog.NEW_VALUE: value,
"property": property,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.SUBSCRIPTION_PROPERTY_CHANGED,
event_time__gte=now,
acting_user=user,
modified_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
self.assertEqual(getattr(sub, property), value)
def test_change_default_streams(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
stream = get_stream("Denmark", user.realm)
old_value = user.default_sending_stream_id
do_change_default_sending_stream(user, stream, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.USER_DEFAULT_SENDING_STREAM_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(
{
RealmAuditLog.OLD_VALUE: old_value,
RealmAuditLog.NEW_VALUE: stream.id,
}
).decode(),
).count(),
1,
)
self.assertEqual(user.default_sending_stream, stream)
old_value = user.default_events_register_stream_id
do_change_default_events_register_stream(user, stream, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.USER_DEFAULT_REGISTER_STREAM_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(
{
RealmAuditLog.OLD_VALUE: old_value,
RealmAuditLog.NEW_VALUE: stream.id,
}
).decode(),
).count(),
1,
)
self.assertEqual(user.default_events_register_stream, stream)
old_value = user.default_all_public_streams
do_change_default_all_public_streams(user, False, acting_user=user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.USER_DEFAULT_ALL_PUBLIC_STREAMS_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(
{RealmAuditLog.OLD_VALUE: old_value, RealmAuditLog.NEW_VALUE: False}
).decode(),
).count(),
1,
)
self.assertEqual(user.default_all_public_streams, False)
def test_rename_stream(self) -> None:
now = timezone_now()
user = self.example_user("hamlet")
stream = self.make_stream("test", user.realm)
old_name = stream.name
do_rename_stream(stream, "updated name", user)
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.STREAM_NAME_CHANGED,
event_time__gte=now,
acting_user=user,
modified_stream=stream,
extra_data=orjson.dumps(
{RealmAuditLog.OLD_VALUE: old_name, RealmAuditLog.NEW_VALUE: "updated name"}
).decode(),
).count(),
1,
)
self.assertEqual(stream.name, "updated name")
def test_change_user_settings(self) -> None:
user = self.example_user("hamlet")
value: Union[bool, int, str]
test_values = dict(
default_language="de",
default_view="all_messages",
emojiset="twitter",
notification_sound="ding",
)
for setting, setting_type in user.property_types.items():
if setting in test_values:
value = test_values[setting]
elif setting_type is int:
value = 3
else:
value = False
now = timezone_now()
old_value = getattr(user, setting)
do_change_user_setting(user, setting, value, acting_user=user)
expected_extra_data = {
RealmAuditLog.OLD_VALUE: old_value,
RealmAuditLog.NEW_VALUE: value,
"property": setting,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.USER_SETTING_CHANGED,
event_time__gte=now,
acting_user=user,
modified_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
self.assertEqual(getattr(user, setting), value)
def test_realm_domain_entries(self) -> None:
user = self.example_user("iago")
initial_domains = get_realm_domains(user.realm)
now = timezone_now()
realm_domain = do_add_realm_domain(user.realm, "zulip.org", False, acting_user=user)
added_domain = RealmDomainDict(
domain="zulip.org",
allow_subdomains=False,
)
expected_extra_data = {
"realm_domains": [*initial_domains, added_domain],
"added_domain": added_domain,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_DOMAIN_ADDED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
now = timezone_now()
do_change_realm_domain(realm_domain, True, acting_user=user)
changed_domain = RealmDomainDict(
domain="zulip.org",
allow_subdomains=True,
)
expected_extra_data = {
"realm_domains": [*initial_domains, changed_domain],
"changed_domain": changed_domain,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_DOMAIN_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
now = timezone_now()
do_remove_realm_domain(realm_domain, acting_user=user)
removed_domain = RealmDomainDict(
domain="zulip.org",
allow_subdomains=True,
)
expected_extra_data = {
"realm_domains": initial_domains,
"removed_domain": removed_domain,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_DOMAIN_REMOVED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
def test_realm_playground_entries(self) -> None:
user = self.example_user("iago")
initial_playgrounds = get_realm_playgrounds(user.realm)
now = timezone_now()
playground_id = do_add_realm_playground(
user.realm,
acting_user=user,
name="Python playground",
pygments_language="Python",
url_prefix="https://python.example.com",
)
added_playground = RealmPlaygroundDict(
id=playground_id,
name="Python playground",
pygments_language="Python",
url_prefix="https://python.example.com",
)
expected_extra_data = {
"realm_playgrounds": [*initial_playgrounds, added_playground],
"added_playground": added_playground,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_PLAYGROUND_ADDED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
now = timezone_now()
realm_playground = RealmPlayground.objects.get(id=playground_id)
do_remove_realm_playground(
user.realm,
realm_playground,
acting_user=user,
)
removed_playground = {
"name": "Python playground",
"pygments_language": "Python",
"url_prefix": "https://python.example.com",
}
expected_extra_data = {
"realm_playgrounds": initial_playgrounds,
"removed_playground": removed_playground,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_PLAYGROUND_REMOVED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
def test_realm_linkifier_entries(self) -> None:
user = self.example_user("iago")
initial_linkifiers = linkifiers_for_realm(user.realm.id)
now = timezone_now()
linkifier_id = do_add_linkifier(
user.realm,
pattern="#(?P<id>[123])",
url_template="https://realm.com/my_realm_filter/{id}",
acting_user=user,
)
added_linkfier = LinkifierDict(
pattern="#(?P<id>[123])",
url_template="https://realm.com/my_realm_filter/{id}",
id=linkifier_id,
)
expected_extra_data = {
"realm_linkifiers": [*initial_linkifiers, added_linkfier],
"added_linkifier": added_linkfier,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_LINKIFIER_ADDED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
now = timezone_now()
do_update_linkifier(
user.realm,
id=linkifier_id,
pattern="#(?P<id>[0-9]+)",
url_template="https://realm.com/my_realm_filter/issues/{id}",
acting_user=user,
)
changed_linkifier = LinkifierDict(
pattern="#(?P<id>[0-9]+)",
url_template="https://realm.com/my_realm_filter/issues/{id}",
id=linkifier_id,
)
expected_extra_data = {
"realm_linkifiers": [*initial_linkifiers, changed_linkifier],
"changed_linkifier": changed_linkifier,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_LINKIFIER_CHANGED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
now = timezone_now()
do_remove_linkifier(
user.realm,
id=linkifier_id,
acting_user=user,
)
removed_linkifier = {
"pattern": "#(?P<id>[0-9]+)",
"url_template": "https://realm.com/my_realm_filter/issues/{id}",
}
expected_extra_data = {
"realm_linkifiers": initial_linkifiers,
"removed_linkifier": removed_linkifier,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_LINKIFIER_REMOVED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
def test_realm_emoji_entries(self) -> None:
user = self.example_user("iago")
realm_emoji_dict = get_all_custom_emoji_for_realm(user.realm_id)
now = timezone_now()
with get_test_image_file("img.png") as img_file:
# Because we want to verify the IntegrityError handling
# logic in check_add_realm_emoji rather than the primary
# check in upload_emoji, we need to make this request via
# that helper rather than via the API.
realm_emoji = check_add_realm_emoji(
realm=user.realm, name="test_emoji", author=user, image_file=img_file
)
added_emoji = EmojiInfo(
id=str(realm_emoji.id),
name="test_emoji",
source_url=get_emoji_url(get_emoji_file_name("img.png", realm_emoji.id), user.realm_id),
deactivated=False,
author_id=user.id,
still_url=None,
)
realm_emoji_dict[str(realm_emoji.id)] = added_emoji
expected_extra_data = {
"realm_emoji": dict(sorted(realm_emoji_dict.items())),
"added_emoji": added_emoji,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_EMOJI_ADDED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
now = timezone_now()
do_remove_realm_emoji(user.realm, "test_emoji", acting_user=user)
deactivated_emoji = EmojiInfo(
id=str(realm_emoji.id),
name="test_emoji",
source_url=get_emoji_url(get_emoji_file_name("img.png", realm_emoji.id), user.realm_id),
deactivated=True,
author_id=user.id,
still_url=None,
)
realm_emoji_dict[str(realm_emoji.id)] = deactivated_emoji
expected_extra_data = {
"realm_emoji": dict(sorted(realm_emoji_dict.items())),
"deactivated_emoji": deactivated_emoji,
}
self.assertEqual(
RealmAuditLog.objects.filter(
realm=user.realm,
event_type=RealmAuditLog.REALM_EMOJI_REMOVED,
event_time__gte=now,
acting_user=user,
extra_data=orjson.dumps(expected_extra_data).decode(),
).count(),
1,
)
def test_system_user_groups_creation(self) -> None:
realm = Realm.objects.create(string_id="test", name="foo")
now = timezone_now()
create_system_user_groups_for_realm(realm)
# The expected number of system user group is the total number of roles
# from UserGroup.SYSTEM_USER_GROUP_ROLE_MAP in addition to
# full_members_system_group, everyone_on_internet_system_group and
# nobody_system_group.
expected_system_user_group_count = len(UserGroup.SYSTEM_USER_GROUP_ROLE_MAP) + 3
system_user_group_ids = sorted(
UserGroup.objects.filter(
realm=realm,
is_system_group=True,
).values_list("id", flat=True)
)
self.assert_length(system_user_group_ids, expected_system_user_group_count)
logged_system_group_ids = sorted(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.USER_GROUP_CREATED,
event_time__gte=now,
acting_user=None,
).values_list("modified_user_group_id", flat=True)
)
self.assertListEqual(logged_system_group_ids, system_user_group_ids)
logged_subgroup_entries = sorted(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.USER_GROUP_DIRECT_SUBGROUP_MEMBERSHIP_ADDED,
event_time__gte=now,
acting_user=None,
).values_list("modified_user_group_id", "extra_data")
)
logged_supergroup_entries = sorted(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.USER_GROUP_DIRECT_SUPERGROUP_MEMBERSHIP_ADDED,
event_time__gte=now,
acting_user=None,
).values_list("modified_user_group_id", "extra_data")
)
# Excluding nobody_system_group, the rest of the user groups should have
# a chain of subgroup memberships in between.
self.assert_length(logged_subgroup_entries, expected_system_user_group_count - 2)
self.assert_length(logged_supergroup_entries, expected_system_user_group_count - 2)
for i in range(len(logged_subgroup_entries)):
# The offset of 1 is due to nobody_system_group being skipped as
# the first user group in the list.
# For supergroup, we add an additional 1 because of the order we
# put the chain together.
expected_subgroup_id = system_user_group_ids[i + 1]
expected_supergroup_id = system_user_group_ids[i + 2]
supergroup_id, subgroup_extra_data = logged_subgroup_entries[i]
subgroup_id, supergroup_extra_data = logged_supergroup_entries[i]
assert subgroup_extra_data is not None
assert supergroup_extra_data is not None
self.assertEqual(
orjson.loads(subgroup_extra_data)["subgroup_ids"][0], expected_subgroup_id
)
self.assertEqual(
orjson.loads(supergroup_extra_data)["supergroup_ids"][0], expected_supergroup_id
)
self.assertEqual(supergroup_id, expected_supergroup_id)
self.assertEqual(subgroup_id, expected_subgroup_id)
audit_log_entries = sorted(
RealmAuditLog.objects.filter(
realm=realm,
event_type=RealmAuditLog.USER_GROUP_GROUP_BASED_SETTING_CHANGED,
event_time__gte=now,
acting_user=None,
).values_list("modified_user_group_id", "extra_data")
)
nobody_group = UserGroup.objects.get(name=UserGroup.NOBODY_GROUP_NAME, realm=realm)
for (user_group_id, extra_data), expected_user_group_id in zip(
audit_log_entries, logged_system_group_ids
):
self.assertEqual(user_group_id, expected_user_group_id)
self.assertDictEqual(
orjson.loads(assert_is_not_none(extra_data)),
{
RealmAuditLog.OLD_VALUE: None,
RealmAuditLog.NEW_VALUE: nobody_group.id,
"property": "can_mention_group",
},
)
def test_user_group_creation(self) -> None:
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
now = timezone_now()
public_group = UserGroup.objects.get(
name=UserGroup.EVERYONE_ON_INTERNET_GROUP_NAME, realm=hamlet.realm
)
user_group = check_add_user_group(
hamlet.realm,
"empty",
[hamlet, cordelia],
acting_user=hamlet,
description="lorem",
group_settings_map={"can_mention_group": public_group},
)
audit_log_entries = RealmAuditLog.objects.filter(
acting_user=hamlet,
realm=hamlet.realm,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_CREATED,
)
self.assert_length(audit_log_entries, 1)
self.assertIsNone(audit_log_entries[0].modified_user)
self.assertEqual(audit_log_entries[0].modified_user_group, user_group)
audit_log_entries = RealmAuditLog.objects.filter(
acting_user=hamlet,
realm=hamlet.realm,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_ADDED,
)
self.assert_length(audit_log_entries, 2)
self.assertEqual(audit_log_entries[0].modified_user, hamlet)
self.assertEqual(audit_log_entries[1].modified_user, cordelia)
audit_log_entries = RealmAuditLog.objects.filter(
acting_user=hamlet,
realm=hamlet.realm,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_GROUP_BASED_SETTING_CHANGED,
)
self.assert_length(audit_log_entries, len(UserGroup.GROUP_PERMISSION_SETTINGS))
self.assertListEqual(
[
orjson.loads(assert_is_not_none(audit_log.extra_data))
for audit_log in audit_log_entries
],
[
{
RealmAuditLog.OLD_VALUE: None,
RealmAuditLog.NEW_VALUE: public_group.id,
"property": "can_mention_group",
}
],
)
def test_change_user_group_memberships(self) -> None:
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
now = timezone_now()
user_group = check_add_user_group(hamlet.realm, "foo", [], acting_user=None)
bulk_add_members_to_user_group(user_group, [hamlet.id, cordelia.id], acting_user=hamlet)
audit_log_entries = RealmAuditLog.objects.filter(
acting_user=hamlet,
realm=hamlet.realm,
modified_user_group=user_group,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_ADDED,
)
self.assert_length(audit_log_entries, 2)
self.assertEqual(audit_log_entries[0].modified_user, hamlet)
self.assertEqual(audit_log_entries[1].modified_user, cordelia)
remove_members_from_user_group(user_group, [hamlet.id], acting_user=hamlet)
audit_log_entries = RealmAuditLog.objects.filter(
acting_user=hamlet,
realm=hamlet.realm,
modified_user_group=user_group,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_DIRECT_USER_MEMBERSHIP_REMOVED,
)
self.assert_length(audit_log_entries, 1)
self.assertEqual(audit_log_entries[0].modified_user, hamlet)
def test_change_user_group_subgroups_memberships(self) -> None:
hamlet = self.example_user("hamlet")
user_group = check_add_user_group(hamlet.realm, "main", [], acting_user=None)
subgroups = [
check_add_user_group(hamlet.realm, f"subgroup{num}", [], acting_user=hamlet)
for num in range(3)
]
now = timezone_now()
add_subgroups_to_user_group(user_group, subgroups, acting_user=hamlet)
# Only one audit log entry for the subgroup membership is expected.
audit_log_entry = RealmAuditLog.objects.get(
realm=hamlet.realm,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_DIRECT_SUBGROUP_MEMBERSHIP_ADDED,
)
self.assertEqual(audit_log_entry.modified_user_group, user_group)
self.assertEqual(audit_log_entry.acting_user, hamlet)
self.assertDictEqual(
orjson.loads(assert_is_not_none(audit_log_entry.extra_data)),
{"subgroup_ids": [subgroup.id for subgroup in subgroups]},
)
audit_log_entries = RealmAuditLog.objects.filter(
realm=hamlet.realm,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_DIRECT_SUPERGROUP_MEMBERSHIP_ADDED,
).order_by("id")
self.assert_length(audit_log_entries, 3)
for i in range(3):
self.assertEqual(audit_log_entries[i].modified_user_group, subgroups[i])
self.assertEqual(audit_log_entries[i].acting_user, hamlet)
self.assertDictEqual(
orjson.loads(assert_is_not_none(audit_log_entries[i].extra_data)),
{"supergroup_ids": [user_group.id]},
)
remove_subgroups_from_user_group(user_group, subgroups[:2], acting_user=hamlet)
audit_log_entry = RealmAuditLog.objects.get(
realm=hamlet.realm,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_DIRECT_SUBGROUP_MEMBERSHIP_REMOVED,
)
self.assertEqual(audit_log_entry.modified_user_group, user_group)
self.assertEqual(audit_log_entry.acting_user, hamlet)
self.assertDictEqual(
orjson.loads(assert_is_not_none(audit_log_entry.extra_data)),
{"subgroup_ids": [subgroup.id for subgroup in subgroups[:2]]},
)
audit_log_entries = RealmAuditLog.objects.filter(
realm=hamlet.realm,
event_time__gte=now,
event_type=RealmAuditLog.USER_GROUP_DIRECT_SUPERGROUP_MEMBERSHIP_REMOVED,
).order_by("id")
self.assert_length(audit_log_entries, 2)
for i in range(2):
self.assertEqual(audit_log_entries[i].modified_user_group, subgroups[i])
self.assertEqual(audit_log_entries[i].acting_user, hamlet)
self.assertDictEqual(
orjson.loads(assert_is_not_none(audit_log_entries[i].extra_data)),
{"supergroup_ids": [user_group.id]},
)
def test_user_group_property_change(self) -> None:
hamlet = self.example_user("hamlet")
user_group = check_add_user_group(
hamlet.realm,
"demo",
[],
description="No description",
acting_user=hamlet,
)
now = timezone_now()
do_update_user_group_name(user_group, "bar", acting_user=hamlet)
audit_log_entries = RealmAuditLog.objects.filter(
realm=hamlet.realm,
event_type=RealmAuditLog.USER_GROUP_NAME_CHANGED,
event_time__gte=now,
)
self.assert_length(audit_log_entries, 1)
self.assertDictEqual(
orjson.loads(assert_is_not_none(audit_log_entries[0].extra_data)),
{
RealmAuditLog.OLD_VALUE: "demo",
RealmAuditLog.NEW_VALUE: "bar",
},
)
do_update_user_group_description(user_group, "Foo", acting_user=hamlet)
audit_log_entries = RealmAuditLog.objects.filter(
realm=hamlet.realm,
event_type=RealmAuditLog.USER_GROUP_DESCRIPTION_CHANGED,
event_time__gte=now,
)
self.assert_length(audit_log_entries, 1)
self.assertDictEqual(
orjson.loads(assert_is_not_none(audit_log_entries[0].extra_data)),
{
RealmAuditLog.OLD_VALUE: "No description",
RealmAuditLog.NEW_VALUE: "Foo",
},
)
old_group = user_group.can_mention_group
new_group = UserGroup.objects.get(
name=UserGroup.EVERYONE_ON_INTERNET_GROUP_NAME, realm=user_group.realm
)
self.assertNotEqual(old_group.id, new_group.id)
do_change_user_group_permission_setting(
user_group, "can_mention_group", new_group, acting_user=None
)
audit_log_entries = RealmAuditLog.objects.filter(
event_type=RealmAuditLog.USER_GROUP_GROUP_BASED_SETTING_CHANGED,
event_time__gte=now,
)
self.assert_length(audit_log_entries, 1)
self.assertIsNone(audit_log_entries[0].acting_user)
self.assertDictEqual(
orjson.loads(assert_is_not_none(audit_log_entries[0].extra_data)),
{
RealmAuditLog.OLD_VALUE: old_group.id,
RealmAuditLog.NEW_VALUE: new_group.id,
"property": "can_mention_group",
},
)