Compare commits

...

11 Commits

Author SHA1 Message Date
ed
108665fc4f v0.11.34 2021-07-09 17:12:21 +02:00
ed
ed519c9138 add performance notes 2021-07-09 17:10:37 +02:00
ed
2dd2e2c57e discard logs in mpw 2021-07-09 17:01:11 +02:00
ed
6c3a976222 scale max-clients to mp-workers 2021-07-09 16:48:02 +02:00
ed
80cc26bd95 fix max-client limit 2021-07-09 16:33:11 +02:00
ed
970fb84fd8 hex looks better 2021-07-09 16:11:33 +02:00
ed
20cbcf6931 logging + shutdown cleanup 2021-07-09 16:07:16 +02:00
ed
8fcde2a579 move tcp accept into mp-worker 2021-07-09 15:49:36 +02:00
ed
b32d1f8ad3 make ?stack work anywhere 2021-07-09 13:46:42 +02:00
ed
03513e0cb1 effectively pointless but cool 2021-07-09 03:41:44 +02:00
ed
e041a2b197 fix centos7 support 2021-07-08 23:35:28 +02:00
16 changed files with 252 additions and 284 deletions

View File

@@ -46,6 +46,7 @@ turn your phone or raspi into a portable file server with resumable uploads/down
* [browser support](#browser-support)
* [client examples](#client-examples)
* [up2k](#up2k)
* [performance](#performance)
* [dependencies](#dependencies)
* [optional dependencies](#optional-dependencies)
* [install recommended deps](#install-recommended-deps)
@@ -494,6 +495,23 @@ quick outline of the up2k protocol, see [uploading](#uploading) for the web-clie
* client does another handshake with the hashlist; server replies with OK or a list of chunks to reupload
# performance
defaults are good for most cases, don't mind the `cannot efficiently use multiple CPU cores` message, it's very unlikely to be a problem
below are some tweaks roughly ordered by usefulness:
* `-q` disables logging and can help a bunch, even when combined with `-lo` to redirect logs to file
* `--http-only` or `--https-only` (unless you want to support both protocols) will reduce the delay before a new connection is established
* `--hist` pointing to a fast location (ssd) will make directory listings and searches faster when `-e2d` or `-e2t` is set
* `--no-hash` when indexing a networked disk if you don't care about the actual filehashes and only want the names/tags searchable
* `-j` enables multiprocessing (actual multithreading) and can make copyparty perform better in cpu-intensive workloads, for example:
* huge amount of short-lived connections
* really heavy traffic (downloads/uploads)
...however it adds an overhead to internal communication so it might be a net loss, see if it works 4 u
# dependencies
* `jinja2` (is built into the SFX)

View File

@@ -1,7 +1,15 @@
# when running copyparty behind a reverse-proxy,
# make sure that copyparty allows at least as many clients as the proxy does,
# so run copyparty with -nc 512 if your nginx has the default limits
# (worker_processes 1, worker_connections 512)
# when running copyparty behind a reverse proxy,
# the following arguments are recommended:
#
# -nc 512 important, see next paragraph
# --http-only lower latency on initial connection
# -i 127.0.0.1 only accept connections from nginx
#
# -nc must match or exceed the webserver's max number of concurrent clients;
# nginx default is 512 (worker_processes 1, worker_connections 512)
#
# you may also consider adding -j0 for CPU-intensive configurations
# (not that i can really think of any good examples)
upstream cpp {
server 127.0.0.1:3923;

View File

@@ -298,6 +298,7 @@ def run_argparse(argv, formatter):
ap2.add_argument("-q", action="store_true", help="quiet")
ap2.add_argument("-lo", metavar="PATH", type=str, help="logfile, example: cpp-%%Y-%%m%%d-%%H%%M%%S.txt.xz")
ap2.add_argument("--log-conn", action="store_true", help="print tcp-server msgs")
ap2.add_argument("--log-htp", action="store_true", help="print http-server threadpool scaling")
ap2.add_argument("--ihead", metavar="HEADER", action='append', help="dump incoming header")
ap2.add_argument("--lf-url", metavar="RE", type=str, default=r"^/\.cpr/|\?th=[wj]$", help="dont log URLs matching")
@@ -342,6 +343,7 @@ def run_argparse(argv, formatter):
ap2.add_argument("--no-sendfile", action="store_true", help="disable sendfile")
ap2.add_argument("--no-scandir", action="store_true", help="disable scandir")
ap2.add_argument("--no-fastboot", action="store_true", help="wait for up2k indexing")
ap2.add_argument("--no-htp", action="store_true", help="disable httpserver threadpool, create threads as-needed instead")
ap2.add_argument("--stackmon", metavar="P,S", help="write stacktrace to Path every S second")
return ap.parse_args(args=argv[1:])

View File

@@ -1,8 +1,8 @@
# coding: utf-8
VERSION = (0, 11, 33)
VERSION = (0, 11, 34)
CODENAME = "the grid"
BUILD_DT = (2021, 7, 7)
BUILD_DT = (2021, 7, 9)
S_VERSION = ".".join(map(str, VERSION))
S_BUILD_DT = "{0:04d}-{1:02d}-{2:02d}".format(*BUILD_DT)

View File

@@ -4,17 +4,11 @@ from __future__ import print_function, unicode_literals
import time
import threading
from .__init__ import PY2, WINDOWS, VT100
from .broker_util import try_exec
from .broker_mpw import MpWorker
from .util import mp
if PY2 and not WINDOWS:
from multiprocessing.reduction import ForkingPickler
from StringIO import StringIO as MemesIO # pylint: disable=import-error
class BrokerMp(object):
"""external api; manages MpWorkers"""
@@ -42,7 +36,6 @@ class BrokerMp(object):
proc.q_yield = q_yield
proc.nid = n
proc.clients = {}
proc.workload = 0
thr = threading.Thread(
target=self.collector, args=(proc,), name="mp-collector"
@@ -53,13 +46,6 @@ class BrokerMp(object):
self.procs.append(proc)
proc.start()
if not self.args.q:
thr = threading.Thread(
target=self.debug_load_balancer, name="mp-dbg-loadbalancer"
)
thr.daemon = True
thr.start()
def shutdown(self):
self.log("broker", "shutting down")
for n, proc in enumerate(self.procs):
@@ -89,20 +75,6 @@ class BrokerMp(object):
if dest == "log":
self.log(*args)
elif dest == "workload":
with self.mutex:
proc.workload = args[0]
elif dest == "httpdrop":
addr = args[0]
with self.mutex:
del proc.clients[addr]
if not proc.clients:
proc.workload = 0
self.hub.tcpsrv.num_clients.add(-1)
elif dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
@@ -128,38 +100,9 @@ class BrokerMp(object):
returns a Queue object which eventually contains the response if want_retval
(not-impl here since nothing uses it yet)
"""
if dest == "httpconn":
sck, addr = args
sck2 = sck
if PY2:
buf = MemesIO()
ForkingPickler(buf).dump(sck)
sck2 = buf.getvalue()
proc = sorted(self.procs, key=lambda x: x.workload)[0]
proc.q_pend.put([0, dest, [sck2, addr]])
with self.mutex:
proc.clients[addr] = 50
proc.workload += 50
if dest == "listen":
for p in self.procs:
p.q_pend.put([0, dest, [args[0], len(self.procs)]])
else:
raise Exception("what is " + str(dest))
def debug_load_balancer(self):
fmt = "\033[1m{}\033[0;36m{:4}\033[0m "
if not VT100:
fmt = "({}{:4})"
last = ""
while self.procs:
msg = ""
for proc in self.procs:
msg += fmt.format(len(proc.clients), proc.workload)
if msg != last:
last = msg
with self.hub.log_mutex:
print(msg)
time.sleep(0.1)

View File

@@ -3,18 +3,13 @@ from __future__ import print_function, unicode_literals
from copyparty.authsrv import AuthSrv
import sys
import time
import signal
import threading
from .__init__ import PY2, WINDOWS
from .broker_util import ExceptionalQueue
from .httpsrv import HttpSrv
from .util import FAKE_MP
if PY2 and not WINDOWS:
import pickle # nosec
class MpWorker(object):
"""one single mp instance"""
@@ -25,10 +20,11 @@ class MpWorker(object):
self.args = args
self.n = n
self.log = self._log_disabled if args.q and not args.lo else self._log_enabled
self.retpend = {}
self.retpend_mutex = threading.Lock()
self.mutex = threading.Lock()
self.workload_thr_alive = False
# we inherited signal_handler from parent,
# replace it with something harmless
@@ -40,7 +36,6 @@ class MpWorker(object):
# instantiate all services here (TODO: inheritance?)
self.httpsrv = HttpSrv(self, True)
self.httpsrv.disconnect_func = self.httpdrop
# on winxp and some other platforms,
# use thr.join() to block all signals
@@ -53,15 +48,15 @@ class MpWorker(object):
# print('k')
pass
def log(self, src, msg, c=0):
def _log_enabled(self, src, msg, c=0):
self.q_yield.put([0, "log", [src, msg, c]])
def _log_disabled(self, src, msg, c=0):
pass
def logw(self, msg, c=0):
self.log("mp{}".format(self.n), msg, c)
def httpdrop(self, addr):
self.q_yield.put([0, "httpdrop", [addr]])
def main(self):
while True:
retq_id, dest, args = self.q_pend.get()
@@ -73,24 +68,8 @@ class MpWorker(object):
sys.exit(0)
return
elif dest == "httpconn":
sck, addr = args
if PY2:
sck = pickle.loads(sck) # nosec
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-qpop" % ("-" * 4,), c="1;30")
self.httpsrv.accept(sck, addr)
with self.mutex:
if not self.workload_thr_alive:
self.workload_thr_alive = True
thr = threading.Thread(
target=self.thr_workload, name="mpw-workload"
)
thr.daemon = True
thr.start()
elif dest == "listen":
self.httpsrv.listen(args[0], args[1])
elif dest == "retq":
# response from previous ipc call
@@ -114,16 +93,3 @@ class MpWorker(object):
self.q_yield.put([retq_id, dest, args])
return retq
def thr_workload(self):
"""announce workloads to MpSrv (the mp controller / loadbalancer)"""
# avoid locking in extract_filedata by tracking difference here
while True:
time.sleep(0.2)
with self.mutex:
if self.httpsrv.num_clients() == 0:
# no clients rn, termiante thread
self.workload_thr_alive = False
return
self.q_yield.put([0, "workload", [self.httpsrv.workload]])

View File

@@ -3,7 +3,6 @@ from __future__ import print_function, unicode_literals
import threading
from .authsrv import AuthSrv
from .httpsrv import HttpSrv
from .broker_util import ExceptionalQueue, try_exec
@@ -21,7 +20,6 @@ class BrokerThr(object):
# instantiate all services here (TODO: inheritance?)
self.httpsrv = HttpSrv(self)
self.httpsrv.disconnect_func = self.httpdrop
def shutdown(self):
# self.log("broker", "shutting down")
@@ -29,12 +27,8 @@ class BrokerThr(object):
pass
def put(self, want_retval, dest, *args):
if dest == "httpconn":
sck, addr = args
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-qpop" % ("-" * 4,), c="1;30")
self.httpsrv.accept(sck, addr)
if dest == "listen":
self.httpsrv.listen(args[0], 1)
else:
# new ipc invoking managed service in hub
@@ -51,6 +45,3 @@ class BrokerThr(object):
retq = ExceptionalQueue(1)
retq.put(rv)
return retq
def httpdrop(self, addr):
self.hub.tcpsrv.num_clients.add(-1)

View File

@@ -483,7 +483,7 @@ class HttpCli(object):
path = os.devnull
with open(fsenc(path), "wb", 512 * 1024) as f:
post_sz, _, sha_b64 = hashcopy(self.conn, reader, f)
post_sz, _, sha_b64 = hashcopy(reader, f)
if not self.args.nw:
vfs, vrem = vfs.get_dbv(rem)
@@ -715,7 +715,7 @@ class HttpCli(object):
with open(fsenc(path), "rb+", 512 * 1024) as f:
f.seek(cstart[0])
post_sz, _, sha_b64 = hashcopy(self.conn, reader, f)
post_sz, _, sha_b64 = hashcopy(reader, f)
if sha_b64 != chash:
raise Pebkac(
@@ -882,7 +882,7 @@ class HttpCli(object):
with ren_open(fname, "wb", 512 * 1024, **open_args) as f:
f, fname = f["orz"]
self.log("writing to {}/{}".format(fdir, fname))
sz, sha512_hex, _ = hashcopy(self.conn, p_data, f)
sz, sha512_hex, _ = hashcopy(p_data, f)
if sz == 0:
raise Pebkac(400, "empty files in post")
@@ -1065,7 +1065,7 @@ class HttpCli(object):
raise Pebkac(400, "expected body, got {}".format(p_field))
with open(fsenc(fp), "wb", 512 * 1024) as f:
sz, sha512, _ = hashcopy(self.conn, p_data, f)
sz, sha512, _ = hashcopy(p_data, f)
new_lastmod = os.stat(fsenc(fp)).st_mtime
new_lastmod3 = int(new_lastmod * 1000)
@@ -1255,8 +1255,7 @@ class HttpCli(object):
if use_sendfile:
remains = sendfile_kern(lower, upper, f, self.s)
else:
actor = self.conn if self.is_mp else None
remains = sendfile_py(lower, upper, f, self.s, actor)
remains = sendfile_py(lower, upper, f, self.s)
if remains > 0:
logmsg += " \033[31m" + unicode(upper - remains) + "\033[0m"
@@ -1474,7 +1473,7 @@ class HttpCli(object):
raise Pebkac(500, x)
def tx_stack(self):
if not self.readable or not self.writable:
if not self.avol:
raise Pebkac(403, "not admin")
if self.args.no_stack:

View File

@@ -45,7 +45,6 @@ class HttpConn(object):
self.stopping = False
self.nreq = 0
self.nbyte = 0
self.workload = 0
self.u2idx = None
self.log_func = hsrv.log
self.lf_url = re.compile(self.args.lf_url) if self.args.lf_url else None
@@ -184,11 +183,6 @@ class HttpConn(object):
self.sr = Unrecv(self.s)
while not self.stopping:
if self.is_mp:
self.workload += 50
if self.workload >= 2 ** 31:
self.workload = 100
self.nreq += 1
cli = HttpCli(self)
if not cli.run():

View File

@@ -4,8 +4,8 @@ from __future__ import print_function, unicode_literals
import os
import sys
import time
import math
import base64
import struct
import socket
import threading
@@ -26,9 +26,15 @@ except ImportError:
)
sys.exit(1)
from .__init__ import E, MACOS
from .__init__ import E, PY2, MACOS
from .util import spack, min_ex
from .httpconn import HttpConn
if PY2:
import Queue as queue
else:
import queue
class HttpSrv(object):
"""
@@ -43,12 +49,19 @@ class HttpSrv(object):
self.log = broker.log
self.asrv = broker.asrv
self.disconnect_func = None
self.name = "httpsrv-i{:x}".format(os.getpid())
self.mutex = threading.Lock()
self.stopping = False
self.clients = {}
self.workload = 0
self.workload_thr_alive = False
self.tp_nthr = 0 # actual
self.tp_ncli = 0 # fading
self.tp_time = None # latest worker collect
self.tp_q = None if self.args.no_htp else queue.LifoQueue()
self.srvs = []
self.ncli = 0 # exact
self.clients = {} # laggy
self.nclimax = 0
self.cb_ts = 0
self.cb_v = 0
@@ -65,10 +78,105 @@ class HttpSrv(object):
else:
self.cert_path = None
if self.tp_q:
self.start_threads(4)
t = threading.Thread(target=self.thr_scaler)
t.daemon = True
t.start()
def start_threads(self, n):
self.tp_nthr += n
if self.args.log_htp:
self.log(self.name, "workers += {} = {}".format(n, self.tp_nthr), 6)
for _ in range(n):
thr = threading.Thread(
target=self.thr_poolw,
name="httpsrv-poolw",
)
thr.daemon = True
thr.start()
def stop_threads(self, n):
self.tp_nthr -= n
if self.args.log_htp:
self.log(self.name, "workers -= {} = {}".format(n, self.tp_nthr), 6)
for _ in range(n):
self.tp_q.put(None)
def thr_scaler(self):
while True:
time.sleep(2 if self.tp_ncli else 30)
with self.mutex:
self.tp_ncli = max(self.ncli, self.tp_ncli - 2)
if self.tp_nthr > self.tp_ncli + 8:
self.stop_threads(4)
def listen(self, sck, nlisteners):
self.srvs.append(sck)
self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
t = threading.Thread(target=self.thr_listen, args=(sck,))
t.daemon = True
t.start()
def thr_listen(self, srv_sck):
"""listens on a shared tcp server"""
ip, port = srv_sck.getsockname()
fno = srv_sck.fileno()
msg = "subscribed @ {}:{} f{}".format(ip, port, fno)
self.log(self.name, msg)
while not self.stopping:
if self.args.log_conn:
self.log(self.name, "|%sC-ncli" % ("-" * 1,), c="1;30")
if self.ncli >= self.nclimax:
self.log(self.name, "at connection limit; waiting", 3)
while self.ncli >= self.nclimax:
time.sleep(0.1)
if self.args.log_conn:
self.log(self.name, "|%sC-acc1" % ("-" * 2,), c="1;30")
try:
sck, addr = srv_sck.accept()
except (OSError, socket.error) as ex:
self.log(self.name, "accept({}): {}".format(fno, ex), c=6)
time.sleep(0.02)
continue
if self.args.log_conn:
m = "|{}C-acc2 \033[0;36m{} \033[3{}m{}".format(
"-" * 3, ip, port % 8, port
)
self.log("%s %s" % addr, m, c="1;30")
self.accept(sck, addr)
def accept(self, sck, addr):
"""takes an incoming tcp connection and creates a thread to handle it"""
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-cthr" % ("-" * 5,), c="1;30")
now = time.time()
if self.tp_time and now - self.tp_time > 300:
self.tp_q = None
if self.tp_q:
self.tp_q.put((sck, addr))
with self.mutex:
self.ncli += 1
self.tp_time = self.tp_time or now
self.tp_ncli = max(self.tp_ncli, self.ncli + 1)
if self.tp_nthr < self.ncli + 4:
self.start_threads(8)
return
if not self.args.no_htp:
m = "looks like the httpserver threadpool died; please make an issue on github and tell me the story of how you pulled that off, thanks and dog bless\n"
self.log(self.name, m, 1)
with self.mutex:
self.ncli += 1
thr = threading.Thread(
target=self.thr_client,
@@ -78,11 +186,34 @@ class HttpSrv(object):
thr.daemon = True
thr.start()
def num_clients(self):
with self.mutex:
return len(self.clients)
def thr_poolw(self):
while True:
task = self.tp_q.get()
if not task:
break
with self.mutex:
self.tp_time = None
try:
sck, addr = task
me = threading.current_thread()
me.name = (
"httpsrv-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]),
)
self.thr_client(sck, addr)
me.name = "httpsrv-poolw"
except:
self.log(self.name, "thr_client: " + min_ex(), 3)
def shutdown(self):
self.stopping = True
for srv in self.srvs:
try:
srv.close()
except:
pass
clients = list(self.clients.keys())
for cli in clients:
try:
@@ -90,7 +221,14 @@ class HttpSrv(object):
except:
pass
self.log("httpsrv-n", "ok bye")
if self.tp_q:
self.stop_threads(self.tp_nthr)
for _ in range(10):
time.sleep(0.05)
if self.tp_q.empty():
break
self.log("httpsrv-i" + str(os.getpid()), "ok bye")
def thr_client(self, sck, addr):
"""thread managing one tcp client"""
@@ -100,25 +238,15 @@ class HttpSrv(object):
with self.mutex:
self.clients[cli] = 0
if self.is_mp:
self.workload += 50
if not self.workload_thr_alive:
self.workload_thr_alive = True
thr = threading.Thread(
target=self.thr_workload, name="httpsrv-workload"
)
thr.daemon = True
thr.start()
fno = sck.fileno()
try:
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-crun" % ("-" * 6,), c="1;30")
self.log("%s %s" % addr, "|%sC-crun" % ("-" * 4,), c="1;30")
cli.run()
except (OSError, socket.error) as ex:
if ex.errno not in [10038, 10054, 107, 57, 9]:
if ex.errno not in [10038, 10054, 107, 57, 49, 9]:
self.log(
"%s %s" % addr,
"run({}): {}".format(fno, ex),
@@ -128,7 +256,7 @@ class HttpSrv(object):
finally:
sck = cli.s
if self.args.log_conn:
self.log("%s %s" % addr, "|%sC-cdone" % ("-" * 7,), c="1;30")
self.log("%s %s" % addr, "|%sC-cdone" % ("-" * 5,), c="1;30")
try:
fno = sck.fileno()
@@ -152,35 +280,7 @@ class HttpSrv(object):
finally:
with self.mutex:
del self.clients[cli]
if self.disconnect_func:
self.disconnect_func(addr) # pylint: disable=not-callable
def thr_workload(self):
"""indicates the python interpreter workload caused by this HttpSrv"""
# avoid locking in extract_filedata by tracking difference here
while True:
time.sleep(0.2)
with self.mutex:
if not self.clients:
# no clients rn, termiante thread
self.workload_thr_alive = False
self.workload = 0
return
total = 0
with self.mutex:
for cli in self.clients.keys():
now = cli.workload
delta = now - self.clients[cli]
if delta < 0:
# was reset in HttpCli to prevent overflow
delta = now
total += delta
self.clients[cli] = now
self.workload = total
self.ncli -= 1
def cachebuster(self):
if time.time() - self.cb_ts < 1:
@@ -199,7 +299,7 @@ class HttpSrv(object):
except:
pass
v = base64.urlsafe_b64encode(struct.pack(">xxL", int(v)))
v = base64.urlsafe_b64encode(spack(b">xxL", int(v)))
self.cb_v = v.decode("ascii")[-4:]
self.cb_ts = time.time()
return self.cb_v

View File

@@ -222,16 +222,13 @@ class SvcHub(object):
vmin = sys.version_info[1]
if WINDOWS:
msg = "need python 3.3 or newer for multiprocessing;"
if PY2:
# py2 pickler doesn't support winsock
return msg
elif vmin < 3:
if PY2 or vmin < 3:
return msg
elif MACOS:
return "multiprocessing is wonky on mac osx;"
else:
msg = "need python 2.7 or 3.3+ for multiprocessing;"
if not PY2 and vmin < 3:
msg = "need python 3.3+ for multiprocessing;"
if PY2 or vmin < 3:
return msg
try:

View File

@@ -4,15 +4,14 @@ from __future__ import print_function, unicode_literals
import os
import time
import zlib
import struct
from datetime import datetime
from .sutil import errdesc
from .util import yieldfile, sanitize_fn
from .util import yieldfile, sanitize_fn, spack, sunpack
def dostime2unix(buf):
t, d = struct.unpack("<HH", buf)
t, d = sunpack(b"<HH", buf)
ts = (t & 0x1F) * 2
tm = (t >> 5) & 0x3F
@@ -36,13 +35,13 @@ def unixtime2dos(ts):
bd = ((dy - 1980) << 9) + (dm << 5) + dd
bt = (th << 11) + (tm << 5) + ts // 2
return struct.pack("<HH", bt, bd)
return spack(b"<HH", bt, bd)
def gen_fdesc(sz, crc32, z64):
ret = b"\x50\x4b\x07\x08"
fmt = "<LQQ" if z64 else "<LLL"
ret += struct.pack(fmt, crc32, sz, sz)
fmt = b"<LQQ" if z64 else b"<LLL"
ret += spack(fmt, crc32, sz, sz)
return ret
@@ -66,7 +65,7 @@ def gen_hdr(h_pos, fn, sz, lastmod, utf8, crc32, pre_crc):
req_ver = b"\x2d\x00" if z64 else b"\x0a\x00"
if crc32:
crc32 = struct.pack("<L", crc32)
crc32 = spack(b"<L", crc32)
else:
crc32 = b"\x00" * 4
@@ -87,14 +86,14 @@ def gen_hdr(h_pos, fn, sz, lastmod, utf8, crc32, pre_crc):
# however infozip does actual sz and it even works on winxp
# (same reasning for z64 extradata later)
vsz = 0xFFFFFFFF if z64 else sz
ret += struct.pack("<LL", vsz, vsz)
ret += spack(b"<LL", vsz, vsz)
# windows support (the "?" replace below too)
fn = sanitize_fn(fn, ok="/")
bfn = fn.encode("utf-8" if utf8 else "cp437", "replace").replace(b"?", b"_")
z64_len = len(z64v) * 8 + 4 if z64v else 0
ret += struct.pack("<HH", len(bfn), z64_len)
ret += spack(b"<HH", len(bfn), z64_len)
if h_pos is not None:
# 2b comment, 2b diskno
@@ -106,12 +105,12 @@ def gen_hdr(h_pos, fn, sz, lastmod, utf8, crc32, pre_crc):
ret += b"\x01\x00\x00\x00\xa4\x81"
# 4b local-header-ofs
ret += struct.pack("<L", min(h_pos, 0xFFFFFFFF))
ret += spack(b"<L", min(h_pos, 0xFFFFFFFF))
ret += bfn
if z64v:
ret += struct.pack("<HH" + "Q" * len(z64v), 1, len(z64v) * 8, *z64v)
ret += spack(b"<HH" + b"Q" * len(z64v), 1, len(z64v) * 8, *z64v)
return ret
@@ -136,7 +135,7 @@ def gen_ecdr(items, cdir_pos, cdir_end):
need_64 = nitems == 0xFFFF or 0xFFFFFFFF in [csz, cpos]
# 2b tnfiles, 2b dnfiles, 4b dir sz, 4b dir pos
ret += struct.pack("<HHLL", nitems, nitems, csz, cpos)
ret += spack(b"<HHLL", nitems, nitems, csz, cpos)
# 2b comment length
ret += b"\x00\x00"
@@ -163,7 +162,7 @@ def gen_ecdr64(items, cdir_pos, cdir_end):
# 8b tnfiles, 8b dnfiles, 8b dir sz, 8b dir pos
cdir_sz = cdir_end - cdir_pos
ret += struct.pack("<QQQQ", len(items), len(items), cdir_sz, cdir_pos)
ret += spack(b"<QQQQ", len(items), len(items), cdir_sz, cdir_pos)
return ret
@@ -178,7 +177,7 @@ def gen_ecdr64_loc(ecdr64_pos):
ret = b"\x50\x4b\x06\x07"
# 4b cdisk, 8b start of ecdr64, 4b ndisks
ret += struct.pack("<LQL", 0, ecdr64_pos, 1)
ret += spack(b"<LQL", 0, ecdr64_pos, 1)
return ret

View File

@@ -2,11 +2,9 @@
from __future__ import print_function, unicode_literals
import re
import time
import socket
import select
from .util import chkcmd, Counter
from .util import chkcmd
class TcpSrv(object):
@@ -20,7 +18,6 @@ class TcpSrv(object):
self.args = hub.args
self.log = hub.log
self.num_clients = Counter()
self.stopping = False
ip = "127.0.0.1"
@@ -66,47 +63,13 @@ class TcpSrv(object):
for srv in self.srv:
srv.listen(self.args.nc)
ip, port = srv.getsockname()
msg = "listening @ {0}:{1}".format(ip, port)
fno = srv.fileno()
msg = "listening @ {}:{} f{}".format(ip, port, fno)
self.log("tcpsrv", msg)
if self.args.q:
print(msg)
while not self.stopping:
if self.args.log_conn:
self.log("tcpsrv", "|%sC-ncli" % ("-" * 1,), c="1;30")
if self.num_clients.v >= self.args.nc:
time.sleep(0.1)
continue
if self.args.log_conn:
self.log("tcpsrv", "|%sC-acc1" % ("-" * 2,), c="1;30")
try:
# macos throws bad-fd
ready, _, _ = select.select(self.srv, [], [])
except:
ready = []
if not self.stopping:
raise
for srv in ready:
if self.stopping:
break
sck, addr = srv.accept()
sip, sport = srv.getsockname()
if self.args.log_conn:
self.log(
"%s %s" % addr,
"|{}C-acc2 \033[0;36m{} \033[3{}m{}".format(
"-" * 3, sip, sport % 8, sport
),
c="1;30",
)
self.num_clients.add()
self.hub.broker.put(False, "httpconn", sck, addr)
self.hub.broker.put(False, "listen", srv)
def shutdown(self):
self.stopping = True

View File

@@ -103,13 +103,15 @@ class Up2k(object):
self.deferred_init()
else:
t = threading.Thread(
target=self.deferred_init,
name="up2k-deferred-init",
target=self.deferred_init, name="up2k-deferred-init", args=(0.5,)
)
t.daemon = True
t.start()
def deferred_init(self):
def deferred_init(self, wait=0):
if wait:
time.sleep(wait)
all_vols = self.asrv.vfs.all_vols
have_e2d = self.init_indexes(all_vols)

View File

@@ -42,6 +42,20 @@ else:
from Queue import Queue # pylint: disable=import-error,no-name-in-module
from StringIO import StringIO as BytesIO
try:
struct.unpack(b">i", b"idgi")
spack = struct.pack
sunpack = struct.unpack
except:
def spack(f, *a, **ka):
return struct.pack(f.decode("ascii"), *a, **ka)
def sunpack(f, *a, **ka):
return struct.unpack(f.decode("ascii"), *a, **ka)
surrogateescape.register_surrogateescape()
FS_ENCODING = sys.getfilesystemencoding()
if WINDOWS and PY2:
@@ -123,20 +137,6 @@ REKOBO_KEY = {
REKOBO_LKEY = {k.lower(): v for k, v in REKOBO_KEY.items()}
class Counter(object):
def __init__(self, v=0):
self.v = v
self.mutex = threading.Lock()
def add(self, delta=1):
with self.mutex:
self.v += delta
def set(self, absval):
with self.mutex:
self.v = absval
class Cooldown(object):
def __init__(self, maxage):
self.maxage = maxage
@@ -231,7 +231,7 @@ def nuprint(msg):
def rice_tid():
tid = threading.current_thread().ident
c = struct.unpack(b"B" * 5, struct.pack(b">Q", tid)[-5:])
c = sunpack(b"B" * 5, spack(b">Q", tid)[-5:])
return "".join("\033[1;37;48;5;{}m{:02x}".format(x, x) for x in c) + "\033[0m"
@@ -904,16 +904,10 @@ def yieldfile(fn):
yield buf
def hashcopy(actor, fin, fout):
is_mp = actor.is_mp
def hashcopy(fin, fout):
hashobj = hashlib.sha512()
tlen = 0
for buf in fin:
if is_mp:
actor.workload += 1
if actor.workload > 2 ** 31:
actor.workload = 100
tlen += len(buf)
hashobj.update(buf)
fout.write(buf)
@@ -924,15 +918,10 @@ def hashcopy(actor, fin, fout):
return tlen, hashobj.hexdigest(), digest_b64
def sendfile_py(lower, upper, f, s, actor=None):
def sendfile_py(lower, upper, f, s):
remains = upper - lower
f.seek(lower)
while remains > 0:
if actor:
actor.workload += 1
if actor.workload > 2 ** 31:
actor.workload = 100
# time.sleep(0.01)
buf = f.read(min(1024 * 32, remains))
if not buf:
@@ -1068,10 +1057,7 @@ def gzip_orig_sz(fn):
with open(fsenc(fn), "rb") as f:
f.seek(-4, 2)
rv = f.read(4)
try:
return struct.unpack(b"I", rv)[0]
except:
return struct.unpack("I", rv)[0]
return sunpack(b"I", rv)[0]
def py_desc():

View File

@@ -35,7 +35,7 @@
</table>
</td></tr></table>
<div class="btns">
<a href="{{ avol[0] }}?stack">dump stack</a>
<a href="/?stack">dump stack</a>
</div>
{%- endif %}