mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			249 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			249 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
from django.test import TestCase
 | 
						|
 | 
						|
from zerver.lib.actions import do_deactivate_realm
 | 
						|
from zerver.lib.test_helpers import (
 | 
						|
    AuthedTestCase,
 | 
						|
)
 | 
						|
from zerver.decorator import \
 | 
						|
    REQ, has_request_variables, RequestVariableMissingError, \
 | 
						|
    RequestVariableConversionError, JsonableError
 | 
						|
from zerver.lib.validator import (
 | 
						|
    check_string, check_dict, check_bool, check_int, check_list
 | 
						|
)
 | 
						|
from zerver.models import \
 | 
						|
    get_realm, get_user_profile_by_email
 | 
						|
 | 
						|
import ujson
 | 
						|
 | 
						|
class DecoratorTestCase(TestCase):
 | 
						|
    def test_REQ_converter(self):
 | 
						|
 | 
						|
        def my_converter(data):
 | 
						|
            lst = ujson.loads(data)
 | 
						|
            if not isinstance(lst, list):
 | 
						|
                raise ValueError('not a list')
 | 
						|
            if 13 in lst:
 | 
						|
                raise JsonableError('13 is an unlucky number!')
 | 
						|
            return lst
 | 
						|
 | 
						|
        @has_request_variables
 | 
						|
        def get_total(request, numbers=REQ(converter=my_converter)):
 | 
						|
            return sum(numbers)
 | 
						|
 | 
						|
        class Request(object):
 | 
						|
            REQUEST = {} # type: Dict[str, str]
 | 
						|
 | 
						|
        request = Request()
 | 
						|
 | 
						|
        with self.assertRaises(RequestVariableMissingError):
 | 
						|
            get_total(request)
 | 
						|
 | 
						|
        request.REQUEST['numbers'] = 'bad_value'
 | 
						|
        with self.assertRaises(RequestVariableConversionError) as cm:
 | 
						|
            get_total(request)
 | 
						|
        self.assertEqual(str(cm.exception), "Bad value for 'numbers': bad_value")
 | 
						|
 | 
						|
        request.REQUEST['numbers'] = ujson.dumps([2, 3, 5, 8, 13, 21])
 | 
						|
        with self.assertRaises(JsonableError) as cm:
 | 
						|
            get_total(request)
 | 
						|
        self.assertEqual(str(cm.exception), "13 is an unlucky number!")
 | 
						|
 | 
						|
        request.REQUEST['numbers'] = ujson.dumps([1, 2, 3, 4, 5, 6])
 | 
						|
        result = get_total(request)
 | 
						|
        self.assertEqual(result, 21)
 | 
						|
 | 
						|
    def test_REQ_validator(self):
 | 
						|
 | 
						|
        @has_request_variables
 | 
						|
        def get_total(request, numbers=REQ(validator=check_list(check_int))):
 | 
						|
            return sum(numbers)
 | 
						|
 | 
						|
        class Request(object):
 | 
						|
            REQUEST = {} # type: Dict[str, str]
 | 
						|
 | 
						|
        request = Request()
 | 
						|
 | 
						|
        with self.assertRaises(RequestVariableMissingError):
 | 
						|
            get_total(request)
 | 
						|
 | 
						|
        request.REQUEST['numbers'] = 'bad_value'
 | 
						|
        with self.assertRaises(JsonableError) as cm:
 | 
						|
            get_total(request)
 | 
						|
        self.assertEqual(str(cm.exception), 'argument "numbers" is not valid json.')
 | 
						|
 | 
						|
        request.REQUEST['numbers'] = ujson.dumps([1, 2, "what?", 4, 5, 6])
 | 
						|
        with self.assertRaises(JsonableError) as cm:
 | 
						|
            get_total(request)
 | 
						|
        self.assertEqual(str(cm.exception), 'numbers[2] is not an integer')
 | 
						|
 | 
						|
        request.REQUEST['numbers'] = ujson.dumps([1, 2, 3, 4, 5, 6])
 | 
						|
        result = get_total(request)
 | 
						|
        self.assertEqual(result, 21)
 | 
						|
 | 
						|
