mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 05:23:35 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			317 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			317 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
"""Create or update a webhook integration screenshot using a test fixture."""
 | 
						|
 | 
						|
import argparse
 | 
						|
import base64
 | 
						|
import os
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
from typing import Any, Dict, Optional, Tuple
 | 
						|
from urllib.parse import parse_qsl, urlencode
 | 
						|
 | 
						|
TOOLS_DIR = os.path.abspath(os.path.dirname(__file__))
 | 
						|
ROOT_DIR = os.path.dirname(TOOLS_DIR)
 | 
						|
sys.path.insert(0, ROOT_DIR)
 | 
						|
 | 
						|
# check for the venv
 | 
						|
from tools.lib import sanity_check
 | 
						|
 | 
						|
sanity_check.check_venv(__file__)
 | 
						|
 | 
						|
from scripts.lib.setup_path import setup_path
 | 
						|
 | 
						|
setup_path()
 | 
						|
 | 
						|
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
 | 
						|
import django
 | 
						|
 | 
						|
django.setup()
 | 
						|
 | 
						|
import orjson
 | 
						|
import requests
 | 
						|
import zulip
 | 
						|
 | 
						|
from scripts.lib.zulip_tools import BOLDRED, ENDC
 | 
						|
from tools.lib.test_script import prepare_puppeteer_run
 | 
						|
from zerver.actions.create_user import do_create_user, notify_created_bot
 | 
						|
from zerver.actions.streams import bulk_add_subscriptions
 | 
						|
from zerver.actions.user_settings import do_change_avatar_fields
 | 
						|
from zerver.lib.integrations import (
 | 
						|
    DOC_SCREENSHOT_CONFIG,
 | 
						|
    INTEGRATIONS,
 | 
						|
    BaseScreenshotConfig,
 | 
						|
    Integration,
 | 
						|
    ScreenshotConfig,
 | 
						|
    WebhookIntegration,
 | 
						|
    get_fixture_and_image_paths,
 | 
						|
    split_fixture_path,
 | 
						|
)
 | 
						|
from zerver.lib.storage import static_path
 | 
						|
from zerver.lib.streams import create_stream_if_needed
 | 
						|
from zerver.lib.upload import upload_avatar_image
 | 
						|
from zerver.lib.webhooks.common import get_fixture_http_headers
 | 
						|
from zerver.models import Message, UserProfile, get_realm, get_user_by_delivery_email
 | 
						|
 | 
						|
 | 
						|
def create_integration_bot(integration: Integration, bot_name: Optional[str] = None) -> UserProfile:
    """Return the incoming-webhook bot used to screenshot ``integration``.

    The bot's email is ``<integration.name>-bot@example.com``.  If a user
    with that email already exists it is returned untouched; otherwise a new
    bot owned by iago@zulip.com (looked up in the "zulip" realm) is created
    and, when the integration provides an avatar file that exists on disk,
    that avatar is uploaded for the new bot.

    ``bot_name`` overrides the default full name
    ``"<Integration name capitalized> Bot"`` for a newly created bot.
    """
    zulip_realm = get_realm("zulip")
    owner = get_user_by_delivery_email("iago@zulip.com", zulip_realm)
    bot_email = f"{integration.name}-bot@example.com"
    full_name = bot_name if bot_name is not None else f"{integration.name.capitalize()} Bot"

    try:
        return UserProfile.objects.get(email=bot_email)
    except UserProfile.DoesNotExist:
        pass

    bot = do_create_user(
        email=bot_email,
        password="123",
        realm=owner.realm,
        full_name=full_name,
        bot_type=UserProfile.INCOMING_WEBHOOK_BOT,
        bot_owner=owner,
        acting_user=owner,
    )
    notify_created_bot(bot)

    # The avatar is only set up for freshly created bots.
    avatar_relative_path = integration.get_bot_avatar_path()
    if avatar_relative_path is None:
        return bot

    avatar_file = static_path(avatar_relative_path)
    if not os.path.isfile(avatar_file):
        return bot

    with open(avatar_file, "rb") as avatar:
        upload_avatar_image(avatar, owner, bot)
        do_change_avatar_fields(bot, UserProfile.AVATAR_FROM_USER, acting_user=owner)
    return bot
 | 
						|
 | 
						|
 | 
						|
