pagerduty: Strengthen types using WildValue.

This commit is contained in:
Hari Prashant Bhimaraju
2022-07-27 07:16:55 +05:30
committed by Tim Abbott
parent 736e10d9f7
commit 6e2c1768c9

View File

@@ -1,5 +1,5 @@
from email.headerregistry import Address
from typing import Any, Dict
from typing import Dict, Union
from django.http import HttpRequest, HttpResponse
@@ -7,9 +7,12 @@ from zerver.decorator import webhook_view
from zerver.lib.exceptions import UnsupportedWebhookEventType
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success
from zerver.lib.validator import WildValue, check_int, check_none_or, check_string, to_wild_value
from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.models import UserProfile
FormatDictType = Dict[str, Union[str, int]]
PAGER_DUTY_EVENT_NAMES = {
"incident.trigger": "triggered",
"incident.acknowledge": "acknowledged",
@@ -80,41 +83,45 @@ Incident [{incident_num_title}]({incident_url}) resolved.
""".strip()
def build_pagerduty_formatdict(message: Dict[str, Any]) -> Dict[str, Any]:
format_dict: Dict[str, Any] = {}
format_dict["action"] = PAGER_DUTY_EVENT_NAMES[message["type"]]
def build_pagerduty_formatdict(message: WildValue) -> FormatDictType:
format_dict: FormatDictType = {}
format_dict["action"] = PAGER_DUTY_EVENT_NAMES[message["type"].tame(check_string)]
format_dict["incident_id"] = message["data"]["incident"]["id"]
format_dict["incident_num_title"] = message["data"]["incident"]["incident_number"]
format_dict["incident_url"] = message["data"]["incident"]["html_url"]
format_dict["incident_id"] = message["data"]["incident"]["id"].tame(check_string)
format_dict["incident_num_title"] = message["data"]["incident"]["incident_number"].tame(
check_int
)
format_dict["incident_url"] = message["data"]["incident"]["html_url"].tame(check_string)
format_dict["service_name"] = message["data"]["incident"]["service"]["name"]
format_dict["service_url"] = message["data"]["incident"]["service"]["html_url"]
format_dict["service_name"] = message["data"]["incident"]["service"]["name"].tame(check_string)
format_dict["service_url"] = message["data"]["incident"]["service"]["html_url"].tame(
check_string
)
if message["data"]["incident"].get("assigned_to_user", None):
if message["data"]["incident"].get("assigned_to_user"):
assigned_to_user = message["data"]["incident"]["assigned_to_user"]
format_dict["assignee_info"] = AGENT_TEMPLATE.format(
username=Address(addr_spec=assigned_to_user["email"]).username,
url=assigned_to_user["html_url"],
username=Address(addr_spec=assigned_to_user["email"].tame(check_string)).username,
url=assigned_to_user["html_url"].tame(check_string),
)
else:
format_dict["assignee_info"] = "nobody"
if message["data"]["incident"].get("resolved_by_user", None):
if message["data"]["incident"].get("resolved_by_user"):
resolved_by_user = message["data"]["incident"]["resolved_by_user"]
format_dict["agent_info"] = AGENT_TEMPLATE.format(
username=Address(addr_spec=resolved_by_user["email"]).username,
url=resolved_by_user["html_url"],
username=Address(addr_spec=resolved_by_user["email"].tame(check_string)).username,
url=resolved_by_user["html_url"].tame(check_string),
)
trigger_message = []
trigger_summary_data = message["data"]["incident"]["trigger_summary_data"]
if trigger_summary_data is not None:
trigger_subject = trigger_summary_data.get("subject", "")
trigger_summary_data = message["data"]["incident"].get("trigger_summary_data")
if trigger_summary_data:
trigger_subject = trigger_summary_data.get("subject", "").tame(check_string)
if trigger_subject:
trigger_message.append(trigger_subject)
trigger_description = trigger_summary_data.get("description", "")
trigger_description = trigger_summary_data.get("description", "").tame(check_string)
if trigger_description:
trigger_message.append(trigger_description)
@@ -122,22 +129,23 @@ def build_pagerduty_formatdict(message: Dict[str, Any]) -> Dict[str, Any]:
return format_dict
def build_pagerduty_formatdict_v2(message: Dict[str, Any]) -> Dict[str, Any]:
format_dict = {}
format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V2[message["event"]]
def build_pagerduty_formatdict_v2(message: WildValue) -> FormatDictType:
format_dict: FormatDictType = {}
format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V2[message["event"].tame(check_string)]
format_dict["incident_id"] = message["incident"]["id"]
format_dict["incident_num_title"] = message["incident"]["incident_number"]
format_dict["incident_url"] = message["incident"]["html_url"]
format_dict["incident_id"] = message["incident"]["id"].tame(check_string)
format_dict["incident_num_title"] = message["incident"]["incident_number"].tame(check_int)
format_dict["incident_url"] = message["incident"]["html_url"].tame(check_string)
format_dict["service_name"] = message["incident"]["service"]["name"]
format_dict["service_url"] = message["incident"]["service"]["html_url"]
format_dict["service_name"] = message["incident"]["service"]["name"].tame(check_string)
format_dict["service_url"] = message["incident"]["service"]["html_url"].tame(check_string)
assignments = message["incident"]["assignments"]
if assignments:
assignee = assignments[0]["assignee"]
format_dict["assignee_info"] = AGENT_TEMPLATE.format(
username=assignee["summary"], url=assignee["html_url"]
username=assignee["summary"].tame(check_string),
url=assignee["html_url"].tame(check_string),
)
else:
format_dict["assignee_info"] = "nobody"
@@ -145,34 +153,36 @@ def build_pagerduty_formatdict_v2(message: Dict[str, Any]) -> Dict[str, Any]:
last_status_change_by = message["incident"].get("last_status_change_by")
if last_status_change_by is not None:
format_dict["agent_info"] = AGENT_TEMPLATE.format(
username=last_status_change_by["summary"],
url=last_status_change_by["html_url"],
username=last_status_change_by["summary"].tame(check_string),
url=last_status_change_by["html_url"].tame(check_string),
)
trigger_description = message["incident"].get("description")
trigger_description = message["incident"].get("description").tame(check_none_or(check_string))
if trigger_description is not None:
format_dict["trigger_message"] = TRIGGER_MESSAGE.format(message=trigger_description)
return format_dict
def build_pagerduty_formatdict_v3(event: Dict[str, Any]) -> Dict[str, Any]:
format_dict = {}
format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V3[event["event_type"]]
def build_pagerduty_formatdict_v3(event: WildValue) -> FormatDictType:
format_dict: FormatDictType = {}
format_dict["action"] = PAGER_DUTY_EVENT_NAMES_V3[event["event_type"].tame(check_string)]
format_dict["incident_id"] = event["data"]["id"]
format_dict["incident_url"] = event["data"]["html_url"]
format_dict["incident_id"] = event["data"]["id"].tame(check_string)
format_dict["incident_url"] = event["data"]["html_url"].tame(check_string)
format_dict["incident_num_title"] = NUM_TITLE.format(
incident_num=event["data"]["number"], incident_title=event["data"]["title"]
incident_num=event["data"]["number"].tame(check_int),
incident_title=event["data"]["title"].tame(check_string),
)
format_dict["service_name"] = event["data"]["service"]["summary"]
format_dict["service_url"] = event["data"]["service"]["html_url"]
format_dict["service_name"] = event["data"]["service"]["summary"].tame(check_string)
format_dict["service_url"] = event["data"]["service"]["html_url"].tame(check_string)
assignees = event["data"]["assignees"]
if assignees:
assignee = assignees[0]
format_dict["assignee_info"] = AGENT_TEMPLATE.format(
username=assignee["summary"], url=assignee["html_url"]
username=assignee["summary"].tame(check_string),
url=assignee["html_url"].tame(check_string),
)
else:
format_dict["assignee_info"] = "nobody"
@@ -180,8 +190,8 @@ def build_pagerduty_formatdict_v3(event: Dict[str, Any]) -> Dict[str, Any]:
agent = event.get("agent")
if agent is not None:
format_dict["agent_info"] = AGENT_TEMPLATE.format(
username=agent["summary"],
url=agent["html_url"],
username=agent["summary"].tame(check_string),
url=agent["html_url"].tame(check_string),
)
# V3 doesn't have trigger_message
@@ -191,7 +201,10 @@ def build_pagerduty_formatdict_v3(event: Dict[str, Any]) -> Dict[str, Any]:
def send_formated_pagerduty(
request: HttpRequest, user_profile: UserProfile, message_type: str, format_dict: Dict[str, Any]
request: HttpRequest,
user_profile: UserProfile,
message_type: str,
format_dict: FormatDictType,
) -> None:
if message_type in (
"incident.trigger",
@@ -215,6 +228,7 @@ def send_formated_pagerduty(
subject = "Incident {incident_num_title}".format(**format_dict)
body = template.format(**format_dict)
assert isinstance(format_dict["action"], str)
check_send_webhook_message(request, user_profile, subject, body, format_dict["action"])
@@ -223,13 +237,12 @@ def send_formated_pagerduty(
def api_pagerduty_webhook(
request: HttpRequest,
user_profile: UserProfile,
payload: Dict[str, Any] = REQ(argument_type="body"),
payload: WildValue = REQ(argument_type="body", converter=to_wild_value),
) -> HttpResponse:
messages = payload.get("messages")
if messages is not None:
if messages:
for message in messages:
message_type = message.get("type")
message_type = message.get("type").tame(check_none_or(check_string))
# If the message has no "type" key, then this payload came from a
# Pagerduty Webhook V2.
@@ -243,23 +256,23 @@ def api_pagerduty_webhook(
send_formated_pagerduty(request, user_profile, message_type, format_dict)
for message in messages:
event = message.get("event")
message_event = message.get("event").tame(check_none_or(check_string))
# If the message has no "event" key, then this payload came from a
# Pagerduty Webhook V1.
if event is None:
if message_event is None:
break
if event not in PAGER_DUTY_EVENT_NAMES_V2:
raise UnsupportedWebhookEventType(event)
if message_event not in PAGER_DUTY_EVENT_NAMES_V2:
raise UnsupportedWebhookEventType(message_event)
format_dict = build_pagerduty_formatdict_v2(message)
send_formated_pagerduty(request, user_profile, event, format_dict)
send_formated_pagerduty(request, user_profile, message_event, format_dict)
else:
if "event" in payload:
# V3 has no "messages" field, and it has key "event" instead
event = payload["event"]
event_type = event.get("event_type")
event_type = event.get("event_type").tame(check_none_or(check_string))
if event_type not in PAGER_DUTY_EVENT_NAMES_V3:
raise UnsupportedWebhookEventType(event_type)