mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	This is a best-effort rendering of the "fields" of Slack incoming hooks, which Slack renders in two columns. We approximate them in a Markdown table, with some minor in-place replacements. Fixes #22228.
		
			
				
	
	
		
			246 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			246 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Webhooks for external integrations.
 | 
						|
import re
 | 
						|
from itertools import zip_longest
 | 
						|
from typing import Literal, Optional, TypedDict, cast
 | 
						|
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
 | 
						|
from zerver.decorator import webhook_view
 | 
						|
from zerver.lib.exceptions import JsonableError
 | 
						|
from zerver.lib.request import REQ, RequestVariableMissingError, has_request_variables
 | 
						|
from zerver.lib.response import json_success
 | 
						|
from zerver.lib.types import Validator
 | 
						|
from zerver.lib.validator import (
 | 
						|
    WildValue,
 | 
						|
    check_dict,
 | 
						|
    check_int,
 | 
						|
    check_list,
 | 
						|
    check_string,
 | 
						|
    check_string_in,
 | 
						|
    check_url,
 | 
						|
    to_wild_value,
 | 
						|
)
 | 
						|
from zerver.lib.webhooks.common import check_send_webhook_message
 | 
						|
from zerver.models import UserProfile
 | 
						|
 | 
						|
 | 
						|
@webhook_view("SlackIncoming")
@has_request_variables
def api_slack_incoming_webhook(
    request: HttpRequest,
    user_profile: UserProfile,
    user_specified_topic: Optional[str] = REQ("topic", default=None),
) -> HttpResponse:
    """Accept a Slack-style incoming webhook payload and post it as a message.

    The topic is taken from the "topic" request variable when given,
    otherwise from the payload's "channel" (minus a leading "@" or
    "#"), otherwise it is "(no topic)".  The message body is built
    from the payload's "blocks" and "attachments"; if those render to
    nothing, the payload's "text" (optionally prefixed by
    "icon_emoji") is used instead.  No message is sent when the
    resulting body is empty.
    """
    # Slack accepts webhook payloads as payload="encoded json" as
    # application/x-www-form-urlencoded, as well as in the body as
    # application/json.
    if request.content_type != "application/json":
        param = "payload"
        if param in request.POST:
            raw_payload = request.POST[param]
        elif param in request.GET:  # nocoverage
            raw_payload = request.GET[param]
        else:
            raise RequestVariableMissingError(param)
    else:
        try:
            raw_payload = request.body.decode(request.encoding or "utf-8")
        except UnicodeDecodeError:  # nocoverage
            raise JsonableError(_("Malformed payload"))

    payload = to_wild_value("payload", raw_payload)

    # An explicit topic wins; otherwise fall back to the Slack channel
    # name, and finally to a fixed placeholder.
    topic = user_specified_topic
    if topic is None:
        if "channel" in payload:
            topic = re.sub("^[@#]", "", payload["channel"].tame(check_string))
        else:
            topic = "(no topic)"

    rendered = []
    if "blocks" in payload and payload["blocks"]:
        rendered.extend(render_block(block) for block in payload["blocks"])
    if "attachments" in payload and payload["attachments"]:
        rendered.extend(render_attachment(attachment) for attachment in payload["attachments"])

    trimmed = (piece.strip() for piece in rendered)
    body = "\n\n".join(piece for piece in trimmed if piece != "")

    # The plain "text" is only used when blocks/attachments produced
    # no content at all.
    if body == "" and "text" in payload and payload["text"]:
        prefix = ""
        if "icon_emoji" in payload and payload["icon_emoji"]:
            prefix = payload["icon_emoji"].tame(check_string) + " "
        body = (prefix + payload["text"].tame(check_string)).strip()

    if body != "":
        body = replace_formatting(replace_links(body).strip())
        check_send_webhook_message(request, user_profile, topic, body)
    return json_success(request)
 | 
						|
 | 
						|
 | 
						|
