mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 05:23:35 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1203 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1203 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from typing import TYPE_CHECKING, Any
 | 
						|
from unittest import mock
 | 
						|
 | 
						|
import orjson
 | 
						|
from typing_extensions import override
 | 
						|
 | 
						|
from zerver.actions.reactions import notify_reaction_update
 | 
						|
from zerver.actions.streams import do_change_stream_group_based_setting, do_change_stream_permission
 | 
						|
from zerver.actions.user_groups import check_add_user_group
 | 
						|
from zerver.lib.cache import cache_get, to_dict_cache_key_id
 | 
						|
from zerver.lib.emoji import get_emoji_data
 | 
						|
from zerver.lib.exceptions import JsonableError
 | 
						|
from zerver.lib.message_cache import extract_message_dict
 | 
						|
from zerver.lib.test_classes import ZulipTestCase
 | 
						|
from zerver.lib.test_helpers import zulip_reaction_info
 | 
						|
from zerver.models import Message, Reaction, RealmEmoji, UserMessage
 | 
						|
from zerver.models.realms import get_realm
 | 
						|
from zerver.models.streams import Subscription
 | 
						|
 | 
						|
if TYPE_CHECKING:
 | 
						|
    from django.test.client import _MonkeyPatchedWSGIResponse as TestHttpResponse
 | 
						|
 | 
						|
 | 
						|
class ReactionEmojiTest(ZulipTestCase):
 | 
						|
    def test_missing_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Sending reaction without emoji fails
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assertEqual(result.status_code, 400)
 | 
						|
 | 
						|
    def test_add_invalid_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Sending invalid emoji fails
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "foo",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_error(result, "Emoji 'foo' does not exist")
 | 
						|
 | 
						|
    def test_add_deactivated_realm_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Sending deactivated realm emoji fails.
 | 
						|
        """
 | 
						|
        emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
        emoji.deactivated = True
 | 
						|
        emoji.save(update_fields=["deactivated"])
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "green_tick",
 | 
						|
            "reaction_type": "realm_emoji",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_error(result, "Emoji 'green_tick' does not exist")
 | 
						|
 | 
						|
    def test_valid_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting with valid emoji succeeds
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        base_query = Reaction.objects.filter(
 | 
						|
            user_profile=sender,
 | 
						|
            message=Message.objects.get(id=1),
 | 
						|
        )
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(200, result.status_code)
 | 
						|
        self.assertTrue(base_query.filter(emoji_name=reaction_info["emoji_name"]).exists())
 | 
						|
 | 
						|
        reaction_info["emoji_name"] = "green_tick"
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(200, result.status_code)
 | 
						|
        self.assertTrue(base_query.filter(emoji_name=reaction_info["emoji_name"]).exists())
 | 
						|
 | 
						|
    def test_cached_reaction_data(self) -> None:
 | 
						|
        """
 | 
						|
        Formatted reactions data is saved in cache.
 | 
						|
        """
 | 
						|
        senders = [self.example_user("hamlet"), self.example_user("cordelia")]
 | 
						|
        emojis = ["smile", "tada"]
 | 
						|
        expected_emoji_codes = ["1f642", "1f389"]
 | 
						|
 | 
						|
        for sender, emoji in zip(senders, emojis, strict=False):
 | 
						|
            reaction_info = {
 | 
						|
                "emoji_name": emoji,
 | 
						|
            }
 | 
						|
            result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
 | 
						|
            self.assert_json_success(result)
 | 
						|
            self.assertEqual(200, result.status_code)
 | 
						|
 | 
						|
        key = to_dict_cache_key_id(1)
 | 
						|
        message = extract_message_dict(cache_get(key)[0])
 | 
						|
 | 
						|
        expected_reaction_data = [
 | 
						|
            {
 | 
						|
                "emoji_name": emoji,
 | 
						|
                "emoji_code": emoji_code,
 | 
						|
                "reaction_type": "unicode_emoji",
 | 
						|
                "user_id": sender.id,
 | 
						|
            }
 | 
						|
            # It's important that we preserve the loop order in this
 | 
						|
            # test, since this is our test to verify that we're
 | 
						|
            # returning reactions in chronological order.
 | 
						|
            for sender, emoji, emoji_code in zip(
 | 
						|
                senders, emojis, expected_emoji_codes, strict=False
 | 
						|
            )
 | 
						|
        ]
 | 
						|
        self.assertEqual(expected_reaction_data, message["reactions"])
 | 
						|
 | 
						|
    def test_zulip_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting with zulip emoji succeeds
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "zulip",
 | 
						|
            "reaction_type": "zulip_extra_emoji",
 | 
						|
        }
 | 
						|
        base_query = Reaction.objects.filter(
 | 
						|
            user_profile=sender, emoji_name=reaction_info["emoji_name"]
 | 
						|
        )
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(200, result.status_code)
 | 
						|
        self.assertTrue(base_query.filter(message=Message.objects.get(id=1)).exists())
 | 
						|
 | 
						|
        reaction_info.pop("reaction_type")
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/2/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertEqual(200, result.status_code)
 | 
						|
        self.assertTrue(base_query.filter(message=Message.objects.get(id=2)).exists())
 | 
						|
 | 
						|
    def test_valid_emoji_react_historical(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting with valid emoji on a historical message succeeds
 | 
						|
        """
 | 
						|
        stream_name = "Saxony"
 | 
						|
        self.subscribe(self.example_user("cordelia"), stream_name)
 | 
						|
        message_id = self.send_stream_message(self.example_user("cordelia"), stream_name)
 | 
						|
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        sender = user_profile
 | 
						|
 | 
						|
        # Verify that hamlet did not receive the message.
 | 
						|
        self.assertFalse(
 | 
						|
            UserMessage.objects.filter(user_profile=user_profile, message_id=message_id).exists()
 | 
						|
        )
 | 
						|
 | 
						|
        # Have hamlet react to the message
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, f"/api/v1/messages/{message_id}/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Fetch the now-created UserMessage object to confirm it exists and is historical
 | 
						|
        user_message = UserMessage.objects.get(user_profile=user_profile, message_id=message_id)
 | 
						|
        self.assertTrue(user_message.flags.historical)
 | 
						|
        self.assertTrue(user_message.flags.read)
 | 
						|
        self.assertFalse(user_message.flags.starred)
 | 
						|
 | 
						|
    def test_valid_realm_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting with valid realm emoji succeeds
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "green_tick",
 | 
						|
            "reaction_type": "realm_emoji",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_get_emoji_data(self) -> None:
 | 
						|
        realm = get_realm("zulip")
 | 
						|
        realm_emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
 | 
						|
        def verify(emoji_name: str, emoji_code: str, reaction_type: str) -> None:
 | 
						|
            emoji_data = get_emoji_data(realm.id, emoji_name)
 | 
						|
            self.assertEqual(emoji_data.emoji_code, emoji_code)
 | 
						|
            self.assertEqual(emoji_data.reaction_type, reaction_type)
 | 
						|
 | 
						|
        # Test active realm emoji.
 | 
						|
        verify("green_tick", str(realm_emoji.id), "realm_emoji")
 | 
						|
 | 
						|
        # Test deactivated realm emoji.
 | 
						|
        realm_emoji.deactivated = True
 | 
						|
        realm_emoji.save(update_fields=["deactivated"])
 | 
						|
        with self.assertRaises(JsonableError) as exc:
 | 
						|
            get_emoji_data(realm.id, "green_tick")
 | 
						|
        self.assertEqual(str(exc.exception), "Emoji 'green_tick' does not exist")
 | 
						|
 | 
						|
        # Test ':zulip:' emoji.
 | 
						|
        verify("zulip", "zulip", "zulip_extra_emoji")
 | 
						|
 | 
						|
        # Test Unicode emoji.
 | 
						|
        verify("astonished", "1f632", "unicode_emoji")
 | 
						|
 | 
						|
        # Test override Unicode emoji.
 | 
						|
        overriding_emoji = RealmEmoji.objects.create(
 | 
						|
            name="astonished", realm=realm, file_name="astonished"
 | 
						|
        )
 | 
						|
        verify("astonished", str(overriding_emoji.id), "realm_emoji")
 | 
						|
 | 
						|
        # Test deactivate over-ridding realm emoji.
 | 
						|
        overriding_emoji.deactivated = True
 | 
						|
        overriding_emoji.save(update_fields=["deactivated"])
 | 
						|
        verify("astonished", "1f632", "unicode_emoji")
 | 
						|
 | 
						|
        # Test override `:zulip:` emoji.
 | 
						|
        overriding_emoji = RealmEmoji.objects.create(name="zulip", realm=realm, file_name="zulip")
 | 
						|
        verify("zulip", str(overriding_emoji.id), "realm_emoji")
 | 
						|
 | 
						|
        # Test non-existent emoji.
 | 
						|
        with self.assertRaises(JsonableError) as exc:
 | 
						|
            get_emoji_data(realm.id, "invalid_emoji")
 | 
						|
        self.assertEqual(str(exc.exception), "Emoji 'invalid_emoji' does not exist")
 | 
						|
 | 
						|
 | 
						|
