diff --git a/zerver/lib/webhooks/common.py b/zerver/lib/webhooks/common.py index 8b5ba3aefa..0fd59c4a18 100644 --- a/zerver/lib/webhooks/common.py +++ b/zerver/lib/webhooks/common.py @@ -1,4 +1,6 @@ import fnmatch +import hashlib +import hmac import importlib from collections.abc import Callable from dataclasses import dataclass @@ -6,7 +8,9 @@ from datetime import datetime from typing import Annotated, Any, TypeAlias from urllib.parse import unquote +from django.conf import settings from django.http import HttpRequest +from django.utils.encoding import force_bytes from django.utils.translation import gettext as _ from pydantic import Json from typing_extensions import override @@ -280,3 +284,34 @@ def parse_multipart_string(body: str) -> dict[str, str]: data[field_name] = body return data + + +def validate_webhook_signature( + request: HttpRequest, payload: str, signature: str, algorithm: str = "sha256" +) -> None: + if not settings.VERIFY_WEBHOOK_SIGNATURES: # nocoverage + return + + if algorithm not in hashlib.algorithms_available: + raise AssertionError( + _("The algorithm '{algorithm}' is not supported.").format(algorithm=algorithm) + ) + + webhook_secret: str | None = request.GET.get("webhook_secret") + if webhook_secret is None: + raise JsonableError( + _( + "The webhook secret is missing. Please set the webhook_secret while generating the URL." + ) + ) + webhook_secret_bytes = force_bytes(webhook_secret) + payload_bytes = force_bytes(payload) + + signed_payload = hmac.new( + webhook_secret_bytes, + payload_bytes, + algorithm, + ).hexdigest() + + if signed_payload != signature: + raise JsonableError(_("Webhook signature verification failed.")) diff --git a/zerver/tests/test_webhooks_common.py b/zerver/tests/test_webhooks_common.py index 7c2fdc777b..19a3032f56 100644 --- a/zerver/tests/test_webhooks_common.py +++ b/zerver/tests/test_webhooks_common.py @@ -1,8 +1,12 @@ +import hashlib +import hmac from types import SimpleNamespace from unittest.mock import MagicMock, patch -from django.http import HttpRequest +from django.http import HttpRequest, QueryDict from django.http.response import HttpResponse +from django.test import override_settings +from django.utils.encoding import force_bytes from typing_extensions import override from zerver.actions.streams import do_rename_stream @@ -18,6 +22,7 @@ from zerver.lib.webhooks.common import ( get_fixture_http_headers, standardize_headers, validate_extract_webhook_http_header, + validate_webhook_signature, ) from zerver.models import UserProfile from zerver.models.realms import get_realm @@ -140,6 +145,37 @@ class WebhooksCommonTestCase(ZulipTestCase): expected_djangoified_headers = {"CONTENT_TYPE": "text/plain", "HTTP_X_EVENT_TYPE": "ping"} self.assertEqual(djangoified_headers, expected_djangoified_headers) + @override_settings(VERIFY_WEBHOOK_SIGNATURES=True) + def test_validate_webhook_signature(self) -> None: + request = HostRequestMock() + request.GET = QueryDict("", mutable=True) + + # Valid signature + webhook_secret = "test_secret" + payload = '{"key": "value"}' + signature = hmac.new( + force_bytes(webhook_secret), force_bytes(payload), hashlib.sha256 + ).hexdigest() + + request.GET.update({"webhook_secret": webhook_secret}) + validate_webhook_signature(request, payload, signature) + + # Invalid signature + invalid_signature = "invalid_signature" + with self.assertRaisesRegex( + JsonableError, + "Webhook signature verification failed.", + ): + validate_webhook_signature(request, payload, invalid_signature) + + # No webhook_secret parameter + request.GET.clear() + with self.assertRaisesRegex( + JsonableError, + "The webhook secret is missing. Please set the webhook_secret while generating the URL.", + ): + validate_webhook_signature(request, payload, signature) + class WebhookURLConfigurationTestCase(WebhookTestCase): CHANNEL_NAME = "helloworld" diff --git a/zproject/default_settings.py b/zproject/default_settings.py index ceb56b17e0..1bdeccc4ea 100644 --- a/zproject/default_settings.py +++ b/zproject/default_settings.py @@ -714,3 +714,6 @@ MAX_PER_USER_MONTHLY_AI_COST: float | None = 0.5 NAVIGATION_TOUR_VIDEO_URL: str | None = ( "https://static.zulipchat.com/static/navigation-tour-video/zulip-10.mp4" ) + +# Webhook signature verification. +VERIFY_WEBHOOK_SIGNATURES = True diff --git a/zproject/test_extra_settings.py b/zproject/test_extra_settings.py index 3d4fea98c8..1d9451e793 100644 --- a/zproject/test_extra_settings.py +++ b/zproject/test_extra_settings.py @@ -294,3 +294,7 @@ RESOLVE_TOPIC_UNDO_GRACE_PERIOD_SECONDS = 0 KATEX_SERVER = False ROOT_DOMAIN_LANDING_PAGE = False + +# Disable verifying webhook signatures in tests by default. +# Tests that intend to verify webhook signatures should override this setting. +VERIFY_WEBHOOK_SIGNATURES = False