mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	Move AuthedTestCase to test_helpers.py
(imported from commit bdaf63c5ed4e8d7c19e9c19f79151e8e7885c53e)
This commit is contained in:
		@@ -1,8 +1,30 @@
 | 
			
		||||
from django.test import TestCase
 | 
			
		||||
 | 
			
		||||
from zerver.lib.initial_password import initial_password
 | 
			
		||||
from zerver.lib.db import TimeTrackingCursor
 | 
			
		||||
from zerver.lib import cache
 | 
			
		||||
from zerver import tornado_callbacks
 | 
			
		||||
from zerver.worker import queue_processors
 | 
			
		||||
 | 
			
		||||
from zerver.lib.actions import (
 | 
			
		||||
    check_send_message,
 | 
			
		||||
    create_stream_if_needed,
 | 
			
		||||
    do_add_subscription,
 | 
			
		||||
    get_display_recipient,
 | 
			
		||||
    get_user_profile_by_email,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from zerver.models import (
 | 
			
		||||
    resolve_email_to_domain,
 | 
			
		||||
    Client,
 | 
			
		||||
    Message,
 | 
			
		||||
    Realm,
 | 
			
		||||
    Recipient,
 | 
			
		||||
    Stream,
 | 
			
		||||
    Subscription,
 | 
			
		||||
    UserMessage,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import base64
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
@@ -91,3 +113,166 @@ def queries_captured():
 | 
			
		||||
    TimeTrackingCursor.execute = old_execute
 | 
			
		||||
    TimeTrackingCursor.executemany = old_executemany
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def find_key_by_email(address):
 | 
			
		||||
    from django.core.mail import outbox
 | 
			
		||||
    key_regex = re.compile("accounts/do_confirm/([a-f0-9]{40})>")
 | 
			
		||||
    for message in reversed(outbox):
 | 
			
		||||
        if address in message.to:
 | 
			
		||||
            return key_regex.search(message.body).groups()[0]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AuthedTestCase(TestCase):
 | 
			
		||||
    # Helper because self.client.patch annoying requires you to urlencode
 | 
			
		||||
    def client_patch(self, url, info={}, **kwargs):
 | 
			
		||||
        info = urllib.urlencode(info)
 | 
			
		||||
        return self.client.patch(url, info, **kwargs)
 | 
			
		||||
    def client_put(self, url, info={}, **kwargs):
 | 
			
		||||
        info = urllib.urlencode(info)
 | 
			
		||||
        return self.client.put(url, info, **kwargs)
 | 
			
		||||
    def client_delete(self, url, info={}, **kwargs):
 | 
			
		||||
        info = urllib.urlencode(info)
 | 
			
		||||
        return self.client.delete(url, info, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def login(self, email, password=None):
 | 
			
		||||
        if password is None:
 | 
			
		||||
            password = initial_password(email)
 | 
			
		||||
        return self.client.post('/accounts/login/',
 | 
			
		||||
                                {'username':email, 'password':password})
 | 
			
		||||
 | 
			
		||||
    def register(self, username, password, domain="zulip.com"):
 | 
			
		||||
        self.client.post('/accounts/home/',
 | 
			
		||||
                         {'email': username + "@" + domain})
 | 
			
		||||
        return self.submit_reg_form_for_user(username, password, domain=domain)
 | 
			
		||||
 | 
			
		||||
    def submit_reg_form_for_user(self, username, password, domain="zulip.com"):
 | 
			
		||||
        """
 | 
			
		||||
        Stage two of the two-step registration process.
 | 
			
		||||
 | 
			
		||||
        If things are working correctly the account should be fully
 | 
			
		||||
        registered after this call.
 | 
			
		||||
        """
 | 
			
		||||
        return self.client.post('/accounts/register/',
 | 
			
		||||
                                {'full_name': username, 'password': password,
 | 
			
		||||
                                 'key': find_key_by_email(username + '@' + domain),
 | 
			
		||||
                                 'terms': True})
 | 
			
		||||
 | 
			
		||||
    def get_api_key(self, email):
 | 
			
		||||
        if email not in API_KEYS:
 | 
			
		||||
            API_KEYS[email] =  get_user_profile_by_email(email).api_key
 | 
			
		||||
        return API_KEYS[email]
 | 
			
		||||
 | 
			
		||||
    def api_auth(self, email):
 | 
			
		||||
        credentials = "%s:%s" % (email, self.get_api_key(email))
 | 
			
		||||
        return {
 | 
			
		||||
            'HTTP_AUTHORIZATION': 'Basic ' + base64.b64encode(credentials)
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
    def get_streams(self, email):
 | 
			
		||||
        """
 | 
			
		||||
        Helper function to get the stream names for a user
 | 
			
		||||
        """
 | 
			
		||||
        user_profile = get_user_profile_by_email(email)
 | 
			
		||||
        subs = Subscription.objects.filter(
 | 
			
		||||
            user_profile    = user_profile,
 | 
			
		||||
            active          = True,
 | 
			
		||||
            recipient__type = Recipient.STREAM)
 | 
			
		||||
        return [get_display_recipient(sub.recipient) for sub in subs]
 | 
			
		||||
 | 
			
		||||
    def send_message(self, sender_name, recipient_list, message_type,
 | 
			
		||||
                     content="test content", subject="test", **kwargs):
 | 
			
		||||
        sender = get_user_profile_by_email(sender_name)
 | 
			
		||||
        if message_type == Recipient.PERSONAL:
 | 
			
		||||
            message_type_name = "private"
 | 
			
		||||
        else:
 | 
			
		||||
            message_type_name = "stream"
 | 
			
		||||
        if isinstance(recipient_list, basestring):
 | 
			
		||||
            recipient_list = [recipient_list]
 | 
			
		||||
        (sending_client, _) = Client.objects.get_or_create(name="test suite")
 | 
			
		||||
 | 
			
		||||
        return check_send_message(
 | 
			
		||||
            sender, sending_client, message_type_name, recipient_list, subject,
 | 
			
		||||
            content, forged=False, forged_timestamp=None,
 | 
			
		||||
            forwarder_user_profile=sender, realm=sender.realm, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def get_old_messages(self, anchor=1, num_before=100, num_after=100):
 | 
			
		||||
        post_params = {"anchor": anchor, "num_before": num_before,
 | 
			
		||||
                       "num_after": num_after}
 | 
			
		||||
        result = self.client.post("/json/get_old_messages", dict(post_params))
 | 
			
		||||
        data = ujson.loads(result.content)
 | 
			
		||||
        return data['messages']
 | 
			
		||||
 | 
			
		||||
    def users_subscribed_to_stream(self, stream_name, realm_domain):
 | 
			
		||||
        realm = Realm.objects.get(domain=realm_domain)
 | 
			
		||||
        stream = Stream.objects.get(name=stream_name, realm=realm)
 | 
			
		||||
        recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
 | 
			
		||||
        subscriptions = Subscription.objects.filter(recipient=recipient)
 | 
			
		||||
 | 
			
		||||
        return [subscription.user_profile for subscription in subscriptions]
 | 
			
		||||
 | 
			
		||||
    def assert_json_success(self, result):
 | 
			
		||||
        """
 | 
			
		||||
        Successful POSTs return a 200 and JSON of the form {"result": "success",
 | 
			
		||||
        "msg": ""}.
 | 
			
		||||
        """
 | 
			
		||||
        self.assertEqual(result.status_code, 200, result)
 | 
			
		||||
        json = ujson.loads(result.content)
 | 
			
		||||
        self.assertEqual(json.get("result"), "success")
 | 
			
		||||
        # We have a msg key for consistency with errors, but it typically has an
 | 
			
		||||
        # empty value.
 | 
			
		||||
        self.assertIn("msg", json)
 | 
			
		||||
 | 
			
		||||
    def get_json_error(self, result):
 | 
			
		||||
        self.assertEqual(result.status_code, 400)
 | 
			
		||||
        json = ujson.loads(result.content)
 | 
			
		||||
        self.assertEqual(json.get("result"), "error")
 | 
			
		||||
        return json['msg']
 | 
			
		||||
 | 
			
		||||
    def assert_json_error(self, result, msg):
 | 
			
		||||
        """
 | 
			
		||||
        Invalid POSTs return a 400 and JSON of the form {"result": "error",
 | 
			
		||||
        "msg": "reason"}.
 | 
			
		||||
        """
 | 
			
		||||
        self.assertEqual(self.get_json_error(result), msg)
 | 
			
		||||
 | 
			
		||||
    def assert_length(self, queries, count, exact=False):
 | 
			
		||||
        if exact:
 | 
			
		||||
            return self.assertTrue(len(queries) == count, queries)
 | 
			
		||||
        return self.assertTrue(len(queries) <= count, queries)
 | 
			
		||||
 | 
			
		||||
    def assert_json_error_contains(self, result, msg_substring):
 | 
			
		||||
        self.assertIn(msg_substring, self.get_json_error(result))
 | 
			
		||||
 | 
			
		||||
    def fixture_data(self, type, action, file_type='json'):
 | 
			
		||||
        return open(os.path.join(os.path.dirname(__file__),
 | 
			
		||||
                                 "../fixtures/%s/%s_%s.%s" % (type, type, action,file_type))).read()
 | 
			
		||||
 | 
			
		||||
    # Subscribe to a stream directly
 | 
			
		||||
    def subscribe_to_stream(self, email, stream_name, realm=None):
 | 
			
		||||
        realm = Realm.objects.get(domain=resolve_email_to_domain(email))
 | 
			
		||||
        stream, _ = create_stream_if_needed(realm, stream_name)
 | 
			
		||||
        user_profile = get_user_profile_by_email(email)
 | 
			
		||||
        do_add_subscription(user_profile, stream, no_log=True)
 | 
			
		||||
 | 
			
		||||
    # Subscribe to a stream by making an API request
 | 
			
		||||
    def common_subscribe_to_streams(self, email, streams, extra_post_data = {}, invite_only=False):
 | 
			
		||||
        post_data = {'subscriptions': ujson.dumps([{"name": stream} for stream in streams]),
 | 
			
		||||
                     'invite_only': ujson.dumps(invite_only)}
 | 
			
		||||
        post_data.update(extra_post_data)
 | 
			
		||||
        result = self.client.post("/api/v1/users/me/subscriptions", post_data, **self.api_auth(email))
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
    def send_json_payload(self, email, url, payload, stream_name=None, **post_params):
 | 
			
		||||
        if stream_name != None:
 | 
			
		||||
            self.subscribe_to_stream(email, stream_name)
 | 
			
		||||
 | 
			
		||||
        result = self.client.post(url, payload, **post_params)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
 | 
			
		||||
        # Check the correct message was sent
 | 
			
		||||
        msg = Message.objects.filter().order_by('-id')[0]
 | 
			
		||||
        self.assertEqual(msg.sender.email, email)
 | 
			
		||||
        self.assertEqual(get_display_recipient(msg.recipient), stream_name)
 | 
			
		||||
 | 
			
		||||
        return msg
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										163
									
								
								zerver/tests.py
									
									
									
									
									
								
							
							
						
						
									
										163
									
								
								zerver/tests.py
									
									
									
									
									
								
							@@ -8,11 +8,13 @@ from django.core.exceptions import ValidationError
 | 
			
		||||
from django.db.models import Q
 | 
			
		||||
 | 
			
		||||
from zerver.lib.test_helpers import (
 | 
			
		||||
    find_key_by_email,
 | 
			
		||||
    queries_captured,
 | 
			
		||||
    simulated_empty_cache,
 | 
			
		||||
    simulated_queue_client,
 | 
			
		||||
    stub,
 | 
			
		||||
    tornado_redirected_to_list,
 | 
			
		||||
    AuthedTestCase,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from zilencer.models import Deployment
 | 
			
		||||
@@ -90,13 +92,6 @@ try:
 | 
			
		||||
except ImportError:
 | 
			
		||||
    bail('The Pygments library is required to run the backend test suite.')
 | 
			
		||||
 | 
			
		||||
def find_key_by_email(address):
 | 
			
		||||
    from django.core.mail import outbox
 | 
			
		||||
    key_regex = re.compile("accounts/do_confirm/([a-f0-9]{40})>")
 | 
			
		||||
    for message in reversed(outbox):
 | 
			
		||||
        if address in message.to:
 | 
			
		||||
            return key_regex.search(message.body).groups()[0]
 | 
			
		||||
 | 
			
		||||
def message_ids(result):
 | 
			
		||||
    return set(message['id'] for message in result['messages'])
 | 
			
		||||
 | 
			
		||||
@@ -307,160 +302,6 @@ class ValidatorTestCase(TestCase):
 | 
			
		||||
        person = 'misconfigured data'
 | 
			
		||||
        self.assertEqual(check_person(person), 'This is not a valid person')
 | 
			
		||||
 | 
			
		||||
class AuthedTestCase(TestCase):
 | 
			
		||||
    # Helper because self.client.patch annoying requires you to urlencode
 | 
			
		||||
    def client_patch(self, url, info={}, **kwargs):
 | 
			
		||||
        info = urllib.urlencode(info)
 | 
			
		||||
        return self.client.patch(url, info, **kwargs)
 | 
			
		||||
    def client_put(self, url, info={}, **kwargs):
 | 
			
		||||
        info = urllib.urlencode(info)
 | 
			
		||||
        return self.client.put(url, info, **kwargs)
 | 
			
		||||
    def client_delete(self, url, info={}, **kwargs):
 | 
			
		||||
        info = urllib.urlencode(info)
 | 
			
		||||
        return self.client.delete(url, info, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def login(self, email, password=None):
 | 
			
		||||
        if password is None:
 | 
			
		||||
            password = initial_password(email)
 | 
			
		||||
        return self.client.post('/accounts/login/',
 | 
			
		||||
                                {'username':email, 'password':password})
 | 
			
		||||
 | 
			
		||||
    def register(self, username, password, domain="zulip.com"):
 | 
			
		||||
        self.client.post('/accounts/home/',
 | 
			
		||||
                         {'email': username + "@" + domain})
 | 
			
		||||
        return self.submit_reg_form_for_user(username, password, domain=domain)
 | 
			
		||||
 | 
			
		||||
    def submit_reg_form_for_user(self, username, password, domain="zulip.com"):
 | 
			
		||||
        """
 | 
			
		||||
        Stage two of the two-step registration process.
 | 
			
		||||
 | 
			
		||||
        If things are working correctly the account should be fully
 | 
			
		||||
        registered after this call.
 | 
			
		||||
        """
 | 
			
		||||
        return self.client.post('/accounts/register/',
 | 
			
		||||
                                {'full_name': username, 'password': password,
 | 
			
		||||
                                 'key': find_key_by_email(username + '@' + domain),
 | 
			
		||||
                                 'terms': True})
 | 
			
		||||
 | 
			
		||||
    def get_api_key(self, email):
 | 
			
		||||
        if email not in API_KEYS:
 | 
			
		||||
            API_KEYS[email] =  get_user_profile_by_email(email).api_key
 | 
			
		||||
        return API_KEYS[email]
 | 
			
		||||
 | 
			
		||||
    def api_auth(self, email):
 | 
			
		||||
        credentials = "%s:%s" % (email, self.get_api_key(email))
 | 
			
		||||
        return {
 | 
			
		||||
            'HTTP_AUTHORIZATION': 'Basic ' + base64.b64encode(credentials)
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
    def get_streams(self, email):
 | 
			
		||||
        """
 | 
			
		||||
        Helper function to get the stream names for a user
 | 
			
		||||
        """
 | 
			
		||||
        user_profile = get_user_profile_by_email(email)
 | 
			
		||||
        subs = Subscription.objects.filter(
 | 
			
		||||
            user_profile    = user_profile,
 | 
			
		||||
            active          = True,
 | 
			
		||||
            recipient__type = Recipient.STREAM)
 | 
			
		||||
        return [get_display_recipient(sub.recipient) for sub in subs]
 | 
			
		||||
 | 
			
		||||
    def send_message(self, sender_name, recipient_list, message_type,
 | 
			
		||||
                     content="test content", subject="test", **kwargs):
 | 
			
		||||
        sender = get_user_profile_by_email(sender_name)
 | 
			
		||||
        if message_type == Recipient.PERSONAL:
 | 
			
		||||
            message_type_name = "private"
 | 
			
		||||
        else:
 | 
			
		||||
            message_type_name = "stream"
 | 
			
		||||
        if isinstance(recipient_list, basestring):
 | 
			
		||||
            recipient_list = [recipient_list]
 | 
			
		||||
        (sending_client, _) = Client.objects.get_or_create(name="test suite")
 | 
			
		||||
 | 
			
		||||
        return check_send_message(
 | 
			
		||||
            sender, sending_client, message_type_name, recipient_list, subject,
 | 
			
		||||
            content, forged=False, forged_timestamp=None,
 | 
			
		||||
            forwarder_user_profile=sender, realm=sender.realm, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def get_old_messages(self, anchor=1, num_before=100, num_after=100):
 | 
			
		||||
        post_params = {"anchor": anchor, "num_before": num_before,
 | 
			
		||||
                       "num_after": num_after}
 | 
			
		||||
        result = self.client.post("/json/get_old_messages", dict(post_params))
 | 
			
		||||
        data = ujson.loads(result.content)
 | 
			
		||||
        return data['messages']
 | 
			
		||||
 | 
			
		||||
    def users_subscribed_to_stream(self, stream_name, realm_domain):
 | 
			
		||||
        realm = Realm.objects.get(domain=realm_domain)
 | 
			
		||||
        stream = Stream.objects.get(name=stream_name, realm=realm)
 | 
			
		||||
        recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
 | 
			
		||||
        subscriptions = Subscription.objects.filter(recipient=recipient)
 | 
			
		||||
 | 
			
		||||
        return [subscription.user_profile for subscription in subscriptions]
 | 
			
		||||
 | 
			
		||||
    def assert_json_success(self, result):
 | 
			
		||||
        """
 | 
			
		||||
        Successful POSTs return a 200 and JSON of the form {"result": "success",
 | 
			
		||||
        "msg": ""}.
 | 
			
		||||
        """
 | 
			
		||||
        self.assertEqual(result.status_code, 200, result)
 | 
			
		||||
        json = ujson.loads(result.content)
 | 
			
		||||
        self.assertEqual(json.get("result"), "success")
 | 
			
		||||
        # We have a msg key for consistency with errors, but it typically has an
 | 
			
		||||
        # empty value.
 | 
			
		||||
        self.assertIn("msg", json)
 | 
			
		||||
 | 
			
		||||
    def get_json_error(self, result):
 | 
			
		||||
        self.assertEqual(result.status_code, 400)
 | 
			
		||||
        json = ujson.loads(result.content)
 | 
			
		||||
        self.assertEqual(json.get("result"), "error")
 | 
			
		||||
        return json['msg']
 | 
			
		||||
 | 
			
		||||
    def assert_json_error(self, result, msg):
 | 
			
		||||
        """
 | 
			
		||||
        Invalid POSTs return a 400 and JSON of the form {"result": "error",
 | 
			
		||||
        "msg": "reason"}.
 | 
			
		||||
        """
 | 
			
		||||
        self.assertEqual(self.get_json_error(result), msg)
 | 
			
		||||
 | 
			
		||||
    def assert_length(self, queries, count, exact=False):
 | 
			
		||||
        if exact:
 | 
			
		||||
            return self.assertTrue(len(queries) == count, queries)
 | 
			
		||||
        return self.assertTrue(len(queries) <= count, queries)
 | 
			
		||||
 | 
			
		||||
    def assert_json_error_contains(self, result, msg_substring):
 | 
			
		||||
        self.assertIn(msg_substring, self.get_json_error(result))
 | 
			
		||||
 | 
			
		||||
    def fixture_data(self, type, action, file_type='json'):
 | 
			
		||||
        return open(os.path.join(os.path.dirname(__file__),
 | 
			
		||||
                                 "fixtures/%s/%s_%s.%s" % (type, type, action,file_type))).read()
 | 
			
		||||
 | 
			
		||||
    # Subscribe to a stream directly
 | 
			
		||||
    def subscribe_to_stream(self, email, stream_name, realm=None):
 | 
			
		||||
        realm = Realm.objects.get(domain=resolve_email_to_domain(email))
 | 
			
		||||
        stream, _ = create_stream_if_needed(realm, stream_name)
 | 
			
		||||
        user_profile = get_user_profile_by_email(email)
 | 
			
		||||
        do_add_subscription(user_profile, stream, no_log=True)
 | 
			
		||||
 | 
			
		||||
    # Subscribe to a stream by making an API request
 | 
			
		||||
    def common_subscribe_to_streams(self, email, streams, extra_post_data = {}, invite_only=False):
 | 
			
		||||
        post_data = {'subscriptions': ujson.dumps([{"name": stream} for stream in streams]),
 | 
			
		||||
                     'invite_only': ujson.dumps(invite_only)}
 | 
			
		||||
        post_data.update(extra_post_data)
 | 
			
		||||
        result = self.client.post("/api/v1/users/me/subscriptions", post_data, **self.api_auth(email))
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
    def send_json_payload(self, email, url, payload, stream_name=None, **post_params):
 | 
			
		||||
        if stream_name != None:
 | 
			
		||||
            self.subscribe_to_stream(email, stream_name)
 | 
			
		||||
 | 
			
		||||
        result = self.client.post(url, payload, **post_params)
 | 
			
		||||
        self.assert_json_success(result)
 | 
			
		||||
 | 
			
		||||
        # Check the correct message was sent
 | 
			
		||||
        msg = Message.objects.filter().order_by('-id')[0]
 | 
			
		||||
        self.assertEqual(msg.sender.email, email)
 | 
			
		||||
        self.assertEqual(get_display_recipient(msg.recipient), stream_name)
 | 
			
		||||
 | 
			
		||||
        return msg
 | 
			
		||||
 | 
			
		||||
class StreamAdminTest(AuthedTestCase):
 | 
			
		||||
    def test_make_stream_public(self):
 | 
			
		||||
        email = 'hamlet@zulip.com'
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user