mirror of
https://github.com/zulip/zulip.git
synced 2025-11-13 10:26:28 +00:00
push_notifs: Set up plumbing for retrying in case of bouncer error.
We add PushNotificationBouncerRetryLaterError as an exception to signal an error occurred when trying to communicate with the bouncer and it should be retried. We use JsonableError as the base class, because this signal will need to work in two roles: 1. When the push notification was being issued by the queue worker PushNotificationsWorker, it will signal to the worker to requeue the event and try again later. 2. The exception will also possibly be raised (this will be added in the next commit) on codepaths coming from a request to an API endpoint (for example to add a token, to users/me/apns_device_token). In that case, it'll be needed to provide a good error to the API user - and basing this exception on JsonableError will allow that.
This commit is contained in:
committed by
Tim Abbott
parent
717e90dfeb
commit
20b30e1503
@@ -23,8 +23,8 @@ from zerver.lib.avatar import absolute_avatar_url
|
|||||||
from zerver.lib.exceptions import JsonableError
|
from zerver.lib.exceptions import JsonableError
|
||||||
from zerver.lib.message import access_message, \
|
from zerver.lib.message import access_message, \
|
||||||
bulk_access_messages_expect_usermessage, huddle_users
|
bulk_access_messages_expect_usermessage, huddle_users
|
||||||
from zerver.lib.queue import retry_event
|
from zerver.lib.remote_server import send_to_push_bouncer, send_json_to_push_bouncer, \
|
||||||
from zerver.lib.remote_server import send_to_push_bouncer, send_json_to_push_bouncer
|
PushNotificationBouncerRetryLaterError
|
||||||
from zerver.lib.timestamp import datetime_to_timestamp
|
from zerver.lib.timestamp import datetime_to_timestamp
|
||||||
from zerver.models import PushDeviceToken, Message, Recipient, \
|
from zerver.models import PushDeviceToken, Message, Recipient, \
|
||||||
UserMessage, UserProfile, \
|
UserMessage, UserProfile, \
|
||||||
@@ -674,10 +674,8 @@ def handle_remove_push_notification(user_profile_id: int, message_ids: List[int]
|
|||||||
gcm_payload,
|
gcm_payload,
|
||||||
gcm_options)
|
gcm_options)
|
||||||
except requests.ConnectionError: # nocoverage
|
except requests.ConnectionError: # nocoverage
|
||||||
def failure_processor(event: Dict[str, Any]) -> None:
|
raise PushNotificationBouncerRetryLaterError(
|
||||||
logger.warning(
|
"ConnectionError while trying to connect to the bouncer")
|
||||||
"Maximum retries exceeded for trigger:%s event:push_notification" % (
|
|
||||||
event['user_profile_id'],))
|
|
||||||
else:
|
else:
|
||||||
android_devices = list(PushDeviceToken.objects.filter(
|
android_devices = list(PushDeviceToken.objects.filter(
|
||||||
user=user_profile, kind=PushDeviceToken.GCM))
|
user=user_profile, kind=PushDeviceToken.GCM))
|
||||||
@@ -751,14 +749,10 @@ def handle_push_notification(user_profile_id: int, missed_message: Dict[str, Any
|
|||||||
apns_payload,
|
apns_payload,
|
||||||
gcm_payload,
|
gcm_payload,
|
||||||
gcm_options)
|
gcm_options)
|
||||||
|
return
|
||||||
except requests.ConnectionError:
|
except requests.ConnectionError:
|
||||||
def failure_processor(event: Dict[str, Any]) -> None:
|
raise PushNotificationBouncerRetryLaterError(
|
||||||
logger.warning(
|
"ConnectionError while trying to connect to the bouncer")
|
||||||
"Maximum retries exceeded for trigger:%s event:push_notification" % (
|
|
||||||
event['user_profile_id'],))
|
|
||||||
retry_event('missedmessage_mobile_notifications', missed_message,
|
|
||||||
failure_processor)
|
|
||||||
return
|
|
||||||
|
|
||||||
android_devices = list(PushDeviceToken.objects.filter(user=user_profile,
|
android_devices = list(PushDeviceToken.objects.filter(user=user_profile,
|
||||||
kind=PushDeviceToken.GCM))
|
kind=PushDeviceToken.GCM))
|
||||||
|
|||||||
@@ -17,6 +17,9 @@ from zerver.models import RealmAuditLog
|
|||||||
class PushNotificationBouncerException(Exception):
|
class PushNotificationBouncerException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class PushNotificationBouncerRetryLaterError(JsonableError):
|
||||||
|
pass
|
||||||
|
|
||||||
def send_to_push_bouncer(method: str,
|
def send_to_push_bouncer(method: str,
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
post_data: Union[str, Dict[str, Any]],
|
post_data: Union[str, Dict[str, Any]],
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ from zerver.lib.push_notifications import (
|
|||||||
send_to_push_bouncer,
|
send_to_push_bouncer,
|
||||||
)
|
)
|
||||||
from zerver.lib.remote_server import send_analytics_to_remote_server, \
|
from zerver.lib.remote_server import send_analytics_to_remote_server, \
|
||||||
build_analytics_data, PushNotificationBouncerException
|
build_analytics_data, PushNotificationBouncerException, PushNotificationBouncerRetryLaterError
|
||||||
from zerver.lib.request import JsonableError
|
from zerver.lib.request import JsonableError
|
||||||
from zerver.lib.test_classes import (
|
from zerver.lib.test_classes import (
|
||||||
TestCase, ZulipTestCase,
|
TestCase, ZulipTestCase,
|
||||||
@@ -684,7 +684,7 @@ class HandlePushNotificationTest(PushNotificationTest):
|
|||||||
(token, "Unregistered"))
|
(token, "Unregistered"))
|
||||||
self.assertEqual(RemotePushDeviceToken.objects.filter(kind=PushDeviceToken.APNS).count(), 0)
|
self.assertEqual(RemotePushDeviceToken.objects.filter(kind=PushDeviceToken.APNS).count(), 0)
|
||||||
|
|
||||||
def test_end_to_end_connection_error(self) -> None:
|
def test_connection_error(self) -> None:
|
||||||
self.setup_apns_tokens()
|
self.setup_apns_tokens()
|
||||||
self.setup_gcm_tokens()
|
self.setup_gcm_tokens()
|
||||||
|
|
||||||
@@ -694,9 +694,6 @@ class HandlePushNotificationTest(PushNotificationTest):
|
|||||||
message=message
|
message=message
|
||||||
)
|
)
|
||||||
|
|
||||||
def retry(queue_name: Any, event: Any, processor: Any) -> None:
|
|
||||||
handle_push_notification(event['user_profile_id'], event)
|
|
||||||
|
|
||||||
missed_message = {
|
missed_message = {
|
||||||
'user_profile_id': self.user_profile.id,
|
'user_profile_id': self.user_profile.id,
|
||||||
'message_id': message.id,
|
'message_id': message.id,
|
||||||
@@ -707,10 +704,7 @@ class HandlePushNotificationTest(PushNotificationTest):
|
|||||||
side_effect=self.bounce_request), \
|
side_effect=self.bounce_request), \
|
||||||
mock.patch('zerver.lib.push_notifications.gcm_client') as mock_gcm, \
|
mock.patch('zerver.lib.push_notifications.gcm_client') as mock_gcm, \
|
||||||
mock.patch('zerver.lib.push_notifications.send_notifications_to_bouncer',
|
mock.patch('zerver.lib.push_notifications.send_notifications_to_bouncer',
|
||||||
side_effect=requests.ConnectionError), \
|
side_effect=requests.ConnectionError):
|
||||||
mock.patch('zerver.lib.queue.queue_json_publish',
|
|
||||||
side_effect=retry) as mock_retry, \
|
|
||||||
mock.patch('zerver.lib.push_notifications.logger.warning') as mock_warn:
|
|
||||||
gcm_devices = [
|
gcm_devices = [
|
||||||
(b64_to_hex(device.token), device.ios_app_id, device.token)
|
(b64_to_hex(device.token), device.ios_app_id, device.token)
|
||||||
for device in RemotePushDeviceToken.objects.filter(
|
for device in RemotePushDeviceToken.objects.filter(
|
||||||
@@ -718,11 +712,8 @@ class HandlePushNotificationTest(PushNotificationTest):
|
|||||||
]
|
]
|
||||||
mock_gcm.json_request.return_value = {
|
mock_gcm.json_request.return_value = {
|
||||||
'success': {gcm_devices[0][2]: message.id}}
|
'success': {gcm_devices[0][2]: message.id}}
|
||||||
handle_push_notification(self.user_profile.id, missed_message)
|
with self.assertRaises(PushNotificationBouncerRetryLaterError):
|
||||||
self.assertEqual(mock_retry.call_count, 3)
|
handle_push_notification(self.user_profile.id, missed_message)
|
||||||
mock_warn.assert_called_with("Maximum retries exceeded for "
|
|
||||||
"trigger:%s event:"
|
|
||||||
"push_notification" % (self.user_profile.id,))
|
|
||||||
|
|
||||||
@mock.patch('zerver.lib.push_notifications.push_notifications_enabled', return_value = True)
|
@mock.patch('zerver.lib.push_notifications.push_notifications_enabled', return_value = True)
|
||||||
def test_disabled_notifications(self, mock_push_notifications: mock.MagicMock) -> None:
|
def test_disabled_notifications(self, mock_push_notifications: mock.MagicMock) -> None:
|
||||||
|
|||||||
@@ -13,11 +13,13 @@ from zerver.lib.email_mirror import RateLimitedRealmMirror
|
|||||||
from zerver.lib.email_mirror_helpers import encode_email_address
|
from zerver.lib.email_mirror_helpers import encode_email_address
|
||||||
from zerver.lib.queue import MAX_REQUEST_RETRIES
|
from zerver.lib.queue import MAX_REQUEST_RETRIES
|
||||||
from zerver.lib.rate_limiter import RateLimiterLockingException, clear_history
|
from zerver.lib.rate_limiter import RateLimiterLockingException, clear_history
|
||||||
|
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
|
||||||
from zerver.lib.send_email import FromAddress
|
from zerver.lib.send_email import FromAddress
|
||||||
from zerver.lib.test_helpers import simulated_queue_client
|
from zerver.lib.test_helpers import simulated_queue_client
|
||||||
from zerver.lib.test_classes import ZulipTestCase
|
from zerver.lib.test_classes import ZulipTestCase
|
||||||
from zerver.models import get_client, UserActivity, PreregistrationUser, \
|
from zerver.models import get_client, UserActivity, PreregistrationUser, \
|
||||||
get_system_bot, get_stream, get_realm
|
get_system_bot, get_stream, get_realm
|
||||||
|
from zerver.tornado.event_queue import build_offline_notification
|
||||||
from zerver.worker import queue_processors
|
from zerver.worker import queue_processors
|
||||||
from zerver.worker.queue_processors import (
|
from zerver.worker.queue_processors import (
|
||||||
get_active_worker_queues,
|
get_active_worker_queues,
|
||||||
@@ -275,6 +277,62 @@ class WorkerTest(ZulipTestCase):
|
|||||||
{'where art thou, othello?'}
|
{'where art thou, othello?'}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_push_notifications_worker(self) -> None:
|
||||||
|
"""
|
||||||
|
The push notifications system has its own comprehensive test suite,
|
||||||
|
so we can limit ourselves to simple unit testing the queue processor,
|
||||||
|
without going deeper into the system - by mocking the handle_push_notification
|
||||||
|
functions to immediately produce the effect we want, to test its handling by the queue
|
||||||
|
processor.
|
||||||
|
"""
|
||||||
|
fake_client = self.FakeClient()
|
||||||
|
|
||||||
|
def fake_publish(queue_name: str,
|
||||||
|
event: Dict[str, Any],
|
||||||
|
processor: Callable[[Any], None]) -> None:
|
||||||
|
fake_client.queue.append((queue_name, event))
|
||||||
|
|
||||||
|
def generate_new_message_notification() -> Dict[str, Any]:
|
||||||
|
return build_offline_notification(1, 1)
|
||||||
|
|
||||||
|
def generate_remove_notification() -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"type": "remove",
|
||||||
|
"user_profile_id": 1,
|
||||||
|
"message_ids": [1],
|
||||||
|
}
|
||||||
|
|
||||||
|
with simulated_queue_client(lambda: fake_client):
|
||||||
|
worker = queue_processors.PushNotificationsWorker()
|
||||||
|
worker.setup()
|
||||||
|
with patch('zerver.worker.queue_processors.handle_push_notification') as mock_handle_new, \
|
||||||
|
patch('zerver.worker.queue_processors.handle_remove_push_notification') as mock_handle_remove, \
|
||||||
|
patch('zerver.worker.queue_processors.initialize_push_notifications'):
|
||||||
|
event_new = generate_new_message_notification()
|
||||||
|
event_remove = generate_remove_notification()
|
||||||
|
fake_client.queue.append(('missedmessage_mobile_notifications', event_new))
|
||||||
|
fake_client.queue.append(('missedmessage_mobile_notifications', event_remove))
|
||||||
|
|
||||||
|
worker.start()
|
||||||
|
mock_handle_new.assert_called_once_with(event_new['user_profile_id'], event_new)
|
||||||
|
mock_handle_remove.assert_called_once_with(event_remove['user_profile_id'],
|
||||||
|
event_remove['message_ids'])
|
||||||
|
|
||||||
|
with patch('zerver.worker.queue_processors.handle_push_notification',
|
||||||
|
side_effect=PushNotificationBouncerRetryLaterError("test")) as mock_handle_new, \
|
||||||
|
patch('zerver.worker.queue_processors.handle_remove_push_notification',
|
||||||
|
side_effect=PushNotificationBouncerRetryLaterError("test")) as mock_handle_remove, \
|
||||||
|
patch('zerver.worker.queue_processors.initialize_push_notifications'):
|
||||||
|
event_new = generate_new_message_notification()
|
||||||
|
event_remove = generate_remove_notification()
|
||||||
|
fake_client.queue.append(('missedmessage_mobile_notifications', event_new))
|
||||||
|
fake_client.queue.append(('missedmessage_mobile_notifications', event_remove))
|
||||||
|
|
||||||
|
with patch('zerver.lib.queue.queue_json_publish', side_effect=fake_publish):
|
||||||
|
worker.start()
|
||||||
|
self.assertEqual(mock_handle_new.call_count, 1 + MAX_REQUEST_RETRIES)
|
||||||
|
self.assertEqual(mock_handle_remove.call_count, 1 + MAX_REQUEST_RETRIES)
|
||||||
|
|
||||||
@patch('zerver.worker.queue_processors.mirror_email')
|
@patch('zerver.worker.queue_processors.mirror_email')
|
||||||
def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None:
|
def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None:
|
||||||
fake_client = self.FakeClient()
|
fake_client = self.FakeClient()
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ from zulip_bots.lib import ExternalBotHandler, extract_query_without_mention
|
|||||||
from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler, EmbeddedBotQuitException
|
from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler, EmbeddedBotQuitException
|
||||||
from zerver.lib.exceptions import RateLimited
|
from zerver.lib.exceptions import RateLimited
|
||||||
from zerver.lib.export import export_realm_wrapper
|
from zerver.lib.export import export_realm_wrapper
|
||||||
|
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
@@ -418,14 +419,21 @@ class PushNotificationsWorker(QueueProcessingWorker): # nocoverage
|
|||||||
initialize_push_notifications()
|
initialize_push_notifications()
|
||||||
super().start()
|
super().start()
|
||||||
|
|
||||||
def consume(self, data: Mapping[str, Any]) -> None:
|
def consume(self, event: Dict[str, Any]) -> None:
|
||||||
if data.get("type", "add") == "remove":
|
try:
|
||||||
message_ids = data.get('message_ids')
|
if event.get("type", "add") == "remove":
|
||||||
if message_ids is None: # legacy task across an upgrade
|
message_ids = event.get('message_ids')
|
||||||
message_ids = [data['message_id']]
|
if message_ids is None: # legacy task across an upgrade
|
||||||
handle_remove_push_notification(data['user_profile_id'], message_ids)
|
message_ids = [event['message_id']]
|
||||||
else:
|
handle_remove_push_notification(event['user_profile_id'], message_ids)
|
||||||
handle_push_notification(data['user_profile_id'], data)
|
else:
|
||||||
|
handle_push_notification(event['user_profile_id'], event)
|
||||||
|
except PushNotificationBouncerRetryLaterError:
|
||||||
|
def failure_processor(event: Dict[str, Any]) -> None:
|
||||||
|
logger.warning(
|
||||||
|
"Maximum retries exceeded for trigger:%s event:push_notification" % (
|
||||||
|
event['user_profile_id'],))
|
||||||
|
retry_event('missedmessage_mobile_notifications', event, failure_processor)
|
||||||
|
|
||||||
# We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False
|
# We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False
|
||||||
@assign_queue('feedback_messages')
|
@assign_queue('feedback_messages')
|
||||||
|
|||||||
Reference in New Issue
Block a user