migration_status: Refactor parse_migration_status.

This refactors `parse_migration_status` to copy the algorithm of
Django's `showmigrations` command instead of parsing its output. This is
done so that the code is not susceptible to breaking changes if Django
modifies showmigrations's implementation.

The previous `parse_migration_status` logic has been repurposed into a
test utility function (`prase_showmigrations`). It is used to verify
that the new `parse_migration_status` generates output identical to the
actual `showmigrations` command.

The `test_clean_up_migration_status_json` is removed because
`test_parse_migration_status` has covered that behavior.
This commit is contained in:
PieterCK
2025-03-18 16:32:57 +07:00
committed by Tim Abbott
parent bcacd618a0
commit 719f8db654
6 changed files with 147 additions and 133 deletions

View File

@@ -33,11 +33,7 @@ from analytics.models import RealmCount, StreamCount, UserCount
from scripts.lib.zulip_tools import overwrite_symlink
from version import ZULIP_VERSION
from zerver.lib.avatar_hash import user_avatar_base_path_from_ids
from zerver.lib.migration_status import (
MigrationStatusJson,
get_migration_status,
parse_migration_status,
)
from zerver.lib.migration_status import MigrationStatusJson, parse_migration_status
from zerver.lib.pysa import mark_sanitized
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.upload.s3 import get_bucket
@@ -2636,9 +2632,8 @@ def get_realm_exports_serialized(realm: Realm) -> list[dict[str, Any]]:
def export_migration_status(output_dir: str) -> None:
export_showmigration = get_migration_status(close_connection_when_done=False)
migration_status_json = MigrationStatusJson(
migrations_by_app=parse_migration_status(export_showmigration),
migrations_by_app=parse_migration_status(),
zulip_version=ZULIP_VERSION,
)
output_file = os.path.join(output_dir, "migration_status.json")

View File

