mirror of
https://github.com/zulip/zulip.git
synced 2025-11-05 22:43:42 +00:00
notify is mostly deprecated in the latest versions of CircleCI. Although we can use notify in CircleCI 2.0, there is currently no documentation regarding it. We could have used notify here rather than this hacky solution, but with notify it was not possible to trigger the CircleCI webhook only for the main Zulip repository. Also corrected the CircleCI webhook for the case where we don't receive previous in the POST request.
48 lines
1.7 KiB
Python
48 lines
1.7 KiB
Python
# Webhooks for external integrations.
|
|
from typing import Any, Dict
|
|
|
|
from django.http import HttpRequest, HttpResponse
|
|
|
|
from zerver.decorator import api_key_only_webhook_view
|
|
from zerver.lib.request import REQ, has_request_variables
|
|
from zerver.lib.response import json_success
|
|
from zerver.lib.webhooks.common import check_send_webhook_message
|
|
from zerver.models import UserProfile
|
|
|
|
# Topic of the generated message; filled in by get_subject() with the
# payload's 'reponame' value.
CIRCLECI_TOPIC_TEMPLATE = '{repository_name}'

# Body of the generated message; the placeholders are filled in by get_body().
CIRCLECI_MESSAGE_TEMPLATE = '[Build]({build_url}) triggered by {username} on {branch} branch {status}.'

# Status string that get_status() compares against to detect a repeated failure.
FAILED_STATUS = 'failed'
|
|
|
|
@api_key_only_webhook_view('CircleCI')
@has_request_variables
def api_circleci_webhook(request: HttpRequest, user_profile: UserProfile,
                         payload: Dict[str, Any]=REQ(argument_type='body')) -> HttpResponse:
    """Turn a CircleCI notification into a Zulip message.

    The build details are read from the 'payload' key of the incoming
    ``payload`` argument (declared via ``REQ(argument_type='body')``).
    The topic and body are derived from those details and handed to
    ``check_send_webhook_message``; the view then returns ``json_success()``.

    Raises KeyError if the 'payload' key is absent.
    """
    # CircleCI nests the actual build description one level down.
    build_info = payload['payload']

    check_send_webhook_message(
        request,
        user_profile,
        get_subject(build_info),
        get_body(build_info),
    )
    return json_success()
|
|
|
|
def get_subject(payload: Dict[str, Any]) -> str:
    """Return the message topic built from the payload's 'reponame' field.

    Raises KeyError if 'reponame' is missing from the payload.
    """
    repo = payload['reponame']
    return CIRCLECI_TOPIC_TEMPLATE.format(repository_name=repo)
|
|
|
|
def get_body(payload: Dict[str, Any]) -> str:
    """Return the message body describing the build in the payload.

    Reads 'build_url', 'username' and 'branch' from the payload and
    combines them with the phrase produced by get_status().

    Raises KeyError if any of the required keys is missing.
    """
    # Look the fields up in a fixed order (URL, user, branch, status) so a
    # malformed payload always fails on the same first-missing key.
    url = payload['build_url']
    author = payload['username']
    branch_name = payload['branch']
    outcome = get_status(payload)

    return CIRCLECI_MESSAGE_TEMPLATE.format(
        build_url=url,
        username=author,
        branch=branch_name,
        status=outcome,
    )
|
|
|
|
def get_status(payload: Dict[str, Any]) -> str:
    """Return the phrase describing the build outcome for the message body.

    * 'is still failing' when the payload carries a truthy 'previous' entry
      whose 'status' equals FAILED_STATUS and the current status does too;
    * 'succeeded' when the current status is 'success';
    * otherwise the raw 'status' value unchanged.

    Raises KeyError if 'status' is missing from the payload.
    """
    current = payload['status']
    earlier = payload.get('previous', None)

    # 'previous' may be absent (or empty); only then-present data is inspected.
    if earlier:
        if earlier['status'] == FAILED_STATUS and current == FAILED_STATUS:
            return 'is still failing'

    return 'succeeded' if current == 'success' else current
|