mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-31 03:53:50 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			148 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			148 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # See https://zulip.readthedocs.io/en/latest/subsystems/caching.html for docs
 | |
| import datetime
 | |
| import logging
 | |
| from typing import Any, Callable, Dict, List, Tuple
 | |
| 
 | |
| from django.conf import settings
 | |
| from django.contrib.sessions.models import Session
 | |
| from django.db.models import Q
 | |
| from django.utils.timezone import now as timezone_now
 | |
| 
 | |
| # This file needs to be different from cache.py because cache.py
 | |
| # cannot import anything from zerver.models or we'd have an import
 | |
| # loop
 | |
| from analytics.models import RealmCount
 | |
| from zerver.lib.cache import (
 | |
|     cache_set_many,
 | |
|     get_remote_cache_requests,
 | |
|     get_remote_cache_time,
 | |
|     to_dict_cache_key_id,
 | |
|     user_profile_by_api_key_cache_key,
 | |
|     user_profile_cache_key,
 | |
| )
 | |
| from zerver.lib.message import MessageDict
 | |
| from zerver.lib.sessions import session_engine
 | |
| from zerver.lib.users import get_all_api_keys
 | |
| from zerver.models import (
 | |
|     Client,
 | |
|     Huddle,
 | |
|     Message,
 | |
|     Stream,
 | |
|     UserProfile,
 | |
|     get_client_cache_key,
 | |
|     get_stream_cache_key,
 | |
|     huddle_hash_cache_key,
 | |
| )
 | |
| 
 | |
| MESSAGE_CACHE_SIZE = 75000
 | |
| 
 | |
def message_fetch_objects() -> List[Any]:
    """Return the newest MESSAGE_CACHE_SIZE messages (excluding one
    known high-volume test sender), or an empty list if the Message
    table is empty."""
    try:
        newest = Message.objects.only('id').order_by("-id")[0]
    except IndexError:
        # No messages exist yet; nothing to cache.
        return []
    cutoff_id = newest.id - MESSAGE_CACHE_SIZE
    return Message.objects.select_related().filter(
        ~Q(sender__email='tabbott/extra@mit.edu'), id__gt=cutoff_id)
 | |
| 
 | |
def message_cache_items(items_for_remote_cache: Dict[str, Tuple[bytes]],
                        message: Message) -> None:
    '''
    Note: this code is untested, and the caller has been
    commented out for a while.
    '''
    cache_key = to_dict_cache_key_id(message.id)
    serialized = MessageDict.to_dict_uncached([message])[message.id]
    items_for_remote_cache[cache_key] = (serialized,)
 | |
| 
 | |
def user_cache_items(items_for_remote_cache: Dict[str, Tuple[UserProfile]],
                     user_profile: UserProfile) -> None:
    """Register cache entries for a single user: one per API key, plus
    the (email, realm) lookup key."""
    payload = (user_profile,)
    for api_key in get_all_api_keys(user_profile):
        items_for_remote_cache[user_profile_by_api_key_cache_key(api_key)] = payload
    email_key = user_profile_cache_key(user_profile.email, user_profile.realm)
    items_for_remote_cache[email_key] = payload
    # We have other user_profile caches, but none of them are on the
    # core serving path for lots of requests.
 | |
| 
 | |
def stream_cache_items(items_for_remote_cache: Dict[str, Tuple[Stream]],
                       stream: Stream) -> None:
    """Register the cache entry for a single stream."""
    key = get_stream_cache_key(stream.name, stream.realm_id)
    items_for_remote_cache[key] = (stream,)
 | |
| 
 | |
def client_cache_items(items_for_remote_cache: Dict[str, Tuple[Client]],
                       client: Client) -> None:
    """Register the cache entry for a single Client, keyed by name."""
    key = get_client_cache_key(client.name)
    items_for_remote_cache[key] = (client,)
 | |
| 
 | |
def huddle_cache_items(items_for_remote_cache: Dict[str, Tuple[Huddle]],
                       huddle: Huddle) -> None:
    """Register the cache entry for a single Huddle, keyed by its hash."""
    key = huddle_hash_cache_key(huddle.huddle_hash)
    items_for_remote_cache[key] = (huddle,)
 | |
| 
 | |
