mirror of
https://github.com/zulip/zulip.git
synced 2025-10-23 16:14:02 +00:00
753 lines
31 KiB
Python
753 lines
31 KiB
Python
import re
|
|
from collections.abc import Callable
|
|
from typing import Any
|
|
|
|
import django_scim.constants as scim_constants
|
|
import django_scim.exceptions as scim_exceptions
|
|
import django_scim.utils as scim_utils
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import models, transaction
|
|
from django.http import HttpRequest
|
|
from django.urls import resolve
|
|
from django_scim.adapters import SCIMGroup, SCIMUser
|
|
from scim2_filter_parser.attr_paths import AttrPath
|
|
|
|
from zerver.actions.create_user import do_create_user, do_reactivate_user
|
|
from zerver.actions.user_groups import (
|
|
bulk_add_members_to_user_groups,
|
|
bulk_remove_members_from_user_groups,
|
|
create_user_group_in_database,
|
|
do_update_user_group_name,
|
|
)
|
|
from zerver.actions.user_settings import check_change_full_name, do_change_user_delivery_email
|
|
from zerver.actions.users import do_change_user_role, do_deactivate_user
|
|
from zerver.context_processors import get_realm_from_request
|
|
from zerver.lib.email_validation import email_allowed_for_realm, validate_email_not_already_in_realm
|
|
from zerver.lib.exceptions import JsonableError
|
|
from zerver.lib.request import RequestNotes
|
|
from zerver.lib.subdomains import get_subdomain
|
|
from zerver.lib.user_groups import (
|
|
check_user_group_name,
|
|
get_role_based_system_groups_dict,
|
|
get_user_group_direct_member_ids,
|
|
)
|
|
from zerver.models import Realm, UserProfile
|
|
from zerver.models.groups import NamedUserGroup, SystemGroups
|
|
from zerver.models.realms import (
|
|
DisposableEmailError,
|
|
DomainNotAllowedForRealmError,
|
|
EmailContainsPlusError,
|
|
)
|
|
|
|
|
|
class ZulipSCIMUser(SCIMUser):
|
|
"""With django-scim2, the core of a project's SCIM implementation is
|
|
this user adapter class, which defines how to translate between the
|
|
concepts of users in the SCIM specification and the Zulip users.
|
|
"""
|
|
|
|
id_field = "id"
|
|
|
|
def __init__(self, obj: UserProfile, request: HttpRequest | None = None) -> None:
|
|
# We keep the function signature from the superclass, but this actually
|
|
# shouldn't be called with request being None.
|
|
assert request is not None
|
|
|
|
# self.obj is populated appropriately by django-scim2 views with
|
|
# an instance of UserProfile - either fetched from the database
|
|
# or constructed via UserProfile() if the request currently being
|
|
# handled is a User creation request (POST).
|
|
self.obj: UserProfile
|
|
|
|
super().__init__(obj, request)
|
|
self.subdomain = get_subdomain(request)
|
|
self.config = settings.SCIM_CONFIG[self.subdomain]
|
|
|
|
# These attributes are custom to this class and will be
|
|
# populated with values in handle_replace and similar methods
|
|
# in response to a request for the corresponding
|
|
# UserProfile fields to change. The .save() method inspects
|
|
# these fields an executes the requested changes.
|
|
self._email_new_value: str | None = None
|
|
self._is_active_new_value: bool | None = None
|
|
self._full_name_new_value: str | None = None
|
|
self._role_new_value: int | None = None
|
|
self._password_set_to: str | None = None
|
|
|
|
def is_new_user(self) -> bool:
|
|
return not bool(self.obj.id)
|
|
|
|
@property
|
|
def display_name(self) -> str:
|
|
"""
|
|
Return the displayName of the user per the SCIM spec.
|
|
|
|
Overridden because UserProfile uses the .full_name attribute,
|
|
while the superclass expects .first_name and .last_name.
|
|
"""
|
|
return self.obj.full_name
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""
|
|
Return a ``dict`` conforming to the SCIM User Schema,
|
|
ready for conversion to a JSON object.
|
|
|
|
The attribute names appearing in the dict are those defined in the SCIM User Schema:
|
|
https://datatracker.ietf.org/doc/html/rfc7643#section-4.1
|
|
"""
|
|
if self.config["name_formatted_included"]:
|
|
name = {
|
|
"formatted": self.obj.full_name,
|
|
}
|
|
else:
|
|
# Some clients (e.g. Okta) operate with a first_name,
|
|
# last_name model and don't support a full name field.
|
|
# While we strive never to do this in the project because
|
|
# not every culture has the first/last name structure,
|
|
# Okta's design means we have to convert our full_name
|
|
# into a first_name/last_name pair to provide to the
|
|
# client. We do naive conversion with `split`.
|
|
if " " in self.obj.full_name:
|
|
first_name, last_name = self.obj.full_name.split(" ", 1)
|
|
else:
|
|
first_name, last_name = self.obj.full_name, ""
|
|
name = {
|
|
"givenName": first_name,
|
|
"familyName": last_name,
|
|
}
|
|
|
|
return {
|
|
"schemas": [scim_constants.SchemaURI.USER],
|
|
"id": str(self.obj.id),
|
|
"userName": self.obj.delivery_email,
|
|
"name": name,
|
|
"displayName": self.display_name,
|
|
"active": self.obj.is_active,
|
|
"role": UserProfile.ROLE_ID_TO_API_NAME[self.obj.role],
|
|
# meta is a property implemented in the superclass
|
|
# TODO: The upstream implementation uses `user_profile.date_joined`
|
|
# as the value of the lastModified meta attribute, which is not
|
|
# a correct simplification. We should add proper tracking
|
|
# of this value.
|
|
"meta": self.meta,
|
|
}
|
|
|
|
def from_dict(self, d: dict[str, Any]) -> None:
|
|
"""Consume a dictionary conforming to the SCIM User Schema. The
|
|
dictionary was originally submitted as JSON by the client in
|
|
PUT (update a user) and POST (create a new user) requests. A
|
|
PUT request tells us to update User attributes to match those
|
|
passed in the dict. A POST request tells us to create a new
|
|
User with attributes as specified in the dict.
|
|
|
|
The superclass implements some very basic default behavior,
|
|
that doesn't support changing attributes via our actions.py
|
|
functions (which update audit logs, send events, etc.) or
|
|
doing application-specific validation.
|
|
|
|
Thus, we've completely overridden the upstream implementation
|
|
to store the values of the supported attributes that the
|
|
request would like to change. Actually modifying the database
|
|
is implemented in self.save().
|
|
|
|
Given that SCIMUser is an adapter class, this method is meant
|
|
to be completely overridden, and we can expect it remain the
|
|
case that no important django-scim2 logic relies on the
|
|
superclass's implementation of this function.
|
|
"""
|
|
email = d.get("userName")
|
|
assert isinstance(email, str)
|
|
self.change_delivery_email(email)
|
|
|
|
name_attr_dict = d.get("name", {})
|
|
if self.config["name_formatted_included"]:
|
|
full_name = name_attr_dict.get("formatted", "")
|
|
else:
|
|
# Some providers (e.g. Okta) don't provide name.formatted.
|
|
first_name = name_attr_dict.get("givenName", "")
|
|
last_name = name_attr_dict.get("familyName", "")
|
|
full_name = f"{first_name} {last_name}".strip()
|
|
|
|
if full_name:
|
|
assert isinstance(full_name, str)
|
|
self.change_full_name(full_name)
|
|
|
|
if self.is_new_user() and not full_name:
|
|
raise scim_exceptions.BadRequestError(
|
|
"Must specify name.formatted, name.givenName or name.familyName when creating a new user"
|
|
)
|
|
|
|
active = d.get("active")
|
|
if self.is_new_user() and not active:
|
|
raise scim_exceptions.BadRequestError("New user must have active=True")
|
|
|
|
if active is not None:
|
|
assert isinstance(active, bool)
|
|
self.change_is_active(active)
|
|
|
|
role_name = d.get("role")
|
|
if role_name:
|
|
assert isinstance(role_name, str)
|
|
self.change_role(role_name)
|
|
|
|
def change_delivery_email(self, new_value: str) -> None:
|
|
# Note that the email_allowed_for_realm check that usually
|
|
# appears adjacent to validate_email is present in save().
|
|
self.validate_email(new_value)
|
|
if self.obj.delivery_email != new_value:
|
|
self._email_new_value = new_value
|
|
|
|
def change_full_name(self, new_value: str) -> None:
|
|
if new_value and self.obj.full_name != new_value:
|
|
self._full_name_new_value = new_value
|
|
|
|
def change_is_active(self, new_value: bool) -> None:
|
|
if new_value != self.obj.is_active:
|
|
self._is_active_new_value = new_value
|
|
|
|
def change_role(self, new_role_name: str) -> None:
|
|
try:
|
|
role = UserProfile.ROLE_API_NAME_TO_ID[new_role_name]
|
|
except KeyError:
|
|
raise scim_exceptions.BadRequestError(
|
|
f"Invalid role: {new_role_name}. Valid values are: {list(UserProfile.ROLE_API_NAME_TO_ID.keys())}"
|
|
)
|
|
if role != self.obj.role:
|
|
self._role_new_value = role
|
|
|
|
def handle_replace(
|
|
self,
|
|
path: AttrPath | None,
|
|
value: str | list[object] | dict[AttrPath, object],
|
|
operation: Any,
|
|
) -> None:
|
|
"""
|
|
PATCH requests specify a list of operations of types "add", "remove", "replace".
|
|
So far we only implement "replace" as that should be sufficient.
|
|
|
|
This method is forked from the superclass and is called to handle "replace"
|
|
PATCH operations. Such an operation tells us to change the values
|
|
of a User's attributes as specified. The superclass implements a very basic
|
|
behavior in this method and is meant to be overridden, since this is an adapter class.
|
|
"""
|
|
if not isinstance(value, dict):
|
|
# Restructure for use in loop below. Taken from the
|
|
# overridden upstream method.
|
|
assert path is not None
|
|
value = {path: value}
|
|
|
|
assert isinstance(value, dict)
|
|
for attr_path, val in (value or {}).items():
|
|
if attr_path.first_path == ("userName", None, None):
|
|
assert isinstance(val, str)
|
|
self.change_delivery_email(val)
|
|
elif attr_path.first_path == ("name", "formatted", None):
|
|
# TODO: Add support name_formatted_included=False config like we do
|
|
# for updates via PUT.
|
|
assert isinstance(val, str)
|
|
self.change_full_name(val)
|
|
elif attr_path.first_path == ("active", None, None):
|
|
assert isinstance(val, bool)
|
|
self.change_is_active(val)
|
|
elif attr_path.first_path == ("role", None, None):
|
|
assert isinstance(val, str)
|
|
self.change_role(val)
|
|
else:
|
|
raise scim_exceptions.NotImplementedError("Not Implemented")
|
|
|
|
self.save()
|
|
|
|
def save(self) -> None:
|
|
"""
|
|
This method is called at the end of operations modifying a user,
|
|
and is responsible for actually applying the requested changes,
|
|
writing them to the database.
|
|
"""
|
|
realm = RequestNotes.get_notes(self._request).realm
|
|
assert realm is not None
|
|
|
|
email_new_value = getattr(self, "_email_new_value", None)
|
|
is_active_new_value = getattr(self, "_is_active_new_value", None)
|
|
full_name_new_value = getattr(self, "_full_name_new_value", None)
|
|
role_new_value = getattr(self, "_role_new_value", None)
|
|
password = getattr(self, "_password_set_to", None)
|
|
|
|
# Clean up the internal "pending change" state, now that we've
|
|
# fetched the values:
|
|
self._email_new_value = None
|
|
self._is_active_new_value = None
|
|
self._full_name_new_value = None
|
|
self._password_set_to = None
|
|
self._role_new_value = None
|
|
|
|
if email_new_value is not None:
|
|
try:
|
|
# Note that the validate_email check that usually
|
|
# appears adjacent to email_allowed_for_realm is
|
|
# present in save().
|
|
email_allowed_for_realm(email_new_value, realm)
|
|
except DomainNotAllowedForRealmError:
|
|
raise scim_exceptions.BadRequestError(
|
|
"This email domain isn't allowed in this organization."
|
|
)
|
|
except DisposableEmailError: # nocoverage
|
|
raise scim_exceptions.BadRequestError(
|
|
"Disposable email domains are not allowed for this realm."
|
|
)
|
|
except EmailContainsPlusError: # nocoverage
|
|
raise scim_exceptions.BadRequestError("Email address can't contain + characters.")
|
|
|
|
try:
|
|
validate_email_not_already_in_realm(
|
|
realm, email_new_value, allow_inactive_mirror_dummies=False
|
|
)
|
|
except ValidationError as e:
|
|
raise ConflictError("Email address already in use: " + str(e))
|
|
|
|
if self.is_new_user():
|
|
assert email_new_value is not None
|
|
assert full_name_new_value is not None
|
|
add_initial_stream_subscriptions = True
|
|
if (
|
|
self.config.get("create_guests_without_streams", False)
|
|
and role_new_value == UserProfile.ROLE_GUEST
|
|
):
|
|
add_initial_stream_subscriptions = False
|
|
|
|
self.obj = do_create_user(
|
|
email_new_value,
|
|
password,
|
|
realm,
|
|
full_name_new_value,
|
|
role=role_new_value,
|
|
tos_version=UserProfile.TOS_VERSION_BEFORE_FIRST_LOGIN,
|
|
add_initial_stream_subscriptions=add_initial_stream_subscriptions,
|
|
acting_user=None,
|
|
)
|
|
return
|
|
|
|
# TODO: The below operations should ideally be executed in a single
|
|
# atomic block to avoid failing with partial changes getting saved.
|
|
# This can be fixed once we figure out how do_deactivate_user can be run
|
|
# inside an atomic block.
|
|
|
|
# We process full_name first here, since it's the only one that can fail.
|
|
if full_name_new_value:
|
|
check_change_full_name(self.obj, full_name_new_value, acting_user=None)
|
|
|
|
if email_new_value is not None:
|
|
do_change_user_delivery_email(self.obj, email_new_value, acting_user=None)
|
|
|
|
if role_new_value is not None:
|
|
do_change_user_role(self.obj, role_new_value, acting_user=None)
|
|
|
|
if is_active_new_value is not None and is_active_new_value:
|
|
do_reactivate_user(self.obj, acting_user=None)
|
|
elif is_active_new_value is not None and not is_active_new_value:
|
|
do_deactivate_user(self.obj, acting_user=None)
|
|
|
|
def delete(self) -> None:
|
|
"""
|
|
This is consistent with Okta SCIM - users don't get DELETEd, they're deactivated
|
|
by changing their "active" attr to False.
|
|
"""
|
|
raise scim_exceptions.BadRequestError(
|
|
'DELETE operation not supported. Use PUT or PATCH to modify the "active" attribute instead.'
|
|
)
|
|
|
|
|
|
def validate_group_member_ids_from_request(realm: Realm, member_ids: list[int]) -> None:
|
|
if member_ids:
|
|
member_ids_set = set(member_ids)
|
|
member_realm_ids = list(
|
|
UserProfile.objects.filter(id__in=member_ids_set)
|
|
.distinct("realm_id")
|
|
.values_list("realm_id", flat=True)
|
|
)
|
|
if len(member_realm_ids) > 1 or member_realm_ids[0] != realm.id:
|
|
raise scim_exceptions.BadRequestError(
|
|
"Users outside of the realm can't be removed or added to the group"
|
|
)
|
|
|
|
found_member_ids_set = set(
|
|
UserProfile.objects.filter(id__in=member_ids_set).values_list("id", flat=True)
|
|
)
|
|
if len(member_ids_set) != len(found_member_ids_set):
|
|
raise scim_exceptions.BadRequestError(
|
|
f"Invalid user ids found in the request: {member_ids_set - found_member_ids_set}"
|
|
)
|
|
|
|
|
|
def check_can_manage_group_by_scim(user_group: NamedUserGroup) -> bool:
|
|
# Prohibit system groups.
|
|
if user_group.is_system_group:
|
|
return False
|
|
return True
|
|
|
|
|
|
class ZulipSCIMGroup(SCIMGroup):
|
|
"""
|
|
This class contains the core of the implementation of SCIM sync of Groups.
|
|
A SCIM Group corresponds to a NamedUserGroup object in Zulip.
|
|
|
|
This class follows the same architecture as ZulipSCIMUser, so rather than
|
|
re-explaining the purpose of specific method overrides or small bits of
|
|
equivalent logic, defer to checking the corresponding comments in the
|
|
ZulipSCIMUser implementation.
|
|
"""
|
|
|
|
id_field = "usergroup_ptr_id"
|
|
|
|
def __init__(self, obj: NamedUserGroup, request: HttpRequest | None = None) -> None:
|
|
assert request is not None
|
|
self.obj: NamedUserGroup
|
|
|
|
super().__init__(obj, request)
|
|
|
|
self.subdomain = get_subdomain(request)
|
|
self.config = settings.SCIM_CONFIG[self.subdomain]
|
|
|
|
realm = get_realm_from_request(request)
|
|
assert realm is not None
|
|
self.realm: Realm = realm
|
|
|
|
self._name_new_value: str | None = None
|
|
|
|
# The (_member_ids_to_add, _member_ids_to_remove) pair and _intended_member_ids
|
|
# are mutually exclusive. A PUT request or PATCH request with the "replace"
|
|
# operation to update a group will specify the
|
|
# full set of member ids that should belong to the group, thus setting
|
|
# _intended_member_ids.
|
|
# A PATCH request can specify "add" and/or "remove" operations, which will
|
|
# set _member_ids_to_add and/or _member_ids_to_remove.
|
|
#
|
|
# Hypothetically, a PATCH request could specify all of "add", "remove"
|
|
# and "replace" operations at once, in any order. We do not support
|
|
# such a mix for now, and it's not something commonly used by SCIM clients,
|
|
# if at all.
|
|
# If necessary, this isn't too hard to implement however, and can be done
|
|
# by sequencing thunks for each of the operations in the PATCH request,
|
|
# to be executed in the .save() method, instead of this current approach.
|
|
self._member_ids_to_add: set[int] | None = None
|
|
self._member_ids_to_remove: set[int] | None = None
|
|
self._intended_member_ids: set[int] | None = None
|
|
|
|
@property
|
|
def display_name(self) -> str:
|
|
return self.obj.name
|
|
|
|
@property
|
|
def members(self) -> list[dict[str, object]]:
|
|
"""
|
|
Return a list of user dicts (ready for serialization) for the members
|
|
of the group.
|
|
|
|
Overridden from the superclass to use our method for fetching group
|
|
members.
|
|
"""
|
|
users = UserProfile.objects.filter(
|
|
id__in=get_user_group_direct_member_ids(self.obj), is_bot=False, realm=self.realm
|
|
).order_by("id")
|
|
scim_users: list[SCIMUser] = [
|
|
scim_utils.get_user_adapter()(user, self.request) for user in users
|
|
]
|
|
|
|
dicts = []
|
|
for user in scim_users:
|
|
d = {
|
|
"value": user.id,
|
|
"$ref": user.location,
|
|
"display": user.display_name,
|
|
"type": "User",
|
|
}
|
|
dicts.append(d)
|
|
|
|
return dicts
|
|
|
|
def is_new_group(self) -> bool:
|
|
return not bool(self.obj.id)
|
|
|
|
def to_dict(self) -> dict[str, object]:
|
|
return {
|
|
"id": str(self.obj.id),
|
|
"schemas": [scim_constants.SchemaURI.GROUP],
|
|
"displayName": self.display_name,
|
|
# Groups in the process of being created don't have members.
|
|
"members": self.members if not self.is_new_group() else [],
|
|
"meta": self.meta,
|
|
}
|
|
|
|
def from_dict(self, d: dict[str, Any]) -> None:
|
|
name = d.get("displayName")
|
|
if name is not None:
|
|
assert isinstance(name, str)
|
|
self.change_group_name(name)
|
|
|
|
members = d.get("members")
|
|
if members:
|
|
self._intended_member_ids = {int(member_dict["value"]) for member_dict in members}
|
|
|
|
def change_group_name(self, new_value: str) -> None:
|
|
if new_value and self.obj.name != new_value:
|
|
self._name_new_value = new_value
|
|
|
|
def delete(self) -> None:
|
|
if not check_can_manage_group_by_scim(self.obj):
|
|
raise scim_exceptions.BadRequestError(
|
|
f"Group {self.obj.name} can't be managed by SCIM."
|
|
)
|
|
|
|
# TODO: We don't currently support DELETE requests for groups. The correct way to handle
|
|
# a DELETE would be to deactivate the group - but Zulip currently disallows deactivation
|
|
# of groups under certain conditions, such as "the group is used for a permission".
|
|
#
|
|
# To be able to process a DELETE request, we need to implement a function to forcibly
|
|
# deactivate a group, by correctly untangling it from all dependencies such as permissions
|
|
# or supergroups.
|
|
# See https://github.com/zulip/zulip/pull/34605 for current status of this work.
|
|
raise scim_exceptions.NotImplementedError
|
|
|
|
def save(self) -> None:
|
|
realm = self.realm
|
|
assert realm is not None
|
|
|
|
if not check_can_manage_group_by_scim(self.obj):
|
|
raise scim_exceptions.BadRequestError(
|
|
f"Group {self.obj.name} can't be managed by SCIM."
|
|
)
|
|
|
|
name_new_value = getattr(self, "_name_new_value", None)
|
|
intended_member_ids = getattr(self, "_intended_member_ids", None)
|
|
member_ids_to_remove = getattr(self, "_member_ids_to_remove", None)
|
|
member_ids_to_add = getattr(self, "_member_ids_to_add", None)
|
|
|
|
# Reset the state of pending changes.
|
|
self._name_new_value = None
|
|
self._intended_member_ids = None
|
|
self._member_ids_to_remove = None
|
|
self._member_ids_to_add = None
|
|
|
|
if name_new_value is not None:
|
|
try:
|
|
check_user_group_name(name_new_value)
|
|
except JsonableError as e:
|
|
raise scim_exceptions.BadRequestError(e.msg)
|
|
if NamedUserGroup.objects.filter(name=name_new_value, realm=realm).exists():
|
|
raise ConflictError("Group name already in use: " + name_new_value)
|
|
|
|
# At most one of the three should be set for a .save() call. If the SCIM request has multiple operations
|
|
# on group memberships to run (e.g. "add" some users and "remove" some users),
|
|
# .save() is called sequentially, processing one operation at a time.
|
|
assert (
|
|
sum(
|
|
value is not None
|
|
for value in [intended_member_ids, member_ids_to_remove, member_ids_to_add]
|
|
)
|
|
<= 1
|
|
)
|
|
if intended_member_ids is not None:
|
|
validate_group_member_ids_from_request(realm, intended_member_ids)
|
|
elif member_ids_to_remove is not None:
|
|
validate_group_member_ids_from_request(realm, member_ids_to_remove)
|
|
elif member_ids_to_add is not None:
|
|
validate_group_member_ids_from_request(realm, member_ids_to_add)
|
|
|
|
if self.is_new_group():
|
|
if intended_member_ids is not None:
|
|
members = list(UserProfile.objects.filter(id__in=intended_member_ids, realm=realm))
|
|
else:
|
|
members = []
|
|
|
|
system_groups_name_dict = get_role_based_system_groups_dict(realm)
|
|
group_nobody = system_groups_name_dict[SystemGroups.NOBODY].usergroup_ptr
|
|
group_settings_map = dict(
|
|
can_add_members_group=group_nobody,
|
|
can_manage_group=group_nobody,
|
|
)
|
|
assert name_new_value is not None
|
|
self.obj = create_user_group_in_database(
|
|
name_new_value,
|
|
members,
|
|
realm,
|
|
description="Created from SCIM",
|
|
group_settings_map=group_settings_map,
|
|
acting_user=None,
|
|
)
|
|
return
|
|
|
|
with transaction.atomic(savepoint=False):
|
|
# We need to lock the group now to conduct update operations without race conditions.
|
|
user_group = NamedUserGroup.objects.select_for_update().get(id=self.obj.id)
|
|
current_member_ids = set(get_user_group_direct_member_ids(user_group))
|
|
if name_new_value is not None:
|
|
do_update_user_group_name(self.obj, name_new_value, acting_user=None)
|
|
if intended_member_ids is not None:
|
|
current_member_ids = set(get_user_group_direct_member_ids(user_group))
|
|
member_ids_to_remove = current_member_ids - intended_member_ids
|
|
member_ids_to_add = intended_member_ids - current_member_ids
|
|
|
|
if member_ids_to_remove:
|
|
# Clear out ids of users who have already been removed from the group.
|
|
member_ids_to_remove = member_ids_to_remove.intersection(current_member_ids)
|
|
bulk_remove_members_from_user_groups(
|
|
[user_group], list(member_ids_to_remove), acting_user=None
|
|
)
|
|
if member_ids_to_add:
|
|
# Clear out ids of users who are already in the group.
|
|
member_ids_to_add = member_ids_to_add - current_member_ids
|
|
bulk_add_members_to_user_groups(
|
|
[user_group], list(member_ids_to_add), acting_user=None
|
|
)
|
|
|
|
def handle_replace(
|
|
self,
|
|
path: AttrPath,
|
|
value: str | list[Any] | dict[AttrPath, Any],
|
|
operation: Any,
|
|
) -> None:
|
|
if not isinstance(value, dict):
|
|
value = {path: value}
|
|
|
|
assert isinstance(value, dict)
|
|
for attr_path, val in value.items():
|
|
if attr_path.first_path == ("displayName", None, None):
|
|
name = val
|
|
assert isinstance(name, str)
|
|
self.change_group_name(name)
|
|
elif attr_path.first_path == ("members", None, None):
|
|
intended_member_ids = {int(user_dict["value"]) for user_dict in val}
|
|
self._intended_member_ids = intended_member_ids
|
|
elif attr_path.first_path == ("id", None, None):
|
|
# the "id", if present in the request, should just match the id
|
|
# of the group - so this is a sanity check.
|
|
assert int(val) == self.obj.id
|
|
else: # nocoverage
|
|
raise scim_exceptions.NotImplementedError
|
|
|
|
self.save()
|
|
|
|
def handle_add(
|
|
self,
|
|
path: AttrPath,
|
|
value: str | list[Any] | dict[AttrPath, Any],
|
|
operation: Any,
|
|
) -> None:
|
|
assert path is not None
|
|
if path.first_path == ("members", None, None):
|
|
members = value or []
|
|
assert isinstance(members, list)
|
|
self._member_ids_to_add = {int(member.get("value")) for member in members}
|
|
else: # nocoverage
|
|
raise scim_exceptions.NotImplementedError
|
|
|
|
self.save()
|
|
|
|
def handle_remove(
|
|
self,
|
|
path: AttrPath,
|
|
value: str | list[Any] | dict[AttrPath, Any],
|
|
operation: Any,
|
|
) -> None:
|
|
assert path is not None
|
|
|
|
if path.is_complex:
|
|
# django-scim2 does not support handling of complex paths and thus we generally
|
|
# don't support them either - as they're not used by our supported SCIM clients.
|
|
# The exception is Okta requests to remove a user from a group.
|
|
# Rather than a PATCH request with a simple path of the form
|
|
# { ..., "path": "members", "value": [{"value": "<user id>"}] }
|
|
# Okta sends a request specifying the user to remove in a complex path:
|
|
# { ..., "path": 'members[value eq "<user id>"]' }
|
|
#
|
|
# We don't attempt to implement general handling of complex paths. Instead,
|
|
# we just add a hacky approach for detecting and handling this single, specific
|
|
# kind of request.
|
|
#
|
|
# HACK: Detect the strange filter query formed by django-scim2 when preparing
|
|
# to parse the path in self.split_path().
|
|
match = re.match(r'^members\[value eq "(\d+)"\] eq ""$', path.filter)
|
|
if not match: # nocoverage
|
|
raise scim_exceptions.NotImplementedError
|
|
|
|
user_profile_id = int(match.group(1))
|
|
self._member_ids_to_remove = {user_profile_id}
|
|
elif path.first_path == ("members", None, None):
|
|
members = value or []
|
|
assert isinstance(members, list)
|
|
self._member_ids_to_remove = {int(member.get("value")) for member in members}
|
|
else: # nocoverage
|
|
raise scim_exceptions.NotImplementedError
|
|
|
|
self.save()
|
|
|
|
|
|
def get_extra_model_filter_kwargs_getter(
|
|
model: type[models.Model],
|
|
) -> Callable[[HttpRequest, Any, Any], dict[str, object]]:
|
|
"""Registered as GET_EXTRA_MODEL_FILTER_KWARGS_GETTER in our
|
|
SCIM configuration.
|
|
|
|
Returns a function which generates additional kwargs
|
|
to add to QuerySet's .filter() when fetching a UserProfile
|
|
corresponding to the requested SCIM User from the database.
|
|
|
|
It's *crucial* for security that we filter by realm_id (based on
|
|
the subdomain of the request) to prevent a SCIM client authorized
|
|
for subdomain X from being able to interact with all of the Users
|
|
on the entire server.
|
|
|
|
This should be extended for Groups when implementing them by
|
|
checking the `model` parameter; because we only support
|
|
UserProfiles, such a check is unnecessary.
|
|
"""
|
|
|
|
def get_extra_filter_kwargs(
|
|
request: HttpRequest, *args: Any, **kwargs: Any
|
|
) -> dict[str, object]:
|
|
realm = RequestNotes.get_notes(request).realm
|
|
assert realm is not None
|
|
extra_kwargs: dict[str, object] = {"realm_id": realm.id}
|
|
# We need to determine if it's /Users or /Groups being queried.
|
|
url_name = resolve(request.path).url_name
|
|
if url_name in ["users", "users-search"]:
|
|
extra_kwargs.update({"is_bot": False})
|
|
elif url_name == "groups":
|
|
extra_kwargs.update({"deactivated": False})
|
|
else:
|
|
raise AssertionError
|
|
|
|
return extra_kwargs
|
|
|
|
return get_extra_filter_kwargs
|
|
|
|
|
|
def base_scim_location_getter(request: HttpRequest, *args: Any, **kwargs: Any) -> str:
|
|
"""Used as the base url for constructing the Location of a SCIM resource.
|
|
|
|
Since SCIM synchronization is scoped to an individual realm, we
|
|
need these locations to be namespaced within the realm's domain
|
|
namespace, which is conveniently accessed via realm.url.
|
|
"""
|
|
|
|
realm = RequestNotes.get_notes(request).realm
|
|
assert realm is not None
|
|
|
|
return realm.url
|
|
|
|
|
|
class ConflictError(scim_exceptions.IntegrityError):
|
|
"""
|
|
Per https://datatracker.ietf.org/doc/html/rfc7644#section-3.3
|
|
|
|
If the service provider determines that the creation of the requested
|
|
resource conflicts with existing resources (e.g., a "User" resource
|
|
with a duplicate "userName"), the service provider MUST return HTTP
|
|
status code 409 (Conflict) with a "scimType" error code of
|
|
"uniqueness"
|
|
|
|
scim_exceptions.IntegrityError class omits to include the scimType.
|
|
"""
|
|
|
|
scim_type = "uniqueness"
|