mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	Previously, the integration's name was directly being used. Due to this, the GitHub Sponsors integration which is in the same module as the GitHub integration could not be used with the `generate-integration-docs-screenshot` script, as it would be unable to locate the fixtures.
		
			
				
	
	
		
			318 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			318 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import fnmatch
 | 
						|
import hashlib
 | 
						|
import hmac
 | 
						|
import importlib
 | 
						|
from collections.abc import Callable
 | 
						|
from dataclasses import dataclass
 | 
						|
from datetime import datetime
 | 
						|
from typing import Annotated, Any, TypeAlias
 | 
						|
from urllib.parse import unquote
 | 
						|
 | 
						|
from django.conf import settings
 | 
						|
from django.http import HttpRequest
 | 
						|
from django.utils.encoding import force_bytes
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from pydantic import Json
 | 
						|
from typing_extensions import override
 | 
						|
 | 
						|
from zerver.actions.message_send import (
 | 
						|
    check_send_private_message,
 | 
						|
    check_send_stream_message,
 | 
						|
    check_send_stream_message_by_id,
 | 
						|
    send_rate_limited_pm_notification_to_bot_owner,
 | 
						|
)
 | 
						|
from zerver.lib.exceptions import (
 | 
						|
    AnomalousWebhookPayloadError,
 | 
						|
    ErrorCode,
 | 
						|
    JsonableError,
 | 
						|
    StreamDoesNotExistError,
 | 
						|
)
 | 
						|
from zerver.lib.request import RequestNotes
 | 
						|
from zerver.lib.send_email import FromAddress
 | 
						|
from zerver.lib.timestamp import timestamp_to_datetime
 | 
						|
from zerver.lib.typed_endpoint import ApiParamConfig, typed_endpoint
 | 
						|
from zerver.models import UserProfile
 | 
						|
 | 
						|
MISSING_EVENT_HEADER_MESSAGE = """\
 | 
						|
Hi there!  Your bot {bot_name} just sent an HTTP request to {request_path} that
 | 
						|
is missing the HTTP {header_name} header.  Because this header is how
 | 
						|
{integration_name} indicates the event type, this usually indicates a configuration
 | 
						|
issue, where you either entered the URL for a different integration, or are running
 | 
						|
an older version of the third-party service that doesn't provide that header.
 | 
						|
Contact {support_email} if you need help debugging!
 | 
						|
"""
 | 
						|
 | 
						|
INVALID_JSON_MESSAGE = """
 | 
						|
Hi there! It looks like you tried to set up the Zulip {webhook_name} integration,
 | 
						|
but didn't correctly configure the webhook to send data in the JSON format
 | 
						|
that this integration expects!
 | 
						|
"""
 | 
						|
 | 
						|
SETUP_MESSAGE_TEMPLATE = "{integration} webhook has been successfully configured"
 | 
						|
SETUP_MESSAGE_USER_PART = " by {user_name}"
 | 
						|
 | 
						|
OptionalUserSpecifiedTopicStr: TypeAlias = Annotated[str | None, ApiParamConfig("topic")]
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class WebhookConfigOption:
 | 
						|
    name: str
 | 
						|
    description: str
 | 
						|
    validator: Callable[[str, str], str | bool | None]
 | 
						|
 | 
						|
 | 
						|
def get_setup_webhook_message(integration: str, user_name: str | None = None) -> str:
 | 
						|
    content = SETUP_MESSAGE_TEMPLATE.format(integration=integration)
 | 
						|
    if user_name:
 | 
						|
        content += SETUP_MESSAGE_USER_PART.format(user_name=user_name)
 | 
						|
    content = f"{content}."
 | 
						|
    return content
 | 
						|
 | 
						|
 | 
						|
