mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 14:03:30 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			338 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			338 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import abc
 | 
						|
import json
 | 
						|
import logging
 | 
						|
from typing import Any, AnyStr, Dict, Optional
 | 
						|
 | 
						|
import requests
 | 
						|
from django.utils.translation import ugettext as _
 | 
						|
from requests import Response
 | 
						|
 | 
						|
from version import ZULIP_VERSION
 | 
						|
from zerver.decorator import JsonableError
 | 
						|
from zerver.lib.actions import check_send_message
 | 
						|
from zerver.lib.message import MessageDict
 | 
						|
from zerver.lib.queue import retry_event
 | 
						|
from zerver.lib.topic import get_topic_from_message_info
 | 
						|
from zerver.lib.url_encoding import near_message_url
 | 
						|
from zerver.models import (
 | 
						|
    GENERIC_INTERFACE,
 | 
						|
    SLACK_INTERFACE,
 | 
						|
    Service,
 | 
						|
    UserProfile,
 | 
						|
    email_to_domain,
 | 
						|
    get_client,
 | 
						|
    get_user_profile_by_id,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
class OutgoingWebhookServiceInterface(metaclass=abc.ABCMeta):
 | 
						|
 | 
						|
    def __init__(self, token: str, user_profile: UserProfile, service_name: str) -> None:
 | 
						|
        self.token: str = token
 | 
						|
        self.user_profile: UserProfile = user_profile
 | 
						|
        self.service_name: str = service_name
 | 
						|
 | 
						|
    @abc.abstractmethod
 | 
						|
    def build_bot_request(self, event: Dict[str, Any]) -> Optional[Any]:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    @abc.abstractmethod
 | 
						|
    def send_data_to_server(self, base_url: str, request_data: Any) -> Response:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    @abc.abstractmethod
 | 
						|
    def process_success(self, response_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
class GenericOutgoingWebhookService(OutgoingWebhookServiceInterface):
 | 
						|
 | 
						|
    def build_bot_request(self, event: Dict[str, Any]) -> Optional[Any]:
 | 
						|
        '''
 | 
						|
        We send a simple version of the message to outgoing
 | 
						|
        webhooks, since most of them really only need
 | 
						|
        `content` and a few other fields.  We may eventually
 | 
						|
        allow certain bots to get more information, but
 | 
						|
        that's not a high priority.  We do send the gravatar
 | 
						|
        info to the clients (so they don't have to compute
 | 
						|
        it themselves).
 | 
						|
        '''
 | 
						|
        message_dict = MessageDict.finalize_payload(
 | 
						|
            event['message'],
 | 
						|
            apply_markdown=False,
 | 
						|
            client_gravatar=False,
 | 
						|
            keep_rendered_content=True,
 | 
						|
        )
 | 
						|
 | 
						|
        request_data = {"data": event['command'],
 | 
						|
                        "message": message_dict,
 | 
						|
                        "bot_email": self.user_profile.email,
 | 
						|
                        "token": self.token,
 | 
						|
                        "trigger": event['trigger']}
 | 
						|
        return json.dumps(request_data)
 | 
						|
 | 
						|
    def send_data_to_server(self,
 | 
						|
                            base_url: str,
 | 
						|
                            request_data: Any) -> Response:
 | 
						|
        user_agent = 'ZulipOutgoingWebhook/' + ZULIP_VERSION
 | 
						|
        headers = {
 | 
						|
            'content-type': 'application/json',
 | 
						|
            'User-Agent': user_agent,
 | 
						|
        }
 | 
						|
        response = requests.request('POST', base_url, data=request_data, headers=headers)
 | 
						|
        return response
 | 
						|
 | 
						|
    def process_success(self, response_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
 | 
						|
        if "response_not_required" in response_json and response_json['response_not_required']:
 | 
						|
            return None
 | 
						|
 | 
						|
        if "response_string" in response_json:
 | 
						|
            # We are deprecating response_string.
 | 
						|
            content = str(response_json['response_string'])
 | 
						|
            success_data = dict(content=content)
 | 
						|
            return success_data
 | 
						|
 | 
						|
        if "content" in response_json:
 | 
						|
            content = str(response_json['content'])
 | 
						|
            success_data = dict(content=content)
 | 
						|
            if 'widget_content' in response_json:
 | 
						|
                success_data['widget_content'] = response_json['widget_content']
 | 
						|
            return success_data
 | 
						|
 | 
						|
        return None
 | 
						|
 | 
						|
class SlackOutgoingWebhookService(OutgoingWebhookServiceInterface):
 | 
						|
 | 
						|
    def build_bot_request(self, event: Dict[str, Any]) -> Optional[Any]:
 | 
						|
        if event['message']['type'] == 'private':
 | 
						|
            failure_message = "Slack outgoing webhooks don't support private messages."
 | 
						|
            fail_with_message(event, failure_message)
 | 
						|
            return None
 | 
						|
 | 
						|
        request_data = [("token", self.token),
 | 
						|
                        ("team_id", event['message']['sender_realm_str']),
 | 
						|
                        ("team_domain", email_to_domain(event['message']['sender_email'])),
 | 
						|
                        ("channel_id", event['message']['stream_id']),
 | 
						|
                        ("channel_name", event['message']['display_recipient']),
 | 
						|
                        ("timestamp", event['message']['timestamp']),
 | 
						|
                        ("user_id", event['message']['sender_id']),
 | 
						|
                        ("user_name", event['message']['sender_full_name']),
 | 
						|
                        ("text", event['command']),
 | 
						|
                        ("trigger_word", event['trigger']),
 | 
						|
                        ("service_id", event['user_profile_id']),
 | 
						|
                        ]
 | 
						|
 | 
						|
        return request_data
 | 
						|
 | 
						|
    def send_data_to_server(self,
 | 
						|
                            base_url: str,
 | 
						|
                            request_data: Any) -> Response:
 | 
						|
        response = requests.request('POST', base_url, data=request_data)
 | 
						|
        return response
 | 
						|
 | 
						|
    def process_success(self, response_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
 | 
						|
        if "text" in response_json:
 | 
						|
            content = response_json['text']
 | 
						|
            success_data = dict(content=content)
 | 
						|
            return success_data
 | 
						|
 | 
						|
        return None
 | 
						|
 | 
						|
AVAILABLE_OUTGOING_WEBHOOK_INTERFACES: Dict[str, Any] = {
 | 
						|
    GENERIC_INTERFACE: GenericOutgoingWebhookService,
 | 
						|
    SLACK_INTERFACE: SlackOutgoingWebhookService,
 | 
						|
}
 | 
						|
 | 
						|
def get_service_interface_class(interface: str) -> Any:
 | 
						|
    if interface is None or interface not in AVAILABLE_OUTGOING_WEBHOOK_INTERFACES:
 | 
						|
        return AVAILABLE_OUTGOING_WEBHOOK_INTERFACES[GENERIC_INTERFACE]
 | 
						|
    else:
 | 
						|
        return AVAILABLE_OUTGOING_WEBHOOK_INTERFACES[interface]
 | 
						|
 | 
						|
def get_outgoing_webhook_service_handler(service: Service) -> Any:
 | 
						|
 | 
						|
    service_interface_class = get_service_interface_class(service.interface_name())
 | 
						|
    service_interface = service_interface_class(token=service.token,
 | 
						|
                                                user_profile=service.user_profile,
 | 
						|
                                                service_name=service.name)
 | 
						|
    return service_interface
 | 
						|
 | 
						|
def send_response_message(bot_id: str, message_info: Dict[str, Any], response_data: Dict[str, Any]) -> None:
 | 
						|
    """
 | 
						|
    bot_id is the user_id of the bot sending the response
 | 
						|
 | 
						|
    message_info is used to address the message and should have these fields:
 | 
						|
        type - "stream" or "private"
 | 
						|
        display_recipient - like we have in other message events
 | 
						|
        topic - see get_topic_from_message_info
 | 
						|
 | 
						|
    response_data is what the bot wants to send back and has these fields:
 | 
						|
        content - raw markdown content for Zulip to render
 | 
						|
    """
 | 
						|
 | 
						|
    message_type = message_info['type']
 | 
						|
    display_recipient = message_info['display_recipient']
 | 
						|
    try:
 | 
						|
        topic_name = get_topic_from_message_info(message_info)
 | 
						|
    except KeyError:
 | 
						|
        topic_name = None
 | 
						|
 | 
						|
    bot_user = get_user_profile_by_id(bot_id)
 | 
						|
    realm = bot_user.realm
 | 
						|
    client = get_client('OutgoingWebhookResponse')
 | 
						|
 | 
						|
    content = response_data.get('content')
 | 
						|
    assert content
 | 
						|
 | 
						|
    widget_content = response_data.get('widget_content')
 | 
						|
 | 
						|
    if message_type == 'stream':
 | 
						|
        message_to = [display_recipient]
 | 
						|
    elif message_type == 'private':
 | 
						|
        message_to = [recipient['email'] for recipient in display_recipient]
 | 
						|
    else:
 | 
						|
        raise JsonableError(_("Invalid message type"))
 | 
						|
 | 
						|
    check_send_message(
 | 
						|
        sender=bot_user,
 | 
						|
        client=client,
 | 
						|
        message_type_name=message_type,
 | 
						|
        message_to=message_to,
 | 
						|
        topic_name=topic_name,
 | 
						|
        message_content=content,
 | 
						|
        widget_content=widget_content,
 | 
						|
        realm=realm,
 | 
						|
    )
 | 
						|
 | 
						|
def fail_with_message(event: Dict[str, Any], failure_message: str) -> None:
 | 
						|
    bot_id = event['user_profile_id']
 | 
						|
    message_info = event['message']
 | 
						|
    content = "Failure! " + failure_message
 | 
						|
    response_data = dict(content=content)
 | 
						|
    send_response_message(bot_id=bot_id, message_info=message_info, response_data=response_data)
 | 
						|
 | 
						|
def get_message_url(event: Dict[str, Any]) -> str:
 | 
						|
    bot_user = get_user_profile_by_id(event['user_profile_id'])
 | 
						|
    message = event['message']
 | 
						|
    realm = bot_user.realm
 | 
						|
 | 
						|
    return near_message_url(
 | 
						|
        realm=realm,
 | 
						|
        message=message,
 | 
						|
    )
 | 
						|
 | 
						|
def notify_bot_owner(event: Dict[str, Any],
 | 
						|
                     status_code: Optional[int]=None,
 | 
						|
                     response_content: Optional[AnyStr]=None,
 | 
						|
                     failure_message: Optional[str]=None,
 | 
						|
                     exception: Optional[Exception]=None) -> None:
 | 
						|
    message_url = get_message_url(event)
 | 
						|
    bot_id = event['user_profile_id']
 | 
						|
    bot_owner = get_user_profile_by_id(bot_id).bot_owner
 | 
						|
 | 
						|
    notification_message = f"[A message]({message_url}) triggered an outgoing webhook."
 | 
						|
    if failure_message:
 | 
						|
        notification_message += "\n" + failure_message
 | 
						|
    if status_code:
 | 
						|
        notification_message += f"\nThe webhook got a response with status code *{status_code}*."
 | 
						|
    if response_content:
 | 
						|
        notification_message += "\nThe response contains the following payload:\n" \
 | 
						|
                                f"```\n{response_content!r}\n```"
 | 
						|
    if exception:
 | 
						|
        notification_message += "\nWhen trying to send a request to the webhook service, an exception " \
 | 
						|
                                f"of type {type(exception).__name__} occurred:\n```\n{exception}\n```"
 | 
						|
 | 
						|
    message_info = dict(
 | 
						|
        type='private',
 | 
						|
        display_recipient=[dict(email=bot_owner.email)],
 | 
						|
    )
 | 
						|
    response_data = dict(content=notification_message)
 | 
						|
    send_response_message(bot_id=bot_id, message_info=message_info, response_data=response_data)
 | 
						|
 | 
						|
def request_retry(event: Dict[str, Any],
 | 
						|
                  failure_message: Optional[str]=None) -> None:
 | 
						|
    def failure_processor(event: Dict[str, Any]) -> None:
 | 
						|
        """
 | 
						|
        The name of the argument is 'event' on purpose. This argument will hide
 | 
						|
        the 'event' argument of the request_retry function. Keeping the same name
 | 
						|
        results in a smaller diff.
 | 
						|
        """
 | 
						|
        bot_user = get_user_profile_by_id(event['user_profile_id'])
 | 
						|
        fail_with_message(event, "Bot is unavailable")
 | 
						|
        notify_bot_owner(event, failure_message=failure_message)
 | 
						|
        logging.warning(
 | 
						|
            "Maximum retries exceeded for trigger:%s event:%s",
 | 
						|
            bot_user.email, event['command'],
 | 
						|
        )
 | 
						|
 | 
						|
    retry_event('outgoing_webhooks', event, failure_processor)
 | 
						|
 | 
						|
def process_success_response(event: Dict[str, Any],
 | 
						|
                             service_handler: Any,
 | 
						|
                             response: Response) -> None:
 | 
						|
    try:
 | 
						|
        response_json = json.loads(response.text)
 | 
						|
    except ValueError:
 | 
						|
        fail_with_message(event, "Invalid JSON in response")
 | 
						|
        return
 | 
						|
 | 
						|
    success_data = service_handler.process_success(response_json)
 | 
						|
 | 
						|
    if success_data is None:
 | 
						|
        return
 | 
						|
 | 
						|
    content = success_data.get('content')
 | 
						|
 | 
						|
    if content is None or content.strip() == "":
 | 
						|
        return
 | 
						|
 | 
						|
    widget_content = success_data.get('widget_content')
 | 
						|
    bot_id = event['user_profile_id']
 | 
						|
    message_info = event['message']
 | 
						|
    response_data = dict(content=content, widget_content=widget_content)
 | 
						|
    send_response_message(bot_id=bot_id, message_info=message_info, response_data=response_data)
 | 
						|
 | 
						|
def do_rest_call(base_url: str,
 | 
						|
                 request_data: Any,
 | 
						|
                 event: Dict[str, Any],
 | 
						|
                 service_handler: Any) -> None:
 | 
						|
    try:
 | 
						|
        response = service_handler.send_data_to_server(
 | 
						|
            base_url=base_url,
 | 
						|
            request_data=request_data,
 | 
						|
        )
 | 
						|
        if str(response.status_code).startswith('2'):
 | 
						|
            process_success_response(event, service_handler, response)
 | 
						|
        else:
 | 
						|
            logging.warning("Message %(message_url)s triggered an outgoing webhook, returning status "
 | 
						|
                            "code %(status_code)s.\n Content of response (in quotes): \""
 | 
						|
                            "%(response)s\"",
 | 
						|
                            {'message_url': get_message_url(event),
 | 
						|
                             'status_code': response.status_code,
 | 
						|
                             'response': response.content})
 | 
						|
            failure_message = f"Third party responded with {response.status_code}"
 | 
						|
            fail_with_message(event, failure_message)
 | 
						|
            notify_bot_owner(event, response.status_code, response.content)
 | 
						|
 | 
						|
    except requests.exceptions.Timeout:
 | 
						|
        logging.info(
 | 
						|
            "Trigger event %s on %s timed out. Retrying",
 | 
						|
            event["command"], event['service_name'],
 | 
						|
        )
 | 
						|
        failure_message = "A timeout occurred."
 | 
						|
        request_retry(event, failure_message=failure_message)
 | 
						|
 | 
						|
    except requests.exceptions.ConnectionError:
 | 
						|
        logging.info("Trigger event %s on %s resulted in a connection error. Retrying",
 | 
						|
                     event["command"], event['service_name'])
 | 
						|
        failure_message = "A connection error occurred. Is my bot server down?"
 | 
						|
        request_retry(event, failure_message=failure_message)
 | 
						|
 | 
						|
    except requests.exceptions.RequestException as e:
 | 
						|
        response_message = (
 | 
						|
            f"An exception of type *{type(e).__name__}* occurred for message `{event['command']}`! "
 | 
						|
            "See the Zulip server logs for more information."
 | 
						|
        )
 | 
						|
        logging.exception("Outhook trigger failed:")
 | 
						|
        fail_with_message(event, response_message)
 | 
						|
        notify_bot_owner(event, exception=e)
 |