mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	We try to use the correct variation of `email`
or `delivery_email`, even though in some
databases they are the same.
(To find the differences, I temporarily hacked
populate_db to use different values for email
and delivery_email, and reduced email visibility
in the zulip realm to admins only.)
In places where we want the "normal" realm
behavior of showing emails (and having `email`
be the same as `delivery_email`), we use
the new `reset_emails_in_zulip_realm` helper.
A couple random things:
    - I fixed any error messages that were leaking
      the wrong email
    - a test that claimed to rely on the order
      of emails no longer does (we sort user_ids
      instead)
    - we now use user_ids in some place where we used
      to use emails
    - for IRC mirrors I just punted and used
      `reset_emails_in_zulip_realm` in most places
    - for MIT-related tests, I didn't fix email
      vs. delivery_email unless it was obvious
I also explicitly reset the realm to a "normal"
realm for a couple tests that I frankly just didn't
have the energy to debug.  (Also, we do want some
coverage on the normal case, even though it is
"easier" for tests to pass if you mix up `email`
and `delivery_email`.)
In particular, I just reset data for the analytics
and corporate tests.
		
	
		
			
				
	
	
		
			188 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			188 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from django.utils.timezone import now as timezone_now
 | 
						|
 | 
						|
from zerver.lib.actions import do_create_user, do_deactivate_user, \
 | 
						|
    do_activate_user, do_reactivate_user, do_change_password, \
 | 
						|
    do_change_user_delivery_email, do_change_avatar_fields, do_change_bot_owner, \
 | 
						|
    do_regenerate_api_key, do_change_tos_version, \
 | 
						|
    bulk_add_subscriptions, bulk_remove_subscriptions, get_streams_traffic, \
 | 
						|
    do_change_is_admin, do_change_is_guest, do_deactivate_realm, do_reactivate_realm
 | 
						|
from zerver.lib.test_classes import ZulipTestCase
 | 
						|
from zerver.models import RealmAuditLog, get_client, get_realm, UserProfile
 | 
						|
from analytics.models import StreamCount
 | 
						|
 | 
						|
from datetime import timedelta
 | 
						|
from django.contrib.auth.password_validation import validate_password
 | 
						|
 | 
						|
from typing import Any, Dict
 | 
						|
import ujson
 | 
						|
 | 
						|
