mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			117 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			117 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Webhooks for external integrations.
 | 
						|
from typing import Any, Dict, Optional
 | 
						|
 | 
						|
from django.db.models import Q
 | 
						|
from django.http import HttpRequest, HttpResponse
 | 
						|
 | 
						|
from zerver.decorator import api_key_only_webhook_view
 | 
						|
from zerver.lib.request import REQ, has_request_variables
 | 
						|
from zerver.lib.response import json_success
 | 
						|
from zerver.lib.webhooks.common import UnexpectedWebhookEventType, check_send_webhook_message
 | 
						|
from zerver.models import Realm, UserProfile
 | 
						|
 | 
						|
IGNORED_EVENTS = [
 | 
						|
    "downloadChart",
 | 
						|
    "deleteChart",
 | 
						|
    "uploadChart",
 | 
						|
    "pullImage",
 | 
						|
    "deleteImage",
 | 
						|
    "scanningFailed",
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
def guess_zulip_user_from_harbor(harbor_username: str, realm: Realm) -> Optional[UserProfile]:
 | 
						|
    try:
 | 
						|
        # Try to find a matching user in Zulip
 | 
						|
        # We search a user's full name, short name,
 | 
						|
        # and beginning of email address
 | 
						|
        user = UserProfile.objects.filter(
 | 
						|
            Q(full_name__iexact=harbor_username) |
 | 
						|
            Q(email__istartswith=harbor_username),
 | 
						|
            is_active=True,
 | 
						|
            realm=realm).order_by("id")[0]
 | 
						|
        return user  # nocoverage
 | 
						|
    except IndexError:
 | 
						|
        return None
 | 
						|
 | 
						|
 | 
						|
def handle_push_image_event(payload: Dict[str, Any],
 | 
						|
                            user_profile: UserProfile,
 | 
						|
                            operator_username: str) -> str:
 | 
						|
    image_name = payload["event_data"]["repository"]["repo_full_name"]
 | 
						|
    image_tag = payload["event_data"]["resources"][0]["tag"]
 | 
						|
 | 
						|
    return f"{operator_username} pushed image `{image_name}:{image_tag}`"
 | 
						|
 | 
						|
 | 
						|
VULNERABILITY_SEVERITY_NAME_MAP = {
 | 
						|
    1: "None",
 | 
						|
    2: "Unknown",
 | 
						|
    3: "Low",
 | 
						|
    4: "Medium",
 | 
						|
    5: "High",
 | 
						|
}
 | 
						|
 | 
						|
SCANNING_COMPLETED_TEMPLATE = """
 | 
						|
Image scan completed for `{image_name}:{image_tag}`. Vulnerabilities by severity:
 | 
						|
 | 
						|
{scan_results}
 | 
						|
""".strip()
 | 
						|
 | 
						|
 | 
						|
def handle_scanning_completed_event(payload: Dict[str, Any],
 | 
						|
                                    user_profile: UserProfile,
 | 
						|
                                    operator_username: str) -> str:
 | 
						|
    scan_results = ""
 | 
						|
    scan_summaries = payload["event_data"]["resources"][0]["scan_overview"]["components"]["summary"]
 | 
						|
    summaries_sorted = sorted(
 | 
						|
        scan_summaries, key=lambda x: x["severity"], reverse=True)
 | 
						|
    for scan_summary in summaries_sorted:
 | 
						|
        scan_results += "* {}: **{}**\n".format(
 | 
						|
            VULNERABILITY_SEVERITY_NAME_MAP[scan_summary["severity"]], scan_summary["count"])
 | 
						|
 | 
						|
    return SCANNING_COMPLETED_TEMPLATE.format(
 | 
						|
        image_name=payload["event_data"]["repository"]["repo_full_name"],
 | 
						|
        image_tag=payload["event_data"]["resources"][0]["tag"],
 | 
						|
        scan_results=scan_results,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
EVENT_FUNCTION_MAPPER = {
 | 
						|
    "pushImage": handle_push_image_event,
 | 
						|
    "scanningCompleted": handle_scanning_completed_event,
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
@api_key_only_webhook_view("Harbor")
 | 
						|
@has_request_variables
 | 
						|
def api_harbor_webhook(request: HttpRequest, user_profile: UserProfile,
 | 
						|
                       payload: Dict[str, Any] = REQ(argument_type='body')) -> HttpResponse:
 | 
						|
 | 
						|
    operator_username = "**{}**".format(payload["operator"])
 | 
						|
 | 
						|
    if operator_username != "auto":
 | 
						|
        operator_profile = guess_zulip_user_from_harbor(
 | 
						|
            operator_username, user_profile.realm)
 | 
						|
 | 
						|
    if operator_profile:
 | 
						|
        operator_username = f"@**{operator_profile.full_name}**"  # nocoverage
 | 
						|
 | 
						|
    event = payload["type"]
 | 
						|
    topic = payload["event_data"]["repository"]["repo_full_name"]
 | 
						|
 | 
						|
    if event in IGNORED_EVENTS:
 | 
						|
        return json_success()
 | 
						|
 | 
						|
    content_func = EVENT_FUNCTION_MAPPER.get(event)
 | 
						|
 | 
						|
    if content_func is None:
 | 
						|
        raise UnexpectedWebhookEventType('Harbor', event)
 | 
						|
 | 
						|
    content: str = content_func(payload, user_profile, operator_username)
 | 
						|
 | 
						|
    check_send_webhook_message(request, user_profile,
 | 
						|
                               topic, content,
 | 
						|
                               unquote_url_parameters=True)
 | 
						|
    return json_success()
 |