zerver/lib: Change use of typing.Text to str.

This commit is contained in:
Aditya Bansal
2018-05-11 05:10:23 +05:30
committed by Tim Abbott
parent 5416d137d3
commit a68376e2ba
30 changed files with 262 additions and 262 deletions

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Tuple, Text
from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Tuple
from zerver.lib.initial_password import initial_password
from zerver.models import Realm, Stream, UserProfile, Huddle, \
@@ -6,11 +6,11 @@ from zerver.models import Realm, Stream, UserProfile, Huddle, \
from zerver.lib.create_user import create_user_profile
def bulk_create_users(realm: Realm,
users_raw: Set[Tuple[Text, Text, Text, bool]],
users_raw: Set[Tuple[str, str, str, bool]],
bot_type: Optional[int]=None,
bot_owner: Optional[UserProfile]=None,
tos_version: Optional[Text]=None,
timezone: Text="") -> None:
tos_version: Optional[str]=None,
timezone: str="") -> None:
"""
Creates and saves a UserProfile with the given email.
Has some code based off of UserManage.create_user, but doesn't .save()
@@ -35,7 +35,7 @@ def bulk_create_users(realm: Realm,
event_type='user_created', event_time=profile_.date_joined)
for profile_ in profiles_to_create])
profiles_by_email = {} # type: Dict[Text, UserProfile]
profiles_by_email = {} # type: Dict[str, UserProfile]
profiles_by_id = {} # type: Dict[int, UserProfile]
for profile in UserProfile.objects.select_related().filter(realm=realm):
profiles_by_email[profile.email] = profile
@@ -47,7 +47,7 @@ def bulk_create_users(realm: Realm,
type=Recipient.PERSONAL))
Recipient.objects.bulk_create(recipients_to_create)
recipients_by_email = {} # type: Dict[Text, Recipient]
recipients_by_email = {} # type: Dict[str, Recipient]
for recipient in recipients_to_create:
recipients_by_email[profiles_by_id[recipient.type_id].email] = recipient
@@ -60,7 +60,7 @@ def bulk_create_users(realm: Realm,
# This is only sed in populate_db, so doesn't realy need tests
def bulk_create_streams(realm: Realm,
stream_dict: Dict[Text, Dict[Text, Any]]) -> None: # nocoverage
stream_dict: Dict[str, Dict[str, Any]]) -> None: # nocoverage
existing_streams = frozenset([name.lower() for name in
Stream.objects.filter(realm=realm)
.values_list('name', flat=True)])

View File

@@ -8,7 +8,7 @@ from django.conf import settings
from django.db.models import Q
from django.core.cache.backends.base import BaseCache
from typing import cast, Any, Callable, Dict, Iterable, List, Optional, Union, Set, TypeVar, Text, Tuple
from typing import cast, Any, Callable, Dict, Iterable, List, Optional, Union, Set, TypeVar, Tuple
from zerver.lib.utils import statsd, statsd_key, make_safe_digest
import subprocess
@@ -51,7 +51,7 @@ def remote_cache_stats_finish() -> None:
remote_cache_total_requests += 1
remote_cache_total_time += (time.time() - remote_cache_time_start)
def get_or_create_key_prefix() -> Text:
def get_or_create_key_prefix() -> str:
if settings.CASPER_TESTS:
# This sets the prefix for the benefit of the Casper tests.
#
@@ -70,7 +70,7 @@ def get_or_create_key_prefix() -> Text:
filename = os.path.join(settings.DEPLOY_ROOT, "var", "remote_cache_prefix")
try:
fd = os.open(filename, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o444)
random_hash = hashlib.sha256(Text(random.getrandbits(256)).encode('utf-8')).digest()
random_hash = hashlib.sha256(str(random.getrandbits(256)).encode('utf-8')).digest()
prefix = base64.b16encode(random_hash)[:32].decode('utf-8').lower() + ':'
# This does close the underlying file
with os.fdopen(fd, 'w') as f:
@@ -93,11 +93,11 @@ def get_or_create_key_prefix() -> Text:
return prefix
KEY_PREFIX = get_or_create_key_prefix() # type: Text
KEY_PREFIX = get_or_create_key_prefix() # type: str
def bounce_key_prefix_for_testing(test_name: Text) -> None:
def bounce_key_prefix_for_testing(test_name: str) -> None:
global KEY_PREFIX
KEY_PREFIX = test_name + ':' + Text(os.getpid()) + ':'
KEY_PREFIX = test_name + ':' + str(os.getpid()) + ':'
# We are taking the hash of the KEY_PREFIX to decrease the size of the key.
# Memcached keys should have a length of less than 256.
KEY_PREFIX = hashlib.sha1(KEY_PREFIX.encode('utf-8')).hexdigest()
@@ -108,7 +108,7 @@ def get_cache_backend(cache_name: Optional[str]) -> BaseCache:
return caches[cache_name]
def get_cache_with_key(
keyfunc: Callable[..., Text],
keyfunc: Callable[..., str],
cache_name: Optional[str]=None
) -> Callable[[Callable[..., ReturnT]], Callable[..., ReturnT]]:
"""
@@ -130,7 +130,7 @@ def get_cache_with_key(
return decorator
def cache_with_key(
keyfunc: Callable[..., Text], cache_name: Optional[str]=None,
keyfunc: Callable[..., str], cache_name: Optional[str]=None,
timeout: Optional[int]=None, with_statsd_key: Optional[str]=None
) -> Callable[[Callable[..., ReturnT]], Callable[..., ReturnT]]:
"""Decorator which applies Django caching to a function.
@@ -174,27 +174,27 @@ def cache_with_key(
return decorator
def cache_set(key: Text, val: Any, cache_name: Optional[str]=None, timeout: Optional[int]=None) -> None:
def cache_set(key: str, val: Any, cache_name: Optional[str]=None, timeout: Optional[int]=None) -> None:
remote_cache_stats_start()
cache_backend = get_cache_backend(cache_name)
cache_backend.set(KEY_PREFIX + key, (val,), timeout=timeout)
remote_cache_stats_finish()
def cache_get(key: Text, cache_name: Optional[str]=None) -> Any:
def cache_get(key: str, cache_name: Optional[str]=None) -> Any:
remote_cache_stats_start()
cache_backend = get_cache_backend(cache_name)
ret = cache_backend.get(KEY_PREFIX + key)
remote_cache_stats_finish()
return ret
def cache_get_many(keys: List[Text], cache_name: Optional[str]=None) -> Dict[Text, Any]:
def cache_get_many(keys: List[str], cache_name: Optional[str]=None) -> Dict[str, Any]:
keys = [KEY_PREFIX + key for key in keys]
remote_cache_stats_start()
ret = get_cache_backend(cache_name).get_many(keys)
remote_cache_stats_finish()
return dict([(key[len(KEY_PREFIX):], value) for key, value in ret.items()])
def cache_set_many(items: Dict[Text, Any], cache_name: Optional[str]=None,
def cache_set_many(items: Dict[str, Any], cache_name: Optional[str]=None,
timeout: Optional[int]=None) -> None:
new_items = {}
for key in items:
@@ -204,12 +204,12 @@ def cache_set_many(items: Dict[Text, Any], cache_name: Optional[str]=None,
get_cache_backend(cache_name).set_many(items, timeout=timeout)
remote_cache_stats_finish()
def cache_delete(key: Text, cache_name: Optional[str]=None) -> None:
def cache_delete(key: str, cache_name: Optional[str]=None) -> None:
remote_cache_stats_start()
get_cache_backend(cache_name).delete(KEY_PREFIX + key)
remote_cache_stats_finish()
def cache_delete_many(items: Iterable[Text], cache_name: Optional[str]=None) -> None:
def cache_delete_many(items: Iterable[str], cache_name: Optional[str]=None) -> None:
remote_cache_stats_start()
get_cache_backend(cache_name).delete_many(
KEY_PREFIX + item for item in items)
@@ -247,7 +247,7 @@ def default_cache_transformer(obj: ItemT) -> ItemT:
# value for cache (in case the values that we're caching are some
# function of the objects, not the objects themselves)
def generic_bulk_cached_fetch(
cache_key_function: Callable[[ObjKT], Text],
cache_key_function: Callable[[ObjKT], str],
query_function: Callable[[List[ObjKT]], Iterable[Any]],
object_ids: Iterable[ObjKT],
extractor: Callable[[CompressedItemT], ItemT] = default_extractor,
@@ -255,19 +255,19 @@ def generic_bulk_cached_fetch(
id_fetcher: Callable[[ItemT], ObjKT] = default_id_fetcher,
cache_transformer: Callable[[ItemT], ItemT] = default_cache_transformer
) -> Dict[ObjKT, ItemT]:
cache_keys = {} # type: Dict[ObjKT, Text]
cache_keys = {} # type: Dict[ObjKT, str]
for object_id in object_ids:
cache_keys[object_id] = cache_key_function(object_id)
cached_objects_compressed = cache_get_many([cache_keys[object_id]
for object_id in object_ids]) # type: Dict[Text, Tuple[CompressedItemT]]
cached_objects = {} # type: Dict[Text, ItemT]
for object_id in object_ids]) # type: Dict[str, Tuple[CompressedItemT]]
cached_objects = {} # type: Dict[str, ItemT]
for (key, val) in cached_objects_compressed.items():
cached_objects[key] = extractor(cached_objects_compressed[key][0])
needed_ids = [object_id for object_id in object_ids if
cache_keys[object_id] not in cached_objects]
db_objects = query_function(needed_ids)
items_for_remote_cache = {} # type: Dict[Text, Tuple[CompressedItemT]]
items_for_remote_cache = {} # type: Dict[str, Tuple[CompressedItemT]]
for obj in db_objects:
key = cache_keys[id_fetcher(obj)]
item = cache_transformer(obj)
@@ -294,28 +294,28 @@ def cache(func: Callable[..., ReturnT]) -> Callable[..., ReturnT]:
return cache_with_key(keyfunc)(func)
def display_recipient_cache_key(recipient_id: int) -> Text:
def display_recipient_cache_key(recipient_id: int) -> str:
return "display_recipient_dict:%d" % (recipient_id,)
def user_profile_by_email_cache_key(email: Text) -> Text:
def user_profile_by_email_cache_key(email: str) -> str:
# See the comment in zerver/lib/avatar_hash.py:gravatar_hash for why we
# are proactively encoding email addresses even though they will
# with high likelihood be ASCII-only for the foreseeable future.
return 'user_profile_by_email:%s' % (make_safe_digest(email.strip()),)
def user_profile_cache_key_id(email: Text, realm_id: int) -> Text:
def user_profile_cache_key_id(email: str, realm_id: int) -> str:
return u"user_profile:%s:%s" % (make_safe_digest(email.strip()), realm_id,)
def user_profile_cache_key(email: Text, realm: 'Realm') -> Text:
def user_profile_cache_key(email: str, realm: 'Realm') -> str:
return user_profile_cache_key_id(email, realm.id)
def bot_profile_cache_key(email: Text) -> Text:
def bot_profile_cache_key(email: str) -> str:
return "bot_profile:%s" % (make_safe_digest(email.strip()))
def user_profile_by_id_cache_key(user_profile_id: int) -> Text:
def user_profile_by_id_cache_key(user_profile_id: int) -> str:
return "user_profile_by_id:%s" % (user_profile_id,)
def user_profile_by_api_key_cache_key(api_key: Text) -> Text:
def user_profile_by_api_key_cache_key(api_key: str) -> str:
return "user_profile_by_api_key:%s" % (api_key,)
realm_user_dict_fields = [
@@ -323,10 +323,10 @@ realm_user_dict_fields = [
'avatar_source', 'avatar_version', 'is_active',
'is_realm_admin', 'is_bot', 'realm_id', 'timezone'] # type: List[str]
def realm_user_dicts_cache_key(realm_id: int) -> Text:
def realm_user_dicts_cache_key(realm_id: int) -> str:
return "realm_user_dicts:%s" % (realm_id,)
def active_user_ids_cache_key(realm_id: int) -> Text:
def active_user_ids_cache_key(realm_id: int) -> str:
return "active_user_ids:%s" % (realm_id,)
bot_dict_fields = ['id', 'full_name', 'short_name', 'bot_type', 'email',
@@ -337,10 +337,10 @@ bot_dict_fields = ['id', 'full_name', 'short_name', 'bot_type', 'email',
'bot_owner__email', 'avatar_source',
'avatar_version'] # type: List[str]
def bot_dicts_in_realm_cache_key(realm: 'Realm') -> Text:
def bot_dicts_in_realm_cache_key(realm: 'Realm') -> str:
return "bot_dicts_in_realm:%s" % (realm.id,)
def get_stream_cache_key(stream_name: Text, realm_id: int) -> Text:
def get_stream_cache_key(stream_name: str, realm_id: int) -> str:
return "stream_by_realm_and_name:%s:%s" % (
realm_id, make_safe_digest(stream_name.strip().lower()))
@@ -419,10 +419,10 @@ def flush_realm(sender: Any, **kwargs: Any) -> None:
cache_delete(bot_dicts_in_realm_cache_key(realm))
cache_delete(realm_alert_words_cache_key(realm))
def realm_alert_words_cache_key(realm: 'Realm') -> Text:
def realm_alert_words_cache_key(realm: 'Realm') -> str:
return "realm_alert_words:%s" % (realm.string_id,)
def realm_first_visible_message_id_cache_key(realm: 'Realm') -> Text:
def realm_first_visible_message_id_cache_key(realm: 'Realm') -> str:
return u"realm_first_visible_message_id:%s" % (realm.string_id,)
# Called by models.py to flush the stream cache whenever we save a stream
@@ -440,10 +440,10 @@ def flush_stream(sender: Any, **kwargs: Any) -> None:
Q(default_events_register_stream=stream)).exists():
cache_delete(bot_dicts_in_realm_cache_key(stream.realm))
def to_dict_cache_key_id(message_id: int) -> Text:
def to_dict_cache_key_id(message_id: int) -> str:
return 'message_dict:%d' % (message_id,)
def to_dict_cache_key(message: 'Message') -> Text:
def to_dict_cache_key(message: 'Message') -> str:
return to_dict_cache_key_id(message.id)
def flush_message(sender: Any, **kwargs: Any) -> None:

View File

@@ -1,5 +1,5 @@
from typing import Any, Callable, Dict, List, Tuple, Text
from typing import Any, Callable, Dict, List, Tuple
# This file needs to be different from cache.py because cache.py
# cannot import anything from zerver.models or we'd have an import
@@ -30,7 +30,7 @@ def message_fetch_objects() -> List[Any]:
return Message.objects.select_related().filter(~Q(sender__email='tabbott/extra@mit.edu'),
id__gt=max_id - MESSAGE_CACHE_SIZE)
def message_cache_items(items_for_remote_cache: Dict[Text, Tuple[bytes]],
def message_cache_items(items_for_remote_cache: Dict[str, Tuple[bytes]],
message: Message) -> None:
'''
Note: this code is untested, and the caller has been
@@ -40,31 +40,31 @@ def message_cache_items(items_for_remote_cache: Dict[Text, Tuple[bytes]],
value = MessageDict.to_dict_uncached(message)
items_for_remote_cache[key] = (value,)
def user_cache_items(items_for_remote_cache: Dict[Text, Tuple[UserProfile]],
def user_cache_items(items_for_remote_cache: Dict[str, Tuple[UserProfile]],
user_profile: UserProfile) -> None:
items_for_remote_cache[user_profile_by_email_cache_key(user_profile.email)] = (user_profile,)
items_for_remote_cache[user_profile_by_id_cache_key(user_profile.id)] = (user_profile,)
items_for_remote_cache[user_profile_by_api_key_cache_key(user_profile.api_key)] = (user_profile,)
items_for_remote_cache[user_profile_cache_key(user_profile.email, user_profile.realm)] = (user_profile,)
def stream_cache_items(items_for_remote_cache: Dict[Text, Tuple[Stream]],
def stream_cache_items(items_for_remote_cache: Dict[str, Tuple[Stream]],
stream: Stream) -> None:
items_for_remote_cache[get_stream_cache_key(stream.name, stream.realm_id)] = (stream,)
def client_cache_items(items_for_remote_cache: Dict[Text, Tuple[Client]],
def client_cache_items(items_for_remote_cache: Dict[str, Tuple[Client]],
client: Client) -> None:
items_for_remote_cache[get_client_cache_key(client.name)] = (client,)
def huddle_cache_items(items_for_remote_cache: Dict[Text, Tuple[Huddle]],
def huddle_cache_items(items_for_remote_cache: Dict[str, Tuple[Huddle]],
huddle: Huddle) -> None:
items_for_remote_cache[huddle_hash_cache_key(huddle.huddle_hash)] = (huddle,)
def recipient_cache_items(items_for_remote_cache: Dict[Text, Tuple[Recipient]],
def recipient_cache_items(items_for_remote_cache: Dict[str, Tuple[Recipient]],
recipient: Recipient) -> None:
items_for_remote_cache[get_recipient_cache_key(recipient.type, recipient.type_id)] = (recipient,)
session_engine = import_module(settings.SESSION_ENGINE)
def session_cache_items(items_for_remote_cache: Dict[Text, Text],
def session_cache_items(items_for_remote_cache: Dict[str, str],
session: Session) -> None:
store = session_engine.SessionStore(session_key=session.session_key) # type: ignore # import_module
items_for_remote_cache[store.cache_key] = store.decode(session.session_data)
@@ -86,12 +86,12 @@ cache_fillers = {
# 'message': (message_fetch_objects, message_cache_items, 3600 * 24, 1000),
'huddle': (lambda: Huddle.objects.select_related().all(), huddle_cache_items, 3600*24*7, 10000),
'session': (lambda: Session.objects.all(), session_cache_items, 3600*24*7, 10000),
} # type: Dict[str, Tuple[Callable[[], List[Any]], Callable[[Dict[Text, Any], Any], None], int, int]]
} # type: Dict[str, Tuple[Callable[[], List[Any]], Callable[[Dict[str, Any], Any], None], int, int]]
def fill_remote_cache(cache: str) -> None:
remote_cache_time_start = get_remote_cache_time()
remote_cache_requests_start = get_remote_cache_requests()
items_for_remote_cache = {} # type: Dict[Text, Any]
items_for_remote_cache = {} # type: Dict[str, Any]
(objects, items_filler, timeout, batch_size) = cache_fillers[cache]
count = 0
for obj in objects():

View File

@@ -7,9 +7,9 @@ import ujson
import os
import string
from typing import Optional, Text
from typing import Optional
def random_api_key() -> Text:
def random_api_key() -> str:
choices = string.ascii_letters + string.digits
altchars = ''.join([choices[ord(os.urandom(1)) % 62] for _ in range(2)]).encode("utf-8")
return base64.b64encode(os.urandom(24), altchars=altchars).decode("utf-8")
@@ -21,12 +21,12 @@ def random_api_key() -> Text:
# Only use this for bulk_create -- for normal usage one should use
# create_user (below) which will also make the Subscription and
# Recipient objects
def create_user_profile(realm: Realm, email: Text, password: Optional[Text],
active: bool, bot_type: Optional[int], full_name: Text,
short_name: Text, bot_owner: Optional[UserProfile],
is_mirror_dummy: bool, tos_version: Optional[Text],
timezone: Optional[Text],
tutorial_status: Optional[Text] = UserProfile.TUTORIAL_WAITING,
def create_user_profile(realm: Realm, email: str, password: Optional[str],
active: bool, bot_type: Optional[int], full_name: str,
short_name: str, bot_owner: Optional[UserProfile],
is_mirror_dummy: bool, tos_version: Optional[str],
timezone: Optional[str],
tutorial_status: Optional[str] = UserProfile.TUTORIAL_WAITING,
enter_sends: bool = False) -> UserProfile:
now = timezone_now()
email = UserManager.normalize_email(email)
@@ -51,12 +51,12 @@ def create_user_profile(realm: Realm, email: Text, password: Optional[Text],
user_profile.api_key = random_api_key()
return user_profile
def create_user(email: Text, password: Optional[Text], realm: Realm,
full_name: Text, short_name: Text, active: bool = True,
def create_user(email: str, password: Optional[str], realm: Realm,
full_name: str, short_name: str, active: bool = True,
is_realm_admin: bool = False, bot_type: Optional[int] = None,
bot_owner: Optional[UserProfile] = None,
tos_version: Optional[Text] = None, timezone: Text = "",
avatar_source: Text = UserProfile.AVATAR_FROM_GRAVATAR,
tos_version: Optional[str] = None, timezone: str = "",
avatar_source: str = UserProfile.AVATAR_FROM_GRAVATAR,
is_mirror_dummy: bool = False,
default_sending_stream: Optional[Stream] = None,
default_events_register_stream: Optional[Stream] = None,

View File

@@ -9,7 +9,7 @@ from django.utils.translation import ugettext as _
from django.conf import settings
from importlib import import_module
from typing import (
cast, Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, Text, Tuple, Union
cast, Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union
)
session_engine = import_module(settings.SESSION_ENGINE)
@@ -50,7 +50,7 @@ from zproject.backends import email_auth_enabled, password_auth_enabled
from version import ZULIP_VERSION
def get_raw_user_data(realm_id: int, client_gravatar: bool) -> Dict[int, Dict[str, Text]]:
def get_raw_user_data(realm_id: int, client_gravatar: bool) -> Dict[int, Dict[str, str]]:
user_dicts = get_realm_user_dicts(realm_id)
# TODO: Consider optimizing this query away with caching.
@@ -476,7 +476,7 @@ def apply_event(state: Dict[str, Any],
event['subscriptions'][i] = copy.deepcopy(event['subscriptions'][i])
del event['subscriptions'][i]['subscribers']
def name(sub: Dict[str, Any]) -> Text:
def name(sub: Dict[str, Any]) -> str:
return sub['name'].lower()
if event['op'] == "add":
@@ -629,7 +629,7 @@ def do_events_register(user_profile: UserProfile, user_client: Client,
queue_lifespan_secs: int = 0,
all_public_streams: bool = False,
include_subscribers: bool = True,
narrow: Iterable[Sequence[Text]] = [],
narrow: Iterable[Sequence[str]] = [],
fetch_event_types: Optional[Iterable[str]] = None) -> Dict[str, Any]:
# Technically we don't need to check this here because
# build_narrow_filter will check it, but it's nicer from an error

View File

@@ -1,5 +1,5 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Text, Type
from typing import Any, Dict, List, Optional, Type
from django.core.exceptions import PermissionDenied
from django.utils.translation import ugettext as _
@@ -93,15 +93,15 @@ class JsonableError(Exception):
# like 403 or 404.
http_status_code = 400 # type: int
def __init__(self, msg: Text, code: Optional[ErrorCode]=None) -> None:
def __init__(self, msg: str, code: Optional[ErrorCode]=None) -> None:
if code is not None:
self.code = code
# `_msg` is an implementation detail of `JsonableError` itself.
self._msg = msg # type: Text
self._msg = msg # type: str
@staticmethod
def msg_format() -> Text:
def msg_format() -> str:
'''Override in subclasses. Gets the items in `data_fields` as format args.
This should return (a translation of) a string literal.
@@ -119,7 +119,7 @@ class JsonableError(Exception):
#
@property
def msg(self) -> Text:
def msg(self) -> str:
format_data = dict(((f, getattr(self, f)) for f in self.data_fields),
_msg=getattr(self, '_msg', None))
return self.msg_format().format(**format_data)

View File

@@ -19,7 +19,7 @@ from zerver.models import UserProfile, Realm, Client, Huddle, Stream, \
get_display_recipient, Attachment, get_system_bot
from zerver.lib.parallel import run_parallel
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, \
Iterable, Text
Iterable
# Custom mypy types follow:
Record = Dict[str, Any]

View File

@@ -1,7 +1,7 @@
from django.conf import settings
from django.core.mail import EmailMessage
from typing import Any, Mapping, Optional, Text
from typing import Any, Mapping, Optional
from zerver.lib.actions import internal_send_message
from zerver.lib.send_email import FromAddress
@@ -13,7 +13,7 @@ import time
client = get_redis_client()
def has_enough_time_expired_since_last_message(sender_email: Text, min_delay: float) -> bool:
def has_enough_time_expired_since_last_message(sender_email: str, min_delay: float) -> bool:
# This function returns a boolean, but it also has the side effect
# of noting that a new message was received.
key = 'zilencer:feedback:%s' % (sender_email,)

View File

@@ -7,12 +7,12 @@ from django.utils.translation import ugettext as _
from django.utils.lru_cache import lru_cache
from itertools import zip_longest
from typing import Any, List, Dict, Optional, Text
from typing import Any, List, Dict, Optional
import os
import ujson
def with_language(string: Text, language: Text) -> Text:
def with_language(string: str, language: str) -> str:
"""
This is an expensive function. If you are using it in a loop, it will
make your code slow.
@@ -30,7 +30,7 @@ def get_language_list() -> List[Dict[str, Any]]:
languages = ujson.load(reader)
return languages['name_map']
def get_language_list_for_templates(default_language: Text) -> List[Dict[str, Dict[str, str]]]:
def get_language_list_for_templates(default_language: str) -> List[Dict[str, Dict[str, str]]]:
language_list = [l for l in get_language_list()
if 'percent_translated' not in l or
l['percent_translated'] >= 5.]
@@ -67,13 +67,13 @@ def get_language_list_for_templates(default_language: Text) -> List[Dict[str, Di
return formatted_list
def get_language_name(code: str) -> Optional[Text]:
def get_language_name(code: str) -> Optional[str]:
for lang in get_language_list():
if code in (lang['code'], lang['locale']):
return lang['name']
return None
def get_available_language_codes() -> List[Text]:
def get_available_language_codes() -> List[str]:
language_list = get_language_list()
codes = [language['code'] for language in language_list]
return codes

View File

@@ -11,7 +11,7 @@ from django.conf import settings
from django.db import connection
from django.utils.timezone import utc as timezone_utc
from typing import Any, Dict, List, Optional, Set, Tuple, \
Iterable, Text
Iterable
from zerver.lib.avatar_hash import user_avatar_path_from_ids
from zerver.lib.bulk_create import bulk_create_users
@@ -567,7 +567,7 @@ def do_import_system_bots(realm: Any) -> None:
create_users(realm, names, bot_type=UserProfile.DEFAULT_BOT)
print("Finished importing system bots.")
def create_users(realm: Realm, name_list: Iterable[Tuple[Text, Text]],
def create_users(realm: Realm, name_list: Iterable[Tuple[str, str]],
bot_type: Optional[int]=None) -> None:
user_set = set()
for full_name, email in name_list:

View File

@@ -1,7 +1,7 @@
import os
import pathlib
from typing import Dict, List, Optional, TypeVar, Any, Text
from typing import Dict, List, Optional, TypeVar, Any
from django.conf import settings
from django.conf.urls import url
from django.urls.resolvers import LocaleRegexProvider

View File

@@ -6,7 +6,7 @@ from argparse import ArgumentParser
from django.conf import settings
from django.core.exceptions import MultipleObjectsReturned
from django.core.management.base import BaseCommand, CommandError
from typing import Any, Dict, Optional, Text, List
from typing import Any, Dict, Optional, List
from zerver.models import Realm, UserProfile
@@ -107,7 +107,7 @@ You can use the command list_realms to find ID of the realms in this server."""
user_profiles.append(self.get_user(email, realm))
return user_profiles
def get_user(self, email: Text, realm: Optional[Realm]) -> UserProfile:
def get_user(self, email: str, realm: Optional[Realm]) -> UserProfile:
# If a realm is specified, try to find the user there, and
# throw an error if they don't exist.

View File

@@ -1,5 +1,5 @@
from typing import Optional, Set, Text
from typing import Optional, Set
import re
@@ -10,10 +10,10 @@ user_group_mentions = r'(?<![^\s\'\"\(,:<])@(\*[^\*]+\*)'
wildcards = ['all', 'everyone', 'stream']
def user_mention_matches_wildcard(mention: Text) -> bool:
def user_mention_matches_wildcard(mention: str) -> bool:
return mention in wildcards
def extract_name(s: Text) -> Optional[Text]:
def extract_name(s: str) -> Optional[str]:
if s.startswith("**") and s.endswith("**"):
name = s[2:-2]
if name in wildcards:
@@ -23,15 +23,15 @@ def extract_name(s: Text) -> Optional[Text]:
# We don't care about @all, @everyone or @stream
return None
def possible_mentions(content: Text) -> Set[Text]:
def possible_mentions(content: str) -> Set[str]:
matches = re.findall(find_mentions, content)
names_with_none = (extract_name(match) for match in matches)
names = {name for name in names_with_none if name}
return names
def extract_user_group(matched_text: Text) -> Text:
def extract_user_group(matched_text: str) -> str:
return matched_text[1:-1]
def possible_user_group_mentions(content: Text) -> Set[Text]:
def possible_user_group_mentions(content: str) -> Set[str]:
matches = re.findall(user_group_mentions, content)
return {extract_user_group(match) for match in matches}

View File

@@ -43,10 +43,10 @@ from zerver.models import (
Reaction
)
from typing import Any, Dict, List, Optional, Set, Tuple, Text, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from mypy_extensions import TypedDict
RealmAlertWords = Dict[int, List[Text]]
RealmAlertWords = Dict[int, List[str]]
RawUnreadMessagesResult = TypedDict('RawUnreadMessagesResult', {
'pm_dict': Dict[int, Any],
@@ -69,7 +69,7 @@ MAX_UNREAD_MESSAGES = 5000
def messages_for_ids(message_ids: List[int],
user_message_flags: Dict[int, List[str]],
search_fields: Dict[int, Dict[str, Text]],
search_fields: Dict[int, Dict[str, str]],
apply_markdown: bool,
client_gravatar: bool,
allow_edit_history: bool) -> List[Dict[str, Any]]:
@@ -267,15 +267,15 @@ class MessageDict:
message: Optional[Message],
message_id: int,
last_edit_time: Optional[datetime.datetime],
edit_history: Optional[Text],
content: Text,
subject: Text,
edit_history: Optional[str],
content: str,
subject: str,
pub_date: datetime.datetime,
rendered_content: Optional[Text],
rendered_content: Optional[str],
rendered_content_version: Optional[int],
sender_id: int,
sender_realm_id: int,
sending_client_name: Text,
sending_client_name: str,
recipient_id: int,
recipient_type: int,
recipient_type_id: int,
@@ -406,7 +406,7 @@ class MessageDict:
if recipient_type == Recipient.STREAM:
display_type = "stream"
elif recipient_type in (Recipient.HUDDLE, Recipient.PERSONAL):
assert not isinstance(display_recipient, Text)
assert not isinstance(display_recipient, str)
display_type = "private"
if len(display_recipient) == 1:
# add the sender in if this isn't a message between
@@ -507,12 +507,12 @@ def access_message(user_profile: UserProfile, message_id: int) -> Tuple[Message,
return (message, user_message)
def render_markdown(message: Message,
content: Text,
content: str,
realm: Optional[Realm]=None,
realm_alert_words: Optional[RealmAlertWords]=None,
user_ids: Optional[Set[int]]=None,
mention_data: Optional[bugdown.MentionData]=None,
email_gateway: Optional[bool]=False) -> Text:
email_gateway: Optional[bool]=False) -> str:
"""Return HTML for given markdown. Bugdown may add properties to the
message object such as `mentions_user_ids`, `mentions_user_group_ids`, and
`mentions_wildcard`. These are only on this Django object and are not
@@ -533,7 +533,7 @@ def render_markdown(message: Message,
if realm is None:
realm = message.get_realm()
possible_words = set() # type: Set[Text]
possible_words = set() # type: Set[str]
if realm_alert_words is not None:
for user_id, words in realm_alert_words.items():
if user_id in message_user_ids:
@@ -566,10 +566,10 @@ def render_markdown(message: Message,
def huddle_users(recipient_id: int) -> str:
display_recipient = get_display_recipient_by_id(recipient_id,
Recipient.HUDDLE,
None) # type: Union[Text, List[Dict[str, Any]]]
None) # type: Union[str, List[Dict[str, Any]]]
# Text is for streams.
assert not isinstance(display_recipient, Text)
# str is for streams.
assert not isinstance(display_recipient, str)
user_ids = [obj['id'] for obj in display_recipient] # type: List[int]
user_ids = sorted(user_ids)
@@ -689,7 +689,7 @@ def get_raw_unread_data(user_profile: UserProfile) -> RawUnreadMessagesResult:
topic_mute_checker = build_topic_mute_checker(user_profile)
def is_row_muted(stream_id: int, recipient_id: int, topic: Text) -> bool:
def is_row_muted(stream_id: int, recipient_id: int, topic: str) -> bool:
if stream_id in muted_stream_ids:
return True

View File

@@ -1,5 +1,5 @@
from typing import cast, Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Text
from typing import cast, Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple
from confirmation.models import Confirmation, create_confirmation_link
from django.conf import settings
@@ -42,34 +42,34 @@ def one_click_unsubscribe_link(user_profile: UserProfile, email_type: str) -> st
Confirmation.UNSUBSCRIBE,
url_args = {'email_type': email_type})
def hash_util_encode(string: Text) -> Text:
def hash_util_encode(string: str) -> str:
# Do the same encoding operation as hash_util.encodeHashComponent on the
# frontend.
# `safe` has a default value of "/", but we want those encoded, too.
return urllib.parse.quote(
string.encode("utf-8"), safe=b"").replace(".", "%2E").replace("%", ".")
def encode_stream(stream_id: int, stream_name: Text) -> Text:
def encode_stream(stream_id: int, stream_name: str) -> str:
# We encode streams for urls as something like 99-Verona.
stream_name = stream_name.replace(' ', '-')
return str(stream_id) + '-' + hash_util_encode(stream_name)
def pm_narrow_url(realm: Realm, participants: List[Text]) -> Text:
def pm_narrow_url(realm: Realm, participants: List[str]) -> str:
participants.sort()
base_url = "%s/#narrow/pm-with/" % (realm.uri,)
return base_url + hash_util_encode(",".join(participants))
def stream_narrow_url(realm: Realm, stream: Stream) -> Text:
def stream_narrow_url(realm: Realm, stream: Stream) -> str:
base_url = "%s/#narrow/stream/" % (realm.uri,)
return base_url + encode_stream(stream.id, stream.name)
def topic_narrow_url(realm: Realm, stream: Stream, topic: Text) -> Text:
def topic_narrow_url(realm: Realm, stream: Stream, topic: str) -> str:
base_url = "%s/#narrow/stream/" % (realm.uri,)
return "%s%s/topic/%s" % (base_url,
encode_stream(stream.id, stream.name),
hash_util_encode(topic))
def relative_to_full_url(base_url: Text, content: Text) -> Text:
def relative_to_full_url(base_url: str, content: str) -> str:
# Convert relative URLs to absolute URLs.
fragment = lxml.html.fromstring(content)
@@ -114,7 +114,7 @@ def relative_to_full_url(base_url: Text, content: Text) -> Text:
return content
def fix_emojis(content: Text, base_url: Text, emojiset: Text) -> Text:
def fix_emojis(content: str, base_url: str, emojiset: str) -> str:
def make_emoji_img_elem(emoji_span_elem: Any) -> Dict[str, Any]:
# Convert the emoji spans to img tags.
classes = emoji_span_elem.get('class')
@@ -157,19 +157,19 @@ def build_message_list(user_profile: UserProfile, messages: List[Message]) -> Li
"""
messages_to_render = [] # type: List[Dict[str, Any]]
def sender_string(message: Message) -> Text:
def sender_string(message: Message) -> str:
if message.recipient.type in (Recipient.STREAM, Recipient.HUDDLE):
return message.sender.full_name
else:
return ''
def fix_plaintext_image_urls(content: Text) -> Text:
def fix_plaintext_image_urls(content: str) -> str:
# Replace image URLs in plaintext content of the form
# [image name](image url)
# with a simple hyperlink.
return re.sub(r"\[(\S*)\]\((\S*)\)", r"\2", content)
def build_message_payload(message: Message) -> Dict[str, Text]:
def build_message_payload(message: Message) -> Dict[str, str]:
plain = message.content
plain = fix_plaintext_image_urls(plain)
# There's a small chance of colliding with non-Zulip URLs containing
@@ -200,7 +200,7 @@ def build_message_list(user_profile: UserProfile, messages: List[Message]) -> Li
header_html = "<a style='color: #ffffff;' href='%s'>%s</a>" % (html_link, header)
elif message.recipient.type == Recipient.HUDDLE:
disp_recipient = get_display_recipient(message.recipient)
assert not isinstance(disp_recipient, Text)
assert not isinstance(disp_recipient, str)
other_recipients = [r['full_name'] for r in disp_recipient
if r['email'] != user_profile.email]
header = "You and %s" % (", ".join(other_recipients),)
@@ -332,7 +332,7 @@ def do_send_missedmessage_events_reply_in_zulip(user_profile: UserProfile,
if (missed_messages[0].recipient.type == Recipient.HUDDLE):
display_recipient = get_display_recipient(missed_messages[0].recipient)
# Make sure that this is a list of strings, not a string.
assert not isinstance(display_recipient, Text)
assert not isinstance(display_recipient, str)
other_recipients = [r['full_name'] for r in display_recipient
if r['id'] != user_profile.id]
context.update({'group_pm': True})
@@ -375,7 +375,7 @@ def do_send_missedmessage_events_reply_in_zulip(user_profile: UserProfile,
'realm_str': user_profile.realm.name,
})
from_name = "Zulip missed messages" # type: Text
from_name = "Zulip missed messages" # type: str
from_address = FromAddress.NOREPLY
if len(senders) == 1 and settings.SEND_MISSED_MESSAGE_EMAILS_AS_USER:
# If this setting is enabled, you can reply to the Zulip
@@ -420,7 +420,7 @@ def handle_missedmessage_emails(user_profile_id: int,
if not messages:
return
messages_by_recipient_subject = defaultdict(list) # type: Dict[Tuple[int, Text], List[Message]]
messages_by_recipient_subject = defaultdict(list) # type: Dict[Tuple[int, str], List[Message]]
for msg in messages:
if msg.recipient.type == Recipient.PERSONAL:
# For PM's group using (recipient, sender).
@@ -460,7 +460,7 @@ def clear_scheduled_emails(user_id: int, email_type: Optional[int]=None) -> None
items = items.filter(type=email_type)
items.delete()
def log_digest_event(msg: Text) -> None:
def log_digest_event(msg: str) -> None:
import logging
logging.basicConfig(filename=settings.DIGEST_LOG_PATH, level=logging.INFO)
logging.info(msg)
@@ -512,7 +512,7 @@ def enqueue_welcome_emails(user: UserProfile) -> None:
"zerver/emails/followup_day2", user.realm, to_user_id=user.id, from_name=from_name,
from_address=from_address, context=context, delay=followup_day2_email_delay(user))
def convert_html_to_markdown(html: Text) -> Text:
def convert_html_to_markdown(html: str) -> str:
# On Linux, the tool installs as html2markdown, and there's a command called
# html2text that does something totally different. On OSX, the tool installs
# as html2text.

View File

@@ -7,7 +7,7 @@ from zerver.lib.actions import set_default_streams, bulk_add_subscriptions, \
do_add_reaction_legacy, create_users
from zerver.models import Realm, UserProfile, Message, Reaction, get_system_bot
from typing import Any, Dict, List, Mapping, Text
from typing import Any, Dict, List, Mapping
def setup_realm_internal_bots(realm: Realm) -> None:
"""Create this realm's internal bots.
@@ -96,7 +96,7 @@ def send_initial_realm_messages(realm: Realm) -> None:
'content': "This is a message in a second topic.\n\nTopics are similar to email subjects, "
"in that each conversation should get its own topic. Keep them short, though; one "
"or two words will do it!"},
] # type: List[Dict[str, Text]]
] # type: List[Dict[str, str]]
messages = [internal_prep_stream_message(
realm, welcome_bot,
message['stream'], message['topic'], message['content']) for message in welcome_messages]

View File

@@ -1,4 +1,4 @@
from typing import Any, AnyStr, Iterable, Dict, Tuple, Callable, Text, Mapping, Optional
from typing import Any, AnyStr, Iterable, Dict, Tuple, Callable, Mapping, Optional
import requests
import json
@@ -22,11 +22,11 @@ from zerver.decorator import JsonableError
class OutgoingWebhookServiceInterface:
def __init__(self, base_url: Text, token: Text, user_profile: UserProfile, service_name: Text) -> None:
self.base_url = base_url # type: Text
self.token = token # type: Text
self.user_profile = user_profile # type: Text
self.service_name = service_name # type: Text
def __init__(self, base_url: str, token: str, user_profile: UserProfile, service_name: str) -> None:
self.base_url = base_url # type: str
self.token = token # type: str
self.user_profile = user_profile # type: str
self.service_name = service_name # type: str
# Given an event that triggers an outgoing webhook operation, returns:
# - The REST operation that should be performed
@@ -37,7 +37,7 @@ class OutgoingWebhookServiceInterface:
# - base_url
# - relative_url_path
# - request_kwargs
def process_event(self, event: Dict[Text, Any]) -> Tuple[Dict[str, Any], Any]:
def process_event(self, event: Dict[str, Any]) -> Tuple[Dict[str, Any], Any]:
raise NotImplementedError()
# Given a successful outgoing webhook REST operation, returns two-element tuple
@@ -46,12 +46,12 @@ class OutgoingWebhookServiceInterface:
# and right-hand value contains a failure message
# to sent back to the user (or None if no failure message should be sent)
def process_success(self, response: Response,
event: Dict[Text, Any]) -> Tuple[Optional[str], Optional[str]]:
event: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
raise NotImplementedError()
class GenericOutgoingWebhookService(OutgoingWebhookServiceInterface):
def process_event(self, event: Dict[Text, Any]) -> Tuple[Dict[str, Any], Any]:
def process_event(self, event: Dict[str, Any]) -> Tuple[Dict[str, Any], Any]:
rest_operation = {'method': 'POST',
'relative_url_path': '',
'base_url': self.base_url,
@@ -62,7 +62,7 @@ class GenericOutgoingWebhookService(OutgoingWebhookServiceInterface):
return rest_operation, json.dumps(request_data)
def process_success(self, response: Response,
event: Dict[Text, Any]) -> Tuple[Optional[str], Optional[str]]:
event: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
response_json = json.loads(response.text)
if "response_not_required" in response_json and response_json['response_not_required']:
@@ -74,7 +74,7 @@ class GenericOutgoingWebhookService(OutgoingWebhookServiceInterface):
class SlackOutgoingWebhookService(OutgoingWebhookServiceInterface):
def process_event(self, event: Dict[Text, Any]) -> Tuple[Dict[str, Any], Any]:
def process_event(self, event: Dict[str, Any]) -> Tuple[Dict[str, Any], Any]:
rest_operation = {'method': 'POST',
'relative_url_path': '',
'base_url': self.base_url,
@@ -100,7 +100,7 @@ class SlackOutgoingWebhookService(OutgoingWebhookServiceInterface):
return rest_operation, request_data
def process_success(self, response: Response,
event: Dict[Text, Any]) -> Tuple[Optional[str], Optional[str]]:
event: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
response_json = json.loads(response.text)
if "text" in response_json:
return response_json["text"], None
@@ -110,9 +110,9 @@ class SlackOutgoingWebhookService(OutgoingWebhookServiceInterface):
AVAILABLE_OUTGOING_WEBHOOK_INTERFACES = {
GENERIC_INTERFACE: GenericOutgoingWebhookService,
SLACK_INTERFACE: SlackOutgoingWebhookService,
} # type: Dict[Text, Any]
} # type: Dict[str, Any]
def get_service_interface_class(interface: Text) -> Any:
def get_service_interface_class(interface: str) -> Any:
if interface is None or interface not in AVAILABLE_OUTGOING_WEBHOOK_INTERFACES:
return AVAILABLE_OUTGOING_WEBHOOK_INTERFACES[GENERIC_INTERFACE]
else:
@@ -127,7 +127,7 @@ def get_outgoing_webhook_service_handler(service: Service) -> Any:
service_name=service.name)
return service_interface
def send_response_message(bot_id: str, message: Dict[str, Any], response_message_content: Text) -> None:
def send_response_message(bot_id: str, message: Dict[str, Any], response_message_content: str) -> None:
recipient_type_name = message['type']
bot_user = get_user_profile_by_id(bot_id)
realm = bot_user.realm
@@ -143,15 +143,15 @@ def send_response_message(bot_id: str, message: Dict[str, Any], response_message
else:
raise JsonableError(_("Invalid message type"))
def succeed_with_message(event: Dict[str, Any], success_message: Text) -> None:
def succeed_with_message(event: Dict[str, Any], success_message: str) -> None:
success_message = "Success! " + success_message
send_response_message(event['user_profile_id'], event['message'], success_message)
def fail_with_message(event: Dict[str, Any], failure_message: Text) -> None:
def fail_with_message(event: Dict[str, Any], failure_message: str) -> None:
failure_message = "Failure! " + failure_message
send_response_message(event['user_profile_id'], event['message'], failure_message)
def get_message_url(event: Dict[str, Any], request_data: Dict[str, Any]) -> Text:
def get_message_url(event: Dict[str, Any], request_data: Dict[str, Any]) -> str:
bot_user = get_user_profile_by_id(event['user_profile_id'])
message = event['message']
if message['type'] == 'stream':
@@ -194,7 +194,7 @@ def notify_bot_owner(event: Dict[str, Any],
def request_retry(event: Dict[str, Any],
request_data: Dict[str, Any],
failure_message: Text,
failure_message: str,
exception: Optional[Exception]=None) -> None:
def failure_processor(event: Dict[str, Any]) -> None:
"""

View File

@@ -10,7 +10,7 @@ import re
import time
import random
from typing import Any, Dict, List, Optional, SupportsInt, Text, Tuple, Type, Union
from typing import Any, Dict, List, Optional, SupportsInt, Tuple, Type, Union
from apns2.client import APNsClient
from apns2.payload import Payload as APNsPayload
@@ -44,10 +44,10 @@ else: # nocoverage -- Not convenient to add test for this.
DeviceToken = Union[PushDeviceToken, RemotePushDeviceToken]
# We store the token as b64, but apns-client wants hex strings
def b64_to_hex(data: bytes) -> Text:
def b64_to_hex(data: bytes) -> str:
return binascii.hexlify(base64.b64decode(data)).decode('utf-8')
def hex_to_b64(data: Text) -> bytes:
def hex_to_b64(data: str) -> bytes:
return base64.b64encode(binascii.unhexlify(data.encode('utf-8')))
#
@@ -255,7 +255,7 @@ class PushNotificationBouncerException(Exception):
def send_to_push_bouncer(method: str,
endpoint: str,
post_data: Union[Text, Dict[str, Any]],
post_data: Union[str, Dict[str, Any]],
extra_headers: Optional[Dict[str, Any]]=None) -> None:
"""While it does actually send the notice, this function has a lot of
code and comments around error handling for the push notifications
@@ -413,7 +413,7 @@ def push_notifications_enabled() -> bool:
return True
return False
def get_alert_from_message(message: Message) -> Text:
def get_alert_from_message(message: Message) -> str:
"""
Determine what alert string to display based on the missed messages.
"""
@@ -430,8 +430,8 @@ def get_alert_from_message(message: Message) -> Text:
else:
return "New Zulip mentions and private messages from %s" % (sender_str,)
def get_mobile_push_content(rendered_content: Text) -> Text:
def get_text(elem: LH.HtmlElement) -> Text:
def get_mobile_push_content(rendered_content: str) -> str:
def get_text(elem: LH.HtmlElement) -> str:
# Convert default emojis to their unicode equivalent.
classes = elem.get("class", "")
if "emoji" in classes:
@@ -449,13 +449,13 @@ def get_mobile_push_content(rendered_content: Text) -> Text:
return '' # To avoid empty line before quote text
return elem.text or ''
def format_as_quote(quote_text: Text) -> Text:
def format_as_quote(quote_text: str) -> str:
quote_text_list = filter(None, quote_text.split('\n')) # Remove empty lines
quote_text = '\n'.join(map(lambda x: "> "+x, quote_text_list))
quote_text += '\n'
return quote_text
def process(elem: LH.HtmlElement) -> Text:
def process(elem: LH.HtmlElement) -> str:
plain_text = get_text(elem)
sub_text = ''
for child in elem:
@@ -473,7 +473,7 @@ def get_mobile_push_content(rendered_content: Text) -> Text:
plain_text = process(elem)
return plain_text
def truncate_content(content: Text) -> Tuple[Text, bool]:
def truncate_content(content: str) -> Tuple[str, bool]:
# We use unicode character 'HORIZONTAL ELLIPSIS' (U+2026) instead
# of three dots as this saves two extra characters for textual
# content. This function will need to be updated to handle unicode

View File

@@ -1,7 +1,7 @@
import os
from typing import Any, Iterator, List, Optional, Tuple, Text
from typing import Any, Iterator, List, Optional, Tuple
from django.conf import settings
from zerver.lib.redis_utils import get_redis_client
@@ -21,23 +21,23 @@ rules = settings.RATE_LIMITING_RULES # type: List[Tuple[int, int]]
KEY_PREFIX = ''
class RateLimitedObject:
def get_keys(self) -> List[Text]:
def get_keys(self) -> List[str]:
key_fragment = self.key_fragment()
return ["{}ratelimit:{}:{}".format(KEY_PREFIX, key_fragment, keytype)
for keytype in ['list', 'zset', 'block']]
def key_fragment(self) -> Text:
def key_fragment(self) -> str:
raise NotImplementedError()
def rules(self) -> List[Tuple[int, int]]:
raise NotImplementedError()
class RateLimitedUser(RateLimitedObject):
def __init__(self, user: UserProfile, domain: Text='all') -> None:
def __init__(self, user: UserProfile, domain: str='all') -> None:
self.user = user
self.domain = domain
def key_fragment(self) -> Text:
def key_fragment(self) -> str:
return "{}:{}:{}".format(type(self.user), self.user.id, self.domain)
def rules(self) -> List[Tuple[int, int]]:
@@ -49,9 +49,9 @@ class RateLimitedUser(RateLimitedObject):
return result
return rules
def bounce_redis_key_prefix_for_testing(test_name: Text) -> None:
def bounce_redis_key_prefix_for_testing(test_name: str) -> None:
global KEY_PREFIX
KEY_PREFIX = test_name + ':' + Text(os.getpid()) + ':'
KEY_PREFIX = test_name + ':' + str(os.getpid()) + ':'
def max_api_calls(entity: RateLimitedObject) -> int:
"Returns the API rate limit for the highest limit"

View File

@@ -12,7 +12,7 @@ import logging
import ujson
import os
from typing import Any, Dict, Iterable, List, Mapping, Optional, Text
from typing import Any, Dict, Iterable, List, Mapping, Optional
from zerver.lib.logging_util import log_to_file
@@ -26,8 +26,8 @@ class FromAddress:
NOREPLY = parseaddr(settings.NOREPLY_EMAIL_ADDRESS)[1]
def build_email(template_prefix: str, to_user_id: Optional[int]=None,
to_email: Optional[Text]=None, from_name: Optional[Text]=None,
from_address: Optional[Text]=None, reply_to_email: Optional[Text]=None,
to_email: Optional[str]=None, from_name: Optional[str]=None,
from_address: Optional[str]=None, reply_to_email: Optional[str]=None,
context: Optional[Dict[str, Any]]=None) -> EmailMultiAlternatives:
# Callers should pass exactly one of to_user_id and to_email.
assert (to_user_id is None) ^ (to_email is None)
@@ -83,9 +83,9 @@ class EmailNotDeliveredException(Exception):
# When changing the arguments to this function, you may need to write a
# migration to change or remove any emails in ScheduledEmail.
def send_email(template_prefix: str, to_user_id: Optional[int]=None, to_email: Optional[Text]=None,
from_name: Optional[Text]=None, from_address: Optional[Text]=None,
reply_to_email: Optional[Text]=None, context: Dict[str, Any]={}) -> None:
def send_email(template_prefix: str, to_user_id: Optional[int]=None, to_email: Optional[str]=None,
from_name: Optional[str]=None, from_address: Optional[str]=None,
reply_to_email: Optional[str]=None, context: Dict[str, Any]={}) -> None:
mail = build_email(template_prefix, to_user_id=to_user_id, to_email=to_email, from_name=from_name,
from_address=from_address, reply_to_email=reply_to_email, context=context)
template = template_prefix.split("/")[-1]
@@ -99,8 +99,8 @@ def send_email_from_dict(email_dict: Mapping[str, Any]) -> None:
send_email(**dict(email_dict))
def send_future_email(template_prefix: str, realm: Realm, to_user_id: Optional[int]=None,
to_email: Optional[Text]=None, from_name: Optional[Text]=None,
from_address: Optional[Text]=None, context: Dict[str, Any]={},
to_email: Optional[str]=None, from_name: Optional[str]=None,
from_address: Optional[str]=None, context: Dict[str, Any]={},
delay: datetime.timedelta=datetime.timedelta(0)) -> None:
template_name = template_prefix.split('/')[-1]
email_fields = {'template_prefix': template_prefix, 'to_user_id': to_user_id, 'to_email': to_email,

View File

@@ -1,4 +1,4 @@
from typing import (Dict, List, Text, Set)
from typing import (Dict, List, Set)
from django.db.models.query import QuerySet
from zerver.lib.stream_subscription import (
@@ -16,7 +16,7 @@ class StreamTopicTarget:
places where we are are still using `subject` or
`topic_name` as a key into tables.
'''
def __init__(self, stream_id: int, topic_name: Text) -> None:
def __init__(self, stream_id: int, topic_name: str) -> None:
self.stream_id = stream_id
self.topic_name = topic_name

View File

@@ -1,5 +1,5 @@
from typing import Any, Iterable, List, Mapping, Set, Text, Tuple, Optional
from typing import Any, Iterable, List, Mapping, Set, Tuple, Optional
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
@@ -32,7 +32,7 @@ def access_stream_for_delete_or_update(user_profile: UserProfile, stream_id: int
# Only set allow_realm_admin flag to True when you want to allow realm admin to
# access unsubscribed private stream content.
def access_stream_common(user_profile: UserProfile, stream: Stream,
error: Text,
error: str,
require_active: bool=True,
allow_realm_admin: bool=False) -> Tuple[Recipient, Optional[Subscription]]:
"""Common function for backend code where the target use attempts to
@@ -93,7 +93,7 @@ def get_stream_by_id(stream_id: int) -> Stream:
raise JsonableError(error)
return stream
def check_stream_name_available(realm: Realm, name: Text) -> None:
def check_stream_name_available(realm: Realm, name: str) -> None:
check_stream_name(name)
try:
get_stream(name, realm)
@@ -102,7 +102,7 @@ def check_stream_name_available(realm: Realm, name: Text) -> None:
pass
def access_stream_by_name(user_profile: UserProfile,
stream_name: Text) -> Tuple[Stream, Recipient, Optional[Subscription]]:
stream_name: str) -> Tuple[Stream, Recipient, Optional[Subscription]]:
error = _("Invalid stream name '%s'" % (stream_name,))
try:
stream = get_realm_stream(stream_name, user_profile.realm_id)
@@ -112,7 +112,7 @@ def access_stream_by_name(user_profile: UserProfile,
(recipient, sub) = access_stream_common(user_profile, stream, error)
return (stream, recipient, sub)
def access_stream_for_unmute_topic(user_profile: UserProfile, stream_name: Text, error: Text) -> Stream:
def access_stream_for_unmute_topic(user_profile: UserProfile, stream_name: str, error: str) -> Stream:
"""
It may seem a little silly to have this helper function for unmuting
topics, but it gets around a linter warning, and it helps to be able
@@ -132,7 +132,7 @@ def access_stream_for_unmute_topic(user_profile: UserProfile, stream_name: Text,
raise JsonableError(error)
return stream
def can_access_stream_history_by_name(user_profile: UserProfile, stream_name: Text) -> bool:
def can_access_stream_history_by_name(user_profile: UserProfile, stream_name: str) -> bool:
"""Determine whether the provided user is allowed to access the
history of the target stream. The stream is specified by name.

View File

@@ -1,7 +1,7 @@
from contextlib import contextmanager
from typing import (
cast, Any, Callable, Dict, Generator, Iterable, Iterator, List, Mapping,
Optional, Set, Sized, Tuple, Union, IO, Text, TypeVar
Optional, Set, Sized, Tuple, Union, IO, TypeVar
)
from django.core import signing
@@ -108,14 +108,14 @@ def tornado_redirected_to_list(lst: List[Mapping[str, Any]]) -> Iterator[None]:
@contextmanager
def simulated_empty_cache() -> Generator[
List[Tuple[str, Union[Text, List[Text]], Text]], None, None]:
cache_queries = [] # type: List[Tuple[str, Union[Text, List[Text]], Text]]
List[Tuple[str, Union[str, List[str]], str]], None, None]:
cache_queries = [] # type: List[Tuple[str, Union[str, List[str]], str]]
def my_cache_get(key: Text, cache_name: Optional[str]=None) -> Optional[Dict[Text, Any]]:
def my_cache_get(key: str, cache_name: Optional[str]=None) -> Optional[Dict[str, Any]]:
cache_queries.append(('get', key, cache_name))
return None
def my_cache_get_many(keys: List[Text], cache_name: Optional[str]=None) -> Dict[Text, Any]: # nocoverage -- simulated code doesn't use this
def my_cache_get_many(keys: List[str], cache_name: Optional[str]=None) -> Dict[str, Any]: # nocoverage -- simulated code doesn't use this
cache_queries.append(('getmany', keys, cache_name))
return {}
@@ -186,7 +186,7 @@ def get_test_image_file(filename: str) -> IO[Any]:
test_avatar_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../tests/images'))
return open(os.path.join(test_avatar_dir, filename), 'rb')
def avatar_disk_path(user_profile: UserProfile, medium: bool=False) -> Text:
def avatar_disk_path(user_profile: UserProfile, medium: bool=False) -> str:
avatar_url_path = avatar_url(user_profile, medium)
avatar_disk_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars",
avatar_url_path.split("/")[-2],
@@ -197,7 +197,7 @@ def make_client(name: str) -> Client:
client, _ = Client.objects.get_or_create(name=name)
return client
def find_key_by_email(address: Text) -> Optional[Text]:
def find_key_by_email(address: str) -> Optional[str]:
from django.core.mail import outbox
key_regex = re.compile("accounts/do_confirm/([a-z0-9]{24})>")
for message in reversed(outbox):
@@ -222,7 +222,7 @@ def most_recent_message(user_profile: UserProfile) -> Message:
usermessage = most_recent_usermessage(user_profile)
return usermessage.message
def get_subscription(stream_name: Text, user_profile: UserProfile) -> Subscription:
def get_subscription(stream_name: str, user_profile: UserProfile) -> Subscription:
stream = get_stream(stream_name, user_profile.realm)
recipient = get_stream_recipient(stream.id)
return Subscription.objects.get(user_profile=user_profile,
@@ -255,7 +255,7 @@ class HostRequestMock:
"""A mock request object where get_host() works. Useful for testing
routes that use Zulip's subdomains feature"""
def __init__(self, user_profile: UserProfile=None, host: Text=settings.EXTERNAL_HOST) -> None:
def __init__(self, user_profile: UserProfile=None, host: str=settings.EXTERNAL_HOST) -> None:
self.host = host
self.GET = {} # type: Dict[str, Any]
self.POST = {} # type: Dict[str, Any]
@@ -267,11 +267,11 @@ class HostRequestMock:
self.content_type = ''
self._email = ''
def get_host(self) -> Text:
def get_host(self) -> str:
return self.host
class MockPythonResponse:
def __init__(self, text: Text, status_code: int) -> None:
def __init__(self, text: str, status_code: int) -> None:
self.text = text
self.status_code = status_code
@@ -291,7 +291,7 @@ def instrument_url(f: UrlFuncT) -> UrlFuncT:
if not INSTRUMENTING: # nocoverage -- option is always enabled; should we remove?
return f
else:
def wrapper(self: 'ZulipTestCase', url: Text, info: Dict[str, Any]={},
def wrapper(self: 'ZulipTestCase', url: str, info: Dict[str, Any]={},
**kwargs: Any) -> HttpResponse:
start = time.time()
result = f(self, url, info, **kwargs)
@@ -420,7 +420,7 @@ def get_all_templates() -> List[str]:
isfile = os.path.isfile
path_exists = os.path.exists
def is_valid_template(p: Text, n: Text) -> bool:
def is_valid_template(p: str, n: str) -> bool:
return 'webhooks' not in p \
and not n.startswith('.') \
and not n.startswith('__init__') \

View File

@@ -3,7 +3,7 @@ from functools import partial
import random
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, \
Text, Type, cast, Union, TypeVar
Type, cast, Union, TypeVar
from unittest import loader, runner # type: ignore # Mypy cannot pick these up.
from unittest.result import TestResult
@@ -140,7 +140,7 @@ class TextTestResult(runner.TextTestResult):
super().__init__(*args, **kwargs)
self.failed_tests = [] # type: List[str]
def addInfo(self, test: TestCase, msg: Text) -> None:
def addInfo(self, test: TestCase, msg: str) -> None:
self.stream.write(msg)
self.stream.flush()
@@ -165,7 +165,7 @@ class TextTestResult(runner.TextTestResult):
test_name = full_test_name(args[0])
self.failed_tests.append(test_name)
def addSkip(self, test: TestCase, reason: Text) -> None:
def addSkip(self, test: TestCase, reason: str) -> None:
TestResult.addSkip(self, test, reason)
self.stream.writeln("** Skipping {}: {}".format(full_test_name(test),
reason))
@@ -176,7 +176,7 @@ class RemoteTestResult(django_runner.RemoteTestResult):
The class follows the unpythonic style of function names of the
base class.
"""
def addInfo(self, test: TestCase, msg: Text) -> None:
def addInfo(self, test: TestCase, msg: str) -> None:
self.events.append(('addInfo', self.test_index, msg))
def addInstrumentation(self, test: TestCase, data: Dict[str, Any]) -> None:
@@ -353,7 +353,7 @@ class ParallelTestSuite(django_runner.ParallelTestSuite):
# definitions.
self.subsuites = SubSuiteList(self.subsuites) # type: ignore # Type of self.subsuites changes.
def check_import_error(test_name: Text) -> None:
def check_import_error(test_name: str) -> None:
try:
# Directly using __import__ is not recommeded, but here it gives
# clearer traceback as compared to importlib.import_module.

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable, Dict, List, Optional, Text
from typing import Any, Callable, Dict, List, Optional
from zerver.models import (
get_stream_recipient,
@@ -15,7 +15,7 @@ from sqlalchemy.sql import (
Selectable
)
def get_topic_mutes(user_profile: UserProfile) -> List[List[Text]]:
def get_topic_mutes(user_profile: UserProfile) -> List[List[str]]:
rows = MutedTopic.objects.filter(
user_profile=user_profile,
).values(
@@ -27,7 +27,7 @@ def get_topic_mutes(user_profile: UserProfile) -> List[List[Text]]:
for row in rows
]
def set_topic_mutes(user_profile: UserProfile, muted_topics: List[List[Text]]) -> None:
def set_topic_mutes(user_profile: UserProfile, muted_topics: List[List[str]]) -> None:
'''
This is only used in tests.
@@ -64,7 +64,7 @@ def remove_topic_mute(user_profile: UserProfile, stream_id: int, topic_name: str
)
row.delete()
def topic_is_muted(user_profile: UserProfile, stream_id: int, topic_name: Text) -> bool:
def topic_is_muted(user_profile: UserProfile, stream_id: int, topic_name: str) -> bool:
is_muted = MutedTopic.objects.filter(
user_profile=user_profile,
stream_id=stream_id,
@@ -103,7 +103,7 @@ def exclude_topic_mutes(conditions: List[Selectable],
condition = not_(or_(*list(map(mute_cond, rows))))
return conditions + [condition]
def build_topic_mute_checker(user_profile: UserProfile) -> Callable[[int, Text], bool]:
def build_topic_mute_checker(user_profile: UserProfile) -> Callable[[int, str], bool]:
rows = MutedTopic.objects.filter(
user_profile=user_profile,
).values(
@@ -118,7 +118,7 @@ def build_topic_mute_checker(user_profile: UserProfile) -> Callable[[int, Text],
topic_name = row['topic_name']
tups.add((recipient_id, topic_name.lower()))
def is_muted(recipient_id: int, topic: Text) -> bool:
def is_muted(recipient_id: int, topic: str) -> bool:
return (recipient_id, topic.lower()) in tups
return is_muted

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Mapping, Optional, Tuple, Text
from typing import Any, Dict, Mapping, Optional, Tuple
from django.utils.translation import ugettext as _
from django.conf import settings
@@ -55,12 +55,12 @@ class RealmUploadQuotaError(JsonableError):
attachment_url_re = re.compile('[/\-]user[\-_]uploads[/\.-].*?(?=[ )]|\Z)')
def attachment_url_to_path_id(attachment_url: Text) -> Text:
def attachment_url_to_path_id(attachment_url: str) -> str:
path_id_raw = re.sub('[/\-]user[\-_]uploads[/\.-]', '', attachment_url)
# Remove any extra '.' after file extension. These are probably added by the user
return re.sub('[.]+$', '', path_id_raw, re.M)
def sanitize_name(value: NonBinaryStr) -> Text:
def sanitize_name(value: NonBinaryStr) -> str:
"""
Sanitizes a value to be safe to store in a Linux filesystem, in
S3, and in a URL. So unicode is allowed, but not special
@@ -75,7 +75,7 @@ def sanitize_name(value: NonBinaryStr) -> Text:
value = re.sub('[^\w\s._-]', '', value, flags=re.U).strip()
return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U))
def random_name(bytes: int=60) -> Text:
def random_name(bytes: int=60) -> str:
return base64.urlsafe_b64encode(os.urandom(bytes)).decode('utf-8')
class BadImageError(JsonableError):
@@ -118,10 +118,10 @@ def resize_emoji(image_data: bytes, size: int=DEFAULT_EMOJI_SIZE) -> bytes:
### Common
class ZulipUploadBackend:
def upload_message_file(self, uploaded_file_name: Text, uploaded_file_size: int,
content_type: Optional[Text], file_data: bytes,
def upload_message_file(self, uploaded_file_name: str, uploaded_file_size: int,
content_type: Optional[str], file_data: bytes,
user_profile: UserProfile,
target_realm: Optional[Realm]=None) -> Text:
target_realm: Optional[Realm]=None) -> str:
raise NotImplementedError()
def upload_avatar_image(self, user_file: File,
@@ -129,10 +129,10 @@ class ZulipUploadBackend:
target_user_profile: UserProfile) -> None:
raise NotImplementedError()
def delete_message_image(self, path_id: Text) -> bool:
def delete_message_image(self, path_id: str) -> bool:
raise NotImplementedError()
def get_avatar_url(self, hash_key: Text, medium: bool=False) -> Text:
def get_avatar_url(self, hash_key: str, medium: bool=False) -> str:
raise NotImplementedError()
def ensure_medium_avatar_image(self, user_profile: UserProfile) -> None:
@@ -141,19 +141,19 @@ class ZulipUploadBackend:
def upload_realm_icon_image(self, icon_file: File, user_profile: UserProfile) -> None:
raise NotImplementedError()
def get_realm_icon_url(self, realm_id: int, version: int) -> Text:
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
raise NotImplementedError()
def upload_emoji_image(self, emoji_file: File, emoji_file_name: Text, user_profile: UserProfile) -> None:
def upload_emoji_image(self, emoji_file: File, emoji_file_name: str, user_profile: UserProfile) -> None:
raise NotImplementedError()
def get_emoji_url(self, emoji_file_name: Text, realm_id: int) -> Text:
def get_emoji_url(self, emoji_file_name: str, realm_id: int) -> str:
raise NotImplementedError()
### S3
def get_bucket(conn: S3Connection, bucket_name: Text) -> Bucket:
def get_bucket(conn: S3Connection, bucket_name: str) -> Bucket:
# Calling get_bucket() with validate=True can apparently lead
# to expensive S3 bills:
# http://www.appneta.com/blog/s3-list-get-bucket-default/
@@ -167,8 +167,8 @@ def get_bucket(conn: S3Connection, bucket_name: Text) -> Bucket:
def upload_image_to_s3(
bucket_name: NonBinaryStr,
file_name: Text,
content_type: Optional[Text],
file_name: str,
content_type: Optional[str],
user_profile: UserProfile,
contents: bytes) -> None:
@@ -180,7 +180,7 @@ def upload_image_to_s3(
key.set_metadata("realm_id", str(user_profile.realm_id))
if content_type is not None:
headers = {'Content-Type': content_type} # type: Optional[Dict[Text, Text]]
headers = {'Content-Type': content_type} # type: Optional[Dict[str, str]]
else:
headers = None
@@ -200,7 +200,7 @@ def check_upload_within_quota(realm: Realm, uploaded_file_size: int) -> None:
if (used_space + uploaded_file_size) > upload_quota:
raise RealmUploadQuotaError(_("Upload would exceed your organization's upload quota."))
def get_file_info(request: HttpRequest, user_file: File) -> Tuple[Text, int, Optional[Text]]:
def get_file_info(request: HttpRequest, user_file: File) -> Tuple[str, int, Optional[str]]:
uploaded_file_name = user_file.name
assert isinstance(uploaded_file_name, str)
@@ -221,11 +221,11 @@ def get_file_info(request: HttpRequest, user_file: File) -> Tuple[Text, int, Opt
return uploaded_file_name, uploaded_file_size, content_type
def get_signed_upload_url(path: Text) -> Text:
def get_signed_upload_url(path: str) -> str:
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
return conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=path)
def get_realm_for_filename(path: Text) -> Optional[int]:
def get_realm_for_filename(path: str) -> Optional[int]:
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path)
if key is None:
@@ -236,9 +236,9 @@ def get_realm_for_filename(path: Text) -> Optional[int]:
class S3UploadBackend(ZulipUploadBackend):
def upload_message_file(self, uploaded_file_name: Text, uploaded_file_size: int,
content_type: Optional[Text], file_data: bytes,
user_profile: UserProfile, target_realm: Optional[Realm]=None) -> Text:
def upload_message_file(self, uploaded_file_name: str, uploaded_file_size: int,
content_type: Optional[str], file_data: bytes,
user_profile: UserProfile, target_realm: Optional[Realm]=None) -> str:
bucket_name = settings.S3_AUTH_UPLOADS_BUCKET
if target_realm is None:
target_realm = user_profile.realm
@@ -260,7 +260,7 @@ class S3UploadBackend(ZulipUploadBackend):
create_attachment(uploaded_file_name, s3_file_name, user_profile, uploaded_file_size)
return url
def delete_message_image(self, path_id: Text) -> bool:
def delete_message_image(self, path_id: str) -> bool:
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
bucket = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET)
@@ -311,7 +311,7 @@ class S3UploadBackend(ZulipUploadBackend):
# See avatar_url in avatar.py for URL. (That code also handles the case
# that users use gravatar.)
def get_avatar_url(self, hash_key: Text, medium: bool=False) -> Text:
def get_avatar_url(self, hash_key: str, medium: bool=False) -> str:
bucket = settings.S3_AVATAR_BUCKET
medium_suffix = "-medium.png" if medium else ""
# ?x=x allows templates to append additional parameters with &s
@@ -342,7 +342,7 @@ class S3UploadBackend(ZulipUploadBackend):
# See avatar_url in avatar.py for URL. (That code also handles the case
# that users use gravatar.)
def get_realm_icon_url(self, realm_id: int, version: int) -> Text:
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
bucket = settings.S3_AVATAR_BUCKET
# ?x=x allows templates to append additional parameters with &s
return "https://%s.s3.amazonaws.com/%s/realm/icon.png?version=%s" % (bucket, realm_id, version)
@@ -366,7 +366,7 @@ class S3UploadBackend(ZulipUploadBackend):
resized_medium
)
def upload_emoji_image(self, emoji_file: File, emoji_file_name: Text,
def upload_emoji_image(self, emoji_file: File, emoji_file_name: str,
user_profile: UserProfile) -> None:
content_type = guess_type(emoji_file.name)[0]
bucket_name = settings.S3_AVATAR_BUCKET
@@ -392,7 +392,7 @@ class S3UploadBackend(ZulipUploadBackend):
resized_image_data,
)
def get_emoji_url(self, emoji_file_name: Text, realm_id: int) -> Text:
def get_emoji_url(self, emoji_file_name: str, realm_id: int) -> str:
bucket = settings.S3_AVATAR_BUCKET
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(realm_id=realm_id,
emoji_file_name=emoji_file_name)
@@ -401,13 +401,13 @@ class S3UploadBackend(ZulipUploadBackend):
### Local
def write_local_file(type: Text, path: Text, file_data: bytes) -> None:
def write_local_file(type: str, path: str, file_data: bytes) -> None:
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'wb') as f:
f.write(file_data)
def get_local_file_path(path_id: Text) -> Optional[Text]:
def get_local_file_path(path_id: str) -> Optional[str]:
local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
if os.path.isfile(local_path):
return local_path
@@ -415,9 +415,9 @@ def get_local_file_path(path_id: Text) -> Optional[Text]:
return None
class LocalUploadBackend(ZulipUploadBackend):
def upload_message_file(self, uploaded_file_name: Text, uploaded_file_size: int,
content_type: Optional[Text], file_data: bytes,
user_profile: UserProfile, target_realm: Optional[Realm]=None) -> Text:
def upload_message_file(self, uploaded_file_name: str, uploaded_file_size: int,
content_type: Optional[str], file_data: bytes,
user_profile: UserProfile, target_realm: Optional[Realm]=None) -> str:
# Split into 256 subdirectories to prevent directories from getting too big
path = "/".join([
str(user_profile.realm_id),
@@ -430,7 +430,7 @@ class LocalUploadBackend(ZulipUploadBackend):
create_attachment(uploaded_file_name, path, user_profile, uploaded_file_size)
return '/user_uploads/' + path
def delete_message_image(self, path_id: Text) -> bool:
def delete_message_image(self, path_id: str) -> bool:
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
if os.path.isfile(file_path):
# This removes the file but the empty folders still remain.
@@ -455,7 +455,7 @@ class LocalUploadBackend(ZulipUploadBackend):
resized_medium = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
write_local_file('avatars', file_path + '-medium.png', resized_medium)
def get_avatar_url(self, hash_key: Text, medium: bool=False) -> Text:
def get_avatar_url(self, hash_key: str, medium: bool=False) -> str:
# ?x=x allows templates to append additional parameters with &s
medium_suffix = "-medium" if medium else ""
return "/user_avatars/%s%s.png?x=x" % (hash_key, medium_suffix)
@@ -472,7 +472,7 @@ class LocalUploadBackend(ZulipUploadBackend):
resized_data = resize_avatar(image_data)
write_local_file(upload_path, 'icon.png', resized_data)
def get_realm_icon_url(self, realm_id: int, version: int) -> Text:
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
# ?x=x allows templates to append additional parameters with &s
return "/user_avatars/%s/realm/icon.png?version=%s" % (realm_id, version)
@@ -488,7 +488,7 @@ class LocalUploadBackend(ZulipUploadBackend):
resized_medium = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
write_local_file('avatars', file_path + '-medium.png', resized_medium)
def upload_emoji_image(self, emoji_file: File, emoji_file_name: Text,
def upload_emoji_image(self, emoji_file: File, emoji_file_name: str,
user_profile: UserProfile) -> None:
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id= user_profile.realm_id,
@@ -506,7 +506,7 @@ class LocalUploadBackend(ZulipUploadBackend):
emoji_path,
resized_image_data)
def get_emoji_url(self, emoji_file_name: Text, realm_id: int) -> Text:
def get_emoji_url(self, emoji_file_name: str, realm_id: int) -> str:
return os.path.join(
"/user_avatars",
RealmEmoji.PATH_ID_TEMPLATE.format(realm_id=realm_id, emoji_file_name=emoji_file_name))
@@ -517,7 +517,7 @@ if settings.LOCAL_UPLOADS_DIR is not None:
else:
upload_backend = S3UploadBackend()
def delete_message_image(path_id: Text) -> bool:
def delete_message_image(path_id: str) -> bool:
return upload_backend.delete_message_image(path_id)
def upload_avatar_image(user_file: File, acting_user_profile: UserProfile,
@@ -527,18 +527,18 @@ def upload_avatar_image(user_file: File, acting_user_profile: UserProfile,
def upload_icon_image(user_file: File, user_profile: UserProfile) -> None:
upload_backend.upload_realm_icon_image(user_file, user_profile)
def upload_emoji_image(emoji_file: File, emoji_file_name: Text, user_profile: UserProfile) -> None:
def upload_emoji_image(emoji_file: File, emoji_file_name: str, user_profile: UserProfile) -> None:
upload_backend.upload_emoji_image(emoji_file, emoji_file_name, user_profile)
def upload_message_file(uploaded_file_name: Text, uploaded_file_size: int,
content_type: Optional[Text], file_data: bytes,
user_profile: UserProfile, target_realm: Optional[Realm]=None) -> Text:
def upload_message_file(uploaded_file_name: str, uploaded_file_size: int,
content_type: Optional[str], file_data: bytes,
user_profile: UserProfile, target_realm: Optional[Realm]=None) -> str:
return upload_backend.upload_message_file(uploaded_file_name, uploaded_file_size,
content_type, file_data, user_profile,
target_realm=target_realm)
def claim_attachment(user_profile: UserProfile,
path_id: Text,
path_id: str,
message: Message,
is_message_realm_public: bool) -> Attachment:
attachment = Attachment.objects.get(path_id=path_id)
@@ -547,7 +547,7 @@ def claim_attachment(user_profile: UserProfile,
attachment.save()
return attachment
def create_attachment(file_name: Text, path_id: Text, user_profile: UserProfile,
def create_attachment(file_name: str, path_id: str, user_profile: UserProfile,
file_size: int) -> bool:
attachment = Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile,
realm=user_profile.realm, size=file_size)
@@ -556,7 +556,7 @@ def create_attachment(file_name: Text, path_id: Text, user_profile: UserProfile,
return True
def upload_message_image_from_request(request: HttpRequest, user_file: File,
user_profile: UserProfile) -> Text:
user_profile: UserProfile) -> str:
uploaded_file_name, uploaded_file_size, content_type = get_file_info(request, user_file)
return upload_message_file(uploaded_file_name, uploaded_file_size,
content_type, user_file.read(), user_profile)

View File

@@ -1,4 +1,4 @@
from typing import Dict, List, Optional, Text
from typing import Dict, List, Optional
from django.db.models.query import QuerySet
from django.utils.translation import ugettext as _
@@ -11,7 +11,7 @@ from zerver.models import UserProfile, Service, Realm, \
from zulip_bots.custom_exceptions import ConfigValidationError
def check_full_name(full_name_raw: Text) -> Text:
def check_full_name(full_name_raw: str) -> str:
full_name = full_name_raw.strip()
if len(full_name) > UserProfile.MAX_NAME_LENGTH:
raise JsonableError(_("Name too long!"))
@@ -21,7 +21,7 @@ def check_full_name(full_name_raw: Text) -> Text:
raise JsonableError(_("Invalid characters in name!"))
return full_name
def check_short_name(short_name_raw: Text) -> Text:
def check_short_name(short_name_raw: str) -> str:
short_name = short_name_raw.strip()
if len(short_name) == 0:
raise JsonableError(_("Bad name or username"))

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from typing import Any, Callable, List, Optional, Sequence, TypeVar, Iterable, Set, Tuple, Text
from typing import Any, Callable, List, Optional, Sequence, TypeVar, Iterable, Set, Tuple
import base64
import errno
import hashlib
@@ -85,8 +85,8 @@ def run_in_batches(all_list: Sequence[T],
if i != limit - 1:
sleep(sleep_time)
def make_safe_digest(string: Text,
hash_func: Callable[[bytes], Any]=hashlib.sha1) -> Text:
def make_safe_digest(string: str,
hash_func: Callable[[bytes], Any]=hashlib.sha1) -> str:
"""
return a hex digest of `string`.
"""
@@ -178,7 +178,7 @@ def split_by(array: List[Any], group_size: int, filler: Any) -> List[List[Any]]:
args = [iter(array)] * group_size
return list(map(list, zip_longest(*args, fillvalue=filler)))
def is_remote_server(identifier: Text) -> bool:
def is_remote_server(identifier: str) -> bool:
"""
This function can be used to identify the source of API auth
request. We can have two types of sources, Remote Zulip Servers

View File

@@ -29,7 +29,7 @@ import ujson
from django.utils.translation import ugettext as _
from django.core.exceptions import ValidationError
from django.core.validators import validate_email, URLValidator
from typing import Callable, Iterable, Optional, Tuple, TypeVar, Text, cast, \
from typing import Callable, Iterable, Optional, Tuple, TypeVar, cast, \
Dict
from datetime import datetime
@@ -189,7 +189,7 @@ def equals(expected_val: object) -> Validator:
return None
return f
def validate_login_email(email: Text) -> None:
def validate_login_email(email: str) -> None:
try:
validate_email(email)
except ValidationError as err:

View File

@@ -1,7 +1,7 @@
from django.conf import settings
from django.http import HttpRequest
from django.utils.translation import ugettext as _
from typing import Optional, Text
from typing import Optional
from zerver.lib.actions import check_send_stream_message, \
check_send_private_message, send_rate_limited_pm_notification_to_bot_owner
@@ -28,7 +28,7 @@ class MissingHTTPEventHeader(JsonableError):
code = ErrorCode.MISSING_HTTP_EVENT_HEADER
data_fields = ['header']
def __init__(self, header: Text) -> None:
def __init__(self, header: str) -> None:
self.header = header
@staticmethod
@@ -38,8 +38,8 @@ class MissingHTTPEventHeader(JsonableError):
@has_request_variables
def check_send_webhook_message(
request: HttpRequest, user_profile: UserProfile,
topic: Text, body: Text, stream: Optional[Text]=REQ(default=None),
user_specified_topic: Optional[Text]=REQ("topic", default=None)
topic: str, body: str, stream: Optional[str]=REQ(default=None),
user_specified_topic: Optional[str]=REQ("topic", default=None)
) -> None:
if stream is None:
@@ -60,8 +60,8 @@ def check_send_webhook_message(
# webhook-errors.log
pass
def validate_extract_webhook_http_header(request: HttpRequest, header: Text,
integration_name: Text) -> Text:
def validate_extract_webhook_http_header(request: HttpRequest, header: str,
integration_name: str) -> str:
extracted_header = request.META.get(DJANGO_HTTP_PREFIX + header)
if extracted_header is None:
message_body = MISSING_EVENT_HEADER_MESSAGE.format(