mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			607 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			607 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from collections.abc import Callable
 | 
						|
from functools import wraps
 | 
						|
from typing import Any, Concatenate
 | 
						|
from unittest import mock
 | 
						|
 | 
						|
import orjson
 | 
						|
from django.conf import settings
 | 
						|
from django.test import override_settings
 | 
						|
from typing_extensions import ParamSpec, override
 | 
						|
 | 
						|
from zerver.actions.create_user import do_create_user
 | 
						|
from zerver.actions.message_send import get_service_bot_events
 | 
						|
from zerver.lib.bot_config import ConfigError, load_bot_config_template, set_bot_config
 | 
						|
from zerver.lib.bot_lib import EmbeddedBotEmptyRecipientsListError, EmbeddedBotHandler, StateHandler
 | 
						|
from zerver.lib.bot_storage import StateError
 | 
						|
from zerver.lib.test_classes import ZulipTestCase
 | 
						|
from zerver.lib.test_helpers import mock_queue_publish
 | 
						|
from zerver.lib.validator import check_string
 | 
						|
from zerver.models import Recipient, UserProfile
 | 
						|
from zerver.models.realms import get_realm
 | 
						|
from zerver.models.scheduled_jobs import NotificationTriggers
 | 
						|
 | 
						|
BOT_TYPE_TO_QUEUE_NAME = {
 | 
						|
    UserProfile.OUTGOING_WEBHOOK_BOT: "outgoing_webhooks",
 | 
						|
    UserProfile.EMBEDDED_BOT: "embedded_bots",
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
class TestServiceBotBasics(ZulipTestCase):
 | 
						|
    def _get_outgoing_bot(self) -> UserProfile:
 | 
						|
        outgoing_bot = do_create_user(
 | 
						|
            email="bar-bot@zulip.com",
 | 
						|
            password="test",
 | 
						|
            realm=get_realm("zulip"),
 | 
						|
            full_name="BarBot",
 | 
						|
            bot_type=UserProfile.OUTGOING_WEBHOOK_BOT,
 | 
						|
            bot_owner=self.example_user("cordelia"),
 | 
						|
            acting_user=None,
 | 
						|
        )
 | 
						|
 | 
						|
        return outgoing_bot
 | 
						|
 | 
						|
    def test_service_events_for_pms(self) -> None:
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        assert not sender.is_bot
 | 
						|
 | 
						|
        outgoing_bot = self._get_outgoing_bot()
 | 
						|
        assert outgoing_bot.bot_type is not None
 | 
						|
 | 
						|
        event_dict = get_service_bot_events(
 | 
						|
            sender=sender,
 | 
						|
            service_bot_tuples=[
 | 
						|
                (outgoing_bot.id, outgoing_bot.bot_type),
 | 
						|
            ],
 | 
						|
            active_user_ids={outgoing_bot.id},
 | 
						|
            mentioned_user_ids=set(),
 | 
						|
            recipient_type=Recipient.PERSONAL,
 | 
						|
        )
 | 
						|
 | 
						|
        expected = dict(
 | 
						|
            outgoing_webhooks=[
 | 
						|
                dict(trigger=NotificationTriggers.DIRECT_MESSAGE, user_profile_id=outgoing_bot.id),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(event_dict, expected)
 | 
						|
 | 
						|
    def test_spurious_mentions(self) -> None:
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        assert not sender.is_bot
 | 
						|
 | 
						|
        outgoing_bot = self._get_outgoing_bot()
 | 
						|
        assert outgoing_bot.bot_type is not None
 | 
						|
 | 
						|
        # If outgoing_bot is not in mentioned_user_ids,
 | 
						|
        # we will skip over it.  This tests an anomaly
 | 
						|
        # of the code that our query for bots can include
 | 
						|
        # bots that may not actually be mentioned, and it's
 | 
						|
        # easiest to just filter them in get_service_bot_events.
 | 
						|
        event_dict = get_service_bot_events(
 | 
						|
            sender=sender,
 | 
						|
            service_bot_tuples=[
 | 
						|
                (outgoing_bot.id, outgoing_bot.bot_type),
 | 
						|
            ],
 | 
						|
            active_user_ids={outgoing_bot.id},
 | 
						|
            mentioned_user_ids=set(),
 | 
						|
            recipient_type=Recipient.STREAM,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assert_length(event_dict, 0)
 | 
						|
 | 
						|
    def test_service_events_for_stream_mentions(self) -> None:
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        assert not sender.is_bot
 | 
						|
 | 
						|
        outgoing_bot = self._get_outgoing_bot()
 | 
						|
        assert outgoing_bot.bot_type is not None
 | 
						|
 | 
						|
        cordelia = self.example_user("cordelia")
 | 
						|
 | 
						|
        red_herring_bot = self.create_test_bot(
 | 
						|
            short_name="whatever",
 | 
						|
            user_profile=cordelia,
 | 
						|
        )
 | 
						|
 | 
						|
        event_dict = get_service_bot_events(
 | 
						|
            sender=sender,
 | 
						|
            service_bot_tuples=[
 | 
						|
                (outgoing_bot.id, outgoing_bot.bot_type),
 | 
						|
                (red_herring_bot.id, UserProfile.OUTGOING_WEBHOOK_BOT),
 | 
						|
            ],
 | 
						|
            active_user_ids=set(),
 | 
						|
            mentioned_user_ids={outgoing_bot.id},
 | 
						|
            recipient_type=Recipient.STREAM,
 | 
						|
        )
 | 
						|
 | 
						|
        expected = dict(
 | 
						|
            outgoing_webhooks=[
 | 
						|
                dict(trigger="mention", user_profile_id=outgoing_bot.id),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
        self.assertEqual(event_dict, expected)
 | 
						|
 | 
						|
    def test_service_events_for_private_mentions(self) -> None:
 | 
						|
        """Service bots should not get access to mentions if they aren't a
 | 
						|
        direct recipient."""
 | 
						|
        sender = self.example_user("hamlet")
 | 
						|
        assert not sender.is_bot
 | 
						|
 | 
						|
        outgoing_bot = self._get_outgoing_bot()
 | 
						|
        assert outgoing_bot.bot_type is not None
 | 
						|
 | 
						|
        event_dict = get_service_bot_events(
 | 
						|
            sender=sender,
 | 
						|
            service_bot_tuples=[
 | 
						|
                (outgoing_bot.id, outgoing_bot.bot_type),
 | 
						|
            ],
 | 
						|
            active_user_ids=set(),
 | 
						|
            mentioned_user_ids={outgoing_bot.id},
 | 
						|
            recipient_type=Recipient.PERSONAL,
 | 
						|
        )
 | 
						|
 | 
						|
        self.assert_length(event_dict, 0)
 | 
						|
 | 
						|
    def test_service_events_with_unexpected_bot_type(self) -> None:
 | 
						|
        hamlet = self.example_user("hamlet")
 | 
						|
        cordelia = self.example_user("cordelia")
 | 
						|
 | 
						|
        bot = self.create_test_bot(
 | 
						|
            short_name="whatever",
 | 
						|
            user_profile=cordelia,
 | 
						|
        )
 | 
						|
        wrong_bot_type = UserProfile.INCOMING_WEBHOOK_BOT
 | 
						|
        bot.bot_type = wrong_bot_type
 | 
						|
        bot.save()
 | 
						|
 | 
						|
        with self.assertLogs(level="ERROR") as m:
 | 
						|
            event_dict = get_service_bot_events(
 | 
						|
                sender=hamlet,
 | 
						|
                service_bot_tuples=[
 | 
						|
                    (bot.id, wrong_bot_type),
 | 
						|
                ],
 | 
						|
                active_user_ids=set(),
 | 
						|
                mentioned_user_ids={bot.id},
 | 
						|
                recipient_type=Recipient.PERSONAL,
 | 
						|
            )
 | 
						|
 | 
						|
        self.assert_length(event_dict, 0)
 | 
						|
        self.assertEqual(
 | 
						|
            m.output,
 | 
						|
            [f"ERROR:root:Unexpected bot_type for Service bot id={bot.id}: {wrong_bot_type}"],
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
class TestServiceBotStateHandler(ZulipTestCase):
 | 
						|
    @override
 | 
						|
    def setUp(self) -> None:
 | 
						|
        super().setUp()
 | 
						|
        self.user_profile = self.example_user("othello")
 | 
						|
        self.bot_profile = do_create_user(
 | 
						|
            email="embedded-bot-1@zulip.com",
 | 
						|
            password="test",
 | 
						|
            realm=get_realm("zulip"),
 | 
						|
            full_name="EmbeddedBo1",
 | 
						|
            bot_type=UserProfile.EMBEDDED_BOT,
 | 
						|
            bot_owner=self.user_profile,
 | 
						|
            acting_user=None,
 | 
						|
        )
 | 
						|
        self.second_bot_profile = do_create_user(
 | 
						|
            email="embedded-bot-2@zulip.com",
 | 
						|
            password="test",
 | 
						|
            realm=get_realm("zulip"),
 | 
						|
            full_name="EmbeddedBot2",
 | 
						|
            bot_type=UserProfile.EMBEDDED_BOT,
 | 
						|
            bot_owner=self.user_profile,
 | 
						|
            acting_user=None,
 | 
						|
        )
 | 
						|
 | 
						|
    def test_basic_storage_and_retrieval(self) -> None:
 | 
						|
        storage = StateHandler(self.bot_profile)
 | 
						|
        storage.put("some key", "some value")
 | 
						|
        storage.put("some other key", "some other value")
 | 
						|
        self.assertEqual(storage.get("some key"), "some value")
 | 
						|
        self.assertEqual(storage.get("some other key"), "some other value")
 | 
						|
        self.assertTrue(storage.contains("some key"))
 | 
						|
        self.assertFalse(storage.contains("nonexistent key"))
 | 
						|
        self.assertRaisesMessage(
 | 
						|
            StateError, "Key does not exist.", lambda: storage.get("nonexistent key")
 | 
						|
        )
 | 
						|
        storage.put("some key", "a new value")
 | 
						|
        self.assertEqual(storage.get("some key"), "a new value")
 | 
						|
        second_storage = StateHandler(self.second_bot_profile)
 | 
						|
        self.assertRaises(StateError, lambda: second_storage.get("some key"))
 | 
						|
        second_storage.put("some key", "yet another value")
 | 
						|
        self.assertEqual(storage.get("some key"), "a new value")
 | 
						|
        self.assertEqual(second_storage.get("some key"), "yet another value")
 | 
						|
 | 
						|
    def test_marshaling(self) -> None:
 | 
						|
        storage = StateHandler(self.bot_profile)
 | 
						|
        serializable_obj = {"foo": "bar", "baz": [42, "cux"]}
 | 
						|
        storage.put("some key", serializable_obj)
 | 
						|
        self.assertEqual(storage.get("some key"), serializable_obj)
 | 
						|
 | 
						|
    # Reduce maximal storage size for faster test string construction.
 | 
						|
    @override_settings(USER_STATE_SIZE_LIMIT=100)
 | 
						|
    def test_storage_limit(self) -> None:
 | 
						|
        storage = StateHandler(self.bot_profile)
 | 
						|
 | 
						|
        # Disable marshaling for storing a string whose size is
 | 
						|
        # equivalent to the size of the stored object.
 | 
						|
        storage.marshal = lambda obj: check_string("obj", obj)
 | 
						|
        storage.demarshal = lambda obj: obj
 | 
						|
 | 
						|
        key = "capacity-filling entry"
 | 
						|
        storage.put(key, "x" * (settings.USER_STATE_SIZE_LIMIT - len(key)))
 | 
						|
 | 
						|
        with self.assertRaisesMessage(
 | 
						|
            StateError,
 | 
						|
            "Request exceeds storage limit by 32 characters. The limit is 100 characters.",
 | 
						|
        ):
 | 
						|
            storage.put("too much data", "a few bits too long")
 | 
						|
 | 
						|
        second_storage = StateHandler(self.second_bot_profile)
 | 
						|
        second_storage.put("another big entry", "x" * (settings.USER_STATE_SIZE_LIMIT - 40))
 | 
						|
        second_storage.put("normal entry", "abcd")
 | 
						|
 | 
						|
    def test_entry_removal(self) -> None:
 | 
						|
        storage = StateHandler(self.bot_profile)
 | 
						|
        storage.put("some key", "some value")
 | 
						|
        storage.put("another key", "some value")
 | 
						|
        self.assertTrue(storage.contains("some key"))
 | 
						|
        self.assertTrue(storage.contains("another key"))
 | 
						|
        storage.remove("some key")
 | 
						|
        self.assertFalse(storage.contains("some key"))
 | 
						|
        self.assertTrue(storage.contains("another key"))
 | 
						|
        self.assertRaises(StateError, lambda: storage.remove("some key"))
 | 
						|
 | 
						|
    def test_internal_endpoint(self) -> None:
 | 
						|
        self.login_user(self.user_profile)
 | 
						|
 | 
						|
        # Store some data.
 | 
						|
        initial_dict = {"key 1": "value 1", "key 2": "value 2", "key 3": "value 3"}
 | 
						|
        params = {
 | 
						|
            "storage": orjson.dumps(initial_dict).decode(),
 | 
						|
        }
 | 
						|
        result = self.client_put("/json/bot_storage", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Assert the stored data for some keys.
 | 
						|
        params = {
 | 
						|
            "keys": orjson.dumps(["key 1", "key 3"]).decode(),
 | 
						|
        }
 | 
						|
        result = self.client_get("/json/bot_storage", params)
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        self.assertEqual(response_dict["storage"], {"key 3": "value 3", "key 1": "value 1"})
 | 
						|
 | 
						|
        # Assert the stored data for all keys.
 | 
						|
        result = self.client_get("/json/bot_storage")
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        self.assertEqual(response_dict["storage"], initial_dict)
 | 
						|
 | 
						|
        # Store some more data; update an entry and store a new entry
 | 
						|
        dict_update = {"key 1": "new value", "key 4": "value 4"}
 | 
						|
        params = {
 | 
						|
            "storage": orjson.dumps(dict_update).decode(),
 | 
						|
        }
 | 
						|
        result = self.client_put("/json/bot_storage", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Assert the data was updated.
 | 
						|
        updated_dict = initial_dict.copy()
 | 
						|
        updated_dict.update(dict_update)
 | 
						|
        result = self.client_get("/json/bot_storage")
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        self.assertEqual(response_dict["storage"], updated_dict)
 | 
						|
 | 
						|
        # Assert errors on invalid requests.
 | 
						|
        invalid_params = {
 | 
						|
            "keys": ["This is a list, but should be a serialized string."],
 | 
						|
        }
 | 
						|
        result = self.client_get("/json/bot_storage", invalid_params)
 | 
						|
        self.assert_json_error(result, "keys is not valid JSON")
 | 
						|
 | 
						|
        params = {
 | 
						|
            "keys": orjson.dumps(["key 1", "nonexistent key"]).decode(),
 | 
						|
        }
 | 
						|
        result = self.client_get("/json/bot_storage", params)
 | 
						|
        self.assert_json_error(result, "Key does not exist.")
 | 
						|
 | 
						|
        params = {
 | 
						|
            "storage": orjson.dumps({"foo": [1, 2, 3]}).decode(),
 | 
						|
        }
 | 
						|
        result = self.client_put("/json/bot_storage", params)
 | 
						|
        self.assert_json_error(result, 'storage["foo"] is not a string')
 | 
						|
 | 
						|
        # Remove some entries.
 | 
						|
        keys_to_remove = ["key 1", "key 2"]
 | 
						|
        params = {
 | 
						|
            "keys": orjson.dumps(keys_to_remove).decode(),
 | 
						|
        }
 | 
						|
        result = self.client_delete("/json/bot_storage", params)
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Assert the entries were removed.
 | 
						|
        for key in keys_to_remove:
 | 
						|
            updated_dict.pop(key)
 | 
						|
        result = self.client_get("/json/bot_storage")
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        self.assertEqual(response_dict["storage"], updated_dict)
 | 
						|
 | 
						|
        # Try to remove an existing and a nonexistent key.
 | 
						|
        params = {
 | 
						|
            "keys": orjson.dumps(["key 3", "nonexistent key"]).decode(),
 | 
						|
        }
 | 
						|
        result = self.client_delete("/json/bot_storage", params)
 | 
						|
        self.assert_json_error(result, "Key does not exist.")
 | 
						|
 | 
						|
        # Assert an error has been thrown and no entries were removed.
 | 
						|
        result = self.client_get("/json/bot_storage")
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        self.assertEqual(response_dict["storage"], updated_dict)
 | 
						|
 | 
						|
        # Remove the entire storage.
 | 
						|
        result = self.client_delete("/json/bot_storage")
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # Assert the entire storage has been removed.
 | 
						|
        result = self.client_get("/json/bot_storage")
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        self.assertEqual(response_dict["storage"], {})
 | 
						|
 | 
						|
 | 
						|
class TestServiceBotConfigHandler(ZulipTestCase):
 | 
						|
    @override
 | 
						|
    def setUp(self) -> None:
 | 
						|
        super().setUp()
 | 
						|
        self.user_profile = self.example_user("othello")
 | 
						|
        self.bot_profile = self.create_test_bot(
 | 
						|
            "embedded",
 | 
						|
            self.user_profile,
 | 
						|
            full_name="Embedded bot",
 | 
						|
            bot_type=UserProfile.EMBEDDED_BOT,
 | 
						|
            service_name="helloworld",
 | 
						|
        )
 | 
						|
        self.bot_handler = EmbeddedBotHandler(self.bot_profile)
 | 
						|
 | 
						|
    def test_basic_storage_and_retrieval(self) -> None:
 | 
						|
        with self.assertRaises(ConfigError):
 | 
						|
            self.bot_handler.get_config_info("foo")
 | 
						|
 | 
						|
        self.assertEqual(self.bot_handler.get_config_info("foo", optional=True), {})
 | 
						|
 | 
						|
        config_dict = {"entry 1": "value 1", "entry 2": "value 2"}
 | 
						|
        for key, value in config_dict.items():
 | 
						|
            set_bot_config(self.bot_profile, key, value)
 | 
						|
        self.assertEqual(self.bot_handler.get_config_info("foo"), config_dict)
 | 
						|
 | 
						|
        config_update = {"entry 2": "new value", "entry 3": "value 3"}
 | 
						|
        for key, value in config_update.items():
 | 
						|
            set_bot_config(self.bot_profile, key, value)
 | 
						|
        config_dict.update(config_update)
 | 
						|
        self.assertEqual(self.bot_handler.get_config_info("foo"), config_dict)
 | 
						|
 | 
						|
    @override_settings(BOT_CONFIG_SIZE_LIMIT=100)
 | 
						|
    def test_config_entry_limit(self) -> None:
 | 
						|
        set_bot_config(self.bot_profile, "some key", "x" * (settings.BOT_CONFIG_SIZE_LIMIT - 8))
 | 
						|
        self.assertRaisesMessage(
 | 
						|
            ConfigError,
 | 
						|
            "Cannot store configuration. Request would require 101 characters. "
 | 
						|
            "The current configuration size limit is 100 characters.",
 | 
						|
            lambda: set_bot_config(
 | 
						|
                self.bot_profile, "some key", "x" * (settings.BOT_CONFIG_SIZE_LIMIT - 8 + 1)
 | 
						|
            ),
 | 
						|
        )
 | 
						|
        set_bot_config(self.bot_profile, "some key", "x" * (settings.BOT_CONFIG_SIZE_LIMIT - 20))
 | 
						|
        set_bot_config(self.bot_profile, "another key", "x")
 | 
						|
        self.assertRaisesMessage(
 | 
						|
            ConfigError,
 | 
						|
            "Cannot store configuration. Request would require 116 characters. "
 | 
						|
            "The current configuration size limit is 100 characters.",
 | 
						|
            lambda: set_bot_config(self.bot_profile, "yet another key", "x"),
 | 
						|
        )
 | 
						|
 | 
						|
    def test_load_bot_config_template(self) -> None:
 | 
						|
        bot_config = load_bot_config_template("giphy")
 | 
						|
        self.assertTrue(isinstance(bot_config, dict))
 | 
						|
        self.assert_length(bot_config, 1)
 | 
						|
 | 
						|
    def test_load_bot_config_template_for_bot_without_config_data(self) -> None:
 | 
						|
        bot_config = load_bot_config_template("converter")
 | 
						|
        self.assertTrue(isinstance(bot_config, dict))
 | 
						|
        self.assert_length(bot_config, 0)
 | 
						|
 | 
						|
    def test_bot_send_pm_with_empty_recipients_list(self) -> None:
 | 
						|
        with self.assertRaisesRegex(
 | 
						|
            EmbeddedBotEmptyRecipientsListError, "Message must have recipients!"
 | 
						|
        ):
 | 
						|
            self.bot_handler.send_message(message={"type": "private", "to": []})
 | 
						|
 | 
						|
 | 
						|
ParamT = ParamSpec("ParamT")
 | 
						|
 | 
						|
 | 
						|
def for_all_bot_types(
 | 
						|
    test_func: Callable[Concatenate["TestServiceBotEventTriggers", ParamT], None],
 | 
						|
) -> Callable[Concatenate["TestServiceBotEventTriggers", ParamT], None]:
 | 
						|
    @wraps(test_func)
 | 
						|
    def _wrapped(
 | 
						|
        self: "TestServiceBotEventTriggers", /, *args: ParamT.args, **kwargs: ParamT.kwargs
 | 
						|
    ) -> None:
 | 
						|
        for bot_type in BOT_TYPE_TO_QUEUE_NAME:
 | 
						|
            self.bot_profile.bot_type = bot_type
 | 
						|
            self.bot_profile.save()
 | 
						|
            test_func(self, *args, **kwargs)
 | 
						|
 | 
						|
    return _wrapped
 | 
						|
 | 
						|
 | 
						|
def patch_queue_publish(
 | 
						|
    method_to_patch: str,
 | 
						|
) -> Callable[
 | 
						|
    [Callable[["TestServiceBotEventTriggers", mock.Mock], None]],
 | 
						|
    Callable[["TestServiceBotEventTriggers"], None],
 | 
						|
]:
 | 
						|
    def inner(
 | 
						|
        func: Callable[["TestServiceBotEventTriggers", mock.Mock], None],
 | 
						|
    ) -> Callable[["TestServiceBotEventTriggers"], None]:
 | 
						|
        @wraps(func)
 | 
						|
        def _wrapped(self: "TestServiceBotEventTriggers") -> None:
 | 
						|
            with mock_queue_publish(method_to_patch) as m:
 | 
						|
                func(self, m)
 | 
						|
 | 
						|
        return _wrapped
 | 
						|
 | 
						|
    return inner
 | 
						|
 | 
						|
 | 
						|
class TestServiceBotEventTriggers(ZulipTestCase):
 | 
						|
    @override
 | 
						|
    def setUp(self) -> None:
 | 
						|
        super().setUp()
 | 
						|
        self.user_profile = self.example_user("othello")
 | 
						|
        self.bot_profile = do_create_user(
 | 
						|
            email="foo-bot@zulip.com",
 | 
						|
            password="test",
 | 
						|
            realm=get_realm("zulip"),
 | 
						|
            full_name="FooBot",
 | 
						|
            bot_type=UserProfile.OUTGOING_WEBHOOK_BOT,
 | 
						|
            bot_owner=self.user_profile,
 | 
						|
            acting_user=None,
 | 
						|
        )
 | 
						|
        self.second_bot_profile = do_create_user(
 | 
						|
            email="bar-bot@zulip.com",
 | 
						|
            password="test",
 | 
						|
            realm=get_realm("zulip"),
 | 
						|
            full_name="BarBot",
 | 
						|
            bot_type=UserProfile.OUTGOING_WEBHOOK_BOT,
 | 
						|
            bot_owner=self.user_profile,
 | 
						|
            acting_user=None,
 | 
						|
        )
 | 
						|
 | 
						|
    @for_all_bot_types
 | 
						|
    @patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
 | 
						|
    def test_trigger_on_stream_mention_from_user(
 | 
						|
        self, mock_queue_event_on_commit: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        content = "@**FooBot** foo bar!!!"
 | 
						|
        recipient = "Denmark"
 | 
						|
        trigger = "mention"
 | 
						|
        message_type = Recipient._type_names[Recipient.STREAM]
 | 
						|
 | 
						|
        def check_values_passed(
 | 
						|
            queue_name: Any,
 | 
						|
            trigger_event: dict[str, Any],
 | 
						|
            processor: Callable[[Any], None] | None = None,
 | 
						|
        ) -> None:
 | 
						|
            assert self.bot_profile.bot_type
 | 
						|
            self.assertEqual(queue_name, BOT_TYPE_TO_QUEUE_NAME[self.bot_profile.bot_type])
 | 
						|
            self.assertEqual(trigger_event["message"]["content"], content)
 | 
						|
            self.assertEqual(trigger_event["message"]["display_recipient"], recipient)
 | 
						|
            self.assertEqual(trigger_event["message"]["sender_email"], self.user_profile.email)
 | 
						|
            self.assertEqual(trigger_event["message"]["type"], message_type)
 | 
						|
            self.assertEqual(trigger_event["trigger"], trigger)
 | 
						|
            self.assertEqual(trigger_event["user_profile_id"], self.bot_profile.id)
 | 
						|
 | 
						|
        mock_queue_event_on_commit.side_effect = check_values_passed
 | 
						|
 | 
						|
        self.send_stream_message(self.user_profile, "Denmark", content)
 | 
						|
        self.assertTrue(mock_queue_event_on_commit.called)
 | 
						|
 | 
						|
    @patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
 | 
						|
    def test_no_trigger_on_stream_message_without_mention(
 | 
						|
        self, mock_queue_event_on_commit: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        sender = self.user_profile
 | 
						|
        self.send_stream_message(sender, "Denmark")
 | 
						|
        self.assertFalse(mock_queue_event_on_commit.called)
 | 
						|
 | 
						|
    @for_all_bot_types
 | 
						|
    @patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
 | 
						|
    def test_no_trigger_on_stream_mention_from_bot(
 | 
						|
        self, mock_queue_event_on_commit: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        self.send_stream_message(self.second_bot_profile, "Denmark", "@**FooBot** foo bar!!!")
 | 
						|
        self.assertFalse(mock_queue_event_on_commit.called)
 | 
						|
 | 
						|
    @for_all_bot_types
 | 
						|
    @patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
 | 
						|
    def test_trigger_on_personal_message_from_user(
 | 
						|
        self, mock_queue_event_on_commit: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        sender = self.user_profile
 | 
						|
        recipient = self.bot_profile
 | 
						|
 | 
						|
        def check_values_passed(
 | 
						|
            queue_name: Any,
 | 
						|
            trigger_event: dict[str, Any],
 | 
						|
            processor: Callable[[Any], None] | None = None,
 | 
						|
        ) -> None:
 | 
						|
            assert self.bot_profile.bot_type
 | 
						|
            self.assertEqual(queue_name, BOT_TYPE_TO_QUEUE_NAME[self.bot_profile.bot_type])
 | 
						|
            self.assertEqual(trigger_event["user_profile_id"], self.bot_profile.id)
 | 
						|
            self.assertEqual(trigger_event["trigger"], NotificationTriggers.DIRECT_MESSAGE)
 | 
						|
            self.assertEqual(trigger_event["message"]["sender_email"], sender.email)
 | 
						|
            display_recipients = [
 | 
						|
                trigger_event["message"]["display_recipient"][0]["email"],
 | 
						|
                trigger_event["message"]["display_recipient"][1]["email"],
 | 
						|
            ]
 | 
						|
            self.assertTrue(sender.email in display_recipients)
 | 
						|
            self.assertTrue(recipient.email in display_recipients)
 | 
						|
 | 
						|
        mock_queue_event_on_commit.side_effect = check_values_passed
 | 
						|
 | 
						|
        self.send_personal_message(sender, recipient, "test")
 | 
						|
        self.assertTrue(mock_queue_event_on_commit.called)
 | 
						|
 | 
						|
    @for_all_bot_types
 | 
						|
    @patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
 | 
						|
    def test_no_trigger_on_personal_message_from_bot(
 | 
						|
        self, mock_queue_event_on_commit: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        sender = self.second_bot_profile
 | 
						|
        recipient = self.bot_profile
 | 
						|
        self.send_personal_message(sender, recipient)
 | 
						|
        self.assertFalse(mock_queue_event_on_commit.called)
 | 
						|
 | 
						|
    @for_all_bot_types
 | 
						|
    @patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
 | 
						|
    def test_trigger_on_group_direct_message_from_user(
 | 
						|
        self, mock_queue_event_on_commit: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        self.second_bot_profile.bot_type = self.bot_profile.bot_type
 | 
						|
        self.second_bot_profile.save()
 | 
						|
 | 
						|
        sender = self.user_profile
 | 
						|
        recipients = [self.bot_profile, self.second_bot_profile]
 | 
						|
        profile_ids = [self.bot_profile.id, self.second_bot_profile.id]
 | 
						|
 | 
						|
        def check_values_passed(
 | 
						|
            queue_name: Any,
 | 
						|
            trigger_event: dict[str, Any],
 | 
						|
            processor: Callable[[Any], None] | None = None,
 | 
						|
        ) -> None:
 | 
						|
            assert self.bot_profile.bot_type
 | 
						|
            self.assertEqual(queue_name, BOT_TYPE_TO_QUEUE_NAME[self.bot_profile.bot_type])
 | 
						|
            self.assertIn(trigger_event["user_profile_id"], profile_ids)
 | 
						|
            profile_ids.remove(trigger_event["user_profile_id"])
 | 
						|
            self.assertEqual(trigger_event["trigger"], NotificationTriggers.DIRECT_MESSAGE)
 | 
						|
            self.assertEqual(trigger_event["message"]["sender_email"], sender.email)
 | 
						|
            self.assertEqual(trigger_event["message"]["type"], "private")
 | 
						|
 | 
						|
        mock_queue_event_on_commit.side_effect = check_values_passed
 | 
						|
 | 
						|
        self.send_group_direct_message(sender, recipients, "test")
 | 
						|
        self.assertEqual(mock_queue_event_on_commit.call_count, 2)
 | 
						|
 | 
						|
    @for_all_bot_types
 | 
						|
    @patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
 | 
						|
    def test_no_trigger_on_group_direct_message_from_bot(
 | 
						|
        self, mock_queue_event_on_commit: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        sender = self.second_bot_profile
 | 
						|
        recipients = [self.user_profile, self.bot_profile]
 | 
						|
        self.send_group_direct_message(sender, recipients)
 | 
						|
        self.assertFalse(mock_queue_event_on_commit.called)
 |