mirror of
https://github.com/zulip/zulip.git
synced 2025-11-06 15:03:34 +00:00
tests: Tighten signature of the wrapped test client helpers.
We wrap methods of the django test client for the test suite, and type keyword variadic arguments as `ClientArg` as it might called with a mix of `bool` and `str`. This is problematic when we call the original methods on the test client as we attempt to unpack the dictionary of keyword arguments, which has no type guarantee that certain keys that the test client requires to be bool will certainly be bool. For example, you can call `self.client_post(url, info, follow="invalid")` without getting a mypy error while the django test client requires `follow: bool`. The unsafely typed keyword variadic arguments leads to error within the body the wrapped test client functions as we call `django_client.post` with `**kwargs` when django-stubs gets added, making it necessary to refactor these wrappers for type safety. The approach here minimizes the need to refactor callers, as we keep `kwargs` being variadic while change its type from `ClientArg` to `str` after defining all the possible `bool` arguments that might previously appear in `kwargs`. We also copy the defaults from the django test client as they are unlikely to change. The tornado test cases are also refactored due to the change of the signature of `set_http_headers` with the `skip_user_agent` being added as a keyword argument. We want to unconditionally set this flag to `True` because the `HTTP_USER_AGENT` is not supported. It also removes a unnecessary duplication of an argument. This is a part of the django-stubs refactorings. Signed-off-by: Zixuan James Li <p359101898@gmail.com>
This commit is contained in:
committed by
Tim Abbott
parent
8a9f06d5bc
commit
b65401ed47
@@ -137,12 +137,6 @@ class UploadSerializeMixin(SerializeMixin):
|
||||
super().setUpClass()
|
||||
|
||||
|
||||
# We could be more specific about which arguments are bool (Django's
|
||||
# follow and secure, our intentionally_undocumented) and which are str
|
||||
# (everything else), but explaining that to mypy is tedious.
|
||||
ClientArg = Union[str, bool]
|
||||
|
||||
|
||||
class ZulipTestCase(TestCase):
|
||||
# Ensure that the test system just shows us diffs
|
||||
maxDiff: Optional[int] = None
|
||||
@@ -215,16 +209,16 @@ Output:
|
||||
DEFAULT_SUBDOMAIN = "zulip"
|
||||
TOKENIZED_NOREPLY_REGEX = settings.TOKENIZED_NOREPLY_EMAIL_ADDRESS.format(token="[a-z0-9_]{24}")
|
||||
|
||||
def set_http_headers(self, kwargs: Dict[str, ClientArg]) -> None:
|
||||
if "subdomain" in kwargs:
|
||||
assert isinstance(kwargs["subdomain"], str)
|
||||
kwargs["HTTP_HOST"] = Realm.host_for_subdomain(kwargs["subdomain"])
|
||||
del kwargs["subdomain"]
|
||||
elif "HTTP_HOST" not in kwargs:
|
||||
kwargs["HTTP_HOST"] = Realm.host_for_subdomain(self.DEFAULT_SUBDOMAIN)
|
||||
def set_http_headers(self, extra: Dict[str, str], skip_user_agent: bool = False) -> None:
|
||||
if "subdomain" in extra:
|
||||
assert isinstance(extra["subdomain"], str)
|
||||
extra["HTTP_HOST"] = Realm.host_for_subdomain(extra["subdomain"])
|
||||
del extra["subdomain"]
|
||||
elif "HTTP_HOST" not in extra:
|
||||
extra["HTTP_HOST"] = Realm.host_for_subdomain(self.DEFAULT_SUBDOMAIN)
|
||||
|
||||
# set User-Agent
|
||||
if "HTTP_AUTHORIZATION" in kwargs:
|
||||
if "HTTP_AUTHORIZATION" in extra:
|
||||
# An API request; use mobile as the default user agent
|
||||
default_user_agent = "ZulipMobile/26.22.145 (iOS 10.3.1)"
|
||||
else:
|
||||
@@ -234,12 +228,11 @@ Output:
|
||||
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||
+ "Chrome/79.0.3945.130 Safari/537.36"
|
||||
)
|
||||
if kwargs.get("skip_user_agent"):
|
||||
if skip_user_agent:
|
||||
# Provide a way to disable setting User-Agent if desired.
|
||||
assert "HTTP_USER_AGENT" not in kwargs
|
||||
del kwargs["skip_user_agent"]
|
||||
elif "HTTP_USER_AGENT" not in kwargs:
|
||||
kwargs["HTTP_USER_AGENT"] = default_user_agent
|
||||
assert "HTTP_USER_AGENT" not in extra
|
||||
elif "HTTP_USER_AGENT" not in extra:
|
||||
extra["HTTP_USER_AGENT"] = default_user_agent
|
||||
|
||||
def extract_api_suffix_url(self, url: str) -> Tuple[str, Dict[str, List[str]]]:
|
||||
"""
|
||||
@@ -260,7 +253,7 @@ Output:
|
||||
method: str,
|
||||
result: "TestHttpResponse",
|
||||
data: Union[str, bytes, Dict[str, Any]],
|
||||
kwargs: Dict[str, ClientArg],
|
||||
extra: Dict[str, str],
|
||||
intentionally_undocumented: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
@@ -288,12 +281,11 @@ Output:
|
||||
content, url, method, str(result.status_code)
|
||||
)
|
||||
if response_validated:
|
||||
http_headers = {k: v for k, v in kwargs.items() if isinstance(v, str)}
|
||||
validate_request(
|
||||
url,
|
||||
method,
|
||||
data,
|
||||
http_headers,
|
||||
extra,
|
||||
json_url,
|
||||
str(result.status_code),
|
||||
intentionally_undocumented=intentionally_undocumented,
|
||||
@@ -304,30 +296,40 @@ Output:
|
||||
self,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
intentionally_undocumented: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
"""
|
||||
We need to urlencode, since Django's function won't do it for us.
|
||||
"""
|
||||
encoded = urllib.parse.urlencode(info)
|
||||
kwargs["content_type"] = "application/x-www-form-urlencoded"
|
||||
extra["content_type"] = "application/x-www-form-urlencoded"
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
result = django_client.patch(url, encoded, **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
result = django_client.patch(url, encoded, follow=follow, secure=secure, **extra)
|
||||
self.validate_api_response_openapi(
|
||||
url,
|
||||
"patch",
|
||||
result,
|
||||
info,
|
||||
kwargs,
|
||||
extra,
|
||||
intentionally_undocumented=intentionally_undocumented,
|
||||
)
|
||||
return result
|
||||
|
||||
@instrument_url
|
||||
def client_patch_multipart(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
intentionally_undocumented: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
"""
|
||||
Use this for patch requests that have file uploads or
|
||||
@@ -339,79 +341,143 @@ Output:
|
||||
"""
|
||||
encoded = encode_multipart(BOUNDARY, info)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
result = django_client.patch(url, encoded, content_type=MULTIPART_CONTENT, **kwargs)
|
||||
self.validate_api_response_openapi(url, "patch", result, info, kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
result = django_client.patch(
|
||||
url, encoded, content_type=MULTIPART_CONTENT, follow=follow, secure=secure, **extra
|
||||
)
|
||||
self.validate_api_response_openapi(
|
||||
url,
|
||||
"patch",
|
||||
result,
|
||||
info,
|
||||
extra,
|
||||
intentionally_undocumented=intentionally_undocumented,
|
||||
)
|
||||
return result
|
||||
|
||||
def json_patch(
|
||||
self, url: str, payload: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
payload: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
data = orjson.dumps(payload)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.patch(url, data=data, content_type="application/json", **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
return django_client.patch(
|
||||
url, data=data, content_type="application/json", follow=follow, secure=secure, **extra
|
||||
)
|
||||
|
||||
@instrument_url
|
||||
def client_put(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
encoded = urllib.parse.urlencode(info)
|
||||
kwargs["content_type"] = "application/x-www-form-urlencoded"
|
||||
extra["content_type"] = "application/x-www-form-urlencoded"
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.put(url, encoded, **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
return django_client.put(url, encoded, follow=follow, secure=secure, **extra)
|
||||
|
||||
def json_put(
|
||||
self, url: str, payload: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
payload: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
data = orjson.dumps(payload)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.put(url, data=data, content_type="application/json", **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
return django_client.put(
|
||||
url, data=data, content_type="application/json", follow=follow, secure=secure, **extra
|
||||
)
|
||||
|
||||
@instrument_url
|
||||
def client_delete(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
intentionally_undocumented: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
encoded = urllib.parse.urlencode(info)
|
||||
kwargs["content_type"] = "application/x-www-form-urlencoded"
|
||||
extra["content_type"] = "application/x-www-form-urlencoded"
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
result = django_client.delete(url, encoded, **kwargs)
|
||||
self.validate_api_response_openapi(url, "delete", result, info, kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
result = django_client.delete(url, encoded, follow=follow, secure=secure, **extra)
|
||||
self.validate_api_response_openapi(
|
||||
url,
|
||||
"delete",
|
||||
result,
|
||||
info,
|
||||
extra,
|
||||
intentionally_undocumented=intentionally_undocumented,
|
||||
)
|
||||
return result
|
||||
|
||||
@instrument_url
|
||||
def client_options(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.options(url, info, **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
return django_client.options(url, info, follow=follow, secure=secure, **extra)
|
||||
|
||||
@instrument_url
|
||||
def client_head(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
return django_client.head(url, info, **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
return django_client.head(url, info, follow=follow, secure=secure, **extra)
|
||||
|
||||
@instrument_url
|
||||
def client_post(
|
||||
self,
|
||||
url: str,
|
||||
info: Union[str, bytes, Dict[str, Any]] = {},
|
||||
**kwargs: ClientArg,
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
intentionally_undocumented: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
intentionally_undocumented = kwargs.pop("intentionally_undocumented", False)
|
||||
assert isinstance(intentionally_undocumented, bool)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
result = django_client.post(url, info, **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
result = django_client.post(url, info, follow=follow, secure=secure, **extra)
|
||||
self.validate_api_response_openapi(
|
||||
url, "post", result, info, kwargs, intentionally_undocumented=intentionally_undocumented
|
||||
url,
|
||||
"post",
|
||||
result,
|
||||
info,
|
||||
extra,
|
||||
intentionally_undocumented=intentionally_undocumented,
|
||||
)
|
||||
return result
|
||||
|
||||
@@ -431,15 +497,20 @@ Output:
|
||||
|
||||
@instrument_url
|
||||
def client_get(
|
||||
self, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
skip_user_agent: bool = False,
|
||||
follow: bool = False,
|
||||
secure: bool = False,
|
||||
intentionally_undocumented: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
intentionally_undocumented = kwargs.pop("intentionally_undocumented", False)
|
||||
assert isinstance(intentionally_undocumented, bool)
|
||||
django_client = self.client # see WRAPPER_COMMENT
|
||||
self.set_http_headers(kwargs)
|
||||
result = django_client.get(url, info, **kwargs)
|
||||
self.set_http_headers(extra, skip_user_agent)
|
||||
result = django_client.get(url, info, follow=follow, secure=secure, **extra)
|
||||
self.validate_api_response_openapi(
|
||||
url, "get", result, info, kwargs, intentionally_undocumented=intentionally_undocumented
|
||||
url, "get", result, info, extra, intentionally_undocumented=intentionally_undocumented
|
||||
)
|
||||
return result
|
||||
|
||||
@@ -573,12 +644,18 @@ Output:
|
||||
self.assertEqual(page_params["is_spectator"], False)
|
||||
|
||||
def login_with_return(
|
||||
self, email: str, password: Optional[str] = None, **kwargs: ClientArg
|
||||
self, email: str, password: Optional[str] = None, **extra: str
|
||||
) -> "TestHttpResponse":
|
||||
if password is None:
|
||||
password = initial_password(email)
|
||||
result = self.client_post(
|
||||
"/accounts/login/", {"username": email, "password": password}, **kwargs
|
||||
"/accounts/login/",
|
||||
{"username": email, "password": password},
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
self.assertNotEqual(result.status_code, 500)
|
||||
return result
|
||||
@@ -675,7 +752,7 @@ Output:
|
||||
realm_type: int = Realm.ORG_TYPES["business"]["id"],
|
||||
enable_marketing_emails: Optional[bool] = None,
|
||||
is_demo_organization: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
"""
|
||||
Stage two of the two-step registration process.
|
||||
@@ -683,7 +760,7 @@ Output:
|
||||
If things are working correctly the account should be fully
|
||||
registered after this call.
|
||||
|
||||
You can pass the HTTP_HOST variable for subdomains via kwargs.
|
||||
You can pass the HTTP_HOST variable for subdomains via extra.
|
||||
"""
|
||||
if full_name is None:
|
||||
full_name = email.replace("@", "_")
|
||||
@@ -706,7 +783,15 @@ Output:
|
||||
payload["password"] = password
|
||||
if realm_in_root_domain is not None:
|
||||
payload["realm_in_root_domain"] = realm_in_root_domain
|
||||
return self.client_post("/accounts/register/", payload, **kwargs)
|
||||
return self.client_post(
|
||||
"/accounts/register/",
|
||||
payload,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def get_confirmation_url_from_outbox(
|
||||
self,
|
||||
@@ -772,55 +857,97 @@ Output:
|
||||
return "Basic " + base64.b64encode(credentials.encode()).decode()
|
||||
|
||||
def uuid_get(
|
||||
self, identifier: str, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self, identifier: str, url: str, info: Dict[str, Any] = {}, **extra: str
|
||||
) -> "TestHttpResponse":
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_uuid(identifier)
|
||||
return self.client_get(url, info, **kwargs)
|
||||
extra["HTTP_AUTHORIZATION"] = self.encode_uuid(identifier)
|
||||
return self.client_get(
|
||||
url,
|
||||
info,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def uuid_post(
|
||||
self,
|
||||
identifier: str,
|
||||
url: str,
|
||||
info: Union[str, bytes, Dict[str, Any]] = {},
|
||||
**kwargs: ClientArg,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_uuid(identifier)
|
||||
return self.client_post(url, info, **kwargs)
|
||||
extra["HTTP_AUTHORIZATION"] = self.encode_uuid(identifier)
|
||||
return self.client_post(
|
||||
url,
|
||||
info,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def api_get(
|
||||
self, user: UserProfile, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self, user: UserProfile, url: str, info: Dict[str, Any] = {}, **extra: str
|
||||
) -> "TestHttpResponse":
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_get(url, info, **kwargs)
|
||||
extra["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_get(
|
||||
url,
|
||||
info,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def api_post(
|
||||
self,
|
||||
user: UserProfile,
|
||||
url: str,
|
||||
info: Union[str, bytes, Dict[str, Any]] = {},
|
||||
**kwargs: ClientArg,
|
||||
intentionally_undocumented: bool = False,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_post(url, info, **kwargs)
|
||||
extra["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_post(
|
||||
url,
|
||||
info,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=intentionally_undocumented,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def api_patch(
|
||||
self,
|
||||
user: UserProfile,
|
||||
url: str,
|
||||
info: Dict[str, Any] = {},
|
||||
intentionally_undocumented: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
self, user: UserProfile, url: str, info: Dict[str, Any] = {}, **extra: str
|
||||
) -> "TestHttpResponse":
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
extra["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_patch(
|
||||
url, info, intentionally_undocumented=intentionally_undocumented, **kwargs
|
||||
url,
|
||||
info,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def api_delete(
|
||||
self, user: UserProfile, url: str, info: Dict[str, Any] = {}, **kwargs: ClientArg
|
||||
self, user: UserProfile, url: str, info: Dict[str, Any] = {}, **extra: str
|
||||
) -> "TestHttpResponse":
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_delete(url, info, **kwargs)
|
||||
extra["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
return self.client_delete(
|
||||
url,
|
||||
info,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def get_streams(self, user_profile: UserProfile) -> List[str]:
|
||||
"""
|
||||
@@ -1119,7 +1246,7 @@ Output:
|
||||
invite_only: bool = False,
|
||||
is_web_public: bool = False,
|
||||
allow_fail: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
**extra: str,
|
||||
) -> "TestHttpResponse":
|
||||
post_data = {
|
||||
"subscriptions": orjson.dumps([{"name": stream} for stream in streams]).decode(),
|
||||
@@ -1127,7 +1254,13 @@ Output:
|
||||
"invite_only": orjson.dumps(invite_only).decode(),
|
||||
}
|
||||
post_data.update(extra_post_data)
|
||||
result = self.api_post(user, "/api/v1/users/me/subscriptions", post_data, **kwargs)
|
||||
result = self.api_post(
|
||||
user,
|
||||
"/api/v1/users/me/subscriptions",
|
||||
post_data,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
if not allow_fail:
|
||||
self.assert_json_success(result)
|
||||
return result
|
||||
@@ -1152,7 +1285,7 @@ Output:
|
||||
user_profile: UserProfile,
|
||||
url: str,
|
||||
payload: Union[str, Dict[str, Any]],
|
||||
**post_params: ClientArg,
|
||||
**extra: str,
|
||||
) -> Message:
|
||||
"""
|
||||
Send a webhook payload to the server, and verify that the
|
||||
@@ -1175,7 +1308,15 @@ Output:
|
||||
|
||||
prior_msg = self.get_last_message()
|
||||
|
||||
result = self.client_post(url, payload, **post_params)
|
||||
result = self.client_post(
|
||||
url,
|
||||
payload,
|
||||
skip_user_agent=False,
|
||||
follow=False,
|
||||
secure=False,
|
||||
intentionally_undocumented=False,
|
||||
**extra,
|
||||
)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# Check the correct message was sent
|
||||
@@ -1663,11 +1804,16 @@ You can fix this by adding "{complete_event_type}" to ALL_EVENT_TYPES for this w
|
||||
expected_message: Optional[str] = None,
|
||||
content_type: Optional[str] = "application/json",
|
||||
expect_noop: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
**extra: str,
|
||||
) -> None:
|
||||
kwargs["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
extra["HTTP_AUTHORIZATION"] = self.encode_user(user)
|
||||
self.check_webhook(
|
||||
fixture_name, expected_topic, expected_message, content_type, expect_noop, **kwargs
|
||||
fixture_name,
|
||||
expected_topic,
|
||||
expected_message,
|
||||
content_type,
|
||||
expect_noop,
|
||||
**extra,
|
||||
)
|
||||
|
||||
def check_webhook(
|
||||
@@ -1677,7 +1823,7 @@ You can fix this by adding "{complete_event_type}" to ALL_EVENT_TYPES for this w
|
||||
expected_message: Optional[str] = None,
|
||||
content_type: Optional[str] = "application/json",
|
||||
expect_noop: bool = False,
|
||||
**kwargs: ClientArg,
|
||||
**extra: str,
|
||||
) -> None:
|
||||
"""
|
||||
check_webhook is the main way to test "normal" webhooks that
|
||||
@@ -1692,7 +1838,7 @@ You can fix this by adding "{complete_event_type}" to ALL_EVENT_TYPES for this w
|
||||
expected_message: content
|
||||
|
||||
We simulate the delivery of the payload with `content_type`,
|
||||
and you can pass other headers via `kwargs`.
|
||||
and you can pass other headers via `extra`.
|
||||
|
||||
For the rare cases of webhooks actually sending private messages,
|
||||
see send_and_test_private_message.
|
||||
@@ -1704,17 +1850,17 @@ You can fix this by adding "{complete_event_type}" to ALL_EVENT_TYPES for this w
|
||||
|
||||
payload = self.get_payload(fixture_name)
|
||||
if content_type is not None:
|
||||
kwargs["content_type"] = content_type
|
||||
extra["content_type"] = content_type
|
||||
if self.WEBHOOK_DIR_NAME is not None:
|
||||
headers = get_fixture_http_headers(self.WEBHOOK_DIR_NAME, fixture_name)
|
||||
headers = standardize_headers(headers)
|
||||
kwargs.update(headers)
|
||||
extra.update(headers)
|
||||
try:
|
||||
msg = self.send_webhook_payload(
|
||||
self.test_user,
|
||||
self.url,
|
||||
payload,
|
||||
**kwargs,
|
||||
**extra,
|
||||
)
|
||||
except EmptyResponseError:
|
||||
if expect_noop:
|
||||
@@ -1759,7 +1905,7 @@ one or more new messages.
|
||||
content_type: str = "application/json",
|
||||
*,
|
||||
sender: Optional[UserProfile] = None,
|
||||
**kwargs: ClientArg,
|
||||
**extra: str,
|
||||
) -> Message:
|
||||
"""
|
||||
For the rare cases that you are testing a webhook that sends
|
||||
@@ -1769,12 +1915,12 @@ one or more new messages.
|
||||
check_webhook.
|
||||
"""
|
||||
payload = self.get_payload(fixture_name)
|
||||
kwargs["content_type"] = content_type
|
||||
extra["content_type"] = content_type
|
||||
|
||||
if self.WEBHOOK_DIR_NAME is not None:
|
||||
headers = get_fixture_http_headers(self.WEBHOOK_DIR_NAME, fixture_name)
|
||||
headers = standardize_headers(headers)
|
||||
kwargs.update(headers)
|
||||
extra.update(headers)
|
||||
|
||||
if sender is None:
|
||||
sender = self.test_user
|
||||
@@ -1783,7 +1929,7 @@ one or more new messages.
|
||||
sender,
|
||||
self.url,
|
||||
payload,
|
||||
**kwargs,
|
||||
**extra,
|
||||
)
|
||||
self.assertEqual(msg.content, expected_message)
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ if TYPE_CHECKING:
|
||||
from django.test.client import _MonkeyPatchedWSGIResponse as TestHttpResponse
|
||||
|
||||
# Avoid an import cycle; we only need these for type annotations.
|
||||
from zerver.lib.test_classes import ClientArg, MigrationsTestCase, ZulipTestCase
|
||||
from zerver.lib.test_classes import MigrationsTestCase, ZulipTestCase
|
||||
|
||||
|
||||
class MockLDAP(fakeldap.MockLDAP):
|
||||
@@ -363,12 +363,13 @@ def append_instrumentation_data(data: Dict[str, Any]) -> None:
|
||||
|
||||
|
||||
def instrument_url(f: UrlFuncT) -> UrlFuncT:
|
||||
# TODO: Type this with ParamSpec to preserve the function signature.
|
||||
if not INSTRUMENTING: # nocoverage -- option is always enabled; should we remove?
|
||||
return f
|
||||
else:
|
||||
|
||||
def wrapper(
|
||||
self: "ZulipTestCase", url: str, info: object = {}, **kwargs: "ClientArg"
|
||||
self: "ZulipTestCase", url: str, info: object = {}, **kwargs: Union[bool, str]
|
||||
) -> HttpResponseBase:
|
||||
start = time.time()
|
||||
result = f(self, url, info, **kwargs)
|
||||
|
||||
@@ -66,8 +66,7 @@ class TornadoWebTestCase(AsyncHTTPTestCase, ZulipTestCase):
|
||||
|
||||
async def tornado_client_get(self, path: str, **kwargs: Any) -> HTTPResponse:
|
||||
self.add_session_cookie(kwargs)
|
||||
kwargs["skip_user_agent"] = True
|
||||
self.set_http_headers(kwargs)
|
||||
self.set_http_headers(kwargs, skip_user_agent=True)
|
||||
if "HTTP_HOST" in kwargs:
|
||||
kwargs["headers"]["Host"] = kwargs["HTTP_HOST"]
|
||||
del kwargs["HTTP_HOST"]
|
||||
@@ -75,16 +74,14 @@ class TornadoWebTestCase(AsyncHTTPTestCase, ZulipTestCase):
|
||||
|
||||
async def fetch_async(self, method: str, path: str, **kwargs: Any) -> HTTPResponse:
|
||||
self.add_session_cookie(kwargs)
|
||||
kwargs["skip_user_agent"] = True
|
||||
self.set_http_headers(kwargs)
|
||||
self.set_http_headers(kwargs, skip_user_agent=True)
|
||||
if "HTTP_HOST" in kwargs:
|
||||
kwargs["headers"]["Host"] = kwargs["HTTP_HOST"]
|
||||
del kwargs["HTTP_HOST"]
|
||||
return await self.http_client.fetch(self.get_url(path), method=method, **kwargs)
|
||||
|
||||
async def client_get_async(self, path: str, **kwargs: Any) -> HTTPResponse:
|
||||
kwargs["skip_user_agent"] = True
|
||||
self.set_http_headers(kwargs)
|
||||
self.set_http_headers(kwargs, skip_user_agent=True)
|
||||
return await self.fetch_async("GET", path, **kwargs)
|
||||
|
||||
def login_user(self, *args: Any, **kwargs: Any) -> None:
|
||||
@@ -108,7 +105,6 @@ class TornadoWebTestCase(AsyncHTTPTestCase, ZulipTestCase):
|
||||
response = await self.tornado_client_get(
|
||||
"/json/events?dont_block=true",
|
||||
subdomain="zulip",
|
||||
skip_user_agent=True,
|
||||
)
|
||||
self.assertEqual(response.code, 200)
|
||||
body = orjson.loads(response.body)
|
||||
|
||||
Reference in New Issue
Block a user