def notify_bot_owner_about_invalid_json(
 | 
						|
    user_profile: UserProfile, webhook_client_name: str
 | 
						|
) -> None:
 | 
						|
    send_rate_limited_pm_notification_to_bot_owner(
 | 
						|
        user_profile,
 | 
						|
        user_profile.realm,
 | 
						|
        INVALID_JSON_MESSAGE.format(webhook_name=webhook_client_name).strip(),
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
class MissingHTTPEventHeaderError(AnomalousWebhookPayloadError):
 | 
						|
    code = ErrorCode.MISSING_HTTP_EVENT_HEADER
 | 
						|
    data_fields = ["header"]
 | 
						|
 | 
						|
    def __init__(self, header: str) -> None:
 | 
						|
        self.header = header
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    @override
 | 
						|
    def msg_format() -> str:
 | 
						|
        return _("Missing the HTTP event header '{header}'")
 | 
						|
 | 
						|
 | 
						|
@typed_endpoint
 | 
						|
def check_send_webhook_message(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    topic: str,
 | 
						|
    body: str,
 | 
						|
    complete_event_type: str | None = None,
 | 
						|
    *,
 | 
						|
    stream: str | None = None,
 | 
						|
    user_specified_topic: OptionalUserSpecifiedTopicStr = None,
 | 
						|
    only_events: Json[list[str]] | None = None,
 | 
						|
    exclude_events: Json[list[str]] | None = None,
 | 
						|
    unquote_url_parameters: bool = False,
 | 
						|
    no_previews: bool = False,
 | 
						|
) -> None:
 | 
						|
    if complete_event_type is not None and (
 | 
						|
        # Here, we implement Zulip's generic support for filtering
 | 
						|
        # events sent by the third-party service.
 | 
						|
        #
 | 
						|
        # If complete_event_type is passed to this function, we will check the event
 | 
						|
        # type against user configured lists of only_events and exclude events.
 | 
						|
        # If the event does not satisfy the configuration, the function will return
 | 
						|
        # without sending any messages.
 | 
						|
        #
 | 
						|
        # We match items in only_events and exclude_events using Unix
 | 
						|
        # shell-style wildcards.
 | 
						|
        (
 | 
						|
            only_events is not None
 | 
						|
            and all(not fnmatch.fnmatch(complete_event_type, pattern) for pattern in only_events)
 | 
						|
        )
 | 
						|
        or (
 | 
						|
            exclude_events is not None
 | 
						|
            and any(fnmatch.fnmatch(complete_event_type, pattern) for pattern in exclude_events)
 | 
						|
        )
 | 
						|
    ):
 | 
						|
        return
 | 
						|
 | 
						|
    client = RequestNotes.get_notes(request).client
 | 
						|
    assert client is not None
 | 
						|
    if stream is None:
 | 
						|
        assert user_profile.bot_owner is not None
 | 
						|
        check_send_private_message(
 | 
						|
            user_profile, client, user_profile.bot_owner, body, no_previews=no_previews
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        # Some third-party websites (such as Atlassian's Jira), tend to
 | 
						|
        # double escape their URLs in a manner that escaped space characters
 | 
						|
        # (%20) are never properly decoded. We work around that by making sure
 | 
						|
        # that the URL parameters are decoded on our end.
 | 
						|
        if unquote_url_parameters:
 | 
						|
            stream = unquote(stream)
 | 
						|
 | 
						|
        if user_specified_topic is not None:
 | 
						|
            topic = user_specified_topic
 | 
						|
            if unquote_url_parameters:
 | 
						|
                topic = unquote(topic)
 | 
						|
 | 
						|
        try:
 | 
						|
            if stream.isdecimal():
 | 
						|
                check_send_stream_message_by_id(
 | 
						|
                    user_profile, client, int(stream), topic, body, no_previews=no_previews
 | 
						|
                )
 | 
						|
            else:
 | 
						|
                check_send_stream_message(
 | 
						|
                    user_profile, client, stream, topic, body, no_previews=no_previews
 | 
						|
                )
 | 
						|
        except StreamDoesNotExistError:
 | 
						|
            # A direct message will be sent to the bot_owner by check_message,
 | 
						|
            # notifying that the webhook bot just tried to send a message to a
 | 
						|
            # non-existent stream, so we don't need to re-raise it since it
 | 
						|
            # clutters up webhook-errors.log
 | 
						|
            pass
 | 
						|
 | 
						|
 | 
						|
def standardize_headers(input_headers: None | dict[str, Any]) -> dict[str, str]:
 | 
						|
    """This method can be used to standardize a dictionary of headers with
 | 
						|
    the standard format that Django expects. For reference, refer to:
 | 
						|
    https://docs.djangoproject.com/en/5.0/ref/request-response/#django.http.HttpRequest.headers
 | 
						|
 | 
						|
    NOTE: Historically, Django's headers were not case-insensitive. We're still
 | 
						|
    capitalizing our headers to make it easier to compare/search later if required.
 | 
						|
    """
 | 
						|
    canonical_headers = {}
 | 
						|
 | 
						|
    if not input_headers:
 | 
						|
        return {}
 | 
						|
 | 
						|
    for raw_header in input_headers:
 | 
						|
        polished_header = raw_header.upper().replace("-", "_")
 | 
						|
        if polished_header not in [
 | 
						|
            "CONTENT_TYPE",
 | 
						|
            "CONTENT_LENGTH",
 | 
						|
        ] and not polished_header.startswith("HTTP_"):
 | 
						|
            polished_header = "HTTP_" + polished_header
 | 
						|
        canonical_headers[polished_header] = str(input_headers[raw_header])
 | 
						|
 | 
						|
    return canonical_headers
 | 
						|
 | 
						|
 | 
						|
def validate_extract_webhook_http_header(
 | 
						|
    request: HttpRequest, header: str, integration_name: str
 | 
						|
) -> str:
 | 
						|
    assert request.user.is_authenticated
 | 
						|
 | 
						|
    extracted_header = request.headers.get(header)
 | 
						|
    if extracted_header is None:
 | 
						|
        message_body = MISSING_EVENT_HEADER_MESSAGE.format(
 | 
						|
            bot_name=request.user.full_name,
 | 
						|
            request_path=request.path,
 | 
						|
            header_name=header,
 | 
						|
            integration_name=integration_name,
 | 
						|
            support_email=FromAddress.SUPPORT,
 | 
						|
        )
 | 
						|
        send_rate_limited_pm_notification_to_bot_owner(
 | 
						|
            request.user, request.user.realm, message_body
 | 
						|
        )
 | 
						|
 | 
						|
        raise MissingHTTPEventHeaderError(header)
 | 
						|
 | 
						|
    return extracted_header
 | 
						|
 | 
						|
 | 
						|
def get_fixture_http_headers(integration_dir_name: str, fixture_name: str) -> dict["str", "str"]:
 | 
						|
    """For integrations that require custom HTTP headers for some (or all)
 | 
						|
    of their test fixtures, this method will call a specially named
 | 
						|
    function from the target integration module to determine what set
 | 
						|
    of HTTP headers goes with the given test fixture.
 | 
						|
    """
 | 
						|
    view_module_name = f"zerver.webhooks.{integration_dir_name}.view"
 | 
						|
    try:
 | 
						|
        # TODO: We may want to migrate to a more explicit registration
 | 
						|
        # strategy for this behavior rather than a try/except import.
 | 
						|
        view_module = importlib.import_module(view_module_name)
 | 
						|
        fixture_to_headers = view_module.fixture_to_headers
 | 
						|
    except (ImportError, AttributeError):
 | 
						|
        return {}
 | 
						|
    return fixture_to_headers(fixture_name)
 | 
						|
 | 
						|
 | 
						|
def get_http_headers_from_filename(http_header_key: str) -> Callable[[str], dict[str, str]]:
 | 
						|
    """If an integration requires an event type kind of HTTP header which can
 | 
						|
    be easily (statically) determined, then name the fixtures in the format
 | 
						|
    of "header_value__other_details" or even "header_value" and the use this
 | 
						|
    method in the headers.py file for the integration."""
 | 
						|
 | 
						|
    def fixture_to_headers(filename: str) -> dict[str, str]:
 | 
						|
        if "__" in filename:
 | 
						|
            event_type = filename.split("__")[0]
 | 
						|
        else:
 | 
						|
            event_type = filename
 | 
						|
        return {http_header_key: event_type}
 | 
						|
 | 
						|
    return fixture_to_headers
 | 
						|
 | 
						|
 | 
						|
def unix_milliseconds_to_timestamp(milliseconds: Any, webhook: str) -> datetime:
 | 
						|
    """If an integration requires time input in unix milliseconds, this helper
 | 
						|
    checks to ensure correct type and will catch any errors related to type or
 | 
						|
    value and raise a JsonableError.
 | 
						|
    Returns a datetime representing the time."""
 | 
						|
    try:
 | 
						|
        # timestamps are in milliseconds so divide by 1000
 | 
						|
        seconds = milliseconds / 1000
 | 
						|
        return timestamp_to_datetime(seconds)
 | 
						|
    except (ValueError, TypeError):
 | 
						|
        raise JsonableError(
 | 
						|
            _("The {webhook} webhook expects time in milliseconds.").format(webhook=webhook)
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
def parse_multipart_string(body: str) -> dict[str, str]:
 | 
						|
    """
 | 
						|
    Converts multipart/form-data string (fixture) to dict
 | 
						|
    """
 | 
						|
    boundary = body.split("\n")[0][2:]
 | 
						|
    parts = body.split(f"--{boundary}")
 | 
						|
 | 
						|
    data = {}
 | 
						|
    for part in parts:
 | 
						|
        if part.strip() in ["", "--"]:
 | 
						|
            continue
 | 
						|
 | 
						|
        headers, body = part.split("\n\n", 1)
 | 
						|
        body = body.removesuffix("\n--")
 | 
						|
 | 
						|
        content_disposition = next(
 | 
						|
            (line for line in headers.splitlines() if "Content-Disposition" in line), ""
 | 
						|
        )
 | 
						|
        field_name = content_disposition.split('name="')[1].split('"')[0]
 | 
						|
        data[field_name] = body
 | 
						|
 | 
						|
    return data
 | 
						|
 | 
						|
 | 
						|
def validate_webhook_signature(
 | 
						|
    request: HttpRequest, payload: str, signature: str, algorithm: str = "sha256"
 | 
						|
) -> None:
 | 
						|
    if not settings.VERIFY_WEBHOOK_SIGNATURES:  # nocoverage
 | 
						|
        return
 | 
						|
 | 
						|
    if algorithm not in hashlib.algorithms_available:
 | 
						|
        raise AssertionError(
 | 
						|
            _("The algorithm '{algorithm}' is not supported.").format(algorithm=algorithm)
 | 
						|
        )
 | 
						|
 | 
						|
    webhook_secret: str | None = request.GET.get("webhook_secret")
 | 
						|
    if webhook_secret is None:
 | 
						|
        raise JsonableError(
 | 
						|
            _(
 | 
						|
                "The webhook secret is missing. Please set the webhook_secret while generating the URL."
 | 
						|
            )
 | 
						|
        )
 | 
						|
    webhook_secret_bytes = force_bytes(webhook_secret)
 | 
						|
    payload_bytes = force_bytes(payload)
 | 
						|
 | 
						|
    signed_payload = hmac.new(
 | 
						|
        webhook_secret_bytes,
 | 
						|
        payload_bytes,
 | 
						|
        algorithm,
 | 
						|
    ).hexdigest()
 | 
						|
 | 
						|
    if signed_payload != signature:
 | 
						|
        raise JsonableError(_("Webhook signature verification failed."))
 |