mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			410 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			410 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
"""Create or update a webhook integration screenshot using a test fixture."""
 | 
						|
 | 
						|
import argparse
 | 
						|
import base64
 | 
						|
import os
 | 
						|
import re
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
from typing import Any
 | 
						|
from urllib.parse import parse_qsl, urlencode
 | 
						|
 | 
						|
import zulip
 | 
						|
 | 
						|
SCREENSHOTS_DIR = os.path.abspath(os.path.dirname(__file__))
 | 
						|
TOOLS_DIR = os.path.abspath(os.path.dirname(SCREENSHOTS_DIR))
 | 
						|
ROOT_DIR = os.path.dirname(TOOLS_DIR)
 | 
						|
sys.path.insert(0, ROOT_DIR)
 | 
						|
 | 
						|
# check for the venv
 | 
						|
from tools.lib import sanity_check
 | 
						|
 | 
						|
sanity_check.check_venv(__file__)
 | 
						|
 | 
						|
from scripts.lib.setup_path import setup_path
 | 
						|
 | 
						|
setup_path()
 | 
						|
 | 
						|
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
 | 
						|
import django
 | 
						|
 | 
						|
django.setup()
 | 
						|
 | 
						|
import orjson
 | 
						|
import requests
 | 
						|
 | 
						|
from scripts.lib.zulip_tools import BOLDRED, ENDC
 | 
						|
from tools.lib.test_script import prepare_puppeteer_run
 | 
						|
from zerver.actions.create_user import do_create_user, notify_created_bot
 | 
						|
from zerver.actions.streams import bulk_add_subscriptions
 | 
						|
from zerver.actions.user_settings import do_change_avatar_fields
 | 
						|
from zerver.lib.integrations import (
 | 
						|
    DOC_SCREENSHOT_CONFIG,
 | 
						|
    FIXTURELESS_SCREENSHOT_CONFIG,
 | 
						|
    INTEGRATIONS,
 | 
						|
    WEBHOOK_SCREENSHOT_CONFIG,
 | 
						|
    FixturelessScreenshotConfig,
 | 
						|
    Integration,
 | 
						|
    WebhookIntegration,
 | 
						|
    WebhookScreenshotConfig,
 | 
						|
    get_fixture_path,
 | 
						|
    get_image_path,
 | 
						|
    split_fixture_path,
 | 
						|
)
 | 
						|
from zerver.lib.storage import static_path
 | 
						|
from zerver.lib.streams import create_stream_if_needed
 | 
						|
from zerver.lib.upload import upload_avatar_image
 | 
						|
from zerver.lib.webhooks.common import get_fixture_http_headers, parse_multipart_string
 | 
						|
from zerver.models import Message, UserProfile
 | 
						|
from zerver.models.realms import get_realm
 | 
						|
from zerver.models.users import get_user_by_delivery_email
 | 
						|
 | 
						|
 | 
						|
def create_integration_bot(integration: Integration, bot_name: str | None = None) -> UserProfile:
 | 
						|
    realm = get_realm("zulip")
 | 
						|
    owner = get_user_by_delivery_email("iago@zulip.com", realm)
 | 
						|
    email_prefix = (
 | 
						|
        re.sub(r"[^a-zA-Z0-9_-]", "", bot_name.strip()).lower()[:64]
 | 
						|
        if bot_name
 | 
						|
        else integration.name
 | 
						|
    )
 | 
						|
    bot_email = f"{email_prefix}-bot@example.com"
 | 
						|
    if bot_name is None:
 | 
						|
        bot_name = f"{integration.display_name} Bot"
 | 
						|
    try:
 | 
						|
        bot = UserProfile.objects.get(email=bot_email)
 | 
						|
    except UserProfile.DoesNotExist:
 | 
						|
        bot = do_create_user(
 | 
						|
            email=bot_email,
 | 
						|
            password="123",
 | 
						|
            realm=owner.realm,
 | 
						|
            full_name=bot_name,
 | 
						|
            bot_type=UserProfile.INCOMING_WEBHOOK_BOT,
 | 
						|
            bot_owner=owner,
 | 
						|
            acting_user=owner,
 | 
						|
        )
 | 
						|
        notify_created_bot(bot)
 | 
						|
 | 
						|
        bot_avatar_path = integration.get_bot_avatar_path()
 | 
						|
        if bot_avatar_path is not None:
 | 
						|
            bot_avatar_path = static_path(bot_avatar_path)
 | 
						|
            if os.path.isfile(bot_avatar_path):
 | 
						|
                with open(bot_avatar_path, "rb") as f:
 | 
						|
                    upload_avatar_image(f, bot)
 | 
						|
                    do_change_avatar_fields(bot, UserProfile.AVATAR_FROM_USER, acting_user=owner)
 | 
						|
 | 
						|
    return bot
 | 
						|
 | 
						|
 | 
						|
