mypy: Convert zerver/lib to use typing.Text.

This commit is contained in:
Robert Hönig
2016-12-21 12:17:53 +00:00
committed by showell
parent 654e8de5ba
commit 0917493588
37 changed files with 290 additions and 307 deletions

View File

@@ -5,12 +5,11 @@ from zerver.models import UserProfile, Realm
from zerver.lib.cache import cache_with_key, realm_alert_words_cache_key
import ujson
import six
from six import text_type
from typing import Dict, Iterable, List
from typing import Dict, Iterable, List, Text
@cache_with_key(realm_alert_words_cache_key, timeout=3600*24)
def alert_words_in_realm(realm):
# type: (Realm) -> Dict[int, List[text_type]]
# type: (Realm) -> Dict[int, List[Text]]
users_query = UserProfile.objects.filter(realm=realm, is_active=True)
alert_word_data = users_query.filter(~Q(alert_words=ujson.dumps([]))).values('id', 'alert_words')
all_user_words = dict((elt['id'], ujson.loads(elt['alert_words'])) for elt in alert_word_data)
@@ -18,11 +17,11 @@ def alert_words_in_realm(realm):
return user_ids_with_words
def user_alert_words(user_profile):
# type: (UserProfile) -> List[text_type]
# type: (UserProfile) -> List[Text]
return ujson.loads(user_profile.alert_words)
def add_user_alert_words(user_profile, alert_words):
# type: (UserProfile, Iterable[text_type]) -> List[text_type]
# type: (UserProfile, Iterable[Text]) -> List[Text]
words = user_alert_words(user_profile)
new_words = [w for w in alert_words if w not in words]
@@ -33,7 +32,7 @@ def add_user_alert_words(user_profile, alert_words):
return words
def remove_user_alert_words(user_profile, alert_words):
# type: (UserProfile, Iterable[text_type]) -> List[text_type]
# type: (UserProfile, Iterable[Text]) -> List[Text]
words = user_alert_words(user_profile)
words = [w for w in words if w not in alert_words]
@@ -42,6 +41,6 @@ def remove_user_alert_words(user_profile, alert_words):
return words
def set_user_alert_words(user_profile, alert_words):
# type: (UserProfile, List[text_type]) -> None
# type: (UserProfile, List[Text]) -> None
user_profile.alert_words = ujson.dumps(alert_words)
user_profile.save(update_fields=['alert_words'])

View File

@@ -4,20 +4,20 @@ from django.conf import settings
if False:
from zerver.models import UserProfile
from six import text_type
from typing import Text
from zerver.lib.avatar_hash import gravatar_hash, user_avatar_hash
from zerver.lib.upload import upload_backend, MEDIUM_AVATAR_SIZE
def avatar_url(user_profile, medium=False):
# type: (UserProfile, bool) -> text_type
# type: (UserProfile, bool) -> Text
return get_avatar_url(
user_profile.avatar_source,
user_profile.email,
medium=medium)
def get_avatar_url(avatar_source, email, medium=False):
# type: (text_type, text_type, bool) -> text_type
# type: (Text, Text, bool) -> Text
if avatar_source == u'U':
hash_key = user_avatar_hash(email)
return upload_backend.get_avatar_url(hash_key, medium=medium)

View File

@@ -1,14 +1,14 @@
from __future__ import absolute_import
from django.conf import settings
from six import text_type
from typing import Text
from zerver.lib.utils import make_safe_digest
import hashlib
def gravatar_hash(email):
# type: (text_type) -> text_type
# type: (Text) -> Text
"""Compute the Gravatar hash for an email address."""
# Non-ASCII characters aren't permitted by the currently active e-mail
# RFCs. However, the IETF has published https://tools.ietf.org/html/rfc4952,
@@ -18,7 +18,7 @@ def gravatar_hash(email):
return make_safe_digest(email.lower(), hashlib.md5)
def user_avatar_hash(email):
# type: (text_type) -> text_type
# type: (Text) -> Text
# Salting the user_key may be overkill, but it prevents us from
# basically mimicking Gravatar's hashing scheme, which could lead
# to some abuse scenarios like folks using us as a free Gravatar

View File

@@ -42,7 +42,7 @@ import zerver.lib.mention as mention
from zerver.lib.str_utils import force_text, force_str
import six
from six.moves import range, html_parser
from six import text_type
from typing import Text
if six.PY3:
import html
@@ -57,7 +57,7 @@ _T = TypeVar('_T')
# which means that at runtime Union causes this to blow up.
if False:
# mypy requires the Optional to be inside Union
ElementStringNone = Union[Element, Optional[text_type]]
ElementStringNone = Union[Element, Optional[Text]]
class BugdownRenderingException(Exception):
pass
@@ -128,7 +128,7 @@ def add_a(root, url, link, height="", title=None, desc=None,
def add_embed(root, link, extracted_data):
# type: (Element, text_type, Dict[text_type, Any]) -> None
# type: (Element, Text, Dict[Text, Any]) -> None
container = markdown.util.etree.SubElement(root, "div")
container.set("class", "message_embed")

View File

@@ -1,7 +1,6 @@
from __future__ import absolute_import
from __future__ import unicode_literals
from six import text_type
from typing import Any, Dict, Optional, Text
import ujson

View File

@@ -1,6 +1,5 @@
from __future__ import absolute_import
from typing import Any, Iterable, Mapping, Optional, Set, Tuple
from six import text_type
from typing import Any, Iterable, Mapping, Optional, Set, Tuple, Text
from zerver.lib.initial_password import initial_password
from zerver.models import Realm, Stream, UserProfile, Huddle, \
@@ -8,7 +7,7 @@ from zerver.models import Realm, Stream, UserProfile, Huddle, \
from zerver.lib.create_user import create_user_profile
def bulk_create_realms(realm_list):
# type: (Iterable[text_type]) -> None
# type: (Iterable[Text]) -> None
existing_realms = set(r.domain for r in Realm.objects.select_related().all())
realms_to_create = [] # type: List[Realm]
@@ -19,7 +18,7 @@ def bulk_create_realms(realm_list):
Realm.objects.bulk_create(realms_to_create)
def bulk_create_users(realm, users_raw, bot_type=None, tos_version=None):
# type: (Realm, Set[Tuple[text_type, text_type, text_type, bool]], Optional[int], Optional[text_type]) -> None
# type: (Realm, Set[Tuple[Text, Text, Text, bool]], Optional[int], Optional[Text]) -> None
"""
Creates and saves a UserProfile with the given email.
Has some code based off of UserManage.create_user, but doesn't .save()
@@ -36,7 +35,7 @@ def bulk_create_users(realm, users_raw, bot_type=None, tos_version=None):
profiles_to_create.append(profile)
UserProfile.objects.bulk_create(profiles_to_create)
profiles_by_email = {} # type: Dict[text_type, UserProfile]
profiles_by_email = {} # type: Dict[Text, UserProfile]
profiles_by_id = {} # type: Dict[int, UserProfile]
for profile in UserProfile.objects.select_related().all():
profiles_by_email[profile.email] = profile
@@ -48,7 +47,7 @@ def bulk_create_users(realm, users_raw, bot_type=None, tos_version=None):
type=Recipient.PERSONAL))
Recipient.objects.bulk_create(recipients_to_create)
recipients_by_email = {} # type: Dict[text_type, Recipient]
recipients_by_email = {} # type: Dict[Text, Recipient]
for recipient in Recipient.objects.filter(type=Recipient.PERSONAL):
recipients_by_email[profiles_by_id[recipient.type_id].email] = recipient
@@ -60,7 +59,7 @@ def bulk_create_users(realm, users_raw, bot_type=None, tos_version=None):
Subscription.objects.bulk_create(subscriptions_to_create)
def bulk_create_streams(realm, stream_dict):
# type: (Realm, Dict[text_type, Dict[text_type, Any]]) -> None
# type: (Realm, Dict[Text, Dict[Text, Any]]) -> None
existing_streams = frozenset([name.lower() for name in
Stream.objects.filter(realm=realm)
.values_list('name', flat=True)])
@@ -83,8 +82,8 @@ def bulk_create_streams(realm, stream_dict):
Recipient.objects.bulk_create(recipients_to_create)
def bulk_create_clients(client_list):
# type: (Iterable[text_type]) -> None
existing_clients = set(client.name for client in Client.objects.select_related().all()) # type: Set[text_type]
# type: (Iterable[Text]) -> None
existing_clients = set(client.name for client in Client.objects.select_related().all()) # type: Set[Text]
clients_to_create = [] # type: List[Client]
for name in client_list:
@@ -94,11 +93,11 @@ def bulk_create_clients(client_list):
Client.objects.bulk_create(clients_to_create)
def bulk_create_huddles(users, huddle_user_list):
# type: (Dict[text_type, UserProfile], Iterable[Iterable[text_type]]) -> None
huddles = {} # type: Dict[text_type, Huddle]
# type: (Dict[Text, UserProfile], Iterable[Iterable[Text]]) -> None
huddles = {} # type: Dict[Text, Huddle]
huddles_by_id = {} # type: Dict[int, Huddle]
huddle_set = set() # type: Set[Tuple[text_type, Tuple[int, ...]]]
existing_huddles = set() # type: Set[text_type]
huddle_set = set() # type: Set[Tuple[Text, Tuple[int, ...]]]
existing_huddles = set() # type: Set[Text]
for huddle in Huddle.objects.all():
existing_huddles.add(huddle.huddle_hash)
for huddle_users in huddle_user_list:
@@ -122,7 +121,7 @@ def bulk_create_huddles(users, huddle_user_list):
recipients_to_create.append(Recipient(type_id=huddles[huddle_hash].id, type=Recipient.HUDDLE))
Recipient.objects.bulk_create(recipients_to_create)
huddle_recipients = {} # type: Dict[text_type, Recipient]
huddle_recipients = {} # type: Dict[Text, Recipient]
for recipient in Recipient.objects.filter(type=Recipient.HUDDLE):
huddle_recipients[huddles_by_id[recipient.type_id].huddle_hash] = recipient

View File

@@ -9,7 +9,7 @@ from django.conf import settings
from django.db.models import Q
from django.core.cache.backends.base import BaseCache
from typing import Any, Callable, Iterable, Optional, Union, TypeVar
from typing import Any, Callable, Iterable, Optional, Union, TypeVar, Text
from zerver.lib.utils import statsd, statsd_key, make_safe_digest
import subprocess
@@ -60,7 +60,7 @@ def remote_cache_stats_finish():
remote_cache_total_time += (time.time() - remote_cache_time_start)
def get_or_create_key_prefix():
# type: () -> text_type
# type: () -> Text
if settings.CASPER_TESTS:
# This sets the prefix for the benefit of the Casper tests.
#
@@ -102,10 +102,10 @@ def get_or_create_key_prefix():
return prefix
KEY_PREFIX = get_or_create_key_prefix() # type: text_type
KEY_PREFIX = get_or_create_key_prefix() # type: Text
def bounce_key_prefix_for_testing(test_name):
# type: (text_type) -> None
# type: (Text) -> None
global KEY_PREFIX
KEY_PREFIX = test_name + u':' + text_type(os.getpid()) + u':'
@@ -185,14 +185,14 @@ def cache_with_key(keyfunc, cache_name=None, timeout=None, with_statsd_key=None)
return decorator
def cache_set(key, val, cache_name=None, timeout=None):
# type: (text_type, Any, Optional[str], Optional[int]) -> None
# type: (Text, Any, Optional[str], Optional[int]) -> None
remote_cache_stats_start()
cache_backend = get_cache_backend(cache_name)
cache_backend.set(KEY_PREFIX + key, (val,), timeout=timeout)
remote_cache_stats_finish()
def cache_get(key, cache_name=None):
# type: (text_type, Optional[str]) -> Any
# type: (Text, Optional[str]) -> Any
remote_cache_stats_start()
cache_backend = get_cache_backend(cache_name)
ret = cache_backend.get(KEY_PREFIX + key)
@@ -200,7 +200,7 @@ def cache_get(key, cache_name=None):
return ret
def cache_get_many(keys, cache_name=None):
# type: (List[text_type], Optional[str]) -> Dict[text_type, Any]
# type: (List[Text], Optional[str]) -> Dict[Text, Any]
keys = [KEY_PREFIX + key for key in keys]
remote_cache_stats_start()
ret = get_cache_backend(cache_name).get_many(keys)
@@ -208,7 +208,7 @@ def cache_get_many(keys, cache_name=None):
return dict([(key[len(KEY_PREFIX):], value) for key, value in ret.items()])
def cache_set_many(items, cache_name=None, timeout=None):
# type: (Dict[text_type, Any], Optional[str], Optional[int]) -> None
# type: (Dict[Text, Any], Optional[str], Optional[int]) -> None
new_items = {}
for key in items:
new_items[KEY_PREFIX + key] = items[key]
@@ -218,13 +218,13 @@ def cache_set_many(items, cache_name=None, timeout=None):
remote_cache_stats_finish()
def cache_delete(key, cache_name=None):
# type: (text_type, Optional[str]) -> None
# type: (Text, Optional[str]) -> None
remote_cache_stats_start()
get_cache_backend(cache_name).delete(KEY_PREFIX + key)
remote_cache_stats_finish()
def cache_delete_many(items, cache_name=None):
# type: (Iterable[text_type], Optional[str]) -> None
# type: (Iterable[Text], Optional[str]) -> None
remote_cache_stats_start()
get_cache_backend(cache_name).delete_many(
KEY_PREFIX + item for item in items)
@@ -244,10 +244,10 @@ def cache_delete_many(items, cache_name=None):
# * cache_transformer: Function mapping an object from database =>
# value for cache (in case the values that we're caching are some
# function of the objects, not the objects themselves)
ObjKT = TypeVar('ObjKT', int, text_type)
ObjKT = TypeVar('ObjKT', int, Text)
ItemT = Any # https://github.com/python/mypy/issues/1721
CompressedItemT = Any # https://github.com/python/mypy/issues/1721
def generic_bulk_cached_fetch(cache_key_function, # type: Callable[[ObjKT], text_type]
def generic_bulk_cached_fetch(cache_key_function, # type: Callable[[ObjKT], Text]
query_function, # type: Callable[[List[ObjKT]], Iterable[Any]]
object_ids, # type: Iterable[ObjKT]
extractor=lambda obj: obj, # type: Callable[[CompressedItemT], ItemT]
@@ -256,7 +256,7 @@ def generic_bulk_cached_fetch(cache_key_function, # type: Callable[[ObjKT], text
cache_transformer=lambda obj: obj # type: Callable[[Any], ItemT]
):
# type: (...) -> Dict[ObjKT, Any]
cache_keys = {} # type: Dict[ObjKT, text_type]
cache_keys = {} # type: Dict[ObjKT, Text]
for object_id in object_ids:
cache_keys[object_id] = cache_key_function(object_id)
cached_objects = cache_get_many([cache_keys[object_id]
@@ -267,7 +267,7 @@ def generic_bulk_cached_fetch(cache_key_function, # type: Callable[[ObjKT], text
cache_keys[object_id] not in cached_objects]
db_objects = query_function(needed_ids)
items_for_remote_cache = {} # type: Dict[text_type, Any]
items_for_remote_cache = {} # type: Dict[Text, Any]
for obj in db_objects:
key = cache_keys[id_fetcher(obj)]
item = cache_transformer(obj)
@@ -297,18 +297,18 @@ def cache(func):
return cache_with_key(keyfunc)(func)
def display_recipient_cache_key(recipient_id):
# type: (int) -> text_type
# type: (int) -> Text
return u"display_recipient_dict:%d" % (recipient_id,)
def user_profile_by_email_cache_key(email):
# type: (text_type) -> text_type
# type: (Text) -> Text
# See the comment in zerver/lib/avatar_hash.py:gravatar_hash for why we
# are proactively encoding email addresses even though they will
# with high likelihood be ASCII-only for the foreseeable future.
return u'user_profile_by_email:%s' % (make_safe_digest(email.strip()),)
def user_profile_by_id_cache_key(user_profile_id):
# type: (int) -> text_type
# type: (int) -> Text
return u"user_profile_by_id:%s" % (user_profile_id,)
# TODO: Refactor these cache helpers into another file that can import
@@ -320,7 +320,7 @@ def cache_save_user_profile(user_profile):
active_user_dict_fields = ['id', 'full_name', 'short_name', 'email', 'is_realm_admin', 'is_bot'] # type: List[str]
def active_user_dicts_in_realm_cache_key(realm):
# type: (Realm) -> text_type
# type: (Realm) -> Text
return u"active_user_dicts_in_realm:%s" % (realm.id,)
active_bot_dict_fields = ['id', 'full_name', 'short_name',
@@ -329,11 +329,11 @@ active_bot_dict_fields = ['id', 'full_name', 'short_name',
'default_all_public_streams', 'api_key',
'bot_owner__email', 'avatar_source'] # type: List[str]
def active_bot_dicts_in_realm_cache_key(realm):
# type: (Realm) -> text_type
# type: (Realm) -> Text
return u"active_bot_dicts_in_realm:%s" % (realm.id,)
def get_stream_cache_key(stream_name, realm):
# type: (text_type, Union[Realm, int]) -> text_type
# type: (Text, Union[Realm, int]) -> Text
from zerver.models import Realm
if isinstance(realm, Realm):
realm_id = realm.id
@@ -392,7 +392,7 @@ def flush_realm(sender, **kwargs):
cache_delete(realm_alert_words_cache_key(realm))
def realm_alert_words_cache_key(realm):
# type: (Realm) -> text_type
# type: (Realm) -> Text
return u"realm_alert_words:%s" % (realm.domain,)
# Called by models.py to flush the stream cache whenever we save a stream
@@ -414,11 +414,11 @@ def flush_stream(sender, **kwargs):
# TODO: Rename to_dict_cache_key_id and to_dict_cache_key
def to_dict_cache_key_id(message_id, apply_markdown):
# type: (int, bool) -> text_type
# type: (int, bool) -> Text
return u'message_dict:%d:%d' % (message_id, apply_markdown)
def to_dict_cache_key(message, apply_markdown):
# type: (Message, bool) -> text_type
# type: (Message, bool) -> Text
return to_dict_cache_key_id(message.id, apply_markdown)
def flush_message(sender, **kwargs):

View File

@@ -1,7 +1,7 @@
from __future__ import absolute_import
from six import text_type, binary_type
from typing import Any, Dict, Callable, Tuple
from six import binary_type
from typing import Any, Dict, Callable, Tuple, Text
# This file needs to be different from cache.py because cache.py
# cannot import anything from zerver.models or we'd have an import
@@ -31,33 +31,33 @@ def message_fetch_objects():
id__gt=max_id - MESSAGE_CACHE_SIZE)
def message_cache_items(items_for_remote_cache, message):
# type: (Dict[text_type, Tuple[binary_type]], Message) -> None
# type: (Dict[Text, Tuple[binary_type]], Message) -> None
items_for_remote_cache[to_dict_cache_key_id(message.id, True)] = (message.to_dict_uncached(True),)
def user_cache_items(items_for_remote_cache, user_profile):
# type: (Dict[text_type, Tuple[UserProfile]], UserProfile) -> None
# type: (Dict[Text, Tuple[UserProfile]], UserProfile) -> None
items_for_remote_cache[user_profile_by_email_cache_key(user_profile.email)] = (user_profile,)
items_for_remote_cache[user_profile_by_id_cache_key(user_profile.id)] = (user_profile,)
def stream_cache_items(items_for_remote_cache, stream):
# type: (Dict[text_type, Tuple[Stream]], Stream) -> None
# type: (Dict[Text, Tuple[Stream]], Stream) -> None
items_for_remote_cache[get_stream_cache_key(stream.name, stream.realm_id)] = (stream,)
def client_cache_items(items_for_remote_cache, client):
# type: (Dict[text_type, Tuple[Client]], Client) -> None
# type: (Dict[Text, Tuple[Client]], Client) -> None
items_for_remote_cache[get_client_cache_key(client.name)] = (client,)
def huddle_cache_items(items_for_remote_cache, huddle):
# type: (Dict[text_type, Tuple[Huddle]], Huddle) -> None
# type: (Dict[Text, Tuple[Huddle]], Huddle) -> None
items_for_remote_cache[huddle_hash_cache_key(huddle.huddle_hash)] = (huddle,)
def recipient_cache_items(items_for_remote_cache, recipient):
# type: (Dict[text_type, Tuple[Recipient]], Recipient) -> None
# type: (Dict[Text, Tuple[Recipient]], Recipient) -> None
items_for_remote_cache[get_recipient_cache_key(recipient.type, recipient.type_id)] = (recipient,)
session_engine = import_module(settings.SESSION_ENGINE)
def session_cache_items(items_for_remote_cache, session):
# type: (Dict[text_type, text_type], Session) -> None
# type: (Dict[Text, Text], Session) -> None
store = session_engine.SessionStore(session_key=session.session_key) # type: ignore # import_module
items_for_remote_cache[store.cache_key] = store.decode(session.session_data)
@@ -78,13 +78,13 @@ cache_fillers = {
# 'message': (message_fetch_objects, message_cache_items, 3600 * 24, 1000),
'huddle': (lambda: Huddle.objects.select_related().all(), huddle_cache_items, 3600*24*7, 10000),
'session': (lambda: Session.objects.all(), session_cache_items, 3600*24*7, 10000),
} # type: Dict[str, Tuple[Callable[[], List[Any]], Callable[[Dict[text_type, Any], Any], None], int, int]]
} # type: Dict[str, Tuple[Callable[[], List[Any]], Callable[[Dict[Text, Any], Any], None], int, int]]
def fill_remote_cache(cache):
# type: (str) -> None
remote_cache_time_start = get_remote_cache_time()
remote_cache_requests_start = get_remote_cache_requests()
items_for_remote_cache = {} # type: Dict[text_type, Any]
items_for_remote_cache = {} # type: Dict[Text, Any]
(objects, items_filler, timeout, batch_size) = cache_fillers[cache]
count = 0
for obj in objects():

View File

@@ -3,12 +3,12 @@ import codecs
import hashlib
import hmac
from six import text_type
from typing import Text
# Encodes the provided URL using the same algorithm used by the camo
# caching https image proxy
def get_camo_url(url):
# type: (text_type) -> text_type
# type: (Text) -> Text
# Only encode the url if Camo is enabled
if settings.CAMO_URI == '':
return url

View File

@@ -9,11 +9,10 @@ import os
import string
from six.moves import range
from six import text_type
from typing import Optional
from typing import Optional, Text
def random_api_key():
# type: () -> text_type
# type: () -> Text
choices = string.ascii_letters + string.digits
altchars = ''.join([choices[ord(os.urandom(1)) % 62] for _ in range(2)]).encode("utf-8")
return base64.b64encode(os.urandom(24), altchars=altchars).decode("utf-8")
@@ -27,7 +26,7 @@ def random_api_key():
# Recipient objects
def create_user_profile(realm, email, password, active, bot_type, full_name,
short_name, bot_owner, is_mirror_dummy, tos_version):
# type: (Realm, text_type, text_type, bool, Optional[int], text_type, text_type, Optional[UserProfile], bool, Optional[text_type]) -> UserProfile
# type: (Realm, Text, Text, bool, Optional[int], Text, Text, Optional[UserProfile], bool, Optional[Text]) -> UserProfile
now = timezone.now()
email = UserManager.normalize_email(email)
@@ -57,7 +56,7 @@ def create_user(email, password, realm, full_name, short_name,
is_mirror_dummy=False, default_sending_stream=None,
default_events_register_stream=None,
default_all_public_streams=None, user_profile_id=None):
# type: (text_type, text_type, Realm, text_type, text_type, bool, Optional[int], Optional[UserProfile], Optional[text_type], text_type, bool, Optional[Stream], Optional[Stream], Optional[bool], Optional[int]) -> UserProfile
# type: (Text, Text, Realm, Text, Text, bool, Optional[int], Optional[UserProfile], Optional[Text], Text, bool, Optional[Stream], Optional[Stream], Optional[bool], Optional[int]) -> UserProfile
user_profile = create_user_profile(realm, email, password, active, bot_type,
full_name, short_name, bot_owner,
is_mirror_dummy, tos_version)

View File

@@ -4,12 +4,11 @@ import time
from psycopg2.extensions import cursor, connection
from typing import Callable, Optional, Iterable, Any, Dict, Union, TypeVar, \
Mapping, Sequence
from six import text_type
Mapping, Sequence, Text
from zerver.lib.str_utils import NonBinaryStr
CursorObj = TypeVar('CursorObj', bound=cursor)
ParamsT = Union[Iterable[Any], Mapping[text_type, Any]]
ParamsT = Union[Iterable[Any], Mapping[Text, Any]]
# Similar to the tracking done in Django's CursorDebugWrapper, but done at the
# psycopg2 cursor level so it works with SQLAlchemy.
@@ -40,7 +39,7 @@ class TimeTrackingConnection(connection):
"""A psycopg2 connection class that uses TimeTrackingCursors."""
def __init__(self, *args, **kwargs):
# type: (Sequence[Any], Mapping[text_type, Any]) -> None
# type: (Sequence[Any], Mapping[Text, Any]) -> None
self.queries = [] # type: List[Dict[str, str]]
super(TimeTrackingConnection, self).__init__(*args, **kwargs)

View File

@@ -1,10 +1,9 @@
from __future__ import absolute_import
from typing import Any, Callable, Iterable, Tuple
from typing import Any, Callable, Iterable, Tuple, Text
from collections import defaultdict
import datetime
import six
from six import text_type
from django.db.models import Q, QuerySet
from django.template import loader
@@ -45,8 +44,8 @@ def gather_hot_conversations(user_profile, stream_messages):
# Returns a list of dictionaries containing the templating
# information for each hot conversation.
conversation_length = defaultdict(int) # type: Dict[Tuple[int, text_type], int]
conversation_diversity = defaultdict(set) # type: Dict[Tuple[int, text_type], Set[text_type]]
conversation_length = defaultdict(int) # type: Dict[Tuple[int, Text], int]
conversation_diversity = defaultdict(set) # type: Dict[Tuple[int, Text], Set[Text]]
for user_message in stream_messages:
if not user_message.message.sent_by_human():
# Don't include automated messages in the count.
@@ -101,7 +100,7 @@ def gather_hot_conversations(user_profile, stream_messages):
return hot_conversation_render_payloads
def gather_new_users(user_profile, threshold):
# type: (UserProfile, datetime.datetime) -> Tuple[int, List[text_type]]
# type: (UserProfile, datetime.datetime) -> Tuple[int, List[Text]]
# Gather information on users in the realm who have recently
# joined.
if user_profile.realm.is_zephyr_mirror_realm:
@@ -115,7 +114,7 @@ def gather_new_users(user_profile, threshold):
return len(user_names), user_names
def gather_new_streams(user_profile, threshold):
# type: (UserProfile, datetime.datetime) -> Tuple[int, Dict[str, List[text_type]]]
# type: (UserProfile, datetime.datetime) -> Tuple[int, Dict[str, List[Text]]]
if user_profile.realm.is_zephyr_mirror_realm:
new_streams = [] # type: List[Stream]
else:
@@ -136,7 +135,7 @@ def gather_new_streams(user_profile, threshold):
return len(new_streams), {"html": streams_html, "plain": streams_plain}
def enough_traffic(unread_pms, hot_conversations, new_streams, new_users):
# type: (text_type, text_type, int, int) -> bool
# type: (Text, Text, int, int) -> bool
if unread_pms or hot_conversations:
# If you have any unread traffic, good enough.
return True
@@ -147,7 +146,7 @@ def enough_traffic(unread_pms, hot_conversations, new_streams, new_users):
return False
def send_digest_email(user_profile, html_content, text_content):
# type: (UserProfile, text_type, text_type) -> None
# type: (UserProfile, Text, Text) -> None
recipients = [{'email': user_profile.email, 'name': user_profile.full_name}]
subject = "While you've been gone - Zulip"
sender = {'email': settings.NOREPLY_EMAIL_ADDRESS, 'name': 'Zulip'}

View File

@@ -1,5 +1,5 @@
from __future__ import absolute_import
from typing import Any, Optional
from typing import Any, Optional, Text
import logging
import re
@@ -19,7 +19,7 @@ from zerver.lib.str_utils import force_text
from zerver.models import Stream, Recipient, get_user_profile_by_email, \
get_user_profile_by_id, get_display_recipient, get_recipient, \
Message, Realm, UserProfile
from six import text_type, binary_type
from six import binary_type
import six
import talon
from talon import quotations
@@ -29,7 +29,7 @@ talon.init()
logger = logging.getLogger(__name__)
def redact_stream(error_message):
# type: (text_type) -> text_type
# type: (Text) -> Text
domain = settings.EMAIL_GATEWAY_PATTERN.rsplit('@')[-1]
stream_match = re.search(u'\\b(.*?)@' + domain, error_message)
if stream_match:
@@ -38,7 +38,7 @@ def redact_stream(error_message):
return error_message
def report_to_zulip(error_message):
# type: (text_type) -> None
# type: (Text) -> None
if settings.ERROR_BOT is None:
return
error_bot = get_user_profile_by_email(settings.ERROR_BOT)
@@ -47,7 +47,7 @@ def report_to_zulip(error_message):
u"""~~~\n%s\n~~~""" % (error_message,))
def log_and_report(email_message, error_message, debug_info):
# type: (message.Message, text_type, Dict[str, Any]) -> None
# type: (message.Message, Text, Dict[str, Any]) -> None
scrubbed_error = u"Sender: %s\n%s" % (email_message.get("From"),
redact_stream(error_message))
@@ -69,17 +69,17 @@ redis_client = get_redis_client()
def missed_message_redis_key(token):
# type: (text_type) -> text_type
# type: (Text) -> Text
return 'missed_message:' + token
def is_missed_message_address(address):
# type: (text_type) -> bool
# type: (Text) -> bool
msg_string = get_email_gateway_message_string_from_address(address)
return is_mm_32_format(msg_string)
def is_mm_32_format(msg_string):
# type: (text_type) -> bool
# type: (Text) -> bool
'''
Missed message strings are formatted with a little "mm" prefix
followed by a randomly generated 32-character string.
@@ -87,7 +87,7 @@ def is_mm_32_format(msg_string):
return msg_string.startswith('mm') and len(msg_string) == 34
def get_missed_message_token_from_address(address):
# type: (text_type) -> text_type
# type: (Text) -> Text
msg_string = get_email_gateway_message_string_from_address(address)
if msg_string is None:
@@ -100,7 +100,7 @@ def get_missed_message_token_from_address(address):
return msg_string[2:]
def create_missed_message_address(user_profile, message):
# type: (UserProfile, Message) -> text_type
# type: (UserProfile, Message) -> Text
if settings.EMAIL_GATEWAY_PATTERN == '':
logging.warning("EMAIL_GATEWAY_PATTERN is an empty string, using "
"NOREPLY_EMAIL_ADDRESS in the 'from' field.")
@@ -134,7 +134,7 @@ def create_missed_message_address(user_profile, message):
def mark_missed_message_address_as_used(address):
# type: (text_type) -> None
# type: (Text) -> None
token = get_missed_message_token_from_address(address)
key = missed_message_redis_key(token)
with redis_client.pipeline() as pipeline:
@@ -147,7 +147,7 @@ def mark_missed_message_address_as_used(address):
def send_to_missed_message_address(address, message):
# type: (text_type, message.Message) -> None
# type: (Text, message.Message) -> None
token = get_missed_message_token_from_address(address)
key = missed_message_redis_key(token)
result = redis_client.hmget(key, 'user_profile_id', 'recipient_id', 'subject')
@@ -187,7 +187,7 @@ class ZulipEmailForwardError(Exception):
pass
def send_zulip(sender, stream, topic, content):
# type: (text_type, Stream, text_type, text_type) -> None
# type: (Text, Stream, Text, Text) -> None
internal_send_message(
sender,
"stream",
@@ -197,7 +197,7 @@ def send_zulip(sender, stream, topic, content):
stream.realm)
def valid_stream(stream_name, token):
# type: (text_type, text_type) -> bool
# type: (Text, Text) -> bool
try:
stream = Stream.objects.get(email_token=token)
return stream.name.lower() == stream_name.lower()
@@ -205,7 +205,7 @@ def valid_stream(stream_name, token):
return False
def get_message_part_by_type(message, content_type):
# type: (message.Message, text_type) -> text_type
# type: (message.Message, Text) -> Text
charsets = message.get_charsets()
for idx, part in enumerate(message.walk()):
@@ -217,7 +217,7 @@ def get_message_part_by_type(message, content_type):
return text
def extract_body(message):
# type: (message.Message) -> text_type
# type: (message.Message) -> Text
# If the message contains a plaintext version of the body, use
# that.
plaintext_content = get_message_part_by_type(message, "text/plain")
@@ -233,7 +233,7 @@ def extract_body(message):
raise ZulipEmailForwardError("Unable to find plaintext or HTML message body")
def filter_footer(text):
# type: (text_type) -> text_type
# type: (Text) -> Text
# Try to filter out obvious footers.
possible_footers = [line for line in text.split("\n") if line.strip().startswith("--")]
if len(possible_footers) != 1:
@@ -244,7 +244,7 @@ def filter_footer(text):
return text.partition("--")[0].strip()
def extract_and_upload_attachments(message, realm):
# type: (message.Message, Realm) -> text_type
# type: (message.Message, Realm) -> Text
user_profile = get_user_profile_by_email(settings.EMAIL_GATEWAY_BOT)
attachment_links = []
@@ -272,7 +272,7 @@ def extract_and_upload_attachments(message, realm):
return u"\n".join(attachment_links)
def extract_and_validate(email):
# type: (text_type) -> Stream
# type: (Text) -> Stream
try:
stream_name, token = decode_email_address(email)
except (TypeError, ValueError):
@@ -284,12 +284,12 @@ def extract_and_validate(email):
return Stream.objects.get(email_token=token)
def find_emailgateway_recipient(message):
# type: (message.Message) -> text_type
# type: (message.Message) -> Text
# We can't use Delivered-To; if there is a X-Gm-Original-To
# it is more accurate, so try to find the most-accurate
# recipient list in descending priority order
recipient_headers = ["X-Gm-Original-To", "Delivered-To", "To"]
recipients = [] # type: List[text_type]
recipients = [] # type: List[Text]
for recipient_header in recipient_headers:
r = message.get_all(recipient_header, None)
if r:
@@ -305,7 +305,7 @@ def find_emailgateway_recipient(message):
raise ZulipEmailForwardError("Missing recipient in mirror email")
def process_stream_message(to, subject, message, debug_info):
# type: (text_type, text_type, message.Message, Dict[str, Any]) -> None
# type: (Text, Text, message.Message, Dict[str, Any]) -> None
stream = extract_and_validate(to)
body = filter_footer(extract_body(message))
body += extract_and_upload_attachments(message, stream.realm)
@@ -315,13 +315,13 @@ def process_stream_message(to, subject, message, debug_info):
stream.name, stream.realm.domain))
def process_missed_message(to, message, pre_checked):
# type: (text_type, message.Message, bool) -> None
# type: (Text, message.Message, bool) -> None
if not pre_checked:
mark_missed_message_address_as_used(to)
send_to_missed_message_address(to, message)
def process_message(message, rcpt_to=None, pre_checked=False):
# type: (message.Message, Optional[text_type], bool) -> None
# type: (message.Message, Optional[Text], bool) -> None
subject_header = message.get("Subject", "(no subject)")
encoded_subject, encoding = decode_header(subject_header)[0]
if encoding is None:

View File

@@ -1,7 +1,6 @@
from __future__ import absolute_import
from six import text_type
from typing import Callable, Tuple
from typing import Callable, Tuple, Text
from django.conf import settings
@@ -12,26 +11,26 @@ import logging
# TODO: handle changes in link hrefs
def highlight_with_class(klass, text):
# type: (text_type, text_type) -> text_type
# type: (Text, Text) -> Text
return '<span class="%s">%s</span>' % (klass, text)
def highlight_inserted(text):
# type: (text_type) -> text_type
# type: (Text) -> Text
return highlight_with_class('highlight_text_inserted', text)
def highlight_deleted(text):
# type: (text_type) -> text_type
# type: (Text) -> Text
return highlight_with_class('highlight_text_deleted', text)
def highlight_replaced(text):
# type: (text_type) -> text_type
# type: (Text) -> Text
return highlight_with_class('highlight_text_replaced', text)
def chunkize(text, in_tag):
# type: (text_type, bool) -> Tuple[List[Tuple[text_type, text_type]], bool]
# type: (Text, bool) -> Tuple[List[Tuple[Text, Text]], bool]
start = 0
idx = 0
chunks = [] # type: List[Tuple[text_type, text_type]]
chunks = [] # type: List[Tuple[Text, Text]]
for c in text:
if c == '<':
in_tag = True
@@ -50,7 +49,7 @@ def chunkize(text, in_tag):
return chunks, in_tag
def highlight_chunks(chunks, highlight_func):
# type: (List[Tuple[text_type, text_type]], Callable[[text_type], text_type]) -> text_type
# type: (List[Tuple[Text, Text]], Callable[[Text], Text]) -> Text
retval = u''
for type, text in chunks:
if type == 'text':
@@ -60,7 +59,7 @@ def highlight_chunks(chunks, highlight_func):
return retval
def verify_html(html):
# type: (text_type) -> bool
# type: (Text) -> bool
# TODO: Actually parse the resulting HTML to ensure we don't
# create mal-formed markup. This is unfortunately hard because
# we both want pretty strict parsing and we want to parse html5
@@ -80,7 +79,7 @@ def verify_html(html):
return True
def highlight_html_differences(s1, s2):
# type: (text_type, text_type) -> text_type
# type: (Text, Text) -> Text
differ = diff_match_patch()
ops = differ.diff_main(s1, s2)
differ.diff_cleanupSemantic(ops)

View File

@@ -6,15 +6,14 @@ from django.conf import settings
from django.utils import translation
from django.utils.translation import ugettext as _
from six import text_type
from six.moves import urllib, zip_longest, zip, range
from typing import Any, List, Dict, Optional
from typing import Any, List, Dict, Optional, Text
import os
import ujson
def with_language(string, language):
# type: (text_type, text_type) -> text_type
# type: (Text, Text) -> Text
old_language = translation.get_language()
translation.activate(language)
result = _(string)
@@ -35,7 +34,7 @@ def get_language_list():
return sorted(lang_list, key=lambda i: i['name'])
def get_language_list_for_templates(default_language):
# type: (text_type) -> List[Dict[str, Dict[str, str]]]
# type: (Text) -> List[Dict[str, Dict[str, str]]]
language_list = [l for l in get_language_list()
if 'percent_translated' not in l or
l['percent_translated'] >= 5.]
@@ -69,13 +68,13 @@ def get_language_list_for_templates(default_language):
return formatted_list
def get_language_name(code):
# type: (str) -> Optional[text_type]
# type: (str) -> Optional[Text]
for lang in get_language_list():
if lang['code'] == code:
return lang['name']
def get_available_language_codes():
# type: () -> List[text_type]
# type: () -> List[Text]
language_list = get_language_list()
codes = [language['code'] for language in language_list]
return codes

View File

@@ -5,12 +5,11 @@ from django.conf import settings
import hashlib
import base64
from typing import Optional
from six import text_type
from typing import Optional, Text
def initial_password(email):
# type: (text_type) -> Optional[text_type]
# type: (Text) -> Optional[Text]
"""Given an email address, returns the initial password for that account, as
created by populate_db."""

View File

@@ -1,6 +1,6 @@
from __future__ import absolute_import
from six import text_type
from typing import Text
# Match multi-word string between @** ** or match any one-word
# sequences after @
find_mentions = r'(?<![^\s\'\"\(,:<])@(?:\*\*([^\*]+)\*\*|(\w+))'
@@ -8,5 +8,5 @@ find_mentions = r'(?<![^\s\'\"\(,:<])@(?:\*\*([^\*]+)\*\*|(\w+))'
wildcards = ['all', 'everyone']
def user_mention_matches_wildcard(mention):
# type: (text_type) -> bool
# type: (Text) -> bool
return mention in wildcards

View File

@@ -7,6 +7,8 @@ import zlib
from django.utils.translation import ugettext as _
from six import binary_type, text_type
from typing import Text
from zerver.lib.avatar import get_avatar_url
from zerver.lib.avatar_hash import gravatar_hash
import zerver.lib.bugdown as bugdown
@@ -25,9 +27,9 @@ from zerver.models import (
Reaction
)
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Text
RealmAlertWords = Dict[int, List[text_type]]
RealmAlertWords = Dict[int, List[Text]]
def extract_message_dict(message_bytes):
# type: (binary_type) -> Dict[str, Any]
@@ -139,7 +141,7 @@ class MessageDict(object):
recipient_type_id,
reactions
):
# type: (bool, Message, int, datetime.datetime, text_type, text_type, text_type, datetime.datetime, text_type, Optional[int], int, text_type, text_type, text_type, text_type, text_type, bool, text_type, int, int, int, List[Dict[str, Any]]) -> Dict[str, Any]
# type: (bool, Message, int, datetime.datetime, Text, Text, Text, datetime.datetime, Text, Optional[int], int, Text, Text, Text, Text, Text, bool, Text, int, int, int, List[Dict[str, Any]]) -> Dict[str, Any]
avatar_url = get_avatar_url(sender_avatar_source, sender_email)
@@ -292,7 +294,7 @@ def access_message(user_profile, message_id):
return (message, user_message)
def render_markdown(message, content, domain=None, realm_alert_words=None, message_users=None):
# type: (Message, text_type, Optional[text_type], Optional[RealmAlertWords], Set[UserProfile]) -> text_type
# type: (Message, Text, Optional[Text], Optional[RealmAlertWords], Set[UserProfile]) -> Text
"""Return HTML for given markdown. Bugdown may add properties to the
message object such as `mentions_user_ids` and `mentions_wildcard`.
These are only on this Django object and are not saved in the
@@ -317,7 +319,7 @@ def render_markdown(message, content, domain=None, realm_alert_words=None, messa
# delivered via zephyr_mirror
domain = u"zephyr_mirror"
possible_words = set() # type: Set[text_type]
possible_words = set() # type: Set[Text]
if realm_alert_words is not None:
for user_id, words in realm_alert_words.items():
if user_id in message_user_ids:

View File

@@ -1,7 +1,7 @@
from six import text_type
from typing import Text
def is_reserved_subdomain(subdomain):
# type: (text_type) -> bool
# type: (Text) -> bool
if subdomain in ZULIP_RESERVED_SUBDOMAINS:
return True
if subdomain[-1] == 's' and subdomain[:-1] in ZULIP_RESERVED_SUBDOMAINS:
@@ -13,7 +13,7 @@ def is_reserved_subdomain(subdomain):
return False
def is_disposable_domain(domain):
# type: (text_type) -> bool
# type: (Text) -> bool
return domain.lower() in DISPOSABLE_DOMAINS
ZULIP_RESERVED_SUBDOMAINS = frozenset([

View File

@@ -1,19 +1,18 @@
from zerver.lib.request import JsonableError
from django.utils.translation import ugettext as _
from typing import Any, Callable, Iterable, Mapping, Sequence
from six import text_type
from typing import Any, Callable, Iterable, Mapping, Sequence, Text
def check_supported_events_narrow_filter(narrow):
# type: (Iterable[Sequence[text_type]]) -> None
# type: (Iterable[Sequence[Text]]) -> None
for element in narrow:
operator = element[0]
if operator not in ["stream", "topic", "sender", "is"]:
raise JsonableError(_("Operator %s not supported.") % (operator,))
def build_narrow_filter(narrow):
# type: (Iterable[Sequence[text_type]]) -> Callable[[Mapping[str, Any]], bool]
# type: (Iterable[Sequence[Text]]) -> Callable[[Mapping[str, Any]], bool]
"""Changes to this function should come with corresponding changes to
BuildNarrowFilterTest."""
check_supported_events_narrow_filter(narrow)

View File

@@ -1,7 +1,7 @@
from __future__ import print_function
from six import text_type
from typing import cast, Any, Iterable, Mapping, Optional, Sequence, Tuple
from typing import cast, Any, Iterable, Mapping, Optional, Sequence, Tuple, Text
import mandrill
from confirmation.models import Confirmation
@@ -33,13 +33,13 @@ from six.moves import urllib
from collections import defaultdict
def unsubscribe_token(user_profile):
# type: (UserProfile) -> text_type
# type: (UserProfile) -> Text
# Leverage the Django confirmations framework to generate and track unique
# unsubscription tokens.
return Confirmation.objects.get_link_for_object(user_profile).split("/")[-1]
def one_click_unsubscribe_link(user_profile, endpoint):
# type: (UserProfile, text_type) -> text_type
# type: (UserProfile, Text) -> Text
"""
Generate a unique link that a logged-out user can visit to unsubscribe from
Zulip e-mails without having to first log in.
@@ -49,7 +49,7 @@ def one_click_unsubscribe_link(user_profile, endpoint):
return "%s/%s" % (user_profile.realm.uri.rstrip("/"), resource_path)
def hashchange_encode(string):
# type: (text_type) -> text_type
# type: (Text) -> Text
# Do the same encoding operation as hashchange.encodeHashComponent on the
# frontend.
# `safe` has a default value of "/", but we want those encoded, too.
@@ -57,18 +57,18 @@ def hashchange_encode(string):
string.encode("utf-8"), safe=b"").replace(".", "%2E").replace("%", ".")
def pm_narrow_url(realm, participants):
# type: (Realm, List[text_type]) -> text_type
# type: (Realm, List[Text]) -> Text
participants.sort()
base_url = u"%s/#narrow/pm-with/" % (realm.uri,)
return base_url + hashchange_encode(",".join(participants))
def stream_narrow_url(realm, stream):
# type: (Realm, text_type) -> text_type
# type: (Realm, Text) -> Text
base_url = u"%s/#narrow/stream/" % (realm.uri,)
return base_url + hashchange_encode(stream)
def topic_narrow_url(realm, stream, topic):
# type: (Realm, text_type, text_type) -> text_type
# type: (Realm, Text, Text) -> Text
base_url = u"%s/#narrow/stream/" % (realm.uri,)
return u"%s%s/topic/%s" % (base_url, hashchange_encode(stream),
hashchange_encode(topic))
@@ -83,14 +83,14 @@ def build_message_list(user_profile, messages):
messages_to_render = [] # type: List[Dict[str, Any]]
def sender_string(message):
# type: (Message) -> text_type
# type: (Message) -> Text
if message.recipient.type in (Recipient.STREAM, Recipient.HUDDLE):
return message.sender.full_name
else:
return ''
def relative_to_full_url(content):
# type: (text_type) -> text_type
# type: (Text) -> Text
# URLs for uploaded content are of the form
# "/user_uploads/abc.png". Make them full paths.
#
@@ -116,18 +116,18 @@ def build_message_list(user_profile, messages):
return content
def fix_plaintext_image_urls(content):
# type: (text_type) -> text_type
# type: (Text) -> Text
# Replace image URLs in plaintext content of the form
# [image name](image url)
# with a simple hyperlink.
return re.sub(r"\[(\S*)\]\((\S*)\)", r"\2", content)
def fix_emoji_sizes(html):
# type: (text_type) -> text_type
# type: (Text) -> Text
return html.replace(' class="emoji"', ' height="20px"')
def build_message_payload(message):
# type: (Message) -> Dict[str, text_type]
# type: (Message) -> Dict[str, Text]
plain = message.content
plain = fix_plaintext_image_urls(plain)
plain = relative_to_full_url(plain)
@@ -305,7 +305,7 @@ def handle_missedmessage_emails(user_profile_id, missed_email_events):
if not messages:
return
messages_by_recipient_subject = defaultdict(list) # type: Dict[Tuple[int, text_type], List[Message]]
messages_by_recipient_subject = defaultdict(list) # type: Dict[Tuple[int, Text], List[Message]]
for msg in messages:
messages_by_recipient_subject[(msg.recipient_id, msg.topic_name())].append(msg)
@@ -330,7 +330,7 @@ def handle_missedmessage_emails(user_profile_id, missed_email_events):
@uses_mandrill
def clear_followup_emails_queue(email, mail_client=None):
# type: (text_type, Optional[mandrill.Mandrill]) -> None
# type: (Text, Optional[mandrill.Mandrill]) -> None
"""
Clear out queued emails (from Mandrill's queue) that would otherwise
be sent to a specific email address. Optionally specify which sender
@@ -355,7 +355,7 @@ def clear_followup_emails_queue(email, mail_client=None):
return
def log_digest_event(msg):
# type: (text_type) -> None
# type: (Text) -> None
import logging
logging.basicConfig(filename=settings.DIGEST_LOG_PATH, level=logging.INFO)
logging.info(msg)
@@ -364,7 +364,7 @@ def log_digest_event(msg):
def send_future_email(recipients, email_html, email_text, subject,
delay=datetime.timedelta(0), sender=None,
tags=[], mail_client=None):
# type: (List[Dict[str, Any]], text_type, text_type, text_type, datetime.timedelta, Optional[Dict[str, text_type]], Iterable[text_type], Optional[mandrill.Mandrill]) -> None
# type: (List[Dict[str, Any]], Text, Text, Text, datetime.timedelta, Optional[Dict[str, Text]], Iterable[Text], Optional[mandrill.Mandrill]) -> None
"""
Sends email via Mandrill, with optional delay
@@ -452,7 +452,7 @@ def send_future_email(recipients, email_html, email_text, subject,
def send_local_email_template_with_delay(recipients, template_prefix,
template_payload, delay,
tags=[], sender={'email': settings.NOREPLY_EMAIL_ADDRESS, 'name': 'Zulip'}):
# type: (List[Dict[str, Any]], text_type, Dict[str, text_type], datetime.timedelta, Iterable[text_type], Dict[str, text_type]) -> None
# type: (List[Dict[str, Any]], Text, Dict[str, Text], datetime.timedelta, Iterable[Text], Dict[str, Text]) -> None
html_content = loader.render_to_string(template_prefix + ".html", template_payload)
text_content = loader.render_to_string(template_prefix + ".text", template_payload)
subject = loader.render_to_string(template_prefix + ".subject", template_payload).strip()
@@ -466,10 +466,10 @@ def send_local_email_template_with_delay(recipients, template_prefix,
tags=tags)
def enqueue_welcome_emails(email, name):
# type: (text_type, text_type) -> None
# type: (Text, Text) -> None
from zerver.context_processors import common_context
if settings.WELCOME_EMAIL_SENDER is not None:
sender = settings.WELCOME_EMAIL_SENDER # type: Dict[str, text_type]
sender = settings.WELCOME_EMAIL_SENDER # type: Dict[str, Text]
else:
sender = {'email': settings.ZULIP_ADMINISTRATOR, 'name': 'Zulip'}
@@ -497,7 +497,7 @@ def enqueue_welcome_emails(email, name):
sender=sender)
def convert_html_to_markdown(html):
# type: (text_type) -> text_type
# type: (Text) -> Text
# On Linux, the tool installs as html2markdown, and there's a command called
# html2text that does something totally different. On OSX, the tool installs
# as html2text.

View File

@@ -1,8 +1,7 @@
from __future__ import absolute_import
import random
from six import text_type
from typing import Any, Dict, Optional, SupportsInt
from typing import Any, Dict, Optional, SupportsInt, Text
from zerver.models import PushDeviceToken, UserProfile
from zerver.models import get_user_profile_by_id
@@ -53,7 +52,7 @@ def get_apns_key(identifer):
class APNsMessage(object):
def __init__(self, user, tokens, alert=None, badge=None, sound=None,
category=None, **kwargs):
# type: (UserProfile, List[text_type], text_type, int, text_type, text_type, **Any) -> None
# type: (UserProfile, List[Text], Text, int, Text, Text, **Any) -> None
self.frame = Frame()
self.tokens = tokens
expiry = int(time.time() + 24 * 3600)
@@ -124,11 +123,11 @@ def num_push_devices_for_user(user_profile, kind = None):
# We store the token as b64, but apns-client wants hex strings
def b64_to_hex(data):
# type: (bytes) -> text_type
# type: (bytes) -> Text
return binascii.hexlify(base64.b64decode(data)).decode('utf-8')
def hex_to_b64(data):
# type: (text_type) -> bytes
# type: (Text) -> bytes
return base64.b64encode(binascii.unhexlify(data.encode('utf-8')))
def _do_push_to_apns_service(user, message, apns_connection):
@@ -145,7 +144,7 @@ def _do_push_to_apns_service(user, message, apns_connection):
# mobile app
@statsd_increment("apple_push_notification")
def send_apple_push_notification(user, alert, **extra_data):
# type: (UserProfile, text_type, **Any) -> None
# type: (UserProfile, Text, **Any) -> None
if not connection and not dbx_connection:
logging.error("Attempting to send push notification, but no connection was found. "
"This may be because we could not find the APNS Certificate file.")

View File

@@ -1,7 +1,6 @@
from __future__ import absolute_import
from six import text_type
from typing import Any, Iterator, Tuple
from typing import Any, Iterator, Tuple, Text
from django.conf import settings
from zerver.lib.redis_utils import get_redis_client
@@ -29,7 +28,7 @@ def _rules_for_user(user):
return rules
def redis_key(user, domain):
# type: (UserProfile, text_type) -> List[text_type]
# type: (UserProfile, Text) -> List[Text]
"""Return the redis keys for this user"""
return ["ratelimit:%s:%s:%s:%s" % (type(user), user.id, domain, keytype) for keytype in ['list', 'zset', 'block']]
@@ -57,7 +56,7 @@ def remove_ratelimit_rule(range_seconds, num_requests):
rules = [x for x in rules if x[0] != range_seconds and x[1] != num_requests]
def block_user(user, seconds, domain='all'):
# type: (UserProfile, int, text_type) -> None
# type: (UserProfile, int, Text) -> None
"Manually blocks a user id for the desired number of seconds"
_, _, blocking_key = redis_key(user, domain)
with client.pipeline() as pipe:
@@ -71,7 +70,7 @@ def unblock_user(user, domain='all'):
client.delete(blocking_key)
def clear_user_history(user, domain='all'):
# type: (UserProfile, text_type) -> None
# type: (UserProfile, Text) -> None
'''
This is only used by test code now, where it's very helpful in
allowing us to run tests quickly, by giving a user a clean slate.
@@ -80,7 +79,7 @@ def clear_user_history(user, domain='all'):
client.delete(key)
def _get_api_calls_left(user, domain, range_seconds, max_calls):
# type: (UserProfile, text_type, int, int) -> Tuple[int, float]
# type: (UserProfile, Text, int, int) -> Tuple[int, float]
list_key, set_key, _ = redis_key(user, domain)
# Count the number of values in our sorted set
# that are between now and the cutoff
@@ -108,7 +107,7 @@ def _get_api_calls_left(user, domain, range_seconds, max_calls):
return calls_left, time_reset
def api_calls_left(user, domain='all'):
# type: (UserProfile, text_type) -> Tuple[int, float]
# type: (UserProfile, Text) -> Tuple[int, float]
"""Returns how many API calls in this range this client has, as well as when
the rate-limit will be reset to 0"""
max_window = _rules_for_user(user)[-1][0]
@@ -116,7 +115,7 @@ def api_calls_left(user, domain='all'):
return _get_api_calls_left(user, domain, max_window, max_calls)
def is_ratelimited(user, domain='all'):
# type: (UserProfile, text_type) -> Tuple[bool, float]
# type: (UserProfile, Text) -> Tuple[bool, float]
"Returns a tuple of (rate_limited, time_till_free)"
list_key, set_key, blocking_key = redis_key(user, domain)
@@ -167,7 +166,7 @@ def is_ratelimited(user, domain='all'):
return False, 0.0
def incr_ratelimit(user, domain='all'):
# type: (UserProfile, text_type) -> None
# type: (UserProfile, Text) -> None
"""Increases the rate-limit for the specified user"""
list_key, set_key, _ = redis_key(user, domain)
now = time.time()

View File

@@ -3,8 +3,7 @@ from __future__ import absolute_import
from django.http import HttpResponse, HttpResponseNotAllowed
import ujson
from typing import Optional, Any, Dict, List
from six import text_type
from typing import Optional, Any, Dict, List, Text
from zerver.lib.str_utils import force_bytes
@@ -12,7 +11,7 @@ class HttpResponseUnauthorized(HttpResponse):
status_code = 401
def __init__(self, realm, www_authenticate=None):
# type: (text_type, Optional[text_type]) -> None
# type: (Text, Optional[Text]) -> None
HttpResponse.__init__(self)
if www_authenticate is None:
self["WWW-Authenticate"] = 'Basic realm="%s"' % (realm,)
@@ -22,14 +21,14 @@ class HttpResponseUnauthorized(HttpResponse):
raise Exception("Invalid www_authenticate value!")
def json_unauthorized(message, www_authenticate=None):
# type: (text_type, Optional[text_type]) -> HttpResponse
# type: (Text, Optional[Text]) -> HttpResponse
resp = HttpResponseUnauthorized("zulip", www_authenticate=www_authenticate)
resp.content = force_bytes(ujson.dumps({"result": "error",
"msg": message}) + "\n")
return resp
def json_method_not_allowed(methods):
# type: (List[text_type]) -> text_type
# type: (List[Text]) -> Text
resp = HttpResponseNotAllowed(methods)
resp.content = force_bytes(ujson.dumps({"result": "error",
"msg": "Method Not Allowed",
@@ -37,7 +36,7 @@ def json_method_not_allowed(methods):
return resp
def json_response(res_type="success", msg="", data=None, status=200):
# type: (text_type, text_type, Optional[Dict[str, Any]], int) -> HttpResponse
# type: (Text, Text, Optional[Dict[str, Any]], int) -> HttpResponse
content = {"result": res_type, "msg": msg}
if data is not None:
content.update(data)

View File

@@ -3,12 +3,11 @@ from __future__ import absolute_import
from django.contrib.auth import SESSION_KEY, get_user_model
from django.contrib.sessions.models import Session
from typing import Mapping, Optional
from six import text_type
from typing import Mapping, Optional, Text
def get_session_dict_user(session_dict):
# type: (Mapping[text_type, int]) -> Optional[int]
# type: (Mapping[Text, int]) -> Optional[int]
# Compare django.contrib.auth._get_user_session_key
try:
return get_user_model()._meta.pk.to_python(session_dict[SESSION_KEY])

View File

@@ -34,7 +34,7 @@ import six
from six import text_type, binary_type
from typing import Any, Mapping, Union, TypeVar, Text
NonBinaryStr = TypeVar('NonBinaryStr', str, text_type)
NonBinaryStr = TypeVar('NonBinaryStr', str, Text)
# This is used to represent text or native strings
def force_text(s, encoding='utf-8'):

View File

@@ -2,7 +2,7 @@ from __future__ import absolute_import
from __future__ import print_function
from contextlib import contextmanager
from typing import (cast, Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional,
Sized, Tuple, Union)
Sized, Tuple, Union, Text)
from django.core.urlresolvers import resolve
from django.conf import settings
@@ -63,7 +63,7 @@ from zerver.lib.str_utils import NonBinaryStr
from contextlib import contextmanager
import six
API_KEYS = {} # type: Dict[text_type, text_type]
API_KEYS = {} # type: Dict[Text, Text]
class ZulipTestCase(TestCase):
'''
@@ -90,7 +90,7 @@ class ZulipTestCase(TestCase):
@instrument_url
def client_patch(self, url, info={}, **kwargs):
# type: (text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
"""
We need to urlencode, since Django's function won't do it for us.
"""
@@ -100,7 +100,7 @@ class ZulipTestCase(TestCase):
@instrument_url
def client_patch_multipart(self, url, info={}, **kwargs):
# type: (text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
"""
Use this for patch requests that have file uploads or
that need some sort of multi-part content. In the future
@@ -119,14 +119,14 @@ class ZulipTestCase(TestCase):
@instrument_url
def client_put(self, url, info={}, **kwargs):
# type: (text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT
return django_client.put(url, encoded, **kwargs)
@instrument_url
def client_put_multipart(self, url, info={}, **kwargs):
# type: (text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
"""
Use this for put requests that have file uploads or
that need some sort of multi-part content. In the future
@@ -141,20 +141,20 @@ class ZulipTestCase(TestCase):
@instrument_url
def client_delete(self, url, info={}, **kwargs):
# type: (text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
encoded = urllib.parse.urlencode(info)
django_client = self.client # see WRAPPER_COMMENT
return django_client.delete(url, encoded, **kwargs)
@instrument_url
def client_post(self, url, info={}, **kwargs):
# type: (text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
django_client = self.client # see WRAPPER_COMMENT
return django_client.post(url, info, **kwargs)
@instrument_url
def client_post_request(self, url, req):
# type: (text_type, Any) -> HttpResponse
# type: (Text, Any) -> HttpResponse
"""
We simulate hitting an endpoint here, although we
actually resolve the URL manually and hit the view
@@ -169,19 +169,19 @@ class ZulipTestCase(TestCase):
@instrument_url
def client_get(self, url, info={}, **kwargs):
# type: (text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Text, Dict[str, Any], **Any) -> HttpResponse
django_client = self.client # see WRAPPER_COMMENT
return django_client.get(url, info, **kwargs)
def login_with_return(self, email, password=None):
# type: (text_type, Optional[text_type]) -> HttpResponse
# type: (Text, Optional[Text]) -> HttpResponse
if password is None:
password = initial_password(email)
return self.client_post('/accounts/login/',
{'username': email, 'password': password})
def login(self, email, password=None, fails=False):
# type: (text_type, Optional[text_type], bool) -> HttpResponse
# type: (Text, Optional[Text], bool) -> HttpResponse
if password is None:
password = initial_password(email)
if not fails:
@@ -190,7 +190,7 @@ class ZulipTestCase(TestCase):
self.assertFalse(self.client.login(username=email, password=password))
def register(self, username, password, domain="zulip.com"):
# type: (text_type, text_type, text_type) -> HttpResponse
# type: (Text, Text, Text) -> HttpResponse
self.client_post('/accounts/home/',
{'email': username + "@" + domain})
return self.submit_reg_form_for_user(username, password, domain=domain)
@@ -199,7 +199,7 @@ class ZulipTestCase(TestCase):
realm_name="Zulip Test", realm_subdomain="zuliptest",
realm_org_type=Realm.COMMUNITY,
from_confirmation='', **kwargs):
# type: (text_type, text_type, text_type, Optional[text_type], Optional[text_type], int, Optional[text_type], **Any) -> HttpResponse
# type: (Text, Text, Text, Optional[Text], Optional[Text], int, Optional[Text], **Any) -> HttpResponse
"""
Stage two of the two-step registration process.
@@ -219,7 +219,7 @@ class ZulipTestCase(TestCase):
**kwargs)
def get_confirmation_url_from_outbox(self, email_address, path_pattern="(\S+)>"):
# type: (text_type, text_type) -> text_type
# type: (Text, Text) -> Text
from django.core.mail import outbox
for message in reversed(outbox):
if email_address in message.to:
@@ -229,20 +229,20 @@ class ZulipTestCase(TestCase):
raise ValueError("Couldn't find a confirmation email.")
def get_api_key(self, email):
# type: (text_type) -> text_type
# type: (Text) -> Text
if email not in API_KEYS:
API_KEYS[email] = get_user_profile_by_email(email).api_key
return API_KEYS[email]
def api_auth(self, email):
# type: (text_type) -> Dict[str, text_type]
# type: (Text) -> Dict[str, Text]
credentials = u"%s:%s" % (email, self.get_api_key(email))
return {
'HTTP_AUTHORIZATION': u'Basic ' + base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
}
def get_streams(self, email):
# type: (text_type) -> List[text_type]
# type: (Text) -> List[Text]
"""
Helper function to get the stream names for a user
"""
@@ -255,7 +255,7 @@ class ZulipTestCase(TestCase):
def send_message(self, sender_name, raw_recipients, message_type,
content=u"test content", subject=u"test", **kwargs):
# type: (text_type, Union[text_type, List[text_type]], int, text_type, text_type, **Any) -> int
# type: (Text, Union[Text, List[Text]], int, Text, Text, **Any) -> int
sender = get_user_profile_by_email(sender_name)
if message_type == Recipient.PERSONAL:
message_type_name = "private"
@@ -281,7 +281,7 @@ class ZulipTestCase(TestCase):
return data['messages']
def users_subscribed_to_stream(self, stream_name, realm):
# type: (text_type, Realm) -> List[UserProfile]
# type: (Text, Realm) -> List[UserProfile]
stream = Stream.objects.get(name=stream_name, realm=realm)
recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
subscriptions = Subscription.objects.filter(recipient=recipient, active=True)
@@ -316,7 +316,7 @@ class ZulipTestCase(TestCase):
return json['msg']
def assert_json_error(self, result, msg, status_code=400):
# type: (HttpResponse, text_type, int) -> None
# type: (HttpResponse, Text, int) -> None
"""
Invalid POSTs return an error status code and JSON of the form
{"result": "error", "msg": "reason"}.
@@ -336,31 +336,31 @@ class ZulipTestCase(TestCase):
"len(%s) == %s, > %s" % (queries, actual_count, count))
def assert_json_error_contains(self, result, msg_substring, status_code=400):
# type: (HttpResponse, text_type, int) -> None
# type: (HttpResponse, Text, int) -> None
self.assertIn(msg_substring, self.get_json_error(result, status_code=status_code))
def assert_equals_response(self, string, response):
# type: (text_type, HttpResponse) -> None
# type: (Text, HttpResponse) -> None
self.assertEqual(string, response.content.decode('utf-8'))
def assert_in_response(self, substring, response):
# type: (text_type, HttpResponse) -> None
# type: (Text, HttpResponse) -> None
self.assertIn(substring, response.content.decode('utf-8'))
def assert_in_success_response(self, substrings, response):
# type: (Iterable[text_type], HttpResponse) -> None
# type: (Iterable[Text], HttpResponse) -> None
self.assertEqual(response.status_code, 200)
decoded = response.content.decode('utf-8')
for substring in substrings:
self.assertIn(substring, decoded)
def fixture_data(self, type, action, file_type='json'):
# type: (text_type, text_type, text_type) -> text_type
# type: (Text, Text, Text) -> Text
return force_text(open(os.path.join(os.path.dirname(__file__),
"../fixtures/%s/%s_%s.%s" % (type, type, action, file_type))).read())
def make_stream(self, stream_name, realm=None, invite_only=False):
# type: (text_type, Optional[Realm], Optional[bool]) -> Stream
# type: (Text, Optional[Realm], Optional[bool]) -> Stream
if realm is None:
realm = self.DEFAULT_REALM
@@ -382,7 +382,7 @@ class ZulipTestCase(TestCase):
# Subscribe to a stream directly
def subscribe_to_stream(self, email, stream_name, realm=None):
# type: (text_type, text_type, Optional[Realm]) -> None
# type: (Text, Text, Optional[Realm]) -> None
if realm is None:
realm = get_realm_by_email_domain(email)
stream = get_stream(stream_name, realm)
@@ -392,14 +392,14 @@ class ZulipTestCase(TestCase):
bulk_add_subscriptions([stream], [user_profile])
def unsubscribe_from_stream(self, email, stream_name):
# type: (text_type, text_type) -> None
# type: (Text, Text) -> None
user_profile = get_user_profile_by_email(email)
stream = get_stream(stream_name, user_profile.realm)
bulk_remove_subscriptions([user_profile], [stream])
# Subscribe to a stream by making an API request
def common_subscribe_to_streams(self, email, streams, extra_post_data={}, invite_only=False):
# type: (text_type, Iterable[text_type], Dict[str, Any], bool) -> HttpResponse
# type: (Text, Iterable[Text], Dict[str, Any], bool) -> HttpResponse
post_data = {'subscriptions': ujson.dumps([{"name": stream} for stream in streams]),
'invite_only': ujson.dumps(invite_only)}
post_data.update(extra_post_data)
@@ -407,7 +407,7 @@ class ZulipTestCase(TestCase):
return result
def send_json_payload(self, email, url, payload, stream_name=None, **post_params):
# type: (text_type, text_type, Union[text_type, Dict[str, Any]], Optional[text_type], **Any) -> Message
# type: (Text, Text, Union[Text, Dict[str, Any]], Optional[Text], **Any) -> Message
if stream_name is not None:
self.subscribe_to_stream(email, stream_name)
@@ -452,10 +452,10 @@ class WebhookTestCase(ZulipTestCase):
If you create your url in uncommon way you can override build_webhook_url method
In case that you need modify body or create it without using fixture you can also override get_body method
"""
STREAM_NAME = None # type: Optional[text_type]
STREAM_NAME = None # type: Optional[Text]
TEST_USER_EMAIL = 'webhook-bot@zulip.com'
URL_TEMPLATE = None # type: Optional[text_type]
FIXTURE_DIR_NAME = None # type: Optional[text_type]
URL_TEMPLATE = None # type: Optional[Text]
FIXTURE_DIR_NAME = None # type: Optional[Text]
def setUp(self):
# type: () -> None
@@ -463,7 +463,7 @@ class WebhookTestCase(ZulipTestCase):
def send_and_test_stream_message(self, fixture_name, expected_subject=None,
expected_message=None, content_type="application/json", **kwargs):
# type: (text_type, Optional[text_type], Optional[text_type], Optional[text_type], **Any) -> Message
# type: (Text, Optional[Text], Optional[Text], Optional[Text], **Any) -> Message
payload = self.get_body(fixture_name)
if content_type is not None:
kwargs['content_type'] = content_type
@@ -476,7 +476,7 @@ class WebhookTestCase(ZulipTestCase):
def send_and_test_private_message(self, fixture_name, expected_subject=None,
expected_message=None, content_type="application/json", **kwargs):
# type: (text_type, text_type, text_type, str, **Any) -> Message
# type: (Text, Text, Text, str, **Any) -> Message
payload = self.get_body(fixture_name)
if content_type is not None:
kwargs['content_type'] = content_type
@@ -488,22 +488,22 @@ class WebhookTestCase(ZulipTestCase):
return msg
def build_webhook_url(self):
# type: () -> text_type
# type: () -> Text
api_key = self.get_api_key(self.TEST_USER_EMAIL)
return self.URL_TEMPLATE.format(stream=self.STREAM_NAME, api_key=api_key)
def get_body(self, fixture_name):
# type: (text_type) -> Union[text_type, Dict[str, text_type]]
# type: (Text) -> Union[Text, Dict[str, Text]]
"""Can be implemented either as returning a dictionary containing the
post parameters or as string containing the body of the request."""
return ujson.dumps(ujson.loads(self.fixture_data(self.FIXTURE_DIR_NAME, fixture_name)))
def do_test_subject(self, msg, expected_subject):
# type: (Message, Optional[text_type]) -> None
# type: (Message, Optional[Text]) -> None
if expected_subject is not None:
self.assertEqual(msg.topic_name(), expected_subject)
def do_test_message(self, msg, expected_message):
# type: (Message, Optional[text_type]) -> None
# type: (Message, Optional[Text]) -> None
if expected_message is not None:
self.assertEqual(msg.content, expected_message)

View File

@@ -4,7 +4,7 @@ import re
import hashlib
from typing import Any, Optional
from importlib import import_module
from six import text_type
from typing import Text
from six.moves import cStringIO as StringIO
from django.db import connections, DEFAULT_DB_ALIAS
@@ -17,7 +17,7 @@ FILENAME_SPLITTER = re.compile('[\W\-_]')
TEST_DB_STATUS_DIR = 'var/test_db_status'
def database_exists(database_name, **options):
# type: (text_type, **Any) -> bool
# type: (Text, **Any) -> bool
db = options.get('database', DEFAULT_DB_ALIAS)
try:
connection = connections[db]
@@ -58,7 +58,7 @@ def get_migration_status(**options):
return re.sub('\x1b\[(1|0)m', '', output)
def are_migrations_the_same(migration_file, **options):
# type: (text_type, **Any) -> bool
# type: (Text, **Any) -> bool
if not os.path.exists(migration_file):
return False
@@ -99,7 +99,7 @@ def is_template_database_current(
migration_status='var/migration-status',
settings='zproject.test_settings',
check_files=None):
# type: (Optional[text_type], Optional[text_type], Optional[text_type], Optional[List[str]]) -> bool
# type: (Optional[Text], Optional[Text], Optional[Text], Optional[List[str]]) -> bool
# Using str type for check_files because re.split doesn't accept unicode
if check_files is None:
check_files = [

View File

@@ -55,7 +55,8 @@ import time
import ujson
import unittest
from six.moves import urllib
from six import text_type, binary_type
from six import binary_type
from typing import Text
from zerver.lib.str_utils import NonBinaryStr
from contextlib import contextmanager
@@ -79,16 +80,16 @@ def tornado_redirected_to_list(lst):
@contextmanager
def simulated_empty_cache():
# type: () -> Generator[List[Tuple[str, Union[text_type, List[text_type]], text_type]], None, None]
cache_queries = [] # type: List[Tuple[str, Union[text_type, List[text_type]], text_type]]
# type: () -> Generator[List[Tuple[str, Union[Text, List[Text]], Text]], None, None]
cache_queries = [] # type: List[Tuple[str, Union[Text, List[Text]], Text]]
def my_cache_get(key, cache_name=None):
# type: (text_type, Optional[str]) -> Any
# type: (Text, Optional[str]) -> Any
cache_queries.append(('get', key, cache_name))
return None
def my_cache_get_many(keys, cache_name=None):
# type: (List[text_type], Optional[str]) -> Dict[text_type, Any]
# type: (List[Text], Optional[str]) -> Dict[Text, Any]
cache_queries.append(('getmany', keys, cache_name))
return None
@@ -160,7 +161,7 @@ def make_client(name):
return client
def find_key_by_email(address):
# type: (text_type) -> text_type
# type: (Text) -> Text
from django.core.mail import outbox
key_regex = re.compile("accounts/do_confirm/([a-f0-9]{40})>")
for message in reversed(outbox):
@@ -221,16 +222,16 @@ class HostRequestMock(object):
routes that use Zulip's subdomains feature"""
def __init__(self, host=settings.EXTERNAL_HOST):
# type: (text_type) -> None
# type: (Text) -> None
self.host = host
def get_host(self):
# type: () -> text_type
# type: () -> Text
return self.host
class MockPythonResponse(object):
def __init__(self, text, status_code):
# type: (text_type, int) -> None
# type: (Text, int) -> None
self.text = text
self.status_code = status_code
@@ -250,7 +251,7 @@ def instrument_url(f):
return f
else:
def wrapper(self, url, info={}, **kwargs):
# type: (Any, text_type, Dict[str, Any], **Any) -> HttpResponse
# type: (Any, Text, Dict[str, Any], **Any) -> HttpResponse
start = time.time()
result = f(self, url, info, **kwargs)
delay = time.time() - start
@@ -379,7 +380,7 @@ def get_all_templates():
path_exists = os.path.exists
def is_valid_template(p, n):
# type: (text_type, text_type) -> bool
# type: (Text, Text) -> bool
return (not n.startswith('.') and
not n.startswith('__init__') and
not n.endswith(".md") and

View File

@@ -3,8 +3,7 @@ from __future__ import print_function
import sys
import functools
from typing import Any, Callable, IO, Mapping, Sequence, TypeVar
from six import text_type
from typing import Any, Callable, IO, Mapping, Sequence, TypeVar, Text
def get_mapping_type_str(x):
# type: (Mapping) -> str
@@ -48,7 +47,7 @@ def get_sequence_type_str(x):
else:
return '%s([%s, ...])' % (container_type, elem_type)
expansion_blacklist = [text_type, bytes]
expansion_blacklist = [Text, bytes]
def get_type_str(x):
# type: (Any) -> str

View File

@@ -4,21 +4,20 @@ import re
import os.path
import sourcemap
from six.moves import map
from six import text_type
from typing import Dict
from typing import Dict, Text
class SourceMap(object):
'''Map (line, column) pairs from generated to source file.'''
def __init__(self, sourcemap_dir):
# type: (text_type) -> None
# type: (Text) -> None
self._dir = sourcemap_dir
self._indices = {} # type: Dict[text_type, sourcemap.SourceMapDecoder]
self._indices = {} # type: Dict[Text, sourcemap.SourceMapDecoder]
def _index_for(self, minified_src):
# type: (text_type) -> sourcemap.SourceMapDecoder
# type: (Text) -> sourcemap.SourceMapDecoder
'''Return the source map index for minified_src, loading it if not
already loaded.'''
if minified_src not in self._indices:
@@ -28,8 +27,8 @@ class SourceMap(object):
return self._indices[minified_src]
def annotate_stacktrace(self, stacktrace):
# type: (text_type) -> text_type
out = '' # type: text_type
# type: (Text) -> Text
out = '' # type: Text
for ln in stacktrace.splitlines():
out += ln + '\n'
match = re.search(r'/static/min/(.+)(\.[0-9a-f]+)\.js:(\d+):(\d+)', ln)

View File

@@ -1,5 +1,5 @@
from __future__ import absolute_import
from typing import Optional, Tuple, Mapping, Any
from typing import Optional, Tuple, Mapping, Any, Text
from django.utils.translation import ugettext as _
from django.conf import settings
@@ -27,7 +27,7 @@ import base64
import os
import re
from PIL import Image, ImageOps
from six import binary_type, text_type
from six import binary_type
import io
import random
import logging
@@ -53,13 +53,13 @@ MEDIUM_AVATAR_SIZE = 500
attachment_url_re = re.compile(u'[/\-]user[\-_]uploads[/\.-].*?(?=[ )]|\Z)')
def attachment_url_to_path_id(attachment_url):
# type: (text_type) -> text_type
# type: (Text) -> Text
path_id_raw = re.sub(u'[/\-]user[\-_]uploads[/\.-]', u'', attachment_url)
# Remove any extra '.' after file extension. These are probably added by the user
return re.sub(u'[.]+$', u'', path_id_raw, re.M)
def sanitize_name(raw_value):
# type: (NonBinaryStr) -> text_type
# type: (NonBinaryStr) -> Text
"""
Sanitizes a value to be safe to store in a Linux filesystem, in
S3, and in a URL. So unicode is allowed, but not special
@@ -77,7 +77,7 @@ def sanitize_name(raw_value):
return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U))
def random_name(bytes=60):
# type: (int) -> text_type
# type: (int) -> Text
return base64.urlsafe_b64encode(os.urandom(bytes)).decode('utf-8')
class BadImageError(JsonableError):
@@ -98,29 +98,29 @@ def resize_avatar(image_data, size=DEFAULT_AVATAR_SIZE):
class ZulipUploadBackend(object):
def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (text_type, Optional[text_type], binary_type, UserProfile, Optional[Realm]) -> text_type
# type: (Text, Optional[Text], binary_type, UserProfile, Optional[Realm]) -> Text
raise NotImplementedError()
def upload_avatar_image(self, user_file, user_profile, email):
# type: (File, UserProfile, text_type) -> None
# type: (File, UserProfile, Text) -> None
raise NotImplementedError()
def delete_message_image(self, path_id):
# type: (text_type) -> bool
# type: (Text) -> bool
raise NotImplementedError()
def get_avatar_url(self, hash_key, medium=False):
# type: (text_type, bool) -> text_type
# type: (Text, bool) -> Text
raise NotImplementedError()
def ensure_medium_avatar_image(self, email):
# type: (text_type) -> None
# type: (Text) -> None
raise NotImplementedError()
### S3
def get_bucket(conn, bucket_name):
# type: (S3Connection, text_type) -> Bucket
# type: (S3Connection, Text) -> Bucket
# Calling get_bucket() with validate=True can apparently lead
# to expensive S3 bills:
# http://www.appneta.com/blog/s3-list-get-bucket-default/
@@ -138,7 +138,7 @@ def upload_image_to_s3(
content_type,
user_profile,
contents):
# type: (NonBinaryStr, text_type, Optional[text_type], UserProfile, binary_type) -> None
# type: (NonBinaryStr, Text, Optional[Text], UserProfile, binary_type) -> None
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
bucket = get_bucket(conn, force_str(bucket_name))
@@ -155,7 +155,7 @@ def upload_image_to_s3(
key.set_contents_from_string(contents, headers=headers)
def get_file_info(request, user_file):
# type: (HttpRequest, File) -> Tuple[text_type, Optional[text_type]]
# type: (HttpRequest, File) -> Tuple[Text, Optional[Text]]
uploaded_file_name = user_file.name
assert isinstance(uploaded_file_name, str)
@@ -173,12 +173,12 @@ def get_file_info(request, user_file):
def get_signed_upload_url(path):
# type: (text_type) -> text_type
# type: (Text) -> Text
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
return force_text(conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=force_str(path)))
def get_realm_for_filename(path):
# type: (text_type) -> Optional[int]
# type: (Text) -> Optional[int]
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path)
if key is None:
@@ -188,7 +188,7 @@ def get_realm_for_filename(path):
class S3UploadBackend(ZulipUploadBackend):
def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (text_type, Optional[text_type], binary_type, UserProfile, Optional[Realm]) -> text_type
# type: (Text, Optional[Text], binary_type, UserProfile, Optional[Realm]) -> Text
bucket_name = settings.S3_AUTH_UPLOADS_BUCKET
s3_file_name = "/".join([
str(target_realm.id if target_realm is not None else user_profile.realm.id),
@@ -209,7 +209,7 @@ class S3UploadBackend(ZulipUploadBackend):
return url
def delete_message_image(self, path_id):
# type: (text_type) -> bool
# type: (Text) -> bool
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
bucket = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET)
@@ -224,7 +224,7 @@ class S3UploadBackend(ZulipUploadBackend):
return False
def upload_avatar_image(self, user_file, user_profile, email):
# type: (File, UserProfile, text_type) -> None
# type: (File, UserProfile, Text) -> None
content_type = guess_type(user_file.name)[0]
bucket_name = settings.S3_AVATAR_BUCKET
s3_file_name = user_avatar_hash(email)
@@ -260,14 +260,14 @@ class S3UploadBackend(ZulipUploadBackend):
# that users use gravatar.)
def get_avatar_url(self, hash_key, medium=False):
# type: (text_type, bool) -> text_type
# type: (Text, bool) -> Text
bucket = settings.S3_AVATAR_BUCKET
medium_suffix = "-medium" if medium else ""
# ?x=x allows templates to append additional parameters with &s
return u"https://%s.s3.amazonaws.com/%s%s?x=x" % (bucket, medium_suffix, hash_key)
def ensure_medium_avatar_image(self, email):
# type: (text_type) -> None
# type: (Text) -> None
user_profile = get_user_profile_by_email(email)
email_hash = user_avatar_hash(email)
s3_file_name = email_hash
@@ -290,20 +290,20 @@ class S3UploadBackend(ZulipUploadBackend):
### Local
def mkdirs(path):
# type: (text_type) -> None
# type: (Text) -> None
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
os.makedirs(dirname)
def write_local_file(type, path, file_data):
# type: (text_type, text_type, binary_type) -> None
# type: (Text, Text, binary_type) -> None
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path)
mkdirs(file_path)
with open(file_path, 'wb') as f:
f.write(file_data)
def get_local_file_path(path_id):
# type: (text_type) -> Optional[text_type]
# type: (Text) -> Optional[Text]
local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
if os.path.isfile(local_path):
return local_path
@@ -312,7 +312,7 @@ def get_local_file_path(path_id):
class LocalUploadBackend(ZulipUploadBackend):
def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (text_type, Optional[text_type], binary_type, UserProfile, Optional[Realm]) -> text_type
# type: (Text, Optional[Text], binary_type, UserProfile, Optional[Realm]) -> Text
# Split into 256 subdirectories to prevent directories from getting too big
path = "/".join([
str(user_profile.realm.id),
@@ -326,7 +326,7 @@ class LocalUploadBackend(ZulipUploadBackend):
return '/user_uploads/' + path
def delete_message_image(self, path_id):
# type: (text_type) -> bool
# type: (Text) -> bool
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
if os.path.isfile(file_path):
# This removes the file but the empty folders still remain.
@@ -338,7 +338,7 @@ class LocalUploadBackend(ZulipUploadBackend):
return False
def upload_avatar_image(self, user_file, user_profile, email):
# type: (File, UserProfile, text_type) -> None
# type: (File, UserProfile, Text) -> None
email_hash = user_avatar_hash(email)
image_data = user_file.read()
@@ -351,13 +351,13 @@ class LocalUploadBackend(ZulipUploadBackend):
write_local_file('avatars', email_hash+'-medium.png', resized_medium)
def get_avatar_url(self, hash_key, medium=False):
# type: (text_type, bool) -> text_type
# type: (Text, bool) -> Text
# ?x=x allows templates to append additional parameters with &s
medium_suffix = "-medium" if medium else ""
return u"/user_avatars/%s%s.png?x=x" % (hash_key, medium_suffix)
def ensure_medium_avatar_image(self, email):
# type: (text_type) -> None
# type: (Text) -> None
email_hash = user_avatar_hash(email)
output_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars", email_hash + "-medium.png")
@@ -376,20 +376,20 @@ else:
upload_backend = S3UploadBackend()
def delete_message_image(path_id):
# type: (text_type) -> bool
# type: (Text) -> bool
return upload_backend.delete_message_image(path_id)
def upload_avatar_image(user_file, user_profile, email):
# type: (File, UserProfile, text_type) -> None
# type: (File, UserProfile, Text) -> None
upload_backend.upload_avatar_image(user_file, user_profile, email)
def upload_message_image(uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (text_type, Optional[text_type], binary_type, UserProfile, Optional[Realm]) -> text_type
# type: (Text, Optional[Text], binary_type, UserProfile, Optional[Realm]) -> Text
return upload_backend.upload_message_image(uploaded_file_name, content_type, file_data,
user_profile, target_realm=target_realm)
def claim_attachment(user_profile, path_id, message, is_message_realm_public):
# type: (UserProfile, text_type, Message, bool) -> bool
# type: (UserProfile, Text, Message, bool) -> bool
try:
attachment = Attachment.objects.get(path_id=path_id)
attachment.messages.add(message)
@@ -405,11 +405,11 @@ def claim_attachment(user_profile, path_id, message, is_message_realm_public):
return False
def create_attachment(file_name, path_id, user_profile):
# type: (text_type, text_type, UserProfile) -> bool
# type: (Text, Text, UserProfile) -> bool
Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile, realm=user_profile.realm)
return True
def upload_message_image_from_request(request, user_file, user_profile):
# type: (HttpRequest, File, UserProfile) -> text_type
# type: (HttpRequest, File, UserProfile) -> Text
uploaded_file_name, content_type = get_file_info(request, user_file)
return upload_message_image(uploaded_file_name, content_type, user_file.read(), user_profile)

View File

@@ -1,11 +1,10 @@
from __future__ import absolute_import
from typing import Optional, Any
from six import text_type
from typing import Optional, Any, Text
from pyoembed import oEmbed, PyOembedException
def get_oembed_data(url, maxwidth=640, maxheight=480):
# type: (text_type, Optional[int], Optional[int]) -> Any
# type: (Text, Optional[int], Optional[int]) -> Any
try:
data = oEmbed(url, maxwidth=maxwidth, maxheight=maxheight)
except PyOembedException:

View File

@@ -1,12 +1,11 @@
from __future__ import absolute_import
from typing import Any
from six import text_type
from typing import Any, Text
from bs4 import BeautifulSoup
class BaseParser(object):
def __init__(self, html_source):
# type: (text_type) -> None
# type: (Text) -> None
self._soup = BeautifulSoup(html_source, "lxml")
def extract_data(self):

View File

@@ -1,13 +1,12 @@
from __future__ import absolute_import
import re
from six import text_type
from typing import Dict
from typing import Dict, Text
from .base import BaseParser
class OpenGraphParser(BaseParser):
def extract_data(self):
# type: () -> Dict[str, text_type]
# type: () -> Dict[str, Text]
meta = self._soup.findAll('meta')
content = {}
for tag in meta:

View File

@@ -2,8 +2,7 @@ from __future__ import absolute_import
import re
import logging
import traceback
from six import text_type
from typing import Any, Optional
from typing import Any, Optional, Text
from typing.re import Match
import requests
from zerver.lib.cache import cache_with_key, get_cache_with_key
@@ -22,18 +21,18 @@ link_regex = re.compile(
def is_link(url):
# type: (text_type) -> Match[text_type]
# type: (Text) -> Match[Text]
return link_regex.match(str(url))
def cache_key_func(url):
# type: (text_type) -> text_type
# type: (Text) -> Text
return url
@cache_with_key(cache_key_func, cache_name=CACHE_NAME, with_statsd_key="urlpreview_data")
def get_link_embed_data(url, maxwidth=640, maxheight=480):
# type: (text_type, Optional[int], Optional[int]) -> Any
# type: (Text, Optional[int], Optional[int]) -> Any
if not is_link(url):
return None
# Fetch information from URL.
@@ -62,5 +61,5 @@ def get_link_embed_data(url, maxwidth=640, maxheight=480):
@get_cache_with_key(cache_key_func, cache_name=CACHE_NAME)
def link_embed_data_from_cache(url, maxwidth=640, maxheight=480):
# type: (text_type, Optional[int], Optional[int]) -> Any
# type: (Text, Optional[int], Optional[int]) -> Any
return

View File

@@ -2,8 +2,8 @@
from __future__ import absolute_import
from __future__ import division
from typing import Any, Callable, Optional, Sequence, TypeVar, Iterable, Tuple
from six import text_type, binary_type
from typing import Any, Callable, Optional, Sequence, TypeVar, Iterable, Tuple, Text
from six import binary_type
import base64
import errno
import hashlib
@@ -90,7 +90,7 @@ def run_in_batches(all_list, batch_size, callback, sleep_time = 0, logger = None
sleep(sleep_time)
def make_safe_digest(string, hash_func=hashlib.sha1):
# type: (text_type, Callable[[binary_type], Any]) -> text_type
# type: (Text, Callable[[binary_type], Any]) -> Text
"""
return a hex digest of `string`.
"""
@@ -115,7 +115,7 @@ def log_statsd_event(name):
statsd.incr(event_name)
def generate_random_token(length):
# type: (int) -> text_type
# type: (int) -> Text
return base64.b16encode(os.urandom(length // 2)).decode('utf-8').lower()
def mkdir_p(path):
@@ -186,7 +186,7 @@ def query_chunker(queries, id_collector=None, chunk_size=1000, db_chunk_size=Non
yield [row for row_id, i, row in tup_chunk]
def get_subdomain(request):
# type: (HttpRequest) -> text_type
# type: (HttpRequest) -> Text
domain = request.get_host().lower()
index = domain.find("." + settings.EXTERNAL_HOST)
if index == -1:
@@ -197,7 +197,7 @@ def get_subdomain(request):
return subdomain
def check_subdomain(realm_subdomain, user_subdomain):
# type: (text_type, text_type) -> bool
# type: (Text, Text) -> bool
if settings.REALMS_HAVE_SUBDOMAINS and realm_subdomain is not None:
if (realm_subdomain == "" and user_subdomain is None):
return True