mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1503 lines
		
	
	
		
			57 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1503 lines
		
	
	
		
			57 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# See https://zulip.readthedocs.io/en/latest/subsystems/notifications.html
 | 
						|
 | 
						|
import asyncio
 | 
						|
import base64
 | 
						|
import copy
 | 
						|
import logging
 | 
						|
import re
 | 
						|
from dataclasses import dataclass
 | 
						|
from email.headerregistry import Address
 | 
						|
from functools import lru_cache
 | 
						|
from typing import (
 | 
						|
    TYPE_CHECKING,
 | 
						|
    Any,
 | 
						|
    Dict,
 | 
						|
    Iterable,
 | 
						|
    List,
 | 
						|
    Mapping,
 | 
						|
    Optional,
 | 
						|
    Sequence,
 | 
						|
    Tuple,
 | 
						|
    Type,
 | 
						|
    Union,
 | 
						|
)
 | 
						|
 | 
						|
import gcm
 | 
						|
import lxml.html
 | 
						|
import orjson
 | 
						|
from django.conf import settings
 | 
						|
from django.db import IntegrityError, transaction
 | 
						|
from django.db.models import F, Q
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from django.utils.translation import override as override_language
 | 
						|
from typing_extensions import TypeAlias, override
 | 
						|
 | 
						|
from analytics.lib.counts import COUNT_STATS, do_increment_logging_stat
 | 
						|
from zerver.actions.realm_settings import (
 | 
						|
    do_set_push_notifications_enabled_end_timestamp,
 | 
						|
    do_set_realm_property,
 | 
						|
)
 | 
						|
from zerver.lib.avatar import absolute_avatar_url, get_avatar_for_inaccessible_user
 | 
						|
from zerver.lib.emoji_utils import hex_codepoint_to_emoji
 | 
						|
from zerver.lib.exceptions import ErrorCode, JsonableError
 | 
						|
from zerver.lib.message import access_message, huddle_users
 | 
						|
from zerver.lib.outgoing_http import OutgoingSession
 | 
						|
from zerver.lib.remote_server import (
 | 
						|
    send_json_to_push_bouncer,
 | 
						|
    send_server_data_to_push_bouncer,
 | 
						|
    send_to_push_bouncer,
 | 
						|
)
 | 
						|
from zerver.lib.soft_deactivation import soft_reactivate_if_personal_notification
 | 
						|
from zerver.lib.tex import change_katex_to_raw_latex
 | 
						|
from zerver.lib.timestamp import datetime_to_timestamp
 | 
						|
from zerver.lib.users import check_can_access_user
 | 
						|
from zerver.models import (
 | 
						|
    AbstractPushDeviceToken,
 | 
						|
    ArchivedMessage,
 | 
						|
    Message,
 | 
						|
    NotificationTriggers,
 | 
						|
    PushDeviceToken,
 | 
						|
    Realm,
 | 
						|
    Recipient,
 | 
						|
    Stream,
 | 
						|
    UserGroup,
 | 
						|
    UserMessage,
 | 
						|
    UserProfile,
 | 
						|
    get_display_recipient,
 | 
						|
)
 | 
						|
from zerver.models.realms import get_fake_email_domain
 | 
						|
from zerver.models.users import get_user_profile_by_id
 | 
						|
 | 
						|
if TYPE_CHECKING:
 | 
						|
    import aioapns
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
if settings.ZILENCER_ENABLED:
 | 
						|
    from zilencer.models import RemotePushDeviceToken, RemoteZulipServer
 | 
						|
 | 
						|
DeviceToken: TypeAlias = Union[PushDeviceToken, "RemotePushDeviceToken"]
 | 
						|
 | 
						|
 | 
						|
# We store the token as b64, but apns-client wants hex strings
 | 
						|
def b64_to_hex(data: str) -> str:
 | 
						|
    return base64.b64decode(data).hex()
 | 
						|
 | 
						|
 | 
						|
def hex_to_b64(data: str) -> str:
 | 
						|
    return base64.b64encode(bytes.fromhex(data)).decode()
 | 
						|
 | 
						|
 | 
						|
def get_message_stream_name_from_database(message: Message) -> str:
 | 
						|
    """
 | 
						|
    Never use this function outside of the push-notifications
 | 
						|
    codepath. Most of our code knows how to get streams
 | 
						|
    up front in a more efficient manner.
 | 
						|
    """
 | 
						|
    stream_id = message.recipient.type_id
 | 
						|
    return Stream.objects.get(id=stream_id).name
 | 
						|
 | 
						|
 | 
						|
class UserPushIdentityCompat:
 | 
						|
    """Compatibility class for supporting the transition from remote servers
 | 
						|
    sending their UserProfile ids to the bouncer to sending UserProfile uuids instead.
 | 
						|
 | 
						|
    Until we can drop support for receiving user_id, we need this
 | 
						|
    class, because a user's identity in the push notification context
 | 
						|
    may be represented either by an id or uuid.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, user_id: Optional[int] = None, user_uuid: Optional[str] = None) -> None:
 | 
						|
        assert user_id is not None or user_uuid is not None
 | 
						|
        self.user_id = user_id
 | 
						|
        self.user_uuid = user_uuid
 | 
						|
 | 
						|
    def filter_q(self) -> Q:
 | 
						|
        """
 | 
						|
        This aims to support correctly querying for RemotePushDeviceToken.
 | 
						|
        If only one of (user_id, user_uuid) is provided, the situation is trivial,
 | 
						|
        If both are provided, we want to query for tokens matching EITHER the
 | 
						|
        uuid or the id - because the user may have devices with old registrations,
 | 
						|
        so user_id-based, as well as new registration with uuid. Notifications
 | 
						|
        naturally should be sent to both.
 | 
						|
        """
 | 
						|
        if self.user_id is not None and self.user_uuid is None:
 | 
						|
            return Q(user_id=self.user_id)
 | 
						|
        elif self.user_uuid is not None and self.user_id is None:
 | 
						|
            return Q(user_uuid=self.user_uuid)
 | 
						|
        else:
 | 
						|
            assert self.user_id is not None and self.user_uuid is not None
 | 
						|
            return Q(user_uuid=self.user_uuid) | Q(user_id=self.user_id)
 | 
						|
 | 
						|
    @override
 | 
						|
    def __str__(self) -> str:
 | 
						|
        result = ""
 | 
						|
        if self.user_id is not None:
 | 
						|
            result += f"<id:{self.user_id}>"
 | 
						|
        if self.user_uuid is not None:
 | 
						|
            result += f"<uuid:{self.user_uuid}>"
 | 
						|
 | 
						|
        return result
 | 
						|
 | 
						|
    @override
 | 
						|
    def __eq__(self, other: object) -> bool:
 | 
						|
        if isinstance(other, UserPushIdentityCompat):
 | 
						|
            return self.user_id == other.user_id and self.user_uuid == other.user_uuid
 | 
						|
        return False
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Sending to APNs, for iOS
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class APNsContext:
 | 
						|
    apns: "aioapns.APNs"
 | 
						|
    loop: asyncio.AbstractEventLoop
 | 
						|
 | 
						|
 | 
						|
def has_apns_credentials() -> bool:
 | 
						|
    return settings.APNS_TOKEN_KEY_FILE is not None or settings.APNS_CERT_FILE is not None
 | 
						|
 | 
						|
 | 
						|
@lru_cache(maxsize=None)
 | 
						|
def get_apns_context() -> Optional[APNsContext]:
 | 
						|
    # We lazily do this import as part of optimizing Zulip's base
 | 
						|
    # import time.
 | 
						|
    import aioapns
 | 
						|
 | 
						|
    if not has_apns_credentials():  # nocoverage
 | 
						|
        return None
 | 
						|
 | 
						|
    # NB if called concurrently, this will make excess connections.
 | 
						|
    # That's a little sloppy, but harmless unless a server gets
 | 
						|
    # hammered with a ton of these all at once after startup.
 | 
						|
    loop = asyncio.new_event_loop()
 | 
						|
 | 
						|
    # Defining a no-op error-handling function overrides the default
 | 
						|
    # behaviour of logging at ERROR level whenever delivery fails; we
 | 
						|
    # handle those errors by checking the result in
 | 
						|
    # send_apple_push_notification.
 | 
						|
    async def err_func(
 | 
						|
        request: aioapns.NotificationRequest, result: aioapns.common.NotificationResult
 | 
						|
    ) -> None:
 | 
						|
        pass  # nocoverage
 | 
						|
 | 
						|
    async def make_apns() -> aioapns.APNs:
 | 
						|
        return aioapns.APNs(
 | 
						|
            client_cert=settings.APNS_CERT_FILE,
 | 
						|
            key=settings.APNS_TOKEN_KEY_FILE,
 | 
						|
            key_id=settings.APNS_TOKEN_KEY_ID,
 | 
						|
            team_id=settings.APNS_TEAM_ID,
 | 
						|
            max_connection_attempts=APNS_MAX_RETRIES,
 | 
						|
            use_sandbox=settings.APNS_SANDBOX,
 | 
						|
            err_func=err_func,
 | 
						|
            # The actual APNs topic will vary between notifications,
 | 
						|
            # so we set it there, overriding any value we put here.
 | 
						|
            # We can't just leave this out, though, because then
 | 
						|
            # the constructor attempts to guess.
 | 
						|
            topic="invalid.nonsense",
 | 
						|
        )
 | 
						|
 | 
						|
    apns = loop.run_until_complete(make_apns())
 | 
						|
    return APNsContext(apns=apns, loop=loop)
 | 
						|
 | 
						|
 | 
						|
def modernize_apns_payload(data: Mapping[str, Any]) -> Mapping[str, Any]:
 | 
						|
    """Take a payload in an unknown Zulip version's format, and return in current format."""
 | 
						|
    # TODO this isn't super robust as is -- if a buggy remote server
 | 
						|
    # sends a malformed payload, we are likely to raise an exception.
 | 
						|
    if "message_ids" in data:
 | 
						|
        # The format sent by 1.6.0, from the earliest pre-1.6.0
 | 
						|
        # version with bouncer support up until 613d093d7 pre-1.7.0:
 | 
						|
        #   'alert': str,              # just sender, and text about direct message/mention
 | 
						|
        #   'message_ids': List[int],  # always just one
 | 
						|
        return {
 | 
						|
            "alert": data["alert"],
 | 
						|
            "badge": 0,
 | 
						|
            "custom": {
 | 
						|
                "zulip": {
 | 
						|
                    "message_ids": data["message_ids"],
 | 
						|
                },
 | 
						|
            },
 | 
						|
        }
 | 
						|
    else:
 | 
						|
        # Something already compatible with the current format.
 | 
						|
        # `alert` may be a string, or a dict with `title` and `body`.
 | 
						|
        # In 1.7.0 and 1.7.1, before 0912b5ba8 pre-1.8.0, the only
 | 
						|
        # item in `custom.zulip` is `message_ids`.
 | 
						|
        return data
 | 
						|
 | 
						|
 | 
						|