def create_integration_stream(integration: Integration, bot: UserProfile) -> None:
 | 
						|
    assert isinstance(bot.bot_owner, UserProfile)
 | 
						|
    realm = bot.bot_owner.realm
 | 
						|
    stream, _created = create_stream_if_needed(realm, integration.stream_name)
 | 
						|
    bulk_add_subscriptions(realm, [stream], [bot, bot.bot_owner], acting_user=bot)
 | 
						|
 | 
						|
 | 
						|
def get_fixture_info(fixture_path: str) -> tuple[Any, bool, bool, str]:
 | 
						|
    json_fixture = fixture_path.endswith(".json")
 | 
						|
    multipart_fixture = fixture_path.endswith(".multipart")
 | 
						|
    _, fixture_name = split_fixture_path(fixture_path)
 | 
						|
 | 
						|
    if fixture_name:
 | 
						|
        if not os.path.exists(fixture_path):
 | 
						|
            raise FileNotFoundError(
 | 
						|
                f"{BOLDRED}The fixture '{fixture_path}' does not exist. Please check the fixture file name and try again.{ENDC}"
 | 
						|
            )
 | 
						|
        if json_fixture:
 | 
						|
            with open(fixture_path, "rb") as fb:
 | 
						|
                data = orjson.loads(fb.read())
 | 
						|
        else:
 | 
						|
            with open(fixture_path) as f:
 | 
						|
                data = f.read().strip()
 | 
						|
            if multipart_fixture:
 | 
						|
                data = parse_multipart_string(data)
 | 
						|
    else:
 | 
						|
        data = ""
 | 
						|
 | 
						|
    return data, json_fixture, multipart_fixture, fixture_name
 | 
						|
 | 
						|
 | 
						|
def get_requests_headers(integration_dir_name: str, fixture_name: str) -> dict[str, Any]:
 | 
						|
    headers = get_fixture_http_headers(integration_dir_name, fixture_name)
 | 
						|
 | 
						|
    def fix_name(header: str) -> str:
 | 
						|
        return header.removeprefix("HTTP_").replace("_", "-")
 | 
						|
 | 
						|
    return {fix_name(k): v for k, v in headers.items()}
 | 
						|
 | 
						|
 | 
						|
