Compare commits

..

21 Commits

Author SHA1 Message Date
ed
46e70d50b7 v0.7.0 2021-01-10 17:49:56 +01:00
ed
d64e9b85a7 prefer sqlite over registry snaps 2021-01-10 17:47:27 +01:00
ed
fb853edbe3 prevent index loss on mid-write crash 2021-01-10 17:16:55 +01:00
ed
cc076c1be1 persist/timeout incomplete uploads too 2021-01-10 16:47:35 +01:00
ed
98cc9a6755 mojibake support + exception handling 2021-01-10 09:48:26 +01:00
ed
7bd2b9c23a sqlite3 as up2k db + build index on boot + rproxy ip fix 2021-01-10 09:27:11 +01:00
ed
de724a1ff3 up2k: add volume flag to reject existing files 2021-01-09 15:20:02 +01:00
ed
2163055dae media-player: play links don't scroll on click 2021-01-09 14:40:56 +01:00
ed
93ed0fc10b v0.6.3 2021-01-07 01:09:32 +01:00
ed
0d98cefd40 fix dumb 2021-01-07 01:06:31 +01:00
ed
d58988a033 use sendfile when possible 2021-01-07 00:50:42 +01:00
ed
2acfab1e3f cleanup 2021-01-06 22:54:54 +01:00
ed
b915dfe9a6 nagle adds ~.2sec delay on last packet 2021-01-06 21:08:52 +00:00
ed
25bd5a823e fuse-client: add timestamps to logger 2021-01-06 17:40:42 +01:00
ed
1c35de4716 fuse-client: cache tweaks 2021-01-06 17:22:07 +01:00
ed
4c00435a0a fuse: add windows-explorer settings 2021-01-06 17:18:37 +01:00
ed
844e3079a8 saved for posterity 2021-01-06 17:13:24 +01:00
ed
4778cb5b2c readme: add quickstart 2021-01-02 22:57:48 +01:00
ed
ec5d60b919 fuse-client: fix directory parser 2021-01-01 21:54:56 +01:00
ed
e1f4b960e8 oh no 2020-12-20 02:33:37 +01:00
ed
669e46da54 update TODOs 2020-12-14 09:19:43 +01:00
22 changed files with 1756 additions and 113 deletions

2
.gitattributes vendored
View File

@@ -1,4 +1,6 @@
* text eol=lf * text eol=lf
*.reg text eol=crlf
*.png binary *.png binary
*.gif binary *.gif binary

4
.vscode/launch.json vendored
View File

@@ -12,10 +12,12 @@
//"-nw", //"-nw",
"-ed", "-ed",
"-emp", "-emp",
"-e2d",
"-e2s",
"-a", "-a",
"ed:wark", "ed:wark",
"-v", "-v",
"srv::r:aed" "srv::r:aed:cnodupe"
] ]
}, },
{ {

View File

@@ -13,6 +13,17 @@ turn your phone or raspi into a portable file server with resumable uploads/down
* code standard: `black` * code standard: `black`
## quickstart
download [copyparty-sfx.py](https://github.com/9001/copyparty/releases/latest/download/copyparty-sfx.py) and you're all set!
running the sfx without arguments (for example doubleclicking it on Windows) will let anyone access the current folder; see `-h` for help if you want accounts and volumes etc
you may also want these, especially on servers:
* [contrib/systemd/copyparty.service](contrib/systemd/copyparty.service) to run copyparty as a systemd service
* [contrib/nginx/copyparty.conf](contrib/nginx/copyparty.conf) to reverse-proxy behind nginx (for legit https)
## notes ## notes
* iPhone/iPad: use Firefox to download files * iPhone/iPad: use Firefox to download files
@@ -126,13 +137,14 @@ in the `scripts` folder:
roughly sorted by priority roughly sorted by priority
* up2k handle filename too long * reduce up2k roundtrips
* up2k fails on empty files? alert then stuck * start from a chunk index and just go
* terminate client on bad data
* drop onto folders * drop onto folders
* look into android thumbnail cache file format * `os.copy_file_range` for up2k cloning
* support pillow-simd * support pillow-simd
* cache sha512 chunks on client * cache sha512 chunks on client
* symlink existing files on upload
* comment field * comment field
* ~~look into android thumbnail cache file format~~ bad idea
* figure out the deal with pixel3a not being connectable as hotspot * figure out the deal with pixel3a not being connectable as hotspot
* pixel3a having unpredictable 3sec latency in general :|||| * pixel3a having unpredictable 3sec latency in general :||||

View File

@@ -34,3 +34,8 @@ you could replace winfsp with [dokan](https://github.com/dokan-dev/dokany/releas
* does the same thing except more correct, `samba` approves * does the same thing except more correct, `samba` approves
* **supports Linux** -- expect `18 MiB/s` (wait what) * **supports Linux** -- expect `18 MiB/s` (wait what)
* **supports Macos** -- probably * **supports Macos** -- probably
# copyparty-fuse-streaming.py
* pretend this doesn't exist

1100
bin/copyparty-fuse-streaming.py Executable file

File diff suppressed because it is too large Load Diff

View File

@@ -12,7 +12,7 @@ __url__ = "https://github.com/9001/copyparty/"
mount a copyparty server (local or remote) as a filesystem mount a copyparty server (local or remote) as a filesystem
usage: usage:
python copyparty-fuse.py ./music http://192.168.1.69:3923/ python copyparty-fuse.py http://192.168.1.69:3923/ ./music
dependencies: dependencies:
python3 -m pip install --user fusepy python3 -m pip install --user fusepy
@@ -20,6 +20,10 @@ dependencies:
+ on Macos: https://osxfuse.github.io/ + on Macos: https://osxfuse.github.io/
+ on Windows: https://github.com/billziss-gh/winfsp/releases/latest + on Windows: https://github.com/billziss-gh/winfsp/releases/latest
note:
you probably want to run this on windows clients:
https://github.com/9001/copyparty/blob/master/contrib/explorer-nothumbs-nofoldertypes.reg
get server cert: get server cert:
awk '/-BEGIN CERTIFICATE-/ {a=1} a; /-END CERTIFICATE-/{exit}' <(openssl s_client -connect 127.0.0.1:3923 </dev/null 2>/dev/null) >cert.pem awk '/-BEGIN CERTIFICATE-/ {a=1} a; /-END CERTIFICATE-/{exit}' <(openssl s_client -connect 127.0.0.1:3923 </dev/null 2>/dev/null) >cert.pem
""" """
@@ -100,7 +104,7 @@ def rice_tid():
def fancy_log(msg): def fancy_log(msg):
print("{} {}\n".format(rice_tid(), msg), end="") print("{:10.6f} {} {}\n".format(time.time() % 900, rice_tid(), msg), end="")
def null_log(msg): def null_log(msg):
@@ -159,7 +163,7 @@ class RecentLog(object):
thr.start() thr.start()
def put(self, msg): def put(self, msg):
msg = "{} {}\n".format(rice_tid(), msg) msg = "{:10.6f} {} {}\n".format(time.time() % 900, rice_tid(), msg)
if self.f: if self.f:
fmsg = " ".join([datetime.utcnow().strftime("%H%M%S.%f"), str(msg)]) fmsg = " ".join([datetime.utcnow().strftime("%H%M%S.%f"), str(msg)])
self.f.write(fmsg.encode("utf-8")) self.f.write(fmsg.encode("utf-8"))
@@ -367,7 +371,7 @@ class Gateway(object):
ret = [] ret = []
remainder = b"" remainder = b""
ptn = re.compile( ptn = re.compile(
r'^<tr><td>(-|DIR)</td><td><a[^>]* href="([^"]+)"[^>]*>([^<]+)</a></td><td>([^<]+)</td><td>([^<]+)</td></tr>$' r'^<tr><td>(-|DIR|<a [^<]+</a>)</td><td><a[^>]* href="([^"]+)"[^>]*>([^<]+)</a></td><td>([^<]+)</td><td>[^<]+</td><td>([^<]+)</td></tr>$'
) )
while True: while True:
@@ -405,7 +409,7 @@ class Gateway(object):
info("bad HTML or OS [{}] [{}]".format(fdate, fsize)) info("bad HTML or OS [{}] [{}]".format(fdate, fsize))
# python cannot strptime(1959-01-01) on windows # python cannot strptime(1959-01-01) on windows
if ftype == "-": if ftype != "DIR":
ret.append([fname, self.stat_file(ts, sz), 0]) ret.append([fname, self.stat_file(ts, sz), 0])
else: else:
ret.append([fname, self.stat_dir(ts, sz), 0]) ret.append([fname, self.stat_dir(ts, sz), 0])
@@ -658,8 +662,18 @@ class CPPF(Operations):
else: else:
if get2 - get1 <= 1024 * 1024: if get2 - get1 <= 1024 * 1024:
h_ofs = get1 - 256 * 1024 # unless the request is for the last n bytes of the file,
# grow the start to cache some stuff around the range
if get2 < file_sz - 1:
h_ofs = get1 - 1024 * 256
else:
h_ofs = get1 - 1024 * 32
# likewise grow the end unless start is 0
if get1 > 0:
h_end = get2 + 1024 * 1024 h_end = get2 + 1024 * 1024
else:
h_end = get2 + 1024 * 64
else: else:
# big enough, doesn't need pads # big enough, doesn't need pads
h_ofs = get1 h_ofs = get1
@@ -705,6 +719,7 @@ class CPPF(Operations):
self.dircache.append(cn) self.dircache.append(cn)
self.clean_dircache() self.clean_dircache()
# import pprint; pprint.pprint(ret)
return ret return ret
def readdir(self, path, fh=None): def readdir(self, path, fh=None):
@@ -802,7 +817,11 @@ class CPPF(Operations):
# dbg("=" + repr(cache_stat)) # dbg("=" + repr(cache_stat))
return cache_stat return cache_stat
info("=ENOENT ({})".format(hexler(path))) fun = info
if MACOS and path.split('/')[-1].startswith('._'):
fun = dbg
fun("=ENOENT ({})".format(hexler(path)))
raise FuseOSError(errno.ENOENT) raise FuseOSError(errno.ENOENT)
access = None access = None

View File

@@ -9,6 +9,9 @@
* assumes the webserver and copyparty is running on the same server/IP * assumes the webserver and copyparty is running on the same server/IP
* modify `10.13.1.1` as necessary if you wish to support browsers without javascript * modify `10.13.1.1` as necessary if you wish to support browsers without javascript
### [`explorer-nothumbs-nofoldertypes.reg`](explorer-nothumbs-nofoldertypes.reg)
disables thumbnails and folder-type detection in windows explorer, makes it way faster (especially for slow/networked locations (such as copyparty-fuse))
# OS integration # OS integration
init-scripts to start copyparty as a service init-scripts to start copyparty as a service
* [`systemd/copyparty.service`](systemd/copyparty.service) * [`systemd/copyparty.service`](systemd/copyparty.service)

View File

@@ -0,0 +1,31 @@
Windows Registry Editor Version 5.00
; this will do 3 things, all optional:
; 1) disable thumbnails
; 2) delete all existing folder type settings/detections
; 3) disable folder type detection (force default columns)
;
; this makes the file explorer way faster,
; especially on slow/networked locations
; =====================================================================
; 1) disable thumbnails
[HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Explorer\Advanced]
"IconsOnly"=dword:00000001
; =====================================================================
; 2) delete all existing folder type settings/detections
[-HKEY_CURRENT_USER\Software\Classes\Local Settings\Software\Microsoft\Windows\Shell\Bags]
[-HKEY_CURRENT_USER\Software\Classes\Local Settings\Software\Microsoft\Windows\Shell\BagMRU]
; =====================================================================
; 3) disable folder type detection
[HKEY_CURRENT_USER\Software\Classes\Local Settings\Software\Microsoft\Windows\Shell\Bags\AllFolders\Shell]
"FolderType"="NotSpecified"

View File

@@ -105,17 +105,22 @@ def main():
epilog=dedent( epilog=dedent(
""" """
-a takes username:password, -a takes username:password,
-v takes src:dst:permset:permset:... where "permset" is -v takes src:dst:permset:permset:cflag:cflag:...
accesslevel followed by username (no separator) where "permset" is accesslevel followed by username (no separator)
and "cflag" is config flags to set on this volume
list of cflags:
cnodupe rejects existing files (instead of symlinking them)
example:\033[35m example:\033[35m
-a ed:hunter2 -v .::r:aed -v ../inc:dump:w:aed \033[36m -a ed:hunter2 -v .::r:aed -v ../inc:dump:w:aed:cnodupe \033[36m
mount current directory at "/" with mount current directory at "/" with
* r (read-only) for everyone * r (read-only) for everyone
* a (read+write) for ed * a (read+write) for ed
mount ../inc at "/dump" with mount ../inc at "/dump" with
* w (write-only) for everyone * w (write-only) for everyone
* a (read+write) for ed \033[0m * a (read+write) for ed
* reject duplicate files \033[0m
if no accounts or volumes are configured, if no accounts or volumes are configured,
current folder will be read/write for everyone current folder will be read/write for everyone
@@ -125,6 +130,7 @@ def main():
""" """
), ),
) )
# fmt: off
ap.add_argument("-c", metavar="PATH", type=str, action="append", help="add config file") ap.add_argument("-c", metavar="PATH", type=str, action="append", help="add config file")
ap.add_argument("-i", metavar="IP", type=str, default="0.0.0.0", help="ip to bind") ap.add_argument("-i", metavar="IP", type=str, default="0.0.0.0", help="ip to bind")
ap.add_argument("-p", metavar="PORT", type=int, default=3923, help="port to bind") ap.add_argument("-p", metavar="PORT", type=int, default=3923, help="port to bind")
@@ -135,11 +141,15 @@ def main():
ap.add_argument("-q", action="store_true", help="quiet") ap.add_argument("-q", action="store_true", help="quiet")
ap.add_argument("-ed", action="store_true", help="enable ?dots") ap.add_argument("-ed", action="store_true", help="enable ?dots")
ap.add_argument("-emp", action="store_true", help="enable markdown plugins") ap.add_argument("-emp", action="store_true", help="enable markdown plugins")
ap.add_argument("-e2d", action="store_true", help="enable up2k database")
ap.add_argument("-e2s", action="store_true", help="enable up2k db-scanner")
ap.add_argument("-mcr", metavar="SEC", type=int, default=60, help="md-editor mod-chk rate") ap.add_argument("-mcr", metavar="SEC", type=int, default=60, help="md-editor mod-chk rate")
ap.add_argument("-nw", action="store_true", help="disable writes (benchmark)") ap.add_argument("-nw", action="store_true", help="disable writes (benchmark)")
ap.add_argument("-nih", action="store_true", help="no info hostname") ap.add_argument("-nih", action="store_true", help="no info hostname")
ap.add_argument("-nid", action="store_true", help="no info disk-usage") ap.add_argument("-nid", action="store_true", help="no info disk-usage")
ap.add_argument("--no-sendfile", action="store_true", help="disable sendfile")
al = ap.parse_args() al = ap.parse_args()
# fmt: on
SvcHub(al).run() SvcHub(al).run()

View File

@@ -1,8 +1,8 @@
# coding: utf-8 # coding: utf-8
VERSION = (0, 6, 2) VERSION = (0, 7, 0)
CODENAME = "CHRISTMAAAAAS" CODENAME = "keeping track"
BUILD_DT = (2020, 12, 14) BUILD_DT = (2021, 1, 10)
S_VERSION = ".".join(map(str, VERSION)) S_VERSION = ".".join(map(str, VERSION))
S_BUILD_DT = "{0:04d}-{1:02d}-{2:02d}".format(*BUILD_DT) S_BUILD_DT = "{0:04d}-{1:02d}-{2:02d}".format(*BUILD_DT)

View File

@@ -12,11 +12,12 @@ from .util import undot, Pebkac, fsdec, fsenc
class VFS(object): class VFS(object):
"""single level in the virtual fs""" """single level in the virtual fs"""
def __init__(self, realpath, vpath, uread=[], uwrite=[]): def __init__(self, realpath, vpath, uread=[], uwrite=[], flags={}):
self.realpath = realpath # absolute path on host filesystem self.realpath = realpath # absolute path on host filesystem
self.vpath = vpath # absolute path in the virtual filesystem self.vpath = vpath # absolute path in the virtual filesystem
self.uread = uread # users who can read this self.uread = uread # users who can read this
self.uwrite = uwrite # users who can write this self.uwrite = uwrite # users who can write this
self.flags = flags # config switches
self.nodes = {} # child nodes self.nodes = {} # child nodes
def add(self, src, dst): def add(self, src, dst):
@@ -36,6 +37,7 @@ class VFS(object):
"{}/{}".format(self.vpath, name).lstrip("/"), "{}/{}".format(self.vpath, name).lstrip("/"),
self.uread, self.uread,
self.uwrite, self.uwrite,
self.flags,
) )
self.nodes[name] = vn self.nodes[name] = vn
return vn.add(src, dst) return vn.add(src, dst)
@@ -161,7 +163,7 @@ class AuthSrv(object):
yield prev, True yield prev, True
def _parse_config_file(self, fd, user, mread, mwrite, mount): def _parse_config_file(self, fd, user, mread, mwrite, mflags, mount):
vol_src = None vol_src = None
vol_dst = None vol_dst = None
for ln in [x.decode("utf-8").strip() for x in fd]: for ln in [x.decode("utf-8").strip() for x in fd]:
@@ -191,6 +193,7 @@ class AuthSrv(object):
mount[vol_dst] = vol_src mount[vol_dst] = vol_src
mread[vol_dst] = [] mread[vol_dst] = []
mwrite[vol_dst] = [] mwrite[vol_dst] = []
mflags[vol_dst] = {}
continue continue
lvl, uname = ln.split(" ") lvl, uname = ln.split(" ")
@@ -198,6 +201,9 @@ class AuthSrv(object):
mread[vol_dst].append(uname) mread[vol_dst].append(uname)
if lvl in "wa": if lvl in "wa":
mwrite[vol_dst].append(uname) mwrite[vol_dst].append(uname)
if lvl == "c":
# config option, currently switches only
mflags[vol_dst][uname] = True
def reload(self): def reload(self):
""" """
@@ -210,6 +216,7 @@ class AuthSrv(object):
user = {} # username:password user = {} # username:password
mread = {} # mountpoint:[username] mread = {} # mountpoint:[username]
mwrite = {} # mountpoint:[username] mwrite = {} # mountpoint:[username]
mflags = {} # mountpoint:[flag]
mount = {} # dst:src (mountpoint:realpath) mount = {} # dst:src (mountpoint:realpath)
if self.args.a: if self.args.a:
@@ -232,9 +239,13 @@ class AuthSrv(object):
mount[dst] = src mount[dst] = src
mread[dst] = [] mread[dst] = []
mwrite[dst] = [] mwrite[dst] = []
mflags[dst] = {}
perms = perms.split(":") perms = perms.split(":")
for (lvl, uname) in [[x[0], x[1:]] for x in perms]: for (lvl, uname) in [[x[0], x[1:]] for x in perms]:
if lvl == "c":
# config option, currently switches only
mflags[dst][uname] = True
if uname == "": if uname == "":
uname = "*" uname = "*"
if lvl in "ra": if lvl in "ra":
@@ -245,14 +256,15 @@ class AuthSrv(object):
if self.args.c: if self.args.c:
for cfg_fn in self.args.c: for cfg_fn in self.args.c:
with open(cfg_fn, "rb") as f: with open(cfg_fn, "rb") as f:
self._parse_config_file(f, user, mread, mwrite, mount) self._parse_config_file(f, user, mread, mwrite, mflags, mount)
self.all_writable = []
if not mount: if not mount:
# -h says our defaults are CWD at root and read/write for everyone # -h says our defaults are CWD at root and read/write for everyone
vfs = VFS(os.path.abspath("."), "", ["*"], ["*"]) vfs = VFS(os.path.abspath("."), "", ["*"], ["*"])
elif "" not in mount: elif "" not in mount:
# there's volumes but no root; make root inaccessible # there's volumes but no root; make root inaccessible
vfs = VFS(os.path.abspath("."), "", [], []) vfs = VFS(os.path.abspath("."), "")
maxdepth = 0 maxdepth = 0
for dst in sorted(mount.keys(), key=lambda x: (x.count("/"), len(x))): for dst in sorted(mount.keys(), key=lambda x: (x.count("/"), len(x))):
@@ -262,12 +274,18 @@ class AuthSrv(object):
if dst == "": if dst == "":
# rootfs was mapped; fully replaces the default CWD vfs # rootfs was mapped; fully replaces the default CWD vfs
vfs = VFS(mount[dst], dst, mread[dst], mwrite[dst]) vfs = VFS(mount[dst], dst, mread[dst], mwrite[dst], mflags[dst])
continue continue
v = vfs.add(mount[dst], dst) v = vfs.add(mount[dst], dst)
v.uread = mread[dst] v.uread = mread[dst]
v.uwrite = mwrite[dst] v.uwrite = mwrite[dst]
v.flags = mflags[dst]
if v.uwrite:
self.all_writable.append(v)
if vfs.uwrite and vfs not in self.all_writable:
self.all_writable.append(vfs)
missing_users = {} missing_users = {}
for d in [mread, mwrite]: for d in [mread, mwrite]:

View File

@@ -28,6 +28,7 @@ class HttpCli(object):
self.conn = conn self.conn = conn
self.s = conn.s self.s = conn.s
self.sr = conn.sr self.sr = conn.sr
self.ip = conn.addr[0]
self.addr = conn.addr self.addr = conn.addr
self.args = conn.args self.args = conn.args
self.auth = conn.auth self.auth = conn.auth
@@ -42,7 +43,7 @@ class HttpCli(object):
self.log_func(self.log_src, msg) self.log_func(self.log_src, msg)
def _check_nonfatal(self, ex): def _check_nonfatal(self, ex):
return ex.code in [404] return ex.code < 400 or ex.code == 404
def _assert_safe_rem(self, rem): def _assert_safe_rem(self, rem):
# sanity check to prevent any disasters # sanity check to prevent any disasters
@@ -85,7 +86,8 @@ class HttpCli(object):
v = self.headers.get("x-forwarded-for", None) v = self.headers.get("x-forwarded-for", None)
if v is not None and self.conn.addr[0] in ["127.0.0.1", "::1"]: if v is not None and self.conn.addr[0] in ["127.0.0.1", "::1"]:
self.log_src = self.conn.set_rproxy(v.split(",")[0]) self.ip = v.split(",")[0]
self.log_src = self.conn.set_rproxy(self.ip)
self.uname = "*" self.uname = "*"
if "cookie" in self.headers: if "cookie" in self.headers:
@@ -305,7 +307,7 @@ class HttpCli(object):
vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True)
fdir = os.path.join(vfs.realpath, rem) fdir = os.path.join(vfs.realpath, rem)
addr = self.conn.addr[0].replace(":", ".") addr = self.ip.replace(":", ".")
fn = "put-{:.6f}-{}.bin".format(time.time(), addr) fn = "put-{:.6f}-{}.bin".format(time.time(), addr)
path = os.path.join(fdir, fn) path = os.path.join(fdir, fn)
@@ -384,9 +386,11 @@ class HttpCli(object):
vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True)
body["vdir"] = self.vpath body["vtop"] = vfs.vpath
body["rdir"] = os.path.join(vfs.realpath, rem) body["ptop"] = vfs.realpath
body["addr"] = self.addr[0] body["prel"] = rem
body["addr"] = self.ip
body["flag"] = vfs.flags
x = self.conn.hsrv.broker.put(True, "up2k.handle_json", body) x = self.conn.hsrv.broker.put(True, "up2k.handle_json", body)
response = x.get() response = x.get()
@@ -408,7 +412,10 @@ class HttpCli(object):
except KeyError: except KeyError:
raise Pebkac(400, "need hash and wark headers for binary POST") raise Pebkac(400, "need hash and wark headers for binary POST")
x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", wark, chash) vfs, _ = self.conn.auth.vfs.get(self.vpath, self.uname, False, True)
ptop = vfs.realpath
x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", ptop, wark, chash)
response = x.get() response = x.get()
chunksize, cstart, path, lastmod = response chunksize, cstart, path, lastmod = response
@@ -453,8 +460,8 @@ class HttpCli(object):
self.log("clone {} done".format(cstart[0])) self.log("clone {} done".format(cstart[0]))
x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", wark, chash) x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash)
num_left = x.get() num_left, path = x.get()
if not WINDOWS and num_left == 0: if not WINDOWS and num_left == 0:
times = (int(time.time()), int(lastmod)) times = (int(time.time()), int(lastmod))
@@ -575,7 +582,7 @@ class HttpCli(object):
if not os.path.isdir(fsenc(fdir)): if not os.path.isdir(fsenc(fdir)):
raise Pebkac(404, "that folder does not exist") raise Pebkac(404, "that folder does not exist")
suffix = ".{:.6f}-{}".format(time.time(), self.addr[0]) suffix = ".{:.6f}-{}".format(time.time(), self.ip)
open_args = {"fdir": fdir, "suffix": suffix} open_args = {"fdir": fdir, "suffix": suffix}
else: else:
open_args = {} open_args = {}
@@ -637,7 +644,7 @@ class HttpCli(object):
"\n".join( "\n".join(
unicode(x) unicode(x)
for x in [ for x in [
":".join(unicode(x) for x in self.addr), ":".join(unicode(x) for x in [self.ip, self.addr[1]]),
msg.rstrip(), msg.rstrip(),
] ]
) )
@@ -686,7 +693,7 @@ class HttpCli(object):
return True return True
fp = os.path.join(vfs.realpath, rem) fp = os.path.join(vfs.realpath, rem)
srv_lastmod = -1 srv_lastmod = srv_lastmod3 = -1
try: try:
st = os.stat(fsenc(fp)) st = os.stat(fsenc(fp))
srv_lastmod = st.st_mtime srv_lastmod = st.st_mtime
@@ -884,6 +891,7 @@ class HttpCli(object):
logtail += " [\033[36m{}-{}\033[0m]".format(lower, upper) logtail += " [\033[36m{}-{}\033[0m]".format(lower, upper)
use_sendfile = False
if decompress: if decompress:
open_func = gzip.open open_func = gzip.open
open_args = [fsenc(fs_path), "rb"] open_args = [fsenc(fs_path), "rb"]
@@ -893,6 +901,8 @@ class HttpCli(object):
open_func = open open_func = open
# 512 kB is optimal for huge files, use 64k # 512 kB is optimal for huge files, use 64k
open_args = [fsenc(fs_path), "rb", 64 * 1024] open_args = [fsenc(fs_path), "rb", 64 * 1024]
if hasattr(os, "sendfile"):
use_sendfile = not self.args.no_sendfile
# #
# send reply # send reply
@@ -915,24 +925,13 @@ class HttpCli(object):
ret = True ret = True
with open_func(*open_args) as f: with open_func(*open_args) as f:
remains = upper - lower if use_sendfile:
f.seek(lower) remains = sendfile_kern(lower, upper, f, self.s)
while remains > 0: else:
# time.sleep(0.01) remains = sendfile_py(lower, upper, f, self.s)
buf = f.read(4096)
if not buf:
break
if remains < len(buf): if remains > 0:
buf = buf[:remains]
try:
self.s.sendall(buf)
remains -= len(buf)
except:
logmsg += " \033[31m" + str(upper - remains) + "\033[0m" logmsg += " \033[31m" + str(upper - remains) + "\033[0m"
ret = False
break
spd = self._spd((upper - lower) - remains) spd = self._spd((upper - lower) - remains)
self.log("{}, {}".format(logmsg, spd)) self.log("{}, {}".format(logmsg, spd))
@@ -1028,6 +1027,10 @@ class HttpCli(object):
if abspath.endswith(".md") and "raw" not in self.uparam: if abspath.endswith(".md") and "raw" not in self.uparam:
return self.tx_md(abspath) return self.tx_md(abspath)
bad = "{0}.hist{0}up2k.".format(os.sep)
if abspath.endswith(bad + "db") or abspath.endswith(bad + "snap"):
raise Pebkac(403)
return self.tx_file(abspath) return self.tx_file(abspath)
fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname) fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname)

