mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			118 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			118 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Webhooks for external integrations.
 | 
						|
from typing import Any, Dict, Optional
 | 
						|
 | 
						|
from django.db.models import Q
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
 | 
						|
from zerver.decorator import webhook_view
 | 
						|
from zerver.lib.exceptions import UnsupportedWebhookEventType
 | 
						|
from zerver.lib.request import REQ, has_request_variables
 | 
						|
from zerver.lib.response import json_success
 | 
						|
from zerver.lib.webhooks.common import check_send_webhook_message
 | 
						|
from zerver.models import Realm, UserProfile
 | 
						|
 | 
						|
IGNORED_EVENTS = [
 | 
						|
    "downloadChart",
 | 
						|
    "deleteChart",
 | 
						|
    "uploadChart",
 | 
						|
    "pullImage",
 | 
						|
    "deleteImage",
 | 
						|
    "scanningFailed",
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
def guess_zulip_user_from_harbor(harbor_username: str, realm: Realm) -> Optional[UserProfile]:
 | 
						|
    try:
 | 
						|
        # Try to find a matching user in Zulip
 | 
						|
        # We search a user's full name, short name,
 | 
						|
        # and beginning of email address
 | 
						|
        user = UserProfile.objects.filter(
 | 
						|
            Q(full_name__iexact=harbor_username) | Q(email__istartswith=harbor_username),
 | 
						|
            is_active=True,
 | 
						|
            realm=realm,
 | 
						|
        ).order_by("id")[0]
 | 
						|
        return user  # nocoverage
 | 
						|
    except IndexError:
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
def handle_push_image_event(
 | 
						|
    payload: Dict[str, Any], user_profile: UserProfile, operator_username: str
 | 
						|
) -> str:
 | 
						|
    image_name = payload["event_data"]["repository"]["repo_full_name"]
 | 
						|
    image_tag = payload["event_data"]["resources"][0]["tag"]
 | 
						|
 | 
						|
    return f"{operator_username} pushed image `{image_name}:{image_tag}`"
 | 
						|
 | 
						|
 | 
						|
VULNERABILITY_SEVERITY_NAME_MAP = {
 | 
						|
    1: "None",
 | 
						|
    2: "Unknown",
 | 
						|
    3: "Low",
 | 
						|
    4: "Medium",
 | 
						|
    5: "High",
 | 
						|
}
 | 
						|
 | 
						|
SCANNING_COMPLETED_TEMPLATE = """
 | 
						|
Image scan completed for `{image_name}:{image_tag}`. Vulnerabilities by severity:
 | 
						|
 | 
						|
{scan_results}
 | 
						|
""".strip()
 | 
						|
 | 
						|
 | 
						|
def handle_scanning_completed_event(
 | 
						|
    payload: Dict[str, Any], user_profile: UserProfile, operator_username: str
 | 
						|
) -> str:
 | 
						|
    scan_results = ""
 | 
						|
    scan_summaries = payload["event_data"]["resources"][0]["scan_overview"]["components"]["summary"]
 | 
						|
    summaries_sorted = sorted(scan_summaries, key=lambda x: x["severity"], reverse=True)
 | 
						|
    for scan_summary in summaries_sorted:
 | 
						|
        scan_results += "* {}: **{}**\n".format(
 | 
						|
            VULNERABILITY_SEVERITY_NAME_MAP[scan_summary["severity"]], scan_summary["count"]
 | 
						|
        )
 | 
						|
 | 
						|
    return SCANNING_COMPLETED_TEMPLATE.format(
 | 
						|
        image_name=payload["event_data"]["repository"]["repo_full_name"],
 | 
						|
        image_tag=payload["event_data"]["resources"][0]["tag"],
 | 
						|
        scan_results=scan_results,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
EVENT_FUNCTION_MAPPER = {
 | 
						|
    "pushImage": handle_push_image_event,
 | 
						|
    "scanningCompleted": handle_scanning_completed_event,
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
@webhook_view("Harbor")
 | 
						|
@has_request_variables
 | 
						|
def api_harbor_webhook(
 | 
						|
    request: HttpRequest,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    payload: Dict[str, Any] = REQ(argument_type="body"),
 | 
						|
) -> HttpResponse:
 | 
						|
 | 
						|
    operator_username = "**{}**".format(payload["operator"])
 | 
						|
 | 
						|
    if operator_username != "auto":
 | 
						|
        operator_profile = guess_zulip_user_from_harbor(operator_username, user_profile.realm)
 | 
						|
 | 
						|
    if operator_profile:
 | 
						|
        operator_username = f"@**{operator_profile.full_name}**"  # nocoverage
 | 
						|
 | 
						|
    event = payload["type"]
 | 
						|
    topic = payload["event_data"]["repository"]["repo_full_name"]
 | 
						|
 | 
						|
    if event in IGNORED_EVENTS:
 | 
						|
        return json_success()
 | 
						|
 | 
						|
    content_func = EVENT_FUNCTION_MAPPER.get(event)
 | 
						|
 | 
						|
    if content_func is None:
 | 
						|
        raise UnsupportedWebhookEventType(event)
 | 
						|
 | 
						|
    content: str = content_func(payload, user_profile, operator_username)
 | 
						|
 | 
						|
    check_send_webhook_message(request, user_profile, topic, content, unquote_url_parameters=True)
 | 
						|
    return json_success()
 |