class TestRealmAuditLog(ZulipTestCase):
 | 
						|
    def check_role_count_schema(self, role_counts: Dict[str, Any]) -> None:
 | 
						|
        for key in [UserProfile.ROLE_REALM_ADMINISTRATOR,
 | 
						|
                    UserProfile.ROLE_MEMBER,
 | 
						|
                    UserProfile.ROLE_GUEST]:
 | 
						|
            # str(key) since json keys are always strings, and ujson.dumps will have converted
 | 
						|
            # the UserProfile.role values into strings
 | 
						|
            self.assertTrue(isinstance(role_counts[RealmAuditLog.ROLE_COUNT_HUMANS][str(key)], int))
 | 
						|
        self.assertTrue(isinstance(role_counts[RealmAuditLog.ROLE_COUNT_BOTS], int))
 | 
						|
 | 
						|
    def test_user_activation(self) -> None:
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        now = timezone_now()
 | 
						|
        user = do_create_user('email', 'password', realm, 'full_name', 'short_name')
 | 
						|
        do_deactivate_user(user)
 | 
						|
        do_activate_user(user)
 | 
						|
        do_deactivate_user(user)
 | 
						|
        do_reactivate_user(user)
 | 
						|
        self.assertEqual(RealmAuditLog.objects.filter(event_time__gte=now).count(), 5)
 | 
						|
        event_types = list(RealmAuditLog.objects.filter(
 | 
						|
            realm=realm, acting_user=None, modified_user=user, modified_stream=None,
 | 
						|
            event_time__gte=now, event_time__lte=now+timedelta(minutes=60))
 | 
						|
            .order_by('event_time').values_list('event_type', flat=True))
 | 
						|
        self.assertEqual(event_types, [RealmAuditLog.USER_CREATED, RealmAuditLog.USER_DEACTIVATED,
 | 
						|
                                       RealmAuditLog.USER_ACTIVATED, RealmAuditLog.USER_DEACTIVATED,
 | 
						|
                                       RealmAuditLog.USER_REACTIVATED])
 | 
						|
        for event in RealmAuditLog.objects.filter(
 | 
						|
                realm=realm, acting_user=None, modified_user=user, modified_stream=None,
 | 
						|
                event_time__gte=now, event_time__lte=now+timedelta(minutes=60)):
 | 
						|
            extra_data = ujson.loads(event.extra_data)
 | 
						|
            self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
 | 
						|
            self.assertNotIn(RealmAuditLog.OLD_VALUE, extra_data)
 | 
						|
 | 
						|
    def test_change_role(self) -> None:
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        now = timezone_now()
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        do_change_is_admin(user_profile, True)
 | 
						|
        do_change_is_admin(user_profile, False)
 | 
						|
        do_change_is_guest(user_profile, True)
 | 
						|
        do_change_is_guest(user_profile, False)
 | 
						|
        for event in RealmAuditLog.objects.filter(
 | 
						|
                event_type=RealmAuditLog.USER_ROLE_CHANGED,
 | 
						|
                realm=realm, modified_user=user_profile,
 | 
						|
                event_time__gte=now, event_time__lte=now+timedelta(minutes=60)):
 | 
						|
            extra_data = ujson.loads(event.extra_data)
 | 
						|
            self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
 | 
						|
            self.assertIn(RealmAuditLog.OLD_VALUE, extra_data)
 | 
						|
            self.assertIn(RealmAuditLog.NEW_VALUE, extra_data)
 | 
						|
 | 
						|
    def test_change_password(self) -> None:
 | 
						|
        now = timezone_now()
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        password = 'test1'
 | 
						|
        do_change_password(user, password)
 | 
						|
        self.assertEqual(RealmAuditLog.objects.filter(event_type=RealmAuditLog.USER_PASSWORD_CHANGED,
 | 
						|
                                                      event_time__gte=now).count(), 1)
 | 
						|
        self.assertIsNone(validate_password(password, user))
 | 
						|
 | 
						|
    def test_change_email(self) -> None:
 | 
						|
        now = timezone_now()
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        new_email = 'test@example.com'
 | 
						|
        do_change_user_delivery_email(user, new_email)
 | 
						|
        self.assertEqual(RealmAuditLog.objects.filter(event_type=RealmAuditLog.USER_EMAIL_CHANGED,
 | 
						|
                                                      event_time__gte=now).count(), 1)
 | 
						|
        self.assertEqual(new_email, user.delivery_email)
 | 
						|
 | 
						|
        # Test the RealmAuditLog stringification
 | 
						|
        audit_entry = RealmAuditLog.objects.get(event_type=RealmAuditLog.USER_EMAIL_CHANGED, event_time__gte=now)
 | 
						|
        self.assertTrue(str(audit_entry).startswith("<RealmAuditLog: <UserProfile: %s %s> %s " % (user.email, user.realm, RealmAuditLog.USER_EMAIL_CHANGED)))
 | 
						|
 | 
						|
    def test_change_avatar_source(self) -> None:
 | 
						|
        now = timezone_now()
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        avatar_source = u'G'
 | 
						|
        do_change_avatar_fields(user, avatar_source)
 | 
						|
        self.assertEqual(RealmAuditLog.objects.filter(event_type=RealmAuditLog.USER_AVATAR_SOURCE_CHANGED,
 | 
						|
                                                      event_time__gte=now).count(), 1)
 | 
						|
        self.assertEqual(avatar_source, user.avatar_source)
 | 
						|
 | 
						|
    def test_change_full_name(self) -> None:
 | 
						|
        start = timezone_now()
 | 
						|
        new_name = 'George Hamletovich'
 | 
						|
        self.login('iago')
 | 
						|
        req = dict(full_name=ujson.dumps(new_name))
 | 
						|
        result = self.client_patch('/json/users/{}'.format(self.example_user("hamlet").id), req)
 | 
						|
        self.assertTrue(result.status_code == 200)
 | 
						|
        query = RealmAuditLog.objects.filter(event_type=RealmAuditLog.USER_FULL_NAME_CHANGED,
 | 
						|
                                             event_time__gte=start)
 | 
						|
        self.assertEqual(query.count(), 1)
 | 
						|
 | 
						|
    def test_change_tos_version(self) -> None:
 | 
						|
        now = timezone_now()
 | 
						|
        user = self.example_user("hamlet")
 | 
						|
        tos_version = 'android'
 | 
						|
        do_change_tos_version(user, tos_version)
 | 
						|
        self.assertEqual(RealmAuditLog.objects.filter(event_type=RealmAuditLog.USER_TOS_VERSION_CHANGED,
 | 
						|
                                                      event_time__gte=now).count(), 1)
 | 
						|
        self.assertEqual(tos_version, user.tos_version)
 | 
						|
 | 
						|
    def test_change_bot_owner(self) -> None:
 | 
						|
        now = timezone_now()
 | 
						|
        admin = self.example_user('iago')
 | 
						|
        bot = self.notification_bot()
 | 
						|
        bot_owner = self.example_user('hamlet')
 | 
						|
        do_change_bot_owner(bot, bot_owner, admin)
 | 
						|
        self.assertEqual(RealmAuditLog.objects.filter(event_type=RealmAuditLog.USER_BOT_OWNER_CHANGED,
 | 
						|
                                                      event_time__gte=now).count(), 1)
 | 
						|
        self.assertEqual(bot_owner, bot.bot_owner)
 | 
						|
 | 
						|
    def test_regenerate_api_key(self) -> None:
 | 
						|
        now = timezone_now()
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        do_regenerate_api_key(user, user)
 | 
						|
        self.assertEqual(RealmAuditLog.objects.filter(event_type=RealmAuditLog.USER_API_KEY_CHANGED,
 | 
						|
                                                      event_time__gte=now).count(), 1)
 | 
						|
        self.assertTrue(user.api_key)
 | 
						|
 | 
						|
    def test_get_streams_traffic(self) -> None:
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        stream_name = 'whatever'
 | 
						|
        stream = self.make_stream(stream_name, realm)
 | 
						|
        stream_ids = {stream.id}
 | 
						|
 | 
						|
        result = get_streams_traffic(stream_ids)
 | 
						|
        self.assertEqual(result, {})
 | 
						|
 | 
						|
        StreamCount.objects.create(
 | 
						|
            realm=realm,
 | 
						|
            stream=stream,
 | 
						|
            property='messages_in_stream:is_bot:day',
 | 
						|
            end_time=timezone_now(),
 | 
						|
            value=999,
 | 
						|
        )
 | 
						|
 | 
						|
        result = get_streams_traffic(stream_ids)
 | 
						|
        self.assertEqual(result, {stream.id: 999})
 | 
						|
 | 
						|
    def test_subscriptions(self) -> None:
 | 
						|
        now = timezone_now()
 | 
						|
        user = [self.example_user('hamlet')]
 | 
						|
        stream = [self.make_stream('test_stream')]
 | 
						|
 | 
						|
        bulk_add_subscriptions(stream, user)
 | 
						|
        subscription_creation_logs = RealmAuditLog.objects.filter(event_type=RealmAuditLog.SUBSCRIPTION_CREATED,
 | 
						|
                                                                  event_time__gte=now)
 | 
						|
        self.assertEqual(subscription_creation_logs.count(), 1)
 | 
						|
        self.assertEqual(subscription_creation_logs[0].modified_stream.id, stream[0].id)
 | 
						|
        self.assertEqual(subscription_creation_logs[0].modified_user, user[0])
 | 
						|
 | 
						|
        bulk_remove_subscriptions(user, stream, get_client("website"))
 | 
						|
        subscription_deactivation_logs = RealmAuditLog.objects.filter(event_type=RealmAuditLog.SUBSCRIPTION_DEACTIVATED,
 | 
						|
                                                                      event_time__gte=now)
 | 
						|
        self.assertEqual(subscription_deactivation_logs.count(), 1)
 | 
						|
        self.assertEqual(subscription_deactivation_logs[0].modified_stream.id, stream[0].id)
 | 
						|
        self.assertEqual(subscription_deactivation_logs[0].modified_user, user[0])
 | 
						|
 | 
						|
    def test_realm_activation(self) -> None:
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        do_deactivate_realm(realm)
 | 
						|
        log_entry = RealmAuditLog.objects.get(realm=realm, event_type=RealmAuditLog.REALM_DEACTIVATED)
 | 
						|
        extra_data = ujson.loads(log_entry.extra_data)
 | 
						|
        self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
 | 
						|
 | 
						|
        do_reactivate_realm(realm)
 | 
						|
        log_entry = RealmAuditLog.objects.get(realm=realm, event_type=RealmAuditLog.REALM_REACTIVATED)
 | 
						|
        extra_data = ujson.loads(log_entry.extra_data)
 | 
						|
        self.check_role_count_schema(extra_data[RealmAuditLog.ROLE_COUNT])
 |