push_notifs: Improve handling of errors when talking to the bouncer.

Using the plumbing introduced in a previous commit, send_to_push_bouncer
now raises PushNotificationBouncerRetryLaterError when there are issues
talking to the bouncer server. That's a better way of dealing with these
errors than the previous approach of returning a "failed" boolean, which
generally wasn't checked by the calling code anyway and so had no
effect.
The PushNotificationBouncerRetryLaterError exception will be handled by
queue processors, which retry sending, and because it is a
JsonableError, it will also communicate the error to API users.
This commit is contained in:
Mateusz Mandera
2019-12-03 20:19:38 +01:00
committed by Tim Abbott
parent 20b30e1503
commit 7d0444f903
4 changed files with 82 additions and 46 deletions

View File

@@ -15,7 +15,6 @@ from django.db.models import F
from django.utils.timezone import now as timezone_now
from django.utils.translation import ugettext as _
import gcm
import requests
import ujson
from zerver.decorator import statsd_increment
@@ -23,8 +22,7 @@ from zerver.lib.avatar import absolute_avatar_url
from zerver.lib.exceptions import JsonableError
from zerver.lib.message import access_message, \
bulk_access_messages_expect_usermessage, huddle_users
from zerver.lib.remote_server import send_to_push_bouncer, send_json_to_push_bouncer, \
PushNotificationBouncerRetryLaterError
from zerver.lib.remote_server import send_to_push_bouncer, send_json_to_push_bouncer
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.models import PushDeviceToken, Message, Recipient, \
UserMessage, UserProfile, \
@@ -668,14 +666,10 @@ def handle_remove_push_notification(user_profile_id: int, message_ids: List[int]
gcm_payload, gcm_options = get_remove_payload_gcm(user_profile, message_ids)
if uses_notification_bouncer():
try:
send_notifications_to_bouncer(user_profile_id,
{},
gcm_payload,
gcm_options)
except requests.ConnectionError: # nocoverage
raise PushNotificationBouncerRetryLaterError(
"ConnectionError while trying to connect to the bouncer")
send_notifications_to_bouncer(user_profile_id,
{},
gcm_payload,
gcm_options)
else:
android_devices = list(PushDeviceToken.objects.filter(
user=user_profile, kind=PushDeviceToken.GCM))
@@ -744,15 +738,11 @@ def handle_push_notification(user_profile_id: int, missed_message: Dict[str, Any
logger.info("Sending push notifications to mobile clients for user %s" % (user_profile_id,))
if uses_notification_bouncer():
try:
send_notifications_to_bouncer(user_profile_id,
apns_payload,
gcm_payload,
gcm_options)
return
except requests.ConnectionError:
raise PushNotificationBouncerRetryLaterError(
"ConnectionError while trying to connect to the bouncer")
send_notifications_to_bouncer(user_profile_id,
apns_payload,
gcm_payload,
gcm_options)
return
android_devices = list(PushDeviceToken.objects.filter(user=user_profile,
kind=PushDeviceToken.GCM))

View File

@@ -18,18 +18,19 @@ class PushNotificationBouncerException(Exception):
pass
class PushNotificationBouncerRetryLaterError(JsonableError):
pass
http_status_code = 502
def send_to_push_bouncer(method: str,
endpoint: str,
post_data: Union[str, Dict[str, Any]],
extra_headers: Optional[Dict[str, Any]]=None) -> Tuple[Dict[str, Any], bool]:
extra_headers: Optional[Dict[str, Any]]=None) -> Dict[str, Any]:
"""While it does actually send the notice, this function has a lot of
code and comments around error handling for the push notifications
bouncer. There are several classes of failures, each with its own
potential solution:
* Network errors with requests.request. We let those happen normally.
* Network errors with requests.request. We raise an exception to signal
it to the callers.
* 500 errors from the push bouncer or other unexpected responses;
we don't try to parse the response, but do make clear the cause.
@@ -48,22 +49,26 @@ def send_to_push_bouncer(method: str,
if extra_headers is not None:
headers.update(extra_headers)
res = requests.request(method,
url,
data=post_data,
auth=api_auth,
timeout=30,
verify=True,
headers=headers)
try:
res = requests.request(method,
url,
data=post_data,
auth=api_auth,
timeout=30,
verify=True,
headers=headers)
except requests.exceptions.ConnectionError:
raise PushNotificationBouncerRetryLaterError(
"ConnectionError while trying to connect to push notification bouncer")
if res.status_code >= 500:
# 500s should be resolved by the people who run the push
# notification bouncer service, and they'll get an appropriate
# error notification from the server. We just return after
# doing something. Ideally, we'll add do some sort of spaced
# retry eventually.
logging.warning("Received 500 from push notification bouncer")
return {}, True
# error notification from the server. We raise an exception to signal
# to the callers that the attempt failed and they can retry.
error_msg = "Received 500 from push notification bouncer"
logging.warning(error_msg)
raise PushNotificationBouncerRetryLaterError(error_msg)
elif res.status_code >= 400:
# If JSON parsing errors, just let that exception happen
result_dict = ujson.loads(res.content)
@@ -85,7 +90,7 @@ def send_to_push_bouncer(method: str,
"Push notification bouncer returned unexpected status code %s" % (res.status_code,))
# If we don't throw an exception, it's a successful bounce!
return ujson.loads(res.content), False
return ujson.loads(res.content)
def send_json_to_push_bouncer(method: str, endpoint: str, post_data: Dict[str, Any]) -> None:
send_to_push_bouncer(
@@ -126,8 +131,10 @@ def build_analytics_data(realm_count_query: Any,
def send_analytics_to_remote_server() -> None:
# first, check what's latest
(result, failed) = send_to_push_bouncer("GET", "server/analytics/status", {})
if failed: # nocoverage
try:
result = send_to_push_bouncer("GET", "server/analytics/status", {})
except PushNotificationBouncerRetryLaterError as e:
logging.warning(e.msg)
return
last_acked_realm_count_id = result['last_realm_count_id']

View File

@@ -279,7 +279,7 @@ class PushBouncerNotificationTest(BouncerTestCase):
]
# Test error handling
for endpoint, _, kind in endpoints:
for endpoint, token, kind in endpoints:
# Try adding/removing tokens that are too big...
broken_token = "a" * 5000 # too big
result = self.client_post(endpoint, {'token': broken_token,
@@ -298,6 +298,19 @@ class PushBouncerNotificationTest(BouncerTestCase):
subdomain="zulip")
self.assert_json_error(result, 'Token does not exist')
with mock.patch('zerver.lib.remote_server.requests.request',
side_effect=requests.ConnectionError):
result = self.client_post(endpoint, {'token': token},
subdomain="zulip")
self.assert_json_error(
result, "ConnectionError while trying to connect to push notification bouncer", 502)
with mock.patch('zerver.lib.remote_server.requests.request',
return_value=Result(status=500)):
result = self.client_post(endpoint, {'token': token},
subdomain="zulip")
self.assert_json_error(result, "Received 500 from push notification bouncer", 502)
# Add tokens
for endpoint, token, kind in endpoints:
# Test that we can push twice
@@ -338,7 +351,20 @@ class PushBouncerNotificationTest(BouncerTestCase):
server=server))
self.assertEqual(len(tokens), 2)
# Remove it using the bouncer after an API key change
# Now we want to remove them using the bouncer after an API key change.
# First we test error handling in case of issues with the bouncer:
with mock.patch('zerver.worker.queue_processors.clear_push_device_tokens',
side_effect=PushNotificationBouncerRetryLaterError("test")), \
mock.patch('zerver.worker.queue_processors.retry_event') as mock_retry:
do_regenerate_api_key(user, user)
mock_retry.assert_called()
# We didn't manage to communicate with the bouncer, so the tokens are still there:
tokens = list(RemotePushDeviceToken.objects.filter(user_id=user.id,
server=server))
self.assertEqual(len(tokens), 2)
# Now we successfully remove them:
do_regenerate_api_key(user, user)
tokens = list(RemotePushDeviceToken.objects.filter(user_id=user.id,
server=server))
@@ -357,6 +383,13 @@ class AnalyticsBouncerTest(BouncerTestCase):
user = self.example_user('hamlet')
end_time = self.TIME_ZERO
with mock.patch('zerver.lib.remote_server.requests.request',
side_effect=requests.ConnectionError), \
mock.patch('zerver.lib.remote_server.logging.warning') as mock_warning:
send_analytics_to_remote_server()
mock_warning.assert_called_once_with(
"ConnectionError while trying to connect to push notification bouncer")
# Send any existing data over, so that we can start the test with a "clean" slate
audit_log_max_id = RealmAuditLog.objects.all().order_by('id').last().id
send_analytics_to_remote_server()
@@ -703,7 +736,7 @@ class HandlePushNotificationTest(PushNotificationTest):
mock.patch('zerver.lib.remote_server.requests.request',
side_effect=self.bounce_request), \
mock.patch('zerver.lib.push_notifications.gcm_client') as mock_gcm, \
mock.patch('zerver.lib.push_notifications.send_notifications_to_bouncer',
mock.patch('zerver.lib.remote_server.requests.request',
side_effect=requests.ConnectionError):
gcm_devices = [
(b64_to_hex(device.token), device.ios_app_id, device.token)
@@ -1451,9 +1484,8 @@ class TestSendToPushBouncer(ZulipTestCase):
@mock.patch('requests.request', return_value=Result(status=500))
@mock.patch('logging.warning')
def test_500_error(self, mock_request: mock.MagicMock, mock_warning: mock.MagicMock) -> None:
result, failed = send_to_push_bouncer('register', 'register', {'data': True})
self.assertEqual(result, {})
self.assertEqual(failed, True)
with self.assertRaises(PushNotificationBouncerRetryLaterError):
result, failed = send_to_push_bouncer('register', 'register', {'data': True})
mock_warning.assert_called_once()
@mock.patch('requests.request', return_value=Result(status=400))

View File

@@ -663,7 +663,7 @@ class EmbeddedBotWorker(QueueProcessingWorker):
@assign_queue('deferred_work')
class DeferredWorker(QueueProcessingWorker):
def consume(self, event: Mapping[str, Any]) -> None:
def consume(self, event: Dict[str, Any]) -> None:
if event['type'] == 'mark_stream_messages_as_read':
user_profile = get_user_profile_by_id(event['user_profile_id'])
client = Client.objects.get(id=event['client_id'])
@@ -676,7 +676,14 @@ class DeferredWorker(QueueProcessingWorker):
require_active=False)
do_mark_stream_messages_as_read(user_profile, client, stream)
elif event['type'] == 'clear_push_device_tokens':
clear_push_device_tokens(event["user_profile_id"])
try:
clear_push_device_tokens(event["user_profile_id"])
except PushNotificationBouncerRetryLaterError:
def failure_processor(event: Dict[str, Any]) -> None:
logger.warning(
"Maximum retries exceeded for trigger:%s event:clear_push_device_tokens" % (
event['user_profile_id'],))
retry_event("deferred_work", event, failure_processor)
elif event['type'] == 'realm_export':
start = time.time()
realm = Realm.objects.get(id=event['realm_id'])