class ReactionMessageIDTest(ZulipTestCase):
 | 
						|
    def test_missing_message_id(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting without a message_id fails
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages//reactions", reaction_info)
 | 
						|
        self.assertEqual(result.status_code, 404)
 | 
						|
 | 
						|
    def test_invalid_message_id(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting to an invalid message id fails
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/-1/reactions", reaction_info)
 | 
						|
        self.assertEqual(result.status_code, 404)
 | 
						|
 | 
						|
    def test_inaccessible_message_id(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting to a inaccessible (for instance, private) message fails
 | 
						|
        """
 | 
						|
        pm_sender = self.example_user("hamlet")
 | 
						|
        pm_recipient = self.example_user("othello")
 | 
						|
        reaction_sender = self.example_user("iago")
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            pm_sender,
 | 
						|
            "/api/v1/messages",
 | 
						|
            {
 | 
						|
                "type": "private",
 | 
						|
                "content": "Test message",
 | 
						|
                "to": orjson.dumps([pm_recipient.email]).decode(),
 | 
						|
            },
 | 
						|
        )
 | 
						|
        pm_id = self.assert_json_success(result)["id"]
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_error(result, "Invalid message(s)")
 | 
						|
 | 
						|
 | 
						|
class ReactionTest(ZulipTestCase):
 | 
						|
    def test_add_existing_reaction(self) -> None:
 | 
						|
        """
 | 
						|
        Creating the same reaction twice fails
 | 
						|
        """
 | 
						|
        pm_sender = self.example_user("hamlet")
 | 
						|
        pm_recipient = self.example_user("othello")
 | 
						|
        reaction_sender = pm_recipient
 | 
						|
 | 
						|
        pm = self.api_post(
 | 
						|
            pm_sender,
 | 
						|
            "/api/v1/messages",
 | 
						|
            {
 | 
						|
                "type": "private",
 | 
						|
                "content": "Test message",
 | 
						|
                "to": orjson.dumps([pm_recipient.email]).decode(),
 | 
						|
            },
 | 
						|
        )
 | 
						|
        self.assert_json_success(pm)
 | 
						|
        content = orjson.loads(pm.content)
 | 
						|
 | 
						|
        pm_id = content["id"]
 | 
						|
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        first = self.api_post(reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info)
 | 
						|
        self.assert_json_success(first)
 | 
						|
 | 
						|
        second = self.api_post(
 | 
						|
            reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_error(second, "Reaction already exists.")
 | 
						|
 | 
						|
    def test_remove_nonexisting_reaction(self) -> None:
 | 
						|
        """
 | 
						|
        Removing a reaction twice fails
 | 
						|
        """
 | 
						|
        pm_sender = self.example_user("hamlet")
 | 
						|
        pm_recipient = self.example_user("othello")
 | 
						|
        reaction_sender = pm_recipient
 | 
						|
 | 
						|
        pm = self.api_post(
 | 
						|
            pm_sender,
 | 
						|
            "/api/v1/messages",
 | 
						|
            {
 | 
						|
                "type": "private",
 | 
						|
                "content": "Test message",
 | 
						|
                "to": orjson.dumps([pm_recipient.email]).decode(),
 | 
						|
            },
 | 
						|
        )
 | 
						|
        self.assert_json_success(pm)
 | 
						|
 | 
						|
        content = orjson.loads(pm.content)
 | 
						|
        pm_id = content["id"]
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        add = self.api_post(reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info)
 | 
						|
        self.assert_json_success(add)
 | 
						|
 | 
						|
        first = self.api_delete(
 | 
						|
            reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(first)
 | 
						|
 | 
						|
        second = self.api_delete(
 | 
						|
            reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_error(second, "Reaction doesn't exist.")
 | 
						|
 | 
						|
    def test_remove_existing_reaction_with_renamed_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Removes an old existing reaction but the name of emoji got changed during
 | 
						|
        various emoji infra changes.
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        emoji_data = get_emoji_data(sender.realm_id, "smile")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
            "emoji_code": emoji_data.emoji_code,
 | 
						|
            "reaction_type": emoji_data.reaction_type,
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        with mock.patch("zerver.lib.emoji.name_to_codepoint", name_to_codepoint={}):
 | 
						|
            result = self.api_delete(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
            self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_remove_existing_reaction_with_deactivated_realm_emoji(self) -> None:
 | 
						|
        """
 | 
						|
        Removes an old existing reaction but the realm emoji used there has been deactivated.
 | 
						|
        """
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
 | 
						|
        emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "green_tick",
 | 
						|
            "emoji_code": str(emoji.id),
 | 
						|
            "reaction_type": "realm_emoji",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Deactivate realm emoji.
 | 
						|
        emoji.deactivated = True
 | 
						|
        emoji.save(update_fields=["deactivated"])
 | 
						|
        result = self.api_delete(sender, "/api/v1/messages/1/reactions", reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
 | 
						|
class ReactionEventTest(ZulipTestCase):
 | 
						|
    def test_add_event(self) -> None:
 | 
						|
        """
 | 
						|
        Recipients of the message receive the reaction event
 | 
						|
        and event contains relevant data
 | 
						|
        """
 | 
						|
        pm_sender = self.example_user("hamlet")
 | 
						|
        pm_recipient = self.example_user("othello")
 | 
						|
        reaction_sender = pm_recipient
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            pm_sender,
 | 
						|
            "/api/v1/messages",
 | 
						|
            {
 | 
						|
                "type": "private",
 | 
						|
                "content": "Test message",
 | 
						|
                "to": orjson.dumps([pm_recipient.email]).decode(),
 | 
						|
            },
 | 
						|
        )
 | 
						|
        pm_id = self.assert_json_success(result)["id"]
 | 
						|
 | 
						|
        expected_recipient_ids = {pm_sender.id, pm_recipient.id}
 | 
						|
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_post(
 | 
						|
                reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        event = events[0]["event"]
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
 | 
						|
        self.assertEqual(expected_recipient_ids, event_user_ids)
 | 
						|
        self.assertEqual(event["user"]["email"], reaction_sender.email)
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        self.assertEqual(event["op"], "add")
 | 
						|
        self.assertEqual(event["emoji_name"], "smile")
 | 
						|
        self.assertEqual(event["message_id"], pm_id)
 | 
						|
 | 
						|
    def test_remove_event(self) -> None:
 | 
						|
        """
 | 
						|
        Recipients of the message receive the reaction event
 | 
						|
        and event contains relevant data
 | 
						|
        """
 | 
						|
        pm_sender = self.example_user("hamlet")
 | 
						|
        pm_recipient = self.example_user("othello")
 | 
						|
        reaction_sender = pm_recipient
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            pm_sender,
 | 
						|
            "/api/v1/messages",
 | 
						|
            {
 | 
						|
                "type": "private",
 | 
						|
                "content": "Test message",
 | 
						|
                "to": orjson.dumps([pm_recipient.email]).decode(),
 | 
						|
            },
 | 
						|
        )
 | 
						|
        content = self.assert_json_success(result)
 | 
						|
        pm_id = content["id"]
 | 
						|
 | 
						|
        expected_recipient_ids = {pm_sender.id, pm_recipient.id}
 | 
						|
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        add = self.api_post(reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info)
 | 
						|
        self.assert_json_success(add)
 | 
						|
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_delete(
 | 
						|
                reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        event = events[0]["event"]
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
 | 
						|
        self.assertEqual(expected_recipient_ids, event_user_ids)
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        self.assertEqual(event["op"], "remove")
 | 
						|
        self.assertEqual(event["emoji_name"], "smile")
 | 
						|
        self.assertEqual(event["message_id"], pm_id)
 | 
						|
 | 
						|
    def test_reaction_event_scope(self) -> None:
 | 
						|
        iago = self.example_user("iago")
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        polonius = self.example_user("polonius")
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
 | 
						|
        # Test `invite_only` streams with `!history_public_to_subscribers` and `!is_web_public`
 | 
						|
        stream = self.make_stream(
 | 
						|
            "test_reactions_stream", invite_only=True, history_public_to_subscribers=False
 | 
						|
        )
 | 
						|
        self.subscribe(iago, stream.name)
 | 
						|
        message_before_id = self.send_stream_message(
 | 
						|
            iago, "test_reactions_stream", "before subscription history private"
 | 
						|
        )
 | 
						|
        self.subscribe(hamlet, stream.name)
 | 
						|
        self.subscribe(polonius, stream.name)
 | 
						|
 | 
						|
        # Hamlet and Polonius joined after the message was sent, and
 | 
						|
        # so only Iago should receive the event.
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_post(
 | 
						|
                iago, f"/api/v1/messages/{message_before_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        event = events[0]["event"]
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
        self.assertEqual(event_user_ids, {iago.id})
 | 
						|
        remove = self.api_delete(
 | 
						|
            iago, f"/api/v1/messages/{message_before_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(remove)
 | 
						|
 | 
						|
        # Reaction to a Message sent after subscription, should
 | 
						|
        # trigger events for all subscribers (Iago, Hamlet and Polonius).
 | 
						|
        message_after_id = self.send_stream_message(
 | 
						|
            iago, "test_reactions_stream", "after subscription history private"
 | 
						|
        )
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_post(
 | 
						|
                iago, f"/api/v1/messages/{message_after_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        event = events[0]["event"]
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
        self.assertEqual(event_user_ids, {iago.id, hamlet.id, polonius.id})
 | 
						|
        remove = self.api_delete(
 | 
						|
            iago, f"/api/v1/messages/{message_after_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(remove)
 | 
						|
 | 
						|
        # Make stream history public to subscribers
 | 
						|
        do_change_stream_permission(
 | 
						|
            stream,
 | 
						|
            invite_only=False,
 | 
						|
            history_public_to_subscribers=True,
 | 
						|
            is_web_public=False,
 | 
						|
            acting_user=iago,
 | 
						|
        )
 | 
						|
        # Since stream history is public to subscribers, reacting to
 | 
						|
        # message_before_id should notify all subscribers:
 | 
						|
        # Iago and Hamlet.
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_post(
 | 
						|
                iago, f"/api/v1/messages/{message_before_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        event = events[0]["event"]
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
        self.assertEqual(event_user_ids, {iago.id, hamlet.id, polonius.id})
 | 
						|
        remove = self.api_delete(
 | 
						|
            iago, f"/api/v1/messages/{message_before_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(remove)
 | 
						|
 | 
						|
        # Make stream web_public as well.
 | 
						|
        do_change_stream_permission(
 | 
						|
            stream,
 | 
						|
            invite_only=False,
 | 
						|
            history_public_to_subscribers=True,
 | 
						|
            is_web_public=True,
 | 
						|
            acting_user=iago,
 | 
						|
        )
 | 
						|
        # For is_web_public streams, events even on old messages
 | 
						|
        # should go to all subscribers, including guests like polonius.
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_post(
 | 
						|
                iago, f"/api/v1/messages/{message_before_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        event = events[0]["event"]
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
        self.assertEqual(event_user_ids, {iago.id, hamlet.id, polonius.id})
 | 
						|
        remove = self.api_delete(
 | 
						|
            iago, f"/api/v1/messages/{message_before_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(remove)
 | 
						|
 | 
						|
        # Direct message, event should go to both participants.
 | 
						|
        private_message_id = self.send_personal_message(
 | 
						|
            iago,
 | 
						|
            hamlet,
 | 
						|
            "hello to single receiver",
 | 
						|
        )
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_post(
 | 
						|
                hamlet, f"/api/v1/messages/{private_message_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        event = events[0]["event"]
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
        self.assertEqual(event_user_ids, {iago.id, hamlet.id})
 | 
						|
 | 
						|
        # Group direct message; event should go to all participants.
 | 
						|
        group_direct_message_id = self.send_group_direct_message(
 | 
						|
            hamlet,
 | 
						|
            [polonius, iago],
 | 
						|
            "hello message to multiple receiver",
 | 
						|
        )
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_post(
 | 
						|
                polonius, f"/api/v1/messages/{group_direct_message_id}/reactions", reaction_info
 | 
						|
            )
 | 
						|
        self.assert_json_success(result)
 | 
						|
        event = events[0]["event"]
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
        self.assertEqual(event_user_ids, {iago.id, hamlet.id, polonius.id})
 | 
						|
 | 
						|
 | 
						|
class EmojiReactionBase(ZulipTestCase):
 | 
						|
    """Reusable testing functions for emoji reactions tests.  Be careful when
 | 
						|
    changing this: It's used in test_retention.py as well."""
 | 
						|
 | 
						|
    def __init__(self, *args: Any, **kwargs: Any) -> None:
 | 
						|
        super().__init__(*args, **kwargs)
 | 
						|
 | 
						|
    def post_reaction(self, reaction_info: dict[str, str]) -> "TestHttpResponse":
 | 
						|
        message_id = 1
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            self.example_user("hamlet"), f"/api/v1/messages/{message_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        return result
 | 
						|
 | 
						|
    def post_other_reaction(self, reaction_info: dict[str, str]) -> "TestHttpResponse":
 | 
						|
        message_id = 1
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            self.example_user("AARON"), f"/api/v1/messages/{message_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        return result
 | 
						|
 | 
						|
    def delete_reaction(self, reaction_info: dict[str, str]) -> "TestHttpResponse":
 | 
						|
        message_id = 1
 | 
						|
 | 
						|
        result = self.api_delete(
 | 
						|
            self.example_user("hamlet"), f"/api/v1/messages/{message_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        return result
 | 
						|
 | 
						|
    def get_message_reactions(
 | 
						|
        self, message_id: int, emoji_code: str, reaction_type: str
 | 
						|
    ) -> list[Reaction]:
 | 
						|
        message = Message.objects.get(id=message_id)
 | 
						|
        reactions = Reaction.objects.filter(
 | 
						|
            message=message, emoji_code=emoji_code, reaction_type=reaction_type
 | 
						|
        )
 | 
						|
        return list(reactions)
 | 
						|
 | 
						|
 | 
						|
class DefaultEmojiReactionTests(EmojiReactionBase):
 | 
						|
    @override
 | 
						|
    def setUp(self) -> None:
 | 
						|
        super().setUp()
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "hamburger",
 | 
						|
            "emoji_code": "1f354",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_add_default_emoji_reaction(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "thumbs_up",
 | 
						|
            "emoji_code": "1f44d",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_add_default_emoji_invalid_code(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "hamburger",
 | 
						|
            "emoji_code": "TBD",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Invalid emoji code.")
 | 
						|
 | 
						|
    def test_add_default_emoji_invalid_name(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "non-existent",
 | 
						|
            "emoji_code": "1f44d",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Invalid emoji name.")
 | 
						|
 | 
						|
    def test_add_to_existing_renamed_default_emoji_reaction(self) -> None:
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        message = Message.objects.get(id=1)
 | 
						|
        reaction = Reaction.objects.create(
 | 
						|
            user_profile=hamlet,
 | 
						|
            message=message,
 | 
						|
            emoji_name="old_name",
 | 
						|
            emoji_code="1f603",
 | 
						|
            reaction_type="unicode_emoji",
 | 
						|
        )
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "smiley",
 | 
						|
            "emoji_code": "1f603",
 | 
						|
        }
 | 
						|
        result = self.post_other_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        reactions = self.get_message_reactions(1, "1f603", "unicode_emoji")
 | 
						|
        for reaction in reactions:
 | 
						|
            self.assertEqual(reaction.emoji_name, "old_name")
 | 
						|
 | 
						|
    def test_add_duplicate_reaction(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "non-existent",
 | 
						|
            "emoji_code": "1f354",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Reaction already exists.")
 | 
						|
 | 
						|
    def test_add_reaction_by_name(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "+1",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        message = Message.objects.get(id=1)
 | 
						|
        self.assertTrue(
 | 
						|
            Reaction.objects.filter(
 | 
						|
                user_profile=hamlet,
 | 
						|
                message=message,
 | 
						|
                emoji_name=reaction_info["emoji_name"],
 | 
						|
                emoji_code="1f44d",
 | 
						|
                reaction_type="unicode_emoji",
 | 
						|
            ).exists(),
 | 
						|
        )
 | 
						|
 | 
						|
    def test_preserve_non_canonical_name(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "+1",
 | 
						|
            "emoji_code": "1f44d",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        reactions = self.get_message_reactions(1, "1f44d", "unicode_emoji")
 | 
						|
        for reaction in reactions:
 | 
						|
            self.assertEqual(reaction.emoji_name, "+1")
 | 
						|
 | 
						|
    def test_reaction_name_collapse(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "+1",
 | 
						|
            "emoji_code": "1f44d",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        reaction_info["emoji_name"] = "thumbs_up"
 | 
						|
        result = self.post_other_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        reactions = self.get_message_reactions(1, "1f44d", "unicode_emoji")
 | 
						|
        for reaction in reactions:
 | 
						|
            self.assertEqual(reaction.emoji_name, "+1")
 | 
						|
 | 
						|
    def test_delete_default_emoji_reaction(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "hamburger",
 | 
						|
            "emoji_code": "1f354",
 | 
						|
        }
 | 
						|
        result = self.delete_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_delete_insufficient_arguments_reaction(self) -> None:
 | 
						|
        result = self.delete_reaction({})
 | 
						|
        self.assert_json_error(
 | 
						|
            result,
 | 
						|
            "At least one of the following arguments must be present: emoji_name, emoji_code",
 | 
						|
        )
 | 
						|
 | 
						|
    def test_delete_non_existing_emoji_reaction(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "thumbs_up",
 | 
						|
            "emoji_code": "1f44d",
 | 
						|
        }
 | 
						|
        result = self.delete_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Reaction doesn't exist.")
 | 
						|
 | 
						|
    def test_delete_renamed_default_emoji(self) -> None:
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        message = Message.objects.get(id=1)
 | 
						|
        Reaction.objects.create(
 | 
						|
            user_profile=hamlet,
 | 
						|
            message=message,
 | 
						|
            emoji_name="old_name",
 | 
						|
            emoji_code="1f44f",
 | 
						|
            reaction_type="unicode_emoji",
 | 
						|
        )
 | 
						|
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "new_name",
 | 
						|
            "emoji_code": "1f44f",
 | 
						|
        }
 | 
						|
        result = self.delete_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_delete_reaction_by_name(self) -> None:
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        message = Message.objects.get(id=1)
 | 
						|
        Reaction.objects.create(
 | 
						|
            user_profile=hamlet,
 | 
						|
            message=message,
 | 
						|
            emoji_name="+1",
 | 
						|
            emoji_code="1f44d",
 | 
						|
            reaction_type="unicode_emoji",
 | 
						|
        )
 | 
						|
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "+1",
 | 
						|
        }
 | 
						|
        result = self.delete_reaction(reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assertFalse(
 | 
						|
            Reaction.objects.filter(
 | 
						|
                user_profile=hamlet,
 | 
						|
                message=message,
 | 
						|
                emoji_name=reaction_info["emoji_name"],
 | 
						|
                emoji_code="1f44d",
 | 
						|
                reaction_type="unicode_emoji",
 | 
						|
            ).exists(),
 | 
						|
        )
 | 
						|
 | 
						|
    def test_react_historical(self) -> None:
 | 
						|
        """
 | 
						|
        Reacting with valid emoji on a historical message succeeds.
 | 
						|
        """
 | 
						|
        stream_name = "Saxony"
 | 
						|
        stream = self.subscribe(self.example_user("cordelia"), stream_name)
 | 
						|
        message_id = self.send_stream_message(self.example_user("cordelia"), stream_name)
 | 
						|
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        is_user_profile_a_subscriber = Subscription.objects.filter(
 | 
						|
            user_profile=user_profile,
 | 
						|
            recipient__type_id=stream.id,
 | 
						|
        ).exists()
 | 
						|
        self.assertEqual(is_user_profile_a_subscriber, False)
 | 
						|
 | 
						|
        # Verify that hamlet did not receive the message.
 | 
						|
        self.assertFalse(
 | 
						|
            UserMessage.objects.filter(user_profile=user_profile, message_id=message_id).exists()
 | 
						|
        )
 | 
						|
 | 
						|
        # Have hamlet react to the message
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "hamburger",
 | 
						|
            "emoji_code": "1f354",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            user_profile, f"/api/v1/messages/{message_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Fetch the now-created UserMessage object to confirm it exists and is historical
 | 
						|
        user_message = UserMessage.objects.get(user_profile=user_profile, message_id=message_id)
 | 
						|
        self.assertTrue(user_message.flags.historical)
 | 
						|
        self.assertTrue(user_message.flags.read)
 | 
						|
        self.assertFalse(user_message.flags.starred)
 | 
						|
 | 
						|
    def test_react_unsubscribed_private_stream(self) -> None:
 | 
						|
        """
 | 
						|
        Test reacting with valid emoji on a private stream.
 | 
						|
        """
 | 
						|
        stream_name = "new_private_stream"
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        stream = self.make_stream(stream_name, user_profile.realm, invite_only=True)
 | 
						|
        self.subscribe(user_profile, stream_name)
 | 
						|
        message_id = self.send_stream_message(user_profile, stream_name)
 | 
						|
 | 
						|
        # Have hamlet react to the message
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "hamburger",
 | 
						|
            "emoji_code": "1f354",
 | 
						|
        }
 | 
						|
 | 
						|
        result = self.api_post(
 | 
						|
            user_profile, f"/api/v1/messages/{message_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Unsubscribed user without content access should not be able
 | 
						|
        # to react
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
            "emoji_name": "smile",
 | 
						|
        }
 | 
						|
        self.unsubscribe(user_profile, stream_name)
 | 
						|
        result = self.api_post(
 | 
						|
            user_profile, f"/api/v1/messages/{message_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_error(result, "Invalid message(s)")
 | 
						|
 | 
						|
        # Unsubscribed user with content access should be able to react
 | 
						|
        user_profile_group = check_add_user_group(
 | 
						|
            user_profile.realm,
 | 
						|
            "prospero_group",
 | 
						|
            [user_profile],
 | 
						|
            acting_user=user_profile,
 | 
						|
        )
 | 
						|
        do_change_stream_group_based_setting(
 | 
						|
            stream,
 | 
						|
            "can_add_subscribers_group",
 | 
						|
            user_profile_group,
 | 
						|
            acting_user=None,
 | 
						|
        )
 | 
						|
        result = self.api_post(
 | 
						|
            user_profile, f"/api/v1/messages/{message_id}/reactions", reaction_info
 | 
						|
        )
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
 | 
						|
class ZulipExtraEmojiReactionTest(EmojiReactionBase):
 | 
						|
    def test_add_zulip_emoji_reaction(self) -> None:
 | 
						|
        result = self.post_reaction(zulip_reaction_info())
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_add_duplicate_zulip_reaction(self) -> None:
 | 
						|
        result = self.post_reaction(zulip_reaction_info())
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        result = self.post_reaction(zulip_reaction_info())
 | 
						|
        self.assert_json_error(result, "Reaction already exists.")
 | 
						|
 | 
						|
    def test_add_invalid_extra_emoji(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "extra_emoji",
 | 
						|
            "emoji_code": "extra_emoji",
 | 
						|
            "reaction_type": "zulip_extra_emoji",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Invalid emoji code.")
 | 
						|
 | 
						|
    def test_add_invalid_emoji_name(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "zulip_invalid",
 | 
						|
            "emoji_code": "zulip",
 | 
						|
            "reaction_type": "zulip_extra_emoji",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Invalid emoji name.")
 | 
						|
 | 
						|
    def test_delete_zulip_emoji(self) -> None:
 | 
						|
        result = self.post_reaction(zulip_reaction_info())
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        result = self.delete_reaction(zulip_reaction_info())
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_delete_non_existent_zulip_reaction(self) -> None:
 | 
						|
        result = self.delete_reaction(zulip_reaction_info())
 | 
						|
        self.assert_json_error(result, "Reaction doesn't exist.")
 | 
						|
 | 
						|
 | 
						|
class RealmEmojiReactionTests(EmojiReactionBase):
 | 
						|
    @override
 | 
						|
    def setUp(self) -> None:
 | 
						|
        super().setUp()
 | 
						|
        green_tick_emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
        self.default_reaction_info = {
 | 
						|
            "reaction_type": "realm_emoji",
 | 
						|
            "emoji_name": "green_tick",
 | 
						|
            "emoji_code": str(green_tick_emoji.id),
 | 
						|
        }
 | 
						|
 | 
						|
    def test_add_realm_emoji(self) -> None:
 | 
						|
        result = self.post_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_add_realm_emoji_invalid_code(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "realm_emoji",
 | 
						|
            "emoji_name": "green_tick",
 | 
						|
            "emoji_code": "9999",
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Invalid custom emoji.")
 | 
						|
 | 
						|
    def test_add_realm_emoji_invalid_name(self) -> None:
 | 
						|
        green_tick_emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "realm_emoji",
 | 
						|
            "emoji_name": "bogus_name",
 | 
						|
            "emoji_code": str(green_tick_emoji.id),
 | 
						|
        }
 | 
						|
        result = self.post_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Invalid custom emoji name.")
 | 
						|
 | 
						|
    def test_add_deactivated_realm_emoji(self) -> None:
 | 
						|
        emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
        emoji.deactivated = True
 | 
						|
        emoji.save(update_fields=["deactivated"])
 | 
						|
 | 
						|
        result = self.post_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_error(result, "This custom emoji has been deactivated.")
 | 
						|
 | 
						|
    def test_add_to_existing_deactivated_realm_emoji_reaction(self) -> None:
 | 
						|
        result = self.post_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
        emoji.deactivated = True
 | 
						|
        emoji.save(update_fields=["deactivated"])
 | 
						|
 | 
						|
        result = self.post_other_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        reactions = self.get_message_reactions(
 | 
						|
            1, self.default_reaction_info["emoji_code"], "realm_emoji"
 | 
						|
        )
 | 
						|
        self.assert_length(reactions, 2)
 | 
						|
 | 
						|
    def test_remove_realm_emoji_reaction(self) -> None:
 | 
						|
        result = self.post_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        result = self.delete_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_remove_deactivated_realm_emoji_reaction(self) -> None:
 | 
						|
        result = self.post_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        emoji = RealmEmoji.objects.get(name="green_tick")
 | 
						|
        emoji.deactivated = True
 | 
						|
        emoji.save(update_fields=["deactivated"])
 | 
						|
 | 
						|
        result = self.delete_reaction(self.default_reaction_info)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_remove_non_existent_realm_emoji_reaction(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "reaction_type": "realm_emoji",
 | 
						|
            "emoji_name": "non_existent",
 | 
						|
            "emoji_code": "TBD",
 | 
						|
        }
 | 
						|
        result = self.delete_reaction(reaction_info)
 | 
						|
        self.assert_json_error(result, "Reaction doesn't exist.")
 | 
						|
 | 
						|
    def test_invalid_reaction_type(self) -> None:
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "zulip",
 | 
						|
            "emoji_code": "zulip",
 | 
						|
            "reaction_type": "nonexistent_emoji_type",
 | 
						|
        }
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        message_id = 1
 | 
						|
        result = self.api_post(sender, f"/api/v1/messages/{message_id}/reactions", reaction_info)
 | 
						|
        self.assert_json_error(result, "Invalid emoji type.")
 | 
						|
 | 
						|
 | 
						|
class ReactionAPIEventTest(EmojiReactionBase):
 | 
						|
    def test_add_event(self) -> None:
 | 
						|
        """
 | 
						|
        Recipients of the message receive the reaction event
 | 
						|
        and event contains relevant data
 | 
						|
        """
 | 
						|
        pm_sender = self.example_user("hamlet")
 | 
						|
        pm_recipient = self.example_user("othello")
 | 
						|
        reaction_sender = pm_recipient
 | 
						|
        pm_id = self.send_personal_message(pm_sender, pm_recipient)
 | 
						|
        expected_recipient_ids = {pm_sender.id, pm_recipient.id}
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "hamburger",
 | 
						|
            "emoji_code": "1f354",
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
        }
 | 
						|
        with (
 | 
						|
            self.capture_send_event_calls(expected_num_events=1) as events,
 | 
						|
            mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m,
 | 
						|
        ):
 | 
						|
            m.side_effect = AssertionError(
 | 
						|
                "Events should be sent only after the transaction commits!"
 | 
						|
            )
 | 
						|
            self.api_post(reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info)
 | 
						|
 | 
						|
        event = events[0]["event"]
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
 | 
						|
        self.assertEqual(expected_recipient_ids, event_user_ids)
 | 
						|
        self.assertEqual(event["user"]["user_id"], reaction_sender.id)
 | 
						|
        self.assertEqual(event["user"]["email"], reaction_sender.email)
 | 
						|
        self.assertEqual(event["user"]["full_name"], reaction_sender.full_name)
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        self.assertEqual(event["op"], "add")
 | 
						|
        self.assertEqual(event["message_id"], pm_id)
 | 
						|
        self.assertEqual(event["emoji_name"], reaction_info["emoji_name"])
 | 
						|
        self.assertEqual(event["emoji_code"], reaction_info["emoji_code"])
 | 
						|
        self.assertEqual(event["reaction_type"], reaction_info["reaction_type"])
 | 
						|
 | 
						|
    def test_remove_event(self) -> None:
 | 
						|
        """
 | 
						|
        Recipients of the message receive the reaction event
 | 
						|
        and event contains relevant data
 | 
						|
        """
 | 
						|
        pm_sender = self.example_user("hamlet")
 | 
						|
        pm_recipient = self.example_user("othello")
 | 
						|
        reaction_sender = pm_recipient
 | 
						|
        pm_id = self.send_personal_message(pm_sender, pm_recipient)
 | 
						|
        expected_recipient_ids = {pm_sender.id, pm_recipient.id}
 | 
						|
        reaction_info = {
 | 
						|
            "emoji_name": "hamburger",
 | 
						|
            "emoji_code": "1f354",
 | 
						|
            "reaction_type": "unicode_emoji",
 | 
						|
        }
 | 
						|
        add = self.api_post(
 | 
						|
            reaction_sender,
 | 
						|
            f"/api/v1/messages/{pm_id}/reactions",
 | 
						|
            reaction_info,
 | 
						|
        )
 | 
						|
        self.assert_json_success(add)
 | 
						|
 | 
						|
        with self.capture_send_event_calls(expected_num_events=1) as events:
 | 
						|
            result = self.api_delete(
 | 
						|
                reaction_sender,
 | 
						|
                f"/api/v1/messages/{pm_id}/reactions",
 | 
						|
                reaction_info,
 | 
						|
            )
 | 
						|
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        event = events[0]["event"]
 | 
						|
        event_user_ids = set(events[0]["users"])
 | 
						|
 | 
						|
        self.assertEqual(expected_recipient_ids, event_user_ids)
 | 
						|
        self.assertEqual(event["user"]["user_id"], reaction_sender.id)
 | 
						|
        self.assertEqual(event["user"]["email"], reaction_sender.email)
 | 
						|
        self.assertEqual(event["user"]["full_name"], reaction_sender.full_name)
 | 
						|
        self.assertEqual(event["type"], "reaction")
 | 
						|
        self.assertEqual(event["op"], "remove")
 | 
						|
        self.assertEqual(event["message_id"], pm_id)
 | 
						|
        self.assertEqual(event["emoji_name"], reaction_info["emoji_name"])
 | 
						|
        self.assertEqual(event["emoji_code"], reaction_info["emoji_code"])
 | 
						|
        self.assertEqual(event["reaction_type"], reaction_info["reaction_type"])
 | 
						|
 | 
						|
    def test_events_sent_after_transaction_commits(self) -> None:
 | 
						|
        """
 | 
						|
        Tests that `send_event_rollback_unsafe` is hooked to `transaction.on_commit`.
 | 
						|
        This is important, because we don't want to end up holding locks on message rows
 | 
						|
        for too long if the event queue runs into a problem.
 | 
						|
        """
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        self.send_stream_message(hamlet, "Denmark")
 | 
						|
        message = self.get_last_message()
 | 
						|
        reaction = Reaction(
 | 
						|
            user_profile=hamlet,
 | 
						|
            message=message,
 | 
						|
            emoji_name="whatever",
 | 
						|
            emoji_code="whatever",
 | 
						|
            reaction_type="whatever",
 | 
						|
        )
 | 
						|
 | 
						|
        with (
 | 
						|
            self.capture_send_event_calls(expected_num_events=1),
 | 
						|
            mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m,
 | 
						|
        ):
 | 
						|
            m.side_effect = AssertionError(
 | 
						|
                "Events should be sent only after the transaction commits."
 | 
						|
            )
 | 
						|
            notify_reaction_update(hamlet, message, reaction, "stuff")
 |