mirror of
https://github.com/zulip/zulip.git
synced 2025-10-23 04:52:12 +00:00
scim: Add support for Groups.
This commit is contained in:
committed by
Tim Abbott
parent
a0b8e916c5
commit
254b0ff477
50
help/scim.md
50
help/scim.md
@@ -9,17 +9,16 @@ integration, both in Zulip Cloud and for [self-hosted](/self-hosting/)
|
||||
Zulip servers. This page describes how to configure SCIM provisioning
|
||||
for Zulip.
|
||||
|
||||
Zulip's SCIM integration has the following limitations:
|
||||
{!cloud-plus-only.md!}
|
||||
|
||||
* Provisioning Groups is not yet implemented.
|
||||
* While Zulip's SCIM integration is generic, we've only fully
|
||||
!!! tip ""
|
||||
|
||||
While Zulip's SCIM integration is generic, we've only fully
|
||||
documented the setup process with the Okta and Microsoft EntraID
|
||||
SCIM providers. [Zulip support](/help/contact-support) is happy to
|
||||
help customers configure this integration with SCIM providers that
|
||||
do not yet have detailed self-service documentation on this page.
|
||||
|
||||
{!cloud-plus-only.md!}
|
||||
|
||||
## Configure SCIM
|
||||
|
||||
{start_tabs}
|
||||
@@ -160,6 +159,47 @@ Zulip's SCIM integration has the following limitations:
|
||||
|
||||
Once SCIM has been configured, consider also [configuring SAML](/help/saml-authentication).
|
||||
|
||||
## Synchronizing group membership with SCIM
|
||||
|
||||
Zulip also supports syncing of users' [groups](/help/user-groups) via SCIM.
|
||||
|
||||
{start_tabs}
|
||||
|
||||
{tab|okta}
|
||||
|
||||
1. Follow the instructions [above](#configure-scim) to configure SCIM.
|
||||
|
||||
1. Open the **Application** you set up above for the Zulip SCIM integration, and
|
||||
go to the **Push groups** tab. This menu allows you to choose the Okta groups
|
||||
which should be synchronized with Zulip's user groups.
|
||||
|
||||
1. Make sure you're familiar with the below rules governing SCIM group sync behavior:
|
||||
|
||||
* When you set up an Okta group in the **Push groups** tab, the SCIM
|
||||
integration will create a user group in Zulip with the matching name and
|
||||
user memberships.
|
||||
* When you add or remove users from the group in Okta, these changes will
|
||||
be immediately be reflected in group memberships in Zulip.
|
||||
* You can only enable synchronization of groups which do not yet exist in Zulip.
|
||||
If you push a group whose name matches an existing Zulip group, the request
|
||||
will fail.
|
||||
* SCIM `DELETE` requests are not supported for groups. This means that if you choose
|
||||
to **Unlink** a group in Okta from the Zulip SCIM integration, you **must** select
|
||||
**Leave the group in the target app**. The **Delete the group in the target app**
|
||||
option is not supported.
|
||||
* In order to ensure consistent state, do not modify
|
||||
the name or memberships of SCIM-managed groups inside of Zulip. Such groups are
|
||||
meant to be managed in Okta. Changes made on the Zulip side will not be reflected
|
||||
in Okta and instead will cause the state of the Zulip group to become inconsistent
|
||||
with the state of the Okta group.
|
||||
* Note that while Zulip supports nested groups, Okta does not. Zulip,
|
||||
unlike Okta, supports nesting groups inside other groups. This means that
|
||||
you will use SCIM to sync the direct members of groups in Zulip, and then if
|
||||
you'd like to have a group contain another group in Zulip
|
||||
permissions, configure that subgroup relationship directly in Zulip.
|
||||
|
||||
{end_tabs}
|
||||
|
||||
## Related articles
|
||||
|
||||
* [SAML authentication](/help/saml-authentication)
|
||||
|
@@ -1,22 +1,39 @@
|
||||
import re
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
import django_scim.constants as scim_constants
|
||||
import django_scim.exceptions as scim_exceptions
|
||||
import django_scim.utils as scim_utils
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import models
|
||||
from django.db import models, transaction
|
||||
from django.http import HttpRequest
|
||||
from django_scim.adapters import SCIMUser
|
||||
from django.urls import resolve
|
||||
from django_scim.adapters import SCIMGroup, SCIMUser
|
||||
from scim2_filter_parser.attr_paths import AttrPath
|
||||
|
||||
from zerver.actions.create_user import do_create_user, do_reactivate_user
|
||||
from zerver.actions.user_groups import (
|
||||
bulk_add_members_to_user_groups,
|
||||
bulk_remove_members_from_user_groups,
|
||||
create_user_group_in_database,
|
||||
do_update_user_group_name,
|
||||
)
|
||||
from zerver.actions.user_settings import check_change_full_name, do_change_user_delivery_email
|
||||
from zerver.actions.users import do_change_user_role, do_deactivate_user
|
||||
from zerver.context_processors import get_realm_from_request
|
||||
from zerver.lib.email_validation import email_allowed_for_realm, validate_email_not_already_in_realm
|
||||
from zerver.lib.exceptions import JsonableError
|
||||
from zerver.lib.request import RequestNotes
|
||||
from zerver.lib.subdomains import get_subdomain
|
||||
from zerver.models import UserProfile
|
||||
from zerver.lib.user_groups import (
|
||||
check_user_group_name,
|
||||
get_role_based_system_groups_dict,
|
||||
get_user_group_direct_member_ids,
|
||||
)
|
||||
from zerver.models import Realm, UserProfile
|
||||
from zerver.models.groups import NamedUserGroup, SystemGroups
|
||||
from zerver.models.realms import (
|
||||
DisposableEmailError,
|
||||
DomainNotAllowedForRealmError,
|
||||
@@ -340,6 +357,331 @@ class ZulipSCIMUser(SCIMUser):
|
||||
)
|
||||
|
||||
|
||||
def validate_group_member_ids_from_request(realm: Realm, member_ids: list[int]) -> None:
|
||||
if member_ids:
|
||||
member_ids_set = set(member_ids)
|
||||
member_realm_ids = list(
|
||||
UserProfile.objects.filter(id__in=member_ids_set)
|
||||
.distinct("realm_id")
|
||||
.values_list("realm_id", flat=True)
|
||||
)
|
||||
if len(member_realm_ids) > 1 or member_realm_ids[0] != realm.id:
|
||||
raise scim_exceptions.BadRequestError(
|
||||
"Users outside of the realm can't be removed or added to the group"
|
||||
)
|
||||
|
||||
found_member_ids_set = set(
|
||||
UserProfile.objects.filter(id__in=member_ids_set).values_list("id", flat=True)
|
||||
)
|
||||
if len(member_ids_set) != len(found_member_ids_set):
|
||||
raise scim_exceptions.BadRequestError(
|
||||
f"Invalid user ids found in the request: {member_ids_set - found_member_ids_set}"
|
||||
)
|
||||
|
||||
|
||||
def check_can_manage_group_by_scim(user_group: NamedUserGroup) -> bool:
|
||||
# Prohibit system groups.
|
||||
if user_group.is_system_group:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class ZulipSCIMGroup(SCIMGroup):
|
||||
"""
|
||||
This class contains the core of the implementation of SCIM sync of Groups.
|
||||
A SCIM Group corresponds to a NamedUserGroup object in Zulip.
|
||||
|
||||
This class follows the same architecture as ZulipSCIMUser, so rather than
|
||||
re-explaining the purpose of specific method overrides or small bits of
|
||||
equivalent logic, defer to checking the corresponding comments in the
|
||||
ZulipSCIMUser implementation.
|
||||
"""
|
||||
|
||||
id_field = "usergroup_ptr_id"
|
||||
|
||||
def __init__(self, obj: NamedUserGroup, request: HttpRequest | None = None) -> None:
|
||||
assert request is not None
|
||||
self.obj: NamedUserGroup
|
||||
|
||||
super().__init__(obj, request)
|
||||
|
||||
self.subdomain = get_subdomain(request)
|
||||
self.config = settings.SCIM_CONFIG[self.subdomain]
|
||||
|
||||
realm = get_realm_from_request(request)
|
||||
assert realm is not None
|
||||
self.realm: Realm = realm
|
||||
|
||||
self._name_new_value: str | None = None
|
||||
|
||||
# The (_member_ids_to_add, _member_ids_to_remove) pair and _intended_member_ids
|
||||
# are mutually exclusive. A PUT request or PATCH request with the "replace"
|
||||
# operation to update a group will specify the
|
||||
# full set of member ids that should belong to the group, thus setting
|
||||
# _intended_member_ids.
|
||||
# A PATCH request can specify "add" and/or "remove" operations, which will
|
||||
# set _member_ids_to_add and/or _member_ids_to_remove.
|
||||
#
|
||||
# Hypothetically, a PATCH request could specify all of "add", "remove"
|
||||
# and "replace" operations at once, in any order. We do not support
|
||||
# such a mix for now, and it's not something commonly used by SCIM clients,
|
||||
# if at all.
|
||||
# If necessary, this isn't too hard to implement however, and can be done
|
||||
# by sequencing thunks for each of the operations in the PATCH request,
|
||||
# to be executed in the .save() method, instead of this current approach.
|
||||
self._member_ids_to_add: set[int] | None = None
|
||||
self._member_ids_to_remove: set[int] | None = None
|
||||
self._intended_member_ids: set[int] | None = None
|
||||
|
||||
@property
|
||||
def display_name(self) -> str:
|
||||
return self.obj.name
|
||||
|
||||
@property
|
||||
def members(self) -> list[dict[str, object]]:
|
||||
"""
|
||||
Return a list of user dicts (ready for serialization) for the members
|
||||
of the group.
|
||||
|
||||
Overridden from the superclass to use our method for fetching group
|
||||
members.
|
||||
"""
|
||||
users = UserProfile.objects.filter(
|
||||
id__in=get_user_group_direct_member_ids(self.obj), is_bot=False, realm=self.realm
|
||||
).order_by("id")
|
||||
scim_users: list[SCIMUser] = [
|
||||
scim_utils.get_user_adapter()(user, self.request) for user in users
|
||||
]
|
||||
|
||||
dicts = []
|
||||
for user in scim_users:
|
||||
d = {
|
||||
"value": user.id,
|
||||
"$ref": user.location,
|
||||
"display": user.display_name,
|
||||
"type": "User",
|
||||
}
|
||||
dicts.append(d)
|
||||
|
||||
return dicts
|
||||
|
||||
def is_new_group(self) -> bool:
|
||||
return not bool(self.obj.id)
|
||||
|
||||
def to_dict(self) -> dict[str, object]:
|
||||
return {
|
||||
"id": str(self.obj.id),
|
||||
"schemas": [scim_constants.SchemaURI.GROUP],
|
||||
"displayName": self.display_name,
|
||||
# Groups in the process of being created don't have members.
|
||||
"members": self.members if not self.is_new_group() else [],
|
||||
"meta": self.meta,
|
||||
}
|
||||
|
||||
def from_dict(self, d: dict[str, Any]) -> None:
|
||||
name = d.get("displayName")
|
||||
if name is not None:
|
||||
assert isinstance(name, str)
|
||||
self.change_group_name(name)
|
||||
|
||||
members = d.get("members")
|
||||
if members:
|
||||
self._intended_member_ids = {int(member_dict["value"]) for member_dict in members}
|
||||
|
||||
def change_group_name(self, new_value: str) -> None:
|
||||
if new_value and self.obj.name != new_value:
|
||||
self._name_new_value = new_value
|
||||
|
||||
def delete(self) -> None:
|
||||
if not check_can_manage_group_by_scim(self.obj):
|
||||
raise scim_exceptions.BadRequestError(
|
||||
f"Group {self.obj.name} can't be managed by SCIM."
|
||||
)
|
||||
|
||||
# TODO: We don't currently support DELETE requests for groups. The correct way to handle
|
||||
# a DELETE would be to deactivate the group - but Zulip currently disallows deactivation
|
||||
# of groups under certain conditions, such as "the group is used for a permission".
|
||||
#
|
||||
# To be able to process a DELETE request, we need to implement a function to forcibly
|
||||
# deactivate a group, by correctly untangling it from all dependencies such as permissions
|
||||
# or supergroups.
|
||||
# See https://github.com/zulip/zulip/pull/34605 for current status of this work.
|
||||
raise scim_exceptions.NotImplementedError
|
||||
|
||||
def save(self) -> None:
|
||||
realm = self.realm
|
||||
assert realm is not None
|
||||
|
||||
if not check_can_manage_group_by_scim(self.obj):
|
||||
raise scim_exceptions.BadRequestError(
|
||||
f"Group {self.obj.name} can't be managed by SCIM."
|
||||
)
|
||||
|
||||
name_new_value = getattr(self, "_name_new_value", None)
|
||||
intended_member_ids = getattr(self, "_intended_member_ids", None)
|
||||
member_ids_to_remove = getattr(self, "_member_ids_to_remove", None)
|
||||
member_ids_to_add = getattr(self, "_member_ids_to_add", None)
|
||||
|
||||
# Reset the state of pending changes.
|
||||
self._name_new_value = None
|
||||
self._intended_member_ids = None
|
||||
self._member_ids_to_remove = None
|
||||
self._member_ids_to_add = None
|
||||
|
||||
if name_new_value is not None:
|
||||
try:
|
||||
check_user_group_name(name_new_value)
|
||||
except JsonableError as e:
|
||||
raise scim_exceptions.BadRequestError(e.msg)
|
||||
if NamedUserGroup.objects.filter(name=name_new_value, realm=realm).exists():
|
||||
raise ConflictError("Group name already in use: " + name_new_value)
|
||||
|
||||
# At most one of the three should be set for a .save() call. If the SCIM request has multiple operations
|
||||
# on group memberships to run (e.g. "add" some users and "remove" some users),
|
||||
# .save() is called sequentially, processing one operation at a time.
|
||||
assert (
|
||||
sum(
|
||||
value is not None
|
||||
for value in [intended_member_ids, member_ids_to_remove, member_ids_to_add]
|
||||
)
|
||||
<= 1
|
||||
)
|
||||
if intended_member_ids is not None:
|
||||
validate_group_member_ids_from_request(realm, intended_member_ids)
|
||||
elif member_ids_to_remove is not None:
|
||||
validate_group_member_ids_from_request(realm, member_ids_to_remove)
|
||||
elif member_ids_to_add is not None:
|
||||
validate_group_member_ids_from_request(realm, member_ids_to_add)
|
||||
|
||||
if self.is_new_group():
|
||||
if intended_member_ids is not None:
|
||||
members = list(UserProfile.objects.filter(id__in=intended_member_ids, realm=realm))
|
||||
else:
|
||||
members = []
|
||||
|
||||
system_groups_name_dict = get_role_based_system_groups_dict(realm)
|
||||
group_nobody = system_groups_name_dict[SystemGroups.NOBODY].usergroup_ptr
|
||||
group_settings_map = dict(
|
||||
can_add_members_group=group_nobody,
|
||||
can_manage_group=group_nobody,
|
||||
)
|
||||
assert name_new_value is not None
|
||||
self.obj = create_user_group_in_database(
|
||||
name_new_value,
|
||||
members,
|
||||
realm,
|
||||
description="Created from SCIM",
|
||||
group_settings_map=group_settings_map,
|
||||
acting_user=None,
|
||||
)
|
||||
return
|
||||
|
||||
with transaction.atomic(savepoint=False):
|
||||
# We need to lock the group now to conduct update operations without race conditions.
|
||||
user_group = NamedUserGroup.objects.select_for_update().get(id=self.obj.id)
|
||||
current_member_ids = set(get_user_group_direct_member_ids(user_group))
|
||||
if name_new_value is not None:
|
||||
do_update_user_group_name(self.obj, name_new_value, acting_user=None)
|
||||
if intended_member_ids is not None:
|
||||
current_member_ids = set(get_user_group_direct_member_ids(user_group))
|
||||
member_ids_to_remove = current_member_ids - intended_member_ids
|
||||
member_ids_to_add = intended_member_ids - current_member_ids
|
||||
|
||||
if member_ids_to_remove:
|
||||
# Clear out ids of users who have already been removed from the group.
|
||||
member_ids_to_remove = member_ids_to_remove.intersection(current_member_ids)
|
||||
bulk_remove_members_from_user_groups(
|
||||
[user_group], list(member_ids_to_remove), acting_user=None
|
||||
)
|
||||
if member_ids_to_add:
|
||||
# Clear out ids of users who are already in the group.
|
||||
member_ids_to_add = member_ids_to_add - current_member_ids
|
||||
bulk_add_members_to_user_groups(
|
||||
[user_group], list(member_ids_to_add), acting_user=None
|
||||
)
|
||||
|
||||
def handle_replace(
|
||||
self,
|
||||
path: AttrPath,
|
||||
value: str | list[Any] | dict[AttrPath, Any],
|
||||
operation: Any,
|
||||
) -> None:
|
||||
if not isinstance(value, dict):
|
||||
value = {path: value}
|
||||
|
||||
assert isinstance(value, dict)
|
||||
for attr_path, val in value.items():
|
||||
if attr_path.first_path == ("displayName", None, None):
|
||||
name = val
|
||||
assert isinstance(name, str)
|
||||
self.change_group_name(name)
|
||||
elif attr_path.first_path == ("members", None, None):
|
||||
intended_member_ids = {int(user_dict["value"]) for user_dict in val}
|
||||
self._intended_member_ids = intended_member_ids
|
||||
elif attr_path.first_path == ("id", None, None):
|
||||
# the "id", if present in the request, should just match the id
|
||||
# of the group - so this is a sanity check.
|
||||
assert int(val) == self.obj.id
|
||||
else: # nocoverage
|
||||
raise scim_exceptions.NotImplementedError
|
||||
|
||||
self.save()
|
||||
|
||||
def handle_add(
|
||||
self,
|
||||
path: AttrPath,
|
||||
value: str | list[Any] | dict[AttrPath, Any],
|
||||
operation: Any,
|
||||
) -> None:
|
||||
assert path is not None
|
||||
if path.first_path == ("members", None, None):
|
||||
members = value or []
|
||||
assert isinstance(members, list)
|
||||
self._member_ids_to_add = {int(member.get("value")) for member in members}
|
||||
else: # nocoverage
|
||||
raise scim_exceptions.NotImplementedError
|
||||
|
||||
self.save()
|
||||
|
||||
def handle_remove(
|
||||
self,
|
||||
path: AttrPath,
|
||||
value: str | list[Any] | dict[AttrPath, Any],
|
||||
operation: Any,
|
||||
) -> None:
|
||||
assert path is not None
|
||||
|
||||
if path.is_complex:
|
||||
# django-scim2 does not support handling of complex paths and thus we generally
|
||||
# don't support them either - as they're not used by our supported SCIM clients.
|
||||
# The exception is Okta requests to remove a user from a group.
|
||||
# Rather than a PATCH request with a simple path of the form
|
||||
# { ..., "path": "members", "value": [{"value": "<user id>"}] }
|
||||
# Okta sends a request specifying the user to remove in a complex path:
|
||||
# { ..., "path": 'members[value eq "<user id>"]' }
|
||||
#
|
||||
# We don't attempt to implement general handling of complex paths. Instead,
|
||||
# we just add a hacky approach for detecting and handling this single, specific
|
||||
# kind of request.
|
||||
#
|
||||
# HACK: Detect the strange filter query formed by django-scim2 when preparing
|
||||
# to parse the path in self.split_path().
|
||||
match = re.match(r'^members\[value eq "(\d+)"\] eq ""$', path.filter)
|
||||
if not match: # nocoverage
|
||||
raise scim_exceptions.NotImplementedError
|
||||
|
||||
user_profile_id = int(match.group(1))
|
||||
self._member_ids_to_remove = {user_profile_id}
|
||||
elif path.first_path == ("members", None, None):
|
||||
members = value or []
|
||||
assert isinstance(members, list)
|
||||
self._member_ids_to_remove = {int(member.get("value")) for member in members}
|
||||
else: # nocoverage
|
||||
raise scim_exceptions.NotImplementedError
|
||||
|
||||
self.save()
|
||||
|
||||
|
||||
def get_extra_model_filter_kwargs_getter(
|
||||
model: type[models.Model],
|
||||
) -> Callable[[HttpRequest, Any, Any], dict[str, object]]:
|
||||
@@ -365,7 +707,17 @@ def get_extra_model_filter_kwargs_getter(
|
||||
) -> dict[str, object]:
|
||||
realm = RequestNotes.get_notes(request).realm
|
||||
assert realm is not None
|
||||
return {"realm_id": realm.id, "is_bot": False}
|
||||
extra_kwargs: dict[str, object] = {"realm_id": realm.id}
|
||||
# We need to determine if it's /Users or /Groups being queried.
|
||||
url_name = resolve(request.path).url_name
|
||||
if url_name in ["users", "users-search"]:
|
||||
extra_kwargs.update({"is_bot": False})
|
||||
elif url_name == "groups":
|
||||
extra_kwargs.update({"deactivated": False})
|
||||
else:
|
||||
raise AssertionError
|
||||
|
||||
return extra_kwargs
|
||||
|
||||
return get_extra_filter_kwargs
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
from django.http import HttpRequest
|
||||
from django_scim.filters import UserFilterQuery
|
||||
from django_scim.filters import GroupFilterQuery, UserFilterQuery
|
||||
|
||||
from zerver.lib.request import RequestNotes
|
||||
|
||||
@@ -55,3 +55,23 @@ class ZulipUserFilterQuery(UserFilterQuery):
|
||||
"AND zerver_userprofile.realm_id = %s AND zerver_userprofile.is_bot = False ORDER BY zerver_userprofile.id",
|
||||
[realm.id],
|
||||
)
|
||||
|
||||
|
||||
class ZulipGroupFilterQuery(GroupFilterQuery):
|
||||
attr_map = {
|
||||
("displayName", None, None): "zerver_namedusergroup.name",
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_extras(cls, q: str, request: HttpRequest | None = None) -> tuple[str, list[object]]:
|
||||
"""
|
||||
Here we ensure that results are limited to the subdomain of the request.
|
||||
"""
|
||||
assert request is not None
|
||||
realm = RequestNotes.get_notes(request).realm
|
||||
assert realm is not None
|
||||
|
||||
return (
|
||||
"AND zerver_namedusergroup.realm_id = %s AND zerver_namedusergroup.deactivated = False ORDER BY zerver_namedusergroup.usergroup_ptr_id",
|
||||
[realm.id],
|
||||
)
|
||||
|
@@ -8,10 +8,13 @@ import orjson
|
||||
from django.conf import settings
|
||||
from typing_extensions import override
|
||||
|
||||
from zerver.actions.user_groups import check_add_user_group, do_deactivate_user_group
|
||||
from zerver.actions.user_settings import do_change_full_name
|
||||
from zerver.lib.stream_subscription import get_subscribed_stream_ids_for_user
|
||||
from zerver.lib.test_classes import ZulipTestCase
|
||||
from zerver.lib.user_groups import get_user_group_direct_member_ids
|
||||
from zerver.models import UserProfile
|
||||
from zerver.models.groups import NamedUserGroup
|
||||
from zerver.models.realms import get_realm
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -830,15 +833,664 @@ class TestSCIMGroup(SCIMTestCase):
|
||||
to actually test desired behavior.
|
||||
"""
|
||||
|
||||
def generate_group_schema(self, user_group: NamedUserGroup) -> dict[str, Any]:
|
||||
return {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"id": str(user_group.id),
|
||||
"displayName": user_group.name,
|
||||
"members": [
|
||||
{
|
||||
"value": str(user_profile.id),
|
||||
"$ref": f"http://zulip.testserver/scim/v2/Users/{user_profile.id}",
|
||||
"display": user_profile.full_name,
|
||||
"type": "User",
|
||||
}
|
||||
for user_profile in UserProfile.objects.filter(
|
||||
id__in=get_user_group_direct_member_ids(user_group), is_bot=False
|
||||
).order_by("id")
|
||||
],
|
||||
"meta": {
|
||||
"resourceType": "Group",
|
||||
"location": f"http://zulip.testserver/scim/v2/Groups/{user_group.id}",
|
||||
},
|
||||
}
|
||||
|
||||
def test_get_by_id(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
desdemona = self.example_user("desdemona")
|
||||
hamlet = self.example_user("hamlet")
|
||||
bot = self.create_test_bot("whatever", hamlet)
|
||||
|
||||
# We include a bot in the group memberships to verify that bots are ignored.
|
||||
test_group = check_add_user_group(realm, "Test group", [hamlet, bot], acting_user=desdemona)
|
||||
|
||||
expected_data = self.generate_group_schema(test_group)
|
||||
result = self.client_get(f"/scim/v2/Groups/{test_group.id}", {}, **self.scim_headers())
|
||||
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
self.assertEqual(output_data, expected_data)
|
||||
|
||||
member_ids = [int(user_dict["value"]) for user_dict in output_data["members"]]
|
||||
self.assertIn(hamlet.id, member_ids)
|
||||
self.assertNotIn(bot.id, member_ids)
|
||||
|
||||
def test_get_basic_filter_by_display_name(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
desdemona = self.example_user("desdemona")
|
||||
test_group = check_add_user_group(realm, "Test group", [hamlet], acting_user=desdemona)
|
||||
|
||||
expected_response_schema = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
|
||||
"totalResults": 1,
|
||||
"itemsPerPage": 50,
|
||||
"startIndex": 1,
|
||||
"Resources": [self.generate_group_schema(test_group)],
|
||||
}
|
||||
|
||||
result = self.client_get(
|
||||
f'/scim/v2/Groups?filter=displayName eq "{test_group.name}"',
|
||||
{},
|
||||
**self.scim_headers(),
|
||||
)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
# Deactivated groups are invisible to SCIM.
|
||||
do_deactivate_user_group(test_group, acting_user=None)
|
||||
|
||||
expected_response_schema = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
|
||||
"totalResults": 0,
|
||||
"itemsPerPage": 50,
|
||||
"startIndex": 1,
|
||||
"Resources": [],
|
||||
}
|
||||
result = self.client_get(
|
||||
f'/scim/v2/Groups?filter=displayName eq "{test_group.name}"',
|
||||
{},
|
||||
**self.scim_headers(),
|
||||
)
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
def test_get_all_with_pagination(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
|
||||
result_all = self.client_get("/scim/v2/Groups", {}, **self.scim_headers())
|
||||
self.assertEqual(result_all.status_code, 200)
|
||||
output_data_all = orjson.loads(result_all.content)
|
||||
|
||||
expected_response_schema = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
|
||||
"totalResults": NamedUserGroup.objects.filter(realm=realm).count(),
|
||||
"itemsPerPage": 50,
|
||||
"startIndex": 1,
|
||||
"Resources": [
|
||||
self.generate_group_schema(group)
|
||||
for group in NamedUserGroup.objects.filter(realm=realm).order_by("id")
|
||||
],
|
||||
}
|
||||
|
||||
self.assertEqual(output_data_all, expected_response_schema)
|
||||
|
||||
# Next we test pagination, just like in TestSCIMUser.test_get_all_with_pagination.
|
||||
result_offset_limited = self.client_get(
|
||||
"/scim/v2/Groups?startIndex=4&count=3", {}, **self.scim_headers()
|
||||
)
|
||||
self.assertEqual(result_offset_limited.status_code, 200)
|
||||
output_data_offset_limited = orjson.loads(result_offset_limited.content)
|
||||
self.assertEqual(output_data_offset_limited["itemsPerPage"], 3)
|
||||
self.assertEqual(output_data_offset_limited["startIndex"], 4)
|
||||
self.assertEqual(
|
||||
output_data_offset_limited["totalResults"], output_data_all["totalResults"]
|
||||
)
|
||||
self.assert_length(output_data_offset_limited["Resources"], 3)
|
||||
|
||||
self.assertEqual(output_data_offset_limited["Resources"], output_data_all["Resources"][3:6])
|
||||
|
||||
def test_post(self) -> None:
|
||||
payload: dict[str, Any] = {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"displayName": "New group without members",
|
||||
"members": [],
|
||||
}
|
||||
original_group_count = NamedUserGroup.objects.count()
|
||||
result = self.client_post(
|
||||
"/scim/v2/Groups", payload, content_type="application/json", **self.scim_headers()
|
||||
)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
output_data = orjson.loads(result.content)
|
||||
|
||||
self.assertEqual(NamedUserGroup.objects.count(), original_group_count + 1)
|
||||
|
||||
new_group = NamedUserGroup.objects.latest("id")
|
||||
self.assertEqual(new_group.name, "New group without members")
|
||||
self.assertEqual(set(get_user_group_direct_member_ids(new_group)), set())
|
||||
|
||||
expected_response_schema = self.generate_group_schema(new_group)
|
||||
self.assertEqual(expected_response_schema, output_data)
|
||||
|
||||
hamlet = self.example_user("hamlet")
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"displayName": "New group with members",
|
||||
"members": [{"value": str(hamlet.id)}],
|
||||
}
|
||||
result = self.client_post(
|
||||
"/scim/v2/Groups", payload, content_type="application/json", **self.scim_headers()
|
||||
)
|
||||
self.assertEqual(result.status_code, 201)
|
||||
output_data = orjson.loads(result.content)
|
||||
|
||||
self.assertEqual(NamedUserGroup.objects.count(), original_group_count + 2)
|
||||
|
||||
new_group = NamedUserGroup.objects.latest("id")
|
||||
self.assertEqual(new_group.name, "New group with members")
|
||||
self.assertEqual(set(get_user_group_direct_member_ids(new_group)), {hamlet.id})
|
||||
|
||||
expected_response_schema = self.generate_group_schema(new_group)
|
||||
self.assertEqual(expected_response_schema, output_data)
|
||||
|
||||
# Can't create a group with a name that already matches another group.
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"displayName": new_group.name,
|
||||
"members": [{"value": str(hamlet.id)}],
|
||||
}
|
||||
result = self.client_post(
|
||||
"/scim/v2/Groups", payload, content_type="application/json", **self.scim_headers()
|
||||
)
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": f"Group name already in use: {new_group.name}",
|
||||
"status": 409,
|
||||
"scimType": "uniqueness",
|
||||
},
|
||||
)
|
||||
|
||||
def test_put_change_name_and_members(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
desdemona = self.example_user("desdemona")
|
||||
othello = self.example_user("othello")
|
||||
|
||||
test_group = check_add_user_group(
|
||||
realm, "Test group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
|
||||
# PUT replaces all specified attributes of the group. Thus,
|
||||
# this payload will replace both the name and set of members of
|
||||
# the group.
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"id": test_group.id,
|
||||
"displayName": "New test group name",
|
||||
"members": [{"value": str(hamlet.id)}, {"value": str(othello.id)}],
|
||||
}
|
||||
result = self.json_put(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
test_group.refresh_from_db()
|
||||
self.assertEqual(test_group.name, "New test group name")
|
||||
new_member_ids = get_user_group_direct_member_ids(test_group)
|
||||
self.assertEqual(set(new_member_ids), {hamlet.id, othello.id})
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
conflicting_group = check_add_user_group(
|
||||
realm, "Conflicting group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"id": test_group.id,
|
||||
"displayName": conflicting_group.name,
|
||||
"members": [{"value": str(hamlet.id)}, {"value": str(othello.id)}],
|
||||
}
|
||||
result = self.json_put(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": f"Group name already in use: {conflicting_group.name}",
|
||||
"status": 409,
|
||||
"scimType": "uniqueness",
|
||||
},
|
||||
)
|
||||
|
||||
# Deactivated groups are invisible to SCIM.
|
||||
do_deactivate_user_group(test_group, acting_user=None)
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"id": test_group.id,
|
||||
"displayName": "some new name - but group should not get updated",
|
||||
"members": [{"value": str(hamlet.id)}, {"value": str(othello.id)}],
|
||||
}
|
||||
result = self.json_put(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": f"Resource {test_group.id} not found",
|
||||
"status": 404,
|
||||
},
|
||||
)
|
||||
test_group.refresh_from_db()
|
||||
self.assertEqual(test_group.name, "New test group name")
|
||||
|
||||
def test_patch_add_remove_operations(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
desdemona = self.example_user("desdemona")
|
||||
othello = self.example_user("othello")
|
||||
cordelia = self.example_user("cordelia")
|
||||
iago = self.example_user("iago")
|
||||
|
||||
test_group = check_add_user_group(
|
||||
realm, "Test group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "add",
|
||||
"path": "members",
|
||||
"value": [
|
||||
{"value": str(othello.id)},
|
||||
{"value": str(cordelia.id)},
|
||||
# Include a user who's already in the group in this list, to verify that trying to add
|
||||
# a user who's already in the group is gracefully ignored.
|
||||
{"value": str(desdemona.id)},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
self.assertEqual(
|
||||
set(get_user_group_direct_member_ids(test_group)),
|
||||
{hamlet.id, desdemona.id, othello.id, cordelia.id},
|
||||
)
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "remove",
|
||||
"path": "members",
|
||||
# Similarly to the user addition case, here, for user removal,
|
||||
# we include a user who's not in the group. Trying to remove a user
|
||||
# who's not in the group should gracefully be ignored.
|
||||
"value": [
|
||||
{"value": str(othello.id)},
|
||||
{"value": str(cordelia.id)},
|
||||
{"value": str(iago.id)},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
self.assertEqual(
|
||||
set(get_user_group_direct_member_ids(test_group)), {hamlet.id, desdemona.id}
|
||||
)
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
# Now try a sequence of operations and verify the final result
|
||||
# is as expected.
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "add",
|
||||
"path": "members",
|
||||
"value": [{"value": str(othello.id)}, {"value": str(cordelia.id)}],
|
||||
},
|
||||
{
|
||||
"op": "remove",
|
||||
"path": "members",
|
||||
"value": [{"value": str(desdemona.id)}, {"value": str(hamlet.id)}],
|
||||
},
|
||||
{
|
||||
"op": "add",
|
||||
"path": "members",
|
||||
"value": [{"value": str(hamlet.id)}],
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
self.assertEqual(
|
||||
set(get_user_group_direct_member_ids(test_group)), {hamlet.id, othello.id, cordelia.id}
|
||||
)
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
def test_patch_remove_operation_with_complex_path(self) -> None:
|
||||
# We generally don't support PATCH requests with complex paths.
|
||||
# but have to make an exception for "remove a user from a group"
|
||||
# requests from Okta, which for some reason are sent with
|
||||
# complex paths.
|
||||
#
|
||||
# See ZulipSCIMGroup.handle_remove for details.
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
desdemona = self.example_user("desdemona")
|
||||
|
||||
test_group = check_add_user_group(
|
||||
realm, "Test group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "remove",
|
||||
"path": f'members[value eq "{hamlet.id!s}"]',
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
self.assertEqual(set(get_user_group_direct_member_ids(test_group)), {desdemona.id})
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
def test_patch_replace_operation(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
desdemona = self.example_user("desdemona")
|
||||
othello = self.example_user("othello")
|
||||
|
||||
test_group = check_add_user_group(
|
||||
realm, "Test group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{"op": "replace", "path": "displayName", "value": "New test group name"}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
test_group.refresh_from_db()
|
||||
self.assertEqual(test_group.name, "New test group name")
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
# Next try editing members.
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "members",
|
||||
"value": [
|
||||
{"value": str(hamlet.id)},
|
||||
{"value": str(othello.id)},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
self.assertEqual(set(get_user_group_direct_member_ids(test_group)), {hamlet.id, othello.id})
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
# The operation might also specify the attributes to replace by passing a dict
|
||||
# in the "value" key, instead of providing a "path".
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "replace",
|
||||
"value": {"id": str(test_group.id), "displayName": "New test group name 2"},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
test_group.refresh_from_db()
|
||||
self.assertEqual(test_group.name, "New test group name 2")
|
||||
|
||||
output_data = orjson.loads(result.content)
|
||||
expected_response_schema = self.generate_group_schema(test_group)
|
||||
self.assertEqual(output_data, expected_response_schema)
|
||||
|
||||
# Finally, update both SCIM attributes (displayName and members) in a single operation.
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "replace",
|
||||
"value": {
|
||||
"id": str(test_group.id),
|
||||
"displayName": "New test group name 3",
|
||||
"members": [
|
||||
{"value": str(hamlet.id)},
|
||||
{"value": str(othello.id)},
|
||||
{"value": str(desdemona.id)},
|
||||
],
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
test_group.refresh_from_db()
|
||||
self.assertEqual(test_group.name, "New test group name 3")
|
||||
self.assertEqual(
|
||||
set(get_user_group_direct_member_ids(test_group)), {hamlet.id, othello.id, desdemona.id}
|
||||
)
|
||||
|
||||
def test_patch_add_user_wrong_realm(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
mit_user = self.mit_user("sipbtest")
|
||||
desdemona = self.example_user("desdemona")
|
||||
othello = self.example_user("othello")
|
||||
|
||||
test_group = check_add_user_group(
|
||||
realm, "Test group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "add",
|
||||
"path": "members",
|
||||
"value": [
|
||||
{"value": str(othello.id)},
|
||||
# Also include an id of a user from a different realm.
|
||||
{"value": str(mit_user.id)},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": "Users outside of the realm can't be removed or added to the group",
|
||||
"status": 400,
|
||||
},
|
||||
)
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "add",
|
||||
"path": "members",
|
||||
"value": [
|
||||
# Try a request with just the id of a user from a different realm.
|
||||
{"value": str(mit_user.id)},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": "Users outside of the realm can't be removed or added to the group",
|
||||
"status": 400,
|
||||
},
|
||||
)
|
||||
|
||||
# The requests failed, so there's no change in memberships.
|
||||
self.assertEqual(
|
||||
set(get_user_group_direct_member_ids(test_group)),
|
||||
{hamlet.id, desdemona.id},
|
||||
)
|
||||
|
||||
def test_patch_add_invalid_user_id(self) -> None:
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
desdemona = self.example_user("desdemona")
|
||||
othello = self.example_user("othello")
|
||||
|
||||
test_group = check_add_user_group(
|
||||
realm, "Test group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{
|
||||
"op": "add",
|
||||
"path": "members",
|
||||
"value": [
|
||||
{"value": str(othello.id)},
|
||||
# Also include an id that doesn't match any user.
|
||||
{"value": "987654321"},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(f"/scim/v2/Groups/{test_group.id}", payload, **self.scim_headers())
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": "Invalid user ids found in the request: {987654321}",
|
||||
"status": 400,
|
||||
},
|
||||
)
|
||||
|
||||
# The request failed, so there's no change in memberships.
|
||||
self.assertEqual(
|
||||
set(get_user_group_direct_member_ids(test_group)),
|
||||
{hamlet.id, desdemona.id},
|
||||
)
|
||||
|
||||
def test_cant_edit_system_groups(self) -> None:
|
||||
"""
|
||||
Verifies that system groups are not allowed to be managed by SCIM requests.
|
||||
"""
|
||||
realm = get_realm("zulip")
|
||||
system_group = NamedUserGroup.objects.get(realm=realm, name="role:owners")
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
|
||||
"Operations": [
|
||||
{"op": "replace", "path": "displayName", "value": "New test group name"}
|
||||
],
|
||||
}
|
||||
|
||||
result = self.json_patch(
|
||||
f"/scim/v2/Groups/{system_group.id}", payload, **self.scim_headers()
|
||||
)
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": "Group role:owners can't be managed by SCIM.",
|
||||
"status": 400,
|
||||
},
|
||||
)
|
||||
system_group.refresh_from_db()
|
||||
self.assertEqual(system_group.name, "role:owners")
|
||||
|
||||
result = self.client_delete(f"/scim/v2/Groups/{system_group.id}", **self.scim_headers())
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": "Group role:owners can't be managed by SCIM.",
|
||||
"status": 400,
|
||||
},
|
||||
)
|
||||
self.assertTrue(NamedUserGroup.objects.filter(realm=realm, name="role:owners").exists())
|
||||
|
||||
payload = {
|
||||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
|
||||
"displayName": "role:doesntexistyet",
|
||||
"members": [],
|
||||
}
|
||||
original_group_count = NamedUserGroup.objects.count()
|
||||
result = self.client_post(
|
||||
"/scim/v2/Groups", payload, content_type="application/json", **self.scim_headers()
|
||||
)
|
||||
self.assertEqual(
|
||||
orjson.loads(result.content),
|
||||
{
|
||||
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
|
||||
"detail": "User group name cannot start with 'role:'.",
|
||||
"status": 400,
|
||||
},
|
||||
)
|
||||
self.assertEqual(original_group_count, NamedUserGroup.objects.count())
|
||||
|
||||
def test_endpoints_disabled(self) -> None:
|
||||
with self.assertLogs("django.request", "ERROR") as m:
|
||||
result = self.client_get("/scim/v2/Groups", {}, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 501)
|
||||
self.assertEqual(m.output, ["ERROR:django.request:Not Implemented: /scim/v2/Groups"])
|
||||
with self.assertLogs("django.request", "ERROR") as m:
|
||||
result = self.client_get("/scim/v2/Groups/1", {}, **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 501)
|
||||
self.assertEqual(m.output, ["ERROR:django.request:Not Implemented: /scim/v2/Groups/1"])
|
||||
with self.assertLogs("django.request", "ERROR") as m:
|
||||
result = self.client_post(
|
||||
"/scim/v2/Groups/.search",
|
||||
@@ -851,6 +1503,24 @@ class TestSCIMGroup(SCIMTestCase):
|
||||
m.output, ["ERROR:django.request:Not Implemented: /scim/v2/Groups/.search"]
|
||||
)
|
||||
|
||||
def test_delete(self) -> None:
|
||||
# The DELETE endpoint is currently disabled.
|
||||
|
||||
realm = get_realm("zulip")
|
||||
hamlet = self.example_user("hamlet")
|
||||
desdemona = self.example_user("desdemona")
|
||||
|
||||
test_group = check_add_user_group(
|
||||
realm, "Test group", [hamlet, desdemona], acting_user=desdemona
|
||||
)
|
||||
|
||||
with self.assertLogs("django.request", "ERROR") as m:
|
||||
result = self.client_delete(f"/scim/v2/Groups/{test_group.id}", **self.scim_headers())
|
||||
self.assertEqual(result.status_code, 501)
|
||||
self.assertEqual(
|
||||
m.output, [f"ERROR:django.request:Not Implemented: /scim/v2/Groups/{test_group.id}"]
|
||||
)
|
||||
|
||||
|
||||
class TestRemainingUnsupportedSCIMFeatures(SCIMTestCase):
|
||||
def test_endpoints_disabled(self) -> None:
|
||||
|
@@ -1290,6 +1290,9 @@ SENTRY_DSN = os.environ.get("SENTRY_DSN", SENTRY_DSN)
|
||||
SCIM_SERVICE_PROVIDER = {
|
||||
"USER_ADAPTER": "zerver.lib.scim.ZulipSCIMUser",
|
||||
"USER_FILTER_PARSER": "zerver.lib.scim_filter.ZulipUserFilterQuery",
|
||||
"GROUP_ADAPTER": "zerver.lib.scim.ZulipSCIMGroup",
|
||||
"GROUP_MODEL": "zerver.models.groups.NamedUserGroup",
|
||||
"GROUP_FILTER_PARSER": "zerver.lib.scim_filter.ZulipGroupFilterQuery",
|
||||
# NETLOC is actually overridden by the behavior of base_scim_location_getter,
|
||||
# but django-scim2 requires it to be set, even though it ends up not being used.
|
||||
# So we need to give it some value here, and EXTERNAL_HOST is the most generic.
|
||||
|
@@ -869,10 +869,6 @@ urls += [
|
||||
r"^scim/v2/Groups/.search$",
|
||||
scim_views.SCIMView.as_view(implemented=False),
|
||||
),
|
||||
re_path(
|
||||
r"^scim/v2/Groups(?:/(?P<uuid>[^/]+))?$",
|
||||
scim_views.SCIMView.as_view(implemented=False),
|
||||
),
|
||||
re_path(r"^scim/v2/Me$", scim_views.SCIMView.as_view(implemented=False)),
|
||||
re_path(
|
||||
r"^scim/v2/ServiceProviderConfig$",
|
||||
|
Reference in New Issue
Block a user