mirror of
				https://github.com/9001/copyparty.git
				synced 2025-11-04 05:43:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			125 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			125 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# coding: utf-8
 | 
						|
from __future__ import print_function, unicode_literals
 | 
						|
 | 
						|
import os
 | 
						|
import shutil
 | 
						|
import tempfile
 | 
						|
import unittest
 | 
						|
 | 
						|
from copyparty.authsrv import AuthSrv
 | 
						|
from copyparty.httpcli import HttpCli
 | 
						|
from tests import util as tu
 | 
						|
from tests.util import Cfg
 | 
						|
 | 
						|
try:
 | 
						|
    from typing import Optional
 | 
						|
except:
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
def hdr(query):
 | 
						|
    h = "GET /{} HTTP/1.1\r\nPW: o\r\nConnection: close\r\n\r\n"
 | 
						|
    return h.format(query).encode("utf-8")
 | 
						|
 | 
						|
 | 
						|
class TestHooks(tu.TC):
 | 
						|
    def setUp(self):
 | 
						|
        self.conn: Optional[tu.VHttpConn] = None
 | 
						|
        self.td = tu.get_ramdisk()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        os.chdir(tempfile.gettempdir())
 | 
						|
        shutil.rmtree(self.td)
 | 
						|
 | 
						|
    def reset(self):
 | 
						|
        td = os.path.join(self.td, "vfs")
 | 
						|
        if os.path.exists(td):
 | 
						|
            shutil.rmtree(td)
 | 
						|
        os.mkdir(td)
 | 
						|
        os.chdir(td)
 | 
						|
        return td
 | 
						|
 | 
						|
    def cinit(self):
 | 
						|
        if self.conn:
 | 
						|
            self.conn.shutdown()
 | 
						|
            self.conn = None
 | 
						|
        self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")
 | 
						|
 | 
						|
    def test(self):
 | 
						|
        vcfg = ["a/b/c/d:c/d:A", "a:a:r"]
 | 
						|
 | 
						|
        scenarios = (
 | 
						|
            ('{"vp":"x/y"}', "c/d/a.png", "c/d/x/y/a.png"),
 | 
						|
            ('{"vp":"x/y"}', "c/d/e/a.png", "c/d/e/x/y/a.png"),
 | 
						|
            ('{"vp":"../x/y"}', "c/d/e/a.png", "c/d/x/y/a.png"),
 | 
						|
            ('{"ap":"x/y"}', "c/d/a.png", "c/d/x/y/a.png"),
 | 
						|
            ('{"ap":"x/y"}', "c/d/e/a.png", "c/d/e/x/y/a.png"),
 | 
						|
            ('{"ap":"../x/y"}', "c/d/e/a.png", "c/d/x/y/a.png"),
 | 
						|
            ('{"ap":"../x/y"}', "c/d/a.png", "a/b/c/x/y/a.png"),
 | 
						|
            ('{"fn":"b.png"}', "c/d/a.png", "c/d/b.png"),
 | 
						|
            ('{"vp":"x","fn":"b.png"}', "c/d/a.png", "c/d/x/b.png"),
 | 
						|
        )
 | 
						|
 | 
						|
        for x in scenarios:
 | 
						|
            print("\n\n\n", x)
 | 
						|
            hooktxt, url_up, url_dl = x
 | 
						|
            for hooktype in ("xbu", "xau"):
 | 
						|
                for upfun in (self.put, self.bup):
 | 
						|
                    self.reset()
 | 
						|
                    self.makehook("""print('{"reloc":%s}')""" % (hooktxt,))
 | 
						|
                    ka = {hooktype: ["j,c1,h.py"]}
 | 
						|
                    self.args = Cfg(v=vcfg, a=["o:o"], e2d=True, **ka)
 | 
						|
                    self.asrv = AuthSrv(self.args, self.log)
 | 
						|
                    self.cinit()
 | 
						|
 | 
						|
                    h, b = upfun(url_up)
 | 
						|
                    self.assertStart("HTTP/1.1 201 Created\r", h)
 | 
						|
                    h, b = self.curl(url_dl)
 | 
						|
                    self.assertEqual(b, "ok %s\n" % (url_up))
 | 
						|
 | 
						|
    def makehook(self, hs):
 | 
						|
        with open("h.py", "wb") as f:
 | 
						|
            f.write(hs.encode("utf-8"))
 | 
						|
 | 
						|
    def put(self, url):
 | 
						|
        buf = "PUT /{0} HTTP/1.1\r\nPW: o\r\nConnection: close\r\nContent-Length: {1}\r\n\r\nok {0}\n"
 | 
						|
        buf = buf.format(url, len(url) + 4).encode("utf-8")
 | 
						|
        print("PUT -->", buf)
 | 
						|
        conn = self.conn.setbuf(buf)
 | 
						|
        HttpCli(conn).run()
 | 
						|
        ret = conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
 | 
						|
        print("PUT <--", ret)
 | 
						|
        return ret
 | 
						|
 | 
						|
    def bup(self, url):
 | 
						|
        hdr = "POST /%s HTTP/1.1\r\nPW: o\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n"
 | 
						|
        bdy = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nbput\r\n--XD\r\nContent-Disposition: form-data; name="f"; filename="%s"\r\n\r\n'
 | 
						|
        ftr = "\r\n--XD--\r\n"
 | 
						|
        try:
 | 
						|
            url, fn = url.rsplit("/", 1)
 | 
						|
        except:
 | 
						|
            fn = url
 | 
						|
            url = ""
 | 
						|
 | 
						|
        buf = (bdy % (fn,) + "ok %s/%s\n" % (url, fn) + ftr).encode("utf-8")
 | 
						|
        buf = (hdr % (url, len(buf))).encode("utf-8") + buf
 | 
						|
        print("PoST -->", buf)
 | 
						|
        conn = self.conn.setbuf(buf)
 | 
						|
        HttpCli(conn).run()
 | 
						|
        ret = conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
 | 
						|
        print("POST <--", ret)
 | 
						|
        return ret
 | 
						|
 | 
						|
    def curl(self, url, binary=False):
 | 
						|
        conn = self.conn.setbuf(hdr(url))
 | 
						|
        HttpCli(conn).run()
 | 
						|
        if binary:
 | 
						|
            h, b = conn.s._reply.split(b"\r\n\r\n", 1)
 | 
						|
            return [h.decode("utf-8"), b]
 | 
						|
 | 
						|
        return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
 | 
						|
 | 
						|
    def log(self, src, msg, c=0):
 | 
						|
        print(msg)
 |