harbor: Strengthen types using WildValue.

This commit is contained in:
Hari Prashant Bhimaraju
2022-10-08 23:07:05 +05:30
committed by Tim Abbott
parent c692fc400d
commit 8f4133b63e

View File

@@ -1,5 +1,5 @@
# Webhooks for external integrations. # Webhooks for external integrations.
from typing import Any, Dict, Optional from typing import Optional
from django.db.models import Q from django.db.models import Q
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
@@ -8,6 +8,7 @@ from zerver.decorator import webhook_view
from zerver.lib.exceptions import UnsupportedWebhookEventType from zerver.lib.exceptions import UnsupportedWebhookEventType
from zerver.lib.request import REQ, has_request_variables from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success from zerver.lib.response import json_success
from zerver.lib.validator import WildValue, check_int, check_string, to_wild_value
from zerver.lib.webhooks.common import check_send_webhook_message from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.models import Realm, UserProfile from zerver.models import Realm, UserProfile
@@ -37,10 +38,10 @@ def guess_zulip_user_from_harbor(harbor_username: str, realm: Realm) -> Optional
def handle_push_image_event( def handle_push_image_event(
payload: Dict[str, Any], user_profile: UserProfile, operator_username: str payload: WildValue, user_profile: UserProfile, operator_username: str
) -> str: ) -> str:
image_name = payload["event_data"]["repository"]["repo_full_name"] image_name = payload["event_data"]["repository"]["repo_full_name"].tame(check_string)
image_tag = payload["event_data"]["resources"][0]["tag"] image_tag = payload["event_data"]["resources"][0]["tag"].tame(check_string)
return f"{operator_username} pushed image `{image_name}:{image_tag}`" return f"{operator_username} pushed image `{image_name}:{image_tag}`"
@@ -53,7 +54,7 @@ Image scan completed for `{image_name}:{image_tag}`. Vulnerabilities by severity
def handle_scanning_completed_event( def handle_scanning_completed_event(
payload: Dict[str, Any], user_profile: UserProfile, operator_username: str payload: WildValue, user_profile: UserProfile, operator_username: str
) -> str: ) -> str:
scan_results = "" scan_results = ""
scan_overview = payload["event_data"]["resources"][0]["scan_overview"] scan_overview = payload["event_data"]["resources"][0]["scan_overview"]
@@ -64,13 +65,13 @@ def handle_scanning_completed_event(
]["summary"] ]["summary"]
if len(scan_summaries) > 0: if len(scan_summaries) > 0:
for severity, count in scan_summaries.items(): for severity, count in scan_summaries.items():
scan_results += "* {}: **{}**\n".format(severity, count) scan_results += "* {}: **{}**\n".format(severity, count.tame(check_int))
else: else:
scan_results += "None\n" scan_results += "None\n"
return SCANNING_COMPLETED_TEMPLATE.format( return SCANNING_COMPLETED_TEMPLATE.format(
image_name=payload["event_data"]["repository"]["repo_full_name"], image_name=payload["event_data"]["repository"]["repo_full_name"].tame(check_string),
image_tag=payload["event_data"]["resources"][0]["tag"], image_tag=payload["event_data"]["resources"][0]["tag"].tame(check_string),
scan_results=scan_results, scan_results=scan_results,
) )
@@ -88,10 +89,10 @@ ALL_EVENT_TYPES = list(EVENT_FUNCTION_MAPPER.keys())
def api_harbor_webhook( def api_harbor_webhook(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
payload: Dict[str, Any] = REQ(argument_type="body"), payload: WildValue = REQ(argument_type="body", converter=to_wild_value),
) -> HttpResponse: ) -> HttpResponse:
operator_username = "**{}**".format(payload["operator"]) operator_username = "**{}**".format(payload["operator"].tame(check_string))
if operator_username != "auto": if operator_username != "auto":
operator_profile = guess_zulip_user_from_harbor(operator_username, user_profile.realm) operator_profile = guess_zulip_user_from_harbor(operator_username, user_profile.realm)
@@ -99,8 +100,8 @@ def api_harbor_webhook(
if operator_profile: if operator_profile:
operator_username = f"@**{operator_profile.full_name}**" # nocoverage operator_username = f"@**{operator_profile.full_name}**" # nocoverage
event = payload["type"] event = payload["type"].tame(check_string)
topic = payload["event_data"]["repository"]["repo_full_name"] topic = payload["event_data"]["repository"]["repo_full_name"].tame(check_string)
if event in IGNORED_EVENTS: if event in IGNORED_EVENTS:
return json_success(request) return json_success(request)