#!/usr/bin/env python3 """Create or update a webhook integration screenshot using a test fixture.""" import argparse import base64 import os import re import subprocess import sys from typing import Any from urllib.parse import parse_qsl, urlencode import zulip SCREENSHOTS_DIR = os.path.abspath(os.path.dirname(__file__)) TOOLS_DIR = os.path.abspath(os.path.dirname(SCREENSHOTS_DIR)) ROOT_DIR = os.path.dirname(TOOLS_DIR) sys.path.insert(0, ROOT_DIR) # check for the venv from tools.lib import sanity_check sanity_check.check_venv(__file__) from scripts.lib.setup_path import setup_path setup_path() os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings" import django django.setup() import orjson import requests from scripts.lib.zulip_tools import BOLDRED, ENDC from tools.lib.test_script import prepare_puppeteer_run from zerver.actions.create_user import do_create_user, notify_created_bot from zerver.actions.streams import bulk_add_subscriptions from zerver.actions.user_settings import do_change_avatar_fields from zerver.lib.integrations import ( DOC_SCREENSHOT_CONFIG, FIXTURELESS_SCREENSHOT_CONFIG, INTEGRATIONS, WEBHOOK_SCREENSHOT_CONFIG, FixturelessScreenshotConfig, Integration, WebhookIntegration, WebhookScreenshotConfig, get_fixture_path, get_image_path, split_fixture_path, ) from zerver.lib.storage import static_path from zerver.lib.streams import create_stream_if_needed from zerver.lib.upload import upload_avatar_image from zerver.lib.webhooks.common import get_fixture_http_headers, parse_multipart_string from zerver.models import Message, UserProfile from zerver.models.realms import get_realm from zerver.models.users import get_user_by_delivery_email def create_integration_bot(integration: Integration, bot_name: str | None = None) -> UserProfile: realm = get_realm("zulip") owner = get_user_by_delivery_email("iago@zulip.com", realm) email_prefix = ( re.sub(r"[^a-zA-Z0-9_-]", "", bot_name.strip()).lower()[:64] if bot_name else integration.name ) bot_email = f"{email_prefix}-bot@example.com" if bot_name is None: bot_name = f"{integration.display_name} Bot" try: bot = UserProfile.objects.get(email=bot_email) except UserProfile.DoesNotExist: bot = do_create_user( email=bot_email, password="123", realm=owner.realm, full_name=bot_name, bot_type=UserProfile.INCOMING_WEBHOOK_BOT, bot_owner=owner, acting_user=owner, ) notify_created_bot(bot) bot_avatar_path = integration.get_bot_avatar_path() if bot_avatar_path is not None: bot_avatar_path = static_path(bot_avatar_path) if os.path.isfile(bot_avatar_path): with open(bot_avatar_path, "rb") as f: upload_avatar_image(f, bot) do_change_avatar_fields(bot, UserProfile.AVATAR_FROM_USER, acting_user=owner) return bot def create_integration_stream(integration: Integration, bot: UserProfile) -> None: assert isinstance(bot.bot_owner, UserProfile) realm = bot.bot_owner.realm stream, _created = create_stream_if_needed(realm, integration.stream_name) bulk_add_subscriptions(realm, [stream], [bot, bot.bot_owner], acting_user=bot) def get_fixture_info(fixture_path: str) -> tuple[Any, bool, bool, str]: json_fixture = fixture_path.endswith(".json") multipart_fixture = fixture_path.endswith(".multipart") _, fixture_name = split_fixture_path(fixture_path) if fixture_name: if not os.path.exists(fixture_path): raise FileNotFoundError( f"{BOLDRED}The fixture '{fixture_path}' does not exist. Please check the fixture file name and try again.{ENDC}" ) if json_fixture: with open(fixture_path, "rb") as fb: data = orjson.loads(fb.read()) else: with open(fixture_path) as f: data = f.read().strip() if multipart_fixture: data = parse_multipart_string(data) else: data = "" return data, json_fixture, multipart_fixture, fixture_name def get_requests_headers(integration_dir_name: str, fixture_name: str) -> dict[str, Any]: headers = get_fixture_http_headers(integration_dir_name, fixture_name) def fix_name(header: str) -> str: return header.removeprefix("HTTP_").replace("_", "-") return {fix_name(k): v for k, v in headers.items()} def custom_headers(headers_json: str) -> dict[str, str]: if not headers_json: return {} try: return orjson.loads(headers_json) except orjson.JSONDecodeError as ve: raise argparse.ArgumentTypeError( f"Encountered an error while attempting to parse custom headers: {ve}\n" "Note: all strings must be enclosed within \"\" instead of ''" ) def send_bot_payload_message( bot: UserProfile, integration: WebhookIntegration, fixture_path: str, config: WebhookScreenshotConfig, ) -> bool: # Delete all messages, so new message is the only one it's message group Message.objects.filter(realm_id=bot.realm_id, sender=bot).delete() data, json_fixture, multipart_fixture, fixture_name = get_fixture_info(fixture_path) headers = get_requests_headers(integration.dir_name, fixture_name) if config.custom_headers: headers.update(config.custom_headers) if config.use_basic_auth: credentials = base64.b64encode(f"{bot.email}:{bot.api_key}".encode()).decode() auth = f"basic {credentials}" headers.update(dict(Authorization=auth)) assert isinstance(bot.bot_owner, UserProfile) stream = integration.stream_name or "devel" url = f"{bot.bot_owner.realm.url}/{integration.url}" params = {"api_key": bot.api_key, "stream": stream} params.update(config.extra_params) extra_args = {} if multipart_fixture: extra_args = {"data": data} elif not json_fixture and data: assert isinstance(data, str) # fixtures with url parameters if "&" in data: # Overwrite the fixture params, in case of overlap. parsed_params = dict(parse_qsl(data)) parsed_params.update(params) params = parsed_params # fixtures with plain/text payload else: extra_args = {"data": data} elif config.payload_as_query_param: params[config.payload_param_name] = orjson.dumps(data).decode() else: extra_args = {"json": data} url = f"{url}?{urlencode(params)}" try: response = requests.post(url=url, headers=headers, **extra_args) except requests.exceptions.ConnectionError: print( "This tool needs the local dev server to be running. " "Please start it using tools/run-dev before running this tool." ) sys.exit(1) if response.status_code != 200: print(response.json()) print("Failed to trigger webhook") return False else: print(f"Triggered {integration.name} webhook") return True def send_bot_mock_message(bot: UserProfile, channel: str, topic: str, message: str) -> None: # Delete all messages, so that the new message is the only one in its message group Message.objects.filter(realm_id=bot.realm_id, sender=bot).delete() assert bot.bot_owner is not None url = f"{bot.bot_owner.realm.url}" client = zulip.Client(email=bot.email, api_key=bot.api_key, site=url) request = {"type": "stream", "to": channel, "topic": topic, "content": message} client.send_message(request) def capture_last_message_screenshot(bot: UserProfile, image_path: str) -> None: message = Message.objects.filter(realm_id=bot.realm_id, sender=bot).last() realm = get_realm("zulip") if message is None: print(f"No message found for {bot.full_name}") return message_id = str(message.id) screenshot_script = os.path.join(SCREENSHOTS_DIR, "message-screenshot.ts") subprocess.check_call(["node", screenshot_script, message_id, image_path, realm.url]) def generate_screenshot_from_config( integration_name: str, screenshot_config: WebhookScreenshotConfig | FixturelessScreenshotConfig ) -> None: integration = INTEGRATIONS[integration_name] bot = create_integration_bot(integration, screenshot_config.bot_name) create_integration_stream(integration, bot) image_path = get_image_path(integration, screenshot_config) if isinstance(integration, WebhookIntegration): assert isinstance(screenshot_config, WebhookScreenshotConfig) fixture_path = get_fixture_path(integration, screenshot_config) message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config) else: assert isinstance(screenshot_config, FixturelessScreenshotConfig) create_integration_stream(integration, bot) send_bot_mock_message( bot, channel=screenshot_config.channel or integration.stream_name, topic=screenshot_config.topic, message=screenshot_config.message, ) message_sent = True if message_sent: capture_last_message_screenshot(bot, image_path) print(f"Screenshot captured to: {BOLDRED}{image_path}{ENDC}") parser = argparse.ArgumentParser() batch_group = parser.add_mutually_exclusive_group(required=True) batch_group.add_argument("--all", action="store_true") batch_group.add_argument("--all-webhook", action="store_true") batch_group.add_argument("--all-fixtureless", action="store_true") batch_group.add_argument( "--skip-until", help="Name of the integration to start processing from, including all that follow in alphabetical order. Similar to --all", ) batch_group.add_argument("--integration", nargs="+", help="Names of specific integrations") common_group = parser.add_argument_group() common_group.add_argument("--image-name", help="Name for the screenshot image") common_group.add_argument("--image-dir", help="Directory name where to save the screenshot image") common_group.add_argument("--bot-name", help="Name to use for the bot") webhook_group = parser.add_argument_group("Webhook integrations options") webhook_group.add_argument( "--fixture-name", help="Name of the fixture file to use. All other webhook options are ignored if the fixture is not specified.", ) webhook_group.add_argument( "-A", "--use-basic-auth", action="store_true", help="Add basic auth headers to the request" ) webhook_group.add_argument( "-Q", "--payload-as-query-param", action="store_true", help="Send payload as query param instead of body", ) webhook_group.add_argument("-P", "--payload-param-name", help="Param name to use for the payload") webhook_group.add_argument( "-H", "--custom-headers", type=custom_headers, help="Any additional headers to be sent with the request.", ) fixtureless_group = parser.add_argument_group("Fixtureless non-webhook integrations options") fixtureless_group.add_argument("-T", "--topic", help="Topic to use for the mock message") fixtureless_group.add_argument("-M", "--message", help="Message to use for the mock message") fixtureless_group.add_argument( "-C", "--channel", help="Channel name to use for the mock message. Defaults to stream name. Ignored if --topic and --message are not specified.", ) options = parser.parse_args() prepare_puppeteer_run() def validate_integration_count( options: argparse.Namespace, parser: argparse.ArgumentParser, option_name: str ) -> None: if len(options.integration) != 1: parser.error( f"Exactly one integration should be specified with --integration when --{option_name} is provided" ) def build_config(options: argparse.Namespace, config_keys: list[str]) -> dict[str, Any]: config = { key: getattr(options, key) for key in config_keys if getattr(options, key) is not None } return config def generate_full_batch_screenshots( option_key: str, config_dict: dict[str, list[WebhookScreenshotConfig]] | dict[str, list[FixturelessScreenshotConfig]] | dict[str, list[WebhookScreenshotConfig] | list[FixturelessScreenshotConfig]], batch_type: str, ) -> None: for key, value in vars(options).items(): if key != option_key and value: print(f"Generating screenshots for {batch_type}. Ignoring all command-line options") break for integration_name, screenshot_configs in config_dict.items(): for screenshot_config in screenshot_configs: generate_screenshot_from_config(integration_name, screenshot_config) if options.all: generate_full_batch_screenshots("all", DOC_SCREENSHOT_CONFIG, "all integrations") elif options.all_webhook: generate_full_batch_screenshots( "all_webhook", WEBHOOK_SCREENSHOT_CONFIG, "all webhook integrations" ) elif options.all_fixtureless: generate_full_batch_screenshots( "all_fixtureless", FIXTURELESS_SCREENSHOT_CONFIG, "all fixtureless integrations" ) elif options.skip_until: for key, value in vars(options).items(): if key != "skip_until" and value: print( f"Generating screenshots for all integrations starting from {options.skip_until}. Ignoring all command-line options" ) break skip = True for integration_name, screenshot_configs in DOC_SCREENSHOT_CONFIG.items(): if integration_name == options.skip_until: skip = False if skip: continue for screenshot_config in screenshot_configs: generate_screenshot_from_config(integration_name, screenshot_config) elif options.fixture_name: validate_integration_count(options, parser, "fixture-name") webhook_options = [action.dest for action in webhook_group._group_actions] common_options = [action.dest for action in common_group._group_actions] config = build_config(options, webhook_options + common_options) screenshot_config = WebhookScreenshotConfig(**config) generate_screenshot_from_config(options.integration[0], screenshot_config) elif options.topic or options.message: if not options.topic or not options.message: parser.error("Both --topic and --message must be specified together") validate_integration_count(options, parser, "topic") fixtureless_options = [action.dest for action in fixtureless_group._group_actions] common_options = [action.dest for action in common_group._group_actions] config = build_config(options, fixtureless_options + common_options) fixtureless_screenshot_config = FixturelessScreenshotConfig(**config) generate_screenshot_from_config(options.integration[0], fixtureless_screenshot_config) elif options.integration: for integration in options.integration: assert integration in DOC_SCREENSHOT_CONFIG configs = DOC_SCREENSHOT_CONFIG[integration] for screenshot_config in configs: generate_screenshot_from_config(integration, screenshot_config) else: parser.error( "Could not find configuration for integration. " "You can specify a fixture file to use, using the --fixture flag. " "Or add a configuration to zerver.lib.integrations.DOC_SCREENSHOT_CONFIG", )