mirror of
https://github.com/zulip/zulip.git
synced 2025-11-05 14:35:27 +00:00
In a few commits before this one, we just added de-duplicated generic fixtures that apply to multiple API tests. The tests needed to be modified to accommodate that change.
610 lines
18 KiB
Python
610 lines
18 KiB
Python
from typing import Dict, Any, Optional, Iterable
|
|
from io import StringIO
|
|
from yamole import YamoleParser
|
|
|
|
import json
|
|
import os
|
|
|
|
from zerver.lib import mdiff
|
|
from zerver.lib.openapi import validate_against_openapi_schema
|
|
|
|
if False:
|
|
from zulip import Client
|
|
|
|
ZULIP_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
FIXTURE_PATH = os.path.join(ZULIP_DIR, 'templates', 'zerver', 'api', 'fixtures.json')
|
|
|
|
def load_api_fixtures():
|
|
# type: () -> Dict[str, Any]
|
|
with open(FIXTURE_PATH, 'r') as fp:
|
|
json_dict = json.loads(fp.read())
|
|
return json_dict
|
|
|
|
FIXTURES = load_api_fixtures()
|
|
|
|
def add_subscriptions(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Subscribe to the stream "new stream"
|
|
result = client.add_subscriptions(
|
|
streams=[
|
|
{
|
|
'name': 'new stream',
|
|
'description': 'New stream for testing'
|
|
}
|
|
]
|
|
)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['add-subscriptions']['without_principals']
|
|
test_against_fixture(result, fixture)
|
|
|
|
# {code_example|start}
|
|
# To subscribe another user to a stream, you may pass in
|
|
# the `principals` argument, like so:
|
|
result = client.add_subscriptions(
|
|
streams=[
|
|
{'name': 'new stream', 'description': 'New stream for testing'}
|
|
],
|
|
principals=['newbie@zulip.com']
|
|
)
|
|
# {code_example|end}
|
|
assert result['result'] == 'success'
|
|
assert 'newbie@zulip.com' in result['subscribed']
|
|
|
|
def test_add_subscriptions_already_subscribed(client):
|
|
# type: (Client) -> None
|
|
result = client.add_subscriptions(
|
|
streams=[
|
|
{'name': 'new stream', 'description': 'New stream for testing'}
|
|
],
|
|
principals=['newbie@zulip.com']
|
|
)
|
|
|
|
fixture = FIXTURES['add-subscriptions']['already_subscribed']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def test_authorization_errors_fatal(client, nonadmin_client):
|
|
# type: (Client, Client) -> None
|
|
client.add_subscriptions(
|
|
streams=[
|
|
{'name': 'private_stream'}
|
|
],
|
|
)
|
|
|
|
stream_id = client.get_stream_id('private_stream')['stream_id']
|
|
client.call_endpoint(
|
|
'streams/{}'.format(stream_id),
|
|
method='PATCH',
|
|
request={'is_private': True}
|
|
)
|
|
|
|
result = nonadmin_client.add_subscriptions(
|
|
streams=[
|
|
{'name': 'private_stream'}
|
|
],
|
|
authorization_errors_fatal=False,
|
|
)
|
|
|
|
fixture = FIXTURES['add-subscriptions']['unauthorized_errors_fatal_false']
|
|
test_against_fixture(result, fixture)
|
|
|
|
result = nonadmin_client.add_subscriptions(
|
|
streams=[
|
|
{'name': 'private_stream'}
|
|
],
|
|
authorization_errors_fatal=True,
|
|
)
|
|
|
|
fixture = FIXTURES['add-subscriptions']['unauthorized_errors_fatal_true']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def create_user(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Create a user
|
|
request = {
|
|
'email': 'newbie@zulip.com',
|
|
'password': 'temp',
|
|
'full_name': 'New User',
|
|
'short_name': 'newbie'
|
|
}
|
|
result = client.create_user(request)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['successful-response-empty']
|
|
test_against_fixture(result, fixture)
|
|
|
|
# Test "Email already used error"
|
|
result = client.create_user(request)
|
|
|
|
fixture = FIXTURES['email_already_used_error']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def get_members(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Get all users in the realm
|
|
result = client.get_members()
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['get-all-users']
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['members'])
|
|
members = [m for m in result['members'] if m['email'] == 'newbie@zulip.com']
|
|
assert len(members) == 1
|
|
newbie = members[0]
|
|
assert not newbie['is_admin']
|
|
assert newbie['full_name'] == 'New User'
|
|
|
|
member_fixture = fixture['members'][0]
|
|
member_result = result['members'][0]
|
|
test_against_fixture(member_result, member_fixture,
|
|
check_if_exists=member_fixture.keys())
|
|
|
|
# {code_example|start}
|
|
# You may pass the `client_gravatar` query parameter as follows:
|
|
result = client.call_endpoint(
|
|
url='users?client_gravatar=true',
|
|
method='GET',
|
|
)
|
|
# {code_example|end}
|
|
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['members'])
|
|
assert result['members'][0]['avatar_url'] is None
|
|
|
|
def get_profile(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Get the profile of the user/bot that requests this endpoint,
|
|
# which is `client` in this case:
|
|
result = client.get_profile()
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['get-profile']
|
|
check_if_equal = ['email', 'full_name', 'msg', 'result', 'short_name']
|
|
check_if_exists = ['client_id', 'is_admin', 'is_bot', 'max_message_id',
|
|
'pointer', 'user_id']
|
|
test_against_fixture(result, fixture, check_if_equal=check_if_equal,
|
|
check_if_exists=check_if_exists)
|
|
|
|
def get_stream_id(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Get the ID of a given stream
|
|
stream_name = 'new stream'
|
|
result = client.get_stream_id(stream_name)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['get-stream-id']
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['stream_id'])
|
|
|
|
def get_streams(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Get all streams that the user has access to
|
|
result = client.get_streams()
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['get-all-streams']
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['streams'])
|
|
assert len(result['streams']) == len(fixture['streams'])
|
|
streams = [s for s in result['streams'] if s['name'] == 'new stream']
|
|
assert streams[0]['description'] == 'New stream for testing'
|
|
|
|
# {code_example|start}
|
|
# You may pass in one or more of the query parameters mentioned above
|
|
# as keyword arguments, like so:
|
|
result = client.get_streams(include_public=False)
|
|
# {code_example|end}
|
|
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['streams'])
|
|
assert len(result['streams']) == 4
|
|
|
|
def test_user_not_authorized_error(nonadmin_client):
|
|
# type: (Client) -> None
|
|
result = nonadmin_client.get_streams(include_all_active=True)
|
|
|
|
fixture = FIXTURES['user-not-authorized-error']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def get_subscribers(client):
|
|
# type: (Client) -> None
|
|
|
|
result = client.get_subscribers(stream='new stream')
|
|
assert result['subscribers'] == ['iago@zulip.com', 'newbie@zulip.com']
|
|
|
|
def get_user_agent(client):
|
|
# type: (Client) -> None
|
|
|
|
result = client.get_user_agent()
|
|
assert result.startswith('ZulipPython/')
|
|
|
|
def list_subscriptions(client):
|
|
# type: (Client) -> None
|
|
# {code_example|start}
|
|
# Get all streams that the user is subscribed to
|
|
result = client.list_subscriptions()
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['get-subscribed-streams']
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['subscriptions'])
|
|
|
|
streams = [s for s in result['subscriptions'] if s['name'] == 'new stream']
|
|
assert streams[0]['description'] == 'New stream for testing'
|
|
|
|
def remove_subscriptions(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Unsubscribe from the stream "new stream"
|
|
result = client.remove_subscriptions(
|
|
['new stream']
|
|
)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['remove-subscriptions']
|
|
test_against_fixture(result, fixture)
|
|
|
|
# test it was actually removed
|
|
result = client.list_subscriptions()
|
|
assert result['result'] == 'success'
|
|
streams = [s for s in result['subscriptions'] if s['name'] == 'new stream']
|
|
assert len(streams) == 0
|
|
|
|
# {code_example|start}
|
|
# Unsubscribe another user from the stream "new stream"
|
|
result = client.remove_subscriptions(
|
|
['new stream'],
|
|
principals=['newbie@zulip.com']
|
|
)
|
|
# {code_example|end}
|
|
|
|
test_against_fixture(result, fixture)
|
|
|
|
def render_message(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Render a message
|
|
request = {
|
|
'content': '**foo**'
|
|
}
|
|
result = client.render_message(request)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['render-message']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def stream_message(client):
|
|
# type: (Client) -> int
|
|
|
|
# {code_example|start}
|
|
# Send a stream message
|
|
request = {
|
|
"type": "stream",
|
|
"to": "Denmark",
|
|
"subject": "Castle",
|
|
"content": "Something is rotten in the state of Denmark."
|
|
}
|
|
result = client.send_message(request)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['stream-message']
|
|
test_against_fixture(result, fixture, check_if_equal=['result'],
|
|
check_if_exists=['id'])
|
|
|
|
# test it was actually sent
|
|
message_id = result['id']
|
|
url = 'messages/' + str(message_id)
|
|
result = client.call_endpoint(
|
|
url=url,
|
|
method='GET'
|
|
)
|
|
assert result['result'] == 'success'
|
|
assert result['raw_content'] == request['content']
|
|
|
|
return message_id
|
|
|
|
def test_nonexistent_stream_error(client):
|
|
# type: (Client) -> None
|
|
request = {
|
|
"type": "stream",
|
|
"to": "nonexistent_stream",
|
|
"subject": "Castle",
|
|
"content": "Something is rotten in the state of Denmark."
|
|
}
|
|
result = client.send_message(request)
|
|
|
|
fixture = FIXTURES['nonexistent-stream-error']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def private_message(client):
|
|
# type: (Client) -> None
|
|
|
|
# {code_example|start}
|
|
# Send a private message
|
|
request = {
|
|
"type": "private",
|
|
"to": "iago@zulip.com",
|
|
"content": "I come not, friends, to steal away your hearts."
|
|
}
|
|
result = client.send_message(request)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['private-message']
|
|
test_against_fixture(result, fixture, check_if_equal=['result'],
|
|
check_if_exists=['id'])
|
|
|
|
# test it was actually sent
|
|
message_id = result['id']
|
|
url = 'messages/' + str(message_id)
|
|
result = client.call_endpoint(
|
|
url=url,
|
|
method='GET'
|
|
)
|
|
assert result['result'] == 'success'
|
|
assert result['raw_content'] == request['content']
|
|
|
|
def test_private_message_invalid_recipient(client):
|
|
# type: (Client) -> None
|
|
request = {
|
|
"type": "private",
|
|
"to": "eeshan@zulip.com",
|
|
"content": "I come not, friends, to steal away your hearts."
|
|
}
|
|
result = client.send_message(request)
|
|
|
|
fixture = FIXTURES['invalid-pm-recipient-error']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def update_message(client, message_id):
|
|
# type: (Client, int) -> None
|
|
|
|
assert int(message_id)
|
|
|
|
# {code_example|start}
|
|
# Edit a message
|
|
# (make sure that message_id below is set to the ID of the
|
|
# message you wish to update)
|
|
request = {
|
|
"message_id": message_id,
|
|
"content": "New content"
|
|
}
|
|
result = client.update_message(request)
|
|
# {code_example|end}
|
|
|
|
validate_against_openapi_schema(result, '/messages/{message_id}', 'patch')
|
|
|
|
# test it was actually updated
|
|
url = 'messages/' + str(message_id)
|
|
result = client.call_endpoint(
|
|
url=url,
|
|
method='GET'
|
|
)
|
|
assert result['result'] == 'success'
|
|
assert result['raw_content'] == request['content']
|
|
|
|
def test_update_message_edit_permission_error(client, nonadmin_client):
|
|
# type: (Client, Client) -> None
|
|
request = {
|
|
"type": "stream",
|
|
"to": "Denmark",
|
|
"subject": "Castle",
|
|
"content": "Something is rotten in the state of Denmark."
|
|
}
|
|
result = client.send_message(request)
|
|
|
|
request = {
|
|
"message_id": result["id"],
|
|
"content": "New content"
|
|
}
|
|
result = nonadmin_client.update_message(request)
|
|
|
|
fixture = FIXTURES['update-message-edit-permission-error']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def register_queue(client):
|
|
# type: (Client) -> str
|
|
|
|
# {code_example|start}
|
|
# Register the queue
|
|
result = client.register(
|
|
event_types=['messages', 'realm_emoji']
|
|
)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['register-queue']
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['last_event_id', 'queue_id'])
|
|
|
|
return result['queue_id']
|
|
|
|
def deregister_queue(client, queue_id):
|
|
# type: (Client, str) -> None
|
|
|
|
# {code_example|start}
|
|
# Delete a queue (queue_id is the ID of the queue
|
|
# to be removed)
|
|
result = client.deregister(queue_id)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['successful-response-empty']
|
|
test_against_fixture(result, fixture)
|
|
|
|
# Test "BAD_EVENT_QUEUE_ID" error
|
|
result = client.deregister(queue_id)
|
|
fixture = FIXTURES['bad_event_queue_id_error']
|
|
test_against_fixture(result, fixture, check_if_equal=['code', 'result'],
|
|
check_if_exists=['queue_id', 'msg'])
|
|
|
|
def upload_file(client):
|
|
# type: (Client) -> None
|
|
fp = StringIO("zulip")
|
|
fp.name = "zulip.txt"
|
|
|
|
# {code_example|start}
|
|
# Upload a file
|
|
# (Make sure that 'fp' is a file object)
|
|
result = client.call_endpoint(
|
|
'user_uploads',
|
|
method='POST',
|
|
files=[fp]
|
|
)
|
|
# {code_example|end}
|
|
|
|
fixture = FIXTURES['upload-file']
|
|
test_against_fixture(result, fixture, check_if_equal=['msg', 'result'],
|
|
check_if_exists=['uri'])
|
|
|
|
def test_invalid_api_key(client_with_invalid_key):
|
|
# type: (Client) -> None
|
|
result = client_with_invalid_key.list_subscriptions()
|
|
fixture = FIXTURES['invalid-api-key']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def test_missing_request_argument(client):
|
|
# type: (Client) -> None
|
|
result = client.render_message({})
|
|
|
|
fixture = FIXTURES['missing-request-argument-error']
|
|
test_against_fixture(result, fixture)
|
|
|
|
def test_invalid_stream_error(client):
|
|
# type: (Client) -> None
|
|
result = client.get_stream_id('nonexistent')
|
|
|
|
fixture = FIXTURES['invalid-stream-error']
|
|
test_against_fixture(result, fixture)
|
|
|
|
TEST_FUNCTIONS = {
|
|
'render-message': render_message,
|
|
'stream-message': stream_message,
|
|
'private-message': private_message,
|
|
'/messages/{message_id}:patch': update_message,
|
|
'get-stream-id': get_stream_id,
|
|
'get-subscribed-streams': list_subscriptions,
|
|
'get-all-streams': get_streams,
|
|
'create-user': create_user,
|
|
'get-profile': get_profile,
|
|
'add-subscriptions': add_subscriptions,
|
|
'remove-subscriptions': remove_subscriptions,
|
|
'get-all-users': get_members,
|
|
'register-queue': register_queue,
|
|
'delete-queue': deregister_queue,
|
|
'upload-file': upload_file,
|
|
}
|
|
|
|
# SETUP METHODS FOLLOW
|
|
|
|
def test_against_fixture(result, fixture, check_if_equal=[], check_if_exists=[]):
|
|
# type: (Dict[str, Any], Dict[str, Any], Optional[Iterable[str]], Optional[Iterable[str]]) -> None
|
|
assertLength(result, fixture)
|
|
|
|
if not check_if_equal and not check_if_exists:
|
|
for key, value in fixture.items():
|
|
assertEqual(key, result, fixture)
|
|
|
|
if check_if_equal:
|
|
for key in check_if_equal:
|
|
assertEqual(key, result, fixture)
|
|
|
|
if check_if_exists:
|
|
for key in check_if_exists:
|
|
assertIn(key, result)
|
|
|
|
def assertEqual(key, result, fixture):
|
|
# type: (str, Dict[str, Any], Dict[str, Any]) -> None
|
|
if result[key] != fixture[key]:
|
|
first = "{key} = {value}".format(key=key, value=result[key])
|
|
second = "{key} = {value}".format(key=key, value=fixture[key])
|
|
raise AssertionError("Actual and expected outputs do not match; showing diff:\n" +
|
|
mdiff.diff_strings(first, second))
|
|
else:
|
|
assert result[key] == fixture[key]
|
|
|
|
def assertLength(result, fixture):
|
|
# type: (Dict[str, Any], Dict[str, Any]) -> None
|
|
if len(result) != len(fixture):
|
|
result_string = json.dumps(result, indent=4, sort_keys=True)
|
|
fixture_string = json.dumps(fixture, indent=4, sort_keys=True)
|
|
raise AssertionError("The lengths of the actual and expected outputs do not match; showing diff:\n" +
|
|
mdiff.diff_strings(result_string, fixture_string))
|
|
else:
|
|
assert len(result) == len(fixture)
|
|
|
|
def assertIn(key, result):
|
|
# type: (str, Dict[str, Any]) -> None
|
|
if key not in result.keys():
|
|
raise AssertionError(
|
|
"The actual output does not contain the the key `{key}`.".format(key=key)
|
|
)
|
|
else:
|
|
assert key in result
|
|
|
|
def test_messages(client):
|
|
# type: (Client) -> None
|
|
|
|
render_message(client)
|
|
message_id = stream_message(client)
|
|
update_message(client, message_id)
|
|
private_message(client)
|
|
|
|
test_nonexistent_stream_error(client)
|
|
test_private_message_invalid_recipient(client)
|
|
|
|
def test_users(client):
|
|
# type: (Client) -> None
|
|
|
|
create_user(client)
|
|
get_members(client)
|
|
get_profile(client)
|
|
upload_file(client)
|
|
|
|
def test_streams(client):
|
|
# type: (Client) -> None
|
|
|
|
add_subscriptions(client)
|
|
test_add_subscriptions_already_subscribed(client)
|
|
list_subscriptions(client)
|
|
get_stream_id(client)
|
|
get_streams(client)
|
|
get_subscribers(client)
|
|
remove_subscriptions(client)
|
|
|
|
def test_queues(client):
|
|
# type: (Client) -> None
|
|
# Note that the example for api/get-events-from-queue is not tested.
|
|
# Since, methods such as client.get_events() or client.call_on_each_message
|
|
# are blocking calls and since the event queue backend is already
|
|
# thoroughly tested in zerver/tests/test_event_queue.py, it is not worth
|
|
# the effort to come up with asynchronous logic for testing those here.
|
|
queue_id = register_queue(client)
|
|
deregister_queue(client, queue_id)
|
|
|
|
def test_errors(client):
|
|
# type: (Client) -> None
|
|
test_missing_request_argument(client)
|
|
test_invalid_stream_error(client)
|
|
|
|
def test_the_api(client):
|
|
# type: (Client) -> None
|
|
|
|
get_user_agent(client)
|
|
test_users(client)
|
|
test_streams(client)
|
|
test_messages(client)
|
|
test_queues(client)
|
|
test_errors(client)
|