View File

@@ -65,6 +65,7 @@ class HttpConn(object):
color = 34 color = 34
self.rproxy = ip self.rproxy = ip
self.ip = ip
self.log_src = "{} \033[{}m{}".format(ip, color, self.addr[1]).ljust(26) self.log_src = "{} \033[{}m{}".format(ip, color, self.addr[1]).ljust(26)
return self.log_src return self.log_src

View File

@@ -9,6 +9,7 @@ from datetime import datetime, timedelta
import calendar import calendar
from .__init__ import PY2, WINDOWS, MACOS, VT100 from .__init__ import PY2, WINDOWS, MACOS, VT100
from .authsrv import AuthSrv
from .tcpsrv import TcpSrv from .tcpsrv import TcpSrv
from .up2k import Up2k from .up2k import Up2k
from .util import mp from .util import mp
@@ -38,6 +39,10 @@ class SvcHub(object):
self.tcpsrv = TcpSrv(self) self.tcpsrv = TcpSrv(self)
self.up2k = Up2k(self) self.up2k = Up2k(self)
if self.args.e2d and self.args.e2s:
auth = AuthSrv(self.args, self.log)
self.up2k.build_indexes(auth.all_writable)
# decide which worker impl to use # decide which worker impl to use
if self.check_mp_enable(): if self.check_mp_enable():
from .broker_mp import BrokerMp as Broker from .broker_mp import BrokerMp as Broker

