improve typing support

This commit is contained in:
wh1te909
2021-02-23 09:50:57 +00:00
parent 85d010615d
commit 10256864e4
17 changed files with 95 additions and 90 deletions

View File

@@ -44,3 +44,4 @@ model_bakery
mkdocs
mkdocs-material
pymdown-extensions
mypy

1
.gitignore vendored
View File

@@ -45,3 +45,4 @@ htmlcov/
docker-compose.dev.yml
docs/.vuepress/dist
nats-rmm.conf
.mypy_cache

View File

@@ -3,7 +3,14 @@
"python.languageServer": "Pylance",
"python.analysis.extraPaths": [
"api/tacticalrmm",
"api/env",
],
"python.analysis.diagnosticSeverityOverrides": {
"reportUnusedImport": "error",
"reportDuplicateImport": "error",
},
"python.analysis.memory.keepLibraryAst": true,
"python.linting.mypyEnabled": true,
"python.analysis.typeCheckingMode": "basic",
"python.formatting.provider": "black",
"editor.formatOnSave": true,

View File

@@ -78,7 +78,7 @@ class GetAddUsers(APIView):
def post(self, request):
# add new user
try:
user = User.objects.create_user(
user = User.objects.create_user( # type: ignore
request.data["username"],
request.data["email"],
request.data["password"],

View File

@@ -1,7 +1,7 @@
from django.conf import settings
from django.core.management.base import BaseCommand
from agents.models import Agent
from django.conf import settings
class Command(BaseCommand):

View File

@@ -164,14 +164,14 @@ class Agent(BaseAuditModel):
@property
def has_patches_pending(self):
return self.winupdates.filter(action="approve").filter(installed=False).exists()
return self.winupdates.filter(action="approve").filter(installed=False).exists() # type: ignore
@property
def checks(self):
total, passing, failing = 0, 0, 0
if self.agentchecks.exists():
for i in self.agentchecks.all():
if self.agentchecks.exists(): # type: ignore
for i in self.agentchecks.all(): # type: ignore
total += 1
if i.status == "passing":
passing += 1
@@ -333,27 +333,27 @@ class Agent(BaseAuditModel):
updates = list()
if patch_policy.critical == "approve":
updates += self.winupdates.filter(
updates += self.winupdates.filter( # type: ignore
severity="Critical", installed=False
).exclude(action="approve")
if patch_policy.important == "approve":
updates += self.winupdates.filter(
updates += self.winupdates.filter( # type: ignore
severity="Important", installed=False
).exclude(action="approve")
if patch_policy.moderate == "approve":
updates += self.winupdates.filter(
updates += self.winupdates.filter( # type: ignore
severity="Moderate", installed=False
).exclude(action="approve")
if patch_policy.low == "approve":
updates += self.winupdates.filter(severity="Low", installed=False).exclude(
updates += self.winupdates.filter(severity="Low", installed=False).exclude( # type: ignore
action="approve"
)
if patch_policy.other == "approve":
updates += self.winupdates.filter(severity="", installed=False).exclude(
updates += self.winupdates.filter(severity="", installed=False).exclude( # type: ignore
action="approve"
)
@@ -368,7 +368,7 @@ class Agent(BaseAuditModel):
site = self.site
core_settings = CoreSettings.objects.first()
patch_policy = None
agent_policy = self.winupdatepolicy.get()
agent_policy = self.winupdatepolicy.get() # type: ignore
if self.monitoring_type == "server":
# check agent policy first which should override client or site policy
@@ -455,7 +455,7 @@ class Agent(BaseAuditModel):
def get_approved_update_guids(self) -> list[str]:
return list(
self.winupdates.filter(action="approve", installed=False).values_list(
self.winupdates.filter(action="approve", installed=False).values_list( # type: ignore
"guid", flat=True
)
)
@@ -571,7 +571,7 @@ class Agent(BaseAuditModel):
from automation.models import Policy
# Clear agent checks that have overriden_by_policy set
self.agentchecks.update(overriden_by_policy=False)
self.agentchecks.update(overriden_by_policy=False) # type: ignore
# Generate checks based on policies
Policy.generate_policy_checks(self)
@@ -606,7 +606,7 @@ class Agent(BaseAuditModel):
except Exception:
return "err"
async def nats_cmd(self, data, timeout=30, wait=True):
async def nats_cmd(self, data: dict, timeout: int = 30, wait: bool = True):
nc = NATS()
options = {
"servers": f"tls://{settings.ALLOWED_HOSTS[0]}:4222",
@@ -628,7 +628,7 @@ class Agent(BaseAuditModel):
except ErrTimeout:
ret = "timeout"
else:
ret = msgpack.loads(msg.data)
ret = msgpack.loads(msg.data) # type: ignore
await nc.close()
return ret
@@ -650,12 +650,12 @@ class Agent(BaseAuditModel):
def delete_superseded_updates(self):
try:
pks = [] # list of pks to delete
kbs = list(self.winupdates.values_list("kb", flat=True))
kbs = list(self.winupdates.values_list("kb", flat=True)) # type: ignore
d = Counter(kbs)
dupes = [k for k, v in d.items() if v > 1]
for dupe in dupes:
titles = self.winupdates.filter(kb=dupe).values_list("title", flat=True)
titles = self.winupdates.filter(kb=dupe).values_list("title", flat=True) # type: ignore
# extract the version from the title and sort from oldest to newest
# skip if no version info is available therefore nothing to parse
try:
@@ -668,17 +668,17 @@ class Agent(BaseAuditModel):
continue
# append all but the latest version to our list of pks to delete
for ver in sorted_vers[:-1]:
q = self.winupdates.filter(kb=dupe).filter(title__contains=ver)
q = self.winupdates.filter(kb=dupe).filter(title__contains=ver) # type: ignore
pks.append(q.first().pk)
pks = list(set(pks))
self.winupdates.filter(pk__in=pks).delete()
self.winupdates.filter(pk__in=pks).delete() # type: ignore
except:
pass
# define how the agent should handle pending actions
def handle_pending_actions(self):
pending_actions = self.pendingactions.filter(status="pending")
pending_actions = self.pendingactions.filter(status="pending") # type: ignore
for action in pending_actions:
if action.action_type == "taskaction":
@@ -702,7 +702,7 @@ class Agent(BaseAuditModel):
# for clearing duplicate pending actions on agent
def remove_matching_pending_task_actions(self, task_id):
# remove any other pending actions on agent with same task_id
for action in self.pendingactions.exclude(status="completed"):
for action in self.pendingactions.exclude(status="completed"): # type: ignore
if action.details["task_id"] == task_id:
action.delete()
@@ -798,7 +798,7 @@ class Agent(BaseAuditModel):
return
# add a null check history to allow gaps in graph
for check in self.agentchecks.all():
for check in self.agentchecks.all(): # type: ignore
check.add_check_history(None)
else:
alert = Alert.objects.get(agent=self, resolved=False)

View File

@@ -3,7 +3,6 @@ import datetime as dt
import os
import random
import string
import subprocess
from django.conf import settings
from django.http import HttpResponse
@@ -99,7 +98,7 @@ def edit_agent(request):
a_serializer.save()
if "winupdatepolicy" in request.data.keys():
policy = agent.winupdatepolicy.get()
policy = agent.winupdatepolicy.get() # type: ignore
p_serializer = WinUpdatePolicySerializer(
instance=policy, data=request.data["winupdatepolicy"][0]
)
@@ -553,7 +552,7 @@ def recover(request):
if r == "ok":
return Response("Successfully completed recovery")
if agent.recoveryactions.filter(last_run=None).exists():
if agent.recoveryactions.filter(last_run=None).exists(): # type: ignore
return notify_error(
"A recovery action is currently pending. Please wait for the next agent check-in."
)

View File

@@ -283,4 +283,4 @@ class AlertTemplate(models.Model):
@property
def is_default_template(self) -> bool:
return self.default_alert_template.exists()
return self.default_alert_template.exists() # type: ignore

View File

@@ -50,26 +50,26 @@ class CheckIn(APIView):
# change agent update pending status to completed if agent has just updated
if (
updated
and agent.pendingactions.filter(
and agent.pendingactions.filter( # type: ignore
action_type="agentupdate", status="pending"
).exists()
):
agent.pendingactions.filter(
agent.pendingactions.filter( # type: ignore
action_type="agentupdate", status="pending"
).update(status="completed")
# handles any alerting actions
agent.handle_alert(checkin=True)
recovery = agent.recoveryactions.filter(last_run=None).last()
recovery = agent.recoveryactions.filter(last_run=None).last() # type: ignore
if recovery is not None:
recovery.last_run = djangotime.now()
recovery.save(update_fields=["last_run"])
handle_agent_recovery_task.delay(pk=recovery.pk)
handle_agent_recovery_task.delay(pk=recovery.pk) # type: ignore
return Response("ok")
# get any pending actions
if agent.pendingactions.filter(status="pending").exists():
if agent.pendingactions.filter(status="pending").exists(): # type: ignore
agent.handle_pending_actions()
return Response("ok")
@@ -111,7 +111,7 @@ class CheckIn(APIView):
if not InstalledSoftware.objects.filter(agent=agent).exists():
InstalledSoftware(agent=agent, software=sw).save()
else:
s = agent.installedsoftware_set.first()
s = agent.installedsoftware_set.first() # type: ignore
s.software = sw
s.save(update_fields=["software"])
@@ -184,7 +184,7 @@ class WinUpdates(APIView):
def patch(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
u = agent.winupdates.filter(guid=request.data["guid"]).last()
u = agent.winupdates.filter(guid=request.data["guid"]).last() # type: ignore
success: bool = request.data["success"]
if success:
u.result = "success"
@@ -210,8 +210,8 @@ class WinUpdates(APIView):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
updates = request.data["wua_updates"]
for update in updates:
if agent.winupdates.filter(guid=update["guid"]).exists():
u = agent.winupdates.filter(guid=update["guid"]).last()
if agent.winupdates.filter(guid=update["guid"]).exists(): # type: ignore
u = agent.winupdates.filter(guid=update["guid"]).last() # type: ignore
u.downloaded = update["downloaded"]
u.installed = update["installed"]
u.save(update_fields=["downloaded", "installed"])
@@ -242,7 +242,7 @@ class WinUpdates(APIView):
# more superseded updates cleanup
if pyver.parse(agent.version) <= pyver.parse("1.4.2"):
for u in agent.winupdates.filter(
for u in agent.winupdates.filter( # type: ignore
date_installed__isnull=True, result="failed"
).exclude(installed=True):
u.delete()
@@ -256,7 +256,7 @@ class SupersededWinUpdate(APIView):
def post(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
updates = agent.winupdates.filter(guid=request.data["guid"])
updates = agent.winupdates.filter(guid=request.data["guid"]) # type: ignore
for u in updates:
u.delete()
@@ -404,10 +404,10 @@ class NewAgent(APIView):
agent.salt_id = f"{agent.hostname}-{agent.pk}"
agent.save(update_fields=["salt_id"])
user = User.objects.create_user(
user = User.objects.create_user( # type: ignore
username=request.data["agent_id"],
agent=agent,
password=User.objects.make_random_password(60),
password=User.objects.make_random_password(60), # type: ignore
)
token = Token.objects.create(user=user)
@@ -452,7 +452,7 @@ class Software(APIView):
if not InstalledSoftware.objects.filter(agent=agent).exists():
InstalledSoftware(agent=agent, software=sw).save()
else:
s = agent.installedsoftware_set.first()
s = agent.installedsoftware_set.first() # type: ignore
s.software = sw
s.save(update_fields=["software"])

View File

@@ -43,11 +43,11 @@ class Policy(BaseAuditModel):
@property
def is_default_server_policy(self):
return self.default_server_policy.exists()
return self.default_server_policy.exists() # type: ignore
@property
def is_default_workstation_policy(self):
return self.default_workstation_policy.exists()
return self.default_workstation_policy.exists() # type: ignore
def __str__(self):
return self.name
@@ -56,7 +56,7 @@ class Policy(BaseAuditModel):
return self.get_related("server") | self.get_related("workstation")
def get_related(self, mon_type):
explicit_agents = self.agents.filter(monitoring_type=mon_type)
explicit_agents = self.agents.filter(monitoring_type=mon_type) # type: ignore
explicit_clients = getattr(self, f"{mon_type}_clients").all()
explicit_sites = getattr(self, f"{mon_type}_sites").all()

View File

@@ -171,7 +171,7 @@ class UpdatePatchPolicy(APIView):
serializer = WinUpdatePolicySerializer(data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
serializer.policy = policy
serializer.policy = policy # type: ignore
serializer.save()
return Response("ok")

View File

@@ -205,9 +205,9 @@ class Check(BaseAuditModel):
if self.error_threshold:
text += f" Error Threshold: {self.error_threshold}%"
return f"{self.get_check_type_display()}: Drive {self.disk} < {text}"
return f"{self.get_check_type_display()}: Drive {self.disk} < {text}" # type: ignore
elif self.check_type == "ping":
return f"{self.get_check_type_display()}: {self.name}"
return f"{self.get_check_type_display()}: {self.name}" # type: ignore
elif self.check_type == "cpuload" or self.check_type == "memory":
text = ""
@@ -216,13 +216,13 @@ class Check(BaseAuditModel):
if self.error_threshold:
text += f" Error Threshold: {self.error_threshold}%"
return f"{self.get_check_type_display()} > {text}"
return f"{self.get_check_type_display()} > {text}" # type: ignore
elif self.check_type == "winsvc":
return f"{self.get_check_type_display()}: {self.svc_display_name}"
return f"{self.get_check_type_display()}: {self.svc_display_name}" # type: ignore
elif self.check_type == "eventlog":
return f"{self.get_check_type_display()}: {self.name}"
return f"{self.get_check_type_display()}: {self.name}" # type: ignore
elif self.check_type == "script":
return f"{self.get_check_type_display()}: {self.script.name}"
return f"{self.get_check_type_display()}: {self.script.name}" # type: ignore
else:
return "n/a"

View File

@@ -59,7 +59,7 @@ class AddCheck(APIView):
if policy:
generate_agent_checks_from_policies_task.delay(policypk=policy.pk)
elif agent:
checks = agent.agentchecks.filter(
checks = agent.agentchecks.filter( # type: ignore
check_type=obj.check_type, managed_by_policy=True
)
@@ -149,7 +149,7 @@ class CheckHistory(APIView):
- djangotime.timedelta(days=request.data["timeFilter"]),
)
check_history = check.check_history.filter(timeFilter).order_by("-x")
check_history = check.check_history.filter(timeFilter).order_by("-x") # type: ignore
return Response(
CheckHistorySerializer(

View File

@@ -16,11 +16,7 @@ from rest_framework.views import APIView
from agents.models import Agent
from agents.serializers import WinAgentSerializer
from agents.tasks import (
agent_recovery_email_task,
agent_recovery_sms_task,
handle_agent_recovery_task,
)
from agents.tasks import handle_agent_recovery_task
from checks.utils import bytes2human
from software.models import InstalledSoftware
from tacticalrmm.utils import SoftwareList, filter_software, notify_error
@@ -38,8 +34,8 @@ def nats_info(request):
class NatsCheckIn(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def patch(self, request):
updated = False
@@ -57,18 +53,18 @@ class NatsCheckIn(APIView):
# change agent update pending status to completed if agent has just updated
if (
updated
and agent.pendingactions.filter(
and agent.pendingactions.filter( # type: ignore
action_type="agentupdate", status="pending"
).exists()
):
agent.pendingactions.filter(
agent.pendingactions.filter( # type: ignore
action_type="agentupdate", status="pending"
).update(status="completed")
# handles any alerting actions
agent.handle_alert(checkin=True)
recovery = agent.recoveryactions.filter(last_run=None).last()
recovery = agent.recoveryactions.filter(last_run=None).last() # type: ignore
if recovery is not None:
recovery.last_run = djangotime.now()
recovery.save(update_fields=["last_run"])
@@ -76,7 +72,7 @@ class NatsCheckIn(APIView):
return Response("ok")
# get any pending actions
if agent.pendingactions.filter(status="pending").exists():
if agent.pendingactions.filter(status="pending").exists(): # type: ignore
agent.handle_pending_actions()
return Response("ok")
@@ -118,7 +114,7 @@ class NatsCheckIn(APIView):
if not InstalledSoftware.objects.filter(agent=agent).exists():
InstalledSoftware(agent=agent, software=sw).save()
else:
s = agent.installedsoftware_set.first()
s = agent.installedsoftware_set.first() # type: ignore
s.software = sw
s.save(update_fields=["software"])
@@ -140,8 +136,8 @@ class NatsCheckIn(APIView):
class SyncMeshNodeID(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def post(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
@@ -153,8 +149,8 @@ class SyncMeshNodeID(APIView):
class NatsChoco(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def post(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
@@ -164,8 +160,8 @@ class NatsChoco(APIView):
class NatsWinUpdates(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def put(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
@@ -191,7 +187,7 @@ class NatsWinUpdates(APIView):
def patch(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
u = agent.winupdates.filter(guid=request.data["guid"]).last()
u = agent.winupdates.filter(guid=request.data["guid"]).last() # type: ignore
success: bool = request.data["success"]
if success:
u.result = "success"
@@ -217,8 +213,8 @@ class NatsWinUpdates(APIView):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
updates = request.data["wua_updates"]
for update in updates:
if agent.winupdates.filter(guid=update["guid"]).exists():
u = agent.winupdates.filter(guid=update["guid"]).last()
if agent.winupdates.filter(guid=update["guid"]).exists(): # type: ignore
u = agent.winupdates.filter(guid=update["guid"]).last() # type: ignore
u.downloaded = update["downloaded"]
u.installed = update["installed"]
u.save(update_fields=["downloaded", "installed"])
@@ -249,7 +245,7 @@ class NatsWinUpdates(APIView):
# more superseded updates cleanup
if pyver.parse(agent.version) <= pyver.parse("1.4.2"):
for u in agent.winupdates.filter(
for u in agent.winupdates.filter( # type: ignore
date_installed__isnull=True, result="failed"
).exclude(installed=True):
u.delete()
@@ -258,12 +254,12 @@ class NatsWinUpdates(APIView):
class SupersededWinUpdate(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def post(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agent_id"])
updates = agent.winupdates.filter(guid=request.data["guid"])
updates = agent.winupdates.filter(guid=request.data["guid"]) # type: ignore
for u in updates:
u.delete()
@@ -272,8 +268,8 @@ class SupersededWinUpdate(APIView):
class NatsWMI(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def get(self, request):
agents = Agent.objects.only(
@@ -288,8 +284,8 @@ class NatsWMI(APIView):
class OfflineAgents(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def get(self, request):
agents = Agent.objects.only(
@@ -302,8 +298,8 @@ class OfflineAgents(APIView):
class LogCrash(APIView):
authentication_classes = []
permission_classes = []
authentication_classes = [] # type: ignore
permission_classes = [] # type: ignore
def post(self, request):
agent = get_object_or_404(Agent, agent_id=request.data["agentid"])

View File

@@ -5,3 +5,4 @@ mkdocs
mkdocs-material
pymdown-extensions
isort
mypy

View File

@@ -55,7 +55,7 @@ def refresh_installed(request, pk):
if not InstalledSoftware.objects.filter(agent=agent).exists():
InstalledSoftware(agent=agent, software=sw).save()
else:
s = agent.installedsoftware_set.first()
s = agent.installedsoftware_set.first() # type: ignore
s.software = sw
s.save(update_fields=["software"])

View File

@@ -14,11 +14,11 @@ app = Celery(
broker="redis://" + settings.REDIS_HOST,
)
# app.config_from_object('django.conf:settings', namespace='CELERY')
app.broker_url = "redis://" + settings.REDIS_HOST + ":6379"
app.result_backend = "redis://" + settings.REDIS_HOST + ":6379"
app.accept_content = ["application/json"]
app.result_serializer = "json"
app.task_serializer = "json"
app.broker_url = "redis://" + settings.REDIS_HOST + ":6379" # type: ignore
app.result_backend = "redis://" + settings.REDIS_HOST + ":6379" # type: ignore
app.accept_content = ["application/json"] # type: ignore
app.result_serializer = "json" # type: ignore
app.task_serializer = "json" # type: ignore
app.conf.task_track_started = True
app.autodiscover_tasks()