diff --git a/zerver/lib/push_notifications.py b/zerver/lib/push_notifications.py index 9cbe0884cf..36996b18db 100644 --- a/zerver/lib/push_notifications.py +++ b/zerver/lib/push_notifications.py @@ -23,8 +23,8 @@ from zerver.lib.avatar import absolute_avatar_url from zerver.lib.exceptions import JsonableError from zerver.lib.message import access_message, \ bulk_access_messages_expect_usermessage, huddle_users -from zerver.lib.queue import retry_event -from zerver.lib.remote_server import send_to_push_bouncer, send_json_to_push_bouncer +from zerver.lib.remote_server import send_to_push_bouncer, send_json_to_push_bouncer, \ + PushNotificationBouncerRetryLaterError from zerver.lib.timestamp import datetime_to_timestamp from zerver.models import PushDeviceToken, Message, Recipient, \ UserMessage, UserProfile, \ @@ -674,10 +674,8 @@ def handle_remove_push_notification(user_profile_id: int, message_ids: List[int] gcm_payload, gcm_options) except requests.ConnectionError: # nocoverage - def failure_processor(event: Dict[str, Any]) -> None: - logger.warning( - "Maximum retries exceeded for trigger:%s event:push_notification" % ( - event['user_profile_id'],)) + raise PushNotificationBouncerRetryLaterError( + "ConnectionError while trying to connect to the bouncer") else: android_devices = list(PushDeviceToken.objects.filter( user=user_profile, kind=PushDeviceToken.GCM)) @@ -751,14 +749,10 @@ def handle_push_notification(user_profile_id: int, missed_message: Dict[str, Any apns_payload, gcm_payload, gcm_options) + return except requests.ConnectionError: - def failure_processor(event: Dict[str, Any]) -> None: - logger.warning( - "Maximum retries exceeded for trigger:%s event:push_notification" % ( - event['user_profile_id'],)) - retry_event('missedmessage_mobile_notifications', missed_message, - failure_processor) - return + raise PushNotificationBouncerRetryLaterError( + "ConnectionError while trying to connect to the bouncer") android_devices = list(PushDeviceToken.objects.filter(user=user_profile, kind=PushDeviceToken.GCM)) diff --git a/zerver/lib/remote_server.py b/zerver/lib/remote_server.py index 6572a336a8..975b0210cc 100644 --- a/zerver/lib/remote_server.py +++ b/zerver/lib/remote_server.py @@ -17,6 +17,9 @@ from zerver.models import RealmAuditLog class PushNotificationBouncerException(Exception): pass +class PushNotificationBouncerRetryLaterError(JsonableError): + pass + def send_to_push_bouncer(method: str, endpoint: str, post_data: Union[str, Dict[str, Any]], diff --git a/zerver/tests/test_push_notifications.py b/zerver/tests/test_push_notifications.py index 39a08f8613..884989c087 100644 --- a/zerver/tests/test_push_notifications.py +++ b/zerver/tests/test_push_notifications.py @@ -65,7 +65,7 @@ from zerver.lib.push_notifications import ( send_to_push_bouncer, ) from zerver.lib.remote_server import send_analytics_to_remote_server, \ - build_analytics_data, PushNotificationBouncerException + build_analytics_data, PushNotificationBouncerException, PushNotificationBouncerRetryLaterError from zerver.lib.request import JsonableError from zerver.lib.test_classes import ( TestCase, ZulipTestCase, @@ -684,7 +684,7 @@ class HandlePushNotificationTest(PushNotificationTest): (token, "Unregistered")) self.assertEqual(RemotePushDeviceToken.objects.filter(kind=PushDeviceToken.APNS).count(), 0) - def test_end_to_end_connection_error(self) -> None: + def test_connection_error(self) -> None: self.setup_apns_tokens() self.setup_gcm_tokens() @@ -694,9 +694,6 @@ class HandlePushNotificationTest(PushNotificationTest): message=message ) - def retry(queue_name: Any, event: Any, processor: Any) -> None: - handle_push_notification(event['user_profile_id'], event) - missed_message = { 'user_profile_id': self.user_profile.id, 'message_id': message.id, @@ -707,10 +704,7 @@ class HandlePushNotificationTest(PushNotificationTest): side_effect=self.bounce_request), \ mock.patch('zerver.lib.push_notifications.gcm_client') as mock_gcm, \ mock.patch('zerver.lib.push_notifications.send_notifications_to_bouncer', - side_effect=requests.ConnectionError), \ - mock.patch('zerver.lib.queue.queue_json_publish', - side_effect=retry) as mock_retry, \ - mock.patch('zerver.lib.push_notifications.logger.warning') as mock_warn: + side_effect=requests.ConnectionError): gcm_devices = [ (b64_to_hex(device.token), device.ios_app_id, device.token) for device in RemotePushDeviceToken.objects.filter( @@ -718,11 +712,8 @@ class HandlePushNotificationTest(PushNotificationTest): ] mock_gcm.json_request.return_value = { 'success': {gcm_devices[0][2]: message.id}} - handle_push_notification(self.user_profile.id, missed_message) - self.assertEqual(mock_retry.call_count, 3) - mock_warn.assert_called_with("Maximum retries exceeded for " - "trigger:%s event:" - "push_notification" % (self.user_profile.id,)) + with self.assertRaises(PushNotificationBouncerRetryLaterError): + handle_push_notification(self.user_profile.id, missed_message) @mock.patch('zerver.lib.push_notifications.push_notifications_enabled', return_value = True) def test_disabled_notifications(self, mock_push_notifications: mock.MagicMock) -> None: diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index 7cc3e07834..d112015af7 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -13,11 +13,13 @@ from zerver.lib.email_mirror import RateLimitedRealmMirror from zerver.lib.email_mirror_helpers import encode_email_address from zerver.lib.queue import MAX_REQUEST_RETRIES from zerver.lib.rate_limiter import RateLimiterLockingException, clear_history +from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError from zerver.lib.send_email import FromAddress from zerver.lib.test_helpers import simulated_queue_client from zerver.lib.test_classes import ZulipTestCase from zerver.models import get_client, UserActivity, PreregistrationUser, \ get_system_bot, get_stream, get_realm +from zerver.tornado.event_queue import build_offline_notification from zerver.worker import queue_processors from zerver.worker.queue_processors import ( get_active_worker_queues, @@ -275,6 +277,62 @@ class WorkerTest(ZulipTestCase): {'where art thou, othello?'} ) + def test_push_notifications_worker(self) -> None: + """ + The push notifications system has its own comprehensive test suite, + so we can limit ourselves to simple unit testing the queue processor, + without going deeper into the system - by mocking the handle_push_notification + functions to immediately produce the effect we want, to test its handling by the queue + processor. + """ + fake_client = self.FakeClient() + + def fake_publish(queue_name: str, + event: Dict[str, Any], + processor: Callable[[Any], None]) -> None: + fake_client.queue.append((queue_name, event)) + + def generate_new_message_notification() -> Dict[str, Any]: + return build_offline_notification(1, 1) + + def generate_remove_notification() -> Dict[str, Any]: + return { + "type": "remove", + "user_profile_id": 1, + "message_ids": [1], + } + + with simulated_queue_client(lambda: fake_client): + worker = queue_processors.PushNotificationsWorker() + worker.setup() + with patch('zerver.worker.queue_processors.handle_push_notification') as mock_handle_new, \ + patch('zerver.worker.queue_processors.handle_remove_push_notification') as mock_handle_remove, \ + patch('zerver.worker.queue_processors.initialize_push_notifications'): + event_new = generate_new_message_notification() + event_remove = generate_remove_notification() + fake_client.queue.append(('missedmessage_mobile_notifications', event_new)) + fake_client.queue.append(('missedmessage_mobile_notifications', event_remove)) + + worker.start() + mock_handle_new.assert_called_once_with(event_new['user_profile_id'], event_new) + mock_handle_remove.assert_called_once_with(event_remove['user_profile_id'], + event_remove['message_ids']) + + with patch('zerver.worker.queue_processors.handle_push_notification', + side_effect=PushNotificationBouncerRetryLaterError("test")) as mock_handle_new, \ + patch('zerver.worker.queue_processors.handle_remove_push_notification', + side_effect=PushNotificationBouncerRetryLaterError("test")) as mock_handle_remove, \ + patch('zerver.worker.queue_processors.initialize_push_notifications'): + event_new = generate_new_message_notification() + event_remove = generate_remove_notification() + fake_client.queue.append(('missedmessage_mobile_notifications', event_new)) + fake_client.queue.append(('missedmessage_mobile_notifications', event_remove)) + + with patch('zerver.lib.queue.queue_json_publish', side_effect=fake_publish): + worker.start() + self.assertEqual(mock_handle_new.call_count, 1 + MAX_REQUEST_RETRIES) + self.assertEqual(mock_handle_remove.call_count, 1 + MAX_REQUEST_RETRIES) + @patch('zerver.worker.queue_processors.mirror_email') def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None: fake_client = self.FakeClient() diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 7b19561f66..1836909452 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -47,6 +47,7 @@ from zulip_bots.lib import ExternalBotHandler, extract_query_without_mention from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler, EmbeddedBotQuitException from zerver.lib.exceptions import RateLimited from zerver.lib.export import export_realm_wrapper +from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError import os import sys @@ -418,14 +419,21 @@ class PushNotificationsWorker(QueueProcessingWorker): # nocoverage initialize_push_notifications() super().start() - def consume(self, data: Mapping[str, Any]) -> None: - if data.get("type", "add") == "remove": - message_ids = data.get('message_ids') - if message_ids is None: # legacy task across an upgrade - message_ids = [data['message_id']] - handle_remove_push_notification(data['user_profile_id'], message_ids) - else: - handle_push_notification(data['user_profile_id'], data) + def consume(self, event: Dict[str, Any]) -> None: + try: + if event.get("type", "add") == "remove": + message_ids = event.get('message_ids') + if message_ids is None: # legacy task across an upgrade + message_ids = [event['message_id']] + handle_remove_push_notification(event['user_profile_id'], message_ids) + else: + handle_push_notification(event['user_profile_id'], event) + except PushNotificationBouncerRetryLaterError: + def failure_processor(event: Dict[str, Any]) -> None: + logger.warning( + "Maximum retries exceeded for trigger:%s event:push_notification" % ( + event['user_profile_id'],)) + retry_event('missedmessage_mobile_notifications', event, failure_processor) # We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False @assign_queue('feedback_messages')