View File

@@ -36,6 +36,7 @@ class TcpSrv(object):
self.srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.srv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try: try:
self.srv.bind((self.args.i, self.args.p)) self.srv.bind((self.args.i, self.args.p))
except (OSError, socket.error) as ex: except (OSError, socket.error) as ex:

View File

@@ -6,14 +6,25 @@ import os
import re import re
import time import time
import math import math
import json
import gzip
import stat
import shutil import shutil
import base64 import base64
import hashlib import hashlib
import threading import threading
from copy import deepcopy from copy import deepcopy
from .__init__ import WINDOWS from .__init__ import WINDOWS, PY2
from .util import Pebkac, Queue, fsenc, sanitize_fn, ren_open from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open, atomic_move
HAVE_SQLITE3 = False
try:
import sqlite3
HAVE_SQLITE3 = True
except:
pass
class Up2k(object): class Up2k(object):
@@ -22,20 +33,21 @@ class Up2k(object):
* documentation * documentation
* registry persistence * registry persistence
* ~/.config flatfiles for active jobs * ~/.config flatfiles for active jobs
* wark->path database for finished uploads
""" """
def __init__(self, broker): def __init__(self, broker):
self.broker = broker self.broker = broker
self.args = broker.args self.args = broker.args
self.log = broker.log self.log = broker.log
self.persist = self.args.e2d
# config # config
self.salt = "hunter2" # TODO: config self.salt = "hunter2" # TODO: config
# state # state
self.registry = {}
self.mutex = threading.Lock() self.mutex = threading.Lock()
self.registry = {}
self.db = {}
if WINDOWS: if WINDOWS:
# usually fails to set lastmod too quickly # usually fails to set lastmod too quickly
@@ -44,53 +56,291 @@ class Up2k(object):
thr.daemon = True thr.daemon = True
thr.start() thr.start()
if self.persist:
thr = threading.Thread(target=self._snapshot)
thr.daemon = True
thr.start()
# static # static
self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$") self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
if self.persist and not HAVE_SQLITE3:
m = "could not initialize sqlite3, will use in-memory registry only"
self.log("up2k", m)
def _vis_job_progress(self, job):
perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"]))
path = os.path.join(job["ptop"], job["prel"], job["name"])
return "{:5.1f}% {}".format(perc, path)
def _vis_reg_progress(self, reg):
ret = []
for _, job in reg.items():
ret.append(self._vis_job_progress(job))
return ret
def register_vpath(self, ptop):
with self.mutex:
if ptop in self.registry:
return None
reg = {}
path = os.path.join(ptop, ".hist", "up2k.snap")
if self.persist and os.path.exists(path):
with gzip.GzipFile(path, "rb") as f:
j = f.read().decode("utf-8")
reg = json.loads(j)
for _, job in reg.items():
job["poke"] = time.time()
m = "loaded snap {} |{}|".format(path, len(reg.keys()))
m = [m] + self._vis_reg_progress(reg)
self.log("up2k", "\n".join(m))
self.registry[ptop] = reg
if not self.persist or not HAVE_SQLITE3:
return None
try:
os.mkdir(os.path.join(ptop, ".hist"))
except:
pass
db_path = os.path.join(ptop, ".hist", "up2k.db")
if ptop in self.db:
# self.db[ptop].close()
return None
try:
db = self._open_db(db_path)
self.db[ptop] = db
return db
except Exception as ex:
m = "failed to open [{}]: {}".format(ptop, repr(ex))
self.log("up2k", m)
return None
def build_indexes(self, writeables):
tops = [d.realpath for d in writeables]
for top in tops:
db = self.register_vpath(top)
if db:
# can be symlink so don't `and d.startswith(top)``
excl = set([d for d in tops if d != top])
self._build_dir([db, 0], top, excl, top)
self._drop_lost(db, top)
db.commit()
def _build_dir(self, dbw, top, excl, cdir):
try:
inodes = [fsdec(x) for x in os.listdir(fsenc(cdir))]
except Exception as ex:
self.log("up2k", "listdir: " + repr(ex))
return
histdir = os.path.join(top, ".hist")
for inode in inodes:
abspath = os.path.join(cdir, inode)
try:
inf = os.stat(fsenc(abspath))
except Exception as ex:
self.log("up2k", "stat: " + repr(ex))
continue
if stat.S_ISDIR(inf.st_mode):
if abspath in excl or abspath == histdir:
continue
# self.log("up2k", " dir: {}".format(abspath))
self._build_dir(dbw, top, excl, abspath)
else:
# self.log("up2k", "file: {}".format(abspath))
rp = abspath[len(top) :].replace("\\", "/").strip("/")
c = dbw[0].execute("select * from up where rp = ?", (rp,))
in_db = list(c.fetchall())
if in_db:
_, dts, dsz, _ = in_db[0]
if len(in_db) > 1:
m = "WARN: multiple entries: [{}] => [{}] ({})"
self.log("up2k", m.format(top, rp, len(in_db)))
dts = -1
if dts == inf.st_mtime and dsz == inf.st_size:
continue
m = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
top, rp, dts, inf.st_mtime, dsz, inf.st_size
)
self.log("up2k", m)
self.db_rm(dbw[0], rp)
dbw[1] += 1
in_db = None
self.log("up2k", "file: {}".format(abspath))
try:
hashes = self._hashlist_from_file(abspath)
except Exception as ex:
self.log("up2k", "hash: " + repr(ex))
continue
wark = self._wark_from_hashlist(inf.st_size, hashes)
self.db_add(dbw[0], wark, rp, inf.st_mtime, inf.st_size)
dbw[1] += 1
if dbw[1] > 1024:
dbw[0].commit()
dbw[1] = 0
def _drop_lost(self, db, top):
rm = []
c = db.execute("select * from up")
for dwark, dts, dsz, drp in c:
abspath = os.path.join(top, drp)
try:
if not os.path.exists(fsenc(abspath)):
rm.append(drp)
except Exception as ex:
self.log("up2k", "stat-rm: " + repr(ex))
if not rm:
return
self.log("up2k", "forgetting {} deleted files".format(len(rm)))
for rp in rm:
self.db_rm(db, rp)
def _open_db(self, db_path):
conn = sqlite3.connect(db_path, check_same_thread=False)
try:
c = conn.execute(r"select * from kv where k = 'sver'")
rows = c.fetchall()
if rows:
ver = rows[0][1]
else:
self.log("up2k", "WARN: no sver in kv, DB corrupt?")
ver = "unknown"
if ver == "1":
try:
nfiles = next(conn.execute("select count(w) from up"))[0]
self.log("up2k", "found DB at {} |{}|".format(db_path, nfiles))
return conn
except Exception as ex:
m = "WARN: could not list files, DB corrupt?\n " + repr(ex)
self.log("up2k", m)
m = "REPLACING unsupported DB (v.{}) at {}".format(ver, db_path)
self.log("up2k", m)
conn.close()
os.unlink(db_path)
conn = sqlite3.connect(db_path, check_same_thread=False)
except:
pass
# sqlite is variable-width only, no point in using char/nchar/varchar
for cmd in [
r"create table kv (k text, v text)",
r"create table up (w text, mt int, sz int, rp text)",
r"insert into kv values ('sver', '1')",
r"create index up_w on up(w)",
]:
conn.execute(cmd)
conn.commit()
self.log("up2k", "created DB at {}".format(db_path))
return conn
def handle_json(self, cj): def handle_json(self, cj):
self.register_vpath(cj["ptop"])
cj["name"] = sanitize_fn(cj["name"]) cj["name"] = sanitize_fn(cj["name"])
cj["poke"] = time.time()
wark = self._get_wark(cj) wark = self._get_wark(cj)
now = time.time() now = time.time()
job = None
with self.mutex: with self.mutex:
# TODO use registry persistence here to symlink any matching wark db = self.db.get(cj["ptop"], None)
if wark in self.registry: reg = self.registry[cj["ptop"]]
job = self.registry[wark] if db:
if job["rdir"] != cj["rdir"] or job["name"] != cj["name"]: cur = db.execute(r"select * from up where w = ?", (wark,))
src = os.path.join(job["rdir"], job["name"]) for _, dtime, dsize, dp_rel in cur:
dst = os.path.join(cj["rdir"], cj["name"]) dp_abs = os.path.join(cj["ptop"], dp_rel).replace("\\", "/")
# relying on path.exists to return false on broken symlinks
if os.path.exists(fsenc(dp_abs)):
try:
prel, name = dp_rel.rsplit("/", 1)
except:
prel = ""
name = dp_rel
job = {
"name": name,
"prel": prel,
"vtop": cj["vtop"],
"ptop": cj["ptop"],
"flag": cj["flag"],
"size": dsize,
"lmod": dtime,
"hash": [],
"need": [],
}
break
if job and wark in reg:
del reg[wark]
if job or wark in reg:
job = job or reg[wark]
if job["prel"] != cj["prel"] or job["name"] != cj["name"]:
src = os.path.join(job["ptop"], job["prel"], job["name"])
dst = os.path.join(cj["ptop"], cj["prel"], cj["name"])
vsrc = os.path.join(job["vtop"], job["prel"], job["name"])
vsrc = vsrc.replace("\\", "/") # just for prints anyways
if job["need"]: if job["need"]:
self.log("up2k", "unfinished:\n {0}\n {1}".format(src, dst)) self.log("up2k", "unfinished:\n {0}\n {1}".format(src, dst))
err = "partial upload exists at a different location; please resume uploading here instead:\n{0}{1} ".format( err = "partial upload exists at a different location; please resume uploading here instead:\n"
job["vdir"], job["name"] err += vsrc + " "
) raise Pebkac(400, err)
elif "nodupe" in job["flag"]:
self.log("up2k", "dupe-reject:\n {0}\n {1}".format(src, dst))
err = "upload rejected, file already exists:\n " + vsrc + " "
raise Pebkac(400, err) raise Pebkac(400, err)
else: else:
# symlink to the client-provided name, # symlink to the client-provided name,
# returning the previous upload info # returning the previous upload info
job = deepcopy(job) job = deepcopy(job)
job["rdir"] = cj["rdir"] for k in ["ptop", "vtop", "prel"]:
job["name"] = self._untaken(cj["rdir"], cj["name"], now, cj["addr"]) job[k] = cj[k]
dst = os.path.join(job["rdir"], job["name"])
pdir = os.path.join(cj["ptop"], cj["prel"])
job["name"] = self._untaken(pdir, cj["name"], now, cj["addr"])
dst = os.path.join(job["ptop"], job["prel"], job["name"])
os.unlink(fsenc(dst)) # TODO ed pls os.unlink(fsenc(dst)) # TODO ed pls
self._symlink(src, dst) self._symlink(src, dst)
else:
if not job:
job = { job = {
"wark": wark, "wark": wark,
"t0": now, "t0": now,
"addr": cj["addr"],
"vdir": cj["vdir"],
"rdir": cj["rdir"],
# client-provided, sanitized by _get_wark:
"name": cj["name"],
"size": cj["size"],
"lmod": cj["lmod"],
"hash": deepcopy(cj["hash"]), "hash": deepcopy(cj["hash"]),
"need": [],
} }
# client-provided, sanitized by _get_wark: name, size, lmod
for k in [
"addr",
"vtop",
"ptop",
"prel",
"flag",
"name",
"size",
"lmod",
]:
job[k] = cj[k]
# one chunk may occur multiple times in a file; # one chunk may occur multiple times in a file;
# filter to unique values for the list of missing chunks # filter to unique values for the list of missing chunks
# (preserve order to reduce disk thrashing) # (preserve order to reduce disk thrashing)
job["need"] = []
lut = {} lut = {}
for k in cj["hash"]: for k in cj["hash"]:
if k not in lut: if k not in lut:
@@ -139,40 +389,58 @@ class Up2k(object):
lsrc = "../" * (len(lsrc) - 1) + "/".join(lsrc) lsrc = "../" * (len(lsrc) - 1) + "/".join(lsrc)
os.symlink(fsenc(lsrc), fsenc(ldst)) os.symlink(fsenc(lsrc), fsenc(ldst))
except (AttributeError, OSError) as ex: except (AttributeError, OSError) as ex:
self.log("up2k", "cannot symlink; creating copy") self.log("up2k", "cannot symlink; creating copy: " + repr(ex))
shutil.copy2(fsenc(src), fsenc(dst)) shutil.copy2(fsenc(src), fsenc(dst))
def handle_chunk(self, wark, chash): def handle_chunk(self, ptop, wark, chash):
with self.mutex: with self.mutex:
job = self.registry.get(wark) job = self.registry[ptop].get(wark, None)
if not job: if not job:
raise Pebkac(404, "unknown wark") raise Pebkac(400, "unknown wark")
if chash not in job["need"]: if chash not in job["need"]:
raise Pebkac(200, "already got that but thanks??") raise Pebkac(200, "already got that but thanks??")
nchunk = [n for n, v in enumerate(job["hash"]) if v == chash] nchunk = [n for n, v in enumerate(job["hash"]) if v == chash]
if not nchunk: if not nchunk:
raise Pebkac(404, "unknown chunk") raise Pebkac(400, "unknown chunk")
job["poke"] = time.time()
chunksize = self._get_chunksize(job["size"]) chunksize = self._get_chunksize(job["size"])
ofs = [chunksize * x for x in nchunk] ofs = [chunksize * x for x in nchunk]
path = os.path.join(job["rdir"], job["name"]) path = os.path.join(job["ptop"], job["prel"], job["tnam"])
return [chunksize, ofs, path, job["lmod"]] return [chunksize, ofs, path, job["lmod"]]
def confirm_chunk(self, wark, chash): def confirm_chunk(self, ptop, wark, chash):
with self.mutex: with self.mutex:
job = self.registry[wark] job = self.registry[ptop][wark]
pdir = os.path.join(job["ptop"], job["prel"])
src = os.path.join(pdir, job["tnam"])
dst = os.path.join(pdir, job["name"])
job["need"].remove(chash) job["need"].remove(chash)
ret = len(job["need"]) ret = len(job["need"])
if ret > 0:
return ret, src
if WINDOWS and ret == 0: atomic_move(src, dst)
path = os.path.join(job["rdir"], job["name"])
self.lastmod_q.put([path, (int(time.time()), int(job["lmod"]))])
return ret if WINDOWS:
self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))])
db = self.db.get(job["ptop"], None)
if db:
rp = os.path.join(job["prel"], job["name"]).replace("\\", "/")
self.db_rm(db, rp)
self.db_add(db, job["wark"], rp, job["lmod"], job["size"])
db.commit()
del self.registry[ptop][wark]
# in-memory registry is reserved for unfinished uploads
return ret, dst
def _get_chunksize(self, filesize): def _get_chunksize(self, filesize):
chunksize = 1024 * 1024 chunksize = 1024 * 1024
@@ -186,6 +454,14 @@ class Up2k(object):
chunksize += stepsize chunksize += stepsize
stepsize *= mul stepsize *= mul
def db_rm(self, db, rp):
db.execute("delete from up where rp = ?", (rp,))
def db_add(self, db, wark, rp, ts, sz):
db.execute(
"insert into up values (?,?,?,?)", (wark, ts, sz, rp,),
)
def _get_wark(self, cj): def _get_wark(self, cj):
if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024: # 16TiB if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024: # 16TiB
raise Pebkac(400, "name or numchunks not according to spec") raise Pebkac(400, "name or numchunks not according to spec")
@@ -202,9 +478,13 @@ class Up2k(object):
except: except:
cj["lmod"] = int(time.time()) cj["lmod"] = int(time.time())
# server-reproducible file identifier, independent of name or location wark = self._wark_from_hashlist(cj["size"], cj["hash"])
ident = [self.salt, str(cj["size"])] return wark
ident.extend(cj["hash"])
def _wark_from_hashlist(self, filesize, hashes):
""" server-reproducible file identifier, independent of name or location """
ident = [self.salt, str(filesize)]
ident.extend(hashes)
ident = "\n".join(ident) ident = "\n".join(ident)
hasher = hashlib.sha512() hasher = hashlib.sha512()
@@ -214,11 +494,38 @@ class Up2k(object):
wark = base64.urlsafe_b64encode(digest) wark = base64.urlsafe_b64encode(digest)
return wark.decode("utf-8").rstrip("=") return wark.decode("utf-8").rstrip("=")
def _hashlist_from_file(self, path):
fsz = os.path.getsize(path)
csz = self._get_chunksize(fsz)
ret = []
with open(path, "rb", 512 * 1024) as f:
while fsz > 0:
hashobj = hashlib.sha512()
rem = min(csz, fsz)
fsz -= rem
while rem > 0:
buf = f.read(min(rem, 64 * 1024))
if not buf:
raise Exception("EOF at " + str(f.tell()))
hashobj.update(buf)
rem -= len(buf)
digest = hashobj.digest()[:32]
digest = base64.urlsafe_b64encode(digest)
ret.append(digest.decode("utf-8").rstrip("="))
return ret
def _new_upload(self, job): def _new_upload(self, job):
self.registry[job["wark"]] = job self.registry[job["ptop"]][job["wark"]] = job
pdir = os.path.join(job["ptop"], job["prel"])
job["name"] = self._untaken(pdir, job["name"], job["t0"], job["addr"])
tnam = job["name"] + ".PARTIAL"
suffix = ".{:.6f}-{}".format(job["t0"], job["addr"]) suffix = ".{:.6f}-{}".format(job["t0"], job["addr"])
with ren_open(job["name"], "wb", fdir=job["rdir"], suffix=suffix) as f: with ren_open(tnam, "wb", fdir=pdir, suffix=suffix) as f:
f, job["name"] = f["orz"] f, job["tnam"] = f["orz"]
f.seek(job["size"] - 1) f.seek(job["size"] - 1)
f.write(b"e") f.write(b"e")
@@ -235,3 +542,53 @@ class Up2k(object):
os.utime(fsenc(path), times) os.utime(fsenc(path), times)
except: except:
self.log("lmod", "failed to utime ({}, {})".format(path, times)) self.log("lmod", "failed to utime ({}, {})".format(path, times))
def _snapshot(self):
persist_interval = 30 # persist unfinished uploads index every 30 sec
discard_interval = 3600 # drop unfinished uploads after 1 hour inactivity
prev = {}
while True:
time.sleep(persist_interval)
with self.mutex:
for k, reg in self.registry.items():
self._snap_reg(prev, k, reg, discard_interval)
def _snap_reg(self, prev, k, reg, discard_interval):
now = time.time()
rm = [x for x in reg.values() if now - x["poke"] > discard_interval]
if rm:
m = "dropping {} abandoned uploads in {}".format(len(rm), k)
vis = [self._vis_job_progress(x) for x in rm]
self.log("up2k", "\n".join([m] + vis))
for job in rm:
del reg[job["wark"]]
try:
# remove the placeholder zero-byte file (keep the PARTIAL)
path = os.path.join(job["ptop"], job["prel"], job["name"])
if os.path.getsize(path) == 0:
os.unlink(path)
except:
pass
path = os.path.join(k, ".hist", "up2k.snap")
if not reg:
if k not in prev or prev[k] is not None:
prev[k] = None
if os.path.exists(path):
os.unlink(path)
return
newest = max(x["poke"] for _, x in reg.items()) if reg else 0
etag = [len(reg), newest]
if etag == prev.get(k, None):
return
path2 = "{}.{}".format(path, os.getpid())
j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8")
with gzip.GzipFile(path2, "wb") as f:
f.write(j)
atomic_move(path2, path)
self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys())))
prev[k] = etag