@@ -32,11 +32,7 @@ from zerver.lib.export import DATE_FIELDS, Field, Path, Record, TableData, Table
from zerver.lib.markdown import markdown_convert
from zerver.lib.markdown import version as markdown_version
from zerver.lib.message import get_last_message_id
from zerver.lib.migration_status import (
MigrationStatusJson,
get_migration_status,
parse_migration_status,
)
from zerver.lib.migration_status import MigrationStatusJson, parse_migration_status
from zerver.lib.mime_types import guess_type
from zerver.lib.partial import partial
from zerver.lib.push_notifications import sends_notifications_directly
@@ -2172,9 +2168,8 @@ def check_migration_status(exported_migration_status: MigrationStatusJson) -> No
custom migrations are identical.
"""
mismatched_migrations_log: dict[str, str] = {}
local_showmigrations = get_migration_status(close_connection_when_done=False)
local_migration_status = MigrationStatusJson(
migrations_by_app=parse_migration_status(local_showmigrations), zulip_version=ZULIP_VERSION
migrations_by_app=parse_migration_status(), zulip_version=ZULIP_VERSION
)
# Different major versions are the most common form of mismatch

View File

@@ -4,6 +4,10 @@ from importlib import import_module
from io import StringIO
from typing import Any, TypeAlias, TypedDict
from django.db import connection
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.recorder import MigrationRecorder
AppMigrations: TypeAlias = dict[str, list[str]]
@@ -93,36 +97,62 @@ def get_migration_status(**options: Any) -> str:
def parse_migration_status(
migration_status_print: str, stale_migrations: list[tuple[str, str]] = STALE_MIGRATIONS
stale_migrations: list[tuple[str, str]] = STALE_MIGRATIONS,
) -> AppMigrations:
lines = migration_status_print.strip().split("\n")
migrations_dict: AppMigrations = {}
current_app = None
line_prefix = ("[X]", "[ ]", "[-]", "(no migrations)")
"""
This is a copy of Django's `showmigrations` command, keep this in sync with
the actual logic from Django. The key differences are, this returns a dict
and filters out any migration found in the `stale_migrations` parameter.
Django's `showmigrations`:
https://github.com/django/django/blob/main/django/core/management/commands/showmigrations.py
"""
# Load migrations from disk/DB
loader = MigrationLoader(connection, ignore_no_migrations=True)
recorder = MigrationRecorder(connection)
recorded_migrations = recorder.applied_migrations()
graph = loader.graph
migrations_dict: AppMigrations = {}
app_names = sorted(loader.migrated_apps)
stale_migrations_dict: dict[str, list[str]] = {}
for app, migration in stale_migrations:
if app not in stale_migrations_dict:
stale_migrations_dict[app] = []
stale_migrations_dict[app].append(migration)
for line in lines:
line = line.strip()
if not line.startswith(line_prefix) and line:
current_app = line
migrations_dict[current_app] = []
elif line.startswith(line_prefix):
assert current_app is not None
apps_stale_migrations = stale_migrations_dict.get(current_app)
# For each app, print its migrations in order from oldest (roots) to
# newest (leaves).
for app_name in app_names:
migrations_dict[app_name] = []
shown = set()
apps_stale_migrations = stale_migrations_dict.get(app_name, [])
for node in graph.leaf_nodes(app_name):
for plan_node in graph.forwards_plan(node):
if (
apps_stale_migrations is not None
and line != "(no migrations)"
and line[4:] in apps_stale_migrations
plan_node not in shown
and plan_node[0] == app_name
and plan_node[1] not in apps_stale_migrations
):
continue
migrations_dict[current_app].append(line)
# Give it a nice title if it's a squashed one
title = plan_node[1]
# Installed apps that have no migrations and we still use will have
# "(no migrations)" as its only "migrations" list. Ones that just
# have [] means it's just a left over stale app we can clean up.
return {app: migrations for app, migrations in migrations_dict.items() if migrations != []}
if graph.nodes[plan_node].replaces:
title += f" ({len(graph.nodes[plan_node].replaces)} squashed migrations)"
applied_migration = loader.applied_migrations.get(plan_node)
# Mark it as applied/unapplied
if applied_migration:
if plan_node in recorded_migrations:
output = f"[X] {title}"
else:
output = f"[-] {title}"
else:
output = f"[ ] {title}"
migrations_dict[app_name].append(output)
shown.add(plan_node)
# If there are no migrations, record as such
if not shown:
output = "(no migrations)"
migrations_dict[app_name].append(output)
return migrations_dict

View File

@@ -1,76 +0,0 @@
analytics
[X] 0001_squashed_0021_alter_fillstate_id (21 squashed migrations)
auth
[X] 0001_initial
[X] 0002_alter_permission_name_max_length
[X] 0003_alter_user_email_max_length
confirmation
[X] 0001_squashed_0014_confirmation_confirmatio_content_80155a_idx (14 squashed migrations)
[X] 0015_alter_confirmation_object_id
contenttypes
[X] 0001_initial
[X] 0002_remove_content_type_name
corporate
[X] 0001_squashed_0044_convert_ids_to_bigints (44 squashed migrations)
guardian
[X] 0001_initial
otp_static
[X] 0001_initial
[X] 0002_throttling
[X] 0003_add_timestamps
otp_totp
[X] 0001_initial
[X] 0002_auto_20190420_0723
[X] 0003_add_timestamps
pgroonga
[X] 0001_enable
[X] 0002_html_escape_subject
[X] 0003_v2_api_upgrade
phonenumber
[X] 0001_squashed_0001_initial (10 squashed migrations)
sessions
[X] 0001_initial
social_django
[X] 0001_initial (2 squashed migrations)
[X] 0002_add_related_name (2 squashed migrations)
[X] 0003_alter_email_max_length (2 squashed migrations)
sites
[X] 0001_initial
[X] 0002_alter_domain_unique
two_factor
[X] 0001_squashed_0008_delete_phonedevice
zerver
[X] 0001_squashed_0569 (545 squashed migrations)
[X] 0002_django_1_8
[X] 0003_custom_indexes
[X] 0004_userprofile_left_side_userlist
[X] 0005_auto_20150920_1340
[X] 0006_zerver_userprofile_email_upper_idx
[X] 0007_userprofile_is_bot_active_indexes
[X] 0008_preregistrationuser_upper_email_idx
[X] 0009_add_missing_migrations
[X] 0010_delete_streamcolor
[X] 0011_remove_guardian
[X] 0012_remove_appledevicetoken
[X] 0013_realmemoji
[X] 0014_realm_emoji_url_length
[X] 0015_attachment
[X] 0016_realm_create_stream_by_admins_only
[X] 0017_userprofile_bot_type
[X] 0018_realm_emoji_message
[X] 0019_preregistrationuser_realm_creation
[X] 0020_add_tracking_attachment
[X] 0021_migrate_attachment_data
[X] 0022_subscription_pin_to_top
[X] 0023_userprofile_default_language
[X] 0024_realm_allow_message_editing
[X] 0025_realm_message_content_edit_limit
[X] 0026_delete_mituser
[X] 0027_realm_default_language
[X] 0028_userprofile_tos_version
[X] 0576_backfill_imageattachment
[X] 0622_backfill_imageattachment_again
default
[X] 0005_auto_20160727_2333
zilencer
[X] 0001_squashed_0064_remotezulipserver_last_merge_base (64 squashed migrations)

View File

@@ -2280,20 +2280,6 @@ class RealmImportExportTest(ExportFile):
)
do_import_realm(get_output_dir(), "test-zulip")
def test_clean_up_migration_status_json(self) -> None:
user = self.example_user("hamlet")
with (
patch("zerver.lib.export.get_migration_status") as mock_export,
):
mock_export.return_value = self.fixture_data(
"with_stale_migrations.txt", "import_fixtures/showmigrations_fixtures"
)
realm = user.realm
self.export_realm_and_create_auditlog(realm)
self.verify_migration_status_json()
class SingleUserExportTest(ExportFile):
def do_files_test(self, is_s3: bool) -> None:

View File

@@ -1,3 +1,8 @@
from unittest.mock import patch
from django.db import connection
from django.db.migrations.recorder import MigrationRecorder
from zerver.lib.migration_status import (
STALE_MIGRATIONS,
AppMigrations,
@@ -8,7 +13,56 @@ from zerver.lib.test_classes import ZulipTestCase
class MigrationStatusTests(ZulipTestCase):
def test_parse_migration_status(self) -> None:
def parse_showmigrations(
self,
migration_status_print: str,
stale_migrations: list[tuple[str, str]] = STALE_MIGRATIONS,
) -> AppMigrations:
"""
Parses the output of Django's `showmigrations` into a data structure
identical to the output `parse_migration_status` generates.
Makes sure this accurately parses the output of `showmigrations`.
"""
lines = migration_status_print.strip().split("\n")
migrations_dict: AppMigrations = {}
current_app = None
line_prefix = ("[X]", "[ ]", "[-]", "(no migrations)")
stale_migrations_dict: dict[str, list[str]] = {}
for app, migration in stale_migrations:
if app not in stale_migrations_dict:
stale_migrations_dict[app] = []
stale_migrations_dict[app].append(migration)
for line in lines:
line = line.strip()
if not line.startswith(line_prefix) and line:
current_app = line
migrations_dict[current_app] = []
elif line.startswith(line_prefix):
assert current_app is not None
apps_stale_migrations = stale_migrations_dict.get(current_app)
if (
apps_stale_migrations is not None
and line != "(no migrations)"
and line[4:] in apps_stale_migrations
):
continue
migrations_dict[current_app].append(line)
# Installed apps that have no migrations and we still use will have
# "(no migrations)" as its only "migrations" list. Ones that just
# have [] means it's just a left over stale app we can clean up.
return {app: migrations for app, migrations in migrations_dict.items() if migrations != []}
def test_parse_showmigrations(self) -> None:
"""
This function tests a helper test function `parse_showmigrations`.
It is critical that this correctly checks the behavior of
`parse_showmigrations`. Make sure it is accurately parsing the
output of `showmigrations`.
"""
showmigrations_sample = """
analytics
[X] 0001_squashed_0021_alter_fillstate_id (21 squashed migrations)
@@ -19,7 +73,7 @@ zerver
two_factor
(no migrations)
"""
app_migrations = parse_migration_status(showmigrations_sample)
app_migrations = self.parse_showmigrations(showmigrations_sample)
expected: AppMigrations = {
"analytics": ["[X] 0001_squashed_0021_alter_fillstate_id (21 squashed migrations)"],
"auth": ["[ ] 0012_alter_user_first_name_max_length"],
@@ -32,12 +86,18 @@ two_factor
# functions are done in the test_import_export.py as part of the import-
# export suite.
showmigrations = get_migration_status(app_label="zerver")
app_migrations = parse_migration_status(showmigrations)
app_migrations = self.parse_showmigrations(showmigrations)
zerver_migrations = app_migrations.get("zerver")
self.assertIsNotNone(zerver_migrations)
self.assertNotEqual(zerver_migrations, [])
def test_parse_stale_migration_status(self) -> None:
def test_parse_showmigrations_filters_out_stale_migrations(self) -> None:
"""
This function tests a helper test function `parse_showmigrations`.
It is critical that this correctly checks the behavior of
`parse_showmigrations`. Make sure it is accurately parsing the
output of `showmigrations`.
"""
assert ("guardian", "0001_initial") in STALE_MIGRATIONS
showmigrations_sample = """
analytics
@@ -51,7 +111,7 @@ two_factor
guardian
[X] 0001_initial
"""
app_migrations = parse_migration_status(showmigrations_sample)
app_migrations = self.parse_showmigrations(showmigrations_sample)
expected: AppMigrations = {
"analytics": ["[X] 0001_squashed_0021_alter_fillstate_id (21 squashed migrations)"],
"auth": ["[ ] 0012_alter_user_first_name_max_length"],
@@ -59,3 +119,27 @@ guardian
"two_factor": ["(no migrations)"],
}
self.assertDictEqual(app_migrations, expected)
def test_parse_migration_status(self) -> None:
"""
This test asserts that the algorithm in `parse_migration_status` is the same
as Django's `showmigrations`.
"""
migration_status_print = get_migration_status()
parsed_showmigrations = self.parse_showmigrations(migration_status_print)
migration_status_dict = parse_migration_status()
self.assertDictEqual(migration_status_dict, parsed_showmigrations)
def test_applied_but_not_recorded(self) -> None:
# Mock applied_migrations() to simulate empty recorded_migrations.
with patch(
"zerver.lib.migration_status.MigrationRecorder.applied_migrations",
):
result = parse_migration_status()
self.assertIn("[-] 0010_alter_group_name_max_length", result["auth"])
def test_generate_unapplied_migration(self) -> None:
recorder = MigrationRecorder(connection)
recorder.record_unapplied("auth", "0010_alter_group_name_max_length")
result = parse_migration_status()
self.assertIn("[ ] 0010_alter_group_name_max_length", result["auth"])