def render_block(block: WildValue) -> str:
    """Render one Slack Block Kit block as Markdown text.

    The block's "type" must be one of the layout block types listed
    below.  "actions" and "input" blocks, and "context" blocks without
    elements, render as the empty string.
    """
    # https://api.slack.com/reference/block-kit/blocks
    kind = block["type"].tame(
        check_string_in(["actions", "context", "divider", "header", "image", "input", "section"])
    )

    if kind == "divider":
        return "----"

    if kind == "header":
        heading = block["text"].tame(check_text_block(plain_text_only=True))["text"]
        return f"## {heading}"

    if kind == "image":
        url = block["image_url"].tame(check_url)
        label = block["alt_text"].tame(check_string)
        # A title, when present, takes precedence over the alt text.
        if "title" in block:
            label = block["title"].tame(check_text_block(plain_text_only=True))["text"]
        return f"[{label}]({url})"

    if kind == "context" and block.get("elements"):
        # Slack renders these pieces left-to-right, packed in as
        # closely as possible.  We just render them above each other,
        # for simplicity.
        parts = []
        for element in block["elements"]:
            element_kind = element["type"].tame(check_string_in(["image", "plain_text", "mrkdwn"]))
            if element_kind != "image":
                parts.append(element.tame(check_text_block())["text"])
            else:
                parts.append(render_block_element(element))
        return "\n\n".join(part.strip() for part in parts if part.strip() != "")

    if kind == "section":
        parts = []
        if "text" in block:
            parts.append(block["text"].tame(check_text_block())["text"])

        if "accessory" in block:
            parts.append(render_block_element(block["accessory"]))

        if "fields" in block:
            text_fields = block["fields"].tame(check_list(check_text_block()))
            if len(text_fields) == 1:
                # Special-case a single field to display a bit more
                # nicely, without extraneous borders and limitations
                # on its contents.
                parts.append(text_fields[0]["text"])
            else:
                # It is not possible to have newlines in a table, nor
                # escape the pipes that make it up; replace them with
                # whitespace.
                cells = [
                    entry["text"].replace("\n", " ").replace("|", " ") for entry in text_fields
                ]
                # Take the fields two-at-a-time (even positions on the
                # left, odd positions on the right), padding a trailing
                # odd field with an empty right-hand cell.
                rows = [
                    f"| {left} | {right} |\n"
                    for left, right in zip_longest(cells[::2], cells[1::2], fillvalue="")
                ]
                # Because Slack formats this as two columns, but not
                # necessarily a table with a bold header, we emit a
                # blank header row first.
                parts.append("| | |\n|-|-|\n" + "".join(rows))

        return "\n\n".join(part.strip() for part in parts if part.strip() != "")

    # "actions" and "input" blocks are unhandled, as are "context"
    # blocks with no elements.
    return ""
 | 
						|
 | 
						|
 | 
						|
class TextField(TypedDict):
    """Shape of a Slack text object after validation by check_text_block."""

    # Which Slack text flavor this object uses.
    type: Literal["plain_text", "mrkdwn"]
    # The text content itself.
    text: str
 | 
						|
 | 
						|
 | 
						|
def check_text_block(plain_text_only: bool = False) -> Validator[TextField]:
    """Build a validator for Slack text objects.

    The returned validator runs check_dict with a "type" entry, which
    must be "plain_text" (or also "mrkdwn" unless plain_text_only is
    set), and a "text" entry checked with check_string.  Its result is
    cast to TextField.
    """
    allowed_types = ["plain_text"] if plain_text_only else ["plain_text", "mrkdwn"]
    validate_dict = check_dict(
        [
            ("type", check_string_in(allowed_types)),
            ("text", check_string),
        ],
    )

    def validate(var_name: str, val: object) -> TextField:
        return cast(TextField, validate_dict(var_name, val))

    return validate
 | 
						|
 | 
						|
 | 
						|
def render_block_element(element: WildValue) -> str:
    """Render a Block Kit element as Markdown.

    Only "image" elements produce output, as a Markdown link from the
    alt text to the image URL; every other element type renders as the
    empty string.
    """
    # https://api.slack.com/reference/block-kit/block-elements
    # Zulip doesn't support interactive elements, so we only render images here
    if element["type"].tame(check_string) != "image":
        # Unsupported
        return ""

    url = element["image_url"].tame(check_url)
    description = element["alt_text"].tame(check_string)
    return f"[{description}]({url})"
 | 
						|
 | 
						|
 | 
						|