APNS_MAX_RETRIES = 3
 | 
						|
 | 
						|
 | 
						|
def send_apple_push_notification(
 | 
						|
    user_identity: UserPushIdentityCompat,
 | 
						|
    devices: Sequence[DeviceToken],
 | 
						|
    payload_data: Mapping[str, Any],
 | 
						|
    remote: Optional["RemoteZulipServer"] = None,
 | 
						|
) -> int:
 | 
						|
    if not devices:
 | 
						|
        return 0
 | 
						|
    # We lazily do the APNS imports as part of optimizing Zulip's base
 | 
						|
    # import time; since these are only needed in the push
 | 
						|
    # notification queue worker, it's best to only import them in the
 | 
						|
    # code that needs them.
 | 
						|
    import aioapns
 | 
						|
    import aioapns.exceptions
 | 
						|
 | 
						|
    apns_context = get_apns_context()
 | 
						|
    if apns_context is None:
 | 
						|
        logger.debug(
 | 
						|
            "APNs: Dropping a notification because nothing configured.  "
 | 
						|
            "Set PUSH_NOTIFICATION_BOUNCER_URL (or APNS_CERT_FILE)."
 | 
						|
        )
 | 
						|
        return 0
 | 
						|
 | 
						|
    if remote:
 | 
						|
        assert settings.ZILENCER_ENABLED
 | 
						|
        DeviceTokenClass: Type[AbstractPushDeviceToken] = RemotePushDeviceToken
 | 
						|
    else:
 | 
						|
        DeviceTokenClass = PushDeviceToken
 | 
						|
 | 
						|
    if remote:
 | 
						|
        logger.info(
 | 
						|
            "APNs: Sending notification for remote user %s:%s to %d devices",
 | 
						|
            remote.uuid,
 | 
						|
            user_identity,
 | 
						|
            len(devices),
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        logger.info(
 | 
						|
            "APNs: Sending notification for local user %s to %d devices",
 | 
						|
            user_identity,
 | 
						|
            len(devices),
 | 
						|
        )
 | 
						|
    payload_data = dict(modernize_apns_payload(payload_data))
 | 
						|
    message = {**payload_data.pop("custom", {}), "aps": payload_data}
 | 
						|
 | 
						|
    have_missing_app_id = False
 | 
						|
    for device in devices:
 | 
						|
        if device.ios_app_id is None:
 | 
						|
            # This should be present for all APNs tokens, as an invariant maintained
 | 
						|
            # by the views that add the token to our database.
 | 
						|
            logger.error(
 | 
						|
                "APNs: Missing ios_app_id for user %s device %s", user_identity, device.token
 | 
						|
            )
 | 
						|
            have_missing_app_id = True
 | 
						|
    if have_missing_app_id:
 | 
						|
        devices = [device for device in devices if device.ios_app_id is not None]
 | 
						|
 | 
						|
    async def send_all_notifications() -> Iterable[
 | 
						|
        Tuple[DeviceToken, Union[aioapns.common.NotificationResult, BaseException]]
 | 
						|
    ]:
 | 
						|
        requests = [
 | 
						|
            aioapns.NotificationRequest(
 | 
						|
                apns_topic=device.ios_app_id,
 | 
						|
                device_token=device.token,
 | 
						|
                message=message,
 | 
						|
                time_to_live=24 * 3600,
 | 
						|
            )
 | 
						|
            for device in devices
 | 
						|
        ]
 | 
						|
        results = await asyncio.gather(
 | 
						|
            *(apns_context.apns.send_notification(request) for request in requests),
 | 
						|
            return_exceptions=True,
 | 
						|
        )
 | 
						|
        return zip(devices, results)
 | 
						|
 | 
						|
    results = apns_context.loop.run_until_complete(send_all_notifications())
 | 
						|
 | 
						|
    successfully_sent_count = 0
 | 
						|
    for device, result in results:
 | 
						|
        if isinstance(result, aioapns.exceptions.ConnectionError):
 | 
						|
            logger.error(
 | 
						|
                "APNs: ConnectionError sending for user %s to device %s; check certificate expiration",
 | 
						|
                user_identity,
 | 
						|
                device.token,
 | 
						|
            )
 | 
						|
        elif isinstance(result, BaseException):
 | 
						|
            logger.error(
 | 
						|
                "APNs: Error sending for user %s to device %s",
 | 
						|
                user_identity,
 | 
						|
                device.token,
 | 
						|
                exc_info=result,
 | 
						|
            )
 | 
						|
        elif result.is_successful:
 | 
						|
            successfully_sent_count += 1
 | 
						|
            logger.info(
 | 
						|
                "APNs: Success sending for user %s to device %s", user_identity, device.token
 | 
						|
            )
 | 
						|
        elif result.description in ["Unregistered", "BadDeviceToken", "DeviceTokenNotForTopic"]:
 | 
						|
            logger.info(
 | 
						|
                "APNs: Removing invalid/expired token %s (%s)", device.token, result.description
 | 
						|
            )
 | 
						|
            # We remove all entries for this token (There
 | 
						|
            # could be multiple for different Zulip servers).
 | 
						|
            DeviceTokenClass._default_manager.filter(
 | 
						|
                token=device.token, kind=DeviceTokenClass.APNS
 | 
						|
            ).delete()
 | 
						|
        else:
 | 
						|
            logger.warning(
 | 
						|
                "APNs: Failed to send for user %s to device %s: %s",
 | 
						|
                user_identity,
 | 
						|
                device.token,
 | 
						|
                result.description,
 | 
						|
            )
 | 
						|
 | 
						|
    return successfully_sent_count
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Sending to GCM, for Android
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
class FCMSession(OutgoingSession):
 | 
						|
    def __init__(self) -> None:
 | 
						|
        # We don't set up retries, since the gcm package does that for us.
 | 
						|
        super().__init__(role="fcm", timeout=5)
 | 
						|
 | 
						|
 | 
						|
def make_gcm_client() -> gcm.GCM:  # nocoverage
 | 
						|
    # From GCM upstream's doc for migrating to FCM:
 | 
						|
    #
 | 
						|
    #   FCM supports HTTP and XMPP protocols that are virtually
 | 
						|
    #   identical to the GCM server protocols, so you don't need to
 | 
						|
    #   update your sending logic for the migration.
 | 
						|
    #
 | 
						|
    #   https://developers.google.com/cloud-messaging/android/android-migrate-fcm
 | 
						|
    #
 | 
						|
    # The one thing we're required to change on the server is the URL of
 | 
						|
    # the endpoint.  So we get to keep using the GCM client library we've
 | 
						|
    # been using (as long as we're happy with it) -- just monkey-patch in
 | 
						|
    # that one change, because the library's API doesn't anticipate that
 | 
						|
    # as a customization point.
 | 
						|
    gcm.gcm.GCM_URL = "https://fcm.googleapis.com/fcm/send"
 | 
						|
    return gcm.GCM(settings.ANDROID_GCM_API_KEY)
 | 
						|
 | 
						|
 | 
						|
if settings.ANDROID_GCM_API_KEY:  # nocoverage
 | 
						|
    gcm_client = make_gcm_client()
 | 
						|
else:
 | 
						|
    gcm_client = None
 | 
						|
 | 
						|
 | 
						|
def has_gcm_credentials() -> bool:  # nocoverage
 | 
						|
    return gcm_client is not None
 | 
						|
 | 
						|
 | 
						|
# This is purely used in testing
 | 
						|
def send_android_push_notification_to_user(
 | 
						|
    user_profile: UserProfile, data: Dict[str, Any], options: Dict[str, Any]
 | 
						|
) -> None:
 | 
						|
    devices = list(PushDeviceToken.objects.filter(user=user_profile, kind=PushDeviceToken.GCM))
 | 
						|
    send_android_push_notification(
 | 
						|
        UserPushIdentityCompat(user_id=user_profile.id), devices, data, options
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def parse_gcm_options(options: Dict[str, Any], data: Dict[str, Any]) -> str:
 | 
						|
    """
 | 
						|
    Parse GCM options, supplying defaults, and raising an error if invalid.
 | 
						|
 | 
						|
    The options permitted here form part of the Zulip notification
 | 
						|
    bouncer's API.  They are:
 | 
						|
 | 
						|
    `priority`: Passed through to GCM; see upstream doc linked below.
 | 
						|
        Zulip servers should always set this; when unset, we guess a value
 | 
						|
        based on the behavior of old server versions.
 | 
						|
 | 
						|
    Including unrecognized options is an error.
 | 
						|
 | 
						|
    For details on options' semantics, see this GCM upstream doc:
 | 
						|
      https://firebase.google.com/docs/cloud-messaging/http-server-ref
 | 
						|
 | 
						|
    Returns `priority`.
 | 
						|
    """
 | 
						|
    priority = options.pop("priority", None)
 | 
						|
    if priority is None:
 | 
						|
        # An older server.  Identify if this seems to be an actual notification.
 | 
						|
        if data.get("event") == "message":
 | 
						|
            priority = "high"
 | 
						|
        else:  # `'event': 'remove'`, presumably
 | 
						|
            priority = "normal"
 | 
						|
    if priority not in ("normal", "high"):
 | 
						|
        raise JsonableError(
 | 
						|
            _(
 | 
						|
                "Invalid GCM option to bouncer: priority {priority!r}",
 | 
						|
            ).format(priority=priority)
 | 
						|
        )
 | 
						|
 | 
						|
    if options:
 | 
						|
        # We're strict about the API; there is no use case for a newer Zulip
 | 
						|
        # server talking to an older bouncer, so we only need to provide
 | 
						|
        # one-way compatibility.
 | 
						|
        raise JsonableError(
 | 
						|
            _(
 | 
						|
                "Invalid GCM options to bouncer: {options}",
 | 
						|
            ).format(options=orjson.dumps(options).decode())
 | 
						|
        )
 | 
						|
 | 
						|
    return priority  # when this grows a second option, can make it a tuple
 | 
						|
 | 
						|
 | 
						|
def send_android_push_notification(
 | 
						|
    user_identity: UserPushIdentityCompat,
 | 
						|
    devices: Sequence[DeviceToken],
 | 
						|
    data: Dict[str, Any],
 | 
						|
    options: Dict[str, Any],
 | 
						|
    remote: Optional["RemoteZulipServer"] = None,
 | 
						|
) -> int:
 | 
						|
    """
 | 
						|
    Send a GCM message to the given devices.
 | 
						|
 | 
						|
    See https://firebase.google.com/docs/cloud-messaging/http-server-ref
 | 
						|
    for the GCM upstream API which this talks to.
 | 
						|
 | 
						|
    data: The JSON object (decoded) to send as the 'data' parameter of
 | 
						|
        the GCM message.
 | 
						|
    options: Additional options to control the GCM message sent.
 | 
						|
        For details, see `parse_gcm_options`.
 | 
						|
    """
 | 
						|
    if not devices:
 | 
						|
        return 0
 | 
						|
    if not gcm_client:
 | 
						|
        logger.debug(
 | 
						|
            "Skipping sending a GCM push notification since "
 | 
						|
            "PUSH_NOTIFICATION_BOUNCER_URL and ANDROID_GCM_API_KEY are both unset"
 | 
						|
        )
 | 
						|
        return 0
 | 
						|
 | 
						|
    if remote:
 | 
						|
        logger.info(
 | 
						|
            "GCM: Sending notification for remote user %s:%s to %d devices",
 | 
						|
            remote.uuid,
 | 
						|
            user_identity,
 | 
						|
            len(devices),
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        logger.info(
 | 
						|
            "GCM: Sending notification for local user %s to %d devices", user_identity, len(devices)
 | 
						|
        )
 | 
						|
    reg_ids = [device.token for device in devices]
 | 
						|
    priority = parse_gcm_options(options, data)
 | 
						|
    try:
 | 
						|
        # See https://firebase.google.com/docs/cloud-messaging/http-server-ref .
 | 
						|
        # Two kwargs `retries` and `session` get eaten by `json_request`;
 | 
						|
        # the rest pass through to the GCM server.
 | 
						|
        #
 | 
						|
        # One initial request plus 2 retries, with 5-second timeouts,
 | 
						|
        # and expected 1 + 2 seconds (the gcm module jitters its
 | 
						|
        # backoff by ±50%, so worst case * 1.5) between them, totals
 | 
						|
        # 18s expected, up to 19.5s worst case.
 | 
						|
        res = gcm_client.json_request(
 | 
						|
            registration_ids=reg_ids,
 | 
						|
            priority=priority,
 | 
						|
            data=data,
 | 
						|
            retries=2,
 | 
						|
            session=FCMSession(),
 | 
						|
        )
 | 
						|
    except OSError:
 | 
						|
        logger.warning("Error while pushing to GCM", exc_info=True)
 | 
						|
        return 0
 | 
						|
 | 
						|
    successfully_sent_count = 0
 | 
						|
    if res and "success" in res:
 | 
						|
        for reg_id, msg_id in res["success"].items():
 | 
						|
            logger.info("GCM: Sent %s as %s", reg_id, msg_id)
 | 
						|
        successfully_sent_count = len(res["success"].keys())
 | 
						|
 | 
						|
    if remote:
 | 
						|
        assert settings.ZILENCER_ENABLED
 | 
						|
        DeviceTokenClass: Type[AbstractPushDeviceToken] = RemotePushDeviceToken
 | 
						|
    else:
 | 
						|
        DeviceTokenClass = PushDeviceToken
 | 
						|
 | 
						|
    # res.canonical will contain results when there are duplicate registrations for the same
 | 
						|
    # device. The "canonical" registration is the latest registration made by the device.
 | 
						|
    # Ref: https://developer.android.com/google/gcm/adv.html#canonical
 | 
						|
    if "canonical" in res:
 | 
						|
        for reg_id, new_reg_id in res["canonical"].items():
 | 
						|
            if reg_id == new_reg_id:
 | 
						|
                # I'm not sure if this should happen. In any case, not really actionable.
 | 
						|
                logger.warning("GCM: Got canonical ref but it already matches our ID %s!", reg_id)
 | 
						|
            elif not DeviceTokenClass._default_manager.filter(
 | 
						|
                token=new_reg_id, kind=DeviceTokenClass.GCM
 | 
						|
            ).exists():
 | 
						|
                # This case shouldn't happen; any time we get a canonical ref it should have been
 | 
						|
                # previously registered in our system.
 | 
						|
                #
 | 
						|
                # That said, recovery is easy: just update the current PDT object to use the new ID.
 | 
						|
                logger.warning(
 | 
						|
                    "GCM: Got canonical ref %s replacing %s but new ID not registered! Updating.",
 | 
						|
                    new_reg_id,
 | 
						|
                    reg_id,
 | 
						|
                )
 | 
						|
                DeviceTokenClass._default_manager.filter(
 | 
						|
                    token=reg_id, kind=DeviceTokenClass.GCM
 | 
						|
                ).update(token=new_reg_id)
 | 
						|
            else:
 | 
						|
                # Since we know the new ID is registered in our system we can just drop the old one.
 | 
						|
                logger.info("GCM: Got canonical ref %s, dropping %s", new_reg_id, reg_id)
 | 
						|
 | 
						|
                DeviceTokenClass._default_manager.filter(
 | 
						|
                    token=reg_id, kind=DeviceTokenClass.GCM
 | 
						|
                ).delete()
 | 
						|
 | 
						|
    if "errors" in res:
 | 
						|
        for error, reg_ids in res["errors"].items():
 | 
						|
            if error in ["NotRegistered", "InvalidRegistration"]:
 | 
						|
                for reg_id in reg_ids:
 | 
						|
                    logger.info("GCM: Removing %s", reg_id)
 | 
						|
                    # We remove all entries for this token (There
 | 
						|
                    # could be multiple for different Zulip servers).
 | 
						|
                    DeviceTokenClass._default_manager.filter(
 | 
						|
                        token=reg_id, kind=DeviceTokenClass.GCM
 | 
						|
                    ).delete()
 | 
						|
            else:
 | 
						|
                for reg_id in reg_ids:
 | 
						|
                    logger.warning("GCM: Delivery to %s failed: %s", reg_id, error)
 | 
						|
 | 
						|
    return successfully_sent_count
 | 
						|
 | 
						|
    # python-gcm handles retrying of the unsent messages.
 | 
						|
    # Ref: https://github.com/geeknam/python-gcm/blob/master/gcm/gcm.py#L497
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Sending to a bouncer
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
def uses_notification_bouncer() -> bool:
 | 
						|
    return settings.PUSH_NOTIFICATION_BOUNCER_URL is not None
 | 
						|
 | 
						|
 | 
						|
def sends_notifications_directly() -> bool:
 | 
						|
    return has_apns_credentials() and has_gcm_credentials() and not uses_notification_bouncer()
 | 
						|
 | 
						|
 | 
						|
def send_notifications_to_bouncer(
 | 
						|
    user_profile: UserProfile,
 | 
						|
    apns_payload: Dict[str, Any],
 | 
						|
    gcm_payload: Dict[str, Any],
 | 
						|
    gcm_options: Dict[str, Any],
 | 
						|
    android_devices: Sequence[DeviceToken],
 | 
						|
    apple_devices: Sequence[DeviceToken],
 | 
						|
) -> None:
 | 
						|
    if len(android_devices) + len(apple_devices) == 0:
 | 
						|
        logger.info(
 | 
						|
            "Skipping contacting the bouncer for user %s because there are no registered devices",
 | 
						|
            user_profile.id,
 | 
						|
        )
 | 
						|
 | 
						|
        return
 | 
						|
 | 
						|
    post_data = {
 | 
						|
        "user_uuid": str(user_profile.uuid),
 | 
						|
        # user_uuid is the intended future format, but we also need to send user_id
 | 
						|
        # to avoid breaking old mobile registrations, which were made with user_id.
 | 
						|
        "user_id": user_profile.id,
 | 
						|
        "realm_uuid": str(user_profile.realm.uuid),
 | 
						|
        "apns_payload": apns_payload,
 | 
						|
        "gcm_payload": gcm_payload,
 | 
						|
        "gcm_options": gcm_options,
 | 
						|
        "android_devices": [device.token for device in android_devices],
 | 
						|
        "apple_devices": [device.token for device in apple_devices],
 | 
						|
    }
 | 
						|
    # Calls zilencer.views.remote_server_notify_push
 | 
						|
    response_data = send_json_to_push_bouncer("POST", "push/notify", post_data)
 | 
						|
    assert isinstance(response_data["total_android_devices"], int)
 | 
						|
    assert isinstance(response_data["total_apple_devices"], int)
 | 
						|
 | 
						|
    assert isinstance(response_data["deleted_devices"], dict)
 | 
						|
    assert isinstance(response_data["deleted_devices"]["android_devices"], list)
 | 
						|
    assert isinstance(response_data["deleted_devices"]["apple_devices"], list)
 | 
						|
    android_deleted_devices = response_data["deleted_devices"]["android_devices"]
 | 
						|
    apple_deleted_devices = response_data["deleted_devices"]["apple_devices"]
 | 
						|
    if android_deleted_devices or apple_deleted_devices:
 | 
						|
        logger.info(
 | 
						|
            "Deleting push tokens based on response from bouncer: Android: %s, Apple: %s",
 | 
						|
            sorted(android_deleted_devices),
 | 
						|
            sorted(apple_deleted_devices),
 | 
						|
        )
 | 
						|
        PushDeviceToken.objects.filter(
 | 
						|
            kind=PushDeviceToken.GCM, token__in=android_deleted_devices
 | 
						|
        ).delete()
 | 
						|
        PushDeviceToken.objects.filter(
 | 
						|
            kind=PushDeviceToken.APNS, token__in=apple_deleted_devices
 | 
						|
        ).delete()
 | 
						|
 | 
						|
    total_android_devices, total_apple_devices = (
 | 
						|
        response_data["total_android_devices"],
 | 
						|
        response_data["total_apple_devices"],
 | 
						|
    )
 | 
						|
    do_increment_logging_stat(
 | 
						|
        user_profile.realm,
 | 
						|
        COUNT_STATS["mobile_pushes_sent::day"],
 | 
						|
        None,
 | 
						|
        timezone_now(),
 | 
						|
        increment=total_android_devices + total_apple_devices,
 | 
						|
    )
 | 
						|
 | 
						|
    remote_realm_dict = response_data.get("realm")
 | 
						|
    if remote_realm_dict is not None:
 | 
						|
        # The server may have updated our understanding of whether
 | 
						|
        # push notifications will work.
 | 
						|
        assert isinstance(remote_realm_dict, dict)
 | 
						|
        do_set_realm_property(
 | 
						|
            user_profile.realm,
 | 
						|
            "push_notifications_enabled",
 | 
						|
            remote_realm_dict["can_push"],
 | 
						|
            acting_user=None,
 | 
						|
        )
 | 
						|
        do_set_push_notifications_enabled_end_timestamp(
 | 
						|
            user_profile.realm, remote_realm_dict["expected_end_timestamp"], acting_user=None
 | 
						|
        )
 | 
						|
 | 
						|
    logger.info(
 | 
						|
        "Sent mobile push notifications for user %s through bouncer: %s via FCM devices, %s via APNs devices",
 | 
						|
        user_profile.id,
 | 
						|
        total_android_devices,
 | 
						|
        total_apple_devices,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Managing device tokens
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
def add_push_device_token(
 | 
						|
    user_profile: UserProfile, token_str: str, kind: int, ios_app_id: Optional[str] = None
 | 
						|
) -> PushDeviceToken:
 | 
						|
    logger.info(
 | 
						|
        "Registering push device: %d %r %d %r", user_profile.id, token_str, kind, ios_app_id
 | 
						|
    )
 | 
						|
 | 
						|
    # Regardless of whether we're using the push notifications
 | 
						|
    # bouncer, we want to store a PushDeviceToken record locally.
 | 
						|
    # These can be used to discern whether the user has any mobile
 | 
						|
    # devices configured, and is also where we will store encryption
 | 
						|
    # keys for mobile push notifications.
 | 
						|
    try:
 | 
						|
        with transaction.atomic():
 | 
						|
            token = PushDeviceToken.objects.create(
 | 
						|
                user_id=user_profile.id,
 | 
						|
                kind=kind,
 | 
						|
                token=token_str,
 | 
						|
                ios_app_id=ios_app_id,
 | 
						|
                # last_updated is to be renamed to date_created.
 | 
						|
                last_updated=timezone_now(),
 | 
						|
            )
 | 
						|
    except IntegrityError:
 | 
						|
        token = PushDeviceToken.objects.get(
 | 
						|
            user_id=user_profile.id,
 | 
						|
            kind=kind,
 | 
						|
            token=token_str,
 | 
						|
        )
 | 
						|
 | 
						|
    # If we're sending things to the push notification bouncer
 | 
						|
    # register this user with them here
 | 
						|
    if uses_notification_bouncer():
 | 
						|
        post_data = {
 | 
						|
            "server_uuid": settings.ZULIP_ORG_ID,
 | 
						|
            "user_uuid": str(user_profile.uuid),
 | 
						|
            "realm_uuid": str(user_profile.realm.uuid),
 | 
						|
            # user_id is sent so that the bouncer can delete any pre-existing registrations
 | 
						|
            # for this user+device to avoid duplication upon adding the uuid registration.
 | 
						|
            "user_id": str(user_profile.id),
 | 
						|
            "token": token_str,
 | 
						|
            "token_kind": kind,
 | 
						|
        }
 | 
						|
 | 
						|
        if kind == PushDeviceToken.APNS:
 | 
						|
            post_data["ios_app_id"] = ios_app_id
 | 
						|
 | 
						|
        logger.info("Sending new push device to bouncer: %r", post_data)
 | 
						|
        # Calls zilencer.views.register_remote_push_device
 | 
						|
        send_to_push_bouncer("POST", "push/register", post_data)
 | 
						|
 | 
						|
    return token
 | 
						|
 | 
						|
 | 
						|
def remove_push_device_token(user_profile: UserProfile, token_str: str, kind: int) -> None:
 | 
						|
    try:
 | 
						|
        token = PushDeviceToken.objects.get(token=token_str, kind=kind, user=user_profile)
 | 
						|
        token.delete()
 | 
						|
    except PushDeviceToken.DoesNotExist:
 | 
						|
        # If we are using bouncer, don't raise the exception. It will
 | 
						|
        # be raised by the code below eventually. This is important
 | 
						|
        # during the transition period after upgrading to a version
 | 
						|
        # that stores local PushDeviceToken objects even when using
 | 
						|
        # the push notifications bouncer.
 | 
						|
        if not uses_notification_bouncer():
 | 
						|
            raise JsonableError(_("Token does not exist"))
 | 
						|
 | 
						|
    # If we're sending things to the push notification bouncer
 | 
						|
    # unregister this user with them here
 | 
						|
    if uses_notification_bouncer():
 | 
						|
        # TODO: Make this a remove item
 | 
						|
        post_data = {
 | 
						|
            "server_uuid": settings.ZULIP_ORG_ID,
 | 
						|
            # We don't know here if the token was registered with uuid
 | 
						|
            # or using the legacy id format, so we need to send both.
 | 
						|
            "user_uuid": str(user_profile.uuid),
 | 
						|
            "user_id": user_profile.id,
 | 
						|
            "token": token_str,
 | 
						|
            "token_kind": kind,
 | 
						|
        }
 | 
						|
        # Calls zilencer.views.unregister_remote_push_device
 | 
						|
        send_to_push_bouncer("POST", "push/unregister", post_data)
 | 
						|
 | 
						|
 | 
						|
def clear_push_device_tokens(user_profile_id: int) -> None:
 | 
						|
    # Deletes all of a user's PushDeviceTokens.
 | 
						|
    if uses_notification_bouncer():
 | 
						|
        user_uuid = str(get_user_profile_by_id(user_profile_id).uuid)
 | 
						|
        post_data = {
 | 
						|
            "server_uuid": settings.ZULIP_ORG_ID,
 | 
						|
            # We want to clear all registered token, and they may have
 | 
						|
            # been registered with either uuid or id.
 | 
						|
            "user_uuid": user_uuid,
 | 
						|
            "user_id": user_profile_id,
 | 
						|
        }
 | 
						|
        send_to_push_bouncer("POST", "push/unregister/all", post_data)
 | 
						|
        return
 | 
						|
 | 
						|
    PushDeviceToken.objects.filter(user_id=user_profile_id).delete()
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Push notifications in general
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
def push_notifications_configured() -> bool:
 | 
						|
    """True just if this server has configured a way to send push notifications."""
 | 
						|
    if (
 | 
						|
        uses_notification_bouncer()
 | 
						|
        and settings.ZULIP_ORG_KEY is not None
 | 
						|
        and settings.ZULIP_ORG_ID is not None
 | 
						|
    ):  # nocoverage
 | 
						|
        # We have the needed configuration to send push notifications through
 | 
						|
        # the bouncer.  Better yet would be to confirm that this config actually
 | 
						|
        # works -- e.g., that we have ever successfully sent to the bouncer --
 | 
						|
        # but this is a good start.
 | 
						|
        return True
 | 
						|
    if settings.DEVELOPMENT and (has_apns_credentials() or has_gcm_credentials()):  # nocoverage
 | 
						|
        # Since much of the notifications logic is platform-specific, the mobile
 | 
						|
        # developers often work on just one platform at a time, so we should
 | 
						|
        # only require one to be configured.
 | 
						|
        return True
 | 
						|
    elif has_apns_credentials() and has_gcm_credentials():  # nocoverage
 | 
						|
        # We have the needed configuration to send through APNs and GCM directly
 | 
						|
        # (i.e., we are the bouncer, presumably.)  Again, assume it actually works.
 | 
						|
        return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def initialize_push_notifications() -> None:
 | 
						|
    """Called during startup of the push notifications worker to check
 | 
						|
    whether we expect mobile push notifications to work on this server
 | 
						|
    and update state accordingly.
 | 
						|
    """
 | 
						|
 | 
						|
    if sends_notifications_directly():
 | 
						|
        # This server sends push notifications directly. Make sure we
 | 
						|
        # are set to report to clients that push notifications are
 | 
						|
        # enabled.
 | 
						|
        for realm in Realm.objects.filter(push_notifications_enabled=False):
 | 
						|
            do_set_realm_property(realm, "push_notifications_enabled", True, acting_user=None)
 | 
						|
            do_set_push_notifications_enabled_end_timestamp(realm, None, acting_user=None)
 | 
						|
        return
 | 
						|
 | 
						|
    if not push_notifications_configured():
 | 
						|
        for realm in Realm.objects.filter(push_notifications_enabled=True):
 | 
						|
            do_set_realm_property(realm, "push_notifications_enabled", False, acting_user=None)
 | 
						|
            do_set_push_notifications_enabled_end_timestamp(realm, None, acting_user=None)
 | 
						|
        if settings.DEVELOPMENT and not settings.TEST_SUITE:
 | 
						|
            # Avoid unnecessary spam on development environment startup
 | 
						|
            return  # nocoverage
 | 
						|
        logger.warning(
 | 
						|
            "Mobile push notifications are not configured.\n  "
 | 
						|
            "See https://zulip.readthedocs.io/en/latest/"
 | 
						|
            "production/mobile-push-notifications.html"
 | 
						|
        )
 | 
						|
        return
 | 
						|
 | 
						|
    if uses_notification_bouncer():
 | 
						|
        # If we're using the notification bouncer, check if we can
 | 
						|
        # actually send push notifications, and update our
 | 
						|
        # understanding of that state for each realm accordingly.
 | 
						|
        send_server_data_to_push_bouncer(consider_usage_statistics=False)
 | 
						|
        return
 | 
						|
 | 
						|
    logger.warning(  # nocoverage
 | 
						|
        "Mobile push notifications are not fully configured.\n  "
 | 
						|
        "See https://zulip.readthedocs.io/en/latest/production/mobile-push-notifications.html"
 | 
						|
    )
 | 
						|
    for realm in Realm.objects.filter(push_notifications_enabled=True):  # nocoverage
 | 
						|
        do_set_realm_property(realm, "push_notifications_enabled", False, acting_user=None)
 | 
						|
        do_set_push_notifications_enabled_end_timestamp(realm, None, acting_user=None)
 | 
						|
 | 
						|
 | 
						|
def get_mobile_push_content(rendered_content: str) -> str:
 | 
						|
    def get_text(elem: lxml.html.HtmlElement) -> str:
 | 
						|
        # Convert default emojis to their Unicode equivalent.
 | 
						|
        classes = elem.get("class", "")
 | 
						|
        if "emoji" in classes:
 | 
						|
            match = re.search(r"emoji-(?P<emoji_code>\S+)", classes)
 | 
						|
            if match:
 | 
						|
                emoji_code = match.group("emoji_code")
 | 
						|
                return hex_codepoint_to_emoji(emoji_code)
 | 
						|
        # Handles realm emojis, avatars etc.
 | 
						|
        if elem.tag == "img":
 | 
						|
            return elem.get("alt", "")
 | 
						|
        if elem.tag == "blockquote":
 | 
						|
            return ""  # To avoid empty line before quote text
 | 
						|
        return elem.text or ""
 | 
						|
 | 
						|
    def format_as_quote(quote_text: str) -> str:
 | 
						|
        return "".join(
 | 
						|
            f"> {line}\n" for line in quote_text.splitlines() if line  # Remove empty lines
 | 
						|
        )
 | 
						|
 | 
						|
    def render_olist(ol: lxml.html.HtmlElement) -> str:
 | 
						|
        items = []
 | 
						|
        counter = int(ol.get("start")) if ol.get("start") else 1
 | 
						|
        nested_levels = sum(1 for ancestor in ol.iterancestors("ol"))
 | 
						|
        indent = ("\n" + "  " * nested_levels) if nested_levels else ""
 | 
						|
 | 
						|
        for li in ol:
 | 
						|
            items.append(indent + str(counter) + ". " + process(li).strip())
 | 
						|
            counter += 1
 | 
						|
 | 
						|
        return "\n".join(items)
 | 
						|
 | 
						|
    def render_spoiler(elem: lxml.html.HtmlElement) -> str:
 | 
						|
        header = elem.find_class("spoiler-header")[0]
 | 
						|
        text = process(header).strip()
 | 
						|
        if len(text) == 0:
 | 
						|
            return "(…)\n"
 | 
						|
        return f"{text} (…)\n"
 | 
						|
 | 
						|
    def process(elem: lxml.html.HtmlElement) -> str:
 | 
						|
        plain_text = ""
 | 
						|
        if elem.tag == "ol":
 | 
						|
            plain_text = render_olist(elem)
 | 
						|
        elif "spoiler-block" in elem.get("class", ""):
 | 
						|
            plain_text += render_spoiler(elem)
 | 
						|
        else:
 | 
						|
            plain_text = get_text(elem)
 | 
						|
            sub_text = ""
 | 
						|
            for child in elem:
 | 
						|
                sub_text += process(child)
 | 
						|
            if elem.tag == "blockquote":
 | 
						|
                sub_text = format_as_quote(sub_text)
 | 
						|
            plain_text += sub_text
 | 
						|
            plain_text += elem.tail or ""
 | 
						|
        return plain_text
 | 
						|
 | 
						|
    if settings.PUSH_NOTIFICATION_REDACT_CONTENT:
 | 
						|
        return (
 | 
						|
            "*"
 | 
						|
            + _(
 | 
						|
                "This organization has disabled including message content in mobile push notifications"
 | 
						|
            )
 | 
						|
            + "*"
 | 
						|
        )
 | 
						|
 | 
						|
    elem = lxml.html.fragment_fromstring(rendered_content, create_parent=True)
 | 
						|
    change_katex_to_raw_latex(elem)
 | 
						|
    plain_text = process(elem)
 | 
						|
    return plain_text
 | 
						|
 | 
						|
 | 
						|
def truncate_content(content: str) -> Tuple[str, bool]:
 | 
						|
    # We use Unicode character 'HORIZONTAL ELLIPSIS' (U+2026) instead
 | 
						|
    # of three dots as this saves two extra characters for textual
 | 
						|
    # content. This function will need to be updated to handle Unicode
 | 
						|
    # combining characters and tags when we start supporting themself.
 | 
						|
    if len(content) <= 200:
 | 
						|
        return content, False
 | 
						|
    return content[:200] + "…", True
 | 
						|
 | 
						|
 | 
						|
def get_base_payload(user_profile: UserProfile) -> Dict[str, Any]:
 | 
						|
    """Common fields for all notification payloads."""
 | 
						|
    data: Dict[str, Any] = {}
 | 
						|
 | 
						|
    # These will let the app support logging into multiple realms and servers.
 | 
						|
    data["server"] = settings.EXTERNAL_HOST
 | 
						|
    data["realm_id"] = user_profile.realm.id
 | 
						|
    data["realm_uri"] = user_profile.realm.uri
 | 
						|
    data["realm_name"] = user_profile.realm.name
 | 
						|
    data["user_id"] = user_profile.id
 | 
						|
 | 
						|
    return data
 | 
						|
 | 
						|
 | 
						|
def get_message_payload(
 | 
						|
    user_profile: UserProfile,
 | 
						|
    message: Message,
 | 
						|
    mentioned_user_group_id: Optional[int] = None,
 | 
						|
    mentioned_user_group_name: Optional[str] = None,
 | 
						|
    can_access_sender: bool = True,
 | 
						|
) -> Dict[str, Any]:
 | 
						|
    """Common fields for `message` payloads, for all platforms."""
 | 
						|
    data = get_base_payload(user_profile)
 | 
						|
 | 
						|
    # `sender_id` is preferred, but some existing versions use `sender_email`.
 | 
						|
    data["sender_id"] = message.sender.id
 | 
						|
    if not can_access_sender:
 | 
						|
        # A guest user can only receive a stream message from an
 | 
						|
        # inaccessible user as we allow unsubscribed users to send
 | 
						|
        # messages to streams. For direct messages, the guest gains
 | 
						|
        # access to the user if they where previously inaccessible.
 | 
						|
        data["sender_email"] = Address(
 | 
						|
            username=f"user{message.sender.id}", domain=get_fake_email_domain(message.realm.host)
 | 
						|
        ).addr_spec
 | 
						|
    else:
 | 
						|
        data["sender_email"] = message.sender.email
 | 
						|
 | 
						|
    data["time"] = datetime_to_timestamp(message.date_sent)
 | 
						|
    if mentioned_user_group_id is not None:
 | 
						|
        assert mentioned_user_group_name is not None
 | 
						|
        data["mentioned_user_group_id"] = mentioned_user_group_id
 | 
						|
        data["mentioned_user_group_name"] = mentioned_user_group_name
 | 
						|
 | 
						|
    if message.recipient.type == Recipient.STREAM:
 | 
						|
        data["recipient_type"] = "stream"
 | 
						|
        data["stream"] = get_message_stream_name_from_database(message)
 | 
						|
        data["stream_id"] = message.recipient.type_id
 | 
						|
        data["topic"] = message.topic_name()
 | 
						|
    elif message.recipient.type == Recipient.HUDDLE:
 | 
						|
        data["recipient_type"] = "private"
 | 
						|
        data["pm_users"] = huddle_users(message.recipient.id)
 | 
						|
    else:  # Recipient.PERSONAL
 | 
						|
        data["recipient_type"] = "private"
 | 
						|
 | 
						|
    return data
 | 
						|
 | 
						|
 | 
						|
def get_apns_alert_title(message: Message) -> str:
 | 
						|
    """
 | 
						|
    On an iOS notification, this is the first bolded line.
 | 
						|
    """
 | 
						|
    if message.recipient.type == Recipient.HUDDLE:
 | 
						|
        recipients = get_display_recipient(message.recipient)
 | 
						|
        assert isinstance(recipients, list)
 | 
						|
        return ", ".join(sorted(r["full_name"] for r in recipients))
 | 
						|
    elif message.is_stream_message():
 | 
						|
        stream_name = get_message_stream_name_from_database(message)
 | 
						|
        return f"#{stream_name} > {message.topic_name()}"
 | 
						|
    # For 1:1 direct messages, we just show the sender name.
 | 
						|
    return message.sender.full_name
 | 
						|
 | 
						|
 | 
						|
def get_apns_alert_subtitle(
 | 
						|
    message: Message,
 | 
						|
    trigger: str,
 | 
						|
    user_profile: UserProfile,
 | 
						|
    mentioned_user_group_name: Optional[str] = None,
 | 
						|
    can_access_sender: bool = True,
 | 
						|
) -> str:
 | 
						|
    """
 | 
						|
    On an iOS notification, this is the second bolded line.
 | 
						|
    """
 | 
						|
    sender_name = message.sender.full_name
 | 
						|
    if not can_access_sender:
 | 
						|
        # A guest user can only receive a stream message from an
 | 
						|
        # inaccessible user as we allow unsubscribed users to send
 | 
						|
        # messages to streams. For direct messages, the guest gains
 | 
						|
        # access to the user if they where previously inaccessible.
 | 
						|
        sender_name = str(UserProfile.INACCESSIBLE_USER_NAME)
 | 
						|
 | 
						|
    if trigger == NotificationTriggers.MENTION:
 | 
						|
        if mentioned_user_group_name is not None:
 | 
						|
            return _("{full_name} mentioned @{user_group_name}:").format(
 | 
						|
                full_name=sender_name, user_group_name=mentioned_user_group_name
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            return _("{full_name} mentioned you:").format(full_name=sender_name)
 | 
						|
    elif trigger in (
 | 
						|
        NotificationTriggers.TOPIC_WILDCARD_MENTION_IN_FOLLOWED_TOPIC,
 | 
						|
        NotificationTriggers.STREAM_WILDCARD_MENTION_IN_FOLLOWED_TOPIC,
 | 
						|
        NotificationTriggers.TOPIC_WILDCARD_MENTION,
 | 
						|
        NotificationTriggers.STREAM_WILDCARD_MENTION,
 | 
						|
    ):
 | 
						|
        return _("{full_name} mentioned everyone:").format(full_name=sender_name)
 | 
						|
    elif message.recipient.type == Recipient.PERSONAL:
 | 
						|
        return ""
 | 
						|
    # For group direct messages, or regular messages to a stream,
 | 
						|
    # just use a colon to indicate this is the sender.
 | 
						|
    return sender_name + ":"
 | 
						|
 | 
						|
 | 
						|
def get_apns_badge_count(
 | 
						|
    user_profile: UserProfile, read_messages_ids: Optional[Sequence[int]] = []
 | 
						|
) -> int:
 | 
						|
    # NOTE: We have temporarily set get_apns_badge_count to always
 | 
						|
    # return 0 until we can debug a likely mobile app side issue with
 | 
						|
    # handling notifications while the app is open.
 | 
						|
    return 0
 | 
						|
 | 
						|
 | 
						|
def get_apns_badge_count_future(
 | 
						|
    user_profile: UserProfile, read_messages_ids: Optional[Sequence[int]] = []
 | 
						|
) -> int:
 | 
						|
    # Future implementation of get_apns_badge_count; unused but
 | 
						|
    # we expect to use this once we resolve client-side bugs.
 | 
						|
    return (
 | 
						|
        UserMessage.objects.filter(user_profile=user_profile)
 | 
						|
        .extra(where=[UserMessage.where_active_push_notification()])
 | 
						|
        .exclude(
 | 
						|
            # If we've just marked some messages as read, they're still
 | 
						|
            # marked as having active notifications; we'll clear that flag
 | 
						|
            # only after we've sent that update to the devices.  So we need
 | 
						|
            # to exclude them explicitly from the count.
 | 
						|
            message_id__in=read_messages_ids
 | 
						|
        )
 | 
						|
        .count()
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def get_message_payload_apns(
 | 
						|
    user_profile: UserProfile,
 | 
						|
    message: Message,
 | 
						|
    trigger: str,
 | 
						|
    mentioned_user_group_id: Optional[int] = None,
 | 
						|
    mentioned_user_group_name: Optional[str] = None,
 | 
						|
    can_access_sender: bool = True,
 | 
						|
) -> Dict[str, Any]:
 | 
						|
    """A `message` payload for iOS, via APNs."""
 | 
						|
    zulip_data = get_message_payload(
 | 
						|
        user_profile, message, mentioned_user_group_id, mentioned_user_group_name, can_access_sender
 | 
						|
    )
 | 
						|
    zulip_data.update(
 | 
						|
        message_ids=[message.id],
 | 
						|
    )
 | 
						|
 | 
						|
    assert message.rendered_content is not None
 | 
						|
    with override_language(user_profile.default_language):
 | 
						|
        content, _ = truncate_content(get_mobile_push_content(message.rendered_content))
 | 
						|
        apns_data = {
 | 
						|
            "alert": {
 | 
						|
                "title": get_apns_alert_title(message),
 | 
						|
                "subtitle": get_apns_alert_subtitle(
 | 
						|
                    message, trigger, user_profile, mentioned_user_group_name, can_access_sender
 | 
						|
                ),
 | 
						|
                "body": content,
 | 
						|
            },
 | 
						|
            "sound": "default",
 | 
						|
            "badge": get_apns_badge_count(user_profile),
 | 
						|
            "custom": {"zulip": zulip_data},
 | 
						|
        }
 | 
						|
    return apns_data
 | 
						|
 | 
						|
 | 
						|
def get_message_payload_gcm(
 | 
						|
    user_profile: UserProfile,
 | 
						|
    message: Message,
 | 
						|
    mentioned_user_group_id: Optional[int] = None,
 | 
						|
    mentioned_user_group_name: Optional[str] = None,
 | 
						|
    can_access_sender: bool = True,
 | 
						|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
 | 
						|
    """A `message` payload + options, for Android via GCM/FCM."""
 | 
						|
    data = get_message_payload(
 | 
						|
        user_profile, message, mentioned_user_group_id, mentioned_user_group_name, can_access_sender
 | 
						|
    )
 | 
						|
 | 
						|
    if not can_access_sender:
 | 
						|
        # A guest user can only receive a stream message from an
 | 
						|
        # inaccessible user as we allow unsubscribed users to send
 | 
						|
        # messages to streams. For direct messages, the guest gains
 | 
						|
        # access to the user if they where previously inaccessible.
 | 
						|
        sender_avatar_url = get_avatar_for_inaccessible_user()
 | 
						|
        sender_name = str(UserProfile.INACCESSIBLE_USER_NAME)
 | 
						|
    else:
 | 
						|
        sender_avatar_url = absolute_avatar_url(message.sender)
 | 
						|
        sender_name = message.sender.full_name
 | 
						|
 | 
						|
    assert message.rendered_content is not None
 | 
						|
    with override_language(user_profile.default_language):
 | 
						|
        content, truncated = truncate_content(get_mobile_push_content(message.rendered_content))
 | 
						|
        data.update(
 | 
						|
            event="message",
 | 
						|
            zulip_message_id=message.id,  # message_id is reserved for CCS
 | 
						|
            content=content,
 | 
						|
            content_truncated=truncated,
 | 
						|
            sender_full_name=sender_name,
 | 
						|
            sender_avatar_url=sender_avatar_url,
 | 
						|
        )
 | 
						|
    gcm_options = {"priority": "high"}
 | 
						|
    return data, gcm_options
 | 
						|
 | 
						|
 | 
						|
def get_remove_payload_gcm(
 | 
						|
    user_profile: UserProfile,
 | 
						|
    message_ids: List[int],
 | 
						|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
 | 
						|
    """A `remove` payload + options, for Android via GCM/FCM."""
 | 
						|
    gcm_payload = get_base_payload(user_profile)
 | 
						|
    gcm_payload.update(
 | 
						|
        event="remove",
 | 
						|
        zulip_message_ids=",".join(str(id) for id in message_ids),
 | 
						|
        # Older clients (all clients older than 2019-02-13) look only at
 | 
						|
        # `zulip_message_id` and ignore `zulip_message_ids`.  Do our best.
 | 
						|
        zulip_message_id=message_ids[0],
 | 
						|
    )
 | 
						|
    gcm_options = {"priority": "normal"}
 | 
						|
    return gcm_payload, gcm_options
 | 
						|
 | 
						|
 | 
						|
def get_remove_payload_apns(user_profile: UserProfile, message_ids: List[int]) -> Dict[str, Any]:
 | 
						|
    zulip_data = get_base_payload(user_profile)
 | 
						|
    zulip_data.update(
 | 
						|
        event="remove",
 | 
						|
        zulip_message_ids=",".join(str(id) for id in message_ids),
 | 
						|
    )
 | 
						|
    apns_data = {
 | 
						|
        "badge": get_apns_badge_count(user_profile, message_ids),
 | 
						|
        "custom": {"zulip": zulip_data},
 | 
						|
    }
 | 
						|
    return apns_data
 | 
						|
 | 
						|
 | 
						|
def handle_remove_push_notification(user_profile_id: int, message_ids: List[int]) -> None:
 | 
						|
    """This should be called when a message that previously had a
 | 
						|
    mobile push notification executed is read.  This triggers a push to the
 | 
						|
    mobile app, when the message is read on the server, to remove the
 | 
						|
    message from the notification.
 | 
						|
    """
 | 
						|
    if not push_notifications_configured():
 | 
						|
        return
 | 
						|
 | 
						|
    user_profile = get_user_profile_by_id(user_profile_id)
 | 
						|
 | 
						|
    # We may no longer have access to the message here; for example,
 | 
						|
    # the user (1) got a message, (2) read the message in the web UI,
 | 
						|
    # and then (3) it was deleted.  When trying to send the push
 | 
						|
    # notification for (2), after (3) has happened, there is no
 | 
						|
    # message to fetch -- but we nonetheless want to remove the mobile
 | 
						|
    # notification.  Because of this, verification of access to
 | 
						|
    # the messages is skipped here.
 | 
						|
    # Because of this, no access to the Message objects should be
 | 
						|
    # done; they are treated as a list of opaque ints.
 | 
						|
 | 
						|
    # APNs has a 4KB limit on the maximum size of messages, which
 | 
						|
    # translated to several hundred message IDs in one of these
 | 
						|
    # notifications. In rare cases, it's possible for someone to mark
 | 
						|
    # thousands of push notification eligible messages as read at
 | 
						|
    # once. We could handle this situation with a loop, but we choose
 | 
						|
    # to truncate instead to avoid extra network traffic, because it's
 | 
						|
    # very likely the user has manually cleared the notifications in
 | 
						|
    # their mobile device's UI anyway.
 | 
						|
    #
 | 
						|
    # When truncating, we keep only the newest N messages in this
 | 
						|
    # remove event. This is optimal because older messages are the
 | 
						|
    # ones most likely to have already been manually cleared at some
 | 
						|
    # point in the past.
 | 
						|
    #
 | 
						|
    # We choose 200 here because a 10-digit message ID plus a comma and
 | 
						|
    # space consume 12 bytes, and 12 x 200 = 2400 bytes is still well
 | 
						|
    # below the 4KB limit (leaving plenty of space for metadata).
 | 
						|
    MAX_APNS_MESSAGE_IDS = 200
 | 
						|
    truncated_message_ids = sorted(message_ids)[-MAX_APNS_MESSAGE_IDS:]
 | 
						|
    gcm_payload, gcm_options = get_remove_payload_gcm(user_profile, truncated_message_ids)
 | 
						|
    apns_payload = get_remove_payload_apns(user_profile, truncated_message_ids)
 | 
						|
 | 
						|
    android_devices = list(
 | 
						|
        PushDeviceToken.objects.filter(user=user_profile, kind=PushDeviceToken.GCM).order_by("id")
 | 
						|
    )
 | 
						|
    apple_devices = list(
 | 
						|
        PushDeviceToken.objects.filter(user=user_profile, kind=PushDeviceToken.APNS).order_by("id")
 | 
						|
    )
 | 
						|
    if uses_notification_bouncer():
 | 
						|
        send_notifications_to_bouncer(
 | 
						|
            user_profile, apns_payload, gcm_payload, gcm_options, android_devices, apple_devices
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        user_identity = UserPushIdentityCompat(user_id=user_profile_id)
 | 
						|
 | 
						|
        android_successfully_sent_count = send_android_push_notification(
 | 
						|
            user_identity, android_devices, gcm_payload, gcm_options
 | 
						|
        )
 | 
						|
        apple_successfully_sent_count = send_apple_push_notification(
 | 
						|
            user_identity, apple_devices, apns_payload
 | 
						|
        )
 | 
						|
 | 
						|
        do_increment_logging_stat(
 | 
						|
            user_profile.realm,
 | 
						|
            COUNT_STATS["mobile_pushes_sent::day"],
 | 
						|
            None,
 | 
						|
            timezone_now(),
 | 
						|
            increment=android_successfully_sent_count + apple_successfully_sent_count,
 | 
						|
        )
 | 
						|
 | 
						|
    # We intentionally use the non-truncated message_ids here.  We are
 | 
						|
    # assuming in this very rare case that the user has manually
 | 
						|
    # dismissed these notifications on the device side, and the server
 | 
						|
    # should no longer track them as outstanding notifications.
 | 
						|
    with transaction.atomic(savepoint=False):
 | 
						|
        UserMessage.select_for_update_query().filter(
 | 
						|
            user_profile_id=user_profile_id,
 | 
						|
            message_id__in=message_ids,
 | 
						|
        ).update(flags=F("flags").bitand(~UserMessage.flags.active_mobile_push_notification))
 | 
						|
 | 
						|
 | 
						|
def handle_push_notification(user_profile_id: int, missed_message: Dict[str, Any]) -> None:
 | 
						|
    """
 | 
						|
    missed_message is the event received by the
 | 
						|
    zerver.worker.queue_processors.PushNotificationWorker.consume function.
 | 
						|
    """
 | 
						|
    if not push_notifications_configured():
 | 
						|
        return
 | 
						|
    user_profile = get_user_profile_by_id(user_profile_id)
 | 
						|
 | 
						|
    if user_profile.is_bot:  # nocoverage
 | 
						|
        # We don't expect to reach here for bot users. However, this code exists
 | 
						|
        # to find and throw away any pre-existing events in the queue while
 | 
						|
        # upgrading from versions before our notifiability logic was implemented.
 | 
						|
        # TODO/compatibility: This block can be removed when one can no longer
 | 
						|
        # upgrade from versions <= 4.0 to versions >= 5.0
 | 
						|
        logger.warning(
 | 
						|
            "Send-push-notification event found for bot user %s. Skipping.", user_profile_id
 | 
						|
        )
 | 
						|
        return
 | 
						|
 | 
						|
    if not (
 | 
						|
        user_profile.enable_offline_push_notifications
 | 
						|
        or user_profile.enable_online_push_notifications
 | 
						|
    ):
 | 
						|
        # BUG: Investigate why it's possible to get here.
 | 
						|
        return  # nocoverage
 | 
						|
 | 
						|
    with transaction.atomic(savepoint=False):
 | 
						|
        try:
 | 
						|
            (message, user_message) = access_message(
 | 
						|
                user_profile, missed_message["message_id"], lock_message=True
 | 
						|
            )
 | 
						|
        except JsonableError:
 | 
						|
            if ArchivedMessage.objects.filter(id=missed_message["message_id"]).exists():
 | 
						|
                # If the cause is a race with the message being deleted,
 | 
						|
                # that's normal and we have no need to log an error.
 | 
						|
                return
 | 
						|
            logging.info(
 | 
						|
                "Unexpected message access failure handling push notifications: %s %s",
 | 
						|
                user_profile.id,
 | 
						|
                missed_message["message_id"],
 | 
						|
            )
 | 
						|
            return
 | 
						|
 | 
						|
        if user_message is not None:
 | 
						|
            # If the user has read the message already, don't push-notify.
 | 
						|
            if user_message.flags.read or user_message.flags.active_mobile_push_notification:
 | 
						|
                return
 | 
						|
 | 
						|
            # Otherwise, we mark the message as having an active mobile
 | 
						|
            # push notification, so that we can send revocation messages
 | 
						|
            # later.
 | 
						|
            user_message.flags.active_mobile_push_notification = True
 | 
						|
            user_message.save(update_fields=["flags"])
 | 
						|
        else:
 | 
						|
            # Users should only be getting push notifications into this
 | 
						|
            # queue for messages they haven't received if they're
 | 
						|
            # long-term idle; anything else is likely a bug.
 | 
						|
            if not user_profile.long_term_idle:
 | 
						|
                logger.error(
 | 
						|
                    "Could not find UserMessage with message_id %s and user_id %s",
 | 
						|
                    missed_message["message_id"],
 | 
						|
                    user_profile_id,
 | 
						|
                    exc_info=True,
 | 
						|
                )
 | 
						|
                return
 | 
						|
 | 
						|
    trigger = missed_message["trigger"]
 | 
						|
 | 
						|
    # TODO/compatibility: Translation code for the rename of
 | 
						|
    # `wildcard_mentioned` to `stream_wildcard_mentioned`.
 | 
						|
    # Remove this when one can no longer directly upgrade from 7.x to main.
 | 
						|
    if trigger == "wildcard_mentioned":
 | 
						|
        trigger = NotificationTriggers.STREAM_WILDCARD_MENTION  # nocoverage
 | 
						|
 | 
						|
    # TODO/compatibility: Translation code for the rename of
 | 
						|
    # `followed_topic_wildcard_mentioned` to `stream_wildcard_mentioned_in_followed_topic`.
 | 
						|
    # Remove this when one can no longer directly upgrade from 7.x to main.
 | 
						|
    if trigger == "followed_topic_wildcard_mentioned":
 | 
						|
        trigger = NotificationTriggers.STREAM_WILDCARD_MENTION_IN_FOLLOWED_TOPIC  # nocoverage
 | 
						|
 | 
						|
    # TODO/compatibility: Translation code for the rename of
 | 
						|
    # `private_message` to `direct_message`. Remove this when
 | 
						|
    # one can no longer directly upgrade from 7.x to main.
 | 
						|
    if trigger == "private_message":
 | 
						|
        trigger = NotificationTriggers.DIRECT_MESSAGE  # nocoverage
 | 
						|
 | 
						|
    mentioned_user_group_name = None
 | 
						|
    # mentioned_user_group_id will be None if the user is personally mentioned
 | 
						|
    # regardless whether they are a member of the mentioned user group in the
 | 
						|
    # message or not.
 | 
						|
    mentioned_user_group_id = missed_message.get("mentioned_user_group_id")
 | 
						|
 | 
						|
    if mentioned_user_group_id is not None:
 | 
						|
        user_group = UserGroup.objects.get(id=mentioned_user_group_id, realm=user_profile.realm)
 | 
						|
        mentioned_user_group_name = user_group.name
 | 
						|
 | 
						|
    # Soft reactivate if pushing to a long_term_idle user that is personally mentioned
 | 
						|
    soft_reactivate_if_personal_notification(user_profile, {trigger}, mentioned_user_group_name)
 | 
						|
 | 
						|
    if message.is_stream_message():
 | 
						|
        # This will almost always be True. The corner case where you
 | 
						|
        # can be receiving a message from a user you cannot access
 | 
						|
        # involves your being a guest user whose access is restricted
 | 
						|
        # by a can_access_all_users_group policy, and you can't access
 | 
						|
        # the sender because they are sending a message to a public
 | 
						|
        # stream that you are subscribed to but they are not.
 | 
						|
 | 
						|
        can_access_sender = check_can_access_user(message.sender, user_profile)
 | 
						|
    else:
 | 
						|
        # For private messages, the recipient will gain access
 | 
						|
        # to the sender if they did not had access previously.
 | 
						|
        can_access_sender = True
 | 
						|
 | 
						|
    apns_payload = get_message_payload_apns(
 | 
						|
        user_profile,
 | 
						|
        message,
 | 
						|
        trigger,
 | 
						|
        mentioned_user_group_id,
 | 
						|
        mentioned_user_group_name,
 | 
						|
        can_access_sender,
 | 
						|
    )
 | 
						|
    gcm_payload, gcm_options = get_message_payload_gcm(
 | 
						|
        user_profile, message, mentioned_user_group_id, mentioned_user_group_name, can_access_sender
 | 
						|
    )
 | 
						|
    logger.info("Sending push notifications to mobile clients for user %s", user_profile_id)
 | 
						|
 | 
						|
    android_devices = list(
 | 
						|
        PushDeviceToken.objects.filter(user=user_profile, kind=PushDeviceToken.GCM).order_by("id")
 | 
						|
    )
 | 
						|
 | 
						|
    apple_devices = list(
 | 
						|
        PushDeviceToken.objects.filter(user=user_profile, kind=PushDeviceToken.APNS).order_by("id")
 | 
						|
    )
 | 
						|
    if uses_notification_bouncer():
 | 
						|
        send_notifications_to_bouncer(
 | 
						|
            user_profile, apns_payload, gcm_payload, gcm_options, android_devices, apple_devices
 | 
						|
        )
 | 
						|
        return
 | 
						|
 | 
						|
    logger.info(
 | 
						|
        "Sending mobile push notifications for local user %s: %s via FCM devices, %s via APNs devices",
 | 
						|
        user_profile_id,
 | 
						|
        len(android_devices),
 | 
						|
        len(apple_devices),
 | 
						|
    )
 | 
						|
    user_identity = UserPushIdentityCompat(user_id=user_profile.id)
 | 
						|
 | 
						|
    apple_successfully_sent_count = send_apple_push_notification(
 | 
						|
        user_identity, apple_devices, apns_payload
 | 
						|
    )
 | 
						|
    android_successfully_sent_count = send_android_push_notification(
 | 
						|
        user_identity, android_devices, gcm_payload, gcm_options
 | 
						|
    )
 | 
						|
 | 
						|
    do_increment_logging_stat(
 | 
						|
        user_profile.realm,
 | 
						|
        COUNT_STATS["mobile_pushes_sent::day"],
 | 
						|
        None,
 | 
						|
        timezone_now(),
 | 
						|
        increment=apple_successfully_sent_count + android_successfully_sent_count,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def send_test_push_notification_directly_to_devices(
 | 
						|
    user_identity: UserPushIdentityCompat,
 | 
						|
    devices: Sequence[DeviceToken],
 | 
						|
    base_payload: Dict[str, Any],
 | 
						|
    remote: Optional["RemoteZulipServer"] = None,
 | 
						|
) -> None:
 | 
						|
    payload = copy.deepcopy(base_payload)
 | 
						|
    payload["event"] = "test"
 | 
						|
 | 
						|
    apple_devices = [device for device in devices if device.kind == PushDeviceToken.APNS]
 | 
						|
    android_devices = [device for device in devices if device.kind == PushDeviceToken.GCM]
 | 
						|
    # Let's make the payloads separate objects to make sure mutating to make e.g. Android
 | 
						|
    # adjustments doesn't affect the Apple payload and vice versa.
 | 
						|
    apple_payload = copy.deepcopy(payload)
 | 
						|
    android_payload = copy.deepcopy(payload)
 | 
						|
 | 
						|
    realm_uri = base_payload["realm_uri"]
 | 
						|
    realm_name = base_payload["realm_name"]
 | 
						|
    apns_data = {
 | 
						|
        "alert": {
 | 
						|
            "title": _("Test notification"),
 | 
						|
            "body": _("This is a test notification from {realm_name} ({realm_uri}).").format(
 | 
						|
                realm_name=realm_name, realm_uri=realm_uri
 | 
						|
            ),
 | 
						|
        },
 | 
						|
        "sound": "default",
 | 
						|
        "custom": {"zulip": apple_payload},
 | 
						|
    }
 | 
						|
    send_apple_push_notification(user_identity, apple_devices, apns_data, remote=remote)
 | 
						|
 | 
						|
    android_payload["time"] = datetime_to_timestamp(timezone_now())
 | 
						|
    gcm_options = {"priority": "high"}
 | 
						|
    send_android_push_notification(
 | 
						|
        user_identity, android_devices, android_payload, gcm_options, remote=remote
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def send_test_push_notification(user_profile: UserProfile, devices: List[PushDeviceToken]) -> None:
 | 
						|
    base_payload = get_base_payload(user_profile)
 | 
						|
    if uses_notification_bouncer():
 | 
						|
        for device in devices:
 | 
						|
            post_data = {
 | 
						|
                "user_uuid": str(user_profile.uuid),
 | 
						|
                "user_id": user_profile.id,
 | 
						|
                "token": device.token,
 | 
						|
                "token_kind": device.kind,
 | 
						|
                "base_payload": base_payload,
 | 
						|
            }
 | 
						|
 | 
						|
            logger.info("Sending test push notification to bouncer: %r", post_data)
 | 
						|
            send_json_to_push_bouncer("POST", "push/test_notification", post_data)
 | 
						|
 | 
						|
        return
 | 
						|
 | 
						|
    # This server doesn't need the bouncer, so we send directly to the device.
 | 
						|
    user_identity = UserPushIdentityCompat(
 | 
						|
        user_id=user_profile.id, user_uuid=str(user_profile.uuid)
 | 
						|
    )
 | 
						|
    send_test_push_notification_directly_to_devices(
 | 
						|
        user_identity, devices, base_payload, remote=None
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
class InvalidPushDeviceTokenError(JsonableError):
 | 
						|
    code = ErrorCode.INVALID_PUSH_DEVICE_TOKEN
 | 
						|
 | 
						|
    def __init__(self) -> None:
 | 
						|
        pass
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    @override
 | 
						|
    def msg_format() -> str:
 | 
						|
        return _("Device not recognized")
 | 
						|
 | 
						|
 | 
						|
class InvalidRemotePushDeviceTokenError(JsonableError):
 | 
						|
    code = ErrorCode.INVALID_REMOTE_PUSH_DEVICE_TOKEN
 | 
						|
 | 
						|
    def __init__(self) -> None:
 | 
						|
        pass
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    @override
 | 
						|
    def msg_format() -> str:
 | 
						|
        return _("Device not recognized by the push bouncer")
 |