zerver/lib: Use python 3 syntax for typing.

Edited by tabbott to improve various line-wrapping decisions.
This commit is contained in:
rht
2017-11-05 11:15:10 +01:00
committed by Tim Abbott
parent 229a8b38c0
commit ee546a33a3
8 changed files with 523 additions and 746 deletions

View File

@@ -61,12 +61,7 @@ FullNameInfo = TypedDict('FullNameInfo', {
version = 1
_T = TypeVar('_T')
# We need to avoid this running at runtime, but mypy will see this.
# The problem is that under python 2, Element isn't exactly a type,
# which means that at runtime Union causes this to blow up.
if False:
# mypy requires the Optional to be inside Union
ElementStringNone = Union[Element, Optional[Text]]
ElementStringNone = Union[Element, Optional[Text]]
AVATAR_REGEX = r'!avatar\((?P<email>[^)]*)\)'
GRAVATAR_REGEX = r'!gravatar\((?P<email>[^)]*)\)'
@@ -82,8 +77,7 @@ STREAM_LINK_REGEX = r"""
class BugdownRenderingException(Exception):
pass
def url_embed_preview_enabled_for_realm(message):
# type: (Optional[Message]) -> bool
def url_embed_preview_enabled_for_realm(message: Optional[Message]) -> bool:
if message is not None:
realm = message.get_realm() # type: Optional[Realm]
else:
@@ -95,8 +89,7 @@ def url_embed_preview_enabled_for_realm(message):
return True
return realm.inline_url_embed_preview
def image_preview_enabled_for_realm():
# type: () -> bool
def image_preview_enabled_for_realm() -> bool:
global current_message
if current_message is not None:
realm = current_message.get_realm() # type: Optional[Realm]
@@ -108,8 +101,7 @@ def image_preview_enabled_for_realm():
return True
return realm.inline_image_preview
def list_of_tlds():
# type: () -> List[Text]
def list_of_tlds() -> List[Text]:
# HACK we manually blacklist a few domains
blacklist = ['PY\n', "MD\n"]
@@ -120,8 +112,9 @@ def list_of_tlds():
tlds.sort(key=len, reverse=True)
return tlds
def walk_tree(root, processor, stop_after_first=False):
# type: (Element, Callable[[Element], Optional[_T]], bool) -> List[_T]
def walk_tree(root: Element,
processor: Callable[[Element], Optional[_T]],
stop_after_first: bool=False) -> List[_T]:
results = []
queue = deque([root])
@@ -166,8 +159,7 @@ def add_a(root, url, link, title=None, desc=None,
desc_div.set("class", "message_inline_image_desc")
def add_embed(root, link, extracted_data):
# type: (Element, Text, Dict[Text, Any]) -> None
def add_embed(root: Element, link: Text, extracted_data: Dict[Text, Any]) -> None:
container = markdown.util.etree.SubElement(root, "div")
container.set("class", "message_embed")
@@ -206,8 +198,7 @@ def add_embed(root, link, extracted_data):
@cache_with_key(lambda tweet_id: tweet_id, cache_name="database", with_statsd_key="tweet_data")
def fetch_tweet_data(tweet_id):
# type: (Text) -> Optional[Dict[Text, Any]]
def fetch_tweet_data(tweet_id: Text) -> Optional[Dict[Text, Any]]:
if settings.TEST_SUITE:
from . import testing_mocks
res = testing_mocks.twitter(tweet_id)
@@ -266,8 +257,7 @@ HEAD_END_RE = re.compile('^/head[ >]')
META_START_RE = re.compile('^meta[ >]')
META_END_RE = re.compile('^/meta[ >]')
def fetch_open_graph_image(url):
# type: (Text) -> Optional[Dict[str, Any]]
def fetch_open_graph_image(url: Text) -> Optional[Dict[str, Any]]:
in_head = False
# HTML will auto close meta tags, when we start the next tag add
# a closing tag if it has not been closed yet.
@@ -333,8 +323,7 @@ def fetch_open_graph_image(url):
desc = og_desc.get('content')
return {'image': image, 'title': title, 'desc': desc}
def get_tweet_id(url):
# type: (Text) -> Optional[Text]
def get_tweet_id(url: Text) -> Optional[Text]:
parsed_url = urllib.parse.urlparse(url)
if not (parsed_url.netloc == 'twitter.com' or parsed_url.netloc.endswith('.twitter.com')):
return None
@@ -350,8 +339,7 @@ def get_tweet_id(url):
return tweet_id_match.group("tweetid")
class InlineHttpsProcessor(markdown.treeprocessors.Treeprocessor):
def run(self, root):
# type: (Element) -> None
def run(self, root: Element) -> None:
# Get all URLs from the blob
found_imgs = walk_tree(root, lambda e: e if e.tag == "img" else None)
for img in found_imgs:
@@ -365,14 +353,12 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
TWITTER_MAX_IMAGE_HEIGHT = 400
TWITTER_MAX_TO_PREVIEW = 3
def __init__(self, md, bugdown):
# type: (markdown.Markdown, Bugdown) -> None
def __init__(self, md: markdown.Markdown, bugdown: 'Bugdown') -> None:
# Passing in bugdown for access to config to check if realm is zulip.com
self.bugdown = bugdown
markdown.treeprocessors.Treeprocessor.__init__(self, md)
def get_actual_image_url(self, url):
# type: (Text) -> Text
def get_actual_image_url(self, url: Text) -> Text:
# Add specific per-site cases to convert image-preview urls to image urls.
# See https://github.com/zulip/zulip/issues/4658 for more information
parsed_url = urllib.parse.urlparse(url)
@@ -386,8 +372,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return url
def is_image(self, url):
# type: (Text) -> bool
def is_image(self, url: Text) -> bool:
if not image_preview_enabled_for_realm():
return False
parsed_url = urllib.parse.urlparse(url)
@@ -397,8 +382,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return True
return False
def dropbox_image(self, url):
# type: (Text) -> Optional[Dict[str, Any]]
def dropbox_image(self, url: Text) -> Optional[Dict[str, Any]]:
# TODO: The returned Dict could possibly be a TypedDict in future.
parsed_url = urllib.parse.urlparse(url)
if (parsed_url.netloc == 'dropbox.com' or parsed_url.netloc.endswith('.dropbox.com')):
@@ -443,8 +427,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return image_info
return None
def youtube_id(self, url):
# type: (Text) -> Optional[Text]
def youtube_id(self, url: Text) -> Optional[Text]:
if not image_preview_enabled_for_realm():
return None
# Youtube video id extraction regular expression from http://pastebin.com/KyKAFv1s
@@ -457,16 +440,17 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return None
return match.group(2)
def youtube_image(self, url):
# type: (Text) -> Optional[Text]
def youtube_image(self, url: Text) -> Optional[Text]:
yt_id = self.youtube_id(url)
if yt_id is not None:
return "https://i.ytimg.com/vi/%s/default.jpg" % (yt_id,)
return None
def twitter_text(self, text, urls, user_mentions, media):
# type: (Text, List[Dict[Text, Text]], List[Dict[Text, Any]], List[Dict[Text, Any]]) -> Element
def twitter_text(self, text: Text,
urls: List[Dict[Text, Text]],
user_mentions: List[Dict[Text, Any]],
media: List[Dict[Text, Any]]) -> Element:
"""
Use data from the twitter API to turn links, mentions and media into A
tags. Also convert unicode emojis to images.
@@ -542,8 +526,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
to_process.sort(key=lambda x: x['start'])
p = current_node = markdown.util.etree.Element('p')
def set_text(text):
# type: (Text) -> None
def set_text(text: Text) -> None:
"""
Helper to set the text or the tail of the current_node
"""
@@ -571,8 +554,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
set_text(text[current_index:])
return p
def twitter_link(self, url):
# type: (Text) -> Optional[Element]
def twitter_link(self, url: Text) -> Optional[Element]:
tweet_id = get_tweet_id(url)
if tweet_id is None:
@@ -641,16 +623,14 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
logging.warning(traceback.format_exc())
return None
def get_url_data(self, e):
# type: (Element) -> Optional[Tuple[Text, Text]]
def get_url_data(self, e: Element) -> Optional[Tuple[Text, Text]]:
if e.tag == "a":
if e.text is not None:
return (e.get("href"), e.text)
return (e.get("href"), e.get("href"))
return None
def is_only_element(self, root, url):
# type: (Element, str) -> bool
def is_only_element(self, root: Element, url: str) -> bool:
# Check if the url is the only content of the message.
if not len(root) == 1:
@@ -668,8 +648,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return True
def run(self, root):
# type: (Element) -> None
def run(self, root: Element) -> None:
# Get all URLs from the blob
found_urls = walk_tree(root, self.get_url_data)
@@ -735,8 +714,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
class Avatar(markdown.inlinepatterns.Pattern):
def handleMatch(self, match):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, match: Match[Text]) -> Optional[Element]:
img = markdown.util.etree.Element('img')
email_address = match.group('email')
email = email_address.strip().lower()
@@ -753,8 +731,7 @@ class Avatar(markdown.inlinepatterns.Pattern):
img.set('alt', email)
return img
def possible_avatar_emails(content):
# type: (Text) -> Set[Text]
def possible_avatar_emails(content: Text) -> Set[Text]:
emails = set()
for regex in [AVATAR_REGEX, GRAVATAR_REGEX]:
matches = re.findall(regex, content)
@@ -819,8 +796,7 @@ unicode_emoji_regex = '(?P<syntax>['\
# For more information, please refer to the following article:
# http://crocodillon.com/blog/parsing-emoji-unicode-in-javascript
def make_emoji(codepoint, display_string):
# type: (Text, Text) -> Element
def make_emoji(codepoint: Text, display_string: Text) -> Element:
# Replace underscore in emoji's title with space
title = display_string[1:-1].replace("_", " ")
span = markdown.util.etree.Element('span')
@@ -829,8 +805,7 @@ def make_emoji(codepoint, display_string):
span.text = display_string
return span
def make_realm_emoji(src, display_string):
# type: (Text, Text) -> Element
def make_realm_emoji(src: Text, display_string: Text) -> Element:
elt = markdown.util.etree.Element('img')
elt.set('src', src)
elt.set('class', 'emoji')
@@ -838,8 +813,7 @@ def make_realm_emoji(src, display_string):
elt.set("title", display_string[1:-1].replace("_", " "))
return elt
def unicode_emoji_to_codepoint(unicode_emoji):
# type: (Text) -> Text
def unicode_emoji_to_codepoint(unicode_emoji: Text) -> Text:
codepoint = hex(ord(unicode_emoji))[2:]
# Unicode codepoints are minimum of length 4, padded
# with zeroes if the length is less than zero.
@@ -848,8 +822,7 @@ def unicode_emoji_to_codepoint(unicode_emoji):
return codepoint
class UnicodeEmoji(markdown.inlinepatterns.Pattern):
def handleMatch(self, match):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, match: Match[Text]) -> Optional[Element]:
orig_syntax = match.group('syntax')
codepoint = unicode_emoji_to_codepoint(orig_syntax)
if codepoint in codepoint_to_name:
@@ -859,8 +832,7 @@ class UnicodeEmoji(markdown.inlinepatterns.Pattern):
return None
class Emoji(markdown.inlinepatterns.Pattern):
def handleMatch(self, match):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, match: Match[Text]) -> Optional[Element]:
orig_syntax = match.group("syntax")
name = orig_syntax[1:-1]
@@ -877,15 +849,13 @@ class Emoji(markdown.inlinepatterns.Pattern):
else:
return None
def content_has_emoji_syntax(content):
# type: (Text) -> bool
def content_has_emoji_syntax(content: Text) -> bool:
return re.search(EMOJI_REGEX, content) is not None
class StreamSubscribeButton(markdown.inlinepatterns.Pattern):
# This markdown extension has required javascript in
# static/js/custom_markdown.js
def handleMatch(self, match):
# type: (Match[Text]) -> Element
def handleMatch(self, match: Match[Text]) -> Element:
stream_name = match.group('stream_name')
stream_name = stream_name.replace('\\)', ')').replace('\\\\', '\\')
@@ -907,8 +877,7 @@ class ModalLink(markdown.inlinepatterns.Pattern):
A pattern that allows including in-app modal links in messages.
"""
def handleMatch(self, match):
# type: (Match[Text]) -> Element
def handleMatch(self, match: Match[Text]) -> Element:
relative_url = match.group('relative_url')
text = match.group('text')
@@ -920,8 +889,7 @@ class ModalLink(markdown.inlinepatterns.Pattern):
return a_tag
class Tex(markdown.inlinepatterns.Pattern):
def handleMatch(self, match):
# type: (Match[Text]) -> Element
def handleMatch(self, match: Match[Text]) -> Element:
rendered = render_tex(match.group('body'), is_inline=True)
if rendered is not None:
return etree.fromstring(rendered.encode('utf-8'))
@@ -932,8 +900,7 @@ class Tex(markdown.inlinepatterns.Pattern):
return span
upload_title_re = re.compile("^(https?://[^/]*)?(/user_uploads/\\d+)(/[^/]*)?/[^/]*/(?P<filename>[^/]*)$")
def url_filename(url):
# type: (Text) -> Text
def url_filename(url: Text) -> Text:
"""Extract the filename if a URL is an uploaded file, or return the original URL"""
match = upload_title_re.match(url)
if match:
@@ -941,16 +908,14 @@ def url_filename(url):
else:
return url
def fixup_link(link, target_blank=True):
# type: (markdown.util.etree.Element, bool) -> None
def fixup_link(link: markdown.util.etree.Element, target_blank: bool=True) -> None:
"""Set certain attributes we want on every link."""
if target_blank:
link.set('target', '_blank')
link.set('title', url_filename(link.get('href')))
def sanitize_url(url):
# type: (Text) -> Optional[Text]
def sanitize_url(url: Text) -> Optional[Text]:
"""
Sanitize a url against xss attacks.
See the docstring on markdown.inlinepatterns.LinkPattern.sanitize_url.
@@ -1004,8 +969,7 @@ def sanitize_url(url):
# Url passes all tests. Return url as-is.
return urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
def url_to_a(url, text = None):
# type: (Text, Optional[Text]) -> Union[Element, Text]
def url_to_a(url: Text, text: Optional[Text]=None) -> Union[Element, Text]:
a = markdown.util.etree.Element('a')
href = sanitize_url(url)
@@ -1032,8 +996,7 @@ def url_to_a(url, text = None):
return a
class VerbosePattern(markdown.inlinepatterns.Pattern):
def __init__(self, pattern):
# type: (Text) -> None
def __init__(self, pattern: Text) -> None:
markdown.inlinepatterns.Pattern.__init__(self, ' ')
# HACK: we just had python-markdown compile an empty regex.
@@ -1044,8 +1007,7 @@ class VerbosePattern(markdown.inlinepatterns.Pattern):
re.DOTALL | re.UNICODE | re.VERBOSE)
class AutoLink(VerbosePattern):
def handleMatch(self, match):
# type: (Match[Text]) -> ElementStringNone
def handleMatch(self, match: Match[Text]) -> ElementStringNone:
url = match.group('url')
return url_to_a(url)
@@ -1058,8 +1020,7 @@ class UListProcessor(markdown.blockprocessors.UListProcessor):
TAG = 'ul'
RE = re.compile('^[ ]{0,3}[*][ ]+(.*)')
def __init__(self, parser):
# type: (Any) -> None
def __init__(self, parser: Any) -> None:
# HACK: Set the tab length to 2 just for the initialization of
# this class, so that bulleted lists (and only bulleted lists)
@@ -1074,8 +1035,7 @@ class ListIndentProcessor(markdown.blockprocessors.ListIndentProcessor):
Based on markdown.blockprocessors.ListIndentProcessor, but with 2-space indent
"""
def __init__(self, parser):
# type: (Any) -> None
def __init__(self, parser: Any) -> None:
# HACK: Set the tab length to 2 just for the initialization of
# this class, so that bulleted lists (and only bulleted lists)
@@ -1095,8 +1055,7 @@ class BugdownUListPreprocessor(markdown.preprocessors.Preprocessor):
LI_RE = re.compile('^[ ]{0,3}[*][ ]+(.*)', re.MULTILINE)
HANGING_ULIST_RE = re.compile('^.+\\n([ ]{0,3}[*][ ]+.*)', re.MULTILINE)
def run(self, lines):
# type: (List[Text]) -> List[Text]
def run(self, lines: List[Text]) -> List[Text]:
""" Insert a newline between a paragraph and ulist if missing """
inserts = 0
fence = None
@@ -1123,8 +1082,7 @@ class BugdownUListPreprocessor(markdown.preprocessors.Preprocessor):
class LinkPattern(markdown.inlinepatterns.Pattern):
""" Return a link element from the given match. """
def handleMatch(self, m):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, m: Match[Text]) -> Optional[Element]:
href = m.group(9)
if not href:
return None
@@ -1141,8 +1099,7 @@ class LinkPattern(markdown.inlinepatterns.Pattern):
fixup_link(el, target_blank = (href[:1] != '#'))
return el
def prepare_realm_pattern(source):
# type: (Text) -> Text
def prepare_realm_pattern(source: Text) -> Text:
""" Augment a realm filter so it only matches after start-of-string,
whitespace, or opening delimiters, won't match if there are word
characters directly after, and saves what was matched as "name". """
@@ -1153,20 +1110,19 @@ def prepare_realm_pattern(source):
class RealmFilterPattern(markdown.inlinepatterns.Pattern):
""" Applied a given realm filter to the input """
def __init__(self, source_pattern, format_string, markdown_instance=None):
# type: (Text, Text, Optional[markdown.Markdown]) -> None
def __init__(self, source_pattern: Text,
format_string: Text,
markdown_instance: Optional[markdown.Markdown]=None) -> None:
self.pattern = prepare_realm_pattern(source_pattern)
self.format_string = format_string
markdown.inlinepatterns.Pattern.__init__(self, self.pattern, markdown_instance)
def handleMatch(self, m):
# type: (Match[Text]) -> Union[Element, Text]
def handleMatch(self, m: Match[Text]) -> Union[Element, Text]:
return url_to_a(self.format_string % m.groupdict(),
m.group("name"))
class UserMentionPattern(markdown.inlinepatterns.Pattern):
def handleMatch(self, m):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, m: Match[Text]) -> Optional[Element]:
match = m.group(2)
if current_message and db_data is not None:
@@ -1202,8 +1158,7 @@ class UserMentionPattern(markdown.inlinepatterns.Pattern):
return None
class UserGroupMentionPattern(markdown.inlinepatterns.Pattern):
def handleMatch(self, m):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, m: Match[Text]) -> Optional[Element]:
match = m.group(2)
if current_message and db_data is not None:
@@ -1226,15 +1181,13 @@ class UserGroupMentionPattern(markdown.inlinepatterns.Pattern):
return None
class StreamPattern(VerbosePattern):
def find_stream_by_name(self, name):
# type: (Match[Text]) -> Optional[Dict[str, Any]]
def find_stream_by_name(self, name: Match[Text]) -> Optional[Dict[str, Any]]:
if db_data is None:
return None
stream = db_data['stream_names'].get(name)
return stream
def handleMatch(self, m):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, m: Match[Text]) -> Optional[Element]:
name = m.group('stream_name')
if current_message:
@@ -1254,14 +1207,12 @@ class StreamPattern(VerbosePattern):
return el
return None
def possible_linked_stream_names(content):
# type: (Text) -> Set[Text]
def possible_linked_stream_names(content: Text) -> Set[Text]:
matches = re.findall(STREAM_LINK_REGEX, content, re.VERBOSE)
return set(matches)
class AlertWordsNotificationProcessor(markdown.preprocessors.Preprocessor):
def run(self, lines):
# type: (Iterable[Text]) -> Iterable[Text]
def run(self, lines: Iterable[Text]) -> Iterable[Text]:
if current_message and db_data is not None:
# We check for alert words here, the set of which are
# dependent on which users may see this message.
@@ -1292,8 +1243,7 @@ class AlertWordsNotificationProcessor(markdown.preprocessors.Preprocessor):
# Markdown link, breaking up the link. This is a monkey-patch, but it
# might be worth sending a version of this change upstream.
class AtomicLinkPattern(LinkPattern):
def handleMatch(self, m):
# type: (Match[Text]) -> Optional[Element]
def handleMatch(self, m: Match[Text]) -> Optional[Element]:
ret = LinkPattern.handleMatch(self, m)
if ret is None:
return None
@@ -1307,8 +1257,7 @@ DEFAULT_BUGDOWN_KEY = -1
ZEPHYR_MIRROR_BUGDOWN_KEY = -2
class Bugdown(markdown.Extension):
def __init__(self, *args, **kwargs):
# type: (*Any, **Union[bool, int, List[Any]]) -> None
def __init__(self, *args: Any, **kwargs: Union[bool, int, List[Any]]) -> None:
# define default configs
self.config = {
"realm_filters": [kwargs['realm_filters'],
@@ -1320,8 +1269,7 @@ class Bugdown(markdown.Extension):
super().__init__(*args, **kwargs)
def extendMarkdown(self, md, md_globals):
# type: (markdown.Markdown, Dict[str, Any]) -> None
def extendMarkdown(self, md: markdown.Markdown, md_globals: Dict[str, Any]) -> None:
del md.preprocessors['reference']
if self.getConfig('code_block_processor_disabled'):
@@ -1476,13 +1424,11 @@ md_engines = {} # type: Dict[Tuple[int, bool], markdown.Markdown]
realm_filter_data = {} # type: Dict[int, List[Tuple[Text, Text, int]]]
class EscapeHtml(markdown.Extension):
def extendMarkdown(self, md, md_globals):
# type: (markdown.Markdown, Dict[str, Any]) -> None
def extendMarkdown(self, md: markdown.Markdown, md_globals: Dict[str, Any]) -> None:
del md.preprocessors['html_block']
del md.inlinePatterns['html']
def make_md_engine(realm_filters_key, email_gateway):
# type: (int, bool) -> None
def make_md_engine(realm_filters_key: int, email_gateway: bool) -> None:
md_engine_key = (realm_filters_key, email_gateway)
if md_engine_key in md_engines:
del md_engines[md_engine_key]
@@ -1503,8 +1449,7 @@ def make_md_engine(realm_filters_key, email_gateway):
realm=realm_filters_key,
code_block_processor_disabled=email_gateway)])
def subject_links(realm_filters_key, subject):
# type: (int, Text) -> List[Text]
def subject_links(realm_filters_key: int, subject: Text) -> List[Text]:
matches = [] # type: List[Text]
realm_filters = realm_filters_for_realm(realm_filters_key)
@@ -1515,8 +1460,7 @@ def subject_links(realm_filters_key, subject):
matches += [realm_filter[1] % m.groupdict()]
return matches
def maybe_update_markdown_engines(realm_filters_key, email_gateway):
# type: (Optional[int], bool) -> None
def maybe_update_markdown_engines(realm_filters_key: Optional[int], email_gateway: bool) -> None:
# If realm_filters_key is None, load all filters
global realm_filter_data
if realm_filters_key is None:
@@ -1551,8 +1495,7 @@ def maybe_update_markdown_engines(realm_filters_key, email_gateway):
# We also use repr() to improve reproducibility, and to escape terminal control
# codes, which can do surprisingly nasty things.
_privacy_re = re.compile('\\w', flags=re.UNICODE)
def privacy_clean_markdown(content):
# type: (Text) -> Text
def privacy_clean_markdown(content: Text) -> Text:
return repr(_privacy_re.sub('x', content))
@@ -1565,16 +1508,14 @@ current_message = None # type: Optional[Message]
# threads themselves, as well.
db_data = None # type: Optional[Dict[Text, Any]]
def log_bugdown_error(msg):
# type: (str) -> None
def log_bugdown_error(msg: str) -> None:
"""We use this unusual logging approach to log the bugdown error, in
order to prevent AdminZulipHandler from sending the santized
original markdown formatting into another Zulip message, which
could cause an infinite exception loop."""
logging.getLogger('').error(msg)
def get_email_info(realm_id, emails):
# type: (int, Set[Text]) -> Dict[Text, FullNameInfo]
def get_email_info(realm_id: int, emails: Set[Text]) -> Dict[Text, FullNameInfo]:
if not emails:
return dict()
@@ -1598,8 +1539,7 @@ def get_email_info(realm_id, emails):
}
return dct
def get_full_name_info(realm_id, full_names):
# type: (int, Set[Text]) -> Dict[Text, FullNameInfo]
def get_full_name_info(realm_id: int, full_names: Set[Text]) -> Dict[Text, FullNameInfo]:
if not full_names:
return dict()
@@ -1626,8 +1566,7 @@ def get_full_name_info(realm_id, full_names):
return dct
class MentionData:
def __init__(self, realm_id, content):
# type: (int, Text) -> None
def __init__(self, realm_id: int, content: Text) -> None:
full_names = possible_mentions(content)
self.full_name_info = get_full_name_info(realm_id, full_names)
self.user_ids = {
@@ -1645,12 +1584,10 @@ class MentionData:
user_profile_id = info['user_profile_id']
self.user_group_members[group_id].append(user_profile_id)
def get_user(self, name):
# type: (Text) -> Optional[FullNameInfo]
def get_user(self, name: Text) -> Optional[FullNameInfo]:
return self.full_name_info.get(name.lower(), None)
def get_user_ids(self):
# type: () -> Set[int]
def get_user_ids(self) -> Set[int]:
"""
Returns the user IDs that might have been mentioned by this
content. Note that because this data structure has not parsed
@@ -1659,16 +1596,13 @@ class MentionData:
"""
return self.user_ids
def get_user_group(self, name):
# type: (Text) -> Optional[UserGroup]
def get_user_group(self, name: Text) -> Optional[UserGroup]:
return self.user_group_name_info.get(name.lower(), None)
def get_group_members(self, user_group_id):
# type: (int) -> List[int]
def get_group_members(self, user_group_id: int) -> List[int]:
return self.user_group_members.get(user_group_id, [])
def get_user_group_name_info(realm_id, user_group_names):
# type: (int, Set[Text]) -> Dict[Text, UserGroup]
def get_user_group_name_info(realm_id: int, user_group_names: Set[Text]) -> Dict[Text, UserGroup]:
if not user_group_names:
return dict()
@@ -1677,8 +1611,7 @@ def get_user_group_name_info(realm_id, user_group_names):
dct = {row.name.lower(): row for row in rows}
return dct
def get_stream_name_info(realm, stream_names):
# type: (Realm, Set[Text]) -> Dict[Text, FullNameInfo]
def get_stream_name_info(realm: Realm, stream_names: Set[Text]) -> Dict[Text, FullNameInfo]:
if not stream_names:
return dict()
@@ -1703,9 +1636,13 @@ def get_stream_name_info(realm, stream_names):
return dct
def do_convert(content, message=None, message_realm=None, possible_words=None, sent_by_bot=False,
mention_data=None, email_gateway=False):
# type: (Text, Optional[Message], Optional[Realm], Optional[Set[Text]], Optional[bool], Optional[MentionData], Optional[bool]) -> Text
def do_convert(content: Text,
message: Optional[Message]=None,
message_realm: Optional[Realm]=None,
possible_words: Optional[Set[Text]]=None,
sent_by_bot: Optional[bool]=False,
mention_data: Optional[MentionData]=None,
email_gateway: Optional[bool]=False) -> Text:
"""Convert Markdown to HTML, with Zulip-specific settings and hacks."""
# This logic is a bit convoluted, but the overall goal is to support a range of use cases:
# * Nothing is passed in other than content -> just run default options (e.g. for docs)
@@ -1803,30 +1740,30 @@ bugdown_time_start = 0.0
bugdown_total_time = 0.0
bugdown_total_requests = 0
def get_bugdown_time():
# type: () -> float
def get_bugdown_time() -> float:
return bugdown_total_time
def get_bugdown_requests():
# type: () -> int
def get_bugdown_requests() -> int:
return bugdown_total_requests
def bugdown_stats_start():
# type: () -> None
def bugdown_stats_start() -> None:
global bugdown_time_start
bugdown_time_start = time.time()
def bugdown_stats_finish():
# type: () -> None
def bugdown_stats_finish() -> None:
global bugdown_total_time
global bugdown_total_requests
global bugdown_time_start
bugdown_total_requests += 1
bugdown_total_time += (time.time() - bugdown_time_start)
def convert(content, message=None, message_realm=None, possible_words=None, sent_by_bot=False,
mention_data=None, email_gateway=False):
# type: (Text, Optional[Message], Optional[Realm], Optional[Set[Text]], Optional[bool], Optional[MentionData], Optional[bool]) -> Text
def convert(content: Text,
message: Optional[Message]=None,
message_realm: Optional[Realm]=None,
possible_words: Optional[Set[Text]]=None,
sent_by_bot: Optional[bool]=False,
mention_data: Optional[MentionData]=None,
email_gateway: Optional[bool]=False) -> Text:
bugdown_stats_start()
ret = do_convert(content, message, message_realm,
possible_words, sent_by_bot, mention_data, email_gateway)

View File

@@ -110,8 +110,7 @@ LANG_TAG = ' class="%s"'
class FencedCodeExtension(markdown.Extension):
def extendMarkdown(self, md, md_globals):
# type: (markdown.Markdown, Dict[str, Any]) -> None
def extendMarkdown(self, md: markdown.Markdown, md_globals: Dict[str, Any]) -> None:
""" Add FencedBlockPreprocessor to the Markdown instance. """
md.registerExtension(self)
@@ -127,41 +126,34 @@ class FencedCodeExtension(markdown.Extension):
class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
def __init__(self, md):
# type: (markdown.Markdown) -> None
def __init__(self, md: markdown.Markdown) -> None:
markdown.preprocessors.Preprocessor.__init__(self, md)
self.checked_for_codehilite = False
self.codehilite_conf = {} # type: Dict[str, List[Any]]
def run(self, lines):
# type: (Iterable[Text]) -> List[Text]
def run(self, lines: Iterable[Text]) -> List[Text]:
""" Match and store Fenced Code Blocks in the HtmlStash. """
output = [] # type: List[Text]
class BaseHandler:
def handle_line(self, line):
# type: (Text) -> None
def handle_line(self, line: Text) -> None:
raise NotImplementedError()
def done(self):
# type: () -> None
def done(self) -> None:
raise NotImplementedError()
processor = self
handlers = [] # type: List[BaseHandler]
def push(handler):
# type: (BaseHandler) -> None
def push(handler: BaseHandler) -> None:
handlers.append(handler)
def pop():
# type: () -> None
def pop() -> None:
handlers.pop()
def check_for_new_fence(output, line):
# type: (MutableSequence[Text], Text) -> None
def check_for_new_fence(output: MutableSequence[Text], line: Text) -> None:
m = FENCE_RE.match(line)
if m:
fence = m.group('fence')
@@ -172,20 +164,16 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
output.append(line)
class OuterHandler(BaseHandler):
def __init__(self, output):
# type: (MutableSequence[Text]) -> None
def __init__(self, output: MutableSequence[Text]) -> None:
self.output = output
def handle_line(self, line):
# type: (Text) -> None
def handle_line(self, line: Text) -> None:
check_for_new_fence(self.output, line)
def done(self):
# type: () -> None
def done(self) -> None:
pop()
def generic_handler(output, fence, lang):
# type: (MutableSequence[Text], Text, Text) -> BaseHandler
def generic_handler(output: MutableSequence[Text], fence: Text, lang: Text) -> BaseHandler:
if lang in ('quote', 'quoted'):
return QuoteHandler(output, fence)
elif lang in ('math', 'tex', 'latex'):
@@ -194,22 +182,19 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return CodeHandler(output, fence, lang)
class CodeHandler(BaseHandler):
def __init__(self, output, fence, lang):
# type: (MutableSequence[Text], Text, Text) -> None
def __init__(self, output: MutableSequence[Text], fence: Text, lang: Text) -> None:
self.output = output
self.fence = fence
self.lang = lang
self.lines = [] # type: List[Text]
def handle_line(self, line):
# type: (Text) -> None
def handle_line(self, line: Text) -> None:
if line.rstrip() == self.fence:
self.done()
else:
self.lines.append(line.rstrip())
def done(self):
# type: () -> None
def done(self) -> None:
text = '\n'.join(self.lines)
text = processor.format_code(self.lang, text)
text = processor.placeholder(text)
@@ -220,21 +205,18 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
pop()
class QuoteHandler(BaseHandler):
def __init__(self, output, fence):
# type: (MutableSequence[Text], Text) -> None
def __init__(self, output: MutableSequence[Text], fence: Text) -> None:
self.output = output
self.fence = fence
self.lines = [] # type: List[Text]
def handle_line(self, line):
# type: (Text) -> None
def handle_line(self, line: Text) -> None:
if line.rstrip() == self.fence:
self.done()
else:
check_for_new_fence(self.lines, line)
def done(self):
# type: () -> None
def done(self) -> None:
text = '\n'.join(self.lines)
text = processor.format_quote(text)
processed_lines = text.split('\n')
@@ -244,21 +226,18 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
pop()
class TexHandler(BaseHandler):
def __init__(self, output, fence):
# type: (MutableSequence[Text], Text) -> None
def __init__(self, output: MutableSequence[Text], fence: Text) -> None:
self.output = output
self.fence = fence
self.lines = [] # type: List[Text]
def handle_line(self, line):
# type: (Text) -> None
def handle_line(self, line: Text) -> None:
if line.rstrip() == self.fence:
self.done()
else:
self.lines.append(line)
def done(self):
# type: () -> None
def done(self) -> None:
text = '\n'.join(self.lines)
text = processor.format_tex(text)
text = processor.placeholder(text)
@@ -284,8 +263,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
output.append('')
return output
def format_code(self, lang, text):
# type: (Text, Text) -> Text
def format_code(self, lang: Text, text: Text) -> Text:
if lang:
langclass = LANG_TAG % (lang,)
else:
@@ -318,8 +296,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return code
def format_quote(self, text):
# type: (Text) -> Text
def format_quote(self, text: Text) -> Text:
paragraphs = text.split("\n\n")
quoted_paragraphs = []
for paragraph in paragraphs:
@@ -327,8 +304,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
quoted_paragraphs.append("\n".join("> " + line for line in lines if line != ''))
return "\n\n".join(quoted_paragraphs)
def format_tex(self, text):
# type: (Text) -> Text
def format_tex(self, text: Text) -> Text:
paragraphs = text.split("\n\n")
tex_paragraphs = []
for paragraph in paragraphs:
@@ -340,12 +316,10 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
escape(paragraph) + '</span>')
return "\n\n".join(tex_paragraphs)
def placeholder(self, code):
# type: (Text) -> Text
def placeholder(self, code: Text) -> Text:
return self.markdown.htmlStash.store(code, safe=True)
def _escape(self, txt):
# type: (Text) -> Text
def _escape(self, txt: Text) -> Text:
""" basic html escaping """
txt = txt.replace('&', '&amp;')
txt = txt.replace('<', '&lt;')
@@ -354,8 +328,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return txt
def makeExtension(*args, **kwargs):
# type: (*Any, **Union[bool, None, Text]) -> FencedCodeExtension
def makeExtension(*args: Any, **kwargs: None) -> FencedCodeExtension:
return FencedCodeExtension(*args, **kwargs)
if __name__ == "__main__":