207 lines
5.8 KiB
Python
207 lines
5.8 KiB
Python
import json
|
|
import subprocess
|
|
import tempfile
|
|
import urllib.parse
|
|
from base64 import b64encode
|
|
from typing import TYPE_CHECKING, Optional, cast
|
|
|
|
import requests
|
|
import websockets
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
from django.http import FileResponse
|
|
from meshctrl.utils import get_auth_token
|
|
|
|
from tacticalrmm.constants import (
|
|
AGENT_TBL_PEND_ACTION_CNT_CACHE_PREFIX,
|
|
CORESETTINGS_CACHE_KEY,
|
|
ROLE_CACHE_PREFIX,
|
|
AgentPlat,
|
|
MeshAgentIdent,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from core.models import CoreSettings
|
|
|
|
|
|
class CoreSettingsNotFound(Exception):
|
|
pass
|
|
|
|
|
|
def clear_entire_cache() -> None:
|
|
cache.delete_many_pattern(f"{ROLE_CACHE_PREFIX}*")
|
|
cache.delete_many_pattern(f"{AGENT_TBL_PEND_ACTION_CNT_CACHE_PREFIX}*")
|
|
cache.delete(CORESETTINGS_CACHE_KEY)
|
|
cache.delete_many_pattern("site_*")
|
|
cache.delete_many_pattern("agent_*")
|
|
|
|
|
|
def token_is_valid() -> tuple[str, bool]:
|
|
"""
|
|
Return type: token: str, is_valid: bool. Token wil be an empty string is not valid.
|
|
"""
|
|
from core.models import CodeSignToken
|
|
|
|
t: "Optional[CodeSignToken]" = CodeSignToken.objects.first()
|
|
if not t:
|
|
return "", False
|
|
|
|
if not t.token:
|
|
return "", False
|
|
|
|
if t.is_valid:
|
|
return t.token, True
|
|
|
|
return "", False
|
|
|
|
|
|
def token_is_expired() -> bool:
|
|
from core.models import CodeSignToken
|
|
|
|
t: "CodeSignToken" = CodeSignToken.objects.first()
|
|
if not t or not t.token:
|
|
return False
|
|
|
|
return t.is_expired
|
|
|
|
|
|
def get_core_settings() -> "CoreSettings":
|
|
from core.models import CORESETTINGS_CACHE_KEY, CoreSettings
|
|
|
|
coresettings = cache.get(CORESETTINGS_CACHE_KEY)
|
|
|
|
if coresettings and isinstance(coresettings, CoreSettings):
|
|
return coresettings
|
|
else:
|
|
coresettings = CoreSettings.objects.first()
|
|
if not coresettings:
|
|
raise CoreSettingsNotFound("CoreSettings not found.")
|
|
|
|
cache.set(CORESETTINGS_CACHE_KEY, coresettings, 600)
|
|
return cast(CoreSettings, coresettings)
|
|
|
|
|
|
def get_mesh_ws_url() -> str:
|
|
core = get_core_settings()
|
|
token = get_auth_token(core.mesh_username, core.mesh_token)
|
|
|
|
if settings.DOCKER_BUILD:
|
|
uri = f"{settings.MESH_WS_URL}/control.ashx?auth={token}"
|
|
else:
|
|
if getattr(settings, "TRMM_INSECURE", False):
|
|
site = core.mesh_site.replace("https", "ws")
|
|
uri = f"{site}:4430/control.ashx?auth={token}"
|
|
else:
|
|
site = core.mesh_site.replace("https", "wss")
|
|
uri = f"{site}/control.ashx?auth={token}"
|
|
|
|
return uri
|
|
|
|
|
|
async def get_mesh_device_id(uri: str, device_group: str) -> None:
|
|
async with websockets.connect(uri) as ws:
|
|
payload = {"action": "meshes", "responseid": "meshctrl"}
|
|
await ws.send(json.dumps(payload))
|
|
|
|
async for message in ws:
|
|
r = json.loads(message)
|
|
if r["action"] == "meshes":
|
|
return list(filter(lambda x: x["name"] == device_group, r["meshes"]))[
|
|
0
|
|
]["_id"].split("mesh//")[1]
|
|
|
|
|
|
def download_mesh_agent(dl_url: str) -> FileResponse:
|
|
with tempfile.NamedTemporaryFile(prefix="mesh-", dir=settings.EXE_DIR) as fp:
|
|
r = requests.get(dl_url, stream=True, timeout=15)
|
|
with open(fp.name, "wb") as f:
|
|
for chunk in r.iter_content(chunk_size=1024):
|
|
if chunk:
|
|
f.write(chunk)
|
|
del r
|
|
|
|
return FileResponse(open(fp.name, "rb"), as_attachment=True, filename=fp.name)
|
|
|
|
|
|
def _b64_to_hex(h: str) -> str:
|
|
return b64encode(bytes.fromhex(h)).decode().replace(r"/", "$").replace(r"+", "@")
|
|
|
|
|
|
async def send_command_with_mesh(
|
|
cmd: str, uri: str, mesh_node_id: str, shell: int, run_as_user: int
|
|
) -> None:
|
|
node_id = _b64_to_hex(mesh_node_id)
|
|
async with websockets.connect(uri) as ws:
|
|
await ws.send(
|
|
json.dumps(
|
|
{
|
|
"action": "runcommands",
|
|
"cmds": cmd,
|
|
"nodeids": [f"node//{node_id}"],
|
|
"runAsUser": run_as_user,
|
|
"type": shell,
|
|
"responseid": "trmm",
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
async def wake_on_lan(*, uri: str, mesh_node_id: str) -> None:
|
|
node_id = _b64_to_hex(mesh_node_id)
|
|
async with websockets.connect(uri) as ws:
|
|
await ws.send(
|
|
json.dumps(
|
|
{
|
|
"action": "wakedevices",
|
|
"nodeids": [f"node//{node_id}"],
|
|
"responseid": "trmm",
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
async def remove_mesh_agent(uri: str, mesh_node_id: str) -> None:
|
|
node_id = _b64_to_hex(mesh_node_id)
|
|
async with websockets.connect(uri) as ws:
|
|
await ws.send(
|
|
json.dumps(
|
|
{
|
|
"action": "removedevices",
|
|
"nodeids": [f"node//{node_id}"],
|
|
"responseid": "trmm",
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
def sysd_svc_is_running(svc: str) -> bool:
|
|
cmd = ["systemctl", "is-active", "--quiet", svc]
|
|
r = subprocess.run(cmd, capture_output=True)
|
|
return not r.returncode
|
|
|
|
|
|
def get_meshagent_url(
|
|
*, ident: "MeshAgentIdent", plat: str, mesh_site: str, mesh_device_id: str
|
|
) -> str:
|
|
if settings.DOCKER_BUILD:
|
|
base = settings.MESH_WS_URL.replace("ws://", "http://")
|
|
elif getattr(settings, "TRMM_INSECURE", False):
|
|
base = mesh_site.replace("https", "http") + ":4430"
|
|
else:
|
|
base = mesh_site
|
|
|
|
if plat == AgentPlat.WINDOWS:
|
|
params = {
|
|
"id": ident,
|
|
"meshid": mesh_device_id,
|
|
"installflags": 0,
|
|
}
|
|
else:
|
|
params = {
|
|
"id": mesh_device_id,
|
|
"installflags": 2,
|
|
"meshinstall": ident,
|
|
}
|
|
|
|
return base + "/meshagents?" + urllib.parse.urlencode(params)
|