diff --git a/zerver/webhooks/pagerduty/view.py b/zerver/webhooks/pagerduty/view.py index 5228a68eda..4b88fb36a5 100644 --- a/zerver/webhooks/pagerduty/view.py +++ b/zerver/webhooks/pagerduty/view.py @@ -1,5 +1,5 @@ from email.headerregistry import Address -from typing import Any, Dict +from typing import Dict, Union from django.http import HttpRequest, HttpResponse @@ -7,9 +7,12 @@ from zerver.decorator import webhook_view from zerver.lib.exceptions import UnsupportedWebhookEventType from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success +from zerver.lib.validator import WildValue, check_int, check_none_or, check_string, to_wild_value from zerver.lib.webhooks.common import check_send_webhook_message from zerver.models import UserProfile +FormatDictType = Dict[str, Union[str, int]] + PAGER_DUTY_EVENT_NAMES = { "incident.trigger": "triggered", "incident.acknowledge": "acknowledged", @@ -80,41 +83,45 @@ Incident [{incident_num_title}]({incident_url}) resolved. """.strip() -def build_pagerduty_formatdict(message: Dict[str, Any]) -> Dict[str, Any]: - format_dict: Dict[str, Any] = {} - format_dict["action"] = PAGER_DUTY_EVENT_NAMES[message["type"]] +def build_pagerduty_formatdict(message: WildValue) -> FormatDictType: + format_dict: FormatDictType = {} + format_dict["action"] = PAGER_DUTY_EVENT_NAMES[message["type"].tame(check_string)] - format_dict["incident_id"] = message["data"]["incident"]["id"] - format_dict["incident_num_title"] = message["data"]["incident"]["incident_number"] - format_dict["incident_url"] = message["data"]["incident"]["html_url"] + format_dict["incident_id"] = message["data"]["incident"]["id"].tame(check_string) + format_dict["incident_num_title"] = message["data"]["incident"]["incident_number"].tame( + check_int + ) + format_dict["incident_url"] = message["data"]["incident"]["html_url"].tame(check_string) - format_dict["service_name"] = message["data"]["incident"]["service"]["name"] - format_dict["service_url"] = message["data"]["incident"]["service"]["html_url"] + format_dict["service_name"] = message["data"]["incident"]["service"]["name"].tame(check_string) + format_dict["service_url"] = message["data"]["incident"]["service"]["html_url"].tame( + check_string + ) - if message["data"]["incident"].get("assigned_to_user", None): + if message["data"]["incident"].get("assigned_to_user"): assigned_to_user = message["data"]["incident"]["assigned_to_user"] format_dict["assignee_info"] = AGENT_TEMPLATE.format( - username=Address(addr_spec=assigned_to_user["email"]).username, - url=assigned_to_user["html_url"], + username=Address(addr_spec=assigned_to_user["email"].tame(check_string)).username, + url=assigned_to_user["html_url"].tame(check_string), ) else: format_dict["assignee_info"] = "nobody" - if message["data"]["incident"].get("resolved_by_user", None): + if message["data"]["incident"].get("resolved_by_user"): resolved_by_user = message["data"]["incident"]["resolved_by_user"] format_dict["agent_info"] = AGENT_TEMPLATE.format( - username=Address(addr_spec=resolved_by_user["email"]).username, - url=resolved_by_user["html_url"], + username=Address(addr_spec=resolved_by_user["email"].tame(check_string)).username, + url=resolved_by_user["html_url"].tame(check_string), ) trigger_message = [] - trigger_summary_data = message["data"]["incident"]["trigger_summary_data"] - if trigger_summary_data is not None: - trigger_subject = trigger_summary_data.get("subject", "") + trigger_summary_data = message["data"]["incident"].get("trigger_summary_data") + if trigger_summary_data: + trigger_subject = trigger_summary_data.get("subject", "").tame(check_string) if trigger_subject: trigger_message.append(trigger_subject) - trigger_description = trigger_summary_data.get("description", "") + trigger_description = trigger_summary_data.get("description", "").tame(check_string) if trigger_description: trigger_message.append(trigger_description) @@ -122,22 +129,23 @@ def build_pagerduty_formatdict(message: Dict[str, Any]) -> Dict[str, Any]: return format_dict -def build_pagerduty_formatdict_v2(message: Dict[str, Any]) -> Dict[str, Any]: - format_dict = {} - format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V2[message["event"]] +def build_pagerduty_formatdict_v2(message: WildValue) -> FormatDictType: + format_dict: FormatDictType = {} + format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V2[message["event"].tame(check_string)] - format_dict["incident_id"] = message["incident"]["id"] - format_dict["incident_num_title"] = message["incident"]["incident_number"] - format_dict["incident_url"] = message["incident"]["html_url"] + format_dict["incident_id"] = message["incident"]["id"].tame(check_string) + format_dict["incident_num_title"] = message["incident"]["incident_number"].tame(check_int) + format_dict["incident_url"] = message["incident"]["html_url"].tame(check_string) - format_dict["service_name"] = message["incident"]["service"]["name"] - format_dict["service_url"] = message["incident"]["service"]["html_url"] + format_dict["service_name"] = message["incident"]["service"]["name"].tame(check_string) + format_dict["service_url"] = message["incident"]["service"]["html_url"].tame(check_string) assignments = message["incident"]["assignments"] if assignments: assignee = assignments[0]["assignee"] format_dict["assignee_info"] = AGENT_TEMPLATE.format( - username=assignee["summary"], url=assignee["html_url"] + username=assignee["summary"].tame(check_string), + url=assignee["html_url"].tame(check_string), ) else: format_dict["assignee_info"] = "nobody" @@ -145,34 +153,36 @@ def build_pagerduty_formatdict_v2(message: Dict[str, Any]) -> Dict[str, Any]: last_status_change_by = message["incident"].get("last_status_change_by") if last_status_change_by is not None: format_dict["agent_info"] = AGENT_TEMPLATE.format( - username=last_status_change_by["summary"], - url=last_status_change_by["html_url"], + username=last_status_change_by["summary"].tame(check_string), + url=last_status_change_by["html_url"].tame(check_string), ) - trigger_description = message["incident"].get("description") + trigger_description = message["incident"].get("description").tame(check_none_or(check_string)) if trigger_description is not None: format_dict["trigger_message"] = TRIGGER_MESSAGE.format(message=trigger_description) return format_dict -def build_pagerduty_formatdict_v3(event: Dict[str, Any]) -> Dict[str, Any]: - format_dict = {} - format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V3[event["event_type"]] +def build_pagerduty_formatdict_v3(event: WildValue) -> FormatDictType: + format_dict: FormatDictType = {} + format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V3[event["event_type"].tame(check_string)] - format_dict["incident_id"] = event["data"]["id"] - format_dict["incident_url"] = event["data"]["html_url"] + format_dict["incident_id"] = event["data"]["id"].tame(check_string) + format_dict["incident_url"] = event["data"]["html_url"].tame(check_string) format_dict["incident_num_title"] = NUM_TITLE.format( - incident_num=event["data"]["number"], incident_title=event["data"]["title"] + incident_num=event["data"]["number"].tame(check_int), + incident_title=event["data"]["title"].tame(check_string), ) - format_dict["service_name"] = event["data"]["service"]["summary"] - format_dict["service_url"] = event["data"]["service"]["html_url"] + format_dict["service_name"] = event["data"]["service"]["summary"].tame(check_string) + format_dict["service_url"] = event["data"]["service"]["html_url"].tame(check_string) assignees = event["data"]["assignees"] if assignees: assignee = assignees[0] format_dict["assignee_info"] = AGENT_TEMPLATE.format( - username=assignee["summary"], url=assignee["html_url"] + username=assignee["summary"].tame(check_string), + url=assignee["html_url"].tame(check_string), ) else: format_dict["assignee_info"] = "nobody" @@ -180,8 +190,8 @@ def build_pagerduty_formatdict_v3(event: Dict[str, Any]) -> Dict[str, Any]: agent = event.get("agent") if agent is not None: format_dict["agent_info"] = AGENT_TEMPLATE.format( - username=agent["summary"], - url=agent["html_url"], + username=agent["summary"].tame(check_string), + url=agent["html_url"].tame(check_string), ) # V3 doesn't have trigger_message @@ -191,7 +201,10 @@ def build_pagerduty_formatdict_v3(event: Dict[str, Any]) -> Dict[str, Any]: def send_formated_pagerduty( - request: HttpRequest, user_profile: UserProfile, message_type: str, format_dict: Dict[str, Any] + request: HttpRequest, + user_profile: UserProfile, + message_type: str, + format_dict: FormatDictType, ) -> None: if message_type in ( "incident.trigger", @@ -215,6 +228,7 @@ def send_formated_pagerduty( subject = "Incident {incident_num_title}".format(**format_dict) body = template.format(**format_dict) + assert isinstance(format_dict["action"], str) check_send_webhook_message(request, user_profile, subject, body, format_dict["action"]) @@ -223,13 +237,12 @@ def send_formated_pagerduty( def api_pagerduty_webhook( request: HttpRequest, user_profile: UserProfile, - payload: Dict[str, Any] = REQ(argument_type="body"), + payload: WildValue = REQ(argument_type="body", converter=to_wild_value), ) -> HttpResponse: messages = payload.get("messages") - - if messages is not None: + if messages: for message in messages: - message_type = message.get("type") + message_type = message.get("type").tame(check_none_or(check_string)) # If the message has no "type" key, then this payload came from a # Pagerduty Webhook V2. @@ -243,23 +256,23 @@ def api_pagerduty_webhook( send_formated_pagerduty(request, user_profile, message_type, format_dict) for message in messages: - event = message.get("event") + message_event = message.get("event").tame(check_none_or(check_string)) # If the message has no "event" key, then this payload came from a # Pagerduty Webhook V1. - if event is None: + if message_event is None: break - if event not in PAGER_DUTY_EVENT_NAMES_V2: - raise UnsupportedWebhookEventType(event) + if message_event not in PAGER_DUTY_EVENT_NAMES_V2: + raise UnsupportedWebhookEventType(message_event) format_dict = build_pagerduty_formatdict_v2(message) - send_formated_pagerduty(request, user_profile, event, format_dict) + send_formated_pagerduty(request, user_profile, message_event, format_dict) else: if "event" in payload: # V3 has no "messages" field, and it has key "event" instead event = payload["event"] - event_type = event.get("event_type") + event_type = event.get("event_type").tame(check_none_or(check_string)) if event_type not in PAGER_DUTY_EVENT_NAMES_V3: raise UnsupportedWebhookEventType(event_type)