def create_integration_stream(integration: Integration, bot: UserProfile) -> None:
    """Make sure the integration's stream exists and subscribe the bot and its owner.

    The stream named ``integration.stream_name`` is created in the bot
    owner's realm if needed; the subscription is performed with the bot as
    the acting user.
    """
    owner = bot.bot_owner
    assert isinstance(owner, UserProfile)
    owner_realm = owner.realm
    stream, _created = create_stream_if_needed(owner_realm, integration.stream_name)
    subscribers = [bot, owner]
    bulk_add_subscriptions(owner_realm, [stream], subscribers, acting_user=bot)
 | 
						|
 | 
						|
 | 
						|
def get_fixture_info(fixture_path: str) -> Tuple[Any, bool, str]:
    """Load a fixture file and describe it.

    Returns a ``(data, json_fixture, fixture_name)`` tuple:

    * ``json_fixture`` is True when ``fixture_path`` ends in ``.json``.
    * ``fixture_name`` is the second element returned by
      ``split_fixture_path(fixture_path)``.
    * ``data`` is the parsed JSON for a JSON fixture, the stripped file
      text for any other fixture, or ``""`` when ``fixture_name`` is empty
      (in which case the file is not opened at all).
    """
    json_fixture = fixture_path.endswith(".json")
    _, fixture_name = split_fixture_path(fixture_path)

    if not fixture_name:
        return "", json_fixture, fixture_name

    if json_fixture:
        with open(fixture_path, "rb") as fb:
            data = orjson.loads(fb.read())
    else:
        # Pin the encoding: without it, text mode decodes using the
        # locale's preferred encoding, so a fixture containing non-ASCII
        # characters could be read differently (or fail) depending on the
        # environment the tool runs in.
        with open(fixture_path, encoding="utf-8") as f:
            data = f.read().strip()

    return data, json_fixture, fixture_name
 | 
						|
 | 
						|
 | 
						|
def get_integration(integration_name: str) -> Integration:
    """Look up ``integration_name`` in ``INTEGRATIONS``.

    A missing name propagates the ``KeyError`` from the lookup.
    """
    return INTEGRATIONS[integration_name]
 | 
						|
 | 
						|
 | 
						|
def get_requests_headers(integration_name: str, fixture_name: str) -> Dict[str, Any]:
    """Return the fixture's HTTP headers with names rewritten for ``requests``.

    Header names returned by ``get_fixture_http_headers`` have a leading
    ``HTTP_`` removed (when present) and every underscore replaced with a
    hyphen, e.g. ``HTTP_X_EVENT_KEY`` becomes ``X-EVENT-KEY``.  Values are
    passed through unchanged.
    """
    prefix = "HTTP_"
    fixture_headers = get_fixture_http_headers(integration_name, fixture_name)

    request_headers: Dict[str, Any] = {}
    for raw_name, value in fixture_headers.items():
        if raw_name.startswith(prefix):
            trimmed = raw_name[len(prefix) :]
        else:
            trimmed = raw_name
        request_headers[trimmed.replace("_", "-")] = value
    return request_headers
 | 
						|
 | 
						|
 | 
						|
def custom_headers(headers_json: str) -> Dict[str, str]:
    """Parse the value of the ``--custom-headers`` option.

    An empty value yields an empty dict; anything else is decoded as JSON
    and returned as-is.  Invalid JSON is reported to argparse by raising
    ``argparse.ArgumentTypeError``.
    """
    if headers_json:
        try:
            parsed_headers = orjson.loads(headers_json)
        except orjson.JSONDecodeError as ve:
            message = (
                "Encountered an error while attempting to parse custom headers: {}\n"
                "Note: all strings must be enclosed within \"\" instead of ''"
            ).format(ve)
            raise argparse.ArgumentTypeError(message)
        return parsed_headers
    return {}
 | 
						|
 | 
						|
 | 
						|
def send_bot_mock_message(
    bot: UserProfile, integration: Integration, fixture_path: str, config: BaseScreenshotConfig
) -> None:
    """Send the fixture's message as ``bot`` for a non-webhook integration.

    All messages previously sent by ``bot`` are deleted first.  The fixture
    must provide ``"subject"`` and ``"body"`` fields, which become the topic
    and content of a stream message sent to ``integration.stream_name``
    through the Zulip API client.

    If the fixture data lacks those fields (or is not a mapping at all, e.g.
    a non-JSON or empty fixture), an explanatory message is printed and the
    process exits with status 1.

    ``config`` is not read by this function.
    """
    # Delete all messages, so the new message is the only one in its message group
    Message.objects.filter(sender=bot).delete()
    data, _, _ = get_fixture_info(fixture_path)

    assert bot.bot_owner is not None
    url = bot.bot_owner.realm.uri
    client = zulip.Client(email=bot.email, api_key=bot.api_key, site=url)

    # Only the fixture lookups are guarded: a KeyError raised while actually
    # sending the message must not be misreported as a bad fixture.
    # TypeError covers fixture data that is a plain string rather than a
    # JSON object.
    try:
        request = {
            "type": "stream",
            "to": integration.stream_name,
            "topic": data["subject"],
            "content": data["body"],
        }
    except (KeyError, TypeError):
        print(
            "{} contains invalid configuration. "
            'Fields "subject" and "body" are required for non-webhook integrations.'.format(
                fixture_path
            )
        )
        sys.exit(1)

    client.send_message(request)
 | 
						|
 | 
						|
 | 
						|