class ValidatorTestCase(TestCase):
 | 
						|
    def test_check_string(self):
 | 
						|
        x = "hello"
 | 
						|
        self.assertEqual(check_string('x', x), None)
 | 
						|
 | 
						|
        x = 4
 | 
						|
        self.assertEqual(check_string('x', x), 'x is not a string')
 | 
						|
 | 
						|
    def test_check_bool(self):
 | 
						|
        x = True
 | 
						|
        self.assertEqual(check_bool('x', x), None)
 | 
						|
 | 
						|
        x = 4
 | 
						|
        self.assertEqual(check_bool('x', x), 'x is not a boolean')
 | 
						|
 | 
						|
    def test_check_int(self):
 | 
						|
        x = 5
 | 
						|
        self.assertEqual(check_int('x', x), None)
 | 
						|
 | 
						|
        x = [{}]
 | 
						|
        self.assertEqual(check_int('x', x), 'x is not an integer')
 | 
						|
 | 
						|
    def test_check_list(self):
 | 
						|
        x = 999
 | 
						|
        error = check_list(check_string)('x', x)
 | 
						|
        self.assertEqual(error, 'x is not a list')
 | 
						|
 | 
						|
        x = ["hello", 5]
 | 
						|
        error = check_list(check_string)('x', x)
 | 
						|
        self.assertEqual(error, 'x[1] is not a string')
 | 
						|
 | 
						|
        x = [["yo"], ["hello", "goodbye", 5]]
 | 
						|
        error = check_list(check_list(check_string))('x', x)
 | 
						|
        self.assertEqual(error, 'x[1][2] is not a string')
 | 
						|
 | 
						|
        x = ["hello", "goodbye", "hello again"]
 | 
						|
        error = check_list(check_string, length=2)('x', x)
 | 
						|
        self.assertEqual(error, 'x should have exactly 2 items')
 | 
						|
 | 
						|
    def test_check_dict(self):
 | 
						|
        keys = [
 | 
						|
            ('names', check_list(check_string)),
 | 
						|
            ('city', check_string),
 | 
						|
        ]
 | 
						|
 | 
						|
        x = {
 | 
						|
            'names': ['alice', 'bob'],
 | 
						|
            'city': 'Boston',
 | 
						|
        }
 | 
						|
        error = check_dict(keys)('x', x)
 | 
						|
        self.assertEqual(error, None)
 | 
						|
 | 
						|
        x = 999
 | 
						|
        error = check_dict(keys)('x', x)
 | 
						|
        self.assertEqual(error, 'x is not a dict')
 | 
						|
 | 
						|
        x = {}
 | 
						|
        error = check_dict(keys)('x', x)
 | 
						|
        self.assertEqual(error, 'names key is missing from x')
 | 
						|
 | 
						|
        x = {
 | 
						|
            'names': ['alice', 'bob', {}]
 | 
						|
        }
 | 
						|
        error = check_dict(keys)('x', x)
 | 
						|
        self.assertEqual(error, 'x["names"][2] is not a string')
 | 
						|
 | 
						|
        x = {
 | 
						|
            'names': ['alice', 'bob'],
 | 
						|
            'city': 5
 | 
						|
        }
 | 
						|
        error = check_dict(keys)('x', x)
 | 
						|
        self.assertEqual(error, 'x["city"] is not a string')
 | 
						|
 | 
						|
    def test_encapsulation(self):
 | 
						|
        # There might be situations where we want deep
 | 
						|
        # validation, but the error message should be customized.
 | 
						|
        # This is an example.
 | 
						|
        def check_person(val):
 | 
						|
            error = check_dict([
 | 
						|
                ['name', check_string],
 | 
						|
                ['age', check_int],
 | 
						|
            ])('_', val)
 | 
						|
            if error:
 | 
						|
                return 'This is not a valid person'
 | 
						|
 | 
						|
        person = {'name': 'King Lear', 'age': 42}
 | 
						|
        self.assertEqual(check_person(person), None)
 | 
						|
 | 
						|
        person = 'misconfigured data'
 | 
						|
        self.assertEqual(check_person(person), 'This is not a valid person')
 | 
						|
 | 
						|
