Files
tacticalrmm/api/tacticalrmm/core/utils.py
2022-10-19 06:45:53 +00:00

186 lines
5.0 KiB
Python

import json
import subprocess
import tempfile
import urllib.parse
from base64 import b64encode
from typing import TYPE_CHECKING, Optional, cast
import requests
import websockets
from django.conf import settings
from django.core.cache import cache
from django.http import FileResponse
from meshctrl.utils import get_auth_token
from tacticalrmm.constants import (
CORESETTINGS_CACHE_KEY,
ROLE_CACHE_PREFIX,
AgentPlat,
MeshAgentIdent,
)
if TYPE_CHECKING:
from core.models import CodeSignToken, CoreSettings
class CoreSettingsNotFound(Exception):
pass
def clear_entire_cache() -> None:
cache.delete_many_pattern(f"{ROLE_CACHE_PREFIX}*")
cache.delete(CORESETTINGS_CACHE_KEY)
cache.delete_many_pattern("site_*")
cache.delete_many_pattern("agent_*")
def token_is_valid() -> tuple[str, bool]:
"""
Return type: token: str, is_valid: bool. Token wil be an empty string is not valid.
"""
from core.models import CodeSignToken
t: "Optional[CodeSignToken]" = CodeSignToken.objects.first()
if not t:
return "", False
if not t.token:
return "", False
if t.is_valid:
return t.token, True
return "", False
def token_is_expired() -> bool:
from core.models import CodeSignToken
t: "CodeSignToken" = CodeSignToken.objects.first()
if not t or not t.token:
return False
return t.is_expired
def get_core_settings() -> "CoreSettings":
from core.models import CORESETTINGS_CACHE_KEY, CoreSettings
coresettings = cache.get(CORESETTINGS_CACHE_KEY)
if coresettings and isinstance(coresettings, CoreSettings):
return coresettings
else:
coresettings = CoreSettings.objects.first()
if not coresettings:
raise CoreSettingsNotFound("CoreSettings not found.")
cache.set(CORESETTINGS_CACHE_KEY, coresettings, 600)
return cast(CoreSettings, coresettings)
def get_mesh_ws_url() -> str:
core = get_core_settings()
token = get_auth_token(core.mesh_username, core.mesh_token)
if settings.DOCKER_BUILD:
uri = f"{settings.MESH_WS_URL}/control.ashx?auth={token}"
else:
site = core.mesh_site.replace("https", "wss")
uri = f"{site}/control.ashx?auth={token}"
return uri
async def get_mesh_device_id(uri: str, device_group: str) -> None:
async with websockets.connect(uri) as ws:
payload = {"action": "meshes", "responseid": "meshctrl"}
await ws.send(json.dumps(payload))
async for message in ws:
r = json.loads(message)
if r["action"] == "meshes":
return list(filter(lambda x: x["name"] == device_group, r["meshes"]))[
0
]["_id"].split("mesh//")[1]
def download_mesh_agent(dl_url: str) -> FileResponse:
with tempfile.NamedTemporaryFile(prefix="mesh-", dir=settings.EXE_DIR) as fp:
r = requests.get(dl_url, stream=True, timeout=15)
with open(fp.name, "wb") as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
del r
return FileResponse(open(fp.name, "rb"), as_attachment=True, filename=fp.name)
def _b64_to_hex(h: str) -> str:
return b64encode(bytes.fromhex(h)).decode().replace(r"/", "$").replace(r"+", "@")
async def send_command_with_mesh(
cmd: str, uri: str, mesh_node_id: str, shell: int, run_as_user: int
) -> None:
node_id = _b64_to_hex(mesh_node_id)
async with websockets.connect(uri) as ws:
await ws.send(
json.dumps(
{
"action": "runcommands",
"cmds": cmd,
"nodeids": [f"node//{node_id}"],
"runAsUser": run_as_user,
"type": shell,
"responseid": "trmm",
}
)
)
async def remove_mesh_agent(uri: str, mesh_node_id: str) -> None:
node_id = _b64_to_hex(mesh_node_id)
async with websockets.connect(uri) as ws:
await ws.send(
json.dumps(
{
"action": "removedevices",
"nodeids": [f"node//{node_id}"],
"responseid": "trmm",
}
)
)
def sysd_svc_is_running(svc: str) -> bool:
cmd = ["systemctl", "is-active", "--quiet", svc]
r = subprocess.run(cmd, capture_output=True)
return not r.returncode
def get_meshagent_url(
*, ident: "MeshAgentIdent", plat: str, mesh_site: str, mesh_device_id: str
) -> str:
if settings.DOCKER_BUILD:
base = settings.MESH_WS_URL.replace("ws://", "http://")
else:
base = mesh_site
if plat == AgentPlat.WINDOWS:
params = {
"id": ident,
"meshid": mesh_device_id,
"installflags": 0,
}
else:
params = {
"id": mesh_device_id,
"installflags": 2,
"meshinstall": ident,
}
return base + "/meshagents?" + urllib.parse.urlencode(params)