def custom_headers(headers_json: str) -> dict[str, str]:
 | 
						|
    if not headers_json:
 | 
						|
        return {}
 | 
						|
    try:
 | 
						|
        return orjson.loads(headers_json)
 | 
						|
    except orjson.JSONDecodeError as ve:
 | 
						|
        raise argparse.ArgumentTypeError(
 | 
						|
            f"Encountered an error while attempting to parse custom headers: {ve}\n"
 | 
						|
            "Note: all strings must be enclosed within \"\" instead of ''"
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
def send_bot_payload_message(
 | 
						|
    bot: UserProfile,
 | 
						|
    integration: WebhookIntegration,
 | 
						|
    fixture_path: str,
 | 
						|
    config: WebhookScreenshotConfig,
 | 
						|
) -> bool:
 | 
						|
    # Delete all messages, so new message is the only one it's message group
 | 
						|
    Message.objects.filter(realm_id=bot.realm_id, sender=bot).delete()
 | 
						|
    data, json_fixture, multipart_fixture, fixture_name = get_fixture_info(fixture_path)
 | 
						|
 | 
						|
    headers = get_requests_headers(integration.dir_name, fixture_name)
 | 
						|
    if config.custom_headers:
 | 
						|
        headers.update(config.custom_headers)
 | 
						|
    if config.use_basic_auth:
 | 
						|
        credentials = base64.b64encode(f"{bot.email}:{bot.api_key}".encode()).decode()
 | 
						|
        auth = f"basic {credentials}"
 | 
						|
        headers.update(dict(Authorization=auth))
 | 
						|
 | 
						|
    assert isinstance(bot.bot_owner, UserProfile)
 | 
						|
    stream = integration.stream_name or "devel"
 | 
						|
    url = f"{bot.bot_owner.realm.url}/{integration.url}"
 | 
						|
    params = {"api_key": bot.api_key, "stream": stream}
 | 
						|
    params.update(config.extra_params)
 | 
						|
 | 
						|
    extra_args = {}
 | 
						|
    if multipart_fixture:
 | 
						|
        extra_args = {"data": data}
 | 
						|
 | 
						|
    elif not json_fixture and data:
 | 
						|
        assert isinstance(data, str)
 | 
						|
 | 
						|
        # fixtures with url parameters
 | 
						|
        if "&" in data:
 | 
						|
            # Overwrite the fixture params, in case of overlap.
 | 
						|
            parsed_params = dict(parse_qsl(data))
 | 
						|
            parsed_params.update(params)
 | 
						|
            params = parsed_params
 | 
						|
 | 
						|
        # fixtures with plain/text payload
 | 
						|
        else:
 | 
						|
            extra_args = {"data": data}
 | 
						|
 | 
						|
    elif config.payload_as_query_param:
 | 
						|
        params[config.payload_param_name] = orjson.dumps(data).decode()
 | 
						|
 | 
						|
    else:
 | 
						|
        extra_args = {"json": data}
 | 
						|
 | 
						|
    url = f"{url}?{urlencode(params)}"
 | 
						|
 | 
						|
    try:
 | 
						|
        response = requests.post(url=url, headers=headers, **extra_args)
 | 
						|
    except requests.exceptions.ConnectionError:
 | 
						|
        print(
 | 
						|
            "This tool needs the local dev server to be running. "
 | 
						|
            "Please start it using tools/run-dev before running this tool."
 | 
						|
        )
 | 
						|
        sys.exit(1)
 | 
						|
    if response.status_code != 200:
 | 
						|
        print(response.json())
 | 
						|
        print("Failed to trigger webhook")
 | 
						|
        return False
 | 
						|
    else:
 | 
						|
        print(f"Triggered {integration.name} webhook")
 | 
						|
        return True
 | 
						|
 | 
						|
 | 
						|
def send_bot_mock_message(bot: UserProfile, channel: str, topic: str, message: str) -> None:
 | 
						|
    # Delete all messages, so that the new message is the only one in its message group
 | 
						|
    Message.objects.filter(realm_id=bot.realm_id, sender=bot).delete()
 | 
						|
    assert bot.bot_owner is not None
 | 
						|
    url = f"{bot.bot_owner.realm.url}"
 | 
						|
    client = zulip.Client(email=bot.email, api_key=bot.api_key, site=url)
 | 
						|
 | 
						|
    request = {"type": "stream", "to": channel, "topic": topic, "content": message}
 | 
						|
    client.send_message(request)
 | 
						|
 | 
						|
 | 
						|
def capture_last_message_screenshot(bot: UserProfile, image_path: str) -> None:
 | 
						|
    message = Message.objects.filter(realm_id=bot.realm_id, sender=bot).last()
 | 
						|
    realm = get_realm("zulip")
 | 
						|
    if message is None:
 | 
						|
        print(f"No message found for {bot.full_name}")
 | 
						|
        return
 | 
						|
    message_id = str(message.id)
 | 
						|
    screenshot_script = os.path.join(SCREENSHOTS_DIR, "message-screenshot.ts")
 | 
						|
    subprocess.check_call(["node", screenshot_script, message_id, image_path, realm.url])
 | 
						|
 | 
						|
 | 
						|
def generate_screenshot_from_config(
 | 
						|
    integration_name: str, screenshot_config: WebhookScreenshotConfig | FixturelessScreenshotConfig
 | 
						|
) -> None:
 | 
						|
    integration = INTEGRATIONS[integration_name]
 | 
						|
    bot = create_integration_bot(integration, screenshot_config.bot_name)
 | 
						|
    create_integration_stream(integration, bot)
 | 
						|
    image_path = get_image_path(integration, screenshot_config)
 | 
						|
 | 
						|
    if isinstance(integration, WebhookIntegration):
 | 
						|
        assert isinstance(screenshot_config, WebhookScreenshotConfig)
 | 
						|
        fixture_path = get_fixture_path(integration, screenshot_config)
 | 
						|
        message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config)
 | 
						|
    else:
 | 
						|
        assert isinstance(screenshot_config, FixturelessScreenshotConfig)
 | 
						|
        create_integration_stream(integration, bot)
 | 
						|
        send_bot_mock_message(
 | 
						|
            bot,
 | 
						|
            channel=screenshot_config.channel or integration.stream_name,
 | 
						|
            topic=screenshot_config.topic,
 | 
						|
            message=screenshot_config.message,
 | 
						|
        )
 | 
						|
        message_sent = True
 | 
						|
 | 
						|
    if message_sent:
 | 
						|
        capture_last_message_screenshot(bot, image_path)
 | 
						|
        print(f"Screenshot captured to: {BOLDRED}{image_path}{ENDC}")
 | 
						|
 | 
						|
 | 
						|
