Mirror of https://github.com/zulip/zulip.git
cache: Flush caches from all known key prefixes.
When flushing caches, we want to ensure that even processes which may have a wrong cache-key-prefix know to fetch the latest data from the database. This is complicated by the cache-key-prefixes being stored on disk, which makes checking the disk on every cache delete not sufficiently performant.

We store the list of cache-key-prefixes in the cache itself, with no prefix. This entry is updated when a new cache-key-prefix is written, and is also allowed to lapse after 24 hours. Updating this global cache entry on new prefix creation ensures that even a not-yet-restarted-into deployment will have its caches appropriately purged if changes are made to the underlying data.

However, this both adds a cache get and multiplies the size of all cache clears; for large bulk clears (e.g. for stream renames, which clear the cache for all message-ids in them) this may prove untenable.
Committed by: Tim Abbott
Parent: 3e421d71ba
Commit: 6ac9e3328e
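As a rough sketch of the approach described above (not the diff itself: a plain dict stands in for memcached, and flush_keys / PREFIX_LIST_KEY are illustrative names; only the "cache_key_prefixes" key matches the real code below):

# The list of known prefixes lives in the cache itself, under an unprefixed key.
PREFIX_LIST_KEY = "cache_key_prefixes"

def flush_keys(cache: dict, items: list[str], my_prefix: str) -> None:
    # Fall back to just our own prefix if the shared list is missing or has lapsed.
    prefixes = cache.get(PREFIX_LIST_KEY) or [my_prefix]
    for prefix in prefixes:
        for item in items:
            cache.pop(prefix + item, None)  # delete under every known prefix

# Example: two deployments with different prefixes sharing one cache.
shared = {PREFIX_LIST_KEY: ["old:", "new:"], "old:user:7": "...", "new:user:7": "..."}
flush_keys(shared, ["user:7"], "new:")
assert "old:user:7" not in shared and "new:user:7" not in shared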
@@ -299,29 +299,33 @@ def get_environment() -> str:
     return "dev"


-def get_recent_deployments(threshold_days: int) -> set[str]:
+def get_recent_deployments(threshold_days: int | None) -> set[str]:
     # Returns a list of deployments not older than threshold days
     # including `/root/zulip` directory if it exists.
     recent = set()
-    threshold_date = datetime.now() - timedelta(days=threshold_days)  # noqa: DTZ005
-    for dir_name in os.listdir(DEPLOYMENTS_DIR):
-        target_dir = os.path.join(DEPLOYMENTS_DIR, dir_name)
-        if not os.path.isdir(target_dir):
-            # Skip things like uwsgi sockets, symlinks, etc.
-            continue
-        if not os.path.exists(os.path.join(target_dir, "zerver")):
-            # Skip things like "lock" that aren't actually a deployment directory
-            continue
-        try:
-            date = datetime.strptime(dir_name, TIMESTAMP_FORMAT)  # noqa: DTZ007
-            if date >= threshold_date:
-                recent.add(target_dir)
-        except ValueError:
-            # Always include deployments whose name is not in the format of a timestamp.
-            recent.add(target_dir)
-        # If it is a symlink then include the target as well.
-        if os.path.islink(target_dir):
-            recent.add(os.path.realpath(target_dir))
+    if threshold_days is not None:
+        threshold_date = datetime.now() - timedelta(days=threshold_days)  # noqa: DTZ005
+    else:
+        threshold_date = None
+    if os.path.isdir(DEPLOYMENTS_DIR):
+        for dir_name in os.listdir(DEPLOYMENTS_DIR):
+            target_dir = os.path.join(DEPLOYMENTS_DIR, dir_name)
+            if not os.path.isdir(target_dir):
+                # Skip things like uwsgi sockets, symlinks, etc.
+                continue
+            if not os.path.exists(os.path.join(target_dir, "zerver")):
+                # Skip things like "lock" that aren't actually a deployment directory
+                continue
+            try:
+                date = datetime.strptime(dir_name, TIMESTAMP_FORMAT)  # noqa: DTZ007
+                if threshold_date is None or date >= threshold_date:
+                    recent.add(target_dir)
+            except ValueError:
+                # Always include deployments whose name is not in the format of a timestamp.
+                recent.add(target_dir)
+            # If it is a symlink then include the target as well.
+            if os.path.islink(target_dir):
+                recent.add(os.path.realpath(target_dir))
     if os.path.exists("/root/zulip"):
         recent.add("/root/zulip")
     return recent
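For orientation (not part of the diff): with the widened signature, passing None simply disables the age filter, which is what the cache code later in this commit relies on to enumerate every deployment. Hypothetical call sites:

# DEPLOYMENTS_DIR is typically /home/zulip/deployments in production.
from scripts.lib.zulip_tools import get_recent_deployments

recent = get_recent_deployments(7)         # only deployments from the last 7 days
everything = get_recent_deployments(None)  # no age cutoff: every deployment directory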
@@ -17,6 +17,8 @@ from django.core.cache.backends.base import BaseCache
 from django.db.models import Q, QuerySet
 from typing_extensions import ParamSpec

+from scripts.lib.zulip_tools import DEPLOYMENTS_DIR, get_recent_deployments
+
 if TYPE_CHECKING:
     # These modules have to be imported for type annotations but
     # they cannot be imported at runtime due to cyclic dependency.
@@ -53,6 +55,22 @@ def remote_cache_stats_finish() -> None:
     remote_cache_total_time += time.time() - remote_cache_time_start


+def update_cached_cache_key_prefixes() -> list[str]:
+    # Clearing cache keys happens for all cache prefixes at once.
+    # Because the list of cache prefixes can only be derived from
+    # reading disk, we cache the list of cache prefixes, itself, in
+    # the cache.
+    found_prefixes: set[str] = set()
+    for deploy_dir in get_recent_deployments(None):
+        filename = os.path.join(deploy_dir, "var", "remote_cache_prefix")
+        if not os.path.exists(filename):
+            continue
+        with open(filename) as f:
+            found_prefixes.add(f.readline().removesuffix("\n"))
+    caches["default"].set("cache_key_prefixes", list(found_prefixes), timeout=60 * 60 * 24)  # 24h
+    return list(found_prefixes)
+
+
 def get_or_create_key_prefix() -> str:
     if settings.PUPPETEER_TESTS:
         # This sets the prefix for the benefit of the Puppeteer tests.
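Each deployment records its own prefix in var/remote_cache_prefix under its deployment directory; the helper above aggregates those files and caches the result for 24 hours under the unprefixed "cache_key_prefixes" key. A self-contained sketch of just the file handling, using a temporary directory and a made-up prefix value:

import os
import tempfile

# Simulate one deployment directory containing var/remote_cache_prefix.
deploy_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(deploy_dir, "var"))
prefix_file = os.path.join(deploy_dir, "var", "remote_cache_prefix")
with open(prefix_file, "w") as f:
    f.write("6e4566a71c0a:\n")  # made-up prefix; real ones are generated per deployment

found_prefixes: set[str] = set()
with open(prefix_file) as f:
    found_prefixes.add(f.readline().removesuffix("\n"))
print(found_prefixes)  # {'6e4566a71c0a:'}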
@@ -89,12 +107,27 @@ def get_or_create_key_prefix() -> str:
         print("Could not read remote cache key prefix file")
         sys.exit(1)

+    update_cached_cache_key_prefixes()
     return prefix


 KEY_PREFIX: str = get_or_create_key_prefix()


+def get_all_cache_key_prefixes() -> list[str]:
+    if not settings.PRODUCTION or not os.path.exists(DEPLOYMENTS_DIR):
+        return [KEY_PREFIX]
+    return get_all_deployment_cache_key_prefixes()
+
+
+def get_all_deployment_cache_key_prefixes() -> list[str]:
+    stored_prefixes = caches["default"].get("cache_key_prefixes")
+    if stored_prefixes:
+        return stored_prefixes
+
+    return update_cached_cache_key_prefixes()
+
+
 def bounce_key_prefix_for_testing(test_name: str) -> None:
     global KEY_PREFIX
     KEY_PREFIX = test_name + ":" + str(os.getpid()) + ":"
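The lookup order added above is: in development, or when DEPLOYMENTS_DIR does not exist, deletes only ever touch the local KEY_PREFIX; in production, the cached prefix list is used when present and rebuilt from disk when it is missing or has lapsed. A compressed restatement of that decision with stand-in names (not Zulip's actual API):

from collections.abc import Callable

def prefixes_to_clear(
    production: bool,
    deployments_dir_exists: bool,
    cached_prefixes: list[str] | None,
    key_prefix: str,
    rebuild_from_disk: Callable[[], list[str]],
) -> list[str]:
    if not production or not deployments_dir_exists:
        return [key_prefix]        # dev/tests: only our own prefix
    if cached_prefixes:
        return cached_prefixes     # fast path: list already in the cache
    return rebuild_from_disk()     # miss or lapsed: re-scan disk and re-cache

print(prefixes_to_clear(False, False, None, "local:", list))  # ['local:']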
@@ -270,16 +303,13 @@ def safe_cache_set_many(


 def cache_delete(key: str, cache_name: str | None = None) -> None:
-    final_key = KEY_PREFIX + key
-    validate_cache_key(final_key)
-
-    remote_cache_stats_start()
-    get_cache_backend(cache_name).delete(final_key)
-    remote_cache_stats_finish()
+    cache_delete_many([key], cache_name)


 def cache_delete_many(items: Iterable[str], cache_name: str | None = None) -> None:
-    keys = [KEY_PREFIX + item for item in items]
+    keys = []
+    for key_prefix in get_all_cache_key_prefixes():
+        keys += [key_prefix + item for item in items]
     for key in keys:
         validate_cache_key(key)
     remote_cache_stats_start()
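The net effect on a delete, with made-up prefixes and a made-up key (the key format here is only illustrative): every logical delete now issues one physical delete per known prefix, which is the multiplication in clear size the commit message flags as a possible problem for large bulk clears.

# Illustrative values only.
prefixes = ["6f1ed002ab:", "3c59dc048e:"]   # two deployments' cache-key-prefixes
items = ["message_dict:12345"]              # one logical key to clear

keys: list[str] = []
for key_prefix in prefixes:
    keys += [key_prefix + item for item in items]

print(keys)
# ['6f1ed002ab:message_dict:12345', '3c59dc048e:message_dict:12345']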