View File

@@ -6,6 +6,7 @@ import os
import sys import sys
import time import time
import base64 import base64
import select
import struct import struct
import hashlib import hashlib
import platform import platform
@@ -548,6 +549,16 @@ else:
fsdec = w8dec fsdec = w8dec
def atomic_move(src, dst):
if not PY2:
os.replace(src, dst)
else:
if os.path.exists(dst):
os.unlink(dst)
os.rename(src, dst)
def read_socket(sr, total_size): def read_socket(sr, total_size):
remains = total_size remains = total_size
while remains > 0: while remains > 0:
@@ -591,6 +602,46 @@ def hashcopy(actor, fin, fout):
return tlen, hashobj.hexdigest(), digest_b64 return tlen, hashobj.hexdigest(), digest_b64
def sendfile_py(lower, upper, f, s):
remains = upper - lower
f.seek(lower)
while remains > 0:
# time.sleep(0.01)
buf = f.read(min(4096, remains))
if not buf:
return remains
try:
s.sendall(buf)
remains -= len(buf)
except:
return remains
return 0
def sendfile_kern(lower, upper, f, s):
out_fd = s.fileno()
in_fd = f.fileno()
ofs = lower
while ofs < upper:
try:
req = min(2 ** 30, upper - ofs)
select.select([], [out_fd], [], 10)
n = os.sendfile(out_fd, in_fd, ofs, req)
except Exception as ex:
# print("sendfile: " + repr(ex))
n = 0
if n <= 0:
return upper - ofs
ofs += n
# print("sendfile: ok, sent {} now, {} total, {} remains".format(n, ofs - lower, upper - ofs))
return 0
def unescape_cookie(orig): def unescape_cookie(orig):
# mw=idk; doot=qwe%2Crty%3Basd+fgh%2Bjkl%25zxc%26vbn # qwe,rty;asd fgh+jkl%zxc&vbn # mw=idk; doot=qwe%2Crty%3Basd+fgh%2Bjkl%25zxc%26vbn # qwe,rty;asd fgh+jkl%zxc&vbn
ret = "" ret = ""
@@ -671,3 +722,6 @@ class Pebkac(Exception):
def __init__(self, code, msg=None): def __init__(self, code, msg=None):
super(Pebkac, self).__init__(msg or HTTPCODE[code]) super(Pebkac, self).__init__(msg or HTTPCODE[code])
self.code = code self.code = code
def __repr__(self):
return "Pebkac({}, {})".format(self.code, repr(self.args))

