mirror of
https://github.com/zulip/zulip.git
synced 2025-11-07 07:23:22 +00:00
This is preparatory work towards adding a Topic model. We plan to use the local variable name as 'topic' for the Topic model objects. Currently, we use *topic as the local variable name for topic names. We rename local variables of the form *topic to *topic_name so that we don't need to think about type collisions in individual code paths where we might want to talk about both Topic objects and strings for the topic name.
145 lines
4.9 KiB
Python
145 lines
4.9 KiB
Python
# Webhooks for external integrations.
|
|
from typing import Optional, Tuple
|
|
|
|
from django.http import HttpRequest, HttpResponse
|
|
|
|
from zerver.decorator import webhook_view
|
|
from zerver.lib.response import json_success
|
|
from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint
|
|
from zerver.lib.validator import WildValue, check_int, check_string, check_union
|
|
from zerver.lib.webhooks.common import OptionalUserSpecifiedTopicStr, check_send_webhook_message
|
|
from zerver.models import UserProfile
|
|
|
|
|
|
def is_canarytoken(message: WildValue) -> bool:
|
|
"""
|
|
Requests sent from Thinkst canaries are either from canarytokens or
|
|
canaries, which can be differentiated by the value of the `AlertType`
|
|
field.
|
|
"""
|
|
return message["AlertType"].tame(check_string) == "CanarytokenIncident"
|
|
|
|
|
|
def canary_name(message: WildValue) -> str:
|
|
"""
|
|
Returns the name of the canary or canarytoken.
|
|
"""
|
|
if is_canarytoken(message):
|
|
return message["Reminder"].tame(check_string)
|
|
else:
|
|
return message["CanaryName"].tame(check_string)
|
|
|
|
|
|
def canary_kind(message: WildValue) -> str:
|
|
"""
|
|
Returns a description of the kind of request - canary or canarytoken.
|
|
"""
|
|
if is_canarytoken(message):
|
|
return "canarytoken"
|
|
else:
|
|
return "canary"
|
|
|
|
|
|
def source_ip_and_reverse_dns(message: WildValue) -> Tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Extract the source IP and reverse DNS information from a canary request.
|
|
"""
|
|
reverse_dns, source_ip = (None, None)
|
|
|
|
if "SourceIP" in message:
|
|
source_ip = message["SourceIP"].tame(check_string)
|
|
# `ReverseDNS` can sometimes exist and still be empty.
|
|
if "ReverseDNS" in message and message["ReverseDNS"].tame(check_string) != "":
|
|
reverse_dns = message["ReverseDNS"].tame(check_string)
|
|
|
|
return (source_ip, reverse_dns)
|
|
|
|
|
|
def body(message: WildValue) -> str:
|
|
"""
|
|
Construct the response to a canary or canarytoken request.
|
|
"""
|
|
|
|
title = canary_kind(message).title()
|
|
name = canary_name(message)
|
|
body = f"**:alert: {title} *{name}* has been triggered!**\n\n{message['Intro'].tame(check_string)}\n\n"
|
|
|
|
if "IncidentHash" in message:
|
|
body += f"**Incident ID:** `{message['IncidentHash'].tame(check_string)}`\n"
|
|
|
|
if "Token" in message:
|
|
body += f"**Token:** `{message['Token'].tame(check_string)}`\n"
|
|
|
|
if "Description" in message:
|
|
body += f"**Kind:** {message['Description'].tame(check_string)}\n"
|
|
|
|
if "Timestamp" in message:
|
|
body += f"**Timestamp:** {message['Timestamp'].tame(check_string)}\n"
|
|
|
|
if "CanaryIP" in message:
|
|
body += f"**Canary IP:** `{message['CanaryIP'].tame(check_string)}`\n"
|
|
|
|
if "CanaryLocation" in message:
|
|
body += f"**Canary location:** {message['CanaryLocation'].tame(check_string)}\n"
|
|
|
|
if "Triggered" in message:
|
|
unit = "times" if message["Triggered"].tame(check_int) > 1 else "time"
|
|
body += f"**Triggered:** {message['Triggered'].tame(check_int)} {unit}\n"
|
|
|
|
source_ip, reverse_dns = source_ip_and_reverse_dns(message)
|
|
if source_ip:
|
|
body += f"**Source IP:** `{source_ip}`\n"
|
|
if reverse_dns:
|
|
body += f"**Reverse DNS:** `{reverse_dns}`\n"
|
|
|
|
if "AdditionalDetails" in message:
|
|
for detail in message["AdditionalDetails"]:
|
|
key = detail[0].tame(check_string)
|
|
value = detail[1].tame(check_union([check_string, check_int]))
|
|
if isinstance(value, str) and "*" in value:
|
|
# Thinkst sends passwords as a series of stars which can mess with
|
|
# formatting, so wrap these in backticks.
|
|
body += f"**{key}:** `{value}`\n"
|
|
else:
|
|
body += f"**{key}:** {value}\n"
|
|
|
|
return body
|
|
|
|
|
|
@webhook_view("Thinkst")
|
|
@typed_endpoint
|
|
def api_thinkst_webhook(
|
|
request: HttpRequest,
|
|
user_profile: UserProfile,
|
|
*,
|
|
message: JsonBodyPayload[WildValue],
|
|
user_specified_topic: OptionalUserSpecifiedTopicStr = None,
|
|
) -> HttpResponse:
|
|
"""
|
|
Construct a response to a webhook event from a Thinkst canary or canarytoken.
|
|
|
|
Thinkst offers public canarytokens with canarytokens.org and with their canary
|
|
product, but the schema returned by these identically named services are
|
|
completely different - canarytokens from canarytokens.org are handled by a
|
|
different Zulip integration.
|
|
|
|
Thinkst's documentation for the schema is linked below, but in practice the JSON
|
|
received doesn't always conform.
|
|
|
|
https://help.canary.tools/hc/en-gb/articles/360002426577-How-do-I-configure-notifications-for-a-Generic-Webhook-
|
|
"""
|
|
|
|
response = body(message)
|
|
|
|
topic_name = None
|
|
if user_specified_topic:
|
|
topic_name = user_specified_topic
|
|
else:
|
|
name = canary_name(message)
|
|
kind = canary_kind(message)
|
|
|
|
topic_name = f"{kind} alert - {name}"
|
|
|
|
check_send_webhook_message(request, user_profile, topic_name, response)
|
|
return json_success(request)
|