mirror of
https://github.com/zulip/zulip.git
synced 2025-11-03 05:23:35 +00:00
bitbucket3: Strengthen types using WildValue.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
committed by
Tim Abbott
parent
6528538188
commit
42662f22c8
@@ -1,6 +1,6 @@
|
||||
import string
|
||||
from functools import partial
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from typing_extensions import Protocol
|
||||
@@ -9,6 +9,7 @@ from zerver.decorator import webhook_view
|
||||
from zerver.lib.exceptions import UnsupportedWebhookEventType
|
||||
from zerver.lib.request import REQ, has_request_variables
|
||||
from zerver.lib.response import json_success
|
||||
from zerver.lib.validator import WildValue, check_int, check_none_or, check_string, to_wild_value
|
||||
from zerver.lib.webhooks.common import check_send_webhook_message
|
||||
from zerver.lib.webhooks.git import (
|
||||
CONTENT_MESSAGE_TEMPLATE,
|
||||
@@ -59,15 +60,16 @@ def fixture_to_headers(fixture_name: str) -> Dict[str, str]:
|
||||
return {}
|
||||
|
||||
|
||||
def get_user_name(payload: Dict[str, Any]) -> str:
|
||||
def get_user_name(payload: WildValue) -> str:
|
||||
user_name = "[{name}]({url})".format(
|
||||
name=payload["actor"]["name"], url=payload["actor"]["links"]["self"][0]["href"]
|
||||
name=payload["actor"]["name"].tame(check_string),
|
||||
url=payload["actor"]["links"]["self"][0]["href"].tame(check_string),
|
||||
)
|
||||
return user_name
|
||||
|
||||
|
||||
def ping_handler(
|
||||
payload: Dict[str, Any],
|
||||
payload: WildValue,
|
||||
branches: Optional[str],
|
||||
include_title: Optional[str],
|
||||
) -> List[Dict[str, str]]:
|
||||
@@ -80,17 +82,19 @@ def ping_handler(
|
||||
|
||||
|
||||
def repo_comment_handler(
|
||||
payload: Dict[str, Any],
|
||||
payload: WildValue,
|
||||
action: str,
|
||||
branches: Optional[str],
|
||||
include_title: Optional[str],
|
||||
) -> List[Dict[str, str]]:
|
||||
repo_name = payload["repository"]["name"]
|
||||
repo_name = payload["repository"]["name"].tame(check_string)
|
||||
subject = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=repo_name)
|
||||
sha = payload["commit"]
|
||||
commit_url = payload["repository"]["links"]["self"][0]["href"][: -len("browse")]
|
||||
sha = payload["commit"].tame(check_string)
|
||||
commit_url = payload["repository"]["links"]["self"][0]["href"].tame(check_string)[
|
||||
: -len("browse")
|
||||
]
|
||||
commit_url += f"commits/{sha}"
|
||||
message = payload["comment"]["text"]
|
||||
message = payload["comment"]["text"].tame(check_string)
|
||||
if action == "deleted their comment":
|
||||
message = f"~~{message}~~"
|
||||
body = get_commits_comment_action_message(
|
||||
@@ -104,33 +108,35 @@ def repo_comment_handler(
|
||||
|
||||
|
||||
def repo_forked_handler(
|
||||
payload: Dict[str, Any],
|
||||
payload: WildValue,
|
||||
branches: Optional[str],
|
||||
include_title: Optional[str],
|
||||
) -> List[Dict[str, str]]:
|
||||
repo_name = payload["repository"]["origin"]["name"]
|
||||
repo_name = payload["repository"]["origin"]["name"].tame(check_string)
|
||||
subject = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=repo_name)
|
||||
body = BITBUCKET_FORK_BODY.format(
|
||||
display_name=payload["actor"]["displayName"],
|
||||
display_name=payload["actor"]["displayName"].tame(check_string),
|
||||
username=get_user_name(payload),
|
||||
fork_name=payload["repository"]["name"],
|
||||
fork_url=payload["repository"]["links"]["self"][0]["href"],
|
||||
fork_name=payload["repository"]["name"].tame(check_string),
|
||||
fork_url=payload["repository"]["links"]["self"][0]["href"].tame(check_string),
|
||||
)
|
||||
return [{"subject": subject, "body": body}]
|
||||
|
||||
|
||||
def repo_modified_handler(
|
||||
payload: Dict[str, Any],
|
||||
payload: WildValue,
|
||||
branches: Optional[str],
|
||||
include_title: Optional[str],
|
||||
) -> List[Dict[str, str]]:
|
||||
subject_new = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=payload["new"]["name"])
|
||||
new_name = payload["new"]["name"]
|
||||
subject_new = BITBUCKET_TOPIC_TEMPLATE.format(
|
||||
repository_name=payload["new"]["name"].tame(check_string)
|
||||
)
|
||||
new_name = payload["new"]["name"].tame(check_string)
|
||||
body = BITBUCKET_REPO_UPDATED_CHANGED.format(
|
||||
actor=get_user_name(payload),
|
||||
change="name",
|
||||
repo_name=payload["old"]["name"],
|
||||
old=payload["old"]["name"],
|
||||
repo_name=payload["old"]["name"].tame(check_string),
|
||||
old=payload["old"]["name"].tame(check_string),
|
||||
new=new_name,
|
||||
) # As of writing this, the only change we'd be notified about is a name change.
|
||||
punctuation = "." if new_name[-1] not in string.punctuation else ""
|
||||
@@ -138,12 +144,12 @@ def repo_modified_handler(
|
||||
return [{"subject": subject_new, "body": body}]
|
||||
|
||||
|
||||
def repo_push_branch_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Dict[str, str]:
|
||||
event_type = change["type"]
|
||||
repo_name = payload["repository"]["name"]
|
||||
def repo_push_branch_data(payload: WildValue, change: WildValue) -> Dict[str, str]:
|
||||
event_type = change["type"].tame(check_string)
|
||||
repo_name = payload["repository"]["name"].tame(check_string)
|
||||
user_name = get_user_name(payload)
|
||||
branch_name = change["ref"]["displayId"]
|
||||
branch_head = change["toHash"]
|
||||
branch_name = change["ref"]["displayId"].tame(check_string)
|
||||
branch_head = change["toHash"].tame(check_string)
|
||||
|
||||
if event_type == "ADD":
|
||||
body = get_create_branch_event_message(
|
||||
@@ -160,24 +166,24 @@ def repo_push_branch_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Di
|
||||
elif event_type == "DELETE":
|
||||
body = get_remove_branch_event_message(user_name, branch_name)
|
||||
else:
|
||||
message = "{}.{}".format(payload["eventKey"], event_type) # nocoverage
|
||||
message = "{}.{}".format(payload["eventKey"].tame(check_string), event_type) # nocoverage
|
||||
raise UnsupportedWebhookEventType(message)
|
||||
|
||||
subject = TOPIC_WITH_BRANCH_TEMPLATE.format(repo=repo_name, branch=branch_name)
|
||||
return {"subject": subject, "body": body}
|
||||
|
||||
|
||||
def repo_push_tag_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Dict[str, str]:
|
||||
event_type = change["type"]
|
||||
repo_name = payload["repository"]["name"]
|
||||
tag_name = change["ref"]["displayId"]
|
||||
def repo_push_tag_data(payload: WildValue, change: WildValue) -> Dict[str, str]:
|
||||
event_type = change["type"].tame(check_string)
|
||||
repo_name = payload["repository"]["name"].tame(check_string)
|
||||
tag_name = change["ref"]["displayId"].tame(check_string)
|
||||
|
||||
if event_type == "ADD":
|
||||
action = "pushed"
|
||||
elif event_type == "DELETE":
|
||||
action = "removed"
|
||||
else:
|
||||
message = "{}.{}".format(payload["eventKey"], event_type) # nocoverage
|
||||
message = "{}.{}".format(payload["eventKey"].tame(check_string), event_type) # nocoverage
|
||||
raise UnsupportedWebhookEventType(message)
|
||||
|
||||
subject = BITBUCKET_TOPIC_TEMPLATE.format(repository_name=repo_name)
|
||||
@@ -186,15 +192,15 @@ def repo_push_tag_data(payload: Dict[str, Any], change: Dict[str, Any]) -> Dict[
|
||||
|
||||
|
||||
def repo_push_handler(
|
||||
payload: Dict[str, Any],
|
||||
payload: WildValue,
|
||||
branches: Optional[str],
|
||||
include_title: Optional[str],
|
||||
) -> List[Dict[str, str]]:
|
||||
data = []
|
||||
for change in payload["changes"]:
|
||||
event_target_type = change["ref"]["type"]
|
||||
event_target_type = change["ref"]["type"].tame(check_string)
|
||||
if event_target_type == "BRANCH":
|
||||
branch = change["ref"]["displayId"]
|
||||
branch = change["ref"]["displayId"].tame(check_string)
|
||||
if branches:
|
||||
if branch not in branches:
|
||||
continue
|
||||
@@ -202,16 +208,18 @@ def repo_push_handler(
|
||||
elif event_target_type == "TAG":
|
||||
data.append(repo_push_tag_data(payload, change))
|
||||
else:
|
||||
message = "{}.{}".format(payload["eventKey"], event_target_type) # nocoverage
|
||||
message = "{}.{}".format(
|
||||
payload["eventKey"].tame(check_string), event_target_type
|
||||
) # nocoverage
|
||||
raise UnsupportedWebhookEventType(message)
|
||||
return data
|
||||
|
||||
|
||||
def get_assignees_string(pr: Dict[str, Any]) -> Optional[str]:
|
||||
def get_assignees_string(pr: WildValue) -> Optional[str]:
|
||||
reviewers = []
|
||||
for reviewer in pr["reviewers"]:
|
||||
name = reviewer["user"]["name"]
|
||||
link = reviewer["user"]["links"]["self"][0]["href"]
|
||||
name = reviewer["user"]["name"].tame(check_string)
|
||||
link = reviewer["user"]["links"]["self"][0]["href"].tame(check_string)
|
||||
reviewers.append(f"[{name}]({link})")
|
||||
if len(reviewers) == 0:
|
||||
assignees = None
|
||||
@@ -222,26 +230,26 @@ def get_assignees_string(pr: Dict[str, Any]) -> Optional[str]:
|
||||
return assignees
|
||||
|
||||
|
||||
def get_pr_subject(repo: str, type: str, id: str, title: str) -> str:
|
||||
def get_pr_subject(repo: str, type: str, id: int, title: str) -> str:
|
||||
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(repo=repo, type=type, id=id, title=title)
|
||||
|
||||
|
||||
def get_simple_pr_body(payload: Dict[str, Any], action: str, include_title: Optional[str]) -> str:
|
||||
def get_simple_pr_body(payload: WildValue, action: str, include_title: Optional[str]) -> str:
|
||||
pr = payload["pullRequest"]
|
||||
return get_pull_request_event_message(
|
||||
user_name=get_user_name(payload),
|
||||
action=action,
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
number=pr["id"],
|
||||
title=pr["title"] if include_title else None,
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
number=pr["id"].tame(check_int),
|
||||
title=pr["title"].tame(check_string) if include_title else None,
|
||||
)
|
||||
|
||||
|
||||
def get_pr_opened_or_modified_body(
|
||||
payload: Dict[str, Any], action: str, include_title: Optional[str]
|
||||
payload: WildValue, action: str, include_title: Optional[str]
|
||||
) -> str:
|
||||
pr = payload["pullRequest"]
|
||||
description = pr.get("description")
|
||||
description = pr.get("description").tame(check_none_or(check_string))
|
||||
assignees_string = get_assignees_string(pr)
|
||||
if assignees_string:
|
||||
# Then use the custom message template for this particular integration so that we can
|
||||
@@ -249,13 +257,13 @@ def get_pr_opened_or_modified_body(
|
||||
parameters = {
|
||||
"user_name": get_user_name(payload),
|
||||
"action": action,
|
||||
"url": pr["links"]["self"][0]["href"],
|
||||
"number": pr["id"],
|
||||
"source": pr["fromRef"]["displayId"],
|
||||
"destination": pr["toRef"]["displayId"],
|
||||
"url": pr["links"]["self"][0]["href"].tame(check_string),
|
||||
"number": pr["id"].tame(check_int),
|
||||
"source": pr["fromRef"]["displayId"].tame(check_string),
|
||||
"destination": pr["toRef"]["displayId"].tame(check_string),
|
||||
"message": description,
|
||||
"assignees": assignees_string,
|
||||
"title": pr["title"] if include_title else None,
|
||||
"title": pr["title"].tame(check_string) if include_title else None,
|
||||
}
|
||||
if include_title:
|
||||
body = PULL_REQUEST_OPENED_OR_MODIFIED_TEMPLATE_WITH_REVIEWERS_WITH_TITLE.format(
|
||||
@@ -271,76 +279,79 @@ def get_pr_opened_or_modified_body(
|
||||
return get_pull_request_event_message(
|
||||
user_name=get_user_name(payload),
|
||||
action=action,
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
number=pr["id"],
|
||||
target_branch=pr["fromRef"]["displayId"],
|
||||
base_branch=pr["toRef"]["displayId"],
|
||||
message=pr.get("description"),
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
number=pr["id"].tame(check_int),
|
||||
target_branch=pr["fromRef"]["displayId"].tame(check_string),
|
||||
base_branch=pr["toRef"]["displayId"].tame(check_string),
|
||||
message=description,
|
||||
assignee=assignees_string if assignees_string else None,
|
||||
title=pr["title"] if include_title else None,
|
||||
title=pr["title"].tame(check_string) if include_title else None,
|
||||
)
|
||||
|
||||
|
||||
def get_pr_needs_work_body(payload: Dict[str, Any], include_title: Optional[str]) -> str:
|
||||
def get_pr_needs_work_body(payload: WildValue, include_title: Optional[str]) -> str:
|
||||
pr = payload["pullRequest"]
|
||||
if not include_title:
|
||||
return PULL_REQUEST_MARKED_AS_NEEDS_WORK_TEMPLATE.format(
|
||||
user_name=get_user_name(payload),
|
||||
number=pr["id"],
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
number=pr["id"].tame(check_int),
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
)
|
||||
return PULL_REQUEST_MARKED_AS_NEEDS_WORK_TEMPLATE_WITH_TITLE.format(
|
||||
user_name=get_user_name(payload),
|
||||
number=pr["id"],
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
title=pr["title"],
|
||||
number=pr["id"].tame(check_int),
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
title=pr["title"].tame(check_string),
|
||||
)
|
||||
|
||||
|
||||
def get_pr_reassigned_body(payload: Dict[str, Any], include_title: Optional[str]) -> str:
|
||||
def get_pr_reassigned_body(payload: WildValue, include_title: Optional[str]) -> str:
|
||||
pr = payload["pullRequest"]
|
||||
assignees_string = get_assignees_string(pr)
|
||||
if not assignees_string:
|
||||
if not include_title:
|
||||
return PULL_REQUEST_REASSIGNED_TO_NONE_TEMPLATE.format(
|
||||
user_name=get_user_name(payload),
|
||||
number=pr["id"],
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
number=pr["id"].tame(check_int),
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
)
|
||||
punctuation = "." if pr["title"][-1] not in string.punctuation else ""
|
||||
punctuation = "." if pr["title"].tame(check_string)[-1] not in string.punctuation else ""
|
||||
message = PULL_REQUEST_REASSIGNED_TO_NONE_TEMPLATE_WITH_TITLE.format(
|
||||
user_name=get_user_name(payload),
|
||||
number=pr["id"],
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
title=pr["title"],
|
||||
number=pr["id"].tame(check_int),
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
title=pr["title"].tame(check_string),
|
||||
)
|
||||
message = f"{message}{punctuation}"
|
||||
return message
|
||||
if not include_title:
|
||||
return PULL_REQUEST_REASSIGNED_TEMPLATE.format(
|
||||
user_name=get_user_name(payload),
|
||||
number=pr["id"],
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
number=pr["id"].tame(check_int),
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
assignees=assignees_string,
|
||||
)
|
||||
return PULL_REQUEST_REASSIGNED_TEMPLATE_WITH_TITLE.format(
|
||||
user_name=get_user_name(payload),
|
||||
number=pr["id"],
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
number=pr["id"].tame(check_int),
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
assignees=assignees_string,
|
||||
title=pr["title"],
|
||||
title=pr["title"].tame(check_string),
|
||||
)
|
||||
|
||||
|
||||
def pr_handler(
|
||||
payload: Dict[str, Any],
|
||||
payload: WildValue,
|
||||
action: str,
|
||||
branches: Optional[str],
|
||||
include_title: Optional[str],
|
||||
) -> List[Dict[str, str]]:
|
||||
pr = payload["pullRequest"]
|
||||
subject = get_pr_subject(
|
||||
pr["toRef"]["repository"]["name"], type="PR", id=pr["id"], title=pr["title"]
|
||||
pr["toRef"]["repository"]["name"].tame(check_string),
|
||||
type="PR",
|
||||
id=pr["id"].tame(check_int),
|
||||
title=pr["title"].tame(check_string),
|
||||
)
|
||||
if action in ["opened", "modified"]:
|
||||
body = get_pr_opened_or_modified_body(payload, action, include_title)
|
||||
@@ -355,25 +366,28 @@ def pr_handler(
|
||||
|
||||
|
||||
def pr_comment_handler(
|
||||
payload: Dict[str, Any],
|
||||
payload: WildValue,
|
||||
action: str,
|
||||
branches: Optional[str],
|
||||
include_title: Optional[str],
|
||||
) -> List[Dict[str, str]]:
|
||||
pr = payload["pullRequest"]
|
||||
subject = get_pr_subject(
|
||||
pr["toRef"]["repository"]["name"], type="PR", id=pr["id"], title=pr["title"]
|
||||
pr["toRef"]["repository"]["name"].tame(check_string),
|
||||
type="PR",
|
||||
id=pr["id"].tame(check_int),
|
||||
title=pr["title"].tame(check_string),
|
||||
)
|
||||
message = payload["comment"]["text"]
|
||||
message = payload["comment"]["text"].tame(check_string)
|
||||
if action == "deleted their comment on":
|
||||
message = f"~~{message}~~"
|
||||
body = get_pull_request_event_message(
|
||||
user_name=get_user_name(payload),
|
||||
action=action,
|
||||
url=pr["links"]["self"][0]["href"],
|
||||
number=pr["id"],
|
||||
url=pr["links"]["self"][0]["href"].tame(check_string),
|
||||
number=pr["id"].tame(check_int),
|
||||
message=message,
|
||||
title=pr["title"] if include_title else None,
|
||||
title=pr["title"].tame(check_string) if include_title else None,
|
||||
)
|
||||
|
||||
return [{"subject": subject, "body": body}]
|
||||
@@ -381,7 +395,7 @@ def pr_comment_handler(
|
||||
|
||||
class EventHandler(Protocol):
|
||||
def __call__(
|
||||
self, payload: Dict[str, Any], branches: Optional[str], include_title: Optional[str]
|
||||
self, payload: WildValue, branches: Optional[str], include_title: Optional[str]
|
||||
) -> List[Dict[str, str]]:
|
||||
...
|
||||
|
||||
@@ -416,13 +430,13 @@ ALL_EVENT_TYPES = list(EVENT_HANDLER_MAP.keys())
|
||||
def api_bitbucket3_webhook(
|
||||
request: HttpRequest,
|
||||
user_profile: UserProfile,
|
||||
payload: Dict[str, Any] = REQ(argument_type="body"),
|
||||
payload: WildValue = REQ(argument_type="body", converter=to_wild_value),
|
||||
branches: Optional[str] = REQ(default=None),
|
||||
user_specified_topic: Optional[str] = REQ("topic", default=None),
|
||||
) -> HttpResponse:
|
||||
try:
|
||||
eventkey = payload["eventKey"]
|
||||
except KeyError:
|
||||
if "eventKey" in payload:
|
||||
eventkey = payload["eventKey"].tame(check_string)
|
||||
else:
|
||||
eventkey = request.META["HTTP_X_EVENT_KEY"]
|
||||
handler = EVENT_HANDLER_MAP.get(eventkey)
|
||||
if handler is None:
|
||||
|
||||
Reference in New Issue
Block a user