def session_cache_items(items_for_remote_cache: Dict[str, str],
                        session: Session) -> None:
    """Register the cache entry for a single Django session.

    Only meaningful with the cached_db session backend; other backends
    have no session cache to fill.
    """
    if settings.SESSION_ENGINE != "django.contrib.sessions.backends.cached_db":
        # With any other session engine, the store has no cache_key
        # attribute, and there is no session cache to populate anyway.
        return
    store = session_engine.SessionStore(session_key=session.session_key)
    items_for_remote_cache[store.cache_key] = store.decode(session.session_data)
 | |
| 
 | |
def get_active_realm_ids() -> List[int]:
    """For installations like Zulip Cloud hosting a lot of realms, it only makes
    sense to do cache-filling work for realms that have any currently
    active users/clients.  Otherwise, we end up with every single-user
    trial organization that has ever been created costing us N streams
    worth of cache work (where N is the number of default streams for
    a new organization).
    """
    cutoff = timezone_now() - datetime.timedelta(days=2)
    recently_active = RealmCount.objects.filter(
        property="1day_actives::day",
        end_time__gte=cutoff,
        value__gt=0,
    )
    return recently_active.distinct("realm_id").values_list("realm_id", flat=True)
 | |
| 
 | |
def get_streams() -> List[Stream]:
    """Return the streams worth cache-filling: those in recently-active
    realms, excluding Zephyr-mirror realms."""
    candidates = Stream.objects.select_related().filter(
        realm__in=get_active_realm_ids())
    # We filter out Zephyr realms, because they can easily
    # have 10,000s of streams with only 1 subscriber.
    return candidates.exclude(is_in_zephyr_realm=True)
 | |
| 
 | |
def get_users() -> List[UserProfile]:
    """Return the users worth cache-filling: non-idle users in
    recently-active realms."""
    return UserProfile.objects.select_related().filter(
        realm__in=get_active_realm_ids(),
        long_term_idle=False,
    )
 | |
| 
 | |
# Format is (objects query, items filler function, timeout, batch size),
# where timeout is the cache expiration in seconds (3600*24*7 = 1 week)
# and batch size is how many objects to accumulate before flushing a
# cache_set_many batch.
#
# The objects queries are put inside lambdas to prevent Django from
# doing any setup for things we're unlikely to use (without the lambda
# wrapper the below adds an extra 3ms or so to startup time for
# anything importing this file).
cache_fillers: Dict[str, Tuple[Callable[[], List[Any]], Callable[[Dict[str, Any], Any], None], int, int]] = {
    'user': (get_users, user_cache_items, 3600*24*7, 10000),
    'client': (lambda: Client.objects.select_related().all(), client_cache_items, 3600*24*7, 10000),
    'stream': (get_streams, stream_cache_items, 3600*24*7, 10000),
    # Message cache fetching disabled until we can fix the fact that it
    # does a bunch of inefficient memcached queries as part of filling
    # the display_recipient cache
    #    'message': (message_fetch_objects, message_cache_items, 3600 * 24, 1000),
    'huddle': (lambda: Huddle.objects.select_related().all(), huddle_cache_items, 3600*24*7, 10000),
    'session': (lambda: Session.objects.all(), session_cache_items, 3600*24*7, 10000),
}
 | |
| 
 | |
def fill_remote_cache(cache: str) -> None:
    """Populate the remote cache named by `cache` (a key of
    cache_fillers), flushing entries in batches of the configured batch
    size, and log how many remote-cache queries/time it consumed.

    Bug fix: the per-cache `timeout` configured in cache_fillers was
    unpacked but never used -- the in-loop flush hardcoded a 1-day
    timeout while the final flush hardcoded 7 days, giving entries of
    the same cache inconsistent expirations.  Both flushes now honor
    the configured timeout.
    """
    remote_cache_time_start = get_remote_cache_time()
    remote_cache_requests_start = get_remote_cache_requests()
    items_for_remote_cache: Dict[str, Any] = {}
    (objects, items_filler, timeout, batch_size) = cache_fillers[cache]
    count = 0
    for obj in objects():
        items_filler(items_for_remote_cache, obj)
        count += 1
        if count % batch_size == 0:
            # Flush a full batch with the cache's configured timeout.
            cache_set_many(items_for_remote_cache, timeout=timeout)
            items_for_remote_cache = {}
    # Flush whatever partial batch remains.
    cache_set_many(items_for_remote_cache, timeout=timeout)
    logging.info("Successfully populated %s cache!  Consumed %s remote cache queries (%s time)",
                 cache, get_remote_cache_requests() - remote_cache_requests_start,
                 round(get_remote_cache_time() - remote_cache_time_start, 2))
 |