parser = argparse.ArgumentParser()
 | 
						|
 | 
						|
batch_group = parser.add_mutually_exclusive_group(required=True)
 | 
						|
batch_group.add_argument("--all", action="store_true")
 | 
						|
batch_group.add_argument("--all-webhook", action="store_true")
 | 
						|
batch_group.add_argument("--all-fixtureless", action="store_true")
 | 
						|
batch_group.add_argument(
 | 
						|
    "--skip-until",
 | 
						|
    help="Name of the integration to start processing from, including all that follow in alphabetical order. Similar to --all",
 | 
						|
)
 | 
						|
batch_group.add_argument("--integration", nargs="+", help="Names of specific integrations")
 | 
						|
 | 
						|
common_group = parser.add_argument_group()
 | 
						|
common_group.add_argument("--image-name", help="Name for the screenshot image")
 | 
						|
common_group.add_argument("--image-dir", help="Directory name where to save the screenshot image")
 | 
						|
common_group.add_argument("--bot-name", help="Name to use for the bot")
 | 
						|
 | 
						|
webhook_group = parser.add_argument_group("Webhook integrations options")
 | 
						|
webhook_group.add_argument(
 | 
						|
    "--fixture-name",
 | 
						|
    help="Name of the fixture file to use. All other webhook options are ignored if the fixture is not specified.",
 | 
						|
)
 | 
						|
webhook_group.add_argument(
 | 
						|
    "-A", "--use-basic-auth", action="store_true", help="Add basic auth headers to the request"
 | 
						|
)
 | 
						|
webhook_group.add_argument(
 | 
						|
    "-Q",
 | 
						|
    "--payload-as-query-param",
 | 
						|
    action="store_true",
 | 
						|
    help="Send payload as query param instead of body",
 | 
						|
)
 | 
						|
webhook_group.add_argument("-P", "--payload-param-name", help="Param name to use for the payload")
 | 
						|
webhook_group.add_argument(
 | 
						|
    "-H",
 | 
						|
    "--custom-headers",
 | 
						|
    type=custom_headers,
 | 
						|
    help="Any additional headers to be sent with the request.",
 | 
						|
)
 | 
						|
 | 
						|
fixtureless_group = parser.add_argument_group("Fixtureless non-webhook integrations options")
 | 
						|
fixtureless_group.add_argument("-T", "--topic", help="Topic to use for the mock message")
 | 
						|
fixtureless_group.add_argument("-M", "--message", help="Message to use for the mock message")
 | 
						|
fixtureless_group.add_argument(
 | 
						|
    "-C",
 | 
						|
    "--channel",
 | 
						|
    help="Channel name to use for the mock message. Defaults to stream name. Ignored if --topic and --message are not specified.",
 | 
						|
)
 | 
						|
 | 
						|
options = parser.parse_args()
 | 
						|
prepare_puppeteer_run()
 | 
						|
 | 
						|
 | 
						|
