python: Convert function type annotations to Python 3 style.

Generated by com2ann (slightly patched to avoid also converting
assignment type annotations, which require Python 3.6), followed by
some manual whitespace adjustment, and six fixes for runtime issues:

-    def __init__(self, token: Token, parent: Optional[Node]) -> None:
+    def __init__(self, token: Token, parent: "Optional[Node]") -> None:

-def main(options: argparse.Namespace) -> NoReturn:
+def main(options: argparse.Namespace) -> "NoReturn":

-def fetch_request(url: str, callback: Any, **kwargs: Any) -> Generator[Callable[..., Any], Any, None]:
+def fetch_request(url: str, callback: Any, **kwargs: Any) -> "Generator[Callable[..., Any], Any, None]":

-def assert_server_running(server: subprocess.Popen[bytes], log_file: Optional[str]) -> None:
+def assert_server_running(server: "subprocess.Popen[bytes]", log_file: Optional[str]) -> None:

-def server_is_up(server: subprocess.Popen[bytes], log_file: Optional[str]) -> bool:
+def server_is_up(server: "subprocess.Popen[bytes]", log_file: Optional[str]) -> bool:

-    method_kwarg_pairs: List[FuncKwargPair],
+    method_kwarg_pairs: "List[FuncKwargPair]",

Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
This commit is contained in:
Anders Kaseorg
2020-04-18 18:48:37 -07:00
committed by Tim Abbott
parent 43ac901ad9
commit 5901e7ba7e
68 changed files with 389 additions and 691 deletions

View File