class DeactivatedRealmTest(AuthedTestCase):
 | 
						|
    def test_send_deactivated_realm(self):
 | 
						|
        """
 | 
						|
        rest_dispatch rejects requests in a deactivated realm, both /json and api
 | 
						|
 | 
						|
        """
 | 
						|
        realm = get_realm("zulip.com")
 | 
						|
        do_deactivate_realm(get_realm("zulip.com"))
 | 
						|
 | 
						|
        result = self.client.post("/json/messages", {"type": "private",
 | 
						|
                                                     "content": "Test message",
 | 
						|
                                                     "client": "test suite",
 | 
						|
                                                     "to": "othello@zulip.com"})
 | 
						|
        self.assert_json_error_contains(result, "Not logged in", status_code=401)
 | 
						|
 | 
						|
        # Even if a logged-in session was leaked, it still wouldn't work
 | 
						|
        realm.deactivated = False
 | 
						|
        realm.save()
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        realm.deactivated = True
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        result = self.client.post("/json/messages", {"type": "private",
 | 
						|
                                                     "content": "Test message",
 | 
						|
                                                     "client": "test suite",
 | 
						|
                                                     "to": "othello@zulip.com"})
 | 
						|
        self.assert_json_error_contains(result, "has been deactivated", status_code=400)
 | 
						|
 | 
						|
        result = self.client.post("/api/v1/messages", {"type": "private",
 | 
						|
                                                       "content": "Test message",
 | 
						|
                                                       "client": "test suite",
 | 
						|
                                                       "to": "othello@zulip.com"},
 | 
						|
                                  **self.api_auth("hamlet@zulip.com"))
 | 
						|
        self.assert_json_error_contains(result, "has been deactivated", status_code=401)
 | 
						|
 | 
						|
    def test_fetch_api_key_deactivated_realm(self):
 | 
						|
        """
 | 
						|
        authenticated_json_view views fail in a deactivated realm
 | 
						|
 | 
						|
        """
 | 
						|
        realm = get_realm("zulip.com")
 | 
						|
        email = "hamlet@zulip.com"
 | 
						|
        test_password = "abcd1234"
 | 
						|
        user_profile = get_user_profile_by_email(email)
 | 
						|
        user_profile.set_password(test_password)
 | 
						|
 | 
						|
        self.login(email)
 | 
						|
        realm.deactivated = True
 | 
						|
        realm.save()
 | 
						|
        result = self.client.post("/json/fetch_api_key", {"password": test_password})
 | 
						|
        self.assert_json_error_contains(result, "has been deactivated", status_code=400)
 | 
						|
 | 
						|
    def test_login_deactivated_realm(self):
 | 
						|
        """
 | 
						|
        logging in fails in a deactivated realm
 | 
						|
 | 
						|
        """
 | 
						|
        do_deactivate_realm(get_realm("zulip.com"))
 | 
						|
        result = self.login("hamlet@zulip.com")
 | 
						|
        self.assertIn("has been deactivated", result.content.replace("\n", " "))
 | 
						|
 | 
						|
    def test_webhook_deactivated_realm(self):
 | 
						|
        """
 | 
						|
        Using a webhook while in a deactivated realm fails
 | 
						|
 | 
						|
        """
 | 
						|
        do_deactivate_realm(get_realm("zulip.com"))
 | 
						|
        email = "hamlet@zulip.com"
 | 
						|
        api_key = self.get_api_key(email)
 | 
						|
        url = "/api/v1/external/jira?api_key=%s&stream=jira_custom" % (api_key,)
 | 
						|
        data = self.fixture_data('jira', "created")
 | 
						|
        result = self.client.post(url, data,
 | 
						|
                                  content_type="application/json")
 | 
						|
        self.assert_json_error_contains(result, "has been deactivated", status_code=400)
 |