black
This commit is contained in:
@@ -23,7 +23,7 @@ class AgentCustomFieldSerializer(serializers.ModelSerializer):
|
||||
"multiple_value": {"write_only": True},
|
||||
}
|
||||
|
||||
|
||||
|
||||
class AgentSerializer(serializers.ModelSerializer):
|
||||
winupdatepolicy = WinUpdatePolicySerializer(many=True, read_only=True)
|
||||
status = serializers.ReadOnlyField()
|
||||
@@ -44,10 +44,7 @@ class AgentSerializer(serializers.ModelSerializer):
|
||||
|
||||
class Meta:
|
||||
model = Agent
|
||||
exclude = [
|
||||
"last_seen",
|
||||
"id"
|
||||
]
|
||||
exclude = ["last_seen", "id"]
|
||||
|
||||
|
||||
class AgentTableSerializer(serializers.ModelSerializer):
|
||||
|
||||
@@ -811,7 +811,7 @@ class TestAgentViews(TacticalTestCase):
|
||||
|
||||
def test_bulk_updates(self):
|
||||
self.assertTrue(False)
|
||||
|
||||
|
||||
def test_get_notes(self):
|
||||
url = f"{base_url}/notes/"
|
||||
|
||||
@@ -1105,19 +1105,11 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
def test_agent_maintenance_permissions(self):
|
||||
site = baker.make("clients.Site")
|
||||
client = baker.make("clients.Client")
|
||||
|
||||
site_data = {
|
||||
"id": site.id,
|
||||
"type": "Site",
|
||||
"action": True
|
||||
}
|
||||
|
||||
client_data = {
|
||||
"id": client.id,
|
||||
"type": "Client",
|
||||
"action": True
|
||||
}
|
||||
|
||||
site_data = {"id": site.id, "type": "Site", "action": True}
|
||||
|
||||
client_data = {"id": client.id, "type": "Client", "action": True}
|
||||
|
||||
url = f"{base_url}/maintenance/bulk/"
|
||||
|
||||
# test superuser access
|
||||
@@ -1141,12 +1133,12 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
# limit user to client
|
||||
user.role.can_view_clients.set([client])
|
||||
self.check_not_authorized("post", url, site_data)
|
||||
self.check_authorized("post", url, client_data)
|
||||
self.check_authorized("post", url, client_data)
|
||||
|
||||
# also limit to site
|
||||
user.role.can_view_sites.set([site])
|
||||
self.check_authorized("post", url, site_data)
|
||||
self.check_authorized("post", url, client_data)
|
||||
self.check_authorized("post", url, client_data)
|
||||
|
||||
@patch("agents.tasks.send_agent_update_task.delay")
|
||||
def test_agent_update_permissions(self, update_task):
|
||||
@@ -1156,7 +1148,8 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
url = f"{base_url}/update/"
|
||||
|
||||
data = {
|
||||
"agent_ids": [agent.agent_id for agent in agents] + [agent.agent_id for agent in other_agents]
|
||||
"agent_ids": [agent.agent_id for agent in agents]
|
||||
+ [agent.agent_id for agent in other_agents]
|
||||
}
|
||||
|
||||
# test superuser access
|
||||
@@ -1165,12 +1158,12 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
update_task.reset_mock()
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user)
|
||||
self.client.force_authenticate(user=user)
|
||||
|
||||
self.check_not_authorized("post", url, data)
|
||||
update_task.assert_not_called()
|
||||
|
||||
user.role.can_update_agents = True
|
||||
user.role.can_update_agents = True
|
||||
user.role.save()
|
||||
|
||||
self.check_authorized("post", url, data)
|
||||
@@ -1181,7 +1174,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
user.role.can_view_clients.set([agents[0].client])
|
||||
self.check_authorized("post", url, data)
|
||||
update_task.assert_called_with(agent_ids=[agent.agent_id for agent in agents])
|
||||
update_task.reset_mock()
|
||||
update_task.reset_mock()
|
||||
|
||||
# add site
|
||||
user.role.can_view_sites.set([other_agents[0].site])
|
||||
@@ -1192,7 +1185,9 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
# remove client permissions
|
||||
user.role.can_view_clients.clear()
|
||||
self.check_authorized("post", url, data)
|
||||
update_task.assert_called_with(agent_ids=[agent.agent_id for agent in other_agents])
|
||||
update_task.assert_called_with(
|
||||
agent_ids=[agent.agent_id for agent in other_agents]
|
||||
)
|
||||
|
||||
def test_get_agent_version_permissions(self):
|
||||
agents = baker.make_recipe("agents.agent", _quantity=5)
|
||||
@@ -1205,11 +1200,11 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
self.assertEqual(len(response.data["agents"]), 12)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user)
|
||||
self.client.force_authenticate(user=user)
|
||||
|
||||
self.check_not_authorized("get", url)
|
||||
|
||||
user.role.can_list_agents = True
|
||||
user.role.can_list_agents = True
|
||||
user.role.save()
|
||||
|
||||
response = self.check_authorized("get", url)
|
||||
@@ -1218,7 +1213,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
# limit to client
|
||||
user.role.can_view_clients.set([agents[0].client])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data["agents"]), 5)
|
||||
self.assertEqual(len(response.data["agents"]), 5)
|
||||
|
||||
# add site
|
||||
user.role.can_view_sites.set([other_agents[0].site])
|
||||
@@ -1245,14 +1240,14 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user)
|
||||
self.client.force_authenticate(user=user)
|
||||
|
||||
self.check_not_authorized("post", url)
|
||||
|
||||
user.role.can_install_agents = True
|
||||
user.role.save()
|
||||
user.role.save()
|
||||
|
||||
self.check_authorized("post", url)
|
||||
self.check_authorized("post", url)
|
||||
|
||||
# limit user to client
|
||||
user.role.can_view_clients.set([client])
|
||||
@@ -1261,19 +1256,19 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
"client": client.id,
|
||||
"site": client_site.id,
|
||||
"version": settings.LATEST_AGENT_VER,
|
||||
"arch": "64"
|
||||
"arch": "64",
|
||||
}
|
||||
|
||||
self.check_authorized("post", url, data)
|
||||
self.check_authorized("post", url, data)
|
||||
|
||||
data = {
|
||||
"client": site.client.id,
|
||||
"site": site.id,
|
||||
"version": settings.LATEST_AGENT_VER,
|
||||
"arch": "64"
|
||||
"arch": "64",
|
||||
}
|
||||
|
||||
self.check_not_authorized("post", url, data)
|
||||
self.check_not_authorized("post", url, data)
|
||||
|
||||
# assign site
|
||||
user.role.can_view_clients.clear()
|
||||
@@ -1282,7 +1277,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
"client": site.client.id,
|
||||
"site": site.id,
|
||||
"version": settings.LATEST_AGENT_VER,
|
||||
"arch": "64"
|
||||
"arch": "64",
|
||||
}
|
||||
|
||||
self.check_authorized("post", url, data)
|
||||
@@ -1291,7 +1286,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
"client": client.id,
|
||||
"site": client_site.id,
|
||||
"version": settings.LATEST_AGENT_VER,
|
||||
"arch": "64"
|
||||
"arch": "64",
|
||||
}
|
||||
|
||||
self.check_not_authorized("post", url, data)
|
||||
|
||||
@@ -34,11 +34,9 @@ urlpatterns = [
|
||||
path("notes/", views.GetAddNotes.as_view()),
|
||||
path("notes/<int:pk>/", views.GetEditDeleteNote.as_view()),
|
||||
path("<agent:agent_id>/notes/", views.GetAddNotes.as_view()),
|
||||
|
||||
# bulk actions
|
||||
path("maintenance/bulk/", views.agent_maintenance),
|
||||
path("actions/bulk/", views.bulk),
|
||||
|
||||
path("versions/", views.get_agent_versions),
|
||||
path("update/", views.update_agents),
|
||||
path("installer/", views.install_agent),
|
||||
|
||||
@@ -23,7 +23,11 @@ from scripts.tasks import handle_bulk_command_task, handle_bulk_script_task
|
||||
from tacticalrmm.utils import get_default_timezone, notify_error, reload_nats
|
||||
from winupdate.serializers import WinUpdatePolicySerializer
|
||||
from winupdate.tasks import bulk_check_for_updates_task, bulk_install_updates_task
|
||||
from tacticalrmm.permissions import _has_perm_on_agent, _has_perm_on_client, _has_perm_on_site
|
||||
from tacticalrmm.permissions import (
|
||||
_has_perm_on_agent,
|
||||
_has_perm_on_client,
|
||||
_has_perm_on_site,
|
||||
)
|
||||
|
||||
from .models import Agent, AgentCustomField, Note, RecoveryAction, AgentHistory
|
||||
from .permissions import (
|
||||
@@ -121,15 +125,15 @@ class GetUpdateDeleteAgent(APIView):
|
||||
# get agent details
|
||||
def get(self, request, agent_id):
|
||||
agent = get_object_or_404(Agent, agent_id=agent_id)
|
||||
return Response(AgentSerializer(agent, context={"default_tz": get_default_timezone()}).data)
|
||||
return Response(
|
||||
AgentSerializer(agent, context={"default_tz": get_default_timezone()}).data
|
||||
)
|
||||
|
||||
# edit agent
|
||||
def put(self, request, agent_id):
|
||||
agent = get_object_or_404(Agent, agent_id=agent_id)
|
||||
|
||||
a_serializer = AgentSerializer(
|
||||
instance=agent, data=request.data, partial=True
|
||||
)
|
||||
a_serializer = AgentSerializer(instance=agent, data=request.data, partial=True)
|
||||
a_serializer.is_valid(raise_exception=True)
|
||||
a_serializer.save()
|
||||
|
||||
@@ -253,7 +257,11 @@ class AgentMeshCentral(APIView):
|
||||
@api_view(["GET"])
|
||||
@permission_classes([IsAuthenticated, AgentPerms])
|
||||
def get_agent_versions(request):
|
||||
agents = Agent.objects.filter_by_role(request.user).prefetch_related("site").only("pk", "hostname")
|
||||
agents = (
|
||||
Agent.objects.filter_by_role(request.user)
|
||||
.prefetch_related("site")
|
||||
.only("pk", "hostname")
|
||||
)
|
||||
return Response(
|
||||
{
|
||||
"versions": [settings.LATEST_AGENT_VER],
|
||||
@@ -265,7 +273,11 @@ def get_agent_versions(request):
|
||||
@api_view(["POST"])
|
||||
@permission_classes([IsAuthenticated, UpdateAgentPerms])
|
||||
def update_agents(request):
|
||||
q = Agent.objects.filter_by_role(request.user).filter(agent_id__in=request.data["agent_ids"]).only("agent_id", "version")
|
||||
q = (
|
||||
Agent.objects.filter_by_role(request.user)
|
||||
.filter(agent_id__in=request.data["agent_ids"])
|
||||
.only("agent_id", "version")
|
||||
)
|
||||
agent_ids: list[str] = [
|
||||
i.agent_id
|
||||
for i in q
|
||||
@@ -780,19 +792,25 @@ def bulk(request):
|
||||
if request.data["target"] == "client":
|
||||
if not _has_perm_on_client(request.user, request.data["client"]):
|
||||
raise PermissionDenied()
|
||||
q = Agent.objects.filter_by_role(request.user).filter(site__client_id=request.data["client"])
|
||||
q = Agent.objects.filter_by_role(request.user).filter(
|
||||
site__client_id=request.data["client"]
|
||||
)
|
||||
|
||||
elif request.data["target"] == "site":
|
||||
if not _has_perm_on_site(request.user, request.data["site"]):
|
||||
raise PermissionDenied()
|
||||
q = Agent.objects.filter_by_role(request.user).filter(site_id=request.data["site"])
|
||||
q = Agent.objects.filter_by_role(request.user).filter(
|
||||
site_id=request.data["site"]
|
||||
)
|
||||
|
||||
elif request.data["target"] == "agents":
|
||||
q = Agent.objects.filter_by_role(request.user).filter(agent_id__in=request.data["agents"])
|
||||
q = Agent.objects.filter_by_role(request.user).filter(
|
||||
agent_id__in=request.data["agents"]
|
||||
)
|
||||
|
||||
elif request.data["target"] == "all":
|
||||
q = Agent.objects.filter_by_role(request.user).only("pk", "monitoring_type")
|
||||
|
||||
|
||||
else:
|
||||
return notify_error("Something went wrong")
|
||||
|
||||
@@ -877,7 +895,7 @@ def agent_maintenance(request):
|
||||
return notify_error("Invalid data")
|
||||
|
||||
if count:
|
||||
action = 'disabled' if not request.data["action"] else 'enabled'
|
||||
action = "disabled" if not request.data["action"] else "enabled"
|
||||
return Response(f"Maintenance mode has been {action} on {count} agents")
|
||||
else:
|
||||
return Response(
|
||||
|
||||
@@ -33,18 +33,23 @@ class AlertPerms(permissions.BasePermission):
|
||||
def has_permission(self, r, view):
|
||||
if r.method == "GET" or r.method == "PATCH":
|
||||
if "pk" in view.kwargs.keys():
|
||||
return _has_perm(r, "can_list_alerts") and _has_perm_on_alert(r.user, view.kwargs["pk"])
|
||||
return _has_perm(r, "can_list_alerts") and _has_perm_on_alert(
|
||||
r.user, view.kwargs["pk"]
|
||||
)
|
||||
else:
|
||||
return _has_perm(r, "can_list_alerts")
|
||||
else:
|
||||
if "pk" in view.kwargs.keys():
|
||||
return _has_perm(r, "can_manage_alerts") and _has_perm_on_alert(r.user, view.kwargs["pk"])
|
||||
return _has_perm(r, "can_manage_alerts") and _has_perm_on_alert(
|
||||
r.user, view.kwargs["pk"]
|
||||
)
|
||||
else:
|
||||
return _has_perm(r, "can_manage_alerts")
|
||||
|
||||
|
||||
class AlertTemplatePerms(permissions.BasePermission):
|
||||
def has_permission(self, r, view):
|
||||
if r.method == "GET":
|
||||
return _has_perm(r, "can_list_alerttemplates")
|
||||
else:
|
||||
return _has_perm(r, "can_manage_alerttemplates")
|
||||
return _has_perm(r, "can_manage_alerttemplates")
|
||||
|
||||
@@ -243,7 +243,7 @@ class GetUpdateDeleteAlertTemplate(APIView):
|
||||
|
||||
class RelatedAlertTemplate(APIView):
|
||||
permission_classes = [IsAuthenticated, AlertTemplatePerms]
|
||||
|
||||
|
||||
def get(self, request, pk):
|
||||
alert_template = get_object_or_404(AlertTemplate, pk=pk)
|
||||
return Response(AlertTemplateRelationSerializer(alert_template).data)
|
||||
|
||||
@@ -30,7 +30,6 @@ class PolicyTableSerializer(ModelSerializer):
|
||||
class Meta:
|
||||
model = Policy
|
||||
fields = "__all__"
|
||||
|
||||
|
||||
def get_agents_count(self, policy):
|
||||
return policy.related_agents().count()
|
||||
@@ -44,23 +43,46 @@ class PolicyRelatedSerializer(ModelSerializer):
|
||||
agents = SerializerMethodField()
|
||||
|
||||
def get_agents(self, policy):
|
||||
return AgentHostnameSerializer(policy.agents.filter_by_role(self.context["user"]).only("agent_id", "hostname"), many=True).data
|
||||
return AgentHostnameSerializer(
|
||||
policy.agents.filter_by_role(self.context["user"]).only(
|
||||
"agent_id", "hostname"
|
||||
),
|
||||
many=True,
|
||||
).data
|
||||
|
||||
def get_workstation_clients(self, policy):
|
||||
return ClientMinimumSerializer(policy.workstation_clients.filter_by_role(self.context["user"]), many=True).data
|
||||
return ClientMinimumSerializer(
|
||||
policy.workstation_clients.filter_by_role(self.context["user"]), many=True
|
||||
).data
|
||||
|
||||
def get_server_clients(self, policy):
|
||||
return ClientMinimumSerializer(policy.server_clients.filter_by_role(self.context["user"]), many=True).data
|
||||
return ClientMinimumSerializer(
|
||||
policy.server_clients.filter_by_role(self.context["user"]), many=True
|
||||
).data
|
||||
|
||||
def get_workstation_sites(self, policy):
|
||||
return SiteMinimumSerializer(policy.workstation_sites.filter_by_role(self.context["user"]), many=True).data
|
||||
return SiteMinimumSerializer(
|
||||
policy.workstation_sites.filter_by_role(self.context["user"]), many=True
|
||||
).data
|
||||
|
||||
def get_server_sites(self, policy):
|
||||
return SiteMinimumSerializer(policy.server_sites.filter_by_role(self.context["user"]), many=True).data
|
||||
return SiteMinimumSerializer(
|
||||
policy.server_sites.filter_by_role(self.context["user"]), many=True
|
||||
).data
|
||||
|
||||
class Meta:
|
||||
model = Policy
|
||||
fields = ("pk", "name", "workstation_clients", "workstation_sites", "server_clients", "server_sites", "agents", "is_default_server_policy", "is_default_workstation_policy")
|
||||
fields = (
|
||||
"pk",
|
||||
"name",
|
||||
"workstation_clients",
|
||||
"workstation_sites",
|
||||
"server_clients",
|
||||
"server_sites",
|
||||
"agents",
|
||||
"is_default_server_policy",
|
||||
"is_default_workstation_policy",
|
||||
)
|
||||
|
||||
|
||||
class PolicyOverviewSerializer(ModelSerializer):
|
||||
|
||||
@@ -30,7 +30,11 @@ class GetAddPolicies(APIView):
|
||||
def get(self, request):
|
||||
policies = Policy.objects.all()
|
||||
|
||||
return Response(PolicyTableSerializer(policies, context={"user": request.user}, many=True).data)
|
||||
return Response(
|
||||
PolicyTableSerializer(
|
||||
policies, context={"user": request.user}, many=True
|
||||
).data
|
||||
)
|
||||
|
||||
def post(self, request):
|
||||
serializer = PolicySerializer(data=request.data, partial=True)
|
||||
@@ -144,7 +148,9 @@ class GetRelated(APIView):
|
||||
.first()
|
||||
)
|
||||
|
||||
return Response(PolicyRelatedSerializer(policy, context={"user": request.user}).data)
|
||||
return Response(
|
||||
PolicyRelatedSerializer(policy, context={"user": request.user}).data
|
||||
)
|
||||
|
||||
|
||||
class UpdatePatchPolicy(APIView):
|
||||
@@ -178,6 +184,7 @@ class UpdatePatchPolicy(APIView):
|
||||
|
||||
return Response("ok")
|
||||
|
||||
|
||||
class ResetPatchPolicy(APIView):
|
||||
# bulk reset agent patch policy
|
||||
def post(self, request):
|
||||
@@ -186,18 +193,26 @@ class ResetPatchPolicy(APIView):
|
||||
if not _has_perm_on_client(request.user, request.data["client"]):
|
||||
raise PermissionDenied()
|
||||
|
||||
agents = Agent.objects.filter_by_role(request.user).prefetch_related("winupdatepolicy").filter(
|
||||
site__client_id=request.data["client"]
|
||||
agents = (
|
||||
Agent.objects.filter_by_role(request.user)
|
||||
.prefetch_related("winupdatepolicy")
|
||||
.filter(site__client_id=request.data["client"])
|
||||
)
|
||||
elif "site" in request.data:
|
||||
if not _has_perm_on_site(request.user, request.data["site"]):
|
||||
raise PermissionDenied()
|
||||
|
||||
agents = Agent.objects.filter_by_role(request.user).prefetch_related("winupdatepolicy").filter(
|
||||
site_id=request.data["site"]
|
||||
agents = (
|
||||
Agent.objects.filter_by_role(request.user)
|
||||
.prefetch_related("winupdatepolicy")
|
||||
.filter(site_id=request.data["site"])
|
||||
)
|
||||
else:
|
||||
agents = Agent.objects.filter_by_role(request.user).prefetch_related("winupdatepolicy").only("pk")
|
||||
agents = (
|
||||
Agent.objects.filter_by_role(request.user)
|
||||
.prefetch_related("winupdatepolicy")
|
||||
.only("pk")
|
||||
)
|
||||
|
||||
for agent in agents:
|
||||
winupdatepolicy = agent.winupdatepolicy.get()
|
||||
|
||||
@@ -672,7 +672,7 @@ class Check(BaseAuditModel):
|
||||
|
||||
class CheckHistory(models.Model):
|
||||
objects = PermissionQuerySet.as_manager()
|
||||
|
||||
|
||||
check_id = models.PositiveIntegerField(default=0)
|
||||
x = models.DateTimeField(auto_now_add=True)
|
||||
y = models.PositiveIntegerField(null=True, blank=True, default=None)
|
||||
|
||||
@@ -263,7 +263,7 @@ ARCH_CHOICES = [
|
||||
|
||||
class Deployment(models.Model):
|
||||
objects = PermissionQuerySet.as_manager()
|
||||
|
||||
|
||||
uid = models.UUIDField(primary_key=False, default=uuid.uuid4, editable=False)
|
||||
site = models.ForeignKey(
|
||||
"clients.Site", related_name="deploysites", on_delete=models.CASCADE
|
||||
|
||||
@@ -447,7 +447,10 @@ class TestClientViews(TacticalTestCase):
|
||||
self.assertEqual(r.status_code, 404)
|
||||
|
||||
# test valid download
|
||||
deployment = baker.make("clients.Deployment", install_flags={"rdp": True, "ping": False, "power": False})
|
||||
deployment = baker.make(
|
||||
"clients.Deployment",
|
||||
install_flags={"rdp": True, "ping": False, "power": False},
|
||||
)
|
||||
|
||||
url = f"/clients/{deployment.uid}/deploy/"
|
||||
|
||||
@@ -463,7 +466,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
def test_get_clients_permissions(self):
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
url = f"{base_url}/"
|
||||
|
||||
@@ -480,17 +483,17 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
# all agents should be returned
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 5) # type: ignore
|
||||
self.assertEqual(len(response.data), 5) # type: ignore
|
||||
|
||||
# limit user to specific client. only 1 client should be returned
|
||||
user.role.can_view_clients.set([clients[3]])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 1) # type: ignore
|
||||
self.assertEqual(len(response.data), 1) # type: ignore
|
||||
|
||||
# 2 should be returned now
|
||||
user.role.can_view_clients.set([clients[0], clients[1]])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 2) # type: ignore
|
||||
self.assertEqual(len(response.data), 2) # type: ignore
|
||||
|
||||
# limit to a specific site. The site shouldn't be in client returned sites
|
||||
sites = baker.make("clients.Site", client=clients[4], _quantity=3)
|
||||
@@ -499,14 +502,14 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
user.role.can_view_sites.set([sites[0]])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 3) # type: ignore
|
||||
for client in response.data: # type: ignore
|
||||
self.assertEqual(len(response.data), 3) # type: ignore
|
||||
for client in response.data: # type: ignore
|
||||
if client["id"] == clients[0].id:
|
||||
self.assertEqual(len(client["sites"]), 4)
|
||||
elif client["id"] == clients[1].id:
|
||||
self.assertEqual(len(client["sites"]), 5)
|
||||
elif client["id"] == clients[4].id:
|
||||
self.assertEqual(len(client["sites"]), 1)
|
||||
self.assertEqual(len(client["sites"]), 1)
|
||||
|
||||
# make sure superusers work
|
||||
self.check_authorized_superuser("get", url)
|
||||
@@ -515,14 +518,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
@patch("clients.models.Client.delete")
|
||||
def test_add_clients_permissions(self, save, delete):
|
||||
|
||||
data = {
|
||||
"client": {
|
||||
"name": "Client Name"
|
||||
},
|
||||
"site": {
|
||||
"name": "Site Name"
|
||||
}
|
||||
}
|
||||
data = {"client": {"name": "Client Name"}, "site": {"name": "Site Name"}}
|
||||
|
||||
url = f"{base_url}/"
|
||||
|
||||
@@ -530,7 +526,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", url, data)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("post", url, data)
|
||||
@@ -545,7 +541,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
def test_get_edit_delete_clients_permissions(self, delete):
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
client = baker.make("clients.Client")
|
||||
unauthorized_client = baker.make("clients.Client")
|
||||
@@ -576,12 +572,14 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
# make sure superusers work
|
||||
for method in methods:
|
||||
self.check_authorized_superuser(method, f"{base_url}/{unauthorized_client.id}/")
|
||||
self.check_authorized_superuser(
|
||||
method, f"{base_url}/{unauthorized_client.id}/"
|
||||
)
|
||||
|
||||
def test_get_sites_permissions(self):
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
url = f"{base_url}/sites/"
|
||||
|
||||
@@ -599,28 +597,28 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
# all sites should be returned
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 10) # type: ignore
|
||||
self.assertEqual(len(response.data), 10) # type: ignore
|
||||
|
||||
# limit user to specific site. only 1 site should be returned
|
||||
user.role.can_view_sites.set([sites[3]])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 1) # type: ignore
|
||||
self.assertEqual(len(response.data), 1) # type: ignore
|
||||
|
||||
# 2 should be returned now
|
||||
user.role.can_view_sites.set([sites[0], sites[1]])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 2) # type: ignore
|
||||
self.assertEqual(len(response.data), 2) # type: ignore
|
||||
|
||||
# check if limiting user to client works
|
||||
user.role.can_view_sites.clear()
|
||||
user.role.can_view_clients.set([clients[0]])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 4) # type: ignore
|
||||
self.assertEqual(len(response.data), 4) # type: ignore
|
||||
|
||||
# add a site to see if the results still work
|
||||
user.role.can_view_sites.set([sites[1], sites[0]])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 5) # type: ignore
|
||||
self.assertEqual(len(response.data), 5) # type: ignore
|
||||
|
||||
# make sure superusers work
|
||||
self.check_authorized_superuser("get", url)
|
||||
@@ -630,10 +628,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
def test_add_sites_permissions(self, delete, save):
|
||||
client = baker.make("clients.Client")
|
||||
unauthorized_client = baker.make("clients.Client")
|
||||
data = {
|
||||
"client": client.id,
|
||||
"name": "Site Name"
|
||||
}
|
||||
data = {"client": client.id, "name": "Site Name"}
|
||||
|
||||
url = f"{base_url}/sites/"
|
||||
|
||||
@@ -641,7 +636,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", url, data)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("post", url, data)
|
||||
@@ -657,17 +652,14 @@ class TestClientPermissions(TacticalTestCase):
|
||||
self.check_authorized("post", url, data)
|
||||
|
||||
# test adding to unauthorized client
|
||||
data = {
|
||||
"client": unauthorized_client.id,
|
||||
"name": "Site Name"
|
||||
}
|
||||
data = {"client": unauthorized_client.id, "name": "Site Name"}
|
||||
self.check_not_authorized("post", url, data)
|
||||
|
||||
@patch("clients.models.Site.delete")
|
||||
def test_get_edit_delete_sites_permissions(self, delete):
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
site = baker.make("clients.Site")
|
||||
unauthorized_site = baker.make("clients.Site")
|
||||
@@ -706,7 +698,9 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
# make sure superusers work
|
||||
for method in methods:
|
||||
self.check_authorized_superuser(method, f"{base_url}/{unauthorized_site.id}/")
|
||||
self.check_authorized_superuser(
|
||||
method, f"{base_url}/{unauthorized_site.id}/"
|
||||
)
|
||||
|
||||
def test_get_pendingactions_permissions(self):
|
||||
url = f"{base_url}/deployments/"
|
||||
@@ -714,15 +708,17 @@ class TestClientPermissions(TacticalTestCase):
|
||||
site = baker.make("clients.Site")
|
||||
other_site = baker.make("clients.Site")
|
||||
deployments = baker.make("clients.Deployment", site=site, _quantity=5)
|
||||
other_deployments = baker.make("clients.Deployment", site=other_site, _quantity=7)
|
||||
other_deployments = baker.make(
|
||||
"clients.Deployment", site=other_site, _quantity=7
|
||||
)
|
||||
|
||||
# test getting all deployments
|
||||
# make sure superusers work
|
||||
self.check_authorized_superuser("get", url)
|
||||
|
||||
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# user with empty role should fail
|
||||
self.check_not_authorized("get", url)
|
||||
@@ -733,23 +729,23 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
# all sites should be returned
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 12) # type: ignore
|
||||
self.assertEqual(len(response.data), 12) # type: ignore
|
||||
|
||||
# limit user to specific site. only 1 site should be returned
|
||||
user.role.can_view_sites.set([site])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 5) # type: ignore
|
||||
self.assertEqual(len(response.data), 5) # type: ignore
|
||||
|
||||
# all should be returned now
|
||||
user.role.can_view_clients.set([other_site.client])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 12) # type: ignore
|
||||
self.assertEqual(len(response.data), 12) # type: ignore
|
||||
|
||||
# check if limiting user to client works
|
||||
user.role.can_view_sites.clear()
|
||||
user.role.can_view_clients.set([other_site.client])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 7) # type: ignore
|
||||
self.assertEqual(len(response.data), 7) # type: ignore
|
||||
|
||||
@patch("clients.models.Deployment.save")
|
||||
def test_add_deployments_permissions(self, save):
|
||||
@@ -770,7 +766,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", url, data)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("post", url, data)
|
||||
@@ -797,7 +793,9 @@ class TestClientPermissions(TacticalTestCase):
|
||||
site = baker.make("clients.Site")
|
||||
unauthorized_site = baker.make("clients.Site")
|
||||
deployment = baker.make("clients.Deployment", site=site)
|
||||
unauthorized_deployment = baker.make("clients.Deployment", site=unauthorized_site)
|
||||
unauthorized_deployment = baker.make(
|
||||
"clients.Deployment", site=unauthorized_site
|
||||
)
|
||||
|
||||
url = f"{base_url}/deployments/{deployment.id}/"
|
||||
unauthorized_url = f"{base_url}/deployments/{unauthorized_deployment.id}/"
|
||||
@@ -808,7 +806,7 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# make sure user with empty role is unauthorized
|
||||
self.check_not_authorized("delete", url)
|
||||
@@ -820,14 +818,16 @@ class TestClientPermissions(TacticalTestCase):
|
||||
|
||||
self.check_authorized("delete", url)
|
||||
self.check_authorized("delete", unauthorized_url)
|
||||
|
||||
|
||||
# test limiting users to clients and sites
|
||||
|
||||
# limit to site
|
||||
user.role.can_view_sites.set([site])
|
||||
|
||||
# recreate deployment since it is being deleted even though I am mocking delete on Deployment model???
|
||||
unauthorized_deployment = baker.make("clients.Deployment", site=unauthorized_site)
|
||||
unauthorized_deployment = baker.make(
|
||||
"clients.Deployment", site=unauthorized_site
|
||||
)
|
||||
unauthorized_url = f"{base_url}/deployments/{unauthorized_deployment.id}/"
|
||||
|
||||
self.check_authorized("delete", url)
|
||||
|
||||
@@ -25,9 +25,10 @@ class CodeSignPerms(permissions.BasePermission):
|
||||
def has_permission(self, r, view):
|
||||
return _has_perm(r, "can_code_sign")
|
||||
|
||||
|
||||
class CustomFieldPerms(permissions.BasePermission):
|
||||
def has_permission(self, r, view):
|
||||
if r.method == "GET":
|
||||
return _has_perm(r, "can_view_customfields")
|
||||
else:
|
||||
return _has_perm(r, "can_manage_customfields")
|
||||
return _has_perm(r, "can_manage_customfields")
|
||||
|
||||
@@ -418,6 +418,7 @@ class TestCoreTasks(TacticalTestCase):
|
||||
|
||||
self.check_not_authenticated("patch", url)
|
||||
|
||||
|
||||
class TestCorePermissions(TacticalTestCase):
|
||||
def setUp(self):
|
||||
self.client_setup()
|
||||
|
||||
@@ -13,7 +13,11 @@ from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from tacticalrmm.utils import notify_error
|
||||
from tacticalrmm.permissions import _has_perm_on_client, _has_perm_on_agent, _has_perm_on_site
|
||||
from tacticalrmm.permissions import (
|
||||
_has_perm_on_client,
|
||||
_has_perm_on_agent,
|
||||
_has_perm_on_site,
|
||||
)
|
||||
|
||||
from .models import CodeSignToken, CoreSettings, CustomField, GlobalKVStore, URLAction
|
||||
from .permissions import (
|
||||
@@ -21,7 +25,7 @@ from .permissions import (
|
||||
CoreSettingsPerms,
|
||||
ServerMaintPerms,
|
||||
URLActionPerms,
|
||||
CustomFieldPerms
|
||||
CustomFieldPerms,
|
||||
)
|
||||
from .serializers import (
|
||||
CodeSignTokenSerializer,
|
||||
@@ -56,18 +60,17 @@ class UploadMeshAgent(APIView):
|
||||
|
||||
class GetEditCoreSettings(APIView):
|
||||
@permission_classes([IsAuthenticated, CoreSettingsPerms])
|
||||
|
||||
def get(self, request):
|
||||
settings = CoreSettings.objects.first()
|
||||
return Response(CoreSettingsSerializer(settings).data)
|
||||
|
||||
def put(self, request):
|
||||
def put(self, request):
|
||||
coresettings = CoreSettings.objects.first()
|
||||
serializer = CoreSettingsSerializer(instance=coresettings, data=request.data)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
serializer.save()
|
||||
|
||||
return Response("ok")
|
||||
return Response("ok")
|
||||
|
||||
|
||||
@api_view()
|
||||
@@ -332,6 +335,7 @@ class GetAddURLAction(APIView):
|
||||
|
||||
class UpdateDeleteURLAction(APIView):
|
||||
permission_classes = [IsAuthenticated, CoreSettingsPerms]
|
||||
|
||||
def put(self, request, pk):
|
||||
action = get_object_or_404(URLAction, pk=pk)
|
||||
|
||||
@@ -400,6 +404,7 @@ class RunURLAction(APIView):
|
||||
|
||||
class TwilioSMSTest(APIView):
|
||||
permission_classes = [IsAuthenticated, CoreSettingsPerms]
|
||||
|
||||
def post(self, request):
|
||||
|
||||
core = CoreSettings.objects.first()
|
||||
|
||||
@@ -325,7 +325,7 @@ class DebugLog(models.Model):
|
||||
|
||||
class PendingAction(models.Model):
|
||||
objects = PermissionQuerySet.as_manager()
|
||||
|
||||
|
||||
agent = models.ForeignKey(
|
||||
"agents.Agent",
|
||||
related_name="pendingactions",
|
||||
|
||||
@@ -12,9 +12,11 @@ class PendingActionPerms(permissions.BasePermission):
|
||||
def has_permission(self, r, view):
|
||||
if r.method == "GET":
|
||||
if "agent_id" in view.kwargs.keys():
|
||||
return _has_perm(r, "can_list_pendingactions") and _has_perm_on_agent(r.user, view.kwargs["agent_id"])
|
||||
return _has_perm(r, "can_list_pendingactions") and _has_perm_on_agent(
|
||||
r.user, view.kwargs["agent_id"]
|
||||
)
|
||||
else:
|
||||
return _has_perm(r, "can_list_pendingactions")
|
||||
return _has_perm(r, "can_list_pendingactions")
|
||||
else:
|
||||
return _has_perm(r, "can_manage_pendingactions")
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from logs.models import PendingAction
|
||||
|
||||
base_url = "/logs"
|
||||
|
||||
|
||||
class TestAuditViews(TacticalTestCase):
|
||||
def setUp(self):
|
||||
self.authenticate()
|
||||
@@ -228,7 +229,9 @@ class TestAuditViews(TacticalTestCase):
|
||||
)
|
||||
|
||||
nats_cmd.return_value = "error deleting sched task"
|
||||
r = self.client.delete(f"{base_url}/pendingactions/{action2.id}/", format="json")
|
||||
r = self.client.delete(
|
||||
f"{base_url}/pendingactions/{action2.id}/", format="json"
|
||||
)
|
||||
self.assertEqual(r.status_code, 400)
|
||||
self.assertEqual(r.data, "error deleting sched task") # type: ignore
|
||||
|
||||
@@ -346,7 +349,10 @@ class TestAuditViews(TacticalTestCase):
|
||||
url = f"{base_url}/debug/"
|
||||
|
||||
# test superuser access
|
||||
self.check_authorized_superuser("patch", url, )
|
||||
self.check_authorized_superuser(
|
||||
"patch",
|
||||
url,
|
||||
)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user)
|
||||
@@ -379,7 +385,6 @@ class TestAuditViews(TacticalTestCase):
|
||||
response = self.check_authorized("patch", url)
|
||||
self.assertEqual(len(response.data), 27)
|
||||
|
||||
|
||||
def test_get_pendingaction_permissions(self):
|
||||
agent = baker.make_recipe("agents.agent")
|
||||
unauthorized_agent = baker.make_recipe("agents.agent")
|
||||
@@ -390,7 +395,9 @@ class TestAuditViews(TacticalTestCase):
|
||||
|
||||
# test super user access
|
||||
self.check_authorized_superuser("get", f"{base_url}/pendingactions/")
|
||||
self.check_authorized_superuser("get", f"/agents/{agent.agent_id}/pendingactions/")
|
||||
self.check_authorized_superuser(
|
||||
"get", f"/agents/{agent.agent_id}/pendingactions/"
|
||||
)
|
||||
self.check_authorized_superuser(
|
||||
"get", f"/agents/{unauthorized_agent.agent_id}/pendingactions/"
|
||||
)
|
||||
@@ -433,8 +440,12 @@ class TestAuditViews(TacticalTestCase):
|
||||
def test_delete_pendingaction_permissions(self, delete, nats_cmd):
|
||||
agent = baker.make_recipe("agents.agent")
|
||||
unauthorized_agent = baker.make_recipe("agents.agent")
|
||||
action = baker.make("logs.PendingAction", agent=agent, details={"taskname": "Task"})
|
||||
unauthorized_action = baker.make("logs.PendingAction", agent=unauthorized_agent, details={"taskname": "Task"})
|
||||
action = baker.make(
|
||||
"logs.PendingAction", agent=agent, details={"taskname": "Task"}
|
||||
)
|
||||
unauthorized_action = baker.make(
|
||||
"logs.PendingAction", agent=unauthorized_agent, details={"taskname": "Task"}
|
||||
)
|
||||
|
||||
url = f"{base_url}/pendingactions/{action.id}/"
|
||||
unauthorized_url = f"{base_url}/pendingactions/{unauthorized_action.id}/"
|
||||
|
||||
@@ -44,9 +44,7 @@ class GetAuditLogs(APIView):
|
||||
agentFilter = Q(agent_id__in=request.data["agentFilter"])
|
||||
|
||||
elif "clientFilter" in request.data:
|
||||
clients = Client.objects.filter(
|
||||
pk__in=request.data["clientFilter"]
|
||||
)
|
||||
clients = Client.objects.filter(pk__in=request.data["clientFilter"])
|
||||
agents = Agent.objects.filter(site__client__in=clients).values_list(
|
||||
"agent_id"
|
||||
)
|
||||
@@ -146,5 +144,7 @@ class GetDebugLog(APIView):
|
||||
)
|
||||
|
||||
ctx = {"default_tz": get_default_timezone()}
|
||||
ret = DebugLogSerializer(debug_logs.order_by("-entry_time")[0:1000], many=True, context=ctx).data
|
||||
ret = DebugLogSerializer(
|
||||
debug_logs.order_by("-entry_time")[0:1000], many=True, context=ctx
|
||||
).data
|
||||
return Response(ret)
|
||||
|
||||
@@ -74,7 +74,9 @@ class GetEditActionService(APIView):
|
||||
elif not r["success"] and r["errormsg"]:
|
||||
return notify_error(r["errormsg"])
|
||||
elif r["success"]:
|
||||
return Response(f"The service was {'started' if action == 'start' else 'stopped'} successfully")
|
||||
return Response(
|
||||
f"The service was {'started' if action == 'start' else 'stopped'} successfully"
|
||||
)
|
||||
|
||||
return notify_error("Something went wrong")
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from django.db import models
|
||||
|
||||
|
||||
class PermissionQuerySet(models.QuerySet):
|
||||
|
||||
# filters queryset based on permissions. Works different for Agent, Client, and Site
|
||||
@@ -32,10 +33,14 @@ class PermissionQuerySet(models.QuerySet):
|
||||
# checks which sites and clients the user has access to and filters clients and sites
|
||||
elif model_name == "Client" and (can_view_clients or can_view_sites):
|
||||
if can_view_sites:
|
||||
sites_queryset = models.Q(pk__in=[site.client.pk for site in can_view_sites])
|
||||
sites_queryset = models.Q(
|
||||
pk__in=[site.client.pk for site in can_view_sites]
|
||||
)
|
||||
|
||||
if can_view_clients:
|
||||
clients_queryset = models.Q(pk__in=can_view_clients.values_list("pk", flat=True))
|
||||
clients_queryset = models.Q(
|
||||
pk__in=can_view_clients.values_list("pk", flat=True)
|
||||
)
|
||||
|
||||
return self.filter(sites_queryset | clients_queryset)
|
||||
|
||||
@@ -43,22 +48,32 @@ class PermissionQuerySet(models.QuerySet):
|
||||
if can_view_clients:
|
||||
clients_queryset = models.Q(client__in=can_view_clients)
|
||||
if can_view_sites:
|
||||
sites_queryset = models.Q(pk__in=can_view_sites.values_list("pk", flat=True))
|
||||
sites_queryset = models.Q(
|
||||
pk__in=can_view_sites.values_list("pk", flat=True)
|
||||
)
|
||||
|
||||
return self.filter(clients_queryset | sites_queryset)
|
||||
|
||||
elif model_name == "Alert":
|
||||
|
||||
if can_view_clients:
|
||||
clients_queryset = models.Q(agent__site__client__in=can_view_clients) | models.Q(assigned_check__agent__site__client__in=can_view_clients) | models.Q(assigned_task__agent__site__client__in=can_view_clients)
|
||||
clients_queryset = (
|
||||
models.Q(agent__site__client__in=can_view_clients)
|
||||
| models.Q(assigned_check__agent__site__client__in=can_view_clients)
|
||||
| models.Q(assigned_task__agent__site__client__in=can_view_clients)
|
||||
)
|
||||
if can_view_sites:
|
||||
sites_queryset = models.Q(agent__site__in=can_view_sites) | models.Q(assigned_check__agent__site__in=can_view_sites) | models.Q(assigned_task__agent__site__in=can_view_sites)
|
||||
sites_queryset = (
|
||||
models.Q(agent__site__in=can_view_sites)
|
||||
| models.Q(assigned_check__agent__site__in=can_view_sites)
|
||||
| models.Q(assigned_task__agent__site__in=can_view_sites)
|
||||
)
|
||||
|
||||
agent_queryset = models.Q(agent=None, assigned_check=None, assigned_task=None)
|
||||
agent_queryset = models.Q(
|
||||
agent=None, assigned_check=None, assigned_task=None
|
||||
)
|
||||
|
||||
return self.filter(
|
||||
clients_queryset | sites_queryset | agent_queryset
|
||||
)
|
||||
return self.filter(clients_queryset | sites_queryset | agent_queryset)
|
||||
|
||||
# anything else just checks the agent field and if it has it will filter matched agents from the queryset
|
||||
else:
|
||||
@@ -76,6 +91,4 @@ class PermissionQuerySet(models.QuerySet):
|
||||
if can_view_sites:
|
||||
sites_queryset = models.Q(agent__site__in=can_view_sites)
|
||||
|
||||
return self.filter(
|
||||
clients_queryset | sites_queryset | agent_queryset
|
||||
)
|
||||
return self.filter(clients_queryset | sites_queryset | agent_queryset)
|
||||
|
||||
@@ -9,7 +9,7 @@ def _has_perm(request, perm):
|
||||
request.user.role and getattr(request.user.role, "is_superuser")
|
||||
):
|
||||
return True
|
||||
|
||||
|
||||
# make sure non-superusers with empty roles aren't permitted
|
||||
elif not request.user.role:
|
||||
return False
|
||||
@@ -40,11 +40,13 @@ def _has_perm_on_agent(user, agent_id: str):
|
||||
|
||||
elif can_view_sites and agent.site in can_view_sites:
|
||||
return True
|
||||
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _has_perm_on_client(user, client_id: int):
|
||||
from clients.models import Client
|
||||
|
||||
role = user.role
|
||||
|
||||
if user.is_superuser or (role and getattr(role, "is_superuser")):
|
||||
@@ -61,11 +63,13 @@ def _has_perm_on_client(user, client_id: int):
|
||||
|
||||
elif can_view_clients and client in can_view_clients:
|
||||
return True
|
||||
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _has_perm_on_site(user, site_id: int):
|
||||
from clients.models import Site
|
||||
|
||||
role = user.role
|
||||
if user.is_superuser or (role and getattr(role, "is_superuser")):
|
||||
return True
|
||||
@@ -89,6 +93,7 @@ def _has_perm_on_site(user, site_id: int):
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _audit_log_filter(user) -> Q:
|
||||
role = user.role
|
||||
if user.is_superuser or (role and getattr(role, "is_superuser")):
|
||||
@@ -105,13 +110,17 @@ def _audit_log_filter(user) -> Q:
|
||||
can_view_sites = role.can_view_sites.all() if role else None
|
||||
|
||||
if can_view_sites:
|
||||
agents = Agent.objects.filter(site__in=can_view_sites).values_list("agent_id", flat=True)
|
||||
agents = Agent.objects.filter(site__in=can_view_sites).values_list(
|
||||
"agent_id", flat=True
|
||||
)
|
||||
sites_queryset = Q(agent_id__in=agents)
|
||||
agent_filter = Q(agent_id=None)
|
||||
|
||||
if can_view_clients:
|
||||
agents = Agent.objects.filter(site__client__in=can_view_clients).values_list("agent_id", flat=True)
|
||||
agents = Agent.objects.filter(site__client__in=can_view_clients).values_list(
|
||||
"agent_id", flat=True
|
||||
)
|
||||
sites_queryset = Q(agent_id__in=agents)
|
||||
agent_filter = Q(agent_id=None)
|
||||
|
||||
return Q(sites_queryset | clients_queryset | agent_filter)
|
||||
return Q(sites_queryset | clients_queryset | agent_filter)
|
||||
|
||||
Reference in New Issue
Block a user