tests: Add cache_tries_captured helper.

This commit is contained in:
Steve Howell
2020-10-15 12:59:13 +00:00
committed by Tim Abbott
parent ce70d08cbf
commit f86823f82f
2 changed files with 27 additions and 6 deletions

View File

@@ -124,6 +124,24 @@ def capture_event(event_info: EventInfo) -> Iterator[None]:
event_info.populate(m.call_args_list)
@contextmanager
def cache_tries_captured() -> Iterator[List[Tuple[str, Union[str, List[str]], Optional[str]]]]:
cache_queries: List[Tuple[str, Union[str, List[str]], Optional[str]]] = []
orig_get = cache.cache_get
orig_get_many = cache.cache_get_many
def my_cache_get(key: str, cache_name: Optional[str]=None) -> Optional[Dict[str, Any]]:
cache_queries.append(('get', key, cache_name))
return orig_get(key, cache_name)
def my_cache_get_many(keys: List[str], cache_name: Optional[str]=None) -> Dict[str, Any]: # nocoverage -- simulated code doesn't use this
cache_queries.append(('getmany', keys, cache_name))
return orig_get_many(keys, cache_name)
with mock.patch.multiple(cache, cache_get=my_cache_get, cache_get_many=my_cache_get_many):
yield cache_queries
@contextmanager
def simulated_empty_cache() -> Iterator[List[Tuple[str, Union[str, List[str]], Optional[str]]]]:
cache_queries: List[Tuple[str, Union[str, List[str]], Optional[str]]] = []

View File

@@ -63,6 +63,7 @@ from zerver.lib.streams import (
)
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import (
cache_tries_captured,
get_subscription,
most_recent_usermessage,
queries_captured,
@@ -3289,16 +3290,18 @@ class SubscriptionAPITest(ZulipTestCase):
test_user_ids = [user.id for user in test_users]
with queries_captured() as queries:
with mock.patch('zerver.views.streams.send_messages_for_new_subscribers'):
self.common_subscribe_to_streams(
desdemona,
streams,
dict(principals=orjson.dumps(test_user_ids).decode()),
)
with cache_tries_captured() as cache_tries:
with mock.patch('zerver.views.streams.send_messages_for_new_subscribers'):
self.common_subscribe_to_streams(
desdemona,
streams,
dict(principals=orjson.dumps(test_user_ids).decode()),
)
# The only known O(N) behavior here is that we call
# principal_to_user_profile for each of our users.
self.assert_length(queries, 19)
self.assert_length(cache_tries, 28)
def test_subscriptions_add_for_principal(self) -> None:
"""