mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	push_notifications: Validate format of APNS tokens.
This fixes a bug where we would previously not validate the format of APNS tokens before writing them to the database, which could lead to exceptions in the push notifications system if a buggy mobile app submitted invalid format tokens.
This commit is contained in:
		@@ -166,6 +166,22 @@ class PushBouncerNotificationTest(BouncerTestCase):
 | 
				
			|||||||
            result = self.client_post(endpoint, payload, **self.get_auth())
 | 
					            result = self.client_post(endpoint, payload, **self.get_auth())
 | 
				
			||||||
            self.assert_json_error(result, 'Empty or invalid length token')
 | 
					            self.assert_json_error(result, 'Empty or invalid length token')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_invalid_apns_token(self):
 | 
				
			||||||
 | 
					        # type: () -> None
 | 
				
			||||||
 | 
					        endpoints = [
 | 
				
			||||||
 | 
					            ('/json/users/me/apns_device_token', 'apple-token'),
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for endpoint, method in endpoints:
 | 
				
			||||||
 | 
					            payload = {
 | 
				
			||||||
 | 
					                'user_id': 10,
 | 
				
			||||||
 | 
					                'token': 'xyz uses non-hex characters',
 | 
				
			||||||
 | 
					                'token_kind': PushDeviceToken.APNS,
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            result = self.client_post(endpoint, payload,
 | 
				
			||||||
 | 
					                                      **self.get_auth())
 | 
				
			||||||
 | 
					            self.assert_json_error(result, 'Invalid APNS token')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @override_settings(PUSH_NOTIFICATION_BOUNCER_URL='https://push.zulip.org.example.com')
 | 
					    @override_settings(PUSH_NOTIFICATION_BOUNCER_URL='https://push.zulip.org.example.com')
 | 
				
			||||||
    @mock.patch('zerver.lib.push_notifications.requests.request')
 | 
					    @mock.patch('zerver.lib.push_notifications.requests.request')
 | 
				
			||||||
    def test_push_bouncer_api(self, mock):
 | 
					    def test_push_bouncer_api(self, mock):
 | 
				
			||||||
@@ -180,14 +196,14 @@ class PushBouncerNotificationTest(BouncerTestCase):
 | 
				
			|||||||
        server = RemoteZulipServer.objects.get(uuid=self.server_uuid)
 | 
					        server = RemoteZulipServer.objects.get(uuid=self.server_uuid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        endpoints = [
 | 
					        endpoints = [
 | 
				
			||||||
            ('/json/users/me/apns_device_token', 'apple-token'),
 | 
					            ('/json/users/me/apns_device_token', 'apple-tokenaz'),
 | 
				
			||||||
            ('/json/users/me/android_gcm_reg_id', 'android-token'),
 | 
					            ('/json/users/me/android_gcm_reg_id', 'android-token'),
 | 
				
			||||||
        ]
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Test error handling
 | 
					        # Test error handling
 | 
				
			||||||
        for endpoint, _ in endpoints:
 | 
					        for endpoint, _ in endpoints:
 | 
				
			||||||
            # Try adding/removing tokens that are too big...
 | 
					            # Try adding/removing tokens that are too big...
 | 
				
			||||||
            broken_token = "x" * 5000  # too big
 | 
					            broken_token = "a" * 5000  # too big
 | 
				
			||||||
            result = self.client_post(endpoint, {'token': broken_token})
 | 
					            result = self.client_post(endpoint, {'token': broken_token})
 | 
				
			||||||
            self.assert_json_error(result, 'Empty or invalid length token')
 | 
					            self.assert_json_error(result, 'Empty or invalid length token')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -195,7 +211,7 @@ class PushBouncerNotificationTest(BouncerTestCase):
 | 
				
			|||||||
            self.assert_json_error(result, 'Empty or invalid length token')
 | 
					            self.assert_json_error(result, 'Empty or invalid length token')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # Try to remove a non-existent token...
 | 
					            # Try to remove a non-existent token...
 | 
				
			||||||
            result = self.client_delete(endpoint, {'token': 'non-existent token'})
 | 
					            result = self.client_delete(endpoint, {'token': 'abcd1234'})
 | 
				
			||||||
            self.assert_json_error(result, 'Token does not exist')
 | 
					            self.assert_json_error(result, 'Token does not exist')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Add tokens
 | 
					        # Add tokens
 | 
				
			||||||
@@ -664,22 +680,26 @@ class TestPushApi(ZulipTestCase):
 | 
				
			|||||||
        self.login(email)
 | 
					        self.login(email)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        endpoints = [
 | 
					        endpoints = [
 | 
				
			||||||
            ('/json/users/me/apns_device_token', 'apple-token'),
 | 
					            ('/json/users/me/apns_device_token', 'apple-tokenaz'),
 | 
				
			||||||
            ('/json/users/me/android_gcm_reg_id', 'android-token'),
 | 
					            ('/json/users/me/android_gcm_reg_id', 'android-token'),
 | 
				
			||||||
        ]
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Test error handling
 | 
					        # Test error handling
 | 
				
			||||||
        for endpoint, _ in endpoints:
 | 
					        for endpoint, label in endpoints:
 | 
				
			||||||
            # Try adding/removing tokens that are too big...
 | 
					            # Try adding/removing tokens that are too big...
 | 
				
			||||||
            broken_token = "x" * 5000  # too big
 | 
					            broken_token = "a" * 5000  # too big
 | 
				
			||||||
            result = self.client_post(endpoint, {'token': broken_token})
 | 
					            result = self.client_post(endpoint, {'token': broken_token})
 | 
				
			||||||
            self.assert_json_error(result, 'Empty or invalid length token')
 | 
					            self.assert_json_error(result, 'Empty or invalid length token')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if label == 'apple-tokenaz':
 | 
				
			||||||
 | 
					                result = self.client_post(endpoint, {'token': 'xyz has non-hex characters'})
 | 
				
			||||||
 | 
					                self.assert_json_error(result, 'Invalid APNS token')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            result = self.client_delete(endpoint, {'token': broken_token})
 | 
					            result = self.client_delete(endpoint, {'token': broken_token})
 | 
				
			||||||
            self.assert_json_error(result, 'Empty or invalid length token')
 | 
					            self.assert_json_error(result, 'Empty or invalid length token')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # Try to remove a non-existent token...
 | 
					            # Try to remove a non-existent token...
 | 
				
			||||||
            result = self.client_delete(endpoint, {'token': 'non-existent token'})
 | 
					            result = self.client_delete(endpoint, {'token': 'abcd1234'})
 | 
				
			||||||
            self.assert_json_error(result, 'Token does not exist')
 | 
					            self.assert_json_error(result, 'Token does not exist')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Add tokens
 | 
					        # Add tokens
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -10,42 +10,48 @@ from django.http import HttpRequest, HttpResponse
 | 
				
			|||||||
from django.utils.translation import ugettext as _
 | 
					from django.utils.translation import ugettext as _
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from zerver.lib.push_notifications import add_push_device_token, \
 | 
					from zerver.lib.push_notifications import add_push_device_token, \
 | 
				
			||||||
    remove_push_device_token
 | 
					    b64_to_hex, remove_push_device_token
 | 
				
			||||||
from zerver.lib.request import has_request_variables, REQ, JsonableError
 | 
					from zerver.lib.request import has_request_variables, REQ, JsonableError
 | 
				
			||||||
from zerver.lib.response import json_success, json_error
 | 
					from zerver.lib.response import json_success, json_error
 | 
				
			||||||
from zerver.lib.validator import check_string, check_list, check_bool
 | 
					from zerver.lib.validator import check_string, check_list, check_bool
 | 
				
			||||||
from zerver.models import PushDeviceToken, UserProfile
 | 
					from zerver.models import PushDeviceToken, UserProfile
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def validate_token(token_str):
 | 
					def validate_token(token_str, kind):
 | 
				
			||||||
    # type: (str) -> None
 | 
					    # type: (str, int) -> None
 | 
				
			||||||
    if token_str == '' or len(token_str) > 4096:
 | 
					    if token_str == '' or len(token_str) > 4096:
 | 
				
			||||||
        raise JsonableError(_('Empty or invalid length token'))
 | 
					        raise JsonableError(_('Empty or invalid length token'))
 | 
				
			||||||
 | 
					    if kind == PushDeviceToken.APNS:
 | 
				
			||||||
 | 
					        # Validate that we can actually decode the token.
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            b64_to_hex(token_str)
 | 
				
			||||||
 | 
					        except Exception:
 | 
				
			||||||
 | 
					            raise JsonableError(_('Invalid APNS token'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@has_request_variables
 | 
					@has_request_variables
 | 
				
			||||||
def add_apns_device_token(request, user_profile, token=REQ(),
 | 
					def add_apns_device_token(request, user_profile, token=REQ(),
 | 
				
			||||||
                          appid=REQ(default=settings.ZULIP_IOS_APP_ID)):
 | 
					                          appid=REQ(default=settings.ZULIP_IOS_APP_ID)):
 | 
				
			||||||
    # type: (HttpRequest, UserProfile, str, str) -> HttpResponse
 | 
					    # type: (HttpRequest, UserProfile, str, str) -> HttpResponse
 | 
				
			||||||
    validate_token(token)
 | 
					    validate_token(token, PushDeviceToken.APNS)
 | 
				
			||||||
    add_push_device_token(user_profile, token, PushDeviceToken.APNS, ios_app_id=appid)
 | 
					    add_push_device_token(user_profile, token, PushDeviceToken.APNS, ios_app_id=appid)
 | 
				
			||||||
    return json_success()
 | 
					    return json_success()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@has_request_variables
 | 
					@has_request_variables
 | 
				
			||||||
def add_android_reg_id(request, user_profile, token=REQ()):
 | 
					def add_android_reg_id(request, user_profile, token=REQ()):
 | 
				
			||||||
    # type: (HttpRequest, UserProfile, str) -> HttpResponse
 | 
					    # type: (HttpRequest, UserProfile, str) -> HttpResponse
 | 
				
			||||||
    validate_token(token)
 | 
					    validate_token(token, PushDeviceToken.GCM)
 | 
				
			||||||
    add_push_device_token(user_profile, token, PushDeviceToken.GCM)
 | 
					    add_push_device_token(user_profile, token, PushDeviceToken.GCM)
 | 
				
			||||||
    return json_success()
 | 
					    return json_success()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@has_request_variables
 | 
					@has_request_variables
 | 
				
			||||||
def remove_apns_device_token(request, user_profile, token=REQ()):
 | 
					def remove_apns_device_token(request, user_profile, token=REQ()):
 | 
				
			||||||
    # type: (HttpRequest, UserProfile, str) -> HttpResponse
 | 
					    # type: (HttpRequest, UserProfile, str) -> HttpResponse
 | 
				
			||||||
    validate_token(token)
 | 
					    validate_token(token, PushDeviceToken.APNS)
 | 
				
			||||||
    remove_push_device_token(user_profile, token, PushDeviceToken.APNS)
 | 
					    remove_push_device_token(user_profile, token, PushDeviceToken.APNS)
 | 
				
			||||||
    return json_success()
 | 
					    return json_success()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@has_request_variables
 | 
					@has_request_variables
 | 
				
			||||||
def remove_android_reg_id(request, user_profile, token=REQ()):
 | 
					def remove_android_reg_id(request, user_profile, token=REQ()):
 | 
				
			||||||
    # type: (HttpRequest, UserProfile, str) -> HttpResponse
 | 
					    # type: (HttpRequest, UserProfile, str) -> HttpResponse
 | 
				
			||||||
    validate_token(token)
 | 
					    validate_token(token, PushDeviceToken.GCM)
 | 
				
			||||||
    remove_push_device_token(user_profile, token, PushDeviceToken.GCM)
 | 
					    remove_push_device_token(user_profile, token, PushDeviceToken.GCM)
 | 
				
			||||||
    return json_success()
 | 
					    return json_success()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user