mirror of
https://github.com/zulip/zulip.git
synced 2025-11-10 00:46:03 +00:00
migration_status: Add parse_migration_status.
This commit adds `parse_migration_status`, which takes in the string output of `showmigrations` and parse it into key-value pair of installed apps and a list of its migration status. This is a prep commit to rework the check migrations function of import/export which will parse the output of `showmigrations` to write the `migration_status.json` file.
This commit is contained in:
@@ -2,7 +2,15 @@ import os
|
||||
import re
|
||||
from importlib import import_module
|
||||
from io import StringIO
|
||||
from typing import Any
|
||||
from typing import Any, TypeAlias, TypedDict
|
||||
|
||||
AppMigrations: TypeAlias = dict[str, list[str]]
|
||||
|
||||
|
||||
class MigrationStatusJson(TypedDict):
|
||||
migrations_by_app: AppMigrations
|
||||
zulip_version: str
|
||||
|
||||
|
||||
STALE_MIGRATIONS = [
|
||||
# Ignore django-guardian, which we installed until 1.7.0~3134
|
||||
@@ -82,3 +90,39 @@ def get_migration_status(**options: Any) -> str:
|
||||
out.seek(0)
|
||||
output = out.read()
|
||||
return re.sub(r"\x1b\[[0-9;]*m", "", output)
|
||||
|
||||
|
||||
def parse_migration_status(
|
||||
migration_status_print: str, stale_migrations: list[tuple[str, str]] = STALE_MIGRATIONS
|
||||
) -> AppMigrations:
|
||||
lines = migration_status_print.strip().split("\n")
|
||||
migrations_dict: AppMigrations = {}
|
||||
current_app = None
|
||||
line_prefix = ("[X]", "[ ]", "[-]", "(no migrations)")
|
||||
|
||||
stale_migrations_dict: dict[str, list[str]] = {}
|
||||
for app, migration in stale_migrations:
|
||||
if app not in stale_migrations_dict:
|
||||
stale_migrations_dict[app] = []
|
||||
stale_migrations_dict[app].append(migration)
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line.startswith(line_prefix) and line:
|
||||
current_app = line
|
||||
migrations_dict[current_app] = []
|
||||
elif line.startswith(line_prefix):
|
||||
assert current_app is not None
|
||||
apps_stale_migrations = stale_migrations_dict.get(current_app)
|
||||
if (
|
||||
apps_stale_migrations is not None
|
||||
and line != "(no migrations)"
|
||||
and line[4:] in apps_stale_migrations
|
||||
):
|
||||
continue
|
||||
migrations_dict[current_app].append(line)
|
||||
|
||||
# Installed apps that have no migrations and we still use will have
|
||||
# "(no migrations)" as its only "migrations" list. Ones that just
|
||||
# have [] means it's just a left over stale app we can clean up.
|
||||
return {app: migrations for app, migrations in migrations_dict.items() if migrations != []}
|
||||
|
||||
Reference in New Issue
Block a user