mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 22:13:26 +00:00
test_classes: Type kwargs for client_get and friends.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
committed by
Tim Abbott
parent
27977eddeb
commit
dc18aadeb2
@@ -132,6 +132,12 @@ class UploadSerializeMixin(SerializeMixin):
|
||||
super().setUpClass(*args, **kwargs)
|
||||
|
||||
|
||||
# We could be more specific about which arguments are bool (Django's
|
||||
# follow and secure, our intentionally_undocumented) and which are str
|
||||
# (everything else), but explaining that to mypy is tedious.
|
||||
ClientArg = Union[str, bool]
|
||||
|
||||
|
||||
class ZulipTestCase(TestCase):
|
||||
# Ensure that the test system just shows us diffs
|
||||
maxDiff: Optional[int] = None
|
||||
@@ -204,8 +210,9 @@ Output:
|
||||
DEFAULT_SUBDOMAIN = "zulip"
|
||||
TOKENIZED_NOREPLY_REGEX = settings.TOKENIZED_NOREPLY_EMAIL_ADDRESS.format(token="[a-z0-9_]{24}")
|
||||
|
||||
def set_http_headers(self, kwargs: Dict[str, Any]) -> None:
|
||||
def set_http_headers(self, kwargs: Dict[str, ClientArg]) -> None:
|
||||
if "subdomain" in kwargs:
|
||||
assert isinstance(kwargs["subdomain"], str)
|
||||
kwargs["HTTP_HOST"] = Realm.host_for_subdomain(kwargs["subdomain"])
|
||||
del kwargs["subdomain"]
|
||||
elif "HTTP_HOST" not in kwargs:
|
||||
@@ -229,13 +236,13 @@ Output:
|
||||
elif "HTTP_USER_AGENT" not in kwargs:
|
||||
kwargs["HTTP_USER_AGENT"] = default_user_agent
|
||||
|
||||
def extract_api_suffix_url(self, url: str) -> Tuple[str, Dict[str, Any]]:
|
||||
def extract_api_suffix_url(self, url: str) -> Tuple[str, Dict[str, List[str]]]:
|
||||
"""
|
||||
Function that extracts the URL after `/api/v1` or `/json` and also
|
||||
returns the query data in the URL, if there is any.
|
||||
"""
|
||||
url_split = url.split("?")
|
||||
data: Dict[str, Any] = {}
|
||||
data = {}
|
||||
if len(url_split) == 2:
|
||||
data = urllib.parse.parse_qs(url_split[1])
|
||||
url = url_split[0]
|
||||
@@ -248,7 +255,7 @@ Output:
|
||||
method: str,
|
||||
result: HttpResponse,
|
||||
data: Union[str, bytes, Dict[str, Any]],
|
||||
http_headers: Dict[str, Any],
|
||||
kwargs: Dict[str, ClientArg],
|
||||
intentionally_undocumented: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
@@ -276,6 +283,7 @@ Output:
|
||||
content, url, method, str(result.status_code)
|
||||
)
|
||||
if response_validated:
|
||||
http_headers = {k: v for k, v in kwargs.items() if isinstance(v, str)}
|
||||
validate_request(
|
||||
url,
|
||||
method,
|
||||
@@ -292,7 +300,7 @@ Output:
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
intentionally_undocumented: bool = False,
|
||||
**kwargs: Any,
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
"""
|
||||
We need to urlencode, since Django's function won't do it for us.
|
||||
@@ -313,7 +321,7 @@ Output:
|
||||
|
||||
@instrument_url
|
||||
def client_patch_multipart(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: Any
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
"""
|
||||
Use this for patch requests that have file uploads or
|
||||
@@ -330,27 +338,31 @@ Output:
|
||||
self.validate_api_response_openapi(url, "patch", result, info, kwargs)
|
||||
return result
|
||||
|
||||
def json_patch(self, url: str, payload: Dict[str, Any] = {}, **kwargs: Any) -> HttpResponse:
|
||||
def json_patch(
|
||||
self, url: str, payload: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
data = orjson.dumps(payload)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.patch(url, data=data, content_type="application/json", **kwargs)
|
||||
|
||||
@instrument_url
|
||||
def client_put(self, url: str, info: Dict[str, Any] = {}, **kwargs: Any) -> HttpResponse:
|
||||
def client_put(self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg) -> HttpResponse:
|
||||
encoded = urllib.parse.urlencode(info)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.put(url, encoded, **kwargs)
|
||||
|
||||
def json_put(self, url: str, payload: Dict[str, Any] = {}, **kwargs: Any) -> HttpResponse:
|
||||
def json_put(self, url: str, payload: Dict[str, Any] = {}, **kwargs: ClientArg) -> HttpResponse:
|
||||
data = orjson.dumps(payload)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.put(url, data=data, content_type="application/json", **kwargs)
|
||||
|
||||
@instrument_url
|
||||
def client_delete(self, url: str, info: Dict[str, Any] = {}, **kwargs: Any) -> HttpResponse:
|
||||
def client_delete(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
encoded = urllib.parse.urlencode(info)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
@@ -359,14 +371,16 @@ Output:
|
||||
return result
|
||||
|
||||
@instrument_url
|
||||
def client_options(self, url: str, info: Dict[str, Any] = {}, **kwargs: Any) -> HttpResponse:
|
||||
def client_options(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
encoded = urllib.parse.urlencode(info)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.options(url, encoded, **kwargs)
|
||||
|
||||
@instrument_url
|
||||
def client_head(self, url: str, info: Dict[str, Any] = {}, **kwargs: Any) -> HttpResponse:
|
||||
def client_head(self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg) -> HttpResponse:
|
||||
encoded = urllib.parse.urlencode(info)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
@@ -377,9 +391,10 @@ Output:
|
||||
self,
|
||||
url: str,
|
||||
info: Union[str, bytes, Dict[str, Any]] = {},
|
||||
**kwargs: Any,
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
intentionally_undocumented: bool = kwargs.pop("intentionally_undocumented", False)
|
||||
intentionally_undocumented = kwargs.pop("intentionally_undocumented", False)
|
||||
assert isinstance(intentionally_undocumented, bool)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
result = django_client.post(url, info, **kwargs)
|
||||
@@ -403,8 +418,9 @@ Output:
|
||||
return match.func(req)
|
||||
|
||||
@instrument_url
|
||||
def client_get(self, url: str, info: Dict[str, Any] = {}, **kwargs: Any) -> HttpResponse:
|
||||
intentionally_undocumented: bool = kwargs.pop("intentionally_undocumented", False)
|
||||
def client_get(self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg) -> HttpResponse:
|
||||
intentionally_undocumented = kwargs.pop("intentionally_undocumented", False)
|
||||
assert isinstance(intentionally_undocumented, bool)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
result = django_client.get(url, info, **kwargs)
|
||||
@@ -541,7 +557,7 @@ Output:
|
||||
self.assertEqual(page_params["is_spectator"], False)
|
||||
|
||||
def login_with_return(
|
||||
self, email: str, password: Optional[str] = None, **kwargs: Any
|
||||
self, email: str, password: Optional[str] = None, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
if password is None:
|
||||
password = initial_password(email)
|
||||
@@ -636,10 +652,10 @@ Output:
|
||||
default_stream_groups: Sequence[str] = [],
|
||||
source_realm_id: str = "",
|
||||
key: Optional[str] = None,
|
||||
realm_type: Optional[int] = Realm.ORG_TYPES["business"]["id"],
|
||||
enable_marketing_emails: Optional[bool] = True,
|
||||
is_demo_organization: Optional[bool] = False,
|
||||
**kwargs: Any,
|
||||
realm_type: int = Realm.ORG_TYPES["business"]["id"],
|
||||
enable_marketing_emails: bool = True,
|
||||
is_demo_organization: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
"""
|
||||
Stage two of the two-step registration process.
|
||||
@@ -734,33 +750,56 @@ Output:
|
||||
credentials = f"{identifier}:{api_key}"
|
||||
return "Basic " + base64.b64encode(credentials.encode()).decode()
|
||||
|
||||
def uuid_get(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
def uuid_get(
|
||||
self, identifier: str, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_uuid(identifier)
|
||||
return self.client_get(*args, **kwargs)
|
||||
return self.client_get(url, info, **kwargs)
|
||||
|
||||
def uuid_post(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
def uuid_post(
|
||||
self,
|
||||
identifier: str,
|
||||
url: str,
|
||||
info: Union[str, bytes, Dict[str, Any]] = {},
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_uuid(identifier)
|
||||
return self.client_post(*args, **kwargs)
|
||||
return self.client_post(url, info, **kwargs)
|
||||
|
||||
def api_get(self, user: UserProfile, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_get(*args, **kwargs)
|
||||
|
||||
def api_post(
|
||||
self, user: UserProfile, *args: Any, intentionally_undocumented: bool = False, **kwargs: Any
|
||||
def api_get(
|
||||
self, user: UserProfile, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_post(
|
||||
*args, intentionally_undocumented=intentionally_undocumented, **kwargs
|
||||
return self.client_get(url, info, **kwargs)
|
||||
|
||||
def api_post(
|
||||
self,
|
||||
user: UserProfile,
|
||||
url: str,
|
||||
info: Union[str, bytes, Dict[str, Any]] = {},
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_post(url, info, **kwargs)
|
||||
|
||||
def api_patch(
|
||||
self,
|
||||
user: UserProfile,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
intentionally_undocumented: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_patch(
|
||||
url, info, intentionally_undocumented=intentionally_undocumented, **kwargs
|
||||
)
|
||||
|
||||
def api_patch(self, user: UserProfile, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
def api_delete(
|
||||
self, user: UserProfile, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_patch(*args, **kwargs)
|
||||
|
||||
def api_delete(self, user: UserProfile, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_delete(*args, **kwargs)
|
||||
return self.client_delete(url, info, **kwargs)
|
||||
|
||||
def get_streams(self, user_profile: UserProfile) -> List[str]:
|
||||
"""
|
||||
@@ -1049,7 +1088,7 @@ Output:
|
||||
invite_only: bool = False,
|
||||
is_web_public: bool = False,
|
||||
allow_fail: bool = False,
|
||||
**kwargs: Any,
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
post_data = {
|
||||
"subscriptions": orjson.dumps([{"name": stream} for stream in streams]).decode(),
|
||||
@@ -1082,7 +1121,7 @@ Output:
|
||||
user_profile: UserProfile,
|
||||
url: str,
|
||||
payload: Union[str, Dict[str, Any]],
|
||||
**post_params: Any,
|
||||
**post_params: ClientArg,
|
||||
) -> Message:
|
||||
"""
|
||||
Send a webhook payload to the server, and verify that the
|
||||
@@ -1561,9 +1600,20 @@ You can fix this by adding "{complete_event_type}" to ALL_EVENT_TYPES for this w
|
||||
self.patch.start()
|
||||
self.addCleanup(self.patch.stop)
|
||||
|
||||
def api_stream_message(self, user: UserProfile, *args: Any, **kwargs: Any) -> HttpResponse:
|
||||
def api_stream_message(
|
||||
self,
|
||||
user: UserProfile,
|
||||
fixture_name: str,
|
||||
expected_topic: Optional[str] = None,
|
||||
expected_message: Optional[str] = None,
|
||||
content_type: Optional[str] = "application/json",
|
||||
expect_noop: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
) -> HttpResponse:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.check_webhook(*args, **kwargs)
|
||||
return self.check_webhook(
|
||||
fixture_name, expected_topic, expected_message, content_type, expect_noop, **kwargs
|
||||
)
|
||||
|
||||
def check_webhook(
|
||||
self,
|
||||
@@ -1571,8 +1621,8 @@ You can fix this by adding "{complete_event_type}" to ALL_EVENT_TYPES for this w
|
||||
expected_topic: Optional[str] = None,
|
||||
expected_message: Optional[str] = None,
|
||||
content_type: Optional[str] = "application/json",
|
||||
expect_noop: Optional[bool] = False,
|
||||
**kwargs: Any,
|
||||
expect_noop: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
) -> None:
|
||||
"""
|
||||
check_webhook is the main way to test "normal" webhooks that
|
||||
@@ -1652,7 +1702,9 @@ one or more new messages.
|
||||
fixture_name: str,
|
||||
expected_message: str,
|
||||
content_type: str = "application/json",
|
||||
**kwargs: Any,
|
||||
*,
|
||||
sender: Optional[UserProfile] = None,
|
||||
**kwargs: ClientArg,
|
||||
) -> Message:
|
||||
"""
|
||||
For the rare cases that you are testing a webhook that sends
|
||||
@@ -1668,8 +1720,9 @@ one or more new messages.
|
||||
headers = get_fixture_http_headers(self.WEBHOOK_DIR_NAME, fixture_name)
|
||||
headers = standardize_headers(headers)
|
||||
kwargs.update(headers)
|
||||
# The sender profile shouldn't be passed any further in kwargs, so we pop it.
|
||||
sender = kwargs.pop("sender", self.test_user)
|
||||
|
||||
if sender is None:
|
||||
sender = self.test_user
|
||||
|
||||
msg = self.send_webhook_payload(
|
||||
sender,
|
||||
@@ -1681,7 +1734,7 @@ one or more new messages.
|
||||
|
||||
return msg
|
||||
|
||||
def build_webhook_url(self, *args: Any, **kwargs: Any) -> str:
|
||||
def build_webhook_url(self, *args: str, **kwargs: str) -> str:
|
||||
url = self.URL_TEMPLATE
|
||||
if url.find("api_key") >= 0:
|
||||
api_key = get_api_key(self.test_user)
|
||||
|
||||
Reference in New Issue
Block a user