def send_bot_payload_message(
    bot: UserProfile, integration: WebhookIntegration, fixture_path: str, config: ScreenshotConfig
) -> bool:
    """POST the fixture to the integration's webhook endpoint as ``bot``.

    All messages previously sent by ``bot`` are deleted first.  The request
    goes to ``<bot owner's realm URI>/<integration.url>`` with the bot's API
    key and the target stream (``integration.stream_name`` or ``"devel"``)
    plus ``config.extra_params`` as query parameters.  The payload is
    delivered in one of three ways:

    * non-JSON fixture with content: the fixture is parsed as a query string
      and merged into the query parameters (our parameters win);
    * ``config.payload_as_query_param``: the JSON-encoded payload is added
      as the query parameter named ``config.payload_param_name``;
    * otherwise: the payload is sent as the JSON request body.

    Request headers are the fixture's headers, then ``config.custom_headers``,
    then (if ``config.use_basic_auth``) an ``Authorization`` header built from
    the bot's email and API key.

    Returns True when the server responds with status 200, False otherwise.
    Exits the process with status 1 if the server cannot be reached.
    """
    # Delete all messages, so the new message is the only one in its message group
    Message.objects.filter(sender=bot).delete()
    data, json_fixture, fixture_name = get_fixture_info(fixture_path)

    headers = get_requests_headers(integration.name, fixture_name)
    # custom_headers can be None when the config is built from command-line
    # options and -H/--custom-headers was not given; dict.update(None)
    # would raise TypeError.
    if config.custom_headers:
        headers.update(config.custom_headers)
    if config.use_basic_auth:
        credentials = base64.b64encode(f"{bot.email}:{bot.api_key}".encode()).decode()
        auth = f"basic {credentials}"
        headers.update(dict(Authorization=auth))

    assert isinstance(bot.bot_owner, UserProfile)
    stream = integration.stream_name or "devel"
    url = f"{bot.bot_owner.realm.uri}/{integration.url}"
    params = {"api_key": bot.api_key, "stream": stream}
    params.update(config.extra_params)

    extra_args: Dict[str, Any] = {}
    if not json_fixture and data:
        # We overwrite any params in fixture with our params. stream name, for
        # example, may be defined in the fixture.
        assert isinstance(data, str)
        parsed_params = dict(parse_qsl(data))
        parsed_params.update(params)
        params = parsed_params
    elif config.payload_as_query_param:
        params[config.payload_param_name] = orjson.dumps(data).decode()
    else:
        extra_args = {"json": data}

    url = f"{url}?{urlencode(params)}"

    try:
        response = requests.post(url=url, headers=headers, **extra_args)
    except requests.exceptions.ConnectionError:
        print(
            "This tool needs the local dev server to be running. "
            "Please start it using tools/run-dev before running this tool."
        )
        sys.exit(1)

    if response.status_code != 200:
        # An error response is not guaranteed to carry a JSON body (e.g. an
        # HTML error page); fall back to the raw text instead of crashing
        # while trying to report the failure.
        try:
            print(response.json())
        except ValueError:
            print(response.text)
        print("Failed to trigger webhook")
        return False

    print(f"Triggered {integration.name} webhook")
    return True
 | 
						|
 | 
						|
 | 
						|
def capture_last_message_screenshot(bot: UserProfile, image_path: str) -> None:
    """Screenshot the last message sent by ``bot`` into ``image_path``.

    Runs ``message-screenshot.js`` (from this tool's directory) under node,
    passing the message id, the output path and the "zulip" realm's URI.
    If the bot has sent no messages, a notice is printed and nothing is
    captured.  A failing node process raises ``subprocess.CalledProcessError``.
    """
    last_message = Message.objects.filter(sender=bot).last()
    zulip_realm = get_realm("zulip")
    if last_message is None:
        print(f"No message found for {bot.full_name}")
        return

    command = [
        "node",
        os.path.join(TOOLS_DIR, "message-screenshot.js"),
        str(last_message.id),
        image_path,
        zulip_realm.uri,
    ]
    subprocess.check_call(command)
 | 
						|
 | 
						|
 | 
						|
def generate_screenshot_from_config(
    integration_name: str, screenshot_config: BaseScreenshotConfig
) -> None:
    """Produce one screenshot for ``integration_name`` as described by ``screenshot_config``.

    Sets up the integration's bot and stream, sends the fixture message
    (through the webhook endpoint for webhook integrations, directly through
    the API otherwise) and, if the message was sent, captures the screenshot
    and prints where it was written.
    """
    integration = get_integration(integration_name)
    fixture_path, image_path = get_fixture_and_image_paths(integration, screenshot_config)
    bot = create_integration_bot(integration, screenshot_config.bot_name)
    create_integration_stream(integration, bot)

    if not isinstance(integration, WebhookIntegration):
        send_bot_mock_message(bot, integration, fixture_path, screenshot_config)
    else:
        assert isinstance(
            screenshot_config, ScreenshotConfig
        ), "Webhook integrations require ScreenshotConfig"
        delivered = send_bot_payload_message(bot, integration, fixture_path, screenshot_config)
        if not delivered:
            # The webhook rejected the payload; there is nothing to capture.
            return

    capture_last_message_screenshot(bot, image_path)
    print(f"Screenshot captured to: {BOLDRED}{image_path}{ENDC}")
 | 
						|
 | 
						|
 | 
						|
# Command-line interface: either --all (every entry in DOC_SCREENSHOT_CONFIG)
# or --integration NAME, optionally with --fixture and the request-shaping
# options below to build a one-off ScreenshotConfig.
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--all", action="store_true")
group.add_argument("--integration", help="Name of the integration")
parser.add_argument("--fixture", help="Name of the fixture file to use")
parser.add_argument("--image-name", help="Name for the screenshot image")
parser.add_argument("--image-dir", help="Directory name where to save the screenshot image")
parser.add_argument("--bot-name", help="Name to use for the bot")
parser.add_argument(
    "-A", "--use-basic-auth", action="store_true", help="Add basic auth headers to the request"
)
parser.add_argument(
    "-Q",
    "--payload-as-query-param",
    action="store_true",
    help="Send payload as query param instead of body",
)
parser.add_argument("-P", "--payload-param-name", help="Param name to use for the payload")
parser.add_argument(
    "-H",
    "--custom-headers",
    type=custom_headers,
    help="Any additional headers to be sent with the request.",
)

options = parser.parse_args()
prepare_puppeteer_run()

if options.all:
    # Warn a single time if any other option was supplied alongside --all,
    # rather than repeating the same line once per supplied option.
    if any(value for key, value in vars(options).items() if key != "all"):
        print("Generating screenshots for all integrations. Ignoring all command-line options")
    for integration_name, screenshot_configs in DOC_SCREENSHOT_CONFIG.items():
        for screenshot_config in screenshot_configs:
            generate_screenshot_from_config(integration_name, screenshot_config)

elif options.fixture:
    config = dict(
        fixture_name=options.fixture,
        use_basic_auth=options.use_basic_auth,
        # options.custom_headers is None when -H was not given; pass an
        # empty dict so the headers are always a mapping downstream.
        custom_headers=options.custom_headers or {},
        payload_as_query_param=options.payload_as_query_param,
    )
    # Optional settings are only forwarded when supplied, so that
    # ScreenshotConfig can apply its own values for the rest.
    if options.image_name:
        config["image_name"] = options.image_name
    if options.image_dir:
        config["image_dir"] = options.image_dir
    if options.bot_name:
        config["bot_name"] = options.bot_name
    if options.payload_param_name:
        config["payload_param_name"] = options.payload_param_name
    screenshot_config = ScreenshotConfig(**config)
    generate_screenshot_from_config(options.integration, screenshot_config)

elif options.integration in DOC_SCREENSHOT_CONFIG:
    configs = DOC_SCREENSHOT_CONFIG[options.integration]
    for screenshot_config in configs:
        generate_screenshot_from_config(options.integration, screenshot_config)

else:
    parser.error(
        "Could not find configuration for integration. "
        "You can specify a fixture file to use, using the --fixture flag. "
        "Or add a configuration to zerver.lib.integrations.DOC_SCREENSHOT_CONFIG",
    )
 |