def validate_integration_count(
 | 
						|
    options: argparse.Namespace, parser: argparse.ArgumentParser, option_name: str
 | 
						|
) -> None:
 | 
						|
    if len(options.integration) != 1:
 | 
						|
        parser.error(
 | 
						|
            f"Exactly one integration should be specified with --integration when --{option_name} is provided"
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
def build_config(options: argparse.Namespace, config_keys: list[str]) -> dict[str, Any]:
 | 
						|
    config = {
 | 
						|
        key: getattr(options, key) for key in config_keys if getattr(options, key) is not None
 | 
						|
    }
 | 
						|
    return config
 | 
						|
 | 
						|
 | 
						|
def generate_full_batch_screenshots(
 | 
						|
    option_key: str,
 | 
						|
    config_dict: dict[str, list[WebhookScreenshotConfig]]
 | 
						|
    | dict[str, list[FixturelessScreenshotConfig]]
 | 
						|
    | dict[str, list[WebhookScreenshotConfig] | list[FixturelessScreenshotConfig]],
 | 
						|
    batch_type: str,
 | 
						|
) -> None:
 | 
						|
    for key, value in vars(options).items():
 | 
						|
        if key != option_key and value:
 | 
						|
            print(f"Generating screenshots for {batch_type}. Ignoring all command-line options")
 | 
						|
            break
 | 
						|
    for integration_name, screenshot_configs in config_dict.items():
 | 
						|
        for screenshot_config in screenshot_configs:
 | 
						|
            generate_screenshot_from_config(integration_name, screenshot_config)
 | 
						|
 | 
						|
 | 
						|
if options.all:
 | 
						|
    generate_full_batch_screenshots("all", DOC_SCREENSHOT_CONFIG, "all integrations")
 | 
						|
elif options.all_webhook:
 | 
						|
    generate_full_batch_screenshots(
 | 
						|
        "all_webhook", WEBHOOK_SCREENSHOT_CONFIG, "all webhook integrations"
 | 
						|
    )
 | 
						|
elif options.all_fixtureless:
 | 
						|
    generate_full_batch_screenshots(
 | 
						|
        "all_fixtureless", FIXTURELESS_SCREENSHOT_CONFIG, "all fixtureless integrations"
 | 
						|
    )
 | 
						|
elif options.skip_until:
 | 
						|
    for key, value in vars(options).items():
 | 
						|
        if key != "skip_until" and value:
 | 
						|
            print(
 | 
						|
                f"Generating screenshots for all integrations starting from {options.skip_until}. Ignoring all command-line options"
 | 
						|
            )
 | 
						|
            break
 | 
						|
    skip = True
 | 
						|
    for integration_name, screenshot_configs in DOC_SCREENSHOT_CONFIG.items():
 | 
						|
        if integration_name == options.skip_until:
 | 
						|
            skip = False
 | 
						|
        if skip:
 | 
						|
            continue
 | 
						|
        for screenshot_config in screenshot_configs:
 | 
						|
            generate_screenshot_from_config(integration_name, screenshot_config)
 | 
						|
 | 
						|
elif options.fixture_name:
 | 
						|
    validate_integration_count(options, parser, "fixture-name")
 | 
						|
    webhook_options = [action.dest for action in webhook_group._group_actions]
 | 
						|
    common_options = [action.dest for action in common_group._group_actions]
 | 
						|
    config = build_config(options, webhook_options + common_options)
 | 
						|
    screenshot_config = WebhookScreenshotConfig(**config)
 | 
						|
    generate_screenshot_from_config(options.integration[0], screenshot_config)
 | 
						|
 | 
						|
elif options.topic or options.message:
 | 
						|
    if not options.topic or not options.message:
 | 
						|
        parser.error("Both --topic and --message must be specified together")
 | 
						|
    validate_integration_count(options, parser, "topic")
 | 
						|
    fixtureless_options = [action.dest for action in fixtureless_group._group_actions]
 | 
						|
    common_options = [action.dest for action in common_group._group_actions]
 | 
						|
    config = build_config(options, fixtureless_options + common_options)
 | 
						|
    fixtureless_screenshot_config = FixturelessScreenshotConfig(**config)
 | 
						|
    generate_screenshot_from_config(options.integration[0], fixtureless_screenshot_config)
 | 
						|
 | 
						|
elif options.integration:
 | 
						|
    for integration in options.integration:
 | 
						|
        assert integration in DOC_SCREENSHOT_CONFIG
 | 
						|
        configs = DOC_SCREENSHOT_CONFIG[integration]
 | 
						|
        for screenshot_config in configs:
 | 
						|
            generate_screenshot_from_config(integration, screenshot_config)
 | 
						|
 | 
						|
else:
 | 
						|
    parser.error(
 | 
						|
        "Could not find configuration for integration. "
 | 
						|
        "You can specify a fixture file to use, using the --fixture flag. "
 | 
						|
        "Or add a configuration to zerver.lib.integrations.DOC_SCREENSHOT_CONFIG",
 | 
						|
    )
 |