black
This commit is contained in:
@@ -41,4 +41,3 @@ class APIKeyPerms(permissions.BasePermission):
|
||||
return _has_perm(r, "can_list_api_keys")
|
||||
|
||||
return _has_perm(r, "can_manage_api_keys")
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ class TestAgentsList(TacticalTestCase):
|
||||
# make sure data is returned with the AgentHostnameSerializer
|
||||
agents = Agent.objects.filter(site=site3)
|
||||
serializer = AgentHostnameSerializer(agents, many=True)
|
||||
self.assertEqual(r.data, serializer.data) # type: ignore
|
||||
self.assertEqual(r.data, serializer.data) # type: ignore
|
||||
|
||||
self.check_not_authenticated("get", url)
|
||||
|
||||
@@ -950,7 +950,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
def test_list_agents_permissions(self):
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
url = f"{base_url}/"
|
||||
|
||||
@@ -968,17 +968,17 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
|
||||
# all agents should be returned
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 10) # type: ignore
|
||||
self.assertEqual(len(response.data), 10) # type: ignore
|
||||
|
||||
# limit user to specific client. only 1 agent should be returned
|
||||
user.role.can_view_clients.set([agents[4].client])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 2) # type: ignore
|
||||
self.assertEqual(len(response.data), 2) # type: ignore
|
||||
|
||||
# limit agent to specific site. 2 should be returned now
|
||||
user.role.can_view_sites.set([agents[6].site])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data), 4) # type: ignore
|
||||
self.assertEqual(len(response.data), 4) # type: ignore
|
||||
|
||||
# make sure superusers work
|
||||
self.check_authorized_superuser("get", url)
|
||||
@@ -988,7 +988,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
def test_get_edit_uninstall_permissions(self, reload_nats, nats_cmd):
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
agent = baker.make_recipe("agents.agent")
|
||||
methods = ["get", "put", "delete"]
|
||||
@@ -1070,7 +1070,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser(test["method"], url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized(test["method"], url)
|
||||
@@ -1108,7 +1108,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", url, client_data)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("post", url, site_data)
|
||||
@@ -1149,7 +1149,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
update_task.reset_mock()
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
self.check_not_authorized("post", url, data)
|
||||
update_task.assert_not_called()
|
||||
@@ -1188,10 +1188,10 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
|
||||
# test superuser access
|
||||
response = self.check_authorized_superuser("get", url)
|
||||
self.assertEqual(len(response.data["agents"]), 12) # type: ignore
|
||||
self.assertEqual(len(response.data["agents"]), 12) # type: ignore
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
self.check_not_authorized("get", url)
|
||||
|
||||
@@ -1199,22 +1199,22 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
user.role.save()
|
||||
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data["agents"]), 12) # type: ignore
|
||||
self.assertEqual(len(response.data["agents"]), 12) # type: ignore
|
||||
|
||||
# limit to client
|
||||
user.role.can_view_clients.set([agents[0].client])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data["agents"]), 5) # type: ignore
|
||||
self.assertEqual(len(response.data["agents"]), 5) # type: ignore
|
||||
|
||||
# add site
|
||||
user.role.can_view_sites.set([other_agents[0].site])
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data["agents"]), 12) # type: ignore
|
||||
self.assertEqual(len(response.data["agents"]), 12) # type: ignore
|
||||
|
||||
# remove client permissions
|
||||
user.role.can_view_clients.clear()
|
||||
response = self.check_authorized("get", url)
|
||||
self.assertEqual(len(response.data["agents"]), 7) # type: ignore
|
||||
self.assertEqual(len(response.data["agents"]), 7) # type: ignore
|
||||
|
||||
def test_generating_agent_installer_permissions(self):
|
||||
|
||||
@@ -1228,7 +1228,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
self.check_not_authorized("post", url)
|
||||
|
||||
@@ -1314,7 +1314,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser(test["method"], test["url"])
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.check_not_authorized(test["method"], test["url"])
|
||||
|
||||
setattr(user.role, test["role"], True)
|
||||
@@ -1325,7 +1325,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
user = self.create_user_with_roles(["can_list_notes", "can_manage_notes"])
|
||||
user.role.can_view_sites.set([agent.site])
|
||||
user.role.save()
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
authorized_data = {"note": "Test not here", "agent_id": agent.agent_id}
|
||||
|
||||
@@ -1336,7 +1336,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
|
||||
# should only return the 4 allowed agent notes (one got deleted above in loop)
|
||||
r = self.client.get(f"{base_url}/notes/")
|
||||
self.assertEqual(len(r.data), 4) # type: ignore
|
||||
self.assertEqual(len(r.data), 4) # type: ignore
|
||||
|
||||
# test with agent_id in url
|
||||
self.check_authorized("get", f"{base_url}/{agent.agent_id}/notes/")
|
||||
@@ -1365,7 +1365,7 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
def test_get_agent_history_permissions(self):
|
||||
# create user with empty role
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
sites = baker.make("clients.Site", _quantity=2)
|
||||
agent = baker.make_recipe("agents.agent", site=sites[0])
|
||||
@@ -1394,14 +1394,14 @@ class TestAgentPermissions(TacticalTestCase):
|
||||
r = self.check_authorized("get", url)
|
||||
self.check_authorized("get", authorized_url)
|
||||
self.check_authorized("get", unauthorized_url)
|
||||
self.assertEqual(len(r.data), 11) # type: ignore
|
||||
self.assertEqual(len(r.data), 11) # type: ignore
|
||||
|
||||
# limit user to specific client.
|
||||
user.role.can_view_clients.set([agent.client])
|
||||
self.check_authorized("get", authorized_url)
|
||||
self.check_not_authorized("get", unauthorized_url)
|
||||
r = self.check_authorized("get", url)
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
|
||||
# make sure superusers work
|
||||
self.check_authorized_superuser("get", url)
|
||||
|
||||
@@ -3,6 +3,7 @@ from rest_framework import permissions
|
||||
|
||||
from tacticalrmm.permissions import _has_perm, _has_perm_on_agent
|
||||
|
||||
|
||||
def _has_perm_on_alert(user, id: int):
|
||||
from alerts.models import Alert
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from .serializers import (
|
||||
|
||||
base_url = "/alerts"
|
||||
|
||||
|
||||
class TestAlertsViews(TacticalTestCase):
|
||||
def setUp(self):
|
||||
self.authenticate()
|
||||
@@ -1437,7 +1438,6 @@ class TestAlertTasks(TacticalTestCase):
|
||||
|
||||
|
||||
class TestAlertPermissions(TacticalTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.setup_coresettings()
|
||||
self.client_setup()
|
||||
@@ -1449,17 +1449,26 @@ class TestAlertPermissions(TacticalTestCase):
|
||||
agents = [agent, agent1, agent2]
|
||||
checks = baker.make("checks.Check", agent=cycle(agents), _quantity=3)
|
||||
tasks = baker.make("autotasks.AutomatedTask", agent=cycle(agents), _quantity=3)
|
||||
baker.make("alerts.Alert", alert_type="task", assigned_task=cycle(tasks), _quantity=3)
|
||||
baker.make("alerts.Alert", alert_type="check", assigned_check=cycle(checks), _quantity=3)
|
||||
baker.make("alerts.Alert", alert_type="availability", agent=cycle(agents), _quantity=3)
|
||||
baker.make(
|
||||
"alerts.Alert", alert_type="task", assigned_task=cycle(tasks), _quantity=3
|
||||
)
|
||||
baker.make(
|
||||
"alerts.Alert",
|
||||
alert_type="check",
|
||||
assigned_check=cycle(checks),
|
||||
_quantity=3,
|
||||
)
|
||||
baker.make(
|
||||
"alerts.Alert", alert_type="availability", agent=cycle(agents), _quantity=3
|
||||
)
|
||||
baker.make("alerts.Alert", alert_type="custom", _quantity=4)
|
||||
|
||||
# test super user access
|
||||
r = self.check_authorized_superuser("patch", f"{base_url}/")
|
||||
self.assertEqual(len(r.data), 13) # type: ignore
|
||||
self.assertEqual(len(r.data), 13) # type: ignore
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
self.check_not_authorized("patch", f"{base_url}/")
|
||||
|
||||
@@ -1468,23 +1477,23 @@ class TestAlertPermissions(TacticalTestCase):
|
||||
user.role.save()
|
||||
|
||||
r = self.check_authorized("patch", f"{base_url}/")
|
||||
self.assertEqual(len(r.data), 13) # type: ignore
|
||||
self.assertEqual(len(r.data), 13) # type: ignore
|
||||
|
||||
# test limiting to client
|
||||
user.role.can_view_clients.set([agent.client])
|
||||
r = self.check_authorized("patch", f"{base_url}/")
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
|
||||
# test limiting to site
|
||||
user.role.can_view_clients.clear()
|
||||
user.role.can_view_sites.set([agent1.site])
|
||||
r = self.client.patch(f"{base_url}/")
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
|
||||
# test limiting to site and client
|
||||
user.role.can_view_clients.set([agent2.client])
|
||||
r = self.client.patch(f"{base_url}/")
|
||||
self.assertEqual(len(r.data), 10) # type: ignore
|
||||
self.assertEqual(len(r.data), 10) # type: ignore
|
||||
|
||||
@patch("alerts.models.Alert.delete", return_value=1)
|
||||
def test_edit_delete_get_alert_permissions(self, delete):
|
||||
@@ -1494,25 +1503,38 @@ class TestAlertPermissions(TacticalTestCase):
|
||||
agents = [agent, agent1, agent2]
|
||||
checks = baker.make("checks.Check", agent=cycle(agents), _quantity=3)
|
||||
tasks = baker.make("autotasks.AutomatedTask", agent=cycle(agents), _quantity=3)
|
||||
alert_tasks = baker.make("alerts.Alert", alert_type="task", assigned_task=cycle(tasks), _quantity=3)
|
||||
alert_checks = baker.make("alerts.Alert", alert_type="check", assigned_check=cycle(checks), _quantity=3)
|
||||
alert_agents = baker.make("alerts.Alert", alert_type="availability", agent=cycle(agents), _quantity=3)
|
||||
alert_tasks = baker.make(
|
||||
"alerts.Alert", alert_type="task", assigned_task=cycle(tasks), _quantity=3
|
||||
)
|
||||
alert_checks = baker.make(
|
||||
"alerts.Alert",
|
||||
alert_type="check",
|
||||
assigned_check=cycle(checks),
|
||||
_quantity=3,
|
||||
)
|
||||
alert_agents = baker.make(
|
||||
"alerts.Alert", alert_type="availability", agent=cycle(agents), _quantity=3
|
||||
)
|
||||
alert_custom = baker.make("alerts.Alert", alert_type="custom", _quantity=4)
|
||||
|
||||
# alert task url
|
||||
task_url = f"{base_url}/{alert_tasks[0].id}/" # for agent
|
||||
unauthorized_task_url = f"{base_url}/{alert_tasks[1].id}/" # for agent1
|
||||
task_url = f"{base_url}/{alert_tasks[0].id}/" # for agent
|
||||
unauthorized_task_url = f"{base_url}/{alert_tasks[1].id}/" # for agent1
|
||||
# alert check url
|
||||
check_url = f"{base_url}/{alert_checks[0].id}/" # for agent
|
||||
unauthorized_check_url = f"{base_url}/{alert_checks[1].id}/" # for agent1
|
||||
check_url = f"{base_url}/{alert_checks[0].id}/" # for agent
|
||||
unauthorized_check_url = f"{base_url}/{alert_checks[1].id}/" # for agent1
|
||||
# alert agent url
|
||||
agent_url = f"{base_url}/{alert_agents[0].id}/" # for agent
|
||||
unauthorized_agent_url = f"{base_url}/{alert_agents[1].id}/" # for agent1
|
||||
agent_url = f"{base_url}/{alert_agents[0].id}/" # for agent
|
||||
unauthorized_agent_url = f"{base_url}/{alert_agents[1].id}/" # for agent1
|
||||
# custom alert url
|
||||
custom_url = f"{base_url}/{alert_custom[0].id}/" # no agent associated
|
||||
custom_url = f"{base_url}/{alert_custom[0].id}/" # no agent associated
|
||||
|
||||
authorized_urls = [task_url, check_url, agent_url, custom_url]
|
||||
unauthorized_urls = [unauthorized_agent_url, unauthorized_check_url, unauthorized_task_url]
|
||||
unauthorized_urls = [
|
||||
unauthorized_agent_url,
|
||||
unauthorized_check_url,
|
||||
unauthorized_task_url,
|
||||
]
|
||||
|
||||
for method in ["get", "put", "delete"]:
|
||||
|
||||
@@ -1521,17 +1543,17 @@ class TestAlertPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser(method, url)
|
||||
|
||||
for url in unauthorized_urls:
|
||||
self.check_authorized_superuser(method, url)
|
||||
self.check_authorized_superuser(method, url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
for url in authorized_urls:
|
||||
self.check_not_authorized(method, url)
|
||||
|
||||
for url in unauthorized_urls:
|
||||
self.check_not_authorized(method, url)
|
||||
self.check_not_authorized(method, url)
|
||||
|
||||
# add user to role and test
|
||||
setattr(
|
||||
@@ -1546,7 +1568,7 @@ class TestAlertPermissions(TacticalTestCase):
|
||||
self.check_authorized(method, url)
|
||||
|
||||
for url in unauthorized_urls:
|
||||
self.check_authorized(method, url)
|
||||
self.check_authorized(method, url)
|
||||
|
||||
# limit user to client if agent check
|
||||
user.role.can_view_clients.set([agent.client])
|
||||
@@ -1555,7 +1577,7 @@ class TestAlertPermissions(TacticalTestCase):
|
||||
self.check_authorized(method, url)
|
||||
|
||||
for url in unauthorized_urls:
|
||||
self.check_not_authorized(method, url)
|
||||
self.check_not_authorized(method, url)
|
||||
|
||||
# limit user to client if agent check
|
||||
user.role.can_view_sites.set([agent1.site])
|
||||
@@ -1564,4 +1586,4 @@ class TestAlertPermissions(TacticalTestCase):
|
||||
self.check_authorized(method, url)
|
||||
|
||||
for url in unauthorized_urls:
|
||||
self.check_authorized(method, url)
|
||||
self.check_authorized(method, url)
|
||||
|
||||
@@ -875,7 +875,7 @@ class TestCheckPermissions(TacticalTestCase):
|
||||
)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
self.check_not_authorized("get", f"{base_url}/")
|
||||
self.check_not_authorized("get", f"/agents/{agent.agent_id}/checks/")
|
||||
@@ -891,13 +891,13 @@ class TestCheckPermissions(TacticalTestCase):
|
||||
r = self.check_authorized("get", f"{base_url}/")
|
||||
self.assertEqual(len(r.data), 14) # type: ignore
|
||||
r = self.check_authorized("get", f"/agents/{agent.agent_id}/checks/")
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
r = self.check_authorized(
|
||||
"get", f"/agents/{unauthorized_agent.agent_id}/checks/"
|
||||
)
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
r = self.check_authorized("get", f"/automation/policies/{policy.id}/checks/")
|
||||
self.assertEqual(len(r.data), 2) # type: ignore
|
||||
self.assertEqual(len(r.data), 2) # type: ignore
|
||||
|
||||
# test limiting to client
|
||||
user.role.can_view_clients.set([agent.client])
|
||||
@@ -909,7 +909,7 @@ class TestCheckPermissions(TacticalTestCase):
|
||||
|
||||
# make sure queryset is limited too
|
||||
r = self.client.get(f"{base_url}/")
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
|
||||
def test_add_check_permissions(self):
|
||||
agent = baker.make_recipe("agents.agent")
|
||||
@@ -950,7 +950,7 @@ class TestCheckPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", url, data)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("post", url, data)
|
||||
@@ -991,7 +991,7 @@ class TestCheckPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser(method, policy_url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized(method, url)
|
||||
@@ -1037,7 +1037,7 @@ class TestCheckPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("post", unauthorized_url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("post", url)
|
||||
@@ -1074,7 +1074,7 @@ class TestCheckPermissions(TacticalTestCase):
|
||||
self.check_authorized_superuser("patch", unauthorized_url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("patch", url)
|
||||
|
||||
@@ -293,7 +293,7 @@ class TestAuditViews(TacticalTestCase):
|
||||
self.check_authorized_superuser("patch", url, data)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("patch", url, data)
|
||||
@@ -303,18 +303,18 @@ class TestAuditViews(TacticalTestCase):
|
||||
user.role.save()
|
||||
|
||||
response = self.check_authorized("patch", url, data)
|
||||
self.assertEqual(len(response.data["audit_logs"]), 86) # type: ignore
|
||||
self.assertEqual(len(response.data["audit_logs"]), 86) # type: ignore
|
||||
|
||||
# limit user to client if agent check
|
||||
user.role.can_view_sites.set([site])
|
||||
|
||||
response = self.check_authorized("patch", url, data)
|
||||
self.assertEqual(len(response.data["audit_logs"]), 63) # type: ignore
|
||||
self.assertEqual(len(response.data["audit_logs"]), 63) # type: ignore
|
||||
|
||||
# limit user to client if agent check
|
||||
user.role.can_view_clients.set([site.client])
|
||||
response = self.check_authorized("patch", url, data)
|
||||
self.assertEqual(len(response.data["audit_logs"]), 63) # type: ignore
|
||||
self.assertEqual(len(response.data["audit_logs"]), 63) # type: ignore
|
||||
|
||||
def test_debuglog_permissions(self):
|
||||
|
||||
@@ -353,7 +353,7 @@ class TestAuditViews(TacticalTestCase):
|
||||
)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("patch", url)
|
||||
@@ -363,25 +363,25 @@ class TestAuditViews(TacticalTestCase):
|
||||
user.role.save()
|
||||
|
||||
response = self.check_authorized("patch", url)
|
||||
self.assertEqual(len(response.data), 27) # type: ignore
|
||||
self.assertEqual(len(response.data), 27) # type: ignore
|
||||
|
||||
# limit user to site
|
||||
user.role.can_view_sites.set([agent.site])
|
||||
|
||||
response = self.check_authorized("patch", url)
|
||||
self.assertEqual(len(response.data), 19) # type: ignore
|
||||
self.assertEqual(len(response.data), 19) # type: ignore
|
||||
|
||||
# limit user to client
|
||||
user.role.can_view_sites.clear()
|
||||
user.role.can_view_clients.set([agent2.site.client])
|
||||
response = self.check_authorized("patch", url)
|
||||
self.assertEqual(len(response.data), 23) # type: ignore
|
||||
self.assertEqual(len(response.data), 23) # type: ignore
|
||||
|
||||
# limit user to client and site
|
||||
user.role.can_view_sites.set([agent.site])
|
||||
user.role.can_view_clients.set([agent2.site.client])
|
||||
response = self.check_authorized("patch", url)
|
||||
self.assertEqual(len(response.data), 27) # type: ignore
|
||||
self.assertEqual(len(response.data), 27) # type: ignore
|
||||
|
||||
def test_get_pendingaction_permissions(self):
|
||||
agent = baker.make_recipe("agents.agent")
|
||||
@@ -401,7 +401,7 @@ class TestAuditViews(TacticalTestCase):
|
||||
)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
self.check_not_authorized("get", f"{base_url}/pendingactions/")
|
||||
self.check_not_authorized("get", f"/agents/{agent.agent_id}/pendingactions/")
|
||||
@@ -414,13 +414,13 @@ class TestAuditViews(TacticalTestCase):
|
||||
user.role.save()
|
||||
|
||||
r = self.check_authorized("get", f"{base_url}/pendingactions/")
|
||||
self.assertEqual(len(r.data), 12) # type: ignore
|
||||
self.assertEqual(len(r.data), 12) # type: ignore
|
||||
r = self.check_authorized("get", f"/agents/{agent.agent_id}/pendingactions/")
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
r = self.check_authorized(
|
||||
"get", f"/agents/{unauthorized_agent.agent_id}/pendingactions/"
|
||||
)
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
self.assertEqual(len(r.data), 7) # type: ignore
|
||||
|
||||
# test limiting to client
|
||||
user.role.can_view_clients.set([agent.client])
|
||||
@@ -431,7 +431,7 @@ class TestAuditViews(TacticalTestCase):
|
||||
|
||||
# make sure queryset is limited too
|
||||
r = self.client.get(f"{base_url}/pendingactions/")
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
self.assertEqual(len(r.data), 5) # type: ignore
|
||||
|
||||
@patch("agents.models.Agent.nats_cmd", return_value="ok")
|
||||
@patch("logs.models.PendingAction.delete")
|
||||
@@ -453,7 +453,7 @@ class TestAuditViews(TacticalTestCase):
|
||||
self.check_authorized_superuser("delete", unauthorized_url)
|
||||
|
||||
user = self.create_user_with_roles([])
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
self.client.force_authenticate(user=user) # type: ignore
|
||||
|
||||
# test user without role
|
||||
self.check_not_authorized("delete", url)
|
||||
|
||||
@@ -55,7 +55,7 @@ class PermissionQuerySet(models.QuerySet):
|
||||
return self.filter(clients_queryset | sites_queryset)
|
||||
|
||||
elif model_name == "Alert":
|
||||
|
||||
|
||||
custom_alert_queryset = models.Q()
|
||||
if can_view_clients:
|
||||
clients_queryset = (
|
||||
@@ -64,11 +64,19 @@ class PermissionQuerySet(models.QuerySet):
|
||||
| models.Q(assigned_task__agent__site__client__in=can_view_clients)
|
||||
)
|
||||
if can_view_sites:
|
||||
sites_queryset = models.Q(agent__site__in=can_view_sites) | models.Q(assigned_check__agent__site__in=can_view_sites) | models.Q(assigned_task__agent__site__in=can_view_sites)
|
||||
sites_queryset = (
|
||||
models.Q(agent__site__in=can_view_sites)
|
||||
| models.Q(assigned_check__agent__site__in=can_view_sites)
|
||||
| models.Q(assigned_task__agent__site__in=can_view_sites)
|
||||
)
|
||||
if can_view_clients or can_view_sites:
|
||||
custom_alert_queryset = models.Q(agent=None, assigned_check=None, assigned_task=None)
|
||||
custom_alert_queryset = models.Q(
|
||||
agent=None, assigned_check=None, assigned_task=None
|
||||
)
|
||||
|
||||
return self.filter(clients_queryset | sites_queryset | custom_alert_queryset)
|
||||
return self.filter(
|
||||
clients_queryset | sites_queryset | custom_alert_queryset
|
||||
)
|
||||
|
||||
# anything else just checks the agent field and if it has it will filter matched agents from the queryset
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user