def render_attachment(attachment: WildValue) -> str:
    """Render one Slack message attachment as Markdown text.

    Each supported key ("title"/"title_link", "pretext", "text",
    "fields", "blocks", "image_url", "footer", "ts") that is present
    with a truthy value contributes one piece; the non-empty pieces
    are joined with blank lines.  Keys that are missing, null, or
    empty are skipped.
    """
    # https://api.slack.com/reference/messaging/attachments
    # Slack recommends the usage of "blocks" even within attachments; the
    # rest of the fields we handle here are legacy fields. These fields are
    # optional and may contain null values.
    pieces = []
    if "title" in attachment and attachment["title"]:
        title = attachment["title"].tame(check_string)
        if "title_link" in attachment and attachment["title_link"]:
            title_link = attachment["title_link"].tame(check_url)
            pieces.append(f"## [{title}]({title_link})")
        else:
            pieces.append(f"## {title}")
    if "pretext" in attachment and attachment["pretext"]:
        pieces.append(attachment["pretext"].tame(check_string))
    if "text" in attachment and attachment["text"]:
        pieces.append(attachment["text"].tame(check_string))
    # Like every other legacy key, "fields" may be present but null;
    # check its value too, so that a null is skipped rather than
    # iterated over.
    if "fields" in attachment and attachment["fields"]:
        fields = []
        for field in attachment["fields"]:
            has_title = "title" in field and bool(field["title"])
            has_value = "value" in field and bool(field["value"])
            if has_title and has_value:
                title = field["title"].tame(check_string)
                value = field["value"].tame(check_string)
                fields.append(f"*{title}*: {value}")
            elif has_title:
                title = field["title"].tame(check_string)
                fields.append(f"*{title}*")
            elif has_value:
                value = field["value"].tame(check_string)
                fields.append(f"{value}")
        pieces.append("\n".join(fields))
    if "blocks" in attachment and attachment["blocks"]:
        pieces.extend(render_block(block) for block in attachment["blocks"])
    if "image_url" in attachment and attachment["image_url"]:
        image_url = attachment["image_url"].tame(check_url)
        pieces.append(f"[]({image_url})")
    if "footer" in attachment and attachment["footer"]:
        pieces.append(attachment["footer"].tame(check_string))
    if "ts" in attachment and attachment["ts"]:
        # Named "timestamp" rather than "time" to avoid shadowing the
        # standard-library module name.
        timestamp = attachment["ts"].tame(check_int)
        pieces.append(f"<time:{timestamp}>")

    return "\n\n".join(piece.strip() for piece in pieces if piece.strip() != "")
 | 
						|
 | 
						|
 | 
						|
def replace_links(text: str) -> str:
    """Convert Slack-style labelled links into Markdown links.

    Rewrites every "<scheme://url|label>" in text as "[label](scheme://url)".
    Angle-bracketed URLs without a "|label" part are left untouched.
    """
    # The URL may not contain "|", ">" or a newline, and the label may
    # not contain ">" or a newline.  This keeps a match inside a single
    # pair of angle brackets, so a bare "<url>" followed later by a
    # "<url|label>" is not swallowed into one bogus link.
    return re.sub(r"<(\w+?://[^|>\n]*)\|([^>\n]*)>", r"[\2](\1)", text)
 | 
						|
 | 
						|
 | 
						|
def replace_formatting(text: str) -> str:
    """Translate Slack bold/emphasis markers into their Zulip equivalents.

    The substitutions run in order: first *text* becomes **text**,
    then _text_ becomes *text*.  A marker only counts when it is not
    adjacent to a word character on the outside and not adjacent to
    whitespace on the inside.
    """
    substitutions = (
        # Slack uses *text* for bold, whereas Zulip interprets that as italics
        (r"([^\w]|^)\*(?!\s+)([^\*\n]+)(?<!\s)\*((?=[^\w])|$)", r"\1**\2**\3"),
        # Slack uses _text_ for emphasis, whereas Zulip interprets that as nothing
        (r"([^\w]|^)[_](?!\s+)([^\_\n]+)(?<!\s)[_]((?=[^\w])|$)", r"\1*\2*\3"),
    )
    converted = text
    for pattern, replacement in substitutions:
        converted = re.sub(pattern, replacement, converted)
    return converted
 |