@@ -39,8 +39,7 @@ BLUE = '\x1b[34m'
MAGENTA = '\x1b[35m'
CYAN = '\x1b[36m'
def overwrite_symlink(src, dst):
# type: (str, str) -> None
def overwrite_symlink(src: str, dst: str) -> None:
while True:
tmp = tempfile.mktemp(
prefix='.' + os.path.basename(dst) + '.',
@@ -56,8 +55,7 @@ def overwrite_symlink(src, dst):
os.remove(tmp)
raise
def parse_cache_script_args(description):
# type: (str) -> argparse.Namespace
def parse_cache_script_args(description: str) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
@@ -88,8 +86,7 @@ def get_deploy_root() -> str:
os.path.normpath(os.path.join(os.path.dirname(__file__), "..", ".."))
)
def get_deployment_version(extract_path):
# type: (str) -> str
def get_deployment_version(extract_path: str) -> str:
version = '0.0.0'
for item in os.listdir(extract_path):
item_path = os.path.join(extract_path, item)
@@ -101,14 +98,12 @@ def get_deployment_version(extract_path):
break
return version
def is_invalid_upgrade(current_version, new_version):
# type: (str, str) -> bool
def is_invalid_upgrade(current_version: str, new_version: str) -> bool:
if new_version > '1.4.3' and current_version <= '1.3.10':
return True
return False
def subprocess_text_output(args):
# type: (Sequence[str]) -> str
def subprocess_text_output(args: Sequence[str]) -> str:
return subprocess.check_output(args, universal_newlines=True).strip()
def get_zulip_pwent() -> pwd.struct_passwd:
@@ -121,8 +116,7 @@ def get_zulip_pwent() -> pwd.struct_passwd:
# `zulip` user as that's the correct value in production.
return pwd.getpwnam("zulip")
def su_to_zulip(save_suid=False):
# type: (bool) -> None
def su_to_zulip(save_suid: bool = False) -> None:
"""Warning: su_to_zulip assumes that the zulip checkout is owned by
the zulip user (or whatever normal user is running the Zulip
installation). It should never be run from the installer or other
@@ -136,14 +130,12 @@ def su_to_zulip(save_suid=False):
os.setuid(pwent.pw_uid)
os.environ['HOME'] = pwent.pw_dir
def make_deploy_path():
# type: () -> str
def make_deploy_path() -> str:
timestamp = datetime.datetime.now().strftime(TIMESTAMP_FORMAT)
return os.path.join(DEPLOYMENTS_DIR, timestamp)
TEMPLATE_DATABASE_DIR = "test-backend/databases"
def get_dev_uuid_var_path(create_if_missing=False):
# type: (bool) -> str
def get_dev_uuid_var_path(create_if_missing: bool = False) -> str:
zulip_path = get_deploy_root()
uuid_path = os.path.join(os.path.realpath(os.path.dirname(zulip_path)), ".zulip-dev-uuid")
if os.path.exists(uuid_path):
@@ -163,8 +155,7 @@ def get_dev_uuid_var_path(create_if_missing=False):
os.makedirs(result_path, exist_ok=True)
return result_path
def get_deployment_lock(error_rerun_script):
# type: (str) -> None
def get_deployment_lock(error_rerun_script: str) -> None:
start_time = time.time()
got_lock = False
while time.time() - start_time < 300:
@@ -187,12 +178,10 @@ def get_deployment_lock(error_rerun_script):
ENDC)
sys.exit(1)
def release_deployment_lock():
# type: () -> None
def release_deployment_lock() -> None:
shutil.rmtree(LOCK_DIR)
def run(args, **kwargs):
# type: (Sequence[str], **Any) -> None
def run(args: Sequence[str], **kwargs: Any) -> None:
# Output what we're doing in the `set -x` style
print("+ %s" % (" ".join(map(shlex.quote, args)),))
@@ -208,8 +197,7 @@ def run(args, **kwargs):
print()
raise
def log_management_command(cmd, log_path):
# type: (str, str) -> None
def log_management_command(cmd: str, log_path: str) -> None:
log_dir = os.path.dirname(log_path)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
@@ -223,16 +211,14 @@ def log_management_command(cmd, log_path):
logger.info("Ran '%s'" % (cmd,))
def get_environment():
# type: () -> str
def get_environment() -> str:
if os.path.exists(DEPLOYMENTS_DIR):
return "prod"
if os.environ.get("TRAVIS"):
return "travis"
return "dev"
def get_recent_deployments(threshold_days):
# type: (int) -> Set[str]
def get_recent_deployments(threshold_days: int) -> Set[str]:
# Returns a list of deployments not older than threshold days
# including `/root/zulip` directory if it exists.
recent = set()
@@ -259,16 +245,14 @@ def get_recent_deployments(threshold_days):
recent.add("/root/zulip")
return recent
def get_threshold_timestamp(threshold_days):
# type: (int) -> int
def get_threshold_timestamp(threshold_days: int) -> int:
# Given number of days, this function returns timestamp corresponding
# to the time prior to given number of days.
threshold = datetime.datetime.now() - datetime.timedelta(days=threshold_days)
threshold_timestamp = int(time.mktime(threshold.utctimetuple()))
return threshold_timestamp
def get_caches_to_be_purged(caches_dir, caches_in_use, threshold_days):
# type: (str, Set[str], int) -> Set[str]
def get_caches_to_be_purged(caches_dir: str, caches_in_use: Set[str], threshold_days: int) -> Set[str]:
# Given a directory containing caches, a list of caches in use
# and threshold days, this function return a list of caches
# which can be purged. Remove the cache only if it is:
@@ -287,8 +271,9 @@ def get_caches_to_be_purged(caches_dir, caches_in_use, threshold_days):
caches_to_purge.add(cache_dir)
return caches_to_purge
def purge_unused_caches(caches_dir, caches_in_use, cache_type, args):
# type: (str, Set[str], str, argparse.Namespace) -> None
def purge_unused_caches(
caches_dir: str, caches_in_use: Set[str], cache_type: str, args: argparse.Namespace
) -> None:
all_caches = {os.path.join(caches_dir, cache) for cache in os.listdir(caches_dir)}
caches_to_purge = get_caches_to_be_purged(caches_dir, caches_in_use, args.threshold_days)
caches_to_keep = all_caches - caches_to_purge
@@ -298,8 +283,7 @@ def purge_unused_caches(caches_dir, caches_in_use, cache_type, args):
if args.verbose:
print("Done!")
def generate_sha1sum_emoji(zulip_path):
# type: (str) -> str
def generate_sha1sum_emoji(zulip_path: str) -> str:
ZULIP_EMOJI_DIR = os.path.join(zulip_path, 'tools', 'setup', 'emoji')
sha = hashlib.sha1()
@@ -332,8 +316,14 @@ def generate_sha1sum_emoji(zulip_path):
return sha.hexdigest()
def may_be_perform_purging(dirs_to_purge, dirs_to_keep, dir_type, dry_run, verbose, no_headings):
# type: (Set[str], Set[str], str, bool, bool, bool) -> None
def may_be_perform_purging(
dirs_to_purge: Set[str],
dirs_to_keep: Set[str],
dir_type: str,
dry_run: bool,
verbose: bool,
no_headings: bool,
) -> None:
if dry_run:
print("Performing a dry run...")
if not no_headings:
@@ -350,8 +340,7 @@ def may_be_perform_purging(dirs_to_purge, dirs_to_keep, dir_type, dry_run, verbo
print("Keeping used %s: %s" % (dir_type, directory))
@functools.lru_cache(None)
def parse_os_release():
# type: () -> Dict[str, str]
def parse_os_release() -> Dict[str, str]:
"""
Example of the useful subset of the data:
{
@@ -423,8 +412,7 @@ def is_root() -> bool:
return True
return False
def run_as_root(args, **kwargs):
# type: (List[str], **Any) -> None
def run_as_root(args: List[str], **kwargs: Any) -> None:
sudo_args = kwargs.pop('sudo_args', [])
if not is_root():
args = ['sudo'] + sudo_args + ['--'] + args
@@ -454,8 +442,12 @@ def assert_running_as_root(strip_lib_from_paths: bool=False) -> None:
print("{} must be run as root.".format(script_name))
sys.exit(1)
def get_config(config_file, section, key, default_value=""):
# type: (configparser.RawConfigParser, str, str, str) -> str
def get_config(
config_file: configparser.RawConfigParser,
section: str,
key: str,
default_value: str = "",
) -> str:
if config_file.has_option(section, key):
return config_file.get(section, key)
return default_value
@@ -465,8 +457,7 @@ def get_config_file() -> configparser.RawConfigParser:
config_file.read("/etc/zulip/zulip.conf")
return config_file
def get_deploy_options(config_file):
# type: (configparser.RawConfigParser) -> List[str]
def get_deploy_options(config_file: configparser.RawConfigParser) -> List[str]:
return get_config(config_file, 'deployment', 'deploy_options', "").strip().split()
def get_or_create_dev_uuid_var_path(path: str) -> str: