From 42662f22c8bae433795406f61b2209880a820ba5 Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Thu, 16 Dec 2021 22:03:22 -0800 Subject: [PATCH] bitbucket3: Strengthen types using WildValue. Signed-off-by: Anders Kaseorg --- zerver/webhooks/bitbucket3/view.py | 186 ++++++++++++++++------------- 1 file changed, 100 insertions(+), 86 deletions(-) diff --git a/zerver/webhooks/bitbucket3/view.py b/zerver/webhooks/bitbucket3/view.py index bff9cb2243..d8775ac61c 100644 --- a/zerver/webhooks/bitbucket3/view.py +++ b/zerver/webhooks/bitbucket3/view.py @@ -1,6 +1,6 @@ import string from functools import partial -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional from django.http import HttpRequest, HttpResponse from typing_extensions import Protocol @@ -9,6 +9,7 @@ from zerver.decorator import webhook_view from zerver.lib.exceptions import UnsupportedWebhookEventType from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success +from zerver.lib.validator import WildValue, check_int, check_none_or, check_string, to_wild_value from zerver.lib.webhooks.common import check_send_webhook_message from zerver.lib.webhooks.git import ( CONTENT_MESSAGE_TEMPLATE, @@ -59,15 +60,16 @@ def fixture_to_headers(fixture_name: str) -> Dict[str, str]: return {} -def get_user_name(payload: Dict[str, Any]) -> str: +def get_user_name(payload: WildValue) -> str: user_name = "[{name}]({url})".format( - name=payload["actor"]["name"], url=payload["actor"]["links"]["self"][0]["href"] + name=payload["actor"]["name"].tame(check_string), + url=payload["actor"]["links"]["self"][0]["href"].tame(check_string), ) return user_name def ping_handler( - payload: Dict[str, Any], + payload: WildValue, branches: Optional[str], include_title: Optional[str], ) -> List[Dict[str, str]]: @@ -80,17 +82,19 @@ def ping_handler( def repo_comment_handler( - payload: Dict[str, Any], + payload: WildValue, action: str, branches: Optional[str], include_title: Optional[str], ) -> List[Dict[str, str]]: - repo_name = payload["repository"]["name"] + repo_name = payload["repository"]["name"].tame(check_string) subject = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=repo_name) - sha = payload["commit"] - commit_url = payload["repository"]["links"]["self"][0]["href"][: -len("browse")] + sha = payload["commit"].tame(check_string) + commit_url = payload["repository"]["links"]["self"][0]["href"].tame(check_string)[ + : -len("browse") + ] commit_url += f"commits/{sha}" - message = payload["comment"]["text"] + message = payload["comment"]["text"].tame(check_string) if action == "deleted their comment": message = f"~~{message}~~" body = get_commits_comment_action_message( @@ -104,33 +108,35 @@ def repo_comment_handler( def repo_forked_handler( - payload: Dict[str, Any], + payload: WildValue, branches: Optional[str], include_title: Optional[str], ) -> List[Dict[str, str]]: - repo_name = payload["repository"]["origin"]["name"] + repo_name = payload["repository"]["origin"]["name"].tame(check_string) subject = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=repo_name) body = BITBUCKET_FORK_BODY.format( - display_name=payload["actor"]["displayName"], + display_name=payload["actor"]["displayName"].tame(check_string), username=get_user_name(payload), - fork_name=payload["repository"]["name"], - fork_url=payload["repository"]["links"]["self"][0]["href"], + fork_name=payload["repository"]["name"].tame(check_string), + fork_url=payload["repository"]["links"]["self"][0]["href"].tame(check_string), ) return [{"subject": subject, "body": body}] def repo_modified_handler( - payload: Dict[str, Any], + payload: WildValue, branches: Optional[str], include_title: Optional[str], ) -> List[Dict[str, str]]: - subject_new = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=payload["new"]["name"]) - new_name = payload["new"]["name"] + subject_new = BITBUCKET_TOPIC_TEMPLATE.format( + repository_name=payload["new"]["name"].tame(check_string) + ) + new_name = payload["new"]["name"].tame(check_string) body = BITBUCKET_REPO_UPDATED_CHANGED.format( actor=get_user_name(payload), change="name", - repo_name=payload["old"]["name"], - old=payload["old"]["name"], + repo_name=payload["old"]["name"].tame(check_string), + old=payload["old"]["name"].tame(check_string), new=new_name, ) # As of writing this, the only change we'd be notified about is a name change. punctuation = "." if new_name[-1] not in string.punctuation else "" @@ -138,12 +144,12 @@ def repo_modified_handler( return [{"subject": subject_new, "body": body}] -def repo_push_branch_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Dict[str, str]: - event_type = change["type"] - repo_name = payload["repository"]["name"] +def repo_push_branch_data(payload: WildValue, change: WildValue) -> Dict[str, str]: + event_type = change["type"].tame(check_string) + repo_name = payload["repository"]["name"].tame(check_string) user_name = get_user_name(payload) - branch_name = change["ref"]["displayId"] - branch_head = change["toHash"] + branch_name = change["ref"]["displayId"].tame(check_string) + branch_head = change["toHash"].tame(check_string) if event_type == "ADD": body = get_create_branch_event_message( @@ -160,24 +166,24 @@ def repo_push_branch_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Di elif event_type == "DELETE": body = get_remove_branch_event_message(user_name, branch_name) else: - message = "{}.{}".format(payload["eventKey"], event_type) # nocoverage + message = "{}.{}".format(payload["eventKey"].tame(check_string), event_type) # nocoverage raise UnsupportedWebhookEventType(message) subject = TOPIC_WITH_BRANCH_TEMPLATE.format(repo=repo_name, branch=branch_name) return {"subject": subject, "body": body} -def repo_push_tag_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Dict[str, str]: - event_type = change["type"] - repo_name = payload["repository"]["name"] - tag_name = change["ref"]["displayId"] +def repo_push_tag_data(payload: WildValue, change: WildValue) -> Dict[str, str]: + event_type = change["type"].tame(check_string) + repo_name = payload["repository"]["name"].tame(check_string) + tag_name = change["ref"]["displayId"].tame(check_string) if event_type == "ADD": action = "pushed" elif event_type == "DELETE": action = "removed" else: - message = "{}.{}".format(payload["eventKey"], event_type) # nocoverage + message = "{}.{}".format(payload["eventKey"].tame(check_string), event_type) # nocoverage raise UnsupportedWebhookEventType(message) subject = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=repo_name) @@ -186,15 +192,15 @@ def repo_push_tag_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Dict[ def repo_push_handler( - payload: Dict[str, Any], + payload: WildValue, branches: Optional[str], include_title: Optional[str], ) -> List[Dict[str, str]]: data = [] for change in payload["changes"]: - event_target_type = change["ref"]["type"] + event_target_type = change["ref"]["type"].tame(check_string) if event_target_type == "BRANCH": - branch = change["ref"]["displayId"] + branch = change["ref"]["displayId"].tame(check_string) if branches: if branch not in branches: continue @@ -202,16 +208,18 @@ def repo_push_handler( elif event_target_type == "TAG": data.append(repo_push_tag_data(payload, change)) else: - message = "{}.{}".format(payload["eventKey"], event_target_type) # nocoverage + message = "{}.{}".format( + payload["eventKey"].tame(check_string), event_target_type + ) # nocoverage raise UnsupportedWebhookEventType(message) return data -def get_assignees_string(pr: Dict[str, Any]) -> Optional[str]: +def get_assignees_string(pr: WildValue) -> Optional[str]: reviewers = [] for reviewer in pr["reviewers"]: - name = reviewer["user"]["name"] - link = reviewer["user"]["links"]["self"][0]["href"] + name = reviewer["user"]["name"].tame(check_string) + link = reviewer["user"]["links"]["self"][0]["href"].tame(check_string) reviewers.append(f"[{name}]({link})") if len(reviewers) == 0: assignees = None @@ -222,26 +230,26 @@ def get_assignees_string(pr: Dict[str, Any]) -> Optional[str]: return assignees -def get_pr_subject(repo: str, type: str, id: str, title: str) -> str: +def get_pr_subject(repo: str, type: str, id: int, title: str) -> str: return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(repo=repo, type=type, id=id, title=title) -def get_simple_pr_body(payload: Dict[str, Any], action: str, include_title: Optional[str]) -> str: +def get_simple_pr_body(payload: WildValue, action: str, include_title: Optional[str]) -> str: pr = payload["pullRequest"] return get_pull_request_event_message( user_name=get_user_name(payload), action=action, - url=pr["links"]["self"][0]["href"], - number=pr["id"], - title=pr["title"] if include_title else None, + url=pr["links"]["self"][0]["href"].tame(check_string), + number=pr["id"].tame(check_int), + title=pr["title"].tame(check_string) if include_title else None, ) def get_pr_opened_or_modified_body( - payload: Dict[str, Any], action: str, include_title: Optional[str] + payload: WildValue, action: str, include_title: Optional[str] ) -> str: pr = payload["pullRequest"] - description = pr.get("description") + description = pr.get("description").tame(check_none_or(check_string)) assignees_string = get_assignees_string(pr) if assignees_string: # Then use the custom message template for this particular integration so that we can @@ -249,13 +257,13 @@ def get_pr_opened_or_modified_body( parameters = { "user_name": get_user_name(payload), "action": action, - "url": pr["links"]["self"][0]["href"], - "number": pr["id"], - "source": pr["fromRef"]["displayId"], - "destination": pr["toRef"]["displayId"], + "url": pr["links"]["self"][0]["href"].tame(check_string), + "number": pr["id"].tame(check_int), + "source": pr["fromRef"]["displayId"].tame(check_string), + "destination": pr["toRef"]["displayId"].tame(check_string), "message": description, "assignees": assignees_string, - "title": pr["title"] if include_title else None, + "title": pr["title"].tame(check_string) if include_title else None, } if include_title: body = PULL_REQUEST_OPENED_OR_MODIFIED_TEMPLATE_WITH_REVIEWERS_WITH_TITLE.format( @@ -271,76 +279,79 @@ def get_pr_opened_or_modified_body( return get_pull_request_event_message( user_name=get_user_name(payload), action=action, - url=pr["links"]["self"][0]["href"], - number=pr["id"], - target_branch=pr["fromRef"]["displayId"], - base_branch=pr["toRef"]["displayId"], - message=pr.get("description"), + url=pr["links"]["self"][0]["href"].tame(check_string), + number=pr["id"].tame(check_int), + target_branch=pr["fromRef"]["displayId"].tame(check_string), + base_branch=pr["toRef"]["displayId"].tame(check_string), + message=description, assignee=assignees_string if assignees_string else None, - title=pr["title"] if include_title else None, + title=pr["title"].tame(check_string) if include_title else None, ) -def get_pr_needs_work_body(payload: Dict[str, Any], include_title: Optional[str]) -> str: +def get_pr_needs_work_body(payload: WildValue, include_title: Optional[str]) -> str: pr = payload["pullRequest"] if not include_title: return PULL_REQUEST_MARKED_AS_NEEDS_WORK_TEMPLATE.format( user_name=get_user_name(payload), - number=pr["id"], - url=pr["links"]["self"][0]["href"], + number=pr["id"].tame(check_int), + url=pr["links"]["self"][0]["href"].tame(check_string), ) return PULL_REQUEST_MARKED_AS_NEEDS_WORK_TEMPLATE_WITH_TITLE.format( user_name=get_user_name(payload), - number=pr["id"], - url=pr["links"]["self"][0]["href"], - title=pr["title"], + number=pr["id"].tame(check_int), + url=pr["links"]["self"][0]["href"].tame(check_string), + title=pr["title"].tame(check_string), ) -def get_pr_reassigned_body(payload: Dict[str, Any], include_title: Optional[str]) -> str: +def get_pr_reassigned_body(payload: WildValue, include_title: Optional[str]) -> str: pr = payload["pullRequest"] assignees_string = get_assignees_string(pr) if not assignees_string: if not include_title: return PULL_REQUEST_REASSIGNED_TO_NONE_TEMPLATE.format( user_name=get_user_name(payload), - number=pr["id"], - url=pr["links"]["self"][0]["href"], + number=pr["id"].tame(check_int), + url=pr["links"]["self"][0]["href"].tame(check_string), ) - punctuation = "." if pr["title"][-1] not in string.punctuation else "" + punctuation = "." if pr["title"].tame(check_string)[-1] not in string.punctuation else "" message = PULL_REQUEST_REASSIGNED_TO_NONE_TEMPLATE_WITH_TITLE.format( user_name=get_user_name(payload), - number=pr["id"], - url=pr["links"]["self"][0]["href"], - title=pr["title"], + number=pr["id"].tame(check_int), + url=pr["links"]["self"][0]["href"].tame(check_string), + title=pr["title"].tame(check_string), ) message = f"{message}{punctuation}" return message if not include_title: return PULL_REQUEST_REASSIGNED_TEMPLATE.format( user_name=get_user_name(payload), - number=pr["id"], - url=pr["links"]["self"][0]["href"], + number=pr["id"].tame(check_int), + url=pr["links"]["self"][0]["href"].tame(check_string), assignees=assignees_string, ) return PULL_REQUEST_REASSIGNED_TEMPLATE_WITH_TITLE.format( user_name=get_user_name(payload), - number=pr["id"], - url=pr["links"]["self"][0]["href"], + number=pr["id"].tame(check_int), + url=pr["links"]["self"][0]["href"].tame(check_string), assignees=assignees_string, - title=pr["title"], + title=pr["title"].tame(check_string), ) def pr_handler( - payload: Dict[str, Any], + payload: WildValue, action: str, branches: Optional[str], include_title: Optional[str], ) -> List[Dict[str, str]]: pr = payload["pullRequest"] subject = get_pr_subject( - pr["toRef"]["repository"]["name"], type="PR", id=pr["id"], title=pr["title"] + pr["toRef"]["repository"]["name"].tame(check_string), + type="PR", + id=pr["id"].tame(check_int), + title=pr["title"].tame(check_string), ) if action in ["opened", "modified"]: body = get_pr_opened_or_modified_body(payload, action, include_title) @@ -355,25 +366,28 @@ def pr_handler( def pr_comment_handler( - payload: Dict[str, Any], + payload: WildValue, action: str, branches: Optional[str], include_title: Optional[str], ) -> List[Dict[str, str]]: pr = payload["pullRequest"] subject = get_pr_subject( - pr["toRef"]["repository"]["name"], type="PR", id=pr["id"], title=pr["title"] + pr["toRef"]["repository"]["name"].tame(check_string), + type="PR", + id=pr["id"].tame(check_int), + title=pr["title"].tame(check_string), ) - message = payload["comment"]["text"] + message = payload["comment"]["text"].tame(check_string) if action == "deleted their comment on": message = f"~~{message}~~" body = get_pull_request_event_message( user_name=get_user_name(payload), action=action, - url=pr["links"]["self"][0]["href"], - number=pr["id"], + url=pr["links"]["self"][0]["href"].tame(check_string), + number=pr["id"].tame(check_int), message=message, - title=pr["title"] if include_title else None, + title=pr["title"].tame(check_string) if include_title else None, ) return [{"subject": subject, "body": body}] @@ -381,7 +395,7 @@ def pr_comment_handler( class EventHandler(Protocol): def __call__( - self, payload: Dict[str, Any], branches: Optional[str], include_title: Optional[str] + self, payload: WildValue, branches: Optional[str], include_title: Optional[str] ) -> List[Dict[str, str]]: ... @@ -416,13 +430,13 @@ ALL_EVENT_TYPES = list(EVENT_HANDLER_MAP.keys()) def api_bitbucket3_webhook( request: HttpRequest, user_profile: UserProfile, - payload: Dict[str, Any] = REQ(argument_type="body"), + payload: WildValue = REQ(argument_type="body", converter=to_wild_value), branches: Optional[str] = REQ(default=None), user_specified_topic: Optional[str] = REQ("topic", default=None), ) -> HttpResponse: - try: - eventkey = payload["eventKey"] - except KeyError: + if "eventKey" in payload: + eventkey = payload["eventKey"].tame(check_string) + else: eventkey = request.META["HTTP_X_EVENT_KEY"] handler = EVENT_HANDLER_MAP.get(eventkey) if handler is None: