mirror of
				https://github.com/9001/copyparty.git
				synced 2025-11-03 21:43:12 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			283 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			283 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
# coding: utf-8
 | 
						|
from __future__ import print_function, unicode_literals
 | 
						|
 | 
						|
import os
 | 
						|
import json
 | 
						|
import shutil
 | 
						|
import tempfile
 | 
						|
import unittest
 | 
						|
from textwrap import dedent
 | 
						|
from argparse import Namespace
 | 
						|
 | 
						|
from tests import util as tu
 | 
						|
from copyparty.authsrv import AuthSrv, VFS
 | 
						|
from copyparty import util
 | 
						|
 | 
						|
 | 
						|
class Cfg(Namespace):
 | 
						|
    def __init__(self, a=None, v=None, c=None):
 | 
						|
        ex = {k: False for k in "nw e2d e2ds e2dsa e2t e2ts e2tsr".split()}
 | 
						|
        ex2 = {
 | 
						|
            "mtp": [],
 | 
						|
            "mte": "a",
 | 
						|
            "mth": "",
 | 
						|
            "hist": None,
 | 
						|
            "no_hash": False,
 | 
						|
            "css_browser": None,
 | 
						|
            "no_voldump": True,
 | 
						|
            "no_logues": False,
 | 
						|
            "no_readme": False,
 | 
						|
            "re_maxage": 0,
 | 
						|
            "rproxy": 0,
 | 
						|
        }
 | 
						|
        ex.update(ex2)
 | 
						|
        super(Cfg, self).__init__(a=a or [], v=v or [], c=c, **ex)
 | 
						|
 | 
						|
 | 
						|
class TestVFS(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        self.td = tu.get_ramdisk()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        os.chdir(tempfile.gettempdir())
 | 
						|
        shutil.rmtree(self.td)
 | 
						|
 | 
						|
    def dump(self, vfs):
 | 
						|
        print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__))
 | 
						|
 | 
						|
    def unfoo(self, foo):
 | 
						|
        for k, v in {"foo": "a", "bar": "b", "baz": "c", "qux": "d"}.items():
 | 
						|
            foo = foo.replace(k, v)
 | 
						|
 | 
						|
        return foo
 | 
						|
 | 
						|
    def undot(self, vfs, query, response):
 | 
						|
        self.assertEqual(util.undot(query), response)
 | 
						|
        query = self.unfoo(query)
 | 
						|
        response = self.unfoo(response)
 | 
						|
        self.assertEqual(util.undot(query), response)
 | 
						|
 | 
						|
    def ls(self, vfs, vpath, uname):
 | 
						|
        # type: (VFS, str, str) -> tuple[str, str, str]
 | 
						|
        """helper for resolving and listing a folder"""
 | 
						|
        vn, rem = vfs.get(vpath, uname, True, False)
 | 
						|
        r1 = vn.ls(rem, uname, False, [[True]])
 | 
						|
        r2 = vn.ls(rem, uname, False, [[True]])
 | 
						|
        self.assertEqual(r1, r2)
 | 
						|
 | 
						|
        fsdir, real, virt = r1
 | 
						|
        real = [x[0] for x in real]
 | 
						|
        return fsdir, real, virt
 | 
						|
 | 
						|
    def log(self, src, msg, c=0):
 | 
						|
        pass
 | 
						|
 | 
						|
    def assertAxs(self, dct, lst):
 | 
						|
        t1 = list(sorted(dct.keys()))
 | 
						|
        t2 = list(sorted(lst))
 | 
						|
        self.assertEqual(t1, t2)
 | 
						|
 | 
						|
    def test(self):
 | 
						|
        td = os.path.join(self.td, "vfs")
 | 
						|
        os.mkdir(td)
 | 
						|
        os.chdir(td)
 | 
						|
 | 
						|
        for a in "abc":
 | 
						|
            for b in "abc":
 | 
						|
                for c in "abc":
 | 
						|
                    folder = "{0}/{0}{1}/{0}{1}{2}".format(a, b, c)
 | 
						|
                    os.makedirs(folder)
 | 
						|
                    for d in "abc":
 | 
						|
                        fn = "{}/{}{}{}{}".format(folder, a, b, c, d)
 | 
						|
                        with open(fn, "w") as f:
 | 
						|
                            f.write(fn)
 | 
						|
 | 
						|
        # defaults
 | 
						|
        vfs = AuthSrv(Cfg(), self.log).vfs
 | 
						|
        self.assertEqual(vfs.nodes, {})
 | 
						|
        self.assertEqual(vfs.vpath, "")
 | 
						|
        self.assertEqual(vfs.realpath, td)
 | 
						|
        self.assertAxs(vfs.axs.uread, ["*"])
 | 
						|
        self.assertAxs(vfs.axs.uwrite, ["*"])
 | 
						|
 | 
						|
        # single read-only rootfs (relative path)
 | 
						|
        vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs
 | 
						|
        self.assertEqual(vfs.nodes, {})
 | 
						|
        self.assertEqual(vfs.vpath, "")
 | 
						|
        self.assertEqual(vfs.realpath, os.path.join(td, "a", "ab"))
 | 
						|
        self.assertAxs(vfs.axs.uread, ["*"])
 | 
						|
        self.assertAxs(vfs.axs.uwrite, [])
 | 
						|
 | 
						|
        # single read-only rootfs (absolute path)
 | 
						|
        vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs
 | 
						|
        self.assertEqual(vfs.nodes, {})
 | 
						|
        self.assertEqual(vfs.vpath, "")
 | 
						|
        self.assertEqual(vfs.realpath, os.path.join(td, "a", "aa"))
 | 
						|
        self.assertAxs(vfs.axs.uread, ["*"])
 | 
						|
        self.assertAxs(vfs.axs.uwrite, [])
 | 
						|
 | 
						|
        # read-only rootfs with write-only subdirectory (read-write for k)
 | 
						|
        vfs = AuthSrv(
 | 
						|
            Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]),
 | 
						|
            self.log,
 | 
						|
        ).vfs
 | 
						|
        self.assertEqual(len(vfs.nodes), 1)
 | 
						|
        self.assertEqual(vfs.vpath, "")
 | 
						|
        self.assertEqual(vfs.realpath, td)
 | 
						|
        self.assertAxs(vfs.axs.uread, ["*", "k"])
 | 
						|
        self.assertAxs(vfs.axs.uwrite, ["k"])
 | 
						|
        n = vfs.nodes["a"]
 | 
						|
        self.assertEqual(len(vfs.nodes), 1)
 | 
						|
        self.assertEqual(n.vpath, "a")
 | 
						|
        self.assertEqual(n.realpath, os.path.join(td, "a"))
 | 
						|
        self.assertAxs(n.axs.uread, ["*", "k"])
 | 
						|
        self.assertAxs(n.axs.uwrite, ["k"])
 | 
						|
        n = n.nodes["ac"]
 | 
						|
        self.assertEqual(len(vfs.nodes), 1)
 | 
						|
        self.assertEqual(n.vpath, "a/ac")
 | 
						|
        self.assertEqual(n.realpath, os.path.join(td, "a", "ac"))
 | 
						|
        self.assertAxs(n.axs.uread, ["*", "k"])
 | 
						|
        self.assertAxs(n.axs.uwrite, ["k"])
 | 
						|
        n = n.nodes["acb"]
 | 
						|
        self.assertEqual(n.nodes, {})
 | 
						|
        self.assertEqual(n.vpath, "a/ac/acb")
 | 
						|
        self.assertEqual(n.realpath, os.path.join(td, "a", "ac", "acb"))
 | 
						|
        self.assertAxs(n.axs.uread, ["k"])
 | 
						|
        self.assertAxs(n.axs.uwrite, ["*", "k"])
 | 
						|
 | 
						|
        # something funky about the windows path normalization,
 | 
						|
        # doesn't really matter but makes the test messy, TODO?
 | 
						|
 | 
						|
        fsdir, real, virt = self.ls(vfs, "/", "*")
 | 
						|
        self.assertEqual(fsdir, td)
 | 
						|
        self.assertEqual(real, ["b", "c"])
 | 
						|
        self.assertEqual(list(virt), ["a"])
 | 
						|
 | 
						|
        fsdir, real, virt = self.ls(vfs, "a", "*")
 | 
						|
        self.assertEqual(fsdir, os.path.join(td, "a"))
 | 
						|
        self.assertEqual(real, ["aa", "ab"])
 | 
						|
        self.assertEqual(list(virt), ["ac"])
 | 
						|
 | 
						|
        fsdir, real, virt = self.ls(vfs, "a/ab", "*")
 | 
						|
        self.assertEqual(fsdir, os.path.join(td, "a", "ab"))
 | 
						|
        self.assertEqual(real, ["aba", "abb", "abc"])
 | 
						|
        self.assertEqual(list(virt), [])
 | 
						|
 | 
						|
        fsdir, real, virt = self.ls(vfs, "a/ac", "*")
 | 
						|
        self.assertEqual(fsdir, os.path.join(td, "a", "ac"))
 | 
						|
        self.assertEqual(real, ["aca", "acc"])
 | 
						|
        self.assertEqual(list(virt), [])
 | 
						|
 | 
						|
        fsdir, real, virt = self.ls(vfs, "a/ac", "k")
 | 
						|
        self.assertEqual(fsdir, os.path.join(td, "a", "ac"))
 | 
						|
        self.assertEqual(real, ["aca", "acc"])
 | 
						|
        self.assertEqual(list(virt), ["acb"])
 | 
						|
 | 
						|
        self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False)
 | 
						|
 | 
						|
        fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k")
 | 
						|
        self.assertEqual(fsdir, os.path.join(td, "a", "ac", "acb"))
 | 
						|
        self.assertEqual(real, ["acba", "acbb", "acbc"])
 | 
						|
        self.assertEqual(list(virt), [])
 | 
						|
 | 
						|
        # admin-only rootfs with all-read-only subfolder
 | 
						|
        vfs = AuthSrv(
 | 
						|
            Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]),
 | 
						|
            self.log,
 | 
						|
        ).vfs
 | 
						|
        self.assertEqual(len(vfs.nodes), 1)
 | 
						|
        self.assertEqual(vfs.vpath, "")
 | 
						|
        self.assertEqual(vfs.realpath, td)
 | 
						|
        self.assertAxs(vfs.axs.uread, ["k"])
 | 
						|
        self.assertAxs(vfs.axs.uwrite, ["k"])
 | 
						|
        n = vfs.nodes["a"]
 | 
						|
        self.assertEqual(len(vfs.nodes), 1)
 | 
						|
        self.assertEqual(n.vpath, "a")
 | 
						|
        self.assertEqual(n.realpath, os.path.join(td, "a"))
 | 
						|
        self.assertAxs(n.axs.uread, ["*"])
 | 
						|
        self.assertAxs(n.axs.uwrite, [])
 | 
						|
        self.assertEqual(vfs.can_access("/", "*"), [False, False, False, False])
 | 
						|
        self.assertEqual(vfs.can_access("/", "k"), [True, True, False, False])
 | 
						|
        self.assertEqual(vfs.can_access("/a", "*"), [True, False, False, False])
 | 
						|
        self.assertEqual(vfs.can_access("/a", "k"), [True, False, False, False])
 | 
						|
 | 
						|
        # breadth-first construction
 | 
						|
        vfs = AuthSrv(
 | 
						|
            Cfg(
 | 
						|
                v=[
 | 
						|
                    "a/ac/acb:a/ac/acb:w",
 | 
						|
                    "a:a:w",
 | 
						|
                    ".::r",
 | 
						|
                    "abacdfasdq:abacdfasdq:w",
 | 
						|
                    "a/ac:a/ac:w",
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            self.log,
 | 
						|
        ).vfs
 | 
						|
 | 
						|
        # sanitizing relative paths
 | 
						|
        self.undot(vfs, "foo/bar/../baz/qux", "foo/baz/qux")
 | 
						|
        self.undot(vfs, "foo/../bar", "bar")
 | 
						|
        self.undot(vfs, "foo/../../bar", "bar")
 | 
						|
        self.undot(vfs, "foo/../../", "")
 | 
						|
        self.undot(vfs, "./.././foo/", "foo")
 | 
						|
        self.undot(vfs, "./.././foo/..", "")
 | 
						|
 | 
						|
        # shadowing
 | 
						|
        vfs = AuthSrv(Cfg(v=[".::r", "b:a/ac:r"]), self.log).vfs
 | 
						|
 | 
						|
        fsp, r1, v1 = self.ls(vfs, "", "*")
 | 
						|
        self.assertEqual(fsp, td)
 | 
						|
        self.assertEqual(r1, ["b", "c"])
 | 
						|
        self.assertEqual(list(v1), ["a"])
 | 
						|
 | 
						|
        fsp, r1, v1 = self.ls(vfs, "a", "*")
 | 
						|
        self.assertEqual(fsp, os.path.join(td, "a"))
 | 
						|
        self.assertEqual(r1, ["aa", "ab"])
 | 
						|
        self.assertEqual(list(v1), ["ac"])
 | 
						|
 | 
						|
        fsp1, r1, v1 = self.ls(vfs, "a/ac", "*")
 | 
						|
        fsp2, r2, v2 = self.ls(vfs, "b", "*")
 | 
						|
        self.assertEqual(fsp1, os.path.join(td, "b"))
 | 
						|
        self.assertEqual(fsp2, os.path.join(td, "b"))
 | 
						|
        self.assertEqual(r1, ["ba", "bb", "bc"])
 | 
						|
        self.assertEqual(r1, r2)
 | 
						|
        self.assertEqual(list(v1), list(v2))
 | 
						|
 | 
						|
        # config file parser
 | 
						|
        cfg_path = os.path.join(self.td, "test.cfg")
 | 
						|
        with open(cfg_path, "wb") as f:
 | 
						|
            f.write(
 | 
						|
                dedent(
 | 
						|
                    """
 | 
						|
                    u a:123
 | 
						|
                    u asd:fgh:jkl
 | 
						|
                    
 | 
						|
                    ./src
 | 
						|
                    /dst
 | 
						|
                    r a
 | 
						|
                    rw asd
 | 
						|
                    """
 | 
						|
                ).encode("utf-8")
 | 
						|
            )
 | 
						|
 | 
						|
        au = AuthSrv(Cfg(c=[cfg_path]), self.log)
 | 
						|
        self.assertEqual(au.acct["a"], "123")
 | 
						|
        self.assertEqual(au.acct["asd"], "fgh:jkl")
 | 
						|
        n = au.vfs
 | 
						|
        # root was not defined, so PWD with no access to anyone
 | 
						|
        self.assertEqual(n.vpath, "")
 | 
						|
        self.assertEqual(n.realpath, None)
 | 
						|
        self.assertAxs(n.axs.uread, [])
 | 
						|
        self.assertAxs(n.axs.uwrite, [])
 | 
						|
        self.assertEqual(len(n.nodes), 1)
 | 
						|
        n = n.nodes["dst"]
 | 
						|
        self.assertEqual(n.vpath, "dst")
 | 
						|
        self.assertEqual(n.realpath, os.path.join(td, "src"))
 | 
						|
        self.assertAxs(n.axs.uread, ["a", "asd"])
 | 
						|
        self.assertAxs(n.axs.uwrite, ["asd"])
 | 
						|
        self.assertEqual(len(n.nodes), 0)
 | 
						|
 | 
						|
        os.unlink(cfg_path)
 |