View File

@@ -8,7 +8,14 @@ function dbg(msg) {
function ev(e) { function ev(e) {
e = e || window.event; e = e || window.event;
e.preventDefault ? e.preventDefault() : (e.returnValue = false);
if (e.preventDefault)
e.preventDefault()
if (e.stopPropagation)
e.stopPropagation();
e.returnValue = false;
return e; return e;
} }
@@ -315,8 +322,12 @@ var vbar = (function () {
var rect = pbar.pcan.getBoundingClientRect(); var rect = pbar.pcan.getBoundingClientRect();
var x = e.clientX - rect.left; var x = e.clientX - rect.left;
var mul = x * 1.0 / rect.width; var mul = x * 1.0 / rect.width;
var seek = mp.au.duration * mul;
console.log('seek: ' + seek);
if (!isFinite(seek))
return;
mp.au.currentTime = mp.au.duration * mul; mp.au.currentTime = seek;
if (mp.au === mp.au_native) if (mp.au === mp.au_native)
// hack: ogv.js breaks on .play() during playback // hack: ogv.js breaks on .play() during playback
@@ -443,7 +454,8 @@ function play(tid, call_depth) {
mp.au.tid = tid; mp.au.tid = tid;
mp.au.src = url; mp.au.src = url;
mp.au.volume = mp.expvol(); mp.au.volume = mp.expvol();
setclass('trk' + tid, 'play act'); var oid = 'trk' + tid;
setclass(oid, 'play act');
try { try {
if (hack_attempt_play) if (hack_attempt_play)
@@ -452,7 +464,11 @@ function play(tid, call_depth) {
if (mp.au.paused) if (mp.au.paused)
autoplay_blocked(); autoplay_blocked();
location.hash = 'trk' + tid; var o = ebi(oid);
o.setAttribute('id', 'thx_js');
location.hash = oid;
o.setAttribute('id', oid);
pbar.drawbuf(); pbar.drawbuf();
return true; return true;
} }

View File

@@ -670,8 +670,12 @@ function up2k_init(have_crypto) {
else { else {
var err = ""; var err = "";
var rsp = (xhr.responseText + ''); var rsp = (xhr.responseText + '');
if (rsp.indexOf('partial upload exists') !== -1) { if (rsp.indexOf('partial upload exists') !== -1 ||
err = rsp.slice(5); rsp.indexOf('file already exists') !== -1) {
err = rsp;
var ofs = err.lastIndexOf(' : ');
if (ofs > 0)
err = err.slice(0, ofs);
} }
if (err != "") { if (err != "") {
ebi('f{0}t'.format(t.n)).innerHTML = "ERROR"; ebi('f{0}t'.format(t.n)).innerHTML = "ERROR";

View File

@@ -4,10 +4,10 @@ import os
import time import time
""" """
mkdir -p /dev/shm/fusefuzz/{r,v} td=/dev/shm/; [ -e $td ] || td=$HOME; mkdir -p $td/fusefuzz/{r,v}
PYTHONPATH=.. python3 -m copyparty -v /dev/shm/fusefuzz/r::r -i 127.0.0.1 PYTHONPATH=.. python3 -m copyparty -v $td/fusefuzz/r::r -i 127.0.0.1
../bin/copyparty-fuse.py /dev/shm/fusefuzz/v http://127.0.0.1:3923/ 2 0 ../bin/copyparty-fuse.py http://127.0.0.1:3923/ $td/fusefuzz/v -cf 2 -cd 0.5
(d="$PWD"; cd /dev/shm/fusefuzz && "$d"/fusefuzz.py) (d="$PWD"; cd $td/fusefuzz && "$d"/fusefuzz.py)
""" """

View File

@@ -115,7 +115,7 @@ git describe --tags >/dev/null 2>/dev/null && {
exit 1 exit 1
} }
dt="$(git log -1 --format=%cd --date=format:'%Y,%m,%d' | sed 's/,0?/, /g')" dt="$(git log -1 --format=%cd --date=format:'%Y,%m,%d' | sed -E 's/,0?/, /g')"
printf 'git %3s: \033[36m%s\033[0m\n' ver "$ver" dt "$dt" printf 'git %3s: \033[36m%s\033[0m\n' ver "$ver" dt "$dt"
sed -ri ' sed -ri '
s/^(VERSION =)(.*)/#\1\2\n\1 ('"$t_ver"')/; s/^(VERSION =)(.*)/#\1\2\n\1 ('"$t_ver"')/;

View File

@@ -16,15 +16,15 @@ which md5sum 2>/dev/null >/dev/null &&
ver="$1" ver="$1"
[[ "x$ver" == x ]] && [ "x$ver" = x ] &&
{ {
echo "need argument 1: version" echo "need argument 1: version"
echo echo
exit 1 exit 1
} }
[[ -e copyparty/__main__.py ]] || cd .. [ -e copyparty/__main__.py ] || cd ..
[[ -e copyparty/__main__.py ]] || [ -e copyparty/__main__.py ] ||
{ {
echo "run me from within the project root folder" echo "run me from within the project root folder"
echo echo
@@ -35,8 +35,8 @@ mkdir -p dist
zip_path="$(pwd)/dist/copyparty-$ver.zip" zip_path="$(pwd)/dist/copyparty-$ver.zip"
tgz_path="$(pwd)/dist/copyparty-$ver.tar.gz" tgz_path="$(pwd)/dist/copyparty-$ver.tar.gz"
[[ -e "$zip_path" ]] || [ -e "$zip_path" ] ||
[[ -e "$tgz_path" ]] && [ -e "$tgz_path" ] &&
{ {
echo "found existing archives for this version" echo "found existing archives for this version"
echo " $zip_path" echo " $zip_path"